XRootD
Loading...
Searching...
No Matches
XrdCl::PostMaster Class Reference

A hub for dispatching and receiving messages. More...

#include <XrdClPostMaster.hh>

Collaboration diagram for XrdCl::PostMaster:

Public Member Functions

 PostMaster ()
 Constructor.
virtual ~PostMaster ()
 Destructor.
void CollapseRedirect (const URL &oldurl, const URL &newURL)
 Collapse channel URL - replace the URL of the channel.
void DecFileInstCnt (const URL &url)
 Decrement file object instance count bound to this channel.
bool Finalize ()
 Finalizer.
Status ForceDisconnect (const URL &url)
 Shut down a channel.
Status ForceDisconnect (const URL &url, bool hush)
 Shut down a channel.
Status ForceDisconnect (std::shared_ptr< Channel > channel, const uint64_t sess)
 Shut down a channel. This version is used by the channel itself.
Status ForceReconnect (const URL &url)
 Reconnect the channel.
JobManager * GetJobManager ()
 Get the job manager object used by the post master.
TaskManager * GetTaskManager ()
 Get the task manager object used by the post master.
bool Initialize ()
 Initializer.
bool IsRunning ()
uint16_t NbConnectedStrm (const URL &url)
 Get the number of connected data streams.
void NotifyConnectHandler (const URL &url)
 Notify the global on-connect handler.
void NotifyConnErrHandler (const URL &url, const XRootDStatus &status)
 Notify the global error connection handler.
Status QueryTransport (const URL &url, uint16_t query, AnyObject &result)
Status Redirect (const URL &url, Message *msg, MsgHandler *handler)
Status RegisterEventHandler (const URL &url, ChannelEventHandler *handler)
 Register channel event handler.
bool Reinitialize ()
 Reinitialize after fork.
Status RemoveEventHandler (const URL &url, ChannelEventHandler *handler)
 Remove a channel event handler.
XRootDStatus Send (const URL &url, Message *msg, MsgHandler *handler, bool stateful, time_t expires)
void SetConnectionErrorHandler (std::function< void(const URL &, const XRootDStatus &)> handler)
 Set the global connection error handler.
void SetOnConnectHandler (std::unique_ptr< Job > onConnJob)
 Set the global on-connect handler for control streams.
void SetOnDataConnectHandler (const URL &url, std::shared_ptr< Job > onConnJob)
 Set the on-connect handler for data streams.
bool Start ()
 Start the post master.
bool Stop ()
 Stop the postmaster.

Detailed Description

A hub for dispatching and receiving messages.

Definition at line 47 of file XrdClPostMaster.hh.

Constructor & Destructor Documentation

◆ PostMaster()

XrdCl::PostMaster::PostMaster ( )

Constructor.

Definition at line 127 of file XrdClPostMaster.cc.

127 : pImpl( new PostMasterImpl() )
128 {
129 }

◆ ~PostMaster()

XrdCl::PostMaster::~PostMaster ( )
virtual

Destructor.

Definition at line 134 of file XrdClPostMaster.cc.

135 {
136 }

Member Function Documentation

◆ CollapseRedirect()

void XrdCl::PostMaster::CollapseRedirect ( const URL & oldurl,
const URL & newURL )

Collapse channel URL - replace the URL of the channel.

Definition at line 478 of file XrdClPostMaster.cc.

