XRootD
Loading...
Searching...
No Matches
XrdCl::XRootDMsgHandler Class Reference

Handle/Process/Forward XRootD messages. More...

#include <XrdClXRootDMsgHandler.hh>

Inheritance diagram for XrdCl::XRootDMsgHandler:
Collaboration diagram for XrdCl::XRootDMsgHandler:

Public Member Functions

 XRootDMsgHandler (Message *msg, ResponseHandler *respHandler, const URL *url, std::shared_ptr< SIDManager > sidMgr, LocalFileHandler *lFileHandler)
 ~XRootDMsgHandler ()
 Destructor.
virtual uint16_t Examine (std::shared_ptr< Message > &msg) override
time_t GetExpiration () override
 Get a timestamp after which we give up.
const MessageGetRequest () const
 Get the request pointer.
virtual uint16_t GetSid () const override
virtual uint16_t InspectStatusRsp () override
virtual bool IsRaw () const override
 Are we a raw writer or not?
void OnReadyToSend (Message *msg) override
virtual void OnStatusReady (const Message *message, XRootDStatus status) override
 The requested action has been performed and the status is available.
virtual uint8_t OnStreamEvent (StreamEvent event, XRootDStatus status) override
void OnWaitingToSend (Message *msg) override
 Called to indicate the message is waiting to be sent.
void PartialReceived ()
virtual void Process () override
 Process the message if it was "taken" by the examine action.
virtual XRootDStatus ReadMessageBody (Message *msg, Socket *socket, uint32_t &bytesRead) override
void SetChunkList (ChunkList *chunkList)
 Set the chunk list.
void SetCrc32cDigests (std::vector< uint32_t > &&crc32cDigests)
void SetExpiration (time_t expiration)
 Set a timestamp after which we give up.
void SetFollowMetalink (bool followMetalink)
void SetHostList (HostList *hostList)
 Set host list.
void SetKernelBuffer (XrdSys::KernelBuffer *kbuff)
 Set the kernel buffer.
void SetLoadBalancer (const HostInfo &loadBalancer)
 Set the load balancer.
void SetOksofarAsAnswer (bool oksofarAsAnswer)
void SetRedirectAsAnswer (bool redirectAsAnswer)
void SetRedirectCounter (uint16_t redirectCounter)
 Set the redirect counter.
void SetStateful (bool stateful)
void WaitDone (time_t now)
XRootDStatus WriteMessageBody (Socket *socket, uint32_t &bytesWritten) override
Public Member Functions inherited from XrdCl::MsgHandler
virtual ~MsgHandler ()
 Event types that the message handler may receive.

Friends

class HandleRspJob

Additional Inherited Members

Public Types inherited from XrdCl::MsgHandler
enum  Action {
  None = 0x0000 ,
  Nop = 0x0001 ,
  Ignore = 0x0002 ,
  RemoveHandler = 0x0004 ,
  Raw = 0x0008 ,
  NoProcess = 0x0010 ,
  Corrupted = 0x0020 ,
  More = 0x0040
}
 Actions to be taken after a message is processed by the handler. More...
enum  StreamEvent {
  Ready = 1 ,
  Broken = 2 ,
  Timeout = 3 ,
  FatalError = 4
}
 Events that may have occurred to the stream. More...

Detailed Description

Handle/Process/Forward XRootD messages.

Definition at line 119 of file XrdClXRootDMsgHandler.hh.

Constructor & Destructor Documentation

◆ XRootDMsgHandler()

XrdCl::XRootDMsgHandler::XRootDMsgHandler ( Message * msg,
ResponseHandler * respHandler,
const URL * url,
std::shared_ptr< SIDManager > sidMgr,
LocalFileHandler * lFileHandler )
inline

Constructor

Parameters
msgmessage that has been sent out
respHandlerresponse handler to be called then the final final response arrives
urlthe url the message has been sent to
sidMgrthe sid manager used to allocate SID for the initial message

Definition at line 134 of file XrdClXRootDMsgHandler.hh.

138 :
139 pRequest( msg ),
140 pResponseHandler( respHandler ),
141 pUrl( *url ),
142 pEffectiveDataServerUrl( 0 ),
143 pSidMgr( sidMgr ),
144 pLFileHandler( lFileHandler ),
145 pExpiration( 0 ),
146 pRedirectAsAnswer( false ),
147 pOksofarAsAnswer( false ),
148 pHasLoadBalancer( false ),
149 pHasSessionId( false ),
150 pChunkList( 0 ),
151 pKBuff( 0 ),
152 pRedirectCounter( 0 ),
153 pNotAuthorizedCounter( 0 ),
154
155 pAsyncOffset( 0 ),
156 pAsyncChunkIndex( 0 ),
157
158 pPgWrtCksumBuff( 4 ),
159 pPgWrtCurrentPageOffset( 0 ),
160 pPgWrtCurrentPageNb( 0 ),
161
162 pOtherRawStarted( false ),
163
164 pFollowMetalink( false ),
165
166 pStateful( false ),
167
168 pAggregatedWaitTime( 0 ),
169
170 pSendingState( 0 ),
171
172 pTimeoutFence( false ),
173
174 pDirListStarted( false ),
175 pDirListWithStat( false ),
176
177 pCV( 0 ),
178
179 pSslErrCnt( 0 )
180 {
181 pPostMaster = DefaultEnv::GetPostMaster();
182 if( msg->GetSessionId() )
183 pHasSessionId = true;
184
185 Log *log = DefaultEnv::GetLog();
186 log->Debug( ExDbgMsg, "[%s] MsgHandler created: %p (message: %s ).",
187 pUrl.GetHostId().c_str(), (void*)this,
188 pRequest->GetObfuscatedDescription().c_str() );
189
190 ClientRequestHdr *hdr = (ClientRequestHdr*)pRequest->GetBuffer();
191 if( ntohs( hdr->requestid ) == kXR_pgread )
192 {
193 ClientPgReadRequest *pgrdreq = (ClientPgReadRequest*)pRequest->GetBuffer();
194 pCrc32cDigests.reserve( XrdOucPgrwUtils::csNum( ntohll( pgrdreq->offset ),
195 ntohl( pgrdreq->rlen ) ) );
196 }
197
198 //----------------------------------------------------------------------
199 // Pass the reader our pUrl, not *url. The latter is a reference, likely
200 // from FileStateHandler such as *pDataServer. Accessing that throughout
201 // our lifetime may lead to concurrent access. In the case of read-
202 // recovery the FileStateHandler may entirely reallocate the url object.
203 //----------------------------------------------------------------------
204 if( ntohs( hdr->requestid ) == kXR_readv )
205 pBodyReader.reset( new AsyncVectorReader( pUrl, *pRequest ) );
206 else if( ntohs( hdr->requestid ) == kXR_read )
207 pBodyReader.reset( new AsyncRawReader( pUrl, *pRequest ) );
208 else
209 pBodyReader.reset( new AsyncDiscardReader( pUrl, *pRequest ) );
210 }
kXR_unt16 requestid
Definition XProtocol.hh:159
@ kXR_read
Definition XProtocol.hh:126
@ kXR_readv
Definition XProtocol.hh:138
@ kXR_pgread
Definition XProtocol.hh:143
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
const uint64_t ExDbgMsg
XrdSysError Log
Definition XrdConfig.cc:113

