Hash based cache
[tedtools.git] / tbtree.c
1 /*
2  * Copyright (c) 2005 Teodor Sigaev <teodor@sigaev.ru>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *      notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *      notice, this list of conditions and the following disclaimer in the
12  *      documentation and/or other materials provided with the distribution.
13  * 3. Neither the name of the author nor the names of any co-contributors
14  *      may be used to endorse or promote products derived from this software
15  *      without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY CONTRIBUTORS ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL CONTRIBUTORS BE LIABLE FOR ANY
21  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
23  * GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
25  * IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
26  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN
27  * IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28  */
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <fcntl.h>
32 #include <unistd.h>
33 #include <string.h>
34 #include <errno.h>
35 #include <sys/types.h>
36 #include <sys/stat.h>
37 #include <sys/file.h>
38
39 #include "tmalloc.h"
40 #include "tlog.h"
41
42 #include "tbtree.h"
43
44 #define NOTFOUND        0
45 #define FOUND           1
46
47 static int TBTReadPage(TBTree *db, u_int32_t pagenumber, TBTMemPage **page) ;
48 static int TBTWritePage(TBTree *db, TBTMemPage *page) ;
49 static int TBTNewPage(TBTree *db, TBTMemPage **page) ;
50
51
52
53 int 
54 TBTOpen(TBTree *db, char *file) {
55         off_t offset;
56
57         if ( db->readonly ) {
58                 db->fd = open(file, O_RDONLY);
59                 if ( db->fd < 0 ) {
60                         tlog(TL_CRIT,"TBTOpen: open failed: %s",strerror(errno));
61                         return TBT_ERROR;
62                 }
63                 if ( flock( db->fd, LOCK_SH ) < 0 ) {
64                         tlog(TL_CRIT,"TBTOpen: flock failed: %s",strerror(errno));
65                         close(db->fd);
66                         return TBT_ERROR;
67                 }
68         } else {
69                 db->fd = open(file, O_CREAT | O_RDWR, 0666);
70                 if ( db->fd < 0 ) {
71                         tlog(TL_CRIT,"TBTOpen: open failed: %s",strerror(errno));
72                         return TBT_ERROR;
73                 }
74                 if ( flock( db->fd, LOCK_EX ) < 0 ) {
75                         tlog(TL_CRIT,"TBTOpen: flock failed: %s",strerror(errno));
76                         close(db->fd);
77                         return TBT_ERROR;
78                 }
79         }
80
81         if ( db->npage ) {
82                 db->Cache=(TBTMemPage**)t0malloc( sizeof(TBTMemPage*)*HASHSIZE(db->npage) );
83                 db->curpage=0;
84         }
85
86         db->cachehits = db->pageread = db->pagewrite = 0;
87
88         if ( (offset=lseek(db->fd, 0, SEEK_END)) < 0 ) {
89                 tlog(TL_CRIT,"TBTOpen: lseek failed: %s",strerror(errno));
90                 flock( db->fd, LOCK_UN );
91                 close(db->fd);
92                 return TBT_ERROR;
93         }
94
95         if ( offset==0 ) {
96                 if ( db->readonly ) {
97                         tlog(TL_CRIT,"TBTOpen: file is void");
98                         flock( db->fd, LOCK_UN );
99                         close(db->fd);
100                         return TBT_ERROR;
101                 } else {
102                         TBTMemPage *page;
103                         if ( TBTNewPage(db, &page) != TBT_OK ) {
104                                 flock( db->fd, LOCK_UN );
105                                 close(db->fd);
106                                 return TBT_ERROR;
107                         }
108                         page->page.isleaf=1;
109                         if ( TBTWritePage(db, page)!=TBT_OK ){
110                                 flock( db->fd, LOCK_UN );
111                                 close(db->fd);
112                                 return TBT_ERROR;
113                         }
114                 }
115         } 
116
117         db->lastpagenumber=0;                            
118         return TBT_OK; 
119 }
120
121 int
122 TBTClose(TBTree *db) {
123         int rc=0;
124
125         if ( !db->readonly ) 
126                 rc=TBTSync(db);
127
128         if ( db->npage ) {
129                 u_int32_t i;
130                 TBTMemPage *ptr,*tmp;
131
132                 for(i=0; i<HASHSIZE(db->npage); i++) {
133                         ptr = db->Cache[i];
134                         while(ptr) {
135                                 tmp = ptr->link;
136                                 tfree(ptr);
137                                 ptr=tmp;
138                         }
139                 }
140
141                 tfree( db->Cache );
142         }
143         
144         flock( db->fd, LOCK_UN );
145         close(db->fd);  
146         
147         return (rc) ? TBT_ERROR : TBT_OK;
148 }
149
150 static int
151 cmpNPage(const void *a, const void *b) {
152         tassert( (*(TBTMemPage**)a)->pagenumber != (*(TBTMemPage**)b)->pagenumber );
153         return ( (*(TBTMemPage**)a)->pagenumber > (*(TBTMemPage**)b)->pagenumber ) ? 1 : -1;
154 }
155
156
157 int
158 TBTSync(TBTree *db) {
159         int rc=0;
160
161         if ( db->readonly || db->npage==0 )
162                 return TBT_OK;
163
164         if ( db->npage ) {
165                 u_int32_t i, total=0;
166                 TBTMemPage *ptr, **data = (TBTMemPage**)tmalloc(db->npage*sizeof(TBTMemPage*)), **pptr;
167                 pptr=data;
168                 for(i=0; i<HASHSIZE(db->npage); i++) {
169                         ptr = db->Cache[i];
170                         while(ptr) {
171                                 if ( !ptr->issynced ) {
172                                         *pptr = ptr;
173                                         pptr++;
174                                 } 
175                                 ptr = ptr->link;
176                         }
177                 }
178                 
179                 total=pptr-data;
180                 if ( total > 0 ) {
181                         if ( total>1 )
182                                 qsort( data, total, sizeof(TBTMemPage*), cmpNPage);
183
184                         pptr=data;
185                         while( pptr-data<total) {
186                                 rc |= TBTWritePage(db, *pptr);
187                                 pptr++;
188                         }
189                 }
190                 tfree(data);
191         }
192         
193         if ( fsync(db->fd) ) {
194                 tlog(TL_CRIT,"TBTSync: fsync failed: %s",strerror(errno));
195                 rc=TBT_ERROR;
196         }
197
198         return (rc) ? TBT_ERROR : TBT_OK;
199 }
200
201 static TBTMemPage *
202 findInCache(TBTree *db, u_int32_t pagenumber) {
203         TBTMemPage *ptr;
204
205         ptr = db->Cache[ pagenumber % HASHSIZE(db->npage) ];
206
207         while(ptr && ptr->pagenumber != pagenumber)
208                 ptr = ptr->link; 
209
210         return ptr;;
211 }
212
213 static void
214 removeFromCache(TBTree *db, u_int32_t pagenumber) {
215         TBTMemPage *ptr, *prev=NULL;
216         int pos = pagenumber % HASHSIZE(db->npage);
217
218         ptr = db->Cache[ pos ];
219
220         while(ptr) {
221                 if ( ptr->pagenumber == pagenumber ) {
222                         if ( prev )
223                                 prev->link = ptr->link;
224                         else
225                                 db->Cache[ pos ] = ptr->link;
226                         break;
227                 }
228                 prev=ptr;
229                 ptr = ptr->link;
230         }
231 }
232
233 static void
234 insertInCache(TBTree *db, TBTMemPage *page) {
235         int pos = page->pagenumber % HASHSIZE(db->npage);
236
237         page->link = db->Cache[ pos ];
238         db->Cache[ pos ] = page;
239 }
240
241 static TBTMemPage*
242 PageAlloc(TBTree *db) {
243         TBTMemPage *page;
244
245         if ( db->curpage < db->npage ) {
246                 page = (TBTMemPage*)tmalloc(sizeof(TBTMemPage));
247                 memset( page, 0, TBTMEMPAGEHDRSZ );
248                 page->iscached=1;
249                 if ( db->curpage == 0 ) {
250                         db->TimeCache = db->TimeCacheLast = page;       
251                 } else {
252                         db->TimeCacheLast->next = page;
253                         page->prev = db->TimeCacheLast;
254                         db->TimeCacheLast = page;
255                 }
256                 db->curpage++;
257         } else if ( db->npage == 0 ) {
258                 page = (TBTMemPage*)tmalloc(sizeof(TBTMemPage));
259                 memset( page, 0, TBTMEMPAGEHDRSZ );
260         } else {
261                 page = db->TimeCache;
262                 while( page->next && page->islocked ) 
263                         page = page->next;
264                 if ( page==db->TimeCacheLast && page->islocked ) { /*all pages locked*/
265                         /* tlog(TL_WARN,"TBTReadPage: all pages in cache locked, increase number of cached pages"); */ 
266                         page = (TBTMemPage*)tmalloc(sizeof(TBTMemPage));
267                         memset( page, 0, TBTMEMPAGEHDRSZ );
268                 } else {
269                         if ( !page->issynced )
270                                 if ( TBTWritePage(db, page) )
271                                         return NULL;
272                         removeFromCache(db, page->pagenumber);
273                         if ( page != db->TimeCacheLast ) {
274                                 if ( page == db->TimeCache ) {
275                                         db->TimeCache = page->next;
276                                         db->TimeCache->prev = NULL;
277                                 } else { 
278                                         page->prev->next = page->next;
279                                         page->next->prev = page->prev;
280                                 }
281                                 memset( page, 0, TBTMEMPAGEHDRSZ );
282                                 db->TimeCacheLast->next = page;
283                                 page->prev = db->TimeCacheLast;
284                                 db->TimeCacheLast = page;
285                         } else 
286                                 memset( page, 0, TBTMEMPAGEHDRSZ );
287                         page->iscached=1;
288                 }
289         }
290
291         return page;
292 }
293
294 static int
295 TBTReadPage(TBTree *db, u_int32_t pagenumber, TBTMemPage **page) {
296         int rc;
297
298         if ( db->npage && (*page=findInCache(db, pagenumber))!=NULL ) {
299                 db->cachehits++;
300                 if ( *page != db->TimeCacheLast ) {
301                         if ( (*page) == db->TimeCache ) {
302                                 db->TimeCache = (*page)->next;
303                                 db->TimeCache->prev=NULL;
304                         } else { 
305                                 (*page)->prev->next = (*page)->next;
306                                 (*page)->next->prev = (*page)->prev;
307                         }
308                         db->TimeCacheLast->next = (*page);
309                         (*page)->prev = db->TimeCacheLast;
310                         db->TimeCacheLast = *page;
311                         (*page)->next = NULL;
312                 }
313         } else {
314                 off_t  offset = (off_t)pagenumber * (off_t)TBTREEPAGESIZE;
315         
316                 if( (*page = PageAlloc(db))==NULL )
317                         return TBT_ERROR;
318
319                 if ( lseek(db->fd, offset, SEEK_SET) != offset ) {
320                         tlog(TL_CRIT,"TBTReadPage: lseek failed: %s",strerror(errno));
321                         return TBT_ERROR;
322                 }
323                 (*page)->issynced=1;
324                 (*page)->pagenumber=pagenumber;
325                 if ( (rc=read(db->fd, &( (*page)->page ), TBTREEPAGESIZE)) != TBTREEPAGESIZE ) {
326                         tlog(TL_CRIT,"TBTReadPage: read failed: %s %d %d",strerror(errno), pagenumber, offset); 
327                         return TBT_ERROR;
328                 }
329                 if ( (*page)->iscached ) 
330                         insertInCache(db, *page);
331         }
332
333         db->pageread++;
334         return TBT_OK;   
335 }
336
337 static int
338 TBTWritePage(TBTree *db, TBTMemPage *page) {
339         off_t  offset = (off_t)(page->pagenumber) * (off_t)TBTREEPAGESIZE;
340         off_t  size;
341
342         if ( page->issynced )
343                 return TBT_OK;
344
345         if ( lseek(db->fd, offset, SEEK_SET) != offset ) {
346                 tlog(TL_CRIT,"TBTWritePage: lseek failed: %s",strerror(errno));
347                 return TBT_ERROR;
348         }
349
350         if ( (size=write(db->fd, &( page->page ), TBTREEPAGESIZE)) != TBTREEPAGESIZE ) {
351                 tlog(TL_CRIT,"TBTWritePage: write failed: %s (writed only %d bytes)",strerror(errno), size);
352                 return TBT_ERROR;
353         }
354
355         page->issynced=1;
356         db->pagewrite++;
357
358         return TBT_OK;
359 }
360
361 static int
362 TBTNewPage(TBTree *db, TBTMemPage **page) {
363         if ( db->lastpagenumber==0 ) {
364                 off_t  offset;
365                 if ( (offset=lseek(db->fd, 0, SEEK_END)) < 0 ) {
366                         tlog(TL_CRIT,"TBTNewPage: lseek failed: %s",strerror(errno));
367                         return TBT_ERROR;
368                 }
369
370                 if ( offset % TBTREEPAGESIZE != 0 ) {
371                         tlog(TL_CRIT,"TBTNewPage: file corrupted");
372                         return TBT_ERROR;
373                 }
374
375                 db->lastpagenumber = offset/TBTREEPAGESIZE;
376         }
377
378         if( (*page = PageAlloc(db))==NULL )
379                 return TBT_ERROR;
380
381         (*page)->pagenumber = db->lastpagenumber;
382         (*page)->issynced=0;
383         (*page)->islocked=1;
384
385         db->lastpagenumber++;
386
387         memset( &((*page)->page), 0, TBTMEMPAGEHDRSZ);
388         (*page)->page.freespace = TBTREEPAGESIZE-TBTPAGEHDRSZ;
389
390         if ( (*page)->iscached ) 
391                 insertInCache(db, *page);
392         
393         return TBT_OK;
394 }
395
396 static void
397 printValue(u_int32_t len, char *ptr) {
398         putchar('\'');
399         while(len>0) {
400                 putchar(*ptr);
401                 ptr++; len--;
402         }
403         putchar('\'');
404 }
405
406 static void
407 dumpPage(TBTree *db, TBTPage *page, int follow) {
408         u_int32_t i,j;
409
410         for(j=0;j<follow;j++)
411                 putchar(' ');
412         printf("Page: f:%d l:%d r:%d n:%d\n", 
413                 page->freespace,
414                 page->isleaf,
415                 page->rightlink,
416                 page->npointer
417         );      
418         for(i=0;i<page->npointer;i++) {
419                 TBTPointer *ptr= (TBTPointer*)(page->data+TBTPOINTERSIZE(db)*i);
420                 for(j=0;j<follow+2;j++)
421                         putchar(' ');
422                 printf("i:%d(%d) kl:%d ko:%d ",
423                         i, (int)ptr, 
424                         (db->keylen) ? db->keylen : ptr->key.varlen.length,
425                         (db->keylen) ? 0 : ptr->key.varlen.offset
426                 );
427                 if ( db->keylen==0 ) 
428                         printValue(ptr->key.varlen.length, page->data + ptr->key.varlen.offset);
429                 else
430                         printf("'%d'", *(int*)(ptr->key.fixed.key));
431                 if ( page->isleaf ) {
432                         printf(" vl:%d vo:%d ", 
433                                 ptr->pointer.leaf.length,
434                                 ptr->pointer.leaf.offset
435                         );
436                         printValue(ptr->pointer.leaf.length,  page->data + ptr->pointer.leaf.offset );
437                         putchar('\n');
438                 } else {
439                         printf(" page:%d\n", ptr->pointer.internal.link);
440                         if ( follow>=0 )
441                                 dumpTree(db, ptr->pointer.internal.link, follow+4); 
442                 }
443         }
444 }
445
446 void
447 dumpTree(TBTree *db, u_int32_t pagenumber, int follow) {
448         TBTMemPage  *page;
449
450         if ( TBTReadPage(db, pagenumber, &page) ) {
451                 tlog(TL_CRIT,"BEDA %d\n", pagenumber);
452                 return;
453         }
454
455         page->islocked=1;
456         dumpPage(db, &(page->page), follow);
457         page->islocked=0;
458         if ( !page->iscached )
459                 tfree(page);
460 }
461
462 static int
463 findInPage(TBTree *db, TBTPage *page, TBTValue *key, TBTPointer **res) {
464         TBTPointer      *StopLow = (TBTPointer*)(page->data);
465         TBTPointer      *StopHigh = (TBTPointer*)(page->data + TBTPOINTERSIZE(db)*page->npointer);
466         TBTPointer      *StopMiddle;
467         int diff ;
468         TBTValue  A;
469
470         /* Loop invariant: StopLow <= StopMiddle < StopHigh */
471         while (StopLow < StopHigh) {
472                 StopMiddle = (TBTPointer*)(((char*)StopLow) + TBTPOINTERSIZE(db)*((((char*)StopHigh - (char*)StopLow)/TBTPOINTERSIZE(db)) >> 1));
473                 if ( db->keylen == 0 ) {
474                         A.length = StopMiddle->key.varlen.length;
475                         A.value = page->data + StopMiddle->key.varlen.offset;
476                 } else {
477                         A.length =  db->keylen;
478                         A.value = StopMiddle->key.fixed.key;
479                 }
480         
481                 if ( ISINFPOINTER(db, page, StopMiddle) )
482                         diff = 1;
483                 else 
484                         diff = db->cmpkey( &A, key );
485                 if ( diff == 0 ) {
486                         *res = StopMiddle;
487                         return FOUND;
488                 } else if ( diff < 0 )
489                         StopLow = (TBTPointer*)((char*)StopMiddle + TBTPOINTERSIZE(db));
490                 else
491                         StopHigh = StopMiddle;
492         }
493
494         *res = StopHigh;
495         return NOTFOUND;
496 }
497
498 static int
499 findPage(TBTree *db, TBTValue *key, TBTMemPage **page) {
500         u_int32_t pagenumber = TBTPAGEROOT;
501         int rc = TBT_OK;
502
503         *page=NULL;
504         while(1) {
505                 TBTPointer *ptr;
506
507                 if ( (rc=TBTReadPage(db, pagenumber, page))!=TBT_OK )
508                         return rc;
509
510                 if ( (*page)->page.isleaf ) 
511                         return TBT_OK;
512
513                 findInPage(db, &((*page)->page), key, &ptr );
514                 pagenumber = ptr->pointer.internal.link;
515                 if ( !(*page)->iscached )
516                         tfree(*page);
517         }
518
519         return TBT_OK;
520 }
521
522 int
523 TBTFind(TBTree *db, TBTValue *key, TBTValue *value) {
524         TBTMemPage      *page;
525         TBTPointer *ptr;
526         int rc;
527
528         memset(value, 0, sizeof(TBTValue));
529
530         if ( (rc=findPage(db, key, &page))!=TBT_OK )
531                 return rc; 
532                 
533         if ( page == NULL )
534                 return TBT_OK;
535
536         if ( findInPage(db, &(page->page), key, &ptr) != FOUND ) {
537                 if ( !(page)->iscached )
538                         tfree(page);
539                 return TBT_OK;
540         } 
541
542         value->length = ptr->pointer.leaf.length;
543         value->value = tmalloc(value->length);
544         memcpy( value->value, page->page.data + ptr->pointer.leaf.offset, value->length );
545
546         if ( !(page)->iscached )
547                 tfree(page);
548
549         return TBT_OK;  
550 }
551
552 static void
553 packLeafKV(TBTree *db, TBTPage *page, TBTPointer *ptr, TBTValue *key, TBTValue *value) {
554         if ( ptr ) {
555                 if ( (char*)ptr - page->data  != page->npointer * TBTPOINTERSIZE(db) ) 
556                         memmove((char*)ptr+TBTPOINTERSIZE(db), ptr, page->npointer * TBTPOINTERSIZE(db) - ( (char*)ptr - page->data ));
557         } else
558                 ptr = (TBTPointer*)(page->data + page->npointer * TBTPOINTERSIZE(db));
559
560         page->npointer++;
561         page->freespace -= TBTPOINTERSIZE(db) + PTRALIGN(value->length);
562         if ( db->keylen ) {
563                 memcpy( ptr->key.fixed.key, key->value, db->keylen );
564                 ptr->pointer.leaf.offset = page->npointer * TBTPOINTERSIZE(db) + page->freespace;
565         } else {
566                 page->freespace -= PTRALIGN(key->length);
567                 ptr->key.varlen.length = key->length;
568                 ptr->key.varlen.offset = page->npointer * TBTPOINTERSIZE(db) + page->freespace;
569                 memcpy( page->data + ptr->key.varlen.offset, key->value, key->length);
570                 ptr->pointer.leaf.offset = page->npointer * TBTPOINTERSIZE(db) + page->freespace + PTRALIGN(key->length);
571         } 
572         ptr->pointer.leaf.length = value->length;
573         memcpy( page->data + ptr->pointer.leaf.offset, value->value, value->length );
574 }
575
576 static void
577 packInternalKV(TBTree *db, TBTPage *page, TBTPointer *ptr, TBTValue *key, u_int32_t pagenumber) {
578         if ( ptr ) {
579                 if ( (char*)ptr - page->data  != page->npointer * TBTPOINTERSIZE(db) ) 
580                         memmove((char*)ptr+TBTPOINTERSIZE(db), ptr, page->npointer * TBTPOINTERSIZE(db) - ( (char*)ptr - page->data ));
581         } else
582                 ptr = (TBTPointer*)(page->data + page->npointer * TBTPOINTERSIZE(db));
583
584         page->npointer++;
585         page->freespace -= TBTPOINTERSIZE(db);
586
587         if ( db->keylen ) {
588                 if ( key->value )
589                         memcpy( ptr->key.fixed.key, key->value, db->keylen );
590                 else
591                         memset( ptr->key.fixed.key, 0, db->keylen );
592         } else {
593                 page->freespace -= PTRALIGN(key->length);
594                 ptr->key.varlen.length = key->length;
595                 ptr->key.varlen.offset = page->npointer * TBTPOINTERSIZE(db) + page->freespace;
596                 if ( key->length ) 
597                         memcpy( page->data + ptr->key.varlen.offset, key->value, key->length);
598         }
599         ptr->pointer.internal.link = pagenumber;
600 }
601
602
603 static void
604 deleteKV(TBTree *db, TBTPage *page, TBTPointer **ptrs, u_int32_t num) {
605         static char buf[TBTREEPAGESIZE];
606         char  *bufptr = buf;
607         TBTPointer *ptr = (TBTPointer*)page->data, *tmp, **ptrptr = ptrs;
608         u_int32_t i, start;
609         /* suppose ptrs are ordered! */
610         tmp = ptr;
611         while( ptrptr - ptrs < num && (char*)ptr - page->data < TBTPOINTERSIZE(db)*page->npointer ) {
612                 if ( *ptrptr < ptr ) {
613                         tlog(TL_CRIT,"deleteKV: Something wrong: unexistent *ptrs"); 
614                         ptrptr++;
615                 } else if ( *ptrptr > ptr ) {
616                         if ( tmp != ptr )
617                                 memcpy(tmp, ptr, TBTPOINTERSIZE(db));
618                         ptr = (TBTPointer*)( (char*)ptr + TBTPOINTERSIZE(db) );
619                         tmp = (TBTPointer*)( (char*)tmp + TBTPOINTERSIZE(db) );
620                 } else {
621                         page->freespace += TBTPOINTERSIZE(db) + 
622                                 ( ( page->isleaf ) ? PTRALIGN(ptr->pointer.leaf.length) : 0 ) +
623                                 ( ( db->keylen ) ? 0 : PTRALIGN(ptr->key.varlen.length) );
624                         ptrptr++;
625                         ptr = (TBTPointer*)( (char*)ptr + TBTPOINTERSIZE(db) );
626                 }
627         }
628
629         if ( (char*)ptr - page->data < TBTPOINTERSIZE(db)*page->npointer && tmp!=ptr )
630                 memmove( tmp, ptr, page->npointer * TBTPOINTERSIZE(db) - ((char*)ptr - page->data) );
631
632         page->npointer-=num;
633
634         tmp = (TBTPointer*)page->data;
635         start = page->npointer * TBTPOINTERSIZE(db) + page->freespace;
636         for(i=0;i<page->npointer;i++) {
637                 if ( page->isleaf ) {
638                         memcpy( bufptr, page->data + tmp->pointer.leaf.offset, tmp->pointer.leaf.length );
639                         tmp->pointer.leaf.offset = start + (bufptr-buf);
640                         bufptr+=PTRALIGN(tmp->pointer.leaf.length);
641                 }
642
643                 if ( db->keylen == 0 ) {
644                         if ( tmp->key.varlen.length )
645                                 memcpy( bufptr, page->data + tmp->key.varlen.offset, tmp->key.varlen.length );
646                         tmp->key.varlen.offset = start + (bufptr-buf);
647                         bufptr+=PTRALIGN(tmp->key.varlen.length);
648                 }
649                 tmp = (TBTPointer*)( (char*)tmp + TBTPOINTERSIZE(db) );
650         }
651
652         memcpy( page->data + start, buf, bufptr-buf );
653 }
654
655 static u_int32_t
656 findDelimiter(TBTree *db, TBTPage *page, TBTPointer *ptr, int size, u_int32_t start, int *where) {
657         int  *sizes = (int*) tmalloc( sizeof(int) * (page->npointer+1) );
658         int i;
659         TBTPointer *tomove;
660         int lfree=TBTREEPAGESIZE-TBTPAGEHDRSZ, rfree=TBTREEPAGESIZE-TBTPAGEHDRSZ;
661         int nptr = ( (char*)ptr - page->data ) / TBTPOINTERSIZE(db); 
662         int was=0;
663
664         for(i=0; i<page->npointer;i++) {
665                 tomove = (TBTPointer*)(page->data + i*TBTPOINTERSIZE(db));
666                 if ( was==0 && i==nptr ) {
667                         sizes[i] = size;
668                         was=1;
669                 } 
670                 sizes[i+was] = TBTPOINTERSIZE(db) +
671                         ( (page->isleaf) ? PTRALIGN(tomove->pointer.leaf.length) : 0 ) +
672                         ( ( db->keylen ) ? 0 : PTRALIGN(tomove->key.varlen.length) );
673         
674         }
675         
676         if ( was==0 )
677                 sizes[page->npointer]=size;
678
679         for(i=0;i<page->npointer+1;i++) {
680                 if ( i< start )
681                         lfree-=sizes[i];
682                 else 
683                         rfree-=sizes[i];
684         }
685
686         while( 1 ) {
687                 if ( lfree<0 ) {
688                         lfree+=sizes[start];
689                         rfree-=sizes[start];
690                         start--;
691                 } else if ( rfree < 0 ) { 
692                         lfree-=sizes[start];
693                         rfree+=sizes[start];
694                         start++;
695                 } else
696                         break;
697         }
698
699         *where = (nptr < start) ? -1 : 1;
700         if ( start>nptr )
701                 start--;
702
703         tfree(sizes);
704         return start;
705 }
706
707 static void
708 populatePages(TBTree *db, TBTPage *srcpage, TBTPage* newpage, u_int32_t start) {
709         u_int32_t i;
710         TBTPointer *tomove, **todelete = tmalloc( sizeof(TBTPointer*)* srcpage->npointer );
711         u_int32_t numtodelete=0;
712         TBTValue k, v;
713
714         for(i=start;i<srcpage->npointer;i++) {
715                 tomove = (TBTPointer*)(srcpage->data + i*TBTPOINTERSIZE(db));
716
717                 todelete[numtodelete] = tomove;
718                 numtodelete++;
719
720                 if ( db->keylen ) 
721                         k.value = tomove->key.fixed.key;
722                 else {
723                         k.length = tomove->key.varlen.length;
724                         k.value = srcpage->data + tomove->key.varlen.offset;
725                 } 
726                                 
727                 if ( srcpage->isleaf ) {
728                         v.length = tomove->pointer.leaf.length;
729                         v.value = srcpage->data + tomove->pointer.leaf.offset;
730                         packLeafKV(db, newpage, NULL, &k, &v);
731                 } else 
732                         packInternalKV(db, newpage, NULL, &k, tomove->pointer.internal.link);
733         }
734
735         deleteKV(db, srcpage, todelete, numtodelete);   
736         tfree(todelete);
737 }
738
739 static int
740 splitPage(TBTree *db, TBTMemPage *srcpage, TBTMemPage** newpage, TBTPointer **ptr, int size, TBTValue *key, TBTValue *value, u_int32_t pagenumber) {
741         TBTMemPage *tmp;
742         int start=0, where=0;
743         int nptr = ( (char*)(*ptr) - srcpage->page.data ) / TBTPOINTERSIZE(db);
744
745         if ( TBTNewPage(db, newpage) )
746                 return TBT_ERROR;
747
748         tmp = *newpage;
749         srcpage->issynced=tmp->issynced=0;
750         tmp->islocked=1;
751         tmp->page.rightlink = srcpage->page.rightlink;
752         srcpage->page.rightlink = tmp->pagenumber;
753         tmp->page.isleaf = srcpage->page.isleaf;
754
755         switch(db->strategy) {
756                 case TBT_STATEGY_HALF:
757                         start = findDelimiter(db, &(srcpage->page), *ptr, size, (srcpage->page.npointer+1)/2, &where);  
758                         break;
759                 case TBT_STATEGY_LEFTFILL:
760                         start = findDelimiter(db, &(srcpage->page), *ptr, size, srcpage->page.npointer, &where);        
761                         break;
762                 case TBT_STATEGY_RIGHTFILL:
763                         start = findDelimiter(db, &(srcpage->page), *ptr, size, 0, &where);     
764                         break;
765                 default:
766                         tlog(TL_CRIT,"splitPage: unknown strategy: %d", db->strategy);
767                         return TBT_ERROR;
768         }
769
770         populatePages(db, &(srcpage->page), &(tmp->page), start);
771         if ( where<0 )
772                 tmp=srcpage;
773         else
774                 nptr-=start;
775         *ptr = (TBTPointer*)(tmp->page.data + nptr*TBTPOINTERSIZE(db));
776
777         if ( tmp->page.isleaf ) 
778                 packLeafKV(db, &(tmp->page), *ptr, key, value);
779         else
780                 packInternalKV(db, &(tmp->page), *ptr, key, pagenumber);
781
782         return TBT_OK;
783 }
784
785 static int
786 savePage(TBTree *db, TBTMemPage *page) {
787         int rc;
788         page->islocked=0;
789
790         if ( !page->iscached ) {
791                 if (  page->issynced==0 && (rc=TBTWritePage(db, page)) != TBT_OK )
792                         return rc;
793                 tfree(page);
794         }
795         return TBT_OK;
796 }
797
798 static int
799 layerInsert(TBTree *db, u_int32_t pagenumber, TBTMemPage **left, TBTMemPage **right, TBTValue *key, TBTValue *value) {
800         int rc,fnd;
801         TBTPointer      *ptr=NULL;
802         TBTMemPage      *page=NULL, *newright=NULL;
803                         
804         if ( (rc=TBTReadPage(db, pagenumber, &page))!=TBT_OK )
805                 return rc;
806
807         page->islocked=1;
808
809         fnd = findInPage(db, &(page->page), key, &ptr );
810
811         *left=*right=NULL;
812         if ( page->page.isleaf ) {
813                 u_int32_t size = TBTPOINTERSIZE(db) +
814                         PTRALIGN(value->length) +
815                         ( ( db->keylen ) ? 0 : PTRALIGN(key->length) );
816                 
817                 if ( size>(TBTREEPAGESIZE-TBTPAGEHDRSZ)/2 ) {
818                         tlog(TL_ALARM,"Huge size: %d bytes", size);
819                         return TBT_HUGE;
820                 }
821
822                 if (fnd==FOUND) {
823                         deleteKV(db, &(page->page), &ptr, 1);
824                         page->issynced=0;
825                 }
826
827                 if ( size <= page->page.freespace ) {
828                         packLeafKV(db, &(page->page), ptr, key, value);
829                         page->issynced=0;
830                 } else {
831                         rc = splitPage(db, page, &newright, &ptr, size, key, value, 0);
832                         if ( rc != TBT_OK ) return rc;
833
834                         page->issynced = newright->issynced = 0;
835                         page->islocked = newright->islocked = 1;
836                 }
837         } else {
838                 if ( (rc=layerInsert(db, ptr->pointer.internal.link, left, right, key, value))!=TBT_OK ) {
839                         page->islocked=0;
840                         return rc;
841                 }
842
843                 if ( *left ) { /* child page was splited */
844                         u_int32_t size;
845                         TBTPointer *rightptr = (TBTPointer*)( (*left)->page.data + ((*left)->page.npointer-1) * TBTPOINTERSIZE(db) );
846                         TBTValue K;
847
848                         if ( ISINFPOINTER(db, &(page->page), ptr) ) {
849                                 deleteKV(db, &(page->page), &ptr, 1);
850                                 K.length = 0; K.value = NULL;
851                                 packInternalKV(db, &(page->page), ptr, &K, (*right)->pagenumber);
852                         } else 
853                                 ptr->pointer.internal.link=(*right)->pagenumber;
854
855                         K.length = ( db->keylen ) ? db->keylen : rightptr->key.varlen.length;
856                         K.value  = ( db->keylen ) ? rightptr->key.fixed.key : (*left)->page.data + rightptr->key.varlen.offset;
857
858                         size = TBTPOINTERSIZE(db) + ( ( db->keylen ) ? 0 : PTRALIGN(K.length) );
859
860                         if ( size <= page->page.freespace ) {
861                                 packInternalKV(db, &(page->page), ptr, &K, (*left)->pagenumber);
862                                 page->issynced=0;
863                         } else {
864                                 rc = splitPage(db, page, &newright, &ptr, size, &K, NULL, (*left)->pagenumber);
865                                 if ( rc != TBT_OK ) return rc;
866
867                                 page->issynced = newright->issynced = 0;
868                                 page->islocked = newright->islocked = 1;
869                         }
870                 }
871         }
872
873         if ( *left ) {
874                 if ( (rc=savePage(db, *left))!=TBT_OK )
875                         return rc;
876                 if ( (rc=savePage(db, *right))!=TBT_OK )
877                         return rc;
878                 *left=*right=NULL;
879         }
880
881         if ( newright ) {
882                 if ( page->pagenumber==TBTPAGEROOT ) { /* root was splited */
883                         TBTMemPage  *newleft;
884                         TBTValue K;
885                 
886
887                         if  ( (rc=TBTNewPage(db,&newleft))!=TBT_OK )
888                                 return rc;
889                         memcpy( &(newleft->page) , &(page->page), TBTREEPAGESIZE);
890                         memset( &(page->page), 0, TBTREEPAGESIZE);
891                         page->page.freespace=TBTREEPAGESIZE-TBTPAGEHDRSZ;
892         
893                         ptr = (TBTPointer*)( newleft->page.data + (newleft->page.npointer-1) * TBTPOINTERSIZE(db) ); 
894                         K.length = ( db->keylen ) ? db->keylen : ptr->key.varlen.length;
895                         K.value  = ( db->keylen ) ? ptr->key.fixed.key : newleft->page.data + ptr->key.varlen.offset;
896                         packInternalKV(db, &(page->page), NULL, &K, newleft->pagenumber);
897                 
898                         ptr = (TBTPointer*)( newright->page.data + (newright->page.npointer-1) * TBTPOINTERSIZE(db) ); 
899                         K.length = 0;
900                         K.value  = NULL;
901                         packInternalKV(db, &(page->page), NULL, &K, newright->pagenumber);
902
903                         if ( (rc=savePage(db, newright))!=TBT_OK )
904                                 return rc;
905                         if ( (rc=savePage(db, newleft))!=TBT_OK )
906                                 return rc;
907                         if ( (rc=savePage(db, page))!=TBT_OK )
908                                 return rc;
909                 } else {
910                         *left = page;
911                         *right = newright;
912                 }
913         } else if ( (rc=savePage(db, page))!=TBT_OK )
914                 return rc;
915         
916
917         return TBT_OK;
918
919
920 int
921 TBTInsert(TBTree *db, TBTValue *key, TBTValue *value) {
922         int rc;
923         TBTMemPage *lpage=NULL, *rpage=NULL;
924
925         rc = layerInsert(db, TBTPAGEROOT, &lpage, &rpage, key, value);
926
927         tassert( lpage==NULL && rpage==NULL );
928
929         return rc;
930 }
931
932 int
933 TBTDelete(TBTree *db, TBTValue *key) {
934         TBTMemPage      *page;
935         TBTPointer *ptr;
936         int rc;
937
938         if ( (rc=findPage(db, key, &page))!=TBT_OK )
939                 return rc; 
940                 
941         if ( page == NULL )
942                 return TBT_OK;
943
944         if ( findInPage(db, &(page->page), key, &ptr) != FOUND ) {
945                 if ( !(page)->iscached )
946                         tfree(page);
947                 return TBT_OK;
948         } 
949
950         deleteKV(db, &(page->page), &ptr, 1);
951         page->issynced=0;
952
953         rc=savePage(db, page);
954
955         return rc;      
956 }
957
958 int 
959 TBTInitIterator(TBTree *db, TBTIterator *iterator) {
960         TBTMemPage      *page=NULL;
961         TBTPointer *ptr;
962         int rc;
963         u_int32_t pagenum=TBTPAGEROOT;
964
965         memset(iterator, 0, sizeof(TBTIterator));
966
967         do {
968                 if ( page && !page->iscached )
969                         tfree(page);
970                 if ( (rc=TBTReadPage(db, pagenum, &page))!=TBT_OK )
971                         return rc;
972
973                 ptr = (TBTPointer*)( page->page.data );
974                 pagenum = ptr->pointer.internal.link;
975         } while( !page->page.isleaf );
976
977         page->islocked=1;
978         iterator->page = page;  
979         iterator->ptr = ptr;    
980
981         return TBT_OK;
982 }
983
984 void 
985 TBTFreeIterator(TBTree *db, TBTIterator *iterator) {
986         
987         if ( iterator->page ) {
988                 iterator->page->islocked=0;
989                 if ( !iterator->page->iscached )
990                         tfree(iterator->page);
991         }
992
993         memset(iterator, 0, sizeof(TBTIterator));
994 }
995
996 int 
997 TBTIterate(TBTree *db, TBTIterator *iterator, TBTValue *key, TBTValue *value ) {
998         int rc;
999
1000         if (  iterator->page==NULL ) {
1001                 memset(key, 0, sizeof(TBTValue));
1002                 memset(value, 0, sizeof(TBTValue));
1003                 return TBT_OK;
1004         }
1005
1006         if ( (char*)(iterator->ptr) - iterator->page->page.data >= TBTPOINTERSIZE(db) * iterator->page->page.npointer ) {
1007                 do {
1008                         u_int32_t pagenumber = iterator->page->page.rightlink;
1009                         iterator->page->islocked=0;
1010                         if ( !iterator->page->iscached )
1011                                 tfree(iterator->page);
1012                         iterator->page=NULL;
1013
1014                         if ( pagenumber==TBTPAGEROOT ) {
1015                                 memset(key, 0, sizeof(TBTValue));
1016                                 memset(value, 0, sizeof(TBTValue));
1017                                 return TBT_OK;
1018                         }
1019                         if ( (rc=TBTReadPage(db, pagenumber, &(iterator->page)))!=TBT_OK )
1020                                 return rc;
1021                 } while ( iterator->page->page.npointer == 0 );
1022
1023                 iterator->page->islocked=1;
1024                 iterator->ptr = (TBTPointer*)( iterator->page->page.data );
1025         }
1026
1027         if ( db->keylen ) {
1028                 key->length = db->keylen;
1029                 key->value = iterator->ptr->key.fixed.key;
1030         } else {
1031                 key->length = iterator->ptr->key.varlen.length; 
1032                 key->value = iterator->page->page.data + iterator->ptr->key.varlen.offset;
1033         }
1034
1035         value->length = iterator->ptr->pointer.leaf.length; 
1036         value->value = iterator->page->page.data + iterator->ptr->pointer.leaf.offset;
1037
1038         iterator->ptr = ( TBTPointer* ) (  (char*)(iterator->ptr) + TBTPOINTERSIZE(db) );
1039
1040         return TBT_OK;
1041 }
1042
1043