XRootD
Loading...
Searching...
No Matches
XrdClHttp::HandlerQueue Class Reference

#include <XrdClHttpUtil.hh>

Collaboration diagram for XrdClHttp::HandlerQueue:

Public Member Functions

 HandlerQueue (unsigned max_pending_ops)
std::shared_ptr< CurlOperation > Consume (std::chrono::steady_clock::duration)
void Expire ()
CURL * GetHandle ()
int PollFD () const
void Produce (std::shared_ptr< CurlOperation > handler)
void RecycleHandle (CURL *)
void ReleaseHandles ()
void Shutdown ()
std::shared_ptr< CurlOperation > TryConsume ()

Static Public Member Functions

static unsigned GetDefaultMaxPendingOps ()
static std::string GetMonitoringJson ()

Detailed Description

HandlerQueue is a deque of curl operations that need to be performed. The object is thread safe and can be waited on via poll().

The queue must be poll'able because the multi-curl driver thread is based on polling file descriptors.

Definition at line 167 of file XrdClHttpUtil.hh.

Constructor & Destructor Documentation

◆ HandlerQueue()

HandlerQueue::HandlerQueue ( unsigned max_pending_ops)

Definition at line 555 of file XrdClHttpUtil.cc.

555 :
556 m_max_pending_ops(max_pending_ops)
557{
558 int filedes[2];
559 auto result = pipe(filedes);
560 if (result == -1) {
561 throw std::runtime_error(strerror(errno));
562 }
563 if (fcntl(filedes[0], F_SETFL, O_NONBLOCK | O_CLOEXEC) == -1 || fcntl(filedes[1], F_SETFL, O_NONBLOCK | O_CLOEXEC) == -1) {
564 close(filedes[0]);
565 close(filedes[1]);
566 throw std::runtime_error(strerror(errno));
567 }
568 m_read_fd = filedes[0];
569 m_write_fd = filedes[1];
570};
#define close(a)
Definition XrdPosix.hh:48

References close.

Member Function Documentation

◆ Consume()

std::shared_ptr< CurlOperation > HandlerQueue::Consume ( std::chrono::steady_clock::duration dur)

Definition at line 788 of file XrdClHttpUtil.cc.

789{
790 std::unique_lock<std::mutex> lk(m_mutex);
791 m_consumer_cv.wait_for(lk, dur, [&]{return m_ops.size() > 0 || m_shutdown;});
792 if (m_shutdown || m_ops.empty()) {
793 return {};
794 }
795
796 std::shared_ptr<CurlOperation> result = m_ops.front();
797 m_ops.pop_front();
798
799 char ready[1];
800 while (true) {
801 auto result = read(m_read_fd, ready, 1);
802 if (result == -1) {
803 if (errno == EINTR) {
804 continue;
805 } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
806 // This should never happen, but if it does, just continue
807 // as if we successfully read the byte.
808 break;
809 }
810 throw std::runtime_error(strerror(errno));
811 }
812 break;
813 }
814
815 lk.unlock();
816 m_producer_cv.notify_one();
817 m_ops_consumed.fetch_add(1, std::memory_order_relaxed);
818
819 return result;
820}
#define read(a, b, c)
Definition XrdPosix.hh:82

References read.

◆ Expire()

void HandlerQueue::Expire ( )

Definition at line 687 of file XrdClHttpUtil.cc.

688{
689 std::unique_lock<std::mutex> lk(m_mutex);
690 auto now = std::chrono::steady_clock::now();
691
692 // Iterate through the paused transfers, checking if they are done.
693 for (auto &op : m_ops) {
694 if (!op->IsPaused()) continue;
695
696 if (op->TransferStalled(0, now)) {
697 op->ContinueHandle();
698 }
699 }
700
701 std::vector<decltype(m_ops)::value_type> expired_ops;
702 unsigned expired_count = 0;
703 auto it = std::remove_if(m_ops.begin(), m_ops.end(),
704 [&](const std::shared_ptr<CurlOperation> &handler) {
705 auto expired = handler->GetOperationExpiry() < now;
706 if (expired) {
707 expired_ops.push_back(handler);
708 expired_count++;
709 }
710 return expired;
711 });
712 m_ops.erase(it, m_ops.end());
713
714 // The contents of our pipe and the in-memory queue are now off by expired_count.
715 // Read exactly that many bytes from the pipe and throw them away.
716 char throwaway[64];
717 unsigned bytes_to_read = expired_count;
718 while (bytes_to_read > 0) {
719 size_t chunk = std::min<size_t>(sizeof(throwaway), bytes_to_read);
720 ssize_t n = read(m_read_fd, throwaway, chunk);
721 if (n > 0) {
722 bytes_to_read -= n;
723 } else if (n == -1) {
724 if (errno == EINTR) {
725 continue;
726 } else {
727 // EWOULDBLOCK is a possibility if there's a synchronization error;
728 // for now, just continue on as if we were successful in reading out
729 // the missing bytes
730 break;
731 }
732 } else {
733 break;
734 }
735 }
736
737 // Note: the failure handler may trigger new operations submitted to the queue
738 // (which requires the lock to be held) such as a prefetch operation that gets split
739 // into multiple sub-operations.
740 //
741 // Thus, we must unlock the mutex protecting the queue and avoid touching the shared state of
742 // m_ops.
743 lk.unlock();
744 for (auto &handler : expired_ops) {
745 if (handler) handler->Fail(XrdCl::errOperationExpired, 0, "Operation expired while in queue");
746 }
747}
const uint16_t errOperationExpired

