Adding relevance ranking, etc.
[pazpar2-moved-to-github.git] / pazpar2.c
1 /* $Id: pazpar2.c,v 1.5 2006-11-26 05:15:43 quinn Exp $ */
2
3 #include <stdlib.h>
4 #include <stdio.h>
5 #include <string.h>
6 #include <sys/time.h>
7 #include <unistd.h>
8 #include <sys/socket.h>
9 #include <signal.h>
10 #include <ctype.h>
11 #include <assert.h>
12
13 #include <yaz/comstack.h>
14 #include <yaz/tcpip.h>
15 #include <yaz/proto.h>
16 #include <yaz/readconf.h>
17 #include <yaz/pquery.h>
18 #include <yaz/yaz-util.h>
19 #include <yaz/ccl.h>
20
21 #include "pazpar2.h"
22 #include "eventl.h"
23 #include "command.h"
24 #include "http.h"
25 #include "termlists.h"
26 #include "reclists.h"
27 #include "relevance.h"
28
29 #define PAZPAR2_VERSION "0.1"
30 #define MAX_DATABASES 512
31 #define MAX_CHUNK 10
32
33 struct target
34 {
35     struct session *session;
36     char fullname[256];
37     char hostport[128];
38     char *ibuf;
39     int ibufsize;
40     char databases[MAX_DATABASES][128];
41     COMSTACK link;
42     ODR odr_in, odr_out;
43     struct target *next;
44     void *addr;
45     int hits;
46     int records;
47     int setno;
48     int requestid;                              // ID of current outstanding request
49     int diagnostic;
50     enum target_state
51     {
52         No_connection,
53         Connecting,
54         Connected,
55         Initializing,
56         Searching,
57         Presenting,
58         Error,
59         Idle,
60         Failed
61     } state;
62 };
63
64 static char *state_strings[] = {
65     "No_connection",
66     "Connecting",
67     "Connected",
68     "Initializing",
69     "Searching",
70     "Presenting",
71     "Error",
72     "Idle",
73     "Failed"
74 };
75
76
77 IOCHAN channel_list = 0;
78
79 static struct parameters {
80     int timeout;                /* operations timeout, in seconds */
81     char implementationId[128];
82     char implementationName[128];
83     char implementationVersion[128];
84     struct timeval base_time;
85     int toget;
86     int chunk;
87     CCL_bibset ccl_filter;
88 } global_parameters = 
89 {
90     30,
91     "81",
92     "Index Data PazPar2 (MasterKey)",
93     PAZPAR2_VERSION,
94     {0,0},
95     100,
96     MAX_CHUNK,
97     0
98 };
99
100
101 static int send_apdu(struct target *t, Z_APDU *a)
102 {
103     char *buf;
104     int len, r;
105
106     if (!z_APDU(t->odr_out, &a, 0, 0))
107     {
108         odr_perror(t->odr_out, "Encoding APDU");
109         abort();
110     }
111     buf = odr_getbuf(t->odr_out, &len, 0);
112     r = cs_put(t->link, buf, len);
113     if (r < 0)
114     {
115         yaz_log(YLOG_WARN, "cs_put: %s", cs_errmsg(cs_errno(t->link)));
116         return -1;
117     }
118     else if (r == 1)
119     {
120         fprintf(stderr, "cs_put incomplete (ParaZ does not handle that)\n");
121     }
122     odr_reset(t->odr_out); /* release the APDU structure  */
123     return 0;
124 }
125
126
127 static void send_init(IOCHAN i)
128 {
129     struct target *t = iochan_getdata(i);
130     Z_APDU *a = zget_APDU(t->odr_out, Z_APDU_initRequest);
131
132     a->u.initRequest->implementationId = global_parameters.implementationId;
133     a->u.initRequest->implementationName = global_parameters.implementationName;
134     a->u.initRequest->implementationVersion =
135         global_parameters.implementationVersion;
136     ODR_MASK_SET(a->u.initRequest->options, Z_Options_search);
137     ODR_MASK_SET(a->u.initRequest->options, Z_Options_present);
138     ODR_MASK_SET(a->u.initRequest->options, Z_Options_namedResultSets);
139
140     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_1);
141     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_2);
142     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_3);
143     if (send_apdu(t, a) >= 0)
144     {
145         iochan_setflags(i, EVENT_INPUT);
146         t->state = Initializing;
147     }
148     else
149     {
150         iochan_destroy(i);
151         t->state = Failed;
152         cs_close(t->link);
153     }
154 }
155
156 static void send_search(IOCHAN i)
157 {
158     struct target *t = iochan_getdata(i);
159     struct session *s = t->session;
160     Z_APDU *a = zget_APDU(t->odr_out, Z_APDU_searchRequest);
161     int ndb;
162     char **databaselist;
163     Z_Query *zquery;
164
165     yaz_log(YLOG_DEBUG, "Sending search");
166     a->u.searchRequest->query = zquery = odr_malloc(t->odr_out, sizeof(Z_Query));
167     zquery->which = Z_Query_type_1;
168     zquery->u.type_1 = p_query_rpn(t->odr_out, PROTO_Z3950, s->query);
169
170     for (ndb = 0; *t->databases[ndb]; ndb++)
171         ;
172     databaselist = odr_malloc(t->odr_out, sizeof(char*) * ndb);
173     for (ndb = 0; *t->databases[ndb]; ndb++)
174         databaselist[ndb] = t->databases[ndb];
175
176     a->u.searchRequest->resultSetName = "Default";
177     a->u.searchRequest->databaseNames = databaselist;
178     a->u.searchRequest->num_databaseNames = ndb;
179
180     if (send_apdu(t, a) >= 0)
181     {
182         iochan_setflags(i, EVENT_INPUT);
183         t->state = Searching;
184         t->requestid = s->requestid;
185     }
186     else
187     {
188         iochan_destroy(i);
189         t->state = Failed;
190         cs_close(t->link);
191     }
192     odr_reset(t->odr_out);
193 }
194
195 static void send_present(IOCHAN i)
196 {
197     struct target *t = iochan_getdata(i);
198     Z_APDU *a = zget_APDU(t->odr_out, Z_APDU_presentRequest);
199     int toget;
200     int start = t->records + 1;
201
202     toget = global_parameters.chunk;
203     if (toget > t->hits - t->records)
204         toget = t->hits - t->records;
205
206     yaz_log(YLOG_DEBUG, "Trying to present %d records\n", toget);
207
208     a->u.presentRequest->resultSetStartPoint = &start;
209     a->u.presentRequest->numberOfRecordsRequested = &toget;
210
211     a->u.presentRequest->resultSetId = "Default";
212
213     if (send_apdu(t, a) >= 0)
214     {
215         iochan_setflags(i, EVENT_INPUT);
216         t->state = Presenting;
217     }
218     else
219     {
220         iochan_destroy(i);
221         t->state = Failed;
222         cs_close(t->link);
223     }
224     odr_reset(t->odr_out);
225 }
226
227 static void do_initResponse(IOCHAN i, Z_APDU *a)
228 {
229     struct target *t = iochan_getdata(i);
230     Z_InitResponse *r = a->u.initResponse;
231
232     yaz_log(YLOG_DEBUG, "Received init response");
233
234     if (*r->result)
235     {
236         t->state = Idle;
237     }
238     else
239     {
240         t->state = Failed;
241         iochan_destroy(i);
242         cs_close(t->link);
243     }
244 }
245
246 static void do_searchResponse(IOCHAN i, Z_APDU *a)
247 {
248     struct target *t = iochan_getdata(i);
249     Z_SearchResponse *r = a->u.searchResponse;
250
251     yaz_log(YLOG_DEBUG, "Searchresponse (status=%d)", *r->searchStatus);
252
253     if (*r->searchStatus)
254     {
255         t->hits = *r->resultCount;
256         t->state = Idle;
257     }
258     else
259     {          /*"FAILED"*/
260         t->hits = 0;
261         t->state = Failed;
262         if (r->records) {
263             Z_Records *recs = r->records;
264             if (recs->which == Z_Records_NSD)
265             {
266                 yaz_log(YLOG_WARN, "Non-surrogate diagnostic");
267                 t->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
268                 t->state = Error;
269             }
270         }
271     }
272 }
273
274 const char *find_field(const char *rec, const char *field)
275 {
276     const char *line = rec;
277
278     while (*line)
279     {
280         if (!strncmp(line, field, 3) && line[3] == ' ')
281             return line;
282         while (*(line++) != '\n')
283             ;
284     }
285     return 0;
286 }
287
288 const char *find_subfield(const char *field, char subfield)
289 {
290     const char *p = field;
291
292     while (*p && *p != '\n')
293     {
294         while (*p != '\n' && *p != '\t')
295             p++;
296         if (*p == '\t' && *(++p) == subfield) {
297             if (*(++p) == ' ')
298             {
299                 while (isspace(*p))
300                     p++;
301                 return p;
302             }
303         }
304     }
305     return 0;
306 }
307
308 // Extract 245 $a $b 100 $a
309 char *extract_mergekey(struct session *s, const char *rec)
310 {
311     const char *field, *subfield;
312     char *e, *ef;
313     char *out, *p, *pout;
314
315     wrbuf_rewind(s->wrbuf);
316
317     if (!(field = find_field(rec, "245")))
318         return 0;
319     if (!(subfield = find_subfield(field, 'a')))
320         return 0;
321     ef = index(subfield, '\n');
322     if ((e = index(subfield, '\t')) && e < ef)
323         ef = e;
324     if (ef)
325     {
326         wrbuf_write(s->wrbuf, subfield, ef - subfield);
327         if ((subfield = find_subfield(field, 'b'))) 
328         {
329             ef = index(subfield, '\n');
330             if ((e = index(subfield, '\t')) && e < ef)
331                 ef = e;
332             if (ef)
333             {
334                 wrbuf_puts(s->wrbuf, " field "); 
335                 wrbuf_write(s->wrbuf, subfield, ef - subfield);
336             }
337         }
338     }
339     if ((field = find_field(rec, "100")))
340     {
341         if ((subfield = find_subfield(field, 'a')))
342         {
343             ef = index(subfield, '\n');
344             if ((e = index(subfield, '\t')) && e < ef)
345                 ef = e;
346             if (ef)
347             {
348                 wrbuf_puts(s->wrbuf, " field "); 
349                 wrbuf_write(s->wrbuf, subfield, ef - subfield);
350             }
351         }
352     }
353     wrbuf_putc(s->wrbuf, '\0');
354     p = wrbuf_buf(s->wrbuf);
355     out = pout = nmem_malloc(s->nmem, strlen(p) + 1);
356
357     while (*p)
358     {
359         while (isalnum(*p))
360             *(pout++) = tolower(*(p++));
361         while (*p && !isalnum(*p))
362             p++;
363         *(pout++) = ' ';
364     }
365     if (out != pout)
366         *(--pout) = '\0';
367
368     return out;
369 }
370
371 #ifdef RECHEAP
372 static void push_record(struct session *s, struct record *r)
373 {
374     int p;
375     assert(s->recheap_max + 1 < s->recheap_size);
376
377     s->recheap[p = ++s->recheap_max] = r;
378     while (p > 0)
379     {
380         int parent = (p - 1) >> 1;
381         if (strcmp(s->recheap[p]->merge_key, s->recheap[parent]->merge_key) < 0)
382         {
383             struct record *tmp;
384             tmp = s->recheap[parent];
385             s->recheap[parent] = s->recheap[p];
386             s->recheap[p] = tmp;
387             p = parent;
388         }
389         else
390             break;
391     }
392 }
393
394 static struct record *top_record(struct session *s)
395 {
396     return s-> recheap_max >= 0 ?  s->recheap[0] : 0;
397 }
398
399 static struct record *pop_record(struct session *s)
400 {
401     struct record *res;
402     int p = 0;
403     int lastnonleaf = (s->recheap_max - 1) >> 1;
404
405     if (s->recheap_max < 0)
406         return 0;
407
408     res = s->recheap[0];
409
410     s->recheap[p] = s->recheap[s->recheap_max--];
411
412     while (p <= lastnonleaf)
413     {
414         int right = (p + 1) << 1;
415         int left = right - 1;
416         int min = left;
417
418         if (right < s->recheap_max &&
419                 strcmp(s->recheap[right]->merge_key, s->recheap[left]->merge_key) < 0)
420             min = right;
421         if (strcmp(s->recheap[min]->merge_key, s->recheap[p]->merge_key) < 0)
422         {
423             struct record *tmp = s->recheap[min];
424             s->recheap[min] = s->recheap[p];
425             s->recheap[p] = tmp;
426             p = min;
427         }
428         else
429             break;
430     }
431     return res;
432 }
433
434 // Like pop_record but collapses identical (merge_key) records
435 // The heap will contain multiple independent matching records and possibly
436 // one cluster, created the last time the list was scanned
437 static struct record *pop_mrecord(struct session *s)
438 {
439     struct record *this;
440     struct record *next;
441
442     if (!(this = pop_record(s)))
443         return 0;
444
445     // Collapse identical records
446     while ((next = top_record(s)))
447     {
448         struct record *p, *tmpnext;
449         if (strcmp(this->merge_key, next->merge_key))
450             break;
451         // Absorb record (and clustersiblings) into a supercluster
452         for (p = next; p; p = tmpnext) {
453             tmpnext = p->next_cluster;
454             p->next_cluster = this->next_cluster;
455             this->next_cluster = p;
456         }
457
458         pop_record(s);
459     }
460     return this;
461 }
462
463 // Reads records in sort order. Store records in top of heapspace until rewind is called.
464 static struct record *read_recheap(struct session *s)
465 {
466     struct record *r = pop_mrecord(s);
467
468     if (r)
469     {
470         if (s->recheap_scratch < 0)
471             s->recheap_scratch = s->recheap_size;
472         s->recheap[--s->recheap_scratch] = r;
473     }
474
475     return r;
476 }
477
478 // Return records to heap after read
479 static void rewind_recheap(struct session *s)
480 {
481     while (s->recheap_scratch >= 0) {
482         push_record(s, s->recheap[s->recheap_scratch++]);
483         if (s->recheap_scratch >= s->recheap_size)
484             s->recheap_scratch = -1;
485     }
486 }
487
488 #endif
489
490 // FIXME needs to be generalized. Should flexibly generate X lists per search
491 static void extract_subject(struct session *s, const char *rec)
492 {
493     const char *field, *subfield;
494
495     while ((field = find_field(rec, "650")))
496     {
497         rec = field + 1; // Crude way to cause a loop through repeating fields
498         if ((subfield = find_subfield(field, 'a')))
499         {
500             char *e, *ef;
501             char buf[1024];
502             int len;
503
504             ef = index(subfield, '\n');
505             if ((e = index(subfield, '\t')) && e < ef)
506                 ef = e;
507             while (ef > subfield && !isalpha(*(ef - 1)) && *(ef - 1) != ')')
508                 ef--;
509             len = ef - subfield;
510             assert(len < 1023);
511             memcpy(buf, subfield, len);
512             buf[len] = '\0';
513             termlist_insert(s->termlist, buf);
514         }
515     }
516 }
517
518 static void pull_relevance_keys(struct session *s, struct record *head,  struct record *rec)
519 {
520     relevance_newrec(s->relevance, head);
521     relevance_countwords(s->relevance, head, rec->merge_key, strlen(rec->merge_key));
522     relevance_donerecord(s->relevance, head);
523 }
524
525 struct record *ingest_record(struct target *t, char *buf, int len)
526 {
527     struct session *s = t->session;
528     struct record *res;
529     struct record *head;
530     const char *recbuf;
531
532     wrbuf_rewind(s->wrbuf);
533     yaz_marc_xml(s->yaz_marc, YAZ_MARC_LINE);
534     if (yaz_marc_decode_wrbuf(s->yaz_marc, buf, len, s->wrbuf) < 0)
535     {
536         yaz_log(YLOG_WARN, "Failed to decode MARC record");
537         return 0;
538     }
539     wrbuf_putc(s->wrbuf, '\0');
540     recbuf = wrbuf_buf(s->wrbuf);
541
542     res = nmem_malloc(s->nmem, sizeof(struct record));
543
544     extract_subject(s, recbuf);
545
546     res->merge_key = extract_mergekey(s, recbuf);
547     if (!res->merge_key)
548         return 0;
549     res->buf = nmem_strdupn(s->nmem, recbuf, wrbuf_len(s->wrbuf));
550     res->target = t;
551     res->next_cluster = 0;
552     res->target_offset = -1;
553     res->term_frequency_vec = 0;
554
555     head = reclist_insert(s->reclist, res);
556
557     pull_relevance_keys(s, head, res);
558
559     return res;
560 }
561
562 void ingest_records(struct target *t, Z_Records *r)
563 {
564     //struct session *s = t->session;
565     struct record *rec;
566     Z_NamePlusRecordList *rlist;
567     int i;
568
569     if (r->which != Z_Records_DBOSD)
570         return;
571     rlist = r->u.databaseOrSurDiagnostics;
572     for (i = 0; i < rlist->num_records; i++)
573     {
574         Z_NamePlusRecord *npr = rlist->records[i];
575         Z_External *e;
576         char *buf;
577         int len;
578
579         if (npr->which != Z_NamePlusRecord_databaseRecord)
580         {
581             yaz_log(YLOG_WARN, "Unexpected record type, probably diagnostic");
582             continue;
583         }
584         e = npr->u.databaseRecord;
585         if (e->which != Z_External_octet)
586         {
587             yaz_log(YLOG_WARN, "Unexpected external branch, probably BER");
588             continue;
589         }
590         buf = (char*) e->u.octet_aligned->buf;
591         len = e->u.octet_aligned->len;
592
593         rec = ingest_record(t, buf, len);
594         if (!rec)
595             continue;
596     }
597 }
598
599 static void do_presentResponse(IOCHAN i, Z_APDU *a)
600 {
601     struct target *t = iochan_getdata(i);
602     Z_PresentResponse *r = a->u.presentResponse;
603
604     if (r->records) {
605         Z_Records *recs = r->records;
606         if (recs->which == Z_Records_NSD)
607         {
608             yaz_log(YLOG_WARN, "Non-surrogate diagnostic");
609             t->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
610             t->state = Error;
611         }
612     }
613
614     if (!*r->presentStatus && t->state != Error)
615     {
616         yaz_log(YLOG_DEBUG, "Good Present response");
617         t->records += *r->numberOfRecordsReturned;
618         ingest_records(t, r->records);
619         t->state = Idle;
620     }
621     else if (*r->presentStatus) 
622     {
623         yaz_log(YLOG_WARN, "Bad Present response");
624         t->state = Error;
625     }
626 }
627
628 static void handler(IOCHAN i, int event)
629 {
630     struct target *t = iochan_getdata(i);
631     struct session *s = t->session;
632     //static int waiting = 0;
633
634     if (t->state == No_connection) /* Start connection */
635     {
636         int res = cs_connect(t->link, t->addr);
637
638         t->state = Connecting;
639         if (!res) /* we are go */
640             iochan_setevent(i, EVENT_OUTPUT);
641         else if (res == 1)
642             iochan_setflags(i, EVENT_OUTPUT);
643         else
644         {
645             yaz_log(YLOG_WARN|YLOG_ERRNO, "ERROR %s connect\n", t->hostport);
646             cs_close(t->link);
647             t->state = Failed;
648             iochan_destroy(i);
649         }
650     }
651
652     else if (t->state == Connecting && event & EVENT_OUTPUT)
653     {
654         int errcode;
655         socklen_t errlen = sizeof(errcode);
656
657         if (getsockopt(cs_fileno(t->link), SOL_SOCKET, SO_ERROR, &errcode,
658             &errlen) < 0 || errcode != 0)
659         {
660             cs_close(t->link);
661             iochan_destroy(i);
662             t->state = Failed;
663             return;
664         }
665         else
666         {
667             yaz_log(YLOG_DEBUG, "Connect OK");
668             t->state = Connected;
669         }
670     }
671
672     else if (event & EVENT_INPUT)
673     {
674         int len = cs_get(t->link, &t->ibuf, &t->ibufsize);
675
676         if (len < 0)
677         {
678             cs_close(t->link);
679             iochan_destroy(i);
680             t->state = Failed;
681             return;
682         }
683         if (len == 0)
684         {
685             cs_close(t->link);
686             iochan_destroy(i);
687             t->state = Failed;
688             return;
689         }
690         else if (len > 1)
691         {
692             if (t->requestid == s->requestid || t->state == Initializing) 
693             {
694                 Z_APDU *a;
695
696                 odr_reset(t->odr_in);
697                 odr_setbuf(t->odr_in, t->ibuf, len, 0);
698                 if (!z_APDU(t->odr_in, &a, 0, 0))
699                 {
700                     cs_close(t->link);
701                     iochan_destroy(i);
702                     t->state = Failed;
703                     return;
704                 }
705                 switch (a->which)
706                 {
707                     case Z_APDU_initResponse:
708                         do_initResponse(i, a);
709                         break;
710                     case Z_APDU_searchResponse:
711                         do_searchResponse(i, a);
712                         break;
713                     case Z_APDU_presentResponse:
714                         do_presentResponse(i, a);
715                         break;
716                     default:
717                         yaz_log(YLOG_WARN, "Unexpected result from server");
718                         cs_close(t->link);
719                         iochan_destroy(i);
720                         t->state = Failed;
721                         return;
722                 }
723                 // if (cs_more(t->link))
724                 //    iochan_setevent(i, EVENT_INPUT);
725             }
726             else  // we throw away response and go to idle mode
727                 t->state = Idle;
728         }
729         /* if len==1 we do nothing but wait for more input */
730     }
731
732     else if (t->state == Connected) {
733         send_init(i);
734     }
735
736     if (t->state == Idle)
737     {
738         if (t->requestid != s->requestid) {
739             send_search(i);
740         }
741         else if (t->hits > 0 && t->records < global_parameters.toget &&
742             t->records < t->hits) {
743             send_present(i);
744         }
745     }
746 }
747
748 int load_targets(struct session *s, const char *fn)
749 {
750     FILE *f = fopen(fn, "r");
751     char line[256];
752     struct target **target_p;
753
754     if (!f)
755     {
756         yaz_log(YLOG_WARN|YLOG_ERRNO, "open %s", fn);
757         return -1;
758     }
759
760     target_p = &s->targets;
761     while (fgets(line, 255, f))
762     {
763         char *url, *p;
764         struct target *target;
765         IOCHAN new;
766
767         if (strncmp(line, "target ", 7))
768             continue;
769         url = line + 7;
770         url[strlen(url) - 1] = '\0';
771         yaz_log(LOG_DEBUG, "Target: %s", url);
772
773         *target_p = target = xmalloc(sizeof(**target_p));
774         target->next = 0;
775         target_p = &target->next;
776         target->state = No_connection;
777         target->ibuf = 0;
778         target->ibufsize = 0;
779         target->odr_in = odr_createmem(ODR_DECODE);
780         target->odr_out = odr_createmem(ODR_ENCODE);
781         target->hits = -1;
782         target->setno = 0;
783         target->session = s;
784         target->requestid = -1;
785         target->records = 0;
786         target->diagnostic = 0;
787         strcpy(target->fullname, url);
788         if ((p = strchr(url, '/')))
789         {                   
790             *p = '\0';
791             strcpy(target->hostport, url);
792             *p = '/';
793             p++;
794             strcpy(target->databases[0], p);
795             target->databases[1][0] = '\0';
796         }
797         else
798         {
799             strcpy(target->hostport, url);
800             strcpy(target->databases[0], "Default");
801             target->databases[1][0] = '\0';
802         }
803
804         if (!(target->link = cs_create(tcpip_type, 0, PROTO_Z3950)))
805         {
806             yaz_log(YLOG_FATAL|YLOG_ERRNO, "Failed to create comstack");
807             exit(1);
808         }
809         if (!(target->addr = cs_straddr(target->link, target->hostport)))
810         {
811             printf("ERROR %s bad-address", target->hostport);
812             target->state = Failed;
813             continue;
814         }
815         new = iochan_create(cs_fileno(target->link), handler, 0);
816         iochan_setdata(new, target);
817         iochan_setevent(new, EVENT_EXCEPT);
818         new->next = channel_list;
819         channel_list = new;
820     }
821     fclose(f);
822
823     return 0;
824 }
825
826 void search(struct session *s, char *query)
827 {
828     IOCHAN c;
829     int live_channels = 0;
830
831     yaz_log(YLOG_DEBUG, "Search");
832
833     // Determine what iochans belong to this session
834     // It might have been better to have a list of them
835
836     strcpy(s->query, query);
837     s->requestid++;
838     nmem_reset(s->nmem);
839     for (c = channel_list; c; c = c->next)
840     {
841         struct target *t;
842
843         if (iochan_getfun(c) != handler) // Not a Z target
844             continue;
845         t = iochan_getdata(c);
846         if (t->session == s)
847         {
848             t->hits = -1;
849             t->records = 0;
850             t->diagnostic = 0;
851
852             if (t->state == Error)
853                 t->state = Idle;
854
855             if (t->state == Idle) 
856                 iochan_setflag(c, EVENT_OUTPUT);
857
858             live_channels++;
859         }
860     }
861     if (live_channels)
862     {
863         const char *p[] = { query, 0 };
864         int maxrecs = live_channels * global_parameters.toget;
865         s->termlist = termlist_create(s->nmem, maxrecs, 15);
866         s->reclist = reclist_create(s->nmem, maxrecs);
867         s->relevance = relevance_create(s->nmem, p, maxrecs);
868     }
869 }
870
871 struct session *new_session() 
872 {
873     struct session *session = xmalloc(sizeof(*session));
874
875     yaz_log(YLOG_DEBUG, "New pazpar2 session");
876     
877     session->termlist = 0;
878     session->reclist = 0;
879     session->requestid = -1;
880     session->targets = 0;
881     session->pqf_parser = yaz_pqf_create();
882     session->query[0] = '\0';
883     session->nmem = nmem_create();
884     session->yaz_marc = yaz_marc_create();
885     yaz_marc_subfield_str(session->yaz_marc, "\t");
886     session->wrbuf = wrbuf_alloc();
887
888     return session;
889 }
890
891 void session_destroy(struct session *s)
892 {
893     // FIXME do some shit here!!!!
894 }
895
896 struct hitsbytarget *hitsbytarget(struct session *s, int *count)
897 {
898     static struct hitsbytarget res[1000]; // FIXME MM
899     IOCHAN c;
900
901     *count = 0;
902     for (c = channel_list; c; c = c->next)
903         if (iochan_getfun(c) == handler)
904         {
905             struct target *t = iochan_getdata(c);
906             if (t->session == s)
907             {
908                 strcpy(res[*count].id, t->hostport);
909                 res[*count].hits = t->hits;
910                 res[*count].records = t->records;
911                 res[*count].diagnostic = t->diagnostic;
912                 res[*count].state = state_strings[(int) t->state];
913                 (*count)++;
914             }
915         }
916
917     return res;
918 }
919
920 struct termlist_score **termlist(struct session *s, int *num)
921 {
922     return termlist_highscore(s->termlist, num);
923 }
924
925 struct record **show(struct session *s, int start, int *num)
926 {
927     struct record **recs = nmem_malloc(s->nmem, *num * sizeof(struct record *));
928     int i;
929
930     // FIXME -- skip initial records
931
932     relevance_prepare_read(s->relevance, s->reclist);
933     for (i = 0; i < *num; i++)
934     {
935         struct record *r = reclist_read_record(s->reclist);
936         if (!r)
937         {
938             *num = i;
939             break;
940         }
941         recs[i] = r;
942         yaz_log(YLOG_DEBUG, "%d: %s%s", r->relevance, r->merge_key, r->next_cluster ? " (cluster)": "");
943     }
944     return recs;
945 }
946
947 void statistics(struct session *s, struct statistics *stat)
948 {
949     IOCHAN c;
950     int i;
951
952     bzero(stat, sizeof(*stat));
953     for (i = 0, c = channel_list; c; i++, c = c->next)
954     {
955         struct target *t;
956         if (iochan_getfun(c) != handler)
957             continue;
958         t = iochan_getdata(c);
959         switch (t->state)
960         {
961             case No_connection: stat->num_no_connection++; break;
962             case Connecting: stat->num_connecting++; break;
963             case Initializing: stat->num_initializing++; break;
964             case Searching: stat->num_searching++; break;
965             case Presenting: stat->num_presenting++; break;
966             case Idle: stat->num_idle++; break;
967             case Failed: stat->num_failed++; break;
968             case Error: stat->num_error++; break;
969             default: break;
970         }
971     }
972
973     stat->num_connections = i;
974 }
975
976 static CCL_bibset load_cclfile(const char *fn)
977 {
978     CCL_bibset res = ccl_qual_mk();
979     if (ccl_qual_fname(res, fn) < 0)
980     {
981         yaz_log(YLOG_FATAL|YLOG_ERRNO, "%s", fn);
982         exit(1);
983     }
984     return res;
985 }
986
987 int main(int argc, char **argv)
988 {
989     int ret;
990     char *arg;
991
992     if (signal(SIGPIPE, SIG_IGN) < 0)
993         yaz_log(YLOG_WARN|YLOG_ERRNO, "signal");
994
995     yaz_log_init(YLOG_DEFAULT_LEVEL|YLOG_DEBUG, "pazpar2", 0);
996
997     while ((ret = options("c:h:p:C:", argv, argc, &arg)) != -2)
998     {
999         switch (ret) {
1000             case 0:
1001                 break;
1002             case 'c':
1003                 command_init(atoi(arg));
1004                 break;
1005             case 'C':
1006                 global_parameters.ccl_filter = load_cclfile(arg);
1007                 break;
1008             case 'h':
1009                 http_init(atoi(arg));
1010                 break;
1011             case 'p':
1012                 http_set_proxyaddr(arg);
1013                 break;
1014             default:
1015                 fprintf(stderr, "Usage: pazpar2 -d comport");
1016                 exit(1);
1017         }
1018             
1019     }
1020
1021     if (!global_parameters.ccl_filter)
1022         load_cclfile("default.bib");
1023
1024     event_loop(&channel_list);
1025
1026     return 0;
1027 }
1028
1029 /*
1030  * Local variables:
1031  * c-basic-offset: 4
1032  * indent-tabs-mode: nil
1033  * End:
1034  * vim: shiftwidth=4 tabstop=8 expandtab
1035  */