#include <yaz/xmalloc.h>
#include <yaz/mutex.h>
#include <yaz/poll.h>
+#include <sys/resource.h>
#include "eventl.h"
#include "sel_thread.h"
int log_level;
YAZ_MUTEX iochan_mutex;
int size_fds;
+ int limit_fd;
struct yaz_poll_fd *fds;
};
-iochan_man_t iochan_man_create(int no_threads)
+iochan_man_t iochan_man_create(int no_threads, int max_sockets)
{
iochan_man_t man = xmalloc(sizeof(*man));
man->channel_list = 0;
man->iochan_mutex = 0;
man->size_fds = 0;
man->fds = 0;
+ man->limit_fd = 0;
+#if HAVE_GETRLIMIT
+ {
+ struct rlimit limit_data;
+ getrlimit(RLIMIT_NOFILE, &limit_data);
+ yaz_log(YLOG_LOG, "getrlimit NOFILE cur=%ld max=%ld",
+ (long) limit_data.rlim_cur, (long) limit_data.rlim_max);
+ man->limit_fd = limit_data.rlim_cur - 200;
+ }
+#endif
+ if (max_sockets)
+ man->limit_fd = max_sockets;
+ yaz_log(YLOG_LOG, "iochan threads %d limit fd %d", no_threads,
+ man->limit_fd);
yaz_mutex_create(&man->iochan_mutex);
return man;
}
}
}
-void iochan_add(iochan_man_t man, IOCHAN chan)
+int iochan_add(iochan_man_t man, IOCHAN chan, int slack)
{
- chan->man = man;
- yaz_mutex_enter(man->iochan_mutex);
+ int r = 0, no_fds = 0;
+ IOCHAN p;
+
yaz_log(man->log_level, "iochan_add : chan=%p channel list=%p", chan,
man->channel_list);
- chan->next = man->channel_list;
- man->channel_list = chan;
+ yaz_mutex_enter(man->iochan_mutex);
+ for (p = man->channel_list; p; p = p->next)
+ {
+ if (p->fd >= 0)
+ no_fds++;
+ }
+ if (slack >= 0 && man->limit_fd > 0 && no_fds >= man->limit_fd - slack)
+ {
+ r = -1;
+ yaz_log(YLOG_WARN, "max channels %d in use", no_fds);
+ }
+ else
+ {
+ chan->man = man;
+ chan->next = man->channel_list;
+ man->channel_list = chan;
+ }
yaz_mutex_leave(man->iochan_mutex);
+ return r;
+}
+
+void iochan_destroy(IOCHAN chan)
+{
+ if (chan->man)
+ chan->destroyed = 1;
+ else
+ iochan_destroy_real(chan);
}
IOCHAN iochan_create(int fd, IOC_CALLBACK cb, int flags, const char *name)
{
p->poll_offset = i;
fds[i].client_data = p;
- fds[i].fd = p->fd;
+ fds[i].fd = -1;
fds[i].input_mask = 0;
if (p->thread_users > 0)
continue;
fds[i].input_mask |= yaz_poll_write;
if (p->flags & EVENT_EXCEPT)
fds[i].input_mask |= yaz_poll_except;
+ if (fds[i].input_mask)
+ fds[i].fd = p->fd;
}
- yaz_log(man->log_level, "yaz_poll begin nofds=%d", no_fds);
+ assert(i == no_fds);
+ yaz_log(man->log_level, "yaz_poll begin tv_sec=%d nofds=%d", tv_sec,
+ no_fds);
res = yaz_poll(fds, no_fds, tv_sec, 0);
yaz_log(man->log_level, "yaz_poll returned res=%d", res);
if (res < 0)
else
{
yaz_log(YLOG_ERRNO | YLOG_WARN, "poll");
- return 0;
+ abort();
}
}
if (man->sel_fd != -1)