]> source.dussan.org Git - rspamd.git/commitdiff
Reorganize util.c as it should contain only common utilities.
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Wed, 7 Dec 2011 16:06:41 +0000 (19:06 +0300)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Wed, 7 Dec 2011 16:06:41 +0000 (19:06 +0300)
Libkvstorage client is now deprecated as I plan to replace it with hiredis library.

lib/CMakeLists.txt
lib/kvstorage/libkvstorageclient.c
lib/kvstorage/libkvstorageclient.h
src/main.h
src/statfile.c
src/statfile.h
src/util.c
src/util.h
src/worker.c

index f2aa2e5f2ae54e72900050cb3608ed1b9b8ddaa5..5d52e1ab974ea4a53663dd4cbb98ca6a67c93b68 100644 (file)
@@ -1,5 +1,8 @@
 # Librspamdclient
-SET(LIBRSPAMDSRC                         client/librspamdclient.c ../src/mem_pool.c ../src/upstream.c ../src/printf.c)
+SET(LIBRSPAMDSRC                         client/librspamdclient.c 
+                                                         ../src/mem_pool.c 
+                                                         ../src/upstream.c 
+                                                         ../src/printf.c)
 
 ADD_LIBRARY(rspamdclient SHARED ${LIBRSPAMDSRC})
 ADD_LIBRARY(rspamdclient_static STATIC ${LIBRSPAMDSRC})
@@ -88,7 +91,11 @@ TARGET_LINK_LIBRARIES(rspamdserver rspamd_cdb)
 
 
 # Libkvstorageclient
-SET(LIBRKVSTORAGESRC                     kvstorage/libkvstorageclient.c ../src/mem_pool.c ../src/upstream.c ../src/printf.c ../src/util.c)
+SET(LIBRKVSTORAGESRC                     kvstorage/libkvstorageclient.c 
+                                                                 ../src/mem_pool.c 
+                                                                 ../src/upstream.c 
+                                                                 ../src/printf.c 
+                                                                 ../src/util.c)
 
 ADD_LIBRARY(kvstorageclient SHARED ${LIBRKVSTORAGESRC})
 ADD_LIBRARY(kvstorageclient_static STATIC ${LIBRKVSTORAGESRC})
@@ -102,6 +109,7 @@ ENDIF(CMAKE_COMPILER_IS_GNUCC)
 TARGET_LINK_LIBRARIES(kvstorageclient ${CMAKE_REQUIRED_LIBRARIES})
 TARGET_LINK_LIBRARIES(kvstorageclient pcre)
 TARGET_LINK_LIBRARIES(kvstorageclient ${GLIB2_LIBRARIES})
+TARGET_LINK_LIBRARIES(kvstorageclient event)
 
 TARGET_LINK_LIBRARIES(kvstorageclient_static ${CMAKE_REQUIRED_LIBRARIES})
 TARGET_LINK_LIBRARIES(kvstorageclient_static ${GLIB2_LIBRARIES})
index 4d3b17dd3ea8bf473898e114fb825f5b351d4a35..d05e8b5e05ebb1139ca8c2ad5b13ecc533f0eae9 100644 (file)
 
 
 #include "config.h"
-#include "main.h"
+#include "mem_pool.h"
+#include "util.h"
 #include "libkvstorageclient.h"
 
 #define MAX_KV_LINE 1024
 
+#ifdef CRLF
+#undef CRLF
+#undef CR
+#undef LF
+#endif
+
+#define CRLF "\r\n"
+#define CR '\r'
+#define LF '\n'
+
 struct kvstorage_buf {
        guint pos;
        guint len;
@@ -242,12 +253,13 @@ rspamd_kvstorage_parse_reply_error (struct kvstorage_buf *buf)
 static enum rspamd_kvstorage_error
 rspamd_kvstorage_parse_get_line (struct kvstorage_buf *buf, guint *len, guint *flags)
 {
-       guint8                                                             *p, *c;
+       guint8                                                             *p, *c, *end;
        gboolean                                error = TRUE;
        gchar                                  *err_str;
 
        p = buf->data;
-       while (p - buf->data < buf->pos) {
+       end = buf->data + buf->pos;
+       while (p < end) {
                if (g_ascii_isspace (*p)) {
                        error = FALSE;
                        while (p - buf->data < buf->pos && g_ascii_isspace (*p)) {
@@ -268,10 +280,11 @@ rspamd_kvstorage_parse_get_line (struct kvstorage_buf *buf, guint *len, guint *f
        /* Here we got key, flags and size items */
        /* Skip key */
        error = TRUE;
-       while (p - buf->data < buf->pos) {
+       while (p < end) {
                if (g_ascii_isspace (*p)) {
                        error = FALSE;
-                       while (p - buf->data < buf->pos && g_ascii_isspace (*p)) {
+                       /* Skip spaces after key */
+                       while (p < end && g_ascii_isspace (*p)) {
                                p ++;
                        }
                        break;
@@ -285,9 +298,10 @@ rspamd_kvstorage_parse_get_line (struct kvstorage_buf *buf, guint *len, guint *f
        /* Read flags */
        c = p;
        error = TRUE;
-       while (p - buf->data < buf->pos) {
+       while (p < end) {
                if (g_ascii_isspace (*p)) {
                        error = FALSE;
+                       /* Skip spaces after flags */
                        while (p - buf->data < buf->pos && g_ascii_isspace (*p)) {
                                p ++;
                        }
@@ -308,16 +322,8 @@ rspamd_kvstorage_parse_get_line (struct kvstorage_buf *buf, guint *len, guint *f
        }
        /* Read len */
        c = p;
-       error = TRUE;
-       while (p - buf->data < buf->pos) {
-               if (g_ascii_isspace (*p)) {
-                       error = FALSE;
-                       while (p - buf->data < buf->pos && g_ascii_isspace (*p)) {
-                               p ++;
-                       }
-                       break;
-               }
-               else if (!g_ascii_isdigit (*p)) {
+       while (p < end) {
+               if (!g_ascii_isdigit (*p)) {
                        break;
                }
                p ++;
@@ -347,9 +353,9 @@ rspamd_kvstorage_connect_cb (int fd, short what, gpointer ud)
                cb (KVSTORAGE_ERROR_TIMEOUT, d->c, d->ud);
        }
        else {
+               d->c->state = KV_STATE_CONNECTED;
                cb (KVSTORAGE_ERROR_OK, d->c, d->ud);
        }
-       d->c->state = KV_STATE_CONNECTED;
 }
 
 static void
@@ -364,7 +370,7 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud)
        cb = (kvstorage_read_cb)d->c->read_cb;
 
        if (what == EV_TIMEOUT) {
-               cb (KVSTORAGE_ERROR_TIMEOUT, d->key, NULL, 0, d->c, d->ud);
+               cb (KVSTORAGE_ERROR_TIMEOUT, d->key, d->keylen, NULL, 0, d->c, d->ud);
                return;
        }
        if (d->c->state == KV_STATE_GET) {
@@ -391,6 +397,7 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud)
                }
                else if (r == 0) {
                        /* We have written everything */
+                       d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool);
                        d->c->state = KV_STATE_READ_ELT;
                        event_del (&d->c->ev);
                        event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_read_cb, d);
@@ -403,7 +410,7 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud)
                }
                else {
                        /* Error occured during writing */
-                       cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud);
+                       cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, NULL, 0, d->c, d->ud);
                }
        }
        else if (d->c->state == KV_STATE_WRITE_DATA) {
@@ -424,6 +431,7 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud)
                else if (r == 0) {
                        /* We have written everything */
                        d->c->state = KV_STATE_READ_ELT;
+                       d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool);
                        event_del (&d->c->ev);
                        event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_read_cb, d);
                        if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
@@ -435,7 +443,7 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud)
                }
                else {
                        /* Error occured during writing */
-                       cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud);
+                       cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, NULL, 0, d->c, d->ud);
                }
        }
        else if (d->c->state == KV_STATE_READ_ELT) {
@@ -455,10 +463,11 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud)
                else if (r == 0) {
                        /* Got all data about elt */
                        if ((r = rspamd_kvstorage_parse_get_line (d->buf, &d->datalen, &flags)) != KVSTORAGE_ERROR_OK) {
-                               cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud);
+                               cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, NULL, 0, d->c, d->ud);
                                return;
                        }
                        rspamd_kvstorage_buf_drainline (d->buf);
+
                        /* Now allocate and read the data */
                        databuf = rspamd_kvstorage_buf_create (d->datalen, d->c->pool);
                        memcpy (databuf->data, d->buf->data, d->buf->pos);
@@ -475,7 +484,7 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud)
                }
                else {
                        /* Error occured during reading reply line */
-                       cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud);
+                       cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, NULL, 0, d->c, d->ud);
                }
        }
        else if (d->c->state == KV_STATE_READ_DATA) {
@@ -486,7 +495,6 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud)
                        d->c->state = KV_STATE_READ_REPLY;
                        /* Save databuf */
                        d->data = d->buf->data;
-                       d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool);
                        event_del (&d->c->ev);
                        event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_read_cb, d);
                        if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
@@ -509,7 +517,7 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud)
                }
                else {
                        /* Error occured */
-                       cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud);
+                       cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, NULL, 0, d->c, d->ud);
                }
        }
        else if (d->c->state == KV_STATE_READ_REPLY) {
@@ -527,11 +535,12 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud)
                        }
                }
                else if (r == 0) {
-                       cb (rspamd_kvstorage_parse_reply_error (d->buf), d->key, d->data, d->datalen, d->c, d->ud);
+                       d->c->state = KV_STATE_CONNECTED;
+                       cb (rspamd_kvstorage_parse_reply_error (d->buf), d->key, d->keylen, d->data, d->datalen, d->c, d->ud);
                }
                else {
                        /* Error occured during reading reply line */
-                       cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud);
+                       cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, NULL, 0, d->c, d->ud);
                }
        }
 }
@@ -547,7 +556,7 @@ rspamd_kvstorage_write_cb (int fd, short what, gpointer ud)
        cb = (kvstorage_write_cb)d->c->write_cb;
 
        if (what == EV_TIMEOUT) {
-               cb (KVSTORAGE_ERROR_TIMEOUT, d->key, d->c, d->ud);
+               cb (KVSTORAGE_ERROR_TIMEOUT, d->key, d->keylen, d->c, d->ud);
                return;
        }
        if (d->c->state == KV_STATE_SET) {
@@ -574,6 +583,7 @@ rspamd_kvstorage_write_cb (int fd, short what, gpointer ud)
                }
                else if (r == 0) {
                        /* We have written everything */
+                       d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool);
                        d->c->state = KV_STATE_READ_REPLY;
                        event_del (&d->c->ev);
                        event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_write_cb, d);
@@ -586,7 +596,7 @@ rspamd_kvstorage_write_cb (int fd, short what, gpointer ud)
                }
                else {
                        /* Error occured during writing */
-                       cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud);
+                       cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, d->c, d->ud);
                }
        }
        else if (d->c->state == KV_STATE_WRITE_DATA) {
@@ -607,6 +617,7 @@ rspamd_kvstorage_write_cb (int fd, short what, gpointer ud)
                else if (r == 0) {
                        /* We have written everything */
                        d->c->state = KV_STATE_READ_REPLY;
+                       d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool);
                        event_del (&d->c->ev);
                        event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_write_cb, d);
                        if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
@@ -618,7 +629,7 @@ rspamd_kvstorage_write_cb (int fd, short what, gpointer ud)
                }
                else {
                        /* Error occured during writing */
-                       cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud);
+                       cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, d->c, d->ud);
                }
        }
        else if (d->c->state == KV_STATE_READ_REPLY) {
@@ -636,11 +647,12 @@ rspamd_kvstorage_write_cb (int fd, short what, gpointer ud)
                        }
                }
                else if (r == 0) {
-                       cb (rspamd_kvstorage_parse_reply_error (d->buf), d->key, d->c, d->ud);
+                       d->c->state = KV_STATE_CONNECTED;
+                       cb (rspamd_kvstorage_parse_reply_error (d->buf), d->key, d->keylen, d->c, d->ud);
                }
                else {
                        /* Error occured during reading reply line */
-                       cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud);
+                       cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, d->c, d->ud);
                }
        }
 }
