XRootD
Loading...
Searching...
No Matches
XrdClXRootDMsgHandler.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
26#include "XrdCl/XrdClLog.hh"
30#include "XrdCl/XrdClMessage.hh"
31#include "XrdCl/XrdClURL.hh"
32#include "XrdCl/XrdClUtils.hh"
39#include "XrdCl/XrdClSocket.hh"
40#include "XrdCl/XrdClTls.hh"
42
43#include "XrdOuc/XrdOucCRC.hh"
45
46#include "XrdSys/XrdSysPlatform.hh" // same as above
49#include <memory>
50#include <sstream>
51#include <numeric>
52
53namespace
54{
55 //----------------------------------------------------------------------------
56 // We need an extra task that will run the handler in the future, because
57 // tasks get deleted and we need the handler
58 //----------------------------------------------------------------------------
59 class WaitTask: public XrdCl::Task
60 {
61 public:
62 WaitTask( XrdCl::XRootDMsgHandler *handler ): pHandler( handler )
63 {
64 std::ostringstream o;
65 o << "WaitTask for: 0x" << handler->GetRequest();
66 SetName( o.str() );
67 }
68
69 virtual time_t Run( time_t now )
70 {
71 pHandler->WaitDone( now );
72 return 0;
73 }
74 private:
75 XrdCl::XRootDMsgHandler *pHandler;
76 };
77}
78
79namespace XrdCl
80{
81 //----------------------------------------------------------------------------
82 // Delegate the response handling to the thread-pool
83 //----------------------------------------------------------------------------
85 {
86 public:
87 HandleRspJob( XrdCl::XRootDMsgHandler *handler ): pHandler( handler )
88 {
89
90 }
91
92 virtual ~HandleRspJob()
93 {
94
95 }
96
97 virtual void Run( void *arg )
98 {
99 pHandler->HandleResponse();
100 delete this;
101 }
102 private:
103 XrdCl::XRootDMsgHandler *pHandler;
104 };
105
106 //----------------------------------------------------------------------------
107 // Examine an incoming message, and decide on the action to be taken
108 //----------------------------------------------------------------------------
109 uint16_t XRootDMsgHandler::Examine( std::shared_ptr<Message> &msg )
110 {
111 const int sst = pSendingState.fetch_or( kSawResp );
112
113 if( !( sst & kSendDone ) && !( sst & kSawResp ) )
114 {
115 // we must have been sent although we haven't got the OnStatusReady
116 // notification yet. Set the inflight notice.
117
118 Log *log = DefaultEnv::GetLog();
119 log->Dump( XRootDMsg, "[%s] Message %s reply received before notification "
120 "that it was sent, assuming it was sent ok.",
121 pUrl.GetHostId().c_str(),
122 pRequest->GetObfuscatedDescription().c_str() );
123 }
124
125 //--------------------------------------------------------------------------
126 // if the MsgHandler is already being used to process another request
127 // (kXR_oksofar) we need to wait
128 //--------------------------------------------------------------------------
129 if( pOksofarAsAnswer )
130 {
131 XrdSysCondVarHelper lck( pCV );
132 while( pResponse ) pCV.Wait();
133 }
134 else
135 {
136 if( pResponse )
137 {
138 Log *log = DefaultEnv::GetLog();
139 log->Warning( ExDbgMsg, "[%s] MsgHandler is examining a response although "
140 "it already owns a response: %p (message: %s ).",
141 pUrl.GetHostId().c_str(), (void*)this,
142 pRequest->GetObfuscatedDescription().c_str() );
143 }
144 }
145
146 if( msg->GetSize() < 8 )
147 return Ignore;
148
149 ServerResponse *rsp = (ServerResponse *)msg->GetBuffer();
150 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
151 uint16_t status = 0;
152 uint32_t dlen = 0;
153
154 //--------------------------------------------------------------------------
155 // We only care about async responses, but those are extracted now
156 // in the SocketHandler.
157 //--------------------------------------------------------------------------
158 if( rsp->hdr.status == kXR_attn )
159 {
160 return Ignore;
161 }
162 //--------------------------------------------------------------------------
163 // We got a sync message - check if it belongs to us
164 //--------------------------------------------------------------------------
165 else
166 {
167 if( rsp->hdr.streamid[0] != req->header.streamid[0] ||
168 rsp->hdr.streamid[1] != req->header.streamid[1] )
169 return Ignore;
170
171 status = rsp->hdr.status;
172 dlen = rsp->hdr.dlen;
173 }
174
175 //--------------------------------------------------------------------------
176 // We take the ownership of the message and decide what we will do
177 // with the handler itself, the options are:
178 // 1) we want to either read in raw mode (the Raw flag) or have the message
179 // body reconstructed for us by the TransportHandler by the time
180 // Process() is called (default, no extra flag)
181 // 2) we either got a full response in which case we don't want to be
182 // notified about anything anymore (RemoveHandler) or we got a partial
183 // answer and we need to wait for more (default, no extra flag)
184 //--------------------------------------------------------------------------
185 pResponse = msg;
186 pBodyReader->SetDataLength( dlen );
187
188 Log *log = DefaultEnv::GetLog();
189 switch( status )
190 {
191 //------------------------------------------------------------------------
192 // Handle the cached cases
193 //------------------------------------------------------------------------
194 case kXR_error:
195 case kXR_redirect:
196 case kXR_wait:
197 return RemoveHandler;
198
199 case kXR_waitresp:
200 {
201 log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response to "
202 "message %s", pUrl.GetHostId().c_str(),
203 pRequest->GetObfuscatedDescription().c_str() );
204
205 pResponse.reset();
206 return Ignore; // This must be handled synchronously!
207 }
208
209 //------------------------------------------------------------------------
210 // Handle the potential raw cases
211 //------------------------------------------------------------------------
212 case kXR_ok:
213 {
214 //----------------------------------------------------------------------
215 // For kXR_read we read in raw mode
216 //----------------------------------------------------------------------
217 uint16_t reqId = ntohs( req->header.requestid );
218 if( reqId == kXR_read )
219 {
220 return Raw | RemoveHandler;
221 }
222
223 //----------------------------------------------------------------------
224 // kXR_readv is the same as kXR_read
225 //----------------------------------------------------------------------
226 if( reqId == kXR_readv )
227 {
228 return Raw | RemoveHandler;
229 }
230
231 //----------------------------------------------------------------------
232 // For everything else we just take what we got
233 //----------------------------------------------------------------------
234 return RemoveHandler;
235 }
236
237 //------------------------------------------------------------------------
238 // kXR_oksofars are special, they are not full responses, so we reset
239 // the response pointer to 0 and add the message to the partial list
240 //------------------------------------------------------------------------
241 case kXR_oksofar:
242 {
243 log->Dump( XRootDMsg, "[%s] Got a kXR_oksofar response to request "
244 "%s", pUrl.GetHostId().c_str(),
245 pRequest->GetObfuscatedDescription().c_str() );
246
247 if( !pOksofarAsAnswer )
248 {
249 pPartialResps.emplace_back( std::move( pResponse ) );
250 }
251
252 //----------------------------------------------------------------------
253 // For kXR_read we either read in raw mode if the message has not
254 // been fully reconstructed already, if it has, we adjust
255 // the buffer offset to prepare for the next one
256 //----------------------------------------------------------------------
257 uint16_t reqId = ntohs( req->header.requestid );
258 if( reqId == kXR_read )
259 {
260 pTimeoutFence.store( true, std::memory_order_relaxed );
261 return Raw | ( pOksofarAsAnswer ? None : NoProcess );
262 }
263
264 //----------------------------------------------------------------------
265 // kXR_readv is similar to read, except that the payload is different
266 //----------------------------------------------------------------------
267 if( reqId == kXR_readv )
268 {
269 pTimeoutFence.store( true, std::memory_order_relaxed );
270 return Raw | ( pOksofarAsAnswer ? None : NoProcess );
271 }
272
273 return ( pOksofarAsAnswer ? None : NoProcess );
274 }
275
276 case kXR_status:
277 {
278 log->Dump( XRootDMsg, "[%s] Got a kXR_status response to request "
279 "%s", pUrl.GetHostId().c_str(),
280 pRequest->GetObfuscatedDescription().c_str() );
281
282 uint16_t reqId = ntohs( req->header.requestid );
283 if( reqId == kXR_pgwrite )
284 {
285 //--------------------------------------------------------------------
286 // In case of pgwrite by definition this wont be a partial response
287 // so we can already remove the handler from the in-queue
288 //--------------------------------------------------------------------
289 return RemoveHandler;
290 }
291
292 //----------------------------------------------------------------------
293 // Otherwise (pgread), first of all we need to read the body of the
294 // kXR_status response, we can handle the raw data (if any) only after
295 // we have the whole kXR_status body
296 //----------------------------------------------------------------------
297 pTimeoutFence.store( true, std::memory_order_relaxed );
298 return None;
299 }
300
301 //------------------------------------------------------------------------
302 // Default
303 //------------------------------------------------------------------------
304 default:
305 return RemoveHandler;
306 }
307 return RemoveHandler;
308 }
309
310 //----------------------------------------------------------------------------
311 // Reexamine the incoming message, and decide on the action to be taken
312 //----------------------------------------------------------------------------
314 {
315 if( !pResponse )
316 return 0;
317
318 Log *log = DefaultEnv::GetLog();
319 ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
320
321 //--------------------------------------------------------------------------
322 // Additional action is only required for kXR_status
323 //--------------------------------------------------------------------------
324 if( rsp->hdr.status != kXR_status ) return 0;
325
326 //--------------------------------------------------------------------------
327 // Ignore malformed status response
328 //--------------------------------------------------------------------------
329 if( pResponse->GetSize() < sizeof( ServerResponseStatus ) )
330 {
331 log->Error( XRootDMsg, "[%s] kXR_status: invalid message size.", pUrl.GetHostId().c_str() );
332 return Corrupted;
333 }
334
335 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
336 uint16_t reqId = ntohs( req->header.requestid );
337 //--------------------------------------------------------------------------
338 // Unmarshal the status body
339 //--------------------------------------------------------------------------
340 XRootDStatus st = XRootDTransport::UnMarshalStatusBody( *pResponse, reqId );
341
342 if( !st.IsOK() && st.code == errDataError )
343 {
344 log->Error( XRootDMsg, "[%s] %s", pUrl.GetHostId().c_str(),
345 st.GetErrorMessage().c_str() );
346 return Corrupted;
347 }
348
349 if( !st.IsOK() )
350 {
351 log->Error( XRootDMsg, "[%s] Failed to unmarshall status body.",
352 pUrl.GetHostId().c_str() );
353 pStatus = st;
354 HandleRspOrQueue();
355 return Ignore;
356 }
357
358 //--------------------------------------------------------------------------
359 // Common handling for partial results
360 //--------------------------------------------------------------------------
361 ServerResponseV2 *rspst = (ServerResponseV2*)pResponse->GetBuffer();
363 {
364 pPartialResps.push_back( std::move( pResponse ) );
365 }
366
367 //--------------------------------------------------------------------------
368 // Decide the actions that we need to take
369 //--------------------------------------------------------------------------
370 uint16_t action = 0;
371 if( reqId == kXR_pgread )
372 {
373 //----------------------------------------------------------------------
374 // The message contains only Status header and body but no raw data
375 //----------------------------------------------------------------------
376 if( !pPageReader )
377 pPageReader.reset( new AsyncPageReader( *pChunkList, pCrc32cDigests ) );
378 pPageReader->SetRsp( rspst );
379
380 action |= Raw;
381
383 action |= NoProcess;
384 else
385 action |= RemoveHandler;
386 }
387 else if( reqId == kXR_pgwrite )
388 {
389 // if data corruption has been detected on the server side we will
390 // send some additional data pointing to the pages that need to be
391 // retransmitted
392 if( size_t( sizeof( ServerResponseHeader ) + rspst->status.hdr.dlen + rspst->status.bdy.dlen ) >
393 pResponse->GetCursor() )
394 action |= More;
395 }
396
397 return action;
398 }
399
400 //----------------------------------------------------------------------------
401 // Get handler sid
402 //----------------------------------------------------------------------------
404 {
405 ClientRequest* req = (ClientRequest*) pRequest->GetBuffer();
406 return ((uint16_t)req->header.streamid[1] << 8) | (uint16_t)req->header.streamid[0];
407 }
408
409 //----------------------------------------------------------------------------
411 //----------------------------------------------------------------------------
413 {
414 Log *log = DefaultEnv::GetLog();
415
416 ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
417
418 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
419
420 //--------------------------------------------------------------------------
421 // If it is a local file, it can be only a metalink redirector
422 //--------------------------------------------------------------------------
423 if( pUrl.IsLocalFile() && pUrl.IsMetalink() )
424 pHosts->back().protocol = kXR_PROTOCOLVERSION;
425
426 //--------------------------------------------------------------------------
427 // We got an answer, check who we were talking to
428 //--------------------------------------------------------------------------
429 else
430 {
431 AnyObject qryResult;
432 int *qryResponse = nullptr;
433 pPostMaster->QueryTransport( pUrl, XRootDQuery::ServerFlags, qryResult );
434 qryResult.Get( qryResponse );
435 if (qryResponse) {
436 pHosts->back().flags = *qryResponse;
437 delete qryResponse;
438 qryResponse = nullptr;
439 }
440 pPostMaster->QueryTransport( pUrl, XRootDQuery::ProtocolVersion, qryResult );
441 qryResult.Get( qryResponse );
442 if (qryResponse) {
443 pHosts->back().protocol = *qryResponse;
444 delete qryResponse;
445 }
446 }
447
448 //--------------------------------------------------------------------------
449 // Process the message
450 //--------------------------------------------------------------------------
451 Status st = XRootDTransport::UnMarshallBody( pResponse.get(), req->header.requestid );
452 if( !st.IsOK() )
453 {
454 pStatus = Status( stFatal, errInvalidMessage );
455 HandleResponse();
456 return;
457 }
458
459 //--------------------------------------------------------------------------
460 // we have a response for the message so it is no longer in flight
461 //--------------------------------------------------------------------------
462 pSendingState.fetch_or( kInFlyDone );
463
464 //--------------------------------------------------------------------------
465 // Reset the aggregated wait (used to omit wait response in case of Metalink
466 // redirector)
467 //--------------------------------------------------------------------------
468 if( rsp->hdr.status != kXR_wait )
469 pAggregatedWaitTime = 0;
470
471 switch( rsp->hdr.status )
472 {
473 //------------------------------------------------------------------------
474 // kXR_ok - we're done here
475 //------------------------------------------------------------------------
476 case kXR_ok:
477 {
478 log->Dump( XRootDMsg, "[%s] Got a kXR_ok response to request %s",
479 pUrl.GetHostId().c_str(),
480 pRequest->GetObfuscatedDescription().c_str() );
481 pStatus = Status();
482 HandleResponse();
483 return;
484 }
485
486 case kXR_status:
487 {
488 log->Dump( XRootDMsg, "[%s] Got a kXR_status response to request %s",
489 pUrl.GetHostId().c_str(),
490 pRequest->GetObfuscatedDescription().c_str() );
491 pStatus = Status();
492 HandleResponse();
493 return;
494 }
495
496 //------------------------------------------------------------------------
497 // kXR_oksofar - we're serving a partial result to the user
498 //------------------------------------------------------------------------
499 case kXR_oksofar:
500 {
501 log->Dump( XRootDMsg, "[%s] Got a kXR_oksofar response to request %s",
502 pUrl.GetHostId().c_str(),
503 pRequest->GetObfuscatedDescription().c_str() );
504 pStatus = Status( stOK, suContinue );
505 HandleResponse();
506 return;
507 }
508
509 //------------------------------------------------------------------------
510 // kXR_error - we've got a problem
511 //------------------------------------------------------------------------
512 case kXR_error:
513 {
514 char *errmsg = new char[rsp->hdr.dlen-3]; errmsg[rsp->hdr.dlen-4] = 0;
515 memcpy( errmsg, rsp->body.error.errmsg, rsp->hdr.dlen-4 );
516 log->Dump( XRootDMsg, "[%s] Got a kXR_error response to request %s "
517 "[%d] %s", pUrl.GetHostId().c_str(),
518 pRequest->GetObfuscatedDescription().c_str(), rsp->body.error.errnum,
519 errmsg );
520 delete [] errmsg;
521
522 HandleError( Status(stError, errErrorResponse, rsp->body.error.errnum) );
523 return;
524 }
525
526 //------------------------------------------------------------------------
527 // kXR_redirect - they tell us to go elsewhere
528 //------------------------------------------------------------------------
529 case kXR_redirect:
530 {
531 if( rsp->hdr.dlen <= 4 )
532 {
533 log->Error( XRootDMsg, "[%s] Got invalid redirect response.",
534 pUrl.GetHostId().c_str() );
535 pStatus = Status( stError, errInvalidResponse );
536 HandleResponse();
537 return;
538 }
539
540 char *urlInfoBuff = new char[rsp->hdr.dlen-3];
541 urlInfoBuff[rsp->hdr.dlen-4] = 0;
542 memcpy( urlInfoBuff, rsp->body.redirect.host, rsp->hdr.dlen-4 );
543 std::string urlInfo = urlInfoBuff;
544 delete [] urlInfoBuff;
545 log->Dump( XRootDMsg, "[%s] Got kXR_redirect response to "
546 "message %s: %s, port %d", pUrl.GetHostId().c_str(),
547 pRequest->GetObfuscatedDescription().c_str(), urlInfo.c_str(),
548 rsp->body.redirect.port );
549
550 //----------------------------------------------------------------------
551 // Check if we can proceed
552 //----------------------------------------------------------------------
553 if( !pRedirectCounter )
554 {
555 log->Warning( XRootDMsg, "[%s] Redirect limit has been reached for "
556 "message %s, the last known error is: %s",
557 pUrl.GetHostId().c_str(),
558 pRequest->GetObfuscatedDescription().c_str(),
559 pLastError.ToString().c_str() );
560
561
562 pStatus = Status( stFatal, errRedirectLimit );
563 HandleResponse();
564 return;
565 }
566 --pRedirectCounter;
567
568 //----------------------------------------------------------------------
569 // Keep the info about this server if we still need to find a load
570 // balancer
571 //----------------------------------------------------------------------
572 uint32_t flags = pHosts->back().flags;
573 if( !pHasLoadBalancer )
574 {
575 if( flags & kXR_isManager )
576 {
577 //------------------------------------------------------------------
578 // If the current server is a meta manager then it supersedes
579 // any existing load balancer, otherwise we assign a load-balancer
580 // only if it has not been already assigned
581 //------------------------------------------------------------------
582 if( ( flags & kXR_attrMeta ) || !pLoadBalancer.url.IsValid() )
583 {
584 pLoadBalancer = pHosts->back();
585 log->Dump( XRootDMsg, "[%s] Current server has been assigned "
586 "as a load-balancer for message %s",
587 pUrl.GetHostId().c_str(),
588 pRequest->GetObfuscatedDescription().c_str() );
589 HostList::iterator it;
590 for( it = pHosts->begin(); it != pHosts->end(); ++it )
591 it->loadBalancer = false;
592 pHosts->back().loadBalancer = true;
593 }
594 }
595 }
596
597 //----------------------------------------------------------------------
598 // If the redirect comes from a data server save the URL because
599 // in case of a failure we will use it as the effective data server URL
600 // for the tried CGI opaque info
601 //----------------------------------------------------------------------
602 if( flags & kXR_isServer )
603 pEffectiveDataServerUrl = new URL( pHosts->back().url );
604
605 //----------------------------------------------------------------------
606 // Build the URL and check its validity
607 //----------------------------------------------------------------------
608 std::vector<std::string> urlComponents;
609 std::string newCgi;
610 Utils::splitString( urlComponents, urlInfo, "?" );
611
612 std::ostringstream o;
613
614 o << urlComponents[0];
615 if( rsp->body.redirect.port > 0 )
616 o << ":" << rsp->body.redirect.port << "/";
617 else if( rsp->body.redirect.port < 0 )
618 {
619 //--------------------------------------------------------------------
620 // check if the manager wants to enforce write recovery at himself
621 // (beware we are dealing here with negative flags)
622 //--------------------------------------------------------------------
623 if( ~uint32_t( rsp->body.redirect.port ) & kXR_recoverWrts )
624 pHosts->back().flags |= kXR_recoverWrts;
625
626 //--------------------------------------------------------------------
627 // check if the manager wants to collapse the communication channel
628 // (the redirect host is to replace the current host)
629 //--------------------------------------------------------------------
630 if( ~uint32_t( rsp->body.redirect.port ) & kXR_collapseRedir )
631 {
632 std::string url( rsp->body.redirect.host, rsp->hdr.dlen-4 );
633 pPostMaster->CollapseRedirect( pUrl, url );
634 }
635
636 if( ~uint32_t( rsp->body.redirect.port ) & kXR_ecRedir )
637 {
638 std::string url( rsp->body.redirect.host, rsp->hdr.dlen-4 );
639 if( Utils::CheckEC( pRequest, url ) )
640 pRedirectAsAnswer = true;
641 }
642 }
643
644 URL newUrl = URL( o.str() );
645 if( !newUrl.IsValid() )
646 {
648 log->Error( XRootDMsg, "[%s] Got invalid redirection URL: %s",
649 pUrl.GetHostId().c_str(), urlInfo.c_str() );
650 HandleResponse();
651 return;
652 }
653
654 if( pUrl.GetUserName() != "" && newUrl.GetUserName() == "" )
655 newUrl.SetUserName( pUrl.GetUserName() );
656
657 if( pUrl.GetPassword() != "" && newUrl.GetPassword() == "" )
658 newUrl.SetPassword( pUrl.GetPassword() );
659
660 //----------------------------------------------------------------------
661 // Forward any "xrd.*" params from the original client request also to
662 // the new redirection url
663 // Also, we need to preserve any "xrdcl.*" as they are important for
664 // our internal workflows.
665 //----------------------------------------------------------------------
666 std::ostringstream ossXrd;
667 const URL::ParamsMap &urlParams = pUrl.GetParams();
668
669 for(URL::ParamsMap::const_iterator it = urlParams.begin();
670 it != urlParams.end(); ++it )
671 {
672 if( it->first.compare( 0, 4, "xrd." ) &&
673 it->first.compare( 0, 6, "xrdcl." ) )
674 continue;
675
676 ossXrd << it->first << '=' << it->second << '&';
677 }
678
679 std::string xrdCgi = ossXrd.str();
680 pRedirectUrl = newUrl.GetURL();
681
682 URL cgiURL;
683 if( urlComponents.size() > 1 )
684 {
685 pRedirectUrl += "?";
686 pRedirectUrl += urlComponents[1];
687 std::ostringstream o;
688 o << "fake://fake:111//fake?";
689 o << urlComponents[1];
690
691 if( urlComponents.size() == 3 )
692 o << '?' << urlComponents[2];
693
694 if (!xrdCgi.empty())
695 {
696 o << '&' << xrdCgi;
697 pRedirectUrl += '&';
698 pRedirectUrl += xrdCgi;
699 }
700
701 cgiURL = URL( o.str() );
702 }
703 else {
704 if (!xrdCgi.empty())
705 {
706 std::ostringstream o;
707 o << "fake://fake:111//fake?";
708 o << xrdCgi;
709 cgiURL = URL( o.str() );
710 pRedirectUrl += '?';
711 pRedirectUrl += xrdCgi;
712 }
713 }
714
715 //----------------------------------------------------------------------
716 // Check if we need to return the URL as a response
717 //----------------------------------------------------------------------
718 if( newUrl.GetProtocol() != "root" && newUrl.GetProtocol() != "xroot" &&
719 newUrl.GetProtocol() != "roots" && newUrl.GetProtocol() != "xroots" &&
720 !newUrl.IsLocalFile() )
721 pRedirectAsAnswer = true;
722
723 if( pRedirectAsAnswer )
724 {
725 pStatus = Status( stError, errRedirect );
726 HandleResponse();
727 return;
728 }
729
730 //----------------------------------------------------------------------
731 // Rewrite the message in a way required to send it to another server
732 //----------------------------------------------------------------------
733 newUrl.SetParams( cgiURL.GetParams() );
734 Status st = RewriteRequestRedirect( newUrl );
735 if( !st.IsOK() )
736 {
737 pStatus = st;
738 HandleResponse();
739 return;
740 }
741
742 //----------------------------------------------------------------------
743 // Make sure we don't change the protocol by accident (root vs roots)
744 //----------------------------------------------------------------------
745 if( ( pUrl.GetProtocol() == "roots" || pUrl.GetProtocol() == "xroots" ) &&
746 ( newUrl.GetProtocol() == "root" || newUrl.GetProtocol() == "xroot" ) )
747 newUrl.SetProtocol( "roots" );
748
749 //----------------------------------------------------------------------
750 // Send the request to the new location
751 //----------------------------------------------------------------------
752 HandleError( RetryAtServer( newUrl, RedirectEntry::EntryRedirect ) );
753 return;
754 }
755
756 //------------------------------------------------------------------------
757 // kXR_wait - we wait, and re-issue the request later
758 //------------------------------------------------------------------------
759 case kXR_wait:
760 {
761 uint32_t waitSeconds = 0;
762
763 if( rsp->hdr.dlen >= 4 )
764 {
765 char *infoMsg = new char[rsp->hdr.dlen-3];
766 infoMsg[rsp->hdr.dlen-4] = 0;
767 memcpy( infoMsg, rsp->body.wait.infomsg, rsp->hdr.dlen-4 );
768 log->Dump( XRootDMsg, "[%s] Got kXR_wait response of %d seconds to "
769 "message %s: %s", pUrl.GetHostId().c_str(),
770 rsp->body.wait.seconds, pRequest->GetObfuscatedDescription().c_str(),
771 infoMsg );
772 delete [] infoMsg;
773 waitSeconds = rsp->body.wait.seconds;
774 }
775 else
776 {
777 log->Dump( XRootDMsg, "[%s] Got kXR_wait response of 0 seconds to "
778 "message %s", pUrl.GetHostId().c_str(),
779 pRequest->GetObfuscatedDescription().c_str() );
780 }
781
782 pAggregatedWaitTime += waitSeconds;
783
784 // We need a special case if the data node comes from metalink
785 // redirector. In this case it might make more sense to try the
786 // next entry in the Metalink than wait.
787 if( OmitWait( *pRequest, pLoadBalancer.url ) )
788 {
789 int maxWait = DefaultMaxMetalinkWait;
790 DefaultEnv::GetEnv()->GetInt( "MaxMetalinkWait", maxWait );
791 if( pAggregatedWaitTime > maxWait )
792 {
793 UpdateTriedCGI();
794 HandleError( RetryAtServer( pLoadBalancer.url, RedirectEntry::EntryRedirectOnWait ) );
795 return;
796 }
797 }
798
799 //----------------------------------------------------------------------
800 // Some messages require rewriting before they can be sent again
801 // after wait
802 //----------------------------------------------------------------------
803 Status st = RewriteRequestWait();
804 if( !st.IsOK() )
805 {
806 pStatus = st;
807 HandleResponse();
808 return;
809 }
810
811 //----------------------------------------------------------------------
812 // Register a task to resend the message in some seconds, if we still
813 // have time to do that, and report a timeout otherwise
814 //----------------------------------------------------------------------
815 time_t resendTime = ::time(0)+waitSeconds;
816
817 if( resendTime < pExpiration )
818 {
819 log->Debug( ExDbgMsg, "[%s] Scheduling WaitTask for MsgHandler: %p (message: %s ).",
820 pUrl.GetHostId().c_str(), (void*)this,
821 pRequest->GetObfuscatedDescription().c_str() );
822
823 TaskManager *taskMgr = pPostMaster->GetTaskManager();
824 taskMgr->RegisterTask( new WaitTask( this ), resendTime );
825 }
826 else
827 {
828 log->Debug( XRootDMsg, "[%s] Wait time is too long, timing out %s",
829 pUrl.GetHostId().c_str(),
830 pRequest->GetObfuscatedDescription().c_str() );
831 HandleError( Status( stError, errOperationExpired) );
832 }
833 return;
834 }
835
836 //------------------------------------------------------------------------
837 // kXR_waitresp - the response will be returned in some seconds as an
838 // unsolicited message. Currently all messages of this type are handled
839 // one step before in the XrdClStream::OnIncoming as they need to be
840 // processed synchronously.
841 //------------------------------------------------------------------------
842 case kXR_waitresp:
843 {
844 if( rsp->hdr.dlen < 4 )
845 {
846 log->Error( XRootDMsg, "[%s] Got invalid waitresp response.",
847 pUrl.GetHostId().c_str() );
848 pStatus = Status( stError, errInvalidResponse );
849 HandleResponse();
850 return;
851 }
852
853 log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response of %d seconds to "
854 "message %s", pUrl.GetHostId().c_str(),
855 rsp->body.waitresp.seconds,
856 pRequest->GetObfuscatedDescription().c_str() );
857 return;
858 }
859
860 //------------------------------------------------------------------------
861 // Default - unrecognized/unsupported response, declare an error
862 //------------------------------------------------------------------------
863 default:
864 {
865 log->Dump( XRootDMsg, "[%s] Got unrecognized response %d to "
866 "message %s", pUrl.GetHostId().c_str(),
867 rsp->hdr.status, pRequest->GetObfuscatedDescription().c_str() );
868 pStatus = Status( stError, errInvalidResponse );
869 HandleResponse();
870 return;
871 }
872 }
873
874 return;
875 }
876
877 //----------------------------------------------------------------------------
878 // Handle an event other than a message arrival - may be timeout
879 //----------------------------------------------------------------------------
881 XRootDStatus status )
882 {
883 Log *log = DefaultEnv::GetLog();
884 log->Dump( XRootDMsg, "[%s] Stream event reported for msg %s",
885 pUrl.GetHostId().c_str(), pRequest->GetObfuscatedDescription().c_str() );
886
887 if( event == Ready )
888 return 0;
889
890 if( pTimeoutFence.load( std::memory_order_relaxed ) )
891 return 0;
892
893 HandleError( status );
894 return RemoveHandler;
895 }
896
897 //----------------------------------------------------------------------------
898 // Read message body directly from a socket
//
// The read is delegated to pPageReader when the original request was a
// kXR_pgread and to pBodyReader for every other request; the delegate's
// return value is passed back unchanged and bytesRead is filled by it.
899 //----------------------------------------------------------------------------
901 Socket *socket,
902 uint32_t &bytesRead )
903 {
904 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
// The request id is converted with ntohs before it is compared
905 uint16_t reqId = ntohs( req->header.requestid );
906
907 if( reqId == kXR_pgread )
908 return pPageReader->Read( *socket, bytesRead );
909
910 return pBodyReader->Read( *socket, bytesRead );
911 }
912
913 //----------------------------------------------------------------------------
914 // We're here when we requested sending something over the wire
915 // or other status update on this action.
916 // We can be called when message is still in out-queue, with an
917 // error status indicating message will not be sent.
//
// The function atomically sets kSendDone in pSendingState and then decides
// what to do from the PREVIOUS value of the state (sst) together with the
// reported status:
//  - OK status and kSendDone already set      -> nothing to do
//  - error status after a response was seen   -> log and HandleError
//  - kFinalResp already set                   -> HandleRspOrQueue
//  - kRetryAtSrv already set                  -> HandleError( RetryAtServer )
//  - kSawResp already set                     -> nothing to do
//  - OK status, no response yet               -> nothing to do
//  - error status, no response yet            -> log and HandleError
918 //----------------------------------------------------------------------------
920 XRootDStatus status )
921 {
922 Log *log = DefaultEnv::GetLog();
923
924 if( status.IsOK() )
925 {
926 log->Dump( XRootDMsg, "[%s] Got notification that outgoing message %s "
927 "was sent successfully.", pUrl.GetHostId().c_str(),
928 message->GetObfuscatedDescription().c_str() );
929 }
930
931 // After setting kSendDone processing of this object may continue in
932 // another thread. Unless we're in an error condition our object may
933 // be modified or even destroyed after this point.
// fetch_or returns the state as it was BEFORE kSendDone was added, so sst
// tells us what had already happened when this notification arrived.
934 const int sst = pSendingState.fetch_or( kSendDone );
935
936 // ignore if we're already in this state
937 if( status.IsOK() && ( sst & kSendDone ) ) return;
938
939 // if we have already seen a response we should be getting notified
940 // of a successful send. But if not, log and do our best to recover.
941 if( !status.IsOK() && ( ( sst & kFinalResp ) || ( sst & kSawResp ) ) )
942 {
943 log->Error( XRootDMsg, "[%s] Unexpected error for message %s. Trying to "
944 "recover.", pUrl.GetHostId().c_str(),
945 message->GetObfuscatedDescription().c_str() );
946 HandleError( status );
947 return;
948 }
949
950 if( sst & kFinalResp )
951 {
952 // late notification and we already have final response for user,
953 // need to queue handler callback.
954 HandleRspOrQueue();
955 return;
956 }
957
958 if( sst & kRetryAtSrv )
959 {
960 // late notification and we already received a response and know
961 // we need to retry at different server.
962 HandleError( RetryAtServer( pRetryAtUrl, pRetryAtEntryType ) );
963 return;
964 }
965
966 if( sst & kSawResp )
967 {
968 // late notification, response processing may be happening in another
969 // thread.
970 return;
971 }
972
973 //--------------------------------------------------------------------------
974 // We were successful, so we now need to listen for a response
975 //--------------------------------------------------------------------------
976 if( status.IsOK() )
977 {
978 // this is the expected order, we got the notification but no response
979 // received yet. However another thread is liable to be processing
980 // one or sending a final response and deleting us at any point now.
981 return;
982 }
983
984 //--------------------------------------------------------------------------
985 // We have failed, recover if possible
986 //--------------------------------------------------------------------------
987 log->Error( XRootDMsg, "[%s] Impossible to send message %s. Trying to "
988 "recover.", pUrl.GetHostId().c_str(),
989 message->GetObfuscatedDescription().c_str() );
990 HandleError( status );
991 }
992
993 //----------------------------------------------------------------------------
994 // Are we a raw writer or not?
//
// Returns true for kXR_write, kXR_writev and kXR_pgwrite requests, and for
// a kXR_chkpoint request with the kXR_ckpXeq opcode unless the request it
// wraps is a kXR_truncate. Returns false for everything else.
995 //----------------------------------------------------------------------------
997 {
998 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
// The request id is converted with ntohs before it is compared
999 uint16_t reqId = ntohs( req->header.requestid );
1000 if( reqId == kXR_write || reqId == kXR_writev || reqId == kXR_pgwrite )
1001 return true;
1002 // checkpoint + execute
1003 if( reqId == kXR_chkpoint && req->chkpoint.opcode == kXR_ckpXeq )
1004 {
// The request to be executed is read from the same buffer, starting
// sizeof( ClientRequest ) bytes in
1005 ClientRequest *xeq = (ClientRequest*)pRequest->GetBuffer( sizeof( ClientRequest ) );
1006 reqId = ntohs( xeq->header.requestid );
1007 return reqId != kXR_truncate; // only checkpointed truncate does not have raw data
1008 }
1009
1010 return false;
1011 }
1012
1013 //----------------------------------------------------------------------------
1014 // Write the message body
//
// Three mutually exclusive paths are taken, selected by the handler state:
//  1. chunk list and crc32c digests both non-empty: the single chunk is sent
//     page by page, each page preceded by its 4-byte digest (PgWrite),
//  2. chunk list non-empty, no digests: the chunks are sent one after
//     another,
//  3. chunk list empty: the payload is sent from the kernel buffer pKBuff
//     (or first moved into a user-space chunk when the socket is encrypted).
//
// bytesWritten is incremented by every byte handed to the socket. The
// function returns the failing status as soon as a send fails, returns the
// status carrying suRetry when the socket asks for a retry, and returns
// Status() once everything has been sent. Progress is kept in member
// variables (pAsyncOffset, pAsyncChunkIndex, pPgWrtCurrentPageNb,
// pPgWrtCurrentPageOffset and the cursor of pPgWrtCksumBuff), so a later
// call continues where the previous one stopped.
1015 //----------------------------------------------------------------------------
1017 uint32_t &bytesWritten )
1018 {
1019 //--------------------------------------------------------------------------
1020 // First check if it is a PgWrite
1021 //--------------------------------------------------------------------------
1022 if( !pChunkList->empty() && !pCrc32cDigests.empty() )
1023 {
1024 //------------------------------------------------------------------------
1025 // PgWrite will have just one chunk
1026 //------------------------------------------------------------------------
1027 ChunkInfo chunk = pChunkList->front();
1028 //------------------------------------------------------------------------
1029 // Calculate the size of the first and last page (in case the chunk is not
1030 // 4KB aligned)
1031 //------------------------------------------------------------------------
1032 int fLen = 0, lLen = 0;
1033 size_t nbpgs = XrdOucPgrwUtils::csNum( chunk.offset, chunk.length, fLen, lLen );
1034
1035 //------------------------------------------------------------------------
1036 // Set the crc32c buffer if not ready yet
1037 //------------------------------------------------------------------------
// A cursor of 0 means no byte of the current page's digest has been sent
// yet, so the buffer is (re)filled with the digest converted by htonl
1038 if( pPgWrtCksumBuff.GetCursor() == 0 )
1039 {
1040 uint32_t digest = htonl( pCrc32cDigests[pPgWrtCurrentPageNb] );
1041 memcpy( pPgWrtCksumBuff.GetBuffer(), &digest, sizeof( uint32_t ) );
1042 }
1043
// Bytes of raw data still to be sent for the whole chunk
1044 uint32_t btsLeft = chunk.length - pAsyncOffset;
// Bytes still to be sent for the current page: the first page uses fLen,
// later pages the full page size, minus what was already sent of that
// page; never more than what is left of the chunk
1045 uint32_t pglen = ( pPgWrtCurrentPageNb == 0 ? fLen : XrdSys::PageSize ) - pPgWrtCurrentPageOffset;
1046 if( pglen > btsLeft ) pglen = btsLeft;
1047 char* pgbuf = static_cast<char*>( chunk.buffer ) + pAsyncOffset;
1048
1049 while( btsLeft > 0 )
1050 {
1051 // first write the crc32c digest
1052 while( pPgWrtCksumBuff.GetCursor() < sizeof( uint32_t ) )
1053 {
1054 uint32_t dgstlen = sizeof( uint32_t ) - pPgWrtCksumBuff.GetCursor();
1055 char* dgstbuf = pPgWrtCksumBuff.GetBufferAtCursor();
1056 int btswrt = 0;
1057 Status st = socket->Send( dgstbuf, dgstlen, btswrt );
1058 if( !st.IsOK() ) return st;
1059 bytesWritten += btswrt;
1060 pPgWrtCksumBuff.AdvanceCursor( btswrt );
1061 if( st.code == suRetry ) return st;
1062 }
1063 // then write the raw data (one page)
1064 int btswrt = 0;
1065 Status st = socket->Send( pgbuf, pglen, btswrt );
1066 if( !st.IsOK() ) return st;
1067 pgbuf += btswrt;
1068 pglen -= btswrt;
1069 btsLeft -= btswrt;
1070 bytesWritten += btswrt;
1071 pAsyncOffset += btswrt; // update the offset to the raw data
1072 if( st.code == suRetry ) return st;
1073 // if we managed to write all the data ...
1074 if( pglen == 0 )
1075 {
1076 // move to the next page
1077 ++pPgWrtCurrentPageNb;
1078 if( pPgWrtCurrentPageNb < nbpgs )
1079 {
1080 // set the digest buffer
1081 pPgWrtCksumBuff.SetCursor( 0 );
1082 uint32_t digest = htonl( pCrc32cDigests[pPgWrtCurrentPageNb] );
1083 memcpy( pPgWrtCksumBuff.GetBuffer(), &digest, sizeof( uint32_t ) );
1084 }
1085 // set the page length
1086 pglen = XrdSys::PageSize;
1087 if( pglen > btsLeft ) pglen = btsLeft;
1088 // reset offset in the current page
1089 pPgWrtCurrentPageOffset = 0;
1090 }
1091 else
1092 // otherwise just adjust the offset in the current page
1093 pPgWrtCurrentPageOffset += btswrt;
1094
1095 }
1096 }
1097 else if( !pChunkList->empty() )
1098 {
1099 size_t size = pChunkList->size();
// Start from the chunk reached by a previous call
1100 for( size_t i = pAsyncChunkIndex ; i < size; ++i )
1101 {
1102 char *buffer = (char*)(*pChunkList)[i].buffer;
// NOTE: this 'size' (length of the current chunk) shadows the outer
// 'size' (number of chunks) inside the loop body
1103 uint32_t size = (*pChunkList)[i].length;
1104 size_t leftToBeWritten = size - pAsyncOffset;
1105
1106 while( leftToBeWritten )
1107 {
1108 int btswrt = 0;
1109 Status st = socket->Send( buffer + pAsyncOffset, leftToBeWritten, btswrt );
1110 bytesWritten += btswrt;
// NOTE(review): on this early return pAsyncOffset is not advanced by
// btswrt, unlike the PgWrite path above - presumably Send reports 0
// bytes together with an error or suRetry; confirm against Socket::Send
1111 if( !st.IsOK() || st.code == suRetry ) return st;
1112 pAsyncOffset += btswrt;
1113 leftToBeWritten -= btswrt;
1114 }
1115 //----------------------------------------------------------------------
1116 // Remember that we have moved to the next chunk, also clear the offset
1117 // within the buffer as we are going to move to a new one
1118 //----------------------------------------------------------------------
1119 ++pAsyncChunkIndex;
1120 pAsyncOffset = 0;
1121 }
1122 }
1123 else
1124 {
1125 Log *log = DefaultEnv::GetLog();
1126
1127 //------------------------------------------------------------------------
1128 // If the socket is encrypted we cannot use a kernel buffer, we have to
1129 // convert to user space buffer
1130 //------------------------------------------------------------------------
1131 if( socket->IsEncrypted() )
1132 {
1133 log->Debug( XRootDMsg, "[%s] Channel is encrypted: cannot use kernel buffer.",
1134 pUrl.GetHostId().c_str() );
1135
1136 char *ubuff = 0;
1137 ssize_t ret = XrdSys::Move( *pKBuff, ubuff );
1138 if( ret < 0 ) return Status( stError, errInternal );
// The moved data becomes an ordinary chunk, so the recursive call below
// takes the chunk-list path instead of this one
1139 pChunkList->push_back( ChunkInfo( 0, ret, ubuff ) );
1140 return WriteMessageBody( socket, bytesWritten );
1141 }
1142
1143 //------------------------------------------------------------------------
1144 // Send the data
1145 //------------------------------------------------------------------------
1146 while( !pKBuff->Empty() )
1147 {
1148 int btswrt = 0;
1149 Status st = socket->Send( *pKBuff, btswrt );
1150 bytesWritten += btswrt;
1151 if( !st.IsOK() || st.code == suRetry ) return st;
1152 }
1153
1154 log->Debug( XRootDMsg, "[%s] Request %s payload (kernel buffer) transferred to socket.",
1155 pUrl.GetHostId().c_str(), pRequest->GetObfuscatedDescription().c_str() );
1156 }
1157
// Everything has been handed to the socket
1158 return Status();
1159 }
1160
1161 //----------------------------------------------------------------------------
1162 // We're here when we got a time event. We needed to re-issue the request
1163 // in some time in the future, and that moment has arrived
//
// The re-issue goes through HandleError with a RetryAtServer status that
// targets the current URL (pUrl) and is tagged RedirectEntry::EntryWait.
1164 //----------------------------------------------------------------------------
1166 {
1167 HandleError( RetryAtServer( pUrl, RedirectEntry::EntryWait ) );
1168 }
1169
1170 //----------------------------------------------------------------------------
1171 // Bookkeeping after partial response has been received.
//
// The only bookkeeping done here is clearing pTimeoutFence.
1172 //----------------------------------------------------------------------------
1174 {
1175 pTimeoutFence.store( false, std::memory_order_relaxed ); // Take down the timeout fence
1176 }
1177
1178 //----------------------------------------------------------------------------
1179 // Unpack the message and call the response handler
//
// A response is final unless pStatus is OK with code suContinue. For a
// final response the handler releases the stream id (subject to the
// condition below), passes status, parsed response and host list to
// pResponseHandler and then deletes itself. For a non-final response the
// handler survives: it keeps a copy of the host list, drops the current
// response message, clears the timeout fence and broadcasts on pCV.
1180 //----------------------------------------------------------------------------
1181 void XRootDMsgHandler::HandleResponse()
1182 {
1183 //--------------------------------------------------------------------------
1184 // Is it a final response?
1185 //--------------------------------------------------------------------------
1186 bool finalrsp = !( pStatus.IsOK() && pStatus.code == suContinue );
1187 if( finalrsp )
1188 {
1189 // Do not do final processing of the response if we haven't had
1190 // confirmation the original request was sent (via OnStatusReady).
1191 // The final processing will be triggered when we get the confirm.
// fetch_or returns the state as it was before kFinalResp was added
1192 const int sst = pSendingState.fetch_or( kFinalResp );
1193 if( ( sst & kSawReadySend ) && !( sst & kSendDone ) )
1194 return;
1195 }
1196
1197 //--------------------------------------------------------------------------
1198 // Process the response and notify the listener
1199 //--------------------------------------------------------------------------
// ProcessStatus returns a newly allocated status object
1201 XRootDStatus *status = ProcessStatus();
1202 AnyObject *response = 0;
1203
1204 Log *log = DefaultEnv::GetLog();
1205 log->Debug( ExDbgMsg, "[%s] Calling MsgHandler: %p (message: %s ) "
1206 "with status: %s.",
1207 pUrl.GetHostId().c_str(), (void*)this,
1208 pRequest->GetObfuscatedDescription().c_str(),
1209 status->ToString().c_str() );
1210
// The body is parsed only for successful responses; a parsing failure
// replaces the status with the parser's error and discards the response
1211 if( status->IsOK() )
1212 {
1213 Status st = ParseResponse( response );
1214 if( !st.IsOK() )
1215 {
1216 delete status;
1217 delete response;
1218 status = new XRootDStatus( st );
1219 response = 0;
1220 }
1221 }
1222
1223 //--------------------------------------------------------------------------
1224 // Close the redirect entry if necessary
1225 //--------------------------------------------------------------------------
1226 if( pRdirEntry )
1227 {
1228 pRdirEntry->status = *status;
1229 pRedirectTraceBack.push_back( std::move( pRdirEntry ) );
1230 }
1231
1232 //--------------------------------------------------------------------------
1233 // Release the stream id
1234 //--------------------------------------------------------------------------
1235 if( pSidMgr && finalrsp )
1236 {
1237 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
// The id is NOT released only when the request is in fly and failed with
// errOperationExpired or errOperationInterrupted - presumably because a
// late reply to it may still arrive; confirm against the SID manager
1238 if( status->IsOK() || !IsInFly() ||
1239 !( status->code == errOperationExpired || status->code == errOperationInterrupted ) )
1240 pSidMgr->ReleaseSID( req->header.streamid );
1241 }
1242
// The host list is released from pHosts and passed to the response
// handler; when more responses are expected a copy is put back into pHosts
1243 HostList *hosts = pHosts.release();
1244 if( !finalrsp )
1245 pHosts.reset( new HostList( *hosts ) );
1246
1247 pResponseHandler->HandleResponseWithHosts( status, response, hosts );
1248
1249 //--------------------------------------------------------------------------
1250 // if it is the final response there is nothing more to do ...
1251 //--------------------------------------------------------------------------
1252 if( finalrsp )
1253 delete this;
1254 //--------------------------------------------------------------------------
1255 // on the other hand if it is not the final response, we have to keep the
1256 // MsgHandler and delete the current response
1257 //--------------------------------------------------------------------------
1258 else
1259 {
1260 XrdSysCondVarHelper lck( pCV );
1261 pResponse.reset();
1262 pTimeoutFence.store( false, std::memory_order_relaxed );
1263 pCV.Broadcast();
1264 }
1265 }
1266
1267
1268 //----------------------------------------------------------------------------
1269 // Extract the status information from the stuff that we got
1270 //----------------------------------------------------------------------------
1271 XRootDStatus *XRootDMsgHandler::ProcessStatus()
1272 {
1273 XRootDStatus *st = new XRootDStatus( pStatus );
1274 ServerResponse *rsp = 0;
1275 if( pResponse )
1276 rsp = (ServerResponse *)pResponse->GetBuffer();
1277
1278 if( !pStatus.IsOK() && rsp )
1279 {
1280 if( pStatus.code == errErrorResponse )
1281 {
1282 st->errNo = rsp->body.error.errnum;
1283 // omit the last character as the string returned from the server
1284 // (acording to protocol specs) should be null-terminated
1285 std::string errmsg( rsp->body.error.errmsg, rsp->hdr.dlen-5 );
1286 if( st->errNo == kXR_noReplicas && !pLastError.IsOK() )
1287 errmsg += " Last seen error: " + pLastError.ToString();
1288 st->SetErrorMessage( errmsg );
1289 }
1290 else if( pStatus.code == errRedirect )
1291 st->SetErrorMessage( pRedirectUrl );
1292 }
1293 return st;
1294 }
1295
1296 //------------------------------------------------------------------------
1297 // Parse the response and put it in an object that could be passed to
1298 // the user
1299 //------------------------------------------------------------------------
1300 Status XRootDMsgHandler::ParseResponse( AnyObject *&response )
1301 {
1302 if( !pResponse )
1303 return Status();
1304
1305 ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
1306 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
1307 Log *log = DefaultEnv::GetLog();
1308
1309 //--------------------------------------------------------------------------
1310 // Handle redirect as an answer
1311 //--------------------------------------------------------------------------
1312 if( rsp->hdr.status == kXR_redirect )
1313 {
1314 log->Error( XRootDMsg, "Internal Error: unable to process redirect" );
1315 return 0;
1316 }
1317
1318 Buffer buff;
1319 uint32_t length = 0;
1320 char *buffer = 0;
1321
1322 //--------------------------------------------------------------------------
1323 // We don't have any partial answers so pass what we have
1324 //--------------------------------------------------------------------------
1325 if( pPartialResps.empty() )
1326 {
1327 buffer = rsp->body.buffer.data;
1328 length = rsp->hdr.dlen;
1329 }
1330 //--------------------------------------------------------------------------
1331 // Partial answers, we need to glue them together before parsing
1332 //--------------------------------------------------------------------------
1333 else if( req->header.requestid != kXR_read &&
1334 req->header.requestid != kXR_readv )
1335 {
1336 for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1337 {
1338 ServerResponse *part = (ServerResponse*)pPartialResps[i]->GetBuffer();
1339 length += part->hdr.dlen;
1340 }
1341 length += rsp->hdr.dlen;
1342
1343 buff.Allocate( length );
1344 uint32_t offset = 0;
1345 for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1346 {
1347 ServerResponse *part = (ServerResponse*)pPartialResps[i]->GetBuffer();
1348 buff.Append( part->body.buffer.data, part->hdr.dlen, offset );
1349 offset += part->hdr.dlen;
1350 }
1351 buff.Append( rsp->body.buffer.data, rsp->hdr.dlen, offset );
1352 buffer = buff.GetBuffer();
1353 }
1354
1355 //--------------------------------------------------------------------------
1356 // Right, but what was the question?
1357 //--------------------------------------------------------------------------
1358 switch( req->header.requestid )
1359 {
1360 //------------------------------------------------------------------------
1361 // kXR_mv, kXR_truncate, kXR_rm, kXR_mkdir, kXR_rmdir, kXR_chmod,
1362 // kXR_ping, kXR_close, kXR_write, kXR_sync
1363 //------------------------------------------------------------------------
1364 case kXR_mv:
1365 case kXR_truncate:
1366 case kXR_rm:
1367 case kXR_mkdir:
1368 case kXR_rmdir:
1369 case kXR_chmod:
1370 case kXR_ping:
1371 case kXR_close:
1372 case kXR_write:
1373 case kXR_writev:
1374 case kXR_sync:
1375 case kXR_chkpoint:
1376 return Status();
1377
1378 //------------------------------------------------------------------------
1379 // kXR_locate
1380 //------------------------------------------------------------------------
1381 case kXR_locate:
1382 {
1383 AnyObject *obj = new AnyObject();
1384
1385 char *nullBuffer = new char[length+1];
1386 nullBuffer[length] = 0;
1387 memcpy( nullBuffer, buffer, length );
1388
1389 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as "
1390 "LocateInfo: %s", pUrl.GetHostId().c_str(),
1391 pRequest->GetObfuscatedDescription().c_str(), nullBuffer );
1392 LocationInfo *data = new LocationInfo();
1393
1394 if( data->ParseServerResponse( nullBuffer ) == false )
1395 {
1396 delete obj;
1397 delete data;
1398 delete [] nullBuffer;
1399 return Status( stError, errInvalidResponse );
1400 }
1401 delete [] nullBuffer;
1402
1403 obj->Set( data );
1404 response = obj;
1405 return Status();
1406 }
1407
1408 //------------------------------------------------------------------------
1409 // kXR_stat
1410 //------------------------------------------------------------------------
1411 case kXR_stat:
1412 {
1413 AnyObject *obj = new AnyObject();
1414
1415 //----------------------------------------------------------------------
1416 // Virtual File System stat (kXR_vfs)
1417 //----------------------------------------------------------------------
1418 if( req->stat.options & kXR_vfs )
1419 {
1420 StatInfoVFS *data = new StatInfoVFS();
1421
1422 char *nullBuffer = new char[length+1];
1423 nullBuffer[length] = 0;
1424 memcpy( nullBuffer, buffer, length );
1425
1426 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as "
1427 "StatInfoVFS: %s", pUrl.GetHostId().c_str(),
1428 pRequest->GetObfuscatedDescription().c_str(), nullBuffer );
1429
1430 if( data->ParseServerResponse( nullBuffer ) == false )
1431 {
1432 delete obj;
1433 delete data;
1434 delete [] nullBuffer;
1435 return Status( stError, errInvalidResponse );
1436 }
1437 delete [] nullBuffer;
1438
1439 obj->Set( data );
1440 }
1441 //----------------------------------------------------------------------
1442 // Normal stat
1443 //----------------------------------------------------------------------
1444 else
1445 {
1446 StatInfo *data = new StatInfo();
1447
1448 char *nullBuffer = new char[length+1];
1449 nullBuffer[length] = 0;
1450 memcpy( nullBuffer, buffer, length );
1451
1452 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as StatInfo: "
1453 "%s", pUrl.GetHostId().c_str(),
1454 pRequest->GetObfuscatedDescription().c_str(), nullBuffer );
1455
1456 if( data->ParseServerResponse( nullBuffer ) == false )
1457 {
1458 delete obj;
1459 delete data;
1460 delete [] nullBuffer;
1461 return Status( stError, errInvalidResponse );
1462 }
1463 delete [] nullBuffer;
1464 obj->Set( data );
1465 }
1466
1467 response = obj;
1468 return Status();
1469 }
1470
1471 //------------------------------------------------------------------------
1472 // kXR_protocol
1473 //------------------------------------------------------------------------
1474 case kXR_protocol:
1475 {
1476 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as ProtocolInfo",
1477 pUrl.GetHostId().c_str(),
1478 pRequest->GetObfuscatedDescription().c_str() );
1479
1480 if( rsp->hdr.dlen < 8 )
1481 {
1482 log->Error( XRootDMsg, "[%s] Got invalid redirect response.",
1483 pUrl.GetHostId().c_str() );
1484 return Status( stError, errInvalidResponse );
1485 }
1486
1487 AnyObject *obj = new AnyObject();
1488 ProtocolInfo *data = new ProtocolInfo( rsp->body.protocol.pval,
1489 rsp->body.protocol.flags );
1490 obj->Set( data );
1491 response = obj;
1492 return Status();
1493 }
1494
1495 //------------------------------------------------------------------------
1496 // kXR_dirlist
1497 //------------------------------------------------------------------------
1498 case kXR_dirlist:
1499 {
1500 AnyObject *obj = new AnyObject();
1501 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as "
1502 "DirectoryList", pUrl.GetHostId().c_str(),
1503 pRequest->GetObfuscatedDescription().c_str() );
1504
1505 char *path = new char[req->dirlist.dlen+1];
1506 path[req->dirlist.dlen] = 0;
1507 memcpy( path, pRequest->GetBuffer(24), req->dirlist.dlen );
1508
1509 DirectoryList *data = new DirectoryList();
1510 data->SetParentName( path );
1511 delete [] path;
1512
1513 char *nullBuffer = new char[length+1];
1514 nullBuffer[length] = 0;
1515 memcpy( nullBuffer, buffer, length );
1516
1517 bool invalidrsp = false;
1518
1519 if( !pDirListStarted )
1520 {
1521 pDirListWithStat = DirectoryList::HasStatInfo( nullBuffer );
1522 pDirListStarted = true;
1523
1524 invalidrsp = !data->ParseServerResponse( pUrl.GetHostId(), nullBuffer );
1525 }
1526 else
1527 invalidrsp = !data->ParseServerResponse( pUrl.GetHostId(), nullBuffer, pDirListWithStat );
1528
1529 if( invalidrsp )
1530 {
1531 delete data;
1532 delete obj;
1533 delete [] nullBuffer;
1534 return Status( stError, errInvalidResponse );
1535 }
1536
1537 delete [] nullBuffer;
1538 obj->Set( data );
1539 response = obj;
1540 return Status();
1541 }
1542
1543 //------------------------------------------------------------------------
1544 // kXR_open - if we got the statistics, otherwise return 0
1545 //------------------------------------------------------------------------
1546 case kXR_open:
1547 {
1548 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as OpenInfo",
1549 pUrl.GetHostId().c_str(),
1550 pRequest->GetObfuscatedDescription().c_str() );
1551
1552 if( rsp->hdr.dlen < 4 )
1553 {
1554 log->Error( XRootDMsg, "[%s] Got invalid open response.",
1555 pUrl.GetHostId().c_str() );
1556 return Status( stError, errInvalidResponse );
1557 }
1558
1559 AnyObject *obj = new AnyObject();
1560 StatInfo *statInfo = 0;
1561
1562 //----------------------------------------------------------------------
1563 // Handle StatInfo if requested
1564 //----------------------------------------------------------------------
1565 if( req->open.options & kXR_retstat )
1566 {
1567 log->Dump( XRootDMsg, "[%s] Parsing StatInfo in response to %s",
1568 pUrl.GetHostId().c_str(),
1569 pRequest->GetObfuscatedDescription().c_str() );
1570
1571 if( rsp->hdr.dlen >= 12 )
1572 {
1573 char *nullBuffer = new char[rsp->hdr.dlen-11];
1574 nullBuffer[rsp->hdr.dlen-12] = 0;
1575 memcpy( nullBuffer, buffer+12, rsp->hdr.dlen-12 );
1576
1577 statInfo = new StatInfo();
1578 if( statInfo->ParseServerResponse( nullBuffer ) == false )
1579 {
1580 delete statInfo;
1581 statInfo = 0;
1582 }
1583 delete [] nullBuffer;
1584 }
1585
1586 if( rsp->hdr.dlen < 12 || !statInfo )
1587 {
1588 log->Error( XRootDMsg, "[%s] Unable to parse StatInfo in response "
1589 "to %s", pUrl.GetHostId().c_str(),
1590 pRequest->GetObfuscatedDescription().c_str() );
1591 delete obj;
1592 return Status( stError, errInvalidResponse );
1593 }
1594 }
1595
1596 OpenInfo *data = new OpenInfo( (uint8_t*)buffer,
1597 pResponse->GetSessionId(),
1598 statInfo );
1599 obj->Set( data );
1600 response = obj;
1601 return Status();
1602 }
1603
1604 //------------------------------------------------------------------------
1605 // kXR_read
1606 //------------------------------------------------------------------------
1607 case kXR_read:
1608 {
1609 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as ChunkInfo",
1610 pUrl.GetHostId().c_str(),
1611 pRequest->GetObfuscatedDescription().c_str() );
1612
1613 for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1614 {
1615 //--------------------------------------------------------------------
1616 // we are expecting to have only the header in the message, the raw
1617 // data have been readout into the user buffer
1618 //--------------------------------------------------------------------
1619 if( pPartialResps[i]->GetSize() > 8 )
1620 return Status( stOK, errInternal );
1621 }
1622 //----------------------------------------------------------------------
1623 // we are expecting to have only the header in the message, the raw
1624 // data have been readout into the user buffer
1625 //----------------------------------------------------------------------
1626 if( pResponse->GetSize() > 8 )
1627 return Status( stOK, errInternal );
1628 //----------------------------------------------------------------------
1629 // Get the response for the end user
1630 //----------------------------------------------------------------------
1631 return pBodyReader->GetResponse( response );
1632 }
1633
1634 //------------------------------------------------------------------------
1635 // kXR_pgread
1636 //------------------------------------------------------------------------
1637 case kXR_pgread:
1638 {
1639 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as PageInfo",
1640 pUrl.GetHostId().c_str(),
1641 pRequest->GetObfuscatedDescription().c_str() );
1642
1643 //----------------------------------------------------------------------
1644 // Glue in the cached responses if necessary
1645 //----------------------------------------------------------------------
1646 ChunkInfo chunk = pChunkList->front();
1647 bool sizeMismatch = false;
1648 uint32_t currentOffset = 0;
1649 for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1650 {
1651 ServerResponseV2 *part = (ServerResponseV2*)pPartialResps[i]->GetBuffer();
1652
1653 //--------------------------------------------------------------------
1654 // the actual size of the raw data without the crc32c checksums
1655 //--------------------------------------------------------------------
1656 size_t datalen = part->status.bdy.dlen - NbPgPerRsp( part->info.pgread.offset,
1657 part->status.bdy.dlen ) * CksumSize;
1658
1659 if( currentOffset + datalen > chunk.length )
1660 {
1661 sizeMismatch = true;
1662 break;
1663 }
1664
1665 currentOffset += datalen;
1666 }
1667
1668 ServerResponseV2 *rspst = (ServerResponseV2*)pResponse->GetBuffer();
1669 size_t datalen = rspst->status.bdy.dlen - NbPgPerRsp( rspst->info.pgread.offset,
1670 rspst->status.bdy.dlen ) * CksumSize;
1671 if( currentOffset + datalen <= chunk.length )
1672 currentOffset += datalen;
1673 else
1674 sizeMismatch = true;
1675
1676 //----------------------------------------------------------------------
1677 // Overflow
1678 //----------------------------------------------------------------------
1679 if( pChunkStatus.front().sizeError || sizeMismatch )
1680 {
1681 log->Error( XRootDMsg, "[%s] Handling response to %s: user supplied "
1682 "buffer is too small for the received data.",
1683 pUrl.GetHostId().c_str(),
1684 pRequest->GetObfuscatedDescription().c_str() );
1685 return Status( stError, errInvalidResponse );
1686 }
1687
1688 AnyObject *obj = new AnyObject();
1689 PageInfo *pgInfo = new PageInfo( chunk.offset, currentOffset, chunk.buffer,
1690 std::move( pCrc32cDigests) );
1691
1692 obj->Set( pgInfo );
1693 response = obj;
1694 return Status();
1695 }
1696
1697 //------------------------------------------------------------------------
1698 // kXR_pgwrite
1699 //------------------------------------------------------------------------
1700 case kXR_pgwrite:
1701 {
1702 std::vector<std::tuple<uint64_t, uint32_t>> retries;
1703
1704 ServerResponseV2 *rsp = (ServerResponseV2*)pResponse->GetBuffer();
1705 if( rsp->status.bdy.dlen > 0 )
1706 {
1707 ServerResponseBody_pgWrCSE *cse = (ServerResponseBody_pgWrCSE*)pResponse->GetBuffer( sizeof( ServerResponseV2 ) );
1708 size_t pgcnt = ( rsp->status.bdy.dlen - 8 ) / sizeof( kXR_int64 );
1709 retries.reserve( pgcnt );
1710 kXR_int64 *pgoffs = (kXR_int64*)pResponse->GetBuffer( sizeof( ServerResponseV2 ) +
1711 sizeof( ServerResponseBody_pgWrCSE ) );
1712
1713 for( size_t i = 0; i < pgcnt; ++i )
1714 {
1715 uint32_t len = XrdSys::PageSize;
1716 if( i == 0 ) len = cse->dlFirst;
1717 else if( i == pgcnt - 1 ) len = cse->dlLast;
1718 retries.push_back( std::make_tuple( pgoffs[i], len ) );
1719 }
1720 }
1721
1722 RetryInfo *info = new RetryInfo( std::move( retries ) );
1723 AnyObject *obj = new AnyObject();
1724 obj->Set( info );
1725 response = obj;
1726
1727 return Status();
1728 }
1729
1730
1731 //------------------------------------------------------------------------
1732 // kXR_readv - we need to pass the length of the buffer to the user code
1733 //------------------------------------------------------------------------
1734 case kXR_readv:
1735 {
1736 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as "
1737 "VectorReadInfo", pUrl.GetHostId().c_str(),
1738 pRequest->GetObfuscatedDescription().c_str() );
1739
1740 for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1741 {
1742 //--------------------------------------------------------------------
1743 // we are expecting to have only the header in the message, the raw
1744 // data have been readout into the user buffer
1745 //--------------------------------------------------------------------
1746 if( pPartialResps[i]->GetSize() > 8 )
1747 return Status( stOK, errInternal );
1748 }
1749 //----------------------------------------------------------------------
1750 // we are expecting to have only the header in the message, the raw
1751 // data have been readout into the user buffer
1752 //----------------------------------------------------------------------
1753 if( pResponse->GetSize() > 8 )
1754 return Status( stOK, errInternal );
1755 //----------------------------------------------------------------------
1756 // Get the response for the end user
1757 //----------------------------------------------------------------------
1758 return pBodyReader->GetResponse( response );
1759 }
1760
1761 //------------------------------------------------------------------------
1762 // kXR_fattr
1763 //------------------------------------------------------------------------
1764 case kXR_fattr:
1765 {
1766 int len = rsp->hdr.dlen;
1767 char* data = rsp->body.buffer.data;
1768
1769 return ParseXAttrResponse( data, len, response );
1770 }
1771
1772 //------------------------------------------------------------------------
1773 // kXR_query
1774 //------------------------------------------------------------------------
1775 case kXR_query:
1776 case kXR_set:
1777 case kXR_prepare:
1778 default:
1779 {
1780 AnyObject *obj = new AnyObject();
1781 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as BinaryData",
1782 pUrl.GetHostId().c_str(),
1783 pRequest->GetObfuscatedDescription().c_str() );
1784
1785 BinaryDataInfo *data = new BinaryDataInfo();
1786 data->Allocate( length );
1787 data->Append( buffer, length );
1788 obj->Set( data );
1789 response = obj;
1790 return Status();
1791 }
1792 };
1793 return Status( stError, errInvalidMessage );
1794 }
1795
1796 //------------------------------------------------------------------------
1797 // Parse the response to kXR_fattr request and put it in an object that
1798 // could be passed to the user
1799 //------------------------------------------------------------------------
1800 Status XRootDMsgHandler::ParseXAttrResponse( char *data, size_t len,
1801 AnyObject *&response )
1802 {
1803 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
1804// Log *log = DefaultEnv::GetLog(); //TODO
1805
1806 switch( req->fattr.subcode )
1807 {
1808 case kXR_fattrDel:
1809 case kXR_fattrSet:
1810 {
1811 Status status;
1812
1813 kXR_char nerrs = 0;
1814 if( !( status = ReadFromBuffer( data, len, nerrs ) ).IsOK() )
1815 return status;
1816
1817 kXR_char nattr = 0;
1818 if( !( status = ReadFromBuffer( data, len, nattr ) ).IsOK() )
1819 return status;
1820
1821 std::vector<XAttrStatus> resp;
1822 // read the namevec
1823 for( kXR_char i = 0; i < nattr; ++i )
1824 {
1825 kXR_unt16 rc = 0;
1826 if( !( status = ReadFromBuffer( data, len, rc ) ).IsOK() )
1827 return status;
1828 rc = ntohs( rc );
1829
1830 // count errors
1831 if( rc ) --nerrs;
1832
1833 std::string name;
1834 if( !( status = ReadFromBuffer( data, len, name ) ).IsOK() )
1835 return status;
1836
1837 XRootDStatus st = rc ? XRootDStatus( stError, errErrorResponse, rc ) :
1838 XRootDStatus();
1839 resp.push_back( XAttrStatus( name, st ) );
1840 }
1841
1842 // check if we read all the data and if the error count is OK
1843 if( len != 0 || nerrs != 0 ) return Status( stError, errDataError );
1844
1845 // set up the response object
1846 response = new AnyObject();
1847 response->Set( new std::vector<XAttrStatus>( std::move( resp ) ) );
1848
1849 return Status();
1850 }
1851
1852 case kXR_fattrGet:
1853 {
1854 Status status;
1855
1856 kXR_char nerrs = 0;
1857 if( !( status = ReadFromBuffer( data, len, nerrs ) ).IsOK() )
1858 return status;
1859
1860 kXR_char nattr = 0;
1861 if( !( status = ReadFromBuffer( data, len, nattr ) ).IsOK() )
1862 return status;
1863
1864 std::vector<XAttr> resp;
1865 resp.reserve( nattr );
1866
1867 // read the name vec
1868 for( kXR_char i = 0; i < nattr; ++i )
1869 {
1870 kXR_unt16 rc = 0;
1871 if( !( status = ReadFromBuffer( data, len, rc ) ).IsOK() )
1872 return status;
1873 rc = ntohs( rc );
1874
1875 // count errors
1876 if( rc ) --nerrs;
1877
1878 std::string name;
1879 if( !( status = ReadFromBuffer( data, len, name ) ).IsOK() )
1880 return status;
1881
1882 XRootDStatus st = rc ? XRootDStatus( stError, errErrorResponse, rc ) :
1883 XRootDStatus();
1884 resp.push_back( XAttr( name, st ) );
1885 }
1886
1887 // read the value vec
1888 for( kXR_char i = 0; i < nattr; ++i )
1889 {
1890 kXR_int32 vlen = 0;
1891 if( !( status = ReadFromBuffer( data, len, vlen ) ).IsOK() )
1892 return status;
1893 vlen = ntohl( vlen );
1894
1895 std::string value;
1896 if( !( status = ReadFromBuffer( data, len, vlen, value ) ).IsOK() )
1897 return status;
1898
1899 resp[i].value.swap( value );
1900 }
1901
1902 // check if we read all the data and if the error count is OK
1903 if( len != 0 || nerrs != 0 ) return Status( stError, errDataError );
1904
1905 // set up the response object
1906 response = new AnyObject();
1907 response->Set( new std::vector<XAttr>( std::move( resp ) ) );
1908
1909 return Status();
1910 }
1911
1912 case kXR_fattrList:
1913 {
1914 Status status;
1915 std::vector<XAttr> resp;
1916
1917 while( len > 0 )
1918 {
1919 std::string name;
1920 if( !( status = ReadFromBuffer( data, len, name ) ).IsOK() )
1921 return status;
1922
1923 kXR_int32 vlen = 0;
1924 if( !( status = ReadFromBuffer( data, len, vlen ) ).IsOK() )
1925 return status;
1926 vlen = ntohl( vlen );
1927
1928 std::string value;
1929 if( !( status = ReadFromBuffer( data, len, vlen, value ) ).IsOK() )
1930 return status;
1931
1932 resp.push_back( XAttr( name, value ) );
1933 }
1934
1935 // set up the response object
1936 response = new AnyObject();
1937 response->Set( new std::vector<XAttr>( std::move( resp ) ) );
1938
1939 return Status();
1940 }
1941
1942 default:
1943 return Status( stError, errDataError );
1944 }
1945 }
1946
1947 //----------------------------------------------------------------------------
1948 // Perform the changes to the original request needed by the redirect
1949 // procedure - allocate new streamid, append redirection data and such
1950 //----------------------------------------------------------------------------
1951 Status XRootDMsgHandler::RewriteRequestRedirect( const URL &newUrl )
1952 {
1953 Log *log = DefaultEnv::GetLog();
1954
1955 Status st;
1956 // Append any "xrd.*" parameters present in newCgi so that any authentication
1957 // requirements are properly enforced
1958 const URL::ParamsMap &newCgi = newUrl.GetParams();
1959 std::string xrdCgi = "";
1960 std::ostringstream ossXrd;
1961 for(URL::ParamsMap::const_iterator it = newCgi.begin(); it != newCgi.end(); ++it )
1962 {
1963 if( it->first.compare( 0, 4, "xrd." ) )
1964 continue;
1965 ossXrd << it->first << '=' << it->second << '&';
1966 }
1967
1968 xrdCgi = ossXrd.str();
1969 // Redirection URL containing also any original xrd.* opaque parameters
1970 XrdCl::URL authUrl;
1971
1972 if (xrdCgi.empty())
1973 {
1974 authUrl = newUrl;
1975 }
1976 else
1977 {
1978 std::string surl = newUrl.GetURL();
1979 (surl.find('?') == std::string::npos) ? (surl += '?') :
1980 ((*surl.rbegin() != '&') ? (surl += '&') : (surl += ""));
1981 surl += xrdCgi;
1982 if (!authUrl.FromString(surl))
1983 {
1984 std::string surlLog = surl;
1985 if( unlikely( log->GetLevel() >= Log::ErrorMsg ) ) {
1986 surlLog = obfuscateAuth(surlLog);
1987 }
1988 log->Error( XRootDMsg, "[%s] Failed to build redirection URL from data: %s",
1989 newUrl.GetHostId().c_str(), surl.c_str());
1990 return Status(stError, errInvalidRedirectURL);
1991 }
1992 }
1993
1994 //--------------------------------------------------------------------------
1995 // Rewrite particular requests
1996 //--------------------------------------------------------------------------
1998 MessageUtils::RewriteCGIAndPath( pRequest, newCgi, true, newUrl.GetPath() );
2000 return Status();
2001 }
2002
2003 //----------------------------------------------------------------------------
2004 // Some requests need to be rewritten also after getting kXR_wait
2005 //----------------------------------------------------------------------------
2006 Status XRootDMsgHandler::RewriteRequestWait()
2007 {
2008 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
2009
2010 XRootDTransport::UnMarshallRequest( pRequest );
2011
2012 //------------------------------------------------------------------------
2013 // For kXR_locate and kXR_open request the kXR_refresh bit needs to be
2014 // turned off after wait
2015 //------------------------------------------------------------------------
2016 switch( req->header.requestid )
2017 {
2018 case kXR_locate:
2019 {
2020 uint16_t refresh = kXR_refresh;
2021 req->locate.options &= (~refresh);
2022 break;
2023 }
2024
2025 case kXR_open:
2026 {
2027 uint16_t refresh = kXR_refresh;
2028 req->locate.options &= (~refresh);
2029 break;
2030 }
2031 }
2032
2033 XRootDTransport::SetDescription( pRequest );
2034 XRootDTransport::MarshallRequest( pRequest );
2035 return Status();
2036 }
2037
2038 //----------------------------------------------------------------------------
2039 // Recover error
2040 //----------------------------------------------------------------------------
2041 void XRootDMsgHandler::HandleError( XRootDStatus status )
2042 {
2043 //--------------------------------------------------------------------------
2044 // If there was no error then do nothing
2045 //--------------------------------------------------------------------------
2046 if( status.IsOK() )
2047 return;
2048
2049 if( pSidMgr && IsInFly() && (
2050 status.code == errOperationExpired ||
2051 status.code == errOperationInterrupted ) )
2052 {
2053 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
2054 pSidMgr->TimeOutSID( req->header.streamid );
2055 }
2056
2057 bool noreplicas = ( status.code == errErrorResponse &&
2058 status.errNo == kXR_noReplicas );
2059
2060 if( !noreplicas ) pLastError = status;
2061
2062 Log *log = DefaultEnv::GetLog();
2063 log->Debug( XRootDMsg, "[%s] Handling error while processing %s: %s.",
2064 pUrl.GetHostId().c_str(), pRequest->GetObfuscatedDescription().c_str(),
2065 status.ToString().c_str() );
2066
2067 //--------------------------------------------------------------------------
2068 // Check if it is a fatal TLS error that has been marked as potentially
2069 // recoverable, if yes check if we can downgrade from fatal to error.
2070 //--------------------------------------------------------------------------
2071 if( status.IsFatal() && status.code == errTlsError && status.errNo == EAGAIN )
2072 {
2073 if( pSslErrCnt < MaxSslErrRetry )
2074 {
2075 status.status &= ~stFatal; // switch off fatal&error bits
2076 status.status |= stError; // switch on error bit
2077 }
2078 ++pSslErrCnt; // count number of consecutive SSL errors
2079 }
2080 else
2081 pSslErrCnt = 0;
2082
2083 //--------------------------------------------------------------------------
2084 // We have got an error message, we can recover it at the load balancer if:
2085 // 1) we haven't got it from the load balancer
2086 // 2) we have a load balancer assigned
2087 // 3) the error is either one of: kXR_FSError, kXR_IOError, kXR_ServerError,
2088 // kXR_NotFound
2089 // 4) in the case of kXR_NotFound a kXR_refresh flags needs to be set
2090 //--------------------------------------------------------------------------
2091 if( status.code == errErrorResponse )
2092 {
2093 if( RetriableErrorResponse( status ) )
2094 {
2095 UpdateTriedCGI(status.errNo);
2096 if( status.errNo == kXR_NotFound || status.errNo == kXR_Overloaded )
2097 SwitchOnRefreshFlag();
2098 HandleError( RetryAtServer( pLoadBalancer.url, RedirectEntry::EntryRetry ) );
2099 return;
2100 }
2101 else
2102 {
2103 pStatus = status;
2104 HandleRspOrQueue();
2105 return;
2106 }
2107 }
2108
2109 //--------------------------------------------------------------------------
2110 // Nothing can be done if:
2111 // 1) a user timeout has occurred
2112 // 2) has a non-zero session id
2113 // 3) if another error occurred and the validity of the message expired
2114 //--------------------------------------------------------------------------
2115 if( status.code == errOperationExpired || pRequest->GetSessionId() ||
2116 status.code == errOperationInterrupted || time(0) >= pExpiration )
2117 {
2118 log->Error( XRootDMsg, "[%s] Unable to get the response to request %s",
2119 pUrl.GetHostId().c_str(),
2120 pRequest->GetObfuscatedDescription().c_str() );
2121 pStatus = status;
2122 HandleRspOrQueue();
2123 return;
2124 }
2125
2126 //--------------------------------------------------------------------------
2127 // At this point we're left with connection errors, we recover them
2128 // at a load balancer if we have one and if not on the current server
2129 // until we get a response, an unrecoverable error or a timeout
2130 //--------------------------------------------------------------------------
2131 if( pLoadBalancer.url.IsValid() &&
2132 pLoadBalancer.url.GetLocation() != pUrl.GetLocation() )
2133 {
2134 UpdateTriedCGI( kXR_ServerError );
2135 HandleError( RetryAtServer( pLoadBalancer.url, RedirectEntry::EntryRetry ) );
2136 return;
2137 }
2138 else
2139 {
2140 if( !status.IsFatal() && IsRetriable() )
2141 {
2142 log->Info( XRootDMsg, "[%s] Retrying request: %s.",
2143 pUrl.GetHostId().c_str(),
2144 pRequest->GetObfuscatedDescription().c_str() );
2145
2146 UpdateTriedCGI( kXR_ServerError );
2147 HandleError( RetryAtServer( pUrl, RedirectEntry::EntryRetry ) );
2148 return;
2149 }
2150 pStatus = status;
2151 HandleRspOrQueue();
2152 return;
2153 }
2154 }
2155
2156 //----------------------------------------------------------------------------
2157 // Retry the message at another server
2158 //----------------------------------------------------------------------------
2159 Status XRootDMsgHandler::RetryAtServer( const URL &url, RedirectEntry::Type entryType )
2160 {
2161 if( &pRetryAtUrl != &url ) pRetryAtUrl = url;
2162 pRetryAtEntryType = entryType;
2163 const int sst = pSendingState.fetch_or( kRetryAtSrv );
2164
2165 //--------------------------------------------------------------------------
2166 // wait for any delayed send notification now. The handler may be requeued
2167 // during this function.
2168 //--------------------------------------------------------------------------
2169 if( ( sst & kSawReadySend ) && !( sst & kSendDone ) ) return Status();
2170 pSendingState &= ~kRetryAtSrv;
2171
2172 pResponse.reset();
2173 Log *log = DefaultEnv::GetLog();
2174
2175 //--------------------------------------------------------------------------
2176 // Set up a redirect entry
2177 //--------------------------------------------------------------------------
2178 if( pRdirEntry ) pRedirectTraceBack.push_back( std::move( pRdirEntry ) );
2179 pRdirEntry.reset( new RedirectEntry( pUrl.GetLocation(), url.GetLocation(), entryType ) );
2180
2181 if( pUrl.GetLocation() != url.GetLocation() )
2182 {
2183 pHosts->push_back( url );
2184
2185 //------------------------------------------------------------------------
2186 // Assign a new stream id to the message
2187 //------------------------------------------------------------------------
2188
2189 // first release the old stream id
2190 // (though it could be a redirect from a local
2191 // metalink file, in this case there's no SID)
2192 ClientRequestHdr *req = (ClientRequestHdr*)pRequest->GetBuffer();
2193 if( pSidMgr )
2194 {
2195 pSidMgr->ReleaseSID( req->streamid );
2196 pSidMgr.reset();
2197 }
2198
2199 // then get the new SIDManager
2200 // (again this could be a redirect to a local
2201 // file and in this case there is no SID)
2202 if( !url.IsLocalFile() )
2203 {
2204 pSidMgr = SIDMgrPool::Instance().GetSIDMgr( url );
2205 Status st = pSidMgr->AllocateSID( req->streamid );
2206 if( !st.IsOK() )
2207 {
2208 log->Error( XRootDMsg, "[%s] Impossible to send message %s.",
2209 pUrl.GetHostId().c_str(),
2210 pRequest->GetObfuscatedDescription().c_str() );
2211 return st;
2212 }
2213 }
2214
2215 pUrl = url;
2216 }
2217
2218 if( pUrl.IsMetalink() && pFollowMetalink )
2219 {
2220 log->Debug( ExDbgMsg, "[%s] Metaling redirection for MsgHandler: %p (message: %s ).",
2221 pUrl.GetHostId().c_str(), (void*)this,
2222 pRequest->GetObfuscatedDescription().c_str() );
2223
2224 return pPostMaster->Redirect( pUrl, pRequest, this );
2225 }
2226 else if( pUrl.IsLocalFile() )
2227 {
2228 HandleLocalRedirect( &pUrl );
2229 return Status();
2230 }
2231 else
2232 {
2233 log->Debug( ExDbgMsg, "[%s] Retry at server MsgHandler: %p (message: %s ).",
2234 pUrl.GetHostId().c_str(), (void*)this,
2235 pRequest->GetObfuscatedDescription().c_str() );
2236 return pPostMaster->Send( pUrl, pRequest, this, true, pExpiration );
2237 }
2238 }
2239
2240 //----------------------------------------------------------------------------
2241 // Update the "tried=" part of the CGI of the current message
2242 //----------------------------------------------------------------------------
2243 void XRootDMsgHandler::UpdateTriedCGI(uint32_t errNo)
2244 {
2245 URL::ParamsMap cgi;
2246 std::string tried;
2247
2248 //--------------------------------------------------------------------------
2249 // In case a data server responded with a kXR_redirect and we fail at the
2250 // node where we were redirected to, the original data server should be
2251 // included in the tried CGI opaque info (instead of the current one).
2252 //--------------------------------------------------------------------------
2253 if( pEffectiveDataServerUrl )
2254 {
2255 tried = pEffectiveDataServerUrl->GetHostName();
2256 delete pEffectiveDataServerUrl;
2257 pEffectiveDataServerUrl = 0;
2258 }
2259 //--------------------------------------------------------------------------
2260 // Otherwise use the current URL.
2261 //--------------------------------------------------------------------------
2262 else
2263 tried = pUrl.GetHostName();
2264
2265 // Report the reason for the failure to the next location
2266 //
2267 if (errNo)
2268 { if (errNo == kXR_NotFound) cgi["triedrc"] = "enoent";
2269 else if (errNo == kXR_IOError) cgi["triedrc"] = "ioerr";
2270 else if (errNo == kXR_FSError) cgi["triedrc"] = "fserr";
2271 else if (errNo == kXR_ServerError) cgi["triedrc"] = "srverr";
2272 }
2273
2274 //--------------------------------------------------------------------------
2275 // If our current load balancer is a metamanager and we failed either
2276 // at a diskserver or at an unidentified node we also exclude the last
2277 // known manager
2278 //--------------------------------------------------------------------------
2279 if( pLoadBalancer.url.IsValid() && (pLoadBalancer.flags & kXR_attrMeta) )
2280 {
2281 HostList::reverse_iterator it;
2282 for( it = pHosts->rbegin()+1; it != pHosts->rend(); ++it )
2283 {
2284 if( it->loadBalancer )
2285 break;
2286
2287 tried += "," + it->url.GetHostName();
2288
2289 if( it->flags & kXR_isManager )
2290 break;
2291 }
2292 }
2293
2294 cgi["tried"] = tried;
2295 XRootDTransport::UnMarshallRequest( pRequest );
2296 MessageUtils::RewriteCGIAndPath( pRequest, cgi, false, "" );
2297 XRootDTransport::MarshallRequest( pRequest );
2298 }
2299
2300 //----------------------------------------------------------------------------
2301 // Switch on the refresh flag for some requests
2302 //----------------------------------------------------------------------------
2303 void XRootDMsgHandler::SwitchOnRefreshFlag()
2304 {
2305 XRootDTransport::UnMarshallRequest( pRequest );
2306 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
2307 switch( req->header.requestid )
2308 {
2309 case kXR_locate:
2310 {
2311 req->locate.options |= kXR_refresh;
2312 break;
2313 }
2314
2315 case kXR_open:
2316 {
2317 req->locate.options |= kXR_refresh;
2318 break;
2319 }
2320 }
2321 XRootDTransport::SetDescription( pRequest );
2322 XRootDTransport::MarshallRequest( pRequest );
2323 }
2324
2325 //------------------------------------------------------------------------
2326 // If the current thread is a worker thread from our thread-pool
2327 // handle the response, otherwise submit a new task to the thread-pool
2328 //------------------------------------------------------------------------
2329 void XRootDMsgHandler::HandleRspOrQueue()
2330 {
2331 //--------------------------------------------------------------------------
2332 // Is it a final response?
2333 //--------------------------------------------------------------------------
2334 bool finalrsp = !( pStatus.IsOK() && pStatus.code == suContinue );
2335 if( finalrsp )
2336 {
2337 // Do not do final processing of the response if we haven't had
2338 // confirmation the original request was sent (via OnStatusReady).
2339 // The final processing will be triggered when we get the confirm.
2340 const int sst = pSendingState.fetch_or( kFinalResp );
2341 if( ( sst & kSawReadySend ) && !( sst & kSendDone ) )
2342 return;
2343 }
2344
2345 JobManager *jobMgr = pPostMaster->GetJobManager();
2346 if( jobMgr->IsWorker() )
2347 HandleResponse();
2348 else
2349 {
2350 Log *log = DefaultEnv::GetLog();
2351 log->Debug( ExDbgMsg, "[%s] Passing to the thread-pool MsgHandler: %p (message: %s ).",
2352 pUrl.GetHostId().c_str(), (void*)this,
2353 pRequest->GetObfuscatedDescription().c_str() );
2354 jobMgr->QueueJob( new HandleRspJob( this ), 0 );
2355 }
2356 }
2357
2358 //------------------------------------------------------------------------
2359 // Notify the FileStateHandler to retry Open() with new URL
2360 //------------------------------------------------------------------------
2361 void XRootDMsgHandler::HandleLocalRedirect( URL *url )
2362 {
2363 Log *log = DefaultEnv::GetLog();
2364 log->Debug( ExDbgMsg, "[%s] Handling local redirect - MsgHandler: %p (message: %s ).",
2365 pUrl.GetHostId().c_str(), (void*)this,
2366 pRequest->GetObfuscatedDescription().c_str() );
2367
2368 if( !pLFileHandler )
2369 {
2370 HandleError( XRootDStatus( stFatal, errNotSupported ) );
2371 return;
2372 }
2373
2374 AnyObject *resp = 0;
2375 pLFileHandler->SetHostList( *pHosts );
2376 XRootDStatus st = pLFileHandler->Open( url, pRequest, resp );
2377 if( !st.IsOK() )
2378 {
2379 HandleError( st );
2380 return;
2381 }
2382
2383 pResponseHandler->HandleResponseWithHosts( new XRootDStatus(),
2384 resp,
2385 pHosts.release() );
2386 delete this;
2387
2388 return;
2389 }
2390
2391 //------------------------------------------------------------------------
2392 // Check if it is OK to retry this request
2393 //------------------------------------------------------------------------
2394 bool XRootDMsgHandler::IsRetriable()
2395 {
2396 std::string value;
2397 DefaultEnv::GetEnv()->GetString( "OpenRecovery", value );
2398 if( value == "true" ) return true;
2399
2400 // check if it is a mutable open (open + truncate or open + create)
2401 ClientRequest *req = reinterpret_cast<ClientRequest*>( pRequest->GetBuffer() );
2402 if( req->header.requestid == htons( kXR_open ) )
2403 {
2404 bool _mutable = ( req->open.options & htons( kXR_delete ) ) ||
2405 ( req->open.options & htons( kXR_new ) );
2406
2407 if( _mutable )
2408 {
2409 Log *log = DefaultEnv::GetLog();
2410 log->Debug( XRootDMsg,
2411 "[%s] Not allowed to retry open request (OpenRecovery disabled): %s.",
2412 pUrl.GetHostId().c_str(),
2413 pRequest->GetObfuscatedDescription().c_str() );
2414 // disallow retry if it is a mutable open
2415 return false;
2416 }
2417 }
2418
2419 return true;
2420 }
2421
2422 //------------------------------------------------------------------------
2423 // Check if for given request and Metalink redirector it is OK to omit
2424 // the kXR_wait and proceed straight to the next entry in the Metalink file
2425 //------------------------------------------------------------------------
2426 bool XRootDMsgHandler::OmitWait( Message &request, const URL &url )
2427 {
2428 // we can omit kXR_wait only if we have a Metalink redirector
2429 if( !url.IsMetalink() )
2430 return false;
2431
2432 // we can omit kXR_wait only for requests that can be redirected
2433 // (kXR_read is the only stateful request that can be redirected)
2434 ClientRequest *req = reinterpret_cast<ClientRequest*>( request.GetBuffer() );
2435 if( pStateful && req->header.requestid != kXR_read )
2436 return false;
2437
2438 // we can only omit kXR_wait if the Metalink redirect has more
2439 // replicas
2440 RedirectorRegistry &registry = RedirectorRegistry::Instance();
2441 VirtualRedirector *redirector = registry.Get( url );
2442
2443 // we need more than one server as the current one is not reflected
2444 // in tried CGI
2445 if( redirector->Count( request ) > 1 )
2446 return true;
2447
2448 return false;
2449 }
2450
2451 //------------------------------------------------------------------------
2452 // Checks if the given error returned by server is retriable.
2453 //------------------------------------------------------------------------
2454 bool XRootDMsgHandler::RetriableErrorResponse( const Status &status )
2455 {
2456 // we can only retry error response if we have a valid load-balancer and
2457 // it is not our current URL
2458 if( !( pLoadBalancer.url.IsValid() &&
2459 pUrl.GetLocation() != pLoadBalancer.url.GetLocation() ) )
2460 return false;
2461
2462 // following errors are retriable at any load-balancer
2463 if( status.errNo == kXR_FSError || status.errNo == kXR_IOError ||
2464 status.errNo == kXR_ServerError || status.errNo == kXR_NotFound ||
2465 status.errNo == kXR_Overloaded || status.errNo == kXR_NoMemory )
2466 return true;
2467
2468 // check if the load-balancer is a meta-manager, if yes there are
2469 // more errors that can be recovered
2470 if( !( pLoadBalancer.flags & kXR_attrMeta ) ) return false;
2471
2472 // those errors are retriable for meta-managers
2473 if( status.errNo == kXR_Unsupported || status.errNo == kXR_FileLocked )
2474 return true;
2475
2476 // in case of not-authorized error there is an imposed upper limit
2477 // on how many times we can retry this error
2478 if( status.errNo == kXR_NotAuthorized )
2479 {
2481 DefaultEnv::GetEnv()->GetInt( "NotAuthorizedRetryLimit", limit );
2482 bool ret = pNotAuthorizedCounter < limit;
2483 ++pNotAuthorizedCounter;
2484 if( !ret )
2485 {
2486 Log *log = DefaultEnv::GetLog();
2487 log->Error( XRootDMsg,
2488 "[%s] Reached limit of NotAuthorized retries!",
2489 pUrl.GetHostId().c_str() );
2490 }
2491 return ret;
2492 }
2493
2494 // check if the load-balancer is a virtual (metalink) redirector,
2495 // if yes there are even more errors that can be recovered
2496 if( !( pLoadBalancer.flags & kXR_attrVirtRdr ) ) return false;
2497
2498 // those errors are retriable for virtual (metalink) redirectors
2499 if( status.errNo == kXR_noserver || status.errNo == kXR_ArgTooLong )
2500 return true;
2501
2502 // otherwise it is a non-retriable error
2503 return false;
2504 }
2505
2506 //------------------------------------------------------------------------
2507 // Dump the redirect-trace-back into the log file
2508 //------------------------------------------------------------------------
2509 void XRootDMsgHandler::DumpRedirectTraceBack()
2510 {
2511 if( pRedirectTraceBack.empty() ) return;
2512
2513 std::stringstream sstrm;
2514
2515 sstrm << "Redirect trace-back:\n";
2516
2517 int counter = 0;
2518
2519 auto itr = pRedirectTraceBack.begin();
2520 sstrm << '\t' << counter << ". " << (*itr)->ToString() << '\n';
2521
2522 auto prev = itr;
2523 ++itr;
2524 ++counter;
2525
2526 for( ; itr != pRedirectTraceBack.end(); ++itr, ++prev, ++counter )
2527 sstrm << '\t' << counter << ". "
2528 << (*itr)->ToString( (*prev)->status.IsOK() ) << '\n';
2529
2530 int authlimit = DefaultNotAuthorizedRetryLimit;
2531 DefaultEnv::GetEnv()->GetInt( "NotAuthorizedRetryLimit", authlimit );
2532
2533 bool warn = !pStatus.IsOK() &&
2534 ( pStatus.code == errNotFound ||
2535 pStatus.code == errRedirectLimit ||
2536 ( pStatus.code == errAuthFailed && pNotAuthorizedCounter >= authlimit ) );
2537
2538 Log *log = DefaultEnv::GetLog();
2539 if( warn )
2540 log->Warning( XRootDMsg, "%s", sstrm.str().c_str() );
2541 else
2542 log->Debug( XRootDMsg, "%s", sstrm.str().c_str() );
2543 }
2544
2545 // Read data from buffer
2546 //------------------------------------------------------------------------
2547 template<typename T>
2548 Status XRootDMsgHandler::ReadFromBuffer( char *&buffer, size_t &buflen, T& result )
2549 {
2550 if( sizeof( T ) > buflen ) return Status( stError, errDataError );
2551
2552 memcpy(&result, buffer, sizeof(T));
2553
2554 buffer += sizeof( T );
2555 buflen -= sizeof( T );
2556
2557 return Status();
2558 }
2559
2560 //------------------------------------------------------------------------
2561 // Read a string from buffer
2562 //------------------------------------------------------------------------
2563 Status XRootDMsgHandler::ReadFromBuffer( char *&buffer, size_t &buflen, std::string &result )
2564 {
2565 Status status;
2566 char c = 0;
2567
2568 while( true )
2569 {
2570 if( !( status = ReadFromBuffer( buffer, buflen, c ) ).IsOK() )
2571 return status;
2572
2573 if( c == 0 ) break;
2574 result += c;
2575 }
2576
2577 return status;
2578 }
2579
2580 //------------------------------------------------------------------------
2581 // Read a string from buffer
2582 //------------------------------------------------------------------------
2583 Status XRootDMsgHandler::ReadFromBuffer( char *&buffer, size_t &buflen,
2584 size_t size, std::string &result )
2585 {
2586 Status status;
2587
2588 if( size > buflen ) return Status( stError, errDataError );
2589
2590 result.append( buffer, size );
2591 buffer += size;
2592 buflen -= size;
2593
2594 return status;
2595 }
2596
2597}
@ kXR_NotAuthorized
@ kXR_NotFound
@ kXR_FileLocked
@ kXR_noReplicas
@ kXR_Unsupported
@ kXR_ServerError
@ kXR_Overloaded
@ kXR_ArgTooLong
@ kXR_noserver
@ kXR_IOError
@ kXR_FSError
@ kXR_NoMemory
#define kXR_isManager
@ kXR_fattrDel
Definition XProtocol.hh:300
@ kXR_fattrSet
Definition XProtocol.hh:303
@ kXR_fattrList
Definition XProtocol.hh:302
@ kXR_fattrGet
Definition XProtocol.hh:301
struct ClientFattrRequest fattr
Definition XProtocol.hh:896
#define kXR_collapseRedir
ServerResponseStatus status
#define kXR_attrMeta
union ServerResponse::@040373375333017131300127053271011057331004327334 body
kXR_char streamid[2]
Definition XProtocol.hh:158
kXR_char streamid[2]
Definition XProtocol.hh:956
kXR_unt16 options
Definition XProtocol.hh:513
struct ClientDirlistRequest dirlist
Definition XProtocol.hh:894
static const int kXR_ckpXeq
Definition XProtocol.hh:218
@ kXR_delete
Definition XProtocol.hh:483
@ kXR_refresh
Definition XProtocol.hh:489
@ kXR_new
Definition XProtocol.hh:485
@ kXR_retstat
Definition XProtocol.hh:493
struct ClientOpenRequest open
Definition XProtocol.hh:902
@ kXR_waitresp
Definition XProtocol.hh:948
@ kXR_redirect
Definition XProtocol.hh:946
@ kXR_oksofar
Definition XProtocol.hh:942
@ kXR_status
Definition XProtocol.hh:949
@ kXR_ok
Definition XProtocol.hh:941
@ kXR_attn
Definition XProtocol.hh:943
@ kXR_wait
Definition XProtocol.hh:947
@ kXR_error
Definition XProtocol.hh:945
struct ServerResponseBody_Status bdy
struct ClientRequestHdr header
Definition XProtocol.hh:887
#define kXR_recoverWrts
union ServerResponseV2::@207342300141235315373173036347114307032363217365 info
kXR_unt16 requestid
Definition XProtocol.hh:159
@ kXR_read
Definition XProtocol.hh:126
@ kXR_open
Definition XProtocol.hh:123
@ kXR_writev
Definition XProtocol.hh:144
@ kXR_readv
Definition XProtocol.hh:138
@ kXR_mkdir
Definition XProtocol.hh:121
@ kXR_sync
Definition XProtocol.hh:129
@ kXR_chmod
Definition XProtocol.hh:115
@ kXR_dirlist
Definition XProtocol.hh:117
@ kXR_fattr
Definition XProtocol.hh:133
@ kXR_rm
Definition XProtocol.hh:127
@ kXR_query
Definition XProtocol.hh:114
@ kXR_write
Definition XProtocol.hh:132
@ kXR_set
Definition XProtocol.hh:131
@ kXR_rmdir
Definition XProtocol.hh:128
@ kXR_truncate
Definition XProtocol.hh:141
@ kXR_protocol
Definition XProtocol.hh:119
@ kXR_mv
Definition XProtocol.hh:122
@ kXR_ping
Definition XProtocol.hh:124
@ kXR_stat
Definition XProtocol.hh:130
@ kXR_pgread
Definition XProtocol.hh:143
@ kXR_chkpoint
Definition XProtocol.hh:125
@ kXR_locate
Definition XProtocol.hh:140
@ kXR_close
Definition XProtocol.hh:116
@ kXR_pgwrite
Definition XProtocol.hh:139
@ kXR_prepare
Definition XProtocol.hh:134
#define kXR_isServer
#define kXR_attrVirtRdr
struct ClientChkPointRequest chkpoint
Definition XProtocol.hh:890
struct ServerResponseHeader hdr
#define kXR_PROTOCOLVERSION
Definition XProtocol.hh:70
@ kXR_vfs
Definition XProtocol.hh:799
struct ClientStatRequest stat
Definition XProtocol.hh:915
struct ClientProtocolRequest protocol
Definition XProtocol.hh:907
#define kXR_ecRedir
struct ClientLocateRequest locate
Definition XProtocol.hh:898
ServerResponseHeader hdr
long long kXR_int64
Definition XPtypes.hh:98
int kXR_int32
Definition XPtypes.hh:89
unsigned short kXR_unt16
Definition XPtypes.hh:67
unsigned char kXR_char
Definition XPtypes.hh:65
void Get(Type &object)
Retrieve the object being held.
Object for reading out data from the PgRead response.
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
static bool HasStatInfo(const char *data)
Returns true if data contain stat info.
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
virtual void Run(void *arg)
The job logic.
HandleRspJob(XrdCl::XRootDMsgHandler *handler)
Interface for a job to be run by the job manager.
Handle diagnostics.
Definition XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition XrdClLog.cc:231
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition XrdClLog.cc:248
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition XrdClLog.cc:299
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition XrdClLog.cc:282
static void RewriteCGIAndPath(Message *msg, const URL::ParamsMap &newCgi, bool replace, const std::string &newPath)
Append cgi to the one already present in the message.
The message representation used throughout the system.
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
@ More
there are more (non-raw) data to be read
@ Ignore
Ignore the message.
StreamEvent
Events that may have occurred to the stream.
@ Ready
The stream has become connected.
A network socket.
virtual XRootDStatus Send(const char *buffer, size_t size, int &bytesWritten)
void RegisterTask(Task *task, time_t time, bool own=true)
Interface for a task to be run by the TaskManager.
URL representation.
Definition XrdClURL.hh:31
std::string GetHostId() const
Get the host part of the URL (user:password@host:port).
Definition XrdClURL.hh:99
const std::string & GetPassword() const
Get the password.
Definition XrdClURL.hh:153
std::map< std::string, std::string > ParamsMap
Definition XrdClURL.hh:33
bool FromString(const std::string &url)
Parse a string and fill the URL fields.
Definition XrdClURL.cc:62
void SetPassword(const std::string &password)
Set the password.
Definition XrdClURL.hh:161
void SetParams(const std::string &params)
Set params.
Definition XrdClURL.cc:402
const std::string & GetUserName() const
Get the username.
Definition XrdClURL.hh:135
std::string GetURL() const
Get the URL.
Definition XrdClURL.hh:86
bool IsLocalFile() const
Definition XrdClURL.cc:474
void SetProtocol(const std::string &protocol)
Set protocol.
Definition XrdClURL.hh:126
const ParamsMap & GetParams() const
Get the URL params.
Definition XrdClURL.hh:244
const std::string & GetProtocol() const
Get the protocol.
Definition XrdClURL.hh:118
bool IsValid() const
Is the url valid.
Definition XrdClURL.cc:452
void SetUserName(const std::string &userName)
Set the username.
Definition XrdClURL.hh:143
static void splitString(Container &result, const std::string &input, const std::string &delimiter)
Split a string.
Definition XrdClUtils.hh:56
static bool CheckEC(const Message *req, const URL &url)
Check if this client can support given EC redirect.
Handle/Process/Forward XRootD messages.
virtual uint16_t InspectStatusRsp() override
virtual void OnStatusReady(const Message *message, XRootDStatus status) override
The requested action has been performed and the status is available.
const Message * GetRequest() const
Get the request pointer.
virtual uint16_t Examine(std::shared_ptr< Message > &msg) override
virtual void Process() override
Process the message if it was "taken" by the examine action.
virtual XRootDStatus ReadMessageBody(Message *msg, Socket *socket, uint32_t &bytesRead) override
XRootDStatus WriteMessageBody(Socket *socket, uint32_t &bytesWritten) override
virtual uint8_t OnStreamEvent(StreamEvent event, XRootDStatus status) override
virtual uint16_t GetSid() const override
virtual bool IsRaw() const override
Are we a raw writer or not?
const std::string & GetErrorMessage() const
Get error message.
static XRootDStatus UnMarshallBody(Message *msg, uint16_t reqType)
Unmarshall the body of the incoming message.
static XRootDStatus UnMarshallRequest(Message *msg)
static XRootDStatus UnMarshalStatusBody(Message &msg, uint16_t reqType)
Unmarshall the body of the status response.
static XRootDStatus MarshallRequest(Message *msg)
Marshal the outgoing message.
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
const uint16_t suRetry
const uint16_t errRedirectLimit
const int DefaultMaxMetalinkWait
const uint16_t errErrorResponse
const uint16_t errOperationExpired
const uint16_t stFatal
Fatal error, it's still an error.
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errNotFound
const uint64_t XRootDMsg
std::vector< HostInfo > HostList
const uint16_t errDataError
data is corrupted
const uint16_t errInternal
Internal error.
const uint16_t stOK
Everything went OK.
const uint64_t ExDbgMsg
const uint16_t errInvalidResponse
const uint16_t errInvalidRedirectURL
Buffer BinaryDataInfo
Binary buffer.
const uint16_t errOperationInterrupted
const uint16_t suContinue
const int DefaultNotAuthorizedRetryLimit
const uint16_t errRedirect
const uint16_t errAuthFailed
const uint16_t errInvalidMessage
XrdSysError Log
Definition XrdConfig.cc:113
@ kXR_PartialResult
static const int PageSize
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)
Describe a data chunk for vector read.
void * buffer
length of the chunk
uint32_t length
offset in the file
Procedure execution status.
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.
static const uint16_t ServerFlags
returns server flags
static const uint16_t ProtocolVersion
returns the protocol version