BTree implementation
[tedtools.git] / tbtree.c
1 /*
2  * Copyright (c) 2005 Teodor Sigaev <teodor@sigaev.ru>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *      notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *      notice, this list of conditions and the following disclaimer in the
12  *      documentation and/or other materials provided with the distribution.
13  * 3. Neither the name of the author nor the names of any co-contributors
14  *      may be used to endorse or promote products derived from this software
15  *      without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY CONTRIBUTORS ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL CONTRIBUTORS BE LIABLE FOR ANY
21  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
23  * GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
25  * IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
26  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN
27  * IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28  */
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <fcntl.h>
32 #include <unistd.h>
33 #include <string.h>
34 #include <errno.h>
35 #include <sys/types.h>
36 #include <sys/stat.h>
37 #include <sys/file.h>
38
39 #include "tmalloc.h"
40 #include "tlog.h"
41
42 #include "tbtree.h"
43
44 static int TBTReadPage(TBTree *db, u_int32_t pagenumber, TBTMemPage **page) ;
45 static int TBTWritePage(TBTree *db, TBTMemPage *page) ;
46 static int TBTNewPage(TBTree *db, TBTMemPage **page) ;
47
48 int 
49 TBTOpen(TBTree *db, char *file) {
50         off_t offset;
51
52         if ( db->readonly ) {
53                 db->fd = open(file, O_RDONLY);
54                 if ( db->fd < 0 ) {
55                         tlog(TL_CRIT,"TBTOpen: open failed: %s",strerror(errno));
56                         return TBT_ERROR;
57                 }
58                 if ( flock( db->fd, LOCK_SH ) < 0 ) {
59                         tlog(TL_CRIT,"TBTOpen: flock failed: %s",strerror(errno));
60                         close(db->fd);
61                         return TBT_ERROR;
62                 }
63         } else {
64                 db->fd = open(file, O_CREAT | O_RDWR, 0666);
65                 if ( db->fd < 0 ) {
66                         tlog(TL_CRIT,"TBTOpen: open failed: %s",strerror(errno));
67                         return TBT_ERROR;
68                 }
69                 if ( flock( db->fd, LOCK_EX ) < 0 ) {
70                         tlog(TL_CRIT,"TBTOpen: flock failed: %s",strerror(errno));
71                         close(db->fd);
72                         return TBT_ERROR;
73                 }
74         }
75
76         if ( db->npage ) {
77                 db->Cache=(TBTMemPage**)t0malloc( sizeof(TBTMemPage*)*db->npage );
78                 db->TimeCache=(TBTMemPage**)t0malloc( sizeof(TBTMemPage*)*db->npage );
79                 db->curpage=0;
80         }
81
82         db->cachehits = db->pageread = db->pagewrite = 0;
83
84         if ( (offset=lseek(db->fd, 0, SEEK_END)) < 0 ) {
85                 tlog(TL_CRIT,"TBTOpen: lseek failed: %s",strerror(errno));
86                 flock( db->fd, LOCK_UN );
87                 close(db->fd);
88                 return TBT_ERROR;
89         }
90
91         if ( offset==0 ) {
92                 if ( db->readonly ) {
93                         tlog(TL_CRIT,"TBTOpen: file is void");
94                         flock( db->fd, LOCK_UN );
95                         close(db->fd);
96                         return TBT_ERROR;
97                 } else {
98                         TBTMemPage *page;
99                         if ( TBTNewPage(db, &page) != TBT_OK ) {
100                                 flock( db->fd, LOCK_UN );
101                                 close(db->fd);
102                                 return TBT_ERROR;
103                         }
104                         page->page.isleaf=1;
105                         if ( TBTWritePage(db, page)!=TBT_OK ){
106                                 flock( db->fd, LOCK_UN );
107                                 close(db->fd);
108                                 return TBT_ERROR;
109                         }
110                 }
111         } 
112
113         db->lastpagenumber=0;                            
114         return TBT_OK; 
115 }
116
117 int
118 TBTClose(TBTree *db) {
119         int rc=0;
120
121         if ( !db->readonly ) 
122                 rc=TBTSync(db);
123
124         if ( db->npage ) {
125                 u_int32_t i;
126
127                 for(i=0; i<db->curpage; i++)
128                         tfree( db->Cache[i] );
129
130                 tfree( db->Cache );
131                 tfree( db->TimeCache );
132         }
133         
134         flock( db->fd, LOCK_UN );
135         close(db->fd);  
136         
137         return (rc) ? TBT_ERROR : TBT_OK;
138 }
139
140 int
141 TBTSync(TBTree *db) {
142         u_int32_t i;
143         int rc=0;
144
145         if ( db->readonly || db->npage==0 )
146                 return TBT_OK;
147  
148         for(i=0; i<db->curpage; i++)
149                 if ( !db->Cache[i]->issynced )
150                         rc |= TBTWritePage(db, db->Cache[i]);
151         
152         if ( fsync(db->fd) ) {
153                 tlog(TL_CRIT,"TBTSync: fsync failed: %s",strerror(errno));
154                 rc=1;
155         }
156
157         return (rc) ? TBT_ERROR : TBT_OK;
158 }
159
160 static int
161 cmpNPage(const void *a, const void *b) {
162         if (  (*(TBTMemPage**)a)->pagenumber == (*(TBTMemPage**)b)->pagenumber ) 
163                 return 0;
164         return ( (*(TBTMemPage**)a)->pagenumber > (*(TBTMemPage**)b)->pagenumber ) ? 1 : -1;
165 }
166
167 static TBTMemPage*
168 PageAlloc(TBTree *db, int *pos) {
169         TBTMemPage *page;
170
171         if ( db->curpage < db->npage ) {
172                 page = db->Cache[db->curpage] = db->TimeCache[db->curpage] = (TBTMemPage*)t0malloc(sizeof(TBTMemPage));
173                 *pos = db->curpage;
174                 page->iscached=1;
175                 db->curpage++;
176         } else if ( db->npage == 0 ) {
177                 page = (TBTMemPage*)t0malloc(sizeof(TBTMemPage));
178         } else {
179                 u_int32_t i;
180                 for( i=0; i<db->curpage && db->TimeCache[i]->islocked; i++ );
181                 if ( i == db->curpage ) { /*all pages locked*/
182                         /*tlog(TL_WARN,"TBTReadPage: all pages in cache locked, increase number of cached pages");*/
183                         page = (TBTMemPage*)t0malloc(sizeof(TBTMemPage));
184                 } else {
185                         page = db->TimeCache[i];
186                         if ( !page->issynced )
187                                 if ( TBTWritePage(db, page) )
188                                         return NULL;
189                         memset(page,0,sizeof(TBTMemPage));
190                         page->iscached=1;
191                         *pos=i;
192                 }
193         }
194
195         return page;
196 }
197
198 static void
199 moveFirstTime(TBTree *db, int pos) {
200         TBTMemPage *ptr;
201         
202         if ( pos < 0 ) 
203                 return;
204         if (pos+1==db->curpage )
205                 return;
206
207         ptr = db->TimeCache[pos];
208         memmove( db->TimeCache + pos , db->TimeCache + pos + 1, sizeof(TBTMemPage*)*(db->curpage-pos-1) );
209         db->TimeCache[db->curpage-1] = ptr;
210 }
211
212 static void
213 findAndMoveFirstTime(TBTree *db, TBTMemPage *page) {
214         int pos;
215         for(pos=0;pos<db->curpage;pos++)
216                 if ( page==db->TimeCache[pos] ) {
217                         moveFirstTime(db,pos);
218                         break;
219                 }
220         tassert( pos<db->curpage );
221 }
222
223 static void
224 findAndMovePN(TBTree *db, TBTMemPage *page) {
225         int pos;
226
227         if ( db->curpage < 2 )
228                 return;
229
230         for(pos=0;pos<db->curpage;pos++)
231                 if ( page==db->Cache[pos] ) 
232                         break;
233         tassert( pos<db->curpage );
234
235         while(1) {
236                 if ( pos+1 != db->curpage && db->Cache[pos+1]->pagenumber < page->pagenumber ) {
237                         db->Cache[pos] = db->Cache[pos+1];
238                         db->Cache[pos+1] = page;
239                         pos++; 
240                 } else if ( pos!=0 && db->Cache[pos-1]->pagenumber > page->pagenumber ) {
241                         db->Cache[pos] = db->Cache[pos-1];
242                         db->Cache[pos-1] = page;
243                         pos--;
244                 } else
245                         break; 
246         }       
247 }
248
249 static int
250 TBTReadPage(TBTree *db, u_int32_t pagenumber, TBTMemPage **page) {
251         TBTMemPage      *ptr, key, **res;
252         int rc;
253         int pos;
254
255         ptr=&key;
256         key.pagenumber = pagenumber;
257
258         res = (TBTMemPage**)bsearch(&ptr, db->Cache, db->curpage, sizeof(TBTMemPage*), cmpNPage);
259         if ( res ) {
260                 db->cachehits++;
261                 *page=*res;
262                 findAndMoveFirstTime(db, *page);
263         } else {
264                 off_t  offset = (off_t)pagenumber * (off_t)TBTREEPAGESIZE;
265                 
266                 if( (*page = PageAlloc(db,&pos))==NULL )
267                         return TBT_ERROR;
268
269                 if ( lseek(db->fd, offset, SEEK_SET) != offset ) {
270                         tlog(TL_CRIT,"TBTReadPage: lseek failed: %s",strerror(errno));
271                         return TBT_ERROR;
272                 }
273                 (*page)->issynced=1;
274                 (*page)->pagenumber=pagenumber;
275                 if ( (rc=read(db->fd, &( (*page)->page ), TBTREEPAGESIZE)) != TBTREEPAGESIZE ) {
276                         tlog(TL_CRIT,"TBTReadPage: read failed: %s %d %d",strerror(errno), pagenumber, offset);
277                         return TBT_ERROR;
278                 }
279                 if ( (*page)->iscached ) {
280                         moveFirstTime(db, pos);
281                         findAndMovePN(db, *page);
282                 } 
283         }
284
285         db->pageread++;
286         gettimeofday( &( (*page)->last ), NULL );
287         
288         return TBT_OK;   
289 }
290
291 static int
292 TBTWritePage(TBTree *db, TBTMemPage *page) {
293         off_t  offset = (off_t)(page->pagenumber) * (off_t)TBTREEPAGESIZE;
294         off_t  size;
295
296         if ( page->issynced )
297                 return TBT_OK;
298
299         if ( lseek(db->fd, offset, SEEK_SET) != offset ) {
300                 tlog(TL_CRIT,"TBTWritePage: lseek failed: %s",strerror(errno));
301                 return TBT_ERROR;
302         }
303
304         if ( (size=write(db->fd, &( page->page ), TBTREEPAGESIZE)) != TBTREEPAGESIZE ) {
305                 tlog(TL_CRIT,"TBTWritePage: write failed: %s (writed only %d bytes)",strerror(errno), size);
306                 return TBT_ERROR;
307         }
308
309         page->issynced=1;
310         db->pagewrite++;
311
312         return TBT_OK;
313 }
314
315 static int
316 TBTNewPage(TBTree *db, TBTMemPage **page) {
317         int pos;
318         if ( db->lastpagenumber==0 ) {
319                 off_t  offset;
320                 if ( (offset=lseek(db->fd, 0, SEEK_END)) < 0 ) {
321                         tlog(TL_CRIT,"TBTNewPage: lseek failed: %s",strerror(errno));
322                         return TBT_ERROR;
323                 }
324
325                 if ( offset % TBTREEPAGESIZE != 0 ) {
326                         tlog(TL_CRIT,"TBTNewPage: file corrupted");
327                         return TBT_ERROR;
328                 }
329
330                 db->lastpagenumber = offset/TBTREEPAGESIZE;
331         }
332
333         if( (*page = PageAlloc(db,&pos))==NULL )
334                 return TBT_ERROR;
335         (*page)->pagenumber = db->lastpagenumber;
336         (*page)->issynced=0;
337         (*page)->islocked=1;
338
339         db->lastpagenumber++;
340
341         memset( &((*page)->page), 0, sizeof(TBTPAGEHDRSZ));
342         (*page)->page.freespace = TBTREEPAGESIZE-TBTPAGEHDRSZ;
343
344         gettimeofday( &( (*page)->last ), NULL );
345         if ( (*page)->iscached ) {
346                 moveFirstTime(db, pos);
347                 findAndMovePN(db, *page);
348         }
349         
350         return TBT_OK;
351 }
352
353 #define NOTFOUND        0
354 #define FOUND           1
355
/*
 * Print len bytes starting at ptr to stdout, enclosed in single quotes.
 * The bytes are emitted as they are, without escaping.
 */