@@ -656,7 +668,7 @@ rspamd_kvstorage_delete_cb (int fd, short what, gpointer ud)
        cb = (kvstorage_write_cb)d->c->write_cb;
 
        if (what == EV_TIMEOUT) {
-               cb (KVSTORAGE_ERROR_TIMEOUT, d->key, d->c, d->ud);
+               cb (KVSTORAGE_ERROR_TIMEOUT, d->key, d->keylen, d->c, d->ud);
                return;
        }
        if (d->c->state == KV_STATE_SET) {
@@ -683,6 +695,7 @@ rspamd_kvstorage_delete_cb (int fd, short what, gpointer ud)
                }
                else if (r == 0) {
                        /* We have written everything */
+                       d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool);
                        d->c->state = KV_STATE_READ_REPLY;
                        event_del (&d->c->ev);
                        event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_delete_cb, d);
@@ -695,7 +708,7 @@ rspamd_kvstorage_delete_cb (int fd, short what, gpointer ud)
                }
                else {
                        /* Error occured during writing */
-                       cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud);
+                       cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, d->c, d->ud);
                }
        }
        else if (d->c->state == KV_STATE_WRITE_DATA) {
@@ -716,6 +729,7 @@ rspamd_kvstorage_delete_cb (int fd, short what, gpointer ud)
                else if (r == 0) {
                        /* We have written everything */
                        d->c->state = KV_STATE_READ_REPLY;
+                       d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool);
                        event_del (&d->c->ev);
                        event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_delete_cb, d);
                        if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
@@ -727,7 +741,7 @@ rspamd_kvstorage_delete_cb (int fd, short what, gpointer ud)
                }
                else {
                        /* Error occured during writing */
-                       cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud);
+                       cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, d->c, d->ud);
                }
        }
        else if (d->c->state == KV_STATE_READ_REPLY) {
@@ -745,11 +759,12 @@ rspamd_kvstorage_delete_cb (int fd, short what, gpointer ud)
                        }
                }
                else if (r == 0) {
-                       cb (rspamd_kvstorage_parse_reply_error (d->buf), d->key, d->c, d->ud);
+                       d->c->state = KV_STATE_CONNECTED;
+                       cb (rspamd_kvstorage_parse_reply_error (d->buf), d->key, d->keylen, d->c, d->ud);
                }
                else {
                        /* Error occured during reading reply line */
-                       cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud);
+                       cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, d->c, d->ud);
                }
        }
 }