479 {
480 XrdSysMutexHelper scopedLock( pImpl->pChannelMapMutex );
481 //--------------------------------------------------------------------------
482 // Get the passive channel
483 //--------------------------------------------------------------------------
484 std::shared_ptr<Channel> passive;
485 PostMasterImpl::ChannelMap::iterator it =
486 pImpl->pChannelMap.find( alias.GetChannelId() );
487 if( it != pImpl->pChannelMap.end() )
488 passive = it->second;
489
490 //--------------------------------------------------------------------------
491 // If the channel does not exist there's nothing to do
492 //--------------------------------------------------------------------------
493 if( !passive ) return;
494
495 //--------------------------------------------------------------------------
496 // Check if this URL is eligible for collapsing. To avoid depencencies
497 // we don't call CanCollapse while holding the channel map mutex. So we
498 // reverify the content of the map afterwards.
499 //--------------------------------------------------------------------------
500 scopedLock.UnLock();
501 if( !passive->CanCollapse( url ) ) return;
502
503 scopedLock.Lock( &pImpl->pChannelMapMutex );
504 it = pImpl->pChannelMap.find( alias.GetChannelId() );
505 if( it == pImpl->pChannelMap.end() || it->second != passive )
506 {
507 // something changed. Retry.
508 scopedLock.UnLock();
509 CollapseRedirect( alias, url );
510 return;
511 }
512
513 //--------------------------------------------------------------------------
514 // Create the active channel
515 //--------------------------------------------------------------------------
516 TransportManager *trManager = DefaultEnv::GetTransportManager();
517 TransportHandler *trHandler = trManager->GetHandler( url.GetProtocol() );
518
519 if( !trHandler )
520 {
521 Log *log = DefaultEnv::GetLog();
522 log->Error( PostMasterMsg, "Unable to get transport handler for %s "
523 "protocol", url.GetProtocol().c_str() );
524 return;
525 }
526
527 Log *log = DefaultEnv::GetLog();
528 log->Info( PostMasterMsg, "Label channel %s with alias %s.",
529 url.GetHostId().c_str(), alias.GetHostId().c_str() );
530
531 std::shared_ptr<Channel> active(new Channel{ alias,
532 pImpl->pPoller, trHandler, pImpl->pTaskManager, pImpl->pJobManager, url },
533 [this](Channel *ch) { this->pImpl->removeFinalize( ch ); delete ch; });
534 pImpl->addFinalize( active.get() );
535 active->SetSelf( active );
536
537 pImpl->pChannelMap[alias.GetChannelId()] = active;
538 //--------------------------------------------------------------------------
539 // The passive channel will be deallocated by TTL
540 //--------------------------------------------------------------------------
541 }
static TransportManager * GetTransportManager()
Get transport manager.
static Log * GetLog()
Get default log.
void CollapseRedirect(const URL &oldurl, const URL &newURL)
Collapse channel URL - replace the URL of the channel.
const uint64_t PostMasterMsg
XrdSysError Log
Definition XrdConfig.cc:113

