1422da0c4bd160996a3e66b39c3e738f45cbb7d9
[tedtools.git] / tbtree.c
1 /*
2  * Copyright (c) 2005 Teodor Sigaev <teodor@sigaev.ru>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *      notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *      notice, this list of conditions and the following disclaimer in the
12  *      documentation and/or other materials provided with the distribution.
13  * 3. Neither the name of the author nor the names of any co-contributors
14  *      may be used to endorse or promote products derived from this software
15  *      without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY CONTRIBUTORS ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL CONTRIBUTORS BE LIABLE FOR ANY
21  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
23  * GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
25  * IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
26  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN
27  * IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28  */
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <fcntl.h>
32 #include <unistd.h>
33 #include <string.h>
34 #include <errno.h>
35 #include <sys/types.h>
36 #include <sys/stat.h>
37 #include <sys/file.h>
38
39 #include "tmalloc.h"
40 #include "tlog.h"
41
42 #include "tbtree.h"
43
44 #define NOTFOUND        0
45 #define FOUND           1
46
47 static int TBTReadPage(TBTree *db, u_int32_t pagenumber, TBTMemPage **page) ;
48 static int TBTWritePage(TBTree *db, TBTMemPage *page) ;
49 static int TBTNewPage(TBTree *db, TBTMemPage **page) ;
50
51
52
53 int 
54 TBTOpen(TBTree *db, char *file) {
55         off_t offset;
56
57         if ( db->readonly ) {
58                 db->fd = open(file, O_RDONLY);
59                 if ( db->fd < 0 ) {
60                         tlog(TL_CRIT,"TBTOpen: open failed: %s",strerror(errno));
61                         return TBT_ERROR;
62                 }
63                 if ( flock( db->fd, LOCK_SH ) < 0 ) {
64                         tlog(TL_CRIT,"TBTOpen: flock failed: %s",strerror(errno));
65                         close(db->fd);
66                         return TBT_ERROR;
67                 }
68         } else {
69                 db->fd = open(file, O_CREAT | O_RDWR, 0666);
70                 if ( db->fd < 0 ) {
71                         tlog(TL_CRIT,"TBTOpen: open failed: %s",strerror(errno));
72                         return TBT_ERROR;
73                 }
74                 if ( flock( db->fd, LOCK_EX ) < 0 ) {
75                         tlog(TL_CRIT,"TBTOpen: flock failed: %s",strerror(errno));
76                         close(db->fd);
77                         return TBT_ERROR;
78                 }
79         }
80
81         if ( db->npage ) {
82                 db->Cache=(TBTMemPage**)t0malloc( sizeof(TBTMemPage*)*db->npage );
83                 db->curpage=0;
84         }
85
86         db->cachehits = db->pageread = db->pagewrite = 0;
87
88         if ( (offset=lseek(db->fd, 0, SEEK_END)) < 0 ) {
89                 tlog(TL_CRIT,"TBTOpen: lseek failed: %s",strerror(errno));
90                 flock( db->fd, LOCK_UN );
91                 close(db->fd);
92                 return TBT_ERROR;
93         }
94
95         if ( offset==0 ) {
96                 if ( db->readonly ) {
97                         tlog(TL_CRIT,"TBTOpen: file is void");
98                         flock( db->fd, LOCK_UN );
99                         close(db->fd);
100                         return TBT_ERROR;
101                 } else {
102                         TBTMemPage *page;
103                         if ( TBTNewPage(db, &page) != TBT_OK ) {
104                                 flock( db->fd, LOCK_UN );
105                                 close(db->fd);
106                                 return TBT_ERROR;
107                         }
108                         page->page.isleaf=1;
109                         if ( TBTWritePage(db, page)!=TBT_OK ){
110                                 flock( db->fd, LOCK_UN );
111                                 close(db->fd);
112                                 return TBT_ERROR;
113                         }
114                 }
115         } 
116
117         db->lastpagenumber=0;                            
118         return TBT_OK; 
119 }
120
121 int
122 TBTClose(TBTree *db) {
123         int rc=0;
124
125         if ( !db->readonly ) 
126                 rc=TBTSync(db);
127
128         if ( db->npage ) {
129                 u_int32_t i;
130
131                 for(i=0; i<db->curpage; i++)
132                         tfree( db->Cache[i] );
133
134                 tfree( db->Cache );
135         }
136         
137         flock( db->fd, LOCK_UN );
138         close(db->fd);  
139         
140         return (rc) ? TBT_ERROR : TBT_OK;
141 }
142
143 int
144 TBTSync(TBTree *db) {
145         u_int32_t i;
146         int rc=0;
147
148         if ( db->readonly || db->npage==0 )
149                 return TBT_OK;
150  
151         for(i=0; i<db->curpage; i++)
152                 if ( !db->Cache[i]->issynced )
153                         rc |= TBTWritePage(db, db->Cache[i]);
154         
155         if ( fsync(db->fd) ) {
156                 tlog(TL_CRIT,"TBTSync: fsync failed: %s",strerror(errno));
157                 rc=1;
158         }
159
160         return (rc) ? TBT_ERROR : TBT_OK;
161 }
162
163 static int
164 findInCache(TBTree *db, u_int32_t pagenumber, int *pos, TBTMemPage **StopLow, TBTMemPage **StopHigh) {
165         TBTMemPage      **StopMiddle;
166
167         /* Loop invariant: StopLow <= StopMiddle < StopHigh */
168         while (StopLow < StopHigh) {
169                 StopMiddle = StopLow + ( ( StopHigh - StopLow ) >> 1 );
170         
171                 if ( (*StopMiddle)->pagenumber == pagenumber ) {
172                         *pos = StopMiddle - db->Cache;
173                         return FOUND;
174                 } else if ( (*StopMiddle)->pagenumber < pagenumber )
175                         StopLow = StopMiddle + 1;
176                 else
177                         StopHigh = StopMiddle;
178         }
179
180         *pos = StopHigh - db->Cache;
181         return NOTFOUND;
182 }
183
184 static TBTMemPage*
185 PageAlloc(TBTree *db, int *posincache) {
186         TBTMemPage *page;
187
188         *posincache = -1;
189         if ( db->curpage < db->npage ) {
190                 page = db->Cache[db->curpage] = (TBTMemPage*)tmalloc(sizeof(TBTMemPage));
191                 memset( page, 0, TBTMEMPAGEHDRSZ );
192                 page->iscached=1;
193                 if ( db->curpage == 0 ) {
194                         db->TimeCache = db->TimeCacheLast = page;       
195                 } else {
196                         db->TimeCacheLast->next = page;
197                         page->prev = db->TimeCacheLast;
198                         db->TimeCacheLast = page;
199                 }
200                 *posincache = db->curpage;
201                 db->curpage++;
202         } else if ( db->npage == 0 ) {
203                 page = (TBTMemPage*)tmalloc(sizeof(TBTMemPage));
204                 memset( page, 0, TBTMEMPAGEHDRSZ );
205         } else {
206                 page = db->TimeCache;
207                 while( page->next && page->islocked ) 
208                         page = page->next;
209                 if ( page==db->TimeCacheLast && page->islocked ) { /*all pages locked*/
210                         /*tlog(TL_WARN,"TBTReadPage: all pages in cache locked, increase number of cached pages");*/
211                         page = (TBTMemPage*)tmalloc(sizeof(TBTMemPage));
212                         memset( page, 0, TBTMEMPAGEHDRSZ );
213                 } else {
214                         if ( !page->issynced )
215                                 if ( TBTWritePage(db, page) )
216                                         return NULL;
217                         if( findInCache(db, page->pagenumber, posincache, db->Cache, db->Cache + db->curpage) != FOUND ) 
218                                 tlog(TL_CRIT,"PageAlloc: something wrong: no page with pagenumber %d in cache", page->pagenumber);
219                         memset( page, 0, TBTMEMPAGEHDRSZ );
220                         page->iscached=1;
221                         if ( page != db->TimeCacheLast ) {
222                                 if ( page->prev )
223                                         page->prev->next = page->next;
224                                 if ( page->next )
225                                         page->next->prev = page->prev;
226                                 db->TimeCacheLast->next = page;
227                                 page->prev = db->TimeCacheLast;
228                                 db->TimeCacheLast = page;
229                                 page->next = NULL;
230                         }
231                 }
232         }
233
234         return page;
235 }
236
237 static void
238 findAndMovePN(TBTree *db, TBTMemPage *page, int pos) {
239         int newpos;
240
241         if ( db->curpage < 2 )
242                 return;
243
244         if ( pos < 0 ) 
245                 pos = db->curpage-1;
246         tassert( pos < db->curpage && db->Cache[pos]==page );
247
248         if ( pos+1 != db->curpage && db->Cache[pos+1]->pagenumber < page->pagenumber ) {
249                 if ( findInCache(db, page->pagenumber, &newpos, db->Cache + pos + 1, db->Cache + db->curpage )!=NOTFOUND) 
250                         tlog(TL_CRIT,"findAndMovePN: Something wrong, duplicate page with pagenumber %d", page->pagenumber);
251                 memmove( db->Cache + pos, db->Cache + pos + 1, (newpos-pos-1)*sizeof(TBTMemPage**) );
252                 db->Cache[newpos-1] = page; 
253         } else if ( pos!=0 && db->Cache[pos-1]->pagenumber > page->pagenumber ) {
254                 if( findInCache(db, page->pagenumber, &newpos, db->Cache, db->Cache + pos )!=NOTFOUND); 
255                         tlog(TL_CRIT,"findAndMovePN: Something wrong, duplicate page with pagenumber %d", page->pagenumber);
256                 memmove( db->Cache + newpos + 1, db->Cache + newpos, (pos-newpos)*sizeof(TBTMemPage**));
257                 db->Cache[newpos] = page;
258         }
259 }
260
261 static int
262 TBTReadPage(TBTree *db, u_int32_t pagenumber, TBTMemPage **page) {
263         int rc, pos=0;
264
265         if ( findInCache(db, pagenumber, &pos, db->Cache, db->Cache + db->curpage)==FOUND ) {
266                 db->cachehits++;
267                 *page=db->Cache[pos];
268                 if ( *page != db->TimeCacheLast ) {
269                         if ( (*page)->prev )
270                                 (*page)->prev->next = (*page)->next;
271                         if ( (*page)->next )
272                                 (*page)->next->prev = (*page)->prev;
273                         db->TimeCacheLast->next = (*page);
274                         (*page)->prev = db->TimeCacheLast;
275                         db->TimeCacheLast = *page;
276                         (*page)->next = NULL;
277                 }
278         } else {
279                 off_t  offset = (off_t)pagenumber * (off_t)TBTREEPAGESIZE;
280         
281                 if( (*page = PageAlloc(db, &pos))==NULL )
282                         return TBT_ERROR;
283
284                 if ( lseek(db->fd, offset, SEEK_SET) != offset ) {
285                         tlog(TL_CRIT,"TBTReadPage: lseek failed: %s",strerror(errno));
286                         return TBT_ERROR;
287                 }
288                 (*page)->issynced=1;
289                 (*page)->pagenumber=pagenumber;
290                 if ( (rc=read(db->fd, &( (*page)->page ), TBTREEPAGESIZE)) != TBTREEPAGESIZE ) {
291                         tlog(TL_CRIT,"TBTReadPage: read failed: %s %d %d",strerror(errno), pagenumber, offset);
292                         return TBT_ERROR;
293                 }
294                 if ( (*page)->iscached ) 
295                         findAndMovePN(db, *page, pos);
296         }
297
298         db->pageread++;
299         return TBT_OK;   
300 }
301
302 static int
303 TBTWritePage(TBTree *db, TBTMemPage *page) {
304         off_t  offset = (off_t)(page->pagenumber) * (off_t)TBTREEPAGESIZE;
305         off_t  size;
306
307         if ( page->issynced )
308                 return TBT_OK;
309
310         if ( lseek(db->fd, offset, SEEK_SET) != offset ) {
311                 tlog(TL_CRIT,"TBTWritePage: lseek failed: %s",strerror(errno));
312                 return TBT_ERROR;
313         }
314
315         if ( (size=write(db->fd, &( page->page ), TBTREEPAGESIZE)) != TBTREEPAGESIZE ) {
316                 tlog(TL_CRIT,"TBTWritePage: write failed: %s (writed only %d bytes)",strerror(errno), size);
317                 return TBT_ERROR;
318         }
319
320         page->issynced=1;
321         db->pagewrite++;
322
323         return TBT_OK;
324 }
325
326 static int
327 TBTNewPage(TBTree *db, TBTMemPage **page) {
328         int pos;
329
330         if ( db->lastpagenumber==0 ) {
331                 off_t  offset;
332                 if ( (offset=lseek(db->fd, 0, SEEK_END)) < 0 ) {
333                         tlog(TL_CRIT,"TBTNewPage: lseek failed: %s",strerror(errno));
334                         return TBT_ERROR;
335                 }
336
337                 if ( offset % TBTREEPAGESIZE != 0 ) {
338                         tlog(TL_CRIT,"TBTNewPage: file corrupted");
339                         return TBT_ERROR;
340                 }
341
342                 db->lastpagenumber = offset/TBTREEPAGESIZE;
343         }
344
345         if( (*page = PageAlloc(db, &pos))==NULL )
346                 return TBT_ERROR;
347
348         (*page)->pagenumber = db->lastpagenumber;
349         (*page)->issynced=0;
350         (*page)->islocked=1;
351
352         db->lastpagenumber++;
353
354         memset( &((*page)->page), 0, TBTMEMPAGEHDRSZ);
355         (*page)->page.freespace = TBTREEPAGESIZE-TBTPAGEHDRSZ;
356
357         if ( (*page)->iscached ) 
358                 findAndMovePN(db, *page, pos);
359         
360         return TBT_OK;
361 }
362
/*
 * Print len bytes starting at ptr to stdout, enclosed in single quotes.
 * The bytes are emitted as they are, without any escaping.
 */