◆ GetDefaultMaxPendingOps()

unsigned XrdClHttp::HandlerQueue::GetDefaultMaxPendingOps ( )
inline static

Definition at line 194 of file XrdClHttpUtil.hh.

194{return m_default_max_pending_ops;}

◆ GetHandle()

CURL * HandlerQueue::GetHandle ( )

Definition at line 671 of file XrdClHttpUtil.cc.

671 {
672 if (m_handles.size()) {
673 auto result = m_handles.back();
674 m_handles.pop_back();
675 return result;
676 }
677
678 return ::GetHandle(EnableCurlHeaderDump());
679}

◆ GetMonitoringJson()

std::string HandlerQueue::GetMonitoringJson ( )
static

Definition at line 823 of file XrdClHttpUtil.cc.

824{
825 auto consumed = m_ops_consumed.load(std::memory_order_relaxed);
826 auto produced = m_ops_produced.load(std::memory_order_relaxed);
827 return "{"
828 "\"produced\":" + std::to_string(produced) + ","
829 "\"consumed\":" + std::to_string(consumed) + ","
830 "\"pending\":" + std::to_string(produced - consumed) + ","
831 "\"rejected\":" + std::to_string(m_ops_rejected.load(std::memory_order_relaxed)) +
832 "}";
833}

◆ PollFD()

int XrdClHttp::HandlerQueue::PollFD ( ) const
inline

Definition at line 176 of file XrdClHttpUtil.hh.

176{return m_read_fd;}

◆ Produce()

void HandlerQueue::Produce ( std::shared_ptr< CurlOperation > handler)

Definition at line 750 of file XrdClHttpUtil.cc.

751{
752 auto handler_expiry = handler->GetOperationExpiry();
753 std::unique_lock<std::mutex> lk{m_mutex};
754 m_producer_cv.wait_until(lk,
755 handler_expiry,
756 [&]{return m_ops.size() < m_max_pending_ops;}
757 );
758 if (std::chrono::steady_clock::now() > handler_expiry) {
759 lk.unlock();
760 handler->Fail(XrdCl::errOperationExpired, 0, "Operation expired while waiting for worker");
761 m_ops_rejected.fetch_add(1, std::memory_order_relaxed);
762 return;
763 }
764
765 m_ops.push_back(handler);
766 char ready[] = "1";
767 while (true) {
768 auto result = write(m_write_fd, ready, 1);
769 if (result == -1) {
770 if (errno == EINTR) {
771 continue;
772 } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
773 // This should never happen, but if it does, just continue
774 // as if we successfully wrote the notification to the pipe.
775 break;
776 }
777 throw std::runtime_error(strerror(errno));
778 }
779 break;
780 }
781
782 lk.unlock();
783 m_consumer_cv.notify_one();
784 m_ops_produced.fetch_add(1, std::memory_order_relaxed);
785}
#define write(a, b, c)
Definition XrdPosix.hh:115

References XrdCl::errOperationExpired, and write.

◆ RecycleHandle()

void HandlerQueue::RecycleHandle ( CURL * curl)

Definition at line 682 of file XrdClHttpUtil.cc.

682 {
683 m_handles.push_back(curl);
684}

◆ ReleaseHandles()

void HandlerQueue::ReleaseHandles ( )

Definition at line 879 of file XrdClHttpUtil.cc.

880{
881 for (auto handle : m_handles) {
882 curl_easy_cleanup(handle);
883 }
884 m_handles.clear();
885}

◆ Shutdown()

void HandlerQueue::Shutdown ( )

Definition at line 871 of file XrdClHttpUtil.cc.

872{
873 std::unique_lock lock(m_mutex);
874 m_shutdown = true;
875 m_consumer_cv.notify_all();
876}

◆ TryConsume()

std::shared_ptr< CurlOperation > HandlerQueue::TryConsume ( )

Definition at line 836 of file XrdClHttpUtil.cc.

837{
838 std::unique_lock<std::mutex> lk(m_mutex);
839 if (m_ops.size() == 0) {
840 std::shared_ptr<CurlOperation> result;
841 return result;
842 }
843
844 std::shared_ptr<CurlOperation> result = m_ops.front();
845 m_ops.pop_front();
846
847 char ready[1];
848 while (true) {
849 auto result = read(m_read_fd, ready, 1);
850 if (result == -1) {
851 if (errno == EINTR) {
852 continue;
853 } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
854 // This should never happen, but if it does, just continue
855 // as if we successfully read the byte.
856 break;
857 }
858 throw std::runtime_error(strerror(errno));
859 }
860 break;
861 }
862
863 lk.unlock();
864 m_producer_cv.notify_one();
865 m_ops_consumed.fetch_add(1, std::memory_order_relaxed);
866
867 return result;
868}

References read.


The documentation for this class was generated from the following files: