XRootD
Loading...
Searching...
No Matches
XrdThrottleManager.cc
Go to the documentation of this file.
1
3
4#include "XrdOuc/XrdOucEnv.hh"
12
13#define XRD_TRACE m_trace->
14
15#include "INIReader.h"
17
18#include <algorithm>
19#include <array>
20#include <cmath>
21#include <random>
22#include <sstream>
23
24#if defined(__linux__)
25
26#include <sched.h>
27unsigned XrdThrottleManager::GetTimerListHash() {
28 int cpu = sched_getcpu();
29 if (cpu < 0) {
30 return 0;
31 }
32 return cpu % m_timer_list_size;
33}
34
35#else
36
37unsigned XrdThrottleManager::GetTimerListHash() {
38 return 0;
39}
40
41#endif
42
43const char *
44XrdThrottleManager::TraceID = "ThrottleManager";
45
47 m_trace(tP),
48 m_log(lP),
49 m_interval_length_seconds(1.0),
50 m_bytes_per_second(-1),
51 m_ops_per_second(-1),
52 m_concurrency_limit(-1),
53 m_last_round_allocation(100*1024),
54 m_loadshed_host(""),
55 m_loadshed_port(0),
56 m_loadshed_frequency(0)
57{
58}
59
60void
62{
63
64 auto max_open = config.GetMaxOpen();
65 if (max_open != -1) SetMaxOpen(max_open);
66 auto max_conn = config.GetMaxConn();
67 if (max_conn != -1) SetMaxConns(max_conn);
68 auto max_wait = config.GetMaxWait();
69 if (max_wait != -1) SetMaxWait(max_wait);
70
72 config.GetThrottleIOPSRate(),
74 static_cast<float>(config.GetThrottleRecomputeIntervalMS())/1000.0);
75
76 m_trace->What = config.GetTraceLevels();
77
78 auto loadshed_host = config.GetLoadshedHost();
79 auto loadshed_port = config.GetLoadshedPort();
80 auto loadshed_freq = config.GetLoadshedFreq();
81 if (!loadshed_host.empty() && loadshed_port > 0 && loadshed_freq > 0)
82 {
83 // Loadshed specified, so set it.
84 SetLoadShed(loadshed_host, loadshed_port, loadshed_freq);
85 }
86
87 // Load per-user configuration if specified
88 auto user_config_file = config.GetUserConfigFile();
89 if (!user_config_file.empty())
90 {
91 m_user_config_file = user_config_file;
92 if (LoadUserLimits(user_config_file) != 0)
93 {
94 m_log->Emsg("ThrottleManager", "Failed to load per-user configuration file", user_config_file.c_str());
95 }
96 }
97}
98
99void
101{
102 TRACE(DEBUG, "Initializing the throttle manager.");
103 // Initialize all our shares to zero.
104 m_primary_bytes_shares.resize(m_max_users);
105 m_secondary_bytes_shares.resize(m_max_users);
106 m_primary_ops_shares.resize(m_max_users);
107 m_secondary_ops_shares.resize(m_max_users);
108 for (auto & waiter : m_waiter_info) {
109 waiter.m_manager = this;
110 }
111
112 // Allocate each user 100KB and 10 ops to bootstrap;
113 for (int i=0; i<m_max_users; i++)
114 {
115 m_primary_bytes_shares[i] = m_last_round_allocation;
116 m_secondary_bytes_shares[i] = 0;
117 m_primary_ops_shares[i] = 10;
118 m_secondary_ops_shares[i] = 0;
119 }
120
121 int rc;
122 pthread_t tid;
123 if ((rc = XrdSysThread::Run(&tid, XrdThrottleManager::RecomputeBootstrap, static_cast<void *>(this), 0, "Buffer Manager throttle")))
124 m_log->Emsg("ThrottleManager", rc, "create throttle thread");
125
126}
127
128std::tuple<std::string, uint16_t>
130 // client can be null, if so, return nobody
131 if (!client) {
132 return std::make_tuple("nobody", GetUid("nobody"));
133 }
134
135 // Try various potential "names" associated with the request, from the most
136 // specific to most generic.
137 std::string user;
138
139 if (client->eaAPI && client->eaAPI->Get("token.subject", user)) {
140 if (client->vorg) user = std::string(client->vorg) + ":" + user;
141 } else if (client->eaAPI) {
142 std::string request_name;
143 if (client->eaAPI->Get("request.name", request_name) && !request_name.empty()) user = request_name;
144 }
145 if (user.empty()) {user = client->name ? client->name : "nobody";}
146 uint16_t uid = GetUid(user.c_str());
147 return std::make_tuple(user, uid);
148}
149
150/*
151 * Take as many shares as possible to fulfill the request; update
152 * request with current remaining value, or zero if satisfied.
153 */
154inline void
155XrdThrottleManager::GetShares(int &shares, int &request)
156{
157 int remaining;
158 AtomicFSub(remaining, shares, request);
159 if (remaining > 0)
160 {
161 request -= (remaining < request) ? remaining : request;
162 }
163}
164
165/*
166 * Iterate through all of the secondary shares, attempting
167 * to steal enough to fulfill the request.
168 */
169void
170XrdThrottleManager::StealShares(int uid, int &reqsize, int &reqops)
171{
172 if (!reqsize && !reqops) return;
173 TRACE(BANDWIDTH, "Stealing shares to fill request of " << reqsize << " bytes");
174 TRACE(IOPS, "Stealing shares to fill request of " << reqops << " ops.");
175
176 for (int i=uid+1; i % m_max_users == uid; i++)
177 {
178 if (reqsize) GetShares(m_secondary_bytes_shares[i % m_max_users], reqsize);
179 if (reqops) GetShares(m_secondary_ops_shares[ i % m_max_users], reqops);
180 }
181
182 TRACE(BANDWIDTH, "After stealing shares, " << reqsize << " of request bytes remain.");
183 TRACE(IOPS, "After stealing shares, " << reqops << " of request ops remain.");
184}
185
186/*
187 * Increment the number of files held open by a given entity. Returns false
188 * if the user is at the maximum; in this case, the internal counter is not
189 * incremented.
190 */
191bool
192XrdThrottleManager::OpenFile(const std::string &entity, std::string &error_message)
193{
194 // Get per-user connection limit (0 means use global)
195 unsigned long user_max_conn = GetUserMaxConn(entity);
196 unsigned long effective_max_conn = user_max_conn < m_max_conns ? user_max_conn : m_max_conns;
197
198 if (m_max_open == 0 && effective_max_conn == 0) return true;
199
200 const std::lock_guard<std::mutex> lock(m_file_mutex);
201 auto iter = m_file_counters.find(entity);
202 unsigned long cur_open_files = 0, cur_open_conns;
203 if (m_max_open) {
204 if (iter == m_file_counters.end()) {
205 m_file_counters[entity] = 1;
206 TRACE(FILES, "User " << entity << " has opened their first file");
207 cur_open_files = 1;
208 } else if (iter->second < m_max_open) {
209 iter->second++;
210 cur_open_files = iter->second;
211 } else {
212 std::stringstream ss;
213 ss << "User " << entity << " has hit the limit of " << m_max_open << " open files";
214 TRACE(FILES, ss.str());
215 error_message = ss.str();
216 return false;
217 }
218 }
219
220 if (effective_max_conn > 0) {
221 auto pid = XrdSysThread::Num();
222 auto conn_iter = m_active_conns.find(entity);
223 auto conn_count_iter = m_conn_counters.find(entity);
224 if ((conn_count_iter != m_conn_counters.end()) && (conn_count_iter->second == effective_max_conn) &&
225 (conn_iter == m_active_conns.end() || ((*(conn_iter->second))[pid] == 0)))
226 {
227 // note: we are rolling back the increment in open files
228 if (m_max_open) iter->second--;
229 std::stringstream ss;
230 ss << "User " << entity << " has hit the limit of " << effective_max_conn <<
231 " open connections";
232 if (user_max_conn > 0) {
233 ss << " (per-user limit)";
234 }
235 TRACE(CONNS, ss.str());
236 error_message = ss.str();
237 return false;
238 }
239 if (conn_iter == m_active_conns.end()) {
240 std::unique_ptr<std::unordered_map<pid_t, unsigned long>> conn_map(
241 new std::unordered_map<pid_t, unsigned long>());
242 (*conn_map)[pid] = 1;
243 m_active_conns[entity] = std::move(conn_map);
244 if (conn_count_iter == m_conn_counters.end()) {
245 m_conn_counters[entity] = 1;
246 cur_open_conns = 1;
247 } else {
248 m_conn_counters[entity] ++;
249 cur_open_conns = m_conn_counters[entity];
250 }
251 } else {
252 auto pid_iter = conn_iter->second->find(pid);
253 if (pid_iter == conn_iter->second->end() || pid_iter->second == 0) {
254 (*(conn_iter->second))[pid] = 1;
255 conn_count_iter->second++;
256 cur_open_conns = conn_count_iter->second;
257 } else {
258 (*(conn_iter->second))[pid] ++;
259 cur_open_conns = conn_count_iter->second;
260 }
261 }
262 TRACE(CONNS, "User " << entity << " has " << cur_open_conns << " open connections (limit: " << effective_max_conn << ")");
263 }
264 if (m_max_open) TRACE(FILES, "User " << entity << " has " << cur_open_files << " open files");
265 return true;
266}
267
268
269/*
270 * Decrement the number of files held open by a given entity.
271 *
272 * Returns false if the value would have fallen below zero or
273 * if the entity isn't tracked.
274 */
275bool
276XrdThrottleManager::CloseFile(const std::string &entity)
277{
278 if (m_max_open == 0 && m_max_conns == 0) return true;
279
280 bool result = true;
281 const std::lock_guard<std::mutex> lock(m_file_mutex);
282 if (m_max_open) {
283 auto iter = m_file_counters.find(entity);
284 if (iter == m_file_counters.end()) {
285 TRACE(FILES, "WARNING: User " << entity << " closed a file but throttle plugin never saw an open file");
286 result = false;
287 } else if (iter->second == 0) {
288 TRACE(FILES, "WARNING: User " << entity << " closed a file but throttle plugin thinks all files were already closed");
289 result = false;
290 } else {
291 iter->second--;
292 }
293 if (result) TRACE(FILES, "User " << entity << " closed a file; " << iter->second <<
294 " remain open");
295 }
296
297 if (m_max_conns) {
298 auto pid = XrdSysThread::Num();
299 auto conn_iter = m_active_conns.find(entity);
300 auto conn_count_iter = m_conn_counters.find(entity);
301 if (conn_iter == m_active_conns.end() || !(conn_iter->second)) {
302 TRACE(CONNS, "WARNING: User " << entity << " closed a file on a connection we are not"
303 " tracking");
304 return false;
305 }
306 auto pid_iter = conn_iter->second->find(pid);
307 if (pid_iter == conn_iter->second->end()) {
308 TRACE(CONNS, "WARNING: User " << entity << " closed a file on a connection we are not"
309 " tracking");
310 return false;
311 }
312 if (pid_iter->second == 0) {
313 TRACE(CONNS, "WARNING: User " << entity << " closed a file on connection the throttle"
314 " plugin thinks was idle");
315 } else {
316 pid_iter->second--;
317 }
318 if (conn_count_iter == m_conn_counters.end()) {
319 TRACE(CONNS, "WARNING: User " << entity << " closed a file but the throttle plugin never"
320 " observed an open file");
321 } else if (pid_iter->second == 0) {
322 if (conn_count_iter->second == 0) {
323 TRACE(CONNS, "WARNING: User " << entity << " had a connection go idle but the "
324 " throttle plugin already thought all connections were idle");
325 } else {
326 conn_count_iter->second--;
327 TRACE(CONNS, "User " << entity << " had connection on thread " << pid << " go idle; "
328 << conn_count_iter->second << " active connections remain");
329 }
330 }
331 }
332
333 return result;
334}
335
336
337/*
338 * Apply the throttle. If there are no limits set, returns immediately. Otherwise,
339 * this applies the limits as best possible, stalling the thread if necessary.
340 */
341void
342XrdThrottleManager::Apply(int reqsize, int reqops, int uid)
343{
344 if (m_bytes_per_second < 0)
345 reqsize = 0;
346 if (m_ops_per_second < 0)
347 reqops = 0;
348 while (reqsize || reqops)
349 {
350 // Subtract the requested out of the shares
351 AtomicBeg(m_compute_var);
352 GetShares(m_primary_bytes_shares[uid], reqsize);
353 if (reqsize)
354 {
355 TRACE(BANDWIDTH, "Using secondary shares; request has " << reqsize << " bytes left.");
356 GetShares(m_secondary_bytes_shares[uid], reqsize);
357 TRACE(BANDWIDTH, "Finished with secondary shares; request has " << reqsize << " bytes left.");
358 }
359 else
360 {
361 TRACE(BANDWIDTH, "Filled byte shares out of primary; " << m_primary_bytes_shares[uid] << " left.");
362 }
363 GetShares(m_primary_ops_shares[uid], reqops);
364 if (reqops)
365 {
366 GetShares(m_secondary_ops_shares[uid], reqops);
367 }
368 StealShares(uid, reqsize, reqops);
369 AtomicEnd(m_compute_var);
370
371 if (reqsize || reqops)
372 {
373 if (reqsize) TRACE(BANDWIDTH, "Sleeping to wait for throttle fairshare.");
374 if (reqops) TRACE(IOPS, "Sleeping to wait for throttle fairshare.");
375 m_compute_var.Wait();
376 m_loadshed_limit_hit++;
377 }
378 }
379
380}
381
382void
383XrdThrottleManager::UserIOAccounting()
384{
385 std::chrono::steady_clock::duration::rep total_active_time = 0;
386 for (size_t idx = 0; idx < m_timer_list.size(); idx++) {
387 auto &timerList = m_timer_list[idx];
388 std::unique_lock<std::mutex> lock(timerList.m_mutex);
389 auto timer = timerList.m_first;
390 while (timer) {
391 auto next = timer->m_next;
392 auto uid = timer->m_owner;
393 auto &waiter = m_waiter_info[uid];
394 auto recent_duration = timer->Reset();
395 waiter.m_io_time += recent_duration.count();
396
397 total_active_time += recent_duration.count();
398 timer = next;
399 }
400 }
401 m_io_active_time += total_active_time;
402}
403
404void
405XrdThrottleManager::ComputeWaiterOrder()
406{
407 // Update the IO time for long-running I/O operations. This prevents,
408 // for example, a 2-minute I/O operation from causing a spike in
409 // concurrency because it's otherwise only reported at the end.
410 UserIOAccounting();
411
412 auto now = std::chrono::steady_clock::now();
413 auto elapsed = now - m_last_waiter_recompute_time;
414 m_last_waiter_recompute_time = now;
415 std::chrono::duration<double> elapsed_secs = elapsed;
416 // Alpha is the decay factor for the exponential moving average. One window is 10 seconds,
417 // so every 10 seconds we decay the prior average by 1/e (that is, the weight is 64% of the
418 // total). This means the contribution of I/O load from a minute ago is 0.2% of the total.
419
420 // The moving average will be used to determine how close the user is to their "fair share"
421 // of the concurrency limit among the users that are waiting.
422 auto alpha = 1 - std::exp(-1 * elapsed_secs.count() / 10.0);
423
424 std::vector<double> share;
425 share.resize(m_max_users);
426 size_t users_with_waiters = 0;
427 // For each user, compute their current concurrency and determine how many waiting users
428 // total there are.
429 for (int i = 0; i < m_max_users; i++)
430 {
431 auto &waiter = m_waiter_info[i];
432 auto io_duration_rep = waiter.m_io_time.exchange(std::chrono::steady_clock::duration(0).count());
433 std::chrono::steady_clock::duration io_duration = std::chrono::steady_clock::duration(io_duration_rep);
434 std::chrono::duration<double> io_duration_secs = io_duration;
435 auto prev_concurrency = io_duration_secs.count() / elapsed_secs.count();
436 float new_concurrency = waiter.m_concurrency;
437
438 new_concurrency = (1 - alpha) * new_concurrency + alpha * prev_concurrency;
439 waiter.m_concurrency = new_concurrency;
440 if (new_concurrency > 0) {
441 TRACE(DEBUG, "User " << i << " has concurrency of " << new_concurrency);
442 }
443 unsigned waiting;
444 {
445 std::lock_guard<std::mutex> lock(waiter.m_mutex);
446 waiting = waiter.m_waiting;
447 }
448 if (waiting > 0)
449 {
450 share[i] = new_concurrency;
451 TRACE(DEBUG, "User " << i << " has concurrency of " << share[i] << " and is waiting for " << waiting);
452 // Handle the division-by-zero case; if we have no history of usage whatsoever, we should pretend we
453 // have at least some minimal load
454 if (share[i] == 0) {
455 share[i] = 0.1;
456 }
457 users_with_waiters++;
458 }
459 else
460 {
461 share[i] = 0;
462 }
463 }
464 auto fair_share = static_cast<double>(m_concurrency_limit) / static_cast<double>(users_with_waiters);
465 std::vector<uint16_t> waiter_order;
466 waiter_order.resize(m_max_users);
467
468 // Calculate the share for each user. We assume the user should get a share proportional to how
469 // far above or below the fair share they are. So, a user with concurrency of 20 when the fairshare
470 // is 10 will get 0.5 shares; a user with concurrency of 5 when the fairshare is 10 will get 2.0 shares.
471 double shares_sum = 0;
472 for (int idx = 0; idx < m_max_users; idx++)
473 {
474 if (share[idx]) {
475 shares_sum += fair_share / share[idx];
476 }
477 }
478
479 // We must quantize the overall shares into an array of 1024 elements. We do this by
480 // scaling up (or down) based on the total number of shares computed above. Note this
481 // quantization can lead to an over-provisioned user being assigned zero shares; thus,
482 // we scale based on (1024-#users) so we can give one extra share to each user.
483 auto scale_factor = (static_cast<double>(m_max_users) - static_cast<double>(users_with_waiters)) / shares_sum;
484 size_t offset = 0;
485 for (int uid = 0; uid < m_max_users; uid++) {
486 if (share[uid] > 0) {
487 auto shares = static_cast<unsigned>(scale_factor * fair_share / share[uid]) + 1;
488 TRACE(DEBUG, "User " << uid << " has " << shares << " shares");
489 for (unsigned idx = 0; idx < shares; idx++)
490 {
491 waiter_order[offset % m_max_users] = uid;
492 offset++;
493 }
494 }
495 }
496 if (offset < m_max_users) {
497 for (size_t idx = offset; idx < m_max_users; idx++) {
498 waiter_order[idx] = -1;
499 }
500 }
501 // Shuffle the order to randomize the wakeup order.
502 std::shuffle(waiter_order.begin(), waiter_order.end(), std::default_random_engine());
503
504 // Copy the order to the inactive array. We do not shuffle in-place because RAtomics are
505 // not move constructible, which is a requirement for std::shuffle.
506 auto &waiter_order_to_modify = (m_wake_order_active == 0) ? m_wake_order_1 : m_wake_order_0;
507 std::copy(waiter_order.begin(), waiter_order.end(), waiter_order_to_modify.begin());
508
509 // Set the array we just modified to be the active one. Since this is a relaxed write, it could take
510 // some time for other CPUs to see the change; that's OK as this is all stochastic anyway.
511 m_wake_order_active = (m_wake_order_active + 1) % 2;
512
513 m_waiter_offset = 0;
514
515 // If we find ourselves below the concurrency limit because we woke up too few operations in the last
516 // interval, try waking up enough operations to fill the gap. If we race with new incoming operations,
517 // the threads will just go back to sleep.
518 if (users_with_waiters) {
519 m_waiting_users = users_with_waiters;
520 auto io_active = m_io_active.load(std::memory_order_acquire);
521 for (size_t idx = io_active; idx < static_cast<size_t>(m_concurrency_limit); idx++) {
522 NotifyOne();
523 }
524 }
525}
526
527void *
528XrdThrottleManager::RecomputeBootstrap(void *instance)
529{
530 XrdThrottleManager * manager = static_cast<XrdThrottleManager*>(instance);
531 manager->Recompute();
532 return NULL;
533}
534
535void
536XrdThrottleManager::Recompute()
537{
538 while (1)
539 {
540 // The connection counter can accumulate a number of known-idle connections.
541 // We only need to keep long-term memory of idle ones. Take this chance to garbage
542 // collect old connection counters.
543 if (m_max_open || m_max_conns) {
544 const std::lock_guard<std::mutex> lock(m_file_mutex);
545 for (auto iter = m_active_conns.begin(); iter != m_active_conns.end();)
546 {
547 auto & conn_count = *iter;
548 if (!conn_count.second) {
549 iter = m_active_conns.erase(iter);
550 continue;
551 }
552 for (auto iter2 = conn_count.second->begin(); iter2 != conn_count.second->end();) {
553 if (iter2->second == 0) {
554 iter2 = conn_count.second->erase(iter2);
555 } else {
556 iter2++;
557 }
558 }
559 if (!conn_count.second->size()) {
560 iter = m_active_conns.erase(iter);
561 } else {
562 iter++;
563 }
564 }
565 for (auto iter = m_conn_counters.begin(); iter != m_conn_counters.end();) {
566 if (!iter->second) {
567 iter = m_conn_counters.erase(iter);
568 } else {
569 iter++;
570 }
571 }
572 for (auto iter = m_file_counters.begin(); iter != m_file_counters.end();) {
573 if (!iter->second) {
574 iter = m_file_counters.erase(iter);
575 } else {
576 iter++;
577 }
578 }
579 }
580
581 TRACE(DEBUG, "Recomputing fairshares for throttle.");
582 RecomputeInternal();
583 ComputeWaiterOrder();
584 TRACE(DEBUG, "Finished recomputing fairshares for throttle; sleeping for " << m_interval_length_seconds << " seconds.");
585 XrdSysTimer::Wait(static_cast<int>(1000*m_interval_length_seconds));
586 }
587}
588
589/*
590 * The heart of the manager approach.
591 *
592 * This routine periodically recomputes the shares of each current user.
593 * Each user has a "primary" and a "secondary" share. At the end of the
594 * each time interval, the remaining primary share is moved to secondary.
595 * A user can utilize both shares; if both are gone, they must block until
596 * the next recompute interval.
597 *
598 * The secondary share can be "stolen" by any other user; so, if a user
599 * is idle or under-utilizing, their share can be used by someone else.
600 * However, they can never be completely starved, as no one can steal
601 * primary share.
602 *
603 * In this way, we violate the throttle for an interval, but never starve.
604 *
605 */
606void
607XrdThrottleManager::RecomputeInternal()
608{
609 // Compute total shares for this interval;
610 float intervals_per_second = 1.0/m_interval_length_seconds;
611 float total_bytes_shares = m_bytes_per_second / intervals_per_second;
612 float total_ops_shares = m_ops_per_second / intervals_per_second;
613
614 // Compute the number of active users; a user is active if they used
615 // any primary share during the last interval;
616 AtomicBeg(m_compute_var);
617 float active_users = 0;
618 long bytes_used = 0;
619 for (int i=0; i<m_max_users; i++)
620 {
621 int primary = AtomicFAZ(m_primary_bytes_shares[i]);
622 if (primary != m_last_round_allocation)
623 {
624 active_users++;
625 if (primary >= 0)
626 m_secondary_bytes_shares[i] = primary;
627 primary = AtomicFAZ(m_primary_ops_shares[i]);
628 if (primary >= 0)
629 m_secondary_ops_shares[i] = primary;
630 bytes_used += (primary < 0) ? m_last_round_allocation : (m_last_round_allocation-primary);
631 }
632 }
633
634 if (active_users == 0)
635 {
636 active_users++;
637 }
638
639 // Note we allocate the same number of shares to *all* users, not
640 // just the active ones. If a new user becomes active in the next
641 // interval, we'll go over our bandwidth budget just a bit.
642 m_last_round_allocation = static_cast<int>(total_bytes_shares / active_users);
643 int ops_shares = static_cast<int>(total_ops_shares / active_users);
644 TRACE(BANDWIDTH, "Round byte allocation " << m_last_round_allocation << " ; last round used " << bytes_used << ".");
645 TRACE(IOPS, "Round ops allocation " << ops_shares);
646 for (int i=0; i<m_max_users; i++)
647 {
648 m_primary_bytes_shares[i] = m_last_round_allocation;
649 m_primary_ops_shares[i] = ops_shares;
650 }
651
652 AtomicEnd(m_compute_var);
653
654 // Reset the loadshed limit counter.
655 int limit_hit = m_loadshed_limit_hit.exchange(0);
656 TRACE(DEBUG, "Throttle limit hit " << limit_hit << " times during last interval.");
657
658 // Update the IO counters
659 m_compute_var.Lock();
660 m_stable_io_active = m_io_active.load(std::memory_order_acquire);
661 auto io_active = m_stable_io_active;
662 m_stable_io_total = m_io_total;
663 auto io_total = m_stable_io_total;
664 auto io_wait_rep = m_io_active_time.exchange(std::chrono::steady_clock::duration(0).count());
665 m_stable_io_wait += std::chrono::steady_clock::duration(io_wait_rep);
666
667 m_compute_var.UnLock();
668
669 auto io_wait_ms = std::chrono::duration_cast<std::chrono::milliseconds>(m_stable_io_wait).count();
670 TRACE(IOLOAD, "Current IO counter is " << io_active << "; total IO active time is " << io_wait_ms << "ms.");
671 if (m_gstream)
672 {
673 char buf[128];
674 auto len = snprintf(buf, 128,
675 R"({"event":"throttle_update","io_wait":%.4f,"io_active":%d,"io_total":%llu})",
676 static_cast<double>(io_wait_ms) / 1000.0, io_active, static_cast<long long unsigned>(io_total));
677 auto suc = (len < 128) ? m_gstream->Insert(buf, len + 1) : false;
678 if (!suc)
679 {
680 TRACE(IOLOAD, "Failed g-stream insertion of throttle_update record (len=" << len << "): " << buf);
681 }
682 }
683 m_compute_var.Broadcast();
684}
685
686/*
687 * Do a simple hash across the username.
688 */
689uint16_t
690XrdThrottleManager::GetUid(const std::string &username)
691{
692 std::hash<std::string> hash_fn;
693 auto hash = hash_fn(username);
694 auto uid = static_cast<uint16_t>(hash % m_max_users);
695 TRACE(DEBUG, "Mapping user " << username << " to UID " << uid);
696 return uid;
697}
698
699/*
700 * Notify a single waiter thread that it can proceed.
701 */
702void
703XrdThrottleManager::NotifyOne()
704{
705 auto &wake_order = (m_wake_order_active == 0) ? m_wake_order_0 : m_wake_order_1;
706
707 for (size_t idx = 0; idx < m_max_users; ++idx)
708 {
709 auto offset = m_waiter_offset.fetch_add(1, std::memory_order_acq_rel);
710 int16_t uid = wake_order[offset % m_max_users];
711 if (uid < 0)
712 {
713 continue;
714 }
715 auto &waiter_info = m_waiter_info[uid];
716 std::unique_lock<std::mutex> lock(waiter_info.m_mutex);
717 if (waiter_info.m_waiting) {
718 waiter_info.NotifyOne(std::move(lock));
719 return;
720 }
721 }
722}
723
724/*
725 * Create an IO timer object; increment the number of outstanding IOs.
726 */
728XrdThrottleManager::StartIOTimer(uint16_t uid, bool &ok)
729{
730 int cur_counter = m_io_active.fetch_add(1, std::memory_order_acq_rel);
731 m_io_total++;
732
733 while (m_concurrency_limit >= 0 && cur_counter >= m_concurrency_limit)
734 {
735 // If the user has essentially no concurrency, then we let them
736 // temporarily exceed the limit. This prevents potential waits for
737 // every single read for an infrequent user.
738 if (m_waiter_info[uid].m_concurrency < 1)
739 {
740 break;
741 }
742 m_loadshed_limit_hit++;
743 m_io_active.fetch_sub(1, std::memory_order_acq_rel);
744 TRACE(DEBUG, "ThrottleManager (user=" << uid << "): IO concurrency limit hit; waiting for other IOs to finish.");
745 ok = m_waiter_info[uid].Wait();
746 if (!ok) {
747 TRACE(DEBUG, "ThrottleManager (user=" << uid << "): timed out waiting for other IOs to finish.");
748 return XrdThrottleTimer();
749 }
750 cur_counter = m_io_active.fetch_add(1, std::memory_order_acq_rel);
751 }
752
753 ok = true;
754 return XrdThrottleTimer(this, uid);
755}
756
757/*
758 * Finish recording an IO timer.
759 */
760void
761XrdThrottleManager::StopIOTimer(std::chrono::steady_clock::duration & event_duration, uint16_t uid)
762{
763 m_io_active_time += event_duration.count();
764 auto old_active = m_io_active.fetch_sub(1, std::memory_order_acq_rel);
765 m_waiter_info[uid].m_io_time += event_duration.count();
766 if (old_active == static_cast<unsigned>(m_concurrency_limit))
767 {
768 // If we are below the concurrency limit threshold and have another waiter
769 // for our user, then execute it immediately. Otherwise, we will give
770 // someone else a chance to run (as we have gotten more than our share recently).
771 unsigned waiting_users = m_waiting_users;
772 if (waiting_users == 0) waiting_users = 1;
773 if (m_waiter_info[uid].m_concurrency < m_concurrency_limit / waiting_users)
774 {
775 std::unique_lock<std::mutex> lock(m_waiter_info[uid].m_mutex);
776 if (m_waiter_info[uid].m_waiting > 0)
777 {
778 m_waiter_info[uid].NotifyOne(std::move(lock));
779 return;
780 }
781 }
782 NotifyOne();
783 }
784}
785
786/*
787 * Check the counters to see if we have hit any throttle limits in the
788 * current time period. If so, shed the client randomly.
789 *
790 * If the client has already been load-shedded once and reconnected to this
791 * server, then do not load-shed it again.
792 */
793bool
794XrdThrottleManager::CheckLoadShed(const std::string &opaque)
795{
796 if (m_loadshed_port == 0)
797 {
798 return false;
799 }
800 if (m_loadshed_limit_hit == 0)
801 {
802 return false;
803 }
804 if (static_cast<unsigned>(rand()) % 100 > m_loadshed_frequency)
805 {
806 return false;
807 }
808 if (opaque.empty())
809 {
810 return false;
811 }
812 return true;
813}
814
815void
816XrdThrottleManager::PrepLoadShed(const char * opaque, std::string &lsOpaque)
817{
818 if (m_loadshed_port == 0)
819 {
820 return;
821 }
822 if (opaque && opaque[0])
823 {
824 XrdOucEnv env(opaque);
825 // Do not load shed client if it has already been done once.
826 if (env.Get("throttle.shed") != 0)
827 {
828 return;
829 }
830 lsOpaque = opaque;
831 lsOpaque += "&throttle.shed=1";
832 }
833 else
834 {
835 lsOpaque = "throttle.shed=1";
836 }
837}
838
839void
840XrdThrottleManager::PerformLoadShed(const std::string &opaque, std::string &host, unsigned &port)
841{
842 host = m_loadshed_host;
843 host += "?";
844 host += opaque;
845 port = m_loadshed_port;
846}
847
848bool
849XrdThrottleManager::Waiter::Wait()
850{
851 auto timeout = std::chrono::steady_clock::now() + m_manager->m_max_wait_time;
852 {
853 std::unique_lock<std::mutex> lock(m_mutex);
854 m_waiting++;
855 m_cv.wait_until(lock, timeout,
856 [&] { return m_manager->m_io_active.load(std::memory_order_acquire) < static_cast<unsigned>(m_manager->m_concurrency_limit) || std::chrono::steady_clock::now() >= timeout; });
857 m_waiting--;
858 }
859 if (std::chrono::steady_clock::now() > timeout) {
860 return false;
861 }
862 return true;
863}
864
865/*
866 * Load per-user limits from an INI-style configuration file.
867 * Format:
868 * [default]
869 * name = *
870 * maxconn = 200
871 *
872 * [user1]
873 * name = user1
874 * maxconn = 25
875 *
876 * [wildcarduser]
877 * name = wildcarduser*
878 * maxconn = 10
879 */
880int
881XrdThrottleManager::LoadUserLimits(const std::string &config_file)
882{
883 INIReader reader(config_file);
884 if (reader.ParseError() < 0)
885 {
886 m_log->Emsg("ThrottleManager", errno, "Unable to open per-user configuration file", config_file.c_str());
887 return 1;
888 }
889 else if (reader.ParseError() > 0)
890 {
891 std::stringstream ss;
892 ss << "Parse error on line " << reader.ParseError() << " of file " << config_file;
893 m_log->Emsg("ThrottleManager", ss.str().c_str());
894 return 1;
895 }
896
897 std::unordered_map<std::string, UserLimit> new_limits;
898
899 // Process all sections
900 for (const auto &section : reader.Sections())
901 {
902 // Get the name parameter (required for all sections)
903 std::string name = reader.Get(section, "name", "");
904 if (name.empty())
905 {
906 m_log->Say("ThrottleManager", "Section", section.c_str(), "missing 'name' parameter; skipping");
907 continue;
908 }
909
910 long max_conn = reader.GetInteger(section, "maxconn", 0);
911 if (max_conn <= 0)
912 {
913 m_log->Say("ThrottleManager", "Section", section.c_str(), "has invalid or missing 'maxconn' parameter; skipping");
914 continue;
915 }
916
917 UserLimit limit;
918 limit.max_conn = static_cast<unsigned long>(max_conn);
919 // Check if name contains wildcard (including '*' for default/catch-all)
920 limit.is_wildcard = (name.find('*') != std::string::npos);
921 new_limits[name] = limit;
922 }
923
924 // Atomically replace the limits map
925 size_t num_entries = new_limits.size();
926 {
927 std::unique_lock<std::shared_mutex> lock(m_user_limits_mutex);
928 m_user_limits = std::move(new_limits);
929 }
930
931 m_log->Say("ThrottleManager", "Loaded", std::to_string(num_entries).c_str(), "per-user limit entries from", config_file.c_str());
932 return 0;
933}
934
935/*
936 * Reload per-user limits from the configured file.
937 */
938int
940{
941 if (m_user_config_file.empty())
942 {
943 m_log->Emsg("ThrottleManager", "No per-user configuration file specified");
944 return 1;
945 }
946 return LoadUserLimits(m_user_config_file);
947}
948
949/*
950 * Get the per-user connection limit for a given username.
951 * Returns 0 if no per-user limit is set (use global), otherwise returns the limit.
952 * Supports wildcard matching (e.g., "user*" matches "user1", "user2", etc.)
953 * Special case: "*" matches all users (default/catch-all)
954 * Priority: exact match > wildcard match (longest prefix) > "*" > global
955 */
956unsigned long
957XrdThrottleManager::GetUserMaxConn(const std::string &username)
958{
959 std::shared_lock lock(m_user_limits_mutex);
960
961 // First, try exact match
962 auto exact_iter = m_user_limits.find(username);
963 if (exact_iter != m_user_limits.end() && !exact_iter->second.is_wildcard)
964 {
965 return exact_iter->second.max_conn;
966 }
967
968 // Then, try wildcard matches (prefer longest matching prefix)
969 unsigned long best_match = 0;
970 size_t best_prefix_len = 0;
971 unsigned long catch_all_match = 0;
972
973 for (const auto &entry : m_user_limits)
974 {
975 if (!entry.second.is_wildcard) continue;
976
977 const std::string &pattern = entry.first;
978
979 // Special case: "*" is a catch-all pattern - store it but don't use it yet
980 if (pattern == "*")
981 {
982 catch_all_match = entry.second.max_conn;
983 continue;
984 }
985
986 size_t wildcard_pos = pattern.find('*');
987 if (wildcard_pos == std::string::npos) continue;
988
989 // Extract prefix before wildcard
990 std::string prefix = pattern.substr(0, wildcard_pos);
991 if (username.length() >= prefix.length() &&
992 username.substr(0, prefix.length()) == prefix)
993 {
994 // Prefer longer prefix matches
995 if (prefix.length() > best_prefix_len)
996 {
997 best_prefix_len = prefix.length();
998 best_match = entry.second.max_conn;
999 }
1000 }
1001 }
1002
1003 // If we found a specific wildcard match, use it
1004 if (best_match > 0) return best_match;
1005
1006 // If no specific wildcard match, use catch-all if available
1007 // return 0 if not
1008 return catch_all_match;
1009
1010}
#define DEBUG(x)
#define AtomicFSub(w, x, y)
#define AtomicFAZ(x)
#define AtomicBeg(Mtx)
#define AtomicEnd(Mtx)
#define TRACE(act, x)
Definition XrdTrace.hh:63
char * Get(const char *varname)
Definition XrdOucEnv.hh:69
XrdSecAttr * Get(const void *sigkey)
char * vorg
Entity's virtual organization(s).
XrdSecEntityAttr * eaAPI
non-const API to attributes
char * name
Entity's name.
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static unsigned long Num(void)
static void Wait(int milliseconds)
void StopIOTimer(std::chrono::steady_clock::duration &event_duration, uint16_t uid)
void SetThrottles(float reqbyterate, float reqoprate, int concurrency, float interval_length)
void SetMaxOpen(unsigned long max_open)
void FromConfig(XrdThrottle::Configuration &config)
void Apply(int reqsize, int reqops, int uid)
std::tuple< std::string, uint16_t > GetUserInfo(const XrdSecEntity *client)
XrdThrottleTimer StartIOTimer(uint16_t uid, bool &ok)
void SetLoadShed(std::string &hostname, unsigned port, unsigned frequency)
friend class XrdThrottleTimer
void PrepLoadShed(const char *opaque, std::string &lsOpaque)
bool CheckLoadShed(const std::string &opaque)
int LoadUserLimits(const std::string &config_file)
void SetMaxWait(unsigned long max_wait)
unsigned long GetUserMaxConn(const std::string &username)
void SetMaxConns(unsigned long max_conns)
XrdThrottleManager(XrdSysError *lP, XrdOucTrace *tP)
void PerformLoadShed(const std::string &opaque, std::string &host, unsigned &port)
bool CloseFile(const std::string &entity)
bool OpenFile(const std::string &entity, std::string &open_error_message)
long long GetLoadshedPort() const
long long GetThrottleDataRate() const
const std::string & GetUserConfigFile() const
long long GetThrottleConcurrency() const
const std::string & GetLoadshedHost() const
long long GetLoadshedFreq() const
long long GetThrottleIOPSRate() const
long long GetThrottleRecomputeIntervalMS() const