static void
printValue(u_int32_t len, char *ptr) {
	u_int32_t left;

	putchar('\'');
	for(left=len; left>0; left--) {
		putchar(*ptr);
		ptr++;
	}
	putchar('\'');
}
372
373 static void
374 dumpPage(TBTree *db, TBTPage *page, int follow) {
375         u_int32_t i,j;
376
377         for(j=0;j<follow;j++)
378                 putchar(' ');
379         printf("Page: f:%d l:%d r:%d n:%d\n", 
380                 page->freespace,
381                 page->isleaf,
382                 page->rightlink,
383                 page->npointer
384         );      
385         for(i=0;i<page->npointer;i++) {
386                 TBTPointer *ptr= (TBTPointer*)(page->data+TBTPOINTERSIZE(db)*i);
387                 for(j=0;j<follow+2;j++)
388                         putchar(' ');
389                 printf("i:%d(%d) kl:%d ko:%d ",
390                         i, (int)ptr, 
391                         (db->keylen) ? db->keylen : ptr->key.varlen.length,
392                         (db->keylen) ? 0 : ptr->key.varlen.offset
393                 );
394                 if ( db->keylen==0 ) 
395                         printValue(ptr->key.varlen.length, page->data + ptr->key.varlen.offset);
396                 else
397                         printf("'%d'", *(int*)(ptr->key.fixed.key));
398                 if ( page->isleaf ) {
399                         printf(" vl:%d vo:%d ", 
400                                 ptr->pointer.leaf.length,
401                                 ptr->pointer.leaf.offset
402                         );
403                         printValue(ptr->pointer.leaf.length,  page->data + ptr->pointer.leaf.offset );
404                         putchar('\n');
405                 } else {
406                         printf(" page:%d\n", ptr->pointer.internal.link);
407                         if ( follow>=0 )
408                                 dumpTree(db, ptr->pointer.internal.link, follow+4); 
409                 }
410         }
411 }
412
413 void
414 dumpTree(TBTree *db, u_int32_t pagenumber, int follow) {
415         TBTMemPage  *page;
416
417         if ( TBTReadPage(db, pagenumber, &page) ) {
418                 tlog(TL_CRIT,"BEDA %d\n", pagenumber);
419                 return;
420         }
421
422         page->islocked=1;
423         dumpPage(db, &(page->page), follow);
424         page->islocked=0;
425         if ( !page->iscached )
426                 tfree(page);
427 }
428
429 static int
430 findInPage(TBTree *db, TBTPage *page, TBTValue *key, TBTPointer **res) {
431         TBTPointer      *StopLow = (TBTPointer*)(page->data);
432         TBTPointer      *StopHigh = (TBTPointer*)(page->data + TBTPOINTERSIZE(db)*page->npointer);
433         TBTPointer      *StopMiddle;
434         int diff ;
435         TBTValue  A;
436
437         /* Loop invariant: StopLow <= StopMiddle < StopHigh */
438         while (StopLow < StopHigh) {
439                 StopMiddle = (TBTPointer*)(((char*)StopLow) + TBTPOINTERSIZE(db)*((((char*)StopHigh - (char*)StopLow)/TBTPOINTERSIZE(db)) >> 1));
440                 if ( db->keylen == 0 ) {
441                         A.length = StopMiddle->key.varlen.length;
442                         A.value = page->data + StopMiddle->key.varlen.offset;
443                 } else {
444                         A.length =  db->keylen;
445                         A.value = StopMiddle->key.fixed.key;
446                 }
447         
448                 if ( ISINFPOINTER(db, page, StopMiddle) )
449                         diff = 1;
450                 else 
451                         diff = db->cmpkey( &A, key );
452                 if ( diff == 0 ) {
453                         *res = StopMiddle;
454                         return FOUND;
455                 } else if ( diff < 0 )
456                         StopLow = (TBTPointer*)((char*)StopMiddle + TBTPOINTERSIZE(db));
457                 else
458                         StopHigh = StopMiddle;
459         }
460
461         *res = StopHigh;
462         return NOTFOUND;
463 }
464
465 static int
466 findPage(TBTree *db, TBTValue *key, TBTMemPage **page) {
467         u_int32_t pagenumber = TBTPAGEROOT;
468         int rc = TBT_OK;
469
470         *page=NULL;
471         while(1) {
472                 TBTPointer *ptr;
473
474                 if ( (rc=TBTReadPage(db, pagenumber, page))!=TBT_OK )
475                         return rc;
476
477                 if ( (*page)->page.isleaf ) 
478                         return TBT_OK;
479
480                 findInPage(db, &((*page)->page), key, &ptr );
481                 pagenumber = ptr->pointer.internal.link;
482                 if ( !(*page)->iscached )
483                         tfree(*page);
484         }
485
486         return TBT_OK;
487 }
488
489 int
490 TBTFind(TBTree *db, TBTValue *key, TBTValue *value) {
491         TBTMemPage      *page;
492         TBTPointer *ptr;
493         int rc;
494
495         memset(value, 0, sizeof(TBTValue));
496
497         if ( (rc=findPage(db, key, &page))!=TBT_OK )
498                 return rc; 
499                 
500         if ( page == NULL )
501                 return TBT_OK;
502
503         if ( findInPage(db, &(page->page), key, &ptr) != FOUND ) {
504                 if ( !(page)->iscached )
505                         tfree(page);
506                 return TBT_OK;
507         } 
508
509         value->length = ptr->pointer.leaf.length;
510         value->value = tmalloc(value->length);
511         memcpy( value->value, page->page.data + ptr->pointer.leaf.offset, value->length );
512
513         if ( !(page)->iscached )
514                 tfree(page);
515
516         return TBT_OK;  
517 }
518
519 static void
520 packLeafKV(TBTree *db, TBTPage *page, TBTPointer *ptr, TBTValue *key, TBTValue *value) {
521         if ( ptr ) {
522                 if ( (char*)ptr - page->data  != page->npointer * TBTPOINTERSIZE(db) ) 
523                         memmove((char*)ptr+TBTPOINTERSIZE(db), ptr, page->npointer * TBTPOINTERSIZE(db) - ( (char*)ptr - page->data ));
524         } else
525                 ptr = (TBTPointer*)(page->data + page->npointer * TBTPOINTERSIZE(db));
526
527         page->npointer++;
528         page->freespace -= TBTPOINTERSIZE(db) + PTRALIGN(value->length);
529         if ( db->keylen ) {
530                 memcpy( ptr->key.fixed.key, key->value, db->keylen );
531                 ptr->pointer.leaf.offset = page->npointer * TBTPOINTERSIZE(db) + page->freespace;
532         } else {
533                 page->freespace -= PTRALIGN(key->length);
534                 ptr->key.varlen.length = key->length;
535                 ptr->key.varlen.offset = page->npointer * TBTPOINTERSIZE(db) + page->freespace;
536                 memcpy( page->data + ptr->key.varlen.offset, key->value, key->length);
537                 ptr->pointer.leaf.offset = page->npointer * TBTPOINTERSIZE(db) + page->freespace + PTRALIGN(key->length);
538         } 
539         ptr->pointer.leaf.length = value->length;
540         memcpy( page->data + ptr->pointer.leaf.offset, value->value, value->length );
541 }
542
543 static void
544 packInternalKV(TBTree *db, TBTPage *page, TBTPointer *ptr, TBTValue *key, u_int32_t pagenumber) {
545         if ( ptr ) {
546                 if ( (char*)ptr - page->data  != page->npointer * TBTPOINTERSIZE(db) ) 
547                         memmove((char*)ptr+TBTPOINTERSIZE(db), ptr, page->npointer * TBTPOINTERSIZE(db) - ( (char*)ptr - page->data ));
548         } else
549                 ptr = (TBTPointer*)(page->data + page->npointer * TBTPOINTERSIZE(db));
550
551         page->npointer++;
552         page->freespace -= TBTPOINTERSIZE(db);
553
554         if ( db->keylen ) {
555                 if ( key->value )
556                         memcpy( ptr->key.fixed.key, key->value, db->keylen );
557                 else
558                         memset( ptr->key.fixed.key, 0, db->keylen );
559         } else {
560                 page->freespace -= PTRALIGN(key->length);
561                 ptr->key.varlen.length = key->length;
562                 ptr->key.varlen.offset = page->npointer * TBTPOINTERSIZE(db) + page->freespace;
563                 if ( key->length ) 
564                         memcpy( page->data + ptr->key.varlen.offset, key->value, key->length);
565         }
566         ptr->pointer.internal.link = pagenumber;
567 }
568
569
570 static void
571 deleteKV(TBTree *db, TBTPage *page, TBTPointer **ptrs, u_int32_t num) {
572         static char buf[TBTREEPAGESIZE];
573         char  *bufptr = buf;
574         TBTPointer *ptr = (TBTPointer*)page->data, *tmp, **ptrptr = ptrs;
575         u_int32_t i, start;
576         /* suppose ptrs are ordered! */
577         tmp = ptr;
578         while( ptrptr - ptrs < num && (char*)ptr - page->data < TBTPOINTERSIZE(db)*page->npointer ) {
579                 if ( *ptrptr < ptr ) {
580                         tlog(TL_CRIT,"deleteKV: Something wrong: unexistent *ptrs"); 
581                         ptrptr++;
582                 } else if ( *ptrptr > ptr ) {
583                         if ( tmp != ptr )
584                                 memcpy(tmp, ptr, TBTPOINTERSIZE(db));
585                         ptr = (TBTPointer*)( (char*)ptr + TBTPOINTERSIZE(db) );
586                         tmp = (TBTPointer*)( (char*)tmp + TBTPOINTERSIZE(db) );
587                 } else {
588                         page->freespace += TBTPOINTERSIZE(db) + 
589                                 ( ( page->isleaf ) ? PTRALIGN(ptr->pointer.leaf.length) : 0 ) +
590                                 ( ( db->keylen ) ? 0 : PTRALIGN(ptr->key.varlen.length) );
591                         ptrptr++;
592                         ptr = (TBTPointer*)( (char*)ptr + TBTPOINTERSIZE(db) );
593                 }
594         }
595
596         if ( (char*)ptr - page->data < TBTPOINTERSIZE(db)*page->npointer && tmp!=ptr )
597                 memmove( tmp, ptr, page->npointer * TBTPOINTERSIZE(db) - ((char*)ptr - page->data) );
598
599         page->npointer-=num;
600
601         tmp = (TBTPointer*)page->data;
602         start = page->npointer * TBTPOINTERSIZE(db) + page->freespace;
603         for(i=0;i<page->npointer;i++) {
604                 if ( page->isleaf ) {
605                         memcpy( bufptr, page->data + tmp->pointer.leaf.offset, tmp->pointer.leaf.length );
606                         tmp->pointer.leaf.offset = start + (bufptr-buf);
607                         bufptr+=PTRALIGN(tmp->pointer.leaf.length);
608                 }
609
610                 if ( db->keylen == 0 ) {
611                         if ( tmp->key.varlen.length )
612                                 memcpy( bufptr, page->data + tmp->key.varlen.offset, tmp->key.varlen.length );
613                         tmp->key.varlen.offset = start + (bufptr-buf);
614                         bufptr+=PTRALIGN(tmp->key.varlen.length);
615                 }
616                 tmp = (TBTPointer*)( (char*)tmp + TBTPOINTERSIZE(db) );
617         }
618
619         memcpy( page->data + start, buf, bufptr-buf );
620 }
621
622 static u_int32_t
623 findDelimiter(TBTree *db, TBTPage *page, TBTPointer *ptr, int size, u_int32_t start, int *where) {
624         int  *sizes = (int*) tmalloc( sizeof(int) * (page->npointer+1) );
625         int i;
626         TBTPointer *tomove;
627         int lfree=TBTREEPAGESIZE-TBTPAGEHDRSZ, rfree=TBTREEPAGESIZE-TBTPAGEHDRSZ;
628         int nptr = ( (char*)ptr - page->data ) / TBTPOINTERSIZE(db); 
629         int was=0;
630
631         for(i=0; i<page->npointer;i++) {
632                 tomove = (TBTPointer*)(page->data + i*TBTPOINTERSIZE(db));
633                 if ( was==0 && i==nptr ) {
634                         sizes[i] = size;
635                         was=1;
636                 } 
637                 sizes[i+was] = TBTPOINTERSIZE(db) +
638                         ( (page->isleaf) ? PTRALIGN(tomove->pointer.leaf.length) : 0 ) +
639                         ( ( db->keylen ) ? 0 : PTRALIGN(tomove->key.varlen.length) );
640         
641         }
642         
643         if ( was==0 )
644                 sizes[page->npointer]=size;
645
646         for(i=0;i<page->npointer+1;i++) {
647                 if ( i< start )
648                         lfree-=sizes[i];
649                 else 
650                         rfree-=sizes[i];
651         }
652
653         while( 1 ) {
654                 if ( lfree<0 ) {
655                         lfree+=sizes[start];
656                         rfree-=sizes[start];
657                         start--;
658                 } else if ( rfree < 0 ) { 
659                         lfree-=sizes[start];
660                         rfree+=sizes[start];
661                         start++;
662                 } else
663                         break;
664         }
665
666         *where = (nptr < start) ? -1 : 1;
667         if ( start>nptr )
668                 start--;
669
670         tfree(sizes);
671         return start;
672 }
673
674 static void
675 populatePages(TBTree *db, TBTPage *srcpage, TBTPage* newpage, u_int32_t start) {
676         u_int32_t i;
677         TBTPointer *tomove, **todelete = tmalloc( sizeof(TBTPointer*)* srcpage->npointer );
678         u_int32_t numtodelete=0;
679         TBTValue k, v;
680
681         for(i=start;i<srcpage->npointer;i++) {
682                 tomove = (TBTPointer*)(srcpage->data + i*TBTPOINTERSIZE(db));
683
684                 todelete[numtodelete] = tomove;
685                 numtodelete++;
686
687                 if ( db->keylen ) 
688                         k.value = tomove->key.fixed.key;
689                 else {
690                         k.length = tomove->key.varlen.length;
691                         k.value = srcpage->data + tomove->key.varlen.offset;
692                 } 
693                                 
694                 if ( srcpage->isleaf ) {
695                         v.length = tomove->pointer.leaf.length;
696                         v.value = srcpage->data + tomove->pointer.leaf.offset;
697                         packLeafKV(db, newpage, NULL, &k, &v);
698                 } else 
699                         packInternalKV(db, newpage, NULL, &k, tomove->pointer.internal.link);
700         }
701
702         deleteKV(db, srcpage, todelete, numtodelete);   
703         tfree(todelete);
704 }
705
706 static int
707 splitPage(TBTree *db, TBTMemPage *srcpage, TBTMemPage** newpage, TBTPointer **ptr, int size, TBTValue *key, TBTValue *value, u_int32_t pagenumber) {
708         TBTMemPage *tmp;
709         int start=0, where=0;
710         int nptr = ( (char*)(*ptr) - srcpage->page.data ) / TBTPOINTERSIZE(db);
711
712         if ( TBTNewPage(db, newpage) )
713                 return TBT_ERROR;
714
715         tmp = *newpage;
716         srcpage->issynced=tmp->issynced=0;
717         tmp->islocked=1;
718         tmp->page.rightlink = srcpage->page.rightlink;
719         srcpage->page.rightlink = tmp->pagenumber;
720         tmp->page.isleaf = srcpage->page.isleaf;
721
722         switch(db->strategy) {
723                 case TBT_STATEGY_HALF:
724                         start = findDelimiter(db, &(srcpage->page), *ptr, size, (srcpage->page.npointer+1)/2, &where);  
725                         break;
726                 case TBT_STATEGY_LEFTFILL:
727                         start = findDelimiter(db, &(srcpage->page), *ptr, size, srcpage->page.npointer, &where);        
728                         break;
729                 case TBT_STATEGY_RIGHTFILL:
730                         start = findDelimiter(db, &(srcpage->page), *ptr, size, 0, &where);     
731                         break;
732                 default:
733                         tlog(TL_CRIT,"splitPage: unknown strategy: %d", db->strategy);
734                         return TBT_ERROR;
735         }
736
737         populatePages(db, &(srcpage->page), &(tmp->page), start);
738         if ( where<0 )
739                 tmp=srcpage;
740         else
741                 nptr-=start;
742         *ptr = (TBTPointer*)(tmp->page.data + nptr*TBTPOINTERSIZE(db));
743
744         if ( tmp->page.isleaf ) 
745                 packLeafKV(db, &(tmp->page), *ptr, key, value);
746         else
747                 packInternalKV(db, &(tmp->page), *ptr, key, pagenumber);
748
749         return TBT_OK;
750 }
751
752 static int
753 savePage(TBTree *db, TBTMemPage *page) {
754         int rc;
755         page->islocked=0;
756
757         if ( !page->iscached ) {
758                 if (  page->issynced==0 && (rc=TBTWritePage(db, page)) != TBT_OK )
759                         return rc;
760                 tfree(page);
761         }
762         return TBT_OK;
763 }
764
765 static int
766 layerInsert(TBTree *db, u_int32_t pagenumber, TBTMemPage **left, TBTMemPage **right, TBTValue *key, TBTValue *value) {
767         int rc,fnd;
768         TBTPointer      *ptr=NULL;
769         TBTMemPage      *page=NULL, *newright=NULL;
770                         
771         if ( (rc=TBTReadPage(db, pagenumber, &page))!=TBT_OK )
772                 return rc;
773
774         page->islocked=1;
775
776         fnd = findInPage(db, &(page->page), key, &ptr );
777
778         *left=*right=NULL;
779         if ( page->page.isleaf ) {
780                 u_int32_t size = TBTPOINTERSIZE(db) +
781                         PTRALIGN(value->length) +
782                         ( ( db->keylen ) ? 0 : PTRALIGN(key->length) );
783                 
784                 if ( size>(TBTREEPAGESIZE-TBTPAGEHDRSZ)/2 ) {
785                         tlog(TL_ALARM,"Huge size: %d bytes", size);
786                         return TBT_HUGE;
787                 }
788
789                 if (fnd==FOUND) {
790                         deleteKV(db, &(page->page), &ptr, 1);
791                         page->issynced=0;
792                 }
793
794                 if ( size <= page->page.freespace ) {
795                         packLeafKV(db, &(page->page), ptr, key, value);
796                         page->issynced=0;
797                 } else {
798                         rc = splitPage(db, page, &newright, &ptr, size, key, value, 0);
799                         if ( rc != TBT_OK ) return rc;
800
801                         page->issynced = newright->issynced = 0;
802                         page->islocked = newright->islocked = 1;
803                 }
804         } else {
805                 if ( (rc=layerInsert(db, ptr->pointer.internal.link, left, right, key, value))!=TBT_OK ) {
806                         page->islocked=0;
807                         return rc;
808                 }
809
810                 if ( *left ) { /* child page was splited */
811                         u_int32_t size;
812                         TBTPointer *rightptr = (TBTPointer*)( (*left)->page.data + ((*left)->page.npointer-1) * TBTPOINTERSIZE(db) );
813                         TBTValue K;
814
815                         if ( ISINFPOINTER(db, &(page->page), ptr) ) {
816                                 deleteKV(db, &(page->page), &ptr, 1);
817                                 K.length = 0; K.value = NULL;
818                                 packInternalKV(db, &(page->page), ptr, &K, (*right)->pagenumber);
819                         } else 
820                                 ptr->pointer.internal.link=(*right)->pagenumber;
821
822                         K.length = ( db->keylen ) ? db->keylen : rightptr->key.varlen.length;
823                         K.value  = ( db->keylen ) ? rightptr->key.fixed.key : (*left)->page.data + rightptr->key.varlen.offset;
824
825                         size = TBTPOINTERSIZE(db) + ( ( db->keylen ) ? 0 : PTRALIGN(K.length) );
826
827                         if ( size <= page->page.freespace ) {
828                                 packInternalKV(db, &(page->page), ptr, &K, (*left)->pagenumber);
829                                 page->issynced=0;
830                         } else {
831                                 rc = splitPage(db, page, &newright, &ptr, size, &K, NULL, (*left)->pagenumber);
832                                 if ( rc != TBT_OK ) return rc;
833
834                                 page->issynced = newright->issynced = 0;
835                                 page->islocked = newright->islocked = 1;
836                         }
837                 }
838         }
839
840         if ( *left ) {
841                 if ( (rc=savePage(db, *left))!=TBT_OK )
842                         return rc;
843                 if ( (rc=savePage(db, *right))!=TBT_OK )
844                         return rc;
845                 *left=*right=NULL;
846         }
847
848         if ( newright ) {
849                 if ( page->pagenumber==TBTPAGEROOT ) { /* root was splited */
850                         TBTMemPage  *newleft;
851                         TBTValue K;
852                 
853
854                         if  ( (rc=TBTNewPage(db,&newleft))!=TBT_OK )
855                                 return rc;
856                         memcpy( &(newleft->page) , &(page->page), TBTREEPAGESIZE);
857                         memset( &(page->page), 0, TBTREEPAGESIZE);
858                         page->page.freespace=TBTREEPAGESIZE-TBTPAGEHDRSZ;
859         
860                         ptr = (TBTPointer*)( newleft->page.data + (newleft->page.npointer-1) * TBTPOINTERSIZE(db) ); 
861                         K.length = ( db->keylen ) ? db->keylen : ptr->key.varlen.length;
862                         K.value  = ( db->keylen ) ? ptr->key.fixed.key : newleft->page.data + ptr->key.varlen.offset;
863                         packInternalKV(db, &(page->page), NULL, &K, newleft->pagenumber);
864                 
865                         ptr = (TBTPointer*)( newright->page.data + (newright->page.npointer-1) * TBTPOINTERSIZE(db) ); 
866                         K.length = 0;
867                         K.value  = NULL;
868                         packInternalKV(db, &(page->page), NULL, &K, newright->pagenumber);
869
870                         if ( (rc=savePage(db, newright))!=TBT_OK )
871                                 return rc;
872                         if ( (rc=savePage(db, newleft))!=TBT_OK )
873                                 return rc;
874                         if ( (rc=savePage(db, page))!=TBT_OK )
875                                 return rc;
876                 } else {
877                         *left = page;
878                         *right = newright;
879                 }
880         } else if ( (rc=savePage(db, page))!=TBT_OK )
881                 return rc;
882         
883
884         return TBT_OK;
885
886
887 int
888 TBTInsert(TBTree *db, TBTValue *key, TBTValue *value) {
889         int rc;
890         TBTMemPage *lpage=NULL, *rpage=NULL;
891
892         rc = layerInsert(db, TBTPAGEROOT, &lpage, &rpage, key, value);
893
894         tassert( lpage==NULL && rpage==NULL );
895
896         return rc;
897 }
898
899 int
900 TBTDelete(TBTree *db, TBTValue *key) {
901         TBTMemPage      *page;
902         TBTPointer *ptr;
903         int rc;
904
905         if ( (rc=findPage(db, key, &page))!=TBT_OK )
906                 return rc; 
907                 
908         if ( page == NULL )
909                 return TBT_OK;
910
911         if ( findInPage(db, &(page->page), key, &ptr) != FOUND ) {
912                 if ( !(page)->iscached )
913                         tfree(page);
914                 return TBT_OK;
915         } 
916
917         deleteKV(db, &(page->page), &ptr, 1);
918         page->issynced=0;
919
920         rc=savePage(db, page);
921
922         return rc;      
923 }
924
925 int 
926 TBTInitIterator(TBTree *db, TBTIterator *iterator) {
927         TBTMemPage      *page=NULL;
928         TBTPointer *ptr;
929         int rc;
930         u_int32_t pagenum=TBTPAGEROOT;
931
932         memset(iterator, 0, sizeof(TBTIterator));
933
934         do {
935                 if ( page && !page->iscached )
936                         tfree(page);
937                 if ( (rc=TBTReadPage(db, pagenum, &page))!=TBT_OK )
938                         return rc;
939
940                 ptr = (TBTPointer*)( page->page.data );
941                 pagenum = ptr->pointer.internal.link;
942         } while( !page->page.isleaf );
943
944         page->islocked=1;
945         iterator->page = page;  
946         iterator->ptr = ptr;    
947
948         return TBT_OK;
949 }
950
951 void 
952 TBTFreeIterator(TBTree *db, TBTIterator *iterator) {
953         
954         if ( iterator->page ) {
955                 iterator->page->islocked=0;
956                 if ( !iterator->page->iscached )
957                         tfree(iterator->page);
958         }
959
960         memset(iterator, 0, sizeof(TBTIterator));
961 }
962
963 int 
964 TBTIterate(TBTree *db, TBTIterator *iterator, TBTValue *key, TBTValue *value ) {
965         int rc;
966
967         if (  iterator->page==NULL ) {
968                 memset(key, 0, sizeof(TBTValue));
969                 memset(value, 0, sizeof(TBTValue));
970                 return TBT_OK;
971         }
972
973         if ( (char*)(iterator->ptr) - iterator->page->page.data >= TBTPOINTERSIZE(db) * iterator->page->page.npointer ) {
974                 do {
975                         u_int32_t pagenumber = iterator->page->page.rightlink;
976                         iterator->page->islocked=0;
977                         if ( !iterator->page->iscached )
978                                 tfree(iterator->page);
979                         iterator->page=NULL;
980
981                         if ( pagenumber==TBTPAGEROOT ) {
982                                 memset(key, 0, sizeof(TBTValue));
983                                 memset(value, 0, sizeof(TBTValue));
984                                 return TBT_OK;
985                         }
986                         if ( (rc=TBTReadPage(db, pagenumber, &(iterator->page)))!=TBT_OK )
987                                 return rc;
988                 } while ( iterator->page->page.npointer == 0 );
989
990                 iterator->page->islocked=1;
991                 iterator->ptr = (TBTPointer*)( iterator->page->page.data );
992         }
993
994         if ( db->keylen ) {
995                 key->length = db->keylen;
996                 key->value = iterator->ptr->key.fixed.key;
997         } else {
998                 key->length = iterator->ptr->key.varlen.length; 
999                 key->value = iterator->page->page.data + iterator->ptr->key.varlen.offset;
1000         }
1001
1002         value->length = iterator->ptr->pointer.leaf.length; 
1003         value->value = iterator->page->page.data + iterator->ptr->pointer.leaf.offset;
1004
1005         iterator->ptr = ( TBTPointer* ) (  (char*)(iterator->ptr) + TBTPOINTERSIZE(db) );
1006
1007         return TBT_OK;
1008 }
1009
1010