1 /* This file is part of the Zebra server.
2 Copyright (C) 1995-2008 Index Data
4 Zebra is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
9 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 #include <yaz/xmalloc.h>
24 #include <idzebra/isamb.h>
/* On-disk format version. The minor version records whether the file
   header carries a root pointer (WITH_ROOT) or not (NO_ROOT). */
32 #define ISAMB_MAJOR_VERSION 3
33 #define ISAMB_MINOR_VERSION_NO_ROOT 0
34 #define ISAMB_MINOR_VERSION_WITH_ROOT 1
46 /* if 1, items in upper nodes are encoded; if 0, they are not encoded */
49 /* maximum size of an encoded item buffer */
50 #define DST_ITEM_MAX 256
/* number of tree levels covered by the per-level statistics arrays and by
   the block array of a read position */
52 #define ISAMB_MAX_LEVEL 10
53 /* approx 2*max page + max size of item */
54 #define DST_BUF_SIZE (2*4096+300)
56 /* should be the maximum block size or a multiple thereof */
57 #define ISAMB_CACHE_ENTRY_SIZE 4096
59 /* CAT_MAX: _must_ be a power of 2 */
/* CAT_MASK extracts the category from the low bits of a block position */
61 #define CAT_MASK (CAT_MAX-1)
62 /* CAT_NO: <= CAT_MAX */
65 /* Smallest block size */
66 #define ISAMB_MIN_SIZE 32
/* factor by which isamb_open grows the block size from one category to
   the next */
68 #define ISAMB_FAC_SIZE 4
70 /* ISAMB_PTR_CODEC = 1 var, =0 fixed */
71 #define ISAMB_PTR_CODEC 1
/* One cache entry; entries are chained through next into a singly linked
   list that cache_block keeps in most-recently-used order. */
