1 /* $Id: filter_load_balance.cpp,v 1.4 2007-01-04 13:03:17 marc Exp $
2 Copyright (c) 2005-2006, Index Data.
4 See the LICENSE file for details
11 #include "filter_load_balance.hpp"
14 #include <boost/thread/mutex.hpp>
15 #include <boost/date_time/posix_time/posix_time.hpp>
24 namespace mp = metaproxy_1;
25 namespace yf = mp::filter;
27 namespace metaproxy_1 {
29 class LoadBalance::Impl {
33 void process(metaproxy_1::Package & package);
34 void configure(const xmlNode * ptr);
36 // statistic manipulating functions,
37 void add_dead(unsigned long session_id);
38 //void clear_dead(unsigned long session_id);
39 void add_package(unsigned long session_id);
40 void remove_package(unsigned long session_id);
41 void add_session(unsigned long session_id, std::string target);
42 void remove_session(unsigned long session_id);
43 std::string find_session_target(unsigned long session_id);
46 unsigned int cost(std::string target);
47 unsigned int dead(std::string target);
52 unsigned int sessions;
53 unsigned int packages;
56 unsigned int c = sessions + packages + deads;
57 std::cout << "stats c:" << c
66 // local protected databases
68 std::map<std::string, TargetStat> m_target_stat;
69 std::map<unsigned long, std::string> m_session_target;
74 // define Pimpl wrapper forwarding to Impl
76 yf::LoadBalance::LoadBalance() : m_p(new Impl)
80 yf::LoadBalance::~LoadBalance()
81 { // must have a destructor because of boost::scoped_ptr
84 void yf::LoadBalance::configure(const xmlNode *xmlnode)
86 m_p->configure(xmlnode);
89 void yf::LoadBalance::process(mp::Package &package) const
91 m_p->process(package);
95 // define Implementation stuff
99 yf::LoadBalance::Impl::Impl()
103 yf::LoadBalance::Impl::~Impl()
107 void yf::LoadBalance::Impl::configure(const xmlNode *xmlnode)
111 void yf::LoadBalance::Impl::process(mp::Package &package)
114 bool is_closed_front = false;
115 bool is_closed_back = false;
117 // checking for closed front end packages
118 if (package.session().is_closed()){
119 is_closed_front = true;
122 Z_GDU *gdu_req = package.request().get();
124 // passing anything but z3950 packages
125 if (gdu_req && gdu_req->which == Z_GDU_Z3950){
127 // target selecting only on Z39.50 init request
128 if (gdu_req->u.z3950->which == Z_APDU_initRequest){
130 mp::odr odr_en(ODR_ENCODE);
131 Z_InitRequest *org_init = gdu_req->u.z3950->u.initRequest;
133 // extracting virtual hosts
134 std::list<std::string> vhosts;
136 mp::util::remove_vhost_otherinfo(&(org_init->otherInfo), vhosts);
138 // choosing one target according to load-balancing algorithm
142 unsigned int cost = std::numeric_limits<unsigned int>::max();
144 { //locking scope for local databases
145 boost::mutex::scoped_lock scoped_lock(m_mutex);
147 // load-balancing algorithm goes here
148 //target = *vhosts.begin();
149 for(std::list<std::string>::const_iterator ivh
153 if ((*ivh).size() != 0){
155 = yf::LoadBalance::Impl::cost(*ivh);
163 // updating local database
164 add_session(package.session().id(), target);
165 yf::LoadBalance::Impl::cost(target);
166 add_package(package.session().id());
169 // copying new target into init package
170 mp::util::set_vhost_otherinfo(&(org_init->otherInfo),
172 package.request() = gdu_req;
176 // frontend Z39.50 close request is added to statistics and marked
177 else if (gdu_req->u.z3950->which == Z_APDU_close){
178 is_closed_front = true;
179 boost::mutex::scoped_lock scoped_lock(m_mutex);
180 add_package(package.session().id());
182 // any other Z39.50 package is added to statistics
184 boost::mutex::scoped_lock scoped_lock(m_mutex);
185 add_package(package.session().id());
189 // moving all package types
193 // checking for closed back end packages
194 if (package.session().is_closed())
195 is_closed_back = true;
197 Z_GDU *gdu_res = package.response().get();
199 // passing anything but z3950 packages
200 if (gdu_res && gdu_res->which == Z_GDU_Z3950){
202 // session closing only on Z39.50 close response
203 if (gdu_res->u.z3950->which == Z_APDU_close){
204 is_closed_back = true;
205 boost::mutex::scoped_lock scoped_lock(m_mutex);
206 remove_package(package.session().id());
208 // any other Z39.50 package is removed from statistics
210 boost::mutex::scoped_lock scoped_lock(m_mutex);
211 remove_package(package.session().id());
215 // finally removing sessions and marking deads
216 if (is_closed_back || is_closed_front){
217 boost::mutex::scoped_lock scoped_lock(m_mutex);
219 // marking backend dead if backend closed without fronted close
220 if (is_closed_front == false)
221 add_dead(package.session().id());
223 remove_session(package.session().id());
225 // making sure that package is closed
226 package.session().close();
230 // getting timestamp for receiving of package
231 //boost::posix_time::ptime receive_time
232 // = boost::posix_time::microsec_clock::local_time();
233 // //<< receive_time << " "
234 // //<< to_iso_string(receive_time) << " "
235 //<< to_iso_extended_string(receive_time) << " "
238 // statistic manipulating functions,
239 void yf::LoadBalance::Impl::add_dead(unsigned long session_id){
242 std::string target = find_session_target(session_id);
244 if (target.size() != 0){
245 std::map<std::string, TargetStat>::iterator itarg;
246 itarg = m_target_stat.find(target);
247 if (itarg != m_target_stat.end()
248 && itarg->second.deads < std::numeric_limits<unsigned int>::max()){
249 itarg->second.deads += 1;
250 std::cout << "add_dead " << session_id << " " << target
251 << " d:" << itarg->second.deads << "\n";
256 //void yf::LoadBalance::Impl::clear_dead(unsigned long session_id){
257 // std::cout << "clear_dead " << session_id << "\n";
260 void yf::LoadBalance::Impl::add_package(unsigned long session_id){
262 std::string target = find_session_target(session_id);
264 if (target.size() != 0){
265 std::map<std::string, TargetStat>::iterator itarg;
266 itarg = m_target_stat.find(target);
267 if (itarg != m_target_stat.end()
268 && itarg->second.packages
269 < std::numeric_limits<unsigned int>::max()){
270 itarg->second.packages += 1;
271 std::cout << "add_package " << session_id << " " << target
272 << " p:" << itarg->second.packages << "\n";
277 void yf::LoadBalance::Impl::remove_package(unsigned long session_id){
278 std::string target = find_session_target(session_id);
280 if (target.size() != 0){
281 std::map<std::string, TargetStat>::iterator itarg;
282 itarg = m_target_stat.find(target);
283 if (itarg != m_target_stat.end()
284 && itarg->second.packages > 0){
285 itarg->second.packages -= 1;
286 std::cout << "remove_package " << session_id << " " << target
287 << " p:" << itarg->second.packages << "\n";
292 void yf::LoadBalance::Impl::add_session(unsigned long session_id,
295 // finding and adding session
296 std::map<unsigned long, std::string>::iterator isess;
297 isess = m_session_target.find(session_id);
298 if (isess == m_session_target.end()){
299 m_session_target.insert(std::make_pair(session_id, target));
302 // finding and adding target statistics
303 std::map<std::string, TargetStat>::iterator itarg;
304 itarg = m_target_stat.find(target);
305 if (itarg == m_target_stat.end()){
308 stat.packages = 0; // no idea why the defaut constructor TargetStat()
309 stat.deads = 0; // is not initializig this correctly to zero ??
310 m_target_stat.insert(std::make_pair(target, stat));
311 std::cout << "add_session " << session_id << " " << target
314 else if (itarg->second.sessions < std::numeric_limits<unsigned int>::max())
316 itarg->second.sessions += 1;
317 std::cout << "add_session " << session_id << " " << target
318 << " s:" << itarg->second.sessions << "\n";
322 void yf::LoadBalance::Impl::remove_session(unsigned long session_id){
327 std::map<unsigned long, std::string>::iterator isess;
328 isess = m_session_target.find(session_id);
329 if (isess == m_session_target.end())
332 target = isess->second;
334 // finding target statistics
335 std::map<std::string, TargetStat>::iterator itarg;
336 itarg = m_target_stat.find(target);
337 if (itarg == m_target_stat.end()){
338 m_session_target.erase(isess);
342 // counting session down
343 if (itarg->second.sessions > 0)
344 itarg->second.sessions -= 1;
346 std::cout << "remove_session " << session_id << " " << target
347 << " s:" << itarg->second.sessions << "\n";
349 // clearing empty sessions and targets
350 if (itarg->second.sessions == 0 && itarg->second.deads == 0 ){
351 m_target_stat.erase(itarg);
352 m_session_target.erase(isess);
357 yf::LoadBalance::Impl::find_session_target(unsigned long session_id){
360 std::map<unsigned long, std::string>::iterator isess;
361 isess = m_session_target.find(session_id);
362 if (isess != m_session_target.end())
363 target = isess->second;
369 unsigned int yf::LoadBalance::Impl::cost(std::string target){
373 if (target.size() != 0){
374 std::map<std::string, TargetStat>::iterator itarg;
375 itarg = m_target_stat.find(target);
376 if (itarg != m_target_stat.end()){
377 cost = itarg->second.cost();
381 //std::cout << "cost " << target << " c:" << cost << "\n";
385 unsigned int yf::LoadBalance::Impl::dead(std::string target){
389 if (target.size() != 0){
390 std::map<std::string, TargetStat>::iterator itarg;
391 itarg = m_target_stat.find(target);
392 if (itarg != m_target_stat.end()){
393 dead = itarg->second.deads;
397 //std::cout << "dead " << target << " d:" << dead << "\n";
404 static mp::filter::Base* filter_creator()
406 return new mp::filter::LoadBalance;
410 struct metaproxy_1_filter_struct metaproxy_1_filter_load_balance = {
421 * indent-tabs-mode: nil
422 * c-file-style: "stroustrup"
424 * vim: shiftwidth=4 tabstop=8 expandtab