References XrdOucPgrwUtils::csNum(), XrdCl::Log::Debug(), XrdCl::ExDbgMsg, XrdCl::DefaultEnv::GetLog(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Message::GetSessionId(), kXR_pgread, kXR_read, kXR_readv, ClientPgReadRequest::offset, ClientRequestHdr::requestid, and ClientPgReadRequest::rlen.

Here is the call graph for this function:

◆ ~XRootDMsgHandler()

XrdCl::XRootDMsgHandler::~XRootDMsgHandler ( )
inline

Destructor.

Definition at line 215 of file XrdClXRootDMsgHandler.hh.

216 {
217 DumpRedirectTraceBack();
218
219 if( !pHasSessionId )
220 delete pRequest;
221 delete pEffectiveDataServerUrl;
222
223 pRequest = reinterpret_cast<Message*>( 0xDEADBEEF );
224 pResponseHandler = reinterpret_cast<ResponseHandler*>( 0xDEADBEEF );
225 pPostMaster = reinterpret_cast<PostMaster*>( 0xDEADBEEF );
226 pLFileHandler = reinterpret_cast<LocalFileHandler*>( 0xDEADBEEF );
227 pChunkList = reinterpret_cast<ChunkList*>( 0xDEADBEEF );
228 pEffectiveDataServerUrl = reinterpret_cast<URL*>( 0xDEADBEEF );
229
230 Log *log = DefaultEnv::GetLog();
231 log->Debug( ExDbgMsg, "[%s] Destroying MsgHandler: %p.",
232 pUrl.GetHostId().c_str(), (void*)this );
233 }
std::vector< ChunkInfo > ChunkList
List of chunks.

References XrdCl::Log::Debug(), XrdCl::ExDbgMsg, and XrdCl::DefaultEnv::GetLog().

Here is the call graph for this function:

Member Function Documentation

◆ Examine()

uint16_t XrdCl::XRootDMsgHandler::Examine ( std::shared_ptr< Message > & msg)
overridevirtual

Examine an incoming message, and decide on the action to be taken

Parameters
msgthe message, may be zero if receive failed
Returns
action type that needs to be take wrt the message and the handler

Implements XrdCl::MsgHandler.

Definition at line 109 of file XrdClXRootDMsgHandler.cc.

110 {
111 const int sst = pSendingState.fetch_or( kSawResp );
112
113 if( !( sst & kSendDone ) && !( sst & kSawResp ) )
114 {
115 // we must have been sent although we haven't got the OnStatusReady
116 // notification yet. Set the inflight notice.
117
118 Log *log = DefaultEnv::GetLog();
119 log->Dump( XRootDMsg, "[%s] Message %s reply received before notification "
120 "that it was sent, assuming it was sent ok.",
121 pUrl.GetHostId().c_str(),
122 pRequest->GetObfuscatedDescription().c_str() );
123 }
124
125 //--------------------------------------------------------------------------
126 // if the MsgHandler is already being used to process another request
127 // (kXR_oksofar) we need to wait
128 //--------------------------------------------------------------------------
129 if( pOksofarAsAnswer )
130 {
131 XrdSysCondVarHelper lck( pCV );
132 while( pResponse ) pCV.Wait();
133 }
134 else
135 {
136 if( pResponse )
137 {
138 Log *log = DefaultEnv::GetLog();
139 log->Warning( ExDbgMsg, "[%s] MsgHandler is examining a response although "
140 "it already owns a response: %p (message: %s ).",
141 pUrl.GetHostId().c_str(), (void*)this,
142 pRequest->GetObfuscatedDescription().c_str() );
143 }
144 }
145
146 if( msg->GetSize() < 8 )
147 return Ignore;
148
149 ServerResponse *rsp = (ServerResponse *)msg->GetBuffer();
150 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
151 uint16_t status = 0;
152 uint32_t dlen = 0;
153
154 //--------------------------------------------------------------------------
155 // We only care about async responses, but those are extracted now
156 // in the SocketHandler.
157 //--------------------------------------------------------------------------
158 if( rsp->hdr.status == kXR_attn )
159 {
160 return Ignore;
161 }
162 //--------------------------------------------------------------------------
163 // We got a sync message - check if it belongs to us
164 //--------------------------------------------------------------------------
165 else
166 {
167 if( rsp->hdr.streamid[0] != req->header.streamid[0] ||
168 rsp->hdr.streamid[1] != req->header.streamid[1] )
169 return Ignore;
170
171 status = rsp->hdr.status;
172 dlen = rsp->hdr.dlen;
173 }
174
175 //--------------------------------------------------------------------------
176 // We take the ownership of the message and decide what we will do
177 // with the handler itself, the options are:
178 // 1) we want to either read in raw mode (the Raw flag) or have the message
179 // body reconstructed for us by the TransportHandler by the time
180 // Process() is called (default, no extra flag)
181 // 2) we either got a full response in which case we don't want to be
182 // notified about anything anymore (RemoveHandler) or we got a partial
183 // answer and we need to wait for more (default, no extra flag)
184 //--------------------------------------------------------------------------
185 pResponse = msg;
186 pBodyReader->SetDataLength( dlen );
187
188 Log *log = DefaultEnv::GetLog();
189 switch( status )
190 {
191 //------------------------------------------------------------------------
192 // Handle the cached cases
193 //------------------------------------------------------------------------
194 case kXR_error:
195 case kXR_redirect:
196 case kXR_wait:
197 return RemoveHandler;
198
199 case kXR_waitresp:
200 {
201 log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response to "
202 "message %s", pUrl.GetHostId().c_str(),
203 pRequest->GetObfuscatedDescription().c_str() );
204
205 pResponse.reset();
206 return Ignore; // This must be handled synchronously!
207 }
208
209 //------------------------------------------------------------------------
210 // Handle the potential raw cases
211 //------------------------------------------------------------------------
212 case kXR_ok:
213 {
214 //----------------------------------------------------------------------
215 // For kXR_read we read in raw mode
216 //----------------------------------------------------------------------
217 uint16_t reqId = ntohs( req->header.requestid );
218 if( reqId == kXR_read )
219 {
220 return Raw | RemoveHandler;
221 }
222
223 //----------------------------------------------------------------------
224 // kXR_readv is the same as kXR_read
225 //----------------------------------------------------------------------
226 if( reqId == kXR_readv )
227 {
228 return Raw | RemoveHandler;
229 }
230
231 //----------------------------------------------------------------------
232 // For everything else we just take what we got
233 //----------------------------------------------------------------------
234 return RemoveHandler;
235 }
236
237 //------------------------------------------------------------------------
238 // kXR_oksofars are special, they are not full responses, so we reset
239 // the response pointer to 0 and add the message to the partial list
240 //------------------------------------------------------------------------
241 case kXR_oksofar:
242 {
243 log->Dump( XRootDMsg, "[%s] Got a kXR_oksofar response to request "
244 "%s", pUrl.GetHostId().c_str(),
245 pRequest->GetObfuscatedDescription().c_str() );
246
247 if( !pOksofarAsAnswer )
248 {
249 pPartialResps.emplace_back( std::move( pResponse ) );
250 }
251
252 //----------------------------------------------------------------------
253 // For kXR_read we either read in raw mode if the message has not
254 // been fully reconstructed already, if it has, we adjust
255 // the buffer offset to prepare for the next one
256 //----------------------------------------------------------------------
257 uint16_t reqId = ntohs( req->header.requestid );
258 if( reqId == kXR_read )
259 {
260 pTimeoutFence.store( true, std::memory_order_relaxed );
261 return Raw | ( pOksofarAsAnswer ? None : NoProcess );
262 }
263
264 //----------------------------------------------------------------------
265 // kXR_readv is similar to read, except that the payload is different
266 //----------------------------------------------------------------------
267 if( reqId == kXR_readv )
268 {
269 pTimeoutFence.store( true, std::memory_order_relaxed );
270 return Raw | ( pOksofarAsAnswer ? None : NoProcess );
271 }
272
273 return ( pOksofarAsAnswer ? None : NoProcess );
274 }
275
276 case kXR_status:
277 {
278 log->Dump( XRootDMsg, "[%s] Got a kXR_status response to request "
279 "%s", pUrl.GetHostId().c_str(),
280 pRequest->GetObfuscatedDescription().c_str() );
281
282 uint16_t reqId = ntohs( req->header.requestid );
283 if( reqId == kXR_pgwrite )
284 {
285 //--------------------------------------------------------------------
286 // In case of pgwrite by definition this wont be a partial response
287 // so we can already remove the handler from the in-queue
288 //--------------------------------------------------------------------
289 return RemoveHandler;
290 }
291
292 //----------------------------------------------------------------------
293 // Otherwise (pgread), first of all we need to read the body of the
294 // kXR_status response, we can handle the raw data (if any) only after
295 // we have the whole kXR_status body
296 //----------------------------------------------------------------------
297 pTimeoutFence.store( true, std::memory_order_relaxed );
298 return None;
299 }
300
301 //------------------------------------------------------------------------
302 // Default
303 //------------------------------------------------------------------------
304 default:
305 return RemoveHandler;
306 }
307 return RemoveHandler;
308 }
kXR_char streamid[2]
Definition XProtocol.hh:158
kXR_char streamid[2]
Definition XProtocol.hh:956
@ kXR_waitresp
Definition XProtocol.hh:948
@ kXR_redirect
Definition XProtocol.hh:946
@ kXR_oksofar
Definition XProtocol.hh:942
@ kXR_status
Definition XProtocol.hh:949
@ kXR_ok
Definition XProtocol.hh:941
@ kXR_attn
Definition XProtocol.hh:943
@ kXR_wait
Definition XProtocol.hh:947
@ kXR_error
Definition XProtocol.hh:945
struct ClientRequestHdr header
Definition XProtocol.hh:887
@ kXR_pgwrite
Definition XProtocol.hh:139
ServerResponseHeader hdr
@ Ignore
Ignore the message.
const uint64_t XRootDMsg

