Fix test's failure due to fast update of GIN index
[gevel.git] / gevel.c
diff --git a/gevel.c b/gevel.c
index 4c19212..16e18d3 100644 (file)
--- a/gevel.c
+++ b/gevel.c
@@ -1,6 +1,7 @@
 #include "postgres.h"
 
 #include "access/genam.h"
+#include "access/gin.h"
 #include "access/gist.h"
 #include "access/gist_private.h"
 #include "access/gistscan.h"
 #include "miscadmin.h"
 #include "storage/lmgr.h"
 #include "catalog/namespace.h"
+#if PG_VERSION_NUM >= 80300
+#include <tsearch/ts_utils.h>
+#endif
+#include <utils/tqual.h>
 #include "utils/builtins.h"
+#include "utils/lsyscache.h"
+#include "utils/datum.h"
+#include "utils/fmgroids.h"
 #include <fmgr.h>
 #include <funcapi.h>
 #include <access/heapam.h>
 #include <catalog/pg_type.h>
+#include <access/relscan.h>
+
 
 #define PAGESIZE       (BLCKSZ - MAXALIGN(sizeof(PageHeaderData) + sizeof(ItemIdData)))
 
@@ -36,6 +46,7 @@ typedef struct {
        int     len;    
 } IdxInfo;
 
+static Relation checkOpenedRelation(Relation r, Oid PgAmOid);
 
 #ifdef PG_MODULE_MAGIC
 /* >= 8.2 */ 
@@ -45,11 +56,21 @@ PG_MODULE_MAGIC;
 static Relation
 gist_index_open(RangeVar *relvar) {
        Oid relOid = RangeVarGetRelid(relvar, false);
-       return index_open(relOid, AccessExclusiveLock);
+       return checkOpenedRelation(
+                               index_open(relOid, AccessExclusiveLock), GIST_AM_OID);
 }
 
 #define        gist_index_close(r)     index_close((r), AccessExclusiveLock)
 
+static Relation
+gin_index_open(RangeVar *relvar) {
+       Oid relOid = RangeVarGetRelid(relvar, false);
+       return checkOpenedRelation(
+                               index_open(relOid, AccessShareLock), GIN_AM_OID);
+}
+
+#define gin_index_close(r) index_close((r), AccessShareLock)
+
 #else /* <8.2 */
 
 static Relation
@@ -57,7 +78,7 @@ gist_index_open(RangeVar *relvar) {
        Relation rel = index_openrv(relvar);
 
        LockRelation(rel, AccessExclusiveLock);
-       return rel;
+       return checkOpenedRelation(rel, GIST_AM_OID);
 }
 
 static void
@@ -66,6 +87,20 @@ gist_index_close(Relation rel) {
        index_close(rel);
 }
 
+static Relation
+gin_index_open(RangeVar *relvar) {
+       Relation rel = index_openrv(relvar);
+
+       LockRelation(rel, AccessShareLock);
+       return checkOpenedRelation(rel, GIN_AM_OID);
+}
+
+static void
+gin_index_close(Relation rel) {
+       UnlockRelation(rel, AccessShareLock);
+       index_close(rel);
+}
+
 #endif
 
 #if PG_VERSION_NUM >= 80300
