DELAY=0.001
WAIT=5
NUMBER=5
-ROUNDS=5
+ROUNDS=500
PORT=9004
-SERVICE=perf_t
+SERVICE=perf
SHUTDOWN=1
HOST=127.0.0.1
if test -n "$1"; then
enum client_state state;
struct show_raw *show_raw;
ZOOM_resultset resultset;
- YAZ_MUTEX mutex;
int ref_count;
char *id;
facet_limits_t facet_limits;
void client_check_preferred_watch(struct client *cl)
{
struct session *se = cl->session;
+
+ session_enter_ro(se, "client_check_preferred_watch");
yaz_log(YLOG_DEBUG, "client_check_preferred_watch: %s ", client_get_id(cl));
if (se)
{
- client_unlock(cl);
/* TODO possible threading issue. Session can have been destroyed */
+ assert(cl->session);
if (session_is_preferred_clients_ready(se)) {
+ assert(cl->session);
session_alert_watch(se, SESSION_WATCH_SHOW_PREF);
+ assert(cl->session);
}
else
yaz_log(YLOG_DEBUG, "client_check_preferred_watch: Still locked on preferred targets.");
-
- client_lock(cl);
+ assert(cl->session);
}
else
yaz_log(YLOG_WARN, "client_check_preferred_watch: %s. No session!", client_get_id(cl));
+ session_leave_ro(se, "client_check_preferred_watch");
}
struct suggestions* client_suggestions_create(const char* suggestions_string);
const char *error, *addinfo = 0;
+ session_enter_rw(cl->session, "client_search_response");
if (ZOOM_connection_error(link, &error, &addinfo))
{
cl->hits = 0;
client_suggestions_destroy(cl);
cl->suggestions = client_suggestions_create(ZOOM_resultset_option_get(resultset, "suggestions"));
}
+ session_leave_rw(cl->session, "client_search_response");
}
void client_got_records(struct client *cl)
{
struct session *se = cl->session;
- if (se)
+
+ session_enter_ro(se, "client_got_records");
+ if (reclist_get_num_records(se->reclist) > 0)
{
- if (reclist_get_num_records(se->reclist) > 0)
- {
- client_unlock(cl);
- session_alert_watch(se, SESSION_WATCH_SHOW);
- session_alert_watch(se, SESSION_WATCH_BYTARGET);
- session_alert_watch(se, SESSION_WATCH_TERMLIST);
- session_alert_watch(se, SESSION_WATCH_RECORD);
- client_lock(cl);
- }
+ session_alert_watch(se, SESSION_WATCH_SHOW);
+ session_alert_watch(se, SESSION_WATCH_BYTARGET);
+ session_alert_watch(se, SESSION_WATCH_TERMLIST);
+ session_alert_watch(se, SESSION_WATCH_RECORD);
}
+ session_leave_ro(se, "client_got_records");
}
static void client_record_ingest(struct client *cl)
ZOOM_resultset resultset = cl->resultset;
const char *error, *addinfo;
+ session_enter_rw(cl->session, "client_record_response");
if (ZOOM_connection_error(link, &error, &addinfo))
{
client_set_state(cl, Client_Error);
client_record_ingest(cl);
}
}
+ session_leave_rw(cl->session, "client_record_response");
}
int client_reingest(struct client *cl)
cl->show_raw = 0;
cl->resultset = 0;
cl->suggestions = 0;
- cl->mutex = 0;
- pazpar2_mutex_create(&cl->mutex, "client");
cl->preferred = 0;
cl->ref_count = 1;
cl->facet_limits = 0;
return cl;
}
-void client_lock(struct client *c)
-{
- yaz_mutex_enter(c->mutex);
-}
-
-void client_unlock(struct client *c)
-{
- yaz_mutex_leave(c->mutex);
-}
-
void client_incref(struct client *c)
{
- pazpar2_incref(&c->ref_count, c->mutex);
+ c->ref_count++;
yaz_log(YLOG_DEBUG, "client_incref c=%p %s cnt=%d",
c, client_get_id(c), c->ref_count);
}
{
yaz_log(YLOG_DEBUG, "client_destroy c=%p %s cnt=%d",
c, client_get_id(c), c->ref_count);
- if (!pazpar2_decref(&c->ref_count, c->mutex))
+ if (--c->ref_count == 0)
{
xfree(c->pquery);
c->pquery = 0;
{
ZOOM_resultset_destroy(c->resultset);
}
- yaz_mutex_destroy(&c->mutex);
xfree(c);
client_use(-1);
return 1;
else if (event & EVENT_TIMEOUT)
{
ZOOM_connection_fire_event_timeout(co->link);
- client_lock(cl);
+
non_block_events(co);
- client_unlock(cl);
remove_connection_from_host(co);
yaz_mutex_leave(host->mutex);
{
yaz_mutex_leave(host->mutex);
- client_lock(cl);
non_block_events(co);
ZOOM_connection_fire_event_socket(co->link, event);
non_block_events(co);
- client_unlock(cl);
if (co->link)
{
char tmp_str[50];
sprintf(tmp_str, "session#%u", sesid);
- r->psession = new_session(nmem, service, sesid);
+ r->psession = session_create(nmem, service, sesid);
r->session_id = sesid;
r->timestamp = 0;
r->nmem = nmem;
if (!s)
return;
+ session_enter_ro(s->psession, "show_records");
// We haven't counted clients yet if we're called on a block release
if (active < 0)
active = session_active_clients(s->psession);
if (!(sp = reclist_parse_sortparms(c->nmem, sort, service)))
{
error(rs, PAZPAR2_MALFORMED_PARAMETER_VALUE, "sort");
+ session_leave_ro(s->psession, "show_records");
return;
-
}
rl = show_range_start(s->psession, sp, startn, &numn, &total, &total_hits, &approx_hits);
}
show_range_stop(s->psession, rl);
-
+ session_leave_ro(s->psession, "show_records");
response_close(c, "show");
}
yaz_mutex_set_name(*p, ppmutex_level, name);
}
+int pazpar2_lock_rdwr_init(Pazpar2_lock_rdwr *p)
+{
+ p->readers_reading = 0;
+ p->writers_writing = 0;
+#if YAZ_POSIX_THREADS
+ pthread_mutex_init(&p->mutex, 0);
+ pthread_cond_init(&p->lock_free, 0);
+#endif
+ return 0;
+}
+
+int pazpar2_lock_rdwr_destroy(Pazpar2_lock_rdwr *p)
+{
+ assert (p->readers_reading == 0);
+ assert (p->writers_writing == 0);
+#if YAZ_POSIX_THREADS
+ pthread_mutex_destroy(&p->mutex);
+ pthread_cond_destroy(&p->lock_free);
+#endif
+ return 0;
+}
+
+int pazpar2_lock_rdwr_rlock(Pazpar2_lock_rdwr *p)
+{
+#if YAZ_POSIX_THREADS
+ pthread_mutex_lock(& p->mutex);
+ while (p->writers_writing)
+ pthread_cond_wait(&p->lock_free, &p->mutex);
+ p->readers_reading++;
+ pthread_mutex_unlock(&p->mutex);
+#endif
+ return 0;
+}
+
+int pazpar2_lock_rdwr_wlock(Pazpar2_lock_rdwr *p)
+{
+#if YAZ_POSIX_THREADS
+ pthread_mutex_lock(&p->mutex);
+ while (p->writers_writing || p->readers_reading)
+ pthread_cond_wait(&p->lock_free, &p->mutex);
+ p->writers_writing++;
+ pthread_mutex_unlock(&p->mutex);
+#endif
+ return 0;
+}
+
+int pazpar2_lock_rdwr_upgrade(Pazpar2_lock_rdwr *p)
+{
+#if YAZ_POSIX_THREADS
+ pthread_mutex_lock(&p->mutex);
+ --p->readers_reading;
+ while (p->writers_writing || p->readers_reading)
+ pthread_cond_wait(&p->lock_free, &p->mutex);
+ p->writers_writing++;
+ pthread_mutex_unlock(&p->mutex);
+#endif
+ return 0;
+}
+
+int pazpar2_lock_rdwr_runlock(Pazpar2_lock_rdwr *p)
+{
+#if YAZ_POSIX_THREADS
+ pthread_mutex_lock(&p->mutex);
+ if (p->readers_reading == 0)
+ {
+ pthread_mutex_unlock(&p->mutex);
+ return -1;
+ }
+ else
+ {
+ p->readers_reading--;
+ if (p->readers_reading == 0)
+ pthread_cond_signal(&p->lock_free);
+ pthread_mutex_unlock(&p->mutex);
+ }
+#endif
+ return 0;
+}
+
+int pazpar2_lock_rdwr_wunlock(Pazpar2_lock_rdwr *p)
+{
+#if YAZ_POSIX_THREADS
+ pthread_mutex_lock(&p->mutex);
+ if (p->writers_writing == 0)
+ {
+ pthread_mutex_unlock(&p->mutex);
+ return -1;
+ }
+ else
+ {
+ p->writers_writing--;
+ pthread_cond_broadcast(&p->lock_free);
+ pthread_mutex_unlock(&p->mutex);
+ }
+#endif
+ return 0;
+}
+
/*
* Local variables:
* c-basic-offset: 4
*/
#ifndef PAZPAR2_PPMUTEX_H
-#define PAZPAR2_PPMUTEXF_H
+#define PAZPAR2_PPMUTEX_H
#include <yaz/mutex.h>
+#if YAZ_POSIX_THREADS
+#include <pthread.h>
+#endif
+
void pazpar2_mutex_init(void);
void pazpar2_mutex_create(YAZ_MUTEX *p, const char *name);
+typedef struct {
+ int readers_reading;
+ int writers_writing;
+#if YAZ_POSIX_THREADS
+ pthread_mutex_t mutex;
+ pthread_cond_t lock_free;
+#endif
+} Pazpar2_lock_rdwr;
+
+int pazpar2_lock_rdwr_init(Pazpar2_lock_rdwr *p);
+int pazpar2_lock_rdwr_destroy(Pazpar2_lock_rdwr *p);
+int pazpar2_lock_rdwr_rlock(Pazpar2_lock_rdwr *p);
+int pazpar2_lock_rdwr_wlock(Pazpar2_lock_rdwr *p);
+int pazpar2_lock_rdwr_runlock(Pazpar2_lock_rdwr *p);
+int pazpar2_lock_rdwr_wunlock(Pazpar2_lock_rdwr *p);
+int pazpar2_lock_rdwr_upgrade(Pazpar2_lock_rdwr *p);
+
#endif
/*
xmlFree(result);
}
-static void session_enter(struct session *s, const char *caller)
+void session_enter_ro(struct session *s, const char *caller)
{
+ assert(s);
+ if (caller)
+ session_log(s, YLOG_DEBUG, "Session read lock by %s", caller);
+ pazpar2_lock_rdwr_rlock(&s->lock);
+}
+
+void session_enter_rw(struct session *s, const char *caller)
+{
+ assert(s);
if (caller)
- session_log(s, YLOG_DEBUG, "Session lock by %s", caller);
- yaz_mutex_enter(s->session_mutex);
+ session_log(s, YLOG_DEBUG, "Session write lock by %s", caller);
+ pazpar2_lock_rdwr_wlock(&s->lock);
}
-static void session_leave(struct session *s, const char *caller)
+void session_leave_ro(struct session *s, const char *caller)
{
- yaz_mutex_leave(s->session_mutex);
+ assert(s);
if (caller)
- session_log(s, YLOG_DEBUG, "Session unlock by %s", caller);
+ session_log(s, YLOG_DEBUG, "Session read unlock by %s", caller);
+ pazpar2_lock_rdwr_runlock(&s->lock);
+}
+
+void session_leave_rw(struct session *s, const char *caller)
+{
+ assert(s);
+ if (caller)
+ session_log(s, YLOG_DEBUG, "Session write unlock by %s", caller);
+ pazpar2_lock_rdwr_wunlock(&s->lock);
}
static void session_normalize_facet(struct session *s, const char *type,
struct http_channel *chan)
{
int ret;
- session_enter(s, "session_set_watch");
if (s->watchlist[what].fun)
ret = -1;
else
session_watch_cancel);
ret = 0;
}
- session_leave(s, "session_set_watch");
return ret;
}
void session_alert_watch(struct session *s, int what)
{
assert(s);
- session_enter(s, "session_alert_watch");
if (s->watchlist[what].fun)
{
/* our watch is no longer associated with http_channel */
s->watchlist[what].data = 0;
s->watchlist[what].obs = 0;
- session_leave(s, "session_alert_watch");
session_log(s, YLOG_DEBUG,
"Alert Watch: %d calling function: %p", what, fun);
fun(data);
}
- else
- session_leave(s,"session_alert_watch");
}
//callback for grep_databases
{
struct client_list *l;
- session_enter(se, "session_reset_active_clients");
l = se->clients_active;
se->clients_active = new_list;
- session_leave(se, "session_reset_active_clients");
while (l)
{
struct client_list *l_next = l->next;
- client_lock(l->client);
client_set_session(l->client, 0); /* mark client inactive */
- client_unlock(l->client);
xfree(l);
l = l_next;
session_reset_active_clients(se, 0);
- session_enter(se, "session_remove_cached_clients");
l = se->clients_cached;
se->clients_cached = 0;
- session_leave(se, "session_remove_cached_clients");
while (l)
{
struct client_list *l_next = l->next;
- client_lock(l->client);
client_set_session(l->client, 0);
client_set_database(l->client, 0);
- client_unlock(l->client);
client_destroy(l->client);
xfree(l);
l = l_next;
*addinfo = 0;
+ session_enter_rw(se, "session_search");
+
if (se->settings_modified) {
session_remove_cached_clients(se);
}
else
session_reset_active_clients(se, 0);
- session_enter(se, "session_search");
se->settings_modified = 0;
session_clear_set(se, sp);
live_channels = select_targets(se, filter);
if (!live_channels)
{
- session_leave(se, "session_search");
+ session_leave_ro(se, "session_search");
return PAZPAR2_NO_TARGETS;
}
if (!se->facet_limits)
{
*addinfo = "limit";
- session_leave(se, "session_search");
+ session_leave_ro(se, "session_search");
return PAZPAR2_MALFORMED_PARAMETER_VALUE;
}
l0 = se->clients_active;
se->clients_active = 0;
- session_leave(se, "session_search");
+ session_leave_ro(se, "session_search");
for (l = l0; l; l = l->next)
{
void session_destroy(struct session *se)
{
struct session_database *sdb;
+
+ session_enter_rw(se, "session_destroy");
session_log(se, YLOG_DEBUG, "Destroying");
session_use(-1);
session_remove_cached_clients(se);
facet_limits_destroy(se->facet_limits);
nmem_destroy(se->nmem);
service_destroy(se->service);
- yaz_mutex_destroy(&se->session_mutex);
+
+ session_leave_rw(se, "session_destroy");
+ pazpar2_lock_rdwr_destroy(&se->lock);
}
size_t session_get_memory_status(struct session *session) {
size_t session_nmem;
if (session == 0)
return 0;
- session_enter(session, "session_get_memory_status");
+ session_enter_ro(session, "session_get_memory_status");
session_nmem = nmem_total(session->nmem);
- session_leave(session, "session_get_memory_status");
+ session_leave_ro(session, "session_get_memory_status");
return session_nmem;
}
-struct session *new_session(NMEM nmem, struct conf_service *service,
- unsigned session_id)
+struct session *session_create(NMEM nmem, struct conf_service *service,
+ unsigned session_id)
{
int i;
struct session *session = nmem_malloc(nmem, sizeof(*session));
session->watchlist[i].fun = 0;
}
session->normalize_cache = normalize_cache_create();
- session->session_mutex = 0;
- pazpar2_mutex_create(&session->session_mutex, tmp_str);
+
+ pazpar2_lock_rdwr_init(&session->lock);
+
session_use(1);
return session;
}
struct hitsbytarget *get_hitsbytarget(struct session *se, int *count, NMEM nmem)
{
struct hitsbytarget *p;
- session_enter(se, "get_hitsbytarget");
+ session_enter_ro(se, "get_hitsbytarget");
p = hitsbytarget_nb(se, count, nmem);
- session_leave(se, "get_hitsbytarget");
+ session_leave_ro(se, "get_hitsbytarget");
return p;
}
nmem_strsplit(nmem_tmp, ",", name, &names, &num_names);
- session_enter(se, "perform_termlist");
+ session_enter_ro(se, "perform_termlist");
for (j = 0; j < num_names; j++)
{
wrbuf_puts(c->wrbuf, "\"/>\n");
}
}
- session_leave(se, "perform_termlist");
+ session_leave_ro(se, "perform_termlist");
nmem_destroy(nmem_tmp);
}
{
struct record_cluster *r = 0;
- session_enter(se, "show_single_start");
+ session_enter_ro(se, "show_single_start");
*prev_r = 0;
*next_r = 0;
if (se->reclist)
reclist_leave(se->reclist);
}
if (!r)
- session_leave(se, "show_single_start");
+ session_leave_ro(se, "show_single_start");
return r;
}
void show_single_stop(struct session *se, struct record_cluster *rec)
{
- session_leave(se, "show_single_stop");
+ session_leave_ro(se, "show_single_stop");
}
#if USE_TIMING
yaz_timing_t t = yaz_timing_create();
#endif
- session_enter(se, "show_range_start");
*sumhits = 0;
*approx_hits = 0;
*total = 0;
void show_range_stop(struct session *se, struct record_cluster **recs)
{
- session_leave(se, "show_range_stop");
}
void statistics(struct session *se, struct statistics *stat)
xmlFreeDoc(xdoc);
return -1;
}
- session_enter(se, "ingest_record");
- if (client_get_session(cl) == se)
- ret = ingest_to_cluster(cl, xdoc, root, record_no, mergekey_norm);
- session_leave(se, "ingest_record");
+ assert(client_get_session(cl) == se);
+
+ ret = ingest_to_cluster(cl, xdoc, root, record_no, mergekey_norm);
xmlFreeDoc(xdoc);
return ret;
#include "termlists.h"
#include "reclists.h"
#include "http.h"
+#include "ppmutex.h"
struct record;
struct client;
int number_of_warnings_unknown_elements;
int number_of_warnings_unknown_metadata;
normalize_cache_t normalize_cache;
- YAZ_MUTEX session_mutex;
+ Pazpar2_lock_rdwr lock;
unsigned session_id;
int settings_modified;
facet_limits_t facet_limits;
};
struct hitsbytarget *get_hitsbytarget(struct session *s, int *count, NMEM nmem);
-struct session *new_session(NMEM nmem, struct conf_service *service,
- unsigned session_id);
+struct session *session_create(NMEM nmem, struct conf_service *service,
+ unsigned session_id);
void session_destroy(struct session *s);
void session_init_databases(struct session *s);
void statistics(struct session *s, struct statistics *stat);
int session_check_cluster_limit(struct session *se, struct record_cluster *rec);
void perform_termlist(struct http_channel *c, struct session *se, const char *name, int num, int version);
+
+void session_enter_ro(struct session *s, const char *caller);
+void session_leave_ro(struct session *s, const char *caller);
+
+void session_enter_rw(struct session *s, const char *caller);
+void session_leave_rw(struct session *s, const char *caller);
+
void session_log(struct session *s, int level, const char *fmt, ...)
#ifdef __GNUC__
__attribute__ ((format (printf, 3, 4)))