c910c55287647949d63b680fa2c27505359dbbae
[pazpar2-moved-to-github.git] / src / client.c
1 /* This file is part of Pazpar2.
2    Copyright (C) 2006-2011 Index Data
3
4 Pazpar2 is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Pazpar2 is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18 */
19
20 /** \file client.c
21     \brief Z39.50 client 
22 */
23
24 #if HAVE_CONFIG_H
25 #include <config.h>
26 #endif
27 #include <stdlib.h>
28 #include <stdio.h>
29 #include <string.h>
30 #if HAVE_SYS_TIME_H
31 #include <sys/time.h>
32 #endif
33 #if HAVE_UNISTD_H
34 #include <unistd.h>
35 #endif
36 #include <signal.h>
37 #include <assert.h>
38
39 #include <yaz/marcdisp.h>
40 #include <yaz/comstack.h>
41 #include <yaz/tcpip.h>
42 #include <yaz/proto.h>
43 #include <yaz/readconf.h>
44 #include <yaz/pquery.h>
45 #include <yaz/otherinfo.h>
46 #include <yaz/yaz-util.h>
47 #include <yaz/nmem.h>
48 #include <yaz/query-charset.h>
49 #include <yaz/querytowrbuf.h>
50 #include <yaz/oid_db.h>
51 #include <yaz/diagbib1.h>
52 #include <yaz/snprintf.h>
53 #include <yaz/rpn2cql.h>
54 #include <yaz/rpn2solr.h>
55
56 #define USE_TIMING 0
57 #if USE_TIMING
58 #include <yaz/timing.h>
59 #endif
60
61 #include "ppmutex.h"
62 #include "session.h"
63 #include "parameters.h"
64 #include "client.h"
65 #include "connection.h"
66 #include "settings.h"
67 #include "relevance.h"
68 #include "incref.h"
69
70 static YAZ_MUTEX g_mutex = 0;
71 static int no_clients = 0;
72 static int no_clients_total = 0;
73
74 static int client_use(int delta)
75 {
76     int clients;
77     if (!g_mutex)
78         yaz_mutex_create(&g_mutex);
79     yaz_mutex_enter(g_mutex);
80     no_clients += delta;
81     if (delta > 0)
82         no_clients_total += delta;
83     clients = no_clients;
84     yaz_mutex_leave(g_mutex);
85     yaz_log(YLOG_DEBUG, "%s clients=%d", delta == 0 ? "" : (delta > 0 ? "INC" : "DEC"), clients);
86     return clients;
87 }
88
89 int  clients_count(void) {
90     return client_use(0);
91 }
92
93 int  clients_count_total(void) {
94     int total = 0;
95     if (!g_mutex)
96         return 0;
97     yaz_mutex_enter(g_mutex);
98     total = no_clients_total;
99     yaz_mutex_leave(g_mutex);
100     return total;
101 }
102
103
104 /** \brief Represents client state for a connection to one search target */
105 struct client {
106     struct session_database *database;
107     struct connection *connection;
108     struct session *session;
109     char *pquery; // Current search
110     char *cqlquery; // used for SRU targets only
111     Odr_int hits;
112     int record_offset;
113     int maxrecs;
114     int startrecs;
115     int diagnostic;
116     int preferred;
117     struct suggestions *suggestions;
118     enum client_state state;
119     struct show_raw *show_raw;
120     ZOOM_resultset resultset;
121     YAZ_MUTEX mutex;
122     int ref_count;
123     char *id;
124 };
125
126 struct suggestions {
127     NMEM nmem;
128     int num;
129     char **misspelled;
130     char **suggest;
131 };
132
133 struct show_raw {
134     int active; // whether this request has been sent to the server
135     int position;
136     int binary;
137     char *syntax;
138     char *esn;
139     void (*error_handler)(void *data, const char *addinfo);
140     void (*record_handler)(void *data, const char *buf, size_t sz);
141     void *data;
142     struct show_raw *next;
143 };
144
145 static const char *client_states[] = {
146     "Client_Connecting",
147     "Client_Idle",
148     "Client_Working",
149     "Client_Error",
150     "Client_Failed",
151     "Client_Disconnected"
152 };
153
154 const char *client_get_state_str(struct client *cl)
155 {
156     return client_states[cl->state];
157 }
158
159 enum client_state client_get_state(struct client *cl)
160 {
161     return cl->state;
162 }
163
164 void client_set_state(struct client *cl, enum client_state st)
165 {
166     int was_active = 0;
167     if (client_is_active(cl))
168         was_active = 1;
169     cl->state = st;
170     /* If client is going from being active to inactive and all clients
171        are now idle we fire a watch for the session . The assumption is
172        that session is not mutex locked if client is already active */
173     if (was_active && !client_is_active(cl) && cl->session)
174     {
175
176         int no_active = session_active_clients(cl->session);
177         yaz_log(YLOG_DEBUG, "%s: releasing watches on zero active: %d",
178                 client_get_id(cl), no_active);
179         if (no_active == 0) {
180             session_alert_watch(cl->session, SESSION_WATCH_SHOW);
181             session_alert_watch(cl->session, SESSION_WATCH_SHOW_PREF);
182         }
183     }
184 }
185
186 static void client_show_raw_error(struct client *cl, const char *addinfo);
187
188 struct connection *client_get_connection(struct client *cl)
189 {
190     return cl->connection;
191 }
192
193 struct session_database *client_get_database(struct client *cl)
194 {
195     return cl->database;
196 }
197
198 struct session *client_get_session(struct client *cl)
199 {
200     return cl->session;
201 }
202
203 const char *client_get_pquery(struct client *cl)
204 {
205     return cl->pquery;
206 }
207
208 static void client_send_raw_present(struct client *cl);
209 static int nativesyntax_to_type(struct session_database *sdb, char *type,
210                                 ZOOM_record rec);
211
212 static void client_show_immediate(
213     ZOOM_resultset resultset, struct session_database *sdb, int position,
214     void *data,
215     void (*error_handler)(void *data, const char *addinfo),
216     void (*record_handler)(void *data, const char *buf, size_t sz),
217     int binary)
218 {
219     ZOOM_record rec = 0;
220     char type[80];
221     const char *buf;
222     int len;
223
224     if (!resultset)
225     {
226         error_handler(data, "no resultset");
227         return;
228     }
229     rec = ZOOM_resultset_record(resultset, position-1);
230     if (!rec)
231     {
232         error_handler(data, "no record");
233         return;
234     }
235     if (binary)
236         strcpy(type, "raw");
237     else
238         nativesyntax_to_type(sdb, type, rec);
239     buf = ZOOM_record_get(rec, type, &len);
240     if (!buf)
241     {
242         error_handler(data, "no record");
243         return;
244     }
245     record_handler(data, buf, len);
246 }
247
248
249 int client_show_raw_begin(struct client *cl, int position,
250                           const char *syntax, const char *esn,
251                           void *data,
252                           void (*error_handler)(void *data, const char *addinfo),
253                           void (*record_handler)(void *data, const char *buf,
254                                                  size_t sz),
255                           int binary)
256 {
257     if (syntax == 0 && esn == 0)
258         client_show_immediate(cl->resultset, client_get_database(cl),
259                               position, data,
260                               error_handler, record_handler,
261                               binary);
262     else
263     {
264         struct show_raw *rr, **rrp;
265
266         if (!cl->connection)
267             return -1;
268     
269
270         rr = xmalloc(sizeof(*rr));
271         rr->position = position;
272         rr->active = 0;
273         rr->data = data;
274         rr->error_handler = error_handler;
275         rr->record_handler = record_handler;
276         rr->binary = binary;
277         if (syntax)
278             rr->syntax = xstrdup(syntax);
279         else
280             rr->syntax = 0;
281         if (esn)
282             rr->esn = xstrdup(esn);
283         else
284             rr->esn = 0;
285         rr->next = 0;
286         
287         for (rrp = &cl->show_raw; *rrp; rrp = &(*rrp)->next)
288             ;
289         *rrp = rr;
290         
291         if (cl->state == Client_Failed)
292         {
293             client_show_raw_error(cl, "client failed");
294         }
295         else if (cl->state == Client_Disconnected)
296         {
297             client_show_raw_error(cl, "client disconnected");
298         }
299         else
300         {
301             client_send_raw_present(cl);
302         }
303     }
304     return 0;
305 }
306
307 static void client_show_raw_delete(struct show_raw *r)
308 {
309     xfree(r->syntax);
310     xfree(r->esn);
311     xfree(r);
312 }
313
314 void client_show_raw_remove(struct client *cl, void *data)
315 {
316     struct show_raw *rr = data;
317     struct show_raw **rrp = &cl->show_raw;
318     while (*rrp != rr)
319         rrp = &(*rrp)->next;
320     if (*rrp)
321     {
322         *rrp = rr->next;
323         client_show_raw_delete(rr);
324     }
325 }
326
327 void client_show_raw_dequeue(struct client *cl)
328 {
329     struct show_raw *rr = cl->show_raw;
330
331     cl->show_raw = rr->next;
332     client_show_raw_delete(rr);
333 }
334
335 static void client_show_raw_error(struct client *cl, const char *addinfo)
336 {
337     while (cl->show_raw)
338     {
339         cl->show_raw->error_handler(cl->show_raw->data, addinfo);
340         client_show_raw_dequeue(cl);
341     }
342 }
343
344 static void client_send_raw_present(struct client *cl)
345 {
346     struct session_database *sdb = client_get_database(cl);
347     struct connection *co = client_get_connection(cl);
348     ZOOM_resultset set = cl->resultset;
349
350     int offset = cl->show_raw->position;
351     const char *syntax = 0;
352     const char *elements = 0;
353
354     assert(cl->show_raw);
355     assert(set);
356
357     yaz_log(YLOG_DEBUG, "%s: trying to present %d record(s) from %d",
358             client_get_id(cl), 1, offset);
359
360     if (cl->show_raw->syntax)
361         syntax = cl->show_raw->syntax;
362     else
363         syntax = session_setting_oneval(sdb, PZ_REQUESTSYNTAX);
364     ZOOM_resultset_option_set(set, "preferredRecordSyntax", syntax);
365
366     if (cl->show_raw->esn)
367         elements = cl->show_raw->esn;
368     else
369         elements = session_setting_oneval(sdb, PZ_ELEMENTS);
370     if (elements && *elements)
371         ZOOM_resultset_option_set(set, "elementSetName", elements);
372
373     ZOOM_resultset_records(set, 0, offset-1, 1);
374     cl->show_raw->active = 1;
375
376     connection_continue(co);
377 }
378
379 static int nativesyntax_to_type(struct session_database *sdb, char *type,
380                                 ZOOM_record rec)
381 {
382     const char *s = session_setting_oneval(sdb, PZ_NATIVESYNTAX);
383
384     if (s && *s)
385     {
386         if (!strncmp(s, "iso2709", 7))
387         {
388             const char *cp = strchr(s, ';');
389             yaz_snprintf(type, 80, "xml; charset=%s", cp ? cp+1 : "marc-8s");
390         }
391         else if (!strncmp(s, "xml", 3))
392         {
393             strcpy(type, "xml");
394         }
395         else if (!strncmp(s, "txml", 4))
396         {
397             const char *cp = strchr(s, ';');
398             yaz_snprintf(type, 80, "txml; charset=%s", cp ? cp+1 : "marc-8s");
399         }
400         else
401             return -1;
402         return 0;
403     }
404     else  /* attempt to deduce structure */
405     {
406         const char *syntax = ZOOM_record_get(rec, "syntax", NULL);
407         if (syntax)
408         {
409             if (!strcmp(syntax, "XML"))
410             {
411                 strcpy(type, "xml");
412                 return 0;
413             }
414             else if (!strcmp(syntax, "USmarc") || !strcmp(syntax, "MARC21"))
415             {
416                 strcpy(type, "xml; charset=marc8-s");
417                 return 0;
418             }
419             else return -1;
420         }
421         else return -1;
422     }
423 }
424
425 /**
426  * TODO Consider thread safety!!!
427  *
428  */
429 void client_report_facets(struct client *cl, ZOOM_resultset rs)
430 {
431     struct session_database *sdb = client_get_database(cl);
432     ZOOM_facet_field *facets = ZOOM_resultset_facets(rs);
433
434     if (sdb && facets)
435     {
436         struct session *se = client_get_session(cl);
437         int facet_num = ZOOM_resultset_facets_size(rs);
438         struct setting *s;
439
440         for (s = sdb->settings[PZ_FACETMAP]; s; s = s->next)
441         {
442             const char *p = strchr(s->name + 3, ':');
443             if (p && p[1] && s->value && s->value[0])
444             {
445                 int facet_idx;
446                 p++; /* p now holds logical facet name */
447                 for (facet_idx = 0; facet_idx < facet_num; facet_idx++)
448                 {
449                     const char *native_name =
450                         ZOOM_facet_field_name(facets[facet_idx]);
451                     if (native_name && !strcmp(s->value, native_name))
452                     {
453                         size_t term_idx;
454                         size_t term_num =
455                             ZOOM_facet_field_term_count(facets[facet_idx]);
456                         for (term_idx = 0; term_idx < term_num; term_idx++ )
457                         {
458                             int freq;
459                             const char *term =
460                                 ZOOM_facet_field_get_term(facets[facet_idx],
461                                                           term_idx, &freq);
462                             if (term)
463                                 add_facet(se, p, term, freq);
464                         }
465                         break;
466                     }
467                 }
468             }
469         }
470     }
471 }
472
473 static void ingest_raw_record(struct client *cl, ZOOM_record rec)
474 {
475     const char *buf;
476     int len;
477     char type[80];
478
479     if (cl->show_raw->binary)
480         strcpy(type, "raw");
481     else
482     {
483         struct session_database *sdb = client_get_database(cl);
484         nativesyntax_to_type(sdb, type, rec);
485     }
486
487     buf = ZOOM_record_get(rec, type, &len);
488     cl->show_raw->record_handler(cl->show_raw->data,  buf, len);
489     client_show_raw_dequeue(cl);
490 }
491
492 void client_check_preferred_watch(struct client *cl)
493 {
494     struct session *se = cl->session;
495     yaz_log(YLOG_DEBUG, "client_check_preferred_watch: %s ", client_get_id(cl));
496     if (se)
497     {
498         client_unlock(cl);
499         /* TODO possible threading issue. Session can have been destroyed */
500         if (session_is_preferred_clients_ready(se)) {
501             session_alert_watch(se, SESSION_WATCH_SHOW_PREF);
502         }
503         else
504             yaz_log(YLOG_DEBUG, "client_check_preferred_watch: Still locked on preferred targets.");
505
506         client_lock(cl);
507     }
508     else
509         yaz_log(YLOG_WARN, "client_check_preferred_watch: %s. No session!", client_get_id(cl));
510
511 }
512
513 struct suggestions* client_suggestions_create(const char* suggestions_string);
514 static void client_suggestions_destroy(struct client *cl);
515
516 void client_search_response(struct client *cl)
517 {
518     struct connection *co = cl->connection;
519     ZOOM_connection link = connection_get_link(co);
520     ZOOM_resultset resultset = cl->resultset;
521
522     const char *error, *addinfo = 0;
523     
524     if (ZOOM_connection_error(link, &error, &addinfo))
525     {
526         cl->hits = 0;
527         client_set_state(cl, Client_Error);
528         yaz_log(YLOG_WARN, "Search error %s (%s): %s",
529                 error, addinfo, client_get_id(cl));
530     }
531     else
532     {
533         yaz_log(YLOG_DEBUG, "client_search_response: hits "
534                 ODR_INT_PRINTF, cl->hits);
535         client_report_facets(cl, resultset);
536         cl->record_offset = cl->startrecs;
537         cl->hits = ZOOM_resultset_size(resultset);
538         if (cl->suggestions)
539             client_suggestions_destroy(cl);
540         cl->suggestions = client_suggestions_create(ZOOM_resultset_option_get(resultset, "suggestions"));
541     }
542 }
543
544 void client_got_records(struct client *cl)
545 {
546     struct session *se = cl->session;
547     if (se)
548     {
549         client_unlock(cl);
550         session_alert_watch(se, SESSION_WATCH_SHOW);
551         session_alert_watch(se, SESSION_WATCH_RECORD);
552         client_lock(cl);
553     }
554 }
555
556 void client_record_response(struct client *cl)
557 {
558     struct connection *co = cl->connection;
559     ZOOM_connection link = connection_get_link(co);
560     ZOOM_resultset resultset = cl->resultset;
561     const char *error, *addinfo;
562
563     if (ZOOM_connection_error(link, &error, &addinfo))
564     {
565         client_set_state(cl, Client_Error);
566         yaz_log(YLOG_WARN, "Search error %s (%s): %s",
567             error, addinfo, client_get_id(cl));
568     }
569     else
570     {
571         ZOOM_record rec = 0;
572         const char *msg, *addinfo;
573         
574         if (cl->show_raw && cl->show_raw->active)
575         {
576             if ((rec = ZOOM_resultset_record(resultset,
577                                              cl->show_raw->position-1)))
578             {
579                 cl->show_raw->active = 0;
580                 ingest_raw_record(cl, rec);
581             }
582             else
583             {
584                 yaz_log(YLOG_WARN, "Expected record, but got NULL, offset=%d",
585                         cl->show_raw->position-1);
586             }
587         }
588         else
589         {
590             int offset = cl->record_offset;
591             if ((rec = ZOOM_resultset_record(resultset, offset)))
592             {
593                 cl->record_offset++;
594                 if (cl->session == 0)
595                     ;
596                 else if (ZOOM_record_error(rec, &msg, &addinfo, 0))
597                 {
598                     yaz_log(YLOG_WARN, "Record error %s (%s): %s (rec #%d)",
599                             msg, addinfo, client_get_id(cl),
600                             cl->record_offset);
601                 }
602                 else
603                 {
604                     struct session_database *sdb = client_get_database(cl);
605                     NMEM nmem = nmem_create();
606                     const char *xmlrec;
607                     char type[80];
608
609                     if (nativesyntax_to_type(sdb, type, rec))
610                         yaz_log(YLOG_WARN, "Failed to determine record type");
611                     xmlrec = ZOOM_record_get(rec, type, NULL);
612                     if (!xmlrec)
613                         yaz_log(YLOG_WARN, "ZOOM_record_get failed from %s",
614                                 client_get_id(cl));
615                     else
616                     {
617                         /* OK = 0, -1 = failure, -2 = Filtered */
618                         if (ingest_record(cl, xmlrec, cl->record_offset, nmem) == -1)
619                             yaz_log(YLOG_WARN, "Failed to ingest from %s", client_get_id(cl));
620                     }
621                     nmem_destroy(nmem);
622                 }
623             }
624             else
625             {
626                 yaz_log(YLOG_WARN, "Expected record, but got NULL, offset=%d",
627                         offset);
628             }
629         }
630     }
631 }
632
633 static void client_set_facets_request(struct client *cl, ZOOM_connection link)
634 {
635     struct session_database *sdb = client_get_database(cl);
636
637     WRBUF w = wrbuf_alloc();
638     
639     struct setting *s;
640
641     for (s = sdb->settings[PZ_FACETMAP]; s; s = s->next)
642     {
643         const char *p = strchr(s->name + 3, ':');
644         if (!p)
645         {
646             yaz_log(YLOG_WARN, "Malformed facetmap name: %s", s->name);
647         }
648         else if (s->value && s->value[0])
649         {
650             wrbuf_puts(w, "@attr 1=");
651             yaz_encode_pqf_term(w, s->value, strlen(s->value));
652             if (s->next)
653                 wrbuf_puts(w, ",");
654         }
655     }
656     yaz_log(YLOG_LOG, "using facets str: %s", wrbuf_cstr(w));
657     ZOOM_connection_option_set(link, "facets",
658                                wrbuf_len(w) ? wrbuf_cstr(w) : 0);
659     wrbuf_destroy(w);
660 }
661
662 int client_has_facet(struct client *cl, const char *name)
663 {
664     struct session_database *sdb = client_get_database(cl);
665     struct setting *s;
666
667     for (s = sdb->settings[PZ_FACETMAP]; s; s = s->next)
668     {
669         const char *p = strchr(s->name + 3, ':');
670         if (p && !strcmp(name, p + 1))
671             return 1;
672     }
673     return 0;
674 }
675
676 void client_start_search(struct client *cl, const char *sort_strategy_and_spec,
677                          int increasing)
678 {
679     struct session_database *sdb = client_get_database(cl);
680     struct connection *co = client_get_connection(cl);
681     ZOOM_connection link = connection_get_link(co);
682     ZOOM_resultset rs;
683     const char *opt_piggyback   = session_setting_oneval(sdb, PZ_PIGGYBACK);
684     const char *opt_queryenc    = session_setting_oneval(sdb, PZ_QUERYENCODING);
685     const char *opt_elements    = session_setting_oneval(sdb, PZ_ELEMENTS);
686     const char *opt_requestsyn  = session_setting_oneval(sdb, PZ_REQUESTSYNTAX);
687     const char *opt_maxrecs     = session_setting_oneval(sdb, PZ_MAXRECS);
688     const char *opt_sru         = session_setting_oneval(sdb, PZ_SRU);
689     const char *opt_sort        = session_setting_oneval(sdb, PZ_SORT);
690     const char *opt_preferred   = session_setting_oneval(sdb, PZ_PREFERRED);
691     const char *extra_args      = session_setting_oneval(sdb, PZ_EXTRA_ARGS);
692     char maxrecs_str[24], startrecs_str[24];
693     ZOOM_query q;
694
695     assert(link);
696
697     cl->record_offset = 0;
698     cl->diagnostic = 0;
699
700     if (extra_args && *extra_args)
701         ZOOM_connection_option_set(link, "extraArgs", extra_args);
702
703     if (opt_preferred) {
704         cl->preferred = atoi(opt_preferred);
705         if (cl->preferred)
706             yaz_log(YLOG_LOG, "Target %s has preferred status: %d",
707                     client_get_id(cl), cl->preferred);
708     }
709     client_set_state(cl, Client_Working);
710
711     if (*opt_piggyback)
712         ZOOM_connection_option_set(link, "piggyback", opt_piggyback);
713     else
714         ZOOM_connection_option_set(link, "piggyback", "1");
715     if (*opt_queryenc)
716         ZOOM_connection_option_set(link, "rpnCharset", opt_queryenc);
717     if (*opt_sru && *opt_elements)
718         ZOOM_connection_option_set(link, "schema", opt_elements);
719     else if (*opt_elements)
720         ZOOM_connection_option_set(link, "elementSetName", opt_elements);
721     if (*opt_requestsyn)
722         ZOOM_connection_option_set(link, "preferredRecordSyntax", opt_requestsyn);
723
724     if (opt_maxrecs && *opt_maxrecs)
725     {
726         cl->maxrecs = atoi(opt_maxrecs);
727     }
728
729     /* convert back to string representation used in ZOOM API */
730     sprintf(maxrecs_str, "%d", cl->maxrecs);
731     ZOOM_connection_option_set(link, "count", maxrecs_str);
732
733     if (cl->maxrecs > 20)
734         ZOOM_connection_option_set(link, "presentChunk", "20");
735     else
736         ZOOM_connection_option_set(link, "presentChunk", maxrecs_str);
737
738     sprintf(startrecs_str, "%d", cl->startrecs);
739     ZOOM_connection_option_set(link, "start", startrecs_str);
740
741     /* TODO Verify does it break something for CQL targets(non-SOLR) ? */
742     /* facets definition is in PQF */
743     client_set_facets_request(cl, link);
744
745     q = ZOOM_query_create();
746     if (cl->cqlquery)
747     {
748         yaz_log(YLOG_LOG, "Search %s CQL: %s", client_get_id(cl),
749                 cl->cqlquery);
750         ZOOM_query_cql(q, cl->cqlquery);
751         if (*opt_sort)
752             ZOOM_query_sortby(q, opt_sort);
753     }
754     else
755     {
756         yaz_log(YLOG_LOG, "Search %s PQF: %s", client_get_id(cl), cl->pquery);
757         
758         ZOOM_query_prefix(q, cl->pquery);
759     }
760     if (sort_strategy_and_spec &&
761         strlen(sort_strategy_and_spec) < 40 /* spec below */)
762     {
763         char spec[50], *p;
764         strcpy(spec, sort_strategy_and_spec);
765         p = strchr(spec, ':');
766         if (p)
767         {
768             *p++ = '\0'; /* cut the string in two */
769             while (*p == ' ')
770                 p++;
771             if (increasing)
772                 strcat(p, " <");
773             else
774                 strcat(p, " >");
775             yaz_log(YLOG_LOG, "applying %s %s", spec, p);
776             ZOOM_query_sortby2(q, spec, p);
777         }
778     }
779     rs = ZOOM_connection_search(link, q);
780     ZOOM_query_destroy(q);
781     ZOOM_resultset_destroy(cl->resultset);
782     cl->resultset = rs;
783     connection_continue(co);
784 }
785
786 struct client *client_create(const char *id)
787 {
788     struct client *cl = xmalloc(sizeof(*cl));
789     cl->maxrecs = 100;
790     cl->startrecs = 0;
791     cl->pquery = 0;
792     cl->cqlquery = 0;
793     cl->database = 0;
794     cl->connection = 0;
795     cl->session = 0;
796     cl->hits = 0;
797     cl->record_offset = 0;
798     cl->diagnostic = 0;
799     cl->state = Client_Disconnected;
800     cl->show_raw = 0;
801     cl->resultset = 0;
802     cl->suggestions = 0;
803     cl->mutex = 0;
804     pazpar2_mutex_create(&cl->mutex, "client");
805     cl->preferred = 0;
806     cl->ref_count = 1;
807     assert(id);
808     cl->id = xstrdup(id);
809     client_use(1);
810     
811     return cl;
812 }
813
814 void client_lock(struct client *c)
815 {
816     yaz_mutex_enter(c->mutex);
817 }
818
819 void client_unlock(struct client *c)
820 {
821     yaz_mutex_leave(c->mutex);
822 }
823
824 void client_incref(struct client *c)
825 {
826     pazpar2_incref(&c->ref_count, c->mutex);
827     yaz_log(YLOG_DEBUG, "client_incref c=%p %s cnt=%d",
828             c, client_get_id(c), c->ref_count);
829 }
830
831 int client_destroy(struct client *c)
832 {
833     if (c)
834     {
835         yaz_log(YLOG_DEBUG, "client_destroy c=%p %s cnt=%d",
836                 c, client_get_id(c), c->ref_count);
837         if (!pazpar2_decref(&c->ref_count, c->mutex))
838         {
839             xfree(c->pquery);
840             c->pquery = 0;
841             xfree(c->cqlquery);
842             c->cqlquery = 0;
843             xfree(c->id);
844             assert(!c->connection);
845
846             if (c->resultset)
847             {
848                 ZOOM_resultset_destroy(c->resultset);
849             }
850             yaz_mutex_destroy(&c->mutex);
851             xfree(c);
852             client_use(-1);
853             return 1;
854         }
855     }
856     return 0;
857 }
858
859 void client_set_connection(struct client *cl, struct connection *con)
860 {
861     if (cl->resultset)
862         ZOOM_resultset_release(cl->resultset);
863     if (con)
864     {
865         assert(cl->connection == 0);
866         cl->connection = con;
867         client_incref(cl);
868     }
869     else
870     {
871         cl->connection = con;
872         client_destroy(cl);
873     }
874 }
875
876 void client_disconnect(struct client *cl)
877 {
878     if (cl->state != Client_Idle)
879         client_set_state(cl, Client_Disconnected);
880     client_set_connection(cl, 0);
881 }
882
883
884 // Initialize CCL map for a target
885 static CCL_bibset prepare_cclmap(struct client *cl)
886 {
887     struct session_database *sdb = client_get_database(cl);
888     struct setting *s;
889     CCL_bibset res;
890
891     if (!sdb->settings)
892         return 0;
893     res = ccl_qual_mk();
894     for (s = sdb->settings[PZ_CCLMAP]; s; s = s->next)
895     {
896         char *p = strchr(s->name + 3, ':');
897         if (!p)
898         {
899             yaz_log(YLOG_WARN, "Malformed cclmap name: %s", s->name);
900             ccl_qual_rm(&res);
901             return 0;
902         }
903         p++;
904         ccl_qual_fitem(res, s->value, p);
905     }
906     return res;
907 }
908
909 // returns a xmalloced CQL query corresponding to the pquery in client
910 static char *make_cqlquery(struct client *cl)
911 {
912     cql_transform_t cqlt = cql_transform_create();
913     Z_RPNQuery *zquery;
914     char *r;
915     WRBUF wrb = wrbuf_alloc();
916     int status;
917     ODR odr_out = odr_createmem(ODR_ENCODE);
918
919     zquery = p_query_rpn(odr_out, cl->pquery);
920     yaz_log(YLOG_LOG, "PQF: %s", cl->pquery);
921     if ((status = cql_transform_rpn2cql_wrbuf(cqlt, wrb, zquery)))
922     {
923         yaz_log(YLOG_WARN, "Failed to generate CQL query, code=%d", status);
924         r = 0;
925     }
926     else
927     {
928         r = xstrdup(wrbuf_cstr(wrb));
929     }     
930     wrbuf_destroy(wrb);
931     odr_destroy(odr_out);
932     cql_transform_close(cqlt);
933     return r;
934 }
935
936 // returns a xmalloced SOLR query corresponding to the pquery in client
937 // TODO Could prob. be merge with the similar make_cqlquery
938 static char *make_solrquery(struct client *cl)
939 {
940     solr_transform_t sqlt = solr_transform_create();
941     Z_RPNQuery *zquery;
942     char *r;
943     WRBUF wrb = wrbuf_alloc();
944     int status;
945     ODR odr_out = odr_createmem(ODR_ENCODE);
946
947     zquery = p_query_rpn(odr_out, cl->pquery);
948     if (zquery == 0) {
949         yaz_log(YLOG_WARN, "Failed to generate RPN from PQF: %s", cl->pquery);
950         return 0;
951     }
952     yaz_log(YLOG_LOG, "PQF: %s", cl->pquery);
953     if ((status = solr_transform_rpn2solr_wrbuf(sqlt, wrb, zquery)))
954     {
955         yaz_log(YLOG_WARN, "Failed to generate SOLR query from PQF %s, code=%d", cl->pquery, status);
956         r = 0;
957     }
958     else
959     {
960         r = xstrdup(wrbuf_cstr(wrb));
961     }
962     wrbuf_destroy(wrb);
963     odr_destroy(odr_out);
964     solr_transform_close(sqlt);
965     return r;
966 }
967
968 static void apply_limit(struct session_database *sdb,
969                         facet_limits_t facet_limits,
970                         WRBUF w_pqf, WRBUF w_ccl)
971 {
972     int i = 0;
973     const char *name;
974     const char *value;
975     NMEM nmem_tmp = nmem_create();
976     for (i = 0; (name = facet_limits_get(facet_limits, i, &value)); i++)
977     {
978         struct setting *s = 0;
979         
980         for (s = sdb->settings[PZ_LIMITMAP]; s; s = s->next)
981         {
982             const char *p = strchr(s->name + 3, ':');
983             if (p && !strcmp(p + 1, name) && s->value)
984             {
985                 char **values = 0;
986                 int i, num = 0;
987                 nmem_strsplit_escape2(nmem_tmp, "|", value, &values,
988                                       &num, 1, '\\', 1);
989
990                 if (!strncmp(s->value, "rpn:", 4))
991                 {
992                     const char *pqf = s->value + 4;
993
994                     wrbuf_puts(w_pqf, "@and ");
995                     wrbuf_puts(w_pqf, pqf);
996                     wrbuf_puts(w_pqf, " ");
997                     for (i = 0; i < num; i++)
998                     {
999                         if (i < num - 1)
1000                             wrbuf_puts(w_pqf, "@or ");
1001                         yaz_encode_pqf_term(w_pqf, values[i],
1002                                             strlen(values[i]));
1003                     }
1004                 }
1005                 else if (!strncmp(s->value, "ccl:", 4))
1006                 {
1007                     const char *ccl = s->value + 4;
1008
1009                     wrbuf_puts(w_ccl, " and (");
1010
1011                     for (i = 0; i < num; i++)
1012                     {
1013                         if (i)
1014                             wrbuf_puts(w_ccl, " or ");
1015                         wrbuf_puts(w_ccl, ccl);
1016                         wrbuf_puts(w_ccl, "=\"");
1017                         wrbuf_puts(w_ccl, values[i]);
1018                         wrbuf_puts(w_ccl, "\"");
1019                     }
1020                     wrbuf_puts(w_ccl, ")");
1021
1022                 }
1023                 break;
1024             }
1025         }
1026         nmem_reset(nmem_tmp);
1027         if (!s)
1028         {
1029             yaz_log(YLOG_WARN, "Target %s: limit %s used, but no limitmap defined",
1030                     (sdb->database ? sdb->database->id : "<no id>"), name);
1031         }
1032     }
1033     nmem_destroy(nmem_tmp);
1034 }
1035                         
1036 // Parse the query given the settings specific to this client
1037 int client_parse_query(struct client *cl, const char *query,
1038                        facet_limits_t facet_limits)
1039 {
1040     struct session *se = client_get_session(cl);
1041     struct session_database *sdb = client_get_database(cl);
1042     struct ccl_rpn_node *cn;
1043     int cerror, cpos;
1044     CCL_bibset ccl_map = prepare_cclmap(cl);
1045     const char *sru = session_setting_oneval(sdb, PZ_SRU);
1046     const char *pqf_prefix = session_setting_oneval(sdb, PZ_PQF_PREFIX);
1047     const char *pqf_strftime = session_setting_oneval(sdb, PZ_PQF_STRFTIME);
1048     const char *query_syntax = session_setting_oneval(sdb, PZ_QUERY_SYNTAX);
1049     WRBUF w_ccl, w_pqf;
1050     if (!ccl_map)
1051         return -1;
1052
1053     cl->hits = -1;
1054     w_ccl = wrbuf_alloc();
1055     wrbuf_puts(w_ccl, query);
1056
1057     w_pqf = wrbuf_alloc();
1058     if (*pqf_prefix)
1059     {
1060         wrbuf_puts(w_pqf, pqf_prefix);
1061         wrbuf_puts(w_pqf, " ");
1062     }
1063
1064     apply_limit(sdb, facet_limits, w_pqf, w_ccl);
1065
1066     yaz_log(YLOG_LOG, "CCL query: %s", wrbuf_cstr(w_ccl));
1067     cn = ccl_find_str(ccl_map, wrbuf_cstr(w_ccl), &cerror, &cpos);
1068     ccl_qual_rm(&ccl_map);
1069     if (!cn)
1070     {
1071         client_set_state(cl, Client_Error);
1072         session_log(se, YLOG_WARN, "Failed to parse CCL query '%s' for %s",
1073                     wrbuf_cstr(w_ccl),
1074                     client_get_id(cl));
1075         wrbuf_destroy(w_ccl);
1076         wrbuf_destroy(w_pqf);
1077         return -1;
1078     }
1079     wrbuf_destroy(w_ccl);
1080
1081     if (!pqf_strftime || !*pqf_strftime)
1082         ccl_pquery(w_pqf, cn);
1083     else
1084     {
1085         time_t cur_time = time(0);
1086         struct tm *tm =  localtime(&cur_time);
1087         char tmp_str[300];
1088         const char *cp = tmp_str;
1089
1090         /* see man strftime(3) for things .. In particular %% gets converted
1091          to %.. And That's our original query .. */
1092         strftime(tmp_str, sizeof(tmp_str)-1, pqf_strftime, tm);
1093         for (; *cp; cp++)
1094         {
1095             if (cp[0] == '%')
1096                 ccl_pquery(w_pqf, cn);
1097             else
1098                 wrbuf_putc(w_pqf, cp[0]);
1099         }
1100     }
1101     xfree(cl->pquery);
1102     cl->pquery = xstrdup(wrbuf_cstr(w_pqf));
1103     wrbuf_destroy(w_pqf);
1104
1105     yaz_log(YLOG_LOG, "PQF query: %s", cl->pquery);
1106
1107     xfree(cl->cqlquery);
1108
1109     /* Support for PQF on SRU targets. */
1110     /* TODO Refactor */
1111     yaz_log(YLOG_DEBUG, "Query syntax: %s", query_syntax);
1112     if (strcmp(query_syntax, "pqf") != 0 && *sru)
1113     {
1114         if (!strcmp(sru, "solr")) {
1115             if (!(cl->cqlquery = make_solrquery(cl)))
1116                 return -1;
1117         }
1118         else {
1119             if (!(cl->cqlquery = make_cqlquery(cl)))
1120                 return -1;
1121         }
1122     }
1123     else
1124         cl->cqlquery = 0;
1125
1126     /* TODO FIX Not thread safe */
1127     if (!se->relevance)
1128     {
1129         // Initialize relevance structure with query terms
1130         se->relevance = relevance_create_ccl(
1131             se->service->charsets, se->nmem, cn);
1132     }
1133
1134     ccl_rpn_delete(cn);
1135     return 0;
1136 }
1137
1138 void client_set_session(struct client *cl, struct session *se)
1139 {
1140     cl->session = se;
1141 }
1142
1143 int client_is_active(struct client *cl)
1144 {
1145     if (cl->connection && (cl->state == Client_Connecting ||
1146                            cl->state == Client_Working))
1147         return 1;
1148     return 0;
1149 }
1150
1151 int client_is_active_preferred(struct client *cl)
1152 {
1153     /* only count if this is a preferred target. */
1154     if (!cl->preferred)
1155         return 0;
1156     /* TODO No sure this the condition that Seb wants */
1157     if (cl->connection && (cl->state == Client_Connecting ||
1158                            cl->state == Client_Working))
1159         return 1;
1160     return 0;
1161 }
1162
1163 Odr_int client_get_hits(struct client *cl)
1164 {
1165     return cl->hits;
1166 }
1167
1168 int client_get_num_records(struct client *cl)
1169 {
1170     return cl->record_offset;
1171 }
1172
1173 void client_set_diagnostic(struct client *cl, int diagnostic)
1174 {
1175     cl->diagnostic = diagnostic;
1176 }
1177
1178 int client_get_diagnostic(struct client *cl)
1179 {
1180     return cl->diagnostic;
1181 }
1182
1183 const char * client_get_suggestions_xml(struct client *cl, WRBUF wrbuf)
1184 {
1185     int idx;
1186     struct suggestions *suggestions = cl->suggestions;
1187     if (!suggestions || suggestions->num == 0) {
1188         return "";
1189     }
1190     for (idx = 0; idx < suggestions->num; idx++) {
1191         wrbuf_printf(wrbuf, "<suggest term=\"%s\"", suggestions->suggest[idx]);
1192         if (suggestions->misspelled[idx] && suggestions->misspelled[idx]) {
1193             wrbuf_puts(wrbuf, suggestions->misspelled[idx]);
1194             wrbuf_puts(wrbuf, "</suggest>\n");
1195         }
1196         else
1197             wrbuf_puts(wrbuf, "/>\n");
1198     }
1199     return wrbuf_cstr(wrbuf);
1200 }
1201
1202
1203 void client_set_database(struct client *cl, struct session_database *db)
1204 {
1205     cl->database = db;
1206 }
1207
1208 const char *client_get_id(struct client *cl)
1209 {
1210     return cl->id;
1211 }
1212
1213 void client_set_maxrecs(struct client *cl, int v)
1214 {
1215     cl->maxrecs = v;
1216 }
1217
1218 int client_get_maxrecs(struct client *cl)
1219 {
1220     return cl->maxrecs;
1221 }
1222
1223 void client_set_startrecs(struct client *cl, int v)
1224 {
1225     cl->startrecs = v;
1226 }
1227
1228 void client_set_preferred(struct client *cl, int v)
1229 {
1230     cl->preferred = v;
1231 }
1232
1233
1234 struct suggestions* client_suggestions_create(const char* suggestions_string) {
1235     if (suggestions_string == 0)
1236         return 0;
1237     int i;
1238     NMEM nmem = nmem_create();
1239     struct suggestions *suggestions = nmem_malloc(nmem, sizeof(*suggestions));
1240     suggestions->nmem = nmem;
1241     suggestions->num = 0;
1242     suggestions->misspelled = 0;
1243     suggestions->suggest = 0;
1244     if (suggestions_string)
1245         nmem_strsplit_escape2(suggestions->nmem, "\n", suggestions_string, &suggestions->suggest,
1246                               &suggestions->num, 1, '\\', 0);
1247     /* Set up misspelled array */
1248     suggestions->misspelled = (char **) nmem_malloc(nmem, suggestions->num * sizeof(**suggestions->misspelled));
1249     /* replace = with \0 .. for each item */
1250     for (i = 0; i < suggestions->num; i++)
1251     {
1252         char *cp = strchr(suggestions->suggest[i], '=');
1253         if (cp) {
1254             *cp = '\0';
1255             suggestions->misspelled[i] = cp+1;
1256         }
1257     }
1258     return suggestions;
1259 }
1260
1261 static void client_suggestions_destroy(struct client *cl)
1262 {
1263     nmem_destroy(cl->suggestions->nmem);
1264     cl->suggestions = 0;
1265 }
1266
1267 /*
1268  * Local variables:
1269  * c-basic-offset: 4
1270  * c-file-style: "Stroustrup"
1271  * indent-tabs-mode: nil
1272  * End:
1273  * vim: shiftwidth=4 tabstop=8 expandtab
1274  */
1275