832eb3b9fc03ab7be2045ac6d8a625a68d63720f
[pazpar2-moved-to-github.git] / src / client.c
1 /* This file is part of Pazpar2.
2    Copyright (C) 2006-2011 Index Data
3
4 Pazpar2 is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Pazpar2 is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18 */
19
20 /** \file client.c
21     \brief Z39.50 client 
22 */
23
24 #if HAVE_CONFIG_H
25 #include <config.h>
26 #endif
27 #include <stdlib.h>
28 #include <stdio.h>
29 #include <string.h>
30 #if HAVE_SYS_TIME_H
31 #include <sys/time.h>
32 #endif
33 #if HAVE_UNISTD_H
34 #include <unistd.h>
35 #endif
36 #include <signal.h>
37 #include <assert.h>
38
39 #include <yaz/marcdisp.h>
40 #include <yaz/comstack.h>
41 #include <yaz/tcpip.h>
42 #include <yaz/proto.h>
43 #include <yaz/readconf.h>
44 #include <yaz/pquery.h>
45 #include <yaz/otherinfo.h>
46 #include <yaz/yaz-util.h>
47 #include <yaz/nmem.h>
48 #include <yaz/query-charset.h>
49 #include <yaz/querytowrbuf.h>
50 #include <yaz/oid_db.h>
51 #include <yaz/diagbib1.h>
52 #include <yaz/snprintf.h>
53 #include <yaz/rpn2cql.h>
54 #include <yaz/rpn2solr.h>
55
56 #define USE_TIMING 0
57 #if USE_TIMING
58 #include <yaz/timing.h>
59 #endif
60
61 #include "ppmutex.h"
62 #include "session.h"
63 #include "parameters.h"
64 #include "client.h"
65 #include "connection.h"
66 #include "settings.h"
67 #include "relevance.h"
68 #include "incref.h"
69
70 static YAZ_MUTEX g_mutex = 0;
71 static int no_clients = 0;
72
73 static int client_use(int delta)
74 {
75     int clients;
76     if (!g_mutex)
77         yaz_mutex_create(&g_mutex);
78     yaz_mutex_enter(g_mutex);
79     no_clients += delta;
80     clients = no_clients;
81     yaz_mutex_leave(g_mutex);
82     yaz_log(YLOG_DEBUG, "%s clients=%d", delta == 0 ? "" : (delta > 0 ? "INC" : "DEC"), clients);
83     return clients;
84 }
85
86 int  clients_count(void) {
87     return client_use(0);
88 }
89
90
91 /** \brief Represents client state for a connection to one search target */
92 struct client {
93     struct session_database *database;
94     struct connection *connection;
95     struct session *session;
96     char *pquery; // Current search
97     char *cqlquery; // used for SRU targets only
98     Odr_int hits;
99     int record_offset;
100     int maxrecs;
101     int startrecs;
102     int diagnostic;
103     int preferred;
104     enum client_state state;
105     struct show_raw *show_raw;
106     ZOOM_resultset resultset;
107     YAZ_MUTEX mutex;
108     int ref_count;
109     /* copy of database->url */
110     char *url;
111 };
112
113 struct show_raw {
114     int active; // whether this request has been sent to the server
115     int position;
116     int binary;
117     char *syntax;
118     char *esn;
119     void (*error_handler)(void *data, const char *addinfo);
120     void (*record_handler)(void *data, const char *buf, size_t sz);
121     void *data;
122     struct show_raw *next;
123 };
124
125 static const char *client_states[] = {
126     "Client_Connecting",
127     "Client_Idle",
128     "Client_Working",
129     "Client_Error",
130     "Client_Failed",
131     "Client_Disconnected"
132 };
133
134 const char *client_get_state_str(struct client *cl)
135 {
136     return client_states[cl->state];
137 }
138
139 enum client_state client_get_state(struct client *cl)
140 {
141     return cl->state;
142 }
143
144 void client_set_state(struct client *cl, enum client_state st)
145 {
146     int was_active = 0;
147     if (client_is_active(cl))
148         was_active = 1;
149     cl->state = st;
150     /* If client is going from being active to inactive and all clients
151        are now idle we fire a watch for the session . The assumption is
152        that session is not mutex locked if client is already active */
153     if (was_active && !client_is_active(cl) && cl->session)
154     {
155
156         int no_active = session_active_clients(cl->session);
157         yaz_log(YLOG_DEBUG, "%s: releasing watches on zero active: %d", client_get_url(cl), no_active);
158         if (no_active == 0) {
159             session_alert_watch(cl->session, SESSION_WATCH_SHOW);
160             session_alert_watch(cl->session, SESSION_WATCH_SHOW_PREF);
161         }
162     }
163 }
164
165 static void client_show_raw_error(struct client *cl, const char *addinfo);
166
167 struct connection *client_get_connection(struct client *cl)
168 {
169     return cl->connection;
170 }
171
172 struct session_database *client_get_database(struct client *cl)
173 {
174     return cl->database;
175 }
176
177 struct session *client_get_session(struct client *cl)
178 {
179     return cl->session;
180 }
181
182 const char *client_get_pquery(struct client *cl)
183 {
184     return cl->pquery;
185 }
186
187 static void client_send_raw_present(struct client *cl);
188 static int nativesyntax_to_type(struct session_database *sdb, char *type,
189                                 ZOOM_record rec);
190
191 static void client_show_immediate(
192     ZOOM_resultset resultset, struct session_database *sdb, int position,
193     void *data,
194     void (*error_handler)(void *data, const char *addinfo),
195     void (*record_handler)(void *data, const char *buf, size_t sz),
196     int binary)
197 {
198     ZOOM_record rec = 0;
199     char type[80];
200     const char *buf;
201     int len;
202
203     if (!resultset)
204     {
205         error_handler(data, "no resultset");
206         return;
207     }
208     rec = ZOOM_resultset_record(resultset, position-1);
209     if (!rec)
210     {
211         error_handler(data, "no record");
212         return;
213     }
214     if (binary)
215         strcpy(type, "raw");
216     else
217         nativesyntax_to_type(sdb, type, rec);
218     buf = ZOOM_record_get(rec, type, &len);
219     if (!buf)
220     {
221         error_handler(data, "no record");
222         return;
223     }
224     record_handler(data, buf, len);
225 }
226
227
228 int client_show_raw_begin(struct client *cl, int position,
229                           const char *syntax, const char *esn,
230                           void *data,
231                           void (*error_handler)(void *data, const char *addinfo),
232                           void (*record_handler)(void *data, const char *buf,
233                                                  size_t sz),
234                           int binary)
235 {
236     if (syntax == 0 && esn == 0)
237         client_show_immediate(cl->resultset, client_get_database(cl),
238                               position, data,
239                               error_handler, record_handler,
240                               binary);
241     else
242     {
243         struct show_raw *rr, **rrp;
244
245         if (!cl->connection)
246             return -1;
247     
248
249         rr = xmalloc(sizeof(*rr));
250         rr->position = position;
251         rr->active = 0;
252         rr->data = data;
253         rr->error_handler = error_handler;
254         rr->record_handler = record_handler;
255         rr->binary = binary;
256         if (syntax)
257             rr->syntax = xstrdup(syntax);
258         else
259             rr->syntax = 0;
260         if (esn)
261             rr->esn = xstrdup(esn);
262         else
263             rr->esn = 0;
264         rr->next = 0;
265         
266         for (rrp = &cl->show_raw; *rrp; rrp = &(*rrp)->next)
267             ;
268         *rrp = rr;
269         
270         if (cl->state == Client_Failed)
271         {
272             client_show_raw_error(cl, "client failed");
273         }
274         else if (cl->state == Client_Disconnected)
275         {
276             client_show_raw_error(cl, "client disconnected");
277         }
278         else
279         {
280             client_send_raw_present(cl);
281         }
282     }
283     return 0;
284 }
285
286 static void client_show_raw_delete(struct show_raw *r)
287 {
288     xfree(r->syntax);
289     xfree(r->esn);
290     xfree(r);
291 }
292
293 void client_show_raw_remove(struct client *cl, void *data)
294 {
295     struct show_raw *rr = data;
296     struct show_raw **rrp = &cl->show_raw;
297     while (*rrp != rr)
298         rrp = &(*rrp)->next;
299     if (*rrp)
300     {
301         *rrp = rr->next;
302         client_show_raw_delete(rr);
303     }
304 }
305
306 void client_show_raw_dequeue(struct client *cl)
307 {
308     struct show_raw *rr = cl->show_raw;
309
310     cl->show_raw = rr->next;
311     client_show_raw_delete(rr);
312 }
313
314 static void client_show_raw_error(struct client *cl, const char *addinfo)
315 {
316     while (cl->show_raw)
317     {
318         cl->show_raw->error_handler(cl->show_raw->data, addinfo);
319         client_show_raw_dequeue(cl);
320     }
321 }
322
323 static void client_send_raw_present(struct client *cl)
324 {
325     struct session_database *sdb = client_get_database(cl);
326     struct connection *co = client_get_connection(cl);
327     ZOOM_resultset set = cl->resultset;
328
329     int offset = cl->show_raw->position;
330     const char *syntax = 0;
331     const char *elements = 0;
332
333     assert(cl->show_raw);
334     assert(set);
335
336     yaz_log(YLOG_DEBUG, "%s: trying to present %d record(s) from %d",
337             client_get_url(cl), 1, offset);
338
339     if (cl->show_raw->syntax)
340         syntax = cl->show_raw->syntax;
341     else
342         syntax = session_setting_oneval(sdb, PZ_REQUESTSYNTAX);
343     ZOOM_resultset_option_set(set, "preferredRecordSyntax", syntax);
344
345     if (cl->show_raw->esn)
346         elements = cl->show_raw->esn;
347     else
348         elements = session_setting_oneval(sdb, PZ_ELEMENTS);
349     if (elements && *elements)
350         ZOOM_resultset_option_set(set, "elementSetName", elements);
351
352     ZOOM_resultset_records(set, 0, offset-1, 1);
353     cl->show_raw->active = 1;
354
355     connection_continue(co);
356 }
357
358 static int nativesyntax_to_type(struct session_database *sdb, char *type,
359                                 ZOOM_record rec)
360 {
361     const char *s = session_setting_oneval(sdb, PZ_NATIVESYNTAX);
362
363     if (s && *s)
364     {
365         if (!strncmp(s, "iso2709", 7))
366         {
367             const char *cp = strchr(s, ';');
368             yaz_snprintf(type, 80, "xml; charset=%s", cp ? cp+1 : "marc-8s");
369         }
370         else if (!strncmp(s, "xml", 3))
371         {
372             strcpy(type, "xml");
373         }
374         else if (!strncmp(s, "txml", 4))
375         {
376             const char *cp = strchr(s, ';');
377             yaz_snprintf(type, 80, "txml; charset=%s", cp ? cp+1 : "marc-8s");
378         }
379         else
380             return -1;
381         return 0;
382     }
383     else  /* attempt to deduce structure */
384     {
385         const char *syntax = ZOOM_record_get(rec, "syntax", NULL);
386         if (syntax)
387         {
388             if (!strcmp(syntax, "XML"))
389             {
390                 strcpy(type, "xml");
391                 return 0;
392             }
393             else if (!strcmp(syntax, "TXML"))
394                 {
395                     strcpy(type, "txml");
396                     return 0;
397                 }
398             else if (!strcmp(syntax, "USmarc") || !strcmp(syntax, "MARC21"))
399             {
400                 strcpy(type, "xml; charset=marc8-s");
401                 return 0;
402             }
403             else return -1;
404         }
405         else return -1;
406     }
407 }
408
409 /**
410  * TODO Consider thread safety!!!
411  *
412  */
413 int client_report_facets(struct client *cl, ZOOM_resultset rs) {
414     int facet_idx;
415     ZOOM_facet_field *facets = ZOOM_resultset_facets(rs);
416     int facet_num;
417     struct session *se = client_get_session(cl);
418     facet_num = ZOOM_resultset_facets_size(rs);
419     yaz_log(YLOG_DEBUG, "client_report_facets: %d", facet_num);
420
421     for (facet_idx = 0; facet_idx < facet_num; facet_idx++) {
422         const char *name = ZOOM_facet_field_name(facets[facet_idx]);
423         size_t term_idx;
424         size_t term_num = ZOOM_facet_field_term_count(facets[facet_idx]);
425         for (term_idx = 0; term_idx < term_num; term_idx++ ) {
426             int freq;
427             const char *term = ZOOM_facet_field_get_term(facets[facet_idx], term_idx, &freq);
428             if (term)
429                 add_facet(se, name, term, freq);
430         }
431     }
432
433     return 0;
434 }
435
436 static void ingest_raw_record(struct client *cl, ZOOM_record rec)
437 {
438     const char *buf;
439     int len;
440     char type[80];
441
442     if (cl->show_raw->binary)
443         strcpy(type, "raw");
444     else
445     {
446         struct session_database *sdb = client_get_database(cl);
447         nativesyntax_to_type(sdb, type, rec);
448     }
449
450     buf = ZOOM_record_get(rec, type, &len);
451     cl->show_raw->record_handler(cl->show_raw->data,  buf, len);
452     client_show_raw_dequeue(cl);
453 }
454
455 void client_check_preferred_watch(struct client *cl)
456 {
457     struct session *se = cl->session;
458     yaz_log(YLOG_DEBUG, "client_check_preferred_watch: %s ", client_get_url(cl));
459     if (se)
460     {
461         client_unlock(cl);
462         if (session_is_preferred_clients_ready(se)) {
463             session_alert_watch(se, SESSION_WATCH_SHOW_PREF);
464         }
465         else
466             yaz_log(YLOG_DEBUG, "client_check_preferred_watch: Still locked on preferred targets.");
467
468         client_lock(cl);
469     }
470     else
471         yaz_log(YLOG_WARN, "client_check_preferred_watch: %s. No session!", client_get_url(cl));
472
473 }
474
475 void client_search_response(struct client *cl)
476 {
477     struct connection *co = cl->connection;
478     struct session *se = cl->session;
479     ZOOM_connection link = connection_get_link(co);
480     ZOOM_resultset resultset = cl->resultset;
481
482     const char *error, *addinfo = 0;
483     
484     if (ZOOM_connection_error(link, &error, &addinfo))
485     {
486         cl->hits = 0;
487         client_set_state(cl, Client_Error);
488         yaz_log(YLOG_WARN, "Search error %s (%s): %s",
489                 error, addinfo, client_get_url(cl));
490     }
491     else
492     {
493         yaz_log(YLOG_DEBUG, "client_search_response: hits "
494                 ODR_INT_PRINTF, cl->hits);
495         client_report_facets(cl, resultset);
496         cl->record_offset = cl->startrecs;
497         cl->hits = ZOOM_resultset_size(resultset);
498         if (se) {
499             se->total_hits += cl->hits;
500             yaz_log(YLOG_DEBUG, "client_search_response: total hits "
501                     ODR_INT_PRINTF, se->total_hits);
502         }
503     }
504 }
505
506 void client_got_records(struct client *cl)
507 {
508     struct session *se = cl->session;
509     if (se)
510     {
511         client_unlock(cl);
512         session_alert_watch(se, SESSION_WATCH_SHOW);
513         session_alert_watch(se, SESSION_WATCH_RECORD);
514         client_lock(cl);
515     }
516 }
517
518 void client_record_response(struct client *cl)
519 {
520     struct connection *co = cl->connection;
521     ZOOM_connection link = connection_get_link(co);
522     ZOOM_resultset resultset = cl->resultset;
523     const char *error, *addinfo;
524
525     if (ZOOM_connection_error(link, &error, &addinfo))
526     {
527         client_set_state(cl, Client_Error);
528         yaz_log(YLOG_WARN, "Search error %s (%s): %s",
529             error, addinfo, client_get_url(cl));
530     }
531     else
532     {
533         ZOOM_record rec = 0;
534         const char *msg, *addinfo;
535         
536         if (cl->show_raw && cl->show_raw->active)
537         {
538             if ((rec = ZOOM_resultset_record(resultset,
539                                              cl->show_raw->position-1)))
540             {
541                 cl->show_raw->active = 0;
542                 ingest_raw_record(cl, rec);
543             }
544             else
545             {
546                 yaz_log(YLOG_WARN, "Expected record, but got NULL, offset=%d",
547                         cl->show_raw->position-1);
548             }
549         }
550         else
551         {
552             int offset = cl->record_offset;
553             if ((rec = ZOOM_resultset_record(resultset, offset)))
554             {
555                 cl->record_offset++;
556                 if (cl->session == 0)
557                     ;
558                 else if (ZOOM_record_error(rec, &msg, &addinfo, 0))
559                 {
560                     yaz_log(YLOG_WARN, "Record error %s (%s): %s (rec #%d)",
561                             msg, addinfo, client_get_url(cl),
562                             cl->record_offset);
563                 }
564                 else
565                 {
566                     struct session_database *sdb = client_get_database(cl);
567                     NMEM nmem = nmem_create();
568                     const char *xmlrec;
569                     char type[80];
570
571                     if (nativesyntax_to_type(sdb, type, rec))
572                         yaz_log(YLOG_WARN, "Failed to determine record type");
573                     xmlrec = ZOOM_record_get(rec, type, NULL);
574                     if (!xmlrec)
575                         yaz_log(YLOG_WARN, "ZOOM_record_get failed from %s",
576                                 client_get_url(cl));
577                     else
578                     {
579                         if (ingest_record(cl, xmlrec, cl->record_offset, nmem))
580                             yaz_log(YLOG_WARN, "Failed to ingest from %s",
581                                     client_get_url(cl));
582                     }
583                     nmem_destroy(nmem);
584                 }
585             }
586             else
587             {
588                 yaz_log(YLOG_WARN, "Expected record, but got NULL, offset=%d",
589                         offset);
590             }
591         }
592     }
593 }
594
595 static int client_set_facets_request(struct client *cl, ZOOM_connection link)
596 {
597     struct session_database *sdb = client_get_database(cl);
598     const char *opt_facet_term_sort  = session_setting_oneval(sdb, PZ_TERMLIST_TERM_SORT);
599     const char *opt_facet_term_count = session_setting_oneval(sdb, PZ_TERMLIST_TERM_COUNT);
600
601     /* Future record filtering on target */
602     /* const char *opt_facet_record_filter = session_setting_oneval(sdb, PZ_RECORDFILTER); */
603
604     /* Disable when no count is set */
605     /* TODO Verify: Do we need to reset the  ZOOM facets if a ZOOM Connection is being reused??? */
606     if (opt_facet_term_count && *opt_facet_term_count)
607     {
608         int index = 0;
609         struct session *session = client_get_session(cl);
610         struct conf_service *service = session->service;
611         int num = service->num_metadata;
612         WRBUF wrbuf = wrbuf_alloc();
613         yaz_log(YLOG_DEBUG, "Facet settings, sort: %s count: %s",
614                 opt_facet_term_sort, opt_facet_term_count);
615         for (index = 0; index < num; index++)
616         {
617             struct conf_metadata *conf_meta = &service->metadata[index];
618             if (conf_meta->termlist)
619             {
620                 if (wrbuf_len(wrbuf))
621                     wrbuf_puts(wrbuf, ", ");
622                 wrbuf_printf(wrbuf, "@attr 1=%s", conf_meta->name);
623                 
624                 if (opt_facet_term_sort && *opt_facet_term_sort)
625                     wrbuf_printf(wrbuf, " @attr 2=%s", opt_facet_term_sort);
626                 wrbuf_printf(wrbuf, " @attr 3=%s", opt_facet_term_count);
627             }
628         }
629         if (wrbuf_len(wrbuf))
630         {
631             yaz_log(YLOG_LOG, "Setting ZOOM facets option: %s", wrbuf_cstr(wrbuf));
632             ZOOM_connection_option_set(link, "facets", wrbuf_cstr(wrbuf));
633             return 1;
634         }
635     }
636     return 0;
637 }
638
639 int client_has_facet(struct client *cl, const char *name) {
640     ZOOM_facet_field facet_field;
641     if (!cl || !cl->resultset || !name) {
642         yaz_log(YLOG_DEBUG, "client has facet: Missing %p %p %s", cl, (cl ? cl->resultset: 0), name);
643         return 0;
644     }
645     facet_field = ZOOM_resultset_get_facet_field(cl->resultset, name);
646     if (facet_field) {
647         yaz_log(YLOG_DEBUG, "client: has facets for %s", name);
648         return 1;
649     }
650     yaz_log(YLOG_DEBUG, "client: No facets for %s", name);
651     return 0;
652 }
653
654
655 void client_start_search(struct client *cl)
656 {
657     struct session_database *sdb = client_get_database(cl);
658     struct connection *co = client_get_connection(cl);
659     ZOOM_connection link = connection_get_link(co);
660     ZOOM_resultset rs;
661     char *databaseName = sdb->database->databases[0];
662     const char *opt_piggyback   = session_setting_oneval(sdb, PZ_PIGGYBACK);
663     const char *opt_queryenc    = session_setting_oneval(sdb, PZ_QUERYENCODING);
664     const char *opt_elements    = session_setting_oneval(sdb, PZ_ELEMENTS);
665     const char *opt_requestsyn  = session_setting_oneval(sdb, PZ_REQUESTSYNTAX);
666     const char *opt_maxrecs     = session_setting_oneval(sdb, PZ_MAXRECS);
667     const char *opt_sru         = session_setting_oneval(sdb, PZ_SRU);
668     const char *opt_sort        = session_setting_oneval(sdb, PZ_SORT);
669     const char *opt_preferred   = session_setting_oneval(sdb, PZ_PREFERRED);
670     const char *extra_args      = session_setting_oneval(sdb, PZ_EXTRA_ARGS);
671     char maxrecs_str[24], startrecs_str[24];
672
673     assert(link);
674
675     cl->hits = -1;
676     cl->record_offset = 0;
677     cl->diagnostic = 0;
678
679     if (extra_args && *extra_args)
680         ZOOM_connection_option_set(link, "extraArgs", extra_args);
681
682     if (opt_preferred) {
683         cl->preferred = atoi(opt_preferred);
684         if (cl->preferred)
685             yaz_log(YLOG_LOG, "Target %s has preferred status: %d", sdb->database->url, cl->preferred);
686     }
687     client_set_state(cl, Client_Working);
688
689     if (*opt_piggyback)
690         ZOOM_connection_option_set(link, "piggyback", opt_piggyback);
691     else
692         ZOOM_connection_option_set(link, "piggyback", "1");
693     if (*opt_queryenc)
694         ZOOM_connection_option_set(link, "rpnCharset", opt_queryenc);
695     if (*opt_sru && *opt_elements)
696         ZOOM_connection_option_set(link, "schema", opt_elements);
697     else if (*opt_elements)
698         ZOOM_connection_option_set(link, "elementSetName", opt_elements);
699     if (*opt_requestsyn)
700         ZOOM_connection_option_set(link, "preferredRecordSyntax", opt_requestsyn);
701
702     if (opt_maxrecs && *opt_maxrecs)
703     {
704         cl->maxrecs = atoi(opt_maxrecs);
705     }
706
707     /* convert back to string representation used in ZOOM API */
708     sprintf(maxrecs_str, "%d", cl->maxrecs);
709     ZOOM_connection_option_set(link, "count", maxrecs_str);
710
711     if (cl->maxrecs > 20)
712         ZOOM_connection_option_set(link, "presentChunk", "20");
713     else
714         ZOOM_connection_option_set(link, "presentChunk", maxrecs_str);
715
716     sprintf(startrecs_str, "%d", cl->startrecs);
717     ZOOM_connection_option_set(link, "start", startrecs_str);
718
719     if (databaseName)
720         ZOOM_connection_option_set(link, "databaseName", databaseName);
721
722     /* TODO Verify does it break something for CQL targets(non-SOLR) ? */
723     /* facets definition is in PQF */
724     client_set_facets_request(cl, link);
725
726     if (cl->cqlquery)
727     {
728         ZOOM_query q = ZOOM_query_create();
729         yaz_log(YLOG_LOG, "Search %s CQL: %s", sdb->database->url, cl->cqlquery);
730         ZOOM_query_cql(q, cl->cqlquery);
731         if (*opt_sort)
732             ZOOM_query_sortby(q, opt_sort);
733         rs = ZOOM_connection_search(link, q);
734         ZOOM_query_destroy(q);
735     }
736     else
737     {
738         yaz_log(YLOG_LOG, "Search %s PQF: %s", sdb->database->url, cl->pquery);
739         rs = ZOOM_connection_search_pqf(link, cl->pquery);
740     }
741     ZOOM_resultset_destroy(cl->resultset);
742     cl->resultset = rs;
743     connection_continue(co);
744 }
745
746 struct client *client_create(void)
747 {
748     struct client *cl = xmalloc(sizeof(*cl));
749     cl->maxrecs = 100;
750     cl->startrecs = 0;
751     cl->pquery = 0;
752     cl->cqlquery = 0;
753     cl->database = 0;
754     cl->connection = 0;
755     cl->session = 0;
756     cl->hits = 0;
757     cl->record_offset = 0;
758     cl->diagnostic = 0;
759     cl->state = Client_Disconnected;
760     cl->show_raw = 0;
761     cl->resultset = 0;
762     cl->mutex = 0;
763     pazpar2_mutex_create(&cl->mutex, "client");
764     cl->preferred = 0;
765     cl->ref_count = 1;
766     cl->url = 0;
767     client_use(1);
768     
769     return cl;
770 }
771
772 void client_lock(struct client *c)
773 {
774     yaz_mutex_enter(c->mutex);
775 }
776
777 void client_unlock(struct client *c)
778 {
779     yaz_mutex_leave(c->mutex);
780 }
781
782 void client_incref(struct client *c)
783 {
784     pazpar2_incref(&c->ref_count, c->mutex);
785     yaz_log(YLOG_DEBUG, "client_incref c=%p %s cnt=%d",
786             c, client_get_url(c), c->ref_count);
787 }
788
789 int client_destroy(struct client *c)
790 {
791     if (c)
792     {
793         yaz_log(YLOG_DEBUG, "client_destroy c=%p %s cnt=%d",
794                 c, client_get_url(c), c->ref_count);
795         if (!pazpar2_decref(&c->ref_count, c->mutex))
796         {
797             xfree(c->pquery);
798             c->pquery = 0;
799             xfree(c->cqlquery);
800             c->cqlquery = 0;
801             xfree(c->url);
802             assert(!c->connection);
803
804             if (c->resultset)
805             {
806                 ZOOM_resultset_destroy(c->resultset);
807             }
808             yaz_mutex_destroy(&c->mutex);
809             xfree(c);
810             client_use(-1);
811             return 1;
812         }
813     }
814     return 0;
815 }
816
817 void client_set_connection(struct client *cl, struct connection *con)
818 {
819     if (cl->resultset)
820         ZOOM_resultset_release(cl->resultset);
821     if (con)
822     {
823         assert(cl->connection == 0);
824         cl->connection = con;
825         client_incref(cl);
826     }
827     else
828     {
829         cl->connection = con;
830         client_destroy(cl);
831     }
832 }
833
834 void client_disconnect(struct client *cl)
835 {
836     if (cl->state != Client_Idle)
837         client_set_state(cl, Client_Disconnected);
838     client_set_connection(cl, 0);
839 }
840
841 // Extract terms from query into null-terminated termlist
842 static void extract_terms(NMEM nmem, struct ccl_rpn_node *query, char **termlist)
843 {
844     int num = 0;
845
846     pull_terms(nmem, query, termlist, &num);
847     termlist[num] = 0;
848 }
849
850 // Initialize CCL map for a target
851 static CCL_bibset prepare_cclmap(struct client *cl)
852 {
853     struct session_database *sdb = client_get_database(cl);
854     struct setting *s;
855     CCL_bibset res;
856
857     if (!sdb->settings)
858         return 0;
859     res = ccl_qual_mk();
860     for (s = sdb->settings[PZ_CCLMAP]; s; s = s->next)
861     {
862         char *p = strchr(s->name + 3, ':');
863         if (!p)
864         {
865             yaz_log(YLOG_WARN, "Malformed cclmap name: %s", s->name);
866             ccl_qual_rm(&res);
867             return 0;
868         }
869         p++;
870         ccl_qual_fitem(res, s->value, p);
871     }
872     return res;
873 }
874
875 // returns a xmalloced CQL query corresponding to the pquery in client
876 static char *make_cqlquery(struct client *cl)
877 {
878     cql_transform_t cqlt = cql_transform_create();
879     Z_RPNQuery *zquery;
880     char *r;
881     WRBUF wrb = wrbuf_alloc();
882     int status;
883     ODR odr_out = odr_createmem(ODR_ENCODE);
884
885     zquery = p_query_rpn(odr_out, cl->pquery);
886     yaz_log(YLOG_LOG, "PQF: %s", cl->pquery);
887     if ((status = cql_transform_rpn2cql_wrbuf(cqlt, wrb, zquery)))
888     {
889         yaz_log(YLOG_WARN, "Failed to generate CQL query, code=%d", status);
890         r = 0;
891     }
892     else
893     {
894         r = xstrdup(wrbuf_cstr(wrb));
895     }     
896     wrbuf_destroy(wrb);
897     odr_destroy(odr_out);
898     cql_transform_close(cqlt);
899     return r;
900 }
901
902 // returns a xmalloced SOLR query corresponding to the pquery in client
903 // TODO Could prob. be merge with the similar make_cqlquery
904 static char *make_solrquery(struct client *cl)
905 {
906     solr_transform_t sqlt = solr_transform_create();
907     Z_RPNQuery *zquery;
908     char *r;
909     WRBUF wrb = wrbuf_alloc();
910     int status;
911     ODR odr_out = odr_createmem(ODR_ENCODE);
912
913     zquery = p_query_rpn(odr_out, cl->pquery);
914     yaz_log(YLOG_LOG, "PQF: %s", cl->pquery);
915     if ((status = solr_transform_rpn2solr_wrbuf(sqlt, wrb, zquery)))
916     {
917         yaz_log(YLOG_WARN, "Failed to generate SOLR query, code=%d", status);
918         r = 0;
919     }
920     else
921     {
922         r = xstrdup(wrbuf_cstr(wrb));
923     }
924     wrbuf_destroy(wrb);
925     odr_destroy(odr_out);
926     solr_transform_close(sqlt);
927     return r;
928 }
929
930 // Parse the query given the settings specific to this client
931 int client_parse_query(struct client *cl, const char *query)
932 {
933     struct session *se = client_get_session(cl);
934     struct session_database *sdb = client_get_database(cl);
935     struct ccl_rpn_node *cn;
936     int cerror, cpos;
937     CCL_bibset ccl_map = prepare_cclmap(cl);
938     const char *sru = session_setting_oneval(sdb, PZ_SRU);
939     const char *pqf_prefix = session_setting_oneval(sdb, PZ_PQF_PREFIX);
940     const char *pqf_strftime = session_setting_oneval(sdb, PZ_PQF_STRFTIME);
941     const char *query_syntax = session_setting_oneval(sdb, PZ_QUERY_SYNTAX);
942     if (!ccl_map)
943         return -1;
944
945     cn = ccl_find_str(ccl_map, query, &cerror, &cpos);
946     ccl_qual_rm(&ccl_map);
947     if (!cn)
948     {
949         client_set_state(cl, Client_Error);
950         session_log(se, YLOG_WARN, "Failed to parse CCL query '%s' for %s",
951                 query,
952                 client_get_database(cl)->database->url);
953         return -1;
954     }
955     wrbuf_rewind(se->wrbuf);
956     if (*pqf_prefix)
957     {
958         wrbuf_puts(se->wrbuf, pqf_prefix);
959         wrbuf_puts(se->wrbuf, " ");
960     }
961     if (!pqf_strftime || !*pqf_strftime)
962         ccl_pquery(se->wrbuf, cn);
963     else
964     {
965         time_t cur_time = time(0);
966         struct tm *tm =  localtime(&cur_time);
967         char tmp_str[300];
968         const char *cp = tmp_str;
969
970         /* see man strftime(3) for things .. In particular %% gets converted
971          to %.. And That's our original query .. */
972         strftime(tmp_str, sizeof(tmp_str)-1, pqf_strftime, tm);
973         for (; *cp; cp++)
974         {
975             if (cp[0] == '%')
976                 ccl_pquery(se->wrbuf, cn);
977             else
978                 wrbuf_putc(se->wrbuf, cp[0]);
979         }
980     }
981     xfree(cl->pquery);
982     cl->pquery = xstrdup(wrbuf_cstr(se->wrbuf));
983
984     xfree(cl->cqlquery);
985
986     /* Support for PQF on SRU targets.
987      * TODO Refactor */
988     yaz_log(YLOG_DEBUG, "Query syntax: %s", query_syntax);
989     if (strcmp(query_syntax, "pqf") != 0 && *sru)
990     {
991         if (!strcmp(sru, "solr")) {
992             if (!(cl->cqlquery = make_solrquery(cl)))
993                 return -1;
994         }
995         else {
996             if (!(cl->cqlquery = make_cqlquery(cl)))
997                 return -1;
998         }
999     }
1000     else
1001         cl->cqlquery = 0;
1002
1003     /* TODO FIX Not thread safe */
1004     if (!se->relevance)
1005     {
1006         // Initialize relevance structure with query terms
1007         char *p[512];
1008         extract_terms(se->nmem, cn, p);
1009         se->relevance = relevance_create(
1010             se->service->relevance_pct,
1011             se->nmem, (const char **) p);
1012     }
1013
1014     ccl_rpn_delete(cn);
1015     return 0;
1016 }
1017
1018 void client_set_session(struct client *cl, struct session *se)
1019 {
1020     cl->session = se;
1021 }
1022
1023 int client_is_active(struct client *cl)
1024 {
1025     if (cl->connection && (cl->state == Client_Connecting ||
1026                            cl->state == Client_Working))
1027         return 1;
1028     return 0;
1029 }
1030
1031 int client_is_active_preferred(struct client *cl)
1032 {
1033     /* only count if this is a preferred target. */
1034     if (!cl->preferred)
1035         return 0;
1036     /* TODO No sure this the condition that Seb wants */
1037     if (cl->connection && (cl->state == Client_Connecting ||
1038                            cl->state == Client_Working))
1039         return 1;
1040     return 0;
1041 }
1042
1043
1044 Odr_int client_get_hits(struct client *cl)
1045 {
1046     return cl->hits;
1047 }
1048
1049 int client_get_num_records(struct client *cl)
1050 {
1051     return cl->record_offset;
1052 }
1053
1054 void client_set_diagnostic(struct client *cl, int diagnostic)
1055 {
1056     cl->diagnostic = diagnostic;
1057 }
1058
1059 int client_get_diagnostic(struct client *cl)
1060 {
1061     return cl->diagnostic;
1062 }
1063
1064 void client_set_database(struct client *cl, struct session_database *db)
1065 {
1066     cl->database = db;
1067     /* Copy the URL for safe logging even after session is gone */
1068     if (db) {
1069         cl->url = xstrdup(db->database->url);
1070     }
1071 }
1072
1073 struct host *client_get_host(struct client *cl)
1074 {
1075     return client_get_database(cl)->database->host;
1076 }
1077
1078 const char *client_get_url(struct client *cl)
1079 {
1080     if (cl->url)
1081         return cl->url;
1082     else
1083         /* This must not happen anymore, as the url is present until destruction of client  */
1084         return "NOURL";
1085 }
1086
1087 void client_set_maxrecs(struct client *cl, int v)
1088 {
1089     cl->maxrecs = v;
1090 }
1091
1092 int client_get_maxrecs(struct client *cl)
1093 {
1094     return cl->maxrecs;
1095 }
1096
1097 void client_set_startrecs(struct client *cl, int v)
1098 {
1099     cl->startrecs = v;
1100 }
1101
1102 void client_set_preferred(struct client *cl, int v)
1103 {
1104     cl->preferred = v;
1105 }
1106
1107
1108 /*
1109  * Local variables:
1110  * c-basic-offset: 4
1111  * c-file-style: "Stroustrup"
1112  * indent-tabs-mode: nil
1113  * End:
1114  * vim: shiftwidth=4 tabstop=8 expandtab
1115  */
1116