all tests on Unix.
-/* $Id: filter.cpp,v 1.2 2005-10-31 09:40:18 marc Exp $
+/* $Id: filter.cpp,v 1.3 2005-11-07 21:57:10 adam Exp $
Copyright (c) 2005, Index Data.
%LICENSE%
#include <stdexcept>
+#include "config.hpp"
#include "filter.hpp"
-/* $Id: filter_factory.hpp,v 1.4 2005-10-31 09:40:18 marc Exp $
+/* $Id: filter_factory.hpp,v 1.5 2005-11-07 21:57:10 adam Exp $
Copyright (c) 2005, Index Data.
%LICENSE%
#include <string>
#include <map>
-#include "config.hpp"
#include "filter.hpp"
-/* $Id: filter_frontend_net.cpp,v 1.8 2005-11-07 12:31:43 adam Exp $
+/* $Id: filter_frontend_net.cpp,v 1.9 2005-11-07 21:57:10 adam Exp $
Copyright (c) 2005, Index Data.
%LICENSE%
#include "config.hpp"
+#include "pipe.hpp"
#include "filter.hpp"
#include "router.hpp"
#include "package.hpp"
class My_Timer_Thread : public yazpp_1::ISocketObserver {
private:
yazpp_1::ISocketObservable *m_obs;
- int m_fd[2];
+ Pipe m_pipe;
bool m_timeout;
public:
My_Timer_Thread(yazpp_1::ISocketObservable *obs, int duration);
yp2::My_Timer_Thread::My_Timer_Thread(yazpp_1::ISocketObservable *obs,
int duration) :
- m_obs(obs), m_timeout(false)
+ m_obs(obs), m_pipe(9123), m_timeout(false)
{
- pipe(m_fd);
- obs->addObserver(m_fd[0], this);
+ obs->addObserver(m_pipe.read_fd(), this);
obs->maskObserver(this, yazpp_1::SOCKET_OBSERVE_READ);
obs->timeoutObserver(this, duration);
}
{
m_timeout = true;
m_obs->deleteObserver(this);
- close(m_fd[0]);
- close(m_fd[1]);
}
void yp2::filter::FrontendNet::process(Package &package) const {
-/* $Id: pipe.cpp,v 1.1 2005-11-07 12:32:01 adam Exp $
+/* $Id: pipe.cpp,v 1.2 2005-11-07 21:57:10 adam Exp $
Copyright (c) 2005, Index Data.
%LICENSE%
#include <unistd.h>
#endif
+#include <errno.h>
#ifdef WIN32
#include <winsock.h>
#else
#include <netdb.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
+
+#include <fcntl.h>
#endif
#if HAVE_SYS_SOCKET_H
Rep();
int m_fd[2];
int m_socket;
+ bool nonblock(int s);
};
}
m_socket = -1;
}
+bool Pipe::Rep::nonblock(int s)
+{
+#ifdef WIN32
+ if (ioctlsocket(s, FIONBIO, &tru) < 0)
+ return false;
+#else
+ if (fcntl(s, F_SETFL, O_NONBLOCK) < 0)
+ return false;
+#ifndef MSG_NOSIGNAL
+ signal (SIGPIPE, SIG_IGN);
+#endif
+#endif
+ return true;
+}
+
Pipe::Pipe(int port_to_use) : m_p(new Rep)
{
if (port_to_use)
{
+ // create server socket
m_p->m_socket = socket(AF_INET, SOCK_STREAM, 0);
if (m_p->m_socket < 0)
throw Pipe::Error("could not create socket");
-
- m_p->m_fd[1] = socket(AF_INET, SOCK_STREAM, 0);
- if (m_p->m_fd[1] < 0)
- throw Pipe::Error("could not create socket");
-
+#ifndef WIN32
+ unsigned long one = 1;
+ if (setsockopt(m_p->m_socket, SOL_SOCKET, SO_REUSEADDR, (char*)
+ &one, sizeof(one)) < 0)
+ throw Pipe::Error("setsockopt error");
+#endif
+ // bind server socket
struct sockaddr_in add;
add.sin_family = AF_INET;
add.sin_port = htons(port_to_use);
add.sin_addr.s_addr = INADDR_ANY;
struct sockaddr *addr = ( struct sockaddr *) &add;
-
+
if (bind(m_p->m_socket, addr, sizeof(struct sockaddr_in)))
throw Pipe::Error("could not bind on socket");
if (listen(m_p->m_socket, 3) < 0)
throw Pipe::Error("could not listen on socket");
+ // client socket
+ in_addr_t tmpadd;
+ tmpadd = (unsigned) inet_addr("127.0.0.1");
+ if (tmpadd)
+ memcpy(&add.sin_addr.s_addr, &tmpadd, sizeof(struct in_addr));
+ else
+ throw Pipe::Error("inet_addr failed");
+
+ m_p->m_fd[1] = socket(AF_INET, SOCK_STREAM, 0);
+ if (m_p->m_fd[1] < 0)
+ throw Pipe::Error("could not create socket");
+
+ m_p->nonblock(m_p->m_fd[1]);
+
+ if (connect(m_p->m_fd[1], addr, sizeof(*addr)) < 0 &&
+ errno != EINPROGRESS)
+ {
+ fprintf(stderr, "errno=%d[%s] tmpadd=%x\n",
+ errno, strerror(errno), tmpadd);
+ throw Pipe::Error("could not connect to socket");
+ }
+
+ // server accept
struct sockaddr caddr;
socklen_t caddr_len = sizeof(caddr);
m_p->m_fd[0] = accept(m_p->m_socket, &caddr, &caddr_len);
if (m_p->m_fd[0] < 0)
throw Pipe::Error("could not accept on socket");
-
- if (connect(m_p->m_fd[1], addr, sizeof(addr)) < 0)
- throw Pipe::Error("could not connect to socket");
+
+ // complete connect
+ fd_set write_set;
+ FD_ZERO(&write_set);
+ FD_SET(m_p->m_fd[1], &write_set);
+ int r = select(m_p->m_fd[1]+1, 0, &write_set, 0, 0);
+ if (r != 1)
+ throw Pipe::Error("could not complete connect");
+
+ close(m_p->m_socket);
+ m_p->m_socket = -1;
}
else
{
- m_p->m_socket = 0;
pipe(m_p->m_fd);
}
}
-/* $Id: pipe.hpp,v 1.1 2005-11-07 12:32:01 adam Exp $
+/* $Id: pipe.hpp,v 1.2 2005-11-07 21:57:10 adam Exp $
Copyright (c) 2005, Index Data.
%LICENSE%
#ifndef YP2_PIPE_HPP
#define YP2_PIPE_HPP
+#include <stdexcept>
#include <boost/scoped_ptr.hpp>
#include <yaz/yconfig.h>
-/* $Id: test_pipe.cpp,v 1.1 2005-11-07 12:32:01 adam Exp $
+/* $Id: test_pipe.cpp,v 1.2 2005-11-07 21:57:10 adam Exp $
Copyright (c) 2005, Index Data.
%LICENSE%
using namespace boost::unit_test;
-class My_Timer_Thread : public yazpp_1::ISocketObserver {
+class Timer : public yazpp_1::ISocketObserver {
private:
yazpp_1::ISocketObservable *m_obs;
yp2::Pipe m_pipe;
bool m_timeout;
public:
- My_Timer_Thread(yazpp_1::ISocketObservable *obs, int duration);
+ Timer(yazpp_1::ISocketObservable *obs, int duration);
void socketNotify(int event);
bool timeout() { return m_timeout; };
};
-My_Timer_Thread::My_Timer_Thread(yazpp_1::ISocketObservable *obs,
+Timer::Timer(yazpp_1::ISocketObservable *obs,
int duration) :
m_obs(obs), m_pipe(0), m_timeout(false)
{
obs->timeoutObserver(this, duration);
}
-void My_Timer_Thread::socketNotify(int event)
+void Timer::socketNotify(int event)
{
m_timeout = true;
m_obs->deleteObserver(this);
{
yazpp_1::SocketManager mySocketManager;
- yp2::Pipe pipe(0);
+ yp2::Pipe pipe(9999);
- My_Timer_Thread t(&mySocketManager, 0);
+ Timer t(&mySocketManager, 0);
while (mySocketManager.processEvent() > 0)
if (t.timeout())
break;
- BOOST_CHECK (t.timeout());
+ BOOST_CHECK(t.timeout());
}
/*
-/* $Id: test_thread_pool_observer.cpp,v 1.6 2005-10-15 14:09:09 adam Exp $
+/* $Id: test_thread_pool_observer.cpp,v 1.7 2005-11-07 21:57:10 adam Exp $
Copyright (c) 2005, Index Data.
%LICENSE%
*/
-/* $Id: test_thread_pool_observer.cpp,v 1.6 2005-10-15 14:09:09 adam Exp $
+/* $Id: test_thread_pool_observer.cpp,v 1.7 2005-11-07 21:57:10 adam Exp $
Copyright (c) 1998-2005, Index Data.
This file is part of the yaz-proxy.
#include <yaz++/pdu-assoc.h>
#include <yaz++/socket-manager.h>
#include <yaz/log.h>
+#include "pipe.hpp"
#include "thread_pool_observer.hpp"
#define BOOST_AUTO_TEST_MAIN
class My_Timer_Thread : public ISocketObserver {
private:
ISocketObservable *m_obs;
- int m_fd[2];
+ yp2::Pipe m_pipe;
yp2::ThreadPoolSocketObserver *m_t;
public:
int m_sum;
}
My_Timer_Thread::My_Timer_Thread(ISocketObservable *obs,
- yp2::ThreadPoolSocketObserver *t) : m_obs(obs)
+ yp2::ThreadPoolSocketObserver *t) :
+ m_obs(obs), m_pipe(9123)
{
- pipe(m_fd);
m_t = t;
m_sum = 0;
m_requests = 0;
m_responses = 0;
- obs->addObserver(m_fd[0], this);
+ obs->addObserver(m_pipe.read_fd(), this);
obs->maskObserver(this, SOCKET_OBSERVE_READ);
obs->timeoutObserver(this, 0);
}
-/* $Id: thread_pool_observer.cpp,v 1.10 2005-11-07 12:31:05 adam Exp $
+/* $Id: thread_pool_observer.cpp,v 1.11 2005-11-07 21:57:10 adam Exp $
Copyright (c) 2005, Index Data.
%LICENSE%
#include <yaz/log.h>
#include "thread_pool_observer.hpp"
+#include "pipe.hpp"
namespace yp2 {
class ThreadPoolSocketObserver::Worker {
~Rep();
private:
yazpp_1::ISocketObservable *m_socketObservable;
- int m_fd[2];
+ Pipe m_pipe;
boost::thread_group m_thrds;
boost::mutex m_mutex_input_data;
boost::condition m_cond_input_data;
using namespace yp2;
ThreadPoolSocketObserver::Rep::Rep(ISocketObservable *obs)
- : m_socketObservable(obs)
+ : m_socketObservable(obs), m_pipe(9123)
{
}
int no_threads)
: m_p(new Rep(obs))
{
- pipe(m_p->m_fd);
- obs->addObserver(m_p->m_fd[0], this);
+ obs->addObserver(m_p->m_pipe.read_fd(), this);
obs->maskObserver(this, SOCKET_OBSERVE_READ);
m_p->m_stop_flag = false;
m_p->m_thrds.join_all();
m_p->m_socketObservable->deleteObserver(this);
-
- close(m_p->m_fd[0]);
- close(m_p->m_fd[1]);
}
void ThreadPoolSocketObserver::socketNotify(int event)
if (event & SOCKET_OBSERVE_READ)
{
char buf[2];
- read(m_p->m_fd[0], buf, 1);
+ read(m_p->m_pipe.read_fd(), buf, 1);
IThreadPoolMsg *out;
{
boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data);
{
boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data);
m_p->m_output.push_back(out);
- write(m_p->m_fd[1], "", 1);
+ write(m_p->m_pipe.write_fd(), "", 1);
}
}
}