summary |
shortlog |
log |
commit | commitdiff |
tree
raw |
patch |
inline | side by side (from parent 1:
2405d79)
Use network byteorder for messages
Use standart message type for tcp connection.
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
+ * notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* 2. Redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in the
- * documentation and/or other materials provided with the distribution.
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
* 3. Neither the name of the author nor the names of any co-contributors
* 3. Neither the name of the author nor the names of any co-contributors
- * may be used to endorse or promote products derived from this software
- * without specific prior written permission.
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY CONTRIBUTORS ``AS IS'' AND ANY EXPRESS
* OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
*
* THIS SOFTWARE IS PROVIDED BY CONTRIBUTORS ``AS IS'' AND ANY EXPRESS
* OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#define CS_OK 0
#define CS_INPROCESS 1
#define CS_CONNECTED 2
#define CS_OK 0
#define CS_INPROCESS 1
#define CS_CONNECTED 2
-#define CS_READ 3
-#define CS_SEND 4
-#define CS_WAIT 5
-#define CS_ERROR 6
+#define CS_READ 3
+#define CS_SEND 4
+#define CS_WAIT 5
+#define CS_ERROR 6
#define CS_FINISHSEND 7
#define CS_FINISHREAD 8
#define CS_TIMEOUT 9
#define CS_FINISHSEND 7
#define CS_FINISHREAD 8
#define CS_TIMEOUT 9
#define READ_INCRIMENT_BUFSIZ 1024
#define READ_INCRIMENT_BUFSIZ 1024
+/*
+ * any message shoulbe compatible with TCMsg struct
+ */
+typedef struct {
+ /* len and type should be transferred in network byte order */
+ u_int32_t len;
+ u_int32_t type;
+ char data[1];
+} TCMsg;
+
+#define TCMSGHDRSZ offsetof(TCMsg, data)
+
+/* tcp */
typedef struct {
/* I/O buffer */
typedef struct {
/* I/O buffer */
- u_int32_t len;
- char *buf;
- char *ptr;
+ u_int32_t len;
+ char *ptr;
+ TCMsg *buf; /* send buf */
- struct sockaddr_in serv_addr;
+ struct sockaddr_in serv_addr;
/* external link */
void* data;
} TC_Connection;
/* external link */
void* data;
} TC_Connection;
-#define TCCONNHDRSZ ( sizeof(TC_Connection) - sizeof(void*) )
+#define TCCONNHDRSZ offsetof(TC_Connection, data)
TC_Connection *TC_fillConnection( TC_Connection *cs, char *name, u_int32_t port );
TC_Connection* TC_AcceptTcp(TC_Connection *cs);
u_int32_t TC_ClientInitConnection(TC_Connection *cs, char *name, u_int32_t port);
u_int32_t TC_ServerInitConnect( TC_Connection *cs );
u_int32_t TC_ServerConnect( TC_Connection *cs, int timeout );
TC_Connection *TC_fillConnection( TC_Connection *cs, char *name, u_int32_t port );
TC_Connection* TC_AcceptTcp(TC_Connection *cs);
u_int32_t TC_ClientInitConnection(TC_Connection *cs, char *name, u_int32_t port);
u_int32_t TC_ServerInitConnect( TC_Connection *cs );
u_int32_t TC_ServerConnect( TC_Connection *cs, int timeout );
+/*
+ * TC_Send doesn't promise to keep TC_Connection->buf unchanged
+ */
u_int32_t TC_Send( TC_Connection *cs );
u_int32_t TC_Read( TC_Connection *cs, size_t maxsize );
u_int32_t TC_Talk( TC_Connection *cs, size_t maxsize );
void TC_FreeConnection( TC_Connection *cs );
int TC_ReadyIO( TC_Connection **cs, int number, int timeout );
u_int32_t TC_Send( TC_Connection *cs );
u_int32_t TC_Read( TC_Connection *cs, size_t maxsize );
u_int32_t TC_Talk( TC_Connection *cs, size_t maxsize );
void TC_FreeConnection( TC_Connection *cs );
int TC_ReadyIO( TC_Connection **cs, int number, int timeout );
-typedef struct {
- u_int32_t len;
- u_int32_t type;
- char data[1];
-} TCMsg;
-
-#define TCMSGHDRSZ (2*sizeof(u_int32_t))
typedef struct {
char *host;
u_int32_t port;
typedef struct {
char *host;
u_int32_t port;
*
* Udp client send:
* Msg msg;
*
* Udp client send:
* Msg msg;
- * msg.host = "127.0.0.1";
- * msg.port = 5432;
+ * msg.host = "127.0.0.1";
+ * msg.port = 5432;
* msg.sockfd =-1;
* msg.msg = GOTFILLEDPMSG();
* if ( TC_sendMsg(&msg)!=CS_OK ) {
* msg.sockfd =-1;
* msg.msg = GOTFILLEDPMSG();
* if ( TC_sendMsg(&msg)!=CS_OK ) {
* }
* msg.msg = GOTFILLEDPMSG();
* if ( TC_sendMsg(&msg)!=CS_OK ) {
* }
* msg.msg = GOTFILLEDPMSG();
* if ( TC_sendMsg(&msg)!=CS_OK ) {
* }
* TC_closefd(&msg);
*/
* }
* TC_closefd(&msg);
*/
u_int32_t TC_sendMsg( Msg *msg );
void TC_closefd( Msg *msg );
u_int32_t TC_sendMsg( Msg *msg );
void TC_closefd( Msg *msg );
+/*
+ * Connection pool
+ */
+
- u_int32_t len;
- u_int32_t number;
- TC_Connection **conn;
+ u_int32_t len;
+ u_int32_t number;
+ TC_Connection **conn;
} PoolConnection;
void TC_addConnection(PoolConnection *pool, TC_Connection *c);
} PoolConnection;
void TC_addConnection(PoolConnection *pool, TC_Connection *c);
#define __REGIS_H__
#include <sys/types.h>
#define __REGIS_H__
#include <sys/types.h>
typedef struct RegisNode {
u_int32_t
typedef struct RegisNode {
u_int32_t
unsigned char data[1];
} RegisNode;
unsigned char data[1];
} RegisNode;
-#define RNHDRSZ (sizeof(u_int32_t)+sizeof(void*))
+#define RNHDRSZ offsetof(RegisNode, data)
#define RSF_ONEOF 1
#define RSF_NONEOF 2
#define RSF_ONEOF 1
#define RSF_NONEOF 2
if ( cs->state == CS_ERROR )
return CS_ERROR;
if ( cs->state == CS_ERROR )
return CS_ERROR;
- if ( cs->state != CS_SEND ) {
+ if ( cs->state != CS_SEND || cs->ptr == NULL ) {
+ cs->ptr = (char*)cs->buf;
+ cs->len = cs->buf->len;
+
+ /* convert fields to network byteorder */
+ cs->buf->len = htonl(cs->buf->len);
+ cs->buf->type = htonl(cs->buf->type);
- if ( cs->ptr - cs->buf >= cs->len ) {
+ if ( cs->ptr - (char*)cs->buf >= cs->len ) {
cs->state = CS_FINISHSEND;
return CS_FINISHSEND;
}
cs->state = CS_FINISHSEND;
return CS_FINISHSEND;
}
- if ((sz=write(cs->fd, cs->ptr, cs->len - (cs->ptr - cs->buf)))==0 ||
+ if ((sz=write(cs->fd, cs->ptr, cs->len - (cs->ptr - (char*)cs->buf)))==0 ||
(sz < 0 && (errno == EWOULDBLOCK || errno == EAGAIN))) {
/* SunOS 4.1.x, are broken and select() says that
(sz < 0 && (errno == EWOULDBLOCK || errno == EAGAIN))) {
/* SunOS 4.1.x, are broken and select() says that
- if ( cs->ptr - cs->buf >= cs->len ) {
+ if ( cs->ptr - (char*)cs->buf >= cs->len ) {
cs->state = CS_FINISHSEND;
cs->state = CS_FINISHSEND;
+ /* revert byteorder conversion */
+ cs->buf->len = ntohl(cs->buf->len);
+ cs->buf->type = ntohl(cs->buf->type);
static void
resizeCS( TC_Connection *cs, int sz ) {
static void
resizeCS( TC_Connection *cs, int sz ) {
- int diff = cs->ptr - cs->buf;
+ int diff = cs->ptr - (char*)cs->buf;
+
if ( cs->len >= sz )
return;
cs->len = sz;
if ( cs->len >= sz )
return;
cs->len = sz;
- cs->buf = (char*)trealloc( (void*)cs->buf, cs->len );
- cs->ptr = cs->buf + diff;
+ cs->buf = (TCMsg*)trealloc( (void*)cs->buf, cs->len );
+ cs->ptr = ((char*)cs->buf) + diff;
if ( cs->state == CS_ERROR )
return CS_ERROR;
if ( cs->state == CS_ERROR )
return CS_ERROR;
- if (cs->state != CS_READ ) {
+ if (cs->state != CS_READ || cs->ptr == NULL ) {
+ cs->ptr = (char*)cs->buf;
+ cs->len = 0;
- alreadyread = cs->ptr - cs->buf;
- if ( alreadyread < sizeof(u_int32_t) ) {
- toread = sizeof(u_int32_t) - alreadyread;
- resizeCS(cs, sizeof(u_int32_t));
+ alreadyread = cs->ptr - (char*)cs->buf;
+ if ( alreadyread < TCMSGHDRSZ ) {
+ toread = TCMSGHDRSZ - alreadyread;
+ resizeCS(cs, TCMSGHDRSZ);
- totalread = *(u_int32_t*)(cs->buf);
+ totalread = ntohl(cs->buf->len);
if ( maxsize > 0 && totalread > maxsize )
{
tlog(TL_ALARM,"TC_Read: message size (%d b) is greater than max allowed (%d b)", totalread, maxsize);
if ( maxsize > 0 && totalread > maxsize )
{
tlog(TL_ALARM,"TC_Read: message size (%d b) is greater than max allowed (%d b)", totalread, maxsize);
+ if ( alreadyread < TCMSGHDRSZ && alreadyread + sz >= TCMSGHDRSZ ) {
+ /*
+ * we just read header - we can get totalread value.
+ */
+ totalread = ntohl(cs->buf->len);
+ }
+
cs->ptr += sz;
alreadyread += sz;
if ( sz == 0 && alreadyread != totalread ) {
cs->ptr += sz;
alreadyread += sz;
if ( sz == 0 && alreadyread != totalread ) {
cs->state = CS_ERROR;
return CS_ERROR;
}
cs->state = CS_ERROR;
return CS_ERROR;
}
- cs->state = ( alreadyread == totalread ) ? CS_FINISHREAD : CS_READ;
+
+ if ( alreadyread == totalread ) {
+ cs->buf->len = ntohl(cs->buf->len);
+ cs->buf->type = ntohl(cs->buf->type);
+ cs->state = CS_FINISHREAD;
+ }
+
return cs->state;
cs->state = CS_SEND;
return cs->state;
cs->state = CS_SEND;
while( cs->state != CS_FINISHSEND ) {
while( !TC_ReadyIO( &cs, 1, 100) );
if ( TC_Send(cs) == CS_ERROR ) return CS_ERROR;
}
cs->state = CS_READ;
while( cs->state != CS_FINISHSEND ) {
while( !TC_ReadyIO( &cs, 1, 100) );
if ( TC_Send(cs) == CS_ERROR ) return CS_ERROR;
}
cs->state = CS_READ;
while( cs->state != CS_FINISHREAD ) {
while( !TC_ReadyIO( &cs, 1, 100) );
if ( TC_Read(cs, maxsize) == CS_ERROR ) return CS_ERROR;
while( cs->state != CS_FINISHREAD ) {
while( !TC_ReadyIO( &cs, 1, 100) );
if ( TC_Read(cs, maxsize) == CS_ERROR ) return CS_ERROR;
char data[1];
} MemoryChunk;
char data[1];
} MemoryChunk;
-#define MEMCHNKHDRSZ ( sizeof(u_int32_t)*2 + sizeof(MemoryChunk*) )
+#define MEMCHNKHDRSZ offsetof(MemoryChunk, data)
typedef struct MemoryContext {
u_int32_t flags;
typedef struct MemoryContext {
u_int32_t flags;
tlog(TL_ALARM, "Got message %d bytes (should be al least %d)", n, TCMSGHDRSZ );
return CS_AGAIN;
}
tlog(TL_ALARM, "Got message %d bytes (should be al least %d)", n, TCMSGHDRSZ );
return CS_AGAIN;
}
+
+ /*
+ * convert from network byteorder
+ */
+ pmsg->len = ntohl(pmsg->len);
+ pmsg->type = ntohl(pmsg->type);
if ( pmsg->len > MSGMAXSIZE ) {
tlog(TL_ALARM, "Messages (%d bytes) is too big", pmsg->len);
if ( pmsg->len > MSGMAXSIZE ) {
tlog(TL_ALARM, "Messages (%d bytes) is too big", pmsg->len);
/* send */
u_int32_t
TC_sendMsg( Msg *msg ) {
/* send */
u_int32_t
TC_sendMsg( Msg *msg ) {
if ( msg->msg == NULL || msg->msg->len <=0 )
return CS_OK;
if ( msg->msg == NULL || msg->msg->len <=0 )
return CS_OK;
msg->host_addr.sin_port = htons(msg->port);
}
msg->host_addr.sin_port = htons(msg->port);
}
- if (sendto(msg->sockfd, (void*)msg->msg, msg->msg->len, 0, (struct sockaddr *) &(msg->host_addr), sizeof(msg->host_addr)) != msg->msg->len) {
+ /*
+ * convert to network byteorder
+ */
+ msglen = msg->msg->len;
+ msg->msg->len = htonl(msg->msg->len);
+ msg->msg->type = htonl(msg->msg->type);
+
+ if (sendto(msg->sockfd, (void*)msg->msg, msglen, 0, (struct sockaddr *) &(msg->host_addr), sizeof(msg->host_addr)) != msglen) {
tlog(TL_CRIT,"Can't send message to %s:%d : %s", msg->host, msg->port, strerror(errno));
close(msg->sockfd);
msg->sockfd=-1;
tlog(TL_CRIT,"Can't send message to %s:%d : %s", msg->host, msg->port, strerror(errno));
close(msg->sockfd);
msg->sockfd=-1;