@@ -76,6 +111,23 @@ gist_index_close(Relation rel) {
 #define SET_VARSIZE(p,l)       VARATT_SIZEP(p)=(l)
 #endif
 
+static Relation 
+checkOpenedRelation(Relation r, Oid PgAmOid) {
+       if ( r->rd_am == NULL )
+               elog(ERROR, "Relation %s.%s is not an index",
+                                       get_namespace_name(RelationGetNamespace(r)),
+                                       RelationGetRelationName(r)
+                       );
+
+       if ( r->rd_rel->relam != PgAmOid )
+               elog(ERROR, "Index %s.%s has wrong type",
+                                       get_namespace_name(RelationGetNamespace(r)),
+                                       RelationGetRelationName(r)
+                       );
+       
+       return r;
+}
+
 static void
 gist_dumptree(Relation r, int level, BlockNumber blk, OffsetNumber coff, IdxInfo *info) {
        Buffer          buffer;
@@ -434,3 +486,314 @@ gist_print(PG_FUNCTION_ARGS) {
        SRF_RETURN_NEXT(funcctx, result);
 }
 
+typedef struct GinStatState {
+       Relation                index;
+       GinState                ginstate;
+       OffsetNumber    attnum;
+
+       Buffer                  buffer;
+       OffsetNumber    offset;
+       Datum                   curval;
+       Datum                   dvalues[2];
+       char                    nulls[2];
+} GinStatState;
+
+static bool
+moveRightIfItNeeded( GinStatState *st )
+{
+       Page page = BufferGetPage(st->buffer);
+
+       if ( st->offset > PageGetMaxOffsetNumber(page) ) {
+               /*
+               * We scaned the whole page, so we should take right page
+               */
+               BlockNumber blkno = GinPageGetOpaque(page)->rightlink;               
+
+               if ( GinPageRightMost(page) )
+                       return false;  /* no more page */
+
+               LockBuffer(st->buffer, GIN_UNLOCK);
+               st->buffer = ReleaseAndReadBuffer(st->buffer, st->index, blkno);
+               LockBuffer(st->buffer, GIN_SHARE);
+               st->offset = FirstOffsetNumber;
+       }
+
+       return true;
+}
+
+/*
+ * Refinds a previois position, at returns it has correctly 
+ * set offset and buffer is locked
+ */
+static bool
+refindPosition(GinStatState *st)
+{
+       Page    page;        
+
+       /* find left if needed (it causes only for first search) */
+       for (;;) {
+               IndexTuple  itup;
+               BlockNumber blkno;
+
+               LockBuffer(st->buffer, GIN_SHARE);
+
+               page = BufferGetPage(st->buffer);
+               if (GinPageIsLeaf(page))
+                       break;
+
+               itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, FirstOffsetNumber));
+               blkno = GinItemPointerGetBlockNumber(&(itup)->t_tid);
+
+               LockBuffer(st->buffer,GIN_UNLOCK);
+               st->buffer = ReleaseAndReadBuffer(st->buffer, st->index, blkno);
+       }
+
+       if (st->offset == InvalidOffsetNumber) {
+               return (PageGetMaxOffsetNumber(page) >= FirstOffsetNumber ) ? true : false; /* first one */
+       }
+
+       for(;;) {
+               int     cmp;
+#if PG_VERSION_NUM < 80400
+               bool    isnull;
+#endif
+               Datum   datum;
+               IndexTuple itup;
+
+               if (moveRightIfItNeeded(st)==false)
+                       return false;
+
+               itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, st->offset));
+#if PG_VERSION_NUM >= 80400
+               datum = gin_index_getattr(&st->ginstate, itup);
+
+               cmp = compareAttEntries(&st->ginstate,
+                                                                       st->attnum + 1, st->curval,
+                                                                       gintuple_get_attrnum(&st->ginstate, itup), datum);
+#else
+               datum = index_getattr(itup, FirstOffsetNumber, st->ginstate.tupdesc, &isnull);
+
+               cmp = DatumGetInt32(
+                               FunctionCall2(
+                                               &st->ginstate.compareFn,
+                                               st->curval,
+                                               datum
+                                       ));
+#endif
+               if ( cmp == 0 )
+               {
+                       if ( !st->index->rd_att->attrs[st->attnum]->attbyval )
+                               pfree( (void*) st->curval );
+                       return true;
+               }
+
+               st->offset++;
+       }
+
+       return false;
+}
+
+static void
+gin_setup_firstcall(FuncCallContext  *funcctx, text *name, int attnum) {
+       MemoryContext     oldcontext;
+       GinStatState     *st;
+       char *relname=t2c(name);
+       TupleDesc            tupdesc;
+
+       oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+       st=(GinStatState*)palloc( sizeof(GinStatState) );
+       memset(st,0,sizeof(GinStatState));
+       st->index = gin_index_open(
+                makeRangeVarFromNameList(stringToQualifiedNameList(relname, "gin_stat")));
+       initGinState( &st->ginstate, st->index );
+
+#if PG_VERSION_NUM >= 80400
+       if (attnum < 0 || attnum >= st->index->rd_att->natts)
+               elog(ERROR,"Wrong column's number");
+       st->attnum = attnum;
+#else
+       st->attnum = 0;
+#endif
+
+       funcctx->user_fctx = (void*)st;
+
+       tupdesc = CreateTemplateTupleDesc(2, false);
+       TupleDescInitEntry(tupdesc, 1, "value", 
+                       st->index->rd_att->attrs[st->attnum]->atttypid, 
+                       st->index->rd_att->attrs[st->attnum]->atttypmod,
+                       st->index->rd_att->attrs[st->attnum]->attndims);
+       TupleDescInitEntry(tupdesc, 2, "nrow", INT4OID, -1, 0);
+
+       memset( st->nulls, ' ', 2*sizeof(char) );
+
+       funcctx->slot = TupleDescGetSlot(tupdesc);
+       funcctx->attinmeta = TupleDescGetAttInMetadata(tupdesc);
+
+       MemoryContextSwitchTo(oldcontext);
+       pfree(relname);
+
+       st->offset = InvalidOffsetNumber;
+       st->buffer = ReadBuffer( st->index, GIN_ROOT_BLKNO );
+}
+
+static void
+processTuple( FuncCallContext  *funcctx,  GinStatState *st, IndexTuple itup ) {
+       MemoryContext           oldcontext;
+#if PG_VERSION_NUM < 80400
+       bool                            isnull;
+#endif
+
+       oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+       st->curval = datumCopy(
+#if PG_VERSION_NUM >= 80400
+                                       gin_index_getattr(&st->ginstate, itup),
+#else
+                                       index_getattr(itup, FirstOffsetNumber, st->ginstate.tupdesc, &isnull),
+#endif
+                                       st->index->rd_att->attrs[st->attnum]->attbyval,
+                                       st->index->rd_att->attrs[st->attnum]->attlen );
+       MemoryContextSwitchTo(oldcontext);
+
+       st->dvalues[0] = st->curval;
+
+       if ( GinIsPostingTree(itup) ) {
+               BlockNumber     rootblkno = GinGetPostingTree(itup);
+               GinPostingTreeScan *gdi;
+               Buffer          entrybuffer;              
+               Page        page;
+
+               LockBuffer(st->buffer, GIN_UNLOCK);
+               gdi = prepareScanPostingTree(st->index, rootblkno, TRUE);
+               entrybuffer = scanBeginPostingTree(gdi);
+
+               page = BufferGetPage(entrybuffer);
+               st->dvalues[1] = Int32GetDatum( gdi->stack->predictNumber * GinPageGetOpaque(page)->maxoff );
+
+               LockBuffer(entrybuffer, GIN_UNLOCK);
+               freeGinBtreeStack(gdi->stack);
+               pfree(gdi);
+       } else {
+               st->dvalues[1] = Int32GetDatum( GinGetNPosting(itup) );
+               LockBuffer(st->buffer, GIN_UNLOCK);
+       }
+}
+
+PG_FUNCTION_INFO_V1(gin_stat);
+Datum  gin_stat(PG_FUNCTION_ARGS);
+Datum
+gin_stat(PG_FUNCTION_ARGS) {
+       FuncCallContext  *funcctx;
+       GinStatState     *st;
+       Datum result=(Datum)0;
+       IndexTuple      ituple;
+       HeapTuple       htuple;
+       Page page;
+
+       if (SRF_IS_FIRSTCALL()) {
+               text    *name=PG_GETARG_TEXT_P(0);
+               funcctx = SRF_FIRSTCALL_INIT();
+               gin_setup_firstcall(funcctx, name, (PG_NARGS()==2) ? PG_GETARG_INT32(1) : 0 );
+               PG_FREE_IF_COPY(name,0);
+       }
+
+       funcctx = SRF_PERCALL_SETUP();  
+       st = (GinStatState*)(funcctx->user_fctx);
+
+       if ( refindPosition(st) == false ) {
+               UnlockReleaseBuffer( st->buffer );
+               gin_index_close(st->index);
+
+               SRF_RETURN_DONE(funcctx);
+       }
+
+       for(;;) {
+               st->offset++;
+       
+               if (moveRightIfItNeeded(st)==false) { 
+                       UnlockReleaseBuffer( st->buffer );
+                       gin_index_close(st->index);
+
+                       SRF_RETURN_DONE(funcctx);
+               }
+
+               page = BufferGetPage(st->buffer);
+               ituple = (IndexTuple) PageGetItem(page, PageGetItemId(page, st->offset)); 
+
+#if PG_VERSION_NUM >= 80400
+               if (st->attnum + 1 == gintuple_get_attrnum(&st->ginstate, ituple))
+#endif
+                       break;
+       }
+
+       processTuple( funcctx,  st, ituple );
+       
+       htuple = heap_formtuple(funcctx->attinmeta->tupdesc, st->dvalues, st->nulls);
+       result = TupleGetDatum(funcctx->slot, htuple);
+
+       SRF_RETURN_NEXT(funcctx, result);
+}
+
+PG_FUNCTION_INFO_V1(gin_count_estimate);
+Datum gin_count_estimate(PG_FUNCTION_ARGS);
+#if PG_VERSION_NUM >= 80300
+Datum
+gin_count_estimate(PG_FUNCTION_ARGS) {
+       text                    *name=PG_GETARG_TEXT_P(0);
+       Relation                index;
+       IndexScanDesc   scan;
+       int64                   count = 0;
+       char                    *relname=t2c(name);
+       ScanKeyData             key;
+#if PG_VERSION_NUM >= 80400
+       TIDBitmap               *bitmap = tbm_create(work_mem * 1024L);
+#else
+#define        MAXTIDS         1024
+       ItemPointerData tids[MAXTIDS];
+       int32                   returned_tids;
+       bool                    more;
+#endif
+
+       index = gin_index_open(
+                makeRangeVarFromNameList(stringToQualifiedNameList(relname, "gin_count_estimate")));
+
+       if ( index->rd_opcintype[0] != TSVECTOROID ) {
+               gin_index_close(index);
+               elog(ERROR, "Column type is not a tsvector");
+       }
+
+       key.sk_flags    = 0;
+       key.sk_attno    = 1;
+       key.sk_strategy = TSearchStrategyNumber;
+       key.sk_subtype  = 0;
+       key.sk_argument = PG_GETARG_DATUM(1);
+
+       fmgr_info( F_TS_MATCH_VQ , &key.sk_func );
+
+#if PG_VERSION_NUM >= 80400
+       scan = index_beginscan_bitmap(index, SnapshotNow, 1, &key);
+
+       count = index_getbitmap(scan, bitmap);
+       tbm_free(bitmap);
+#else
+       scan = index_beginscan_multi(index, SnapshotNow, 1, &key);
+
+       do {
+               more = index_getmulti(scan, tids, MAXTIDS, &returned_tids);
+               count += returned_tids;
+       } while(more);
+#endif
+
+       index_endscan( scan );
+       gin_index_close(index);
+
+       PG_RETURN_INT64(count);
+}
+#else
+Datum
+gin_count_estimate(PG_FUNCTION_ARGS) {
+       elog(NOTICE, "Function is not working under PgSQL < 8.3");
+
+       PG_RETURN_INT64(0);
+}
+#endif