1d0999aebc7d63fe6de1f0dca58a659fc9c55a61
[tedtools.git] / tbtree.c
1 /*
2  * Copyright (c) 2005 Teodor Sigaev <teodor@sigaev.ru>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *      notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *      notice, this list of conditions and the following disclaimer in the
12  *      documentation and/or other materials provided with the distribution.
13  * 3. Neither the name of the author nor the names of any co-contributors
14  *      may be used to endorse or promote products derived from this software
15  *      without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY CONTRIBUTORS ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL CONTRIBUTORS BE LIABLE FOR ANY
21  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
23  * GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
25  * IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
26  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN
27  * IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28  */
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <fcntl.h>
32 #include <unistd.h>
33 #include <string.h>
34 #include <errno.h>
35 #include <sys/types.h>
36 #include <sys/stat.h>
37 #include <sys/file.h>
38 #include <sys/time.h>
39
40 #include "tmalloc.h"
41 #include "tlog.h"
42
43 #include "tbtree.h"
44
45 static int TBTReadPage(TBTree *db, u_int32_t pagenumber, TBTMemPage **page) ;
46 static int TBTWritePage(TBTree *db, TBTMemPage *page) ;
47 static int TBTNewPage(TBTree *db, TBTMemPage **page) ;
48
49 int 
50 TBTOpen(TBTree *db, char *file) {
51         off_t offset;
52
53         if ( db->readonly ) {
54                 db->fd = open(file, O_RDONLY);
55                 if ( db->fd < 0 ) {
56                         tlog(TL_CRIT,"TBTOpen: open failed: %s",strerror(errno));
57                         return TBT_ERROR;
58                 }
59                 if ( flock( db->fd, LOCK_SH ) < 0 ) {
60                         tlog(TL_CRIT,"TBTOpen: flock failed: %s",strerror(errno));
61                         close(db->fd);
62                         return TBT_ERROR;
63                 }
64         } else {
65                 db->fd = open(file, O_CREAT | O_RDWR, 0666);
66                 if ( db->fd < 0 ) {
67                         tlog(TL_CRIT,"TBTOpen: open failed: %s",strerror(errno));
68                         return TBT_ERROR;
69                 }
70                 if ( flock( db->fd, LOCK_EX ) < 0 ) {
71                         tlog(TL_CRIT,"TBTOpen: flock failed: %s",strerror(errno));
72                         close(db->fd);
73                         return TBT_ERROR;
74                 }
75         }
76
77         if ( db->npage ) {
78                 db->Cache=(TBTMemPage**)t0malloc( sizeof(TBTMemPage*)*db->npage );
79                 db->TimeCache=(TBTMemPage**)t0malloc( sizeof(TBTMemPage*)*db->npage );
80                 db->curpage=0;
81         }
82
83         db->cachehits = db->pageread = db->pagewrite = 0;
84
85         if ( (offset=lseek(db->fd, 0, SEEK_END)) < 0 ) {
86                 tlog(TL_CRIT,"TBTOpen: lseek failed: %s",strerror(errno));
87                 flock( db->fd, LOCK_UN );
88                 close(db->fd);
89                 return TBT_ERROR;
90         }
91
92         if ( offset==0 ) {
93                 if ( db->readonly ) {
94                         tlog(TL_CRIT,"TBTOpen: file is void");
95                         flock( db->fd, LOCK_UN );
96                         close(db->fd);
97                         return TBT_ERROR;
98                 } else {
99                         TBTMemPage *page;
100                         if ( TBTNewPage(db, &page) != TBT_OK ) {
101                                 flock( db->fd, LOCK_UN );
102                                 close(db->fd);
103                                 return TBT_ERROR;
104                         }
105                         page->page.isleaf=1;
106                         if ( TBTWritePage(db, page)!=TBT_OK ){
107                                 flock( db->fd, LOCK_UN );
108                                 close(db->fd);
109                                 return TBT_ERROR;
110                         }
111                 }
112         } 
113
114         db->lastpagenumber=0;                            
115         return TBT_OK; 
116 }
117
118 int
119 TBTClose(TBTree *db) {
120         int rc=0;
121
122         if ( !db->readonly ) 
123                 rc=TBTSync(db);
124
125         if ( db->npage ) {
126                 u_int32_t i;
127
128                 for(i=0; i<db->curpage; i++)
129                         tfree( db->Cache[i] );
130
131                 tfree( db->Cache );
132                 tfree( db->TimeCache );
133         }
134         
135         flock( db->fd, LOCK_UN );
136         close(db->fd);  
137         
138         return (rc) ? TBT_ERROR : TBT_OK;
139 }
140
141 int
142 TBTSync(TBTree *db) {
143         u_int32_t i;
144         int rc=0;
145
146         if ( db->readonly || db->npage==0 )
147                 return TBT_OK;
148  
149         for(i=0; i<db->curpage; i++)
150                 if ( !db->Cache[i]->issynced )
151                         rc |= TBTWritePage(db, db->Cache[i]);
152         
153         if ( fsync(db->fd) ) {
154                 tlog(TL_CRIT,"TBTSync: fsync failed: %s",strerror(errno));
155                 rc=1;
156         }
157
158         return (rc) ? TBT_ERROR : TBT_OK;
159 }
160
161 static TBTMemPage*
162 PageAlloc(TBTree *db, int *pos) {
163         TBTMemPage *page;
164
165         if ( db->curpage < db->npage ) {
166                 page = db->Cache[db->curpage] = db->TimeCache[db->curpage] = (TBTMemPage*)t0malloc(sizeof(TBTMemPage));
167                 *pos = db->curpage;
168                 page->iscached=1;
169                 db->curpage++;
170         } else if ( db->npage == 0 ) {
171                 page = (TBTMemPage*)t0malloc(sizeof(TBTMemPage));
172         } else {
173                 u_int32_t i;
174                 for( i=0; i<db->curpage && db->TimeCache[i]->islocked; i++ );
175                 if ( i == db->curpage ) { /*all pages locked*/
176                         /*tlog(TL_WARN,"TBTReadPage: all pages in cache locked, increase number of cached pages");*/
177                         page = (TBTMemPage*)t0malloc(sizeof(TBTMemPage));
178                 } else {
179                         page = db->TimeCache[i];
180                         if ( !page->issynced )
181                                 if ( TBTWritePage(db, page) )
182                                         return NULL;
183                         memset(page,0,sizeof(TBTMemPage));
184                         page->iscached=1;
185                         *pos=i;
186                 }
187         }
188
189         return page;
190 }
191
192 static void
193 moveFirstTime(TBTree *db, int pos) {
194         TBTMemPage *ptr;
195         
196         if ( pos < 0 ) 
197                 return;
198         if (pos+1==db->curpage )
199                 return;
200
201         ptr = db->TimeCache[pos];
202         memmove( db->TimeCache + pos , db->TimeCache + pos + 1, sizeof(TBTMemPage*)*(db->curpage-pos-1) );
203         db->TimeCache[db->curpage-1] = ptr;
204 }
205
206 static void
207 findAndMoveFirstTime(TBTree *db, TBTMemPage *page) {
208         int pos;
209         for(pos=0;pos<db->curpage;pos++)
210                 if ( page==db->TimeCache[pos] ) {
211                         moveFirstTime(db,pos);
212                         break;
213                 }
214         tassert( pos<db->curpage );
215 }
216
217 static void
218 findAndMovePN(TBTree *db, TBTMemPage *page) {
219         int pos;
220
221         if ( db->curpage < 2 )
222                 return;
223
224         for(pos=0;pos<db->curpage;pos++)
225                 if ( page==db->Cache[pos] ) 
226                         break;
227         tassert( pos<db->curpage );
228
229         while(1) {
230                 if ( pos+1 != db->curpage && db->Cache[pos+1]->pagenumber < page->pagenumber ) {
231                         db->Cache[pos] = db->Cache[pos+1];
232                         db->Cache[pos+1] = page;
233                         pos++; 
234                 } else if ( pos!=0 && db->Cache[pos-1]->pagenumber > page->pagenumber ) {
235                         db->Cache[pos] = db->Cache[pos-1];
236                         db->Cache[pos-1] = page;
237                         pos--;
238                 } else
239                         break; 
240         }       
241 }
242
243 static int
244 cmpNPage(const void *a, const void *b) {
245         if (  (*(TBTMemPage**)a)->pagenumber == (*(TBTMemPage**)b)->pagenumber ) 
246                 return 0;
247         return ( (*(TBTMemPage**)a)->pagenumber > (*(TBTMemPage**)b)->pagenumber ) ? 1 : -1;
248 }
249
250 static int
251 TBTReadPage(TBTree *db, u_int32_t pagenumber, TBTMemPage **page) {
252         TBTMemPage      *ptr, key, **res;
253         int rc;
254         int pos;
255
256         ptr=&key;
257         key.pagenumber = pagenumber;
258
259         res = (TBTMemPage**)bsearch(&ptr, db->Cache, db->curpage, sizeof(TBTMemPage*), cmpNPage);
260         if ( res ) {
261                 db->cachehits++;
262                 *page=*res;
263                 findAndMoveFirstTime(db, *page);
264         } else {
265                 off_t  offset = (off_t)pagenumber * (off_t)TBTREEPAGESIZE;
266                 
267                 if( (*page = PageAlloc(db,&pos))==NULL )
268                         return TBT_ERROR;
269
270                 if ( lseek(db->fd, offset, SEEK_SET) != offset ) {
271                         tlog(TL_CRIT,"TBTReadPage: lseek failed: %s",strerror(errno));
272                         return TBT_ERROR;
273                 }
274                 (*page)->issynced=1;
275                 (*page)->pagenumber=pagenumber;
276                 if ( (rc=read(db->fd, &( (*page)->page ), TBTREEPAGESIZE)) != TBTREEPAGESIZE ) {
277                         tlog(TL_CRIT,"TBTReadPage: read failed: %s %d %d",strerror(errno), pagenumber, offset);
278                         return TBT_ERROR;
279                 }
280                 if ( (*page)->iscached ) {
281                         moveFirstTime(db, pos);
282                         findAndMovePN(db, *page);
283                 } 
284         }
285
286         db->pageread++;
287         gettimeofday( &( (*page)->last ), NULL );
288         
289         return TBT_OK;   
290 }
291
292 static int
293 TBTWritePage(TBTree *db, TBTMemPage *page) {
294         off_t  offset = (off_t)(page->pagenumber) * (off_t)TBTREEPAGESIZE;
295         off_t  size;
296
297         if ( page->issynced )
298                 return TBT_OK;
299
300         if ( lseek(db->fd, offset, SEEK_SET) != offset ) {
301                 tlog(TL_CRIT,"TBTWritePage: lseek failed: %s",strerror(errno));
302                 return TBT_ERROR;
303         }
304
305         if ( (size=write(db->fd, &( page->page ), TBTREEPAGESIZE)) != TBTREEPAGESIZE ) {
306                 tlog(TL_CRIT,"TBTWritePage: write failed: %s (writed only %d bytes)",strerror(errno), size);
307                 return TBT_ERROR;
308         }
309
310         page->issynced=1;
311         db->pagewrite++;
312
313         return TBT_OK;
314 }
315
316 static int
317 TBTNewPage(TBTree *db, TBTMemPage **page) {
318         int pos;
319         if ( db->lastpagenumber==0 ) {
320                 off_t  offset;
321                 if ( (offset=lseek(db->fd, 0, SEEK_END)) < 0 ) {
322                         tlog(TL_CRIT,"TBTNewPage: lseek failed: %s",strerror(errno));
323                         return TBT_ERROR;
324                 }
325
326                 if ( offset % TBTREEPAGESIZE != 0 ) {
327                         tlog(TL_CRIT,"TBTNewPage: file corrupted");
328                         return TBT_ERROR;
329                 }
330
331                 db->lastpagenumber = offset/TBTREEPAGESIZE;
332         }
333
334         if( (*page = PageAlloc(db,&pos))==NULL )
335                 return TBT_ERROR;
336         (*page)->pagenumber = db->lastpagenumber;
337         (*page)->issynced=0;
338         (*page)->islocked=1;
339
340         db->lastpagenumber++;
341
342         memset( &((*page)->page), 0, sizeof(TBTPAGEHDRSZ));
343         (*page)->page.freespace = TBTREEPAGESIZE-TBTPAGEHDRSZ;
344
345         gettimeofday( &( (*page)->last ), NULL );
346         if ( (*page)->iscached ) {
347                 moveFirstTime(db, pos);
348                 findAndMovePN(db, *page);
349         }
350         
351         return TBT_OK;
352 }
353
354 #define NOTFOUND        0
355 #define FOUND           1
356
357 static void
358 printValue(u_int32_t len, char *ptr) {
359         putchar('\'');
360         while(len>0) {
361                 putchar(*ptr);
362                 ptr++; len--;
363         }
364         putchar('\'');
365 }
366
367 static void
368 dumpPage(TBTree *db, TBTPage *page, int follow) {
369         u_int32_t i,j;
370
371         for(j=0;j<follow;j++)
372                 putchar(' ');
373         printf("Page: f:%d l:%d r:%d n:%d\n", 
374                 page->freespace,
375                 page->isleaf,
376                 page->rightlink,
377                 page->npointer
378         );      
379         for(i=0;i<page->npointer;i++) {
380                 TBTPointer *ptr= (TBTPointer*)(page->data+TBTPOINTERSIZE(db)*i);
381                 for(j=0;j<follow+2;j++)
382                         putchar(' ');
383                 printf("i:%d(%d) kl:%d ko:%d ",
384                         i, (int)ptr, 
385                         (db->keylen) ? db->keylen : ptr->key.varlen.length,
386                         (db->keylen) ? 0 : ptr->key.varlen.offset
387                 );
388                 if ( db->keylen==0 ) 
389                         printValue(ptr->key.varlen.length, page->data + ptr->key.varlen.offset);
390                 else
391                         printf("'%d'", *(int*)(ptr->key.fixed.key));
392                 if ( page->isleaf ) {
393                         printf(" vl:%d vo:%d ", 
394                                 ptr->pointer.leaf.length,
395                                 ptr->pointer.leaf.offset
396                         );
397                         printValue(ptr->pointer.leaf.length,  page->data + ptr->pointer.leaf.offset );
398                         putchar('\n');
399                 } else {
400                         printf(" page:%d\n", ptr->pointer.internal.link);
401                         if ( follow>=0 )
402                                 dumpTree(db, ptr->pointer.internal.link, follow+4); 
403                 }
404         }
405 }
406
407 void
408 dumpTree(TBTree *db, u_int32_t pagenumber, int follow) {
409         TBTMemPage  *page;
410
411         if ( TBTReadPage(db, pagenumber, &page) ) {
412                 fprintf(stderr,"BEDA %d\n", pagenumber);
413                 return;
414         }
415
416         page->islocked=1;
417         dumpPage(db, &(page->page), follow);
418         page->islocked=0;
419         if ( !page->iscached )
420                 tfree(page);
421 }
422
423 static int
424 findInPage(TBTree *db, TBTPage *page, TBTValue *key, TBTPointer **res) {
425         TBTPointer      *StopLow = (TBTPointer*)(page->data);
426         TBTPointer      *StopHigh = (TBTPointer*)(page->data + TBTPOINTERSIZE(db)*page->npointer);
427         TBTPointer      *StopMiddle;
428         int diff ;
429         TBTValue  A;
430
431         /* Loop invariant: StopLow <= StopMiddle < StopHigh */
432         while (StopLow < StopHigh) {
433                 StopMiddle = (TBTPointer*)(((char*)StopLow) + TBTPOINTERSIZE(db)*((((char*)StopHigh - (char*)StopLow)/TBTPOINTERSIZE(db)) >> 1));
434                 if ( db->keylen == 0 ) {
435                         A.length = StopMiddle->key.varlen.length;
436                         A.value = page->data + StopMiddle->key.varlen.offset;
437                 } else {
438                         A.length =  db->keylen;
439                         A.value = StopMiddle->key.fixed.key;
440                 }
441         
442                 if ( ISINFPOINTER(db, page, StopMiddle) )
443                         diff = 1;
444                 else 
445                         diff = db->cmpkey( &A, key );
446                 if ( diff == 0 ) {
447                         *res = StopMiddle;
448                         return FOUND;
449                 } else if ( diff < 0 )
450                         StopLow = (TBTPointer*)((char*)StopMiddle + TBTPOINTERSIZE(db));
451                 else
452                         StopHigh = StopMiddle;
453         }
454
455         *res = StopHigh;
456         return NOTFOUND;
457 }
458
459 static int
460 findPage(TBTree *db, TBTValue *key, TBTMemPage **page) {
461         u_int32_t pagenumber = TBTPAGEROOT;
462         int rc = TBT_OK;
463
464         *page=NULL;
465         while(1) {
466                 TBTPointer *ptr;
467
468                 if ( (rc=TBTReadPage(db, pagenumber, page))!=TBT_OK )
469                         return rc;
470
471                 if ( (*page)->page.isleaf ) 
472                         return TBT_OK;
473
474                 findInPage(db, &((*page)->page), key, &ptr );
475                 pagenumber = ptr->pointer.internal.link;
476                 if ( !(*page)->iscached )
477                         tfree(*page);
478         }
479
480         return TBT_OK;
481 }
482
483 int
484 TBTFind(TBTree *db, TBTValue *key, TBTValue *value) {
485         TBTMemPage      *page;
486         TBTPointer *ptr;
487         int rc;
488
489         memset(value, 0, sizeof(TBTValue));
490
491         if ( (rc=findPage(db, key, &page))!=TBT_OK )
492                 return rc; 
493                 
494         if ( page == NULL )
495                 return TBT_OK;
496
497         if ( findInPage(db, &(page->page), key, &ptr) != FOUND ) {
498                 if ( !(page)->iscached )
499                         tfree(page);
500                 return TBT_OK;
501         } 
502
503         value->length = ptr->pointer.leaf.length;
504         value->value = tmalloc(value->length);
505         memcpy( value->value, page->page.data + ptr->pointer.leaf.offset, value->length );
506
507         if ( !(page)->iscached )
508                 tfree(page);
509
510         return TBT_OK;  
511 }
512
513 static void
514 packLeafKV(TBTree *db, TBTPage *page, TBTPointer *ptr, TBTValue *key, TBTValue *value) {
515         if ( ptr ) {
516                 if ( (char*)ptr - page->data  != page->npointer * TBTPOINTERSIZE(db) ) 
517                         memmove((char*)ptr+TBTPOINTERSIZE(db), ptr, page->npointer * TBTPOINTERSIZE(db) - ( (char*)ptr - page->data ));
518         } else
519                 ptr = (TBTPointer*)(page->data + page->npointer * TBTPOINTERSIZE(db));
520
521         page->npointer++;
522         page->freespace -= TBTPOINTERSIZE(db) + PTRALIGN(value->length);
523         if ( db->keylen ) {
524                 memcpy( ptr->key.fixed.key, key->value, db->keylen );
525                 ptr->pointer.leaf.offset = page->npointer * TBTPOINTERSIZE(db) + page->freespace;
526         } else {
527                 page->freespace -= PTRALIGN(key->length);
528                 ptr->key.varlen.length = key->length;
529                 ptr->key.varlen.offset = page->npointer * TBTPOINTERSIZE(db) + page->freespace;
530                 memcpy( page->data + ptr->key.varlen.offset, key->value, key->length);
531                 ptr->pointer.leaf.offset = page->npointer * TBTPOINTERSIZE(db) + page->freespace + PTRALIGN(key->length);
532         } 
533         ptr->pointer.leaf.length = value->length;
534         memcpy( page->data + ptr->pointer.leaf.offset, value->value, value->length );
535 }
536
537 static void
538 packInternalKV(TBTree *db, TBTPage *page, TBTPointer *ptr, TBTValue *key, u_int32_t pagenumber) {
539         if ( ptr ) {
540                 if ( (char*)ptr - page->data  != page->npointer * TBTPOINTERSIZE(db) ) 
541                         memmove((char*)ptr+TBTPOINTERSIZE(db), ptr, page->npointer * TBTPOINTERSIZE(db) - ( (char*)ptr - page->data ));
542         } else
543                 ptr = (TBTPointer*)(page->data + page->npointer * TBTPOINTERSIZE(db));
544
545         page->npointer++;
546         page->freespace -= TBTPOINTERSIZE(db);
547
548         if ( db->keylen ) {
549                 if ( key->value )
550                         memcpy( ptr->key.fixed.key, key->value, db->keylen );
551                 else
552                         memset( ptr->key.fixed.key, 0, db->keylen );
553         } else {
554                 page->freespace -= PTRALIGN(key->length);
555                 ptr->key.varlen.length = key->length;
556                 ptr->key.varlen.offset = page->npointer * TBTPOINTERSIZE(db) + page->freespace;
557                 if ( key->length ) 
558                         memcpy( page->data + ptr->key.varlen.offset, key->value, key->length);
559         }
560         ptr->pointer.internal.link = pagenumber;
561 }
562
563
564 static void
565 deleteKV(TBTree *db, TBTPage *page, TBTPointer **ptrs, u_int32_t num) {
566         static char buf[TBTREEPAGESIZE];
567         char  *bufptr = buf;
568         TBTPointer *ptr = (TBTPointer*)page->data, *tmp, **ptrptr = ptrs;
569         u_int32_t i, start;
570         /* suppose ptrs are ordered! */
571         tmp = ptr;
572         while( ptrptr - ptrs < num && (char*)ptr - page->data < TBTPOINTERSIZE(db)*page->npointer ) {
573                 if ( *ptrptr < ptr ) {
574                         tlog(TL_CRIT,"deleteKV: Something wrong: unexistent *ptrs"); 
575                         ptrptr++;
576                 } else if ( *ptrptr > ptr ) {
577                         if ( tmp != ptr )
578                                 memcpy(tmp, ptr, TBTPOINTERSIZE(db));
579                         ptr = (TBTPointer*)( (char*)ptr + TBTPOINTERSIZE(db) );
580                         tmp = (TBTPointer*)( (char*)tmp + TBTPOINTERSIZE(db) );
581                 } else {
582                         page->freespace += TBTPOINTERSIZE(db) + 
583                                 ( ( page->isleaf ) ? PTRALIGN(ptr->pointer.leaf.length) : 0 ) +
584                                 ( ( db->keylen ) ? 0 : PTRALIGN(ptr->key.varlen.length) );
585                         ptrptr++;
586                         ptr = (TBTPointer*)( (char*)ptr + TBTPOINTERSIZE(db) );
587                 }
588         }
589
590         if ( (char*)ptr - page->data < TBTPOINTERSIZE(db)*page->npointer && tmp!=ptr )
591                 memmove( tmp, ptr, page->npointer * TBTPOINTERSIZE(db) - ((char*)ptr - page->data) );
592
593         page->npointer-=num;
594
595         tmp = (TBTPointer*)page->data;
596         start = page->npointer * TBTPOINTERSIZE(db) + page->freespace;
597         for(i=0;i<page->npointer;i++) {
598                 if ( page->isleaf ) {
599                         memcpy( bufptr, page->data + tmp->pointer.leaf.offset, tmp->pointer.leaf.length );
600                         tmp->pointer.leaf.offset = start + (bufptr-buf);
601                         bufptr+=PTRALIGN(tmp->pointer.leaf.length);
602                 }
603
604                 if ( db->keylen == 0 ) {
605                         if ( tmp->key.varlen.length )
606                                 memcpy( bufptr, page->data + tmp->key.varlen.offset, tmp->key.varlen.length );
607                         tmp->key.varlen.offset = start + (bufptr-buf);
608                         bufptr+=PTRALIGN(tmp->key.varlen.length);
609                 }
610                 tmp = (TBTPointer*)( (char*)tmp + TBTPOINTERSIZE(db) );
611         }
612
613         memcpy( page->data + start, buf, bufptr-buf );
614 }
615
616 static u_int32_t
617 findDelimiter(TBTree *db, TBTPage *page, TBTPointer *ptr, int size, u_int32_t start, int *where) {
618         int  *sizes = (int*) tmalloc( sizeof(int) * (page->npointer+1) );
619         int i;
620         TBTPointer *tomove;
621         int lfree=TBTREEPAGESIZE-TBTPAGEHDRSZ, rfree=TBTREEPAGESIZE-TBTPAGEHDRSZ;
622         int nptr = ( (char*)ptr - page->data ) / TBTPOINTERSIZE(db); 
623         int was=0;
624
625         for(i=0; i<page->npointer;i++) {
626                 tomove = (TBTPointer*)(page->data + i*TBTPOINTERSIZE(db));
627                 if ( was==0 && i==nptr ) {
628                         sizes[i] = size;
629                         was=1;
630                 } 
631                 sizes[i+was] = TBTPOINTERSIZE(db) +
632                         ( (page->isleaf) ? PTRALIGN(tomove->pointer.leaf.length) : 0 ) +
633                         ( ( db->keylen ) ? 0 : PTRALIGN(tomove->key.varlen.length) );
634         
635         }
636         
637         if ( was==0 )
638                 sizes[page->npointer]=size;
639
640         for(i=0;i<page->npointer+1;i++) {
641                 if ( i< start )
642                         lfree-=sizes[i];
643                 else 
644                         rfree-=sizes[i];
645         }
646
647         while( 1 ) {
648                 if ( lfree<0 ) {
649                         lfree+=sizes[start];
650                         rfree-=sizes[start];
651                         start--;
652                 } else if ( rfree < 0 ) { 
653                         lfree-=sizes[start];
654                         rfree+=sizes[start];
655                         start++;
656                 } else
657                         break;
658         }
659
660         *where = (nptr < start) ? -1 : 1;
661         if ( start>nptr )
662                 start--;
663
664         tfree(sizes);
665         return start;
666 }
667
668 static void
669 populatePages(TBTree *db, TBTPage *srcpage, TBTPage* newpage, u_int32_t start) {
670         u_int32_t i;
671         TBTPointer *tomove, **todelete = tmalloc( sizeof(TBTPointer*)* srcpage->npointer );
672         u_int32_t numtodelete=0;
673         TBTValue k, v;
674
675         for(i=start;i<srcpage->npointer;i++) {
676                 tomove = (TBTPointer*)(srcpage->data + i*TBTPOINTERSIZE(db));
677
678                 todelete[numtodelete] = tomove;
679                 numtodelete++;
680
681                 if ( db->keylen ) 
682                         k.value = tomove->key.fixed.key;
683                 else {
684                         k.length = tomove->key.varlen.length;
685                         k.value = srcpage->data + tomove->key.varlen.offset;
686                 } 
687                                 
688                 if ( srcpage->isleaf ) {
689                         v.length = tomove->pointer.leaf.length;
690                         v.value = srcpage->data + tomove->pointer.leaf.offset;
691                         packLeafKV(db, newpage, NULL, &k, &v);
692                 } else 
693                         packInternalKV(db, newpage, NULL, &k, tomove->pointer.internal.link);
694         }
695
696         deleteKV(db, srcpage, todelete, numtodelete);   
697         tfree(todelete);
698 }
699
700 static int
701 splitPage(TBTree *db, TBTMemPage *srcpage, TBTMemPage** newpage, TBTPointer **ptr, int size, TBTValue *key, TBTValue *value, u_int32_t pagenumber) {
702         TBTMemPage *tmp;
703         int start=0, where=0;
704         int nptr = ( (char*)(*ptr) - srcpage->page.data ) / TBTPOINTERSIZE(db);
705
706         if ( TBTNewPage(db, newpage) )
707                 return TBT_ERROR;
708
709         tmp = *newpage;
710         srcpage->issynced=tmp->issynced=0;
711         tmp->islocked=1;
712         tmp->page.rightlink = srcpage->page.rightlink;
713         srcpage->page.rightlink = tmp->pagenumber;
714         tmp->page.isleaf = srcpage->page.isleaf;
715
716         switch(db->strategy) {
717                 case TBT_STATEGY_HALF:
718                         start = findDelimiter(db, &(srcpage->page), *ptr, size, (srcpage->page.npointer+1)/2, &where);  
719                         break;
720                 case TBT_STATEGY_LEFTFILL:
721                         start = findDelimiter(db, &(srcpage->page), *ptr, size, srcpage->page.npointer, &where);        
722                         break;
723                 case TBT_STATEGY_RIGHTFILL:
724                         start = findDelimiter(db, &(srcpage->page), *ptr, size, 0, &where);     
725                         break;
726                 default:
727                         tlog(TL_CRIT,"splitPage: unknown strategy: %d", db->strategy);
728                         return TBT_ERROR;
729         }
730
731         populatePages(db, &(srcpage->page), &(tmp->page), start);
732         if ( where<0 )
733                 tmp=srcpage;
734         else
735                 nptr-=start;
736         *ptr = (TBTPointer*)(tmp->page.data + nptr*TBTPOINTERSIZE(db));
737
738         if ( tmp->page.isleaf ) 
739                 packLeafKV(db, &(tmp->page), *ptr, key, value);
740         else
741                 packInternalKV(db, &(tmp->page), *ptr, key, pagenumber);
742
743         return TBT_OK;
744 }
745
746 static int
747 savePage(TBTree *db, TBTMemPage *page) {
748         int rc;
749         page->islocked=0;
750
751         if ( !page->iscached ) {
752                 if (  page->issynced==0 && (rc=TBTWritePage(db, page)) != TBT_OK )
753                         return rc;
754                 tfree(page);
755         }
756         return TBT_OK;
757 }
758
759 static int
760 layerInsert(TBTree *db, u_int32_t pagenumber, TBTMemPage **left, TBTMemPage **right, TBTValue *key, TBTValue *value) {
761         int rc,fnd;
762         TBTPointer      *ptr=NULL;
763         TBTMemPage      *page=NULL, *newright=NULL;
764                         
765         if ( (rc=TBTReadPage(db, pagenumber, &page))!=TBT_OK )
766                 return rc;
767
768         page->islocked=1;
769
770         fnd = findInPage(db, &(page->page), key, &ptr );
771
772         *left=*right=NULL;
773         if ( page->page.isleaf ) {
774                 u_int32_t size = TBTPOINTERSIZE(db) +
775                         PTRALIGN(value->length) +
776                         ( ( db->keylen ) ? 0 : PTRALIGN(key->length) );
777                 
778                 if ( size>(TBTREEPAGESIZE-TBTPAGEHDRSZ)/2 ) {
779                         tlog(TL_ALARM,"Huge size: %d bytes", size);
780                         return TBT_HUGE;
781                 }
782
783                 if (fnd==FOUND) {
784                         deleteKV(db, &(page->page), &ptr, 1);
785                         page->issynced=0;
786                 }
787
788                 if ( size <= page->page.freespace ) {
789                         packLeafKV(db, &(page->page), ptr, key, value);
790                         page->issynced=0;
791                 } else {
792                         rc = splitPage(db, page, &newright, &ptr, size, key, value, 0);
793                         if ( rc != TBT_OK ) return rc;
794
795                         page->issynced = newright->issynced = 0;
796                         page->islocked = newright->islocked = 1;
797                 }
798         } else {
799                 if ( (rc=layerInsert(db, ptr->pointer.internal.link, left, right, key, value))!=TBT_OK ) {
800                         page->islocked=0;
801                         return rc;
802                 }
803
804                 if ( *left ) { /* child page was splited */
805                         u_int32_t size;
806                         TBTPointer *rightptr = (TBTPointer*)( (*left)->page.data + ((*left)->page.npointer-1) * TBTPOINTERSIZE(db) );
807                         TBTValue K;
808
809                         if ( ISINFPOINTER(db, &(page->page), ptr) ) {
810                                 deleteKV(db, &(page->page), &ptr, 1);
811                                 K.length = 0; K.value = NULL;
812                                 packInternalKV(db, &(page->page), ptr, &K, (*right)->pagenumber);
813                         } else 
814                                 ptr->pointer.internal.link=(*right)->pagenumber;
815
816                         K.length = ( db->keylen ) ? db->keylen : rightptr->key.varlen.length;
817                         K.value  = ( db->keylen ) ? rightptr->key.fixed.key : (*left)->page.data + rightptr->key.varlen.offset;
818
819                         size = TBTPOINTERSIZE(db) + ( ( db->keylen ) ? 0 : PTRALIGN(K.length) );
820
821                         if ( size <= page->page.freespace ) {
822                                 packInternalKV(db, &(page->page), ptr, &K, (*left)->pagenumber);
823                                 page->issynced=0;
824                         } else {
825                                 rc = splitPage(db, page, &newright, &ptr, size, &K, NULL, (*left)->pagenumber);
826                                 if ( rc != TBT_OK ) return rc;
827
828                                 page->issynced = newright->issynced = 0;
829                                 page->islocked = newright->islocked = 1;
830                         }
831                 }
832         }
833
834         if ( *left ) {
835                 if ( (rc=savePage(db, *left))!=TBT_OK )
836                         return rc;
837                 if ( (rc=savePage(db, *right))!=TBT_OK )
838                         return rc;
839                 *left=*right=NULL;
840         }
841
842         if ( newright ) {
843                 if ( page->pagenumber==TBTPAGEROOT ) { /* root was splited */
844                         TBTMemPage  *newleft;
845                         TBTValue K;
846                 
847
848                         if  ( (rc=TBTNewPage(db,&newleft))!=TBT_OK )
849                                 return rc;
850                         memcpy( &(newleft->page) , &(page->page), TBTREEPAGESIZE);
851                         memset( &(page->page), 0, TBTREEPAGESIZE);
852                         page->page.freespace=TBTREEPAGESIZE-TBTPAGEHDRSZ;
853         
854                         ptr = (TBTPointer*)( newleft->page.data + (newleft->page.npointer-1) * TBTPOINTERSIZE(db) ); 
855                         K.length = ( db->keylen ) ? db->keylen : ptr->key.varlen.length;
856                         K.value  = ( db->keylen ) ? ptr->key.fixed.key : newleft->page.data + ptr->key.varlen.offset;
857                         packInternalKV(db, &(page->page), NULL, &K, newleft->pagenumber);
858                 
859                         ptr = (TBTPointer*)( newright->page.data + (newright->page.npointer-1) * TBTPOINTERSIZE(db) ); 
860                         K.length = 0;
861                         K.value  = NULL;
862                         packInternalKV(db, &(page->page), NULL, &K, newright->pagenumber);
863
864                         if ( (rc=savePage(db, newright))!=TBT_OK )
865                                 return rc;
866                         if ( (rc=savePage(db, newleft))!=TBT_OK )
867                                 return rc;
868                         if ( (rc=savePage(db, page))!=TBT_OK )
869                                 return rc;
870                 } else {
871                         *left = page;
872                         *right = newright;
873                 }
874         } else if ( (rc=savePage(db, page))!=TBT_OK )
875                 return rc;
876         
877
878         return TBT_OK;
879
880
881 int
882 TBTInsert(TBTree *db, TBTValue *key, TBTValue *value) {
883         int rc;
884         TBTMemPage *lpage=NULL, *rpage=NULL;
885
886         rc = layerInsert(db, TBTPAGEROOT, &lpage, &rpage, key, value);
887
888         tassert( lpage==NULL && rpage==NULL );
889
890         return rc;
891 }
892
893 int
894 TBTDelete(TBTree *db, TBTValue *key) {
895         TBTMemPage      *page;
896         TBTPointer *ptr;
897         int rc;
898
899         if ( (rc=findPage(db, key, &page))!=TBT_OK )
900                 return rc; 
901                 
902         if ( page == NULL )
903                 return TBT_OK;
904
905         if ( findInPage(db, &(page->page), key, &ptr) != FOUND ) {
906                 if ( !(page)->iscached )
907                         tfree(page);
908                 return TBT_OK;
909         } 
910
911         deleteKV(db, &(page->page), &ptr, 1);
912         page->issynced=0;
913
914         rc=savePage(db, page);
915
916         return rc;      
917 }
918
919 int 
920 TBTInitIterator(TBTree *db, TBTIterator *iterator) {
921         TBTMemPage      *page=NULL;
922         TBTPointer *ptr;
923         int rc;
924         u_int32_t pagenum=TBTPAGEROOT;
925
926         memset(iterator, 0, sizeof(TBTIterator));
927
928         do {
929                 if ( page && !page->iscached )
930                         tfree(page);
931                 if ( (rc=TBTReadPage(db, pagenum, &page))!=TBT_OK )
932                         return rc;
933
934                 ptr = (TBTPointer*)( page->page.data );
935                 pagenum = ptr->pointer.internal.link;
936         } while( !page->page.isleaf );
937
938         page->islocked=1;
939         iterator->page = page;  
940         iterator->ptr = ptr;    
941
942         return TBT_OK;
943 }
944
945 void 
946 TBTFreeIterator(TBTree *db, TBTIterator *iterator) {
947         
948         if ( iterator->page ) {
949                 iterator->page->islocked=0;
950                 if ( !iterator->page->iscached )
951                         tfree(iterator->page);
952         }
953
954         memset(iterator, 0, sizeof(TBTIterator));
955 }
956
957 int 
958 TBTIterate(TBTree *db, TBTIterator *iterator, TBTValue *key, TBTValue *value ) {
959         int rc;
960
961         if (  iterator->page==NULL ) {
962                 memset(key, 0, sizeof(TBTValue));
963                 memset(value, 0, sizeof(TBTValue));
964                 return TBT_OK;
965         }
966
967         if ( (char*)(iterator->ptr) - iterator->page->page.data >= TBTPOINTERSIZE(db) * iterator->page->page.npointer ) {
968                 do {
969                         u_int32_t pagenumber = iterator->page->page.rightlink;
970                         iterator->page->islocked=0;
971                         if ( !iterator->page->iscached )
972                                 tfree(iterator->page);
973                         iterator->page=NULL;
974
975                         if ( pagenumber==TBTPAGEROOT ) {
976                                 memset(key, 0, sizeof(TBTValue));
977                                 memset(value, 0, sizeof(TBTValue));
978                                 return TBT_OK;
979                         }
980                         if ( (rc=TBTReadPage(db, pagenumber, &(iterator->page)))!=TBT_OK )
981                                 return rc;
982                 } while ( iterator->page->page.npointer == 0 );
983
984                 iterator->page->islocked=1;
985                 iterator->ptr = (TBTPointer*)( iterator->page->page.data );
986         }
987
988         if ( db->keylen ) {
989                 key->length = db->keylen;
990                 key->value = iterator->ptr->key.fixed.key;
991         } else {
992                 key->length = iterator->ptr->key.varlen.length; 
993                 key->value = iterator->page->page.data + iterator->ptr->key.varlen.offset;
994         }
995
996         value->length = iterator->ptr->pointer.leaf.length; 
997         value->value = iterator->page->page.data + iterator->ptr->pointer.leaf.offset;
998
999         iterator->ptr = ( TBTPointer* ) (  (char*)(iterator->ptr) + TBTPOINTERSIZE(db) );
1000
1001         return TBT_OK;
1002 }
1003
1004