1 /* $Id: filter_frontend_net.cpp,v 1.14 2006-01-09 21:19:11 adam Exp $
2 Copyright (c) 2005, Index Data.
12 #include "package.hpp"
13 #include "thread_pool_observer.hpp"
14 #include "filter_frontend_net.hpp"
15 #include <yaz++/z-assoc.h>
16 #include <yaz++/pdu-assoc.h>
17 #include <yaz++/socket-manager.h>
24 class FrontendNet::Rep {
25 friend class FrontendNet;
27 std::vector<std::string> m_ports;
28 int m_listen_duration;
31 class My_Timer_Thread : public yazpp_1::ISocketObserver {
33 yazpp_1::ISocketObservable *m_obs;
37 My_Timer_Thread(yazpp_1::ISocketObservable *obs, int duration);
38 void socketNotify(int event);
41 class ZAssocChild : public yazpp_1::Z_Assoc {
44 ZAssocChild(yazpp_1::IPDU_Observable *the_PDU_Observable,
45 yp2::ThreadPoolSocketObserver *m_thread_pool_observer,
46 const yp2::Package *package);
49 yazpp_1::IPDU_Observer* sessionNotify(
50 yazpp_1::IPDU_Observable *the_PDU_Observable,
52 void recv_GDU(Z_GDU *apdu, int len);
58 yp2::ThreadPoolSocketObserver *m_thread_pool_observer;
59 yp2::Session m_session;
62 const yp2::Package *m_package;
64 class ThreadPoolPackage : public yp2::IThreadPoolMsg {
66 ThreadPoolPackage(yp2::Package *package, yp2::ZAssocChild *ses) :
67 m_session(ses), m_package(package) { };
69 IThreadPoolMsg *handle();
73 yp2::ZAssocChild *m_session;
74 yp2::Package *m_package;
77 class ZAssocServer : public yazpp_1::Z_Assoc {
80 ZAssocServer(yazpp_1::IPDU_Observable *PDU_Observable,
81 yp2::ThreadPoolSocketObserver *m_thread_pool_observer,
82 const yp2::Package *package);
84 yazpp_1::IPDU_Observer* sessionNotify(
85 yazpp_1::IPDU_Observable *the_PDU_Observable,
87 void recv_GDU(Z_GDU *apdu, int len);
93 yp2::ThreadPoolSocketObserver *m_thread_pool_observer;
94 const yp2::Package *m_package;
98 yp2::ThreadPoolPackage::~ThreadPoolPackage()
103 void yp2::ThreadPoolPackage::result()
105 m_session->m_no_requests--;
107 yazpp_1::GDU *gdu = &m_package->response();
111 m_session->send_GDU(gdu->get(), &len);
113 if (m_session->m_no_requests == 0 && m_package->session().is_closed())
118 yp2::IThreadPoolMsg *yp2::ThreadPoolPackage::handle()
125 yp2::ZAssocChild::ZAssocChild(yazpp_1::IPDU_Observable *PDU_Observable,
126 yp2::ThreadPoolSocketObserver *my_thread_pool,
127 const yp2::Package *package)
128 : Z_Assoc(PDU_Observable)
130 m_thread_pool_observer = my_thread_pool;
132 m_delete_flag = false;
137 yazpp_1::IPDU_Observer *yp2::ZAssocChild::sessionNotify(yazpp_1::IPDU_Observable
138 *the_PDU_Observable, int fd)
143 yp2::ZAssocChild::~ZAssocChild()
147 void yp2::ZAssocChild::recv_GDU(Z_GDU *z_pdu, int len)
151 yp2::Package *p = new yp2::Package(m_session, m_origin);
153 yp2::ThreadPoolPackage *tp = new yp2::ThreadPoolPackage(p, this);
154 p->copy_filter(*m_package);
155 p->request() = yazpp_1::GDU(z_pdu);
156 m_thread_pool_observer->put(tp);
159 void yp2::ZAssocChild::failNotify()
161 // TODO: send Package to signal "close"
162 if (m_session.is_closed())
168 yp2::Package *p = new yp2::Package(m_session, m_origin);
170 yp2::ThreadPoolPackage *tp = new yp2::ThreadPoolPackage(p, this);
171 p->copy_filter(*m_package);
172 m_thread_pool_observer->put(tp);
175 void yp2::ZAssocChild::timeoutNotify()
180 void yp2::ZAssocChild::connectNotify()
185 yp2::ZAssocServer::ZAssocServer(yazpp_1::IPDU_Observable *PDU_Observable,
186 yp2::ThreadPoolSocketObserver *thread_pool_observer,
187 const yp2::Package *package)
188 : Z_Assoc(PDU_Observable)
190 m_thread_pool_observer = thread_pool_observer;
195 yazpp_1::IPDU_Observer *yp2::ZAssocServer::sessionNotify(yazpp_1::IPDU_Observable
196 *the_PDU_Observable, int fd)
198 yp2::ZAssocChild *my =
199 new yp2::ZAssocChild(the_PDU_Observable, m_thread_pool_observer,
204 yp2::ZAssocServer::~ZAssocServer()
208 void yp2::ZAssocServer::recv_GDU(Z_GDU *apdu, int len)
212 void yp2::ZAssocServer::failNotify()
216 void yp2::ZAssocServer::timeoutNotify()
220 void yp2::ZAssocServer::connectNotify()
224 yp2::filter::FrontendNet::FrontendNet() : m_p(new Rep)
226 m_p->m_no_threads = 5;
227 m_p->m_listen_duration = 0;
230 yp2::filter::FrontendNet::~FrontendNet()
234 bool yp2::My_Timer_Thread::timeout()
239 yp2::My_Timer_Thread::My_Timer_Thread(yazpp_1::ISocketObservable *obs,
241 m_obs(obs), m_pipe(9123), m_timeout(false)
243 obs->addObserver(m_pipe.read_fd(), this);
244 obs->maskObserver(this, yazpp_1::SOCKET_OBSERVE_READ);
245 obs->timeoutObserver(this, duration);
248 void yp2::My_Timer_Thread::socketNotify(int event)
251 m_obs->deleteObserver(this);
254 void yp2::filter::FrontendNet::process(Package &package) const
256 if (m_p->m_ports.size() == 0)
259 yazpp_1::SocketManager mySocketManager;
261 My_Timer_Thread *tt = 0;
262 if (m_p->m_listen_duration)
263 tt = new My_Timer_Thread(&mySocketManager, m_p->m_listen_duration);
265 ThreadPoolSocketObserver threadPool(&mySocketManager, m_p->m_no_threads);
267 yp2::ZAssocServer **az = new yp2::ZAssocServer *[m_p->m_ports.size()];
269 // Create yp2::ZAssocServer for each port
271 for (i = 0; i<m_p->m_ports.size(); i++)
273 // create a PDU assoc object (one per yp2::ZAssocServer)
274 yazpp_1::PDU_Assoc *as = new yazpp_1::PDU_Assoc(&mySocketManager);
276 // create ZAssoc with PDU Assoc
277 az[i] = new yp2::ZAssocServer(as, &threadPool, &package);
278 az[i]->server(m_p->m_ports[i].c_str());
280 while (mySocketManager.processEvent() > 0)
282 if (tt && tt->timeout())
285 for (i = 0; i<m_p->m_ports.size(); i++)
292 void yp2::filter::FrontendNet::configure(const xmlNode * ptr)
294 if (!ptr || !ptr->children)
296 throw yp2::filter::FilterException("No ports for Frontend");
298 std::vector<std::string> ports;
299 for (ptr = ptr->children; ptr; ptr = ptr->next)
301 if (ptr->type != XML_ELEMENT_NODE)
303 if (!strcmp((const char *) ptr->name, "port"))
305 std::string port = yp2::xml::get_text(ptr);
306 ports.push_back(port);
309 else if (!strcmp((const char *) ptr->name, "threads"))
311 std::string threads_str = yp2::xml::get_text(ptr);
312 int threads = atoi(threads_str.c_str());
314 throw yp2::filter::FilterException("Bad value for threads: "
316 m_p->m_no_threads = threads;
320 throw yp2::filter::FilterException("Bad element "
321 + std::string((const char *)
325 m_p->m_ports = ports;
328 std::vector<std::string> &yp2::filter::FrontendNet::ports()
333 int &yp2::filter::FrontendNet::listen_duration()
335 return m_p->m_listen_duration;
338 static yp2::filter::Base* filter_creator()
340 return new yp2::filter::FrontendNet;
344 struct yp2_filter_struct yp2_filter_frontend_net = {
354 * indent-tabs-mode: nil
355 * c-file-style: "stroustrup"
357 * vim: shiftwidth=4 tabstop=8 expandtab