1 /* $Id: pazpar2.c,v 1.3 2006-11-21 18:46:43 quinn Exp $ */
8 #include <sys/socket.h>
13 #include <yaz/comstack.h>
14 #include <yaz/tcpip.h>
15 #include <yaz/proto.h>
16 #include <yaz/readconf.h>
17 #include <yaz/pquery.h>
18 #include <yaz/yaz-util.h>
/* Version string of this program. */
25 #define PAZPAR2_VERSION "0.1"
/* Number of 128-byte name slots in the `databases` array declared below. */
26 #define MAX_DATABASES 512
31 struct session *session;
36 char databases[MAX_DATABASES][128];
44 int requestid; // ID of current outstanding request
60 static char *state_strings[] = {
/*
 * Head of the list of I/O channels. load_targets() links each new target
 * channel in front of it, and main() passes its address to event_loop().
 */
73 IOCHAN channel_list = 0;
/*
 * Program-wide settings, accessed elsewhere in this file as
 * global_parameters. Besides the fields shown here, the code also reads
 * global_parameters.chunk and global_parameters.toget; their declarations
 * are not visible in this listing.
 */
75 static struct parameters {
76 int timeout; /* operations timeout, in seconds */
/* Identification strings copied into the Z39.50 InitRequest by send_init(). */
77 char implementationId[128];
78 char implementationName[128];
79 char implementationVersion[128];
80 struct timeval base_time;
/* NOTE(review): presumably part of the initializer of global_parameters -- confirm. */
87 "Index Data PazPar2 (MasterKey)",
/*
 * Encode APDU `a` with the target's output ODR stream and write the encoded
 * bytes to the target's comstack link with cs_put().
 *
 * Callers in this file treat a return value >= 0 as success. The return
 * statements themselves are not visible in this listing.
 */
95 static int send_apdu(struct target *t, Z_APDU *a)
100 if (!z_APDU(t->odr_out, &a, 0, 0))
102 odr_perror(t->odr_out, "Encoding APDU");
// Fetch the encoded buffer and hand it to the comstack in one write.
105 buf = odr_getbuf(t->odr_out, &len, 0);
106 r = cs_put(t->link, buf, len);
109 yaz_log(YLOG_WARN, "cs_put: %s", cs_errmsg(cs_errno(t->link)));
// A partial write is only reported; no retry logic is visible here.
114 fprintf(stderr, "cs_put incomplete (ParaZ does not handle that)\n");
116 odr_reset(t->odr_out); /* release the APDU structure */
/*
 * Build and send a Z39.50 InitRequest to the target attached to channel `i`.
 *
 * The request carries the implementation id/name/version strings from
 * global_parameters, sets the search, present and namedResultSets options,
 * and sets protocol versions 1 through 3. If the send succeeds, the channel
 * is set to wait for input and the target state becomes Initializing.
 */
121 static void send_init(IOCHAN i)
123 struct target *t = iochan_getdata(i);
124 Z_APDU *a = zget_APDU(t->odr_out, Z_APDU_initRequest);
// These point at the global buffers; nothing is copied here.
126 a->u.initRequest->implementationId = global_parameters.implementationId;
127 a->u.initRequest->implementationName = global_parameters.implementationName;
128 a->u.initRequest->implementationVersion =
129 global_parameters.implementationVersion;
130 ODR_MASK_SET(a->u.initRequest->options, Z_Options_search);
131 ODR_MASK_SET(a->u.initRequest->options, Z_Options_present);
132 ODR_MASK_SET(a->u.initRequest->options, Z_Options_namedResultSets);
134 ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_1);
135 ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_2);
136 ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_3);
137 if (send_apdu(t, a) >= 0)
139 iochan_setflags(i, EVENT_INPUT);
140 t->state = Initializing;
/*
 * Build and send a Z39.50 SearchRequest for the session's current query to
 * the target attached to channel `i`.
 *
 * The query text in s->query is converted by p_query_rpn() into a type-1
 * query. The database list is taken from t->databases, whose end is marked
 * by an entry with an empty name. The result set is named "Default".
 * If the send succeeds, the channel waits for input, the target state
 * becomes Searching, and the target records the session's request id.
 */
150 static void send_search(IOCHAN i)
152 struct target *t = iochan_getdata(i);
153 struct session *s = t->session;
154 Z_APDU *a = zget_APDU(t->odr_out, Z_APDU_searchRequest);
159 yaz_log(YLOG_DEBUG, "Sending search");
160 a->u.searchRequest->query = zquery = odr_malloc(t->odr_out, sizeof(Z_Query));
161 zquery->which = Z_Query_type_1;
// NOTE(review): no check of the p_query_rpn() result is visible here -- confirm
// how an unparsable query is handled.
162 zquery->u.type_1 = p_query_rpn(t->odr_out, PROTO_Z3950, s->query);
// First pass: find how many databases are configured (stops at the empty name).
164 for (ndb = 0; *t->databases[ndb]; ndb++)
166 databaselist = odr_malloc(t->odr_out, sizeof(char*) * ndb);
// Second pass: fill the list with pointers into t->databases (no copies).
167 for (ndb = 0; *t->databases[ndb]; ndb++)
168 databaselist[ndb] = t->databases[ndb];
170 a->u.searchRequest->resultSetName = "Default";
171 a->u.searchRequest->databaseNames = databaselist;
172 a->u.searchRequest->num_databaseNames = ndb;
174 if (send_apdu(t, a) >= 0)
176 iochan_setflags(i, EVENT_INPUT);
177 t->state = Searching;
// Remember which session request this search belongs to.
178 t->requestid = s->requestid;
186 odr_reset(t->odr_out);
/*
 * Build and send a Z39.50 PresentRequest for the next batch of records from
 * the "Default" result set of the target attached to channel `i`.
 *
 * The batch starts at record t->records + 1. Its size is
 * global_parameters.chunk, reduced to the number of hits not yet fetched
 * (t->hits - t->records) when fewer remain. If the send succeeds, the
 * channel waits for input and the target state becomes Presenting.
 */
189 static void send_present(IOCHAN i)
191 struct target *t = iochan_getdata(i);
192 Z_APDU *a = zget_APDU(t->odr_out, Z_APDU_presentRequest);
194 int start = t->records + 1;
196 toget = global_parameters.chunk;
197 if (toget > t->hits - t->records)
198 toget = t->hits - t->records;
200 yaz_log(YLOG_DEBUG, "Trying to present %d records\n", toget);
// The request points at the locals `start` and `toget`; the APDU is encoded
// by send_apdu() below, while both are still in scope.
202 a->u.presentRequest->resultSetStartPoint = &start;
203 a->u.presentRequest->numberOfRecordsRequested = &toget;
205 a->u.presentRequest->resultSetId = "Default";
207 if (send_apdu(t, a) >= 0)
209 iochan_setflags(i, EVENT_INPUT);
210 t->state = Presenting;
218 odr_reset(t->odr_out);
/*
 * Handle an InitResponse APDU received for the target attached to channel `i`.
 * Only the setup and the debug log line are visible in this listing; the
 * handling of the response contents is not shown.
 */
221 static void do_initResponse(IOCHAN i, Z_APDU *a)
223 struct target *t = iochan_getdata(i);
224 Z_InitResponse *r = a->u.initResponse;
226 yaz_log(YLOG_DEBUG, "Received init response");
/*
 * Handle a SearchResponse APDU for the target attached to channel `i`.
 *
 * When the search status is true, the result count is stored in t->hits.
 * A non-surrogate diagnostic in the response records is logged and its
 * condition code is stored in t->diagnostic. The branch structure between
 * these steps is only partly visible in this listing.
 */
240 static void do_searchResponse(IOCHAN i, Z_APDU *a)
242 struct target *t = iochan_getdata(i);
243 Z_SearchResponse *r = a->u.searchResponse;
245 yaz_log(YLOG_DEBUG, "Searchresponse (status=%d)", *r->searchStatus);
247 if (*r->searchStatus)
249 t->hits = *r->resultCount;
// NOTE(review): confirm r->records is checked for NULL before recs->which is read.
257 Z_Records *recs = r->records;
258 if (recs->which == Z_Records_NSD)
260 yaz_log(YLOG_WARN, "Non-surrogate diagnostic");
261 t->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
/*
 * Search the text record `rec`, one line at a time, for a line whose first
 * three characters equal `field` and whose fourth character is a space.
 * The return statements are not visible in this listing.
 */
268 const char *find_field(const char *rec, const char *field)
270 const char *line = rec;
274 if (!strncmp(line, field, 3) && line[3] == ' ')
// No match on this line: advance to just past the next newline.
276 while (*(line++) != '\n')
/*
 * Search one field line, starting at `field`, for a tab character followed
 * by the code `subfield`. The outer scan stops at a newline or at the end
 * of the string. The return statements are not visible in this listing.
 */
282 const char *find_subfield(const char *field, char subfield)
284 const char *p = field;
286 while (*p && *p != '\n')
// NOTE(review): this inner scan does not test for the terminating NUL; confirm
// that every field line ends with '\n'.
288 while (*p != '\n' && *p != '\t')
290 if (*p == '\t' && *(++p) == subfield) {
// Extract 245 $a $b 100 $a
/*
 * Build the merge key for the line-format record `rec`.
 *
 * The key is assembled in s->wrbuf from field 245 subfield a, then field 245
 * subfield b if present, then field 100 subfield a if present. Every part
 * after the first is preceded by the literal " field ". Each subfield value
 * ends at the next tab or the next newline. The assembled text is then
 * copied into memory allocated from s->nmem; that copy uses tolower() and
 * isalnum(), but the loop around them is only partly visible here.
 *
 * NOTE(review): s->wrbuf is rewound at the start, so `rec` must not point
 * into s->wrbuf -- check the callers.
 */
299 char *extract_mergekey(struct session *s, const char *rec)
301 const char *field, *subfield;
303 char *out, *p, *pout;
305 wrbuf_rewind(s->wrbuf);
307 if (!(field = find_field(rec, "245")))
309 if (!(subfield = find_subfield(field, 'a')))
// NOTE(review): index() can return NULL when no newline follows; confirm ef is
// always non-NULL before it is used in the pointer arithmetic below.
311 ef = index(subfield, '\n');
// A tab found before the end of the line ends the subfield value.
312 if ((e = index(subfield, '\t')) && e < ef)
316 wrbuf_write(s->wrbuf, subfield, ef - subfield);
317 if ((subfield = find_subfield(field, 'b')))
319 ef = index(subfield, '\n');
320 if ((e = index(subfield, '\t')) && e < ef)
324 wrbuf_puts(s->wrbuf, " field ");
325 wrbuf_write(s->wrbuf, subfield, ef - subfield);
329 if ((field = find_field(rec, "100")))
331 if ((subfield = find_subfield(field, 'a')))
333 ef = index(subfield, '\n');
334 if ((e = index(subfield, '\t')) && e < ef)
338 wrbuf_puts(s->wrbuf, " field ");
339 wrbuf_write(s->wrbuf, subfield, ef - subfield);
// NUL-terminate the buffer so it can be read as a C string below.
343 wrbuf_putc(s->wrbuf, '\0');
344 p = wrbuf_buf(s->wrbuf);
345 out = pout = nmem_malloc(s->nmem, strlen(p) + 1);
// NOTE(review): tolower()/isalnum() are given plain char values; bytes >= 0x80
// may need a cast to unsigned char -- confirm.
350 *(pout++) = tolower(*(p++));
351 while (*p && !isalnum(*p))
/*
 * Insert record `r` into the session's record heap, which is ordered by
 * merge_key using strcmp().
 *
 * The record is stored at the new last index (s->recheap_max is incremented
 * first) and then exchanged with its parent at (p - 1) >> 1 while its key
 * compares lower. The loop around the exchange is only partly visible here.
 */
361 static void push_record(struct session *s, struct record *r)
// The heap must have room; when assertions are disabled (NDEBUG) this check
// is compiled out.
364 assert(s->recheap_max + 1 < s->recheap_size);
366 s->recheap[p = ++s->recheap_max] = r;
369 int parent = (p - 1) >> 1;
370 if (strcmp(s->recheap[p]->merge_key, s->recheap[parent]->merge_key) < 0)
373 tmp = s->recheap[parent];
374 s->recheap[parent] = s->recheap[p];
/*
 * Return the record at the root of the session's record heap without
 * removing it, or 0 when the heap is empty (s->recheap_max < 0).
 */
383 static struct record *top_record(struct session *s)
385 return s-> recheap_max >= 0 ? s->recheap[0] : 0;
/*
 * Remove the root record from the session's record heap.
 *
 * The last element is moved into slot p and s->recheap_max is decremented;
 * the element is then exchanged downwards with a child whose merge_key
 * compares lower. Several lines of the loop, and the return statements, are
 * not visible in this listing.
 */
388 static struct record *pop_record(struct session *s)
// NOTE(review): the root is read before the emptiness test below; new_session()
// sets s->recheap to 0, so confirm this cannot run before the heap is allocated.
390 struct record *res = s->recheap[0];
// NOTE(review): computed from recheap_max before the decrement below; verify
// the loop cannot look at the slot that has just been vacated.
392 int lastnonleaf = (s->recheap_max - 1) >> 1;
394 if (s->recheap_max < 0)
397 s->recheap[p] = s->recheap[s->recheap_max--];
399 while (p <= lastnonleaf)
401 int right = (p + 1) << 1;
402 int left = right - 1;
// NOTE(review): recheap_max is the index of the last element (see push_record
// and top_record), so `<` skips a right child stored exactly at recheap_max;
// confirm whether `<=` was intended.
405 if (right < s->recheap_max &&
406 strcmp(s->recheap[right]->merge_key, s->recheap[left]->merge_key) < 0)
408 if (strcmp(s->recheap[min]->merge_key, s->recheap[p]->merge_key) < 0)
410 struct record *tmp = s->recheap[min];
411 s->recheap[min] = s->recheap[p];
// Like pop_record but collapses identical (merge_key) records
// The heap will contain multiple independent matching records and possibly
// one cluster, created the last time the list was scanned
//
// Pops one record, then examines the new top of the heap for as long as one
// exists; a differing merge_key is tested for, and a matching record and its
// next_cluster chain are linked into this->next_cluster.
// The statements that leave the loop and remove `next` from the heap are not
// visible in this listing.
424 static struct record *pop_mrecord(struct session *s)
429 if (!(this = pop_record(s)))
// Collapse identical records
433 while ((next = top_record(s)))
435 struct record *p, *tmpnext;
// strcmp() is non-zero when the keys differ.
436 if (strcmp(this->merge_key, next->merge_key))
// Absorb record (and clustersiblings) into a supercluster
// Each node is inserted directly after `this` in its next_cluster chain.
439 for (p = next; p; p = tmpnext) {
440 tmpnext = p->next_cluster;
441 p->next_cluster = this->next_cluster;
442 this->next_cluster = p;
// Reads records in sort order. Store records in top of heapspace until rewind is called.
//
// Each record obtained from pop_mrecord() is stored in a scratch region of
// s->recheap that grows downwards from index s->recheap_size. A negative
// s->recheap_scratch means the scratch region is empty.
// The handling of an empty heap and the return statement are not visible here.
451 static struct record *read_recheap(struct session *s)
453 struct record *r = pop_mrecord(s);
// First read since the last rewind: start the scratch region at the array end.
457 if (s->recheap_scratch < 0)
458 s->recheap_scratch = s->recheap_size;
459 s->recheap[--s->recheap_scratch] = r;
// Return records to heap after read
//
// Pushes every record held in the scratch region (indices s->recheap_scratch
// up to s->recheap_size - 1) back with push_record(), then marks the region
// empty by setting s->recheap_scratch to -1.
466 static void rewind_recheap(struct session *s)
468 while (s->recheap_scratch >= 0) {
469 push_record(s, s->recheap[s->recheap_scratch++]);
// Reaching the end of the array means the scratch region is exhausted.
470 if (s->recheap_scratch >= s->recheap_size)
471 s->recheap_scratch = -1;
/*
 * Decode one MARC record (`buf`, `len` bytes) received from target `t` and
 * build a struct record for it in the session's nmem memory.
 *
 * The record is decoded into s->wrbuf with the YAZ_MARC_LINE output format.
 * The new record receives a merge key from extract_mergekey(), a copy of the
 * decoded text, next_cluster = 0 and target_offset = -1. The return
 * statements are not visible in this listing.
 */
475 struct record *ingest_record(struct target *t, char *buf, int len)
477 struct session *s = t->session;
481 wrbuf_rewind(s->wrbuf);
482 yaz_marc_xml(s->yaz_marc, YAZ_MARC_LINE);
483 if (yaz_marc_decode_wrbuf(s->yaz_marc, buf, len, s->wrbuf) < 0)
485 yaz_log(YLOG_WARN, "Failed to decode MARC record");
488 wrbuf_putc(s->wrbuf, '\0');
489 recbuf = wrbuf_buf(s->wrbuf);
491 res = nmem_malloc(s->nmem, sizeof(struct record));
// NOTE(review): recbuf points into s->wrbuf, which extract_mergekey() rewinds
// and rewrites; confirm the key is built from intact data and that res->buf
// below still receives the decoded record.
493 res->merge_key = extract_mergekey(s, recbuf);
496 res->buf = nmem_strdupn(s->nmem, recbuf, wrbuf_len(s->wrbuf));
498 res->next_cluster = 0;
499 res->target_offset = -1;
501 yaz_log(YLOG_DEBUG, "Key: %s", res->merge_key);
/*
 * Pass every record in the Z_Records structure `r` received from target `t`
 * to ingest_record().
 *
 * `r->which` is tested against Z_Records_DBOSD before the record list is
 * read. Within the list, an entry that is not a database record, or whose
 * external is not octet-aligned, produces a warning. The statements that
 * follow each of these tests are not visible in this listing.
 */
508 void ingest_records(struct target *t, Z_Records *r)
510 //struct session *s = t->session;
512 Z_NamePlusRecordList *rlist;
515 if (r->which != Z_Records_DBOSD)
517 rlist = r->u.databaseOrSurDiagnostics;
518 for (i = 0; i < rlist->num_records; i++)
520 Z_NamePlusRecord *npr = rlist->records[i];
525 if (npr->which != Z_NamePlusRecord_databaseRecord)
527 yaz_log(YLOG_WARN, "Unexpected record type, probably diagnostic");
530 e = npr->u.databaseRecord;
531 if (e->which != Z_External_octet)
533 yaz_log(YLOG_WARN, "Unexpected external branch, probably BER");
// Raw record bytes as delivered by the server.
536 buf = (char*) e->u.octet_aligned->buf;
537 len = e->u.octet_aligned->len;
539 rec = ingest_record(t, buf, len);
542 yaz_log(YLOG_DEBUG, "Ingested a fooking record");
/*
 * Handle a PresentResponse APDU for the target attached to channel `i`.
 *
 * A non-surrogate diagnostic is logged and its condition code is stored in
 * t->diagnostic. When the present status is zero and the target is not in
 * the Error state, t->records grows by the number of records returned and
 * the records are passed to ingest_records(). A non-zero present status is
 * logged as a bad response. The state changes made in each branch are not
 * visible in this listing.
 */
546 static void do_presentResponse(IOCHAN i, Z_APDU *a)
548 struct target *t = iochan_getdata(i);
549 Z_PresentResponse *r = a->u.presentResponse;
// NOTE(review): confirm r->records is checked for NULL before recs->which is read.
552 Z_Records *recs = r->records;
553 if (recs->which == Z_Records_NSD)
555 yaz_log(YLOG_WARN, "Non-surrogate diagnostic");
556 t->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
561 yaz_log(YLOG_DEBUG, "Got Records!");
565 if (!*r->presentStatus && t->state != Error)
567 yaz_log(YLOG_DEBUG, "Good Present response");
568 t->records += *r->numberOfRecordsReturned;
569 ingest_records(t, r->records);
572 else if (*r->presentStatus)
574 yaz_log(YLOG_WARN, "Bad Present response");
/*
 * I/O channel callback for a Z39.50 target; `event` holds the event bits
 * that fired on channel `i`.
 *
 * Branches visible in this listing:
 *  - No_connection: call cs_connect() and move to Connecting.
 *  - Connecting with an output event: read SO_ERROR from the socket; on
 *    success move to Connected.
 *  - Input event: read a PDU with cs_get(), decode it, and dispatch init,
 *    search and present responses to their do_* functions.
 *  - Connected and Idle: tested further down; the statements they run are
 *    not visible here.
 */
579 static void handler(IOCHAN i, int event)
581 struct target *t = iochan_getdata(i);
582 struct session *s = t->session;
583 //static int waiting = 0;
585 if (t->state == No_connection) /* Start connection */
587 int res = cs_connect(t->link, t->addr);
589 t->state = Connecting;
590 if (!res) /* we are go */
591 iochan_setevent(i, EVENT_OUTPUT);
593 iochan_setflags(i, EVENT_OUTPUT);
596 yaz_log(YLOG_WARN|YLOG_ERRNO, "ERROR %s connect\n", t->hostport);
603 else if (t->state == Connecting && event & EVENT_OUTPUT)
606 socklen_t errlen = sizeof(errcode);
// Either a getsockopt() failure or a non-zero SO_ERROR takes the error branch.
608 if (getsockopt(cs_fileno(t->link), SOL_SOCKET, SO_ERROR, &errcode,
609 &errlen) < 0 || errcode != 0)
618 yaz_log(YLOG_DEBUG, "Connect OK");
619 t->state = Connected;
623 else if (event & EVENT_INPUT)
625 int len = cs_get(t->link, &t->ibuf, &t->ibufsize);
// Decode the PDU only when it belongs to the session's current request, or
// while the target is still initializing.
643 if (t->requestid == s->requestid || t->state == Initializing)
647 odr_reset(t->odr_in);
648 odr_setbuf(t->odr_in, t->ibuf, len, 0);
649 if (!z_APDU(t->odr_in, &a, 0, 0))
656 yaz_log(YLOG_DEBUG, "Successfully decoded %d oct PDU", len);
659 case Z_APDU_initResponse:
660 do_initResponse(i, a);
662 case Z_APDU_searchResponse:
663 do_searchResponse(i, a);
665 case Z_APDU_presentResponse:
666 do_presentResponse(i, a);
669 yaz_log(YLOG_WARN, "Unexpected result from server");
675 // if (cs_more(t->link))
676 // iochan_setevent(i, EVENT_INPUT);
678 else // we throw away response and go to idle mode
681 /* if len==1 we do nothing but wait for more input */
684 else if (t->state == Connected) {
688 if (t->state == Idle)
// Idle target whose request id differs from the session's current one.
690 if (t->requestid != s->requestid) {
// Hits remain and fewer than global_parameters.toget records have been fetched.
693 else if (t->hits > 0 && t->records < global_parameters.toget &&
694 t->records < t->hits) {
/*
 * Read target definitions from the file `fn` and append them to the
 * session's target list.
 *
 * Lines are tested for the prefix "target ". For each target the code
 * allocates a struct target, sets its initial state and ODR streams, stores
 * the URL as the full name, and splits it at the first '/' into host:port
 * and database name; without a '/' the database name is "Default". It then
 * creates a comstack, resolves the address, and creates an I/O channel with
 * handler() as its callback. Return values are not visible in this listing.
 */
700 int load_targets(struct session *s, const char *fn)
702 FILE *f = fopen(fn, "r");
704 struct target **target_p;
708 yaz_log(YLOG_WARN|YLOG_ERRNO, "open %s", fn);
// target_p always points at the link where the next target is stored.
712 target_p = &s->targets;
713 while (fgets(line, 255, f))
716 struct target *target;
// strncmp() is non-zero for lines that do not start with "target ".
719 if (strncmp(line, "target ", 7))
// NOTE(review): assumes the line ends with '\n'; the last line of a file
// without a trailing newline would lose its final character -- confirm.
722 url[strlen(url) - 1] = '\0';
// NOTE(review): LOG_DEBUG is used here while the rest of the file uses YLOG_DEBUG.
723 yaz_log(LOG_DEBUG, "Target: %s", url);
725 *target_p = target = xmalloc(sizeof(**target_p));
727 target_p = &target->next;
728 target->state = No_connection;
730 target->ibufsize = 0;
731 target->odr_in = odr_createmem(ODR_DECODE);
732 target->odr_out = odr_createmem(ODR_ENCODE);
736 target->requestid = -1;
738 target->diagnostic = 0;
// NOTE(review): the strcpy() calls below copy into fixed-size fields with no
// length check; confirm the field sizes against the line buffer size.
739 strcpy(target->fullname, url);
740 if ((p = strchr(url, '/')))
743 strcpy(target->hostport, url);
746 strcpy(target->databases[0], p);
// An empty name marks the end of the database list (see send_search).
747 target->databases[1][0] = '\0';
751 strcpy(target->hostport, url);
752 strcpy(target->databases[0], "Default");
753 target->databases[1][0] = '\0';
756 if (!(target->link = cs_create(tcpip_type, 0, PROTO_Z3950)))
758 yaz_log(YLOG_FATAL|YLOG_ERRNO, "Failed to create comstack");
761 if (!(target->addr = cs_straddr(target->link, target->hostport)))
763 printf("ERROR %s bad-address", target->hostport);
764 target->state = Failed;
767 new = iochan_create(cs_fileno(target->link), handler, 0);
768 iochan_setdata(new, target);
769 iochan_setevent(new, EVENT_EXCEPT);
// Link the new channel at the front of the global channel list.
770 new->next = channel_list;
/*
 * Start a new search for `query` in session `s`.
 *
 * The query text is copied into s->query. The function then walks the
 * global channel list, looking only at channels whose callback is handler(),
 * and sets the output flag on channels whose target is Idle. The record heap
 * is created or enlarged to hold live_channels * global_parameters.toget
 * record pointers, and the scratch region is marked empty. The statements
 * that update live_channels and the request id are not visible here.
 */
778 void search(struct session *s, char *query)
781 int live_channels = 0;
783 yaz_log(YLOG_DEBUG, "Search");
// Determine what iochans belong to this session
// It might have been better to have a list of them
// NOTE(review): unbounded copy into s->query; confirm the buffer size
// against the maximum query length.
788 strcpy(s->query, query);
791 for (c = channel_list; c; c = c->next)
795 if (iochan_getfun(c) != handler) // Not a Z target
797 t = iochan_getdata(c);
804 if (t->state == Error)
807 if (t->state == Idle)
808 iochan_setflag(c, EVENT_OUTPUT);
815 int maxrecs = live_channels * global_parameters.toget;
// The heap is allocated on first use and afterwards only ever enlarged.
816 if (!s->recheap_size)
818 s->recheap = xmalloc(maxrecs * sizeof(struct record *));
819 s->recheap_size = maxrecs;
821 else if (s->recheap_size < maxrecs)
823 s->recheap = xrealloc(s->recheap, maxrecs * sizeof(struct record*));
824 s->recheap_size = maxrecs;
// -1 marks the scratch region as empty (see read_recheap).
828 s->recheap_scratch = -1;
/*
 * Allocate and initialise a new session.
 *
 * The session starts with request id -1, no targets, an empty query string,
 * and an unallocated record heap (recheap = 0, recheap_size = 0). It gets
 * its own PQF parser, nmem pool, MARC handle and wrbuf. The MARC handle is
 * given "\t" as its subfield string. The return statement is not visible
 * in this listing.
 */
831 struct session *new_session()
833 struct session *session = xmalloc(sizeof(*session));
835 yaz_log(YLOG_DEBUG, "New pazpar2 session");
837 session->requestid = -1;
838 session->targets = 0;
839 session->pqf_parser = yaz_pqf_create();
840 session->query[0] = '\0';
841 session->nmem = nmem_create();
842 session->yaz_marc = yaz_marc_create();
// find_subfield() looks for this tab in front of each subfield code.
843 yaz_marc_subfield_str(session->yaz_marc, "\t");
844 session->wrbuf = wrbuf_alloc();
845 session->recheap = 0;
846 session->recheap_size = 0;
/*
 * Destroy session `s`. Not implemented: the only visible content of the
 * body is the FIXME below.
 */
851 void session_destroy(struct session *s)
// FIXME: release the session's resources here. Presumably this means what
// new_session() creates (pqf_parser, nmem, yaz_marc, wrbuf) plus the record
// heap -- confirm.
/*
 * Fill a table with one entry per Z39.50 target channel (channels whose
 * callback is handler()), giving host:port, hit count, record count,
 * diagnostic code and the state name from state_strings.
 *
 * The table is a static array, so its contents are overwritten by the next
 * call. `*count` is used as the index of the entry being filled; the
 * statements that reset and advance it are not visible in this listing.
 */
856 struct hitsbytarget *hitsbytarget(struct session *s, int *count)
// NOTE(review): fixed capacity of 1000 entries; no bounds check is visible in
// the loop below -- confirm.
858 static struct hitsbytarget res[1000]; // FIXME MM
862 for (c = channel_list; c; c = c->next)
863 if (iochan_getfun(c) == handler)
865 struct target *t = iochan_getdata(c);
868 strcpy(res[*count].id, t->hostport);
869 res[*count].hits = t->hits;
870 res[*count].records = t->records;
871 res[*count].diagnostic = t->diagnostic;
872 res[*count].state = state_strings[(int) t->state];
/*
 * Read up to `*num` records from the session's record heap, in sort order,
 * into an array allocated from s->nmem.
 *
 * `start` is not used in the visible code (see the FIXME). The handling of
 * an exhausted heap, any update of `*num`, and the return statement are not
 * visible in this listing.
 */
880 struct record **show(struct session *s, int start, int *num)
882 struct record **recs = nmem_malloc(s->nmem, *num * sizeof(struct record *));
// FIXME -- skip initial records
887 for (i = 0; i < *num; i++)
889 recs[i] = read_recheap(s);
/*
 * Fill `stat` with the number of targets in each state.
 *
 * `stat` is zeroed first. The function then walks the global channel list
 * and increments one counter per target according to its state.
 */
900 void statistics(struct session *s, struct statistics *stat)
905 bzero(stat, sizeof(*stat));
906 for (i = 0, c = channel_list; c; i++, c = c->next)
909 if (iochan_getfun(c) != handler)
911 t = iochan_getdata(c);
// NOTE(review): no case for the Connected state is visible here -- confirm
// whether targets in that state are meant to be counted.
914 case No_connection: stat->num_no_connection++; break;
915 case Connecting: stat->num_connecting++; break;
916 case Initializing: stat->num_initializing++; break;
917 case Searching: stat->num_searching++; break;
918 case Presenting: stat->num_presenting++; break;
919 case Idle: stat->num_idle++; break;
920 case Failed: stat->num_failed++; break;
921 case Error: stat->num_error++; break;
// NOTE(review): `i` advances for every channel in the list, so this total
// presumably includes channels that are not Z39.50 targets -- confirm.
926 stat->num_connections = i;
/*
 * Program entry point: set SIGPIPE to be ignored, initialise logging, parse
 * the command line with the option string "c:h:", and run the event loop on
 * the global channel list.
 *
 * command_init() and http_init() are each called with an option argument
 * converted by atoi(). The case labels that select them are not visible in
 * this listing.
 */
929 int main(int argc, char **argv)
// NOTE(review): signal() returns a handler pointer and reports failure as
// SIG_ERR; the `< 0` comparison should probably be `== SIG_ERR` -- confirm.
934 if (signal(SIGPIPE, SIG_IGN) < 0)
935 yaz_log(YLOG_WARN|YLOG_ERRNO, "signal");
937 yaz_log_init(YLOG_DEFAULT_LEVEL|YLOG_DEBUG, "pazpar2", 0);
939 while ((ret = options("c:h:", argv, argc, &arg)) != -2)
945 command_init(atoi(arg));
948 http_init(atoi(arg));
// NOTE(review): the usage text mentions -d, but the option string above only
// has c and h; the message also has no trailing newline.
951 fprintf(stderr, "Usage: pazpar2 -d comport");
957 event_loop(&channel_list);
965 * indent-tabs-mode: nil
967 * vim: shiftwidth=4 tabstop=8 expandtab