69 friend class PgReadRetryHandler;
76 PgReadHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
77 XrdCl::ResponseHandler *userHandler,
78 uint64_t orgOffset ) :
79 stateHandler( stateHandler ),
80 userHandler( userHandler ),
81 orgOffset( orgOffset ),
91 void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
92 XrdCl::AnyObject *response,
95 using namespace XrdCl;
97 std::unique_lock<std::mutex> lck( mtx );
105 if( !status->
IsOK() )
122 userHandler->HandleResponseWithHosts( st.release(), resp.release(), hosts.release() );
125 userHandler->HandleResponseWithHosts( st.release(), 0, 0 );
136 if( !status->
IsOK() )
141 userHandler->HandleResponseWithHosts( status, response, hostList );
153 response->
Get( pginf );
157 std::vector<uint32_t> &cksums = pginf->
GetCksums();
158 char *buffer =
reinterpret_cast<char*
>( pginf->
GetBuffer() );
161 if( pgsize > bytesRead ) pgsize = bytesRead;
163 for(
size_t pgnb = 0; pgnb < nbpages; ++pgnb )
166 if( crcval != cksums[pgnb] )
168 Log *log = DefaultEnv::GetLog();
169 log->
Info( FileMsg,
"[%p@%s] Received corrupted page, will retry page #%zu.",
170 (
void*)
this, stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
185 if( pgsize > bytesRead ) pgsize = bytesRead;
194 userHandler->HandleResponseWithHosts( status, response, hostList );
203 resp.reset( response );
204 hosts.reset( hostList );
208 void UpdateCksum(
size_t pgnb, uint32_t crcval )
212 XrdCl::PageInfo *pginf = 0;
220 std::shared_ptr<XrdCl::FileStateHandler> stateHandler;
221 XrdCl::ResponseHandler *userHandler;
224 std::unique_ptr<XrdCl::AnyObject> resp;
225 std::unique_ptr<XrdCl::HostList> hosts;
226 std::unique_ptr<XrdCl::XRootDStatus> st;
242 PgReadRetryHandler( PgReadHandler *pgReadHandler,
size_t pgnb ) : pgReadHandler( pgReadHandler ),
251 void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
252 XrdCl::AnyObject *response,
255 using namespace XrdCl;
257 if( !status->
IsOK() )
259 Log *log = DefaultEnv::GetLog();
260 log->
Info( FileMsg,
"[%p@%s] Failed to recover page #%zu.",
261 (
void*)
this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
262 pgReadHandler->HandleResponseWithHosts( status, response, hostList );
267 XrdCl::PageInfo *pginf = 0;
268 response->
Get( pginf );
271 Log *log = DefaultEnv::GetLog();
272 log->
Info( FileMsg,
"[%p@%s] Failed to recover page #%zu.",
273 (
void*)
this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
275 DeleteArgs( status, response, hostList );
276 pgReadHandler->HandleResponseWithHosts(
new XRootDStatus( stError, errDataError ), 0, 0 );
282 if( crcval != pginf->
GetCksums().front() )
284 Log *log = DefaultEnv::GetLog();
285 log->
Info( FileMsg,
"[%p@%s] Failed to recover page #%zu.",
286 (
void*)
this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
287 DeleteArgs( status, response, hostList );
288 pgReadHandler->HandleResponseWithHosts(
new XRootDStatus( stError, errDataError ), 0, 0 );
293 Log *log = DefaultEnv::GetLog();
294 log->
Info( FileMsg,
"[%p@%s] Successfully recovered page #%zu.",
295 (
void*)
this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
297 DeleteArgs( 0, response, hostList );
298 pgReadHandler->UpdateCksum( pgnb, crcval );
299 pgReadHandler->HandleResponseWithHosts( status, 0, 0 );
305 inline void DeleteArgs( XrdCl::XRootDStatus *status,
306 XrdCl::AnyObject *response,
314 PgReadHandler *pgReadHandler;
328 PgReadSubstitutionHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
329 XrdCl::ResponseHandler *userHandler ) :
330 stateHandler( stateHandler ),
331 userHandler( userHandler )
338 void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
339 XrdCl::AnyObject *rdresp,
342 if( !status->
IsOK() )
344 userHandler->HandleResponseWithHosts( status, rdresp, hostList );
349 using namespace XrdCl;
351 ChunkInfo *chunk = 0;
352 rdresp->
Get( chunk );
354 std::vector<uint32_t> cksums;
355 if( stateHandler->pIsChannelEncrypted )
360 cksums.reserve( nbpages );
362 size_t size = chunk->
length;
363 char *buffer =
reinterpret_cast<char*
>( chunk->
buffer );
365 for(
size_t pg = 0; pg < nbpages; ++pg )
368 if( pgsize > size ) pgsize = size;
370 cksums.push_back( crcval );
376 PageInfo *pages =
new PageInfo( chunk->
offset, chunk->
length,
377 chunk->
buffer, std::move( cksums ) );
379 AnyObject *response =
new AnyObject();
380 response->
Set( pages );
381 userHandler->HandleResponseWithHosts( status, response, hostList );
388 std::shared_ptr<XrdCl::FileStateHandler> stateHandler;
389 XrdCl::ResponseHandler *userHandler;
402 OpenHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
403 XrdCl::ResponseHandler *userHandler ):
404 pStateHandler( stateHandler ),
405 pUserHandler( userHandler )
412 virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
413 XrdCl::AnyObject *response,
416 using namespace XrdCl;
421 OpenInfo *openInfo = 0;
423 response->
Get( openInfo );
429 if( status->
code == errRedirect )
432 EcHandler *ecHandler =
GetEcHandler( hostList->front().url, ecurl );
433 if( ecHandler && pStateHandler->NeedFileTempl() )
436 status =
new XRootDStatus( stError, errNotSupported, 0,
437 "File template not supported with Ec" );
443 pStateHandler->pPlugin = ecHandler;
444 ecHandler->
Open( pStateHandler->pOpenFlags, pUserHandler, 0 );
452 pStateHandler->OnOpen( status, openInfo, hostList );
455 pUserHandler->HandleResponseWithHosts( status, 0, hostList );
465 std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
466 XrdCl::ResponseHandler *pUserHandler;
479 CloseHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
480 XrdCl::ResponseHandler *userHandler,
481 XrdCl::Message *message ):
482 pStateHandler( stateHandler ),
483 pUserHandler( userHandler ),
491 virtual ~CloseHandler()
499 virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
500 XrdCl::AnyObject *response,
503 pStateHandler->OnClose( status );
505 pUserHandler->HandleResponseWithHosts( status, response, hostList );
517 std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
518 XrdCl::ResponseHandler *pUserHandler;
519 XrdCl::Message *pMessage;
531 StatefulHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
532 XrdCl::ResponseHandler *userHandler,
533 XrdCl::Message *message,
534 const XrdCl::MessageSendParams &sendParams ):
535 pStateHandler( stateHandler ),
536 pUserHandler( userHandler ),
538 pSendParams( sendParams )
545 virtual ~StatefulHandler()
548 delete pSendParams.chunkList;
549 delete pSendParams.kbuff;
555 virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
556 XrdCl::AnyObject *response,
559 using namespace XrdCl;
560 std::unique_ptr<AnyObject> responsePtr( response );
561 pSendParams.hostList = hostList;
566 if( !status->
IsOK() )
575 responsePtr.release();
578 pUserHandler->HandleResponseWithHosts( status, response, hostList );
591 XrdCl::ResponseHandler *GetUserHandler()
597 std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
598 XrdCl::ResponseHandler *pUserHandler;
599 XrdCl::Message *pMessage;
600 XrdCl::MessageSendParams pSendParams;
613 ReleaseBufferHandler( XrdCl::Buffer &&buffer, XrdCl::ResponseHandler *handler ) :
614 buffer( std::move( buffer ) ),
622 virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
623 XrdCl::AnyObject *response,
627 handler->HandleResponseWithHosts( status, response, hostList );
633 XrdCl::Buffer& GetBuffer()
639 XrdCl::Buffer buffer;
640 XrdCl::ResponseHandler *handler;
656 pWrtRecoveryRedir( 0 ),
661 pDoRecoverRead( true ),
662 pDoRecoverWrite( true ),
663 pFollowRedirects( true ),
664 pUseVirtRedirector( true ),
665 pIsChannelEncrypted( false ),
666 pAllowBundledClose( false ),
669 pFileHandle =
new uint8_t[4];
670 ResetMonitoringVars();
689 pWrtRecoveryRedir( 0 ),
694 pDoRecoverRead( true ),
695 pDoRecoverWrite( true ),
696 pFollowRedirects( true ),
697 pUseVirtRedirector( useVirtRedirector ),
698 pAllowBundledClose( false ),
701 pFileHandle =
new uint8_t[4];
702 ResetMonitoringVars();
733 ResetMonitoringVars();
738 if(
DefaultEnv::GetLog() && pUseVirtRedirector && pFileUrl && pFileUrl->IsMetalink() )
747 delete pLoadBalancer;
748 delete [] pFileHandle;
749 delete pLFileHandler;
756 std::shared_ptr<FileStateHandler> &self,
758 const std::string &url,
773 return OpenImpl( self, url, flags, mode, handler, timeout );
780 const std::string &url,
786 self->pTemplateFileWp.reset();
787 return OpenImpl( self, url, flags, mode, handler, timeout );
793 XRootDStatus FileStateHandler::OpenImpl( std::shared_ptr<FileStateHandler> &self,
794 const std::string &url,
805 if( self->pFileState ==
Error )
806 return self->pStatus;
824 if( self->pUseVirtRedirector && self->pFileUrl->IsMetalink() )
827 registry.
Release( *self->pFileUrl );
829 delete self->pFileUrl;
833 self->pFileUrl =
new URL( url );
841 char requuid[37]= {0};
842 uuid_generate( uuid );
843 uuid_unparse( uuid, requuid );
844 cgi[
"xrdcl.requuid"] = requuid;
845 self->pFileUrl->SetParams( cgi );
847 if( !self->pFileUrl->IsValid() )
849 log->
Error(
FileMsg,
"[%p@%s] Trying to open invalid url: %s",
850 (
void*)self.get(), self->pFileUrl->GetPath().c_str(), url.c_str() );
852 self->pFileState =
Closed;
853 return self->pStatus;
860 URL::ParamsMap::const_iterator it;
861 it = urlParams.find(
"xrdcl.recover-reads" );
862 if( (it != urlParams.end() && it->second ==
"false") ||
863 !self->pDoRecoverRead )
865 self->pDoRecoverRead =
false;
866 log->
Debug(
FileMsg,
"[%p@%s] Read recovery procedures are disabled",
867 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
870 it = urlParams.find(
"xrdcl.recover-writes" );
871 if( (it != urlParams.end() && it->second ==
"false") ||
872 !self->pDoRecoverWrite )
874 self->pDoRecoverWrite =
false;
875 log->
Debug(
FileMsg,
"[%p@%s] Write recovery procedures are disabled",
876 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
882 log->
Debug(
FileMsg,
"[%p@%s] Sending an open command", (
void*)self.get(),
883 self->pFileUrl->GetObfuscatedURL().c_str() );
885 self->pOpenMode = mode;
886 self->pOpenFlags = flags;
890 ClientOpenRequest *req;
891 std::string path = self->pFileUrl->GetPathWithFilteredParams();
897 req->
dlen = path.length();
899 XRootDStatus st = FillFhTempl( self, *self->pFileUrl, msg, sendUrl );
904 self->pFileState =
Closed;
907 msg->
Append( path.c_str(), path.length(), 24 );
910 MessageSendParams params; params.
timeout = timeout;
914 st = self->IssueRequest( sendUrl, msg, openHandler, params );
920 self->pFileState =
Closed;
938 if( self->pFileState ==
Error )
939 return self->pStatus;
944 if( self->pFileState ==
Closed )
950 if( !self->pAllowBundledClose && !self->pInTheFly.empty() )
956 log->
Debug(
FileMsg,
"[%p@%s] Sending a close command for handle %#x to %s",
957 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
958 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
968 memcpy( req->
fhandle, self->pFileHandle, 4 );
972 CloseHandler *closeHandler =
new CloseHandler( self, handler, msg );
979 XRootDStatus st = self->IssueRequest( *self->pDataServer, msg, closeHandler, params );
989 self->pFileState =
Closed;
998 self->pFileState =
Error;
1014 if( self->pFileState ==
Error )
return self->pStatus;
1032 log->
Debug(
FileMsg,
"[%p@%s] Sending a stat command for handle %#x to %s",
1033 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1034 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1043 std::string path = self->pFileUrl->GetPath();
1047 memcpy( req->
fhandle, self->pFileHandle, 4 );
1056 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1058 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1074 if( self->pFileState ==
Error )
return self->pStatus;
1080 log->
Debug(
FileMsg,
"[%p@%s] Sending an read+preread command for handle %#x to %s",
1081 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1082 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1094 memcpy( req->
fhandle, self->pFileHandle, 4 );
1097 static char dummyBuff[8];
1099 list->push_back(
ChunkInfo( 0, 0, dummyBuff ) );
1105 for(
size_t i = 0; i < tracts.size(); ++i )
1107 dataTract[i].
rlen = tracts[i].length;
1108 dataTract[i].
offset = tracts[i].offset;
1109 memcpy( dataTract[i].fhandle, req->
fhandle, 4 );
1123 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1125 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1140 if( self->pFileState ==
Error )
return self->pStatus;
1146 log->
Debug(
FileMsg,
"[%p@%s] Sending a read command for handle %#x to %s",
1147 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1148 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1157 memcpy( req->
fhandle, self->pFileHandle, 4 );
1160 list->push_back(
ChunkInfo( offset, size, buffer ) );
1169 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1171 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1184 int issupported =
true;
1197 issupported =
false;
1202 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
1204 auto st =
Read( self, offset, size, buffer, substitHandler, timeout );
1205 if( !st.
IsOK() )
delete substitHandler;
1211 if( !st.
IsOK() )
delete pgHandler;
1225 "PgRead retry size exceeded 4KB." );
1229 if( !st.
IsOK() )
delete retryHandler;
1243 if( self->pFileState ==
Error )
return self->pStatus;
1249 log->
Debug(
FileMsg,
"[%p@%s] Sending a pgread command for handle %#x to %s",
1250 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1251 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1260 memcpy( req->
fhandle, self->pFileHandle, 4 );
1273 list->push_back(
ChunkInfo( offset, size, buffer ) );
1282 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1284 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1299 if( self->pFileState ==
Error )
return self->pStatus;
1305 log->
Debug(
FileMsg,
"[%p@%s] Sending a write command for handle %#x to %s",
1306 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1307 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1316 memcpy( req->
fhandle, self->pFileHandle, 4 );
1319 list->push_back(
ChunkInfo( 0, size, (
char*)buffer ) );
1330 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1332 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1351 log->
Info(
FileMsg,
"[%p@%s] Buffer for handle %#x is not page aligned (4KB), "
1352 "cannot convert it to kernel space buffer.", (
void*)self.get(),
1353 self->pFileUrl->GetObfuscatedURL().c_str(), *((uint32_t*)self->pFileHandle) );
1355 void *buff = buffer.GetBuffer();
1356 uint32_t size = buffer.GetSize();
1357 ReleaseBufferHandler *wrtHandler =
1358 new ReleaseBufferHandler( std::move( buffer ), handler );
1359 XRootDStatus st = self->Write( self, offset, size, buff, wrtHandler, timeout );
1362 buffer = std::move( wrtHandler->GetBuffer() );
1371 uint32_t length = buffer.GetSize();
1372 char *ubuff = buffer.Release();
1382 return WriteKernelBuffer( self, offset, ret, std::move( kbuff ), handler, timeout );
1400 ssize_t ret = fdoff ?
XrdSys::Read( fd, *kbuff, size, *fdoff ) :
1408 return WriteKernelBuffer( self, offset, ret, std::move( kbuff ), handler, timeout );
1418 std::vector<uint32_t> &cksums,
1435 if( cksums.empty() )
1437 const char *data =
static_cast<const char*
>( buffer );
1443 if( crc32cCnt != cksums.size() )
1466 static size_t GetPgNb( uint64_t pgoff, uint64_t offset, uint32_t fstpglen )
1468 if( pgoff == offset )
return 0;
1474 if( !status ) status = s;
1481 auto pgwrt = std::make_shared<pgwrt_t>( handler );
1485 uint32_t fstpglen = fLen;
1487 time_t start = ::time(
nullptr );
1490 std::unique_ptr<AnyObject> scoped( r );
1495 pgwrt->SetStatus( s );
1504 pgwrt->SetStatus( s );
1509 time_t elapsed = ::time(
nullptr ) - start;
1510 if( elapsed >= timeout )
1515 else timeout -= elapsed;
1517 for(
size_t i = 0; i < inf->
Size(); ++i )
1519 auto tpl = inf->
At( i );
1520 uint64_t pgoff = std::get<0>( tpl );
1521 uint32_t pglen = std::get<1>( tpl );
1522 const void *pgbuf =
static_cast<const char*
>( buffer ) + ( pgoff - offset );
1523 uint32_t pgdigest = cksums[pgwrt_t::GetPgNb( pgoff, offset, fstpglen )];
1526 std::unique_ptr<AnyObject> scoped( r );
1530 pgwrt->SetStatus( s );
1540 "page: pgoff=%llu, pglen=%u, pgdigest=%u", (
void*)self.get(),
1541 self->pFileUrl->GetObfuscatedURL().c_str(), (
unsigned long long) pgoff, pglen, pgdigest );
1543 "Failed to retransmit corrupted page" ) );
1547 "page: pgoff=%llu, pglen=%u, pgdigest=%u", (
void*)self.get(),
1548 self->pFileUrl->GetObfuscatedURL().c_str(), (
unsigned long long) pgoff, pglen, pgdigest );
1550 auto st =
PgWriteRetry( self, pgoff, pglen, pgbuf, pgdigest, h, timeout );
1551 if( !st.IsOK() ) pgwrt->SetStatus(
new XRootDStatus( st ) );
1553 "pgoff=%llu, pglen=%u, pgdigest=%u", (
void*)self.get(),
1554 self->pFileUrl->GetObfuscatedURL().c_str(), (
unsigned long long) pgoff, pglen, pgdigest );
1558 auto st =
PgWriteImpl( self, offset, size, buffer, cksums, 0, h, timeout );
1561 pgwrt->handler =
nullptr;
1578 std::vector<uint32_t> cksums{ digest };
1589 std::vector<uint32_t> &cksums,
1596 if( self->pFileState ==
Error )
return self->pStatus;
1602 log->
Debug(
FileMsg,
"[%p@%s] Sending a pgwrite command for handle %#x to %s",
1603 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1604 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1615 req->
dlen = size + cksums.size() *
sizeof( uint32_t );
1617 memcpy( req->
fhandle, self->pFileHandle, 4 );
1620 list->push_back(
ChunkInfo( offset, size, (
char*)buffer ) );
1632 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1634 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1646 if( self->pFileState ==
Error )
return self->pStatus;
1652 log->
Debug(
FileMsg,
"[%p@%s] Sending a sync command for handle %#x to %s",
1653 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1654 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1661 memcpy( req->
fhandle, self->pFileHandle, 4 );
1670 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1672 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1685 if( self->pFileState ==
Error )
return self->pStatus;
1691 log->
Debug(
FileMsg,
"[%p@%s] Sending a truncate command for handle %#x to %s",
1692 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1693 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1700 memcpy( req->
fhandle, self->pFileHandle, 4 );
1710 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1712 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1729 if( self->pFileState ==
Error )
return self->pStatus;
1735 log->
Debug(
FileMsg,
"[%p@%s] Sending a vector read command for handle %#x to %s",
1736 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1737 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1750 char *cursor = (
char*)buffer;
1756 for(
size_t i = 0; i < chunks.size(); ++i )
1758 dataChunk[i].
rlen = chunks[i].length;
1759 dataChunk[i].
offset = chunks[i].offset;
1760 memcpy( dataChunk[i].fhandle, self->pFileHandle, 4 );
1765 chunkBuffer = cursor;
1766 cursor += chunks[i].length;
1769 chunkBuffer = chunks[i].buffer;
1771 list->push_back(
ChunkInfo( chunks[i].offset,
1787 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1789 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1805 if( self->pFileState ==
Error )
return self->pStatus;
1811 log->
Debug(
FileMsg,
"[%p@%s] Sending a vector write command for handle %#x to %s",
1812 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1813 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1842 for(
size_t i = 0; i < chunks.size(); ++i )
1844 writeList[i].
wlen = chunks[i].length;
1845 writeList[i].
offset = chunks[i].offset;
1846 memcpy( writeList[i].fhandle, self->pFileHandle, 4 );
1848 list->push_back(
ChunkInfo( chunks[i].offset,
1850 chunks[i].buffer ) );
1864 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1866 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1874 const struct iovec *
iov,
1881 if( self->pFileState ==
Error )
return self->pStatus;
1887 log->
Debug(
FileMsg,
"[%p@%s] Sending a write command for handle %#x to %s",
1888 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1889 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1898 for(
int i = 0; i < iovcnt; ++i )
1900 if(
iov[i].iov_len == 0 )
continue;
1901 size +=
iov[i].iov_len;
1903 (
char*)
iov[i].iov_base ) );
1909 memcpy( req->
fhandle, self->pFileHandle, 4 );
1920 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1922 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1937 if( self->pFileState ==
Error )
return self->pStatus;
1943 log->
Debug(
FileMsg,
"[%p@%s] Sending a read command for handle %#x to %s",
1944 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1945 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1952 size_t size = std::accumulate(
iov,
iov + iovcnt, 0, [](
size_t acc, iovec &rhs )
1954 return acc + rhs.iov_len;
1960 memcpy( req->
fhandle, self->pFileHandle, 4 );
1963 list->reserve( iovcnt );
1964 uint64_t choff = offset;
1965 for(
int i = 0; i < iovcnt; ++i )
1967 list->emplace_back( choff,
iov[i].iov_len,
iov[i].iov_base );
1968 choff +=
iov[i].iov_len;
1978 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1980 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1996 if( self->pFileState ==
Error )
return self->pStatus;
2002 log->
Debug(
FileMsg,
"[%p@%s] Sending a fcntl command for handle %#x to %s",
2003 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2004 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2013 memcpy( req->
fhandle, self->pFileHandle, 4 );
2023 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
2025 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2037 if( self->pFileState ==
Error )
return self->pStatus;
2043 log->
Debug(
FileMsg,
"[%p@%s] Sending a visa command for handle %#x to %s",
2044 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2045 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2053 memcpy( req->
fhandle, self->pFileHandle, 4 );
2062 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
2064 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2071 const std::vector<xattr_t> &attrs,
2077 if( self->pFileState ==
Error )
return self->pStatus;
2083 log->
Debug(
FileMsg,
"[%p@%s] Sending a fattr set command for handle %#x to %s",
2084 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2085 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2090 return XAttrOperationImpl( self,
kXR_fattrSet, 0, attrs, handler, timeout );
2097 const std::vector<std::string> &attrs,
2103 if( self->pFileState ==
Error )
return self->pStatus;
2109 log->
Debug(
FileMsg,
"[%p@%s] Sending a fattr get command for handle %#x to %s",
2110 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2111 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2116 return XAttrOperationImpl( self,
kXR_fattrGet, 0, attrs, handler, timeout );
2123 const std::vector<std::string> &attrs,
2129 if( self->pFileState ==
Error )
return self->pStatus;
2135 log->
Debug(
FileMsg,
"[%p@%s] Sending a fattr del command for handle %#x to %s",
2136 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2137 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2142 return XAttrOperationImpl( self,
kXR_fattrDel, 0, attrs, handler, timeout );
2154 if( self->pFileState ==
Error )
return self->pStatus;
2160 log->
Debug(
FileMsg,
"[%p@%s] Sending a fattr list command for handle %#x to %s",
2161 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2162 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2167 static const std::vector<std::string> nothing;
2169 nothing, handler, timeout );
2190 if( self->pFileState ==
Error )
return self->pStatus;
2196 log->
Debug(
FileMsg,
"[%p@%s] Sending a checkpoint command for handle %#x to %s",
2197 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2198 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2206 memcpy( req->
fhandle, self->pFileHandle, 4 );
2216 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
2218 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2241 if( self->pFileState ==
Error )
return self->pStatus;
2247 log->
Debug(
FileMsg,
"[%p@%s] Sending a write command for handle %#x to %s",
2248 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2249 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2258 memcpy( req->
fhandle, self->pFileHandle, 4 );
2263 wrtreq->
dlen = size;
2264 memcpy( wrtreq->
fhandle, self->pFileHandle, 4 );
2267 list->push_back(
ChunkInfo( 0, size, (
char*)buffer ) );
2278 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
2280 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2296 const struct iovec *
iov,
2303 if( self->pFileState ==
Error )
return self->pStatus;
2309 log->
Debug(
FileMsg,
"[%p@%s] Sending a write command for handle %#x to %s",
2310 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2311 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2320 memcpy( req->
fhandle, self->pFileHandle, 4 );
2324 for(
int i = 0; i < iovcnt; ++i )
2326 if(
iov[i].iov_len == 0 )
continue;
2327 size +=
iov[i].iov_len;
2329 (
char*)
iov[i].iov_base ) );
2335 wrtreq->
dlen = size;
2336 memcpy( wrtreq->
fhandle, self->pFileHandle, 4 );
2347 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
2349 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2368 const std::string &value )
2371 if( name ==
"ReadRecovery" )
2373 if( value ==
"true" ) pDoRecoverRead =
true;
2374 else pDoRecoverRead =
false;
2377 else if( name ==
"WriteRecovery" )
2379 if( value ==
"true" ) pDoRecoverWrite =
true;
2380 else pDoRecoverWrite =
false;
2383 else if( name ==
"FollowRedirects" )
2385 if( value ==
"true" ) pFollowRedirects =
true;
2386 else pFollowRedirects =
false;
2389 else if( name ==
"BundledClose" )
2391 if( value ==
"true" ) pAllowBundledClose =
true;
2392 else pAllowBundledClose =
false;
2402 std::string &value )
const
2405 if( name ==
"ReadRecovery" )
2407 if( pDoRecoverRead ) value =
"true";
2408 else value =
"false";
2411 else if( name ==
"WriteRecovery" )
2413 if( pDoRecoverWrite ) value =
"true";
2414 else value =
"false";
2417 else if( name ==
"FollowRedirects" )
2419 if( pFollowRedirects ) value =
"true";
2420 else value =
"false";
2423 else if( name ==
"DataServer" && pDataServer )
2424 { value = pDataServer->GetHostId();
return true; }
2425 else if( name ==
"LastURL" && pDataServer )
2426 { value = pDataServer->GetURL();
return true; }
2427 else if( name ==
"WrtRecoveryRedir" && pWrtRecoveryRedir )
2428 { value = pWrtRecoveryRedir->GetHostId();
return true; }
2446 std::string lastServer = pFileUrl->GetHostId();
2450 delete pLoadBalancer;
2452 delete pWrtRecoveryRedir;
2453 pWrtRecoveryRedir = 0;
2455 pDataServer =
new URL( hostList->back().url );
2456 pDataServer->SetParams( pFileUrl->GetParams() );
2457 if( !( pUseVirtRedirector && pFileUrl->IsMetalink() ) ) pDataServer->SetPath( pFileUrl->GetPath() );
2458 lastServer = pDataServer->GetHostId();
2459 HostList::const_iterator itC;
2461 for( itC = hostList->begin(); itC != hostList->end(); ++itC )
2464 itC->url.GetParams(),
2467 pDataServer->SetParams( params );
2469 HostList::const_reverse_iterator it;
2470 for( it = hostList->rbegin(); it != hostList->rend(); ++it )
2471 if( it->loadBalancer )
2473 pLoadBalancer =
new URL( it->url );
2477 for( it = hostList->rbegin(); it != hostList->rend(); ++it )
2480 pWrtRecoveryRedir =
new URL( it->url );
2485 log->
Debug(
FileMsg,
"[%p@%s] Open has returned with status %s",
2486 (
void*)
this, pFileUrl->GetObfuscatedURL().c_str(), status->
ToStr().c_str() );
2488 if( pDataServer && !pDataServer->IsLocalFile() )
2499 isencobj.
Get( isenc );
2500 pIsChannelEncrypted = isenc ? *isenc :
false;
2509 if( !pStatus.IsOK() || !openInfo )
2511 log->
Debug(
FileMsg,
"[%p@%s] Error while opening at %s: %s",
2512 (
void*)
this, pFileUrl->GetObfuscatedURL().c_str(), lastServer.c_str(),
2513 pStatus.ToStr().c_str() );
2514 FailQueuedMessages( pStatus );
2551 log->
Debug(
FileMsg,
"[%p@%s] successfully opened at %s, handle: %#x, "
2552 "session id: %llu", (
void*)
this, pFileUrl->GetObfuscatedURL().c_str(),
2553 pDataServer->GetHostId().c_str(), *((uint32_t*)pFileHandle),
2554 (
unsigned long long) pSessionId );
2559 gettimeofday( &pOpenTime, 0 );
2568 i.
fSize = pStatInfo ? pStatInfo->GetSize() : 0;
2575 ReSendQueuedMessages();
2588 log->
Debug(
FileMsg,
"[%p@%s] Close returned from %s with: %s", (
void*)
this,
2589 pFileUrl->GetObfuscatedURL().c_str(), pDataServer->GetHostId().c_str(),
2590 status->
ToStr().c_str() );
2592 log->
Dump(
FileMsg,
"[%p@%s] Items in the fly %zu, queued for recovery %zu",
2593 (
void*)
this, pFileUrl->GetObfuscatedURL().c_str(), pInTheFly.size(), pToBeRecovered.size() );
2595 MonitorClose( status );
2596 ResetMonitoringVars();
2616 static const std::string root =
"root", xroot =
"xroot", file =
"file",
2617 roots =
"roots", xroots =
"xroots";
2619 if( !msg.compare( 0, root.size(), root ) ||
2620 !msg.compare( 0, xroot.size(), xroot ) ||
2621 !msg.compare( 0, file.size(), file ) ||
2622 !msg.compare( 0, roots.size(), roots ) ||
2623 !msg.compare( 0, xroots.size(), xroots ) )
2635 self->pInTheFly.erase( message );
2637 log->
Dump(
FileMsg,
"[%p@%s] File state error encountered. Message %s "
2638 "returned with %s", (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2648 i.
file = self->pFileUrl;
2670 if( !self->IsRecoverable( *status ) || sendParams.
kbuff )
2672 log->
Error(
FileMsg,
"[%p@%s] Fatal file state error. Message %s "
2673 "returned with %s", (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2676 self->FailMessage( RequestData( message, userHandler, sendParams ), *status );
2685 self->pCloseReason = *status;
2686 RecoverMessage( self, RequestData( message, userHandler, sendParams ) );
2694 const std::string &redirectUrl,
2700 self->pInTheFly.erase( message );
2706 if( !self->pStateRedirect )
2708 std::ostringstream o;
2709 self->pStateRedirect =
new URL( redirectUrl );
2712 self->pStateRedirect->GetParams(),
2714 self->pFileUrl->SetParams( params );
2717 RecoverMessage( self, RequestData( message, userHandler, sendParams ) );
2732 log->
Dump(
FileMsg,
"[%p@%s] Got state response for message %s",
2733 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2740 self->pInTheFly.erase( message );
2741 RunRecovery( self );
2756 response->
Get( info );
2757 delete self->pStatInfo;
2758 self->pStatInfo =
new StatInfo( *info );
2790 for(
size_t i = 0; i < segs; ++i )
2791 self->pVRBytes += dataChunk[i].
rlen;
2792 self->pVSegs += segs;
2825 for(
size_t i = 0; i < size; ++i )
2826 self->pVWBytes += wrtList[i].
wlen;
2837 if (pMutex.CondLock())
2848 if( !pToBeRecovered.empty() )
2851 log->
Dump(
FileMsg,
"[%p@%s] Got a timer event", (
void*)
this,
2852 pFileUrl->GetObfuscatedURL().c_str() );
2853 RequestList::iterator it;
2855 for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); )
2857 if( it->params.expires <= now )
2862 0, it->params.hostList ) );
2863 it = pToBeRecovered.erase( it );
2881 if( (IsReadOnly() && pDoRecoverRead) ||
2882 (!IsReadOnly() && pDoRecoverWrite) )
2884 log->
Debug(
FileMsg,
"[%p@%s] Putting the file in recovery state in "
2885 "process %d", (
void*)
this, pFileUrl->GetObfuscatedURL().c_str(), getpid() );
2888 pToBeRecovered.clear();
2901 if( self->pFileState !=
Opened || !self->pLoadBalancer )
2907 log->
Debug(
FileMsg,
"[%p@%s] Reopen file at next data server.",
2908 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
2911 auto lbcgi = self->pLoadBalancer->GetParams();
2912 auto dtcgi = self->pDataServer->GetParams();
2915 auto itr = lbcgi.find(
"tried" );
2916 if( itr == lbcgi.end() )
2917 lbcgi[
"tried"] = self->pDataServer->GetHostName();
2920 std::string tried = itr->second;
2921 tried +=
"," + self->pDataServer->GetHostName();
2922 lbcgi[
"tried"] = tried;
2924 self->pLoadBalancer->SetParams( lbcgi );
2926 return ReOpenFileAtServer( self, *self->pLoadBalancer, timeout );
2932 template<
typename T>
2933 Status FileStateHandler::XAttrOperationImpl( std::shared_ptr<FileStateHandler> &self,
2936 const std::vector<T> &attrs,
2951 memcpy( req->
fhandle, self->pFileHandle, 4 );
2953 if( !st.
IsOK() )
return st;
2962 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
2964 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2970 Status FileStateHandler::SendOrQueue( std::shared_ptr<FileStateHandler> &self,
2981 return RecoverMessage( self, RequestData( msg, handler, sendParams ),
false );
2987 if( self->pFileState ==
Opened )
2990 XRootDStatus st = self->IssueRequest( *self->pDataServer, msg, handler, sendParams );
2997 return RecoverMessage( self, RequestData( msg, handler, sendParams ),
false );
3000 self->pInTheFly.insert(msg);
// Tells whether the given error status qualifies this file for recovery.
// The status matches when its code equals one of the entries of the local
// recoverable_errors list (the entries are not shown in this listing).
// The scan only runs when at least one of pDoRecoverRead / pDoRecoverWrite is
// set; on a match the result is pDoRecoverRead for a read-only file and
// pDoRecoverWrite otherwise.
// NOTE(review): the return statement for the no-match path is elided here --
// presumably false; confirm.
3011 bool FileStateHandler::IsRecoverable(
const XRootDStatus &status )
const
3013 const auto recoverable_errors = {
3022 if (pDoRecoverRead || pDoRecoverWrite)
3023 for (
const auto error : recoverable_errors)
3024 if (status.
code == error)
3025 return IsReadOnly() ? pDoRecoverRead : pDoRecoverWrite;
// Const predicate; its body is not part of this listing.
// Within the visible code it selects between the read and the write recovery
// switches (pDoRecoverRead / pDoRecoverWrite) and gates the use of the load
// balancer in RunRecovery.
3033 bool FileStateHandler::IsReadOnly()
const
// Queues a request for recovery and triggers the recovery procedure.
// callbackOnFailure: when true, the request is failed through FailMessage
// with the status returned by RunRecovery.
// NOTE(review): the conditions separating the push_back path from the
// FailMessage path are elided -- presumably they depend on `st`; confirm.
3046 Status FileStateHandler::RecoverMessage( std::shared_ptr<FileStateHandler> &self,
3048 bool callbackOnFailure )
3053 log->
Dump(
FileMsg,
"[%p@%s] Putting message %s in the recovery list",
3054 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
3055 rd.request->GetObfuscatedDescription().c_str() );
// Start (or continue) recovery before the request is stored.
3057 Status st = RunRecovery( self );
3060 self->pToBeRecovered.push_back( rd );
3064 if( callbackOnFailure )
3065 self->FailMessage( rd, st );
// Runs the recovery procedure by re-opening the file.
// Re-open target, in order of preference:
//   1. the pending state-redirect URL (a close is sent first, and the
//      redirect URL object is deleted and nulled afterwards),
//   2. the load balancer, for read-only files that have one,
//   3. the current data server.
// All re-opens are requested with a timeout argument of 0.
3073 Status FileStateHandler::RunRecovery( std::shared_ptr<FileStateHandler> &self )
// NOTE(review): the action taken while requests are still in flight is elided
// -- presumably recovery is postponed; confirm.
3078 if( !self->pInTheFly.empty() )
3082 log->
Debug(
FileMsg,
"[%p@%s] Running the recovery procedure", (
void*)self.get(),
3083 self->pFileUrl->GetObfuscatedURL().c_str() );
3086 if( self->pStateRedirect )
// NOTE(review): the status returned by SendClose is not inspected here.
3088 SendClose( self, 0 );
3089 st = ReOpenFileAtServer( self, *self->pStateRedirect, 0 );
3090 delete self->pStateRedirect; self->pStateRedirect = 0;
3092 else if( self->IsReadOnly() && self->pLoadBalancer )
3093 st = ReOpenFileAtServer( self, *self->pLoadBalancer, 0 );
3095 st = ReOpenFileAtServer( self, *self->pDataServer, 0 );
// Presumably the failure path of the re-open (its condition is elided): the
// file is marked as Error and every queued request is failed with `st`.
3099 self->pFileState =
Error;
3101 self->FailQueuedMessages( st );
// Sends a close request for the current file handle to the current data
// server and returns the status of issuing it.
3110 XRootDStatus FileStateHandler::SendClose( std::shared_ptr<FileStateHandler> &self,
3114 ClientCloseRequest *req;
3118 memcpy( req->
fhandle, self->pFileHandle, 4 );
// The callback owns a copy of the shared_ptr (captured by value) and releases
// it when invoked; the response itself is ignored.
3123 [self]( XRootDStatus&, AnyObject& )
mutable { self.reset(); } );
3124 MessageSendParams params;
// Redirects are not followed for this request and it is flagged as stateful.
3126 params.followRedirects =
false;
3127 params.stateful =
true;
3131 return self->IssueRequest( *self->pDataServer, msg, handler, params );
// Sends a recovery open request for this file to the given server URL.
// Visible steps: adjust the stored open flags, build the open request from
// the stored mode/flags and the file path, let FillFhTempl pick the URL that
// is actually used for sending, then issue the request.
3137 XRootDStatus FileStateHandler::ReOpenFileAtServer( std::shared_ptr<FileStateHandler> &self,
3142 log->
Dump(
FileMsg,
"[%p@%s] Sending a recovery open command to %s",
3143 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(), url.
GetObfuscatedURL().c_str() );
// Delete and New are stripped from the stored open flags (the guarding
// conditions are elided) -- presumably so that the re-open neither truncates
// nor insists on creating the file; confirm.
3152 self->pOpenFlags &= ~OpenFlags::Delete;
3156 self->pOpenFlags &= ~OpenFlags::New;
3159 ClientOpenRequest *req;
3163 u.
SetPath( self->pFileUrl->GetPath() );
3169 req->
mode = self->pOpenMode;
// Only the low 16 bits of the flags go into the request's options field.
3170 req->
options = (self->pOpenFlags & 0xffff);
3171 req->
dlen = path.length();
3173 XRootDStatus st = FillFhTempl( self, url, msg, sendUrl );
// Presumably the failure path of FillFhTempl (condition elided).
3177 self->pFileState =
Closed;
// The path is written into the message at byte offset 24.
3180 msg->
Append( path.c_str(), path.length(), 24 );
3186 MessageSendParams params; params.
timeout = timeout;
3193 st = self->IssueRequest( sendUrl, msg, openHandler, params );
// Presumably the failure path of IssueRequest (condition elided).
3200 self->pFileState =
Closed;
// Fails a single queued request with the given status.
// Both arguments are taken by value. The request's handler is expected to be
// a StatefulHandler; its user handler is the one that gets notified with a
// newly allocated copy of `status` and the host list stored in the request's
// send parameters.
3208 void FileStateHandler::FailMessage( RequestData rd,
XRootDStatus status )
3211 log->
Dump(
FileMsg,
"[%p@%s] Failing message %s with %s",
3212 (
void*)
this, pFileUrl->GetObfuscatedURL().c_str(),
3213 rd.request->GetObfuscatedDescription().c_str(),
3214 status.
ToStr().c_str() );
3216 StatefulHandler *sh =
dynamic_cast<StatefulHandler*
>(rd.handler);
// Presumably logged when the cast above yields a null pointer (the test is
// elided in this listing).
3220 log->
Error(
FileMsg,
"[%p@%s] Internal error while recovering %s",
3221 (
void*)
this, pFileUrl->GetObfuscatedURL().c_str(),
3222 rd.request->GetObfuscatedDescription().c_str() );
3227 ResponseHandler *userHandler = sh->GetUserHandler();
// Arguments of an elided call: status copy, null response, host list.
3230 new XRootDStatus( status ),
3231 0, rd.params.hostList ) );
// Fails every request waiting in pToBeRecovered with the given status (one
// FailMessage call per entry) and then empties the list.
3239 void FileStateHandler::FailQueuedMessages(
XRootDStatus status )
3241 RequestList::iterator it;
3242 for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); ++it )
3243 FailMessage( *it, status );
3244 pToBeRecovered.clear();
// Re-issues every request waiting in pToBeRecovered to the current data
// server and then empties the list.
3250 void FileStateHandler::ReSendQueuedMessages()
3252 RequestList::iterator it;
3253 for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); ++it )
// Each request is stamped with the current session id and patched with the
// current file handle before it is sent again.
3255 it->request->SetSessionId( pSessionId );
3256 ReWriteFileHandle( it->request );
3257 XRootDStatus st = IssueRequest( *pDataServer, it->request,
3258 it->handler, it->params );
// Presumably only reached when `st` is not OK (the test is elided).
3260 FailMessage( *it, st );
3262 pToBeRecovered.clear();
// Patches the 4-byte file handle stored inside an already built request so
// that it carries the current pFileHandle.
// The buffer is reinterpreted as the request structure matching the request
// type; the dispatch on the request id (switch/case lines) is elided in this
// listing, only the per-type bodies are visible.
3268 void FileStateHandler::ReWriteFileHandle(
Message *msg )
3270 ClientRequestHdr *hdr = (ClientRequestHdr*)msg->
GetBuffer();
// read
3275 ClientReadRequest *req = (ClientReadRequest*)msg->
GetBuffer();
3276 memcpy( req->
fhandle, pFileHandle, 4 );
// write
3281 ClientWriteRequest *req = (ClientWriteRequest*)msg->
GetBuffer();
3282 memcpy( req->
fhandle, pFileHandle, 4 );
// sync
3287 ClientSyncRequest *req = (ClientSyncRequest*)msg->
GetBuffer();
3288 memcpy( req->
fhandle, pFileHandle, 4 );
// truncate
3293 ClientTruncateRequest *req = (ClientTruncateRequest*)msg->
GetBuffer();
3294 memcpy( req->
fhandle, pFileHandle, 4 );
// vector read: the handle lives in every readahead_list entry of the body,
// which starts at byte offset 24; dlen / sizeof(entry) gives the entry count.
3299 ClientReadVRequest *req = (ClientReadVRequest*)msg->
GetBuffer();
3300 readahead_list *dataChunk = (readahead_list*)msg->
GetBuffer( 24 );
3301 for(
size_t i = 0; i < req->
dlen/
sizeof(readahead_list); ++i )
3302 memcpy( dataChunk[i].fhandle, pFileHandle, 4 );
// vector write: same layout idea, with write_list entries at offset 24.
3307 ClientWriteVRequest *req =
3308 reinterpret_cast<ClientWriteVRequest*
>( msg->
GetBuffer() );
3309 XrdProto::write_list *wrtList =
3310 reinterpret_cast<XrdProto::write_list*
>( msg->
GetBuffer( 24 ) );
3311 size_t size = req->
dlen /
sizeof(XrdProto::write_list);
3312 for(
size_t i = 0; i < size; ++i )
3313 memcpy( wrtList[i].fhandle, pFileHandle, 4 );
// page read
3318 ClientPgReadRequest *req = (ClientPgReadRequest*) msg->
GetBuffer();
3319 memcpy( req->
fhandle, pFileHandle, 4 );
// page write
3324 ClientPgWriteRequest *req = (ClientPgWriteRequest*) msg->
GetBuffer();
3325 memcpy( req->
fhandle, pFileHandle, 4 );
// The handle bytes are printed as one 32-bit value for the log.
3331 log->
Dump(
FileMsg,
"[%p@%s] Rewritten file handle for %s to %#x",
3333 *((uint32_t*)pFileHandle) );
// Fills a Monitor::CloseInfo record for this file; the close timestamp
// (cTOD) is taken with gettimeofday().
// NOTE(review): the remaining fields and the dispatch of the record to the
// monitor are not shown in this listing.
3340 void FileStateHandler::MonitorClose(
const XRootDStatus *status )
3345 Monitor::CloseInfo i;
3348 gettimeofday( &i.
cTOD, 0 );
// Fragment: the function header and the heads of the first and last call are
// elided in this listing (the parameter names match the IssueRequest calls
// seen elsewhere in this file -- TODO confirm).
// Three dispatch paths are visible: two calls that receive sendParams and
// pLFileHandler as trailing arguments, and one that executes the request
// directly on the local file handler.
3370 sendParams, pLFileHandler );
3374 return pLFileHandler->ExecRequest( url, msg, handler, sendParams );
3378 sendParams, pLFileHandler );
// Sends a write request whose payload comes from a kernel buffer.
// The whole operation runs under the file's mutex (scopedLock).
3384 XRootDStatus FileStateHandler::WriteKernelBuffer( std::shared_ptr<FileStateHandler> &self,
3387 std::unique_ptr<XrdSys::KernelBuffer> kbuff,
3394 XrdSysMutexHelper scopedLock( self->pMutex );
3400 log->
Debug(
FileMsg,
"[%p@%s] Sending a write command for handle %#x to %s",
3401 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
3402 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
3405 ClientWriteRequest *req;
3411 memcpy( req->
fhandle, self->pFileHandle, 4 );
3413 MessageSendParams params;
// Ownership of the kernel buffer leaves the unique_ptr here; from this point
// on the raw pointer stored in the send parameters is the only reference.
3417 params.
kbuff = kbuff.release();
3423 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
3425 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
// Fragment: the first line of the signature is elided in this listing
// (presumably FillFhTempl, which ReOpenFileAtServer calls with
// (self, url, msg, sendUrl) -- TODO confirm).
// Validates the template file associated with this file and points sendUrl at
// the template file's data server (host, port and user name).
3433 std::shared_ptr<FileStateHandler> &self,
3436 ClientOpenRequest *req = (ClientOpenRequest*)msg->
GetBuffer();
// Nothing to do when no template file is needed: return a default status.
3439 if( !self->NeedFileTempl() )
3442 return XRootDStatus();
// Two weak_ptrs share (or both lack) a control block exactly when neither is
// owner_before the other; comparing with an empty weak_ptr therefore detects
// a pTemplateFileWp that was never assigned, as opposed to one that expired.
3445 using wp = std::weak_ptr<FileStateHandler>;
3446 if( !self->pTemplateFileWp.owner_before(wp{}) &&
3447 !wp{}.owner_before(self->pTemplateFileWp) )
3451 "File flags required a template file" );
// An expired template file yields an empty shared_ptr here (the null test is
// elided).
3460 std::shared_ptr<FileStateHandler> tfp = self->pTemplateFileWp.lock();
3463 "Template file object does not exist" );
// The template file's state is inspected under its own mutex.
3465 XrdSysMutexHelper scopedLock( tfp->pMutex );
3467 if( tfp->pFileState !=
Opened )
3469 "Template file not open" );
3471 if (!tfp->pDataServer || !tfp->pFileHandle)
3473 "Template file not connected" );
3475 sendUrl.
SetHostPort( tfp->pDataServer->GetHostName(),tfp->pDataServer->GetPort() );
3476 sendUrl.
SetUserName( tfp->pDataServer->GetUserName() );
3483 return XRootDStatus();
// Fragment: the function header is elided in this listing (the log text
// indicates the clone operation -- TODO confirm).
// Builds a clone request: for every clone location the source file handle and
// the source offset, source length and destination offset are written into
// the request's clone list, after the source (template) file has been
// validated. The request is then sent through SendOrQueue.
//
// A file in the Error state answers with its stored status.
3496 if( self->pFileState ==
Error )
return self->pStatus;
// NOTE(review): unlike every other log call visible in this file, this one
// passes self.get() to %p without a (void*) cast and prints GetURL() rather
// than GetObfuscatedURL() -- worth aligning; confirm that the unobfuscated URL
// is not intended here.
3505 log->
Debug(
FileMsg,
"[%p@%s] Sending a clone command for handle %#x to %s",
3506 self.get(), self->pFileUrl->GetURL().c_str(),
3507 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
3518 memcpy( req->
fhandle, self->pFileHandle, 4 );
// Error texts of the source-file validation; the tests that select them are
// partly elided.
3526 "Template file not available" );
3531 "Template file invalid" );
3536 "Template file object does not exist" );
3539 if( tfp->pFileState !=
Opened )
3541 "Template file not open" );
// Source and destination must belong to the same session.
3543 if( tfp->pSessionId != self->pSessionId )
3545 "Clone source not at same location as destination" );
3547 memcpy( cl[idx].srcFH, tfp->pFileHandle, 4 );
3548 cl[idx].
srcOffs = loc.srcOffs;
3549 cl[idx].
srcLen = loc.srcLen;
3550 cl[idx].
dstOffs = loc.dstOffs;
3560 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
3562 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
struct ClientPgReadRequest pgread
static const int kXR_ckpXeq
struct ClientPgWriteRequest pgwrite
struct ClientRequestHdr header
struct ClientReadRequest read
#define kXR_PROTPGRWVERSION
struct ClientWriteRequest write
static int mapError(int rc)
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
Binary blob representation.
void Append(const char *buffer, uint32_t size)
Append data at the position pointed to by the append cursor.
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
uint32_t GetSize() const
Get the size of the message.
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
static FileTimer * GetFileTimer()
Get file timer task.
static ForkHandler * GetForkHandler()
Get the fork handler.
static Env * GetEnv()
Get default client environment.
XRootDStatus Open(uint16_t flags, ResponseHandler *handler, time_t timeout)
bool GetInt(const std::string &key, int &value)
An interface for file plug-ins.
std::weak_ptr< FileStateHandler > pTemplateFileWp
static XRootDStatus PgReadRetry(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, size_t pgnb, void *buffer, PgReadHandler *handler, time_t timeout=0)
static XRootDStatus TryOtherServer(std::shared_ptr< FileStateHandler > &self, time_t timeout)
Try other data server.
static XRootDStatus Read(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus SetXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< xattr_t > &attrs, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus Visa(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, time_t timeout=0)
void AfterForkChild()
Called in the child process after the fork.
static XRootDStatus PgReadImpl(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, uint16_t flags, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus Clone(std::shared_ptr< FileStateHandler > &self, const CloneLocations &locs, ResponseHandler *handler, time_t timeout=0)
static void OnStateRedirection(std::shared_ptr< FileStateHandler > &self, const std::string &redirectUrl, Message *message, ResponseHandler *userHandler, MessageSendParams &sendParams)
Handle stateful redirect.
void TimeOutRequests(time_t now)
Declare timeout on requests being recovered.
friend class ::PgReadRetryHandler
static XRootDStatus Fcntl(std::shared_ptr< FileStateHandler > &self, QueryCode::Code queryCode, const Buffer &arg, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus ListXAttr(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, time_t timeout=0)
void Tick(time_t now)
Tick.
static XRootDStatus Truncate(std::shared_ptr< FileStateHandler > &self, uint64_t size, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus Checkpoint(std::shared_ptr< FileStateHandler > &self, kXR_char code, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus PgWrite(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, std::vector< uint32_t > &cksums, ResponseHandler *handler, time_t timeout=0)
static void OnStateError(std::shared_ptr< FileStateHandler > &self, XRootDStatus *status, Message *message, ResponseHandler *userHandler, MessageSendParams &sendParams)
Handle an error while sending a stateful message.
static XRootDStatus Stat(std::shared_ptr< FileStateHandler > &self, bool force, ResponseHandler *handler, time_t timeout=0)
FileStateHandler(FilePlugIn *&plugin)
Constructor.
friend class ::PgReadHandler
static XRootDStatus PgWriteImpl(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, std::vector< uint32_t > &cksums, kXR_char flags, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus ChkptWrt(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus Write(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, time_t timeout=0)
@ OpenInProgress
Opening is in progress.
@ CloseInProgress
Closing operation is in progress.
@ Closed
The file is closed.
@ Opened
Opening has succeeded.
@ Error
Opening has failed.
@ Recovering
Recovering from an error.
static XRootDStatus PgWriteRetry(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, uint32_t digest, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus Close(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus ReadV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, struct iovec *iov, int iovcnt, ResponseHandler *handler, time_t timeout=0)
bool SetProperty(const std::string &name, const std::string &value)
static void OnStateResponse(std::shared_ptr< FileStateHandler > &self, XRootDStatus *status, Message *message, AnyObject *response, HostList *hostList)
Handle stateful response.
void OnClose(const XRootDStatus *status)
Process the results of the closing operation.
static XRootDStatus DelXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< std::string > &attrs, ResponseHandler *handler, time_t timeout=0)
void OnOpen(const XRootDStatus *status, const OpenInfo *openInfo, const HostList *hostList)
Process the results of the opening operation.
static XRootDStatus Open(std::shared_ptr< FileStateHandler > &self, const std::string &url, OpenFlags::Flags flags, uint16_t mode, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus PgRead(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus OpenUsingTemplate(std::shared_ptr< FileStateHandler > &self, ExportedFileTemplate *templ, const std::string &url, OpenFlags::Flags flags, uint16_t mode, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus Sync(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus PreRead(std::shared_ptr< FileStateHandler > &self, const TractList &tracts, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus ChkptWrtV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, const struct iovec *iov, int iovcnt, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus GetXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< std::string > &attrs, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus WriteV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, const struct iovec *iov, int iovcnt, ResponseHandler *handler, time_t timeout=0)
~FileStateHandler()
Destructor.
bool GetProperty(const std::string &name, std::string &value) const
static XRootDStatus VectorRead(std::shared_ptr< FileStateHandler > &self, const ChunkList &chunks, void *buffer, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus VectorWrite(std::shared_ptr< FileStateHandler > &self, const ChunkList &chunks, ResponseHandler *handler, time_t timeout=0)
friend class ::PgReadSubstitutionHandler
bool IsOpen() const
Check if the file is open.
friend class ::OpenHandler
void UnRegisterFileObject(FileStateHandler *file)
Un-register a file state handler.
void RegisterFileObject(FileStateHandler *file)
Register a file state handler.
void RegisterFileObject(FileStateHandler *file)
Register a file object.
void UnRegisterFileObject(FileStateHandler *file)
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
void Error(uint64_t topic, const char *format,...)
Report an error.
void Warning(uint64_t topic, const char *format,...)
Report a warning.
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
void Info(uint64_t topic, const char *format,...)
Print an info.
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
static void MergeCGI(URL::ParamsMap &cgi1, const URL::ParamsMap &cgi2, bool replace)
Merge cgi2 into cgi1.
static void ProcessSendParams(MessageSendParams &sendParams)
Process sending params.
static Status CreateXAttrBody(Message *msg, const std::vector< T > &vec, const std::string &path="")
static Status RedirectMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Redirect message.
static XRootDStatus SendMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Send message.
static void CreateRequest(Message *&msg, Request *&req, uint32_t payloadSize=0)
Create a message.
The message representation used throughout the system.
void SetSessionId(uint64_t sessionId)
Set the session ID which this message is meant for.
void SetVirtReqID(uint16_t virtReqID)
Set virtual request ID for the message.
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
An abstract class to describe the client-side monitoring plugin interface.
@ EvClose
CloseInfo: File closed.
@ EvErrIO
ErrorInfo: An I/O error occurred.
@ EvOpen
OpenInfo: File opened.
virtual void Event(EventCode evCode, void *evData)=0
Information returned by file open operation.
void GetFileHandle(uint8_t *fileHandle) const
Get the file handle (4bytes).
const StatInfo * GetStatInfo() const
Get the stat info.
uint64_t GetSessionId() const
Status QueryTransport(const URL &url, uint16_t query, AnyObject &result)
JobManager * GetJobManager()
Get the job manager object used by the post master.
void DecFileInstCnt(const URL &url)
Decrement file object instance count bound to this channel.
Singleton access to URL to virtual redirector mapping.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
void Release(const URL &url)
Release the virtual redirector associated with the given URL.
Handle an async response.
virtual void HandleResponseWithHosts(XRootDStatus *status, AnyObject *response, HostList *hostList)
static ResponseHandler * Wrap(std::function< void(XRootDStatus &, AnyObject &)> func)
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
const std::string & GetPath() const
Get the path.
bool IsMetalink() const
Is it a URL to a metalink.
std::map< std::string, std::string > ParamsMap
void SetHostPort(const std::string &hostName, int port)
std::string GetPathWithFilteredParams() const
Get the path with params, filtering out 'xrdcl.'.
std::string GetObfuscatedURL() const
Get the URL with authz information obfuscated.
void SetPath(const std::string &path)
Set the path.
void SetUserName(const std::string &userName)
Set the username.
static bool HasKSameFS(const XrdCl::URL &url)
Check if given server supports kXR_clone and kXR_samefs.
static XrdCl::XRootDStatus GetProtocolVersion(const XrdCl::URL url, int &protver)
const std::string & GetErrorMessage() const
Get error message.
std::string ToStr() const
Convert to string.
static void SetDescription(Message *msg)
Get the description of a message.
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
static void csCalc(const char *data, off_t offs, size_t count, uint32_t *csval)
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
static bool IsPageAligned(const void *ptr)
const uint16_t errSocketOptError
const uint16_t errTlsError
const uint16_t errOperationExpired
const uint16_t errPollerError
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errInProgress
const uint16_t errSocketTimeout
std::vector< HostInfo > HostList
const uint16_t errDataError
data is corrupted
const uint16_t errInternal
Internal error.
const uint16_t stOK
Everything went OK.
const uint16_t errInvalidOp
const uint16_t suAlreadyDone
EcHandler * GetEcHandler(const URL &headnode, const URL &redirurl)
std::vector< TractInfo > TractList
List of Tracts.
const uint16_t errInvalidArgs
const int DefaultRequestTimeout
std::vector< ChunkInfo > ChunkList
List of chunks.
const uint16_t errConnectionError
const uint16_t errNotSupported
const uint16_t errSocketError
const uint16_t errOperationInterrupted
const uint16_t errInvalidSession
const uint16_t errRedirect
const uint16_t errSocketDisconnected
none object for initializing empty Optional
static const int PageSize
ssize_t Read(int fd, KernelBuffer &buffer, uint32_t length, int64_t offset)
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)
Describe a data chunk for vector read.
void * buffer
length of the chunk
uint32_t length
offset in the file
std::vector< CloneLocation > locations
std::vector< uint32_t > crc32cDigests
XrdSys::KernelBuffer * kbuff
uint64_t vwBytes
Total number of bytes written via writev.
const XRootDStatus * status
Close status.
uint32_t wCount
Total count of writes.
uint64_t vSegs
Total count of readv segments.
uint64_t vrBytes
Total number of bytes read via readv.
timeval cTOD
gettimeofday() when file was closed
uint32_t vCount
Total count of readv.
const URL * file
The file in question.
uint64_t rBytes
Total number of bytes read via read.
timeval oTOD
gettimeofday() when file was opened
uint64_t wBytes
Total number of bytes written.
uint32_t rCount
Total count of reads.
Describe an encountered file-based error.
@ ErrUnc
Unclassified operation.
const XRootDStatus * status
Status code.
const URL * file
The file in question.
Operation opCode
The associated operation.
Describe a file open event to the monitor.
uint64_t fSize
File size in bytes.
const URL * file
File in question.
uint16_t oFlags2
OpenFlags upper 16 bits.
std::string dataServer
Actual data server.
uint16_t oFlags
OpenFlags.
Open flags, may be or'd when appropriate.
Flags
Open flags, may be or'd when appropriate.
@ Read
Open only for reading.
@ Samefs
Open file on the same filesystem as another.
@ Update
Open for reading and writing.
@ Dup
Open file duplicating content from another.
void SetNbRepair(size_t nbrepair)
Set number of repaired pages.
std::vector< uint32_t > & GetCksums()
Get the checksums.
uint32_t GetLength() const
Get the data length.
uint64_t GetOffset() const
Get the offset.
void * GetBuffer()
Get the buffer.
Code
XRootD query request codes.
std::tuple< uint64_t, uint32_t > At(size_t i)
Procedure execution status.
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.
static const uint16_t ServerFlags
returns server flags
static const uint16_t IsEncrypted
returns true if the channel is encrypted