References ServerResponseHeader::dlen, XrdCl::Log::Dump(), XrdCl::ExDbgMsg, XrdCl::DefaultEnv::GetLog(), ServerResponse::hdr, ClientRequest::header, XrdCl::MsgHandler::Ignore, kXR_attn, kXR_error, kXR_ok, kXR_oksofar, kXR_pgwrite, kXR_read, kXR_readv, kXR_redirect, kXR_status, kXR_wait, kXR_waitresp, XrdCl::MsgHandler::None, XrdCl::MsgHandler::NoProcess, XrdCl::MsgHandler::Raw, XrdCl::MsgHandler::RemoveHandler, ClientRequestHdr::requestid, ServerResponseHeader::status, ClientRequestHdr::streamid, ServerResponseHeader::streamid, XrdCl::Log::Warning(), and XrdCl::XRootDMsg.

Here is the call graph for this function:

◆ GetExpiration()

time_t XrdCl::XRootDMsgHandler::GetExpiration ( )
inlineoverridevirtual

Get a timestamp after which we give up.

Implements XrdCl::MsgHandler.

Definition at line 336 of file XrdClXRootDMsgHandler.hh.

337 {
338 return pExpiration;
339 }

◆ GetRequest()

const Message * XrdCl::XRootDMsgHandler::GetRequest ( ) const
inline

Get the request pointer.

Definition at line 362 of file XrdClXRootDMsgHandler.hh.

363 {
364 return pRequest;
365 }

◆ GetSid()

uint16_t XrdCl::XRootDMsgHandler::GetSid ( ) const
overridevirtual

Get handler sid

return sid of the corresponding request, otherwise 0

Implements XrdCl::MsgHandler.

Definition at line 403 of file XrdClXRootDMsgHandler.cc.

404 {
405 ClientRequest* req = (ClientRequest*) pRequest->GetBuffer();
406 return ((uint16_t)req->header.streamid[1] << 8) | (uint16_t)req->header.streamid[0];
407 }

References ClientRequest::header, and ClientRequestHdr::streamid.

◆ InspectStatusRsp()

uint16_t XrdCl::XRootDMsgHandler::InspectStatusRsp ( )
overridevirtual

Reexamine the incoming message, and decide on the action to be taken

In case of kXR_status the message can be only fully examined after reading the whole body (without raw data).

Parameters
msgthe message, may be zero if receive failed
Returns
action type that needs to be take wrt the message and the handler

Implements XrdCl::MsgHandler.

Definition at line 313 of file XrdClXRootDMsgHandler.cc.

314 {
315 if( !pResponse )
316 return 0;
317
318 Log *log = DefaultEnv::GetLog();
319 ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
320
321 //--------------------------------------------------------------------------
322 // Additional action is only required for kXR_status
323 //--------------------------------------------------------------------------
324 if( rsp->hdr.status != kXR_status ) return 0;
325
326 //--------------------------------------------------------------------------
327 // Ignore malformed status response
328 //--------------------------------------------------------------------------
329 if( pResponse->GetSize() < sizeof( ServerResponseStatus ) )
330 {
331 log->Error( XRootDMsg, "[%s] kXR_status: invalid message size.", pUrl.GetHostId().c_str() );
332 return Corrupted;
333 }
334
335 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
336 uint16_t reqId = ntohs( req->header.requestid );
337 //--------------------------------------------------------------------------
338 // Unmarshal the status body
339 //--------------------------------------------------------------------------
340 XRootDStatus st = XRootDTransport::UnMarshalStatusBody( *pResponse, reqId );
341
342 if( !st.IsOK() && st.code == errDataError )
343 {
344 log->Error( XRootDMsg, "[%s] %s", pUrl.GetHostId().c_str(),
345 st.GetErrorMessage().c_str() );
346 return Corrupted;
347 }
348
349 if( !st.IsOK() )
350 {
351 log->Error( XRootDMsg, "[%s] Failed to unmarshall status body.",
352 pUrl.GetHostId().c_str() );
353 pStatus = st;
354 HandleRspOrQueue();
355 return Ignore;
356 }
357
358 //--------------------------------------------------------------------------
359 // Common handling for partial results
360 //--------------------------------------------------------------------------
361 ServerResponseV2 *rspst = (ServerResponseV2*)pResponse->GetBuffer();
363 {
364 pPartialResps.push_back( std::move( pResponse ) );
365 }
366
367 //--------------------------------------------------------------------------
368 // Decide the actions that we need to take
369 //--------------------------------------------------------------------------
370 uint16_t action = 0;
371 if( reqId == kXR_pgread )
372 {
373 //----------------------------------------------------------------------
374 // The message contains only Status header and body but no raw data
375 //----------------------------------------------------------------------
376 if( !pPageReader )
377 pPageReader.reset( new AsyncPageReader( *pChunkList, pCrc32cDigests ) );
378 pPageReader->SetRsp( rspst );
379
380 action |= Raw;
381
383 action |= NoProcess;
384 else
385 action |= RemoveHandler;
386 }
387 else if( reqId == kXR_pgwrite )
388 {
389 // if data corruption has been detected on the server side we will
390 // send some additional data pointing to the pages that need to be
391 // retransmitted
392 if( size_t( sizeof( ServerResponseHeader ) + rspst->status.hdr.dlen + rspst->status.bdy.dlen ) >
393 pResponse->GetCursor() )
394 action |= More;
395 }
396
397 return action;
398 }
ServerResponseStatus status
struct ServerResponseBody_Status bdy
struct ServerResponseHeader hdr
@ More
there are more (non-raw) data to be read
static XRootDStatus UnMarshalStatusBody(Message &msg, uint16_t reqType)
Unmarshall the body of the status response.
const uint16_t errDataError
data is corrupted
@ kXR_PartialResult

References ServerResponseStatus::bdy, XrdCl::Status::code, XrdCl::MsgHandler::Corrupted, ServerResponseBody_Status::dlen, ServerResponseHeader::dlen, XrdCl::errDataError, XrdCl::Log::Error(), XrdCl::XRootDStatus::GetErrorMessage(), XrdCl::DefaultEnv::GetLog(), ServerResponse::hdr, ServerResponseStatus::hdr, ClientRequest::header, XrdCl::MsgHandler::Ignore, XrdCl::Status::IsOK(), XrdProto::kXR_PartialResult, kXR_pgread, kXR_pgwrite, kXR_status, XrdCl::MsgHandler::More, XrdCl::MsgHandler::NoProcess, XrdCl::MsgHandler::Raw, XrdCl::MsgHandler::RemoveHandler, ClientRequestHdr::requestid, ServerResponseBody_Status::resptype, ServerResponseHeader::status, ServerResponseV2::status, XrdCl::XRootDTransport::UnMarshalStatusBody(), and XrdCl::XRootDMsg.

Here is the call graph for this function:

◆ IsRaw()

bool XrdCl::XRootDMsgHandler::IsRaw ( ) const
overridevirtual

Are we a raw writer or not?

Reimplemented from XrdCl::MsgHandler.

Definition at line 996 of file XrdClXRootDMsgHandler.cc.

997 {
998 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
999 uint16_t reqId = ntohs( req->header.requestid );
1000 if( reqId == kXR_write || reqId == kXR_writev || reqId == kXR_pgwrite )
1001 return true;
1002 // checkpoint + execute
1003 if( reqId == kXR_chkpoint && req->chkpoint.opcode == kXR_ckpXeq )
1004 {
1005 ClientRequest *xeq = (ClientRequest*)pRequest->GetBuffer( sizeof( ClientRequest ) );
1006 reqId = ntohs( xeq->header.requestid );
1007 return reqId != kXR_truncate; // only checkpointed truncate does not have raw data
1008 }
1009
1010 return false;
1011 }
static const int kXR_ckpXeq
Definition XProtocol.hh:218
@ kXR_writev
Definition XProtocol.hh:144
@ kXR_write
Definition XProtocol.hh:132
@ kXR_truncate
Definition XProtocol.hh:141
@ kXR_chkpoint
Definition XProtocol.hh:125
struct ClientChkPointRequest chkpoint
Definition XProtocol.hh:890

