diff options
-rw-r--r-- | lib/CMakeLists.txt | 12 | ||||
-rw-r--r-- | lib/kvstorage/libkvstorageclient.c | 142 | ||||
-rw-r--r-- | lib/kvstorage/libkvstorageclient.h | 24 | ||||
-rw-r--r-- | src/main.h | 18 | ||||
-rw-r--r-- | src/statfile.c | 48 | ||||
-rw-r--r-- | src/statfile.h | 15 | ||||
-rw-r--r-- | src/util.c | 219 | ||||
-rw-r--r-- | src/util.h | 27 | ||||
-rw-r--r-- | src/worker.c | 169 |
9 files changed, 373 insertions, 301 deletions
diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index f2aa2e5f2..5d52e1ab9 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -1,5 +1,8 @@ # Librspamdclient -SET(LIBRSPAMDSRC client/librspamdclient.c ../src/mem_pool.c ../src/upstream.c ../src/printf.c) +SET(LIBRSPAMDSRC client/librspamdclient.c + ../src/mem_pool.c + ../src/upstream.c + ../src/printf.c) ADD_LIBRARY(rspamdclient SHARED ${LIBRSPAMDSRC}) ADD_LIBRARY(rspamdclient_static STATIC ${LIBRSPAMDSRC}) @@ -88,7 +91,11 @@ TARGET_LINK_LIBRARIES(rspamdserver rspamd_cdb) # Libkvstorageclient -SET(LIBRKVSTORAGESRC kvstorage/libkvstorageclient.c ../src/mem_pool.c ../src/upstream.c ../src/printf.c ../src/util.c) +SET(LIBRKVSTORAGESRC kvstorage/libkvstorageclient.c + ../src/mem_pool.c + ../src/upstream.c + ../src/printf.c + ../src/util.c) ADD_LIBRARY(kvstorageclient SHARED ${LIBRKVSTORAGESRC}) ADD_LIBRARY(kvstorageclient_static STATIC ${LIBRKVSTORAGESRC}) @@ -102,6 +109,7 @@ ENDIF(CMAKE_COMPILER_IS_GNUCC) TARGET_LINK_LIBRARIES(kvstorageclient ${CMAKE_REQUIRED_LIBRARIES}) TARGET_LINK_LIBRARIES(kvstorageclient pcre) TARGET_LINK_LIBRARIES(kvstorageclient ${GLIB2_LIBRARIES}) +TARGET_LINK_LIBRARIES(kvstorageclient event) TARGET_LINK_LIBRARIES(kvstorageclient_static ${CMAKE_REQUIRED_LIBRARIES}) TARGET_LINK_LIBRARIES(kvstorageclient_static ${GLIB2_LIBRARIES}) diff --git a/lib/kvstorage/libkvstorageclient.c b/lib/kvstorage/libkvstorageclient.c index 4d3b17dd3..d05e8b5e0 100644 --- a/lib/kvstorage/libkvstorageclient.c +++ b/lib/kvstorage/libkvstorageclient.c @@ -23,11 +23,22 @@ #include "config.h" -#include "main.h" +#include "mem_pool.h" +#include "util.h" #include "libkvstorageclient.h" #define MAX_KV_LINE 1024 +#ifdef CRLF +#undef CRLF +#undef CR +#undef LF +#endif + +#define CRLF "\r\n" +#define CR '\r' +#define LF '\n' + struct kvstorage_buf { guint pos; guint len; @@ -242,12 +253,13 @@ rspamd_kvstorage_parse_reply_error (struct kvstorage_buf *buf) static enum rspamd_kvstorage_error rspamd_kvstorage_parse_get_line (struct kvstorage_buf *buf, guint *len, guint *flags) { - guint8 *p, *c; + guint8 *p, *c, *end; gboolean error = TRUE; gchar *err_str; p = buf->data; - while (p - buf->data < buf->pos) { + end = buf->data + buf->pos; + while (p < end) { if (g_ascii_isspace (*p)) { error = FALSE; while (p - buf->data < buf->pos && g_ascii_isspace (*p)) { @@ -268,10 +280,11 @@ rspamd_kvstorage_parse_get_line (struct kvstorage_buf *buf, guint *len, guint *f /* Here we got key, flags and size items */ /* Skip key */ error = TRUE; - while (p - buf->data < buf->pos) { + while (p < end) { if (g_ascii_isspace (*p)) { error = FALSE; - while (p - buf->data < buf->pos && g_ascii_isspace (*p)) { + /* Skip spaces after key */ + while (p < end && g_ascii_isspace (*p)) { p ++; } break; @@ -285,9 +298,10 @@ rspamd_kvstorage_parse_get_line (struct kvstorage_buf *buf, guint *len, guint *f /* Read flags */ c = p; error = TRUE; - while (p - buf->data < buf->pos) { + while (p < end) { if (g_ascii_isspace (*p)) { error = FALSE; + /* Skip spaces after flags */ while (p - buf->data < buf->pos && g_ascii_isspace (*p)) { p ++; } @@ -308,16 +322,8 @@ rspamd_kvstorage_parse_get_line (struct kvstorage_buf *buf, guint *len, guint *f } /* Read len */ c = p; - error = TRUE; - while (p - buf->data < buf->pos) { - if (g_ascii_isspace (*p)) { - error = FALSE; - while (p - buf->data < buf->pos && g_ascii_isspace (*p)) { - p ++; - } - break; - } - else if (!g_ascii_isdigit (*p)) { + while (p < end) { + if (!g_ascii_isdigit (*p)) { break; } p ++; @@ -347,9 +353,9 @@ rspamd_kvstorage_connect_cb (int fd, short what, gpointer ud) cb (KVSTORAGE_ERROR_TIMEOUT, d->c, d->ud); } else { + d->c->state = KV_STATE_CONNECTED; cb (KVSTORAGE_ERROR_OK, d->c, d->ud); } - d->c->state = KV_STATE_CONNECTED; } static void @@ -364,7 +370,7 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud) cb = (kvstorage_read_cb)d->c->read_cb; if (what == EV_TIMEOUT) { - cb (KVSTORAGE_ERROR_TIMEOUT, d->key, NULL, 0, d->c, d->ud); + cb (KVSTORAGE_ERROR_TIMEOUT, d->key, d->keylen, NULL, 0, d->c, d->ud); return; } if (d->c->state == KV_STATE_GET) { @@ -391,6 +397,7 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud) } else if (r == 0) { /* We have written everything */ + d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool); d->c->state = KV_STATE_READ_ELT; event_del (&d->c->ev); event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_read_cb, d); @@ -403,7 +410,7 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud) } else { /* Error occured during writing */ - cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud); + cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, NULL, 0, d->c, d->ud); } } else if (d->c->state == KV_STATE_WRITE_DATA) { @@ -424,6 +431,7 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud) else if (r == 0) { /* We have written everything */ d->c->state = KV_STATE_READ_ELT; + d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool); event_del (&d->c->ev); event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_read_cb, d); if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) { @@ -435,7 +443,7 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud) } else { /* Error occured during writing */ - cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud); + cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, NULL, 0, d->c, d->ud); } } else if (d->c->state == KV_STATE_READ_ELT) { @@ -455,10 +463,11 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud) else if (r == 0) { /* Got all data about elt */ if ((r = rspamd_kvstorage_parse_get_line (d->buf, &d->datalen, &flags)) != KVSTORAGE_ERROR_OK) { - cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud); + cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, NULL, 0, d->c, d->ud); return; } rspamd_kvstorage_buf_drainline (d->buf); + /* Now allocate and read the data */ databuf = rspamd_kvstorage_buf_create (d->datalen, d->c->pool); memcpy (databuf->data, d->buf->data, d->buf->pos); @@ -475,7 +484,7 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud) } else { /* Error occured during reading reply line */ - cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud); + cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, NULL, 0, d->c, d->ud); } } else if (d->c->state == KV_STATE_READ_DATA) { @@ -486,7 +495,6 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud) d->c->state = KV_STATE_READ_REPLY; /* Save databuf */ d->data = d->buf->data; - d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool); event_del (&d->c->ev); event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_read_cb, d); if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) { @@ -509,7 +517,7 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud) } else { /* Error occured */ - cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud); + cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, NULL, 0, d->c, d->ud); } } else if (d->c->state == KV_STATE_READ_REPLY) { @@ -527,11 +535,12 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud) } } else if (r == 0) { - cb (rspamd_kvstorage_parse_reply_error (d->buf), d->key, d->data, d->datalen, d->c, d->ud); + d->c->state = KV_STATE_CONNECTED; + cb (rspamd_kvstorage_parse_reply_error (d->buf), d->key, d->keylen, d->data, d->datalen, d->c, d->ud); } else { /* Error occured during reading reply line */ - cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud); + cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, NULL, 0, d->c, d->ud); } } } @@ -547,7 +556,7 @@ rspamd_kvstorage_write_cb (int fd, short what, gpointer ud) cb = (kvstorage_write_cb)d->c->write_cb; if (what == EV_TIMEOUT) { - cb (KVSTORAGE_ERROR_TIMEOUT, d->key, d->c, d->ud); + cb (KVSTORAGE_ERROR_TIMEOUT, d->key, d->keylen, d->c, d->ud); return; } if (d->c->state == KV_STATE_SET) { @@ -574,6 +583,7 @@ rspamd_kvstorage_write_cb (int fd, short what, gpointer ud) } else if (r == 0) { /* We have written everything */ + d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool); d->c->state = KV_STATE_READ_REPLY; event_del (&d->c->ev); event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_write_cb, d); @@ -586,7 +596,7 @@ rspamd_kvstorage_write_cb (int fd, short what, gpointer ud) } else { /* Error occured during writing */ - cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud); + cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, d->c, d->ud); } } else if (d->c->state == KV_STATE_WRITE_DATA) { @@ -607,6 +617,7 @@ rspamd_kvstorage_write_cb (int fd, short what, gpointer ud) else if (r == 0) { /* We have written everything */ d->c->state = KV_STATE_READ_REPLY; + d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool); event_del (&d->c->ev); event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_write_cb, d); if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) { @@ -618,7 +629,7 @@ rspamd_kvstorage_write_cb (int fd, short what, gpointer ud) } else { /* Error occured during writing */ - cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud); + cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, d->c, d->ud); } } else if (d->c->state == KV_STATE_READ_REPLY) { @@ -636,11 +647,12 @@ rspamd_kvstorage_write_cb (int fd, short what, gpointer ud) } } else if (r == 0) { - cb (rspamd_kvstorage_parse_reply_error (d->buf), d->key, d->c, d->ud); + d->c->state = KV_STATE_CONNECTED; + cb (rspamd_kvstorage_parse_reply_error (d->buf), d->key, d->keylen, d->c, d->ud); } else { /* Error occured during reading reply line */ - cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud); + cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, d->c, d->ud); } } } @@ -656,7 +668,7 @@ rspamd_kvstorage_delete_cb (int fd, short what, gpointer ud) cb = (kvstorage_write_cb)d->c->write_cb; if (what == EV_TIMEOUT) { - cb (KVSTORAGE_ERROR_TIMEOUT, d->key, d->c, d->ud); + cb (KVSTORAGE_ERROR_TIMEOUT, d->key, d->keylen, d->c, d->ud); return; } if (d->c->state == KV_STATE_SET) { @@ -683,6 +695,7 @@ rspamd_kvstorage_delete_cb (int fd, short what, gpointer ud) } else if (r == 0) { /* We have written everything */ + d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool); d->c->state = KV_STATE_READ_REPLY; event_del (&d->c->ev); event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_delete_cb, d); @@ -695,7 +708,7 @@ rspamd_kvstorage_delete_cb (int fd, short what, gpointer ud) } else { /* Error occured during writing */ - cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud); + cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, d->c, d->ud); } } else if (d->c->state == KV_STATE_WRITE_DATA) { @@ -716,6 +729,7 @@ rspamd_kvstorage_delete_cb (int fd, short what, gpointer ud) else if (r == 0) { /* We have written everything */ d->c->state = KV_STATE_READ_REPLY; + d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool); event_del (&d->c->ev); event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_delete_cb, d); if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) { @@ -727,7 +741,7 @@ rspamd_kvstorage_delete_cb (int fd, short what, gpointer ud) } else { /* Error occured during writing */ - cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud); + cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, d->c, d->ud); } } else if (d->c->state == KV_STATE_READ_REPLY) { @@ -745,11 +759,12 @@ rspamd_kvstorage_delete_cb (int fd, short what, gpointer ud) } } else if (r == 0) { - cb (rspamd_kvstorage_parse_reply_error (d->buf), d->key, d->c, d->ud); + d->c->state = KV_STATE_CONNECTED; + cb (rspamd_kvstorage_parse_reply_error (d->buf), d->key, d->keylen, d->c, d->ud); } else { /* Error occured during reading reply line */ - cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud); + cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, d->c, d->ud); } } } @@ -773,7 +788,7 @@ rspamd_kvstorage_connect_async (const gchar *host, gint sock; /* Here we do NOT try to resolve hostname */ - if ((sock = make_universal_stream_socket (host, port, TRUE, FALSE, FALSE)) == -1) { + if ((sock = make_universal_stream_socket (host, port, TRUE, FALSE, TRUE)) == -1) { return KVSTORAGE_ERROR_SERVER_ERROR; } @@ -819,7 +834,7 @@ rspamd_kvstorage_connect_async (const gchar *host, */ enum rspamd_kvstorage_error rspamd_kvstorage_get_async (struct rspamd_kvstorage_connection *conn, - const gpointer key, kvstorage_read_cb cb, gpointer ud) + const gpointer key, guint keylen, kvstorage_read_cb cb, gpointer ud) { struct rspamd_kvstorage_async_data *d; @@ -833,7 +848,7 @@ rspamd_kvstorage_get_async (struct rspamd_kvstorage_connection *conn, d->c = conn; d->ud = ud; d->key = memory_pool_strdup (conn->pool, key); - d->keylen = strlen (d->key); + d->keylen = keylen; conn->state = KV_STATE_GET; /* Set event */ @@ -858,7 +873,8 @@ rspamd_kvstorage_get_async (struct rspamd_kvstorage_connection *conn, */ enum rspamd_kvstorage_error rspamd_kvstorage_set_async (struct rspamd_kvstorage_connection *conn, - const gpointer key, const gpointer value, gsize len, guint expire, kvstorage_write_cb cb, gpointer ud) + const gpointer key, guint keylen, const gpointer value, gsize len, guint expire, kvstorage_write_cb cb, + gpointer ud) { struct rspamd_kvstorage_async_data *d; @@ -872,7 +888,7 @@ rspamd_kvstorage_set_async (struct rspamd_kvstorage_connection *conn, d->c = conn; d->ud = ud; d->key = memory_pool_strdup (conn->pool, key); - d->keylen = strlen (d->key); + d->keylen = keylen; d->data = value; d->datalen = len; conn->state = KV_STATE_SET; @@ -898,7 +914,7 @@ rspamd_kvstorage_set_async (struct rspamd_kvstorage_connection *conn, */ enum rspamd_kvstorage_error rspamd_kvstorage_delete_async (struct rspamd_kvstorage_connection *conn, - const gpointer key, kvstorage_write_cb cb, gpointer ud) + const gpointer key, guint keylen, kvstorage_write_cb cb, gpointer ud) { struct rspamd_kvstorage_async_data *d; @@ -912,7 +928,7 @@ rspamd_kvstorage_delete_async (struct rspamd_kvstorage_connection *conn, d->c = conn; d->ud = ud; d->key = memory_pool_strdup (conn->pool, key); - d->keylen = strlen (d->key); + d->keylen = keylen; conn->state = KV_STATE_SET; /* Set event */ @@ -989,7 +1005,7 @@ rspamd_kvstorage_connect_sync (const gchar *host, */ enum rspamd_kvstorage_error rspamd_kvstorage_get_sync (struct rspamd_kvstorage_connection *conn, - const gpointer key, gpointer **value, guint *len) + const gpointer key, guint keylen, gpointer **value, guint *len) { struct kvstorage_buf *buf, *databuf; gint r; @@ -1001,7 +1017,7 @@ rspamd_kvstorage_get_sync (struct rspamd_kvstorage_connection *conn, buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, conn->pool); - r = rspamd_snprintf (buf->data, buf->len, "get %s" CRLF, key); + r = rspamd_snprintf (buf->data, buf->len, "get %*s" CRLF, keylen, key); buf->len = r; while ((r = rspamd_kvstorage_buf_writeall (buf, conn)) > 0) { poll_sync_socket (conn->sock, tv_to_msec (&conn->tv), POLL_OUT); @@ -1051,17 +1067,16 @@ rspamd_kvstorage_get_sync (struct rspamd_kvstorage_connection *conn, */ enum rspamd_kvstorage_error rspamd_kvstorage_set_sync (struct rspamd_kvstorage_connection *conn, - const gpointer key, const gpointer value, gsize len, guint expire) + const gpointer key, guint keylen, const gpointer value, gsize len, guint expire) { struct kvstorage_buf *buf; - gint r, keylen, buflen; + gint r, buflen; if (conn == NULL || conn->state != KV_STATE_CONNECTED) { return KVSTORAGE_ERROR_INTERNAL_ERROR; } /* Create buf */ - keylen = strlen (key); buflen = len + keylen + sizeof ("set 4294967296 4294967296 4294967296" CRLF); buf = rspamd_kvstorage_buf_create (buflen, conn->pool); @@ -1091,17 +1106,16 @@ rspamd_kvstorage_set_sync (struct rspamd_kvstorage_connection *conn, */ enum rspamd_kvstorage_error rspamd_kvstorage_delete_sync (struct rspamd_kvstorage_connection *conn, - const gpointer key) + const gpointer key, guint keylen) { struct kvstorage_buf *buf; - gint r, keylen, buflen; + gint r, buflen; if (conn == NULL || conn->state != KV_STATE_CONNECTED) { return KVSTORAGE_ERROR_INTERNAL_ERROR; } /* Create buf */ - keylen = strlen (key); buflen = MAX (keylen + sizeof ("delete " CRLF), MAX_KV_LINE); buf = rspamd_kvstorage_buf_create (buflen, conn->pool); @@ -1137,3 +1151,29 @@ rspamd_kvstorage_close_sync (struct rspamd_kvstorage_connection *conn) return KVSTORAGE_ERROR_OK; } + +const gchar* +rspamd_kvstorage_strerror (enum rspamd_kvstorage_error err) +{ + switch (err) { + case KVSTORAGE_ERROR_OK: + return "operation completed"; + case KVSTORAGE_ERROR_TIMEOUT: + return "operation timeout"; + case KVSTORAGE_ERROR_NOT_FOUND: + return "key not found"; + case KVSTORAGE_ERROR_NOT_STORED: + return "key not stored"; + case KVSTORAGE_ERROR_EXISTS: + return "key exists"; + case KVSTORAGE_ERROR_SERVER_ERROR: + return "server error"; + case KVSTORAGE_ERROR_CLIENT_ERROR: + return "client error"; + case KVSTORAGE_ERROR_INTERNAL_ERROR: + return "library error"; + } + + /* Not reached */ + return "unknown error"; +} diff --git a/lib/kvstorage/libkvstorageclient.h b/lib/kvstorage/libkvstorageclient.h index 262c5e89e..11145fd49 100644 --- a/lib/kvstorage/libkvstorageclient.h +++ b/lib/kvstorage/libkvstorageclient.h @@ -26,6 +26,7 @@ #define LIBKVSTORAGECLIENT_H_ #include <glib.h> +#include <sys/time.h> /* Errors */ enum rspamd_kvstorage_error { @@ -45,10 +46,11 @@ struct rspamd_kvstorage_connection; /* Callbacks for async API */ typedef void (*kvstorage_connect_cb) (enum rspamd_kvstorage_error code, struct rspamd_kvstorage_connection *conn, gpointer user_data); -typedef void (*kvstorage_read_cb) (enum rspamd_kvstorage_error code, const gpointer key, +typedef void (*kvstorage_read_cb) (enum rspamd_kvstorage_error code, const gpointer key, guint keylen, const gpointer value, gsize datalen, struct rspamd_kvstorage_connection *conn, gpointer user_data); -typedef void (*kvstorage_write_cb) (enum rspamd_kvstorage_error code, const gpointer key, struct rspamd_kvstorage_connection *conn, +typedef void (*kvstorage_write_cb) (enum rspamd_kvstorage_error code, const gpointer key, guint keylen, + struct rspamd_kvstorage_connection *conn, gpointer user_data); /* Asynced API */ @@ -74,7 +76,7 @@ enum rspamd_kvstorage_error rspamd_kvstorage_connect_async (const gchar *host, * @param ud user data for callback */ enum rspamd_kvstorage_error rspamd_kvstorage_get_async (struct rspamd_kvstorage_connection *conn, - const gpointer key, kvstorage_read_cb cb, gpointer ud); + const gpointer key, guint keylen, kvstorage_read_cb cb, gpointer ud); /** * Write key asynced @@ -85,7 +87,7 @@ enum rspamd_kvstorage_error rspamd_kvstorage_get_async (struct rspamd_kvstorage_ * @param ud user data for callback */ enum rspamd_kvstorage_error rspamd_kvstorage_set_async (struct rspamd_kvstorage_connection *conn, - const gpointer key, const gpointer value, gsize len, guint expire, kvstorage_write_cb cb, gpointer ud); + const gpointer key, guint keylen, const gpointer value, gsize len, guint expire, kvstorage_write_cb cb, gpointer ud); /** * Delete key asynced @@ -95,7 +97,7 @@ enum rspamd_kvstorage_error rspamd_kvstorage_set_async (struct rspamd_kvstorage_ * @param ud user data for callback */ enum rspamd_kvstorage_error rspamd_kvstorage_delete_async (struct rspamd_kvstorage_connection *conn, - const gpointer key, kvstorage_write_cb cb, gpointer ud); + const gpointer key, guint keylen, kvstorage_write_cb cb, gpointer ud); /** * Close connection @@ -122,7 +124,7 @@ enum rspamd_kvstorage_error rspamd_kvstorage_connect_sync (const gchar *host, * @param value value readed */ enum rspamd_kvstorage_error rspamd_kvstorage_get_sync (struct rspamd_kvstorage_connection *conn, - const gpointer key, gpointer **value, guint *len); + const gpointer key, guint keylen, gpointer **value, guint *len); /** * Write key synced @@ -131,7 +133,7 @@ enum rspamd_kvstorage_error rspamd_kvstorage_get_sync (struct rspamd_kvstorage_c * @param value data to write */ enum rspamd_kvstorage_error rspamd_kvstorage_set_sync (struct rspamd_kvstorage_connection *conn, - const gpointer key, const gpointer value, gsize len, guint expire); + const gpointer key, guint keylen, const gpointer value, gsize len, guint expire); /** * Delete key synced @@ -139,7 +141,7 @@ enum rspamd_kvstorage_error rspamd_kvstorage_set_sync (struct rspamd_kvstorage_c * @param key key to delete */ enum rspamd_kvstorage_error rspamd_kvstorage_delete_sync (struct rspamd_kvstorage_connection *conn, - const gpointer key); + const gpointer key, guint keylen); /** * Close connection @@ -147,4 +149,10 @@ enum rspamd_kvstorage_error rspamd_kvstorage_delete_sync (struct rspamd_kvstorag */ enum rspamd_kvstorage_error rspamd_kvstorage_close_sync (struct rspamd_kvstorage_connection *conn); +/** + * Convert error code to string + * @param err error code to be converted + */ +const gchar* rspamd_kvstorage_strerror (enum rspamd_kvstorage_error err); + #endif /* LIBKVSTORAGECLIENT_H_ */ diff --git a/src/main.h b/src/main.h index 7ff76afb3..59a5f3c30 100644 --- a/src/main.h +++ b/src/main.h @@ -292,6 +292,24 @@ gpointer init_workers_ctx (enum process_type type); */ extern struct rspamd_main *rspamd_main; +/* Worker task manipulations */ + +/** + * Construct new task for worker + */ +struct worker_task* construct_task (struct rspamd_worker *worker); +/** + * Destroy task object and remove its IO dispatcher if it exists + */ +void free_task (struct worker_task *task, gboolean is_soft); +void free_task_hard (gpointer ud); +void free_task_soft (gpointer ud); + +/** + * Set counter for a symbol + */ +double set_counter (const gchar *name, guint32 value); + #endif /* diff --git a/src/statfile.c b/src/statfile.c index 26d96a38c..d9d42d983 100644 --- a/src/statfile.c +++ b/src/statfile.c @@ -891,3 +891,51 @@ statfile_pool_plan_invalidate (statfile_pool_t *pool, time_t seconds, time_t jit msg_info ("invalidate of statfile pool is planned in %d seconds", (gint)pool->invalidate_tv.tv_sec); } } + + +stat_file_t * +get_statfile_by_symbol (statfile_pool_t *pool, struct classifier_config *ccf, + const gchar *symbol, struct statfile **st, gboolean try_create) +{ + stat_file_t *res = NULL; + GList *cur; + + if (pool == NULL || ccf == NULL || symbol == NULL) { + msg_err ("invalid input arguments"); + return NULL; + } + + cur = g_list_first (ccf->statfiles); + while (cur) { + *st = cur->data; + if (strcmp (symbol, (*st)->symbol) == 0) { + break; + } + *st = NULL; + cur = g_list_next (cur); + } + if (*st == NULL) { + msg_info ("cannot find statfile with symbol %s", symbol); + return NULL; + } + + if ((res = statfile_pool_is_open (pool, (*st)->path)) == NULL) { + if ((res = statfile_pool_open (pool, (*st)->path, (*st)->size, FALSE)) == NULL) { + msg_warn ("cannot open %s", (*st)->path); + if (try_create) { + if (statfile_pool_create (pool, (*st)->path, (*st)->size) == -1) { + msg_err ("cannot create statfile %s", (*st)->path); + return NULL; + } + res = statfile_pool_open (pool, (*st)->path, (*st)->size, FALSE); + if (res == NULL) { + msg_err ("cannot open statfile %s after creation", (*st)->path); + } + } + } + } + + return res; +} + + diff --git a/src/statfile.h b/src/statfile.h index e76aa0897..edb58e5db 100644 --- a/src/statfile.h +++ b/src/statfile.h @@ -96,6 +96,10 @@ typedef struct statfile_pool_s { struct timeval invalidate_tv; } statfile_pool_t; +/* Forwarded declarations */ +struct classifier_config; +struct statfile; + /** * Create new statfile pool * @param max_size maximum size @@ -261,4 +265,15 @@ guint64 statfile_get_total_blocks (stat_file_t *file); */ void statfile_pool_plan_invalidate (statfile_pool_t *pool, time_t seconds, time_t jitter); +/** + * Get a statfile by symbol + * @param pool pool object + * @param ccf ccf classifier config + * @param symbol symbol to search + * @param st statfile to get + * @param try_create whether we need to create statfile if it is absent + */ +stat_file_t* get_statfile_by_symbol (statfile_pool_t *pool, struct classifier_config *ccf, + const gchar *symbol, struct statfile **st, gboolean try_create); + #endif diff --git a/src/util.c b/src/util.c index 4d2665f6b..2efd8f0d5 100644 --- a/src/util.c +++ b/src/util.c @@ -38,8 +38,6 @@ /* Default connect timeout for sync sockets */ #define CONNECT_TIMEOUT 3 -rspamd_hash_t *counters = NULL; - static gchar* rspamd_sprintf_num (gchar *buf, gchar *last, guint64 ui64, gchar zero, guint hexadecimal, guint width); gint @@ -939,35 +937,6 @@ calculate_check_time (struct timeval *begin, gint resolution) return (const gchar *)res; } -double -set_counter (const gchar *name, guint32 value) -{ - struct counter_data *cd; - double alpha; - gchar *key; - - cd = rspamd_hash_lookup (counters, (gpointer) name); - - if (cd == NULL) { - cd = memory_pool_alloc_shared (counters->pool, sizeof (struct counter_data)); - cd->value = value; - cd->number = 0; - key = memory_pool_strdup_shared (counters->pool, name); - rspamd_hash_insert (counters, (gpointer) key, (gpointer) cd); - } - else { - /* Calculate new value */ - memory_pool_wlock_rwlock (counters->lock); - - alpha = 2. / (++cd->number + 1); - cd->value = cd->value * (1. - alpha) + value * alpha; - - memory_pool_wunlock_rwlock (counters->lock); - } - - return cd->value; -} - #ifndef g_tolower # define g_tolower(x) (((x) >= 'A' && (x) <= 'Z') ? (x) - 'A' + 'a' : (x)) #endif @@ -1147,52 +1116,6 @@ unlock_file (gint fd, gboolean async) } #endif /* HAVE_FLOCK */ -stat_file_t * -get_statfile_by_symbol (statfile_pool_t *pool, struct classifier_config *ccf, - const gchar *symbol, struct statfile **st, gboolean try_create) -{ - stat_file_t *res = NULL; - GList *cur; - - if (pool == NULL || ccf == NULL || symbol == NULL) { - msg_err ("invalid input arguments"); - return NULL; - } - - cur = g_list_first (ccf->statfiles); - while (cur) { - *st = cur->data; - if (strcmp (symbol, (*st)->symbol) == 0) { - break; - } - *st = NULL; - cur = g_list_next (cur); - } - if (*st == NULL) { - msg_info ("cannot find statfile with symbol %s", symbol); - return NULL; - } - - if ((res = statfile_pool_is_open (pool, (*st)->path)) == NULL) { - if ((res = statfile_pool_open (pool, (*st)->path, (*st)->size, FALSE)) == NULL) { - msg_warn ("cannot open %s", (*st)->path); - if (try_create) { - if (statfile_pool_create (pool, (*st)->path, (*st)->size) == -1) { - msg_err ("cannot create statfile %s", (*st)->path); - return NULL; - } - res = statfile_pool_open (pool, (*st)->path, (*st)->size, FALSE); - if (res == NULL) { - msg_err ("cannot open statfile %s after creation", (*st)->path); - } - } - } - } - - return res; -} - - #if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION < 22)) void @@ -1276,22 +1199,8 @@ str_to_process (const gchar *str) return TYPE_UNKNOWN; } -/* - * Destructor for recipients list - */ -static void -rcpt_destruct (void *pointer) -{ - struct worker_task *task = (struct worker_task *) pointer; - - if (task->rcpt) { - g_list_free (task->rcpt); - } -} - - /* Compare two emails for building emails tree */ -static gint +gint compare_email_func (gconstpointer a, gconstpointer b) { const struct uri *u1 = a, *u2 = b; @@ -1317,7 +1226,7 @@ compare_email_func (gconstpointer a, gconstpointer b) return 0; } -static gint +gint compare_url_func (gconstpointer a, gconstpointer b) { const struct uri *u1 = a, *u2 = b; @@ -1333,130 +1242,6 @@ compare_url_func (gconstpointer a, gconstpointer b) return r; } -/* - * Create new task - */ -struct worker_task * -construct_task (struct rspamd_worker *worker) -{ - struct worker_task *new_task; - - new_task = g_slice_alloc0 (sizeof (struct worker_task)); - - new_task->worker = worker; - new_task->state = READ_COMMAND; - new_task->cfg = worker->srv->cfg; - new_task->from_addr.s_addr = INADDR_NONE; - new_task->view_checked = FALSE; -#ifdef HAVE_CLOCK_GETTIME -# ifdef HAVE_CLOCK_PROCESS_CPUTIME_ID - clock_gettime (CLOCK_PROCESS_CPUTIME_ID, &new_task->ts); -# elif defined(HAVE_CLOCK_VIRTUAL) - clock_gettime (CLOCK_VIRTUAL, &new_task->ts); -# else - clock_gettime (CLOCK_REALTIME, &new_task->ts); -# endif -#endif - if (gettimeofday (&new_task->tv, NULL) == -1) { - msg_warn ("gettimeofday failed: %s", strerror (errno)); - } - - new_task->task_pool = memory_pool_new (memory_pool_get_size ()); - - /* Add destructor for recipients list (it would be better to use anonymous function here */ - memory_pool_add_destructor (new_task->task_pool, - (pool_destruct_func) rcpt_destruct, new_task); - new_task->results = g_hash_table_new (g_str_hash, g_str_equal); - memory_pool_add_destructor (new_task->task_pool, - (pool_destruct_func) g_hash_table_destroy, - new_task->results); - new_task->re_cache = g_hash_table_new (g_str_hash, g_str_equal); - memory_pool_add_destructor (new_task->task_pool, - (pool_destruct_func) g_hash_table_destroy, - new_task->re_cache); - new_task->raw_headers = g_hash_table_new (rspamd_strcase_hash, rspamd_strcase_equal); - memory_pool_add_destructor (new_task->task_pool, - (pool_destruct_func) g_hash_table_destroy, - new_task->raw_headers); - new_task->emails = g_tree_new (compare_email_func); - memory_pool_add_destructor (new_task->task_pool, - (pool_destruct_func) g_tree_destroy, - new_task->emails); - new_task->urls = g_tree_new (compare_url_func); - memory_pool_add_destructor (new_task->task_pool, - (pool_destruct_func) g_tree_destroy, - new_task->urls); - new_task->s = - new_async_session (new_task->task_pool, free_task_hard, new_task); - new_task->sock = -1; - new_task->is_mime = TRUE; - - return new_task; -} - - -/* - * Free all structures of worker_task - */ -void -free_task (struct worker_task *task, gboolean is_soft) -{ - GList *part; - struct mime_part *p; - - if (task) { - debug_task ("free pointer %p", task); - while ((part = g_list_first (task->parts))) { - task->parts = g_list_remove_link (task->parts, part); - p = (struct mime_part *) part->data; - g_byte_array_free (p->content, TRUE); - g_list_free_1 (part); - } - if (task->text_parts) { - g_list_free (task->text_parts); - } - if (task->images) { - g_list_free (task->images); - } - if (task->messages) { - g_list_free (task->messages); - } - if (task->received) { - g_list_free (task->received); - } - memory_pool_delete (task->task_pool); - if (task->dispatcher) { - if (is_soft) { - /* Plan dispatcher shutdown */ - task->dispatcher->wanna_die = 1; - } - else { - rspamd_remove_dispatcher (task->dispatcher); - } - } - if (task->sock != -1) { - close (task->sock); - } - g_slice_free1 (sizeof (struct worker_task), task); - } -} - -void -free_task_hard (gpointer ud) -{ - struct worker_task *task = ud; - - free_task (task, FALSE); -} - -void -free_task_soft (gpointer ud) -{ - struct worker_task *task = ud; - - free_task (task, FALSE); -} - gchar * escape_braces_addr_fstr (memory_pool_t *pool, f_str_t *in) { diff --git a/src/util.h b/src/util.h index bbe49db67..95d5703bd 100644 --- a/src/util.h +++ b/src/util.h @@ -149,11 +149,6 @@ const gchar* calculate_check_time (struct timeval *begin, gint resolution); #endif /* - * Set counter for a symbol - */ -double set_counter (const gchar *name, guint32 value); - -/* * File locking functions */ gboolean lock_file (gint fd, gboolean async); @@ -177,12 +172,6 @@ gboolean fstr_strcase_equal (gconstpointer v, gconstpointer v2); void gperf_profiler_init (struct config_file *cfg, const gchar *descr); /* - * Get a statfile by symbol - */ -stat_file_t* get_statfile_by_symbol (statfile_pool_t *pool, struct classifier_config *ccf, - const gchar *symbol, struct statfile **st, gboolean try_create); - -/* * Workaround for older versions of glib */ #if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION < 22)) @@ -228,19 +217,11 @@ gchar * escape_braces_addr_fstr (memory_pool_t *pool, f_str_t *in); #define msec_to_tv(msec, tv) do { (tv)->tv_sec = (msec) / 1000; (tv)->tv_usec = ((msec) - (tv)->tv_sec * 1000) * 1000; } while(0) #define tv_to_msec(tv) (tv)->tv_sec * 1000 + (tv)->tv_usec / 1000 -struct worker_task; -struct rspamd_worker; +/* Compare two emails for building emails tree */ +gint compare_email_func (gconstpointer a, gconstpointer b); -/** - * Construct new task for worker - */ -struct worker_task* construct_task (struct rspamd_worker *worker); -/** - * Destroy task object and remove its IO dispatcher if it exists - */ -void free_task (struct worker_task *task, gboolean is_soft); -void free_task_hard (gpointer ud); -void free_task_soft (gpointer ud); +/* Compare two urls for building emails tree */ +gint compare_url_func (gconstpointer a, gconstpointer b); /* * Find string find in string s ignoring case diff --git a/src/worker.c b/src/worker.c index d1aeea859..24cb0dd69 100644 --- a/src/worker.c +++ b/src/worker.c @@ -98,6 +98,8 @@ static gboolean write_socket (void *arg); static sig_atomic_t wanna_die = 0; +rspamd_hash_t *counters = NULL; + #ifndef HAVE_SA_SIGINFO static void sig_handler (gint signo) @@ -160,6 +162,172 @@ sigusr1_handler (gint fd, short what, void *arg) return; } +/* + * Destructor for recipients list in a task + */ +static void +rcpt_destruct (void *pointer) +{ + struct worker_task *task = (struct worker_task *) pointer; + + if (task->rcpt) { + g_list_free (task->rcpt); + } +} + +/* + * Create new task + */ +struct worker_task * +construct_task (struct rspamd_worker *worker) +{ + struct worker_task *new_task; + + new_task = g_slice_alloc0 (sizeof (struct worker_task)); + + new_task->worker = worker; + new_task->state = READ_COMMAND; + new_task->cfg = worker->srv->cfg; + new_task->from_addr.s_addr = INADDR_NONE; + new_task->view_checked = FALSE; +#ifdef HAVE_CLOCK_GETTIME +# ifdef HAVE_CLOCK_PROCESS_CPUTIME_ID + clock_gettime (CLOCK_PROCESS_CPUTIME_ID, &new_task->ts); +# elif defined(HAVE_CLOCK_VIRTUAL) + clock_gettime (CLOCK_VIRTUAL, &new_task->ts); +# else + clock_gettime (CLOCK_REALTIME, &new_task->ts); +# endif +#endif + if (gettimeofday (&new_task->tv, NULL) == -1) { + msg_warn ("gettimeofday failed: %s", strerror (errno)); + } + + new_task->task_pool = memory_pool_new (memory_pool_get_size ()); + + /* Add destructor for recipients list (it would be better to use anonymous function here */ + memory_pool_add_destructor (new_task->task_pool, + (pool_destruct_func) rcpt_destruct, new_task); + new_task->results = g_hash_table_new (g_str_hash, g_str_equal); + memory_pool_add_destructor (new_task->task_pool, + (pool_destruct_func) g_hash_table_destroy, + new_task->results); + new_task->re_cache = g_hash_table_new (g_str_hash, g_str_equal); + memory_pool_add_destructor (new_task->task_pool, + (pool_destruct_func) g_hash_table_destroy, + new_task->re_cache); + new_task->raw_headers = g_hash_table_new (rspamd_strcase_hash, rspamd_strcase_equal); + memory_pool_add_destructor (new_task->task_pool, + (pool_destruct_func) g_hash_table_destroy, + new_task->raw_headers); + new_task->emails = g_tree_new (compare_email_func); + memory_pool_add_destructor (new_task->task_pool, + (pool_destruct_func) g_tree_destroy, + new_task->emails); + new_task->urls = g_tree_new (compare_url_func); + memory_pool_add_destructor (new_task->task_pool, + (pool_destruct_func) g_tree_destroy, + new_task->urls); + new_task->s = + new_async_session (new_task->task_pool, free_task_hard, new_task); + new_task->sock = -1; + new_task->is_mime = TRUE; + + return new_task; +} + + +/* + * Free all structures of worker_task + */ +void +free_task (struct worker_task *task, gboolean is_soft) +{ + GList *part; + struct mime_part *p; + + if (task) { + debug_task ("free pointer %p", task); + while ((part = g_list_first (task->parts))) { + task->parts = g_list_remove_link (task->parts, part); + p = (struct mime_part *) part->data; + g_byte_array_free (p->content, TRUE); + g_list_free_1 (part); + } + if (task->text_parts) { + g_list_free (task->text_parts); + } + if (task->images) { + g_list_free (task->images); + } + if (task->messages) { + g_list_free (task->messages); + } + if (task->received) { + g_list_free (task->received); + } + memory_pool_delete (task->task_pool); + if (task->dispatcher) { + if (is_soft) { + /* Plan dispatcher shutdown */ + task->dispatcher->wanna_die = 1; + } + else { + rspamd_remove_dispatcher (task->dispatcher); + } + } + if (task->sock != -1) { + close (task->sock); + } + g_slice_free1 (sizeof (struct worker_task), task); + } +} + +void +free_task_hard (gpointer ud) +{ + struct worker_task *task = ud; + + free_task (task, FALSE); +} + +void +free_task_soft (gpointer ud) +{ + struct worker_task *task = ud; + + free_task (task, FALSE); +} + +double +set_counter (const gchar *name, guint32 value) +{ + struct counter_data *cd; + double alpha; + gchar *key; + + cd = rspamd_hash_lookup (counters, (gpointer) name); + + if (cd == NULL) { + cd = memory_pool_alloc_shared (counters->pool, sizeof (struct counter_data)); + cd->value = value; + cd->number = 0; + key = memory_pool_strdup_shared (counters->pool, name); + rspamd_hash_insert (counters, (gpointer) key, (gpointer) cd); + } + else { + /* Calculate new value */ + memory_pool_wlock_rwlock (counters->lock); + + alpha = 2. / (++cd->number + 1); + cd->value = cd->value * (1. - alpha) + value * alpha; + + memory_pool_wunlock_rwlock (counters->lock); + } + + return cd->value; +} + #ifndef BUILD_STATIC static void fin_custom_filters (struct worker_task *task) @@ -244,6 +412,7 @@ parse_line_custom (struct worker_task *task, f_str_t * in) } #endif + /* * Callback that is called when there is data to read in buffer */ |