XRootD
Loading...
Searching...
No Matches
XrdPollE.icc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d P o l l E . i c c */
4/* */
5/* (c) 2008 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <fcntl.h>
32#include <cstdlib>
33#include <sys/types.h>
34#include <sys/stat.h>
35#include <sys/epoll.h>
36#include <sys/eventfd.h>
37
38#include "Xrd/XrdPollE.hh"
39#include "Xrd/XrdScheduler.hh"
40
41/******************************************************************************/
42/* n e w P o l l e r */
43/******************************************************************************/
44
45XrdPoll *XrdPoll::newPoller(int pollid, int maxfd)
46{
47 int pfd, wfd, bytes, alignment, pagsz = getpagesize();
48 struct epoll_event *pp;
49
50// Open the /dev/poll driver
51//
52#ifndef EPOLL_CLOEXEC
53 if ((pfd = epoll_create(maxfd)) >= 0) fcntl(pfd, F_SETFD, FD_CLOEXEC);
54 else
55#else
56 if ((pfd = epoll_create1(EPOLL_CLOEXEC)) < 0)
57#endif
58 {Log.Emsg("Poll", errno, "create epoll device"); return 0;}
59
60 if ((wfd = eventfd(0, EFD_CLOEXEC)) < 0)
61 {Log.Emsg("Poll", errno,
62 "create an eventfd as the wait-poller descriptor");
63 close(pfd);
64 return 0;
65 }
66
67// Calculate the size of the poll table and allocate it
68//
69 bytes = maxfd * sizeof(struct epoll_event);
70 alignment = (bytes < pagsz ? 1024 : pagsz);
71 if (posix_memalign((void **)&pp, alignment, bytes))
72 {Log.Emsg("Poll", ENOMEM, "create poll table");
73 close(wfd);
74 close(pfd);
75 return 0;
76 }
77
78// Create new poll object
79//
80 memset((void *)pp, 0, bytes);
81 return (XrdPoll *)new XrdPollE(pp, maxfd, pfd, wfd);
82}
83
84/******************************************************************************/
85/* D e s t r u c t o r */
86/******************************************************************************/
87
89{
90 if (PollTab) free(PollTab);
91 if (WaitFd >= 0) close(WaitFd);
92 if (PollDfd >= 0) close(PollDfd);
93}
94
95/******************************************************************************/
96/* A d d W a i t F d */
97/******************************************************************************/
98
99int XrdPollE::AddWaitFd()
100{
101 const unsigned int myPollEvts = EPOLLIN;
102 struct epoll_event myEvent = {myPollEvts, {(void *)&WaitFd}};
103
104// Add the waitfd to the poll set
105//
106 if (epoll_ctl(PollDfd, EPOLL_CTL_ADD, WaitFd, &myEvent) < 0)
107 {int rc = errno;
108 Log.Emsg("Poll", rc, "include the wait FD in the poll set");
109 return rc;
110 }
111
112 return 0;
113}
114
115/******************************************************************************/
116/* D i s a b l e */
117/******************************************************************************/
118
119void XrdPollE::Disable(XrdPollInfo &pInfo, const char *etxt)
120{
121
122// Simply return if the link is already disabled
123//
124 if (!pInfo.isEnabled) return;
125
126// If Linux 2.6.9 we use EPOLLONESHOT to automatically disable a polled fd.
127// So, the Disable() method need not do anything. Prior kernels did not have
128// this mechanism so we need to do this manually.
129//
130#ifndef EPOLLONESHOT
131 struct epoll_event myEvents = {0, (void *)&pInfo};
132
133// Disable this fd. Unlike solaris, epoll_ctl() does not block when the pollfd
134// is being waited upon by another thread.
135//
136 if (epoll_ctl(PollDfd, EPOLL_CTL_MOD, pInfo.FD, &myEvents))
137 {Log.Emsg("Poll", errno, "disable link", lp->ID); return;}
138#endif
139
140// Trace this event
141//
142 pInfo.isEnabled = false;
143 TRACEI(POLL, "Poller " <<PID <<" async disabling link " <<pInfo.FD);
144
145// Check if this link needs to be rescheduled. If so, the caller better have
146// the link opMutex lock held for this to work!
147//
148 if (etxt && Finish(pInfo, etxt)) Sched.Schedule((XrdJob *)&pInfo.Link);
149}
150
151/******************************************************************************/
152/* E n a b l e */
153/******************************************************************************/
154
156{
157 struct epoll_event myEvents = {ePollEvents, {(void *)&pInfo}};
158
159// Simply return if the link is already enabled
160//
161 if (pInfo.isEnabled) return 1;
162
163// Enable this fd. Unlike solaris, epoll_ctl() does not block when the pollfd
164// is being waited upon by another thread.
165//
166 pInfo.isEnabled = true;
167 if (epoll_ctl(PollDfd, EPOLL_CTL_MOD, pInfo.FD, &myEvents))
168 {Log.Emsg("Poll", errno, "enable link", pInfo.Link.ID);
169 pInfo.isEnabled = false;
170 return 0;
171 }
172
173// Do final processing
174//
175 TRACE(POLL, "Poller " <<PID <<" enabled " <<pInfo.Link.ID);
176 numEnabled++;
177 return 1;
178}
179
180/******************************************************************************/
181/* E x c l u d e */
182/******************************************************************************/
183
185{
186
187// Make sure this link is not enabled
188//
189 if (pInfo.isEnabled)
190 {Log.Emsg("Poll", "Detach of enabled link", pInfo.Link.ID);
191 Disable(pInfo);
192 }
193
194// Forcibly remove the link from the poll set
195//
196 remFD(pInfo, 0);
197
198// Wait to make sure the poll thread has completed handling the last set of
199// events which may have included our Link. The handing includes examining the
200// Link's PollInfo before deciding if to schedule the Link. After we return,
201// the PollInfo may be reset and the Link could be subsequently reused so we
202// must make sure that the there are no more events.
203// This is a separate issue from ensuring that if this Link is Scheduled it
204// has completed processing. In the usual flow this is ensured by calling
205// Close -> Detach -> Exclude from within the Link's schedule task.
206//
207 Wait4Poller();
208}
209
210/******************************************************************************/
211/* H a n d l e W a i t F d */
212/******************************************************************************/
213
214void XrdPollE::HandleWaitFd(const unsigned int events)
215{
216// Used by the polling thread to signal waiters that a polling loop has been
217// completed.
218//
219 eventfd_t ic, cnt;
220
221// We don't expect anything but EPOLLIN. But if we get an error (errors
222// can be reported, despite not being selected events) abort rather than
223// potentially keeping the poll thread busy repeatedly looping.
224//
225 if (!(events & EPOLLIN) || (events & (EPOLLERR | EPOLLHUP)))
226 {char eBuff[64];
227 Log.Emsg("Poll", "wait-poller handler:", x2Text(events, eBuff));
228 if (events & (EPOLLERR | EPOLLHUP))
229 abort();
230 return;
231 }
232
233 if (eventfd_read(WaitFd, &cnt) < 0)
234 {Log.Emsg("Poll", errno, "read from the wait-poller descriptor");
235 return;
236 }
237
238 for (ic=0;ic<cnt;++ic) WaitFdSem.Post();
239 for (ic=0;ic<cnt;++ic) WaitFdSem2.Wait();
240}
241
242/******************************************************************************/
243/* I n c l u d e */
244/******************************************************************************/
245
247{
248 struct epoll_event myEvent = {0, {(void *)&pInfo}};
249 int rc;
250
251// Add this fd to the poll set
252//
253 if ((rc = epoll_ctl(PollDfd, EPOLL_CTL_ADD, pInfo.FD, &myEvent)) < 0)
254 Log.Emsg("Poll", errno, "include link", pInfo.Link.ID);
255
256// All done
257//
258 return rc == 0;
259}
260
261/******************************************************************************/
262/* r e m F D */
263/******************************************************************************/
264
265void XrdPollE::remFD(XrdPollInfo &pInfo, unsigned int events)
266{
267 struct epoll_event myEvents = {0, {(void *)&pInfo}};
268
269// It works out that ONESHOT mode or even CTL_MOD requests do not necessarily
270// prevent epoll_wait() from returning on an error event. So, we must manually
271// remove the fd from the set and assume the logic was actually correct. If it
272// wasn't then the client will eventually timeout and retry the request. This
273// may cause a double remove; so we don't consider these an arror.
274//
275 if (pInfo.FD > 0)
276 {TRACEI(POLL, "Poller " <<PID <<" removing FD " <<pInfo.FD);
277 if (epoll_ctl(PollDfd, EPOLL_CTL_DEL, pInfo.FD, &myEvents)
278 && (errno != ENOENT || events != 0))
279 {const char *why;
280 char buff[96];
281 if (events & (EPOLLHUP | EPOLLRDHUP)) why = "sever";
282 else if (events & EPOLLERR) why = "error";
283 else why = "disc";
284 snprintf(buff, sizeof(buff), "exclude fd %d during %s (%x) event; link",
285 pInfo.FD, why, events);
286 Log.Emsg("Poll", errno, buff, pInfo.Link.ID);
287 }
288 }
289}
290
291/******************************************************************************/
292/* S t a r t */
293/******************************************************************************/
294
295void XrdPollE::Start(XrdSysSemaphore *syncsem, int &retcode)
296{
297 char eBuff[64];
298 int rc, i, numpolled, num2sched;
299 unsigned int waitFdEvents;
300 bool haveWaiters;
301 XrdJob *jfirst, *jlast;
302 const short pollOK = EPOLLIN | EPOLLPRI;
303 XrdLink *lp;
304 XrdPollInfo *pInfo;
305
306 if ((rc = AddWaitFd()))
307 {retcode = rc;
308 syncsem->Post();
309 return;
310 }
311
312// Indicate to the starting thread that all went well
313//
314 retcode = 0;
315 syncsem->Post();
316
317// Now start dispatching links that are ready
318//
319 do {do {numpolled = epoll_wait(PollDfd, PollTab, PollMax, -1);}
320 while (numpolled < 0 && errno == EINTR);
321 if (numpolled == 0) continue;
322 if (numpolled < 0)
323 {Log.Emsg("Poll", errno, "poll for events");
324 abort();
325 }
326 numEvents += numpolled;
327
328 // Checkout which links must be dispatched (no need to lock)
329 //
330 jfirst = jlast = 0; num2sched = 0;
331 haveWaiters = false; waitFdEvents = 0;
332 for (i = 0; i < numpolled; i++)
333 {if (PollTab[i].data.ptr == &WaitFd)
334 {haveWaiters = true; waitFdEvents = PollTab[i].events;}
335 else if ((pInfo = (XrdPollInfo *)PollTab[i].data.ptr))
336 {if (!(pInfo->isEnabled) && pInfo->FD >= 0)
337 remFD(*pInfo, PollTab[i].events);
338 else {pInfo->isEnabled = 0;
339 if (!(PollTab[i].events & pollOK)
340 || (PollTab[i].events & POLLRDHUP))
341 Finish(*pInfo, x2Text(PollTab[i].events, eBuff));
342 lp = &(pInfo->Link);
343 lp->NextJob = jfirst; jfirst = (XrdJob *)lp;
344 if (!jlast) jlast=(XrdJob *)lp;
345 num2sched++;
346#ifndef EPOLLONESHOT
347 PollTab[i].events = 0;
348 if (epoll_ctl(PollDfd,EPOLL_CTL_MOD,pInfo.FD,&PollTab[i]))
349 Log.Emsg("Poll",errno,"disable link",pInfo.Link.ID);
350#endif
351 }
352 } else Log.Emsg("Poll", "null link event!!!!");
353 }
354
355 // Schedule the polled links
356 //
357 if (num2sched == 1) Sched.Schedule(jfirst);
358 else if (num2sched) Sched.Schedule(num2sched, jfirst, jlast);
359
360 if (haveWaiters) HandleWaitFd(waitFdEvents);
361 } while(1);
362}
363
364/******************************************************************************/
365/* W a i t 4 P o l l e r */
366/******************************************************************************/
367
368void XrdPollE::Wait4Poller()
369{
370// Triggers the polling thread to complete at least one event processing loop
371// and we wait for this.
372//
373 if (eventfd_write(WaitFd, 1) < 0)
374 {Log.Emsg("Poll", errno, "write to the wait-poller descriptor");
375 return;
376 }
377
378// First semaphore ensures we wait for polling thread to have processed a batch
379// of waitfd event and thus be outside of epoll_wait.
380// Second semaphore ensures polling thread waits for all waits in its batch
381// of waitfd events to have been completed.
382//
383 WaitFdSem.Wait();
384 WaitFdSem2.Post();
385}
386
387/******************************************************************************/
388/* x 2 T e x t */
389/******************************************************************************/
390
391const char *XrdPollE::x2Text(unsigned int events, char *buff)
392{
393 if (events & EPOLLERR) return "socket error";
394
395 if (events & (EPOLLHUP | EPOLLRDHUP)) return "hangup";
396
397 sprintf(buff, "unusual event (%.4x)", events);
398 return buff;
399}
#define EPOLLRDHUP
Definition XrdPollE.hh:37
#define close(a)
Definition XrdPosix.hh:48
#define TRACE(act, x)
Definition XrdTrace.hh:63
#define TRACEI(act, x)
Definition XrdTrace.hh:66
XrdJob * NextJob
Definition XrdJob.hh:46
const char * x2Text(unsigned int evf, char *buff)
Definition XrdPollE.icc:391
void Exclude(XrdPollInfo &pInfo)
Definition XrdPollE.icc:184
void Disable(XrdPollInfo &pInfo, const char *etxt=0)
Definition XrdPollE.icc:119
void Start(XrdSysSemaphore *syncp, int &rc)
Definition XrdPollE.icc:295
int Enable(XrdPollInfo &pInfo)
Definition XrdPollE.icc:155
int Include(XrdPollInfo &pInfo)
Definition XrdPollE.icc:246
XrdLink & Link
XrdPoll()
Definition XrdPoll.cc:118
int PID
Definition XrdPoll.hh:82
int numEvents
Definition XrdPoll.hh:133
static XrdPoll * newPoller(int pollid, int numfd)
Definition XrdPollE.icc:45
static int Finish(XrdPollInfo &pInfo, const char *etxt=0)
Definition XrdPoll.cc:204
int numEnabled
Definition XrdPoll.hh:132
void Schedule(XrdJob *jp)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
XrdSysError Log
Definition XrdConfig.cc:113