References ClientRequest::chkpoint, ClientRequest::header, kXR_chkpoint, kXR_ckpXeq, kXR_pgwrite, kXR_truncate, kXR_write, kXR_writev, ClientChkPointRequest::opcode, and ClientRequestHdr::requestid.

◆ OnReadyToSend()

void XrdCl::XRootDMsgHandler::OnReadyToSend ( Message * msg)
inlineoverridevirtual

Called just before the message is going to be sent through a valid connection, so that the user can still make some modifications that were impossible before (ie. protocol version dependent adjustments)

Parameters
msgmessage concerned

Reimplemented from XrdCl::MsgHandler.

Definition at line 438 of file XrdClXRootDMsgHandler.hh.

439 {
440 pSendingState |= kSawReadySend;
441 }

◆ OnStatusReady()

void XrdCl::XRootDMsgHandler::OnStatusReady ( const Message * message,
XRootDStatus status )
overridevirtual

The requested action has been performed and the status is available.

Implements XrdCl::MsgHandler.

Definition at line 919 of file XrdClXRootDMsgHandler.cc.

921 {
922 Log *log = DefaultEnv::GetLog();
923
924 if( status.IsOK() )
925 {
926 log->Dump( XRootDMsg, "[%s] Got notification that outgoing message %s "
927 "was sent successfully.", pUrl.GetHostId().c_str(),
928 message->GetObfuscatedDescription().c_str() );
929 }
930
931 // After setting kSendDone processing of this object may continue in
932 // another thread. Unless we're in an error condition our object may
933 // be modified or even destroyed after this point.
934 const int sst = pSendingState.fetch_or( kSendDone );
935
936 // ignore if we're already in this state
937 if( status.IsOK() && ( sst & kSendDone ) ) return;
938
939 // if we have already seen a response we should be getting notified
940 // of a successful send. But if not, log and do our best to recover.
941 if( !status.IsOK() && ( ( sst & kFinalResp ) || ( sst & kSawResp ) ) )
942 {
943 log->Error( XRootDMsg, "[%s] Unexpected error for message %s. Trying to "
944 "recover.", pUrl.GetHostId().c_str(),
945 message->GetObfuscatedDescription().c_str() );
946 HandleError( status );
947 return;
948 }
949
950 if( sst & kFinalResp )
951 {
952 // late notification and we already have final response for user,
953 // need to queue handler callback.
954 HandleRspOrQueue();
955 return;
956 }
957
958 if( sst & kRetryAtSrv )
959 {
960 // late notification and we already received a response and know
961 // we need to retry at differnt server.
962 HandleError( RetryAtServer( pRetryAtUrl, pRetryAtEntryType ) );
963 return;
964 }
965
966 if( sst & kSawResp )
967 {
968 // late notification, response processing may be happening in another
969 // thread.
970 return;
971 }
972
973 //--------------------------------------------------------------------------
974 // We were successful, so we now need to listen for a response
975 //--------------------------------------------------------------------------
976 if( status.IsOK() )
977 {
978 // this is the expcted order, we got the notificaiton but no response
979 // received yet. However another thread is liable to be processing
980 // one or sending a final response and deleting us at any point now.
981 return;
982 }
983
984 //--------------------------------------------------------------------------
985 // We have failed, recover if possible
986 //--------------------------------------------------------------------------
987 log->Error( XRootDMsg, "[%s] Impossible to send message %s. Trying to "
988 "recover.", pUrl.GetHostId().c_str(),
989 message->GetObfuscatedDescription().c_str() );
990 HandleError( status );
991 }

References XrdCl::Log::Dump(), XrdCl::Log::Error(), XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::Status::IsOK(), and XrdCl::XRootDMsg.

Here is the call graph for this function:

◆ OnStreamEvent()

uint8_t XrdCl::XRootDMsgHandler::OnStreamEvent ( StreamEvent event,
XRootDStatus status )
overridevirtual

Handle an event other that a message arrival

Parameters
eventtype of the event
statusstatus info

Reimplemented from XrdCl::MsgHandler.

Definition at line 880 of file XrdClXRootDMsgHandler.cc.

882 {
883 Log *log = DefaultEnv::GetLog();
884 log->Dump( XRootDMsg, "[%s] Stream event reported for msg %s",
885 pUrl.GetHostId().c_str(), pRequest->GetObfuscatedDescription().c_str() );
886
887 if( event == Ready )
888 return 0;
889
890 if( pTimeoutFence.load( std::memory_order_relaxed ) )
891 return 0;
892
893 HandleError( status );
894 return RemoveHandler;
895 }
@ Ready
The stream has become connected.

References XrdCl::Log::Dump(), XrdCl::DefaultEnv::GetLog(), XrdCl::MsgHandler::Ready, XrdCl::MsgHandler::RemoveHandler, and XrdCl::XRootDMsg.

Here is the call graph for this function:

◆ OnWaitingToSend()

void XrdCl::XRootDMsgHandler::OnWaitingToSend ( Message * msg)
inlineoverridevirtual

Called to indicate the message is waiting to be sent.

Reimplemented from XrdCl::MsgHandler.

Definition at line 443 of file XrdClXRootDMsgHandler.hh.

444 {
445 pSendingState = 0;
446 }

◆ PartialReceived()

void XrdCl::XRootDMsgHandler::PartialReceived ( )

Bookkeeping after partial response has been received:

  • take down the timeout fence after oksofar response has been handled
  • reset status-response-body marshaled flag

Definition at line 1173 of file XrdClXRootDMsgHandler.cc.

1174 {
1175 pTimeoutFence.store( false, std::memory_order_relaxed ); // Take down the timeout fence
1176 }

Referenced by XrdCl::Stream::OnIncoming().

Here is the caller graph for this function:

◆ Process()

void XrdCl::XRootDMsgHandler::Process ( )
overridevirtual

Process the message if it was "taken" by the examine action.

Process the message if it was "taken" by the examine action

Parameters
msgthe message to be processed

Reimplemented from XrdCl::MsgHandler.

Definition at line 412 of file XrdClXRootDMsgHandler.cc.

