summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--lib/CMakeLists.txt12
-rw-r--r--lib/kvstorage/libkvstorageclient.c142
-rw-r--r--lib/kvstorage/libkvstorageclient.h24
-rw-r--r--src/main.h18
-rw-r--r--src/statfile.c48
-rw-r--r--src/statfile.h15
-rw-r--r--src/util.c219
-rw-r--r--src/util.h27
-rw-r--r--src/worker.c169
9 files changed, 373 insertions, 301 deletions
diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt
index f2aa2e5f2..5d52e1ab9 100644
--- a/lib/CMakeLists.txt
+++ b/lib/CMakeLists.txt
@@ -1,5 +1,8 @@
# Librspamdclient
-SET(LIBRSPAMDSRC client/librspamdclient.c ../src/mem_pool.c ../src/upstream.c ../src/printf.c)
+SET(LIBRSPAMDSRC client/librspamdclient.c
+ ../src/mem_pool.c
+ ../src/upstream.c
+ ../src/printf.c)
ADD_LIBRARY(rspamdclient SHARED ${LIBRSPAMDSRC})
ADD_LIBRARY(rspamdclient_static STATIC ${LIBRSPAMDSRC})
@@ -88,7 +91,11 @@ TARGET_LINK_LIBRARIES(rspamdserver rspamd_cdb)
# Libkvstorageclient
-SET(LIBRKVSTORAGESRC kvstorage/libkvstorageclient.c ../src/mem_pool.c ../src/upstream.c ../src/printf.c ../src/util.c)
+SET(LIBRKVSTORAGESRC kvstorage/libkvstorageclient.c
+ ../src/mem_pool.c
+ ../src/upstream.c
+ ../src/printf.c
+ ../src/util.c)
ADD_LIBRARY(kvstorageclient SHARED ${LIBRKVSTORAGESRC})
ADD_LIBRARY(kvstorageclient_static STATIC ${LIBRKVSTORAGESRC})
@@ -102,6 +109,7 @@ ENDIF(CMAKE_COMPILER_IS_GNUCC)
TARGET_LINK_LIBRARIES(kvstorageclient ${CMAKE_REQUIRED_LIBRARIES})
TARGET_LINK_LIBRARIES(kvstorageclient pcre)
TARGET_LINK_LIBRARIES(kvstorageclient ${GLIB2_LIBRARIES})
+TARGET_LINK_LIBRARIES(kvstorageclient event)
TARGET_LINK_LIBRARIES(kvstorageclient_static ${CMAKE_REQUIRED_LIBRARIES})
TARGET_LINK_LIBRARIES(kvstorageclient_static ${GLIB2_LIBRARIES})
diff --git a/lib/kvstorage/libkvstorageclient.c b/lib/kvstorage/libkvstorageclient.c
index 4d3b17dd3..d05e8b5e0 100644
--- a/lib/kvstorage/libkvstorageclient.c
+++ b/lib/kvstorage/libkvstorageclient.c
@@ -23,11 +23,22 @@
#include "config.h"
-#include "main.h"
+#include "mem_pool.h"
+#include "util.h"
#include "libkvstorageclient.h"
#define MAX_KV_LINE 1024
+#ifdef CRLF
+#undef CRLF
+#undef CR
+#undef LF
+#endif
+
+#define CRLF "\r\n"
+#define CR '\r'
+#define LF '\n'
+
struct kvstorage_buf {
guint pos;
guint len;
@@ -242,12 +253,13 @@ rspamd_kvstorage_parse_reply_error (struct kvstorage_buf *buf)
static enum rspamd_kvstorage_error
rspamd_kvstorage_parse_get_line (struct kvstorage_buf *buf, guint *len, guint *flags)
{
- guint8 *p, *c;
+ guint8 *p, *c, *end;
gboolean error = TRUE;
gchar *err_str;
p = buf->data;
- while (p - buf->data < buf->pos) {
+ end = buf->data + buf->pos;
+ while (p < end) {
if (g_ascii_isspace (*p)) {
error = FALSE;
while (p - buf->data < buf->pos && g_ascii_isspace (*p)) {
@@ -268,10 +280,11 @@ rspamd_kvstorage_parse_get_line (struct kvstorage_buf *buf, guint *len, guint *f
/* Here we got key, flags and size items */
/* Skip key */
error = TRUE;
- while (p - buf->data < buf->pos) {
+ while (p < end) {
if (g_ascii_isspace (*p)) {
error = FALSE;
- while (p - buf->data < buf->pos && g_ascii_isspace (*p)) {
+ /* Skip spaces after key */
+ while (p < end && g_ascii_isspace (*p)) {
p ++;
}
break;
@@ -285,9 +298,10 @@ rspamd_kvstorage_parse_get_line (struct kvstorage_buf *buf, guint *len, guint *f
/* Read flags */
c = p;
error = TRUE;
- while (p - buf->data < buf->pos) {
+ while (p < end) {
if (g_ascii_isspace (*p)) {
error = FALSE;
+ /* Skip spaces after flags */
while (p - buf->data < buf->pos && g_ascii_isspace (*p)) {
p ++;
}
@@ -308,16 +322,8 @@ rspamd_kvstorage_parse_get_line (struct kvstorage_buf *buf, guint *len, guint *f
}
/* Read len */
c = p;
- error = TRUE;
- while (p - buf->data < buf->pos) {
- if (g_ascii_isspace (*p)) {
- error = FALSE;
- while (p - buf->data < buf->pos && g_ascii_isspace (*p)) {
- p ++;
- }
- break;
- }
- else if (!g_ascii_isdigit (*p)) {
+ while (p < end) {
+ if (!g_ascii_isdigit (*p)) {
break;
}
p ++;
@@ -347,9 +353,9 @@ rspamd_kvstorage_connect_cb (int fd, short what, gpointer ud)
cb (KVSTORAGE_ERROR_TIMEOUT, d->c, d->ud);
}
else {
+ d->c->state = KV_STATE_CONNECTED;
cb (KVSTORAGE_ERROR_OK, d->c, d->ud);
}
- d->c->state = KV_STATE_CONNECTED;
}
static void
@@ -364,7 +370,7 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud)
cb = (kvstorage_read_cb)d->c->read_cb;
if (what == EV_TIMEOUT) {
- cb (KVSTORAGE_ERROR_TIMEOUT, d->key, NULL, 0, d->c, d->ud);
+ cb (KVSTORAGE_ERROR_TIMEOUT, d->key, d->keylen, NULL, 0, d->c, d->ud);
return;
}
if (d->c->state == KV_STATE_GET) {
@@ -391,6 +397,7 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud)
}
else if (r == 0) {
/* We have written everything */
+ d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool);
d->c->state = KV_STATE_READ_ELT;
event_del (&d->c->ev);
event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_read_cb, d);
@@ -403,7 +410,7 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud)
}
else {
/* Error occured during writing */
- cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud);
+ cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, NULL, 0, d->c, d->ud);
}
}
else if (d->c->state == KV_STATE_WRITE_DATA) {
@@ -424,6 +431,7 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud)
else if (r == 0) {
/* We have written everything */
d->c->state = KV_STATE_READ_ELT;
+ d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool);
event_del (&d->c->ev);
event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_read_cb, d);
if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
@@ -435,7 +443,7 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud)
}
else {
/* Error occured during writing */
- cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud);
+ cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, NULL, 0, d->c, d->ud);
}
}
else if (d->c->state == KV_STATE_READ_ELT) {
@@ -455,10 +463,11 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud)
else if (r == 0) {
/* Got all data about elt */
if ((r = rspamd_kvstorage_parse_get_line (d->buf, &d->datalen, &flags)) != KVSTORAGE_ERROR_OK) {
- cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud);
+ cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, NULL, 0, d->c, d->ud);
return;
}
rspamd_kvstorage_buf_drainline (d->buf);
+
/* Now allocate and read the data */
databuf = rspamd_kvstorage_buf_create (d->datalen, d->c->pool);
memcpy (databuf->data, d->buf->data, d->buf->pos);
@@ -475,7 +484,7 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud)
}
else {
/* Error occured during reading reply line */
- cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud);
+ cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, NULL, 0, d->c, d->ud);
}
}
else if (d->c->state == KV_STATE_READ_DATA) {
@@ -486,7 +495,6 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud)
d->c->state = KV_STATE_READ_REPLY;
/* Save databuf */
d->data = d->buf->data;
- d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool);
event_del (&d->c->ev);
event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_read_cb, d);
if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
@@ -509,7 +517,7 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud)
}
else {
/* Error occured */
- cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud);
+ cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, NULL, 0, d->c, d->ud);
}
}
else if (d->c->state == KV_STATE_READ_REPLY) {
@@ -527,11 +535,12 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud)
}
}
else if (r == 0) {
- cb (rspamd_kvstorage_parse_reply_error (d->buf), d->key, d->data, d->datalen, d->c, d->ud);
+ d->c->state = KV_STATE_CONNECTED;
+ cb (rspamd_kvstorage_parse_reply_error (d->buf), d->key, d->keylen, d->data, d->datalen, d->c, d->ud);
}
else {
/* Error occured during reading reply line */
- cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud);
+ cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, NULL, 0, d->c, d->ud);
}
}
}
@@ -547,7 +556,7 @@ rspamd_kvstorage_write_cb (int fd, short what, gpointer ud)
cb = (kvstorage_write_cb)d->c->write_cb;
if (what == EV_TIMEOUT) {
- cb (KVSTORAGE_ERROR_TIMEOUT, d->key, d->c, d->ud);
+ cb (KVSTORAGE_ERROR_TIMEOUT, d->key, d->keylen, d->c, d->ud);
return;
}
if (d->c->state == KV_STATE_SET) {
@@ -574,6 +583,7 @@ rspamd_kvstorage_write_cb (int fd, short what, gpointer ud)
}
else if (r == 0) {
/* We have written everything */
+ d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool);
d->c->state = KV_STATE_READ_REPLY;
event_del (&d->c->ev);
event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_write_cb, d);
@@ -586,7 +596,7 @@ rspamd_kvstorage_write_cb (int fd, short what, gpointer ud)
}
else {
/* Error occured during writing */
- cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud);
+ cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, d->c, d->ud);
}
}
else if (d->c->state == KV_STATE_WRITE_DATA) {
@@ -607,6 +617,7 @@ rspamd_kvstorage_write_cb (int fd, short what, gpointer ud)
else if (r == 0) {
/* We have written everything */
d->c->state = KV_STATE_READ_REPLY;
+ d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool);
event_del (&d->c->ev);
event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_write_cb, d);
if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
@@ -618,7 +629,7 @@ rspamd_kvstorage_write_cb (int fd, short what, gpointer ud)
}
else {
/* Error occured during writing */
- cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud);
+ cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, d->c, d->ud);
}
}
else if (d->c->state == KV_STATE_READ_REPLY) {
@@ -636,11 +647,12 @@ rspamd_kvstorage_write_cb (int fd, short what, gpointer ud)
}
}
else if (r == 0) {
- cb (rspamd_kvstorage_parse_reply_error (d->buf), d->key, d->c, d->ud);
+ d->c->state = KV_STATE_CONNECTED;
+ cb (rspamd_kvstorage_parse_reply_error (d->buf), d->key, d->keylen, d->c, d->ud);
}
else {
/* Error occured during reading reply line */
- cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud);
+ cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, d->c, d->ud);
}
}
}
@@ -656,7 +668,7 @@ rspamd_kvstorage_delete_cb (int fd, short what, gpointer ud)
cb = (kvstorage_write_cb)d->c->write_cb;
if (what == EV_TIMEOUT) {
- cb (KVSTORAGE_ERROR_TIMEOUT, d->key, d->c, d->ud);
+ cb (KVSTORAGE_ERROR_TIMEOUT, d->key, d->keylen, d->c, d->ud);
return;
}
if (d->c->state == KV_STATE_SET) {
@@ -683,6 +695,7 @@ rspamd_kvstorage_delete_cb (int fd, short what, gpointer ud)
}
else if (r == 0) {
/* We have written everything */
+ d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool);
d->c->state = KV_STATE_READ_REPLY;
event_del (&d->c->ev);
event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_delete_cb, d);
@@ -695,7 +708,7 @@ rspamd_kvstorage_delete_cb (int fd, short what, gpointer ud)
}
else {
/* Error occured during writing */
- cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud);
+ cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, d->c, d->ud);
}
}
else if (d->c->state == KV_STATE_WRITE_DATA) {
@@ -716,6 +729,7 @@ rspamd_kvstorage_delete_cb (int fd, short what, gpointer ud)
else if (r == 0) {
/* We have written everything */
d->c->state = KV_STATE_READ_REPLY;
+ d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool);
event_del (&d->c->ev);
event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_delete_cb, d);
if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
@@ -727,7 +741,7 @@ rspamd_kvstorage_delete_cb (int fd, short what, gpointer ud)
}
else {
/* Error occured during writing */
- cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud);
+ cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, d->c, d->ud);
}
}
else if (d->c->state == KV_STATE_READ_REPLY) {
@@ -745,11 +759,12 @@ rspamd_kvstorage_delete_cb (int fd, short what, gpointer ud)
}
}
else if (r == 0) {
- cb (rspamd_kvstorage_parse_reply_error (d->buf), d->key, d->c, d->ud);
+ d->c->state = KV_STATE_CONNECTED;
+ cb (rspamd_kvstorage_parse_reply_error (d->buf), d->key, d->keylen, d->c, d->ud);
}
else {
/* Error occured during reading reply line */
- cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud);
+ cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, d->c, d->ud);
}
}
}
@@ -773,7 +788,7 @@ rspamd_kvstorage_connect_async (const gchar *host,
gint sock;
/* Here we do NOT try to resolve hostname */
- if ((sock = make_universal_stream_socket (host, port, TRUE, FALSE, FALSE)) == -1) {
+ if ((sock = make_universal_stream_socket (host, port, TRUE, FALSE, TRUE)) == -1) {
return KVSTORAGE_ERROR_SERVER_ERROR;
}
@@ -819,7 +834,7 @@ rspamd_kvstorage_connect_async (const gchar *host,
*/
enum rspamd_kvstorage_error
rspamd_kvstorage_get_async (struct rspamd_kvstorage_connection *conn,
- const gpointer key, kvstorage_read_cb cb, gpointer ud)
+ const gpointer key, guint keylen, kvstorage_read_cb cb, gpointer ud)
{
struct rspamd_kvstorage_async_data *d;
@@ -833,7 +848,7 @@ rspamd_kvstorage_get_async (struct rspamd_kvstorage_connection *conn,
d->c = conn;
d->ud = ud;
d->key = memory_pool_strdup (conn->pool, key);
- d->keylen = strlen (d->key);
+ d->keylen = keylen;
conn->state = KV_STATE_GET;
/* Set event */
@@ -858,7 +873,8 @@ rspamd_kvstorage_get_async (struct rspamd_kvstorage_connection *conn,
*/
enum rspamd_kvstorage_error
rspamd_kvstorage_set_async (struct rspamd_kvstorage_connection *conn,
- const gpointer key, const gpointer value, gsize len, guint expire, kvstorage_write_cb cb, gpointer ud)
+ const gpointer key, guint keylen, const gpointer value, gsize len, guint expire, kvstorage_write_cb cb,
+ gpointer ud)
{
struct rspamd_kvstorage_async_data *d;
@@ -872,7 +888,7 @@ rspamd_kvstorage_set_async (struct rspamd_kvstorage_connection *conn,
d->c = conn;
d->ud = ud;
d->key = memory_pool_strdup (conn->pool, key);
- d->keylen = strlen (d->key);
+ d->keylen = keylen;
d->data = value;
d->datalen = len;
conn->state = KV_STATE_SET;
@@ -898,7 +914,7 @@ rspamd_kvstorage_set_async (struct rspamd_kvstorage_connection *conn,
*/
enum rspamd_kvstorage_error
rspamd_kvstorage_delete_async (struct rspamd_kvstorage_connection *conn,
- const gpointer key, kvstorage_write_cb cb, gpointer ud)
+ const gpointer key, guint keylen, kvstorage_write_cb cb, gpointer ud)
{
struct rspamd_kvstorage_async_data *d;
@@ -912,7 +928,7 @@ rspamd_kvstorage_delete_async (struct rspamd_kvstorage_connection *conn,
d->c = conn;
d->ud = ud;
d->key = memory_pool_strdup (conn->pool, key);
- d->keylen = strlen (d->key);
+ d->keylen = keylen;
conn->state = KV_STATE_SET;
/* Set event */
@@ -989,7 +1005,7 @@ rspamd_kvstorage_connect_sync (const gchar *host,
*/
enum rspamd_kvstorage_error
rspamd_kvstorage_get_sync (struct rspamd_kvstorage_connection *conn,
- const gpointer key, gpointer **value, guint *len)
+ const gpointer key, guint keylen, gpointer **value, guint *len)
{
struct kvstorage_buf *buf, *databuf;
gint r;
@@ -1001,7 +1017,7 @@ rspamd_kvstorage_get_sync (struct rspamd_kvstorage_connection *conn,
buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, conn->pool);
- r = rspamd_snprintf (buf->data, buf->len, "get %s" CRLF, key);
+ r = rspamd_snprintf (buf->data, buf->len, "get %*s" CRLF, keylen, key);
buf->len = r;
while ((r = rspamd_kvstorage_buf_writeall (buf, conn)) > 0) {
poll_sync_socket (conn->sock, tv_to_msec (&conn->tv), POLL_OUT);
@@ -1051,17 +1067,16 @@ rspamd_kvstorage_get_sync (struct rspamd_kvstorage_connection *conn,
*/
enum rspamd_kvstorage_error
rspamd_kvstorage_set_sync (struct rspamd_kvstorage_connection *conn,
- const gpointer key, const gpointer value, gsize len, guint expire)
+ const gpointer key, guint keylen, const gpointer value, gsize len, guint expire)
{
struct kvstorage_buf *buf;
- gint r, keylen, buflen;
+ gint r, buflen;
if (conn == NULL || conn->state != KV_STATE_CONNECTED) {
return KVSTORAGE_ERROR_INTERNAL_ERROR;
}
/* Create buf */
- keylen = strlen (key);
buflen = len + keylen + sizeof ("set 4294967296 4294967296 4294967296" CRLF);
buf = rspamd_kvstorage_buf_create (buflen, conn->pool);
@@ -1091,17 +1106,16 @@ rspamd_kvstorage_set_sync (struct rspamd_kvstorage_connection *conn,
*/
enum rspamd_kvstorage_error
rspamd_kvstorage_delete_sync (struct rspamd_kvstorage_connection *conn,
- const gpointer key)
+ const gpointer key, guint keylen)
{
struct kvstorage_buf *buf;
- gint r, keylen, buflen;
+ gint r, buflen;
if (conn == NULL || conn->state != KV_STATE_CONNECTED) {
return KVSTORAGE_ERROR_INTERNAL_ERROR;
}
/* Create buf */
- keylen = strlen (key);
buflen = MAX (keylen + sizeof ("delete " CRLF), MAX_KV_LINE);
buf = rspamd_kvstorage_buf_create (buflen, conn->pool);
@@ -1137,3 +1151,29 @@ rspamd_kvstorage_close_sync (struct rspamd_kvstorage_connection *conn)
return KVSTORAGE_ERROR_OK;
}
+
+const gchar*
+rspamd_kvstorage_strerror (enum rspamd_kvstorage_error err)
+{
+ switch (err) {
+ case KVSTORAGE_ERROR_OK:
+ return "operation completed";
+ case KVSTORAGE_ERROR_TIMEOUT:
+ return "operation timeout";
+ case KVSTORAGE_ERROR_NOT_FOUND:
+ return "key not found";
+ case KVSTORAGE_ERROR_NOT_STORED:
+ return "key not stored";
+ case KVSTORAGE_ERROR_EXISTS:
+ return "key exists";
+ case KVSTORAGE_ERROR_SERVER_ERROR:
+ return "server error";
+ case KVSTORAGE_ERROR_CLIENT_ERROR:
+ return "client error";
+ case KVSTORAGE_ERROR_INTERNAL_ERROR:
+ return "library error";
+ }
+
+ /* Not reached */
+ return "unknown error";
+}
diff --git a/lib/kvstorage/libkvstorageclient.h b/lib/kvstorage/libkvstorageclient.h
index 262c5e89e..11145fd49 100644
--- a/lib/kvstorage/libkvstorageclient.h
+++ b/lib/kvstorage/libkvstorageclient.h
@@ -26,6 +26,7 @@
#define LIBKVSTORAGECLIENT_H_
#include <glib.h>
+#include <sys/time.h>
/* Errors */
enum rspamd_kvstorage_error {
@@ -45,10 +46,11 @@ struct rspamd_kvstorage_connection;
/* Callbacks for async API */
typedef void (*kvstorage_connect_cb) (enum rspamd_kvstorage_error code,
struct rspamd_kvstorage_connection *conn, gpointer user_data);
-typedef void (*kvstorage_read_cb) (enum rspamd_kvstorage_error code, const gpointer key,
+typedef void (*kvstorage_read_cb) (enum rspamd_kvstorage_error code, const gpointer key, guint keylen,
const gpointer value, gsize datalen, struct rspamd_kvstorage_connection *conn,
gpointer user_data);
-typedef void (*kvstorage_write_cb) (enum rspamd_kvstorage_error code, const gpointer key, struct rspamd_kvstorage_connection *conn,
+typedef void (*kvstorage_write_cb) (enum rspamd_kvstorage_error code, const gpointer key, guint keylen,
+ struct rspamd_kvstorage_connection *conn,
gpointer user_data);
/* Asynced API */
@@ -74,7 +76,7 @@ enum rspamd_kvstorage_error rspamd_kvstorage_connect_async (const gchar *host,
* @param ud user data for callback
*/
enum rspamd_kvstorage_error rspamd_kvstorage_get_async (struct rspamd_kvstorage_connection *conn,
- const gpointer key, kvstorage_read_cb cb, gpointer ud);
+ const gpointer key, guint keylen, kvstorage_read_cb cb, gpointer ud);
/**
* Write key asynced
@@ -85,7 +87,7 @@ enum rspamd_kvstorage_error rspamd_kvstorage_get_async (struct rspamd_kvstorage_
* @param ud user data for callback
*/
enum rspamd_kvstorage_error rspamd_kvstorage_set_async (struct rspamd_kvstorage_connection *conn,
- const gpointer key, const gpointer value, gsize len, guint expire, kvstorage_write_cb cb, gpointer ud);
+ const gpointer key, guint keylen, const gpointer value, gsize len, guint expire, kvstorage_write_cb cb, gpointer ud);
/**
* Delete key asynced
@@ -95,7 +97,7 @@ enum rspamd_kvstorage_error rspamd_kvstorage_set_async (struct rspamd_kvstorage_
* @param ud user data for callback
*/
enum rspamd_kvstorage_error rspamd_kvstorage_delete_async (struct rspamd_kvstorage_connection *conn,
- const gpointer key, kvstorage_write_cb cb, gpointer ud);
+ const gpointer key, guint keylen, kvstorage_write_cb cb, gpointer ud);
/**
* Close connection
@@ -122,7 +124,7 @@ enum rspamd_kvstorage_error rspamd_kvstorage_connect_sync (const gchar *host,
* @param value value readed
*/
enum rspamd_kvstorage_error rspamd_kvstorage_get_sync (struct rspamd_kvstorage_connection *conn,
- const gpointer key, gpointer **value, guint *len);
+ const gpointer key, guint keylen, gpointer **value, guint *len);
/**
* Write key synced
@@ -131,7 +133,7 @@ enum rspamd_kvstorage_error rspamd_kvstorage_get_sync (struct rspamd_kvstorage_c
* @param value data to write
*/
enum rspamd_kvstorage_error rspamd_kvstorage_set_sync (struct rspamd_kvstorage_connection *conn,
- const gpointer key, const gpointer value, gsize len, guint expire);
+ const gpointer key, guint keylen, const gpointer value, gsize len, guint expire);
/**
* Delete key synced
@@ -139,7 +141,7 @@ enum rspamd_kvstorage_error rspamd_kvstorage_set_sync (struct rspamd_kvstorage_c
* @param key key to delete
*/
enum rspamd_kvstorage_error rspamd_kvstorage_delete_sync (struct rspamd_kvstorage_connection *conn,
- const gpointer key);
+ const gpointer key, guint keylen);
/**
* Close connection
@@ -147,4 +149,10 @@ enum rspamd_kvstorage_error rspamd_kvstorage_delete_sync (struct rspamd_kvstorag
*/
enum rspamd_kvstorage_error rspamd_kvstorage_close_sync (struct rspamd_kvstorage_connection *conn);
+/**
+ * Convert error code to string
+ * @param err error code to be converted
+ */
+const gchar* rspamd_kvstorage_strerror (enum rspamd_kvstorage_error err);
+
#endif /* LIBKVSTORAGECLIENT_H_ */
diff --git a/src/main.h b/src/main.h
index 7ff76afb3..59a5f3c30 100644
--- a/src/main.h
+++ b/src/main.h
@@ -292,6 +292,24 @@ gpointer init_workers_ctx (enum process_type type);
*/
extern struct rspamd_main *rspamd_main;
+/* Worker task manipulations */
+
+/**
+ * Construct new task for worker
+ */
+struct worker_task* construct_task (struct rspamd_worker *worker);
+/**
+ * Destroy task object and remove its IO dispatcher if it exists
+ */
+void free_task (struct worker_task *task, gboolean is_soft);
+void free_task_hard (gpointer ud);
+void free_task_soft (gpointer ud);
+
+/**
+ * Set counter for a symbol
+ */
+double set_counter (const gchar *name, guint32 value);
+
#endif
/*
diff --git a/src/statfile.c b/src/statfile.c
index 26d96a38c..d9d42d983 100644
--- a/src/statfile.c
+++ b/src/statfile.c
@@ -891,3 +891,51 @@ statfile_pool_plan_invalidate (statfile_pool_t *pool, time_t seconds, time_t jit
msg_info ("invalidate of statfile pool is planned in %d seconds", (gint)pool->invalidate_tv.tv_sec);
}
}
+
+
+stat_file_t *
+get_statfile_by_symbol (statfile_pool_t *pool, struct classifier_config *ccf,
+ const gchar *symbol, struct statfile **st, gboolean try_create)
+{
+ stat_file_t *res = NULL;
+ GList *cur;
+
+ if (pool == NULL || ccf == NULL || symbol == NULL) {
+ msg_err ("invalid input arguments");
+ return NULL;
+ }
+
+ cur = g_list_first (ccf->statfiles);
+ while (cur) {
+ *st = cur->data;
+ if (strcmp (symbol, (*st)->symbol) == 0) {
+ break;
+ }
+ *st = NULL;
+ cur = g_list_next (cur);
+ }
+ if (*st == NULL) {
+ msg_info ("cannot find statfile with symbol %s", symbol);
+ return NULL;
+ }
+
+ if ((res = statfile_pool_is_open (pool, (*st)->path)) == NULL) {
+ if ((res = statfile_pool_open (pool, (*st)->path, (*st)->size, FALSE)) == NULL) {
+ msg_warn ("cannot open %s", (*st)->path);
+ if (try_create) {
+ if (statfile_pool_create (pool, (*st)->path, (*st)->size) == -1) {
+ msg_err ("cannot create statfile %s", (*st)->path);
+ return NULL;
+ }
+ res = statfile_pool_open (pool, (*st)->path, (*st)->size, FALSE);
+ if (res == NULL) {
+ msg_err ("cannot open statfile %s after creation", (*st)->path);
+ }
+ }
+ }
+ }
+
+ return res;
+}
+
+
diff --git a/src/statfile.h b/src/statfile.h
index e76aa0897..edb58e5db 100644
--- a/src/statfile.h
+++ b/src/statfile.h
@@ -96,6 +96,10 @@ typedef struct statfile_pool_s {
struct timeval invalidate_tv;
} statfile_pool_t;
+/* Forwarded declarations */
+struct classifier_config;
+struct statfile;
+
/**
* Create new statfile pool
* @param max_size maximum size
@@ -261,4 +265,15 @@ guint64 statfile_get_total_blocks (stat_file_t *file);
*/
void statfile_pool_plan_invalidate (statfile_pool_t *pool, time_t seconds, time_t jitter);
+/**
+ * Get a statfile by symbol
+ * @param pool pool object
+ * @param ccf ccf classifier config
+ * @param symbol symbol to search
+ * @param st statfile to get
+ * @param try_create whether we need to create statfile if it is absent
+ */
+stat_file_t* get_statfile_by_symbol (statfile_pool_t *pool, struct classifier_config *ccf,
+ const gchar *symbol, struct statfile **st, gboolean try_create);
+
#endif
diff --git a/src/util.c b/src/util.c
index 4d2665f6b..2efd8f0d5 100644
--- a/src/util.c
+++ b/src/util.c
@@ -38,8 +38,6 @@
/* Default connect timeout for sync sockets */
#define CONNECT_TIMEOUT 3
-rspamd_hash_t *counters = NULL;
-
static gchar* rspamd_sprintf_num (gchar *buf, gchar *last, guint64 ui64, gchar zero, guint hexadecimal, guint width);
gint
@@ -939,35 +937,6 @@ calculate_check_time (struct timeval *begin, gint resolution)
return (const gchar *)res;
}
-double
-set_counter (const gchar *name, guint32 value)
-{
- struct counter_data *cd;
- double alpha;
- gchar *key;
-
- cd = rspamd_hash_lookup (counters, (gpointer) name);
-
- if (cd == NULL) {
- cd = memory_pool_alloc_shared (counters->pool, sizeof (struct counter_data));
- cd->value = value;
- cd->number = 0;
- key = memory_pool_strdup_shared (counters->pool, name);
- rspamd_hash_insert (counters, (gpointer) key, (gpointer) cd);
- }
- else {
- /* Calculate new value */
- memory_pool_wlock_rwlock (counters->lock);
-
- alpha = 2. / (++cd->number + 1);
- cd->value = cd->value * (1. - alpha) + value * alpha;
-
- memory_pool_wunlock_rwlock (counters->lock);
- }
-
- return cd->value;
-}
-
#ifndef g_tolower
# define g_tolower(x) (((x) >= 'A' && (x) <= 'Z') ? (x) - 'A' + 'a' : (x))
#endif
@@ -1147,52 +1116,6 @@ unlock_file (gint fd, gboolean async)
}
#endif /* HAVE_FLOCK */
-stat_file_t *
-get_statfile_by_symbol (statfile_pool_t *pool, struct classifier_config *ccf,
- const gchar *symbol, struct statfile **st, gboolean try_create)
-{
- stat_file_t *res = NULL;
- GList *cur;
-
- if (pool == NULL || ccf == NULL || symbol == NULL) {
- msg_err ("invalid input arguments");
- return NULL;
- }
-
- cur = g_list_first (ccf->statfiles);
- while (cur) {
- *st = cur->data;
- if (strcmp (symbol, (*st)->symbol) == 0) {
- break;
- }
- *st = NULL;
- cur = g_list_next (cur);
- }
- if (*st == NULL) {
- msg_info ("cannot find statfile with symbol %s", symbol);
- return NULL;
- }
-
- if ((res = statfile_pool_is_open (pool, (*st)->path)) == NULL) {
- if ((res = statfile_pool_open (pool, (*st)->path, (*st)->size, FALSE)) == NULL) {
- msg_warn ("cannot open %s", (*st)->path);
- if (try_create) {
- if (statfile_pool_create (pool, (*st)->path, (*st)->size) == -1) {
- msg_err ("cannot create statfile %s", (*st)->path);
- return NULL;
- }
- res = statfile_pool_open (pool, (*st)->path, (*st)->size, FALSE);
- if (res == NULL) {
- msg_err ("cannot open statfile %s after creation", (*st)->path);
- }
- }
- }
- }
-
- return res;
-}
-
-
#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION < 22))
void
@@ -1276,22 +1199,8 @@ str_to_process (const gchar *str)
return TYPE_UNKNOWN;
}
-/*
- * Destructor for recipients list
- */
-static void
-rcpt_destruct (void *pointer)
-{
- struct worker_task *task = (struct worker_task *) pointer;
-
- if (task->rcpt) {
- g_list_free (task->rcpt);
- }
-}
-
-
/* Compare two emails for building emails tree */
-static gint
+gint
compare_email_func (gconstpointer a, gconstpointer b)
{
const struct uri *u1 = a, *u2 = b;
@@ -1317,7 +1226,7 @@ compare_email_func (gconstpointer a, gconstpointer b)
return 0;
}
-static gint
+gint
compare_url_func (gconstpointer a, gconstpointer b)
{
const struct uri *u1 = a, *u2 = b;
@@ -1333,130 +1242,6 @@ compare_url_func (gconstpointer a, gconstpointer b)
return r;
}
-/*
- * Create new task
- */
-struct worker_task *
-construct_task (struct rspamd_worker *worker)
-{
- struct worker_task *new_task;
-
- new_task = g_slice_alloc0 (sizeof (struct worker_task));
-
- new_task->worker = worker;
- new_task->state = READ_COMMAND;
- new_task->cfg = worker->srv->cfg;
- new_task->from_addr.s_addr = INADDR_NONE;
- new_task->view_checked = FALSE;
-#ifdef HAVE_CLOCK_GETTIME
-# ifdef HAVE_CLOCK_PROCESS_CPUTIME_ID
- clock_gettime (CLOCK_PROCESS_CPUTIME_ID, &new_task->ts);
-# elif defined(HAVE_CLOCK_VIRTUAL)
- clock_gettime (CLOCK_VIRTUAL, &new_task->ts);
-# else
- clock_gettime (CLOCK_REALTIME, &new_task->ts);
-# endif
-#endif
- if (gettimeofday (&new_task->tv, NULL) == -1) {
- msg_warn ("gettimeofday failed: %s", strerror (errno));
- }
-
- new_task->task_pool = memory_pool_new (memory_pool_get_size ());
-
- /* Add destructor for recipients list (it would be better to use anonymous function here */
- memory_pool_add_destructor (new_task->task_pool,
- (pool_destruct_func) rcpt_destruct, new_task);
- new_task->results = g_hash_table_new (g_str_hash, g_str_equal);
- memory_pool_add_destructor (new_task->task_pool,
- (pool_destruct_func) g_hash_table_destroy,
- new_task->results);
- new_task->re_cache = g_hash_table_new (g_str_hash, g_str_equal);
- memory_pool_add_destructor (new_task->task_pool,
- (pool_destruct_func) g_hash_table_destroy,
- new_task->re_cache);
- new_task->raw_headers = g_hash_table_new (rspamd_strcase_hash, rspamd_strcase_equal);
- memory_pool_add_destructor (new_task->task_pool,
- (pool_destruct_func) g_hash_table_destroy,
- new_task->raw_headers);
- new_task->emails = g_tree_new (compare_email_func);
- memory_pool_add_destructor (new_task->task_pool,
- (pool_destruct_func) g_tree_destroy,
- new_task->emails);
- new_task->urls = g_tree_new (compare_url_func);
- memory_pool_add_destructor (new_task->task_pool,
- (pool_destruct_func) g_tree_destroy,
- new_task->urls);
- new_task->s =
- new_async_session (new_task->task_pool, free_task_hard, new_task);
- new_task->sock = -1;
- new_task->is_mime = TRUE;
-
- return new_task;
-}
-
-
-/*
- * Free all structures of worker_task
- */
-void
-free_task (struct worker_task *task, gboolean is_soft)
-{
- GList *part;
- struct mime_part *p;
-
- if (task) {
- debug_task ("free pointer %p", task);
- while ((part = g_list_first (task->parts))) {
- task->parts = g_list_remove_link (task->parts, part);
- p = (struct mime_part *) part->data;
- g_byte_array_free (p->content, TRUE);
- g_list_free_1 (part);
- }
- if (task->text_parts) {
- g_list_free (task->text_parts);
- }
- if (task->images) {
- g_list_free (task->images);
- }
- if (task->messages) {
- g_list_free (task->messages);
- }
- if (task->received) {
- g_list_free (task->received);
- }
- memory_pool_delete (task->task_pool);
- if (task->dispatcher) {
- if (is_soft) {
- /* Plan dispatcher shutdown */
- task->dispatcher->wanna_die = 1;
- }
- else {
- rspamd_remove_dispatcher (task->dispatcher);
- }
- }
- if (task->sock != -1) {
- close (task->sock);
- }
- g_slice_free1 (sizeof (struct worker_task), task);
- }
-}
-
-void
-free_task_hard (gpointer ud)
-{
- struct worker_task *task = ud;
-
- free_task (task, FALSE);
-}
-
-void
-free_task_soft (gpointer ud)
-{
- struct worker_task *task = ud;
-
- free_task (task, FALSE);
-}
-
gchar *
escape_braces_addr_fstr (memory_pool_t *pool, f_str_t *in)
{
diff --git a/src/util.h b/src/util.h
index bbe49db67..95d5703bd 100644
--- a/src/util.h
+++ b/src/util.h
@@ -149,11 +149,6 @@ const gchar* calculate_check_time (struct timeval *begin, gint resolution);
#endif
/*
- * Set counter for a symbol
- */
-double set_counter (const gchar *name, guint32 value);
-
-/*
* File locking functions
*/
gboolean lock_file (gint fd, gboolean async);
@@ -177,12 +172,6 @@ gboolean fstr_strcase_equal (gconstpointer v, gconstpointer v2);
void gperf_profiler_init (struct config_file *cfg, const gchar *descr);
/*
- * Get a statfile by symbol
- */
-stat_file_t* get_statfile_by_symbol (statfile_pool_t *pool, struct classifier_config *ccf,
- const gchar *symbol, struct statfile **st, gboolean try_create);
-
-/*
* Workaround for older versions of glib
*/
#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION < 22))
@@ -228,19 +217,11 @@ gchar * escape_braces_addr_fstr (memory_pool_t *pool, f_str_t *in);
#define msec_to_tv(msec, tv) do { (tv)->tv_sec = (msec) / 1000; (tv)->tv_usec = ((msec) - (tv)->tv_sec * 1000) * 1000; } while(0)
#define tv_to_msec(tv) (tv)->tv_sec * 1000 + (tv)->tv_usec / 1000
-struct worker_task;
-struct rspamd_worker;
+/* Compare two emails for building emails tree */
+gint compare_email_func (gconstpointer a, gconstpointer b);
-/**
- * Construct new task for worker
- */
-struct worker_task* construct_task (struct rspamd_worker *worker);
-/**
- * Destroy task object and remove its IO dispatcher if it exists
- */
-void free_task (struct worker_task *task, gboolean is_soft);
-void free_task_hard (gpointer ud);
-void free_task_soft (gpointer ud);
+/* Compare two urls for building emails tree */
+gint compare_url_func (gconstpointer a, gconstpointer b);
/*
* Find string find in string s ignoring case
diff --git a/src/worker.c b/src/worker.c
index d1aeea859..24cb0dd69 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -98,6 +98,8 @@ static gboolean write_socket (void *arg);
static sig_atomic_t wanna_die = 0;
+rspamd_hash_t *counters = NULL;
+
#ifndef HAVE_SA_SIGINFO
static void
sig_handler (gint signo)
@@ -160,6 +162,172 @@ sigusr1_handler (gint fd, short what, void *arg)
return;
}
+/*
+ * Destructor for recipients list in a task
+ */
+static void
+rcpt_destruct (void *pointer)
+{
+ struct worker_task *task = (struct worker_task *) pointer;
+
+ if (task->rcpt) {
+ g_list_free (task->rcpt);
+ }
+}
+
+/*
+ * Create new task
+ */
+struct worker_task *
+construct_task (struct rspamd_worker *worker)
+{
+ struct worker_task *new_task;
+
+ new_task = g_slice_alloc0 (sizeof (struct worker_task));
+
+ new_task->worker = worker;
+ new_task->state = READ_COMMAND;
+ new_task->cfg = worker->srv->cfg;
+ new_task->from_addr.s_addr = INADDR_NONE;
+ new_task->view_checked = FALSE;
+#ifdef HAVE_CLOCK_GETTIME
+# ifdef HAVE_CLOCK_PROCESS_CPUTIME_ID
+ clock_gettime (CLOCK_PROCESS_CPUTIME_ID, &new_task->ts);
+# elif defined(HAVE_CLOCK_VIRTUAL)
+ clock_gettime (CLOCK_VIRTUAL, &new_task->ts);
+# else
+ clock_gettime (CLOCK_REALTIME, &new_task->ts);
+# endif
+#endif
+ if (gettimeofday (&new_task->tv, NULL) == -1) {
+ msg_warn ("gettimeofday failed: %s", strerror (errno));
+ }
+
+ new_task->task_pool = memory_pool_new (memory_pool_get_size ());
+
+ /* Add destructor for recipients list (it would be better to use anonymous function here */
+ memory_pool_add_destructor (new_task->task_pool,
+ (pool_destruct_func) rcpt_destruct, new_task);
+ new_task->results = g_hash_table_new (g_str_hash, g_str_equal);
+ memory_pool_add_destructor (new_task->task_pool,
+ (pool_destruct_func) g_hash_table_destroy,
+ new_task->results);
+ new_task->re_cache = g_hash_table_new (g_str_hash, g_str_equal);
+ memory_pool_add_destructor (new_task->task_pool,
+ (pool_destruct_func) g_hash_table_destroy,
+ new_task->re_cache);
+ new_task->raw_headers = g_hash_table_new (rspamd_strcase_hash, rspamd_strcase_equal);
+ memory_pool_add_destructor (new_task->task_pool,
+ (pool_destruct_func) g_hash_table_destroy,
+ new_task->raw_headers);
+ new_task->emails = g_tree_new (compare_email_func);
+ memory_pool_add_destructor (new_task->task_pool,
+ (pool_destruct_func) g_tree_destroy,
+ new_task->emails);
+ new_task->urls = g_tree_new (compare_url_func);
+ memory_pool_add_destructor (new_task->task_pool,
+ (pool_destruct_func) g_tree_destroy,
+ new_task->urls);
+ new_task->s =
+ new_async_session (new_task->task_pool, free_task_hard, new_task);
+ new_task->sock = -1;
+ new_task->is_mime = TRUE;
+
+ return new_task;
+}
+
+
+/*
+ * Free all structures of worker_task
+ */
+void
+free_task (struct worker_task *task, gboolean is_soft)
+{
+ GList *part;
+ struct mime_part *p;
+
+ if (task) {
+ debug_task ("free pointer %p", task);
+ while ((part = g_list_first (task->parts))) {
+ task->parts = g_list_remove_link (task->parts, part);
+ p = (struct mime_part *) part->data;
+ g_byte_array_free (p->content, TRUE);
+ g_list_free_1 (part);
+ }
+ if (task->text_parts) {
+ g_list_free (task->text_parts);
+ }
+ if (task->images) {
+ g_list_free (task->images);
+ }
+ if (task->messages) {
+ g_list_free (task->messages);
+ }
+ if (task->received) {
+ g_list_free (task->received);
+ }
+ memory_pool_delete (task->task_pool);
+ if (task->dispatcher) {
+ if (is_soft) {
+ /* Plan dispatcher shutdown */
+ task->dispatcher->wanna_die = 1;
+ }
+ else {
+ rspamd_remove_dispatcher (task->dispatcher);
+ }
+ }
+ if (task->sock != -1) {
+ close (task->sock);
+ }
+ g_slice_free1 (sizeof (struct worker_task), task);
+ }
+}
+
+void
+free_task_hard (gpointer ud)
+{
+ struct worker_task *task = ud;
+
+ free_task (task, FALSE);
+}
+
+void
+free_task_soft (gpointer ud)
+{
+ struct worker_task *task = ud;
+
+ free_task (task, FALSE);
+}
+
+double
+set_counter (const gchar *name, guint32 value)
+{
+ struct counter_data *cd;
+ double alpha;
+ gchar *key;
+
+ cd = rspamd_hash_lookup (counters, (gpointer) name);
+
+ if (cd == NULL) {
+ cd = memory_pool_alloc_shared (counters->pool, sizeof (struct counter_data));
+ cd->value = value;
+ cd->number = 0;
+ key = memory_pool_strdup_shared (counters->pool, name);
+ rspamd_hash_insert (counters, (gpointer) key, (gpointer) cd);
+ }
+ else {
+ /* Calculate new value */
+ memory_pool_wlock_rwlock (counters->lock);
+
+ alpha = 2. / (++cd->number + 1);
+ cd->value = cd->value * (1. - alpha) + value * alpha;
+
+ memory_pool_wunlock_rwlock (counters->lock);
+ }
+
+ return cd->value;
+}
+
#ifndef BUILD_STATIC
static void
fin_custom_filters (struct worker_task *task)
@@ -244,6 +412,7 @@ parse_line_custom (struct worker_task *task, f_str_t * in)
}
#endif
+
/*
* Callback that is called when there is data to read in buffer
*/