1 /* $Id: filter_multi.cpp,v 1.1 2006-01-15 20:03:14 adam Exp $
2 Copyright (c) 2005, Index Data.
10 #include "package.hpp"
12 #include <boost/thread/thread.hpp>
13 #include <boost/thread/mutex.hpp>
14 #include <boost/thread/condition.hpp>
15 #include <boost/shared_ptr.hpp>
18 #include "filter_multi.hpp"
21 #include <yaz/otherinfo.h>
22 #include <yaz/diagbib1.h>
27 namespace yf = yp2::filter;
// Declaration fragments for Multi::BackendSet and Multi::Set. Only some of
// the original lines are present in this listing; the members of BackendSet
// and the opening line of Set are not shown.
32 struct Multi::BackendSet {
// The lines below belong to Multi::Set (its constructor is defined further
// down as yf::Multi::Set::Set(std::string setname)).
37 Set(std::string setname);
// One BackendSet entry per element; presumably one per backend taking part
// in the result set -- TODO confirm against the full source.
41 std::list<BackendSet> m_backend_sets;
// Name of this result set.
42 std::string m_setname;
// State kept for one backend target. Other members used elsewhere in this
// file (m_package, m_route, m_vhost) are declared on lines not shown here.
44 struct Multi::Backend {
46 std::string m_backend_database;
// Makes a Backend usable as a thread callable: multi_move() hands the
// Backend object to boost::thread, which invokes this operator.
49 void operator() (void); // thread operation
// Per-client state of the multi filter. Rep::get_frontend() keeps one
// Frontend per yp2::Session in Rep::m_clients. Further members used in this
// file (m_in_use, m_is_multi, m_p, multi_move) are declared on lines not
// shown in this listing.
51 struct Multi::Frontend {
54 yp2::Session m_session;
// Backends created by init(); iterated by close(), multi_move() and search().
57 std::list<BackendPtr> m_backend_list;
// Result sets keyed by a string (presumably the set name -- TODO confirm).
58 std::map<std::string,Multi::Set> m_sets;
// Handlers for the individual request types. Definitions of init(), close()
// and search() appear later in this file; no definition of present() or
// scan() is visible in this listing.
60 void init(Package &package, Z_GDU *gdu);
61 void close(Package &package);
62 void search(Package &package, Z_APDU *apdu);
64 void present(Package &package, Z_APDU *apdu);
65 void scan(Package &package, Z_APDU *apdu);
// Fragment of Multi::Map (its opening line is not shown). A Map pairs a list
// of hosts with a route; the constructor defined later in this file copies
// both into m_hosts and m_route.
70 Map(std::list<std::string> hosts, std::string route);
// Hosts that Frontend::init() iterates to create one Backend each.
72 std::list<std::string> m_hosts;
// Fragment of Multi::Rep, the private implementation object that Multi holds
// in m_p (the opening line of the class is not shown in this listing).
77 friend class Frontend;
// Acquire / release the Frontend associated with package.session().
79 FrontendPtr get_frontend(Package &package);
80 void release_frontend(Package &package);
// NOTE(review): get_frontend() and release_frontend() lock a member named
// m_mutex, not m_sessions_mutex; m_mutex is presumably declared on a line
// not shown here -- confirm, and check whether m_sessions_mutex is used.
82 boost::mutex m_sessions_mutex;
// Virtual host name -> Map. Written by Multi::add_map_host2hosts() and
// looked up by vhost in Frontend::init().
83 std::map<std::string, Multi::Map>m_maps;
// Waited on in get_frontend(); release_frontend() calls notify_all() on it.
86 boost::condition m_cond_session_ready;
// One Frontend per session.
87 std::map<yp2::Session, FrontendPtr> m_clients;
// Frontend constructor, taking a pointer to the Rep; get_frontend() passes
// its own `this`. The initializer list and body are not shown in this listing.
94 yf::Multi::Frontend::Frontend(Rep *rep)
// Frontend destructor (body not shown in this listing).
100 yf::Multi::Frontend::~Frontend()
// Returns the Frontend associated with package.session(), creating and
// registering a new one when the session has no entry in m_clients.
// Several lines of this function (loop construct, braces, return statements)
// are not shown in this listing, so the notes below cover only what is visible.
104 yf::Multi::FrontendPtr yf::Multi::Rep::get_frontend(Package &package)
// Held for the rest of the function; also the lock handed to the condition
// wait below.
106 boost::mutex::scoped_lock lock(m_mutex);
108 std::map<yp2::Session,yf::Multi::FrontendPtr>::iterator it;
// Look up an existing Frontend for this session.
112 it = m_clients.find(package.session());
113 if (it == m_clients.end())
// An existing Frontend that is not in use is marked as in use.
116 if (!it->second->m_in_use)
118 it->second->m_in_use = true;
// Wait on the condition that release_frontend() notifies. Presumably this is
// reached when the Frontend is in use and the lookup is then retried in a
// loop -- TODO confirm against the full source.
121 m_cond_session_ready.wait(lock);
// Create a new Frontend for this session and register it in m_clients.
123 FrontendPtr f(new Frontend(this));
124 m_clients[package.session()] = f;
// Counterpart of get_frontend(): releases the Frontend of package.session()
// and wakes up threads waiting in get_frontend(). Some lines of this function
// (braces, else branches) are not shown in this listing.
129 void yf::Multi::Rep::release_frontend(Package &package)
131 boost::mutex::scoped_lock lock(m_mutex);
132 std::map<yp2::Session,yf::Multi::FrontendPtr>::iterator it;
134 it = m_clients.find(package.session());
135 if (it != m_clients.end())
// A closed session has Frontend::close() called on its Frontend. Whether the
// entry is also removed from m_clients happens on lines not shown here.
137 if (package.session().is_closed())
139 it->second->close(package);
// Mark the Frontend as no longer in use (presumably in the else branch of the
// is_closed() test -- TODO confirm).
144 it->second->m_in_use = false;
// Wake every thread blocked in get_frontend().
146 m_cond_session_ready.notify_all();
// Constructor taking the result set name (initializer list and body are not
// shown in this listing; presumably initializes m_setname -- TODO confirm).
150 yf::Multi::Set::Set(std::string setname)
// Default constructor (body not shown in this listing).
156 yf::Multi::Set::Set()
// Destructor (body not shown in this listing).
161 yf::Multi::Set::~Set()
// Constructs a Map by copying the given host list and route into m_hosts and
// m_route.
165 yf::Multi::Map::Map(std::list<std::string> hosts, std::string route)
166 : m_hosts(hosts), m_route(route)
// Default constructor (body not shown in this listing).
170 yf::Multi::Map::Map()
// Constructs the filter and allocates its private Rep, stored in m_p.
174 yf::Multi::Multi() : m_p(new Multi::Rep)
// Destructor (body not shown in this listing).
178 yf::Multi::~Multi() {
// Registers a mapping from one host name to a list of hosts plus a route.
// The last parameter line is not shown in this listing; the body uses a
// variable named `route`, which is presumably that parameter.
// Assigning through m_maps[host] replaces any mapping already stored for host.
182 void yf::Multi::add_map_host2hosts(std::string host,
183 std::list<std::string> hosts,
186 m_p->m_maps[host] = Multi::Map(hosts, route);
// Thread operation for one backend: moves this backend's package along the
// route stored in m_route. multi_move() runs it on a separate boost::thread.
189 void yf::Multi::Backend::operator() (void)
191 m_package->move(m_route);
// Closes every backend session of this Frontend. Called from
// Rep::release_frontend() when the client session is closed.
// `b` is defined on a line not shown in this listing; presumably it is the
// backend that `bit` points to -- TODO confirm.
194 void yf::Multi::Frontend::close(Package &package)
196 std::list<BackendPtr>::const_iterator bit;
197 for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
// Copy filter state from the client package into the backend package.
201 b->m_package->copy_filter(package);
// Set the backend request to a null Z_GDU pointer.
202 b->m_package->request() = (Z_GDU *) 0;
// Mark the backend session closed, then move the package along its route.
// Unlike multi_move(), the move happens directly in this loop.
203 b->m_package->session().close();
204 b->m_package->move(b->m_route);
// Starts one boost::thread per backend in m_backend_list and adds each to a
// thread_group. Each thread is given the Backend object (**bit) as its
// callable, so it runs Backend::operator().
// The rest of the function is not shown in this listing; presumably the
// group is joined before returning -- TODO confirm.
208 void yf::Multi::Frontend::multi_move()
210 std::list<BackendPtr>::const_iterator bit;
211 boost::thread_group g;
212 for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
214 g.add_thread(new boost::thread(**bit));
// Handles a Z39.50 init request for a client.
//
// Visible steps:
//   1. Read the virtual host from the request's otherInfo (VAL_PROXY) and
//      look it up in the Rep's m_maps.
//   2. Create one Backend per host of the matching Map.
//   3. Build an init request for every backend.
//   4. Combine the backend init responses into one front-end init response.
//
// package: the client package; its response and session may be modified.
// gdu:     the client's request; dereferenced as a Z39.50 init request.
//
// Many lines (braces, declarations of vhost/odr/i/s/b, the call that sends
// the backend packages, early returns) are not shown in this listing.
219 void yf::Multi::Frontend::init(Package &package, Z_GDU *gdu)
221 Z_InitRequest *req = gdu->u.z3950->u.initRequest;
223 // empty or non-existent vhost is the same..
224 const char *vhost_cstr =
225 yaz_oi_get_string_oidval(&req->otherInfo, VAL_PROXY, 1, 0);
// Presumably guarded by a null check on vhost_cstr on a line not shown here
// -- TODO confirm.
228 vhost = std::string(vhost_cstr);
// Find the Map configured for this virtual host.
230 std::map<std::string, Map>::const_iterator it;
231 it = m_p->m_maps.find(std::string(vhost));
232 if (it == m_p->m_maps.end())
234 // might return diagnostics if no match
// Create one Backend per host of the Map; every Backend gets the Map's route
// and its own Package, built from `s` (declared on a line not shown here) and
// the client package's origin.
238 std::list<std::string>::const_iterator hit = it->second.m_hosts.begin();
239 for (; hit != it->second.m_hosts.end(); hit++)
242 Backend *b = new Backend;
244 b->m_route = it->second.m_route;
245 b->m_package = PackagePtr(new Package(s, package.origin()));
// Ownership of b passes to the BackendPtr stored in the list.
247 m_backend_list.push_back(BackendPtr(b));
249 // we're going to deal with this for sure..
253 // create init request
254 std::list<BackendPtr>::const_iterator bit;
255 for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
259 Z_APDU *init_apdu = zget_APDU(odr, Z_APDU_initRequest);
// Pass the backend's own vhost to the backend through otherInfo.
261 yaz_oi_set_string_oidval(&init_apdu->u.initRequest->otherInfo, odr,
262 VAL_PROXY, 1, b->m_vhost.c_str());
// This `req` is the backend init request, distinct from the client request
// read at the top of the function.
264 Z_InitRequest *req = init_apdu->u.initRequest;
// Request search, present and named result sets from the backend.
266 ODR_MASK_SET(req->options, Z_Options_search);
267 ODR_MASK_SET(req->options, Z_Options_present);
268 ODR_MASK_SET(req->options, Z_Options_namedResultSets);
// Offer protocol versions 1 through 3.
270 ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
271 ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
272 ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
274 b->m_package->request() = init_apdu;
276 b->m_package->copy_filter(package);
// Presumably the backend packages are sent (e.g. via multi_move()) on a line
// not shown here, before the responses are read below -- TODO confirm.
280 // create the frontend init response based on each backend init response
285 Z_APDU *f_apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
286 Z_InitResponse *f_resp = f_apdu->u.initResponse;
// Start with the same options and versions that were requested; the loop
// below clears whatever a backend did not grant.
288 ODR_MASK_SET(f_resp->options, Z_Options_search);
289 ODR_MASK_SET(f_resp->options, Z_Options_present);
290 ODR_MASK_SET(f_resp->options, Z_Options_namedResultSets);
292 ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_1);
293 ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_2);
294 ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_3);
296 for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
298 PackagePtr p = (*bit)->m_package;
300 if (p->session().is_closed()) // if any backend closes, close frontend
301 package.session().close();
// This `gdu` is the backend response and shadows the function parameter.
302 Z_GDU *gdu = p->response().get();
// The remainder of this condition is not shown in this listing; presumably
// it compares against Z_APDU_initResponse -- TODO confirm.
303 if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
306 Z_APDU *b_apdu = gdu->u.z3950;
307 Z_InitResponse *b_resp = b_apdu->u.initResponse;
309 // common options for all backends
310 for (i = 0; i <= Z_Options_stringSchema; i++)
312 if (!ODR_MASK_GET(b_resp->options, i))
313 ODR_MASK_CLEAR(f_resp->options, i);
315 // common protocol version
316 for (i = 0; i <= Z_ProtocolVersion_3; i++)
317 if (!ODR_MASK_GET(b_resp->protocolVersion, i))
318 ODR_MASK_CLEAR(f_resp->protocolVersion, i);
319 // reject if any of the backends reject
// The statement executed when a backend rejects is not shown in this listing.
320 if (!*b_resp->result)
325 // if any target does not return init return that (close or
// Forward that backend's response to the client unchanged.
327 package.response() = p->response();
// Otherwise answer the client with the combined init response.
331 package.response() = f_apdu;
// Handles a Z39.50 search request: gives the same request to every backend
// and answers the client with a search response whose resultCount is the sum
// of the backends' result counts.
//
// package:  the client package; its response and session may be modified.
// apdu_req: the client's search request; note that it is modified in place.
//
// Several lines (braces, declarations of odr/total_hits, the call that sends
// the backend packages, early returns) are not shown in this listing.
334 void yf::Multi::Frontend::search(Package &package, Z_APDU *apdu_req)
336 // create search request
337 Z_SearchRequest *req = apdu_req->u.searchRequest;
339 // deal with piggy back (for now disable)
// These writes go through the pointers inside the caller's request, so the
// caller's APDU is changed as well.
340 *req->smallSetUpperBound = 0;
341 *req->largeSetLowerBound = 1;
342 *req->mediumSetPresentNumber = 1;
344 std::list<BackendPtr>::const_iterator bit;
345 for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
347 PackagePtr p = (*bit)->m_package;
348 // we don't modify database name yet!
// Every backend package is assigned the same (modified) client request.
350 p->request() = apdu_req;
351 p->copy_filter(package);
// Presumably the backend packages are sent (e.g. via multi_move()) on a line
// not shown here, before the responses are read below -- TODO confirm.
355 // look at each response
357 for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
359 PackagePtr p = (*bit)->m_package;
361 if (p->session().is_closed()) // if any backend closes, close frontend
362 package.session().close();
364 Z_GDU *gdu = p->response().get();
365 if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
366 Z_APDU_searchResponse)
368 Z_APDU *b_apdu = gdu->u.z3950;
369 Z_SearchResponse *b_resp = b_apdu->u.searchResponse;
// Accumulate the hit count over all backends (total_hits is declared on a
// line not shown here).
371 total_hits += *b_resp->resultCount;
375 // if any target does not return search response - return that
376 package.response() = p->response();
// Build the client's search response and report the summed hit count.
382 Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
383 Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
385 *f_resp->resultCount = total_hits;
387 package.response() = f_apdu;
// Filter entry point. Acquires the Frontend for the package's session,
// dispatches on the type of the incoming request, and releases the Frontend
// again at the end.
//
// Some lines (braces, the body of the `!f->m_is_multi` branch, declaration of
// odr, parts of the create_close argument list) are not shown in this listing.
390 void yf::Multi::process(Package &package) const
392 FrontendPtr f = m_p->get_frontend(package);
394 Z_GDU *gdu = package.request().get();
// First init request on a Frontend that is not yet in multi mode.
396 if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
397 Z_APDU_initRequest && !f->m_is_multi)
399 f->init(package, gdu);
// Frontend not in multi mode and the request is not an init: the action
// taken is on a line not shown here (presumably the package is passed on
// unchanged -- TODO confirm).
401 else if (!f->m_is_multi)
403 else if (gdu && gdu->which == Z_GDU_Z3950)
405 Z_APDU *apdu = gdu->u.z3950;
// A second init request on a multi-mode Frontend is answered with a close
// (protocol error), and the client session is closed.
406 if (apdu->which == Z_APDU_initRequest)
410 package.response() = odr.create_close(
412 Z_Close_protocolError,
415 package.session().close();
417 else if (apdu->which == Z_APDU_searchRequest)
419 f->search(package, apdu);
// Other APDU types are rejected with a close and the session is closed.
// (Additional branches may exist on lines not shown here.)
425 package.response() = odr.create_close(
426 apdu, Z_Close_protocolError,
427 "unsupported APDU in filter multi");
429 package.session().close();
// Hand the Frontend back so that other threads waiting on this session can
// continue.
432 m_p->release_frontend(package);
// Reads the filter's XML configuration. Each <virtual> child element yields a
// vhost, a list of targets and a route, which are registered through
// add_map_host2hosts().
//
// ptr: the filter's configuration node; its children are scanned.
// Throws yp2::filter::FilterException on an unexpected element.
//
// Some lines (braces, `continue` statements, the declaration of vhost, the
// first part of the exception messages) are not shown in this listing.
435 void yp2::filter::Multi::configure(const xmlNode * ptr)
437 for (ptr = ptr->children; ptr; ptr = ptr->next)
// Only element nodes are of interest.
439 if (ptr->type != XML_ELEMENT_NODE)
441 if (!strcmp((const char *) ptr->name, "virtual"))
443 std::list<std::string> targets;
445 xmlNode *v_node = ptr->children;
446 for (; v_node; v_node = v_node->next)
448 if (v_node->type != XML_ELEMENT_NODE)
// <vhost> sets the virtual host name (a later <vhost> overwrites an earlier
// one); each <target> appends one entry to targets.
451 if (yp2::xml::is_element_yp2(v_node, "vhost"))
452 vhost = yp2::xml::get_text(v_node);
453 else if (yp2::xml::is_element_yp2(v_node, "target"))
454 targets.push_back(yp2::xml::get_text(v_node));
// Any other element inside <virtual> is an error.
456 throw yp2::filter::FilterException
458 + std::string((const char *) v_node->name)
459 + " in virtual section"
// The route is read from the <virtual> element itself.
462 std::string route = yp2::xml::get_route(ptr);
463 add_map_host2hosts(vhost, targets, route);
// Print every vhost -> target mapping, with its route, to standard output.
464 std::list<std::string>::const_iterator it;
465 for (it = targets.begin(); it != targets.end(); it++)
467 std::cout << "Add " << vhost << "->" << *it
468 << "," << route << "\n";
// Any top-level element other than <virtual> is an error.
// NOTE(review): the message below names the "virt_db filter" although this
// is Multi::configure -- looks like a copy/paste leftover; confirm and adjust
// the message text if so.
473 throw yp2::filter::FilterException
475 + std::string((const char *) ptr->name)
476 + " in virt_db filter");
// Factory function: returns a newly allocated Multi filter as a Base pointer.
// The object is created with `new`; the caller receives the raw pointer.
481 static yp2::filter::Base* filter_creator()
483 return new yp2::filter::Multi;
// Filter descriptor object for the multi filter. Its initializer is not shown
// in this listing; presumably it references filter_creator -- TODO confirm.
487 struct yp2_filter_struct yp2_filter_multi = {
498 * indent-tabs-mode: nil
499 * c-file-style: "stroustrup"
501 * vim: shiftwidth=4 tabstop=8 expandtab