XRootD
Loading...
Searching...
No Matches
XrdClXCpCtx.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2017 by European Organization for Nuclear Research (CERN)
3// Author: Michal Simon <michal.simon@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
25#include "XrdCl/XrdClXCpCtx.hh"
26#include "XrdCl/XrdClXCpSrc.hh"
27#include "XrdCl/XrdClLog.hh"
30
31#include <algorithm>
32
33namespace XrdCl
34{
35
36XCpCtx::XCpCtx( const std::vector<std::string> &urls, uint64_t blockSize, uint8_t parallelSrc, uint64_t chunkSize, uint64_t parallelChunks, int64_t fileSize ) :
37 pUrls( std::deque<std::string>( urls.begin(), urls.end() ) ), pBlockSize( blockSize ),
38 pParallelSrc( parallelSrc ), pChunkSize( chunkSize ), pParallelChunks( parallelChunks ),
39 pOffset( 0 ), pFileSize( -1 ), pFileSizeCV( 0 ), pDataReceived( 0 ), pDone( false ),
40 pDoneCV( 0 ), pRefCount( 1 ), pDeleteCV( 0 ), pDelete( false )
41{
42 SetFileSize( fileSize );
43}
44
45XCpCtx::~XCpCtx()
46{
47 // at this point there's no concurrency
48 // this object dies as the last one
49 while( !pSink.IsEmpty() )
50 {
51 PageInfo *chunk = pSink.Get();
52 if( chunk )
53 XCpSrc::DeleteChunk( chunk );
54 }
55}
56
57bool XCpCtx::GetNextUrl( std::string & url )
58{
59 XrdSysMutexHelper lck( pMtx );
60 if( pUrls.empty() ) return false;
61 url = pUrls.front();
62 pUrls.pop();
63 return true;
64}
65
67{
68 uint64_t transferRate = -1; // set transferRate to max uint64 value
69 XCpSrc *ret = 0;
70
71 std::list<XCpSrc*>::iterator itr;
72 XrdSysMutexHelper lck( pMtx );
73
74 for( itr = pSources.begin() ; itr != pSources.end() ; ++itr )
75 {
76 XCpSrc *src = *itr;
77 if( src == exclude ) continue;
78 uint64_t tmp = src->TransferRate();
79 if( src->HasData() && tmp < transferRate )
80 {
81 ret = src;
82 transferRate = tmp;
83 }
84 }
85
86 if( !ret ) return ret;
87 return ret->Self();
88}
89
91{
92 pSink.Put( chunk );
93}
94
95std::pair<uint64_t, uint64_t> XCpCtx::GetBlock()
96{
97 XrdSysMutexHelper lck( pMtx );
98
99 uint64_t blkSize = pBlockSize, offset = pOffset;
100 if( pOffset + blkSize > uint64_t( pFileSize ) )
101 blkSize = pFileSize - pOffset;
102 pOffset += blkSize;
103
104 return std::make_pair( offset, blkSize );
105}
106
107void XCpCtx::SetFileSize( int64_t size )
108{
109 XrdSysCondVarHelper lckcv( pFileSizeCV );
110 XrdSysMutexHelper lckmtx( pMtx );
111 if( pFileSize < 0 && size >= 0 )
112 {
113 pFileSize = size;
114 pFileSizeCV.Broadcast();
115
116 if( pBlockSize > uint64_t( pFileSize ) / pParallelSrc )
117 pBlockSize = pFileSize / pParallelSrc;
118
119 if( pBlockSize < pChunkSize )
120 pBlockSize = pChunkSize;
121 }
122}
123
125{
126 for( uint8_t i = 0; i < pParallelSrc; ++i )
127 {
128 XCpSrc *src = new XCpSrc( pChunkSize, pParallelChunks, pFileSize, this );
129 pSources.push_back( src );
130 }
131
132 auto scpy = pSources;
133 bool ok = false;
134 for(auto src: scpy) {
135 if( src->Start() )
136 {
137 // src destructor will remove src from pSources
138 src->Delete();
139 }
140 else
141 {
142 ok = true;
143 }
144 }
145
146 if( !ok )
147 {
148 Log *log = DefaultEnv::GetLog();
149 log->Error( UtilityMsg, "Failed to initialize (failed to create new threads)" );
150 return XRootDStatus( stError, errInternal, EAGAIN, "XCpCtx: failed to create new threads." );
151 }
152
153 return XRootDStatus();
154}
155
157{
158 // if we received all the data we are done here
159 if( pDataReceived == uint64_t( pFileSize ) )
160 {
161 XrdSysCondVarHelper lck( pDoneCV );
162 pDone = true;
163 pDoneCV.Broadcast();
164 return XRootDStatus( stOK, suDone );
165 }
166
167 // if we don't have active sources it means we failed
168 if( GetRunning() == 0 )
169 {
170 XrdSysCondVarHelper lck( pDoneCV );
171 pDone = true;
172 pDoneCV.Broadcast();
174 }
175
176 PageInfo *chunk = pSink.Get();
177 if( chunk )
178 {
179 pDataReceived += chunk->GetLength();
180 ci = std::move( *chunk );
181 delete chunk;
182 return XRootDStatus( stOK, suContinue );
183 }
184
185 return XRootDStatus( stOK, suRetry );
186}
187
189{
190 pDoneCV.Broadcast();
191}
192
194{
195 XrdSysCondVarHelper lck( pDoneCV );
196
197 if( !pDone )
198 pDoneCV.Wait( 60 );
199
200 return pDone;
201}
202
203size_t XCpCtx::GetRunning()
204{
205 // count active sources
206 size_t nbRunning = 0;
207 std::list<XCpSrc*>::iterator itr;
208 XrdSysMutexHelper lck( pMtx );
209
210 for( itr = pSources.begin() ; itr != pSources.end() ; ++ itr)
211 if( (*itr)->IsRunning() )
212 ++nbRunning;
213 return nbRunning;
214}
215
216
217} /* namespace XrdCl */
static Log * GetLog()
Get default log.
Handle diagnostics.
Definition XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition XrdClLog.cc:231
void NotifyIdleSrc()
bool GetNextUrl(std::string &url)
XCpSrc * WeakestLink(XCpSrc *exclude)
void PutChunk(PageInfo *chunk)
XCpCtx(const std::vector< std::string > &urls, uint64_t blockSize, uint8_t parallelSrc, uint64_t chunkSize, uint64_t parallelChunks, int64_t fileSize)
void SetFileSize(int64_t size)
std::pair< uint64_t, uint64_t > GetBlock()
XRootDStatus Initialize()
XRootDStatus GetChunk(XrdCl::PageInfo &ci)
static void DeleteChunk(PageInfo *&chunk)
XCpSrc * Self()
uint64_t TransferRate()
const uint16_t suRetry
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errInternal
Internal error.
const uint16_t stOK
Everything went OK.
const uint64_t UtilityMsg
const uint16_t suDone
const uint16_t suContinue
const uint16_t errNoMoreReplicas
No more replicas to try.
uint32_t GetLength() const
Get the data length.