From 186f7ffa23b07da1ef6549a1dc96fea1f4ea37e2 Mon Sep 17 00:00:00 2001 From: Adam Dickmeiss Date: Tue, 5 Jan 2010 22:14:25 +0100 Subject: [PATCH] Thread pool for server --- include/yaz/Makefile.am | 3 +- include/yaz/sock_man.h | 7 +- include/yaz/srv.h | 94 ++++++++++++ include/yaz/tpool.h | 64 ++++++++ src/Makefile.am | 2 +- src/nanohttp.c | 143 ------------------ src/sock_man.c | 38 +++-- src/srv.c | 382 +++++++++++++++++++++++++++++++++++++++++++++++ src/tpool.c | 164 ++++++++++++++++++++ test/Makefile.am | 7 +- test/tst_srv.c | 104 +++++++++++++ 11 files changed, 844 insertions(+), 164 deletions(-) create mode 100644 include/yaz/srv.h create mode 100644 include/yaz/tpool.h delete mode 100644 src/nanohttp.c create mode 100644 src/srv.c create mode 100644 src/tpool.c create mode 100644 test/tst_srv.c diff --git a/include/yaz/Makefile.am b/include/yaz/Makefile.am index cf41239..bf110c9 100644 --- a/include/yaz/Makefile.am +++ b/include/yaz/Makefile.am @@ -19,7 +19,8 @@ pkginclude_HEADERS= backend.h ccl.h ccl_xml.h cql.h rpn2cql.h comstack.h \ z-grs.h z-mterm2.h z-opac.h z-rrf1.h z-rrf2.h z-sum.h z-sutrs.h z-uifr1.h \ z-univ.h z-oclcui.h zes-expi.h zes-exps.h zes-order.h zes-pquery.h \ zes-psched.h zes-admin.h zes-pset.h zes-update.h zes-update0.h \ - zoom.h z-charneg.h charneg.h soap.h srw.h zgdu.h matchstr.h + zoom.h z-charneg.h charneg.h soap.h srw.h zgdu.h matchstr.h \ + sock_man.h srv.h EXTRA_DIST = yaz-version.h.in diff --git a/include/yaz/sock_man.h b/include/yaz/sock_man.h index 0efc9b1..9748491 100644 --- a/include/yaz/sock_man.h +++ b/include/yaz/sock_man.h @@ -47,11 +47,14 @@ YAZ_EXPORT void yaz_sock_man_destroy(yaz_sock_man_t man); YAZ_EXPORT -yaz_sock_chan_t yaz_sock_chan_new(yaz_sock_man_t srv, int fd, void *data, +yaz_sock_chan_t yaz_sock_man_wait(yaz_sock_man_t man); + +YAZ_EXPORT +yaz_sock_chan_t yaz_sock_chan_new(yaz_sock_man_t man, int fd, void *data, unsigned mask); YAZ_EXPORT -void yaz_sock_chan_destroy(yaz_sock_man_t srv, yaz_sock_chan_t p); +void yaz_sock_chan_destroy(yaz_sock_chan_t p); YAZ_EXPORT void yaz_sock_chan_set_mask(yaz_sock_chan_t chan, unsigned mask); diff --git a/include/yaz/srv.h b/include/yaz/srv.h new file mode 100644 index 0000000..b588172 --- /dev/null +++ b/include/yaz/srv.h @@ -0,0 +1,94 @@ +/* This file is part of the YAZ toolkit. + * Copyright (C) 1995-2009 Index Data. + * All rights reserved. + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Index Data nor the names of its contributors + * may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE REGENTS AND CONTRIBUTORS BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +/** + * \file + * \brief socket manager + */ +#ifndef YAZ_SRV_H +#define YAZ_SRV_H + +#include +#include +#include + +YAZ_BEGIN_CDECL + +typedef struct yaz_srv_s *yaz_srv_t; +typedef struct yaz_pkg_s *yaz_pkg_t; + +struct cs_session; + +typedef void (yaz_srv_gdu_handler_t)(yaz_pkg_t pkg, void *user); +typedef void *(yaz_srv_session_handler_t)(struct cs_session *cs); + +YAZ_EXPORT +yaz_srv_t yaz_srv_create(const char **listeners_str); + +YAZ_EXPORT +void yaz_srv_destroy(yaz_srv_t p); + +YAZ_EXPORT +void yaz_srv_run(yaz_srv_t p, yaz_srv_session_handler_t *session_handler, + yaz_srv_gdu_handler_t *gdu_handler); + +YAZ_EXPORT +void yaz_pkg_destroy(yaz_pkg_t pkg); + +YAZ_EXPORT +Z_GDU **yaz_pkg_get_gdu(yaz_pkg_t pkg); + +YAZ_EXPORT +ODR yaz_pkg_get_odr(yaz_pkg_t pkg); + +YAZ_EXPORT +void yaz_pkg_stop_server(yaz_pkg_t pkg); + +YAZ_EXPORT +void yaz_pkg_close(yaz_pkg_t pkg); + +YAZ_EXPORT +yaz_pkg_t yaz_pkg_create(yaz_pkg_t request_pkg); + +YAZ_EXPORT +Z_GDU *zget_wrap_APDU(ODR o, Z_APDU *apdu); + +YAZ_EXPORT +void yaz_pkg_send(yaz_pkg_t pkg); + +YAZ_END_CDECL + +#endif +/* + * Local variables: + * c-basic-offset: 4 + * c-file-style: "Stroustrup" + * indent-tabs-mode: nil + * End: + * vim: shiftwidth=4 tabstop=8 expandtab + */ + diff --git a/include/yaz/tpool.h b/include/yaz/tpool.h new file mode 100644 index 0000000..9290527 --- /dev/null +++ b/include/yaz/tpool.h @@ -0,0 +1,64 @@ +/* This file is part of the YAZ toolkit. + * Copyright (C) 1995-2009 Index Data. + * All rights reserved. + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Index Data nor the names of its contributors + * may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE REGENTS AND CONTRIBUTORS BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +/** + * \file + * \brief socket manager + */ +#ifndef YAZ_TPOOL_H +#define YAZ_TPOOL_H + +#include + + +YAZ_BEGIN_CDECL + +typedef struct yaz_tpool_s *yaz_tpool_t; + +YAZ_EXPORT +void yaz_tpool_add(yaz_tpool_t p, void *data); + +YAZ_EXPORT +void yaz_tpool_destroy(yaz_tpool_t p); + +YAZ_EXPORT +yaz_tpool_t yaz_tpool_create(void (*work_handler)(void *work_data), + void (*work_destroy)(void *work_data), + size_t no_threads); + +YAZ_END_CDECL + +#endif +/* + * Local variables: + * c-basic-offset: 4 + * c-file-style: "Stroustrup" + * indent-tabs-mode: nil + * End: + * vim: shiftwidth=4 tabstop=8 expandtab + */ + diff --git a/src/Makefile.am b/src/Makefile.am index 96ad379..d70059a 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -103,7 +103,7 @@ libyaz_la_SOURCES=version.c options.c log.c \ copy_types.c match_glob.c poll.c daemon.c \ iconv_encode_marc8.c iconv_encode_iso_8859_1.c iconv_encode_wchar.c \ iconv_decode_marc8.c iconv_decode_iso5426.c iconv_decode_danmarc.c sc.c \ - sock_man.c nanohttp.c + sock_man.c srv.c tpool.c libyaz_la_LDFLAGS=-version-info $(YAZ_VERSION_INFO) diff --git a/src/nanohttp.c b/src/nanohttp.c deleted file mode 100644 index 259ffb4..0000000 --- a/src/nanohttp.c +++ /dev/null @@ -1,143 +0,0 @@ -/* This file is part of the YAZ toolkit. - * Copyright (C) 1995-2009 Index Data - * See the file LICENSE for details. - */ -/** - * \file - * \brief Small HTTP server - */ - -#include -#include -#include -#include -#include -#include - -typedef struct yaz_nano_srv_s *yaz_nano_srv_t; -typedef struct yaz_nano_pkg_s *yaz_nano_pkg_t; - -struct yaz_nano_pkg_s { - void *handle; - int listener_id; - ODR encode_odr; - Z_GDU *request_gdu; - Z_GDU *response_gdu; -}; - -struct yaz_nano_srv_s { - COMSTACK *cs_listeners; - size_t num_listeners; - NMEM nmem; - yaz_sock_man_t sock_man; -}; - -void yaz_nano_srv_destroy(yaz_nano_srv_t p) -{ - if (p) - { - size_t i; - for (i = 0; i < p->num_listeners; i++) - if (p->cs_listeners[i]) - cs_close(p->cs_listeners[i]); - yaz_sock_man_destroy(p->sock_man); - nmem_destroy(p->nmem); - } -} - -yaz_nano_srv_t yaz_nano_srv_create(const char **listeners_str) -{ - NMEM nmem = nmem_create(); - yaz_nano_srv_t p = nmem_malloc(nmem, sizeof(*p)); - size_t i; - for (i = 0; listeners_str[i]; i++) - ; - p->nmem = nmem; - p->num_listeners = i; - p->cs_listeners = - nmem_malloc(nmem, p->num_listeners * sizeof(*p->cs_listeners)); - for (i = 0; i < p->num_listeners; i++) - { - void *ap; - const char *where = listeners_str[i]; - COMSTACK l = cs_create_host(where, 2, &ap); - p->cs_listeners[i] = 0; /* not OK (yet) */ - if (!l) - { - yaz_log(YLOG_WARN|YLOG_ERRNO, "cs_create_host(%s) failed", where); - } - else - { - if (cs_bind(l, ap, CS_SERVER) < 0) - { - if (cs_errno(l) == CSYSERR) - yaz_log(YLOG_FATAL|YLOG_ERRNO, "Failed to bind to %s", where); - else - yaz_log(YLOG_FATAL, "Failed to bind to %s: %s", where, - cs_strerror(l)); - cs_close(l); - } - else - p->cs_listeners[i] = l; /* success */ - } - } - p->sock_man = yaz_sock_man_new(); - - /* check if all are OK */ - for (i = 0; i < p->num_listeners; i++) - if (!p->cs_listeners[i]) - { - yaz_nano_srv_destroy(p); - return 0; - } - - for (i = 0; i < p->num_listeners; i++) - { - yaz_sock_chan_t chan; - - chan = yaz_sock_chan_new(p->sock_man, cs_fileno(p->cs_listeners[i]), - p->cs_listeners + i, - yaz_poll_read | yaz_poll_except); - } - return p; -} - -Z_GDU *yaz_nano_pkg_req(yaz_nano_pkg_t pkg) -{ - return pkg->request_gdu; -} - -Z_GDU *yaz_nano_pkg_response(yaz_nano_pkg_t pkg) -{ - return pkg->response_gdu; -} - -ODR yaz_nano_pkg_encode(yaz_nano_pkg_t pkg) -{ - return pkg->encode_odr; -} - -int yaz_nano_pkg_listener_id(yaz_nano_pkg_t pkg) -{ - return pkg->listener_id; -} - -yaz_nano_pkg_t yaz_nano_srv_get_pkg(yaz_nano_srv_t p) -{ - return 0; -} - -void yaz_nano_srv_put_pkg(yaz_nano_srv_t p, yaz_nano_pkg_t pkg) -{ - -} - -/* - * Local variables: - * c-basic-offset: 4 - * c-file-style: "Stroustrup" - * indent-tabs-mode: nil - * End: - * vim: shiftwidth=4 tabstop=8 expandtab - */ - diff --git a/src/sock_man.c b/src/sock_man.c index 90e6810..3988af0 100644 --- a/src/sock_man.c +++ b/src/sock_man.c @@ -1,5 +1,6 @@ #include +#include #include #include #include @@ -23,7 +24,7 @@ struct yaz_sock_chan_s { yaz_sock_chan_t next; yaz_sock_chan_t prev; int fd; - unsigned mask; + unsigned input_mask; unsigned output_mask; int max_idle; void *data; @@ -42,7 +43,7 @@ yaz_sock_man_t yaz_sock_man_new(void) man->maxevents = 30; man->event_no = 0; man->event_ret = 0; - man->timeout = 0; + man->timeout = -1; man->rescan = 0; man->events = nmem_malloc(nmem, man->maxevents * sizeof(*man->events)); if (man->epoll_handle == -1) @@ -57,9 +58,14 @@ void yaz_sock_man_destroy(yaz_sock_man_t man) { if (man) { + while (man->chan_list) + { + yaz_log(YLOG_WARN, "yaz_sock_man_destroy: closing %p", + man->chan_list); + yaz_sock_chan_destroy(man->chan_list); + } if (man->epoll_handle != -1) close(man->epoll_handle); - assert(man->chan_list == 0); nmem_destroy(man->nmem); } } @@ -69,11 +75,11 @@ static void poll_ctl(int op, yaz_sock_chan_t p) struct epoll_event event; event.events = 0; - if (p->mask & yaz_poll_read) + if (p->input_mask & yaz_poll_read) event.events |= EPOLLIN; - if (p->mask & yaz_poll_write) + if (p->input_mask & yaz_poll_write) event.events |= EPOLLOUT; - if (p->mask & yaz_poll_except) + if (p->input_mask & yaz_poll_except) event.events |= EPOLLERR; event.data.ptr = p; @@ -99,9 +105,10 @@ yaz_sock_chan_t yaz_sock_chan_new(yaz_sock_man_t srv, int fd, void *data, srv->chan_list = p; p->fd = fd; - p->mask = 0; + p->input_mask = mask; + p->output_mask = 0; p->data = data; - p->max_idle = 0; + p->max_idle = -1; p->man = srv; poll_ctl(EPOLL_CTL_ADD, p); @@ -112,18 +119,19 @@ static void rescan_timeout(yaz_sock_man_t man) { if (man->rescan) { - int timeout = 0; + int timeout = -1; yaz_sock_chan_t p; for (p = man->chan_list; p; p = p->next) - if (p->max_idle && (timeout == 0 || p->max_idle < timeout)) + if (p->max_idle != -1 && (timeout == -1 || p->max_idle < timeout)) timeout = p->max_idle; man->timeout = timeout; man->rescan = 0; } } -void yaz_sock_chan_destroy(yaz_sock_man_t srv, yaz_sock_chan_t p) +void yaz_sock_chan_destroy(yaz_sock_chan_t p) { + yaz_sock_man_t srv = p->man; if (p->prev) p->prev->next = p->next; else @@ -162,7 +170,7 @@ yaz_sock_chan_t yaz_sock_man_wait(yaz_sock_man_t man) } man->timeout_list = 0; /* no more timeout events */ } - assert(man->timeout_list = 0); + assert(man->timeout_list == 0); assert(man->event_no <= man->event_ret); if (man->event_no == man->event_ret) { /* must wait again */ @@ -202,9 +210,9 @@ yaz_sock_chan_t yaz_sock_man_wait(yaz_sock_man_t man) void yaz_sock_chan_set_mask(yaz_sock_chan_t chan, unsigned mask) { - if (chan->mask != mask) + if (chan->input_mask != mask) { - chan->mask = mask; + chan->input_mask = mask; poll_ctl(EPOLL_CTL_MOD, chan); } } @@ -220,7 +228,7 @@ void yaz_sock_chan_set_max_idle(yaz_sock_chan_t chan, int max_idle) unsigned yaz_sock_get_mask(yaz_sock_chan_t chan) { - return chan->mask; + return chan->output_mask; } void *yaz_sock_chan_get_data(yaz_sock_chan_t chan) diff --git a/src/srv.c b/src/srv.c new file mode 100644 index 0000000..fa391ae --- /dev/null +++ b/src/srv.c @@ -0,0 +1,382 @@ +/* This file is part of the YAZ toolkit. + * Copyright (C) 1995-2009 Index Data + * See the file LICENSE for details. + */ +/** + * \file + * \brief Small HTTP server + */ + +#include +#include +#include +#include +#include +#include +#include +#include + + +enum cs_ses_type { + cs_ses_type_listener, + cs_ses_type_accepting, + cs_ses_type_normal +}; + +struct cs_session { + enum cs_ses_type type; + COMSTACK cs; + yaz_sock_chan_t chan; + unsigned cs_put_mask; + unsigned cs_get_mask; + char *input_buffer; + int input_len; + void *user; +}; + +struct yaz_pkg_s { + Z_GDU *gdu; + ODR odr; + struct cs_session *ses; + yaz_srv_t srv; +}; + +struct yaz_srv_s { + struct cs_session *listeners; + size_t num_listeners; + NMEM nmem; + yaz_sock_man_t sock_man; + yaz_tpool_t tpool; + int stop_flag; + yaz_srv_session_handler_t *session_handler; + yaz_srv_gdu_handler_t *gdu_handler; +}; + +static void cs_session_init(struct cs_session *ses, enum cs_ses_type type) +{ + ses->type = type; + ses->cs = 0; + ses->chan = 0; + ses->cs_put_mask = 0; + ses->cs_get_mask = yaz_poll_read; + ses->input_buffer = 0; + ses->input_len = 0; +} + +static void cs_session_destroy(struct cs_session *ses) +{ + xfree(ses->input_buffer); + if (ses->chan) + yaz_sock_chan_destroy(ses->chan); + if (ses->cs) + cs_close(ses->cs); +} + +void yaz_srv_destroy(yaz_srv_t p) +{ + if (p) + { + size_t i; + + yaz_tpool_destroy(p->tpool); + for (i = 0; i < p->num_listeners; i++) + { + cs_session_destroy(p->listeners + i); + } + yaz_sock_man_destroy(p->sock_man); + nmem_destroy(p->nmem); + } +} + +yaz_srv_t yaz_srv_create(const char **listeners_str) +{ + NMEM nmem = nmem_create(); + yaz_srv_t p = nmem_malloc(nmem, sizeof(*p)); + size_t i; + for (i = 0; listeners_str[i]; i++) + ; + p->nmem = nmem; + + p->stop_flag = 0; + p->session_handler = 0; + p->gdu_handler = 0; + p->num_listeners = i; + p->listeners = + nmem_malloc(nmem, p->num_listeners * sizeof(*p->listeners)); + p->sock_man = yaz_sock_man_new(); + p->tpool = 0; + for (i = 0; i < p->num_listeners; i++) + { + void *ap; + const char *where = listeners_str[i]; + COMSTACK l = cs_create_host(where, CS_FLAGS_NUMERICHOST, &ap); + + cs_session_init(p->listeners +i, cs_ses_type_listener); + if (!l) + { + yaz_log(YLOG_WARN|YLOG_ERRNO, "cs_create_host(%s) failed", where); + } + else + { + if (cs_bind(l, ap, CS_SERVER) < 0) + { + if (cs_errno(l) == CSYSERR) + yaz_log(YLOG_FATAL|YLOG_ERRNO, "Failed to bind to %s", where); + else + yaz_log(YLOG_FATAL, "Failed to bind to %s: %s", where, + cs_strerror(l)); + cs_close(l); + } + else + { + p->listeners[i].cs = l; /* success */ + p->listeners[i].chan = + yaz_sock_chan_new(p->sock_man, + cs_fileno(l), + p->listeners + i, + yaz_poll_read | yaz_poll_except); + } + } + } + + /* check if all are OK */ + for (i = 0; i < p->num_listeners; i++) + if (!p->listeners[i].cs) + { + yaz_srv_destroy(p); + return 0; + } + return p; +} + +static void new_session(yaz_srv_t p, COMSTACK new_line) +{ + struct cs_session *ses = xmalloc(sizeof(*ses)); + unsigned mask = + ((new_line->io_pending & CS_WANT_WRITE) ? yaz_poll_write : 0) | + ((new_line->io_pending & CS_WANT_READ) ? yaz_poll_read : 0); + + if (mask) + { + yaz_log(YLOG_LOG, "type accepting"); + cs_session_init(ses, cs_ses_type_accepting); + } + else + { + yaz_log(YLOG_LOG, "type normal"); + cs_session_init(ses, cs_ses_type_normal); + mask = yaz_poll_read; + ses->user = p->session_handler(ses); + } + ses->cs = new_line; + ses->chan = yaz_sock_chan_new(p->sock_man, cs_fileno(new_line), ses, mask); +} + +void yaz_pkg_destroy(yaz_pkg_t pkg) +{ + if (pkg) + { + odr_destroy(pkg->odr); + xfree(pkg); + } +} + +void work_handler(void *data) +{ + yaz_pkg_t pkg = (yaz_pkg_t) data; + + pkg->srv->gdu_handler(pkg, pkg->ses->user); + yaz_pkg_destroy(pkg); +} + +void work_destroy(void *data) +{ + yaz_pkg_t pkg = (yaz_pkg_t) data; + yaz_pkg_destroy(pkg); +} + + +void yaz_srv_run(yaz_srv_t p, yaz_srv_session_handler_t session_handler, + yaz_srv_gdu_handler_t gdu_handler) +{ + yaz_sock_chan_t chan; + + p->session_handler = session_handler; + p->gdu_handler = gdu_handler; + + assert(!p->tpool); + p->tpool = yaz_tpool_create(work_handler, work_destroy, 20); + while ((chan = yaz_sock_man_wait(p->sock_man))) + { + unsigned output_mask = yaz_sock_get_mask(chan); + struct cs_session *ses = yaz_sock_chan_get_data(chan); + + if (p->stop_flag) + break; + switch (ses->type) + { + case cs_ses_type_listener: + if (yaz_sock_get_mask(chan) & yaz_poll_read) + { + int ret = cs_listen(ses->cs, 0, 0); + if (ret < 0) + { + yaz_log(YLOG_WARN|YLOG_ERRNO, "listen failed"); + } + else if (ret == 1) + { + yaz_log(YLOG_WARN, "cs_listen incomplete"); + } + else + { + COMSTACK new_line = cs_accept(ses->cs); + if (new_line) + { + yaz_log(YLOG_LOG, "new session"); + new_session(p, new_line); + } + else + { + yaz_log(YLOG_WARN|YLOG_ERRNO, "accept failed"); + } + } + } + break; + case cs_ses_type_accepting: + if (!cs_accept(ses->cs)) + { + yaz_log(YLOG_WARN|YLOG_ERRNO, "cs_accept failed"); + cs_session_destroy(ses); + xfree(ses); + } + else + { + unsigned mask = + ((ses->cs->io_pending & CS_WANT_WRITE) ? yaz_poll_write : 0) | + ((ses->cs->io_pending & CS_WANT_READ) ? yaz_poll_read : 0); + if (mask) + { + ses->type = cs_ses_type_accepting; + } + else + { + ses->type = cs_ses_type_normal; + mask = yaz_poll_read; + } + yaz_sock_chan_set_mask(ses->chan, mask); + } + break; + case cs_ses_type_normal: + if ((ses->cs_put_mask & yaz_poll_read) == 0 && + output_mask & ses->cs_get_mask) + { + /* receiving package */ + unsigned new_mask = yaz_poll_read; + yaz_log(YLOG_LOG, "Receive"); + do + { + int res = cs_get(ses->cs, &ses->input_buffer, &ses->input_len); + if (res <= 0) + { + yaz_log(YLOG_WARN, "Connection closed by client"); + cs_session_destroy(ses); + xfree(ses); + ses = 0; + break; + } + else if (res == 1) + { + if (ses->cs->io_pending & CS_WANT_WRITE) + new_mask |= yaz_poll_write; + break; + } + else + { /* complete package */ + yaz_pkg_t pkg = xmalloc(sizeof(*pkg)); + yaz_log(YLOG_LOG, "COMPLETE PACKAGE"); + + pkg->ses = ses; + pkg->srv = p; + pkg->odr = odr_createmem(ODR_DECODE); + odr_setbuf(pkg->odr, ses->input_buffer, res, 0); + if (!z_GDU(pkg->odr, &pkg->gdu, 0, 0)) + { + yaz_log(YLOG_WARN, "decoding failed"); + odr_destroy(pkg->odr); + xfree(pkg); + } + else + { + yaz_tpool_add(p->tpool, pkg); + } + } + } while (cs_more(ses->cs)); + yaz_sock_chan_set_mask(chan, new_mask); + } + if (ses && (output_mask & ses->cs_put_mask)) + { /* sending package */ + yaz_log(YLOG_LOG, "Sending"); + } + } + } +} + +Z_GDU **yaz_pkg_get_gdu(yaz_pkg_t pkg) +{ + return &pkg->gdu; +} + +ODR yaz_pkg_get_odr(yaz_pkg_t pkg) +{ + return pkg->odr; +} + +void yaz_pkg_close(yaz_pkg_t pkg) +{ + struct cs_session *ses = pkg->ses; + if (ses) + { + cs_session_destroy(ses); + xfree(ses); + } + pkg->ses = 0; +} + +void yaz_pkg_stop_server(yaz_pkg_t pkg) +{ + pkg->srv->stop_flag = 1; +} + +yaz_pkg_t yaz_pkg_create(yaz_pkg_t request_pkg) +{ + yaz_pkg_t pkg = xmalloc(sizeof(*pkg)); + + pkg->gdu = 0; + pkg->odr = odr_createmem(ODR_ENCODE); + pkg->ses = request_pkg->ses; + pkg->srv = request_pkg->srv; + return pkg; +} + +Z_GDU *zget_wrap_APDU(ODR o, Z_APDU *apdu) +{ + Z_GDU *gdu = odr_malloc(o, sizeof(*gdu)); + gdu->which = Z_GDU_Z3950; + gdu->u.z3950 = apdu; + return gdu; +} + +void yaz_pkg_send(yaz_pkg_t pkg) +{ + yaz_log(YLOG_WARN, "send.. UNFINISHED"); +} + +/* + * Local variables: + * c-basic-offset: 4 + * c-file-style: "Stroustrup" + * indent-tabs-mode: nil + * End: + * vim: shiftwidth=4 tabstop=8 expandtab + */ + diff --git a/src/tpool.c b/src/tpool.c new file mode 100644 index 0000000..66d3dff --- /dev/null +++ b/src/tpool.c @@ -0,0 +1,164 @@ +/* This file is part of the YAZ toolkit. + * Copyright (C) 1995-2009 Index Data + * See the file LICENSE for details. + */ +/** + * \file + * \brief thread pool workers + */ + +#include +#include +#include +#include + +struct work_item { + void *data; + struct work_item *next; +}; + +struct yaz_tpool_s { + NMEM nmem; + pthread_t *thread_id; + pthread_mutex_t mutex; + pthread_cond_t input_data; + int stop_flag; + size_t no_threads; + struct work_item *input_queue; + struct work_item *output_queue; + struct work_item *free_queue; + void (*work_handler)(void *work_data); + void (*work_destroy)(void *work_data); +}; + +static struct work_item *queue_remove_last(struct work_item **q) +{ + struct work_item **work_p = q, *work_this = 0; + + while (*work_p && (*work_p)->next) + work_p = &(*work_p)->next; + if (*work_p) + { + work_this = *work_p; + *work_p = 0; + } + return work_this; +} + +static void queue_trav(struct work_item *q, void (*f)(void *data)) +{ + for (; q; q = q->next) + f(q->data); +} + +void yaz_tpool_add(yaz_tpool_t p, void *data) +{ + struct work_item *work_p; + + pthread_mutex_lock(&p->mutex); + + if (p->free_queue) + { + work_p = p->free_queue; + p->free_queue = p->free_queue->next; + } + else + work_p = nmem_malloc(p->nmem, sizeof(*work_p)); + + work_p->data = data; + work_p->next = p->input_queue; + p->input_queue = work_p; + + pthread_cond_signal(&p->input_data); + pthread_mutex_unlock(&p->mutex); +} + +void yaz_tpool_destroy(yaz_tpool_t p) +{ + if (p) + { + size_t i; + + pthread_mutex_lock(&p->mutex); + p->stop_flag = 1; + pthread_cond_broadcast(&p->input_data); + pthread_mutex_unlock(&p->mutex); + + for (i = 0; i < p->no_threads; i++) + pthread_join(p->thread_id[i], 0); + + if (p->work_destroy) + { + queue_trav(p->input_queue, p->work_destroy); + queue_trav(p->output_queue, p->work_destroy); + } + + pthread_cond_destroy(&p->input_data); + pthread_mutex_destroy(&p->mutex); + nmem_destroy(p->nmem); + } +} + +static void *tpool_thread_handler(void *vp) +{ + yaz_tpool_t p = (yaz_tpool_t) vp; + while (1) + { + struct work_item *work_this = 0; + /* wait for some work */ + pthread_mutex_lock(&p->mutex); + while (!p->stop_flag && !p->input_queue) + pthread_cond_wait(&p->input_data, &p->mutex); + /* see if we were waken up because we're shutting down */ + if (p->stop_flag) + break; + /* got something. Take the last one out of input_queue */ + assert(p->input_queue); + work_this = queue_remove_last(&p->input_queue); + assert(work_this); + + pthread_mutex_unlock(&p->mutex); + + /* work on this item */ + p->work_handler(work_this->data); + } + pthread_mutex_unlock(&p->mutex); + return 0; +} + +yaz_tpool_t yaz_tpool_create(void (*work_handler)(void *work_data), + void (*work_destroy)(void *work_data), + size_t no_threads) +{ + NMEM nmem = nmem_create(); + yaz_tpool_t p = nmem_malloc(nmem, sizeof(*p)); + size_t i; + p->nmem = nmem; + p->stop_flag = 0; + p->no_threads = no_threads; + + p->input_queue = 0; + p->output_queue = 0; + p->free_queue = 0; + + p->work_handler = work_handler; + p->work_destroy = work_destroy; + + pthread_mutex_init(&p->mutex, 0); + pthread_cond_init(&p->input_data, 0); + + p->thread_id = nmem_malloc(p->nmem, sizeof(*p->thread_id) * p->no_threads); + for (i = 0; i < p->no_threads; i++) + pthread_create (p->thread_id + i, 0, tpool_thread_handler, p); + return p; +} + +/* + * Local variables: + * c-basic-offset: 4 + * c-file-style: "Stroustrup" + * indent-tabs-mode: nil + * End: + * vim: shiftwidth=4 tabstop=8 expandtab + */ + diff --git a/test/Makefile.am b/test/Makefile.am index 5daf359..04a9b15 100644 --- a/test/Makefile.am +++ b/test/Makefile.am @@ -5,7 +5,8 @@ check_PROGRAMS = tstxmalloc tsticonv tstnmem tstmatchstr tstwrbuf tstodr \ tstccl tstlog tstcomstack \ tstsoap1 tstsoap2 tstodrstack tstlogthread tstxmlquery tstpquery \ tst_comstack tst_filepath tst_record_conv tst_retrieval tst_tpath \ - tst_timing tst_query_charset tst_oid tst_icu_I18N tst_match_glob tst_rpn2cql + tst_timing tst_query_charset tst_oid tst_icu_I18N tst_match_glob \ + tst_rpn2cql tst_srv check_SCRIPTS = tstmarc.sh tstmarccol.sh tstcql2xcql.sh tstcql2pqf.sh tsticu.sh TESTS = $(check_PROGRAMS) $(check_SCRIPTS) @@ -47,7 +48,7 @@ tstodrcodec.c tstodrcodec.h: tstodr.asn $(YAZCOMP) cd $(srcdir); $(YAZCOMP) tstodr.asn LDADD = ../src/libyaz.la -tst_icu_I18N_LDADD = ../src/libyaz_icu.la $(ICU_LIBS) +tst_icu_I18N_LDADD = ../src/libyaz.la ../src/libyaz_icu.la $(ICU_LIBS) CONFIG_CLEAN_FILES=*.log @@ -77,3 +78,5 @@ tst_query_charset_SOURCES = tst_query_charset.c tst_icu_I18N_SOURCES = tst_icu_I18N.c tst_match_glob_SOURCES = tst_match_glob.c tst_rpn2cql_SOURCES = tst_rpn2cql.c +tst_srv_SOURCES = tst_srv.c +tst_srv_LDADD = ../src/libyaz.la $(PTHREAD_LIBS) diff --git a/test/tst_srv.c b/test/tst_srv.c new file mode 100644 index 0000000..65029b7 --- /dev/null +++ b/test/tst_srv.c @@ -0,0 +1,104 @@ +/* This file is part of the YAZ toolkit. + * Copyright (C) 1995-2009 Index Data + * See the file LICENSE for details. + */ + +#include +#include + +#include +#include +#include +#include +#include + +struct my_info { + int x; +}; + +static void *create_session(struct cs_session *ses) +{ + struct my_info *my = xmalloc(sizeof(*my)); + my->x = 42; + yaz_log(YLOG_LOG, "create_session"); + return my; +} + +static void gdu_handler(yaz_pkg_t pkg, void *user) +{ + struct my_info *my = user; + Z_GDU **gdu = yaz_pkg_get_gdu(pkg); + ODR o = odr_createmem(ODR_PRINT); + + yaz_log(YLOG_LOG, "gdu_handler"); + YAZ_CHECK_EQ(my->x, 42); + + z_GDU(o, gdu, 0, 0); + odr_destroy(o); + + if ((*gdu)->which == Z_GDU_Z3950) + { + ODR encode = odr_createmem(ODR_ENCODE); + Z_APDU *apdu_req = (*gdu)->u.z3950; + Z_APDU *apdu_res = 0; + int must_close = 0; + + if (apdu_req->which == Z_APDU_close) + { + apdu_res = zget_APDU(encode, Z_APDU_close); + *apdu_res->u.close->closeReason = Z_Close_finished; + must_close = 1; + } + else if (apdu_req->which == Z_APDU_initRequest) + { + apdu_res = zget_APDU(encode, Z_APDU_initResponse); + } + else + { + apdu_res = zget_APDU(encode, Z_APDU_close); + + *apdu_res->u.close->closeReason = Z_Close_unspecified; + must_close = 1; + } + if (apdu_res) + { + yaz_pkg_t pkg_res = yaz_pkg_create(pkg); + *yaz_pkg_get_gdu(pkg) = zget_wrap_APDU(encode, apdu_res); + yaz_pkg_send(pkg_res); + } + if (must_close) + yaz_pkg_close(pkg); + yaz_pkg_stop_server(pkg); + } +} + +static void tst_srv(void) +{ + const char *listeners[] = {"unix:socket", 0}; + + yaz_srv_t srv = yaz_srv_create(listeners); + YAZ_CHECK(srv); + if (!srv) + return; + + yaz_srv_run(srv, create_session, gdu_handler); + yaz_srv_destroy(srv); +} + +int main (int argc, char **argv) +{ + YAZ_CHECK_INIT(argc, argv); + YAZ_CHECK_LOG(); + /* tst_srv(); */ + YAZ_CHECK_TERM; +} + +/* + * Local variables: + * c-basic-offset: 4 + * c-file-style: "Stroustrup" + * indent-tabs-mode: nil + * End: + * vim: shiftwidth=4 tabstop=8 expandtab + */ + -- 1.7.10.4