9#include "XrdVersion.hh"
38uint64_t TPCHandler::m_monid{0};
39int TPCHandler::m_marker_period = 5;
40size_t TPCHandler::m_block_size = 16*1024*1024;
41size_t TPCHandler::m_small_block_size = 1*1024*1024;
43bool TPCHandler::allowMissingCRL =
false;
51TPCHandler::TPCLogRecord::~TPCLogRecord()
58 monInfo.
clID = clID.c_str();
60 gettimeofday(&monInfo.
endT, 0);
63 {monInfo.
dstURL = local.c_str();
64 monInfo.
srcURL = remote.c_str();
66 monInfo.
dstURL = remote.c_str();
67 monInfo.
srcURL = local.c_str();
71 if (!status) monInfo.
endRC = 0;
72 else if (tpc_status > 0) monInfo.
endRC = tpc_status;
73 else monInfo.
endRC = 1;
74 monInfo.
strm =
static_cast<unsigned char>(streams);
75 monInfo.
fSize = (bytes_transferred < 0 ? 0 : bytes_transferred);
78 tpcMonitor->Report(monInfo);
88 if (curl) curl_easy_cleanup(curl);
103int TPCHandler::sockopt_callback(
void *clientp, curl_socket_t curlfd, curlsocktype purpose) {
104 TPCLogRecord * rec = (TPCLogRecord *)clientp;
105 if (purpose == CURLSOCKTYPE_IPCXN && rec && rec->pmarkManager.isEnabled()) {
108 return CURL_SOCKOPT_ALREADY_CONNECTED;
110 return CURL_SOCKOPT_OK;
122int TPCHandler::opensocket_callback(
void *clientp,
123 curlsocktype purpose,
124 struct curl_sockaddr *aInfo)
128 if (purpose != CURLSOCKTYPE_IPCXN)
129 return CURL_SOCKET_BAD;
132 return CURL_SOCKET_BAD;
135 int fd = XrdSysFD_Socket(aInfo->family, aInfo->socktype, aInfo->protocol);
138 return CURL_SOCKET_BAD;
144 XrdNetAddr thePeer(&(aInfo->addr));
145 TPCLogRecord *rec =
static_cast<TPCLogRecord*
>(clientp);
148 if ((!rec->allow_private && thePeer.isPrivate()) || (!rec->allow_local && thePeer.isLocal())) {
149 rec->tpc_status = 403;
150 rec->m_log->Emsg(rec->log_prefix.c_str(),
151 "Connection to local/private address is forbidden");
153 return CURL_SOCKET_BAD;
158 std::stringstream connectErrMsg;
159 if(!rec->pmarkManager.connect(fd, &(aInfo->addr), aInfo->addrlen, CONNECT_TIMEOUT, connectErrMsg)) {
160 rec->m_log->Emsg(rec->log_prefix.c_str(),
"Unable to connect socket: ", connectErrMsg.str().c_str());
162 return CURL_SOCKET_BAD;
168int TPCHandler::closesocket_callback(
void *clientp, curl_socket_t fd) {
169 TPCLogRecord * rec = (TPCLogRecord *)clientp;
174 rec->pmarkManager.endPmark(fd);
189int TPCHandler::ssl_ctx_callback(
CURL *curl,
void *ssl_ctx,
void *clientp) {
191 SSL_CTX* ctx =
static_cast<SSL_CTX*
>(ssl_ctx);
192 SSL_CTX_set_verify(ctx, SSL_VERIFY_PEER, verify_callback);
193 return CURL_SOCKOPT_OK;
196int TPCHandler::verify_callback(
int preverify_ok, X509_STORE_CTX* ctx) {
197 if (preverify_ok == 1)
return 1;
199 int err = X509_STORE_CTX_get_error(ctx);
201 if (err == X509_V_ERR_UNABLE_TO_GET_CRL) {
202 X509_STORE_CTX_set_error(ctx, X509_V_OK);
214std::string TPCHandler::prepareURL(XrdHttpExtReq &req) {
219bool TPCHandler::mismatchReprDigest(
const std::map<std::string, std::string> & passiveSrvReprDigest, XrdHttpExtReq &req,
221 if(passiveSrvReprDigest.size()) {
222 for (
const auto & [digestName, digestValue]: passiveSrvReprDigest) {
223 auto clientDigestMatch = req.
mReprDigest.find(digestName);
226 if (clientDigestMatch->second != digestValue) {
228 std::stringstream errMsg;
229 errMsg <<
"Mismatch between client-provided and remote server checksums:"
230 <<
" client = (" << clientDigestMatch->first <<
"=" << clientDigestMatch->second <<
")"
231 <<
" server = (" << digestName <<
"=" << digestValue <<
")";
232 logTransferEvent(
LogMask::Error, rec,
"REPRDIGEST_VERIFY_FAIL", errMsg.str());
234 req.
SendSimpleResp(rec.status, NULL, NULL, generateClientErr(errMsg, rec, CURLcode::CURLE_OK).c_str(), 0);
254 std::stringstream parser(opaque);
255 std::string sequence;
256 std::stringstream output;
258 while (
getline(parser, sequence,
'&')) {
259 if (sequence.empty()) {
continue;}
260 size_t equal_pos = sequence.find(
'=');
262 if (equal_pos != std::string::npos)
263 val = curl_easy_escape(curl, sequence.c_str() + equal_pos + 1, sequence.size() - equal_pos - 1);
265 if (!val && equal_pos != std::string::npos) {
continue;}
267 if (!first) output <<
"&";
269 output << sequence.substr(0, equal_pos);
271 output <<
"=" << val;
283TPCHandler::ConfigureCurlCA(
CURL *curl)
285 auto ca_filename = m_ca_file ? m_ca_file->CAFilename() :
"";
286 auto crl_filename = m_ca_file ? m_ca_file->CRLFilename() :
"";
287 if (!ca_filename.empty() && !crl_filename.empty()) {
288 curl_easy_setopt(curl, CURLOPT_CAINFO, ca_filename.c_str());
292 std::ifstream in(crl_filename, std::ifstream::ate | std::ifstream::binary);
293 if(in.tellg() > 0 && m_ca_file->atLeastOneValidCRLFound()){
294 curl_easy_setopt(curl, CURLOPT_CRLFILE, crl_filename.c_str());
295 if (allowMissingCRL) {
297 curl_easy_setopt(curl, CURLOPT_SSL_CTX_FUNCTION, ssl_ctx_callback);
300 std::ostringstream oss;
301 oss <<
"No valid CRL file has been found in the file " << crl_filename <<
". Disabling CRL checking.";
302 m_log.Log(
Warning,
"TpcHandler",oss.str().c_str());
305 else if (!m_cadir.empty()) {
306 curl_easy_setopt(curl, CURLOPT_CAPATH, m_cadir.c_str());
308 if (!m_cafile.empty()) {
309 curl_easy_setopt(curl, CURLOPT_CAINFO, m_cafile.c_str());
315 return !strcmp(verb,
"COPY") || !strcmp(verb,
"OPTIONS");
324 const std::string replace_schemes[] = {
"davs://",
"s3://",
"s3s://" };
326 for (
const auto& s : replace_schemes)
327 if (url.compare(0, s.size(), s) == 0)
328 return "https://" + url.substr(s.size());
335 const std::string allowed_schemes[] = {
"https://",
"http://" };
337 for (
const auto& s : allowed_schemes)
338 if (url.compare(0, s.size(), s) == 0)
349 if (req.
verb ==
"OPTIONS") {
350 return ProcessOptionsReq(req);
353 if (header != req.
headers.end()) {
354 if (header->second !=
"none") {
355 m_log.Emsg(
"ProcessReq",
"COPY requested an unsupported credential type: ", header->second.c_str());
356 return req.
SendSimpleResp(400, NULL, NULL,
"COPY requestd an unsupported Credential type", 0);
360 if (header != req.
headers.end()) {
363 const char *error_src =
"COPY rejected: disallowed scheme in source URL";
364 m_log.Emsg(
"ProcessReq", error_src, src.c_str());
367 return ProcessPullReq(src, req);
370 if (header != req.
headers.end()) {
371 const std::string& dst = header->second;
373 const char *error_dst =
"COPY rejected: disallowed scheme in destination URL";
374 m_log.Emsg(
"ProcessReq", error_dst, dst.c_str());
377 return ProcessPushReq(header->second, req);
379 m_log.Emsg(
"ProcessReq",
"COPY verb requested but no source or destination specified.");
380 return req.
SendSimpleResp(400, NULL, NULL,
"No Source or Destination specified", 0);
396 m_allow_local(false),
397 m_allow_private(true),
399 m_fixed_route(false),
401 m_first_timeout(120),
402 m_log(log->logger(),
"TPC_"),
405 if (!Configure(config, myEnv)) {
406 throw std::runtime_error(
"Failed to configure the HTTP third-party-copy handler.");
424 return req.
SendSimpleResp(200, NULL, (
char *)
"DAV: 1\r\nDAV: <http://apache.org/dav/propset/fs/1>\r\nAllow: HEAD,GET,PUT,PROPFIND,DELETE,OPTIONS,COPY", NULL, 0);
434 if (authz_header != req.
headers.end()) {
435 std::stringstream ss;
436 ss <<
"authz=" <<
encode_str(authz_header->second);
446int TPCHandler::RedirectTransfer(
CURL *curl,
const std::string &redirect_resource,
447 XrdHttpExtReq &req, XrdOucErrInfo &error, TPCLogRecord &rec)
451 if ((ptr == NULL) || (*ptr ==
'\0') || (port == 0)) {
453 std::stringstream ss;
454 ss <<
"Internal error: redirect without hostname";
455 logTransferEvent(
LogMask::Error, rec,
"REDIRECT_INTERNAL_ERROR", ss.str());
456 return req.
SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
462 std::string finalTarget = ptr;
474 std::string newTarget;
479 finalTarget = std::move(newTarget);
481 logTransferEvent(
LogMask::Info, rec,
"REDIRECT_PLUGIN_REWRITE",
485 std::stringstream ess;
486 ess <<
"Redirect plugin error: " << errMsg;
490 generateClientErr(ess, rec).c_str(), 0);
502 std::string opaque = cgi.empty() ? std::string() : cgi.substr(1);
504 std::stringstream ss;
505 ss <<
"Location: http" << (m_desthttps ?
"s" :
"") <<
"://" << host <<
":" << port <<
"/" << redirect_resource;
507 if (!opaque.empty()) {
511 char sep = (redirect_resource.find(
'?') == std::string::npos) ?
'?' :
'&';
512 ss << sep << encode_xrootd_opaque_to_uri(curl, opaque);
517 return req.
SendSimpleResp(rec.status, NULL,
const_cast<char *
>(ss.str().c_str()),
525int TPCHandler::OpenWaitStall(XrdSfsFile &fh,
const std::string &resource,
526 int mode,
int openMode,
const XrdSecEntity &sec,
527 const std::string &authz)
534 size_t pos = resource.find(
'?');
536 std::string path = resource.substr(0, pos);
538 if (pos != std::string::npos) {
539 opaque = resource.substr(pos + 1);
544 opaque += (opaque.empty() ?
"" :
"&");
547 open_result = fh.
open(path.c_str(), mode, openMode, &sec, opaque.c_str());
551 if (open_result ==
SFS_STARTED) {secs_to_stall = secs_to_stall/2 + 5;}
552 std::this_thread::sleep_for (std::chrono::seconds(secs_to_stall));
568int TPCHandler::PerformHEADRequest(
CURL *curl, XrdHttpExtReq &req,
State &state,
569 bool &success, TPCLogRecord &rec,
bool shouldReturnErrorToClient) {
571 curl_easy_setopt(curl, CURLOPT_NOBODY, 1);
573 curl_easy_setopt(curl, CURLOPT_TIMEOUT, CONNECT_TIMEOUT);
575 res = curl_easy_perform(curl);
578 curl_easy_setopt(curl, CURLOPT_NOBODY, 0);
580 curl_easy_setopt(curl, CURLOPT_TIMEOUT, 0L);
581 curl_easy_setopt(curl, CURLOPT_FAILONERROR,
true);
583 std::stringstream ss;
586 res = CURLE_HTTP_RETURNED_ERROR;
588 if (res != CURLE_OK) {
589 ss << curl_easy_strerror(res);
591 case CURLE_HTTP_RETURNED_ERROR:
593 ss <<
": remote host returned '" << rec.tpc_status <<
" "
596 case CURLE_COULDNT_CONNECT:
597 switch (rec.tpc_status) {
599 ss <<
": connection to local/private addresses is forbidden";
602 ss <<
": internal server failure";
603 rec.tpc_status = 500;
607 rec.tpc_status = 500;
612 if (rec.tpc_status >= 400) {
614 return shouldReturnErrorToClient ? req.
SendSimpleResp(rec.tpc_status, NULL, NULL, generateClientErr(ss, rec, res).c_str(), 0) : -1;
618 ss <<
"Successfully determined remote file information for pull request: "
621 unsigned int cksumIndex = 1;
622 for(
const auto & [cksumType,cksumValue]: state.
GetReprDigest()) {
623 ss <<
" chksum" << cksumIndex <<
"=(" << cksumType <<
"," << cksumValue <<
")";
631int TPCHandler::GetRemoteFileInfoTPCPull(
CURL *curl, XrdHttpExtReq &req, uint64_t &contentLength, std::map<std::string,std::string> & reprDigest,
bool & success, TPCLogRecord &rec) {
638 if ((result = PerformHEADRequest(curl, req, state, success, rec)) || !success) {
650int TPCHandler::SendPerfMarker(XrdHttpExtReq &req, TPCLogRecord &rec, TPC::State &state) {
651 std::stringstream ss;
652 const std::string crlf =
"\n";
653 ss <<
"Perf Marker" << crlf;
654 ss <<
"Timestamp: " << time(NULL) << crlf;
655 ss <<
"Stripe Index: 0" << crlf;
657 ss <<
"Total Stripe Count: 1" << crlf;
662 ss <<
"RemoteConnections: " << desc << crlf;
667 return req.
ChunkResp(ss.str().c_str(), 0);
674int TPCHandler::SendPerfMarker(XrdHttpExtReq &req, TPCLogRecord &rec, std::vector<State*> &state,
675 off_t bytes_transferred)
689 std::stringstream ss;
690 const std::string crlf =
"\n";
691 ss <<
"Perf Marker" << crlf;
692 ss <<
"Timestamp: " << time(NULL) << crlf;
693 ss <<
"Stripe Index: 0" << crlf;
694 ss <<
"Stripe Bytes Transferred: " << bytes_transferred << crlf;
695 ss <<
"Total Stripe Count: 1" << crlf;
699 std::stringstream ss2;
700 for (std::vector<State*>::const_iterator iter = state.begin();
701 iter != state.end(); iter++)
703 std::string desc = (*iter)->GetConnectionDescription();
705 ss2 << (first ?
"" :
",") << desc;
710 ss <<
"RemoteConnections: " << ss2.str() << crlf;
712 rec.bytes_transferred = bytes_transferred;
715 return req.
ChunkResp(ss.str().c_str(), 0);
722int TPCHandler::RunCurlWithUpdates(
CURL *curl, XrdHttpExtReq &req,
State &state,
726 CURLM *multi_handle = curl_multi_init();
730 "Failed to initialize a libcurl multi-handle");
731 std::stringstream ss;
732 ss <<
"Failed to initialize internal server memory";
733 return req.
SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
739 mres = curl_multi_add_handle(multi_handle, curl);
742 std::stringstream ss;
743 ss <<
"Failed to add transfer to libcurl multi-handle: HTTP library failure=" << curl_multi_strerror(mres);
744 logTransferEvent(
LogMask::Error, rec,
"CURL_INIT_FAIL", ss.str());
745 curl_multi_cleanup(multi_handle);
746 return req.
SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
752 curl_multi_cleanup(multi_handle);
754 "Failed to send the initial response to the TPC client");
758 "Initial transfer response sent to the TPC client");
763 int running_handles = 1;
764 time_t last_marker = 0;
766 off_t last_advance_bytes = 0;
767 time_t last_advance_time = time(NULL);
768 time_t transfer_start = last_advance_time;
769 CURLcode res =
static_cast<CURLcode
>(-1);
771 time_t now = time(NULL);
772 time_t next_marker = last_marker + m_marker_period;
773 if (now >= next_marker) {
775 if (bytes_xfer > last_advance_bytes) {
776 last_advance_bytes = bytes_xfer;
777 last_advance_time = now;
779 if (SendPerfMarker(req, rec, state)) {
780 curl_multi_remove_handle(multi_handle, curl);
781 curl_multi_cleanup(multi_handle);
783 "Failed to send a perf marker to the TPC client");
786 int timeout = (transfer_start == last_advance_time) ? m_first_timeout : m_timeout;
787 if (now > last_advance_time + timeout) {
788 const char *log_prefix = rec.log_prefix.c_str();
789 bool tpc_pull = strncmp(
"Pull", log_prefix, 4) == 0;
792 std::stringstream ss;
793 ss <<
"Transfer failed because no bytes have been "
794 << (tpc_pull ?
"received from the source (pull mode) in "
795 :
"transmitted to the destination (push mode) in ") << timeout <<
" seconds.";
797 curl_multi_remove_handle(multi_handle, curl);
798 curl_multi_cleanup(multi_handle);
804 rec.pmarkManager.startTransfer();
805 mres = curl_multi_perform(multi_handle, &running_handles);
806 if (mres == CURLM_CALL_MULTI_PERFORM) {
810 }
else if (mres != CURLM_OK) {
812 }
else if (running_handles == 0) {
816 rec.pmarkManager.beginPMarks();
823 msg = curl_multi_info_read(multi_handle, &msgq);
824 if (msg && (msg->msg == CURLMSG_DONE)) {
825 CURL *easy_handle = msg->easy_handle;
826 res = msg->data.result;
827 curl_multi_remove_handle(multi_handle, easy_handle);
831 int64_t max_sleep_time = next_marker - time(NULL);
832 if (max_sleep_time <= 0) {
836 mres = curl_multi_wait(multi_handle, NULL, 0, max_sleep_time*1000, &fd_count);
837 if (mres != CURLM_OK) {
840 }
while (running_handles);
842 if (mres != CURLM_OK) {
843 std::stringstream ss;
844 ss <<
"Internal libcurl multi-handle error: HTTP library failure=" << curl_multi_strerror(mres);
845 logTransferEvent(
LogMask::Error, rec,
"TRANSFER_CURL_ERROR", ss.str());
847 curl_multi_remove_handle(multi_handle, curl);
848 curl_multi_cleanup(multi_handle);
850 if ((retval = req.
ChunkResp(generateClientErr(ss, rec).c_str(), 0))) {
852 "Failed to send error message to the TPC client");
862 msg = curl_multi_info_read(multi_handle, &msgq);
863 if (msg && (msg->msg == CURLMSG_DONE)) {
864 CURL *easy_handle = msg->easy_handle;
865 res = msg->data.result;
866 curl_multi_remove_handle(multi_handle, easy_handle);
870 if (!state.
GetErrorCode() && res ==
static_cast<CURLcode
>(-1)) {
871 curl_multi_remove_handle(multi_handle, curl);
872 curl_multi_cleanup(multi_handle);
873 std::stringstream ss;
874 ss <<
"Internal state error in libcurl";
875 logTransferEvent(
LogMask::Error, rec,
"TRANSFER_CURL_ERROR", ss.str());
877 if ((retval = req.
ChunkResp(generateClientErr(ss, rec).c_str(), 0))) {
879 "Failed to send error message to the TPC client");
884 curl_multi_cleanup(multi_handle);
898 std::stringstream ss;
899 bool success =
false;
902 std::stringstream ss2;
903 ss2 <<
"Remote side failed with status code " << state.
GetStatusCode();
905 std::replace(err.begin(), err.end(),
'\n',
' ');
906 ss2 <<
"; error message: \"" << err <<
"\"";
908 logTransferEvent(
LogMask::Error, rec,
"TRANSFER_FAIL", ss2.str());
909 ss << generateClientErr(ss2, rec);
912 if (err.empty()) {err =
"(no error message provided)";}
913 else {std::replace(err.begin(), err.end(),
'\n',
' ');}
914 std::stringstream ss2;
915 ss2 <<
"Error when interacting with local filesystem: " << err;
916 logTransferEvent(
LogMask::Error, rec,
"TRANSFER_FAIL", ss2.str());
917 ss << generateClientErr(ss2, rec);
918 }
else if (res != CURLE_OK) {
919 std::stringstream ss2;
920 ss2 <<
"Internal transfer failure";
921 std::stringstream ss3;
922 ss3 << ss2.str() <<
": " << curl_easy_strerror(res);
923 logTransferEvent(
LogMask::Error, rec,
"TRANSFER_FAIL", ss3.str());
924 ss << generateClientErr(ss2, rec, res);
926 ss <<
"success: Created";
930 if ((retval = req.
ChunkResp(ss.str().c_str(), 0))) {
932 "Failed to send last update to remote client");
934 }
else if (success) {
945int TPCHandler::ProcessPushReq(
const std::string & resource, XrdHttpExtReq &req) {
947 rec.allow_local = m_allow_local;
948 rec.allow_private = m_allow_private;
949 rec.log_prefix =
"PushRequest";
951 rec.remote = resource;
955 if (name) rec.name = name;
956 logTransferEvent(
LogMask::Info, rec,
"PUSH_START",
"Starting a push request");
959 auto curl = curlPtr.get();
961 std::stringstream ss;
962 ss <<
"Failed to initialize internal transfer resources";
965 return req.
SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
967 curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
968 curl_easy_setopt(curl, CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1_2);
969 curl_easy_setopt(curl, CURLOPT_HTTP_VERSION, (
long) CURL_HTTP_VERSION_1_1);
970#if CURL_AT_LEAST_VERSION(7, 85, 0)
971 curl_easy_setopt(curl, CURLOPT_PROTOCOLS_STR,
"https,http");
972 curl_easy_setopt(curl, CURLOPT_REDIR_PROTOCOLS_STR,
"https,http");
974 long protocols = CURLPROTO_HTTP | CURLPROTO_HTTPS;
975 curl_easy_setopt(curl, CURLOPT_PROTOCOLS, protocols);
976 curl_easy_setopt(curl, CURLOPT_REDIR_PROTOCOLS, protocols);
978 curl_easy_setopt(curl, CURLOPT_OPENSOCKETFUNCTION, opensocket_callback);
979 curl_easy_setopt(curl, CURLOPT_OPENSOCKETDATA, &rec);
980 curl_easy_setopt(curl, CURLOPT_CLOSESOCKETFUNCTION, closesocket_callback);
981 curl_easy_setopt(curl, CURLOPT_SOCKOPTFUNCTION, sockopt_callback);
982 curl_easy_setopt(curl, CURLOPT_CLOSESOCKETDATA, &rec);
983 curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, CONNECT_TIMEOUT);
986 std::string redirect_resource = req.
resource;
987 if (query_header != req.
headers.end()) {
988 redirect_resource = query_header->second;
992 uint64_t file_monid =
AtomicInc(m_monid);
994 std::unique_ptr<XrdSfsFile> fh(m_sfs->newFile(name, file_monid));
997 std::stringstream ss;
998 ss <<
"Failed to initialize internal transfer file handle";
1001 return req.
SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
1003 std::string full_url = prepareURL(req);
1005 std::string authz = GetAuthz(req);
1007 int open_results = OpenWaitStall(*fh, full_url,
SFS_O_RDONLY, 0644,
1010 int result = RedirectTransfer(curl, redirect_resource, req, fh->
error, rec);
1012 }
else if (
SFS_OK != open_results) {
1014 std::stringstream ss;
1016 if (msg == NULL) ss <<
"Failed to open local resource";
1020 int resp_result = req.
SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
1024 ConfigureCurlCA(curl);
1025 curl_easy_setopt(curl, CURLOPT_URL, resource.c_str());
1027 Stream stream(std::move(fh), 0, 0, m_log);
1031 return RunCurlWithUpdates(curl, req, state, rec);
1038int TPCHandler::ProcessPullReq(
const std::string &resource, XrdHttpExtReq &req) {
1040 rec.allow_local = m_allow_local;
1041 rec.allow_private = m_allow_private;
1042 rec.log_prefix =
"PullRequest";
1044 rec.remote = resource;
1048 if (name) rec.name = name;
1049 logTransferEvent(
LogMask::Info, rec,
"PULL_START",
"Starting a pull request");
1052 auto curl = curlPtr.get();
1054 std::stringstream ss;
1055 ss <<
"Failed to initialize internal transfer resources";
1058 return req.
SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
1068 if (m_fixed_route) {
1073 int sockFD = addrInfo ? addrInfo->
SockFD() : -1;
1078 logTransferEvent(
LogMask::Error, rec,
"FIXED_ROUTE_ERR",
"Failed to determine local address of incoming fixed route request");
1081 curl_easy_setopt(curl, CURLOPT_INTERFACE, ip);
1084 curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
1085 curl_easy_setopt(curl, CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1_2);
1086 curl_easy_setopt(curl, CURLOPT_HTTP_VERSION, (
long) CURL_HTTP_VERSION_1_1);
1087#if CURL_AT_LEAST_VERSION(7, 85, 0)
1088 curl_easy_setopt(curl, CURLOPT_PROTOCOLS_STR,
"https,http");
1089 curl_easy_setopt(curl, CURLOPT_REDIR_PROTOCOLS_STR,
"https,http");
1091 long protocols = CURLPROTO_HTTP | CURLPROTO_HTTPS;
1092 curl_easy_setopt(curl, CURLOPT_PROTOCOLS, protocols);
1093 curl_easy_setopt(curl, CURLOPT_REDIR_PROTOCOLS, protocols);
1095 curl_easy_setopt(curl, CURLOPT_OPENSOCKETFUNCTION, opensocket_callback);
1096 curl_easy_setopt(curl, CURLOPT_OPENSOCKETDATA, &rec);
1097 curl_easy_setopt(curl, CURLOPT_SOCKOPTFUNCTION, sockopt_callback);
1098 curl_easy_setopt(curl, CURLOPT_SOCKOPTDATA , &rec);
1099 curl_easy_setopt(curl, CURLOPT_CLOSESOCKETFUNCTION, closesocket_callback);
1100 curl_easy_setopt(curl, CURLOPT_CLOSESOCKETDATA, &rec);
1101 curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, CONNECT_TIMEOUT);
1102 std::unique_ptr<XrdSfsFile> fh(m_sfs->newFile(name, m_monid++));
1104 std::stringstream ss;
1105 ss <<
"Failed to initialize internal transfer file handle";
1108 return req.
SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
1111 std::string redirect_resource = req.
resource;
1112 if (query_header != req.
headers.end()) {
1113 redirect_resource = query_header->second;
1117 if ((overwrite_header == req.
headers.end()) || (overwrite_header->second ==
"T")) {
1123 if (streams_header != req.
headers.end()) {
1124 int stream_req = -1;
1126 stream_req = std::stol(streams_header->second);
1129 if (stream_req < 0 || stream_req > 100) {
1130 std::stringstream ss;
1131 ss <<
"Invalid request for number of streams";
1133 logTransferEvent(
LogMask::Info, rec,
"INVALID_REQUEST", ss.str());
1134 return req.
SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
1136 streams = stream_req == 0 ? 1 : stream_req;
1139 rec.streams = streams;
1140 std::string full_url = prepareURL(req);
1141 std::string authz = GetAuthz(req);
1142 curl_easy_setopt(curl, CURLOPT_URL, resource.c_str());
1143 ConfigureCurlCA(curl);
1144 uint64_t sourceFileContentLength = 0;
1148 bool success =
false;
1149 bool mismatchDigests =
false;
1150 std::map<std::string,std::string> sourceFileReprDigest;
1151 GetRemoteFileInfoTPCPull(curl, req, sourceFileContentLength, sourceFileReprDigest, success, rec);
1155 full_url +=
"&oss.asize=" + std::to_string(sourceFileContentLength);
1156 mismatchDigests = mismatchReprDigest(sourceFileReprDigest,req,rec);
1158 if(!success || mismatchDigests) {
1165 int open_result = OpenWaitStall(*fh, full_url, mode|
SFS_O_WRONLY,
1169 int result = RedirectTransfer(curl, redirect_resource, req, fh->
error, rec);
1171 }
else if (
SFS_OK != open_result) {
1173 std::stringstream ss;
1175 if ((msg == NULL) || (*msg ==
'\0')) ss <<
"Failed to open local resource";
1180 generateClientErr(ss, rec).c_str(), 0);
1184 Stream stream(std::move(fh), streams * m_pipelining_multiplier, streams > 1 ? m_block_size : m_small_block_size, m_log);
1190 return RunCurlWithStreams(req, state, streams, rec);
1192 return RunCurlWithUpdates(curl, req, state, rec);
1200void TPCHandler::logTransferEvent(
LogMask mask,
const TPCLogRecord &rec,
1201 const std::string &event,
const std::string &message)
1203 if (!(m_log.getMsgMask() & mask)) {
return;}
1205 std::stringstream ss;
1206 ss <<
"event=" <<
event <<
", local=" << rec.local <<
", remote=" << rec.remote;
1207 if (rec.name.empty())
1208 ss <<
", user=(anonymous)";
1210 ss <<
", user=" << rec.name;
1211 if (rec.streams != 1)
1212 ss <<
", streams=" << rec.streams;
1213 if (rec.bytes_transferred >= 0)
1214 ss <<
", bytes_transferred=" << rec.bytes_transferred;
1215 if (rec.status >= 0)
1216 ss <<
", status=" << rec.status;
1217 if (rec.tpc_status >= 0)
1218 ss <<
", tpc_status=" << rec.tpc_status;
1219 if (!message.empty())
1220 ss <<
"; " << message;
1221 m_log.Log(mask, rec.log_prefix.c_str(), ss.str().c_str());
1224std::string TPCHandler::generateClientErr(std::stringstream &err_ss,
const TPCLogRecord &rec, CURLcode cCode) {
1225 std::stringstream ssret;
1226 ssret <<
"failure: " << err_ss.str() <<
", local=" << rec.local <<
", remote=" << rec.remote;
1227 if(cCode != CURLcode::CURLE_OK) {
1228 ssret <<
", HTTP library failure=" << curl_easy_strerror(cCode);
1239 if (curl_global_init(CURL_GLOBAL_DEFAULT)) {
1240 log->
Emsg(
"TPCInitialize",
"libcurl failed to initialize");
1246 log->
Emsg(
"TPCInitialize",
"TPC handler requires a config filename in order to load");
1250 log->
Emsg(
"TPCInitialize",
"Will load configuration for the TPC handler from", config);
1252 }
catch (std::runtime_error &re) {
1253 log->
Emsg(
"TPCInitialize",
"Encountered a runtime failure when loading ", re.what());
XrdHttpExtHandler * XrdHttpGetExtHandler(XrdHttpExtHandlerArgs)
XrdVERSIONINFO(XrdHttpGetExtHandler, HttpTPC)
static std::string PrepareURL(const std::string &url)
std::string encode_xrootd_opaque_to_uri(CURL *curl, const std::string &opaque)
static bool IsAllowedScheme(const std::string &url)
int mapErrNoToHttp(int errNo)
std::string httpStatusToString(int status)
Utility functions for XrdHTTP.
std::string encode_str(const std::string &str)
void splitHostCgi(std::string_view target, std::string &host, std::string &cgi)
void getline(uchar *buff, int blen)
const std::map< std::string, std::string > & GetReprDigest() const
int GetStatusCode() const
off_t BytesTransferred() const
void SetErrorMessage(const std::string &error_msg)
std::string GetErrorMessage() const
std::string GetConnectionDescription()
void SetupHeaders(XrdHttpExtReq &req)
void SetContentLength(const off_t content_length)
off_t GetContentLength() const
void SetErrorCode(int error_code)
void SetupHeadersForHEAD(XrdHttpExtReq &req)
TPCHandler(XrdSysError *log, const char *config, XrdOucEnv *myEnv)
virtual int ProcessReq(XrdHttpExtReq &req)
virtual bool MatchesPath(const char *verb, const char *path)
Tells if the incoming path is recognized as one of the paths that have to be processed.
int ChunkResp(const char *body, long long bodylen)
Send a (potentially partial) body in a chunked response; invoking with NULL body.
void GetClientID(std::string &clid)
std::map< std::string, std::string > & headers
std::map< std::string, std::string > mReprDigest
Repr-Digest map where the key is the digest name and the value is the base64 encoded digest value.
int StartChunkedResp(int code, const char *desc, const char *header_to_add)
Starts a chunked response; body of request is sent over multiple parts using the SendChunkResp.
const XrdSecEntity & GetSecEntity() const
int SendSimpleResp(int code, const char *desc, const char *header_to_add, const char *body, long long bodylen)
Sends a basic response. If the length is < 0 then it is calculated internally.
static std::string prepareOpenURL(PrepareOpenURLParams ¶ms)
static int GetSokInfo(int fd, char *theAddr, int theALen, char &theType)
void * GetPtr(const char *varname)
const char * getErrText()
void setUCap(int ucval)
Set user capabilties.
static std::map< std::string, T >::const_iterator caseInsensitiveFind(const std::map< std::string, T > &m, const std::string &lowerCaseSearchKey)
XrdNetAddrInfo * addrInfo
Entity's connection details.
char * name
Entity's name.
virtual int open(const char *fileName, XrdSfsFileOpenMode openMode, mode_t createMode, const XrdSecEntity *client=0, const char *opaque=0)=0
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
XrdSysLogger * logger(XrdSysLogger *lp=0)
static Outcome Redirect(const char *trg, int &port, XrdNetAddrInfo &clientAddr, std::string &outTarget, std::string &errMsg)
std::unique_ptr< CURL, CurlDeleter > ManagedCurlHandle
void operator()(CURL *curl)
static const int uIPv64
ucap: Supports only IPv4 info