413 {
414 Log *log = DefaultEnv::GetLog();
415
416 ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
417
418 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
419
420 //--------------------------------------------------------------------------
421 // If it is a local file, it can be only a metalink redirector
422 //--------------------------------------------------------------------------
423 if( pUrl.IsLocalFile() && pUrl.IsMetalink() )
424 pHosts->back().protocol = kXR_PROTOCOLVERSION;
425
426 //--------------------------------------------------------------------------
427 // We got an answer, check who we were talking to
428 //--------------------------------------------------------------------------
429 else
430 {
431 AnyObject qryResult;
432 int *qryResponse = nullptr;
433 pPostMaster->QueryTransport( pUrl, XRootDQuery::ServerFlags, qryResult );
434 qryResult.Get( qryResponse );
435 if (qryResponse) {
436 pHosts->back().flags = *qryResponse;
437 delete qryResponse;
438 qryResponse = nullptr;
439 }
440 pPostMaster->QueryTransport( pUrl, XRootDQuery::ProtocolVersion, qryResult );
441 qryResult.Get( qryResponse );
442 if (qryResponse) {
443 pHosts->back().protocol = *qryResponse;
444 delete qryResponse;
445 }
446 }
447
448 //--------------------------------------------------------------------------
449 // Process the message
450 //--------------------------------------------------------------------------
451 Status st = XRootDTransport::UnMarshallBody( pResponse.get(), req->header.requestid );
452 if( !st.IsOK() )
453 {
454 pStatus = Status( stFatal, errInvalidMessage );
455 HandleResponse();
456 return;
457 }
458
459 //--------------------------------------------------------------------------
460 // we have an response for the message so it's not in fly anymore
461 //--------------------------------------------------------------------------
462 pSendingState.fetch_or( kInFlyDone );
463
464 //--------------------------------------------------------------------------
465 // Reset the aggregated wait (used to omit wait response in case of Metalink
466 // redirector)
467 //--------------------------------------------------------------------------
468 if( rsp->hdr.status != kXR_wait )
469 pAggregatedWaitTime = 0;
470
471 switch( rsp->hdr.status )
472 {
473 //------------------------------------------------------------------------
474 // kXR_ok - we're done here
475 //------------------------------------------------------------------------
476 case kXR_ok:
477 {
478 log->Dump( XRootDMsg, "[%s] Got a kXR_ok response to request %s",
479 pUrl.GetHostId().c_str(),
480 pRequest->GetObfuscatedDescription().c_str() );
481 pStatus = Status();
482 HandleResponse();
483 return;
484 }
485
486 case kXR_status:
487 {
488 log->Dump( XRootDMsg, "[%s] Got a kXR_status response to request %s",
489 pUrl.GetHostId().c_str(),
490 pRequest->GetObfuscatedDescription().c_str() );
491 pStatus = Status();
492 HandleResponse();
493 return;
494 }
495
496 //------------------------------------------------------------------------
497 // kXR_ok - we're serving partial result to the user
498 //------------------------------------------------------------------------
499 case kXR_oksofar:
500 {
501 log->Dump( XRootDMsg, "[%s] Got a kXR_oksofar response to request %s",
502 pUrl.GetHostId().c_str(),
503 pRequest->GetObfuscatedDescription().c_str() );
504 pStatus = Status( stOK, suContinue );
505 HandleResponse();
506 return;
507 }
508
509 //------------------------------------------------------------------------
510 // kXR_error - we've got a problem
511 //------------------------------------------------------------------------
512 case kXR_error:
513 {
514 char *errmsg = new char[rsp->hdr.dlen-3]; errmsg[rsp->hdr.dlen-4] = 0;
515 memcpy( errmsg, rsp->body.error.errmsg, rsp->hdr.dlen-4 );
516 log->Dump( XRootDMsg, "[%s] Got a kXR_error response to request %s "
517 "[%d] %s", pUrl.GetHostId().c_str(),
518 pRequest->GetObfuscatedDescription().c_str(), rsp->body.error.errnum,
519 errmsg );
520 delete [] errmsg;
521
522 HandleError( Status(stError, errErrorResponse, rsp->body.error.errnum) );
523 return;
524 }
525
526 //------------------------------------------------------------------------
527 // kXR_redirect - they tell us to go elsewhere
528 //------------------------------------------------------------------------
529 case kXR_redirect:
530 {
531 if( rsp->hdr.dlen <= 4 )
532 {
533 log->Error( XRootDMsg, "[%s] Got invalid redirect response.",
534 pUrl.GetHostId().c_str() );
535 pStatus = Status( stError, errInvalidResponse );
536 HandleResponse();
537 return;
538 }
539
540 char *urlInfoBuff = new char[rsp->hdr.dlen-3];
541 urlInfoBuff[rsp->hdr.dlen-4] = 0;
542 memcpy( urlInfoBuff, rsp->body.redirect.host, rsp->hdr.dlen-4 );
543 std::string urlInfo = urlInfoBuff;
544 delete [] urlInfoBuff;
545 log->Dump( XRootDMsg, "[%s] Got kXR_redirect response to "
546 "message %s: %s, port %d", pUrl.GetHostId().c_str(),
547 pRequest->GetObfuscatedDescription().c_str(), urlInfo.c_str(),
548 rsp->body.redirect.port );
549
550 //----------------------------------------------------------------------
551 // Check if we can proceed
552 //----------------------------------------------------------------------
553 if( !pRedirectCounter )
554 {
555 log->Warning( XRootDMsg, "[%s] Redirect limit has been reached for "
556 "message %s, the last known error is: %s",
557 pUrl.GetHostId().c_str(),
558 pRequest->GetObfuscatedDescription().c_str(),
559 pLastError.ToString().c_str() );
560
561
562 pStatus = Status( stFatal, errRedirectLimit );
563 HandleResponse();
564 return;
565 }
566 --pRedirectCounter;
567
568 //----------------------------------------------------------------------
569 // Keep the info about this server if we still need to find a load
570 // balancer
571 //----------------------------------------------------------------------
572 uint32_t flags = pHosts->back().flags;
573 if( !pHasLoadBalancer )
574 {
575 if( flags & kXR_isManager )
576 {
577 //------------------------------------------------------------------
578 // If the current server is a meta manager then it supersedes
579 // any existing load balancer, otherwise we assign a load-balancer
580 // only if it has not been already assigned
581 //------------------------------------------------------------------
582 if( ( flags & kXR_attrMeta ) || !pLoadBalancer.url.IsValid() )
583 {
584 pLoadBalancer = pHosts->back();
585 log->Dump( XRootDMsg, "[%s] Current server has been assigned "
586 "as a load-balancer for message %s",
587 pUrl.GetHostId().c_str(),
588 pRequest->GetObfuscatedDescription().c_str() );
589 HostList::iterator it;
590 for( it = pHosts->begin(); it != pHosts->end(); ++it )
591 it->loadBalancer = false;
592 pHosts->back().loadBalancer = true;
593 }
594 }
595 }
596
597 //----------------------------------------------------------------------
598 // If the redirect comes from a data server safe the URL because
599 // in case of a failure we will use it as the effective data server URL
600 // for the tried CGI opaque info
601 //----------------------------------------------------------------------
602 if( flags & kXR_isServer )
603 pEffectiveDataServerUrl = new URL( pHosts->back().url );
604
605 //----------------------------------------------------------------------
606 // Build the URL and check it's validity
607 //----------------------------------------------------------------------
608 std::vector<std::string> urlComponents;
609 std::string newCgi;
610 Utils::splitString( urlComponents, urlInfo, "?" );
611
612 std::ostringstream o;
613
614 o << urlComponents[0];
615 if( rsp->body.redirect.port > 0 )
616 o << ":" << rsp->body.redirect.port << "/";
617 else if( rsp->body.redirect.port < 0 )
618 {
619 //--------------------------------------------------------------------
620 // check if the manager wants to enforce write recovery at himself
621 // (beware we are dealing here with negative flags)
622 //--------------------------------------------------------------------
623 if( ~uint32_t( rsp->body.redirect.port ) & kXR_recoverWrts )
624 pHosts->back().flags |= kXR_recoverWrts;
625
626 //--------------------------------------------------------------------
627 // check if the manager wants to collapse the communication channel
628 // (the redirect host is to replace the current host)
629 //--------------------------------------------------------------------
630 if( ~uint32_t( rsp->body.redirect.port ) & kXR_collapseRedir )
631 {
632 std::string url( rsp->body.redirect.host, rsp->hdr.dlen-4 );
633 pPostMaster->CollapseRedirect( pUrl, url );
634 }
635
636 if( ~uint32_t( rsp->body.redirect.port ) & kXR_ecRedir )
637 {
638 std::string url( rsp->body.redirect.host, rsp->hdr.dlen-4 );
639 if( Utils::CheckEC( pRequest, url ) )
640 pRedirectAsAnswer = true;
641 }
642 }
643
644 URL newUrl = URL( o.str() );
645 if( !newUrl.IsValid() )
646 {
647 pStatus = Status( stError, errInvalidRedirectURL );
648 log->Error( XRootDMsg, "[%s] Got invalid redirection URL: %s",
649 pUrl.GetHostId().c_str(), urlInfo.c_str() );
650 HandleResponse();
651 return;
652 }
653
654 if( pUrl.GetUserName() != "" && newUrl.GetUserName() == "" )
655 newUrl.SetUserName( pUrl.GetUserName() );
656
657 if( pUrl.GetPassword() != "" && newUrl.GetPassword() == "" )
658 newUrl.SetPassword( pUrl.GetPassword() );
659
660 //----------------------------------------------------------------------
661 // Forward any "xrd.*" params from the original client request also to
662 // the new redirection url
663 // Also, we need to preserve any "xrdcl.*' as they are important for
664 // our internal workflows.
665 //----------------------------------------------------------------------
666 std::ostringstream ossXrd;
667 const URL::ParamsMap &urlParams = pUrl.GetParams();
668
669 for(URL::ParamsMap::const_iterator it = urlParams.begin();
670 it != urlParams.end(); ++it )
671 {
672 if( it->first.compare( 0, 4, "xrd." ) &&
673 it->first.compare( 0, 6, "xrdcl." ) )
674 continue;
675
676 ossXrd << it->first << '=' << it->second << '&';
677 }
678
679 std::string xrdCgi = ossXrd.str();
680 pRedirectUrl = newUrl.GetURL();
681
682 URL cgiURL;
683 if( urlComponents.size() > 1 )
684 {
685 pRedirectUrl += "?";
686 pRedirectUrl += urlComponents[1];
687 std::ostringstream o;
688 o << "fake://fake:111//fake?";
689 o << urlComponents[1];
690
691 if( urlComponents.size() == 3 )
692 o << '?' << urlComponents[2];
693
694 if (!xrdCgi.empty())
695 {
696 o << '&' << xrdCgi;
697 pRedirectUrl += '&';
698 pRedirectUrl += xrdCgi;
699 }
700
701 cgiURL = URL( o.str() );
702 }
703 else {
704 if (!xrdCgi.empty())
705 {
706 std::ostringstream o;
707 o << "fake://fake:111//fake?";
708 o << xrdCgi;
709 cgiURL = URL( o.str() );
710 pRedirectUrl += '?';
711 pRedirectUrl += xrdCgi;
712 }
713 }
714
715 //----------------------------------------------------------------------
716 // Check if we need to return the URL as a response
717 //----------------------------------------------------------------------
718 if( newUrl.GetProtocol() != "root" && newUrl.GetProtocol() != "xroot" &&
719 newUrl.GetProtocol() != "roots" && newUrl.GetProtocol() != "xroots" &&
720 !newUrl.IsLocalFile() )
721 pRedirectAsAnswer = true;
722
723 if( pRedirectAsAnswer )
724 {
725 pStatus = Status( stError, errRedirect );
726 HandleResponse();
727 return;
728 }
729
730 //----------------------------------------------------------------------
731 // Rewrite the message in a way required to send it to another server
732 //----------------------------------------------------------------------
733 newUrl.SetParams( cgiURL.GetParams() );
734 Status st = RewriteRequestRedirect( newUrl );
735 if( !st.IsOK() )
736 {
737 pStatus = st;
738 HandleResponse();
739 return;
740 }
741
742 //----------------------------------------------------------------------
743 // Make sure we don't change the protocol by accident (root vs roots)
744 //----------------------------------------------------------------------
745 if( ( pUrl.GetProtocol() == "roots" || pUrl.GetProtocol() == "xroots" ) &&
746 ( newUrl.GetProtocol() == "root" || newUrl.GetProtocol() == "xroot" ) )
747 newUrl.SetProtocol( "roots" );
748
749 //----------------------------------------------------------------------
750 // Send the request to the new location
751 //----------------------------------------------------------------------
752 HandleError( RetryAtServer( newUrl, RedirectEntry::EntryRedirect ) );
753 return;
754 }
755
756 //------------------------------------------------------------------------
757 // kXR_wait - we wait, and re-issue the request later
758 //------------------------------------------------------------------------
759 case kXR_wait:
760 {
761 uint32_t waitSeconds = 0;
762
763 if( rsp->hdr.dlen >= 4 )
764 {
765 char *infoMsg = new char[rsp->hdr.dlen-3];
766 infoMsg[rsp->hdr.dlen-4] = 0;
767 memcpy( infoMsg, rsp->body.wait.infomsg, rsp->hdr.dlen-4 );
768 log->Dump( XRootDMsg, "[%s] Got kXR_wait response of %d seconds to "
769 "message %s: %s", pUrl.GetHostId().c_str(),
770 rsp->body.wait.seconds, pRequest->GetObfuscatedDescription().c_str(),
771 infoMsg );
772 delete [] infoMsg;
773 waitSeconds = rsp->body.wait.seconds;
774 }
775 else
776 {
777 log->Dump( XRootDMsg, "[%s] Got kXR_wait response of 0 seconds to "
778 "message %s", pUrl.GetHostId().c_str(),
779 pRequest->GetObfuscatedDescription().c_str() );
780 }
781
782 pAggregatedWaitTime += waitSeconds;
783
784 // We need a special case if the data node comes from metalink
785 // redirector. In this case it might make more sense to try the
786 // next entry in the Metalink than wait.
787 if( OmitWait( *pRequest, pLoadBalancer.url ) )
788 {
789 int maxWait = DefaultMaxMetalinkWait;
790 DefaultEnv::GetEnv()->GetInt( "MaxMetalinkWait", maxWait );
791 if( pAggregatedWaitTime > maxWait )
792 {
793 UpdateTriedCGI();
794 HandleError( RetryAtServer( pLoadBalancer.url, RedirectEntry::EntryRedirectOnWait ) );
795 return;
796 }
797 }
798
799 //----------------------------------------------------------------------
800 // Some messages require rewriting before they can be sent again
801 // after wait
802 //----------------------------------------------------------------------
803 Status st = RewriteRequestWait();
804 if( !st.IsOK() )
805 {
806 pStatus = st;
807 HandleResponse();
808 return;
809 }
810
811 //----------------------------------------------------------------------
812 // Register a task to resend the message in some seconds, if we still
813 // have time to do that, and report a timeout otherwise
814 //----------------------------------------------------------------------
815 time_t resendTime = ::time(0)+waitSeconds;
816
817 if( resendTime < pExpiration )
818 {
819 log->Debug( ExDbgMsg, "[%s] Scheduling WaitTask for MsgHandler: %p (message: %s ).",
820 pUrl.GetHostId().c_str(), (void*)this,
821 pRequest->GetObfuscatedDescription().c_str() );
822
823 TaskManager *taskMgr = pPostMaster->GetTaskManager();
824 taskMgr->RegisterTask( new WaitTask( this ), resendTime );
825 }
826 else
827 {
828 log->Debug( XRootDMsg, "[%s] Wait time is too long, timing out %s",
829 pUrl.GetHostId().c_str(),
830 pRequest->GetObfuscatedDescription().c_str() );
831 HandleError( Status( stError, errOperationExpired) );
832 }
833 return;
834 }
835
836 //------------------------------------------------------------------------
837 // kXR_waitresp - the response will be returned in some seconds as an
838 // unsolicited message. Currently all messages of this type are handled
839 // one step before in the XrdClStream::OnIncoming as they need to be
840 // processed synchronously.
841 //------------------------------------------------------------------------
842 case kXR_waitresp:
843 {
844 if( rsp->hdr.dlen < 4 )
845 {
846 log->Error( XRootDMsg, "[%s] Got invalid waitresp response.",
847 pUrl.GetHostId().c_str() );
848 pStatus = Status( stError, errInvalidResponse );
849 HandleResponse();
850 return;
851 }
852
853 log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response of %d seconds to "
854 "message %s", pUrl.GetHostId().c_str(),
855 rsp->body.waitresp.seconds,
856 pRequest->GetObfuscatedDescription().c_str() );
857 return;
858 }
859
860 //------------------------------------------------------------------------
861 // Default - unrecognized/unsupported response, declare an error
862 //------------------------------------------------------------------------
863 default:
864 {
865 log->Dump( XRootDMsg, "[%s] Got unrecognized response %d to "
866 "message %s", pUrl.GetHostId().c_str(),
867 rsp->hdr.status, pRequest->GetObfuscatedDescription().c_str() );
868 pStatus = Status( stError, errInvalidResponse );
869 HandleResponse();
870 return;
871 }
872 }
873
874 return;
875 }
#define kXR_isManager
#define kXR_collapseRedir
#define kXR_attrMeta
union ServerResponse::@040373375333017131300127053271011057331004327334 body
#define kXR_recoverWrts
#define kXR_isServer
#define kXR_PROTOCOLVERSION
Definition XProtocol.hh:70
#define kXR_ecRedir
static Env * GetEnv()
Get default client environment.
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
std::map< std::string, std::string > ParamsMap
Definition XrdClURL.hh:33
static void splitString(Container &result, const std::string &input, const std::string &delimiter)
Split a string.
Definition XrdClUtils.hh:56
static bool CheckEC(const Message *req, const URL &url)
Check if this client can support given EC redirect.
static XRootDStatus UnMarshallBody(Message *msg, uint16_t reqType)
Unmarshall the body of the incoming message.
const uint16_t errRedirectLimit
const int DefaultMaxMetalinkWait
const uint16_t errErrorResponse
const uint16_t errOperationExpired
const uint16_t stFatal
Fatal error, it's still an error.
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t stOK
Everything went OK.
const uint16_t errInvalidResponse
const uint16_t errInvalidRedirectURL
const uint16_t suContinue
const uint16_t errRedirect
const uint16_t errInvalidMessage
static const uint16_t ServerFlags
returns server flags
static const uint16_t ProtocolVersion
returns the protocol version

