1 /* $Id: filter_frontend_net.cpp,v 1.15 2006-01-11 11:51:49 adam Exp $
2 Copyright (c) 2005, Index Data.
11 #include "package.hpp"
12 #include "thread_pool_observer.hpp"
13 #include "filter_frontend_net.hpp"
14 #include <yaz++/z-assoc.h>
15 #include <yaz++/pdu-assoc.h>
16 #include <yaz++/socket-manager.h>
// Private state object for FrontendNet; it is allocated in the FrontendNet
// constructor (m_p(new Rep)).
// NOTE: this listing is a sparse excerpt. The number at the start of each
// line is the original file line number; the lines in between (braces,
// access specifiers, the m_no_threads declaration) are not shown here.
23 class FrontendNet::Rep {
// FrontendNet accesses the members below directly through m_p.
24 friend class FrontendNet;
// Port strings; process() passes each one to ZAssocServer::server().
26 std::vector<std::string> m_ports;
// When non-zero, process() creates a My_Timer_Thread with this duration;
// when zero no timer object is created. Units are not visible in this excerpt.
27 int m_listen_duration;
// Socket observer that process() optionally creates; process() polls its
// timeout() after each processed socket event.
// (Sparse excerpt: members such as m_pipe, m_timeout and timeout() are
// declared on lines not shown here.)
30 class My_Timer_Thread : public yazpp_1::ISocketObserver {
// Observable this object registers with in its constructor and deregisters
// from in socketNotify().
32 yazpp_1::ISocketObservable *m_obs;
// obs: observable to register on; duration: value handed to timeoutObserver().
36 My_Timer_Thread(yazpp_1::ISocketObservable *obs, int duration);
// ISocketObserver callback.
37 void socketNotify(int event);
// Z_Assoc subclass created by ZAssocServer::sessionNotify() (presumably one
// per incoming connection -- TODO confirm). Its recv_GDU() turns each received
// GDU into a Package that is queued on the thread pool.
// (Sparse excerpt: several declarations of this class are not shown.)
40 class ZAssocChild : public yazpp_1::Z_Assoc {
// the_PDU_Observable is forwarded to the Z_Assoc base; the thread pool
// pointer is stored in m_thread_pool_observer.
43 ZAssocChild(yazpp_1::IPDU_Observable *the_PDU_Observable,
44 yp2::ThreadPoolSocketObserver *m_thread_pool_observer,
45 const yp2::Package *package);
48 yazpp_1::IPDU_Observer* sessionNotify(
49 yazpp_1::IPDU_Observable *the_PDU_Observable,
// Receives a GDU from the association; see the definition for the queuing.
51 void recv_GDU(Z_GDU *apdu, int len);
// Thread pool that ThreadPoolPackage messages for this association are put on.
57 yp2::ThreadPoolSocketObserver *m_thread_pool_observer;
// Session passed to every Package constructed by this association.
58 yp2::Session m_session;
// Package handed to Package::copy_filter() for each newly created Package.
61 const yp2::Package *m_package;
// Thread-pool message (IThreadPoolMsg) that pairs a Package with the
// ZAssocChild that created it.
// (Sparse excerpt: the destructor and result() declarations are not shown.)
63 class ThreadPoolPackage : public yp2::IThreadPoolMsg {
// Stores both pointers as given; nothing else happens in the constructor.
65 ThreadPoolPackage(yp2::Package *package, yp2::ZAssocChild *ses) :
66 m_session(ses), m_package(package) { };
68 IThreadPoolMsg *handle();
// Association on which result() sends the package's response.
72 yp2::ZAssocChild *m_session;
// Package carried by this message.
73 yp2::Package *m_package;
// Z_Assoc subclass that process() creates once per configured port and then
// starts with server(port). Its sessionNotify() creates a ZAssocChild.
// (Sparse excerpt: several declarations of this class are not shown.)
76 class ZAssocServer : public yazpp_1::Z_Assoc {
// PDU_Observable is forwarded to the Z_Assoc base; the thread pool pointer
// is stored in m_thread_pool_observer.
79 ZAssocServer(yazpp_1::IPDU_Observable *PDU_Observable,
80 yp2::ThreadPoolSocketObserver *m_thread_pool_observer,
81 const yp2::Package *package);
83 yazpp_1::IPDU_Observer* sessionNotify(
84 yazpp_1::IPDU_Observable *the_PDU_Observable,
86 void recv_GDU(Z_GDU *apdu, int len);
// Thread pool pointer passed on to every ZAssocChild created by sessionNotify().
92 yp2::ThreadPoolSocketObserver *m_thread_pool_observer;
93 const yp2::Package *m_package;
// Destructor; its body is not shown in this excerpt.
97 yp2::ThreadPoolPackage::~ThreadPoolPackage()
// result(): decrements the association's m_no_requests counter and sends the
// package's response GDU back on the association.
// NOTE(review): m_no_requests presumably counts requests still in flight;
// the matching increment is not visible in this excerpt -- confirm.
102 void yp2::ThreadPoolPackage::result()
104 m_session->m_no_requests--;
// Response GDU held by the package.
106 yazpp_1::GDU *gdu = &m_package->response();
// `len` is declared on a line not shown here.
110 m_session->send_GDU(gdu->get(), &len);
// Taken when no requests remain and the package's session reports closed;
// the action performed is on lines not shown here.
112 if (m_session->m_no_requests == 0 && m_package->session().is_closed())
// handle(); its body is not shown in this excerpt.
117 yp2::IThreadPoolMsg *yp2::ThreadPoolPackage::handle()
// Constructor: forwards PDU_Observable to the Z_Assoc base, stores the thread
// pool pointer and clears m_delete_flag. Other initialisation (for example of
// m_package from `package`) is on lines not shown in this excerpt.
124 yp2::ZAssocChild::ZAssocChild(yazpp_1::IPDU_Observable *PDU_Observable,
125 yp2::ThreadPoolSocketObserver *my_thread_pool,
126 const yp2::Package *package)
127 : Z_Assoc(PDU_Observable)
129 m_thread_pool_observer = my_thread_pool;
131 m_delete_flag = false;
// sessionNotify(); its body is not shown in this excerpt.
136 yazpp_1::IPDU_Observer *yp2::ZAssocChild::sessionNotify(yazpp_1::IPDU_Observable
137 *the_PDU_Observable, int fd)
// Destructor; its body is not shown in this excerpt.
142 yp2::ZAssocChild::~ZAssocChild()
// Wraps a received GDU in a new Package and queues it on the thread pool.
// z_pdu: the received GDU; len is not used on the lines visible here.
146 void yp2::ZAssocChild::recv_GDU(Z_GDU *z_pdu, int len)
// Heap-allocated Package built from this association's session and origin.
150 yp2::Package *p = new yp2::Package(m_session, m_origin);
// Thread-pool message tying the package to this association, so that
// ThreadPoolPackage::result() can send the response back on it.
152 yp2::ThreadPoolPackage *tp = new yp2::ThreadPoolPackage(p, this);
153 p->copy_filter(*m_package);
// Store the incoming GDU as the package's request.
154 p->request() = yazpp_1::GDU(z_pdu);
// Hand the message over to the thread pool.
155 m_thread_pool_observer->put(tp);
// failNotify(): checks whether the session is already closed, then builds a
// new Package and queues it on the thread pool. On the lines visible here no
// request GDU is assigned to that package (unlike recv_GDU).
158 void yp2::ZAssocChild::failNotify()
160 // TODO: send Package to signal "close"
// The action taken for an already-closed session is on lines not shown here.
161 if (m_session.is_closed())
167 yp2::Package *p = new yp2::Package(m_session, m_origin);
169 yp2::ThreadPoolPackage *tp = new yp2::ThreadPoolPackage(p, this);
170 p->copy_filter(*m_package);
171 m_thread_pool_observer->put(tp);
// timeoutNotify(); its body is not shown in this excerpt.
174 void yp2::ZAssocChild::timeoutNotify()
// connectNotify(); its body is not shown in this excerpt.
179 void yp2::ZAssocChild::connectNotify()
// Constructor: forwards PDU_Observable to the Z_Assoc base and stores the
// thread pool pointer. Handling of `package` is on lines not shown here.
184 yp2::ZAssocServer::ZAssocServer(yazpp_1::IPDU_Observable *PDU_Observable,
185 yp2::ThreadPoolSocketObserver *thread_pool_observer,
186 const yp2::Package *package)
187 : Z_Assoc(PDU_Observable)
189 m_thread_pool_observer = thread_pool_observer;
// sessionNotify(): allocates a ZAssocChild on the given PDU observable,
// passing on this server's thread pool pointer. The remaining constructor
// argument and the return statement are on lines not shown here.
194 yazpp_1::IPDU_Observer *yp2::ZAssocServer::sessionNotify(yazpp_1::IPDU_Observable
195 *the_PDU_Observable, int fd)
197 yp2::ZAssocChild *my =
198 new yp2::ZAssocChild(the_PDU_Observable, m_thread_pool_observer,
// The bodies of the following five members are not shown in this excerpt.
203 yp2::ZAssocServer::~ZAssocServer()
207 void yp2::ZAssocServer::recv_GDU(Z_GDU *apdu, int len)
211 void yp2::ZAssocServer::failNotify()
215 void yp2::ZAssocServer::timeoutNotify()
219 void yp2::ZAssocServer::connectNotify()
// Constructor: allocates the Rep state object and sets its defaults.
223 yp2::filter::FrontendNet::FrontendNet() : m_p(new Rep)
// Default thread count handed to ThreadPoolSocketObserver in process().
225 m_p->m_no_threads = 5;
// With 0, process() creates no My_Timer_Thread.
226 m_p->m_listen_duration = 0;
// Destructor; its body is not shown in this excerpt.
229 yp2::filter::FrontendNet::~FrontendNet()
// timeout(); its body is not shown in this excerpt.
233 bool yp2::My_Timer_Thread::timeout()
// Constructor: remembers the observable, initialises m_pipe and m_timeout,
// then registers this object as an observer on the pipe's read descriptor.
// NOTE(review): the meaning of the literal 9123 passed to m_pipe is not
// visible here (presumably a port number) -- confirm against the pipe class.
238 yp2::My_Timer_Thread::My_Timer_Thread(yazpp_1::ISocketObservable *obs,
240 m_obs(obs), m_pipe(9123), m_timeout(false)
242 obs->addObserver(m_pipe.read_fd(), this);
// Observe read events on that descriptor.
243 obs->maskObserver(this, yazpp_1::SOCKET_OBSERVE_READ);
// Set the observer timeout to `duration`.
244 obs->timeoutObserver(this, duration);
// socketNotify(): removes this object from the observable. Any other work
// in the body is on lines not shown here.
247 void yp2::My_Timer_Thread::socketNotify(int event)
250 m_obs->deleteObserver(this);
// Runs the network front end: creates one ZAssocServer per configured port
// on a local SocketManager, backed by a thread pool, and then processes
// socket events in a loop.
// package: its address is handed to every ZAssocServer that is created.
// (Sparse excerpt: braces and several statements are not shown.)
253 void yp2::filter::FrontendNet::process(Package &package) const
// No ports configured; the action taken is on lines not shown here.
255 if (m_p->m_ports.size() == 0)
258 yazpp_1::SocketManager mySocketManager;
// Optional timer observer, created only for a non-zero listen duration.
260 My_Timer_Thread *tt = 0;
261 if (m_p->m_listen_duration)
262 tt = new My_Timer_Thread(&mySocketManager, m_p->m_listen_duration);
// Thread pool attached to the same socket manager, sized by m_no_threads.
264 ThreadPoolSocketObserver threadPool(&mySocketManager, m_p->m_no_threads);
// Array of server pointers, one slot per configured port.
266 yp2::ZAssocServer **az = new yp2::ZAssocServer *[m_p->m_ports.size()];
268 // Create yp2::ZAssocServer for each port
// `i` is declared on a line not shown here.
270 for (i = 0; i<m_p->m_ports.size(); i++)
272 // create a PDU assoc object (one per yp2::ZAssocServer)
273 yazpp_1::PDU_Assoc *as = new yazpp_1::PDU_Assoc(&mySocketManager);
275 // create ZAssoc with PDU Assoc
276 az[i] = new yp2::ZAssocServer(as, &threadPool, &package);
// Start the server on this port string.
277 az[i]->server(m_p->m_ports[i].c_str());
// Event loop: continues for as long as processEvent() returns a value > 0.
279 while (mySocketManager.processEvent() > 0)
// Timer check; the action taken is on lines not shown here
// (presumably leaves the loop -- TODO confirm).
281 if (tt && tt->timeout())
// Second pass over the ports; its body is not shown here
// (presumably releases the servers -- TODO confirm).
284 for (i = 0; i<m_p->m_ports.size(); i++)
// Reads the filter configuration from an XML node. Child elements named
// "port" each add one port string; a "threads" element sets the thread
// count; the final branch throws a FilterException starting "Bad element ".
// Throws yp2::filter::FilterException on the error paths shown below.
// (Sparse excerpt: braces and some conditions are not shown.)
291 void yp2::filter::FrontendNet::configure(const xmlNode * ptr)
// A null node or a node without children is rejected.
293 if (!ptr || !ptr->children)
295 throw yp2::filter::FilterException("No ports for Frontend");
// Ports are collected locally and stored in m_ports only after the loop.
297 std::vector<std::string> ports;
298 for (ptr = ptr->children; ptr; ptr = ptr->next)
// Non-element nodes; the action taken is on a line not shown here
// (presumably they are skipped -- TODO confirm).
300 if (ptr->type != XML_ELEMENT_NODE)
302 if (!strcmp((const char *) ptr->name, "port"))
304 std::string port = yp2::xml::get_text(ptr);
305 ports.push_back(port);
308 else if (!strcmp((const char *) ptr->name, "threads"))
310 std::string threads_str = yp2::xml::get_text(ptr);
// atoi() yields 0 when the text does not start with a number.
311 int threads = atoi(threads_str.c_str());
// The condition guarding this throw is on a line not shown here.
313 throw yp2::filter::FilterException("Bad value for threads: "
// Unlike the ports, the thread count is written to m_p inside the loop.
315 m_p->m_no_threads = threads;
319 throw yp2::filter::FilterException("Bad element "
320 + std::string((const char *)
324 m_p->m_ports = ports;
// ports(): returns a reference to a vector of strings; the body is not shown
// in this excerpt.
327 std::vector<std::string> &yp2::filter::FrontendNet::ports()
// listen_duration(): returns a mutable reference to the stored listen
// duration, so callers can both read and assign it.
332 int &yp2::filter::FrontendNet::listen_duration()
334 return m_p->m_listen_duration;
// File-local factory: returns a newly allocated FrontendNet as a
// yp2::filter::Base pointer.
337 static yp2::filter::Base* filter_creator()
339 return new yp2::filter::FrontendNet;
// Filter descriptor object for this filter; its initialiser is on lines not
// shown in this excerpt.
343 struct yp2_filter_struct yp2_filter_frontend_net = {
353 * indent-tabs-mode: nil
354 * c-file-style: "stroustrup"
356 * vim: shiftwidth=4 tabstop=8 expandtab