Small optimization
[tedtools.git] / tbtree.c
1 /*
2  * Copyright (c) 2005 Teodor Sigaev <teodor@sigaev.ru>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *      notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *      notice, this list of conditions and the following disclaimer in the
12  *      documentation and/or other materials provided with the distribution.
13  * 3. Neither the name of the author nor the names of any co-contributors
14  *      may be used to endorse or promote products derived from this software
15  *      without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY CONTRIBUTORS ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL CONTRIBUTORS BE LIABLE FOR ANY
21  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
23  * GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
25  * IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
26  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN
27  * IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28  */
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <fcntl.h>
32 #include <unistd.h>
33 #include <string.h>
34 #include <errno.h>
35 #include <sys/types.h>
36 #include <sys/stat.h>
37 #include <sys/file.h>
38
39 #include "tmalloc.h"
40 #include "tlog.h"
41
42 #include "tbtree.h"
43
44 #define NOTFOUND        0
45 #define FOUND           1
46
47 static int TBTReadPage(TBTree *db, u_int32_t pagenumber, TBTMemPage **page) ;
48 static int TBTWritePage(TBTree *db, TBTMemPage *page) ;
49 static int TBTNewPage(TBTree *db, TBTMemPage **page) ;
50
51
52
53 int 
54 TBTOpen(TBTree *db, char *file) {
55         off_t offset;
56
57         if ( db->readonly ) {
58                 db->fd = open(file, O_RDONLY);
59                 if ( db->fd < 0 ) {
60                         tlog(TL_CRIT,"TBTOpen: open failed: %s",strerror(errno));
61                         return TBT_ERROR;
62                 }
63                 if ( flock( db->fd, LOCK_SH ) < 0 ) {
64                         tlog(TL_CRIT,"TBTOpen: flock failed: %s",strerror(errno));
65                         close(db->fd);
66                         return TBT_ERROR;
67                 }
68         } else {
69                 db->fd = open(file, O_CREAT | O_RDWR, 0666);
70                 if ( db->fd < 0 ) {
71                         tlog(TL_CRIT,"TBTOpen: open failed: %s",strerror(errno));
72                         return TBT_ERROR;
73                 }
74                 if ( flock( db->fd, LOCK_EX ) < 0 ) {
75                         tlog(TL_CRIT,"TBTOpen: flock failed: %s",strerror(errno));
76                         close(db->fd);
77                         return TBT_ERROR;
78                 }
79         }
80
81         if ( db->npage ) {
82                 db->Cache=(TBTMemPage**)t0malloc( sizeof(TBTMemPage*)*HASHSIZE(db->npage) );
83                 db->curpage=0;
84         }
85
86         db->cachehits = db->pageread = db->pagewrite = 0;
87
88         if ( (offset=lseek(db->fd, 0, SEEK_END)) < 0 ) {
89                 tlog(TL_CRIT,"TBTOpen: lseek failed: %s",strerror(errno));
90                 flock( db->fd, LOCK_UN );
91                 close(db->fd);
92                 return TBT_ERROR;
93         }
94
95         if ( offset==0 ) {
96                 if ( db->readonly ) {
97                         tlog(TL_CRIT,"TBTOpen: file is void");
98                         flock( db->fd, LOCK_UN );
99                         close(db->fd);
100                         return TBT_ERROR;
101                 } else {
102                         TBTMemPage *page;
103                         if ( TBTNewPage(db, &page) != TBT_OK ) {
104                                 flock( db->fd, LOCK_UN );
105                                 close(db->fd);
106                                 return TBT_ERROR;
107                         }
108                         page->page.isleaf=1;
109                         if ( TBTWritePage(db, page)!=TBT_OK ){
110                                 flock( db->fd, LOCK_UN );
111                                 close(db->fd);
112                                 return TBT_ERROR;
113                         }
114                 }
115         } 
116
117         db->pointersize = TBTPOINTERSIZE(db);
118         db->lastpagenumber=0;                            
119         return TBT_OK; 
120 }
121
122 int
123 TBTClose(TBTree *db) {
124         int rc=0;
125
126         if ( !db->readonly ) 
127                 rc=TBTSync(db);
128
129         if ( db->npage ) {
130                 u_int32_t i;
131                 TBTMemPage *ptr,*tmp;
132
133                 for(i=0; i<HASHSIZE(db->npage); i++) {
134                         ptr = db->Cache[i];
135                         while(ptr) {
136                                 tmp = ptr->link;
137                                 tfree(ptr);
138                                 ptr=tmp;
139                         }
140                 }
141
142                 tfree( db->Cache );
143         }
144         
145         flock( db->fd, LOCK_UN );
146         close(db->fd);  
147         
148         return (rc) ? TBT_ERROR : TBT_OK;
149 }
150
151 static int
152 cmpNPage(const void *a, const void *b) {
153         tassert( (*(TBTMemPage**)a)->pagenumber != (*(TBTMemPage**)b)->pagenumber );
154         return ( (*(TBTMemPage**)a)->pagenumber > (*(TBTMemPage**)b)->pagenumber ) ? 1 : -1;
155 }
156
157
158 int
159 TBTSync(TBTree *db) {
160         int rc=0;
161
162         if ( db->readonly || db->npage==0 )
163                 return TBT_OK;
164
165         if ( db->npage ) {
166                 u_int32_t i, total=0;
167                 TBTMemPage *ptr, **data = (TBTMemPage**)tmalloc(db->npage*sizeof(TBTMemPage*)), **pptr;
168                 pptr=data;
169                 for(i=0; i<HASHSIZE(db->npage); i++) {
170                         ptr = db->Cache[i];
171                         while(ptr) {
172                                 if ( !ptr->issynced ) {
173                                         *pptr = ptr;
174                                         pptr++;
175                                 } 
176                                 ptr = ptr->link;
177                         }
178                 }
179                 
180                 total=pptr-data;
181                 if ( total > 0 ) {
182                         if ( total>1 )
183                                 qsort( data, total, sizeof(TBTMemPage*), cmpNPage);
184
185                         pptr=data;
186                         while( pptr-data<total) {
187                                 rc |= TBTWritePage(db, *pptr);
188                                 pptr++;
189                         }
190                 }
191                 tfree(data);
192         }
193         
194         if ( fsync(db->fd) ) {
195                 tlog(TL_CRIT,"TBTSync: fsync failed: %s",strerror(errno));
196                 rc=TBT_ERROR;
197         }
198
199         return (rc) ? TBT_ERROR : TBT_OK;
200 }
201
202 static TBTMemPage *
203 findInCache(TBTree *db, u_int32_t pagenumber) {
204         TBTMemPage *ptr;
205
206         ptr = db->Cache[ pagenumber % HASHSIZE(db->npage) ];
207
208         while(ptr && ptr->pagenumber != pagenumber)
209                 ptr = ptr->link; 
210
211         return ptr;;
212 }
213
214 static void
215 removeFromCache(TBTree *db, u_int32_t pagenumber) {
216         TBTMemPage *ptr, *prev=NULL;
217         int pos = pagenumber % HASHSIZE(db->npage);
218
219         ptr = db->Cache[ pos ];
220
221         while(ptr) {
222                 if ( ptr->pagenumber == pagenumber ) {
223                         if ( prev )
224                                 prev->link = ptr->link;
225                         else
226                                 db->Cache[ pos ] = ptr->link;
227                         break;
228                 }
229                 prev=ptr;
230                 ptr = ptr->link;
231         }
232 }
233
234 static void
235 insertInCache(TBTree *db, TBTMemPage *page) {
236         int pos = page->pagenumber % HASHSIZE(db->npage);
237
238         page->link = db->Cache[ pos ];
239         db->Cache[ pos ] = page;
240 }
241
242 static TBTMemPage*
243 PageAlloc(TBTree *db) {
244         TBTMemPage *page;
245
246         if ( db->curpage < db->npage ) {
247                 page = (TBTMemPage*)tmalloc(sizeof(TBTMemPage));
248                 memset( page, 0, TBTMEMPAGEHDRSZ );
249                 page->iscached=1;
250                 if ( db->curpage == 0 ) {
251                         db->TimeCache = db->TimeCacheLast = page;       
252                 } else {
253                         db->TimeCacheLast->next = page;
254                         page->prev = db->TimeCacheLast;
255                         db->TimeCacheLast = page;
256                 }
257                 db->curpage++;
258         } else if ( db->npage == 0 ) {
259                 page = (TBTMemPage*)tmalloc(sizeof(TBTMemPage));
260                 memset( page, 0, TBTMEMPAGEHDRSZ );
261         } else {
262                 page = db->TimeCache;
263                 while( page->next && page->islocked ) 
264                         page = page->next;
265                 if ( page==db->TimeCacheLast && page->islocked ) { /*all pages locked*/
266                         /* tlog(TL_WARN,"TBTReadPage: all pages in cache locked, increase number of cached pages"); */ 
267                         page = (TBTMemPage*)tmalloc(sizeof(TBTMemPage));
268                         memset( page, 0, TBTMEMPAGEHDRSZ );
269                 } else {
270                         if ( !page->issynced )
271                                 if ( TBTWritePage(db, page) )
272                                         return NULL;
273                         removeFromCache(db, page->pagenumber);
274                         if ( page != db->TimeCacheLast ) {
275                                 if ( page == db->TimeCache ) {
276                                         db->TimeCache = page->next;
277                                         db->TimeCache->prev = NULL;
278                                 } else { 
279                                         page->prev->next = page->next;
280                                         page->next->prev = page->prev;
281                                 }
282                                 memset( page, 0, TBTMEMPAGEHDRSZ );
283                                 db->TimeCacheLast->next = page;
284                                 page->prev = db->TimeCacheLast;
285                                 db->TimeCacheLast = page;
286                         } else 
287                                 memset( page, 0, TBTMEMPAGEHDRSZ );
288                         page->iscached=1;
289                 }
290         }
291
292         return page;
293 }
294
295 static int
296 TBTReadPage(TBTree *db, u_int32_t pagenumber, TBTMemPage **page) {
297         int rc;
298
299         if ( db->npage && (*page=findInCache(db, pagenumber))!=NULL ) {
300                 db->cachehits++;
301                 if ( *page != db->TimeCacheLast ) {
302                         if ( (*page) == db->TimeCache ) {
303                                 db->TimeCache = (*page)->next;
304                                 db->TimeCache->prev=NULL;
305                         } else { 
306                                 (*page)->prev->next = (*page)->next;
307                                 (*page)->next->prev = (*page)->prev;
308                         }
309                         db->TimeCacheLast->next = (*page);
310                         (*page)->prev = db->TimeCacheLast;
311                         db->TimeCacheLast = *page;
312                         (*page)->next = NULL;
313                 }
314         } else {
315                 off_t  offset = (off_t)pagenumber * (off_t)TBTREEPAGESIZE;
316         
317                 if( (*page = PageAlloc(db))==NULL )
318                         return TBT_ERROR;
319
320                 if ( lseek(db->fd, offset, SEEK_SET) != offset ) {
321                         tlog(TL_CRIT,"TBTReadPage: lseek failed: %s",strerror(errno));
322                         return TBT_ERROR;
323                 }
324                 (*page)->issynced=1;
325                 (*page)->pagenumber=pagenumber;
326                 if ( (rc=read(db->fd, &( (*page)->page ), TBTREEPAGESIZE)) != TBTREEPAGESIZE ) {
327                         tlog(TL_CRIT,"TBTReadPage: read failed: %s %d %d",strerror(errno), pagenumber, offset); 
328                         return TBT_ERROR;
329                 }
330                 if ( (*page)->iscached ) 
331                         insertInCache(db, *page);
332         }
333
334         db->pageread++;
335         return TBT_OK;   
336 }
337
338 static int
339 TBTWritePage(TBTree *db, TBTMemPage *page) {
340         off_t  offset = (off_t)(page->pagenumber) * (off_t)TBTREEPAGESIZE;
341         off_t  size;
342
343         if ( page->issynced )
344                 return TBT_OK;
345
346         if ( lseek(db->fd, offset, SEEK_SET) != offset ) {
347                 tlog(TL_CRIT,"TBTWritePage: lseek failed: %s",strerror(errno));
348                 return TBT_ERROR;
349         }
350
351         if ( (size=write(db->fd, &( page->page ), TBTREEPAGESIZE)) != TBTREEPAGESIZE ) {
352                 tlog(TL_CRIT,"TBTWritePage: write failed: %s (writed only %d bytes)",strerror(errno), size);
353                 return TBT_ERROR;
354         }
355
356         page->issynced=1;
357         db->pagewrite++;
358
359         return TBT_OK;
360 }
361
362 static int
363 TBTNewPage(TBTree *db, TBTMemPage **page) {
364         if ( db->lastpagenumber==0 ) {
365                 off_t  offset;
366                 if ( (offset=lseek(db->fd, 0, SEEK_END)) < 0 ) {
367                         tlog(TL_CRIT,"TBTNewPage: lseek failed: %s",strerror(errno));
368                         return TBT_ERROR;
369                 }
370
371                 if ( offset % TBTREEPAGESIZE != 0 ) {
372                         tlog(TL_CRIT,"TBTNewPage: file corrupted");
373                         return TBT_ERROR;
374                 }
375
376                 db->lastpagenumber = offset/TBTREEPAGESIZE;
377         }
378
379         if( (*page = PageAlloc(db))==NULL )
380                 return TBT_ERROR;
381
382         (*page)->pagenumber = db->lastpagenumber;
383         (*page)->issynced=0;
384         (*page)->islocked=1;
385
386         db->lastpagenumber++;
387
388         memset( &((*page)->page), 0, TBTMEMPAGEHDRSZ);
389         (*page)->page.freespace = TBTREEPAGESIZE-TBTPAGEHDRSZ;
390
391         if ( (*page)->iscached ) 
392                 insertInCache(db, *page);
393         
394         return TBT_OK;
395 }
396
397 static void
398 printValue(u_int32_t len, char *ptr) {
399         putchar('\'');
400         while(len>0) {
401                 putchar(*ptr);
402                 ptr++; len--;
403         }
404         putchar('\'');
405 }
406
407 static void
408 dumpPage(TBTree *db, TBTPage *page, int follow) {
409         u_int32_t i,j;
410
411         for(j=0;j<follow;j++)
412                 putchar(' ');
413         printf("Page: f:%d l:%d r:%d n:%d\n", 
414                 page->freespace,
415                 page->isleaf,
416                 page->rightlink,
417                 page->npointer
418         );      
419         for(i=0;i<page->npointer;i++) {
420                 TBTPointer *ptr= (TBTPointer*)(page->data+db->pointersize*i);
421                 for(j=0;j<follow+2;j++)
422                         putchar(' ');
423                 printf("i:%d(%d) kl:%d ko:%d ",
424                         i, (int)ptr, 
425                         (db->keylen) ? db->keylen : ptr->key.varlen.length,
426                         (db->keylen) ? 0 : ptr->key.varlen.offset
427                 );
428                 if ( db->keylen==0 ) 
429                         printValue(ptr->key.varlen.length, page->data + ptr->key.varlen.offset);
430                 else
431                         printf("'%d'", *(int*)(ptr->key.fixed.key));
432                 if ( page->isleaf ) {
433                         printf(" vl:%d vo:%d ", 
434                                 ptr->pointer.leaf.length,
435                                 ptr->pointer.leaf.offset
436                         );
437                         printValue(ptr->pointer.leaf.length,  page->data + ptr->pointer.leaf.offset );
438                         putchar('\n');
439                 } else {
440                         printf(" page:%d\n", ptr->pointer.internal.link);
441                         if ( follow>=0 )
442                                 dumpTree(db, ptr->pointer.internal.link, follow+4); 
443                 }
444         }
445 }
446
447 void
448 dumpTree(TBTree *db, u_int32_t pagenumber, int follow) {
449         TBTMemPage  *page;
450
451         if ( TBTReadPage(db, pagenumber, &page) ) {
452                 tlog(TL_CRIT,"BEDA %d\n", pagenumber);
453                 return;
454         }
455
456         page->islocked=1;
457         dumpPage(db, &(page->page), follow);
458         page->islocked=0;
459         if ( !page->iscached )
460                 tfree(page);
461 }
462
463 static int
464 findInPage(TBTree *db, TBTPage *page, TBTValue *key, TBTPointer **res) {
465         TBTPointer      *StopLow = (TBTPointer*)(page->data);
466         TBTPointer      *StopHigh = (TBTPointer*)(page->data + db->pointersize*page->npointer);
467         TBTPointer      *StopMiddle;
468         int diff ;
469         TBTValue  A;
470
471         A.length =  db->keylen;
472         /* Loop invariant: StopLow <= StopMiddle < StopHigh */
473         while (StopLow < StopHigh) {
474                 StopMiddle = (TBTPointer*)(((char*)StopLow) + db->pointersize*((((char*)StopHigh - (char*)StopLow)/db->pointersize) >> 1));
475                 if ( db->keylen == 0 ) {
476                         A.length = StopMiddle->key.varlen.length;
477                         A.value = page->data + StopMiddle->key.varlen.offset;
478                 } else 
479                         A.value = StopMiddle->key.fixed.key;
480                 
481         
482                 if ( ISINFPOINTER(db, page, StopMiddle) )
483                         diff = 1;
484                 else 
485                         diff = db->cmpkey( &A, key );
486                 if ( diff == 0 ) {
487                         *res = StopMiddle;
488                         return FOUND;
489                 } else if ( diff < 0 )
490                         StopLow = (TBTPointer*)((char*)StopMiddle + db->pointersize);
491                 else
492                         StopHigh = StopMiddle;
493         }
494
495         *res = StopHigh;
496         return NOTFOUND;
497 }
498
499 static int
500 findPage(TBTree *db, TBTValue *key, TBTMemPage **page) {
501         u_int32_t pagenumber = TBTPAGEROOT;
502         int rc = TBT_OK;
503
504         *page=NULL;
505         while(1) {
506                 TBTPointer *ptr;
507
508                 if ( (rc=TBTReadPage(db, pagenumber, page))!=TBT_OK )
509                         return rc;
510
511                 if ( (*page)->page.isleaf ) 
512                         return TBT_OK;
513
514                 findInPage(db, &((*page)->page), key, &ptr );
515                 pagenumber = ptr->pointer.internal.link;
516                 if ( !(*page)->iscached )
517                         tfree(*page);
518         }
519
520         return TBT_OK;
521 }
522
523 int
524 TBTFind(TBTree *db, TBTValue *key, TBTValue *value) {
525         TBTMemPage      *page;
526         TBTPointer *ptr;
527         int rc;
528
529         memset(value, 0, sizeof(TBTValue));
530
531         if ( (rc=findPage(db, key, &page))!=TBT_OK )
532                 return rc; 
533                 
534         if ( page == NULL )
535                 return TBT_OK;
536
537         if ( findInPage(db, &(page->page), key, &ptr) != FOUND ) {
538                 if ( !(page)->iscached )
539                         tfree(page);
540                 return TBT_OK;
541         } 
542
543         value->length = ptr->pointer.leaf.length;
544         value->value = tmalloc(value->length);
545         memcpy( value->value, page->page.data + ptr->pointer.leaf.offset, value->length );
546
547         if ( !(page)->iscached )
548                 tfree(page);
549
550         return TBT_OK;  
551 }
552
553 static void
554 packLeafKV(TBTree *db, TBTPage *page, TBTPointer *ptr, TBTValue *key, TBTValue *value) {
555         if ( ptr ) {
556                 if ( (char*)ptr - page->data  != page->npointer * db->pointersize ) 
557                         memmove((char*)ptr+db->pointersize, ptr, page->npointer * db->pointersize - ( (char*)ptr - page->data ));
558         } else
559                 ptr = (TBTPointer*)(page->data + page->npointer * db->pointersize);
560
561         page->npointer++;
562         page->freespace -= db->pointersize + PTRALIGN(value->length);
563         if ( db->keylen ) {
564                 memcpy( ptr->key.fixed.key, key->value, db->keylen );
565                 ptr->pointer.leaf.offset = page->npointer * db->pointersize + page->freespace;
566         } else {
567                 page->freespace -= PTRALIGN(key->length);
568                 ptr->key.varlen.length = key->length;
569                 ptr->key.varlen.offset = page->npointer * db->pointersize + page->freespace;
570                 memcpy( page->data + ptr->key.varlen.offset, key->value, key->length);
571                 ptr->pointer.leaf.offset = page->npointer * db->pointersize + page->freespace + PTRALIGN(key->length);
572         } 
573         ptr->pointer.leaf.length = value->length;
574         memcpy( page->data + ptr->pointer.leaf.offset, value->value, value->length );
575 }
576
577 static void
578 packInternalKV(TBTree *db, TBTPage *page, TBTPointer *ptr, TBTValue *key, u_int32_t pagenumber) {
579         if ( ptr ) {
580                 if ( (char*)ptr - page->data  != page->npointer * db->pointersize ) 
581                         memmove((char*)ptr+db->pointersize, ptr, page->npointer * db->pointersize - ( (char*)ptr - page->data ));
582         } else
583                 ptr = (TBTPointer*)(page->data + page->npointer * db->pointersize);
584
585         page->npointer++;
586         page->freespace -= db->pointersize;
587
588         if ( db->keylen ) {
589                 if ( key->value )
590                         memcpy( ptr->key.fixed.key, key->value, db->keylen );
591                 else
592                         memset( ptr->key.fixed.key, 0, db->keylen );
593         } else {
594                 page->freespace -= PTRALIGN(key->length);
595                 ptr->key.varlen.length = key->length;
596                 ptr->key.varlen.offset = page->npointer * db->pointersize + page->freespace;
597                 if ( key->length ) 
598                         memcpy( page->data + ptr->key.varlen.offset, key->value, key->length);
599         }
600         ptr->pointer.internal.link = pagenumber;
601 }
602
603
604 static void
605 deleteKV(TBTree *db, TBTPage *page, TBTPointer **ptrs, u_int32_t num) {
606         static char buf[TBTREEPAGESIZE];
607         char  *bufptr = buf;
608         TBTPointer *ptr = (TBTPointer*)page->data, *tmp, **ptrptr = ptrs;
609         u_int32_t i, start;
610         /* suppose ptrs are ordered! */
611         tmp = ptr;
612         while( ptrptr - ptrs < num && (char*)ptr - page->data < db->pointersize*page->npointer ) {
613                 if ( *ptrptr < ptr ) {
614                         tlog(TL_CRIT,"deleteKV: Something wrong: unexistent *ptrs"); 
615                         ptrptr++;
616                 } else if ( *ptrptr > ptr ) {
617                         if ( tmp != ptr )
618                                 memcpy(tmp, ptr, db->pointersize);
619                         ptr = (TBTPointer*)( (char*)ptr + db->pointersize );
620                         tmp = (TBTPointer*)( (char*)tmp + db->pointersize );
621                 } else {
622                         page->freespace += db->pointersize + 
623                                 ( ( page->isleaf ) ? PTRALIGN(ptr->pointer.leaf.length) : 0 ) +
624                                 ( ( db->keylen ) ? 0 : PTRALIGN(ptr->key.varlen.length) );
625                         ptrptr++;
626                         ptr = (TBTPointer*)( (char*)ptr + db->pointersize );
627                 }
628         }
629
630         if ( (char*)ptr - page->data < db->pointersize*page->npointer && tmp!=ptr )
631                 memmove( tmp, ptr, page->npointer * db->pointersize - ((char*)ptr - page->data) );
632
633         page->npointer-=num;
634
635         tmp = (TBTPointer*)page->data;
636         start = page->npointer * db->pointersize + page->freespace;
637         for(i=0;i<page->npointer;i++) {
638                 if ( page->isleaf ) {
639                         memcpy( bufptr, page->data + tmp->pointer.leaf.offset, tmp->pointer.leaf.length );
640                         tmp->pointer.leaf.offset = start + (bufptr-buf);
641                         bufptr+=PTRALIGN(tmp->pointer.leaf.length);
642                 }
643
644                 if ( db->keylen == 0 ) {
645                         if ( tmp->key.varlen.length )
646                                 memcpy( bufptr, page->data + tmp->key.varlen.offset, tmp->key.varlen.length );
647                         tmp->key.varlen.offset = start + (bufptr-buf);
648                         bufptr+=PTRALIGN(tmp->key.varlen.length);
649                 }
650                 tmp = (TBTPointer*)( (char*)tmp + db->pointersize );
651         }
652
653         memcpy( page->data + start, buf, bufptr-buf );
654 }
655
656 static u_int32_t
657 findDelimiter(TBTree *db, TBTPage *page, TBTPointer *ptr, int size, u_int32_t start, int *where) {
658         int  *sizes = (int*) tmalloc( sizeof(int) * (page->npointer+1) );
659         int i;
660         TBTPointer *tomove;
661         int lfree=TBTREEPAGESIZE-TBTPAGEHDRSZ, rfree=TBTREEPAGESIZE-TBTPAGEHDRSZ;
662         int nptr = ( (char*)ptr - page->data ) / db->pointersize; 
663         int was=0;
664
665         for(i=0; i<page->npointer;i++) {
666                 tomove = (TBTPointer*)(page->data + i*db->pointersize);
667                 if ( was==0 && i==nptr ) {
668                         sizes[i] = size;
669                         was=1;
670                 } 
671                 sizes[i+was] = db->pointersize +
672                         ( (page->isleaf) ? PTRALIGN(tomove->pointer.leaf.length) : 0 ) +
673                         ( ( db->keylen ) ? 0 : PTRALIGN(tomove->key.varlen.length) );
674         
675         }
676         
677         if ( was==0 ) {
678                 sizes[page->npointer]=size;
679         }
680
681         for(i=0;i<page->npointer+1;i++) 
682                 if ( i< start )
683                         lfree-=sizes[i];
684                 else 
685                         rfree-=sizes[i];
686
687         while( 1 ) {
688                 if ( lfree<0 ) {
689                         start--;
690                         lfree+=sizes[start];
691                         rfree-=sizes[start];
692                 } else if ( rfree < 0 ) { 
693                         lfree-=sizes[start];
694                         rfree+=sizes[start];
695                         start++;
696                 } else
697                         break;
698         }
699
700         *where = (nptr < start) ? -1 : 1;
701         if ( start>nptr )
702                 start--;
703
704         tfree(sizes);
705         return start;
706 }
707
708 static void
709 populatePages(TBTree *db, TBTPage *srcpage, TBTPage* newpage, u_int32_t start) {
710         u_int32_t i;
711         TBTPointer *tomove, **todelete = tmalloc( sizeof(TBTPointer*)* srcpage->npointer );
712         u_int32_t numtodelete=0;
713         TBTValue k, v;
714
715         for(i=start;i<srcpage->npointer;i++) {
716                 tomove = (TBTPointer*)(srcpage->data + i*db->pointersize);
717
718                 todelete[numtodelete] = tomove;
719                 numtodelete++;
720
721                 if ( db->keylen ) 
722                         k.value = tomove->key.fixed.key;
723                 else {
724                         k.length = tomove->key.varlen.length;
725                         k.value = srcpage->data + tomove->key.varlen.offset;
726                 } 
727                                 
728                 if ( srcpage->isleaf ) {
729                         v.length = tomove->pointer.leaf.length;
730                         v.value = srcpage->data + tomove->pointer.leaf.offset;
731                         packLeafKV(db, newpage, NULL, &k, &v);
732                 } else 
733                         packInternalKV(db, newpage, NULL, &k, tomove->pointer.internal.link);
734         }
735
736         deleteKV(db, srcpage, todelete, numtodelete);   
737         tfree(todelete);
738 }
739
740 static int
741 splitPage(TBTree *db, TBTMemPage *srcpage, TBTMemPage** newpage, TBTPointer **ptr, int size, TBTValue *key, TBTValue *value, u_int32_t pagenumber) {
742         TBTMemPage *tmp;
743         int start=0, where=0;
744         int nptr = ( (char*)(*ptr) - srcpage->page.data ) / db->pointersize;
745
746         if ( TBTNewPage(db, newpage) )
747                 return TBT_ERROR;
748
749         tmp = *newpage;
750         srcpage->issynced=tmp->issynced=0;
751         tmp->islocked=1;
752         tmp->page.rightlink = srcpage->page.rightlink;
753         srcpage->page.rightlink = tmp->pagenumber;
754         tmp->page.isleaf = srcpage->page.isleaf;
755
756         switch(db->strategy) {
757                 case TBT_STATEGY_HALF:
758                         start = findDelimiter(db, &(srcpage->page), *ptr, size, (srcpage->page.npointer+1)/2, &where);  
759                         break;
760                 case TBT_STATEGY_LEFTFILL:
761                         start = findDelimiter(db, &(srcpage->page), *ptr, size, srcpage->page.npointer, &where);        
762                         break;
763                 case TBT_STATEGY_RIGHTFILL:
764                         start = findDelimiter(db, &(srcpage->page), *ptr, size, 0, &where);     
765                         break;
766                 default:
767                         tlog(TL_CRIT,"splitPage: unknown strategy: %d", db->strategy);
768                         return TBT_ERROR;
769         }
770
771         populatePages(db, &(srcpage->page), &(tmp->page), start);
772         if ( where<0 )
773                 tmp=srcpage;
774         else
775                 nptr-=start;
776         *ptr = (TBTPointer*)(tmp->page.data + nptr*db->pointersize);
777
778         if ( tmp->page.isleaf ) 
779                 packLeafKV(db, &(tmp->page), *ptr, key, value);
780         else
781                 packInternalKV(db, &(tmp->page), *ptr, key, pagenumber);
782         return TBT_OK;
783 }
784
785 static int
786 savePage(TBTree *db, TBTMemPage *page) {
787         int rc;
788         page->islocked=0;
789
790         if ( !page->iscached ) {
791                 if (  page->issynced==0 && (rc=TBTWritePage(db, page)) != TBT_OK )
792                         return rc;
793                 tfree(page);
794         }
795         return TBT_OK;
796 }
797
798 static int
799 layerInsert(TBTree *db, u_int32_t pagenumber, TBTMemPage **left, TBTMemPage **right, TBTValue *key, TBTValue *value) {
800         int rc,fnd;
801         TBTPointer      *ptr=NULL;
802         TBTMemPage      *page=NULL, *newright=NULL;
803                         
804         if ( (rc=TBTReadPage(db, pagenumber, &page))!=TBT_OK )
805                 return rc;
806
807         page->islocked=1;
808
809         fnd = findInPage(db, &(page->page), key, &ptr );
810
811         *left=*right=NULL;
812         if ( page->page.isleaf ) {
813                 u_int32_t size = db->pointersize +
814                         PTRALIGN(value->length) +
815                         ( ( db->keylen ) ? 0 : PTRALIGN(key->length) );
816                 
817                 if ( size>(TBTREEPAGESIZE-TBTPAGEHDRSZ)/2 ) {
818                         tlog(TL_ALARM,"Huge size: %d bytes", size);
819                         return TBT_HUGE;
820                 }
821
822                 if (fnd==FOUND) {
823                         deleteKV(db, &(page->page), &ptr, 1);
824                         page->issynced=0;
825                 }
826
827                 if ( size <= page->page.freespace ) {
828                         u_int32_t       oldsize=page->page.freespace;
829                         packLeafKV(db, &(page->page), ptr, key, value);
830                         tassert( oldsize == page->page.freespace + size );
831                         page->issynced=0;
832                 } else {
833                         rc = splitPage(db, page, &newright, &ptr, size, key, value, 0);
834                         if ( rc != TBT_OK ) return rc;
835
836                         page->issynced = newright->issynced = 0;
837                         page->islocked = newright->islocked = 1;
838                 }
839         } else {
840                 if ( (rc=layerInsert(db, ptr->pointer.internal.link, left, right, key, value))!=TBT_OK ) {
841                         page->islocked=0;
842                         return rc;
843                 }
844
845                 if ( *left ) { /* child page was splited */
846                         u_int32_t size;
847                         TBTPointer *rightptr = (TBTPointer*)( (*left)->page.data + ((*left)->page.npointer-1) * db->pointersize );
848                         TBTValue K;
849
850                         if ( ISINFPOINTER(db, &(page->page), ptr) ) {
851                                 deleteKV(db, &(page->page), &ptr, 1);
852                                 K.length = 0; K.value = NULL;
853                                 packInternalKV(db, &(page->page), ptr, &K, (*right)->pagenumber);
854                         } else 
855                                 ptr->pointer.internal.link=(*right)->pagenumber;
856
857                         K.length = ( db->keylen ) ? db->keylen : rightptr->key.varlen.length;
858                         K.value  = ( db->keylen ) ? rightptr->key.fixed.key : (*left)->page.data + rightptr->key.varlen.offset;
859
860                         size = db->pointersize + ( ( db->keylen ) ? 0 : PTRALIGN(K.length) );
861
862                         if ( size <= page->page.freespace ) {
863                                 packInternalKV(db, &(page->page), ptr, &K, (*left)->pagenumber);
864                                 page->issynced=0;
865                         } else {
866                                 rc = splitPage(db, page, &newright, &ptr, size, &K, NULL, (*left)->pagenumber);
867                                 if ( rc != TBT_OK ) return rc;
868
869                                 page->issynced = newright->issynced = 0;
870                                 page->islocked = newright->islocked = 1;
871                         }
872                 }
873         }
874
875         if ( *left ) {
876                 if ( (rc=savePage(db, *left))!=TBT_OK )
877                         return rc;
878                 if ( (rc=savePage(db, *right))!=TBT_OK )
879                         return rc;
880                 *left=*right=NULL;
881         }
882
883         if ( newright ) {
884                 if ( page->pagenumber==TBTPAGEROOT ) { /* root was splited */
885                         TBTMemPage  *newleft;
886                         TBTValue K;
887                 
888
889                         if  ( (rc=TBTNewPage(db,&newleft))!=TBT_OK )
890                                 return rc;
891                         memcpy( &(newleft->page) , &(page->page), TBTREEPAGESIZE);
892                         memset( &(page->page), 0, TBTREEPAGESIZE);
893                         page->page.freespace=TBTREEPAGESIZE-TBTPAGEHDRSZ;
894         
895                         ptr = (TBTPointer*)( newleft->page.data + (newleft->page.npointer-1) * db->pointersize ); 
896                         K.length = ( db->keylen ) ? db->keylen : ptr->key.varlen.length;
897                         K.value  = ( db->keylen ) ? ptr->key.fixed.key : newleft->page.data + ptr->key.varlen.offset;
898                         packInternalKV(db, &(page->page), NULL, &K, newleft->pagenumber);
899                 
900                         ptr = (TBTPointer*)( newright->page.data + (newright->page.npointer-1) * db->pointersize ); 
901                         K.length = 0;
902                         K.value  = NULL;
903                         packInternalKV(db, &(page->page), NULL, &K, newright->pagenumber);
904
905                         if ( (rc=savePage(db, newright))!=TBT_OK )
906                                 return rc;
907                         if ( (rc=savePage(db, newleft))!=TBT_OK )
908                                 return rc;
909                         if ( (rc=savePage(db, page))!=TBT_OK )
910                                 return rc;
911                 } else {
912                         *left = page;
913                         *right = newright;
914                 }
915         } else if ( (rc=savePage(db, page))!=TBT_OK )
916                 return rc;
917         
918
919         return TBT_OK;
920
921
922 int
923 TBTInsert(TBTree *db, TBTValue *key, TBTValue *value) {
924         int rc;
925         TBTMemPage *lpage=NULL, *rpage=NULL;
926
927         rc = layerInsert(db, TBTPAGEROOT, &lpage, &rpage, key, value);
928
929         tassert( lpage==NULL && rpage==NULL );
930
931         return rc;
932 }
933
934 int
935 TBTDelete(TBTree *db, TBTValue *key) {
936         TBTMemPage      *page;
937         TBTPointer *ptr;
938         int rc;
939
940         if ( (rc=findPage(db, key, &page))!=TBT_OK )
941                 return rc; 
942                 
943         if ( page == NULL )
944                 return TBT_OK;
945
946         if ( findInPage(db, &(page->page), key, &ptr) != FOUND ) {
947                 if ( !(page)->iscached )
948                         tfree(page);
949                 return TBT_OK;
950         } 
951
952         deleteKV(db, &(page->page), &ptr, 1);
953         page->issynced=0;
954
955         rc=savePage(db, page);
956
957         return rc;      
958 }
959
960 int 
961 TBTInitIterator(TBTree *db, TBTIterator *iterator) {
962         TBTMemPage      *page=NULL;
963         TBTPointer *ptr;
964         int rc;
965         u_int32_t pagenum=TBTPAGEROOT;
966
967         memset(iterator, 0, sizeof(TBTIterator));
968
969         do {
970                 if ( page && !page->iscached )
971                         tfree(page);
972                 if ( (rc=TBTReadPage(db, pagenum, &page))!=TBT_OK )
973                         return rc;
974
975                 ptr = (TBTPointer*)( page->page.data );
976                 pagenum = ptr->pointer.internal.link;
977         } while( !page->page.isleaf );
978
979         page->islocked=1;
980         iterator->page = page;  
981         iterator->ptr = ptr;    
982
983         return TBT_OK;
984 }
985
986 void 
987 TBTFreeIterator(TBTree *db, TBTIterator *iterator) {
988         
989         if ( iterator->page ) {
990                 iterator->page->islocked=0;
991                 if ( !iterator->page->iscached )
992                         tfree(iterator->page);
993         }
994
995         memset(iterator, 0, sizeof(TBTIterator));
996 }
997
998 int 
999 TBTIterate(TBTree *db, TBTIterator *iterator, TBTValue *key, TBTValue *value ) {
1000         int rc;
1001
1002         if (  iterator->page==NULL ) {
1003                 memset(key, 0, sizeof(TBTValue));
1004                 memset(value, 0, sizeof(TBTValue));
1005                 return TBT_OK;
1006         }
1007
1008         if ( (char*)(iterator->ptr) - iterator->page->page.data >= db->pointersize * iterator->page->page.npointer ) {
1009                 do {
1010                         u_int32_t pagenumber = iterator->page->page.rightlink;
1011                         iterator->page->islocked=0;
1012                         if ( !iterator->page->iscached )
1013                                 tfree(iterator->page);
1014                         iterator->page=NULL;
1015
1016                         if ( pagenumber==TBTPAGEROOT ) {
1017                                 memset(key, 0, sizeof(TBTValue));
1018                                 memset(value, 0, sizeof(TBTValue));
1019                                 return TBT_OK;
1020                         }
1021                         if ( (rc=TBTReadPage(db, pagenumber, &(iterator->page)))!=TBT_OK )
1022                                 return rc;
1023                 } while ( iterator->page->page.npointer == 0 );
1024
1025                 iterator->page->islocked=1;
1026                 iterator->ptr = (TBTPointer*)( iterator->page->page.data );
1027         }
1028
1029         if ( db->keylen ) {
1030                 key->length = db->keylen;
1031                 key->value = iterator->ptr->key.fixed.key;
1032         } else {
1033                 key->length = iterator->ptr->key.varlen.length; 
1034                 key->value = iterator->page->page.data + iterator->ptr->key.varlen.offset;
1035         }
1036
1037         value->length = iterator->ptr->pointer.leaf.length; 
1038         value->value = iterator->page->page.data + iterator->ptr->pointer.leaf.offset;
1039
1040         iterator->ptr = ( TBTPointer* ) (  (char*)(iterator->ptr) + db->pointersize );
1041
1042         return TBT_OK;
1043 }
1044
1045