References ServerResponse::body, XrdCl::Utils::CheckEC(), XrdCl::Log::Debug(), XrdCl::DefaultMaxMetalinkWait, ServerResponseHeader::dlen, XrdCl::Log::Dump(), XrdCl::RedirectEntry::EntryRedirect, XrdCl::RedirectEntry::EntryRedirectOnWait, XrdCl::errErrorResponse, XrdCl::errInvalidMessage, XrdCl::errInvalidRedirectURL, XrdCl::errInvalidResponse, XrdCl::errOperationExpired, XrdCl::Log::Error(), XrdCl::errRedirect, XrdCl::errRedirectLimit, XrdCl::ExDbgMsg, XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetEnv(), XrdCl::Env::GetInt(), XrdCl::DefaultEnv::GetLog(), XrdCl::URL::GetParams(), XrdCl::URL::GetPassword(), XrdCl::URL::GetProtocol(), XrdCl::URL::GetURL(), XrdCl::URL::GetUserName(), ServerResponse::hdr, ClientRequest::header, XrdCl::URL::IsLocalFile(), XrdCl::Status::IsOK(), XrdCl::URL::IsValid(), kXR_attrMeta, kXR_collapseRedir, kXR_ecRedir, kXR_error, kXR_isManager, kXR_isServer, kXR_ok, kXR_oksofar, kXR_PROTOCOLVERSION, kXR_recoverWrts, kXR_redirect, kXR_status, kXR_wait, kXR_waitresp, ClientRequest::protocol, XrdCl::XRootDQuery::ProtocolVersion, XrdCl::TaskManager::RegisterTask(), ClientRequestHdr::requestid, XrdCl::XRootDQuery::ServerFlags, XrdCl::URL::SetParams(), XrdCl::URL::SetPassword(), XrdCl::URL::SetProtocol(), XrdCl::URL::SetUserName(), XrdCl::Utils::splitString(), ServerResponseHeader::status, XrdCl::stError, XrdCl::stFatal, XrdCl::stOK, XrdCl::suContinue, XrdCl::XRootDTransport::UnMarshallBody(), XrdCl::Log::Warning(), and XrdCl::XRootDMsg.

