62 WaitTask( XrdCl::XRootDMsgHandler *handler ): pHandler( handler )
65 o <<
"WaitTask for: 0x" << handler->
GetRequest();
69 virtual time_t Run( time_t now )
71 pHandler->WaitDone( now );
75 XrdCl::XRootDMsgHandler *pHandler;
97 virtual void Run(
void *arg )
99 pHandler->HandleResponse();
111 const int sst = pSendingState.fetch_or( kSawResp );
113 if( !( sst & kSendDone ) && !( sst & kSawResp ) )
119 log->
Dump(
XRootDMsg,
"[%s] Message %s reply received before notification "
120 "that it was sent, assuming it was sent ok.",
121 pUrl.GetHostId().c_str(),
122 pRequest->GetObfuscatedDescription().c_str() );
129 if( pOksofarAsAnswer )
132 while( pResponse ) pCV.Wait();
139 log->
Warning(
ExDbgMsg,
"[%s] MsgHandler is examining a response although "
140 "it already owns a response: %p (message: %s ).",
141 pUrl.GetHostId().c_str(), (
void*)
this,
142 pRequest->GetObfuscatedDescription().c_str() );
146 if( msg->GetSize() < 8 )
186 pBodyReader->SetDataLength( dlen );
202 "message %s", pUrl.GetHostId().c_str(),
203 pRequest->GetObfuscatedDescription().c_str() );
243 log->
Dump(
XRootDMsg,
"[%s] Got a kXR_oksofar response to request "
244 "%s", pUrl.GetHostId().c_str(),
245 pRequest->GetObfuscatedDescription().c_str() );
247 if( !pOksofarAsAnswer )
249 pPartialResps.emplace_back( std::move( pResponse ) );
260 pTimeoutFence.store(
true, std::memory_order_relaxed );
269 pTimeoutFence.store(
true, std::memory_order_relaxed );
278 log->
Dump(
XRootDMsg,
"[%s] Got a kXR_status response to request "
279 "%s", pUrl.GetHostId().c_str(),
280 pRequest->GetObfuscatedDescription().c_str() );
297 pTimeoutFence.store(
true, std::memory_order_relaxed );
331 log->
Error(
XRootDMsg,
"[%s] kXR_status: invalid message size.", pUrl.GetHostId().c_str() );
352 pUrl.GetHostId().c_str() );
364 pPartialResps.push_back( std::move( pResponse ) );
377 pPageReader.reset(
new AsyncPageReader( *pChunkList, pCrc32cDigests ) );
378 pPageReader->SetRsp( rspst );
393 pResponse->GetCursor() )
423 if( pUrl.IsLocalFile() && pUrl.IsMetalink() )
432 int *qryResponse =
nullptr;
434 qryResult.
Get( qryResponse );
436 pHosts->back().flags = *qryResponse;
438 qryResponse =
nullptr;
441 qryResult.
Get( qryResponse );
443 pHosts->back().protocol = *qryResponse;
462 pSendingState.fetch_or( kInFlyDone );
469 pAggregatedWaitTime = 0;
478 log->
Dump(
XRootDMsg,
"[%s] Got a kXR_ok response to request %s",
479 pUrl.GetHostId().c_str(),
480 pRequest->GetObfuscatedDescription().c_str() );
488 log->
Dump(
XRootDMsg,
"[%s] Got a kXR_status response to request %s",
489 pUrl.GetHostId().c_str(),
490 pRequest->GetObfuscatedDescription().c_str() );
501 log->
Dump(
XRootDMsg,
"[%s] Got a kXR_oksofar response to request %s",
502 pUrl.GetHostId().c_str(),
503 pRequest->GetObfuscatedDescription().c_str() );
514 char *errmsg =
new char[rsp->
hdr.
dlen-3]; errmsg[rsp->
hdr.
dlen-4] = 0;
515 memcpy( errmsg, rsp->
body.error.errmsg, rsp->
hdr.
dlen-4 );
516 log->
Dump(
XRootDMsg,
"[%s] Got a kXR_error response to request %s "
517 "[%d] %s", pUrl.GetHostId().c_str(),
518 pRequest->GetObfuscatedDescription().c_str(), rsp->
body.error.errnum,
534 pUrl.GetHostId().c_str() );
540 char *urlInfoBuff =
new char[rsp->
hdr.
dlen-3];
541 urlInfoBuff[rsp->
hdr.
dlen-4] = 0;
542 memcpy( urlInfoBuff, rsp->
body.redirect.host, rsp->
hdr.
dlen-4 );
543 std::string urlInfo = urlInfoBuff;
544 delete [] urlInfoBuff;
546 "message %s: %s, port %d", pUrl.GetHostId().c_str(),
547 pRequest->GetObfuscatedDescription().c_str(), urlInfo.c_str(),
548 rsp->
body.redirect.port );
553 if( !pRedirectCounter )
556 "message %s, the last known error is: %s",
557 pUrl.GetHostId().c_str(),
558 pRequest->GetObfuscatedDescription().c_str(),
559 pLastError.ToString().c_str() );
572 uint32_t flags = pHosts->back().flags;
573 if( !pHasLoadBalancer )
582 if( ( flags &
kXR_attrMeta ) || !pLoadBalancer.url.IsValid() )
584 pLoadBalancer = pHosts->back();
585 log->
Dump(
XRootDMsg,
"[%s] Current server has been assigned "
586 "as a load-balancer for message %s",
587 pUrl.GetHostId().c_str(),
588 pRequest->GetObfuscatedDescription().c_str() );
589 HostList::iterator it;
590 for( it = pHosts->begin(); it != pHosts->end(); ++it )
591 it->loadBalancer =
false;
592 pHosts->back().loadBalancer =
true;
603 pEffectiveDataServerUrl =
new URL( pHosts->back().url );
608 std::vector<std::string> urlComponents;
612 std::ostringstream o;
614 o << urlComponents[0];
615 if( rsp->
body.redirect.port > 0 )
616 o <<
":" << rsp->
body.redirect.port <<
"/";
617 else if( rsp->
body.redirect.port < 0 )
632 std::string url( rsp->
body.redirect.host, rsp->
hdr.
dlen-4 );
633 pPostMaster->CollapseRedirect( pUrl, url );
638 std::string url( rsp->
body.redirect.host, rsp->
hdr.
dlen-4 );
640 pRedirectAsAnswer =
true;
644 URL newUrl =
URL( o.str() );
649 pUrl.GetHostId().c_str(), urlInfo.c_str() );
654 if( pUrl.GetUserName() !=
"" && newUrl.
GetUserName() ==
"" )
657 if( pUrl.GetPassword() !=
"" && newUrl.
GetPassword() ==
"" )
666 std::ostringstream ossXrd;
669 for(URL::ParamsMap::const_iterator it = urlParams.begin();
670 it != urlParams.end(); ++it )
672 if( it->first.compare( 0, 4,
"xrd." ) &&
673 it->first.compare( 0, 6,
"xrdcl." ) )
676 ossXrd << it->first <<
'=' << it->second <<
'&';
679 std::string xrdCgi = ossXrd.str();
680 pRedirectUrl = newUrl.
GetURL();
683 if( urlComponents.size() > 1 )
686 pRedirectUrl += urlComponents[1];
687 std::ostringstream o;
688 o <<
"fake://fake:111//fake?";
689 o << urlComponents[1];
691 if( urlComponents.size() == 3 )
692 o <<
'?' << urlComponents[2];
698 pRedirectUrl += xrdCgi;
701 cgiURL =
URL( o.str() );
706 std::ostringstream o;
707 o <<
"fake://fake:111//fake?";
709 cgiURL =
URL( o.str() );
711 pRedirectUrl += xrdCgi;
721 pRedirectAsAnswer =
true;
723 if( pRedirectAsAnswer )
734 Status st = RewriteRequestRedirect( newUrl );
745 if( ( pUrl.GetProtocol() ==
"roots" || pUrl.GetProtocol() ==
"xroots" ) &&
761 uint32_t waitSeconds = 0;
765 char *infoMsg =
new char[rsp->
hdr.
dlen-3];
767 memcpy( infoMsg, rsp->
body.wait.infomsg, rsp->
hdr.
dlen-4 );
768 log->
Dump(
XRootDMsg,
"[%s] Got kXR_wait response of %d seconds to "
769 "message %s: %s", pUrl.GetHostId().c_str(),
770 rsp->
body.wait.seconds, pRequest->GetObfuscatedDescription().c_str(),
773 waitSeconds = rsp->
body.wait.seconds;
777 log->
Dump(
XRootDMsg,
"[%s] Got kXR_wait response of 0 seconds to "
778 "message %s", pUrl.GetHostId().c_str(),
779 pRequest->GetObfuscatedDescription().c_str() );
782 pAggregatedWaitTime += waitSeconds;
787 if( OmitWait( *pRequest, pLoadBalancer.url ) )
791 if( pAggregatedWaitTime > maxWait )
803 Status st = RewriteRequestWait();
815 time_t resendTime = ::time(0)+waitSeconds;
817 if( resendTime < pExpiration )
819 log->
Debug(
ExDbgMsg,
"[%s] Scheduling WaitTask for MsgHandler: %p (message: %s ).",
820 pUrl.GetHostId().c_str(), (
void*)
this,
821 pRequest->GetObfuscatedDescription().c_str() );
823 TaskManager *taskMgr = pPostMaster->GetTaskManager();
824 taskMgr->
RegisterTask(
new WaitTask(
this ), resendTime );
828 log->
Debug(
XRootDMsg,
"[%s] Wait time is too long, timing out %s",
829 pUrl.GetHostId().c_str(),
830 pRequest->GetObfuscatedDescription().c_str() );
847 pUrl.GetHostId().c_str() );
853 log->
Dump(
XRootDMsg,
"[%s] Got kXR_waitresp response of %d seconds to "
854 "message %s", pUrl.GetHostId().c_str(),
855 rsp->
body.waitresp.seconds,
856 pRequest->GetObfuscatedDescription().c_str() );
865 log->
Dump(
XRootDMsg,
"[%s] Got unrecognized response %d to "
866 "message %s", pUrl.GetHostId().c_str(),
867 rsp->
hdr.
status, pRequest->GetObfuscatedDescription().c_str() );
884 log->
Dump(
XRootDMsg,
"[%s] Stream event reported for msg %s",
885 pUrl.GetHostId().c_str(), pRequest->GetObfuscatedDescription().c_str() );
890 if( pTimeoutFence.load( std::memory_order_relaxed ) )
893 HandleError( status );
902 uint32_t &bytesRead )
908 return pPageReader->Read( *socket, bytesRead );
910 return pBodyReader->Read( *socket, bytesRead );
926 log->
Dump(
XRootDMsg,
"[%s] Got notification that outgoing message %s "
927 "was sent successfully.", pUrl.GetHostId().c_str(),
934 const int sst = pSendingState.fetch_or( kSendDone );
937 if( status.
IsOK() && ( sst & kSendDone ) )
return;
941 if( !status.
IsOK() && ( ( sst & kFinalResp ) || ( sst & kSawResp ) ) )
943 log->
Error(
XRootDMsg,
"[%s] Unexpected error for message %s. Trying to "
944 "recover.", pUrl.GetHostId().c_str(),
946 HandleError( status );
950 if( sst & kFinalResp )
958 if( sst & kRetryAtSrv )
962 HandleError( RetryAtServer( pRetryAtUrl, pRetryAtEntryType ) );
987 log->
Error(
XRootDMsg,
"[%s] Impossible to send message %s. Trying to "
988 "recover.", pUrl.GetHostId().c_str(),
990 HandleError( status );
1017 uint32_t &bytesWritten )
1022 if( !pChunkList->empty() && !pCrc32cDigests.empty() )
1032 int fLen = 0, lLen = 0;
1038 if( pPgWrtCksumBuff.GetCursor() == 0 )
1040 uint32_t digest = htonl( pCrc32cDigests[pPgWrtCurrentPageNb] );
1041 memcpy( pPgWrtCksumBuff.GetBuffer(), &digest,
sizeof( uint32_t ) );
1044 uint32_t btsLeft = chunk.
length - pAsyncOffset;
1045 uint32_t pglen = ( pPgWrtCurrentPageNb == 0 ? fLen :
XrdSys::PageSize ) - pPgWrtCurrentPageOffset;
1046 if( pglen > btsLeft ) pglen = btsLeft;
1047 char* pgbuf =
static_cast<char*
>( chunk.
buffer ) + pAsyncOffset;
1049 while( btsLeft > 0 )
1052 while( pPgWrtCksumBuff.GetCursor() <
sizeof( uint32_t ) )
1054 uint32_t dgstlen =
sizeof( uint32_t ) - pPgWrtCksumBuff.GetCursor();
1055 char* dgstbuf = pPgWrtCksumBuff.GetBufferAtCursor();
1057 Status st = socket->
Send( dgstbuf, dgstlen, btswrt );
1058 if( !st.
IsOK() )
return st;
1059 bytesWritten += btswrt;
1060 pPgWrtCksumBuff.AdvanceCursor( btswrt );
1065 Status st = socket->
Send( pgbuf, pglen, btswrt );
1066 if( !st.
IsOK() )
return st;
1070 bytesWritten += btswrt;
1071 pAsyncOffset += btswrt;
1077 ++pPgWrtCurrentPageNb;
1078 if( pPgWrtCurrentPageNb < nbpgs )
1081 pPgWrtCksumBuff.SetCursor( 0 );
1082 uint32_t digest = htonl( pCrc32cDigests[pPgWrtCurrentPageNb] );
1083 memcpy( pPgWrtCksumBuff.GetBuffer(), &digest,
sizeof( uint32_t ) );
1087 if( pglen > btsLeft ) pglen = btsLeft;
1089 pPgWrtCurrentPageOffset = 0;
1093 pPgWrtCurrentPageOffset += btswrt;
1097 else if( !pChunkList->empty() )
1099 size_t size = pChunkList->size();
1100 for(
size_t i = pAsyncChunkIndex ; i < size; ++i )
1102 char *buffer = (
char*)(*pChunkList)[i].buffer;
1103 uint32_t size = (*pChunkList)[i].length;
1104 size_t leftToBeWritten = size - pAsyncOffset;
1106 while( leftToBeWritten )
1109 Status st = socket->
Send( buffer + pAsyncOffset, leftToBeWritten, btswrt );
1110 bytesWritten += btswrt;
1112 pAsyncOffset += btswrt;
1113 leftToBeWritten -= btswrt;
1133 log->
Debug(
XRootDMsg,
"[%s] Channel is encrypted: cannot use kernel buffer.",
1134 pUrl.GetHostId().c_str() );
1139 pChunkList->push_back(
ChunkInfo( 0, ret, ubuff ) );
1146 while( !pKBuff->Empty() )
1150 bytesWritten += btswrt;
1154 log->
Debug(
XRootDMsg,
"[%s] Request %s payload (kernel buffer) transferred to socket.",
1155 pUrl.GetHostId().c_str(), pRequest->GetObfuscatedDescription().c_str() );
1175 pTimeoutFence.store(
false, std::memory_order_relaxed );
1181 void XRootDMsgHandler::HandleResponse()
1192 const int sst = pSendingState.fetch_or( kFinalResp );
1193 if( ( sst & kSawReadySend ) && !( sst & kSendDone ) )
1201 XRootDStatus *status = ProcessStatus();
1202 AnyObject *response = 0;
1205 log->Debug(
ExDbgMsg,
"[%s] Calling MsgHandler: %p (message: %s ) "
1209 status->ToString().c_str() );
1211 if( status->IsOK() )
1213 Status st = ParseResponse( response );
1218 status =
new XRootDStatus( st );
1228 pRdirEntry->status = *status;
1229 pRedirectTraceBack.push_back( std::move( pRdirEntry ) );
1235 if( pSidMgr && finalrsp )
1237 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
1238 if( status->IsOK() || !IsInFly() ||
1243 HostList *hosts = pHosts.release();
1245 pHosts.reset(
new HostList( *hosts ) );
1247 pResponseHandler->HandleResponseWithHosts( status, response, hosts );
1260 XrdSysCondVarHelper lck( pCV );
1262 pTimeoutFence.store(
false, std::memory_order_relaxed );
1273 XRootDStatus *st =
new XRootDStatus( pStatus );
1274 ServerResponse *rsp = 0;
1276 rsp = (ServerResponse *)pResponse->GetBuffer();
1278 if( !pStatus.IsOK() && rsp )
1282 st->errNo = rsp->
body.error.errnum;
1285 std::string errmsg( rsp->
body.error.errmsg, rsp->
hdr.
dlen-5 );
1287 errmsg +=
" Last seen error: " + pLastError.ToString();
1288 st->SetErrorMessage( errmsg );
1291 st->SetErrorMessage( pRedirectUrl );
1305 ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
1306 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
1314 log->Error(
XRootDMsg,
"Internal Error: unable to process redirect" );
1319 uint32_t length = 0;
1325 if( pPartialResps.empty() )
1327 buffer = rsp->
body.buffer.data;
1336 for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1338 ServerResponse *part = (ServerResponse*)pPartialResps[i]->GetBuffer();
1343 buff.Allocate( length );
1344 uint32_t offset = 0;
1345 for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1347 ServerResponse *part = (ServerResponse*)pPartialResps[i]->GetBuffer();
1348 buff.Append( part->
body.buffer.data, part->
hdr.
dlen, offset );
1351 buff.Append( rsp->
body.buffer.data, rsp->
hdr.
dlen, offset );
1352 buffer = buff.GetBuffer();
1383 AnyObject *obj =
new AnyObject();
1385 char *nullBuffer =
new char[length+1];
1386 nullBuffer[length] = 0;
1387 memcpy( nullBuffer, buffer, length );
1389 log->Dump(
XRootDMsg,
"[%s] Parsing the response to %s as "
1390 "LocateInfo: %s", pUrl.GetHostId().c_str(),
1391 pRequest->GetObfuscatedDescription().c_str(), nullBuffer );
1392 LocationInfo *data =
new LocationInfo();
1394 if( data->ParseServerResponse( nullBuffer ) ==
false )
1398 delete [] nullBuffer;
1401 delete [] nullBuffer;
1413 AnyObject *obj =
new AnyObject();
1420 StatInfoVFS *data =
new StatInfoVFS();
1422 char *nullBuffer =
new char[length+1];
1423 nullBuffer[length] = 0;
1424 memcpy( nullBuffer, buffer, length );
1426 log->Dump(
XRootDMsg,
"[%s] Parsing the response to %s as "
1427 "StatInfoVFS: %s", pUrl.GetHostId().c_str(),
1428 pRequest->GetObfuscatedDescription().c_str(), nullBuffer );
1430 if( data->ParseServerResponse( nullBuffer ) ==
false )
1434 delete [] nullBuffer;
1437 delete [] nullBuffer;
1446 StatInfo *data =
new StatInfo();
1448 char *nullBuffer =
new char[length+1];
1449 nullBuffer[length] = 0;
1450 memcpy( nullBuffer, buffer, length );
1452 log->Dump(
XRootDMsg,
"[%s] Parsing the response to %s as StatInfo: "
1453 "%s", pUrl.GetHostId().c_str(),
1454 pRequest->GetObfuscatedDescription().c_str(), nullBuffer );
1456 if( data->ParseServerResponse( nullBuffer ) ==
false )
1460 delete [] nullBuffer;
1463 delete [] nullBuffer;
1476 log->Dump(
XRootDMsg,
"[%s] Parsing the response to %s as ProtocolInfo",
1477 pUrl.GetHostId().c_str(),
1478 pRequest->GetObfuscatedDescription().c_str() );
1482 log->Error(
XRootDMsg,
"[%s] Got invalid redirect response.",
1483 pUrl.GetHostId().c_str() );
1487 AnyObject *obj =
new AnyObject();
1488 ProtocolInfo *data =
new ProtocolInfo( rsp->
body.protocol.pval,
1489 rsp->
body.protocol.flags );
1500 AnyObject *obj =
new AnyObject();
1501 log->Dump(
XRootDMsg,
"[%s] Parsing the response to %s as "
1502 "DirectoryList", pUrl.GetHostId().c_str(),
1503 pRequest->GetObfuscatedDescription().c_str() );
1507 memcpy( path, pRequest->GetBuffer(24), req->
dirlist.
dlen );
1509 DirectoryList *data =
new DirectoryList();
1510 data->SetParentName( path );
1513 char *nullBuffer =
new char[length+1];
1514 nullBuffer[length] = 0;
1515 memcpy( nullBuffer, buffer, length );
1517 bool invalidrsp =
false;
1519 if( !pDirListStarted )
1522 pDirListStarted =
true;
1524 invalidrsp = !data->ParseServerResponse( pUrl.GetHostId(), nullBuffer );
1527 invalidrsp = !data->ParseServerResponse( pUrl.GetHostId(), nullBuffer, pDirListWithStat );
1533 delete [] nullBuffer;
1537 delete [] nullBuffer;
1548 log->Dump(
XRootDMsg,
"[%s] Parsing the response to %s as OpenInfo",
1549 pUrl.GetHostId().c_str(),
1550 pRequest->GetObfuscatedDescription().c_str() );
1554 log->Error(
XRootDMsg,
"[%s] Got invalid open response.",
1555 pUrl.GetHostId().c_str() );
1559 AnyObject *obj =
new AnyObject();
1560 StatInfo *statInfo = 0;
1567 log->Dump(
XRootDMsg,
"[%s] Parsing StatInfo in response to %s",
1568 pUrl.GetHostId().c_str(),
1569 pRequest->GetObfuscatedDescription().c_str() );
1573 char *nullBuffer =
new char[rsp->
hdr.
dlen-11];
1574 nullBuffer[rsp->
hdr.
dlen-12] = 0;
1575 memcpy( nullBuffer, buffer+12, rsp->
hdr.
dlen-12 );
1577 statInfo =
new StatInfo();
1578 if( statInfo->ParseServerResponse( nullBuffer ) ==
false )
1583 delete [] nullBuffer;
1586 if( rsp->
hdr.
dlen < 12 || !statInfo )
1588 log->Error(
XRootDMsg,
"[%s] Unable to parse StatInfo in response "
1589 "to %s", pUrl.GetHostId().c_str(),
1590 pRequest->GetObfuscatedDescription().c_str() );
1596 OpenInfo *data =
new OpenInfo( (uint8_t*)buffer,
1597 pResponse->GetSessionId(),
1609 log->Dump(
XRootDMsg,
"[%s] Parsing the response to %s as ChunkInfo",
1610 pUrl.GetHostId().c_str(),
1611 pRequest->GetObfuscatedDescription().c_str() );
1613 for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1619 if( pPartialResps[i]->GetSize() > 8 )
1626 if( pResponse->GetSize() > 8 )
1631 return pBodyReader->GetResponse( response );
1639 log->Dump(
XRootDMsg,
"[%s] Parsing the response to %s as PageInfo",
1640 pUrl.GetHostId().c_str(),
1641 pRequest->GetObfuscatedDescription().c_str() );
1646 ChunkInfo chunk = pChunkList->front();
1647 bool sizeMismatch =
false;
1648 uint32_t currentOffset = 0;
1649 for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1651 ServerResponseV2 *part = (ServerResponseV2*)pPartialResps[i]->GetBuffer();
1659 if( currentOffset + datalen > chunk.length )
1661 sizeMismatch =
true;
1665 currentOffset += datalen;
1668 ServerResponseV2 *rspst = (ServerResponseV2*)pResponse->GetBuffer();
1669 size_t datalen = rspst->
status.
bdy.
dlen - NbPgPerRsp( rspst->
info.pgread.offset,
1671 if( currentOffset + datalen <= chunk.length )
1672 currentOffset += datalen;
1674 sizeMismatch =
true;
1679 if( pChunkStatus.front().sizeError || sizeMismatch )
1681 log->Error(
XRootDMsg,
"[%s] Handling response to %s: user supplied "
1682 "buffer is too small for the received data.",
1683 pUrl.GetHostId().c_str(),
1684 pRequest->GetObfuscatedDescription().c_str() );
1688 AnyObject *obj =
new AnyObject();
1689 PageInfo *pgInfo =
new PageInfo( chunk.offset, currentOffset, chunk.buffer,
1690 std::move( pCrc32cDigests) );
1702 std::vector<std::tuple<uint64_t, uint32_t>> retries;
1704 ServerResponseV2 *rsp = (ServerResponseV2*)pResponse->GetBuffer();
1707 ServerResponseBody_pgWrCSE *cse = (ServerResponseBody_pgWrCSE*)pResponse->GetBuffer(
sizeof( ServerResponseV2 ) );
1709 retries.reserve( pgcnt );
1711 sizeof( ServerResponseBody_pgWrCSE ) );
1713 for(
size_t i = 0; i < pgcnt; ++i )
1716 if( i == 0 ) len = cse->
dlFirst;
1717 else if( i == pgcnt - 1 ) len = cse->
dlLast;
1718 retries.push_back( std::make_tuple( pgoffs[i], len ) );
1722 RetryInfo *info =
new RetryInfo( std::move( retries ) );
1723 AnyObject *obj =
new AnyObject();
1736 log->Dump(
XRootDMsg,
"[%s] Parsing the response to %s as "
1737 "VectorReadInfo", pUrl.GetHostId().c_str(),
1738 pRequest->GetObfuscatedDescription().c_str() );
1740 for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1746 if( pPartialResps[i]->GetSize() > 8 )
1753 if( pResponse->GetSize() > 8 )
1758 return pBodyReader->GetResponse( response );
1766 int len = rsp->hdr.dlen;
1767 char* data = rsp->body.buffer.data;
1769 return ParseXAttrResponse( data, len, response );
1780 AnyObject *obj =
new AnyObject();
1781 log->Dump(
XRootDMsg,
"[%s] Parsing the response to %s as BinaryData",
1782 pUrl.GetHostId().c_str(),
1783 pRequest->GetObfuscatedDescription().c_str() );
1786 data->Allocate( length );
1787 data->Append( buffer, length );
1800 Status XRootDMsgHandler::ParseXAttrResponse(
char *data,
size_t len,
1803 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
1814 if( !( status = ReadFromBuffer( data, len, nerrs ) ).IsOK() )
1818 if( !( status = ReadFromBuffer( data, len, nattr ) ).IsOK() )
1821 std::vector<XAttrStatus> resp;
1823 for(
kXR_char i = 0; i < nattr; ++i )
1826 if( !( status = ReadFromBuffer( data, len, rc ) ).IsOK() )
1834 if( !( status = ReadFromBuffer( data, len, name ) ).IsOK() )
1839 resp.push_back( XAttrStatus( name, st ) );
1846 response =
new AnyObject();
1847 response->Set(
new std::vector<XAttrStatus>( std::move( resp ) ) );
1857 if( !( status = ReadFromBuffer( data, len, nerrs ) ).IsOK() )
1861 if( !( status = ReadFromBuffer( data, len, nattr ) ).IsOK() )
1864 std::vector<XAttr> resp;
1865 resp.reserve( nattr );
1868 for(
kXR_char i = 0; i < nattr; ++i )
1871 if( !( status = ReadFromBuffer( data, len, rc ) ).IsOK() )
1879 if( !( status = ReadFromBuffer( data, len, name ) ).IsOK() )
1884 resp.push_back( XAttr( name, st ) );
1888 for(
kXR_char i = 0; i < nattr; ++i )
1891 if( !( status = ReadFromBuffer( data, len, vlen ) ).IsOK() )
1893 vlen = ntohl( vlen );
1896 if( !( status = ReadFromBuffer( data, len, vlen, value ) ).IsOK() )
1899 resp[i].value.swap( value );
1906 response =
new AnyObject();
1907 response->Set(
new std::vector<XAttr>( std::move( resp ) ) );
1915 std::vector<XAttr> resp;
1920 if( !( status = ReadFromBuffer( data, len, name ) ).IsOK() )
1924 if( !( status = ReadFromBuffer( data, len, vlen ) ).IsOK() )
1926 vlen = ntohl( vlen );
1929 if( !( status = ReadFromBuffer( data, len, vlen, value ) ).IsOK() )
1932 resp.push_back( XAttr( name, value ) );
1936 response =
new AnyObject();
1937 response->Set(
new std::vector<XAttr>( std::move( resp ) ) );
1951 Status XRootDMsgHandler::RewriteRequestRedirect(
const URL &newUrl )
1959 std::string xrdCgi =
"";
1960 std::ostringstream ossXrd;
1961 for(URL::ParamsMap::const_iterator it = newCgi.begin(); it != newCgi.end(); ++it )
1963 if( it->first.compare( 0, 4,
"xrd." ) )
1965 ossXrd << it->first <<
'=' << it->second <<
'&';
1968 xrdCgi = ossXrd.str();
1978 std::string surl = newUrl.GetURL();
1979 (surl.find(
'?') == std::string::npos) ? (surl +=
'?') :
1980 ((*surl.rbegin() !=
'&') ? (surl +=
'&') : (surl +=
""));
1984 std::string surlLog = surl;
1985 if( unlikely( log->GetLevel() >= Log::ErrorMsg ) ) {
1986 surlLog = obfuscateAuth(surlLog);
1988 log->Error(
XRootDMsg,
"[%s] Failed to build redirection URL from data: %s",
1989 newUrl.GetHostId().c_str(), surl.c_str());
2006 Status XRootDMsgHandler::RewriteRequestWait()
2010 XRootDTransport::UnMarshallRequest( pRequest );
2033 XRootDTransport::SetDescription( pRequest );
2034 XRootDTransport::MarshallRequest( pRequest );
2041 void XRootDMsgHandler::HandleError( XRootDStatus status )
2049 if( pSidMgr && IsInFly() && (
2050 status.code == errOperationExpired ||
2051 status.code == errOperationInterrupted ) )
2060 if( !noreplicas ) pLastError = status;
2062 Log *log = DefaultEnv::GetLog();
2063 log->Debug( XRootDMsg,
"[%s] Handling error while processing %s: %s.",
2064 pUrl.GetHostId().c_str(), pRequest->GetObfuscatedDescription().c_str(),
2065 status.ToString().c_str() );
2071 if( status.IsFatal() && status.code == errTlsError && status.errNo == EAGAIN )
2073 if( pSslErrCnt < MaxSslErrRetry )
2075 status.status &= ~stFatal;
2091 if( status.code == errErrorResponse )
2093 if( RetriableErrorResponse( status ) )
2095 UpdateTriedCGI(status.errNo);
2097 SwitchOnRefreshFlag();
2098 HandleError( RetryAtServer( pLoadBalancer.url, RedirectEntry::EntryRetry ) );
2115 if( status.code == errOperationExpired || pRequest->GetSessionId() ||
2116 status.code == errOperationInterrupted || time(0) >= pExpiration )
2118 log->Error( XRootDMsg,
"[%s] Unable to get the response to request %s",
2119 pUrl.GetHostId().c_str(),
2120 pRequest->GetObfuscatedDescription().c_str() );
2131 if( pLoadBalancer.url.IsValid() &&
2132 pLoadBalancer.url.GetLocation() != pUrl.GetLocation() )
2135 HandleError( RetryAtServer( pLoadBalancer.url, RedirectEntry::EntryRetry ) );
2140 if( !status.IsFatal() && IsRetriable() )
2142 log->Info( XRootDMsg,
"[%s] Retrying request: %s.",
2143 pUrl.GetHostId().c_str(),
2144 pRequest->GetObfuscatedDescription().c_str() );
2147 HandleError( RetryAtServer( pUrl, RedirectEntry::EntryRetry ) );
2159 Status XRootDMsgHandler::RetryAtServer(
const URL &url, RedirectEntry::Type entryType )
2161 if( &pRetryAtUrl != &url ) pRetryAtUrl = url;
2162 pRetryAtEntryType = entryType;
2163 const int sst = pSendingState.fetch_or( kRetryAtSrv );
2169 if( ( sst & kSawReadySend ) && !( sst & kSendDone ) )
return Status();
2170 pSendingState &= ~kRetryAtSrv;
2173 Log *log = DefaultEnv::GetLog();
2178 if( pRdirEntry ) pRedirectTraceBack.push_back( std::move( pRdirEntry ) );
2179 pRdirEntry.reset(
new RedirectEntry( pUrl.GetLocation(), url.GetLocation(), entryType ) );
2181 if( pUrl.GetLocation() != url.GetLocation() )
2183 pHosts->push_back( url );
2195 pSidMgr->ReleaseSID( req->
streamid );
2202 if( !url.IsLocalFile() )
2204 pSidMgr = SIDMgrPool::Instance().GetSIDMgr( url );
2205 Status st = pSidMgr->AllocateSID( req->
streamid );
2208 log->Error( XRootDMsg,
"[%s] Impossible to send message %s.",
2209 pUrl.GetHostId().c_str(),
2210 pRequest->GetObfuscatedDescription().c_str() );
2218 if( pUrl.IsMetalink() && pFollowMetalink )
2220 log->Debug( ExDbgMsg,
"[%s] Metaling redirection for MsgHandler: %p (message: %s ).",
2221 pUrl.GetHostId().c_str(), (
void*)
this,
2222 pRequest->GetObfuscatedDescription().c_str() );
2224 return pPostMaster->Redirect( pUrl, pRequest,
this );
2226 else if( pUrl.IsLocalFile() )
2228 HandleLocalRedirect( &pUrl );
2233 log->Debug( ExDbgMsg,
"[%s] Retry at server MsgHandler: %p (message: %s ).",
2234 pUrl.GetHostId().c_str(), (
void*)
this,
2235 pRequest->GetObfuscatedDescription().c_str() );
2236 return pPostMaster->Send( pUrl, pRequest,
this,
true, pExpiration );
2243 void XRootDMsgHandler::UpdateTriedCGI(uint32_t errNo)
2253 if( pEffectiveDataServerUrl )
2255 tried = pEffectiveDataServerUrl->GetHostName();
2256 delete pEffectiveDataServerUrl;
2257 pEffectiveDataServerUrl = 0;
2263 tried = pUrl.GetHostName();
2268 {
if (errNo ==
kXR_NotFound) cgi[
"triedrc"] =
"enoent";
2269 else if (errNo ==
kXR_IOError) cgi[
"triedrc"] =
"ioerr";
2270 else if (errNo ==
kXR_FSError) cgi[
"triedrc"] =
"fserr";
2279 if( pLoadBalancer.url.IsValid() && (pLoadBalancer.flags &
kXR_attrMeta) )
2281 HostList::reverse_iterator it;
2282 for( it = pHosts->rbegin()+1; it != pHosts->rend(); ++it )
2284 if( it->loadBalancer )
2287 tried +=
"," + it->url.GetHostName();
2294 cgi[
"tried"] = tried;
2295 XRootDTransport::UnMarshallRequest( pRequest );
2296 MessageUtils::RewriteCGIAndPath( pRequest, cgi,
false,
"" );
2297 XRootDTransport::MarshallRequest( pRequest );
2303 void XRootDMsgHandler::SwitchOnRefreshFlag()
2305 XRootDTransport::UnMarshallRequest( pRequest );
2321 XRootDTransport::SetDescription( pRequest );
2322 XRootDTransport::MarshallRequest( pRequest );
2329 void XRootDMsgHandler::HandleRspOrQueue()
2334 bool finalrsp = !( pStatus.IsOK() && pStatus.code == suContinue );
2340 const int sst = pSendingState.fetch_or( kFinalResp );
2341 if( ( sst & kSawReadySend ) && !( sst & kSendDone ) )
2345 JobManager *jobMgr = pPostMaster->GetJobManager();
2346 if( jobMgr->IsWorker() )
2350 Log *log = DefaultEnv::GetLog();
2351 log->Debug( ExDbgMsg,
"[%s] Passing to the thread-pool MsgHandler: %p (message: %s ).",
2352 pUrl.GetHostId().c_str(), (
void*)
this,
2353 pRequest->GetObfuscatedDescription().c_str() );
2354 jobMgr->QueueJob(
new HandleRspJob(
this ), 0 );
2361 void XRootDMsgHandler::HandleLocalRedirect( URL *url )
2363 Log *log = DefaultEnv::GetLog();
2364 log->Debug( ExDbgMsg,
"[%s] Handling local redirect - MsgHandler: %p (message: %s ).",
2365 pUrl.GetHostId().c_str(), (
void*)
this,
2366 pRequest->GetObfuscatedDescription().c_str() );
2368 if( !pLFileHandler )
2370 HandleError( XRootDStatus( stFatal, errNotSupported ) );
2374 AnyObject *resp = 0;
2375 pLFileHandler->SetHostList( *pHosts );
2376 XRootDStatus st = pLFileHandler->Open( url, pRequest, resp );
2383 pResponseHandler->HandleResponseWithHosts(
new XRootDStatus(),
2394 bool XRootDMsgHandler::IsRetriable()
2397 DefaultEnv::GetEnv()->GetString(
"OpenRecovery", value );
2398 if( value ==
"true" )
return true;
2409 Log *log = DefaultEnv::GetLog();
2410 log->Debug( XRootDMsg,
2411 "[%s] Not allowed to retry open request (OpenRecovery disabled): %s.",
2412 pUrl.GetHostId().c_str(),
2413 pRequest->GetObfuscatedDescription().c_str() );
2426 bool XRootDMsgHandler::OmitWait( Message &request,
const URL &url )
2429 if( !url.IsMetalink() )
2440 RedirectorRegistry ®istry = RedirectorRegistry::Instance();
2441 VirtualRedirector *redirector = registry.Get( url );
2445 if( redirector->Count( request ) > 1 )
2454 bool XRootDMsgHandler::RetriableErrorResponse(
const Status &status )
2458 if( !( pLoadBalancer.url.IsValid() &&
2459 pUrl.GetLocation() != pLoadBalancer.url.GetLocation() ) )
2470 if( !( pLoadBalancer.flags &
kXR_attrMeta ) )
return false;
2481 DefaultEnv::GetEnv()->GetInt(
"NotAuthorizedRetryLimit", limit );
2482 bool ret = pNotAuthorizedCounter < limit;
2483 ++pNotAuthorizedCounter;
2486 Log *log = DefaultEnv::GetLog();
2487 log->Error( XRootDMsg,
2488 "[%s] Reached limit of NotAuthorized retries!",
2489 pUrl.GetHostId().c_str() );
2509 void XRootDMsgHandler::DumpRedirectTraceBack()
2511 if( pRedirectTraceBack.empty() )
return;
2513 std::stringstream sstrm;
2515 sstrm <<
"Redirect trace-back:\n";
2519 auto itr = pRedirectTraceBack.begin();
2520 sstrm <<
'\t' << counter <<
". " << (*itr)->ToString() <<
'\n';
2526 for( ; itr != pRedirectTraceBack.end(); ++itr, ++prev, ++counter )
2527 sstrm <<
'\t' << counter <<
". "
2528 << (*itr)->ToString( (*prev)->status.IsOK() ) <<
'\n';
2531 DefaultEnv::GetEnv()->GetInt(
"NotAuthorizedRetryLimit", authlimit );
2533 bool warn = !pStatus.IsOK() &&
2536 ( pStatus.code ==
errAuthFailed && pNotAuthorizedCounter >= authlimit ) );
2538 Log *log = DefaultEnv::GetLog();
2540 log->Warning( XRootDMsg,
"%s", sstrm.str().c_str() );
2542 log->Debug( XRootDMsg,
"%s", sstrm.str().c_str() );
2547 template<
typename T>
2548 Status XRootDMsgHandler::ReadFromBuffer(
char *&buffer,
size_t &buflen, T& result )
2550 if(
sizeof( T ) > buflen )
return Status( stError, errDataError );
2552 memcpy(&result, buffer,
sizeof(T));
2554 buffer +=
sizeof( T );
2555 buflen -=
sizeof( T );
2563 Status XRootDMsgHandler::ReadFromBuffer(
char *&buffer,
size_t &buflen, std::string &result )
2570 if( !( status = ReadFromBuffer( buffer, buflen, c ) ).IsOK() )
2583 Status XRootDMsgHandler::ReadFromBuffer(
char *&buffer,
size_t &buflen,
2584 size_t size, std::string &result )
2588 if( size > buflen )
return Status( stError, errDataError );
2590 result.append( buffer, size );
struct ClientFattrRequest fattr
#define kXR_collapseRedir
ServerResponseStatus status
union ServerResponse::@040373375333017131300127053271011057331004327334 body
struct ClientDirlistRequest dirlist
static const int kXR_ckpXeq
struct ClientOpenRequest open
struct ServerResponseBody_Status bdy
struct ClientRequestHdr header
union ServerResponseV2::@207342300141235315373173036347114307032363217365 info
struct ClientChkPointRequest chkpoint
struct ServerResponseHeader hdr
#define kXR_PROTOCOLVERSION
struct ClientStatRequest stat
struct ClientProtocolRequest protocol
struct ClientLocateRequest locate
void Get(Type &object)
Retrieve the object being held.
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
static bool HasStatInfo(const char *data)
Returns true if data contain stat info.
bool GetInt(const std::string &key, int &value)
virtual void Run(void *arg)
The job logic.
HandleRspJob(XrdCl::XRootDMsgHandler *handler)
Interface for a job to be run by the job manager.
void Error(uint64_t topic, const char *format,...)
Report an error.
void Warning(uint64_t topic, const char *format,...)
Report a warning.
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
static void RewriteCGIAndPath(Message *msg, const URL::ParamsMap &newCgi, bool replace, const std::string &newPath)
Append cgi to the one already present in the message.
The message representation used throughout the system.
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
@ More
there are more (non-raw) data to be read
@ Ignore
Ignore the message.
StreamEvent
Events that may have occurred to the stream.
@ Ready
The stream has become connected.
virtual XRootDStatus Send(const char *buffer, size_t size, int &bytesWritten)
void RegisterTask(Task *task, time_t time, bool own=true)
Interface for a task to be run by the TaskManager.
std::string GetHostId() const
Get the host part of the URL (user:password@host:port).
const std::string & GetPassword() const
Get the password.
std::map< std::string, std::string > ParamsMap
bool FromString(const std::string &url)
Parse a string and fill the URL fields.
void SetPassword(const std::string &password)
Set the password.
void SetParams(const std::string ¶ms)
Set params.
const std::string & GetUserName() const
Get the username.
std::string GetURL() const
Get the URL.
void SetProtocol(const std::string &protocol)
Set protocol.
const ParamsMap & GetParams() const
Get the URL params.
const std::string & GetProtocol() const
Get the protocol.
bool IsValid() const
Is the url valid.
void SetUserName(const std::string &userName)
Set the username.
static void splitString(Container &result, const std::string &input, const std::string &delimiter)
Split a string.
static bool CheckEC(const Message *req, const URL &url)
Check if this client can support given EC redirect.
Handle/Process/Forward XRootD messages.
virtual uint16_t InspectStatusRsp() override
virtual void OnStatusReady(const Message *message, XRootDStatus status) override
The requested action has been performed and the status is available.
const Message * GetRequest() const
Get the request pointer.
virtual uint16_t Examine(std::shared_ptr< Message > &msg) override
void WaitDone(time_t now)
virtual void Process() override
Process the message if it was "taken" by the examine action.
virtual XRootDStatus ReadMessageBody(Message *msg, Socket *socket, uint32_t &bytesRead) override
XRootDStatus WriteMessageBody(Socket *socket, uint32_t &bytesWritten) override
virtual uint8_t OnStreamEvent(StreamEvent event, XRootDStatus status) override
virtual uint16_t GetSid() const override
virtual bool IsRaw() const override
Are we a raw writer or not?
const std::string & GetErrorMessage() const
Get error message.
static XRootDStatus UnMarshallBody(Message *msg, uint16_t reqType)
Unmarshall the body of the incoming message.
static XRootDStatus UnMarshallRequest(Message *msg)
static XRootDStatus UnMarshalStatusBody(Message &msg, uint16_t reqType)
Unmarshall the body of the status response.
static XRootDStatus MarshallRequest(Message *msg)
Marshal the outgoing message.
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
const uint16_t errRedirectLimit
const int DefaultMaxMetalinkWait
const uint16_t errErrorResponse
const uint16_t errOperationExpired
const uint16_t stFatal
Fatal error, it's still an error.
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errNotFound
std::vector< HostInfo > HostList
const uint16_t errDataError
data is corrupted
const uint16_t errInternal
Internal error.
const uint16_t stOK
Everything went OK.
const uint16_t errInvalidResponse
const uint16_t errInvalidRedirectURL
Buffer BinaryDataInfo
Binary buffer.
const uint16_t errOperationInterrupted
const uint16_t suContinue
const int DefaultNotAuthorizedRetryLimit
const uint16_t errRedirect
const uint16_t errAuthFailed
const uint16_t errInvalidMessage
static const int PageSize
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)
Describe a data chunk for vector read.
void * buffer
length of the chunk
uint32_t length
offset in the file
Procedure execution status.
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.
static const uint16_t ServerFlags
returns server flags
static const uint16_t ProtocolVersion
returns the protocol version