#include <iostream>
namespace mp = metaproxy_1;
+namespace yf = metaproxy_1::filter;
namespace metaproxy_1 {
- class My_Timer_Thread;
- class ZAssocServer;
namespace filter {
class FrontendNet::Port {
friend class Rep;
yazpp_1::SocketManager mySocketManager;
ZAssocServer **az;
};
+ class FrontendNet::My_Timer_Thread : public yazpp_1::ISocketObserver {
+ private:
+ yazpp_1::ISocketObservable *m_obs;
+ Pipe m_pipe;
+ bool m_timeout;
+ public:
+ My_Timer_Thread(yazpp_1::ISocketObservable *obs, int duration);
+ void socketNotify(int event);
+ bool timeout();
+ };
+ class FrontendNet::ZAssocChild : public yazpp_1::Z_Assoc {
+ public:
+ ~ZAssocChild();
+ ZAssocChild(yazpp_1::IPDU_Observable *the_PDU_Observable,
+ mp::ThreadPoolSocketObserver *m_thread_pool_observer,
+ const mp::Package *package,
+ std::string route,
+ const char *msg_config);
+ int m_no_requests;
+ std::string m_route;
+ private:
+ yazpp_1::IPDU_Observer* sessionNotify(
+ yazpp_1::IPDU_Observable *the_PDU_Observable,
+ int fd);
+ void recv_GDU(Z_GDU *apdu, int len);
+
+ void failNotify();
+ void timeoutNotify();
+ void connectNotify();
+ private:
+ mp::ThreadPoolSocketObserver *m_thread_pool_observer;
+ mp::Session m_session;
+ mp::Origin m_origin;
+ bool m_delete_flag;
+ const mp::Package *m_package;
+ const char *m_msg_config;
+ };
+ class FrontendNet::ThreadPoolPackage : public mp::IThreadPoolMsg {
+ public:
+ ThreadPoolPackage(mp::Package *package,
+ yf::FrontendNet::ZAssocChild *ses,
+ const char *msg_config);
+ ~ThreadPoolPackage();
+ IThreadPoolMsg *handle();
+ void result(const char *t_info);
+ bool cleanup(void *info);
+ private:
+ yaz_timing_t timer;
+ ZAssocChild *m_assoc_child;
+ mp::Package *m_package;
+ const char *m_msg_config;
+ };
+ class FrontendNet::ZAssocServer : public yazpp_1::Z_Assoc {
+ public:
+ ~ZAssocServer();
+ ZAssocServer(yazpp_1::IPDU_Observable *PDU_Observable, int timeout,
+ int connect_max, std::string route,
+ const char *msg_config);
+ void set_package(const mp::Package *package);
+ void set_thread_pool(ThreadPoolSocketObserver *m_thread_pool_observer);
+ private:
+ yazpp_1::IPDU_Observer* sessionNotify(
+ yazpp_1::IPDU_Observable *the_PDU_Observable,
+ int fd);
+ void recv_GDU(Z_GDU *apdu, int len);
+
+ void failNotify();
+ void timeoutNotify();
+ void connectNotify();
+ private:
+ mp::ThreadPoolSocketObserver *m_thread_pool_observer;
+ const mp::Package *m_package;
+ int m_session_timeout;
+ int m_connect_max;
+ yazpp_1::LimitConnect limit_connect;
+ std::string m_route;
+ const char *m_msg_config;
+ };
}
- class My_Timer_Thread : public yazpp_1::ISocketObserver {
- private:
- yazpp_1::ISocketObservable *m_obs;
- Pipe m_pipe;
- bool m_timeout;
- public:
- My_Timer_Thread(yazpp_1::ISocketObservable *obs, int duration);
- void socketNotify(int event);
- bool timeout();
- };
- class ZAssocChild : public yazpp_1::Z_Assoc {
- public:
- ~ZAssocChild();
- ZAssocChild(yazpp_1::IPDU_Observable *the_PDU_Observable,
- mp::ThreadPoolSocketObserver *m_thread_pool_observer,
- const mp::Package *package,
- std::string route,
- const char *msg_config);
- int m_no_requests;
- std::string m_route;
- private:
- yazpp_1::IPDU_Observer* sessionNotify(
- yazpp_1::IPDU_Observable *the_PDU_Observable,
- int fd);
- void recv_GDU(Z_GDU *apdu, int len);
-
- void failNotify();
- void timeoutNotify();
- void connectNotify();
- private:
- mp::ThreadPoolSocketObserver *m_thread_pool_observer;
- mp::Session m_session;
- mp::Origin m_origin;
- bool m_delete_flag;
- const mp::Package *m_package;
- const char *m_msg_config;
- };
- class ThreadPoolPackage : public mp::IThreadPoolMsg {
- public:
- ThreadPoolPackage(mp::Package *package, mp::ZAssocChild *ses,
- const char *msg_config);
- ~ThreadPoolPackage();
- IThreadPoolMsg *handle();
- void result(const char *t_info);
- bool cleanup(void *info);
- private:
- yaz_timing_t timer;
- mp::ZAssocChild *m_assoc_child;
- mp::Package *m_package;
- const char *m_msg_config;
- };
- class ZAssocServer : public yazpp_1::Z_Assoc {
- public:
- ~ZAssocServer();
- ZAssocServer(yazpp_1::IPDU_Observable *PDU_Observable, int timeout,
- int connect_max, std::string route,
- const char *msg_config);
- void set_package(const mp::Package *package);
- void set_thread_pool(ThreadPoolSocketObserver *m_thread_pool_observer);
- private:
- yazpp_1::IPDU_Observer* sessionNotify(
- yazpp_1::IPDU_Observable *the_PDU_Observable,
- int fd);
- void recv_GDU(Z_GDU *apdu, int len);
-
- void failNotify();
- void timeoutNotify();
- void connectNotify();
- private:
- mp::ThreadPoolSocketObserver *m_thread_pool_observer;
- const mp::Package *m_package;
- int m_session_timeout;
- int m_connect_max;
- yazpp_1::LimitConnect limit_connect;
- std::string m_route;
- const char *m_msg_config;
- };
}
-mp::ThreadPoolPackage::ThreadPoolPackage(mp::Package *package,
- mp::ZAssocChild *ses,
- const char *msg_config) :
+yf::FrontendNet::ThreadPoolPackage::ThreadPoolPackage(mp::Package *package,
+ ZAssocChild *ses,
+ const char *msg_config) :
m_assoc_child(ses), m_package(package), m_msg_config(msg_config)
{
if (msg_config)
timer = 0;
}
-mp::ThreadPoolPackage::~ThreadPoolPackage()
+yf::FrontendNet::ThreadPoolPackage::~ThreadPoolPackage()
{
yaz_timing_destroy(&timer); // timer may be NULL
delete m_package;
}
-bool mp::ThreadPoolPackage::cleanup(void *info)
+bool yf::FrontendNet::ThreadPoolPackage::cleanup(void *info)
{
mp::Session *ses = (mp::Session *) info;
return *ses == m_package->session();
}
-void mp::ThreadPoolPackage::result(const char *t_info)
+void yf::FrontendNet::ThreadPoolPackage::result(const char *t_info)
{
m_assoc_child->m_no_requests--;
delete this;
}
-mp::IThreadPoolMsg *mp::ThreadPoolPackage::handle()
+mp::IThreadPoolMsg *yf::FrontendNet::ThreadPoolPackage::handle()
{
m_package->move(m_assoc_child->m_route);
return this;
}
-mp::ZAssocChild::ZAssocChild(yazpp_1::IPDU_Observable *PDU_Observable,
- mp::ThreadPoolSocketObserver *my_thread_pool,
- const mp::Package *package,
- std::string route,
- const char *msg_config)
+yf::FrontendNet::ZAssocChild::ZAssocChild(
+ yazpp_1::IPDU_Observable *PDU_Observable,
+ mp::ThreadPoolSocketObserver *my_thread_pool,
+ const mp::Package *package,
+ std::string route,
+ const char *msg_config)
: Z_Assoc(PDU_Observable), m_msg_config(msg_config)
{
m_thread_pool_observer = my_thread_pool;
}
-yazpp_1::IPDU_Observer *mp::ZAssocChild::sessionNotify(yazpp_1::IPDU_Observable
- *the_PDU_Observable, int fd)
+yazpp_1::IPDU_Observer *yf::FrontendNet::ZAssocChild::sessionNotify(
+ yazpp_1::IPDU_Observable
+ *the_PDU_Observable, int fd)
{
return 0;
}
-mp::ZAssocChild::~ZAssocChild()
+yf::FrontendNet::ZAssocChild::~ZAssocChild()
{
}
-void mp::ZAssocChild::recv_GDU(Z_GDU *z_pdu, int len)
+void yf::FrontendNet::ZAssocChild::recv_GDU(Z_GDU *z_pdu, int len)
{
m_no_requests++;
mp::Package *p = new mp::Package(m_session, m_origin);
- mp::ThreadPoolPackage *tp = new mp::ThreadPoolPackage(p, this,
- m_msg_config);
+ ThreadPoolPackage *tp = new ThreadPoolPackage(p, this, m_msg_config);
p->copy_route(*m_package);
p->request() = yazpp_1::GDU(z_pdu);
m_thread_pool_observer->put(tp);
}
-void mp::ZAssocChild::failNotify()
+void yf::FrontendNet::ZAssocChild::failNotify()
{
// TODO: send Package to signal "close"
if (m_session.is_closed())
mp::Package *p = new mp::Package(m_session, m_origin);
- mp::ThreadPoolPackage *tp = new mp::ThreadPoolPackage(p, this,
- m_msg_config);
+ ThreadPoolPackage *tp = new ThreadPoolPackage(p, this, m_msg_config);
p->copy_route(*m_package);
m_thread_pool_observer->cleanup(tp, &m_session);
m_thread_pool_observer->put(tp);
}
-void mp::ZAssocChild::timeoutNotify()
+void yf::FrontendNet::ZAssocChild::timeoutNotify()
{
failNotify();
}
-void mp::ZAssocChild::connectNotify()
+void yf::FrontendNet::ZAssocChild::connectNotify()
{
}
-mp::ZAssocServer::ZAssocServer(yazpp_1::IPDU_Observable *PDU_Observable,
- int timeout, int connect_max,
- std::string route, const char *msg_config)
- : Z_Assoc(PDU_Observable), m_session_timeout(timeout),
- m_connect_max(connect_max), m_route(route), m_msg_config(msg_config)
+yf::FrontendNet::ZAssocServer::ZAssocServer(
+ yazpp_1::IPDU_Observable *PDU_Observable,
+ int timeout, int connect_max,
+ std::string route, const char *msg_config)
+ :
+ Z_Assoc(PDU_Observable), m_session_timeout(timeout),
+ m_connect_max(connect_max), m_route(route), m_msg_config(msg_config)
{
m_package = 0;
}
-void mp::ZAssocServer::set_package(const mp::Package *package)
+void yf::FrontendNet::ZAssocServer::set_package(const mp::Package *package)
{
m_package = package;
}
-void mp::ZAssocServer::set_thread_pool(ThreadPoolSocketObserver *observer)
+void yf::FrontendNet::ZAssocServer::set_thread_pool(ThreadPoolSocketObserver *observer)
{
m_thread_pool_observer = observer;
}
-yazpp_1::IPDU_Observer *mp::ZAssocServer::sessionNotify(yazpp_1::IPDU_Observable
+yazpp_1::IPDU_Observer *yf::FrontendNet::ZAssocServer::sessionNotify(yazpp_1::IPDU_Observable
*the_PDU_Observable, int fd)
{
if (m_connect_max && con_sz > m_connect_max)
return 0;
}
- mp::ZAssocChild *my =
- new mp::ZAssocChild(the_PDU_Observable, m_thread_pool_observer,
- m_package, m_route, m_msg_config);
+ ZAssocChild *my = new ZAssocChild(the_PDU_Observable,
+ m_thread_pool_observer,
+ m_package, m_route, m_msg_config);
my->timeout(m_session_timeout);
return my;
}
-mp::ZAssocServer::~ZAssocServer()
+yf::FrontendNet::ZAssocServer::~ZAssocServer()
{
}
-void mp::ZAssocServer::recv_GDU(Z_GDU *apdu, int len)
+void yf::FrontendNet::ZAssocServer::recv_GDU(Z_GDU *apdu, int len)
{
}
-void mp::ZAssocServer::failNotify()
+void yf::FrontendNet::ZAssocServer::failNotify()
{
}
-void mp::ZAssocServer::timeoutNotify()
+void yf::FrontendNet::ZAssocServer::timeoutNotify()
{
}
-void mp::ZAssocServer::connectNotify()
+void yf::FrontendNet::ZAssocServer::connectNotify()
{
}
-mp::filter::FrontendNet::FrontendNet() : m_p(new Rep)
+yf::FrontendNet::FrontendNet() : m_p(new Rep)
{
m_p->m_no_threads = 5;
m_p->m_listen_duration = 0;
m_p->az = 0;
}
-mp::filter::FrontendNet::~FrontendNet()
+yf::FrontendNet::~FrontendNet()
{
if (m_p->az)
{
m_p->az = 0;
}
-void mp::filter::FrontendNet::stop() const
+void yf::FrontendNet::stop() const
{
if (m_p->az)
{
}
}
-bool mp::My_Timer_Thread::timeout()
+bool yf::FrontendNet::My_Timer_Thread::timeout()
{
return m_timeout;
}
-mp::My_Timer_Thread::My_Timer_Thread(yazpp_1::ISocketObservable *obs,
- int duration) :
+yf::FrontendNet::My_Timer_Thread::My_Timer_Thread(
+ yazpp_1::ISocketObservable *obs,
+ int duration) :
m_obs(obs), m_pipe(9123), m_timeout(false)
{
obs->addObserver(m_pipe.read_fd(), this);
obs->timeoutObserver(this, duration);
}
-void mp::My_Timer_Thread::socketNotify(int event)
+void yf::FrontendNet::My_Timer_Thread::socketNotify(int event)
{
m_timeout = true;
m_obs->deleteObserver(this);
}
-void mp::filter::FrontendNet::process(Package &package) const
+void yf::FrontendNet::process(Package &package) const
{
if (m_p->az == 0)
return;
delete tt;
}
-void mp::filter::FrontendNet::configure(const xmlNode * ptr, bool test_only,
- const char *path)
+void yf::FrontendNet::configure(const xmlNode * ptr, bool test_only,
+ const char *path)
{
if (!ptr || !ptr->children)
{
- throw mp::filter::FilterException("No ports for Frontend");
+ throw yf::FilterException("No ports for Frontend");
}
std::vector<Port> ports;
for (ptr = ptr->children; ptr; ptr = ptr->next)
std::string threads_str = mp::xml::get_text(ptr);
int threads = atoi(threads_str.c_str());
if (threads < 1)
- throw mp::filter::FilterException("Bad value for threads: "
+ throw yf::FilterException("Bad value for threads: "
+ threads_str);
m_p->m_no_threads = threads;
}
std::string timeout_str = mp::xml::get_text(ptr);
int timeout = atoi(timeout_str.c_str());
if (timeout < 1)
- throw mp::filter::FilterException("Bad value for timeout: "
+ throw yf::FilterException("Bad value for timeout: "
+ timeout_str);
m_p->m_session_timeout = timeout;
}
}
else
{
- throw mp::filter::FilterException("Bad element "
- + std::string((const char *)
- ptr->name));
+ throw yf::FilterException("Bad element "
+ + std::string((const char *)
+ ptr->name));
}
}
if (test_only)
set_ports(ports);
}
-void mp::filter::FrontendNet::set_ports(std::vector<std::string> &ports)
+void yf::FrontendNet::set_ports(std::vector<std::string> &ports)
{
std::vector<Port> nports;
size_t i;
}
-void mp::filter::FrontendNet::set_ports(std::vector<Port> &ports)
+void yf::FrontendNet::set_ports(std::vector<Port> &ports)
{
m_p->m_ports = ports;
- m_p->az = new mp::ZAssocServer *[m_p->m_ports.size()];
+ m_p->az = new yf::FrontendNet::ZAssocServer *[m_p->m_ports.size()];
- // Create mp::ZAssocServer for each port
+ // Create yf::FrontendNet::ZAssocServer for each port
size_t i;
for (i = 0; i<m_p->m_ports.size(); i++)
{
- // create a PDU assoc object (one per mp::ZAssocServer)
+ // create a PDU assoc object (one per yf::FrontendNet::ZAssocServer)
yazpp_1::PDU_Assoc *as = new yazpp_1::PDU_Assoc(&m_p->mySocketManager);
// create ZAssoc with PDU Assoc
- m_p->az[i] = new mp::ZAssocServer(as,
+ m_p->az[i] = new yf::FrontendNet::ZAssocServer(as,
m_p->m_session_timeout,
m_p->m_connect_max,
m_p->m_ports[i].route,
m_p->m_msg_config.c_str() : 0);
if (m_p->az[i]->server(m_p->m_ports[i].port.c_str()))
{
- throw mp::filter::FilterException("Unable to bind to address "
- + std::string(m_p->m_ports[i].port));
+ throw yf::FilterException("Unable to bind to address "
+ + std::string(m_p->m_ports[i].port));
}
}
}
-void mp::filter::FrontendNet::set_listen_duration(int d)
+void yf::FrontendNet::set_listen_duration(int d)
{
m_p->m_listen_duration = d;
}
-static mp::filter::Base* filter_creator()
+static yf::Base* filter_creator()
{
- return new mp::filter::FrontendNet;
+ return new yf::FrontendNet;
}
extern "C" {