Here is the call graph for this function:

◆ ReadMessageBody()

XRootDStatus XrdCl::XRootDMsgHandler::ReadMessageBody ( Message * msg,
Socket * socket,
uint32_t & bytesRead )
overridevirtual

Read message body directly from a socket - called if Examine returns Raw flag - only socket related errors may be returned here

Parameters
msgthe corresponding message header
socketthe socket to read from
bytesReadnumber of bytes read by the method
Returns
stOK & suDone if the whole body has been processed stOK & suRetry if more data is needed stError on failure

Reimplemented from XrdCl::MsgHandler.

Definition at line 900 of file XrdClXRootDMsgHandler.cc.

903 {
904 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
905 uint16_t reqId = ntohs( req->header.requestid );
906
907 if( reqId == kXR_pgread )
908 return pPageReader->Read( *socket, bytesRead );
909
910 return pBodyReader->Read( *socket, bytesRead );
911 }

References ClientRequest::header, kXR_pgread, and ClientRequestHdr::requestid.

◆ SetChunkList()

void XrdCl::XRootDMsgHandler::SetChunkList ( ChunkList * chunkList)
inline

Set the chunk list.

Definition at line 389 of file XrdClXRootDMsgHandler.hh.

390 {
391 pChunkList = chunkList;
392 if( pBodyReader )
393 pBodyReader->SetChunkList( chunkList );
394 if( chunkList )
395 pChunkStatus.resize( chunkList->size() );
396 else
397 pChunkStatus.clear();
398 }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

Here is the caller graph for this function:

◆ SetCrc32cDigests()

void XrdCl::XRootDMsgHandler::SetCrc32cDigests ( std::vector< uint32_t > && crc32cDigests)
inline

Definition at line 400 of file XrdClXRootDMsgHandler.hh.

401 {
402 pCrc32cDigests = std::move( crc32cDigests );
403 }

Referenced by XrdCl::MessageUtils::SendMessage().

Here is the caller graph for this function:

◆ SetExpiration()

void XrdCl::XRootDMsgHandler::SetExpiration ( time_t expiration)
inline

Set a timestamp after which we give up.

Definition at line 328 of file XrdClXRootDMsgHandler.hh.

329 {
330 pExpiration = expiration;
331 }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

Here is the caller graph for this function:

◆ SetFollowMetalink()

void XrdCl::XRootDMsgHandler::SetFollowMetalink ( bool followMetalink)
inline

Definition at line 421 of file XrdClXRootDMsgHandler.hh.

422 {
423 pFollowMetalink = followMetalink;
424 }

Referenced by XrdCl::MessageUtils::RedirectMessage().

Here is the caller graph for this function:

◆ SetHostList()

void XrdCl::XRootDMsgHandler::SetHostList ( HostList * hostList)
inline

Set host list.

Definition at line 381 of file XrdClXRootDMsgHandler.hh.

382 {
383 pHosts.reset( hostList );
384 }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

Here is the caller graph for this function:

◆ SetKernelBuffer()

void XrdCl::XRootDMsgHandler::SetKernelBuffer ( XrdSys::KernelBuffer * kbuff)
inline

Set the kernel buffer.

Definition at line 408 of file XrdClXRootDMsgHandler.hh.

409 {
410 pKBuff = kbuff;
411 }

Referenced by XrdCl::MessageUtils::SendMessage().

Here is the caller graph for this function:

◆ SetLoadBalancer()

void XrdCl::XRootDMsgHandler::SetLoadBalancer ( const HostInfo & loadBalancer)
inline

Set the load balancer.

Definition at line 370 of file XrdClXRootDMsgHandler.hh.

371 {
372 if( !loadBalancer.url.IsValid() )
373 return;
374 pLoadBalancer = loadBalancer;
375 pHasLoadBalancer = true;
376 }

References XrdCl::URL::IsValid(), and XrdCl::HostInfo::url.

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ SetOksofarAsAnswer()

void XrdCl::XRootDMsgHandler::SetOksofarAsAnswer ( bool oksofarAsAnswer)
inline

Treat the kXR_oksofar response as a valid answer to the message and notify the handler with the URL as a response

Definition at line 354 of file XrdClXRootDMsgHandler.hh.

355 {
356 pOksofarAsAnswer = oksofarAsAnswer;
357 }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

Here is the caller graph for this function:

◆ SetRedirectAsAnswer()

void XrdCl::XRootDMsgHandler::SetRedirectAsAnswer ( bool redirectAsAnswer)
inline

Treat the kXR_redirect response as a valid answer to the message and notify the handler with the URL as a response

Definition at line 345 of file XrdClXRootDMsgHandler.hh.

346 {
347 pRedirectAsAnswer = redirectAsAnswer;
348 }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

Here is the caller graph for this function:

◆ SetRedirectCounter()

void XrdCl::XRootDMsgHandler::SetRedirectCounter ( uint16_t redirectCounter)
inline

Set the redirect counter.

Definition at line 416 of file XrdClXRootDMsgHandler.hh.

417 {
418 pRedirectCounter = redirectCounter;
419 }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

Here is the caller graph for this function:

◆ SetStateful()

void XrdCl::XRootDMsgHandler::SetStateful ( bool stateful)
inline

Definition at line 426 of file XrdClXRootDMsgHandler.hh.

427 {
428 pStateful = stateful;
429 }

Referenced by XrdCl::MessageUtils::SendMessage().

Here is the caller graph for this function:

◆ WaitDone()

void XrdCl::XRootDMsgHandler::WaitDone ( time_t now)

Called after the wait time for kXR_wait has elapsed

Parameters
nowcurrent timestamp