73 struct ISAMB_cache_entry {
78 struct ISAMB_cache_entry *next;
/* The next two members are accessed as isamb->file[i].head and
   isamb->file[i].cache_entries, so they belong to struct ISAMB_file
   (its opening line is not visible here). */
84 struct ISAMB_head head;
85 struct ISAMB_cache_entry *cache_entries;
/* Members of the main handle reached through an ISAMB pointer
   (struct header not visible here). */
92 struct ISAMB_file *file;
94 int cache; /* 0 = no cache, 1 = use cache, -1 = dummy isam (for testing only) */
95 int log_io; /* log level for bf_read/bf_write calls */
96 int log_freelist; /* log level for freelist handling */
97 zint skipped_numbers; /* on a leaf node */
98 zint returned_numbers;
99 zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
100 zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
101 zint number_of_int_splits;
102 zint number_of_leaf_splits;
103 int enable_int_count; /* whether we count nodes (or not) */
104 int cache_size; /* size of blocks to cache (if cache=1) */
/* Members of struct ISAMB_block (read as p->no_items and
   p->decodeClientData in open_block). */
117 zint no_items; /* number of nodes in this + children */
121 void *decodeClientData;
/* Members of the read-position object reached through an ISAMB_PP pointer
   (struct header not visible here). */
129 int maxlevel; /* total depth */
132 zint skipped_numbers; /* on a leaf node */
133 zint returned_numbers;
134 zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
135 zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
136 struct ISAMB_block **block;
137 int scope; /* on what level we forward */
/* Item lengths are written with the same encoder as block pointers. */
141 #define encode_item_len encode_ptr
/* Variable-length encoder: writes pos at *dst as a sequence of bytes.
   Bytes with the high bit set carry 7 payload bits each; the last byte is
   written without the high bit. The loop header and the final update of
   *dst are not visible here. */
143 static void encode_ptr(char **dst, zint pos)
145 unsigned char *bp = (unsigned char*) *dst;
/* continuation byte: low 7 bits of pos with the high bit set */
149 *bp++ = (unsigned char) (128 | (pos & 127));
/* final byte: what remains of pos, stored without the continuation bit */
152 *bp++ = (unsigned char) pos;
/* Fixed-width encoder: copies the raw bytes of pos to *dst and advances
   *dst by sizeof(pos). Presumably compiled when ISAMB_PTR_CODEC is 0; the
   preprocessor conditional is not visible here. */
156 static void encode_ptr(char **dst, zint pos)
158 memcpy(*dst, &pos, sizeof(pos));
159 (*dst) += sizeof(pos);
/* Item lengths are read with the same decoder as block pointers. */
163 #define decode_item_len decode_ptr
/* Variable-length decoder: reads bytes from *src, advancing *src past each
   byte read. While the high bit of a byte is set, its low 7 bits are added
   to the accumulator d shifted left by r; the first byte without the high
   bit is added whole and ends the value. The updates of r and the store
   into *pos are not visible here. */
165 static void decode_ptr(const char **src, zint *pos)
171 while (((c = *(const unsigned char *)((*src)++)) & 128))
173 d += ((zint) (c & 127) << r);
176 d += ((zint) c << r);
/* Fixed-width decoder: copies sizeof(*pos) raw bytes from *src into *pos
   and advances *src by the same amount. Presumably compiled when
   ISAMB_PTR_CODEC is 0; the preprocessor conditional is not visible here. */
180 static void decode_ptr(const char **src, zint *pos)
182 memcpy(pos, *src, sizeof(*pos));
183 (*src) += sizeof(*pos);
/* Store v in the enable_int_count flag of handle b. insert_int tests this
   flag before opening sub-blocks to sum their no_items while splitting an
   internal node. */
188 void isamb_set_int_count(ISAMB b, int v)
190 b->enable_int_count = v;
/* Setter taking the handle b and an integer v.
   NOTE(review): the body is not visible here; presumably it stores v in
   b->cache_size, which cache_block compares against the number of cache
   entries - confirm. */
193 void isamb_set_cache_size(ISAMB b, int v)
/* Open an ISAMB handle made of no_cat block files.
 *
 * bfs, name    block-file system and base name; the file for category i is
 *              named name followed by the letter 'A' + i
 * writeflag    passed on to bf_open
 * method       copied into a freshly allocated ISAMC_M owned by the handle
 * cache        stored in the handle; asserted to be 0 or 1 on the path
 *              that opens files
 * no_cat       number of categories, asserted <= CAT_MAX
 * sizes        block size per category, each asserted
 *              <= ISAMB_CACHE_ENTRY_SIZE
 * use_root_ptr together with writeflag forces the header to be rewritten
 *
 * For each category the header fields get defaults for an empty file; if
 * block 0 can be read, the header is validated (magic, major version) and
 * the stored fields replace the defaults. The error returns taken after
 * the warnings are not visible here. */
198 ISAMB isamb_open2(BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
199 int cache, int no_cat, int *sizes, int use_root_ptr)
201 ISAMB isamb = xmalloc(sizeof(*isamb));
204 assert(no_cat <= CAT_MAX);
/* keep a private copy of the method table */
207 isamb->method = (ISAMC_M *) xmalloc(sizeof(*method));
208 memcpy(isamb->method, method, sizeof(*method));
209 isamb->no_cat = no_cat;
211 isamb->log_freelist = 0;
212 isamb->cache = cache;
213 isamb->skipped_numbers = 0;
214 isamb->returned_numbers = 0;
215 isamb->number_of_int_splits = 0;
216 isamb->number_of_leaf_splits = 0;
217 isamb->enable_int_count = 1;
218 isamb->cache_size = 40;
/* minor version: WITH_ROOT or NO_ROOT (the selecting condition is not
   visible here; presumably use_root_ptr) */
221 isamb->minor_version = ISAMB_MINOR_VERSION_WITH_ROOT;
223 isamb->minor_version = ISAMB_MINOR_VERSION_NO_ROOT;
227 for (i = 0; i<ISAMB_MAX_LEVEL; i++)
228 isamb->skipped_nodes[i] = isamb->accessed_nodes[i] = 0;
232 yaz_log(YLOG_WARN, "isamb_open %s. Degraded TEST mode", name);
236 assert(cache == 0 || cache == 1);
238 isamb->file = xmalloc(sizeof(*isamb->file) * isamb->no_cat);
/* first pass: put every file slot into a known empty state */
240 for (i = 0; i < isamb->no_cat; i++)
242 isamb->file[i].bf = 0;
243 isamb->file[i].head_dirty = 0;
244 isamb->file[i].cache_entries = 0;
/* second pass: open each block file and load or initialise its header */
247 for (i = 0; i < isamb->no_cat; i++)
249 char fname[DST_BUF_SIZE];
250 char hbuf[DST_BUF_SIZE];
/* NOTE(review): sprintf is not bounded by the size of fname; a very long
   name would overflow the buffer - consider a bounded formatter. */
252 sprintf(fname, "%s%c", name, i+'A');
254 isamb->file[i].bf = bf_open(bfs, fname, ISAMB_CACHE_ENTRY_SIZE,
257 isamb->file[i].bf = bf_open(bfs, fname, sizes[i], writeflag);
259 if (!isamb->file[i].bf)
265 /* fill-in default values (for empty isamb) */
266 isamb->file[i].head.first_block = ISAMB_CACHE_ENTRY_SIZE/sizes[i]+1;
267 isamb->file[i].head.last_block = isamb->file[i].head.first_block;
268 isamb->file[i].head.block_size = sizes[i];
269 assert(sizes[i] <= ISAMB_CACHE_ENTRY_SIZE);
/* block_offset is the number of leading system bytes in each block; which
   of the assignments below is compiled depends on conditionals that are
   not visible here */
271 if (i == isamb->no_cat-1 || sizes[i] > 128)
272 isamb->file[i].head.block_offset = 8;
274 isamb->file[i].head.block_offset = 4;
276 isamb->file[i].head.block_offset = 11;
278 isamb->file[i].head.block_max =
279 sizes[i] - isamb->file[i].head.block_offset;
280 isamb->file[i].head.free_list = 0;
281 if (bf_read(isamb->file[i].bf, 0, 0, 0, hbuf))
283 /* got header assume "isamb"major minor len can fit in 16 bytes */
285 int major, minor, len, pos = 0;
288 if (memcmp(hbuf, "isamb", 5))
290 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
294 if (sscanf(hbuf+5, "%d %d %d", &major, &minor, &len) != 3)
296 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
300 if (major != ISAMB_MAJOR_VERSION)
302 yaz_log(YLOG_WARN, "bad major version for file %s %d, must be %d",
303 fname, major, ISAMB_MAJOR_VERSION);
/* the header may span several blocks; read the remaining ones into hbuf */
307 for (left = len - sizes[i]; left > 0; left = left - sizes[i])
310 if (!bf_read(isamb->file[i].bf, pos, 0, 0, hbuf + pos*sizes[i]))
312 yaz_log(YLOG_WARN, "truncated isamb header for "
313 "file=%s len=%d pos=%d",
/* decode the stored header fields in the order isamb_close encodes them */
320 decode_ptr(&src, &isamb->file[i].head.first_block);
321 decode_ptr(&src, &isamb->file[i].head.last_block);
322 decode_ptr(&src, &zint_tmp);
323 isamb->file[i].head.block_size = (int) zint_tmp;
324 decode_ptr(&src, &zint_tmp);
325 isamb->file[i].head.block_max = (int) zint_tmp;
326 decode_ptr(&src, &isamb->file[i].head.free_list);
327 if (isamb->minor_version >= ISAMB_MINOR_VERSION_WITH_ROOT)
328 decode_ptr(&src, &isamb->root_ptr);
330 assert(isamb->file[i].head.block_size >= isamb->file[i].head.block_offset);
331 /* must rewrite the header if root ptr is in use (bug #1017) */
332 if (use_root_ptr && writeflag)
333 isamb->file[i].head_dirty = 1;
335 isamb->file[i].head_dirty = 0;
336 assert(isamb->file[i].head.block_size == sizes[i]);
339 yaz_log(YLOG_WARN, "isamb debug enabled. Things will be slower than usual");
/* Convenience wrapper around isamb_open2. It loops over CAT_NO categories
   with a block size that starts at ISAMB_MIN_SIZE and is multiplied by
   ISAMB_FAC_SIZE on each step, then delegates to isamb_open2. The rest of
   the parameter list, the statement that records each size and the final
   arguments of the call are not visible here. */
344 ISAMB isamb_open(BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
348 int i, b_size = ISAMB_MIN_SIZE;
350 for (i = 0; i<CAT_NO; i++)
353 b_size = b_size * ISAMB_FAC_SIZE;
355 return isamb_open2(bfs, name, writeflag, method, cache,
/* Empty the cache list of category cat: each entry is unlinked from the
   head of the list and its buffer is written to the block file at the
   position recorded in the entry. Any condition guarding the write and the
   release of the entry are not visible here. */
359 static void flush_blocks(ISAMB b, int cat)
361 while (b->file[cat].cache_entries)
363 struct ISAMB_cache_entry *ce_this = b->file[cat].cache_entries;
364 b->file[cat].cache_entries = ce_this->next;
368 yaz_log(b->log_io, "bf_write: flush_blocks");
369 bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
/* Read or write one block through the per-category cache.
 *
 * pos      block position; the low bits (CAT_MASK) select the category
 * userbuf  caller buffer of head.block_size bytes
 * wr       selects the copy direction (userbuf into the cache entry, or
 *          the cache entry into userbuf); the tests on wr are not visible
 *
 * A cache entry holds ISAMB_CACHE_ENTRY_SIZE bytes; off is the byte offset
 * of the block inside its entry and norm identifies the entry. On a hit
 * the entry is moved to the front of the list. On a miss a new entry is
 * read with bf_read (zero-filled if the read fails) and pushed to the
 * front; when the list already holds cache_size entries the last one is
 * unlinked and written back first. Callers treat a zero return as meaning
 * the cache was not used and fall back to direct I/O; the return
 * statements are not visible here.
 *
 * NOTE(review): norm divides by a factor built from CAT_MASK while off is
 * derived from pos/CAT_MAX - confirm that CAT_MASK is intended. */
376 static int cache_block(ISAMB b, ISAM_P pos, unsigned char *userbuf, int wr)
378 int cat = (int) (pos&CAT_MASK);
379 int off = (int) (((pos/CAT_MAX) &
380 (ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size - 1))
381 * b->file[cat].head.block_size);
382 zint norm = pos / (CAT_MASK*ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size);
384 struct ISAMB_cache_entry **ce, *ce_this = 0, **ce_last = 0;
389 assert(ISAMB_CACHE_ENTRY_SIZE >= b->file[cat].head.block_size);
/* search the list, counting entries in no */
390 for (ce = &b->file[cat].cache_entries; *ce; ce = &(*ce)->next, no++)
393 if ((*ce)->pos == norm)
396 *ce = (*ce)->next; /* remove from list */
398 ce_this->next = b->file[cat].cache_entries; /* move to front */
399 b->file[cat].cache_entries = ce_this;
403 memcpy(ce_this->buf + off, userbuf,
404 b->file[cat].head.block_size);
408 memcpy(userbuf, ce_this->buf + off,
409 b->file[cat].head.block_size);
/* miss: evict the last entry if the list is full */
413 if (no >= b->cache_size)
415 assert(ce_last && *ce_last);
417 *ce_last = 0; /* remove the last entry from list */
420 yaz_log(b->log_io, "bf_write: cache_block");
421 bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
/* create a new entry at the front of the list and fill it from disk */
426 ce_this = xmalloc(sizeof(*ce_this));
427 ce_this->next = b->file[cat].cache_entries;
428 b->file[cat].cache_entries = ce_this;
429 ce_this->buf = xmalloc(ISAMB_CACHE_ENTRY_SIZE);
431 yaz_log(b->log_io, "bf_read: cache_block");
432 if (!bf_read(b->file[cat].bf, norm, 0, 0, ce_this->buf))
433 memset(ce_this->buf, 0, ISAMB_CACHE_ENTRY_SIZE);
436 memcpy(ce_this->buf + off, userbuf, b->file[cat].head.block_size);
442 memcpy(userbuf, ce_this->buf + off, b->file[cat].head.block_size);
/* Close an ISAMB handle: log the accumulated statistics, flush the cache
 * of every category, rewrite the file header where head_dirty is set,
 * close the block files and free the method copy. Other releases of
 * memory are not visible here.
 *
 * The header is built in hbuf: the binary fields are encoded from offset
 * 16 onwards and the first 16 bytes receive a text prefix with the magic,
 * the major and minor version and the header length.
 *
 * NOTE(review): the second log call prints its first value after the word
 * returned and its second after the word skipped, but the arguments are
 * passed as skipped_numbers then returned_numbers - they look swapped.
 * NOTE(review): the first loop stops only at a zero accessed_nodes entry
 * and has no bound against ISAMB_MAX_LEVEL - confirm it cannot overrun. */
448 void isamb_close(ISAMB isamb)
451 for (i = 0; isamb->accessed_nodes[i]; i++)
452 yaz_log(YLOG_DEBUG, "isamb_close level leaf-%d: "ZINT_FORMAT" read, "
453 ZINT_FORMAT" skipped",
454 i, isamb->accessed_nodes[i], isamb->skipped_nodes[i]);
455 yaz_log(YLOG_DEBUG, "isamb_close returned "ZINT_FORMAT" values, "
456 "skipped "ZINT_FORMAT,
457 isamb->skipped_numbers, isamb->returned_numbers);
459 for (i = 0; i<isamb->no_cat; i++)
461 flush_blocks(isamb, i);
462 if (isamb->file[i].head_dirty)
464 char hbuf[DST_BUF_SIZE];
465 int major = ISAMB_MAJOR_VERSION;
/* binary fields start after the 16-byte text prefix */
467 char *dst = hbuf + 16;
469 int b_size = isamb->file[i].head.block_size;
471 encode_ptr(&dst, isamb->file[i].head.first_block);
472 encode_ptr(&dst, isamb->file[i].head.last_block);
473 encode_ptr(&dst, isamb->file[i].head.block_size);
474 encode_ptr(&dst, isamb->file[i].head.block_max);
475 encode_ptr(&dst, isamb->file[i].head.free_list);
477 if (isamb->minor_version >= ISAMB_MINOR_VERSION_WITH_ROOT)
478 encode_ptr(&dst, isamb->root_ptr);
480 memset(dst, '\0', b_size); /* ensure no random bytes are written */
484 /* print exactly 16 bytes (including trailing 0) */
485 sprintf(hbuf, "isamb%02d %02d %02d\r\n", major,
486 isamb->minor_version, len);
/* write the first header block, then any further blocks the header spans */
488 bf_write(isamb->file[i].bf, pos, 0, 0, hbuf);
490 for (left = len - b_size; left > 0; left = left - b_size)
493 bf_write(isamb->file[i].bf, pos, 0, 0, hbuf + pos*b_size);
496 if (isamb->file[i].bf)
497 bf_close (isamb->file[i].bf);
500 xfree(isamb->method);
504 /* open_block: read one block at pos and
505 decode the leading system bytes, which consist of:
507 0: leader byte; != 0 means leaf, == 0 means non-leaf
508 1-2: used size of the block
509 3-7*: number of items in this block and all of its children
511 * 5 bytes are reserved for the item count in large blocks, 1 byte in
512 small ones. We can thus have at most 2^40 nodes. */
/* Read the block at pos into a newly allocated struct ISAMB_block.
 *
 * The category is taken from the low bits of pos (CAT_MASK). The block is
 * fetched through cache_block; if that returns zero it is read directly
 * with bf_read at block number pos/CAT_MAX, and a failed read is logged
 * as fatal and followed by zebra_exit. The system bytes are then decoded:
 * bytes 1-2 give the used size (little-endian, including the offset) and
 * the item count is decoded starting at byte 3. p->bytes points just past
 * the system bytes. A decoder state is created with codec.start and kept
 * in p->decodeClientData. The return statement is not visible here. */
514 static struct ISAMB_block *open_block(ISAMB b, ISAM_P pos)
516 int cat = (int) (pos&CAT_MASK);
518 int offset = b->file[cat].head.block_offset;
519 struct ISAMB_block *p;
522 p = xmalloc(sizeof(*p));
524 p->cat = (int) (pos & CAT_MASK);
525 p->buf = xmalloc(b->file[cat].head.block_size);
528 if (!cache_block (b, pos, p->buf, 0))
530 yaz_log(b->log_io, "bf_read: open_block");
531 if (bf_read(b->file[cat].bf, pos/CAT_MAX, 0, 0, p->buf) != 1)
533 yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
534 (long) pos, (long) pos/CAT_MAX);
535 zebra_exit("isamb:open_block");
538 p->bytes = (char *)p->buf + offset;
/* stored size includes the system bytes; p->size counts payload only */
540 p->size = (p->buf[1] + 256 * p->buf[2]) - offset;
543 yaz_log(YLOG_FATAL, "Bad block size %d in pos=" ZINT_FORMAT "\n",
546 assert(p->size >= 0);
547 src = (char*) p->buf + 3;
548 decode_ptr(&src, &p->no_items);
553 p->decodeClientData = (*b->method->codec.start)();
/* Allocate a new block in category cat and return an in-memory
 * struct ISAMB_block for it.
 *
 * If the free list of the category is empty, the block number is taken
 * from head.last_block, which is then incremented. Otherwise the head of
 * the free list is reused: the block is read (through the cache or
 * directly) and its first sizeof(zint) bytes become the new free-list
 * head. The header is marked dirty, the buffer is zeroed and p->bytes is
 * set past the system bytes. How the leaf argument is stored is not
 * visible here.
 *
 * NOTE(review): the fatal log call passes p->pos/CAT_MAX for both the pos
 * and the block value; the first was probably meant to be p->pos. */
557 struct ISAMB_block *new_block(ISAMB b, int leaf, int cat)
559 struct ISAMB_block *p;
561 p = xmalloc(sizeof(*p));
562 p->buf = xmalloc(b->file[cat].head.block_size);
564 if (!b->file[cat].head.free_list)
567 block_no = b->file[cat].head.last_block++;
/* a position packs the block number above the category bits */
568 p->pos = block_no * CAT_MAX + cat;
572 p->pos = b->file[cat].head.free_list;
573 assert((p->pos & CAT_MASK) == cat);
574 if (!cache_block(b, p->pos, p->buf, 0))
576 yaz_log(b->log_io, "bf_read: new_block");
577 if (!bf_read(b->file[cat].bf, p->pos/CAT_MAX, 0, 0, p->buf))
579 yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
580 (long) p->pos/CAT_MAX, (long) p->pos/CAT_MAX);
581 zebra_exit("isamb:new_block");
584 yaz_log(b->log_freelist, "got block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT, p->pos,
585 cat, p->pos/CAT_MAX);
/* a free block stores the next free-list position in its first bytes */
586 memcpy(&b->file[cat].head.free_list, p->buf, sizeof(zint));
589 b->file[cat].head_dirty = 1;
590 memset(p->buf, 0, b->file[cat].head.block_size);
591 p->bytes = (char*)p->buf + b->file[cat].head.block_offset;
598 p->decodeClientData = (*b->method->codec.start)();
/* Allocate a new leaf block in category cat (new_block with leaf = 1). */
602 struct ISAMB_block *new_leaf(ISAMB b, int cat)
604 return new_block(b, 1, cat);
/* Allocate a new internal (non-leaf) block in category cat
   (new_block with leaf = 0). */
608 struct ISAMB_block *new_int(ISAMB b, int cat)
610 return new_block(b, 0, cat);
/* Consistency check of block p. The visible code walks the block payload
   as a sequence of child pointers separated by items, asserting that each
   child pointer carries the same category as p and, in the unencoded item
   form, that each item length is between 1 and 127. The conditions that
   select this walk (presumably non-leaf blocks only) and the loop header
   are not visible here. */
613 static void check_block(ISAMB b, struct ISAMB_block *p)
615 assert(b); /* mostly to make the compiler shut up about unused b */
623 char *startp = p->bytes;
624 const char *src = startp;
625 char *endp = p->bytes + p->size;
627 void *c1 = (*b->method->codec.start)();
/* first child pointer */
629 decode_ptr(&src, &pos);
630 assert((pos&CAT_MASK) == p->cat);
634 char file_item_buf[DST_ITEM_MAX];
635 char *file_item = file_item_buf;
636 (*b->method->codec.reset)(c1);
637 (*b->method->codec.decode)(c1, &file_item, &src);
640 decode_item_len(&src, &item_len);
641 assert(item_len > 0 && item_len < 128);
/* child pointer following the item */
644 decode_ptr(&src, &pos);
645 if ((pos&CAT_MASK) != p->cat)
647 assert((pos&CAT_MASK) == p->cat);
650 (*b->method->codec.stop)(c1);
/* Release the in-memory block p, writing it back where required.
 *
 * Two write paths are visible. For a deleted block the current free-list
 * head is stored in the first bytes of the buffer, the block becomes the
 * new free-list head and the buffer is written. Otherwise the system
 * bytes are rebuilt (used size in bytes 1-2, item count from byte 3) and
 * the buffer is written. Both paths go through cache_block and fall back
 * to bf_write when it returns zero. The decoder state is stopped at the
 * end. The conditions selecting each path, the leader byte and the
 * freeing of p are not visible here. */
654 void close_block(ISAMB b, struct ISAMB_block *p)
660 yaz_log(b->log_freelist, "release block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT,
661 p->pos, p->cat, p->pos/CAT_MAX);
/* link the block into the free list */
662 memcpy(p->buf, &b->file[p->cat].head.free_list, sizeof(zint));
663 b->file[p->cat].head.free_list = p->pos;
664 if (!cache_block(b, p->pos, p->buf, 1))
666 yaz_log(b->log_io, "bf_write: close_block (deleted)");
667 bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
672 int offset = b->file[p->cat].head.block_offset;
673 int size = p->size + offset;
674 char *dst = (char*)p->buf + 3;
675 assert(p->size >= 0);
677 /* memset because encode_ptr usually does not write all bytes */
678 memset(p->buf, 0, b->file[p->cat].head.block_offset);
/* used size, little-endian, including the system bytes */
680 p->buf[1] = size & 255;
681 p->buf[2] = size >> 8;
682 encode_ptr(&dst, p->no_items);
684 if (!cache_block(b, p->pos, p->buf, 1))
686 yaz_log(b->log_io, "bf_write: close_block");
687 bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
690 (*b->method->codec.stop)(p->decodeClientData);
/* Forward declaration: insert_int and insert_sub call each other. One
   line of the parameter list is not visible here. */
695 int insert_sub(ISAMB b, struct ISAMB_block **p,
696 void *new_item, int *mode,
698 struct ISAMB_block **sp,
699 void *sub_item, int *sub_size,
700 const void *max_item);
/* Insert into the subtree rooted at internal block p.
 *
 * The payload of p is a child pointer followed by (item, child pointer)
 * pairs. The code scans the separator items, comparing each with
 * lookahead_item, opens one child (or the last child when the end of the
 * payload is reached) and recurses with insert_sub. p->no_items is
 * adjusted by the change in the child item count, plus the count of a
 * new sibling sub_p2 if the child was split.
 *
 * If the child was split, the separator sub_item and the position of
 * sub_p2 are spliced into a copy of the payload in dst_buf. When the
 * result fits within block_max it is copied back into p. Otherwise p is
 * split as well: entries are scanned up to about half a block size from
 * the start, p keeps that first part, the following item is returned in
 * split_item / *split_size and the rest goes to a new internal block
 * returned in *sp. When enable_int_count is set, each child of the first
 * half is opened to sum its no_items so both halves get correct counts.
 *
 * Several conditions, loop headers and the return statement are not
 * visible here; the description follows the visible statements only. */
702 int insert_int(ISAMB b, struct ISAMB_block *p, void *lookahead_item,
704 ISAMC_I *stream, struct ISAMB_block **sp,
705 void *split_item, int *split_size, const void *last_max_item)
707 char *startp = p->bytes;
708 const char *src = startp;
709 char *endp = p->bytes + p->size;
711 struct ISAMB_block *sub_p1 = 0, *sub_p2 = 0;
712 char sub_item[DST_ITEM_MAX];
716 void *c1 = (*b->method->codec.start)();
720 assert(p->size >= 0);
721 decode_ptr(&src, &pos);
725 const char *src0 = src;
727 char file_item_buf[DST_ITEM_MAX];
728 char *file_item = file_item_buf;
729 (*b->method->codec.reset)(c1);
730 (*b->method->codec.decode)(c1, &file_item, &src);
731 d = (*b->method->compare_item)(file_item_buf, lookahead_item);
734 sub_p1 = open_block(b, pos);
736 diff_terms -= sub_p1->no_items;
737 more = insert_sub(b, &sub_p1, lookahead_item, mode,
739 sub_item, &sub_size, file_item_buf);
740 diff_terms += sub_p1->no_items;
746 decode_item_len(&src, &item_len);
747 d = (*b->method->compare_item)(src, lookahead_item);
750 sub_p1 = open_block(b, pos);
752 diff_terms -= sub_p1->no_items;
753 more = insert_sub(b, &sub_p1, lookahead_item, mode,
755 sub_item, &sub_size, src);
756 diff_terms += sub_p1->no_items;
762 decode_ptr(&src, &pos);
766 /* we reached the end. So lookahead > last item */
767 sub_p1 = open_block(b, pos);
769 diff_terms -= sub_p1->no_items;
770 more = insert_sub(b, &sub_p1, lookahead_item, mode, stream, &sub_p2,
771 sub_item, &sub_size, last_max_item);
772 diff_terms += sub_p1->no_items;
775 diff_terms += sub_p2->no_items;
779 p->no_items += diff_terms;
783 /* there was a split - must insert pointer in this one */
784 char dst_buf[DST_BUF_SIZE];
787 const char *sub_item_ptr = sub_item;
789 assert(sub_size < 128 && sub_size > 1);
/* copy everything up to the insertion point */
791 memcpy(dst, startp, src - startp);
796 (*b->method->codec.reset)(c1);
797 (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
799 encode_item_len(&dst, sub_size); /* sub length and item */
800 memcpy(dst, sub_item, sub_size);
804 encode_ptr(&dst, sub_p2->pos); /* pos */
806 if (endp - src) /* remaining data */
808 memcpy(dst, src, endp - src);
811 p->size = dst - dst_buf;
812 assert(p->size >= 0);
814 if (p->size <= b->file[p->cat].head.block_max)
816 /* it fits OK in this block */
817 memcpy(startp, dst_buf, dst - dst_buf);
819 close_block(b, sub_p2);
823 /* must split _this_ block as well .. */
824 struct ISAMB_block *sub_p3;
826 char file_item_buf[DST_ITEM_MAX];
827 char *file_item = file_item_buf;
831 zint no_items_first_half = 0;
837 b->number_of_int_splits++;
840 close_block(b, sub_p2);
842 half = src + b->file[p->cat].head.block_size/2;
843 decode_ptr(&src, &pos);
845 if (b->enable_int_count)
847 /* read sub block so we can get no_items for it */
848 sub_p3 = open_block(b, pos);
849 no_items_first_half += sub_p3->no_items;
850 close_block(b, sub_p3);
856 file_item = file_item_buf;
857 (*b->method->codec.reset)(c1);
858 (*b->method->codec.decode)(c1, &file_item, &src);
860 decode_item_len(&src, &split_size_tmp);
861 *split_size = (int) split_size_tmp;
864 decode_ptr(&src, &pos);
866 if (b->enable_int_count)
868 /* read sub block so we can get no_items for it */
869 sub_p3 = open_block(b, pos);
870 no_items_first_half += sub_p3->no_items;
871 close_block(b, sub_p3);
874 /* p is first half */
875 p_new_size = src - dst_buf;
876 memcpy(p->bytes, dst_buf, p_new_size);
879 file_item = file_item_buf;
880 (*b->method->codec.reset)(c1);
881 (*b->method->codec.decode)(c1, &file_item, &src);
882 *split_size = file_item - file_item_buf;
883 memcpy(split_item, file_item_buf, *split_size);
885 decode_item_len(&src, &split_size_tmp);
886 *split_size = (int) split_size_tmp;
887 memcpy(split_item, src, *split_size);
890 /* *sp is second half */
891 *sp = new_int(b, p->cat);
892 (*sp)->size = endp - src;
893 memcpy((*sp)->bytes, src, (*sp)->size);
895 p->size = p_new_size;
897 /* adjust no_items in first&second half */
898 (*sp)->no_items = p->no_items - no_items_first_half;
899 p->no_items = no_items_first_half;
903 close_block(b, sub_p1);
904 (*b->method->codec.stop)(c1);
/* Merge items from the input stream into leaf block *sp1.
 *
 * The new leaf content is built in dst_buf with encoder state c2, while
 * existing items are decoded from the block with decoder state c1. The
 * first loop merges existing items with the lookahead item according to
 * the comparison d; *lookahead_mode is 0 for a delete and non-zero for an
 * insert, and mode 2 inserts the new item even when it compares equal.
 * The second loop appends stream items once no file items remain.
 * Reading stops when the stream ends, when the lookahead reaches
 * max_item, or when too many bytes have been inserted (a quarter of the
 * largest category block_max in the merge loop).
 *
 * While encoding, the first item written after dst passes mid_cut (merge
 * loop) or tail_cut (append loop) is remembered in cut_item_buf together
 * with half1 and no_items_1 as the split candidate.
 *
 * Afterwards a block of a category whose block_max can hold the result is
 * chosen. If the result still exceeds block_max, the leaf is split: p
 * keeps the bytes before half1, a new leaf *sp2 starts with the
 * re-encoded cut item followed by the rest, and the cut item is returned
 * in sub_item / *sub_size as the separator.
 *
 * Many declarations, conditions and the return statement are not visible
 * here; the description follows the visible statements only. */
908 int insert_leaf(ISAMB b, struct ISAMB_block **sp1, void *lookahead_item,
909 int *lookahead_mode, ISAMC_I *stream,
910 struct ISAMB_block **sp2,
911 void *sub_item, int *sub_size,
912 const void *max_item)
914 struct ISAMB_block *p = *sp1;
917 char dst_buf[DST_BUF_SIZE], *dst = dst_buf;
919 void *c1 = (*b->method->codec.start)();
920 void *c2 = (*b->method->codec.start)();
922 int quater = b->file[b->no_cat-1].head.block_max / 4;
923 char *mid_cut = dst_buf + quater * 2;
924 char *tail_cut = dst_buf + quater * 3;
925 char *maxp = dst_buf + b->file[b->no_cat-1].head.block_max;
928 char cut_item_buf[DST_ITEM_MAX];
929 int cut_item_size = 0;
930 int no_items = 0; /* number of items (total) */
931 int no_items_1 = 0; /* number of items (first half) */
932 int inserted_dst_bytes = 0;
936 char file_item_buf[DST_ITEM_MAX];
937 char *file_item = file_item_buf;
940 endp = p->bytes + p->size;
941 (*b->method->codec.decode)(c1, &file_item, &src);
944 const char *dst_item = 0; /* resulting item to be inserted */
945 char *lookahead_next;
950 d = (*b->method->compare_item)(file_item_buf, lookahead_item);
952 /* d now holds comparison between existing file item and
955 d > 0: lookahead before file
956 d < 0: lookahead after file
960 /* lookahead must be inserted */
961 dst_item = lookahead_item;
962 /* if this is not an insertion, it's really bad .. */
963 if (!*lookahead_mode)
965 yaz_log(YLOG_WARN, "isamb: Inconsistent register (1)");
966 assert(*lookahead_mode);
969 else if (d == 0 && *lookahead_mode == 2)
971 /* For mode == 2, we insert the new key anyway - even
972 though the comparison is 0. */
973 dst_item = lookahead_item;
977 dst_item = file_item_buf;
979 if (d == 0 && !*lookahead_mode)
981 /* it's a deletion and they match so there is nothing
982 to be inserted anyway .. But mark the thing dirty
983 (file item was part of input.. The item will not be
987 else if (!half1 && dst > mid_cut)
989 /* we have reached the splitting point for the first time */
990 const char *dst_item_0 = dst_item;
991 half1 = dst; /* candidate for splitting */
993 /* encode the resulting item */
994 (*b->method->codec.encode)(c2, &dst, &dst_item);
996 cut_item_size = dst_item - dst_item_0;
997 assert(cut_item_size > 0);
998 memcpy(cut_item_buf, dst_item_0, cut_item_size);
1001 no_items_1 = no_items;
1006 /* encode the resulting item */
1007 (*b->method->codec.encode)(c2, &dst, &dst_item);
1011 /* now move "pointers" .. result has been encoded .. */
1014 /* we must move the lookahead pointer */
1016 inserted_dst_bytes += (dst - dst_0);
1017 if (inserted_dst_bytes >= quater)
1018 /* no more room. Mark lookahead as "gone".. */
1022 /* move it really.. */
1023 lookahead_next = lookahead_item;
1024 if (!(*stream->read_item)(stream->clientData,
1028 /* end of stream reached: no "more" and no lookahead */
1032 if (lookahead_item && max_item &&
1033 (*b->method->compare_item)(max_item, lookahead_item) <= 0)
1035 /* the lookahead goes beyond what we allow in this
1036 leaf. Mark it as "gone" */
1045 /* exact match .. move both pointers */
1047 lookahead_next = lookahead_item;
1048 if (!(*stream->read_item)(stream->clientData,
1049 &lookahead_next, lookahead_mode))
1055 break; /* end of file stream reached .. */
1056 file_item = file_item_buf; /* move file pointer */
1057 (*b->method->codec.decode)(c1, &file_item, &src);
1061 /* file pointer must be moved */
1064 file_item = file_item_buf;
1065 (*b->method->codec.decode)(c1, &file_item, &src);
1070 /* this loop runs when we are "appending" to a leaf page. That is
1071 either it's empty (new) or all file items have been read in
1074 maxp = dst_buf + b->file[b->no_cat-1].head.block_max + quater;
1075 while (lookahead_item)
1078 const char *src = lookahead_item;
1081 /* if we have a lookahead item, we stop if we exceed the value of it */
1083 (*b->method->compare_item)(max_item, lookahead_item) <= 0)
1085 /* stop if we have reached the value of max item */
1088 if (!*lookahead_mode)
1090 /* this is append. So a delete is bad */
1091 yaz_log(YLOG_WARN, "isamb: Inconsistent register (2)");
1092 assert(*lookahead_mode);
1094 else if (!half1 && dst > tail_cut)
1096 const char *src_0 = src;
1097 half1 = dst; /* candidate for splitting */
1099 (*b->method->codec.encode)(c2, &dst, &src);
1101 cut_item_size = src - src_0;
1102 assert(cut_item_size > 0);
1103 memcpy(cut_item_buf, src_0, cut_item_size);
1105 no_items_1 = no_items;
1109 (*b->method->codec.encode)(c2, &dst, &src);
1119 dst_item = lookahead_item;
1120 if (!(*stream->read_item)(stream->clientData, &dst_item,
/* total number of encoded bytes produced for this leaf */
1127 new_size = dst - dst_buf;
1128 if (p && p->cat != b->no_cat-1 &&
1129 new_size > b->file[p->cat].head.block_max)
1131 /* non-btree block will be removed */
1134 /* delete it too!! */
1135 p = 0; /* make a new one anyway */
1138 { /* must create a new one */
1140 for (i = 0; i < b->no_cat; i++)
1141 if (new_size <= b->file[i].head.block_max)
/* result does not fit in one block: split at the recorded candidate */
1147 if (new_size > b->file[p->cat].head.block_max)
1150 const char *cut_item = cut_item_buf;
1155 assert(cut_item_size > 0);
1158 p->size = half1 - dst_buf;
1159 assert(p->size <= b->file[p->cat].head.block_max);
1160 memcpy(p->bytes, dst_buf, half1 - dst_buf);
1161 p->no_items = no_items_1;
1164 *sp2 = new_leaf(b, p->cat);
1166 (*b->method->codec.reset)(c2);
1168 b->number_of_leaf_splits++;
1170 first_dst = (*sp2)->bytes;
1172 (*b->method->codec.encode)(c2, &first_dst, &cut_item);
1174 memcpy(first_dst, half2, dst - half2);
1176 (*sp2)->size = (first_dst - (*sp2)->bytes) + (dst - half2);
1177 assert((*sp2)->size <= b->file[p->cat].head.block_max);
1178 (*sp2)->no_items = no_items - no_items_1;
1181 memcpy(sub_item, cut_item_buf, cut_item_size);
1182 *sub_size = cut_item_size;
1186 memcpy(p->bytes, dst_buf, dst - dst_buf);
1188 p->no_items = no_items;
1190 (*b->method->codec.stop)(c1);
1191 (*b->method->codec.stop)(c2);
/* Dispatch an insertion to the right handler: insert_leaf when there is
   no block yet (*p is null) or the block is a leaf, insert_int otherwise.
   All arguments are passed through unchanged and the handler result is
   returned. Part of the parameter list is not visible here. */
1196 int insert_sub(ISAMB b, struct ISAMB_block **p, void *new_item,
1199 struct ISAMB_block **sp,
1200 void *sub_item, int *sub_size,
1201 const void *max_item)
1203 if (!*p || (*p)->leaf)
1204 return insert_leaf(b, p, new_item, mode, stream, sp, sub_item,
1205 sub_size, max_item);
1207 return insert_int(b, *p, new_item, mode, stream, sp, sub_item,
1208 sub_size, max_item);
/* Recursively unlink the subtree rooted at pos. The visible code opens
   the block, decodes the first child pointer and recurses on it, then
   walks the remaining (item, child pointer) pairs and recurses on every
   child. The test that restricts this walk to non-leaf blocks, the
   marking of the block as deleted, the close of p1 and the return value
   are not visible here. */
1211 int isamb_unlink(ISAMB b, ISAM_P pos)
1213 struct ISAMB_block *p1;
1217 p1 = open_block(b, pos);
1222 const char *src = p1->bytes + p1->offset;
1224 void *c1 = (*b->method->codec.start)();
1226 decode_ptr(&src, &sub_p);
1227 isamb_unlink(b, sub_p);
1229 while (src != p1->bytes + p1->size)
1232 char file_item_buf[DST_ITEM_MAX];
1233 char *file_item = file_item_buf;
1234 (*b->method->codec.reset)(c1);
/* skip the separator item; only the child pointers matter here */
1235 (*b->method->codec.decode)(c1, &file_item, &src);
1238 decode_item_len(&src, &item_len);
1241 decode_ptr(&src, &sub_p);
1242 isamb_unlink(b, sub_p);
1245 (*b->method->codec.stop)(c1);
/* Merge the items delivered by stream into the tree whose root position
 * is *pos, updating *pos to the resulting root.
 *
 * An item is read from the stream into item_buf, then insert_sub is
 * called on the root block. If that call produced a sibling sp, the tree
 * grows by one level: a new internal block p2 is created that holds the
 * old root position, the separator sub_item and the position of sp, and
 * *pos becomes the position of p2. Otherwise *pos is set to the position
 * of the (possibly new) root. A root with no items left leads to
 * isamb_unlink of *pos (the must_delete handling is only partly visible).
 *
 * Loop headers, several conditions and the block closes are not visible
 * here; the description follows the visible statements only. */
1252 void isamb_merge(ISAMB b, ISAM_P *pos, ISAMC_I *stream)
1254 char item_buf[DST_ITEM_MAX];
1258 int must_delete = 0;
1265 item_ptr = item_buf;
1267 (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1272 item_ptr = item_buf;
1273 more = (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1276 struct ISAMB_block *p = 0, *sp = 0;
1277 char sub_item[DST_ITEM_MAX];
1281 p = open_block(b, *pos);
1282 more = insert_sub(b, &p, item_buf, &i_mode, stream, &sp,
1283 sub_item, &sub_size, 0);
1285 { /* increase level of tree by one */
1286 struct ISAMB_block *p2 = new_int(b, p->cat);
1287 char *dst = p2->bytes + p2->size;
1289 void *c1 = (*b->method->codec.start)();
1290 const char *sub_item_ptr = sub_item;
/* new root layout: old root pointer, separator item, sibling pointer */
1293 encode_ptr(&dst, p->pos);
1294 assert(sub_size < 128 && sub_size > 1);
1296 (*b->method->codec.reset)(c1);
1297 (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
1299 encode_item_len(&dst, sub_size);
1300 memcpy(dst, sub_item, sub_size);
1303 encode_ptr(&dst, sp->pos);
1305 p2->size = dst - p2->bytes;
1306 p2->no_items = p->no_items + sp->no_items;
1307 *pos = p2->pos; /* return new super page */
1311 (*b->method->codec.stop)(c1);
1316 *pos = p->pos; /* return current one (again) */
1318 if (p->no_items == 0)
1326 isamb_unlink(b, *pos);
/* Create a read position for the tree rooted at pos.
 *
 * Allocates the position object and an array of ISAMB_MAX_LEVEL block
 * pointers, clears the statistics and then descends from the root: each
 * block is opened and stored in pp->block at the current level and, on
 * the descending path, the first child pointer is decoded to continue
 * one level down. The loop condition (presumably: until a leaf is
 * reached), the use of the level and scope arguments and the return
 * statement are not visible here.
 *
 * NOTE(review): pp->block[pp->level+1] is written after the descent; with
 * ISAMB_MAX_LEVEL slots this assumes the tree depth stays below that
 * limit - confirm there is a guard. */
1331 ISAMB_PP isamb_pp_open_x(ISAMB isamb, ISAM_P pos, int *level, int scope)
1333 ISAMB_PP pp = xmalloc(sizeof(*pp));
1339 pp->block = xmalloc(ISAMB_MAX_LEVEL * sizeof(*pp->block));
1346 pp->skipped_numbers = 0;
1347 pp->returned_numbers = 0;
1349 for (i = 0; i<ISAMB_MAX_LEVEL; i++)
1350 pp->skipped_nodes[i] = pp->accessed_nodes[i] = 0;
1353 struct ISAMB_block *p = open_block(isamb, pos);
1354 const char *src = p->bytes + p->offset;
1355 pp->block[pp->level] = p;
1357 pp->total_size += p->size;
/* follow the first child pointer of this internal block */
1361 decode_ptr(&src, &pos);
1362 p->offset = src - p->bytes;
1364 pp->accessed_nodes[pp->level]++;
/* null entry marks the end of the opened path */
1366 pp->block[pp->level+1] = 0;
1367 pp->maxlevel = pp->level;
/* Open a read position at pos with the given scope; same as
   isamb_pp_open_x with a null level pointer. */
1373 ISAMB_PP isamb_pp_open(ISAMB isamb, ISAM_P pos, int scope)
1375 return isamb_pp_open_x(isamb, pos, 0, scope);
/* Close a read position.
 *
 * Logs the statistics of the position, adds them to the totals kept in
 * the owning ISAMB handle, reports total_size through size and no_blocks
 * through blocks, and closes every block from level 0 up to the current
 * level. isamb_pp_close passes null for size and blocks, so the stores
 * are presumably guarded by null checks that are not visible here; the
 * release of pp itself is not visible either.
 *
 * NOTE(review): the first log call prints its first zint after the word
 * returned and its second after the word skipped, but the arguments are
 * passed as skipped_numbers then returned_numbers - they look swapped. */
1378 void isamb_pp_close_x(ISAMB_PP pp, zint *size, zint *blocks)
1383 yaz_log(YLOG_DEBUG, "isamb_pp_close lev=%d returned "ZINT_FORMAT" values, "
1384 "skipped "ZINT_FORMAT,
1385 pp->maxlevel, pp->skipped_numbers, pp->returned_numbers);
1386 for (i = pp->maxlevel; i>=0; i--)
1387 if (pp->skipped_nodes[i] || pp->accessed_nodes[i])
1388 yaz_log(YLOG_DEBUG, "isamb_pp_close level leaf-%d: "
1389 ZINT_FORMAT" read, "ZINT_FORMAT" skipped", i,
1390 pp->accessed_nodes[i], pp->skipped_nodes[i]);
/* fold the per-position counters into the handle-wide totals */
1391 pp->isamb->skipped_numbers += pp->skipped_numbers;
1392 pp->isamb->returned_numbers += pp->returned_numbers;
1393 for (i = pp->maxlevel; i>=0; i--)
1395 pp->isamb->accessed_nodes[i] += pp->accessed_nodes[i];
1396 pp->isamb->skipped_nodes[i] += pp->skipped_nodes[i];
1399 *size = pp->total_size;
1401 *blocks = pp->no_blocks;
1402 for (i = 0; i <= pp->level; i++)
1403 close_block(pp->isamb, pp->block[i]);
/* Return the block size of category cat when cat is within
   0 .. no_cat-1. The value returned for an out-of-range category is not
   visible here. */
1408 int isamb_block_info(ISAMB isamb, int cat)
1410 if (cat >= 0 && cat < isamb->no_cat)
1411 return isamb->file[cat].head.block_size;
/* Close a read position without asking for size or block statistics
   (both output pointers are passed as null). */
1415 void isamb_pp_close(ISAMB_PP pp)
1417 isamb_pp_close_x(pp, 0, 0);
1420 /* simple recursive dumper .. */
/* Dump the subtree rooted at pos at indentation depth level.
   A prefix string with the position (and, in one form, the category,
   size, block_max and item count) is built for each block. In one branch
   every item of the block is decoded and passed to log_item. In the other
   branch the first child is dumped recursively, then each separator item
   is logged and the child that follows it is dumped recursively. The
   leaf test selecting the branches, the use of pr and the close of the
   block are not visible here. */
1421 static void isamb_dump_r(ISAMB b, ISAM_P pos, void (*pr)(const char *str),
1425 char prefix_str[1024];
1428 struct ISAMB_block *p = open_block(b, pos);
1429 sprintf(prefix_str, "%*s " ZINT_FORMAT " cat=%d size=%d max=%d items="
1430 ZINT_FORMAT, level*2, "",
1431 pos, p->cat, p->size, b->file[p->cat].head.block_max,
1434 sprintf(prefix_str, "%*s " ZINT_FORMAT, level*2, "", pos);
1437 while (p->offset < p->size)
1439 const char *src = p->bytes + p->offset;
1441 (*b->method->codec.decode)(p->decodeClientData, &dst, &src);
1442 (*b->method->log_item)(YLOG_DEBUG, buf, prefix_str);
1443 p->offset = src - (char*) p->bytes;
1445 assert(p->offset == p->size);
1449 const char *src = p->bytes + p->offset;
1452 decode_ptr(&src, &sub);
1453 p->offset = src - (char*) p->bytes;
1455 isamb_dump_r(b, sub, pr, level+1);
1457 while (p->offset < p->size)
1460 char file_item_buf[DST_ITEM_MAX];
1461 char *file_item = file_item_buf;
1462 void *c1 = (*b->method->codec.start)();
1463 (*b->method->codec.decode)(c1, &file_item, &src);
1464 (*b->method->codec.stop)(c1);
1465 (*b->method->log_item)(YLOG_DEBUG, file_item_buf, prefix_str);
1468 decode_item_len(&src, &item_len);
1469 (*b->method->log_item)(YLOG_DEBUG, src, prefix_str);
1472 decode_ptr(&src, &sub);
1474 p->offset = src - (char*) p->bytes;
1476 isamb_dump_r(b, sub, pr, level+1);
/* Dump the tree rooted at pos, starting the recursion at level 0. */
1483 void isamb_dump(ISAMB b, ISAM_P pos, void (*pr)(const char *str))
1485 isamb_dump_r(b, pos, pr, 0);
/* Read the next item into buf; same as isamb_pp_forward with no
   until-item (null), and returns its result. */
1488 int isamb_pp_read(ISAMB_PP pp, void *buf)
1490 return isamb_pp_forward(pp, buf, 0);
/* Report a position estimate for pp: *total is set from the item count of
   the block at level 0 and *current from the number of items returned so
   far; both values are also logged. Assertions or checks between the
   visible statements are not shown here. */
1494 void isamb_pp_pos(ISAMB_PP pp, double *current, double *total)
1495 { /* return an estimate of the current position and of the total number of */
1496 /* occurrences in the isam tree, based on the current leaf */
1500 /* if end-of-stream PP may not be leaf */
1502 *total = (double) (pp->block[0]->no_items);
1503 *current = (double) pp->returned_numbers;
1505 yaz_log(YLOG_LOG, "isamb_pp_pos returning: cur= %0.1f tot=%0.1f rn="
1506 ZINT_FORMAT, *current, *total, pp->returned_numbers);
/* Advance the read position and deliver the next item in buf, optionally
 * skipping forward to the item untilb.
 *
 * When the current block is exhausted (offset == size), exhausted blocks
 * are closed while moving up the path. In the parent the next separator
 * and child pointer are decoded; with untilb set, separators are compared
 * against untilb (relative to pp->scope) to skip whole children. The
 * chosen child is opened and the descent repeats the same comparison on
 * each level until a leaf is reached. In the leaf, one item is decoded
 * with the decoder state of the block, and the untilb comparison decides
 * whether it is accepted; if the block ends first, the search starts
 * again. returned_numbers is incremented for a delivered item.
 *
 * Level bookkeeping, several conditions and the return statements are
 * not visible here; the description follows the visible statements only.
 *
 * NOTE(review): one comparison in the descent uses <= pp->scope where the
 * sibling comparisons use < pp->scope - confirm this is intended. */
1510 int isamb_pp_forward(ISAMB_PP pp, void *buf, const void *untilb)
1514 struct ISAMB_block *p = pp->block[pp->level];
1515 ISAMB b = pp->isamb;
1519 while (p->offset == p->size)
1525 char file_item_buf[DST_ITEM_MAX];
1526 char *file_item = file_item_buf;
1530 while (p->offset == p->size)
1534 close_block(pp->isamb, pp->block[pp->level]);
1535 pp->block[pp->level] = 0;
1537 p = pp->block[pp->level];
1542 src = p->bytes + p->offset;
1545 c1 = (*b->method->codec.start)();
1546 (*b->method->codec.decode)(c1, &file_item, &src);
1548 decode_ptr(&src, &item_len);
1551 decode_ptr(&src, &pos);
1552 p->offset = src - (char*) p->bytes;
1554 src = p->bytes + p->offset;
1558 if (!untilb || p->offset == p->size)
1560 assert(p->offset < p->size);
1563 file_item = file_item_buf;
1564 (*b->method->codec.reset)(c1);
1565 (*b->method->codec.decode)(c1, &file_item, &src);
1566 if ((*b->method->compare_item)(untilb, file_item_buf) < pp->scope)
1572 decode_item_len(&src, &item_len);
1573 if ((*b->method->compare_item)(untilb, src) < pp->scope)
1577 decode_ptr(&src, &pos);
1578 p->offset = src - (char*) p->bytes;
1585 pp->block[pp->level] = p = open_block(pp->isamb, pos);
1587 pp->total_size += p->size;
1595 src = p->bytes + p->offset;
1598 decode_ptr(&src, &pos);
1599 p->offset = src - (char*) p->bytes;
1601 if (!untilb || p->offset == p->size)
1603 assert(p->offset < p->size);
1606 file_item = file_item_buf;
1607 (*b->method->codec.reset)(c1);
1608 (*b->method->codec.decode)(c1, &file_item, &src);
1609 if ((*b->method->compare_item)(untilb, file_item_buf) < pp->scope)
1615 decode_ptr(&src, &item_len);
1616 if ((*b->method->compare_item)(untilb, src) <= pp->scope)
1624 (*b->method->codec.stop)(c1);
1627 assert(p->offset < p->size);
1632 src = p->bytes + p->offset;
1633 (*pp->isamb->method->codec.decode)(p->decodeClientData, &dst, &src);
1634 p->offset = src - (char*) p->bytes;
1635 if (!untilb || (*pp->isamb->method->compare_item)(untilb, dst0) < pp->scope)
1638 if (p->offset == p->size) goto again;
1640 pp->returned_numbers++;
/* Return the number of internal-node splits counted on handle b. */
1644 zint isamb_get_int_splits(ISAMB b)
1646 return b->number_of_int_splits;
/* Return the number of leaf splits counted on handle b. */
1649 zint isamb_get_leaf_splits(ISAMB b)
1651 return b->number_of_leaf_splits;
/* Getter returning a zint for handle b.
   NOTE(review): the body is not visible here; presumably it returns
   b->root_ptr, the field written by isamb_set_root_ptr - confirm. */
1654 zint isamb_get_root_ptr(ISAMB b)
/* Store root_ptr in handle b. isamb_close writes this value into the file
   header when the minor version is at least
   ISAMB_MINOR_VERSION_WITH_ROOT and the header is dirty. */
1659 void isamb_set_root_ptr(ISAMB b, zint root_ptr)
1661 b->root_ptr = root_ptr;
1668 * indent-tabs-mode: nil
1670 * vim: shiftwidth=4 tabstop=8 expandtab