-/* $Id: filter_multi.cpp,v 1.1 2006-01-15 20:03:14 adam Exp $
+/* $Id: filter_multi.cpp,v 1.2 2006-01-16 01:10:19 adam Exp $
Copyright (c) 2005, Index Data.
%LICENSE%
struct Multi::BackendSet {
BackendPtr m_backend;
- long size;
+ int m_count;
+ bool operator < (const BackendSet &k) const;
};
- struct Multi::Set {
- Set(std::string setname);
- Set();
- ~Set();
+ struct Multi::FrontendSet {
+ struct PresentJob {
+ BackendPtr m_backend;
+ int m_pos;
+ int m_inside_pos;
+ };
+ FrontendSet(std::string setname);
+ FrontendSet();
+ ~FrontendSet();
+
+ void round_robin(int pos, int number, std::list<PresentJob> &job);
std::list<BackendSet> m_backend_sets;
std::string m_setname;
bool m_is_multi;
bool m_in_use;
std::list<BackendPtr> m_backend_list;
- std::map<std::string,Multi::Set> m_sets;
- void multi_move();
+ std::map<std::string,Multi::FrontendSet> m_sets;
+
+ void multi_move(std::list<BackendPtr> &blist);
void init(Package &package, Z_GDU *gdu);
void close(Package &package);
void search(Package &package, Z_APDU *apdu);
-#if 0
void present(Package &package, Z_APDU *apdu);
- void scan(Package &package, Z_APDU *apdu);
-#endif
Rep *m_p;
};
struct Multi::Map {
using namespace yp2;
+bool yf::Multi::BackendSet::operator < (const BackendSet &k) const
+{
+ return m_count < k.m_count;
+}
+
yf::Multi::Frontend::Frontend(Rep *rep)
{
m_p = rep;
}
}
-yf::Multi::Set::Set(std::string setname)
+yf::Multi::FrontendSet::FrontendSet(std::string setname)
: m_setname(setname)
{
}
-yf::Multi::Set::Set()
+yf::Multi::FrontendSet::FrontendSet()
{
}
-yf::Multi::Set::~Set()
+yf::Multi::FrontendSet::~FrontendSet()
{
}
}
}
-void yf::Multi::Frontend::multi_move()
+void yf::Multi::Frontend::multi_move(std::list<BackendPtr> &blist)
{
std::list<BackendPtr>::const_iterator bit;
boost::thread_group g;
- for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
+ for (bit = blist.begin(); bit != blist.end(); bit++)
{
g.add_thread(new boost::thread(**bit));
}
g.join_all();
}
+
+void yf::Multi::FrontendSet::round_robin(int start, int number,
+ std::list<PresentJob> &jobs)
+{
+ int fetched = 0;
+ int p = 1;
+ bool eof = true;
+
+ std::list<int> pos;
+ std::list<int> inside_pos;
+ std::list<BackendSet>::const_iterator bsit;
+ for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
+ {
+ pos.push_back(1);
+ inside_pos.push_back(0);
+ }
+
+ std::list<int>::iterator psit = pos.begin();
+ std::list<int>::iterator esit = inside_pos.begin();
+ bsit = m_backend_sets.begin();
+ while (fetched < number)
+ {
+ if (bsit == m_backend_sets.end())
+ {
+ psit = pos.begin();
+ esit = inside_pos.begin();
+ bsit = m_backend_sets.begin();
+ if (eof)
+ break;
+ eof = true;
+ }
+ if (*psit <= bsit->m_count)
+ {
+ if (p >= start)
+ {
+ PresentJob job;
+ job.m_backend = bsit->m_backend;
+ job.m_pos = *psit;
+ job.m_inside_pos = *esit;
+ jobs.push_back(job);
+ (*esit)++;
+ fetched++;
+ }
+ (*psit)++;
+ p++;
+ eof = false;
+ }
+ psit++;
+ esit++;
+ bsit++;
+ }
+}
+
void yf::Multi::Frontend::init(Package &package, Z_GDU *gdu)
{
Z_InitRequest *req = gdu->u.z3950->u.initRequest;
b->m_package->copy_filter(package);
}
- multi_move();
+ multi_move(m_backend_list);
// create the frontend init response based on each backend init response
yp2::odr odr;
p->request() = apdu_req;
p->copy_filter(package);
}
- multi_move();
+ multi_move(m_backend_list);
// look at each response
- int total_hits = 0;
+ FrontendSet resultSet(std::string(req->resultSetName));
+
+ int total_count = 0;
+ Z_Records *z_records_diag = 0; // no diagnostics (yet)
for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
{
PackagePtr p = (*bit)->m_package;
{
Z_APDU *b_apdu = gdu->u.z3950;
Z_SearchResponse *b_resp = b_apdu->u.searchResponse;
-
- total_hits += *b_resp->resultCount;
+
+ // see we get any errors (AKA diagnstics)
+ if (b_resp->records)
+ {
+ if (b_resp->records->which == Z_Records_NSD
+ || b_resp->records->which == Z_Records_multipleNSD)
+ z_records_diag = b_resp->records;
+ // we may set this multiple times (TOO BAD!)
+ }
+ BackendSet backendSet;
+ backendSet.m_backend = *bit;
+ backendSet.m_count = *b_resp->resultCount;
+ total_count += *b_resp->resultCount;
+ resultSet.m_backend_sets.push_back(backendSet);
}
else
{
Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
- *f_resp->resultCount = total_hits;
+ if (z_records_diag)
+ {
+ // search error
+ f_resp->records = z_records_diag;
+ }
+ else
+ { // assume OK
+ m_sets[resultSet.m_setname] = resultSet;
+ }
+ *f_resp->resultCount = total_count;
+
+ package.response() = f_apdu;
+}
+void yf::Multi::Frontend::present(Package &package, Z_APDU *apdu_req)
+{
+ // create present request
+ Z_PresentRequest *req = apdu_req->u.presentRequest;
+
+ Sets_it it;
+ it = m_sets.find(std::string(req->resultSetId));
+ if (it == m_sets.end())
+ {
+ yp2::odr odr;
+ Z_APDU *apdu =
+ odr.create_presentResponse(
+ apdu_req,
+ YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
+ req->resultSetId);
+ package.response() = apdu;
+ return;
+ }
+ std::list<Multi::FrontendSet::PresentJob> jobs;
+ int start = *req->resultSetStartPoint;
+ int number = *req->numberOfRecordsRequested;
+ it->second.round_robin(start, number, jobs);
+
+ std::list<BackendPtr> present_backend_list;
+
+ std::list<BackendSet>::const_iterator bsit;
+ bsit = it->second.m_backend_sets.begin();
+ for (; bsit != it->second.m_backend_sets.end(); bsit++)
+ {
+ std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
+ int start = -1;
+ int end = -1;
+
+ for (jit = jobs.begin(); jit != jobs.end(); jit++)
+ {
+ if (jit->m_backend == bsit->m_backend)
+ {
+ if (start == -1 || jit->m_pos < start)
+ start = jit->m_pos;
+ if (end == -1 || jit->m_pos > end)
+ end = jit->m_pos;
+ }
+ }
+ if (start != -1)
+ {
+ PackagePtr p = bsit->m_backend->m_package;
+
+ *req->resultSetStartPoint = start;
+ *req->numberOfRecordsRequested = end - start + 1;
+
+ p->request() = apdu_req;
+ p->copy_filter(package);
+
+ present_backend_list.push_back(bsit->m_backend);
+ }
+ }
+ multi_move(present_backend_list);
+
+ // look at each response
+ Z_Records *z_records_diag = 0;
+
+ std::list<BackendPtr>::const_iterator pbit = present_backend_list.begin();
+ for (; pbit != present_backend_list.end(); pbit++)
+ {
+ PackagePtr p = (*pbit)->m_package;
+
+ if (p->session().is_closed()) // if any backend closes, close frontend
+ package.session().close();
+
+ Z_GDU *gdu = p->response().get();
+ if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
+ Z_APDU_presentResponse)
+ {
+ Z_APDU *b_apdu = gdu->u.z3950;
+ Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
+
+ // see we get any errors (AKA diagnstics)
+ if (b_resp->records)
+ {
+ if (b_resp->records->which != Z_Records_DBOSD)
+ z_records_diag = b_resp->records;
+ // we may set this multiple times (TOO BAD!)
+ }
+ }
+ else
+ {
+ // if any target does not return present response - return that
+ package.response() = p->response();
+ return;
+ }
+ }
+
+ yp2::odr odr;
+ Z_APDU *f_apdu = odr.create_presentResponse(apdu_req, 0, 0);
+ Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
+
+ if (z_records_diag)
+ {
+ f_resp->records = z_records_diag;
+ *f_resp->presentStatus = Z_PresentStatus_failure;
+ }
+ else
+ {
+ f_resp->records = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
+ Z_Records * records = f_resp->records;
+ records->which = Z_Records_DBOSD;
+ records->u.databaseOrSurDiagnostics =
+ (Z_NamePlusRecordList *)
+ odr_malloc(odr, sizeof(Z_NamePlusRecordList));
+ Z_NamePlusRecordList *nprl = records->u.databaseOrSurDiagnostics;
+ nprl->num_records = jobs.size();
+ nprl->records = (Z_NamePlusRecord**)
+ odr_malloc(odr, sizeof(Z_NamePlusRecord *) * nprl->num_records);
+ int i = 0;
+ std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
+ for (jit = jobs.begin(); jit != jobs.end(); jit++)
+ {
+ PackagePtr p = jit->m_backend->m_package;
+
+ Z_GDU *gdu = p->response().get();
+ Z_APDU *b_apdu = gdu->u.z3950;
+ Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
+
+ nprl->records[i++] =
+ b_resp->records->u.databaseOrSurDiagnostics->
+ records[jit->m_inside_pos];
+ }
+ *f_resp->nextResultSetPosition = start + i;
+ *f_resp->numberOfRecordsReturned = i;
+ }
package.response() = f_apdu;
}
{
f->search(package, apdu);
}
+ else if (apdu->which == Z_APDU_presentRequest)
+ {
+ f->present(package, apdu);
+ }
else
{
yp2::odr odr;