Definition at line 1165 of file XrdClXRootDMsgHandler.cc.

1166 {
1167 HandleError( RetryAtServer( pUrl, RedirectEntry::EntryWait ) );
1168 }

References XrdCl::RedirectEntry::EntryWait.

◆ WriteMessageBody()

XRootDStatus XrdCl::XRootDMsgHandler::WriteMessageBody ( Socket * socket,
uint32_t & bytesWritten )
overridevirtual

Write message body directly to a socket - called if IsRaw returns true - only socket related errors may be returned here

Parameters
socketthe socket to read from
bytesWrittennumber of bytes written by the method
Returns
stOK & suDone if the whole body has been processed stOK & suRetry if more data needs to be written stError on failure

Reimplemented from XrdCl::MsgHandler.

Definition at line 1016 of file XrdClXRootDMsgHandler.cc.

1018 {
1019 //--------------------------------------------------------------------------
1020 // First check if it is a PgWrite
1021 //--------------------------------------------------------------------------
1022 if( !pChunkList->empty() && !pCrc32cDigests.empty() )
1023 {
1024 //------------------------------------------------------------------------
1025 // PgWrite will have just one chunk
1026 //------------------------------------------------------------------------
1027 ChunkInfo chunk = pChunkList->front();
1028 //------------------------------------------------------------------------
1029 // Calculate the size of the first and last page (in case the chunk is not
1030 // 4KB aligned)
1031 //------------------------------------------------------------------------
1032 int fLen = 0, lLen = 0;
1033 size_t nbpgs = XrdOucPgrwUtils::csNum( chunk.offset, chunk.length, fLen, lLen );
1034
1035 //------------------------------------------------------------------------
1036 // Set the crc32c buffer if not ready yet
1037 //------------------------------------------------------------------------
1038 if( pPgWrtCksumBuff.GetCursor() == 0 )
1039 {
1040 uint32_t digest = htonl( pCrc32cDigests[pPgWrtCurrentPageNb] );
1041 memcpy( pPgWrtCksumBuff.GetBuffer(), &digest, sizeof( uint32_t ) );
1042 }
1043
1044 uint32_t btsLeft = chunk.length - pAsyncOffset;
1045 uint32_t pglen = ( pPgWrtCurrentPageNb == 0 ? fLen : XrdSys::PageSize ) - pPgWrtCurrentPageOffset;
1046 if( pglen > btsLeft ) pglen = btsLeft;
1047 char* pgbuf = static_cast<char*>( chunk.buffer ) + pAsyncOffset;
1048
1049 while( btsLeft > 0 )
1050 {
1051 // first write the crc32c digest
1052 while( pPgWrtCksumBuff.GetCursor() < sizeof( uint32_t ) )
1053 {
1054 uint32_t dgstlen = sizeof( uint32_t ) - pPgWrtCksumBuff.GetCursor();
1055 char* dgstbuf = pPgWrtCksumBuff.GetBufferAtCursor();
1056 int btswrt = 0;
1057 Status st = socket->Send( dgstbuf, dgstlen, btswrt );
1058 if( !st.IsOK() ) return st;
1059 bytesWritten += btswrt;
1060 pPgWrtCksumBuff.AdvanceCursor( btswrt );
1061 if( st.code == suRetry ) return st;
1062 }
1063 // then write the raw data (one page)
1064 int btswrt = 0;
1065 Status st = socket->Send( pgbuf, pglen, btswrt );
1066 if( !st.IsOK() ) return st;
1067 pgbuf += btswrt;
1068 pglen -= btswrt;
1069 btsLeft -= btswrt;
1070 bytesWritten += btswrt;
1071 pAsyncOffset += btswrt; // update the offset to the raw data
1072 if( st.code == suRetry ) return st;
1073 // if we managed to write all the data ...
1074 if( pglen == 0 )
1075 {
1076 // move to the next page
1077 ++pPgWrtCurrentPageNb;
1078 if( pPgWrtCurrentPageNb < nbpgs )
1079 {
1080 // set the digest buffer
1081 pPgWrtCksumBuff.SetCursor( 0 );
1082 uint32_t digest = htonl( pCrc32cDigests[pPgWrtCurrentPageNb] );
1083 memcpy( pPgWrtCksumBuff.GetBuffer(), &digest, sizeof( uint32_t ) );
1084 }
1085 // set the page length
1086 pglen = XrdSys::PageSize;
1087 if( pglen > btsLeft ) pglen = btsLeft;
1088 // reset offset in the current page
1089 pPgWrtCurrentPageOffset = 0;
1090 }
1091 else
1092 // otherwise just adjust the offset in the current page
1093 pPgWrtCurrentPageOffset += btswrt;
1094
1095 }
1096 }
1097 else if( !pChunkList->empty() )
1098 {
1099 size_t size = pChunkList->size();
1100 for( size_t i = pAsyncChunkIndex ; i < size; ++i )
1101 {
1102 char *buffer = (char*)(*pChunkList)[i].buffer;
1103 uint32_t size = (*pChunkList)[i].length;
1104 size_t leftToBeWritten = size - pAsyncOffset;
1105
1106 while( leftToBeWritten )
1107 {
1108 int btswrt = 0;
1109 Status st = socket->Send( buffer + pAsyncOffset, leftToBeWritten, btswrt );
1110 bytesWritten += btswrt;
1111 if( !st.IsOK() || st.code == suRetry ) return st;
1112 pAsyncOffset += btswrt;
1113 leftToBeWritten -= btswrt;
1114 }
1115 //----------------------------------------------------------------------
1116 // Remember that we have moved to the next chunk, also clear the offset
1117 // within the buffer as we are going to move to a new one
1118 //----------------------------------------------------------------------
1119 ++pAsyncChunkIndex;
1120 pAsyncOffset = 0;
1121 }
1122 }
1123 else
1124 {
1125 Log *log = DefaultEnv::GetLog();
1126
1127 //------------------------------------------------------------------------
1128 // If the socket is encrypted we cannot use a kernel buffer, we have to
1129 // convert to user space buffer
1130 //------------------------------------------------------------------------
1131 if( socket->IsEncrypted() )
1132 {
1133 log->Debug( XRootDMsg, "[%s] Channel is encrypted: cannot use kernel buffer.",
1134 pUrl.GetHostId().c_str() );
1135
1136 char *ubuff = 0;
1137 ssize_t ret = XrdSys::Move( *pKBuff, ubuff );
1138 if( ret < 0 ) return Status( stError, errInternal );
1139 pChunkList->push_back( ChunkInfo( 0, ret, ubuff ) );
1140 return WriteMessageBody( socket, bytesWritten );
1141 }
1142
1143 //------------------------------------------------------------------------
1144 // Send the data
1145 //------------------------------------------------------------------------
1146 while( !pKBuff->Empty() )
1147 {
1148 int btswrt = 0;
1149 Status st = socket->Send( *pKBuff, btswrt );
1150 bytesWritten += btswrt;
1151 if( !st.IsOK() || st.code == suRetry ) return st;
1152 }
1153
1154 log->Debug( XRootDMsg, "[%s] Request %s payload (kernel buffer) transferred to socket.",
1155 pUrl.GetHostId().c_str(), pRequest->GetObfuscatedDescription().c_str() );
1156 }
1157
1158 return Status();
1159 }
XRootDStatus WriteMessageBody(Socket *socket, uint32_t &bytesWritten) override
const uint16_t suRetry
const uint16_t errInternal
Internal error.
static const int PageSize
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)

References XrdCl::ChunkInfo::buffer, XrdCl::Status::code, XrdOucPgrwUtils::csNum(), XrdCl::Log::Debug(), XrdCl::errInternal, XrdCl::DefaultEnv::GetLog(), XrdCl::Socket::IsEncrypted(), XrdCl::Status::IsOK(), XrdCl::ChunkInfo::length, XrdSys::Move(), XrdCl::ChunkInfo::offset, XrdSys::PageSize, XrdCl::Socket::Send(), XrdCl::stError, XrdCl::suRetry, WriteMessageBody(), and XrdCl::XRootDMsg.

Referenced by WriteMessageBody().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ HandleRspJob

friend class HandleRspJob
friend

Definition at line 121 of file XrdClXRootDMsgHandler.hh.

References HandleRspJob.

Referenced by HandleRspJob.


The documentation for this class was generated from the following files: