]> source.dussan.org Git - rspamd.git/commitdiff
* Implement basic functionality of key value storage
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Mon, 31 Oct 2011 15:37:41 +0000 (18:37 +0300)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Mon, 31 Oct 2011 15:37:41 +0000 (18:37 +0300)
src/cfg_utils.c
src/cfg_xml.c
src/cfg_xml.h
src/kvstorage.c
src/kvstorage.h
src/kvstorage_config.c
src/kvstorage_server.c
src/kvstorage_server.h

index ffebd5c46e524d6debf50e31c1328989c4c2fb8b..48e95cf59e6ae308bb460d548cd1bd363987c65d 100644 (file)
@@ -32,6 +32,7 @@
 #include "classifiers/classifiers.h"
 #include "cfg_xml.h"
 #include "lua/lua_common.h"
+#include "kvstorage_config.h"
 
 #define DEFAULT_SCORE 10.0
 
@@ -960,6 +961,7 @@ read_xml_config (struct config_file *cfg, const gchar *filename)
        ud.if_stack = g_queue_new ();
 
        ctx = g_markup_parse_context_new (&xml_parser, G_MARKUP_TREAT_CDATA_AS_TEXT, &ud, NULL);
+       init_kvstorage_config ();
        res = g_markup_parse_context_parse (ctx, data, st.st_size, &err);
 
        if (g_queue_get_length (ud.if_stack) != 0) {
index edd8c9a17f06fcd796d7247a60a967d3ca9d73dc..f77bdf3aa60ba0e19f96bd9be9a0df2db0f7680b 100644 (file)
@@ -97,6 +97,7 @@ struct xml_subparser {
        enum xml_read_state state;
        const GMarkupParser *parser;
        gpointer user_data;
+       void (*fin_func)(gpointer ud);
 };
 
 /* Here we describes our basic grammar */
@@ -1736,6 +1737,7 @@ rspamd_xml_start_element (GMarkupParseContext *context, const gchar *element_nam
                        else if (subparsers != NULL && (subparser = g_hash_table_lookup (subparsers, element_name)) != NULL) {
                                ud->state = XML_SUBPARSER;
                                g_markup_parse_context_push (context, subparser->parser, subparser->user_data);
+                               rspamd_strlcpy (ud->section_name, element_name, sizeof (ud->section_name));
                        }
                        else {
                                /* Extract other tags */
@@ -1812,6 +1814,7 @@ rspamd_xml_end_element (GMarkupParseContext       *context, const gchar *element_name,
        gboolean                    res;
        gpointer                    tptr;
        struct wrk_cbdata           wcd;
+       struct xml_subparser       *subparser;
        
        if (g_ascii_strcasecmp (element_name, "if") == 0) {
                tptr = g_queue_pop_head (ud->if_stack);
@@ -1929,6 +1932,18 @@ rspamd_xml_end_element (GMarkupParseContext      *context, const gchar *element_name,
                        break;
                case XML_SKIP_ELEMENTS:
                        return;
+               case XML_SUBPARSER:
+                       CHECK_TAG (ud->section_name, TRUE);
+                       if (subparsers != NULL && (subparser = g_hash_table_lookup (subparsers, element_name)) != NULL) {
+                               if (subparser->fin_func) {
+                                       subparser->fin_func (g_markup_parse_context_pop (context));
+                               }
+                               else {
+                                       g_markup_parse_context_pop (context);
+                               }
+                       }
+                       ud->state = XML_READ_PARAM;
+                       break;
                default:
                        ud->state = XML_ERROR;
                        break;
@@ -2256,7 +2271,7 @@ register_classifier_opt (const gchar *ctype, const gchar *optname)
 }
 
 void
-register_subparser (const gchar *tag, enum xml_read_state state, const GMarkupParser *parser, gpointer user_data)
+register_subparser (const gchar *tag, enum xml_read_state state, const GMarkupParser *parser, void (*fin_func)(gpointer ud), gpointer user_data)
 {
        struct xml_subparser                     *subparser;
 
@@ -2268,6 +2283,7 @@ register_subparser (const gchar *tag, enum xml_read_state state, const GMarkupPa
        subparser->parser = parser;
        subparser->state = state;
        subparser->user_data = user_data;
+       subparser->fin_func = fin_func;
 
        g_hash_table_replace (subparsers, g_strdup (tag), subparser);
 }
index 6740e135eae523e03ce11e2ba6c55089a2c8fe18..f1a1742d3fb6311c62d1a5b21c2fda60ce611f93 100644 (file)
@@ -167,7 +167,8 @@ void register_worker_opt (gint wtype, const gchar *optname, element_handler_func
 void register_classifier_opt (const gchar *ctype, const gchar *optname);
 
 /* Register new xml subparser */
-void register_subparser (const gchar *tag, enum xml_read_state state, const GMarkupParser *parser, gpointer user_data);
+void register_subparser (const gchar *tag, enum xml_read_state state,
+               const GMarkupParser *parser, void (*fin_func)(gpointer ud), gpointer user_data);
 
 /* Check validity of module option */
 gboolean check_module_option (const gchar *mname, const gchar *optname, const gchar *data);
index 2cfad9459f051ceda486e3c54b638a3d4fcf9365..aed0390408856605a32695d57e54ba58d6c1a5fb 100644 (file)
@@ -57,6 +57,8 @@ rspamd_kv_storage_new (gint id, const gchar *name, struct rspamd_kv_cache *cache
                rspamd_snprintf (new->name, sizeof ("18446744073709551616"), "%d", id);
        }
 
+       g_static_rw_lock_init (&new->rwlock);
+
        /* Init structures */
        if (new->cache->init_func) {
                new->cache->init_func (new->cache);
@@ -78,23 +80,26 @@ rspamd_kv_storage_insert_internal (struct rspamd_kv_storage *storage, struct rsp
        gint                                                            steps = 0;
 
        /* Hard limit */
-       if (elt->size > storage->max_memory) {
-               msg_info ("<%s>: trying to insert value of length %z while limit is %z", elt->size, storage->max_memory);
-               return FALSE;
-       }
-
-       /* Now check limits */
-       while (storage->memory + elt->size > storage->max_memory || storage->elts >= storage->max_elts) {
-               if (storage->expire) {
-                       storage->expire->step_func (storage->expire, storage, time (NULL));
-               }
-               else {
-                       msg_warn ("<%s>: storage %s is full and no expire function is defined", storage->name);
-               }
-               if (++steps > MAX_EXPIRE_STEPS) {
-                       msg_warn ("<%s>: cannot expire enough keys in storage", storage->name);
+       if (storage->max_memory > 0) {
+               if (elt->size > storage->max_memory) {
+                       msg_info ("<%s>: trying to insert value of length %z while limit is %z", storage->name,
+                                       elt->size, storage->max_memory);
                        return FALSE;
                }
+
+               /* Now check limits */
+               while (storage->memory + elt->size > storage->max_memory || storage->elts >= storage->max_elts) {
+                       if (storage->expire) {
+                               storage->expire->step_func (storage->expire, storage, time (NULL));
+                       }
+                       else {
+                               msg_warn ("<%s>: storage is full and no expire function is defined", storage->name);
+                       }
+                       if (++steps > MAX_EXPIRE_STEPS) {
+                               msg_warn ("<%s>: cannot expire enough keys in storage", storage->name);
+                               return FALSE;
+                       }
+               }
        }
 
        /* Insert elt to the cache */
@@ -116,30 +121,33 @@ rspamd_kv_storage_insert_internal (struct rspamd_kv_storage *storage, struct rsp
 
 /** Insert new element to the kv storage */
 gboolean
-rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, gpointer data, gsize len, gint flags)
+rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, gpointer data, gsize len, gint flags, guint expire)
 {
        gint                                                            steps = 0;
        struct rspamd_kv_element           *elt;
        gboolean                                                        res = TRUE;
 
        /* Hard limit */
-       if (len > storage->max_memory) {
-               msg_info ("<%s>: trying to insert value of length %z while limit is %z", len, storage->max_memory);
-               return FALSE;
-       }
-
-       /* Now check limits */
-       while (storage->memory + len > storage->max_memory || storage->elts >= storage->max_elts) {
-               if (storage->expire) {
-                       storage->expire->step_func (storage->expire, storage, time (NULL));
-               }
-               else {
-                       msg_warn ("<%s>: storage %s is full and no expire function is defined", storage->name);
-               }
-               if (++steps > MAX_EXPIRE_STEPS) {
-                       msg_warn ("<%s>: cannot expire enough keys in storage", storage->name);
+       if (storage->max_memory > 0) {
+               if (len > storage->max_memory) {
+                       msg_info ("<%s>: trying to insert value of length %z while limit is %z", storage->name,
+                                       len, storage->max_memory);
                        return FALSE;
                }
+
+               /* Now check limits */
+               while (storage->memory + len > storage->max_memory || storage->elts >= storage->max_elts) {
+                       if (storage->expire) {
+                               storage->expire->step_func (storage->expire, storage, time (NULL));
+                       }
+                       else {
+                               msg_warn ("<%s>: storage is full and no expire function is defined", storage->name);
+                       }
+                       if (++steps > MAX_EXPIRE_STEPS) {
+                               msg_warn ("<%s>: cannot expire enough keys in storage", storage->name);
+                               return FALSE;
+                       }
+               }
        }
 
        /* Insert elt to the cache */
@@ -149,6 +157,7 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, gpoin
        }
        elt->flags = flags;
        elt->size = len;
+       elt->expire = expire;
 
        /* Place to the backend */
        if (storage->backend) {
@@ -174,23 +183,26 @@ rspamd_kv_storage_replace (struct rspamd_kv_storage *storage, gpointer key, stru
        gint                                                    steps = 0;
 
        /* Hard limit */
-       if (elt->size > storage->max_memory) {
-               msg_info ("<%s>: trying to replace value of length %z while limit is %z", elt->size, storage->max_memory);
-               return FALSE;
-       }
-
-       /* Now check limits */
-       while (storage->memory + elt->size > storage->max_memory) {
-               if (storage->expire) {
-                       storage->expire->step_func (storage->expire, storage, time (NULL));
-               }
-               else {
-                       msg_warn ("<%s>: storage %s is full and no expire function is defined", storage->name);
-               }
-               if (++steps > MAX_EXPIRE_STEPS) {
-                       msg_warn ("<%s>: cannot expire enough keys in storage", storage->name);
+       if (storage->max_memory > 0) {
+               if (elt->size > storage->max_memory) {
+                       msg_info ("<%s>: trying to replace value of length %z while limit is %z", storage->name,
+                                       elt->size, storage->max_memory);
                        return FALSE;
                }
+
+               /* Now check limits */
+               while (storage->memory + elt->size > storage->max_memory) {
+                       if (storage->expire) {
+                               storage->expire->step_func (storage->expire, storage, time (NULL));
+                       }
+                       else {
+                               msg_warn ("<%s>: storage is full and no expire function is defined", storage->name);
+                       }
+                       if (++steps > MAX_EXPIRE_STEPS) {
+                               msg_warn ("<%s>: cannot expire enough keys in storage", storage->name);
+                               return FALSE;
+                       }
+               }
        }
 
        /* Insert elt to the cache */
index 4bb7f20a7f4f8f897997a3af1ca42358dc691284..5bacc7c6b8b3da7571221b443f9de06ee3938b16 100644 (file)
@@ -117,6 +117,7 @@ struct rspamd_kv_storage {
 
        gint id;                                                                        /* char ID */
        gchar *name;                                                            /* numeric ID */
+       GStaticRWLock rwlock;                                           /* rwlock for threaded access */
 };
 
 /** Create new kv storage */
@@ -125,7 +126,7 @@ struct rspamd_kv_storage *rspamd_kv_storage_new (gint id, const gchar *name,
                gsize max_elts, gsize max_memory);
 
 /** Insert new element to the kv storage */
-gboolean rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, gpointer data, gsize len, gint flags);
+gboolean rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, gpointer data, gsize len, gint flags, guint expire);
 
 /** Replace an element in the kv storage */
 gboolean rspamd_kv_storage_replace (struct rspamd_kv_storage *storage, gpointer key, struct rspamd_kv_element *elt);
index 1fb158ec6866397fec6327028b505880e6db1799..bbffb4799104e5b78c20c37e278cff7df0e5ece3 100644 (file)
@@ -115,37 +115,47 @@ void kvstorage_xml_start_element (GMarkupParseContext     *context,
 
        switch (kv_parser->state) {
        case KVSTORAGE_STATE_INIT:
-               /* Make temporary pool */
-               if (kv_parser->pool != NULL) {
-                       memory_pool_delete (kv_parser->pool);
-               }
-               kv_parser->pool = memory_pool_new (memory_pool_get_size ());
-
-               /* Create new kvstorage_config */
-               kv_parser->current_storage = g_malloc0 (sizeof (struct kvstorage_config));
-               kv_parser->current_storage->id = ++last_id;
+               /* XXX: never get this state */
                break;
        case KVSTORAGE_STATE_PARAM:
+               if (kv_parser->current_storage == NULL) {
+                       /* Make temporary pool */
+                       if (kv_parser->pool != NULL) {
+                               memory_pool_delete (kv_parser->pool);
+                       }
+                       kv_parser->pool = memory_pool_new (memory_pool_get_size ());
+
+                       /* Create new kvstorage_config */
+                       kv_parser->current_storage = g_malloc0 (sizeof (struct kvstorage_config));
+                       kv_parser->current_storage->id = last_id++;
+               }
                if (g_ascii_strcasecmp (element_name, "type") == 0) {
                        kv_parser->state = KVSTORAGE_STATE_CACHE_TYPE;
+                       kv_parser->cur_elt = "type";
                }
                else if (g_ascii_strcasecmp (element_name, "max_elements") == 0) {
                        kv_parser->state = KVSTORAGE_STATE_CACHE_MAX_ELTS;
+                       kv_parser->cur_elt = "max_elements";
                }
                else if (g_ascii_strcasecmp (element_name, "max_memory") == 0) {
                        kv_parser->state = KVSTORAGE_STATE_CACHE_MAX_MEM;
+                       kv_parser->cur_elt = "max_memory";
                }
                else if (g_ascii_strcasecmp (element_name, "id") == 0) {
                        kv_parser->state = KVSTORAGE_STATE_ID;
+                       kv_parser->cur_elt = "id";
                }
                else if (g_ascii_strcasecmp (element_name, "name") == 0) {
                        kv_parser->state = KVSTORAGE_STATE_NAME;
+                       kv_parser->cur_elt = "name";
                }
                else if (g_ascii_strcasecmp (element_name, "backend") == 0) {
                        kv_parser->state = KVSTORAGE_STATE_BACKEND;
+                       kv_parser->cur_elt = "backend";
                }
                else if (g_ascii_strcasecmp (element_name, "expire") == 0) {
                        kv_parser->state = KVSTORAGE_STATE_EXPIRE;
+                       kv_parser->cur_elt = "expire";
                }
                else {
                        if (*error == NULL) {
@@ -154,11 +164,11 @@ void kvstorage_xml_start_element (GMarkupParseContext     *context,
                        }
                        kv_parser->state = KVSTORAGE_STATE_ERROR;
                }
-               kv_parser->cur_elt = memory_pool_strdup (kv_parser->pool, element_name);
                break;
        case KVSTORAGE_STATE_BACKEND:
                if (g_ascii_strcasecmp (element_name, "type") == 0) {
                        kv_parser->state = KVSTORAGE_STATE_BACKEND_TYPE;
+                       kv_parser->cur_elt = "type";
                }
                else {
                        if (*error == NULL) {
@@ -171,6 +181,7 @@ void kvstorage_xml_start_element (GMarkupParseContext       *context,
        case KVSTORAGE_STATE_EXPIRE:
                if (g_ascii_strcasecmp (element_name, "type") == 0) {
                        kv_parser->state = KVSTORAGE_STATE_EXPIRE_TYPE;
+                       kv_parser->cur_elt = "type";
                }
                else {
                        if (*error == NULL) {
@@ -211,10 +222,6 @@ void kvstorage_xml_end_element (GMarkupParseContext        *context,
        case KVSTORAGE_STATE_PARAM:
                if (g_ascii_strcasecmp (element_name, "keystorage") == 0) {
                        /* XXX: Init actual storage */
-                       g_hash_table_insert (storages, &kv_parser->current_storage->id, kv_parser->current_storage);
-                       kv_parser->state = KVSTORAGE_STATE_INIT;
-                       g_markup_parse_context_pop (context);
-                       g_hash_table_foreach (storages, kvstorage_init_callback, NULL);
                        return;
                }
                if (*error == NULL) {
@@ -277,6 +284,15 @@ void kvstorage_xml_text       (GMarkupParseContext         *context,
        struct kvstorage_config_parser  *kv_parser = user_data;
        gchar                           *err_str;
 
+       /* Strip space symbols */
+       while (*text && g_ascii_isspace (*text)) {
+               text ++;
+       }
+       if (*text == '\0') {
+               /* Skip empty text */
+               return;
+       }
+
        switch (kv_parser->state) {
        case KVSTORAGE_STATE_INIT:
        case KVSTORAGE_STATE_PARAM:
@@ -294,7 +310,8 @@ void kvstorage_xml_text       (GMarkupParseContext          *context,
                        kv_parser->state = KVSTORAGE_STATE_ERROR;
                }
                else {
-                       last_id = kv_parser->current_storage->id;
+                       last_id ++;
+                       last_id = MAX (kv_parser->current_storage->id, last_id);
                }
                break;
        case KVSTORAGE_STATE_NAME:
@@ -353,13 +370,28 @@ void kvstorage_xml_text       (GMarkupParseContext                *context,
 /* Called on error, including one set by other
 * methods in the vtable. The GError should not be freed.
 */
-void kvstorage_xml_error       (GMarkupParseContext            *context,
+void
+kvstorage_xml_error    (GMarkupParseContext            *context,
                                                                GError              *error,
                                                                gpointer             user_data)
 {
        msg_err ("kvstorage xml parser error: %s", error->message);
 }
 
+/*
+ * Cleanup kvstorage after end tag was read
+ */
+static void
+kvstorage_cleanup (gpointer ud)
+{
+       struct kvstorage_config_parser  *kv_parser = ud;
+
+       g_hash_table_insert (storages, &kv_parser->current_storage->id, kv_parser->current_storage);
+       kv_parser->state = KVSTORAGE_STATE_INIT;
+       g_hash_table_foreach (storages, kvstorage_init_callback, NULL);
+       kv_parser->current_storage = NULL;
+}
+
 /** Public API */
 
 /* Init subparser of kvstorage config */
@@ -387,8 +419,9 @@ init_kvstorage_config (void)
 
        kv_parser = g_malloc0 (sizeof (struct kvstorage_config_parser));
        kv_parser->state = KVSTORAGE_STATE_PARAM;
+       kv_parser->pool = memory_pool_new (memory_pool_get_size ());
 
-       register_subparser ("keystorage", XML_READ_START, parser, kv_parser);
+       register_subparser ("keystorage", XML_READ_START, parser, kvstorage_cleanup, kv_parser);
 }
 
 /* Get configuration for kvstorage with specified ID */
index 78c6c34e3cdddd9a4169eb8ffe8fca62b5bcca3e..81d04d4c2464962c7a969771f9e252d93bfa1081 100644 (file)
 #include "cfg_xml.h"
 #include "main.h"
 
+#define ERROR_COMMON "ERROR" CRLF
+#define ERROR_UNKNOWN_COMMAND "CLIENT_ERROR unknown command" CRLF
+#define ERROR_NOT_STORED "NOT_STORED" CRLF
+#define ERROR_EXISTS "EXISTS" CRLF
+#define ERROR_NOT_FOUND "NOT_FOUND" CRLF
+#define ERROR_INVALID_KEYSTORAGE "CLIENT_ERROR storage does not exists" CRLF
+
 /* This is required for normal signals processing */
 static GList *global_evbases = NULL;
 static struct event_base *main_base = NULL;
@@ -148,11 +155,311 @@ config_kvstorage_worker (struct rspamd_worker *worker)
        return TRUE;
 }
 
+/*
+ * Free kvstorage session
+ */
+static void
+free_kvstorage_session (struct kvstorage_session *session)
+{
+       rspamd_remove_dispatcher (session->dispather);
+       memory_pool_delete (session->pool);
+       close (session->sock);
+}
+
 /**
- * Accept function
+ * Parse kvstorage command
+ */
+static gboolean
+parse_kvstorage_command (struct kvstorage_session *session, f_str_t *in)
+{
+       gchar                                                           *p, *c, *end;
+       gint                                                             state = 0, next_state;
+
+       p = in->begin;
+       end = in->begin + in->len;
+       c = p;
+
+       /* State machine for parsing */
+       while (p <= end) {
+               switch (state) {
+               case 0:
+                       /* At this state we try to read identifier of storage */
+                       if (g_ascii_isdigit (*p)) {
+                               p ++;
+                       }
+                       else {
+                               if (g_ascii_isspace (*p) && p != c) {
+                                       /* We have some digits, so parse id */
+                                       session->id = strtoul (c, NULL, 10);
+                                       state = 99;
+                                       next_state = 1;
+                               }
+                               else if (c == p) {
+                                       /* We have some character, so assume id as 0 and parse command */
+                                       session->id = 0;
+                                       state = 1;
+                               }
+                               else {
+                                       /* We have something wrong here (like some digits and then come non-digits) */
+                                       return FALSE;
+                               }
+                       }
+                       break;
+               case 1:
+                       /* At this state we parse command */
+                       if (g_ascii_isalpha (*p) && p != end) {
+                               p ++;
+                       }
+                       else {
+                               if ((g_ascii_isspace (*p) || p == end) && p != c) {
+                                       /* We got some command, try to parse it */
+                                       if (p - c == 3) {
+                                               /* Set or get command */
+                                               if (memcmp (c, "get", 3) == 0) {
+                                                       session->command = KVSTORAGE_CMD_GET;
+                                               }
+                                               else if (memcmp (c, "set", 3) == 0) {
+                                                       session->command = KVSTORAGE_CMD_SET;
+                                               }
+                                               else {
+                                                       /* Error */
+                                                       return FALSE;
+                                               }
+                                       }
+                                       else if (p - c == 4) {
+                                               if (memcmp (c, "quit", 4) == 0) {
+                                                       session->command = KVSTORAGE_CMD_QUIT;
+                                                       state = 100;
+                                                       continue;
+                                               }
+                                       }
+                                       else if (p - c == 6) {
+                                               if (memcmp (c, "delete", 6) == 0) {
+                                                       session->command = KVSTORAGE_CMD_DELETE;
+                                               }
+                                               else {
+                                                       return FALSE;
+                                               }
+                                       }
+                                       else {
+                                               return FALSE;
+                                       }
+                                       /* Skip spaces and try to parse key */
+                                       state = 99;
+                                       next_state = 2;
+                               }
+                               else {
+                                       /* Some error */
+                                       return FALSE;
+                               }
+                       }
+                       break;
+               case 2:
+                       /* Read and store key */
+                       if (!g_ascii_isspace (*p) && end != p) {
+                               p ++;
+                       }
+                       else {
+                               if (p == c) {
+                                       return FALSE;
+                               }
+                               else {
+                                       session->key = memory_pool_alloc (session->pool, p - c + 1);
+                                       rspamd_strlcpy (session->key, c, p - c + 1);
+                                       /* Now we must select next state based on command */
+                                       if (session->command == KVSTORAGE_CMD_SET) {
+                                               /* Read flags */
+                                               state = 99;
+                                               next_state = 3;
+                                       }
+                                       else {
+                                               /* Nothing to read for other commands */
+                                               state = 100;
+                                       }
+                               }
+                       }
+                       break;
+               case 3:
+                       /* Read flags */
+                       if (g_ascii_isdigit (*p)) {
+                               p ++;
+                       }
+                       else {
+                               if (g_ascii_isspace (*p)) {
+                                       session->flags = strtoul (c, NULL, 10);
+                                       state = 99;
+                                       next_state = 4;
+                               }
+                               else {
+                                       return FALSE;
+                               }
+                       }
+                       break;
+               case 4:
+                       /* Read exptime */
+                       if (g_ascii_isdigit (*p)) {
+                               p ++;
+                       }
+                       else {
+                               if (g_ascii_isspace (*p)) {
+                                       session->expire = strtoul (c, NULL, 10);
+                                       state = 99;
+                                       next_state = 5;
+                               }
+                               else {
+                                       return FALSE;
+                               }
+                       }
+                       break;
+               case 5:
+                       /* Read size */
+                       if (g_ascii_isdigit (*p)) {
+                               p ++;
+                       }
+                       else {
+                               if (g_ascii_isspace (*p) || end == p) {
+                                       session->length = strtoul (c, NULL, 10);
+                                       state = 100;
+                               }
+                               else {
+                                       return FALSE;
+                               }
+                       }
+                       break;
+               case 99:
+                       /* Skip spaces state */
+                       if (g_ascii_isspace (*p)) {
+                               p ++;
+                       }
+                       else {
+                               c = p;
+                               state = next_state;
+                       }
+                       break;
+               case 100:
+                       /* Successful state */
+                       return TRUE;
+                       break;
+               }
+       }
+
+       return state == 100;
+}
+
+/**
+ * Dispatcher callbacks
+ */
+/*
+ * Callback that is called when there is data to read in buffer
  */
+static gboolean
+kvstorage_read_socket (f_str_t * in, void *arg)
+{
+       struct kvstorage_session                        *session = (struct kvstorage_session *) arg;
+       struct kvstorage_worker_thread          *thr;
+       struct rspamd_kv_element                        *elt;
+       gint                                                             r;
+       gchar                                                            outbuf[BUFSIZ];
+
+       if (in->len == 0) {
+               /* Skip empty commands */
+               return TRUE;
+       }
+       thr = session->thr;
+       switch (session->state) {
+       case KVSTORAGE_STATE_READ_CMD:
+               /* Update timestamp */
+               session->now = time (NULL);
+               if (! parse_kvstorage_command (session, in)) {
+                       thr_info ("%ud: unknown command: %V", thr->id, in);
+                       return rspamd_dispatcher_write (session->dispather, ERROR_UNKNOWN_COMMAND,
+                                       sizeof (ERROR_UNKNOWN_COMMAND) - 1, FALSE, TRUE);
+               }
+               else {
+                       session->cf = get_kvstorage_config (session->id);
+                       if (session->cf == NULL) {
+                               thr_info ("%ud: bad keystorage: %ud", thr->id, session->id);
+                               return rspamd_dispatcher_write (session->dispather, ERROR_INVALID_KEYSTORAGE,
+                                               sizeof (ERROR_INVALID_KEYSTORAGE) - 1, FALSE, TRUE);
+                       }
+                       if (session->command == KVSTORAGE_CMD_SET) {
+                               session->state = KVSTORAGE_STATE_READ_DATA;
+                               rspamd_set_dispatcher_policy (session->dispather, BUFFER_CHARACTER, session->length);
+                       }
+                       else if (session->command == KVSTORAGE_CMD_GET) {
+                               elt = rspamd_kv_storage_lookup (session->cf->storage, session->key, session->now);
+                               if (elt == NULL) {
+                                       return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND,
+                                                                                                                               sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE);
+                               }
+                               else {
+                                       r = rspamd_snprintf (outbuf, sizeof (outbuf), "VALUE %s %ud %ud" CRLF,
+                                                       elt->key, elt->flags, elt->size);
+                                       if (!rspamd_dispatcher_write (session->dispather, outbuf,
+                                                                                                                               r, FALSE, FALSE)) {
+                                               return FALSE;
+                                       }
+                                       if (!rspamd_dispatcher_write (session->dispather, elt->data, elt->size, FALSE, TRUE)) {
+                                               return FALSE;
+                                       }
+                                       return rspamd_dispatcher_write (session->dispather, CRLF "END" CRLF,
+                                                       sizeof (CRLF "END" CRLF) - 1, FALSE, TRUE);
+                               }
+                       }
+                       else if (session->command == KVSTORAGE_CMD_DELETE) {
+                               if (rspamd_kv_storage_delete (session->cf->storage, session->key)) {
+                                       return rspamd_dispatcher_write (session->dispather, "DELETED" CRLF,
+                                                                                                                               sizeof ("DELETED" CRLF) - 1, FALSE, TRUE);
+                               }
+                               else {
+                                       return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND,
+                                                                                                               sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE);
+                               }
+                       }
+                       else if (session->command == KVSTORAGE_CMD_QUIT) {
+                               /* Quit session */
+                               free_kvstorage_session (session);
+                               return FALSE;
+                       }
+               }
+               break;
+       case KVSTORAGE_STATE_READ_DATA:
+               session->state = KVSTORAGE_STATE_READ_CMD;
+               rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
+               if (rspamd_kv_storage_insert (session->cf->storage, session->key, in->begin, in->len,
+                               session->flags, session->expire)) {
+                       return rspamd_dispatcher_write (session->dispather, "STORED" CRLF,
+                                                                                       sizeof ("STORED" CRLF) - 1, FALSE, TRUE);
+               }
+               else {
+                       return rspamd_dispatcher_write (session->dispather, ERROR_NOT_STORED,
+                                                                       sizeof (ERROR_NOT_STORED) - 1, FALSE, TRUE);
+               }
+
+               break;
+       }
+
+       return TRUE;
+}
+
 /*
- * Accept new connection and construct task
+ * Called if something goes wrong
+ */
+static void
+kvstorage_err_socket (GError * err, void *arg)
+{
+       struct kvstorage_session                        *session = (struct kvstorage_session *) arg;
+       struct kvstorage_worker_thread          *thr;
+
+       thr = session->thr;
+       thr_info ("%ud: abnormally closing connection from: %s, error: %s",
+                       thr->id, inet_ntoa (session->client_addr), err->message);
+       g_error_free (err);
+       free_kvstorage_session (session);
+}
+
+/**
+ * Accept function
  */
 static void
 thr_accept_socket (gint fd, short what, void *arg)
@@ -161,6 +468,7 @@ thr_accept_socket (gint fd, short what, void *arg)
        union sa_union                           su;
        socklen_t                                addrlen = sizeof (su.ss);
        gint                                     nfd;
+       struct kvstorage_session                        *session;
 
        if ((nfd = accept_from_socket (fd, (struct sockaddr *)&su.ss, &addrlen)) == -1) {
                thr_warn ("%ud: accept failed: %s", thr->id, strerror (errno));
@@ -171,15 +479,26 @@ thr_accept_socket (gint fd, short what, void *arg)
                return;
        }
 
+       session = g_malloc (sizeof (struct kvstorage_session));
+       session->pool = memory_pool_new (memory_pool_get_size ());
+       session->state = KVSTORAGE_STATE_READ_CMD;
+       session->thr = thr;
+       session->sock = nfd;
+       session->dispather = rspamd_create_dispatcher (thr->ctx->ev_base, nfd, BUFFER_LINE,
+                       kvstorage_read_socket, NULL,
+                       kvstorage_err_socket, thr->tv, session);
+       session->dispather->strip_eol = TRUE;
+
        if (su.ss.ss_family == AF_UNIX) {
                thr_info ("%ud: accepted connection from unix socket", thr->id);
+               session->client_addr.s_addr = INADDR_NONE;
        }
        else if (su.ss.ss_family == AF_INET) {
                thr_info ("%ud: accepted connection from %s port %d", thr->id,
                                inet_ntoa (su.s4.sin_addr), ntohs (su.s4.sin_port));
+               memcpy (&session->client_addr, &su.s4.sin_addr,
+                                               sizeof (struct in_addr));
        }
-       /* XXX: write the logic */
-       close (nfd);
 }
 
 /**
@@ -253,6 +572,12 @@ start_kvstorage_worker (struct rspamd_worker *worker)
        ctx->threads = NULL;
 
        g_thread_init (NULL);
+#if _EVENT_NUMERIC_VERSION > 0x02000000
+       if (evthread_use_pthreads () == -1) {
+               msg_err ("threads support is not supported in your libevent so kvstorage is not functionable");
+               exit (EXIT_SUCCESS);
+       }
+#endif
        main_base = ctx->ev_base;
 
        /* Set kvstorage options */
index 29107101c4c03ab559b85baa9c879a664b37659f..6f22b08c2be6bf247f767123b65e987cf408e4c7 100644 (file)
@@ -27,6 +27,7 @@
 
 #include "config.h"
 #include "mem_pool.h"
+#include "buffer.h"
 
 /* Configuration context for kvstorage worker */
 struct kvstorage_worker_ctx {
@@ -50,6 +51,31 @@ struct kvstorage_worker_thread {
        guint id;
 };
 
+struct kvstorage_session {
+       rspamd_io_dispatcher_t *dispather;
+       enum {
+               KVSTORAGE_STATE_READ_CMD,
+               KVSTORAGE_STATE_READ_DATA
+       } state;
+       enum {
+               KVSTORAGE_CMD_SET,
+               KVSTORAGE_CMD_GET,
+               KVSTORAGE_CMD_DELETE,
+               KVSTORAGE_CMD_QUIT
+       } command;
+       guint id;
+       memory_pool_t *pool;
+       gchar *key;
+       struct kvstorage_config *cf;
+       struct kvstorage_worker_thread *thr;
+       struct in_addr client_addr;
+       gint sock;
+       guint flags;
+       guint expire;
+       guint length;
+       time_t now;
+};
+
 gpointer init_kvstorage_worker (void);
 void start_kvstorage_worker (struct rspamd_worker *worker);