8 #include <netinet/in.h>
16 #ifdef HAVE_SYS_POLL_H
19 #error Not defined HAVE_POLL_H or HAVE_SYS_POLL_H
20 #endif /* HAVE_SYS_POLL_H */
21 #endif /* HAVE_POLL */
/*
 * Socket state kept in ftsPG.state.  waitResult reads it (SS_READ, SS_WRITE)
 * to decide which poll event to wait for and stores SS_READYREAD or
 * SS_READYWRITE once poll reports the socket readable or writable.
 */
27 typedef enum SocketState {
/*
 * Status returned by checkStatus: RS_INPROGRESS while the command is still
 * being processed, RS_OK is the value that ends the wait loop in waitResult.
 */
35 typedef enum ResStatus {
/*
 * State of the libpq backend.  The callbacks in this file receive it as an
 * ftsDB* and cast it back to ftsPG*.
 */
40 typedef struct ftsPG {
/* receiver returned by PQsetNoticeReceiver in PGInit; NoticeReceiver calls it to pass notices on */
44 PQnoticeReceiver origreceiver;
/*
 * Notice receiver that PGInit registers on the connection.
 *
 * arg - the ftsPG handle that PGInit passed to PQsetNoticeReceiver
 * res - the notice delivered by libpq
 *
 * Notices with SQLSTATE "00000" are inspected by message text; two known
 * messages get special treatment before the notice is handed to the receiver
 * that was installed previously.
 */
55 NoticeReceiver(void *arg, const PGresult *res) {
56 ftsPG *db = (ftsPG*) arg;
/*
 * NOTE(review): libpq documents that PQresultErrorField can return NULL when
 * the field is absent; strcmp would then be given a NULL pointer - confirm
 * whether a guard is needed.
 */
60 if ( strcmp(PQresultErrorField(res,PG_DIAG_SQLSTATE), "00000") == 0 ) {
61 char *msg = PQresultErrorMessage(res);
/* notices are recognised by a substring of their message */
63 if ( strstr(msg, "does not exist, skipping")!=NULL ) {
64 /* skip 'NOTICE: table "wow" does not exist, skipping' */
/* the "only stopwords" notice; presumably this is what feeds db->emptyquery - TODO confirm, the branch body is not visible here */
67 if ( strstr(msg, "query contains only stopword(s) or doesn't contain lexeme(s), ignored")!=NULL ) {
/* delegate to the receiver that was active before PGInit replaced it */
73 db->origreceiver(db, res);
/*
 * Push queued outgoing data to the server.  Per the libpq documentation
 * PQflush returns 0 when the send queue is empty, 1 when data is still
 * pending and -1 on failure.
 */
80 flushres=PQflush(db->conn);
84 } else if ( flushres == 0 ) {
85 /* success write, waits for read */
/* failure path: report libpq's error text for the connection */
88 fprintf(stderr, "PQflush failed: %s", PQerrorMessage(db->conn));
/*
 * Advances the non-blocking conversation on db->conn by one step and reports
 * whether the caller has to keep waiting.  The return paths visible here
 * yield RS_INPROGRESS; waitResult keeps looping until RS_OK comes back.
 */
94 checkStatus(ftsPG* db) {
100 return RS_INPROGRESS;
/*
 * Read whatever the server has sent so that PQisBusy sees current data.
 * NOTE(review): PQconsumeInput returns 0 on failure and that value is not
 * checked here - confirm this is intentional.
 */
102 PQconsumeInput(db->conn);
/* non-zero from PQisBusy means the result is not complete yet */
103 if ( PQisBusy(db->conn) != 0 ) {
105 return RS_INPROGRESS;
/* diagnostic for a state the author considered unreachable */
110 fprintf(stderr,"Should not be here!\n");
/*
 * Waits with poll() until checkStatus reports RS_OK, then drains every
 * pending result from the connection with PQgetResult.
 *
 * db - backend state; db->state selects the poll event and is updated from
 *      the poll result.
 */
119 waitResult(ftsPG *db) {
125 pfd.events = pfd.revents = 0;
/* pick the event to wait for from the direction recorded in db->state */
126 if ( db->state == SS_READ ) {
128 } else if ( db->state == SS_WRITE ) {
129 pfd.events = POLLOUT;
/*
 * Wait without a timeout.
 * NOTE(review): INFTIM is not provided by every libc's poll header - confirm
 * a fallback definition exists elsewhere in the file.
 */
133 int ret = poll( &pfd, 1, INFTIM);
/* NOTE(review): unlike the other diagnostics this one has no trailing newline and strerror does not add one */
135 fprintf(stderr,"poll failed: %s", strerror(errno));
/* error conditions are tested before readiness bits */
139 if ( pfd.revents & (POLLHUP | POLLNVAL | POLLERR) ) {
140 fprintf(stderr,"Poll report about socket error\n");
142 } else if ( pfd.revents & POLLIN ) {
143 db->state = SS_READYREAD;
144 } else if ( pfd.revents & POLLOUT ) {
145 db->state = SS_READYWRITE;
/* repeat until checkStatus says the command has finished */
148 } while( checkStatus(db) != RS_OK );
/* PQgetResult returns NULL once no results remain; each one must be PGRES_COMMAND_OK */
150 while ( (res = PQgetResult(db->conn))!= NULL ) {
151 if (PQresultStatus(res) != PGRES_COMMAND_OK) {
152 fprintf(stderr, "Execution of prepared statement failed: %s", PQerrorMessage(db->conn));
/*
 * Recognises the failure that execQuery is prepared to ignore.  All of the
 * following must hold: db->emptyquery is positive, the result status is
 * PGRES_FATAL_ERROR, SQLSTATE is 0A000 and the reported source file name
 * contains "ginscan.c".  execQuery reports an error when this returns 0.
 *
 * NOTE(review): libpq documents that PQresultErrorField can return NULL; both
 * strcmp and strstr below would then receive a NULL pointer - confirm whether
 * a guard is needed.
 */
160 checkEmptyQuery(ftsPG *db, PGresult *res) {
162 db->emptyquery > 0 &&
163 PQresultStatus(res) == PGRES_FATAL_ERROR &&
164 strcmp(PQresultErrorField(res,PG_DIAG_SQLSTATE), "0A000") == 0 && /* FEATURE_NOT_SUPPORTED */
165 strstr(PQresultErrorField(res, PG_DIAG_SOURCE_FILE), "ginscan.c") != NULL
/*
 * Runs one full-text search against ftsbench through the prepared statement
 * "search_ftsbench".
 *
 * adb   - backend handle; really an ftsPG
 * words - search terms that are assembled into the tsquery text in db->b
 * flags - FLG_FUNC selects the to_tsvector(body) form of the query
 */
173 execQuery(ftsDB* adb, char ** words, int flags) {
174 ftsPG *db = (ftsPG*)adb;
/* the statement takes exactly one parameter: the tsquery text */
175 const char *paramValues[1];
/* prepare the statement only while db->prepared is still 0 */
179 if ( db->prepared == 0 ) {
/* FLG_FUNC: compute the tsvector from body in the query; otherwise use the fts column */
185 if ( flags & FLG_FUNC )
186 sprintf(buf, "SELECT count(*) FROM ftsbench WHERE to_tsvector(body) @@ to_tsquery( $1 ::text );");
188 sprintf(buf, "SELECT count(*) FROM ftsbench WHERE fts @@ to_tsquery( $1 ::text );");
190 res = PQprepare( db->conn, "search_ftsbench", buf, 1, NULL );
192 if (PQresultStatus(res) != PGRES_COMMAND_OK) {
193 fprintf(stderr, "PREPARE SELECT command failed: %s", PQerrorMessage(db->conn));
/* separator between terms: OR when FLG_OR is set in db->flags, AND otherwise; it also opens the quote */
207 sb_add(&db->b, (db->flags & FLG_OR) ? " | '" : " & '", 4);
209 sb_add(&db->b, "'", 1);
/* extra quote emitted ahead of a character - presumably doubling an embedded quote, TODO confirm against the missing condition */
213 sb_add(&db->b, "'", 1);
/* the term is appended one character at a time */
214 sb_add(&db->b, ptr, 1);
217 sb_add(&db->b, "'", 1);
/* the assembled tsquery text is the only statement parameter */
223 paramValues[0] = db->b.str;
225 res = PQexecPrepared( db->conn, "search_ftsbench",
229 if (PQresultStatus(res) != PGRES_TUPLES_OK) {
230 /* skip error ' all words are a stop word' for GIN index -
231 result is empty, in any case */
232 if ( checkEmptyQuery(db, res) == 0 ) {
233 fprintf(stderr, "Execution of prepared statement failed: %s\n", PQerrorMessage(db->conn));
/* nqueryMutex is held around an update that is not visible here - presumably db->db.nquery, TODO confirm */
240 pthread_mutex_lock(&(db->db.nqueryMutex));
242 pthread_mutex_unlock(&(db->db.nqueryMutex));
/*
 * Recreates the ftsbench table and opens a transaction.
 *
 * adb   - backend handle; really an ftsPG
 * flags - FLG_FUNC creates the table without the fts column and its trigger
 */
246 startCreateScheme(ftsDB* adb, int flags) {
247 ftsPG *db = (ftsPG*)adb;
/* start from a clean slate; IF EXISTS makes a missing table a notice instead of an error */
253 res = PQexec(db->conn, "DROP TABLE IF EXISTS ftsbench CASCADE;");
254 if (PQresultStatus(res) != PGRES_COMMAND_OK) {
255 fprintf(stderr, "DROP TABLE command failed: %s", PQerrorMessage(db->conn));
/* FLG_FUNC: plain (id, body) table; otherwise add an fts tsvector column kept up to date by a trigger */
260 if ( flags & FLG_FUNC )
261 sprintf(buf,"CREATE TABLE ftsbench( id int not null, body text );");
263 sprintf(buf,"CREATE TABLE ftsbench( id int not null, body text, fts tsvector ); "
264 "CREATE TRIGGER tsvectorupdate BEFORE UPDATE OR INSERT ON ftsbench "
265 "FOR EACH ROW EXECUTE PROCEDURE tsearch2(fts, body);" );
267 res = PQexec(db->conn, buf);
268 if (PQresultStatus(res) != PGRES_COMMAND_OK) {
269 fprintf(stderr, "CREATE TABLE command failed: %s", PQerrorMessage(db->conn));
/* open the transaction that finishCreateScheme commits */
274 res = PQexec(db->conn, "BEGIN;");
275 if (PQresultStatus(res) != PGRES_COMMAND_OK) {
/* NOTE(review): the message names CREATE TABLE although the command that failed is BEGIN - looks like a copy/paste leftover */
276 fprintf(stderr, "CREATE TABLE command failed: %s", PQerrorMessage(db->conn));
/*
 * Completes the load: puts the connection back into blocking mode, commits,
 * optionally builds the full-text index and runs VACUUM ANALYZE.
 *
 * adb - backend handle; really an ftsPG
 */
285 finishCreateScheme(ftsDB* adb) {
286 ftsPG *db = (ftsPG*)adb;
/* only relevant when at least one query has been issued */
289 if ( db->db.nquery > 0 ) {
/* second argument 0 switches the connection back to blocking mode (InsertRow turned non-blocking mode on) */
292 if ( PQsetnonblocking(db->conn, 0) != 0 ) {
293 fprintf(stderr, "PQsetnonblocking command failed: %s", PQerrorMessage(db->conn));
298 res = PQexec(db->conn, "COMMIT;");
299 if (PQresultStatus(res) != PGRES_COMMAND_OK) {
/* NOTE(review): the message names CREATE TABLE although the command that failed is COMMIT - looks like a copy/paste leftover */
300 fprintf(stderr, "CREATE TABLE command failed: %s", PQerrorMessage(db->conn));
/* an index is built only when FLG_GIST or FLG_GIN was requested */
305 if ( (db->flags & (FLG_GIST | FLG_GIN)) != 0 ) {
/* FLG_FUNC: expression index on to_tsvector(body); otherwise index the fts column */
308 if ( db->flags & FLG_FUNC )
309 sprintf(buf,"CREATE INDEX ftsindex ON ftsbench USING %s ( to_tsvector(body) );",
310 (db->flags & FLG_GIST) ? "GiST" : "GIN" );
312 sprintf(buf,"CREATE INDEX ftsindex ON ftsbench USING %s ( fts );",
313 (db->flags & FLG_GIST) ? "GiST" : "GIN" );
/* progress output on stdout; the text is left open for a later print to finish */
315 printf("(create index, ");
318 res = PQexec(db->conn, buf);
319 if (PQresultStatus(res) != PGRES_COMMAND_OK) {
320 fprintf(stderr, "CREATE INDEX command failed: %s", PQerrorMessage(db->conn));
332 res = PQexec(db->conn, "VACUUM ANALYZE ftsbench;");
333 if (PQresultStatus(res) != PGRES_COMMAND_OK) {
334 fprintf(stderr, "VACUUM ANALYZE command failed: %s", PQerrorMessage(db->conn));
/*
 * Queues the insertion of one row into ftsbench through the prepared
 * statement "insert_ftsbench", using libpq's asynchronous send call.
 *
 * adb - backend handle; really an ftsPG
 * id  - value for the id column
 * txt - value for the body column, passed as a NUL-terminated text parameter
 */
346 InsertRow(ftsDB* adb, int id, char *txt) {
347 ftsPG *db = (ftsPG*)adb;
349 const char *paramValues[2];
350 uint32_t binaryIntVal;
/* the length matters only for the binary parameter; libpq ignores it for text parameters */
351 int paramLengths[] = {sizeof(binaryIntVal), 0};
/* libpq format codes: 1 = binary (id), 0 = text (body) */
352 int paramFormats[] = {1, 0};
/* one-time setup while no query has been counted yet */
354 if ( db->db.nquery == 0 ) {
357 res = PQprepare( db->conn, "insert_ftsbench",
358 "INSERT INTO ftsbench (id, body) VALUES ( $1 ::int4, $2 ::text);",
361 if (PQresultStatus(res) != PGRES_COMMAND_OK) {
362 fprintf(stderr, "PREPARE INSERT command failed: %s", PQerrorMessage(db->conn));
/* second argument 1 puts the connection into non-blocking mode */
367 if ( PQsetnonblocking(db->conn, 1) != 0 ) {
368 fprintf(stderr, "PQsetnonblocking command failed: %s", PQerrorMessage(db->conn));
/* a binary int4 parameter is sent in network byte order */
375 binaryIntVal = htonl((uint32_t) id);
376 paramValues[0] = (char*)&binaryIntVal;
377 paramValues[1] = txt;
/* PQsendQueryPrepared returns 0 when the command could not be dispatched */
379 if ( PQsendQueryPrepared( db->conn, "insert_ftsbench",
381 paramLengths, paramFormats, 0) == 0 ) {
382 fprintf(stderr, "PQsendQueryPrepared failed: %s", PQerrorMessage(db->conn));
/*
 * Creates the libpq backend: allocates a zeroed ftsPG, connects to the
 * database, installs NoticeReceiver, fills in the callback table and records
 * the connection's socket descriptor.
 *
 * connstr - used as the value of "dbname=" in the connection string
 */
392 PGInit(char * connstr) {
393 ftsPG *db = (ftsPG*)malloc(sizeof(ftsPG));
/* NOTE(review): no NULL check on the malloc result is visible before this memset - confirm one exists */
396 memset(db,0,sizeof(ftsPG));
/* NOTE(review): sprintf does not bound the write and the size of conninfo is not visible here - confirm a long connstr cannot overflow it */
398 sprintf(conninfo, "dbname=%s", connstr);
399 db->conn = PQconnectdb(conninfo);
401 if (PQstatus(db->conn) != CONNECTION_OK) {
402 fprintf(stderr, "Connection to database failed: %s", PQerrorMessage(db->conn));
/* keep the previous receiver so NoticeReceiver can forward notices to it */
406 db->origreceiver = PQsetNoticeReceiver(db->conn, NoticeReceiver, (void *)db);
/* callback table through which the rest of the program reaches this backend */
408 db->db.execQuery = execQuery;
409 db->db.startCreateScheme = startCreateScheme;
410 db->db.finishCreateScheme = finishCreateScheme;
411 db->db.InsertRow = InsertRow;
/* descriptor stored for later use; a negative value is reported as a socket error */
412 db->socket = PQsocket(db->conn);
413 if ( db->socket < 0 ) {
414 fprintf(stderr,"Socket error\n");