From 3b24fcbdb74c1a318b53ff536b183e48841c251c Mon Sep 17 00:00:00 2001 From: Adam Dickmeiss Date: Tue, 5 Feb 2013 15:56:35 +0100 Subject: [PATCH] session lock work (completely untested) --- perf/bash/par.sh | 4 +-- src/client.c | 50 +++++++++++---------------- src/connection.c | 5 +-- src/http_command.c | 7 ++-- src/ppmutex.c | 98 ++++++++++++++++++++++++++++++++++++++++++++++++++++ src/ppmutex.h | 23 +++++++++++- src/session.c | 95 +++++++++++++++++++++++++++----------------------- src/session.h | 14 ++++++-- 8 files changed, 210 insertions(+), 86 deletions(-) diff --git a/perf/bash/par.sh b/perf/bash/par.sh index 7072a3f..83f7e56 100755 --- a/perf/bash/par.sh +++ b/perf/bash/par.sh @@ -2,9 +2,9 @@ DELAY=0.001 WAIT=5 NUMBER=5 -ROUNDS=5 +ROUNDS=500 PORT=9004 -SERVICE=perf_t +SERVICE=perf SHUTDOWN=1 HOST=127.0.0.1 if test -n "$1"; then diff --git a/src/client.c b/src/client.c index 0cf2e07..94414ff 100644 --- a/src/client.c +++ b/src/client.c @@ -125,7 +125,6 @@ struct client { enum client_state state; struct show_raw *show_raw; ZOOM_resultset resultset; - YAZ_MUTEX mutex; int ref_count; char *id; facet_limits_t facet_limits; @@ -513,22 +512,26 @@ static void ingest_raw_record(struct client *cl, ZOOM_record rec) void client_check_preferred_watch(struct client *cl) { struct session *se = cl->session; + + session_enter_ro(se, "client_check_preferred_watch"); yaz_log(YLOG_DEBUG, "client_check_preferred_watch: %s ", client_get_id(cl)); if (se) { - client_unlock(cl); /* TODO possible threading issue. Session can have been destroyed */ + assert(cl->session); if (session_is_preferred_clients_ready(se)) { + assert(cl->session); session_alert_watch(se, SESSION_WATCH_SHOW_PREF); + assert(cl->session); } else yaz_log(YLOG_DEBUG, "client_check_preferred_watch: Still locked on preferred targets."); - - client_lock(cl); + assert(cl->session); } else yaz_log(YLOG_WARN, "client_check_preferred_watch: %s. No session!", client_get_id(cl)); + session_leave_ro(se, "client_check_preferred_watch"); } struct suggestions* client_suggestions_create(const char* suggestions_string); @@ -542,6 +545,7 @@ void client_search_response(struct client *cl) const char *error, *addinfo = 0; + session_enter_rw(cl->session, "client_search_response"); if (ZOOM_connection_error(link, &error, &addinfo)) { cl->hits = 0; @@ -559,23 +563,22 @@ void client_search_response(struct client *cl) client_suggestions_destroy(cl); cl->suggestions = client_suggestions_create(ZOOM_resultset_option_get(resultset, "suggestions")); } + session_leave_rw(cl->session, "client_search_response"); } void client_got_records(struct client *cl) { struct session *se = cl->session; - if (se) + + session_enter_ro(se, "client_got_records"); + if (reclist_get_num_records(se->reclist) > 0) { - if (reclist_get_num_records(se->reclist) > 0) - { - client_unlock(cl); - session_alert_watch(se, SESSION_WATCH_SHOW); - session_alert_watch(se, SESSION_WATCH_BYTARGET); - session_alert_watch(se, SESSION_WATCH_TERMLIST); - session_alert_watch(se, SESSION_WATCH_RECORD); - client_lock(cl); - } + session_alert_watch(se, SESSION_WATCH_SHOW); + session_alert_watch(se, SESSION_WATCH_BYTARGET); + session_alert_watch(se, SESSION_WATCH_TERMLIST); + session_alert_watch(se, SESSION_WATCH_RECORD); } + session_leave_ro(se, "client_got_records"); } static void client_record_ingest(struct client *cl) @@ -652,6 +655,7 @@ void client_record_response(struct client *cl) ZOOM_resultset resultset = cl->resultset; const char *error, *addinfo; + session_enter_rw(cl->session, "client_record_response"); if (ZOOM_connection_error(link, &error, &addinfo)) { client_set_state(cl, Client_Error); @@ -680,6 +684,7 @@ void client_record_response(struct client *cl) client_record_ingest(cl); } } + session_leave_rw(cl->session, "client_record_response"); } int client_reingest(struct client *cl) @@ -946,8 +951,6 @@ struct client *client_create(const char *id) cl->show_raw = 0; cl->resultset = 0; cl->suggestions = 0; - cl->mutex = 0; - pazpar2_mutex_create(&cl->mutex, "client"); cl->preferred = 0; cl->ref_count = 1; cl->facet_limits = 0; @@ -960,19 +963,9 @@ struct client *client_create(const char *id) return cl; } -void client_lock(struct client *c) -{ - yaz_mutex_enter(c->mutex); -} - -void client_unlock(struct client *c) -{ - yaz_mutex_leave(c->mutex); -} - void client_incref(struct client *c) { - pazpar2_incref(&c->ref_count, c->mutex); + c->ref_count++; yaz_log(YLOG_DEBUG, "client_incref c=%p %s cnt=%d", c, client_get_id(c), c->ref_count); } @@ -983,7 +976,7 @@ int client_destroy(struct client *c) { yaz_log(YLOG_DEBUG, "client_destroy c=%p %s cnt=%d", c, client_get_id(c), c->ref_count); - if (!pazpar2_decref(&c->ref_count, c->mutex)) + if (--c->ref_count == 0) { xfree(c->pquery); c->pquery = 0; @@ -1003,7 +996,6 @@ int client_destroy(struct client *c) { ZOOM_resultset_destroy(c->resultset); } - yaz_mutex_destroy(&c->mutex); xfree(c); client_use(-1); return 1; diff --git a/src/connection.c b/src/connection.c index 9ca2142..9a29f90 100644 --- a/src/connection.c +++ b/src/connection.c @@ -323,9 +323,8 @@ static void connection_handler(IOCHAN iochan, int event) else if (event & EVENT_TIMEOUT) { ZOOM_connection_fire_event_timeout(co->link); - client_lock(cl); + non_block_events(co); - client_unlock(cl); remove_connection_from_host(co); yaz_mutex_leave(host->mutex); @@ -335,13 +334,11 @@ static void connection_handler(IOCHAN iochan, int event) { yaz_mutex_leave(host->mutex); - client_lock(cl); non_block_events(co); ZOOM_connection_fire_event_socket(co->link, event); non_block_events(co); - client_unlock(cl); if (co->link) { diff --git a/src/http_command.c b/src/http_command.c index f2f9b20..4f6c5c2 100644 --- a/src/http_command.c +++ b/src/http_command.c @@ -162,7 +162,7 @@ struct http_session *http_session_create(struct conf_service *service, char tmp_str[50]; sprintf(tmp_str, "session#%u", sesid); - r->psession = new_session(nmem, service, sesid); + r->psession = session_create(nmem, service, sesid); r->session_id = sesid; r->timestamp = 0; r->nmem = nmem; @@ -1098,6 +1098,7 @@ static void show_records(struct http_channel *c, struct http_session *s, int act if (!s) return; + session_enter_ro(s->psession, "show_records"); // We haven't counted clients yet if we're called on a block release if (active < 0) active = session_active_clients(s->psession); @@ -1114,8 +1115,8 @@ static void show_records(struct http_channel *c, struct http_session *s, int act if (!(sp = reclist_parse_sortparms(c->nmem, sort, service))) { error(rs, PAZPAR2_MALFORMED_PARAMETER_VALUE, "sort"); + session_leave_ro(s->psession, "show_records"); return; - } rl = show_range_start(s->psession, sp, startn, &numn, &total, &total_hits, &approx_hits); @@ -1161,7 +1162,7 @@ static void show_records(struct http_channel *c, struct http_session *s, int act } show_range_stop(s->psession, rl); - + session_leave_ro(s->psession, "show_records"); response_close(c, "show"); } diff --git a/src/ppmutex.c b/src/ppmutex.c index 8212653..8ddb201 100644 --- a/src/ppmutex.c +++ b/src/ppmutex.c @@ -43,6 +43,104 @@ void pazpar2_mutex_create(YAZ_MUTEX *p, const char *name) yaz_mutex_set_name(*p, ppmutex_level, name); } +int pazpar2_lock_rdwr_init(Pazpar2_lock_rdwr *p) +{ + p->readers_reading = 0; + p->writers_writing = 0; +#if YAZ_POSIX_THREADS + pthread_mutex_init(&p->mutex, 0); + pthread_cond_init(&p->lock_free, 0); +#endif + return 0; +} + +int pazpar2_lock_rdwr_destroy(Pazpar2_lock_rdwr *p) +{ + assert (p->readers_reading == 0); + assert (p->writers_writing == 0); +#if YAZ_POSIX_THREADS + pthread_mutex_destroy(&p->mutex); + pthread_cond_destroy(&p->lock_free); +#endif + return 0; +} + +int pazpar2_lock_rdwr_rlock(Pazpar2_lock_rdwr *p) +{ +#if YAZ_POSIX_THREADS + pthread_mutex_lock(& p->mutex); + while (p->writers_writing) + pthread_cond_wait(&p->lock_free, &p->mutex); + p->readers_reading++; + pthread_mutex_unlock(&p->mutex); +#endif + return 0; +} + +int pazpar2_lock_rdwr_wlock(Pazpar2_lock_rdwr *p) +{ +#if YAZ_POSIX_THREADS + pthread_mutex_lock(&p->mutex); + while (p->writers_writing || p->readers_reading) + pthread_cond_wait(&p->lock_free, &p->mutex); + p->writers_writing++; + pthread_mutex_unlock(&p->mutex); +#endif + return 0; +} + +int pazpar2_lock_rdwr_upgrade(Pazpar2_lock_rdwr *p) +{ +#if YAZ_POSIX_THREADS + pthread_mutex_lock(&p->mutex); + --p->readers_reading; + while (p->writers_writing || p->readers_reading) + pthread_cond_wait(&p->lock_free, &p->mutex); + p->writers_writing++; + pthread_mutex_unlock(&p->mutex); +#endif + return 0; +} + +int pazpar2_lock_rdwr_runlock(Pazpar2_lock_rdwr *p) +{ +#if YAZ_POSIX_THREADS + pthread_mutex_lock(&p->mutex); + if (p->readers_reading == 0) + { + pthread_mutex_unlock(&p->mutex); + return -1; + } + else + { + p->readers_reading--; + if (p->readers_reading == 0) + pthread_cond_signal(&p->lock_free); + pthread_mutex_unlock(&p->mutex); + } +#endif + return 0; +} + +int pazpar2_lock_rdwr_wunlock(Pazpar2_lock_rdwr *p) +{ +#if YAZ_POSIX_THREADS + pthread_mutex_lock(&p->mutex); + if (p->writers_writing == 0) + { + pthread_mutex_unlock(&p->mutex); + return -1; + } + else + { + p->writers_writing--; + pthread_cond_broadcast(&p->lock_free); + pthread_mutex_unlock(&p->mutex); + } +#endif + return 0; +} + /* * Local variables: * c-basic-offset: 4 diff --git a/src/ppmutex.h b/src/ppmutex.h index a4159ec..a3c6146 100644 --- a/src/ppmutex.h +++ b/src/ppmutex.h @@ -22,14 +22,35 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #ifndef PAZPAR2_PPMUTEX_H -#define PAZPAR2_PPMUTEXF_H +#define PAZPAR2_PPMUTEX_H #include +#if YAZ_POSIX_THREADS +#include +#endif + void pazpar2_mutex_init(void); void pazpar2_mutex_create(YAZ_MUTEX *p, const char *name); +typedef struct { + int readers_reading; + int writers_writing; +#if YAZ_POSIX_THREADS + pthread_mutex_t mutex; + pthread_cond_t lock_free; +#endif +} Pazpar2_lock_rdwr; + +int pazpar2_lock_rdwr_init(Pazpar2_lock_rdwr *p); +int pazpar2_lock_rdwr_destroy(Pazpar2_lock_rdwr *p); +int pazpar2_lock_rdwr_rlock(Pazpar2_lock_rdwr *p); +int pazpar2_lock_rdwr_wlock(Pazpar2_lock_rdwr *p); +int pazpar2_lock_rdwr_runlock(Pazpar2_lock_rdwr *p); +int pazpar2_lock_rdwr_wunlock(Pazpar2_lock_rdwr *p); +int pazpar2_lock_rdwr_upgrade(Pazpar2_lock_rdwr *p); + #endif /* diff --git a/src/session.c b/src/session.c index 16f7794..4ce25c3 100644 --- a/src/session.c +++ b/src/session.c @@ -148,18 +148,36 @@ static void log_xml_doc(xmlDoc *doc) xmlFree(result); } -static void session_enter(struct session *s, const char *caller) +void session_enter_ro(struct session *s, const char *caller) { + assert(s); + if (caller) + session_log(s, YLOG_DEBUG, "Session read lock by %s", caller); + pazpar2_lock_rdwr_rlock(&s->lock); +} + +void session_enter_rw(struct session *s, const char *caller) +{ + assert(s); if (caller) - session_log(s, YLOG_DEBUG, "Session lock by %s", caller); - yaz_mutex_enter(s->session_mutex); + session_log(s, YLOG_DEBUG, "Session write lock by %s", caller); + pazpar2_lock_rdwr_wlock(&s->lock); } -static void session_leave(struct session *s, const char *caller) +void session_leave_ro(struct session *s, const char *caller) { - yaz_mutex_leave(s->session_mutex); + assert(s); if (caller) - session_log(s, YLOG_DEBUG, "Session unlock by %s", caller); + session_log(s, YLOG_DEBUG, "Session read unlock by %s", caller); + pazpar2_lock_rdwr_runlock(&s->lock); +} + +void session_leave_rw(struct session *s, const char *caller) +{ + assert(s); + if (caller) + session_log(s, YLOG_DEBUG, "Session write unlock by %s", caller); + pazpar2_lock_rdwr_wunlock(&s->lock); } static void session_normalize_facet(struct session *s, const char *type, @@ -464,7 +482,6 @@ int session_set_watch(struct session *s, int what, struct http_channel *chan) { int ret; - session_enter(s, "session_set_watch"); if (s->watchlist[what].fun) ret = -1; else @@ -476,14 +493,12 @@ int session_set_watch(struct session *s, int what, session_watch_cancel); ret = 0; } - session_leave(s, "session_set_watch"); return ret; } void session_alert_watch(struct session *s, int what) { assert(s); - session_enter(s, "session_alert_watch"); if (s->watchlist[what].fun) { /* our watch is no longer associated with http_channel */ @@ -500,13 +515,10 @@ void session_alert_watch(struct session *s, int what) s->watchlist[what].data = 0; s->watchlist[what].obs = 0; - session_leave(s, "session_alert_watch"); session_log(s, YLOG_DEBUG, "Alert Watch: %d calling function: %p", what, fun); fun(data); } - else - session_leave(s,"session_alert_watch"); } //callback for grep_databases @@ -546,18 +558,14 @@ static void session_reset_active_clients(struct session *se, { struct client_list *l; - session_enter(se, "session_reset_active_clients"); l = se->clients_active; se->clients_active = new_list; - session_leave(se, "session_reset_active_clients"); while (l) { struct client_list *l_next = l->next; - client_lock(l->client); client_set_session(l->client, 0); /* mark client inactive */ - client_unlock(l->client); xfree(l); l = l_next; @@ -570,18 +578,14 @@ static void session_remove_cached_clients(struct session *se) session_reset_active_clients(se, 0); - session_enter(se, "session_remove_cached_clients"); l = se->clients_cached; se->clients_cached = 0; - session_leave(se, "session_remove_cached_clients"); while (l) { struct client_list *l_next = l->next; - client_lock(l->client); client_set_session(l->client, 0); client_set_database(l->client, 0); - client_unlock(l->client); client_destroy(l->client); xfree(l); l = l_next; @@ -739,13 +743,14 @@ enum pazpar2_error_code session_search(struct session *se, *addinfo = 0; + session_enter_rw(se, "session_search"); + if (se->settings_modified) { session_remove_cached_clients(se); } else session_reset_active_clients(se, 0); - session_enter(se, "session_search"); se->settings_modified = 0; session_clear_set(se, sp); @@ -754,7 +759,7 @@ enum pazpar2_error_code session_search(struct session *se, live_channels = select_targets(se, filter); if (!live_channels) { - session_leave(se, "session_search"); + session_leave_ro(se, "session_search"); return PAZPAR2_NO_TARGETS; } @@ -763,13 +768,13 @@ enum pazpar2_error_code session_search(struct session *se, if (!se->facet_limits) { *addinfo = "limit"; - session_leave(se, "session_search"); + session_leave_ro(se, "session_search"); return PAZPAR2_MALFORMED_PARAMETER_VALUE; } l0 = se->clients_active; se->clients_active = 0; - session_leave(se, "session_search"); + session_leave_ro(se, "session_search"); for (l = l0; l; l = l->next) { @@ -912,6 +917,8 @@ void session_apply_setting(struct session *se, char *dbname, char *setting, void session_destroy(struct session *se) { struct session_database *sdb; + + session_enter_rw(se, "session_destroy"); session_log(se, YLOG_DEBUG, "Destroying"); session_use(-1); session_remove_cached_clients(se); @@ -928,22 +935,24 @@ void session_destroy(struct session *se) facet_limits_destroy(se->facet_limits); nmem_destroy(se->nmem); service_destroy(se->service); - yaz_mutex_destroy(&se->session_mutex); + + session_leave_rw(se, "session_destroy"); + pazpar2_lock_rdwr_destroy(&se->lock); } size_t session_get_memory_status(struct session *session) { size_t session_nmem; if (session == 0) return 0; - session_enter(session, "session_get_memory_status"); + session_enter_ro(session, "session_get_memory_status"); session_nmem = nmem_total(session->nmem); - session_leave(session, "session_get_memory_status"); + session_leave_ro(session, "session_get_memory_status"); return session_nmem; } -struct session *new_session(NMEM nmem, struct conf_service *service, - unsigned session_id) +struct session *session_create(NMEM nmem, struct conf_service *service, + unsigned session_id) { int i; struct session *session = nmem_malloc(nmem, sizeof(*session)); @@ -976,8 +985,9 @@ struct session *new_session(NMEM nmem, struct conf_service *service, session->watchlist[i].fun = 0; } session->normalize_cache = normalize_cache_create(); - session->session_mutex = 0; - pazpar2_mutex_create(&session->session_mutex, tmp_str); + + pazpar2_lock_rdwr_init(&session->lock); + session_use(1); return session; } @@ -1028,9 +1038,9 @@ static struct hitsbytarget *hitsbytarget_nb(struct session *se, struct hitsbytarget *get_hitsbytarget(struct session *se, int *count, NMEM nmem) { struct hitsbytarget *p; - session_enter(se, "get_hitsbytarget"); + session_enter_ro(se, "get_hitsbytarget"); p = hitsbytarget_nb(se, count, nmem); - session_leave(se, "get_hitsbytarget"); + session_leave_ro(se, "get_hitsbytarget"); return p; } @@ -1113,7 +1123,7 @@ void perform_termlist(struct http_channel *c, struct session *se, nmem_strsplit(nmem_tmp, ",", name, &names, &num_names); - session_enter(se, "perform_termlist"); + session_enter_ro(se, "perform_termlist"); for (j = 0; j < num_names; j++) { @@ -1176,7 +1186,7 @@ void perform_termlist(struct http_channel *c, struct session *se, wrbuf_puts(c->wrbuf, "\"/>\n"); } } - session_leave(se, "perform_termlist"); + session_leave_ro(se, "perform_termlist"); nmem_destroy(nmem_tmp); } @@ -1199,7 +1209,7 @@ struct record_cluster *show_single_start(struct session *se, const char *id, { struct record_cluster *r = 0; - session_enter(se, "show_single_start"); + session_enter_ro(se, "show_single_start"); *prev_r = 0; *next_r = 0; if (se->reclist) @@ -1219,13 +1229,13 @@ struct record_cluster *show_single_start(struct session *se, const char *id, reclist_leave(se->reclist); } if (!r) - session_leave(se, "show_single_start"); + session_leave_ro(se, "show_single_start"); return r; } void show_single_stop(struct session *se, struct record_cluster *rec) { - session_leave(se, "show_single_stop"); + session_leave_ro(se, "show_single_stop"); } @@ -1241,7 +1251,6 @@ struct record_cluster **show_range_start(struct session *se, #if USE_TIMING yaz_timing_t t = yaz_timing_create(); #endif - session_enter(se, "show_range_start"); *sumhits = 0; *approx_hits = 0; *total = 0; @@ -1297,7 +1306,6 @@ struct record_cluster **show_range_start(struct session *se, void show_range_stop(struct session *se, struct record_cluster **recs) { - session_leave(se, "show_range_stop"); } void statistics(struct session *se, struct statistics *stat) @@ -1604,10 +1612,9 @@ int ingest_record(struct client *cl, const char *rec, xmlFreeDoc(xdoc); return -1; } - session_enter(se, "ingest_record"); - if (client_get_session(cl) == se) - ret = ingest_to_cluster(cl, xdoc, root, record_no, mergekey_norm); - session_leave(se, "ingest_record"); + assert(client_get_session(cl) == se); + + ret = ingest_to_cluster(cl, xdoc, root, record_no, mergekey_norm); xmlFreeDoc(xdoc); return ret; diff --git a/src/session.h b/src/session.h index 66c41ba..197e01b 100644 --- a/src/session.h +++ b/src/session.h @@ -29,6 +29,7 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA #include "termlists.h" #include "reclists.h" #include "http.h" +#include "ppmutex.h" struct record; struct client; @@ -117,7 +118,7 @@ struct session { int number_of_warnings_unknown_elements; int number_of_warnings_unknown_metadata; normalize_cache_t normalize_cache; - YAZ_MUTEX session_mutex; + Pazpar2_lock_rdwr lock; unsigned session_id; int settings_modified; facet_limits_t facet_limits; @@ -153,8 +154,8 @@ struct hitsbytarget { }; struct hitsbytarget *get_hitsbytarget(struct session *s, int *count, NMEM nmem); -struct session *new_session(NMEM nmem, struct conf_service *service, - unsigned session_id); +struct session *session_create(NMEM nmem, struct conf_service *service, + unsigned session_id); void session_destroy(struct session *s); void session_init_databases(struct session *s); void statistics(struct session *s, struct statistics *stat); @@ -190,6 +191,13 @@ void add_facet(struct session *s, const char *type, const char *value, int count int session_check_cluster_limit(struct session *se, struct record_cluster *rec); void perform_termlist(struct http_channel *c, struct session *se, const char *name, int num, int version); + +void session_enter_ro(struct session *s, const char *caller); +void session_leave_ro(struct session *s, const char *caller); + +void session_enter_rw(struct session *s, const char *caller); +void session_leave_rw(struct session *s, const char *caller); + void session_log(struct session *s, int level, const char *fmt, ...) #ifdef __GNUC__ __attribute__ ((format (printf, 3, 4))) -- 1.7.10.4