-/* $Id: filter_multi.cpp,v 1.25 2007-01-25 14:05:54 adam Exp $
+/* $Id: filter_multi.cpp,v 1.29 2007-11-26 21:21:12 adam Exp $
Copyright (c) 2005-2007, Index Data.
- See the LICENSE file for details
+This file is part of Metaproxy.
+
+Metaproxy is free software; you can redistribute it and/or modify it under
+the terms of the GNU General Public License as published by the Free
+Software Foundation; either version 2, or (at your option) any later
+version.
+
+Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
+WARRANTY; without even the implied warranty of MERCHANTABILITY or
+FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+for more details.
+
+You should have received a copy of the GNU General Public License
+along with Metaproxy; see the file LICENSE. If not, write to the
+Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
+02111-1307, USA.
*/
+#include <yaz/log.h>
+
#include "config.hpp"
#include "filter.hpp"
namespace metaproxy_1 {
namespace filter {
-
+ enum multi_merge_type {
+ round_robin,
+ serve_order
+ };
struct Multi::BackendSet {
BackendPtr m_backend;
int m_count;
Z_Entry *get_entry(ODR odr);
};
struct Multi::FrontendSet {
- struct PresentJob {
+ class PresentJob {
+ public:
BackendPtr m_backend;
- int m_pos;
- int m_inside_pos;
+ int m_pos; // position for backend (1=first, 2=second,..
+ int m_start; // present request start
+ PresentJob(BackendPtr ptr, int pos) :
+ m_backend(ptr), m_pos(pos), m_start(0) {};
};
FrontendSet(std::string setname);
FrontendSet();
~FrontendSet();
void round_robin(int pos, int number, std::list<PresentJob> &job);
+ void serve_order(int pos, int number, std::list<PresentJob> &job);
std::list<BackendSet> m_backend_sets;
std::string m_setname;
boost::condition m_cond_session_ready;
std::map<mp::Session, FrontendPtr> m_clients;
bool m_hide_unavailable;
+ multi_merge_type m_merge_type;
};
}
}
yf::Multi::Rep::Rep()
{
m_hide_unavailable = false;
+ m_merge_type = round_robin;
}
bool yf::Multi::BackendSet::operator < (const BackendSet &k) const
g.join_all();
}
+void yf::Multi::FrontendSet::serve_order(int start, int number,
+ std::list<PresentJob> &jobs)
+{
+ int i;
+ for (i = 0; i < number; i++)
+ {
+ std::list<BackendSet>::const_iterator bsit;
+ int voffset = 0;
+ int offset = start + i - 1;
+ for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end();
+ bsit++)
+ {
+ if (offset >= voffset && offset < voffset + bsit->m_count)
+ {
+ PresentJob job(bsit->m_backend, offset - voffset + 1);
+ jobs.push_back(job);
+ break;
+ }
+ voffset += bsit->m_count;
+ }
+ }
+}
+
void yf::Multi::FrontendSet::round_robin(int start, int number,
std::list<PresentJob> &jobs)
{
std::list<int> pos;
- std::list<int> inside_pos;
std::list<BackendSet>::const_iterator bsit;
for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
{
pos.push_back(1);
- inside_pos.push_back(0);
}
int p = 1;
{
more = false;
std::list<int>::iterator psit = pos.begin();
- std::list<int>::iterator esit = inside_pos.begin();
bsit = m_backend_sets.begin();
- for (; bsit != m_backend_sets.end(); psit++,esit++,bsit++)
+ for (; bsit != m_backend_sets.end(); psit++,bsit++)
{
if (fetched >= number)
{
{
if (p >= start)
{
- PresentJob job;
- job.m_backend = bsit->m_backend;
- job.m_pos = *psit;
- job.m_inside_pos = *esit;
+ PresentJob job(bsit->m_backend, *psit);
jobs.push_back(job);
- (*esit)++;
fetched++;
}
(*psit)++;
std::list<Multi::FrontendSet::PresentJob> jobs;
int start = *req->resultSetStartPoint;
int number = *req->numberOfRecordsRequested;
- it->second.round_robin(start, number, jobs);
+
+ if (m_p->m_merge_type == round_robin)
+ it->second.round_robin(start, number, jobs);
+ else if (m_p->m_merge_type == serve_order)
+ it->second.serve_order(start, number, jobs);
+
+ if (0)
+ {
+ std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
+ for (jit = jobs.begin(); jit != jobs.end(); jit++)
+ {
+ yaz_log(YLOG_LOG, "job pos=%d", jit->m_pos);
+ }
+ }
std::list<BackendPtr> present_backend_list;
bsit = it->second.m_backend_sets.begin();
for (; bsit != it->second.m_backend_sets.end(); bsit++)
{
- std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
int start = -1;
int end = -1;
-
- for (jit = jobs.begin(); jit != jobs.end(); jit++)
{
- if (jit->m_backend == bsit->m_backend)
+ std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
+ for (jit = jobs.begin(); jit != jobs.end(); jit++)
{
- if (start == -1 || jit->m_pos < start)
- start = jit->m_pos;
- if (end == -1 || jit->m_pos > end)
- end = jit->m_pos;
+ if (jit->m_backend == bsit->m_backend)
+ {
+ if (start == -1 || jit->m_pos < start)
+ start = jit->m_pos;
+ if (end == -1 || jit->m_pos > end)
+ end = jit->m_pos;
+ }
}
}
if (start != -1)
{
+ std::list<Multi::FrontendSet::PresentJob>::iterator jit;
+ for (jit = jobs.begin(); jit != jobs.end(); jit++)
+ {
+ if (jit->m_backend == bsit->m_backend)
+ {
+ if (jit->m_pos >= start && jit->m_pos <= end)
+ jit->m_start = start;
+ }
+ }
+
PackagePtr p = bsit->m_backend->m_package;
*req->resultSetStartPoint = start;
nprl->records[i] = (Z_NamePlusRecord*)
odr_malloc(odr, sizeof(Z_NamePlusRecord));
+ int inside_pos = jit->m_pos - jit->m_start;
+ if (inside_pos >= b_resp->records->
+ u.databaseOrSurDiagnostics->num_records)
+ break;
*nprl->records[i] = *b_resp->records->
- u.databaseOrSurDiagnostics->records[jit->m_inside_pos];
+ u.databaseOrSurDiagnostics->records[inside_pos];
nprl->records[i]->databaseName =
odr_strdup(odr, jit->m_backend->m_vhost.c_str());
}
+ nprl->num_records = i; // usually same as jobs.size();
*f_resp->nextResultSetPosition = start + i;
*f_resp->numberOfRecordsReturned = i;
}
{
m_p->m_hide_unavailable = true;
}
+ else if (!strcmp((const char *) ptr->name, "mergetype"))
+ {
+ std::string mergetype = mp::xml::get_text(ptr);
+ if (mergetype == "roundrobin")
+ m_p->m_merge_type = round_robin;
+ else if (mergetype == "serveorder")
+ m_p->m_merge_type = serve_order;
+ else
+ throw mp::filter::FilterException
+ ("Bad mergetype " + mergetype + " in multi filter");
+
+ }
else
{
throw mp::filter::FilterException
("Bad element "
+ std::string((const char *) ptr->name)
- + " in virt_db filter");
+ + " in multi filter");
}
}
}