static void
printValue(u_int32_t len, char *ptr) {
        u_int32_t i;

        putchar('\'');
        for(i=0; i<len; i++)
                putchar(ptr[i]);
        putchar('\'');
}
365
366 static void
367 dumpPage(TBTree *db, TBTPage *page, int follow) {
368         u_int32_t i,j;
369
370         for(j=0;j<follow;j++)
371                 putchar(' ');
372         printf("Page: f:%d l:%d r:%d n:%d\n", 
373                 page->freespace,
374                 page->isleaf,
375                 page->rightlink,
376                 page->npointer
377         );      
378         for(i=0;i<page->npointer;i++) {
379                 TBTPointer *ptr= (TBTPointer*)(page->data+TBTPOINTERSIZE(db)*i);
380                 for(j=0;j<follow+2;j++)
381                         putchar(' ');
382                 printf("i:%d(%d) kl:%d ko:%d ",
383                         i, (int)ptr, 
384                         (db->keylen) ? db->keylen : ptr->key.varlen.length,
385                         (db->keylen) ? 0 : ptr->key.varlen.offset
386                 );
387                 if ( db->keylen==0 ) 
388                         printValue(ptr->key.varlen.length, page->data + ptr->key.varlen.offset);
389                 else
390                         printf("'%d'", *(int*)(ptr->key.fixed.key));
391                 if ( page->isleaf ) {
392                         printf(" vl:%d vo:%d ", 
393                                 ptr->pointer.leaf.length,
394                                 ptr->pointer.leaf.offset
395                         );
396                         printValue(ptr->pointer.leaf.length,  page->data + ptr->pointer.leaf.offset );
397                         putchar('\n');
398                 } else {
399                         printf(" page:%d\n", ptr->pointer.internal.link);
400                         if ( follow>=0 )
401                                 dumpTree(db, ptr->pointer.internal.link, follow+4); 
402                 }
403         }
404 }
405
406 void
407 dumpTree(TBTree *db, u_int32_t pagenumber, int follow) {
408         TBTMemPage  *page;
409
410         if ( TBTReadPage(db, pagenumber, &page) ) {
411                 fprintf(stderr,"BEDA %d\n", pagenumber);
412                 return;
413         }
414
415         page->islocked=1;
416         dumpPage(db, &(page->page), follow);
417         page->islocked=0;
418         if ( !page->iscached )
419                 tfree(page);
420 }
421
422 static int
423 findInPage(TBTree *db, TBTPage *page, TBTValue *key, TBTPointer **res) {
424         TBTPointer      *StopLow = (TBTPointer*)(page->data);
425         TBTPointer      *StopHigh = (TBTPointer*)(page->data + TBTPOINTERSIZE(db)*page->npointer);
426         TBTPointer      *StopMiddle;
427         int diff ;
428         TBTValue  A;
429
430         /* Loop invariant: StopLow <= StopMiddle < StopHigh */
431         while (StopLow < StopHigh) {
432                 StopMiddle = (TBTPointer*)(((char*)StopLow) + TBTPOINTERSIZE(db)*((((char*)StopHigh - (char*)StopLow)/TBTPOINTERSIZE(db)) >> 1));
433                 if ( db->keylen == 0 ) {
434                         A.length = StopMiddle->key.varlen.length;
435                         A.value = page->data + StopMiddle->key.varlen.offset;
436                 } else {
437                         A.length =  db->keylen;
438                         A.value = StopMiddle->key.fixed.key;
439                 }
440         
441                 if ( ISINFPOINTER(db, page, StopMiddle) )
442                         diff = 1;
443                 else 
444                         diff = db->cmpkey( &A, key );
445                 if ( diff == 0 ) {
446                         *res = StopMiddle;
447                         return FOUND;
448                 } else if ( diff < 0 )
449                         StopLow = (TBTPointer*)((char*)StopMiddle + TBTPOINTERSIZE(db));
450                 else
451                         StopHigh = StopMiddle;
452         }
453
454         *res = StopHigh;
455         return NOTFOUND;
456 }
457
458 static int
459 findPage(TBTree *db, TBTValue *key, TBTMemPage **page) {
460         u_int32_t pagenumber = TBTPAGEROOT;
461         int rc = TBT_OK;
462
463         *page=NULL;
464         while(1) {
465                 TBTPointer *ptr;
466
467                 if ( (rc=TBTReadPage(db, pagenumber, page))!=TBT_OK )
468                         return rc;
469
470                 if ( (*page)->page.isleaf ) 
471                         return TBT_OK;
472
473                 findInPage(db, &((*page)->page), key, &ptr );
474                 pagenumber = ptr->pointer.internal.link;
475                 if ( !(*page)->iscached )
476                         tfree(*page);
477         }
478
479         return TBT_OK;
480 }
481
482 int
483 TBTFind(TBTree *db, TBTValue *key, TBTValue *value) {
484         TBTMemPage      *page;
485         TBTPointer *ptr;
486         int rc;
487
488         memset(value, 0, sizeof(TBTValue));
489
490         if ( (rc=findPage(db, key, &page))!=TBT_OK )
491                 return rc; 
492                 
493         if ( page == NULL )
494                 return TBT_OK;
495
496         if ( findInPage(db, &(page->page), key, &ptr) != FOUND ) {
497                 if ( !(page)->iscached )
498                         tfree(page);
499                 return TBT_OK;
500         } 
501
502         value->length = ptr->pointer.leaf.length;
503         value->value = tmalloc(value->length);
504         memcpy( value->value, page->page.data + ptr->pointer.leaf.offset, value->length );
505
506         if ( !(page)->iscached )
507                 tfree(page);
508
509         return TBT_OK;  
510 }
511
512 static void
513 packLeafKV(TBTree *db, TBTPage *page, TBTPointer *ptr, TBTValue *key, TBTValue *value) {
514         if ( ptr ) {
515                 if ( (char*)ptr - page->data  != page->npointer * TBTPOINTERSIZE(db) ) 
516                         memmove((char*)ptr+TBTPOINTERSIZE(db), ptr, page->npointer * TBTPOINTERSIZE(db) - ( (char*)ptr - page->data ));
517         } else
518                 ptr = (TBTPointer*)(page->data + page->npointer * TBTPOINTERSIZE(db));
519
520         page->npointer++;
521         page->freespace -= TBTPOINTERSIZE(db) + PTRALIGN(value->length);
522         if ( db->keylen ) {
523                 memcpy( ptr->key.fixed.key, key->value, db->keylen );
524                 ptr->pointer.leaf.offset = page->npointer * TBTPOINTERSIZE(db) + page->freespace;
525         } else {
526                 page->freespace -= PTRALIGN(key->length);
527                 ptr->key.varlen.length = key->length;
528                 ptr->key.varlen.offset = page->npointer * TBTPOINTERSIZE(db) + page->freespace;
529                 memcpy( page->data + ptr->key.varlen.offset, key->value, key->length);
530                 ptr->pointer.leaf.offset = page->npointer * TBTPOINTERSIZE(db) + page->freespace + PTRALIGN(key->length);
531         } 
532         ptr->pointer.leaf.length = value->length;
533         memcpy( page->data + ptr->pointer.leaf.offset, value->value, value->length );
534 }
535
536 static void
537 packInternalKV(TBTree *db, TBTPage *page, TBTPointer *ptr, TBTValue *key, u_int32_t pagenumber) {
538         if ( ptr ) {
539                 if ( (char*)ptr - page->data  != page->npointer * TBTPOINTERSIZE(db) ) 
540                         memmove((char*)ptr+TBTPOINTERSIZE(db), ptr, page->npointer * TBTPOINTERSIZE(db) - ( (char*)ptr - page->data ));
541         } else
542                 ptr = (TBTPointer*)(page->data + page->npointer * TBTPOINTERSIZE(db));
543
544         page->npointer++;
545         page->freespace -= TBTPOINTERSIZE(db);
546
547         if ( db->keylen ) {
548                 if ( key->value )
549                         memcpy( ptr->key.fixed.key, key->value, db->keylen );
550                 else
551                         memset( ptr->key.fixed.key, 0, db->keylen );
552         } else {
553                 page->freespace -= PTRALIGN(key->length);
554                 ptr->key.varlen.length = key->length;
555                 ptr->key.varlen.offset = page->npointer * TBTPOINTERSIZE(db) + page->freespace;
556                 if ( key->length ) 
557                         memcpy( page->data + ptr->key.varlen.offset, key->value, key->length);
558         }
559         ptr->pointer.internal.link = pagenumber;
560 }
561
562
563 static void
564 deleteKV(TBTree *db, TBTPage *page, TBTPointer **ptrs, u_int32_t num) {
565         static char buf[TBTREEPAGESIZE];
566         char  *bufptr = buf;
567         TBTPointer *ptr = (TBTPointer*)page->data, *tmp, **ptrptr = ptrs;
568         u_int32_t i, start;
569         /* suppose ptrs are ordered! */
570         tmp = ptr;
571         while( ptrptr - ptrs < num && (char*)ptr - page->data < TBTPOINTERSIZE(db)*page->npointer ) {
572                 if ( *ptrptr < ptr ) {
573                         tlog(TL_CRIT,"deleteKV: Something wrong: unexistent *ptrs"); 
574                         ptrptr++;
575                 } else if ( *ptrptr > ptr ) {
576                         if ( tmp != ptr )
577                                 memcpy(tmp, ptr, TBTPOINTERSIZE(db));
578                         ptr = (TBTPointer*)( (char*)ptr + TBTPOINTERSIZE(db) );
579                         tmp = (TBTPointer*)( (char*)tmp + TBTPOINTERSIZE(db) );
580                 } else {
581                         page->freespace += TBTPOINTERSIZE(db) + 
582                                 ( ( page->isleaf ) ? PTRALIGN(ptr->pointer.leaf.length) : 0 ) +
583                                 ( ( db->keylen ) ? 0 : PTRALIGN(ptr->key.varlen.length) );
584                         ptrptr++;
585                         ptr = (TBTPointer*)( (char*)ptr + TBTPOINTERSIZE(db) );
586                 }
587         }
588
589         if ( (char*)ptr - page->data < TBTPOINTERSIZE(db)*page->npointer && tmp!=ptr )
590                 memmove( tmp, ptr, page->npointer * TBTPOINTERSIZE(db) - ((char*)ptr - page->data) );
591
592         page->npointer-=num;
593
594         tmp = (TBTPointer*)page->data;
595         start = page->npointer * TBTPOINTERSIZE(db) + page->freespace;
596         for(i=0;i<page->npointer;i++) {
597                 if ( page->isleaf ) {
598                         memcpy( bufptr, page->data + tmp->pointer.leaf.offset, tmp->pointer.leaf.length );
599                         tmp->pointer.leaf.offset = start + (bufptr-buf);
600                         bufptr+=PTRALIGN(tmp->pointer.leaf.length);
601                 }
602
603                 if ( db->keylen == 0 ) {
604                         if ( tmp->key.varlen.length )
605                                 memcpy( bufptr, page->data + tmp->key.varlen.offset, tmp->key.varlen.length );
606                         tmp->key.varlen.offset = start + (bufptr-buf);
607                         bufptr+=PTRALIGN(tmp->key.varlen.length);
608                 }
609                 tmp = (TBTPointer*)( (char*)tmp + TBTPOINTERSIZE(db) );
610         }
611
612         memcpy( page->data + start, buf, bufptr-buf );
613 }
614
615 static u_int32_t
616 findDelimiter(TBTree *db, TBTPage *page, TBTPointer *ptr, int size, u_int32_t start, int *where) {
617         int  *sizes = (int*) tmalloc( sizeof(int) * (page->npointer+1) );
618         int i;
619         TBTPointer *tomove;
620         int lfree=TBTREEPAGESIZE-TBTPAGEHDRSZ, rfree=TBTREEPAGESIZE-TBTPAGEHDRSZ;
621         int nptr = ( (char*)ptr - page->data ) / TBTPOINTERSIZE(db); 
622         int was=0;
623
624         for(i=0; i<page->npointer;i++) {
625                 tomove = (TBTPointer*)(page->data + i*TBTPOINTERSIZE(db));
626                 if ( was==0 && i==nptr ) {
627                         sizes[i] = size;
628                         was=1;
629                 } 
630                 sizes[i+was] = TBTPOINTERSIZE(db) +
631                         ( (page->isleaf) ? PTRALIGN(tomove->pointer.leaf.length) : 0 ) +
632                         ( ( db->keylen ) ? 0 : PTRALIGN(tomove->key.varlen.length) );
633         
634         }
635         
636         if ( was==0 )
637                 sizes[page->npointer]=size;
638
639         for(i=0;i<page->npointer+1;i++) {
640                 if ( i< start )
641                         lfree-=sizes[i];
642                 else 
643                         rfree-=sizes[i];
644         }
645
646         while( 1 ) {
647                 if ( lfree<0 ) {
648                         lfree+=sizes[start];
649                         rfree-=sizes[start];
650                         start--;
651                 } else if ( rfree < 0 ) { 
652                         lfree-=sizes[start];
653                         rfree+=sizes[start];
654                         start++;
655                 } else
656                         break;
657         }
658
659         *where = (nptr < start) ? -1 : 1;
660         if ( start>nptr )
661                 start--;
662
663         tfree(sizes);
664         return start;
665 }
666
667 static void
668 populatePages(TBTree *db, TBTPage *srcpage, TBTPage* newpage, u_int32_t start) {
669         u_int32_t i;
670         TBTPointer *tomove, **todelete = tmalloc( sizeof(TBTPointer*)* srcpage->npointer );
671         u_int32_t numtodelete=0;
672         TBTValue k, v;
673
674         for(i=start;i<srcpage->npointer;i++) {
675                 tomove = (TBTPointer*)(srcpage->data + i*TBTPOINTERSIZE(db));
676
677                 todelete[numtodelete] = tomove;
678                 numtodelete++;
679
680                 if ( db->keylen ) 
681                         k.value = tomove->key.fixed.key;
682                 else {
683                         k.length = tomove->key.varlen.length;
684                         k.value = srcpage->data + tomove->key.varlen.offset;
685                 } 
686                                 
687                 if ( srcpage->isleaf ) {
688                         v.length = tomove->pointer.leaf.length;
689                         v.value = srcpage->data + tomove->pointer.leaf.offset;
690                         packLeafKV(db, newpage, NULL, &k, &v);
691                 } else 
692                         packInternalKV(db, newpage, NULL, &k, tomove->pointer.internal.link);
693         }
694
695         deleteKV(db, srcpage, todelete, numtodelete);   
696         tfree(todelete);
697 }
698
699 static int
700 splitPage(TBTree *db, TBTMemPage *srcpage, TBTMemPage** newpage, TBTPointer **ptr, int size, TBTValue *key, TBTValue *value, u_int32_t pagenumber) {
701         TBTMemPage *tmp;
702         int start=0, where=0;
703         int nptr = ( (char*)(*ptr) - srcpage->page.data ) / TBTPOINTERSIZE(db);
704
705         if ( TBTNewPage(db, newpage) )
706                 return TBT_ERROR;
707
708         tmp = *newpage;
709         srcpage->issynced=tmp->issynced=0;
710         tmp->islocked=1;
711         tmp->page.rightlink = srcpage->page.rightlink;
712         srcpage->page.rightlink = tmp->pagenumber;
713         tmp->page.isleaf = srcpage->page.isleaf;
714
715         switch(db->strategy) {
716                 case TBT_STATEGY_HALF:
717                         start = findDelimiter(db, &(srcpage->page), *ptr, size, (srcpage->page.npointer+1)/2, &where);  
718                         break;
719                 case TBT_STATEGY_LEFTFILL:
720                         start = findDelimiter(db, &(srcpage->page), *ptr, size, srcpage->page.npointer, &where);        
721                         break;
722                 case TBT_STATEGY_RIGHTFILL:
723                         start = findDelimiter(db, &(srcpage->page), *ptr, size, 0, &where);     
724                         break;
725                 default:
726                         tlog(TL_CRIT,"splitPage: unknown strategy: %d", db->strategy);
727                         return TBT_ERROR;
728         }
729
730         populatePages(db, &(srcpage->page), &(tmp->page), start);
731         if ( where<0 )
732                 tmp=srcpage;
733         else
734                 nptr-=start;
735         *ptr = (TBTPointer*)(tmp->page.data + nptr*TBTPOINTERSIZE(db));
736
737         if ( tmp->page.isleaf ) 
738                 packLeafKV(db, &(tmp->page), *ptr, key, value);
739         else
740                 packInternalKV(db, &(tmp->page), *ptr, key, pagenumber);
741
742         return TBT_OK;
743 }
744
745 static int
746 savePage(TBTree *db, TBTMemPage *page) {
747         int rc;
748         page->islocked=0;
749
750         if ( !page->iscached ) {
751                 if (  page->issynced==0 && (rc=TBTWritePage(db, page)) != TBT_OK )
752                         return rc;
753                 tfree(page);
754         }
755         return TBT_OK;
756 }
757
758 static int
759 layerInsert(TBTree *db, u_int32_t pagenumber, TBTMemPage **left, TBTMemPage **right, TBTValue *key, TBTValue *value) {
760         int rc,fnd;
761         TBTPointer      *ptr=NULL;
762         TBTMemPage      *page=NULL, *newright=NULL;
763                         
764         if ( (rc=TBTReadPage(db, pagenumber, &page))!=TBT_OK )
765                 return rc;
766
767         page->islocked=1;
768
769         fnd = findInPage(db, &(page->page), key, &ptr );
770
771         *left=*right=NULL;
772         if ( page->page.isleaf ) {
773                 u_int32_t size = TBTPOINTERSIZE(db) +
774                         PTRALIGN(value->length) +
775                         ( ( db->keylen ) ? 0 : PTRALIGN(key->length) );
776                 
777                 if ( size>(TBTREEPAGESIZE-TBTPAGEHDRSZ)/2 ) {
778                         tlog(TL_ALARM,"Huge size: %d bytes", size);
779                         return TBT_HUGE;
780                 }
781
782                 if (fnd==FOUND) {
783                         deleteKV(db, &(page->page), &ptr, 1);
784                         page->issynced=0;
785                 }
786
787                 if ( size <= page->page.freespace ) {
788                         packLeafKV(db, &(page->page), ptr, key, value);
789                         page->issynced=0;
790                 } else {
791                         rc = splitPage(db, page, &newright, &ptr, size, key, value, 0);
792                         if ( rc != TBT_OK ) return rc;
793
794                         page->issynced = newright->issynced = 0;
795                         page->islocked = newright->islocked = 1;
796                 }
797         } else {
798                 if ( (rc=layerInsert(db, ptr->pointer.internal.link, left, right, key, value))!=TBT_OK ) {
799                         page->islocked=0;
800                         return rc;
801                 }
802
803                 if ( *left ) { /* child page was splited */
804                         u_int32_t size;
805                         TBTPointer *rightptr = (TBTPointer*)( (*left)->page.data + ((*left)->page.npointer-1) * TBTPOINTERSIZE(db) );
806                         TBTValue K;
807
808                         if ( ISINFPOINTER(db, &(page->page), ptr) ) {
809                                 deleteKV(db, &(page->page), &ptr, 1);
810                                 K.length = 0; K.value = NULL;
811                                 packInternalKV(db, &(page->page), ptr, &K, (*right)->pagenumber);
812                         } else 
813                                 ptr->pointer.internal.link=(*right)->pagenumber;
814
815                         K.length = ( db->keylen ) ? db->keylen : rightptr->key.varlen.length;
816                         K.value  = ( db->keylen ) ? rightptr->key.fixed.key : (*left)->page.data + rightptr->key.varlen.offset;
817
818                         size = TBTPOINTERSIZE(db) + ( ( db->keylen ) ? 0 : PTRALIGN(K.length) );
819
820                         if ( size <= page->page.freespace ) {
821                                 packInternalKV(db, &(page->page), ptr, &K, (*left)->pagenumber);
822                                 page->issynced=0;
823                         } else {
824                                 rc = splitPage(db, page, &newright, &ptr, size, &K, NULL, (*left)->pagenumber);
825                                 if ( rc != TBT_OK ) return rc;
826
827                                 page->issynced = newright->issynced = 0;
828                                 page->islocked = newright->islocked = 1;
829                         }
830                 }
831         }
832
833         if ( *left ) {
834                 if ( (rc=savePage(db, *left))!=TBT_OK )
835                         return rc;
836                 if ( (rc=savePage(db, *right))!=TBT_OK )
837                         return rc;
838                 *left=*right=NULL;
839         }
840
841         if ( newright ) {
842                 if ( page->pagenumber==TBTPAGEROOT ) { /* root was splited */
843                         TBTMemPage  *newleft;
844                         TBTValue K;
845                 
846
847                         if  ( (rc=TBTNewPage(db,&newleft))!=TBT_OK )
848                                 return rc;
849                         memcpy( &(newleft->page) , &(page->page), TBTREEPAGESIZE);
850                         memset( &(page->page), 0, TBTREEPAGESIZE);
851                         page->page.freespace=TBTREEPAGESIZE-TBTPAGEHDRSZ;
852         
853                         ptr = (TBTPointer*)( newleft->page.data + (newleft->page.npointer-1) * TBTPOINTERSIZE(db) ); 
854                         K.length = ( db->keylen ) ? db->keylen : ptr->key.varlen.length;
855                         K.value  = ( db->keylen ) ? ptr->key.fixed.key : newleft->page.data + ptr->key.varlen.offset;
856                         packInternalKV(db, &(page->page), NULL, &K, newleft->pagenumber);
857                 
858                         ptr = (TBTPointer*)( newright->page.data + (newright->page.npointer-1) * TBTPOINTERSIZE(db) ); 
859                         K.length = 0;
860                         K.value  = NULL;
861                         packInternalKV(db, &(page->page), NULL, &K, newright->pagenumber);
862
863                         if ( (rc=savePage(db, newright))!=TBT_OK )
864                                 return rc;
865                         if ( (rc=savePage(db, newleft))!=TBT_OK )
866                                 return rc;
867                         if ( (rc=savePage(db, page))!=TBT_OK )
868                                 return rc;
869                 } else {
870                         *left = page;
871                         *right = newright;
872                 }
873         } else if ( (rc=savePage(db, page))!=TBT_OK )
874                 return rc;
875         
876
877         return TBT_OK;
878
879
880 int
881 TBTInsert(TBTree *db, TBTValue *key, TBTValue *value) {
882         int rc;
883         TBTMemPage *lpage=NULL, *rpage=NULL;
884
885         rc = layerInsert(db, TBTPAGEROOT, &lpage, &rpage, key, value);
886
887         tassert( lpage==NULL && rpage==NULL );
888
889         return rc;
890 }
891
892 int
893 TBTDelete(TBTree *db, TBTValue *key) {
894         TBTMemPage      *page;
895         TBTPointer *ptr;
896         int rc;
897
898         if ( (rc=findPage(db, key, &page))!=TBT_OK )
899                 return rc; 
900                 
901         if ( page == NULL )
902                 return TBT_OK;
903
904         if ( findInPage(db, &(page->page), key, &ptr) != FOUND ) {
905                 if ( !(page)->iscached )
906                         tfree(page);
907                 return TBT_OK;
908         } 
909
910         deleteKV(db, &(page->page), &ptr, 1);
911         page->issynced=0;
912
913         rc=savePage(db, page);
914
915         return rc;      
916 }
917
918 int 
919 TBTInitIterator(TBTree *db, TBTIterator *iterator) {
920         TBTMemPage      *page=NULL;
921         TBTPointer *ptr;
922         int rc;
923         u_int32_t pagenum=TBTPAGEROOT;
924
925         memset(iterator, 0, sizeof(TBTIterator));
926
927         do {
928                 if ( page && !page->iscached )
929                         tfree(page);
930                 if ( (rc=TBTReadPage(db, pagenum, &page))!=TBT_OK )
931                         return rc;
932
933                 ptr = (TBTPointer*)( page->page.data );
934                 pagenum = ptr->pointer.internal.link;
935         } while( !page->page.isleaf );
936
937         page->islocked=1;
938         iterator->page = page;  
939         iterator->ptr = ptr;    
940
941         return TBT_OK;
942 }
943
944 void 
945 TBTFreeIterator(TBTree *db, TBTIterator *iterator) {
946         
947         if ( iterator->page ) {
948                 iterator->page->islocked=0;
949                 if ( !iterator->page->iscached )
950                         tfree(iterator->page);
951         }
952
953         memset(iterator, 0, sizeof(TBTIterator));
954 }
955
956 int 
957 TBTIterate(TBTree *db, TBTIterator *iterator, TBTValue *key, TBTValue *value ) {
958         int rc;
959
960         if (  iterator->page==NULL ) {
961                 memset(key, 0, sizeof(TBTValue));
962                 memset(value, 0, sizeof(TBTValue));
963                 return TBT_OK;
964         }
965
966         if ( (char*)(iterator->ptr) - iterator->page->page.data >= TBTPOINTERSIZE(db) * iterator->page->page.npointer ) {
967                 do {
968                         u_int32_t pagenumber = iterator->page->page.rightlink;
969                         iterator->page->islocked=0;
970                         if ( !iterator->page->iscached )
971                                 tfree(iterator->page);
972                         iterator->page=NULL;
973
974                         if ( pagenumber==TBTPAGEROOT ) {
975                                 memset(key, 0, sizeof(TBTValue));
976                                 memset(value, 0, sizeof(TBTValue));
977                                 return TBT_OK;
978                         }
979                         if ( (rc=TBTReadPage(db, pagenumber, &(iterator->page)))!=TBT_OK )
980                                 return rc;
981                 } while ( iterator->page->page.npointer == 0 );
982
983                 iterator->page->islocked=1;
984                 iterator->ptr = (TBTPointer*)( iterator->page->page.data );
985         }
986
987         if ( db->keylen ) {
988                 key->length = db->keylen;
989                 key->value = iterator->ptr->key.fixed.key;
990         } else {
991                 key->length = iterator->ptr->key.varlen.length; 
992                 key->value = iterator->page->page.data + iterator->ptr->key.varlen.offset;
993         }
994
995         value->length = iterator->ptr->pointer.leaf.length; 
996         value->value = iterator->page->page.data + iterator->ptr->pointer.leaf.offset;
997
998         iterator->ptr = ( TBTPointer* ) (  (char*)(iterator->ptr) + TBTPOINTERSIZE(db) );
999
1000         return TBT_OK;
1001 }
1002
1003