@@ -773,7 +788,7 @@ rspamd_kvstorage_connect_async (const gchar *host,
        gint                                                                     sock;
 
        /* Here we do NOT try to resolve hostname */
-       if ((sock = make_universal_stream_socket (host, port, TRUE, FALSE, FALSE)) == -1) {
+       if ((sock = make_universal_stream_socket (host, port, TRUE, FALSE, TRUE)) == -1) {
                return KVSTORAGE_ERROR_SERVER_ERROR;
        }
 
@@ -819,7 +834,7 @@ rspamd_kvstorage_connect_async (const gchar *host,
  */
 enum rspamd_kvstorage_error
 rspamd_kvstorage_get_async (struct rspamd_kvstorage_connection *conn,
-               const gpointer key, kvstorage_read_cb cb, gpointer ud)
+               const gpointer key, guint keylen, kvstorage_read_cb cb, gpointer ud)
 {
        struct rspamd_kvstorage_async_data              *d;
 
@@ -833,7 +848,7 @@ rspamd_kvstorage_get_async (struct rspamd_kvstorage_connection *conn,
                d->c = conn;
                d->ud = ud;
                d->key = memory_pool_strdup (conn->pool, key);
-               d->keylen = strlen (d->key);
+               d->keylen = keylen;
                conn->state = KV_STATE_GET;
 
                /* Set event */
@@ -858,7 +873,8 @@ rspamd_kvstorage_get_async (struct rspamd_kvstorage_connection *conn,
  */
 enum rspamd_kvstorage_error
 rspamd_kvstorage_set_async (struct rspamd_kvstorage_connection *conn,
-               const gpointer key, const gpointer value, gsize len, guint expire, kvstorage_write_cb cb, gpointer ud)
+               const gpointer key, guint keylen, const gpointer value, gsize len, guint expire, kvstorage_write_cb cb,
+               gpointer ud)
 {
        struct rspamd_kvstorage_async_data              *d;
 
@@ -872,7 +888,7 @@ rspamd_kvstorage_set_async (struct rspamd_kvstorage_connection *conn,
                d->c = conn;
                d->ud = ud;
                d->key = memory_pool_strdup (conn->pool, key);
-               d->keylen = strlen (d->key);
+               d->keylen = keylen;
                d->data = value;
                d->datalen = len;
                conn->state = KV_STATE_SET;
@@ -898,7 +914,7 @@ rspamd_kvstorage_set_async (struct rspamd_kvstorage_connection *conn,
  */
 enum rspamd_kvstorage_error
 rspamd_kvstorage_delete_async (struct rspamd_kvstorage_connection *conn,
-               const gpointer key, kvstorage_write_cb cb, gpointer ud)
+               const gpointer key, guint keylen, kvstorage_write_cb cb, gpointer ud)
 {
        struct rspamd_kvstorage_async_data              *d;
 
@@ -912,7 +928,7 @@ rspamd_kvstorage_delete_async (struct rspamd_kvstorage_connection *conn,
                d->c = conn;
                d->ud = ud;
                d->key = memory_pool_strdup (conn->pool, key);
-               d->keylen = strlen (d->key);
+               d->keylen = keylen;
                conn->state = KV_STATE_SET;
 
                /* Set event */
@@ -989,7 +1005,7 @@ rspamd_kvstorage_connect_sync (const gchar *host,
  */
 enum rspamd_kvstorage_error
 rspamd_kvstorage_get_sync (struct rspamd_kvstorage_connection *conn,
-               const gpointer key, gpointer **value, guint *len)
+               const gpointer key, guint keylen, gpointer **value, guint *len)
 {
        struct kvstorage_buf                               *buf, *databuf;
        gint                                    r;
@@ -1001,7 +1017,7 @@ rspamd_kvstorage_get_sync (struct rspamd_kvstorage_connection *conn,
 
        buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, conn->pool);
 
-       r = rspamd_snprintf (buf->data, buf->len, "get %s" CRLF, key);
+       r = rspamd_snprintf (buf->data, buf->len, "get %*s" CRLF, keylen, key);
        buf->len = r;
        while ((r = rspamd_kvstorage_buf_writeall (buf, conn)) > 0) {
                poll_sync_socket (conn->sock, tv_to_msec (&conn->tv), POLL_OUT);
@@ -1051,17 +1067,16 @@ rspamd_kvstorage_get_sync (struct rspamd_kvstorage_connection *conn,
  */
 enum rspamd_kvstorage_error
 rspamd_kvstorage_set_sync (struct rspamd_kvstorage_connection *conn,
-               const gpointer key, const gpointer value, gsize len, guint expire)
+               const gpointer key, guint keylen, const gpointer value, gsize len, guint expire)
 {
        struct kvstorage_buf                               *buf;
-       gint                                    r, keylen, buflen;
+       gint                                    r, buflen;
 
        if (conn == NULL || conn->state != KV_STATE_CONNECTED) {
                return KVSTORAGE_ERROR_INTERNAL_ERROR;
        }
 
        /* Create buf */
-       keylen = strlen (key);
        buflen = len + keylen + sizeof ("set  4294967296 4294967296 4294967296" CRLF);
        buf = rspamd_kvstorage_buf_create (buflen, conn->pool);
 
@@ -1091,17 +1106,16 @@ rspamd_kvstorage_set_sync (struct rspamd_kvstorage_connection *conn,
  */
 enum rspamd_kvstorage_error
 rspamd_kvstorage_delete_sync (struct rspamd_kvstorage_connection *conn,
-               const gpointer key)
+               const gpointer key, guint keylen)
 {
        struct kvstorage_buf                               *buf;
-       gint                                    r, keylen, buflen;
+       gint                                    r, buflen;
 
        if (conn == NULL || conn->state != KV_STATE_CONNECTED) {
                return KVSTORAGE_ERROR_INTERNAL_ERROR;
        }
 
        /* Create buf */
-       keylen = strlen (key);
        buflen = MAX (keylen + sizeof ("delete " CRLF), MAX_KV_LINE);
        buf = rspamd_kvstorage_buf_create (buflen, conn->pool);
 
@@ -1137,3 +1151,29 @@ rspamd_kvstorage_close_sync (struct rspamd_kvstorage_connection *conn)
 
        return KVSTORAGE_ERROR_OK;
 }
+
+const gchar*
+rspamd_kvstorage_strerror (enum rspamd_kvstorage_error err)
+{
+       switch (err) {
+       case KVSTORAGE_ERROR_OK:
+               return "operation completed";
+       case KVSTORAGE_ERROR_TIMEOUT:
+               return "operation timeout";
+       case KVSTORAGE_ERROR_NOT_FOUND:
+               return "key not found";
+       case KVSTORAGE_ERROR_NOT_STORED:
+               return "key not stored";
+       case KVSTORAGE_ERROR_EXISTS:
+               return "key exists";
+       case KVSTORAGE_ERROR_SERVER_ERROR:
+               return "server error";
+       case KVSTORAGE_ERROR_CLIENT_ERROR:
+               return "client error";
+       case KVSTORAGE_ERROR_INTERNAL_ERROR:
+               return "library error";
+       }
+
+       /* Not reached */
+       return "unknown error";
+}
index 262c5e89e14392ea47870bb97e460ec08ce83cf0..11145fd49d5cccc52e77793879cc720841d0f29b 100644 (file)
@@ -26,6 +26,7 @@
 #define LIBKVSTORAGECLIENT_H_
 
 #include <glib.h>
+#include <sys/time.h>
 
 /* Errors */
 enum rspamd_kvstorage_error {
@@ -45,10 +46,11 @@ struct rspamd_kvstorage_connection;
 /* Callbacks for async API */
 typedef void (*kvstorage_connect_cb) (enum rspamd_kvstorage_error code,
                                                                        struct rspamd_kvstorage_connection *conn, gpointer user_data);
-typedef void (*kvstorage_read_cb) (enum rspamd_kvstorage_error code, const gpointer key,
+typedef void (*kvstorage_read_cb) (enum rspamd_kvstorage_error code, const gpointer key, guint keylen,
                                                                        const gpointer value, gsize datalen, struct rspamd_kvstorage_connection *conn,
                                                                        gpointer user_data);
-typedef void (*kvstorage_write_cb) (enum rspamd_kvstorage_error code, const gpointer key, struct rspamd_kvstorage_connection *conn,
+typedef void (*kvstorage_write_cb) (enum rspamd_kvstorage_error code, const gpointer key, guint keylen,
+                                                                       struct rspamd_kvstorage_connection *conn,
                                                                        gpointer user_data);
 
 /* Asynced API */
@@ -74,7 +76,7 @@ enum rspamd_kvstorage_error rspamd_kvstorage_connect_async (const gchar *host,
  * @param ud user data for callback
  */
 enum rspamd_kvstorage_error rspamd_kvstorage_get_async (struct rspamd_kvstorage_connection *conn,
-               const gpointer key, kvstorage_read_cb cb, gpointer ud);
+               const gpointer key, guint keylen, kvstorage_read_cb cb, gpointer ud);
 
 /**
  * Write key asynced
@@ -85,7 +87,7 @@ enum rspamd_kvstorage_error rspamd_kvstorage_get_async (struct rspamd_kvstorage_
  * @param ud user data for callback
  */
 enum rspamd_kvstorage_error rspamd_kvstorage_set_async (struct rspamd_kvstorage_connection *conn,
-               const gpointer key, const gpointer value, gsize len, guint expire, kvstorage_write_cb cb, gpointer ud);
+               const gpointer key, guint keylen, const gpointer value, gsize len, guint expire, kvstorage_write_cb cb, gpointer ud);
 
 /**
  * Delete key asynced
@@ -95,7 +97,7 @@ enum rspamd_kvstorage_error rspamd_kvstorage_set_async (struct rspamd_kvstorage_
  * @param ud user data for callback
  */
 enum rspamd_kvstorage_error rspamd_kvstorage_delete_async (struct rspamd_kvstorage_connection *conn,
-               const gpointer key, kvstorage_write_cb cb, gpointer ud);
+               const gpointer key, guint keylen, kvstorage_write_cb cb, gpointer ud);
 
 /**
  * Close connection
@@ -122,7 +124,7 @@ enum rspamd_kvstorage_error rspamd_kvstorage_connect_sync (const gchar *host,
  * @param value value readed
  */
 enum rspamd_kvstorage_error rspamd_kvstorage_get_sync (struct rspamd_kvstorage_connection *conn,
-               const gpointer key, gpointer **value, guint *len);
+               const gpointer key, guint keylen, gpointer **value, guint *len);
 
 /**
  * Write key synced
@@ -131,7 +133,7 @@ enum rspamd_kvstorage_error rspamd_kvstorage_get_sync (struct rspamd_kvstorage_c
  * @param value data to write
  */
 enum rspamd_kvstorage_error rspamd_kvstorage_set_sync (struct rspamd_kvstorage_connection *conn,
-               const gpointer key, const gpointer value, gsize len, guint expire);
+               const gpointer key, guint keylen, const gpointer value, gsize len, guint expire);
 
 /**
  * Delete key synced
@@ -139,7 +141,7 @@ enum rspamd_kvstorage_error rspamd_kvstorage_set_sync (struct rspamd_kvstorage_c
  * @param key key to delete
  */
 enum rspamd_kvstorage_error rspamd_kvstorage_delete_sync (struct rspamd_kvstorage_connection *conn,
-               const gpointer key);
+               const gpointer key, guint keylen);
 
 /**
  * Close connection
@@ -147,4 +149,10 @@ enum rspamd_kvstorage_error rspamd_kvstorage_delete_sync (struct rspamd_kvstorag
  */
 enum rspamd_kvstorage_error rspamd_kvstorage_close_sync (struct rspamd_kvstorage_connection *conn);
 
+/**
+ * Convert error code to string
+ * @param err error code to be converted
+ */
+const gchar* rspamd_kvstorage_strerror (enum rspamd_kvstorage_error err);
+
 #endif /* LIBKVSTORAGECLIENT_H_ */
index 7ff76afb37913d3b749bcdd00206c9cad3e4831a..59a5f3c30467807220a9b31d344f10ea98b65f93 100644 (file)
@@ -292,6 +292,24 @@ gpointer init_workers_ctx (enum process_type type);
  */
 extern struct rspamd_main *rspamd_main;
 
+/* Worker task manipulations */
+
+/**
+ * Construct new task for worker
+ */
+struct worker_task* construct_task (struct rspamd_worker *worker);
+/**
+ * Destroy task object and remove its IO dispatcher if it exists
+ */
+void free_task (struct worker_task *task, gboolean is_soft);
+void free_task_hard (gpointer ud);
+void free_task_soft (gpointer ud);
+
+/**
+ * Set counter for a symbol
+ */
+double set_counter (const gchar *name, guint32 value);
+
 #endif
 
 /* 
index 26d96a38cd286d305be48846637127cb14775aae..d9d42d98347c348c301a1d26ca4eb76c04a74f73 100644 (file)
@@ -891,3 +891,51 @@ statfile_pool_plan_invalidate (statfile_pool_t *pool, time_t seconds, time_t jit
                msg_info ("invalidate of statfile pool is planned in %d seconds", (gint)pool->invalidate_tv.tv_sec);
        }
 }
+
+
+stat_file_t *
+get_statfile_by_symbol (statfile_pool_t *pool, struct classifier_config *ccf,
+        const gchar *symbol, struct statfile **st, gboolean try_create)
+{
+    stat_file_t *res = NULL;
+    GList *cur;
+
+    if (pool == NULL || ccf == NULL || symbol == NULL) {
+               msg_err ("invalid input arguments");
+        return NULL;
+    }
+
+    cur = g_list_first (ccf->statfiles);
+       while (cur) {
+               *st = cur->data;
+               if (strcmp (symbol, (*st)->symbol) == 0) {
+                       break;
+               }
+               *st = NULL;
+               cur = g_list_next (cur);
+       }
+    if (*st == NULL) {
+               msg_info ("cannot find statfile with symbol %s", symbol);
+        return NULL;
+    }
+
+    if ((res = statfile_pool_is_open (pool, (*st)->path)) == NULL) {
+               if ((res = statfile_pool_open (pool, (*st)->path, (*st)->size, FALSE)) == NULL) {
+                       msg_warn ("cannot open %s", (*st)->path);
+            if (try_create) {
+                if (statfile_pool_create (pool, (*st)->path, (*st)->size) == -1) {
+                                       msg_err ("cannot create statfile %s", (*st)->path);
+                                       return NULL;
+                               }
+                res = statfile_pool_open (pool, (*st)->path, (*st)->size, FALSE);
+                               if (res == NULL) {
+                                       msg_err ("cannot open statfile %s after creation", (*st)->path);
+                               }
+            }
+               }
+       }
+
+    return res;
+}
+
+
index e76aa08971c148ddcdbb7d82de5c03679a4c7e02..edb58e5db72a8afea8ac5daf2e743f670be2f3a1 100644 (file)
@@ -96,6 +96,10 @@ typedef struct statfile_pool_s {
        struct timeval invalidate_tv;
 } statfile_pool_t;
 
+/* Forwarded declarations */
+struct classifier_config;
+struct statfile;
+
 /**
  * Create new statfile pool
  * @param max_size maximum size
@@ -261,4 +265,15 @@ guint64 statfile_get_total_blocks (stat_file_t *file);
  */
 void statfile_pool_plan_invalidate (statfile_pool_t *pool, time_t seconds, time_t jitter);
 
+/**
+ * Get a statfile by symbol
+ * @param pool pool object
+ * @param ccf ccf classifier config
+ * @param symbol symbol to search
+ * @param st statfile to get
+ * @param try_create whether we need to create statfile if it is absent
+ */
+stat_file_t* get_statfile_by_symbol (statfile_pool_t *pool, struct classifier_config *ccf,
+               const gchar *symbol, struct statfile **st, gboolean try_create);
+
 #endif
index 4d2665f6b8eb9235e2b7c98a9ec0138f2677237f..2efd8f0d57d5f31a6780fe733217e12dc511348a 100644 (file)
@@ -38,8 +38,6 @@
 /* Default connect timeout for sync sockets */
 #define CONNECT_TIMEOUT 3
 
-rspamd_hash_t                      *counters = NULL;
-
 static gchar* rspamd_sprintf_num (gchar *buf, gchar *last, guint64 ui64, gchar zero, guint hexadecimal, guint width);
 
 gint
@@ -939,35 +937,6 @@ calculate_check_time (struct timeval *begin, gint resolution)
        return (const gchar *)res;
 }
 
-double
-set_counter (const gchar *name, guint32 value)
-{
-       struct counter_data            *cd;
-       double                          alpha;
-       gchar                           *key;
-
-       cd = rspamd_hash_lookup (counters, (gpointer) name);
-
-       if (cd == NULL) {
-               cd = memory_pool_alloc_shared (counters->pool, sizeof (struct counter_data));
-               cd->value = value;
-               cd->number = 0;
-               key = memory_pool_strdup_shared (counters->pool, name);
-               rspamd_hash_insert (counters, (gpointer) key, (gpointer) cd);
-       }
-       else {
-               /* Calculate new value */
-               memory_pool_wlock_rwlock (counters->lock);
-
-               alpha = 2. / (++cd->number + 1);
-               cd->value = cd->value * (1. - alpha) + value * alpha;
-
-               memory_pool_wunlock_rwlock (counters->lock);
-       }
-
-       return cd->value;
-}
-
 #ifndef g_tolower
 #   define g_tolower(x) (((x) >= 'A' && (x) <= 'Z') ? (x) - 'A' + 'a' : (x))
 #endif
@@ -1147,52 +1116,6 @@ unlock_file (gint fd, gboolean async)
 }
 #endif /* HAVE_FLOCK */
 
-stat_file_t *
-get_statfile_by_symbol (statfile_pool_t *pool, struct classifier_config *ccf, 
-        const gchar *symbol, struct statfile **st, gboolean try_create)
-{
-    stat_file_t *res = NULL;
-    GList *cur;
-
-    if (pool == NULL || ccf == NULL || symbol == NULL) {
-               msg_err ("invalid input arguments");
-        return NULL;
-    }
-
-    cur = g_list_first (ccf->statfiles);
-       while (cur) {
-               *st = cur->data;
-               if (strcmp (symbol, (*st)->symbol) == 0) {
-                       break;
-               }
-               *st = NULL;
-               cur = g_list_next (cur);
-       }
-    if (*st == NULL) {
-               msg_info ("cannot find statfile with symbol %s", symbol);
-        return NULL;
-    }
-
-    if ((res = statfile_pool_is_open (pool, (*st)->path)) == NULL) {
-               if ((res = statfile_pool_open (pool, (*st)->path, (*st)->size, FALSE)) == NULL) {
-                       msg_warn ("cannot open %s", (*st)->path);
-            if (try_create) {
-                if (statfile_pool_create (pool, (*st)->path, (*st)->size) == -1) {
-                                       msg_err ("cannot create statfile %s", (*st)->path);
-                                       return NULL;
-                               }
-                res = statfile_pool_open (pool, (*st)->path, (*st)->size, FALSE);
-                               if (res == NULL) {
-                                       msg_err ("cannot open statfile %s after creation", (*st)->path);
-                               }
-            }
-               }
-       }
-    
-    return res;
-}
-
-
 
 #if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION < 22))
 void
@@ -1276,22 +1199,8 @@ str_to_process (const gchar *str)
        return TYPE_UNKNOWN;
 }
 
-/*
- * Destructor for recipients list
- */
-static void
-rcpt_destruct (void *pointer)
-{
-       struct worker_task             *task = (struct worker_task *) pointer;
-
-       if (task->rcpt) {
-               g_list_free (task->rcpt);
-       }
-}
-
-
 /* Compare two emails for building emails tree */
-static gint
+gint
 compare_email_func (gconstpointer a, gconstpointer b)
 {
        const struct uri               *u1 = a, *u2 = b;
@@ -1317,7 +1226,7 @@ compare_email_func (gconstpointer a, gconstpointer b)
        return 0;
 }
 
-static gint
+gint
 compare_url_func (gconstpointer a, gconstpointer b)
 {
        const struct uri               *u1 = a, *u2 = b;
@@ -1333,130 +1242,6 @@ compare_url_func (gconstpointer a, gconstpointer b)
        return r;
 }
 
-/*
- * Create new task
- */
-struct worker_task             *
-construct_task (struct rspamd_worker *worker)
-{
-       struct worker_task             *new_task;
-
-       new_task = g_slice_alloc0 (sizeof (struct worker_task));
-
-       new_task->worker = worker;
-       new_task->state = READ_COMMAND;
-       new_task->cfg = worker->srv->cfg;
-       new_task->from_addr.s_addr = INADDR_NONE;
-       new_task->view_checked = FALSE;
-#ifdef HAVE_CLOCK_GETTIME
-# ifdef HAVE_CLOCK_PROCESS_CPUTIME_ID
-       clock_gettime (CLOCK_PROCESS_CPUTIME_ID, &new_task->ts);
-# elif defined(HAVE_CLOCK_VIRTUAL)
-       clock_gettime (CLOCK_VIRTUAL, &new_task->ts);
-# else
-       clock_gettime (CLOCK_REALTIME, &new_task->ts);
-# endif
-#endif
-       if (gettimeofday (&new_task->tv, NULL) == -1) {
-               msg_warn ("gettimeofday failed: %s", strerror (errno));
-       }
-
-       new_task->task_pool = memory_pool_new (memory_pool_get_size ());
-
-       /* Add destructor for recipients list (it would be better to use anonymous function here */
-       memory_pool_add_destructor (new_task->task_pool,
-                       (pool_destruct_func) rcpt_destruct, new_task);
-       new_task->results = g_hash_table_new (g_str_hash, g_str_equal);
-       memory_pool_add_destructor (new_task->task_pool,
-                       (pool_destruct_func) g_hash_table_destroy,
-                       new_task->results);
-       new_task->re_cache = g_hash_table_new (g_str_hash, g_str_equal);
-       memory_pool_add_destructor (new_task->task_pool,
-                       (pool_destruct_func) g_hash_table_destroy,
-                       new_task->re_cache);
-       new_task->raw_headers = g_hash_table_new (rspamd_strcase_hash, rspamd_strcase_equal);
-       memory_pool_add_destructor (new_task->task_pool,
-                               (pool_destruct_func) g_hash_table_destroy,
-                               new_task->raw_headers);
-       new_task->emails = g_tree_new (compare_email_func);
-       memory_pool_add_destructor (new_task->task_pool,
-                               (pool_destruct_func) g_tree_destroy,
-                               new_task->emails);
-       new_task->urls = g_tree_new (compare_url_func);
-       memory_pool_add_destructor (new_task->task_pool,
-                                       (pool_destruct_func) g_tree_destroy,
-                                       new_task->urls);
-       new_task->s =
-                       new_async_session (new_task->task_pool, free_task_hard, new_task);
-       new_task->sock = -1;
-       new_task->is_mime = TRUE;
-
-       return new_task;
-}
-
-
-/*
- * Free all structures of worker_task
- */
-void
-free_task (struct worker_task *task, gboolean is_soft)
-{
-       GList                          *part;
-       struct mime_part               *p;
-
-       if (task) {
-               debug_task ("free pointer %p", task);
-               while ((part = g_list_first (task->parts))) {
-                       task->parts = g_list_remove_link (task->parts, part);
-                       p = (struct mime_part *) part->data;
-                       g_byte_array_free (p->content, TRUE);
-                       g_list_free_1 (part);
-               }
-               if (task->text_parts) {
-                       g_list_free (task->text_parts);
-               }
-               if (task->images) {
-                       g_list_free (task->images);
-               }
-               if (task->messages) {
-                       g_list_free (task->messages);
-               }
-               if (task->received) {
-                       g_list_free (task->received);
-               }
-               memory_pool_delete (task->task_pool);
-               if (task->dispatcher) {
-                       if (is_soft) {
-                               /* Plan dispatcher shutdown */
-                               task->dispatcher->wanna_die = 1;
-                       }
-                       else {
-                               rspamd_remove_dispatcher (task->dispatcher);
-                       }
-               }
-               if (task->sock != -1) {
-                       close (task->sock);
-               }
-               g_slice_free1 (sizeof (struct worker_task), task);
-       }
-}
-
-void
-free_task_hard (gpointer ud)
-{
-  struct worker_task             *task = ud;
-
-  free_task (task, FALSE);
-}
-
-void
-free_task_soft (gpointer ud)
-{
-  struct worker_task             *task = ud;
-
-  free_task (task, FALSE);
-}
-
 gchar *
 escape_braces_addr_fstr (memory_pool_t *pool, f_str_t *in)
 {
index bbe49db673d9a01ad15e8158c2da9a285816e576..95d5703bd70d0f610cb0d3a2533bc01f4a280ebb 100644 (file)
@@ -148,11 +148,6 @@ const gchar* calculate_check_time (struct timeval *tv, struct timespec *begin, g
 const gchar* calculate_check_time (struct timeval *begin, gint resolution);
 #endif
 
-/*
- * Set counter for a symbol
- */
-double set_counter (const gchar *name, guint32 value);
-
 /*
  * File locking functions
  */
@@ -176,12 +171,6 @@ gboolean fstr_strcase_equal (gconstpointer v, gconstpointer v2);
  */
 void gperf_profiler_init (struct config_file *cfg, const gchar *descr);
 
-/*
- * Get a statfile by symbol
- */
-stat_file_t* get_statfile_by_symbol (statfile_pool_t *pool, struct classifier_config *ccf, 
-               const gchar *symbol, struct statfile **st, gboolean try_create);
-
 /*
  * Workaround for older versions of glib
  */
@@ -228,19 +217,11 @@ gchar * escape_braces_addr_fstr (memory_pool_t *pool, f_str_t *in);
 #define msec_to_tv(msec, tv) do { (tv)->tv_sec = (msec) / 1000; (tv)->tv_usec = ((msec) - (tv)->tv_sec * 1000) * 1000; } while(0)
 #define tv_to_msec(tv) (tv)->tv_sec * 1000 + (tv)->tv_usec / 1000
 
-struct worker_task;
-struct rspamd_worker;
+/* Compare two emails for building emails tree */
+gint compare_email_func (gconstpointer a, gconstpointer b);
 
-/**
- * Construct new task for worker
- */
-struct worker_task* construct_task (struct rspamd_worker *worker);
-/**
- * Destroy task object and remove its IO dispatcher if it exists
- */
-void free_task (struct worker_task *task, gboolean is_soft);
-void free_task_hard (gpointer ud);
-void free_task_soft (gpointer ud);
+/* Compare two urls for building emails tree */
+gint compare_url_func (gconstpointer a, gconstpointer b);
 
 /*
  * Find string find in string s ignoring case
index d1aeea8595010bd25991843014dad7a3eae5b2e7..24cb0dd69c658362adc258bdb20eef334a124685 100644 (file)
@@ -98,6 +98,8 @@ static gboolean                 write_socket (void *arg);
 
 static sig_atomic_t             wanna_die = 0;
 
+rspamd_hash_t                  *counters = NULL;
+
 #ifndef HAVE_SA_SIGINFO
 static void
 sig_handler (gint signo)
@@ -160,6 +162,172 @@ sigusr1_handler (gint fd, short what, void *arg)
        return;
 }
 
+/*
+ * Destructor for recipients list in a task
+ */
+static void
+rcpt_destruct (void *pointer)
+{
+       struct worker_task             *task = (struct worker_task *) pointer;
+
+       if (task->rcpt) {
+               g_list_free (task->rcpt);
+       }
+}
+
+/*
+ * Create new task
+ */
+struct worker_task             *
+construct_task (struct rspamd_worker *worker)
+{
+       struct worker_task             *new_task;
+
+       new_task = g_slice_alloc0 (sizeof (struct worker_task));
+
+       new_task->worker = worker;
+       new_task->state = READ_COMMAND;
+       new_task->cfg = worker->srv->cfg;
+       new_task->from_addr.s_addr = INADDR_NONE;
+       new_task->view_checked = FALSE;
+#ifdef HAVE_CLOCK_GETTIME
+# ifdef HAVE_CLOCK_PROCESS_CPUTIME_ID
+       clock_gettime (CLOCK_PROCESS_CPUTIME_ID, &new_task->ts);
+# elif defined(HAVE_CLOCK_VIRTUAL)
+       clock_gettime (CLOCK_VIRTUAL, &new_task->ts);
+# else
+       clock_gettime (CLOCK_REALTIME, &new_task->ts);
+# endif
+#endif
+       if (gettimeofday (&new_task->tv, NULL) == -1) {
+               msg_warn ("gettimeofday failed: %s", strerror (errno));
+       }
+
+       new_task->task_pool = memory_pool_new (memory_pool_get_size ());
+
+       /* Add destructor for recipients list (it would be better to use anonymous function here */
+       memory_pool_add_destructor (new_task->task_pool,
+                       (pool_destruct_func) rcpt_destruct, new_task);
+       new_task->results = g_hash_table_new (g_str_hash, g_str_equal);
+       memory_pool_add_destructor (new_task->task_pool,
+                       (pool_destruct_func) g_hash_table_destroy,
+                       new_task->results);
+       new_task->re_cache = g_hash_table_new (g_str_hash, g_str_equal);
+       memory_pool_add_destructor (new_task->task_pool,
+                       (pool_destruct_func) g_hash_table_destroy,
+                       new_task->re_cache);
+       new_task->raw_headers = g_hash_table_new (rspamd_strcase_hash, rspamd_strcase_equal);
+       memory_pool_add_destructor (new_task->task_pool,
+                               (pool_destruct_func) g_hash_table_destroy,
+                               new_task->raw_headers);
+       new_task->emails = g_tree_new (compare_email_func);
+       memory_pool_add_destructor (new_task->task_pool,
+                               (pool_destruct_func) g_tree_destroy,
+                               new_task->emails);
+       new_task->urls = g_tree_new (compare_url_func);
+       memory_pool_add_destructor (new_task->task_pool,
+                                       (pool_destruct_func) g_tree_destroy,
+                                       new_task->urls);
+       new_task->s =
+                       new_async_session (new_task->task_pool, free_task_hard, new_task);
+       new_task->sock = -1;
+       new_task->is_mime = TRUE;
+
+       return new_task;
+}
+
+
+/*
+ * Free all structures of worker_task
+ */
+void
+free_task (struct worker_task *task, gboolean is_soft)
+{
+       GList                          *part;
+       struct mime_part               *p;
+
+       if (task) {
+               debug_task ("free pointer %p", task);
+               while ((part = g_list_first (task->parts))) {
+                       task->parts = g_list_remove_link (task->parts, part);
+                       p = (struct mime_part *) part->data;
+                       g_byte_array_free (p->content, TRUE);
+                       g_list_free_1 (part);
+               }
+               if (task->text_parts) {
+                       g_list_free (task->text_parts);
+               }
+               if (task->images) {
+                       g_list_free (task->images);
+               }
+               if (task->messages) {
+                       g_list_free (task->messages);
+               }
+               if (task->received) {
+                       g_list_free (task->received);
+               }
+               memory_pool_delete (task->task_pool);
+               if (task->dispatcher) {
+                       if (is_soft) {
+                               /* Plan dispatcher shutdown */
+                               task->dispatcher->wanna_die = 1;
+                       }
+                       else {
+                               rspamd_remove_dispatcher (task->dispatcher);
+                       }
+               }
+               if (task->sock != -1) {
+                       close (task->sock);
+               }
+               g_slice_free1 (sizeof (struct worker_task), task);
+       }
+}
+
+void
+free_task_hard (gpointer ud)
+{
+  struct worker_task             *task = ud;
+
+  free_task (task, FALSE);
+}
+
+void
+free_task_soft (gpointer ud)
+{
+  struct worker_task             *task = ud;
+
+  free_task (task, FALSE);
+}
+
+double
+set_counter (const gchar *name, guint32 value)
+{
+       struct counter_data            *cd;
+       double                          alpha;
+       gchar                           *key;
+
+       cd = rspamd_hash_lookup (counters, (gpointer) name);
+
+       if (cd == NULL) {
+               cd = memory_pool_alloc_shared (counters->pool, sizeof (struct counter_data));
+               cd->value = value;
+               cd->number = 0;
+               key = memory_pool_strdup_shared (counters->pool, name);
+               rspamd_hash_insert (counters, (gpointer) key, (gpointer) cd);
+       }
+       else {
+               /* Calculate new value */
+               memory_pool_wlock_rwlock (counters->lock);
+
+               alpha = 2. / (++cd->number + 1);
+               cd->value = cd->value * (1. - alpha) + value * alpha;
+
+               memory_pool_wunlock_rwlock (counters->lock);
+       }
+
+       return cd->value;
+}
+
 #ifndef BUILD_STATIC
 static void
 fin_custom_filters (struct worker_task *task)
@@ -244,6 +412,7 @@ parse_line_custom (struct worker_task *task, f_str_t * in)
 }
 #endif
 
+
 /*
  * Callback that is called when there is data to read in buffer
  */