1 /* This file is part of the yazpp toolkit.
2 * Copyright (C) 1998-2012 Index Data and Mike Taylor
3 * See the file LICENSE for details.
13 #include <sys/types.h>
27 #include <yazpp/socket-manager.h>
30 using namespace yazpp_1;
// Entry in the manager's list of registered observers (Rep::observers).
32 struct SocketManager::SocketEntry {
33 ISocketObserver *observer; // observer this entry was registered for
// Pending notification for one observer; a node of the doubly linked
// event queue maintained by Rep::putEvent / Rep::getEvent.
42 struct SocketManager::SocketEvent {
43 ISocketObserver *observer; // observer whose socketNotify() receives the event
45 SocketEvent *next; // neighbour towards the front of the queue
46 SocketEvent *prev; // neighbour towards the back of the queue
// Private state of SocketManager: the list of registered observers and
// the queue of events waiting to be delivered.
49 struct SocketManager::Rep {
// Queue operations: put appends at the back, get takes from the front,
// remove unlinks queued events that belong to the given observer.
50 void putEvent(SocketEvent *event);
51 SocketEvent *getEvent();
52 void removeEvent(ISocketObserver *observer);
53 void inspect_poll_result(int res, struct yaz_poll_fd *fds, int no_fds,
// Returns the address of the list link that refers to the matching entry.
55 SocketEntry **lookupObserver(ISocketObserver *observer);
56 SocketEntry *observers; // all registered observers
57 SocketEvent *queue_front; // end of the queue that getEvent() reads from
58 SocketEvent *queue_back; // end of the queue that putEvent() appends to
// Search the observers list for the entry registered for `observer`.
// The scan walks the links themselves (SocketEntry **) rather than the
// entries, so the result addresses the link that points at the entry.
// NOTE(review): callers dereference the result unconditionally, so when
// nothing matches this presumably yields the terminating null link - confirm.
62 SocketManager::SocketEntry **SocketManager::Rep::lookupObserver(
63 ISocketObserver *observer)
67 for (se = &observers; *se; se = &(*se)->next)
68 if ((*se)->observer == observer)
// Walk the observers list, incrementing i once per entry.
// NOTE(review): presumably i is the value returned - confirm.
73 int SocketManager::getNumberOfObservers()
77 for (se = m_p->observers; se; se = se->next, i++)
// Register `observer` for socket `fd`.
82 void SocketManager::addObserver(int fd, ISocketObserver *observer)
// Look for an entry already registered for this observer.
86 se = *m_p->lookupObserver(observer);
// Chain the entry in front of the current head of the observers list.
// NOTE(review): presumably only done for a newly created entry - confirm.
90 se->next= m_p->observers;
92 se->observer = observer;
// 0 means "no activity recorded yet"; processEvent() tests last_activity
// for non-zero before subtracting elapsed time from the timeout.
96 se->last_activity = 0;
// Unregister `observer`.
100 void SocketManager::deleteObserver(ISocketObserver *observer)
// se addresses the list link that refers to the observer's entry.
102 SocketEntry **se = m_p->lookupObserver(observer);
// Drop events already queued for this observer.
105 m_p->removeEvent(observer);
// Keep the entry pointer.
// NOTE(review): presumably so the link can be re-pointed before the
// entry is released - confirm.
106 SocketEntry *se_tmp = *se;
// Walk the whole observers list starting at its head.
// NOTE(review): presumably every entry is released and the list emptied - confirm.
112 void SocketManager::deleteObservers()
114 SocketEntry *se = m_p->observers;
// Remember the successor before the current entry is dealt with.
118 SocketEntry *se_next = se->next;
// Apply an event mask (SOCKET_OBSERVE_* bits) to a registered observer.
// NOTE(review): the store into the entry is presumably done after the
// lookup below - confirm.
125 void SocketManager::maskObserver(ISocketObserver *observer, int mask)
// Log the read/write/except bits of the requested mask individually.
129 yaz_log(m_p->log, "obs=%p read=%d write=%d except=%d", observer,
130 mask & SOCKET_OBSERVE_READ,
131 mask & SOCKET_OBSERVE_WRITE,
132 mask & SOCKET_OBSERVE_EXCEPT);
134 se = *m_p->lookupObserver(observer);
// Store a timeout on the entry registered for `observer`.
// processEvent() later reads it to compute the poll timeout.
139 void SocketManager::timeoutObserver(ISocketObserver *observer,
144 se = *m_p->lookupObserver(observer);
146 se->timeout = timeout;
// Turn the outcome of one yaz_poll() call into queued SocketEvents and
// deliver one event from the front of the queue.
//   res     - value returned by yaz_poll()
//   fds     - poll descriptors that were passed to yaz_poll()
//   no_fds  - number of elements in fds
//   timeout - timeout that was passed to yaz_poll()
149 void SocketManager::Rep::inspect_poll_result(int res, struct yaz_poll_fd *fds,
150 int no_fds, int timeout)
153 yaz_log(log, "yaz_poll returned res=%d", res);
154 time_t now = time(0);
// Counters reported in the diagnostic at the end of this function.
156 int no_put_events = 0;
157 int no_lost_observers = 0;
159 for (i = 0; i < no_fds; i++)
// Find the observer entry that owns this descriptor.
162 for (p = observers; p; p = p->next)
163 if (p->fd == fds[i].fd)
167 // m_p->observers list changed since poll started
172 enum yaz_poll_mask output_mask = fds[i].output_mask;
// Translate yaz_poll result bits into SOCKET_OBSERVE_* bits.
175 if (output_mask & yaz_poll_read)
176 mask |= SOCKET_OBSERVE_READ;
178 if (output_mask & yaz_poll_write)
179 mask |= SOCKET_OBSERVE_WRITE;
181 if (output_mask & yaz_poll_except)
182 mask |= SOCKET_OBSERVE_EXCEPT;
// I/O event: record the activity time and queue an event for the observer.
186 SocketEvent *event = new SocketEvent;
187 p->last_activity = now;
188 event->observer = p->observer;
192 yaz_log(log, "putEvent I/O mask=%d", mask);
// Timeout event: poll returned 0 and this entry's computed timeout is the
// one the poll was run with.
194 else if (res == 0 && p->timeout_this == timeout)
196 SocketEvent *event = new SocketEvent;
197 assert(p->last_activity);
198 yaz_log(log, "putEvent timeout fd=%d, now = %ld "
199 "last_activity=%ld timeout=%d",
200 p->fd, now, p->last_activity, p->timeout);
201 p->last_activity = now;
202 event->observer = p->observer;
203 event->event = SOCKET_OBSERVE_TIMEOUT;
// Deliver the event at the front of the queue to its observer.
209 SocketEvent *event = getEvent();
212 event->observer->socketNotify(event->event);
// Warn about a poll that produced nothing to deliver, but only when no
// observer went missing from the list.
217 if (no_lost_observers == 0)
220 yaz_log(YLOG_WARN, "unhandled socket event. yaz_poll returned %d",
222 yaz_log(YLOG_WARN, "no_put_events=%d no_fds=%d i=%d timeout=%d",
223 no_put_events, no_fds, i, timeout);
// Run one step of the event loop: deliver a queued event, or poll the
// registered sockets and hand the result to Rep::inspect_poll_result().
// The final return is 1 when yaz_poll() reported res >= 0 and -1 otherwise.
228 int SocketManager::processEvent()
// Take an event that is already queued and notify its observer.
// NOTE(review): presumably the function returns right after a queued
// event is delivered - confirm.
231 SocketEvent *event = m_p->getEvent();
233 yaz_log(m_p->log, "SocketManager::processEvent manager=%p", this);
236 event->observer->socketNotify(event->event);
242 time_t now = time(0);
// First pass over the observers.
// NOTE(review): presumably counts them into no_fds - confirm.
245 for (p = m_p->observers; p; p = p->next)
// One poll descriptor per observer.
250 struct yaz_poll_fd *fds = new yaz_poll_fd [no_fds];
251 for (i = 0, p = m_p->observers; p; p = p->next, i++)
// Translate SOCKET_OBSERVE_* bits into the yaz_poll input mask.
255 if (p->mask & SOCKET_OBSERVE_READ)
256 input_mask += yaz_poll_read;
257 if (p->mask & SOCKET_OBSERVE_WRITE)
258 input_mask += yaz_poll_write;
259 if (p->mask & SOCKET_OBSERVE_EXCEPT)
260 input_mask += yaz_poll_except;
// A timeout is in effect when it is positive, or when it is zero and the
// observer is not waiting for write.
261 if (p->timeout > 0 ||
262 (p->timeout == 0 && (p->mask & SOCKET_OBSERVE_WRITE) == 0))
// Time left = configured timeout minus time elapsed since last activity.
265 timeout_this = p->timeout;
266 if (p->last_activity)
267 timeout_this -= now - p->last_activity;
269 p->last_activity = now;
// Range check on the remaining time.
// NOTE(review): the out-of-range case is presumably clamped - confirm.
270 if (timeout_this < 0 || timeout_this > 2147483646)
// Track the smallest remaining time; -1 means none has been chosen yet.
272 if (timeout == -1 || timeout_this < timeout)
273 timeout = timeout_this;
274 p->timeout_this = timeout_this;
275 yaz_log(m_p->log, "SocketManager::select timeout_this=%d",
// -1 in timeout_this marks an entry with no timeout for this poll.
279 p->timeout_this = -1;
280 fds[i].input_mask = (enum yaz_poll_mask) input_mask;
// Retry a failing poll; the loop gives up once pass reaches 10.
284 while ((res = yaz_poll(fds, no_fds, timeout, 0)) < 0 && pass < 10)
291 yaz_log(YLOG_ERRNO|YLOG_WARN, "yaz_poll");
292 yaz_log(YLOG_WARN, "errno=%d timeout=%d", errno, timeout);
296 m_p->inspect_poll_result(res, fds, no_fds, timeout);
299 return res >= 0 ? 1 : -1;
302 // n p n p ...... n p n p
// Append `event` to the event queue.
305 void SocketManager::Rep::putEvent(SocketEvent *event)
307 // put in back of queue
// The current back gets the new event as its neighbour towards the back.
310 queue_back->prev = event;
// Consistency check: the queue must not have a front at this point.
// NOTE(review): presumably this is the empty-queue branch - confirm.
315 assert(!queue_front);
// The new event's neighbour towards the front is the previous back.
318 event->next = queue_back;
// Detach the event at the front of the queue.
// NOTE(review): presumably returns it, or null when the queue is empty;
// callers test or use the result of getEvent() directly - confirm.
323 SocketManager::SocketEvent *SocketManager::Rep::getEvent()
325 // get from front of queue
326 SocketEvent *event = queue_front;
// The element behind the old front becomes the new front ...
330 queue_front = event->prev;
// ... and no longer has anything ahead of it.
334 queue_front->next = 0;
// Unlink queued events that belong to `observer`.
341 void SocketManager::Rep::removeEvent(ISocketObserver *observer)
// Traverse from the back of the queue towards the front.
343 SocketEvent *ev = queue_back;
// Save the successor first, since ev may be unlinked below.
346 SocketEvent *ev_next = ev->next;
347 if (observer == ev->observer)
// Detach ev from its neighbour towards the back, or move queue_back when
// ev is the back element.
350 ev->prev->next = ev->next;
352 queue_back = ev->next;
// Detach ev from its neighbour towards the front, or move queue_front when
// ev is the front element.
354 ev->next->prev = ev->prev;
356 queue_front = ev->prev;
// Set up the private state of a new manager.
363 SocketManager::SocketManager()
// The event queue starts without a front element.
367 m_p->queue_front = 0;
// Level passed to the yaz_log(m_p->log, ...) trace calls in this file.
369 m_p->log = YLOG_DEBUG;
372 SocketManager::~SocketManager()
380 * c-file-style: "Stroustrup"
381 * indent-tabs-mode: nil
383 * vim: shiftwidth=4 tabstop=8 expandtab