References CollapseRedirect(), XrdCl::Log::Error(), XrdCl::URL::GetChannelId(), XrdCl::TransportManager::GetHandler(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::URL::GetProtocol(), XrdCl::DefaultEnv::GetTransportManager(), XrdCl::Log::Info(), XrdSysMutexHelper::Lock(), XrdCl::PostMasterMsg, and XrdSysMutexHelper::UnLock().

Referenced by CollapseRedirect().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ DecFileInstCnt()

void XrdCl::PostMaster::DecFileInstCnt ( const URL & url)

Decrement file object instance count bound to this channel.

Definition at line 546 of file XrdClPostMaster.cc.

547 {
548 auto channel = pImpl->GetChannel( url );
549
550 if( !channel ) return;
551
552 return channel->DecFileInstCnt();
553 }

Referenced by XrdCl::FileStateHandler::~FileStateHandler().

Here is the caller graph for this function:

◆ Finalize()

bool XrdCl::PostMaster::Finalize ( )

Finalizer.

Definition at line 168 of file XrdClPostMaster.cc.

169 {
170 //--------------------------------------------------------------------------
171 // Clean up the channels
172 //--------------------------------------------------------------------------
173 if( !pImpl->pInitialized )
174 return true;
175
176 pImpl->pInitialized = false;
177 pImpl->pJobManager->Finalize();
178
179 //--------------------------------------------------------------------------
180 // Finalize may cause some of the channels to remove themselves from the
181 // finalize set. So make a copy. Should be no concurrency as poller and
182 // jobmanager are stopped, so no lock.
183 //--------------------------------------------------------------------------
184 auto finSet = pImpl->pFinalizeSet;
185 for( auto ch: finSet ) ch->Finalize();
186
187 pImpl->pChannelMap.clear();
188 return pImpl->pPoller->Finalize();
189 }

Referenced by XrdCl::DefaultEnv::GetPostMaster().

Here is the caller graph for this function:

◆ ForceDisconnect() [1/3]

Status XrdCl::PostMaster::ForceDisconnect ( const URL & url)

Shut down a channel.

Definition at line 345 of file XrdClPostMaster.cc.

346 {
347 return ForceDisconnect(url, false);
348 }
Status ForceDisconnect(const URL &url)
Shut down a channel.

References ForceDisconnect().

Referenced by ForceDisconnect(), and XrdCl::Stream::OnReadTimeout().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ ForceDisconnect() [2/3]

Status XrdCl::PostMaster::ForceDisconnect ( const URL & url,
bool hush )

Shut down a channel.

Definition at line 353 of file XrdClPostMaster.cc.

354 {
355 std::shared_ptr<Channel> channel;
356 {
357 XrdSysMutexHelper scopedLock( pImpl->pChannelMapMutex );
358 PostMasterImpl::ChannelMap::iterator it =
359 pImpl->pChannelMap.find( url.GetChannelId() );
360
361 if( it == pImpl->pChannelMap.end() )
362 return Status( stError, errInvalidOp );
363 channel = it->second;
364 pImpl->pChannelMap.erase( it );
365 }
366
367 channel->ForceDisconnect( hush );
368 return Status();
369 }
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errInvalidOp

References XrdCl::errInvalidOp, XrdCl::URL::GetChannelId(), and XrdCl::stError.

Here is the call graph for this function:

◆ ForceDisconnect() [3/3]

Status XrdCl::PostMaster::ForceDisconnect ( std::shared_ptr< Channel > channel,
const uint64_t sess )

Shut down a channel. This version is used by the channel itself.

Definition at line 374 of file XrdClPostMaster.cc.

376 {
377 if( !channel )
378 return Status( stError, errNotSupported );
379
380 {
381 XrdSysMutexHelper scopedLock( pImpl->pChannelMapMutex );
382 PostMasterImpl::ChannelMap::iterator it =
383 pImpl->pChannelMap.find( channel->GetURL().GetChannelId() );
384
385 if( it != pImpl->pChannelMap.end() && it->second == channel )
386 pImpl->pChannelMap.erase( it );
387 }
388
389 channel->ForceDisconnect( channel, sess );
390 return Status();
391 }
const uint16_t errNotSupported

References XrdCl::errNotSupported, and XrdCl::stError.

◆ ForceReconnect()

Status XrdCl::PostMaster::ForceReconnect ( const URL & url)

Reconnect the channel.

Definition at line 393 of file XrdClPostMaster.cc.

394 {
395 std::shared_ptr<Channel> channel;
396 {
397 XrdSysMutexHelper scopedLock( pImpl->pChannelMapMutex );
398 PostMasterImpl::ChannelMap::iterator it =
399 pImpl->pChannelMap.find( url.GetChannelId() );
400
401 if( it == pImpl->pChannelMap.end() )
402 return Status( stError, errInvalidOp );
403 channel = it->second;
404 }
405
406 channel->ForceReconnect();
407 return Status();
408 }

References XrdCl::errInvalidOp, XrdCl::URL::GetChannelId(), and XrdCl::stError.

Here is the call graph for this function:

◆ GetJobManager()

JobManager * XrdCl::PostMaster::GetJobManager ( )

Get the job manager object used by the post master.

Definition at line 337 of file XrdClPostMaster.cc.

338 {
339 return pImpl->pJobManager;
340 }

Referenced by XrdCl::FileStateHandler::Close(), XrdCl::Operation< HasHndl >::Run(), XrdEc::ScheduleHandler(), XrdEc::ScheduleHandler(), and XrdCl::FileStateHandler::TimeOutRequests().

Here is the caller graph for this function:

◆ GetTaskManager()

TaskManager * XrdCl::PostMaster::GetTaskManager ( )

Get the task manager object used by the post master.

Definition at line 329 of file XrdClPostMaster.cc.

330 {
331 return pImpl->pTaskManager;
332 }

Referenced by XrdCl::DefaultEnv::GetPostMaster().

Here is the caller graph for this function:

◆ Initialize()

bool XrdCl::PostMaster::Initialize ( )

Initializer.

Definition at line 141 of file XrdClPostMaster.cc.

142 {
143 Env *env = DefaultEnv::GetEnv();
144 std::string pollerPref = DefaultPollerPreference;
145 env->GetString( "PollerPreference", pollerPref );
146
147 pImpl->pPoller = PollerFactory::CreatePoller( pollerPref );
148
149 if( !pImpl->pPoller )
150 return false;
151
152 bool st = pImpl->pPoller->Initialize();
153
154 if( !st )
155 {
156 delete pImpl->pPoller;
157 return false;
158 }
159
160 pImpl->pJobManager->Initialize();
161 pImpl->pInitialized = true;
162 return true;
163 }
static Env * GetEnv()
Get default client environment.
static Poller * CreatePoller(const std::string &preference)
const char *const DefaultPollerPreference

References XrdCl::PollerFactory::CreatePoller(), XrdCl::DefaultPollerPreference, XrdCl::DefaultEnv::GetEnv(), and XrdCl::Env::GetString().

Referenced by XrdCl::DefaultEnv::GetPostMaster().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ IsRunning()

bool XrdCl::PostMaster::IsRunning ( )
Returns
true if the underlying threads are running, false otherwise

Definition at line 558 of file XrdClPostMaster.cc.

559 {
560 return pImpl->pRunning;
561 }

◆ NbConnectedStrm()

uint16_t XrdCl::PostMaster::NbConnectedStrm ( const URL & url)

Get the number of connected data streams.

Definition at line 413 of file XrdClPostMaster.cc.

414 {
415 auto channel = pImpl->GetChannel( url );
416 if( !channel ) return 0;
417 return channel->NbConnectedStrm();
418 }

◆ NotifyConnectHandler()

void XrdCl::PostMaster::NotifyConnectHandler ( const URL & url)

Notify the global on-connect handler.

Definition at line 452 of file XrdClPostMaster.cc.

453 {
454 XrdSysMutexHelper lck( pImpl->pMtx );
455 if( pImpl->pOnConnJob )
456 {
457 URL *ptr = new URL( url );
458 pImpl->pJobManager->QueueJob( pImpl->pOnConnJob.get(), ptr );
459 }
460 }

Referenced by XrdCl::Stream::OnConnect().

Here is the caller graph for this function:

◆ NotifyConnErrHandler()

void XrdCl::PostMaster::NotifyConnErrHandler ( const URL & url,
const XRootDStatus & status )

Notify the global error connection handler.

Definition at line 465 of file XrdClPostMaster.cc.

466 {
467 XrdSysMutexHelper lck( pImpl->pMtx );
468 if( pImpl->pOnConnErrCB )
469 {
470 ConnErrJob *job = new ConnErrJob( url, status, pImpl->pOnConnErrCB );
471 pImpl->pJobManager->QueueJob( job, nullptr );
472 }
473 }

Referenced by XrdCl::Stream::OnConnectError().

Here is the caller graph for this function:

◆ QueryTransport()

Status XrdCl::PostMaster::QueryTransport ( const URL & url,
uint16_t query,
AnyObject & result )

Query the transport handler for a given URL

Parameters
url: the channel to be queried
query: the query as defined in the TransportQuery struct or others that may be recognized by the protocol transport
result: the result of the query
Returns
status of the query

Definition at line 276 of file XrdClPostMaster.cc.

279 {
280 std::shared_ptr<Channel> channel;
281 {
282 XrdSysMutexHelper scopedLock( pImpl->pChannelMapMutex );
283 PostMasterImpl::ChannelMap::iterator it =
284 pImpl->pChannelMap.find( url.GetChannelId() );
285 if( it == pImpl->pChannelMap.end() )
286 return Status( stError, errInvalidOp );
287 channel = it->second;
288 }
289
290 if( !channel )
291 return Status( stError, errNotSupported );
292
293 return channel->QueryTransport( query, result );
294 }

References XrdCl::errInvalidOp, XrdCl::errNotSupported, XrdCl::URL::GetChannelId(), and XrdCl::stError.

Referenced by XrdCl::FileSystem::DirList(), and XrdCl::FileStateHandler::PgRead().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ Redirect()

Status XrdCl::PostMaster::Redirect ( const URL & url,
Message * msg,
MsgHandler * handler )

Definition at line 262 of file XrdClPostMaster.cc.

265 {
266 RedirectorRegistry &registry = RedirectorRegistry::Instance();
267 VirtualRedirector *redirector = registry.Get( url );
268 if( !redirector )
269 return Status( stError, errInvalidOp );
270 return redirector->HandleRequest( msg, inHandler );
271 }
static RedirectorRegistry & Instance()
Returns reference to the single instance.

References XrdCl::errInvalidOp, XrdCl::RedirectorRegistry::Get(), XrdCl::VirtualRedirector::HandleRequest(), XrdCl::RedirectorRegistry::Instance(), and XrdCl::stError.

Referenced by XrdCl::MessageUtils::RedirectMessage().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ RegisterEventHandler()

Status XrdCl::PostMaster::RegisterEventHandler ( const URL & url,
ChannelEventHandler * handler )

Register channel event handler.

Definition at line 299 of file XrdClPostMaster.cc.

301 {
302 auto channel = pImpl->GetChannel( url );
303
304 if( !channel )
305 return Status( stError, errNotSupported );
306
307 channel->RegisterEventHandler( handler );
308 return Status();
309 }

References XrdCl::errNotSupported, and XrdCl::stError.

◆ Reinitialize()

bool XrdCl::PostMaster::Reinitialize ( )

Reinitialize after fork.

Definition at line 240 of file XrdClPostMaster.cc.

241 {
242 return true;
243 }

◆ RemoveEventHandler()

Status XrdCl::PostMaster::RemoveEventHandler ( const URL & url,
ChannelEventHandler * handler )

Remove a channel event handler.

Definition at line 314 of file XrdClPostMaster.cc.

316 {
317 auto channel = pImpl->GetChannel( url );
318
319 if( !channel )
320 return Status( stError, errNotSupported );
321
322 channel->RemoveEventHandler( handler );
323 return Status();
324 }

References XrdCl::errNotSupported, and XrdCl::stError.

◆ Send()

XRootDStatus XrdCl::PostMaster::Send ( const URL & url,
Message * msg,
MsgHandler * handler,
bool stateful,
time_t expires )

Send the message asynchronously: the message is inserted into the send queue, and a listener is called when the message is successfully pushed through the wire or when the timeout elapses.

DEADLOCK WARNING: no lock that is also used in the callback should be held while calling this method.

Parameters
url: recipient of the message
msg: message to be sent
expires: unix timestamp after which a failure is reported to the handler
handler: handler that will be notified about the status
stateful: physical stream disconnection causes an error
Returns
success if the message was successfully inserted into the send queues, failure otherwise

Definition at line 248 of file XrdClPostMaster.cc.

253 {
254 auto channel = pImpl->GetChannel( url );
255
256 if( !channel )
257 return XRootDStatus( stError, errNotSupported );
258
259 return channel->Send( msg, handler, stateful, expires );
260 }

References XrdCl::errNotSupported, and XrdCl::stError.

Referenced by XrdCl::MessageUtils::SendMessage().

Here is the caller graph for this function:

◆ SetConnectionErrorHandler()

void XrdCl::PostMaster::SetConnectionErrorHandler ( std::function< void(const URL &, const XRootDStatus &)> handler)

Set the global connection error handler.

Definition at line 443 of file XrdClPostMaster.cc.

444 {
445 XrdSysMutexHelper lck( pImpl->pMtx );
446 pImpl->pOnConnErrCB = std::move( handler );
447 }

◆ SetOnConnectHandler()

void XrdCl::PostMaster::SetOnConnectHandler ( std::unique_ptr< Job > onConnJob)

Set the global on-connect handler.

Set the global on-connect handler for control streams.

Definition at line 434 of file XrdClPostMaster.cc.

435 {
436 XrdSysMutexHelper lck( pImpl->pMtx );
437 pImpl->pOnConnJob = std::move( onConnJob );
438 }

Referenced by XrdPosixConfig::conTracker().

Here is the caller graph for this function:

◆ SetOnDataConnectHandler()

void XrdCl::PostMaster::SetOnDataConnectHandler ( const URL & url,
std::shared_ptr< Job > onConnJob )

Set the on-connect handler for data streams.

Definition at line 423 of file XrdClPostMaster.cc.

425 {
426 auto channel = pImpl->GetChannel( url );
427 if( !channel ) return;
428 channel->SetOnDataConnectHandler( onConnJob );
429 }

◆ Start()

bool XrdCl::PostMaster::Start ( )

Start the post master.

Definition at line 194 of file XrdClPostMaster.cc.

195 {
196 if( !pImpl->pInitialized )
197 return false;
198
199 if( !pImpl->pPoller->Start() )
200 return false;
201
202 if( !pImpl->pTaskManager->Start() )
203 {
204 pImpl->pPoller->Stop();
205 return false;
206 }
207
208 if( !pImpl->pJobManager->Start() )
209 {
210 pImpl->pPoller->Stop();
211 pImpl->pTaskManager->Stop();
212 return false;
213 }
214
215 pImpl->pRunning = true;
216 return true;
217 }

Referenced by XrdCl::DefaultEnv::GetPostMaster().

Here is the caller graph for this function:

◆ Stop()

bool XrdCl::PostMaster::Stop ( )

Stop the postmaster.

Definition at line 222 of file XrdClPostMaster.cc.

223 {
224 if( !pImpl->pInitialized || !pImpl->pRunning )
225 return true;
226
227 if( !pImpl->pJobManager->Stop() )
228 return false;
229 if( !pImpl->pPoller->Stop() )
230 return false;
231 if( !pImpl->pTaskManager->Stop() )
232 return false;
233 pImpl->pRunning = false;
234 return true;
235 }

Referenced by main().

Here is the caller graph for this function:

The documentation for this class was generated from the following files: