1 /* This file is part of the YAZ toolkit.
2 * Copyright (C) Index Data
3 * See the file LICENSE for details.
6 * \file zoom-memcached.c
7 * \brief Implements query/record caching using memcached
18 #include <yaz/yaz-util.h>
19 #include <yaz/xmalloc.h>
21 #include <yaz/diagbib1.h>
23 void ZOOM_memcached_init(ZOOM_connection c)
31 c->expire_search = 600;
32 c->expire_record = 1200;
35 void ZOOM_memcached_destroy(ZOOM_connection c)
39 memcached_free(c->mc_st);
43 redisFree(c->redis_c);
48 static memcached_st *create_memcached(const char *conf,
49 int *expire_search, int *expire_record)
53 memcached_st *mc = memcached_create(0);
54 NMEM nmem = nmem_create();
55 memcached_return_t rc;
57 nmem_strsplit_blank(nmem, conf, &darray, &num);
58 for (i = 0; mc && i < num; i++)
60 if (!yaz_strncasecmp(darray[i], "--SERVER=", 9))
62 char *host = darray[i] + 9;
63 char *port = strchr(host, ':');
64 char *weight = strstr(host, "/?");
72 rc = memcached_server_add(mc, host, port ? atoi(port) : 11211);
73 yaz_log(YLOG_LOG, "memcached_server_add host=%s rc=%u %s",
74 host, (unsigned) rc, memcached_strerror(mc, rc));
75 if (rc != MEMCACHED_SUCCESS)
81 else if (!yaz_strncasecmp(darray[i], "--EXPIRE=", 9))
83 *expire_search = atoi(darray[i] + 9);
84 *expire_record = 600 + *expire_search;
99 static redisContext *create_redis(const char *conf,
100 int *expire_search, int *expire_record)
104 NMEM nmem = nmem_create();
105 redisContext *context = 0;
107 nmem_strsplit_blank(nmem, conf, &darray, &num);
108 for (i = 0; i < num; i++)
110 if (!yaz_strncasecmp(darray[i], "--SERVER=", 9))
112 struct timeval timeout = { 1, 500000 }; /* 1.5 seconds */
113 char *host = darray[i] + 9;
114 char *port = strchr(host, ':');
117 context = redisConnectWithTimeout(host,
118 port ? atoi(port) : 6379,
121 else if (!yaz_strncasecmp(darray[i], "--EXPIRE=", 9))
123 *expire_search = atoi(darray[i] + 9);
124 *expire_record = 600 + *expire_search;
132 int ZOOM_memcached_configure(ZOOM_connection c)
138 redisFree(c->redis_c);
142 #if HAVE_LIBMEMCACHED
145 memcached_free(c->mc_st);
150 val = ZOOM_options_get(c->options, "redis");
154 c->redis_c = create_redis(val,
155 &c->expire_search, &c->expire_record);
156 if (c->redis_c == 0 || c->redis_c->err)
158 ZOOM_set_error(c, ZOOM_ERROR_MEMCACHED,
159 "could not create redis");
162 return 0; /* don't bother with memcached if redis is enabled */
164 ZOOM_set_error(c, ZOOM_ERROR_MEMCACHED, "not enabled");
168 val = ZOOM_options_get(c->options, "memcached");
171 #if HAVE_LIBMEMCACHED
172 c->mc_st = create_memcached(val, &c->expire_search, &c->expire_record);
175 ZOOM_set_error(c, ZOOM_ERROR_MEMCACHED,
176 "could not create memcached");
179 memcached_behavior_set(c->mc_st, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, 1);
181 ZOOM_set_error(c, ZOOM_ERROR_MEMCACHED, "not enabled");
188 static void wrbuf_vary_puts(WRBUF w, const char *v)
194 wrbuf_sha1_puts(w, v, 1);
203 void ZOOM_memcached_resultset(ZOOM_resultset r, ZOOM_query q)
205 ZOOM_connection c = r->connection;
207 r->mc_key = wrbuf_alloc();
208 wrbuf_puts(r->mc_key, "1;");
209 wrbuf_vary_puts(r->mc_key, c->host_port);
210 wrbuf_puts(r->mc_key, ";");
211 wrbuf_vary_puts(r->mc_key, ZOOM_resultset_option_get(r, "extraArgs"));
212 wrbuf_puts(r->mc_key, ";");
213 wrbuf_vary_puts(r->mc_key, c->user);
214 wrbuf_puts(r->mc_key, ";");
215 wrbuf_vary_puts(r->mc_key, c->group);
216 wrbuf_puts(r->mc_key, ";");
218 wrbuf_sha1_puts(r->mc_key, c->password, 1);
219 wrbuf_puts(r->mc_key, ";");
221 WRBUF w = wrbuf_alloc();
222 ZOOM_query_get_hash(q, w);
223 wrbuf_sha1_puts(r->mc_key, wrbuf_cstr(w), 1);
226 wrbuf_puts(r->mc_key, ";");
227 wrbuf_vary_puts(r->mc_key, r->req_facets);
230 void ZOOM_memcached_search(ZOOM_connection c, ZOOM_resultset resultset)
233 if (c->redis_c && resultset->live_set == 0)
239 argv[1] = wrbuf_cstr(resultset->mc_key);
241 reply = redisCommandArgv(c->redis_c, 2, argv, 0);
242 /* count;precision (ASCII) + '\0' + BER buffer for otherInformation */
243 if (reply && reply->type == REDIS_REPLY_STRING)
245 char *v = reply->str;
246 int v_len = reply->len;
248 size_t lead_len = strlen(v) + 1;
250 resultset->size = odr_atoi(v);
252 yaz_log(YLOG_LOG, "For key %s got value %s lead_len=%d len=%d",
253 wrbuf_cstr(resultset->mc_key), v, (int) lead_len,
255 if (v_len > lead_len)
257 Z_OtherInformation *oi = 0;
258 int oi_len = v_len - lead_len;
259 odr_setbuf(resultset->odr, v + lead_len, oi_len, 0);
260 if (!z_OtherInformation(resultset->odr, &oi, 0, 0))
262 yaz_log(YLOG_WARN, "oi decoding failed");
263 freeReplyObject(reply);
266 ZOOM_handle_search_result(c, resultset, oi);
267 ZOOM_handle_facet_result(c, resultset, oi);
269 event = ZOOM_Event_create(ZOOM_EVENT_RECV_SEARCH);
270 ZOOM_connection_put_event(c, event);
271 resultset->live_set = 1;
274 freeReplyObject(reply);
277 #if HAVE_LIBMEMCACHED
278 if (c->mc_st && resultset->live_set == 0)
282 memcached_return_t rc;
283 char *v = memcached_get(c->mc_st, wrbuf_buf(resultset->mc_key),
284 wrbuf_len(resultset->mc_key),
285 &v_len, &flags, &rc);
286 /* count;precision (ASCII) + '\0' + BER buffer for otherInformation */
290 size_t lead_len = strlen(v) + 1;
292 resultset->size = odr_atoi(v);
294 yaz_log(YLOG_LOG, "For key %s got value %s lead_len=%d len=%d",
295 wrbuf_cstr(resultset->mc_key), v, (int) lead_len,
297 if (v_len > lead_len)
299 Z_OtherInformation *oi = 0;
300 int oi_len = v_len - lead_len;
301 odr_setbuf(resultset->odr, v + lead_len, oi_len, 0);
302 if (!z_OtherInformation(resultset->odr, &oi, 0, 0))
304 yaz_log(YLOG_WARN, "oi decoding failed");
308 ZOOM_handle_search_result(c, resultset, oi);
309 ZOOM_handle_facet_result(c, resultset, oi);
312 event = ZOOM_Event_create(ZOOM_EVENT_RECV_SEARCH);
313 ZOOM_connection_put_event(c, event);
314 resultset->live_set = 1;
321 static void expire_redis(redisContext *redis_c,
322 const char *buf, size_t len, int exp)
329 sprintf(key_val, "%d", exp);
336 argvlen[2] = strlen(key_val);
337 reply = redisCommandArgv(redis_c, 3, argv, argvlen);
338 freeReplyObject(reply);
342 void ZOOM_memcached_hitcount(ZOOM_connection c, ZOOM_resultset resultset,
343 Z_OtherInformation *oi, const char *precision)
346 if (c->redis_c && resultset->live_set == 0)
349 ODR odr = odr_createmem(ODR_ENCODE);
354 str = odr_malloc(odr, 20 + strlen(precision));
355 /* count;precision (ASCII) + '\0' + BER buffer for otherInformation */
356 sprintf(str, ODR_INT_PRINTF ";%s", resultset->size, precision);
359 z_OtherInformation(odr, &oi, 0, 0);
360 oi_buf = odr_getbuf(odr, &oi_len, 0);
362 key = odr_malloc(odr, strlen(str) + 1 + oi_len);
365 memcpy(key + strlen(str) + 1, oi_buf, oi_len);
373 argv[1] = wrbuf_buf(resultset->mc_key);
374 argvlen[1] = wrbuf_len(resultset->mc_key);
376 argvlen[2] = strlen(str) + 1 + oi_len;
377 reply = redisCommandArgv(c->redis_c, 3, argv, argvlen);
378 freeReplyObject(reply);
380 expire_redis(c->redis_c,
381 wrbuf_buf(resultset->mc_key),
382 wrbuf_len(resultset->mc_key),
387 #if HAVE_LIBMEMCACHED
388 if (c->mc_st && resultset->live_set == 0)
391 memcached_return_t rc;
393 ODR odr = odr_createmem(ODR_ENCODE);
398 str = odr_malloc(odr, 20 + strlen(precision));
399 /* count;precision (ASCII) + '\0' + BER buffer for otherInformation */
400 sprintf(str, ODR_INT_PRINTF ";%s", resultset->size, precision);
403 z_OtherInformation(odr, &oi, 0, 0);
404 oi_buf = odr_getbuf(odr, &oi_len, 0);
406 key = odr_malloc(odr, strlen(str) + 1 + oi_len);
409 memcpy(key + strlen(str) + 1, oi_buf, oi_len);
411 rc = memcached_set(c->mc_st,
412 wrbuf_buf(resultset->mc_key),
413 wrbuf_len(resultset->mc_key),
414 key, strlen(str) + 1 + oi_len,
415 c->expire_search, flags);
416 yaz_log(YLOG_LOG, "Store hit count key=%s value=%s oi_len=%d rc=%u %s",
417 wrbuf_cstr(resultset->mc_key), str, oi_len, (unsigned) rc,
418 memcached_strerror(c->mc_st, rc));
424 void ZOOM_memcached_add(ZOOM_resultset r, Z_NamePlusRecord *npr,
426 const char *syntax, const char *elementSetName,
428 Z_SRW_diagnostic *diag)
431 if (r->connection->redis_c &&
432 !diag && npr->which == Z_NamePlusRecord_databaseRecord)
434 WRBUF k = wrbuf_alloc();
435 WRBUF rec_sha1 = wrbuf_alloc();
436 ODR odr = odr_createmem(ODR_ENCODE);
443 z_NamePlusRecord(odr, &npr, 0, 0);
444 rec_buf = odr_getbuf(odr, &rec_len, 0);
446 wrbuf_write(k, wrbuf_buf(r->mc_key), wrbuf_len(r->mc_key));
447 wrbuf_printf(k, ";%d;%s;%s;%s", pos,
448 syntax ? syntax : "",
449 elementSetName ? elementSetName : "",
450 schema ? schema : "");
452 wrbuf_sha1_write(rec_sha1, rec_buf, rec_len, 1);
456 argv[1] = wrbuf_buf(k);
457 argvlen[1] = wrbuf_len(k);
458 argv[2] = wrbuf_buf(rec_sha1);
459 argvlen[2] = wrbuf_len(rec_sha1);
461 reply = redisCommandArgv(r->connection->redis_c, 3, argv, argvlen);
462 yaz_log(YLOG_LOG, "Store record key=%s val=%s",
463 wrbuf_cstr(k), wrbuf_cstr(rec_sha1));
464 freeReplyObject(reply);
466 expire_redis(r->connection->redis_c, argv[1], argvlen[1],
467 r->connection->expire_search);
469 argv[1] = wrbuf_buf(rec_sha1);
470 argvlen[1] = wrbuf_len(rec_sha1);
472 argvlen[2] = rec_len;
474 reply = redisCommandArgv(r->connection->redis_c, 3, argv, argvlen);
475 yaz_log(YLOG_LOG, "Add record key=%s rec_len=%d",
476 wrbuf_cstr(rec_sha1), rec_len);
477 freeReplyObject(reply);
479 expire_redis(r->connection->redis_c, argv[1], argvlen[1],
480 r->connection->expire_record);
484 wrbuf_destroy(rec_sha1);
487 #if HAVE_LIBMEMCACHED
488 if (r->connection->mc_st &&
489 !diag && npr->which == Z_NamePlusRecord_databaseRecord)
491 WRBUF k = wrbuf_alloc();
492 WRBUF rec_sha1 = wrbuf_alloc();
494 memcached_return_t rc;
495 ODR odr = odr_createmem(ODR_ENCODE);
499 z_NamePlusRecord(odr, &npr, 0, 0);
500 rec_buf = odr_getbuf(odr, &rec_len, 0);
502 wrbuf_write(k, wrbuf_buf(r->mc_key), wrbuf_len(r->mc_key));
503 wrbuf_printf(k, ";%d;%s;%s;%s", pos,
504 syntax ? syntax : "",
505 elementSetName ? elementSetName : "",
506 schema ? schema : "");
508 wrbuf_sha1_write(rec_sha1, rec_buf, rec_len, 1);
510 rc = memcached_set(r->connection->mc_st,
511 wrbuf_buf(k), wrbuf_len(k),
512 wrbuf_buf(rec_sha1), wrbuf_len(rec_sha1),
513 r->connection->expire_search, flags);
515 yaz_log(YLOG_LOG, "Store record key=%s val=%s rc=%u %s",
516 wrbuf_cstr(k), wrbuf_cstr(rec_sha1), (unsigned) rc,
517 memcached_strerror(r->connection->mc_st, rc));
519 rc = memcached_add(r->connection->mc_st,
520 wrbuf_buf(rec_sha1), wrbuf_len(rec_sha1),
522 r->connection->expire_record, flags);
524 yaz_log(YLOG_LOG, "Add record key=%s rec_len=%d rc=%u %s",
525 wrbuf_cstr(rec_sha1), rec_len, (unsigned) rc,
526 memcached_strerror(r->connection->mc_st, rc));
530 wrbuf_destroy(rec_sha1);
535 Z_NamePlusRecord *ZOOM_memcached_lookup(ZOOM_resultset r, int pos,
537 const char *elementSetName,
541 if (r->connection && r->connection->redis_c)
543 WRBUF k = wrbuf_alloc();
548 wrbuf_write(k, wrbuf_buf(r->mc_key), wrbuf_len(r->mc_key));
549 wrbuf_printf(k, ";%d;%s;%s;%s", pos,
550 syntax ? syntax : "",
551 elementSetName ? elementSetName : "",
552 schema ? schema : "");
554 yaz_log(YLOG_LOG, "Lookup record %s", wrbuf_cstr(k));
557 argv[1] = wrbuf_buf(k);
558 argvlen[1] = wrbuf_len(k);
559 reply1 = redisCommandArgv(r->connection->redis_c, 2, argv, argvlen);
562 if (reply1 && reply1->type == REDIS_REPLY_STRING)
565 char *sha1_buf = reply1->str;
566 int sha1_len = reply1->len;
568 yaz_log(YLOG_LOG, "Lookup record %.*s", (int) sha1_len, sha1_buf);
573 argvlen[1] = sha1_len;
575 reply2 = redisCommandArgv(r->connection->redis_c, 2, argv, argvlen);
576 if (reply2 && reply2->type == REDIS_REPLY_STRING)
578 Z_NamePlusRecord *npr = 0;
579 char *v_buf = reply2->str;
580 int v_len = reply2->len;
582 odr_setbuf(r->odr, v_buf, v_len, 0);
583 z_NamePlusRecord(r->odr, &npr, 0, 0);
585 yaz_log(YLOG_LOG, "returned redis copy");
586 freeReplyObject(reply2);
587 freeReplyObject(reply1);
590 freeReplyObject(reply2);
592 freeReplyObject(reply1);
595 #if HAVE_LIBMEMCACHED
596 if (r->connection && r->connection->mc_st)
598 WRBUF k = wrbuf_alloc();
602 memcached_return_t rc;
604 wrbuf_write(k, wrbuf_buf(r->mc_key), wrbuf_len(r->mc_key));
605 wrbuf_printf(k, ";%d;%s;%s;%s", pos,
606 syntax ? syntax : "",
607 elementSetName ? elementSetName : "",
608 schema ? schema : "");
610 yaz_log(YLOG_LOG, "Lookup record %s", wrbuf_cstr(k));
611 sha1_buf = memcached_get(r->connection->mc_st,
612 wrbuf_buf(k), wrbuf_len(k),
613 &sha1_len, &flags, &rc);
621 yaz_log(YLOG_LOG, "Lookup record %.*s", (int) sha1_len, sha1_buf);
622 v_buf = memcached_get(r->connection->mc_st, sha1_buf, sha1_len,
623 &v_len, &flags, &rc);
627 Z_NamePlusRecord *npr = 0;
629 odr_setbuf(r->odr, v_buf, v_len, 0);
630 z_NamePlusRecord(r->odr, &npr, 0, 0);
633 yaz_log(YLOG_LOG, "returned memcached copy");
645 * c-file-style: "Stroustrup"
646 * indent-tabs-mode: nil
648 * vim: shiftwidth=4 tabstop=8 expandtab