diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2011-10-31 18:37:41 +0300 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2011-10-31 18:37:41 +0300 |
commit | 98bd93ffddbb1f40884c5c12269948f230dbd0d0 (patch) | |
tree | e2e0595db3dfb373b27c9304b4e81452ba27de6c | |
parent | dee3bdecf7b338ded8df307703e2ebf6be209e30 (diff) | |
download | rspamd-98bd93ffddbb1f40884c5c12269948f230dbd0d0.tar.gz rspamd-98bd93ffddbb1f40884c5c12269948f230dbd0d0.zip |
* Implement basic functionality of key value storage
-rw-r--r-- | src/cfg_utils.c | 2 | ||||
-rw-r--r-- | src/cfg_xml.c | 18 | ||||
-rw-r--r-- | src/cfg_xml.h | 3 | ||||
-rw-r--r-- | src/kvstorage.c | 104 | ||||
-rw-r--r-- | src/kvstorage.h | 3 | ||||
-rw-r--r-- | src/kvstorage_config.c | 67 | ||||
-rw-r--r-- | src/kvstorage_server.c | 333 | ||||
-rw-r--r-- | src/kvstorage_server.h | 26 |
8 files changed, 486 insertions, 70 deletions
diff --git a/src/cfg_utils.c b/src/cfg_utils.c index ffebd5c46..48e95cf59 100644 --- a/src/cfg_utils.c +++ b/src/cfg_utils.c @@ -32,6 +32,7 @@ #include "classifiers/classifiers.h" #include "cfg_xml.h" #include "lua/lua_common.h" +#include "kvstorage_config.h" #define DEFAULT_SCORE 10.0 @@ -960,6 +961,7 @@ read_xml_config (struct config_file *cfg, const gchar *filename) ud.if_stack = g_queue_new (); ctx = g_markup_parse_context_new (&xml_parser, G_MARKUP_TREAT_CDATA_AS_TEXT, &ud, NULL); + init_kvstorage_config (); res = g_markup_parse_context_parse (ctx, data, st.st_size, &err); if (g_queue_get_length (ud.if_stack) != 0) { diff --git a/src/cfg_xml.c b/src/cfg_xml.c index edd8c9a17..f77bdf3aa 100644 --- a/src/cfg_xml.c +++ b/src/cfg_xml.c @@ -97,6 +97,7 @@ struct xml_subparser { enum xml_read_state state; const GMarkupParser *parser; gpointer user_data; + void (*fin_func)(gpointer ud); }; /* Here we describes our basic grammar */ @@ -1736,6 +1737,7 @@ rspamd_xml_start_element (GMarkupParseContext *context, const gchar *element_nam else if (subparsers != NULL && (subparser = g_hash_table_lookup (subparsers, element_name)) != NULL) { ud->state = XML_SUBPARSER; g_markup_parse_context_push (context, subparser->parser, subparser->user_data); + rspamd_strlcpy (ud->section_name, element_name, sizeof (ud->section_name)); } else { /* Extract other tags */ @@ -1812,6 +1814,7 @@ rspamd_xml_end_element (GMarkupParseContext *context, const gchar *element_name, gboolean res; gpointer tptr; struct wrk_cbdata wcd; + struct xml_subparser *subparser; if (g_ascii_strcasecmp (element_name, "if") == 0) { tptr = g_queue_pop_head (ud->if_stack); @@ -1929,6 +1932,18 @@ rspamd_xml_end_element (GMarkupParseContext *context, const gchar *element_name, break; case XML_SKIP_ELEMENTS: return; + case XML_SUBPARSER: + CHECK_TAG (ud->section_name, TRUE); + if (subparsers != NULL && (subparser = g_hash_table_lookup (subparsers, element_name)) != NULL) { + if (subparser->fin_func) { + subparser->fin_func (g_markup_parse_context_pop (context)); + } + else { + g_markup_parse_context_pop (context); + } + } + ud->state = XML_READ_PARAM; + break; default: ud->state = XML_ERROR; break; @@ -2256,7 +2271,7 @@ register_classifier_opt (const gchar *ctype, const gchar *optname) } void -register_subparser (const gchar *tag, enum xml_read_state state, const GMarkupParser *parser, gpointer user_data) +register_subparser (const gchar *tag, enum xml_read_state state, const GMarkupParser *parser, void (*fin_func)(gpointer ud), gpointer user_data) { struct xml_subparser *subparser; @@ -2268,6 +2283,7 @@ register_subparser (const gchar *tag, enum xml_read_state state, const GMarkupPa subparser->parser = parser; subparser->state = state; subparser->user_data = user_data; + subparser->fin_func = fin_func; g_hash_table_replace (subparsers, g_strdup (tag), subparser); } diff --git a/src/cfg_xml.h b/src/cfg_xml.h index 6740e135e..f1a1742d3 100644 --- a/src/cfg_xml.h +++ b/src/cfg_xml.h @@ -167,7 +167,8 @@ void register_worker_opt (gint wtype, const gchar *optname, element_handler_func void register_classifier_opt (const gchar *ctype, const gchar *optname); /* Register new xml subparser */ -void register_subparser (const gchar *tag, enum xml_read_state state, const GMarkupParser *parser, gpointer user_data); +void register_subparser (const gchar *tag, enum xml_read_state state, + const GMarkupParser *parser, void (*fin_func)(gpointer ud), gpointer user_data); /* Check validity of module option */ gboolean check_module_option (const gchar *mname, const gchar *optname, const gchar *data); diff --git a/src/kvstorage.c b/src/kvstorage.c index 2cfad9459..aed039040 100644 --- a/src/kvstorage.c +++ b/src/kvstorage.c @@ -57,6 +57,8 @@ rspamd_kv_storage_new (gint id, const gchar *name, struct rspamd_kv_cache *cache rspamd_snprintf (new->name, sizeof ("18446744073709551616"), "%d", id); } + g_static_rw_lock_init (&new->rwlock); + /* Init structures */ if (new->cache->init_func) { new->cache->init_func (new->cache); @@ -78,23 +80,26 @@ rspamd_kv_storage_insert_internal (struct rspamd_kv_storage *storage, struct rsp gint steps = 0; /* Hard limit */ - if (elt->size > storage->max_memory) { - msg_info ("<%s>: trying to insert value of length %z while limit is %z", elt->size, storage->max_memory); - return FALSE; - } - - /* Now check limits */ - while (storage->memory + elt->size > storage->max_memory || storage->elts >= storage->max_elts) { - if (storage->expire) { - storage->expire->step_func (storage->expire, storage, time (NULL)); - } - else { - msg_warn ("<%s>: storage %s is full and no expire function is defined", storage->name); - } - if (++steps > MAX_EXPIRE_STEPS) { - msg_warn ("<%s>: cannot expire enough keys in storage", storage->name); + if (storage->max_memory > 0) { + if (elt->size > storage->max_memory) { + msg_info ("<%s>: trying to insert value of length %z while limit is %z", storage->name, + elt->size, storage->max_memory); return FALSE; } + + /* Now check limits */ + while (storage->memory + elt->size > storage->max_memory || storage->elts >= storage->max_elts) { + if (storage->expire) { + storage->expire->step_func (storage->expire, storage, time (NULL)); + } + else { + msg_warn ("<%s>: storage is full and no expire function is defined", storage->name); + } + if (++steps > MAX_EXPIRE_STEPS) { + msg_warn ("<%s>: cannot expire enough keys in storage", storage->name); + return FALSE; + } + } } /* Insert elt to the cache */ @@ -116,30 +121,33 @@ rspamd_kv_storage_insert_internal (struct rspamd_kv_storage *storage, struct rsp /** Insert new element to the kv storage */ gboolean -rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, gpointer data, gsize len, gint flags) +rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, gpointer data, gsize len, gint flags, guint expire) { gint steps = 0; struct rspamd_kv_element *elt; gboolean res = TRUE; /* Hard limit */ - if (len > storage->max_memory) { - msg_info ("<%s>: trying to insert value of length %z while limit is %z", len, storage->max_memory); - return FALSE; - } - - /* Now check limits */ - while (storage->memory + len > storage->max_memory || storage->elts >= storage->max_elts) { - if (storage->expire) { - storage->expire->step_func (storage->expire, storage, time (NULL)); - } - else { - msg_warn ("<%s>: storage %s is full and no expire function is defined", storage->name); - } - if (++steps > MAX_EXPIRE_STEPS) { - msg_warn ("<%s>: cannot expire enough keys in storage", storage->name); + if (storage->max_memory > 0) { + if (len > storage->max_memory) { + msg_info ("<%s>: trying to insert value of length %z while limit is %z", storage->name, + len, storage->max_memory); return FALSE; } + + /* Now check limits */ + while (storage->memory + len > storage->max_memory || storage->elts >= storage->max_elts) { + if (storage->expire) { + storage->expire->step_func (storage->expire, storage, time (NULL)); + } + else { + msg_warn ("<%s>: storage is full and no expire function is defined", storage->name); + } + if (++steps > MAX_EXPIRE_STEPS) { + msg_warn ("<%s>: cannot expire enough keys in storage", storage->name); + return FALSE; + } + } } /* Insert elt to the cache */ @@ -149,6 +157,7 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, gpoin } elt->flags = flags; elt->size = len; + elt->expire = expire; /* Place to the backend */ if (storage->backend) { @@ -174,23 +183,26 @@ rspamd_kv_storage_replace (struct rspamd_kv_storage *storage, gpointer key, stru gint steps = 0; /* Hard limit */ - if (elt->size > storage->max_memory) { - msg_info ("<%s>: trying to replace value of length %z while limit is %z", elt->size, storage->max_memory); - return FALSE; - } - - /* Now check limits */ - while (storage->memory + elt->size > storage->max_memory) { - if (storage->expire) { - storage->expire->step_func (storage->expire, storage, time (NULL)); - } - else { - msg_warn ("<%s>: storage %s is full and no expire function is defined", storage->name); - } - if (++steps > MAX_EXPIRE_STEPS) { - msg_warn ("<%s>: cannot expire enough keys in storage", storage->name); + if (storage->max_memory > 0) { + if (elt->size > storage->max_memory) { + msg_info ("<%s>: trying to replace value of length %z while limit is %z", storage->name, + elt->size, storage->max_memory); return FALSE; } + + /* Now check limits */ + while (storage->memory + elt->size > storage->max_memory) { + if (storage->expire) { + storage->expire->step_func (storage->expire, storage, time (NULL)); + } + else { + msg_warn ("<%s>: storage is full and no expire function is defined", storage->name); + } + if (++steps > MAX_EXPIRE_STEPS) { + msg_warn ("<%s>: cannot expire enough keys in storage", storage->name); + return FALSE; + } + } } /* Insert elt to the cache */ diff --git a/src/kvstorage.h b/src/kvstorage.h index 4bb7f20a7..5bacc7c6b 100644 --- a/src/kvstorage.h +++ b/src/kvstorage.h @@ -117,6 +117,7 @@ struct rspamd_kv_storage { gint id; /* char ID */ gchar *name; /* numeric ID */ + GStaticRWLock rwlock; /* rwlock for threaded access */ }; /** Create new kv storage */ @@ -125,7 +126,7 @@ struct rspamd_kv_storage *rspamd_kv_storage_new (gint id, const gchar *name, gsize max_elts, gsize max_memory); /** Insert new element to the kv storage */ -gboolean rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, gpointer data, gsize len, gint flags); +gboolean rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, gpointer data, gsize len, gint flags, guint expire); /** Replace an element in the kv storage */ gboolean rspamd_kv_storage_replace (struct rspamd_kv_storage *storage, gpointer key, struct rspamd_kv_element *elt); diff --git a/src/kvstorage_config.c b/src/kvstorage_config.c index 1fb158ec6..bbffb4799 100644 --- a/src/kvstorage_config.c +++ b/src/kvstorage_config.c @@ -115,37 +115,47 @@ void kvstorage_xml_start_element (GMarkupParseContext *context, switch (kv_parser->state) { case KVSTORAGE_STATE_INIT: - /* Make temporary pool */ - if (kv_parser->pool != NULL) { - memory_pool_delete (kv_parser->pool); - } - kv_parser->pool = memory_pool_new (memory_pool_get_size ()); - - /* Create new kvstorage_config */ - kv_parser->current_storage = g_malloc0 (sizeof (struct kvstorage_config)); - kv_parser->current_storage->id = ++last_id; + /* XXX: never get this state */ break; case KVSTORAGE_STATE_PARAM: + if (kv_parser->current_storage == NULL) { + /* Make temporary pool */ + if (kv_parser->pool != NULL) { + memory_pool_delete (kv_parser->pool); + } + kv_parser->pool = memory_pool_new (memory_pool_get_size ()); + + /* Create new kvstorage_config */ + kv_parser->current_storage = g_malloc0 (sizeof (struct kvstorage_config)); + kv_parser->current_storage->id = last_id++; + } if (g_ascii_strcasecmp (element_name, "type") == 0) { kv_parser->state = KVSTORAGE_STATE_CACHE_TYPE; + kv_parser->cur_elt = "type"; } else if (g_ascii_strcasecmp (element_name, "max_elements") == 0) { kv_parser->state = KVSTORAGE_STATE_CACHE_MAX_ELTS; + kv_parser->cur_elt = "max_elements"; } else if (g_ascii_strcasecmp (element_name, "max_memory") == 0) { kv_parser->state = KVSTORAGE_STATE_CACHE_MAX_MEM; + kv_parser->cur_elt = "max_memory"; } else if (g_ascii_strcasecmp (element_name, "id") == 0) { kv_parser->state = KVSTORAGE_STATE_ID; + kv_parser->cur_elt = "id"; } else if (g_ascii_strcasecmp (element_name, "name") == 0) { kv_parser->state = KVSTORAGE_STATE_NAME; + kv_parser->cur_elt = "name"; } else if (g_ascii_strcasecmp (element_name, "backend") == 0) { kv_parser->state = KVSTORAGE_STATE_BACKEND; + kv_parser->cur_elt = "backend"; } else if (g_ascii_strcasecmp (element_name, "expire") == 0) { kv_parser->state = KVSTORAGE_STATE_EXPIRE; + kv_parser->cur_elt = "expire"; } else { if (*error == NULL) { @@ -154,11 +164,11 @@ void kvstorage_xml_start_element (GMarkupParseContext *context, } kv_parser->state = KVSTORAGE_STATE_ERROR; } - kv_parser->cur_elt = memory_pool_strdup (kv_parser->pool, element_name); break; case KVSTORAGE_STATE_BACKEND: if (g_ascii_strcasecmp (element_name, "type") == 0) { kv_parser->state = KVSTORAGE_STATE_BACKEND_TYPE; + kv_parser->cur_elt = "type"; } else { if (*error == NULL) { @@ -171,6 +181,7 @@ void kvstorage_xml_start_element (GMarkupParseContext *context, case KVSTORAGE_STATE_EXPIRE: if (g_ascii_strcasecmp (element_name, "type") == 0) { kv_parser->state = KVSTORAGE_STATE_EXPIRE_TYPE; + kv_parser->cur_elt = "type"; } else { if (*error == NULL) { @@ -211,10 +222,6 @@ void kvstorage_xml_end_element (GMarkupParseContext *context, case KVSTORAGE_STATE_PARAM: if (g_ascii_strcasecmp (element_name, "keystorage") == 0) { /* XXX: Init actual storage */ - g_hash_table_insert (storages, &kv_parser->current_storage->id, kv_parser->current_storage); - kv_parser->state = KVSTORAGE_STATE_INIT; - g_markup_parse_context_pop (context); - g_hash_table_foreach (storages, kvstorage_init_callback, NULL); return; } if (*error == NULL) { @@ -277,6 +284,15 @@ void kvstorage_xml_text (GMarkupParseContext *context, struct kvstorage_config_parser *kv_parser = user_data; gchar *err_str; + /* Strip space symbols */ + while (*text && g_ascii_isspace (*text)) { + text ++; + } + if (*text == '\0') { + /* Skip empty text */ + return; + } + switch (kv_parser->state) { case KVSTORAGE_STATE_INIT: case KVSTORAGE_STATE_PARAM: @@ -294,7 +310,8 @@ void kvstorage_xml_text (GMarkupParseContext *context, kv_parser->state = KVSTORAGE_STATE_ERROR; } else { - last_id = kv_parser->current_storage->id; + last_id ++; + last_id = MAX (kv_parser->current_storage->id, last_id); } break; case KVSTORAGE_STATE_NAME: @@ -353,13 +370,28 @@ void kvstorage_xml_text (GMarkupParseContext *context, /* Called on error, including one set by other * methods in the vtable. The GError should not be freed. */ -void kvstorage_xml_error (GMarkupParseContext *context, +void +kvstorage_xml_error (GMarkupParseContext *context, GError *error, gpointer user_data) { msg_err ("kvstorage xml parser error: %s", error->message); } +/* + * Cleanup kvstorage after end tag was read + */ +static void +kvstorage_cleanup (gpointer ud) +{ + struct kvstorage_config_parser *kv_parser = ud; + + g_hash_table_insert (storages, &kv_parser->current_storage->id, kv_parser->current_storage); + kv_parser->state = KVSTORAGE_STATE_INIT; + g_hash_table_foreach (storages, kvstorage_init_callback, NULL); + kv_parser->current_storage = NULL; +} + /** Public API */ /* Init subparser of kvstorage config */ @@ -387,8 +419,9 @@ init_kvstorage_config (void) kv_parser = g_malloc0 (sizeof (struct kvstorage_config_parser)); kv_parser->state = KVSTORAGE_STATE_PARAM; + kv_parser->pool = memory_pool_new (memory_pool_get_size ()); - register_subparser ("keystorage", XML_READ_START, parser, kv_parser); + register_subparser ("keystorage", XML_READ_START, parser, kvstorage_cleanup, kv_parser); } /* Get configuration for kvstorage with specified ID */ diff --git a/src/kvstorage_server.c b/src/kvstorage_server.c index 78c6c34e3..81d04d4c2 100644 --- a/src/kvstorage_server.c +++ b/src/kvstorage_server.c @@ -30,6 +30,13 @@ #include "cfg_xml.h" #include "main.h" +#define ERROR_COMMON "ERROR" CRLF +#define ERROR_UNKNOWN_COMMAND "CLIENT_ERROR unknown command" CRLF +#define ERROR_NOT_STORED "NOT_STORED" CRLF +#define ERROR_EXISTS "EXISTS" CRLF +#define ERROR_NOT_FOUND "NOT_FOUND" CRLF +#define ERROR_INVALID_KEYSTORAGE "CLIENT_ERROR storage does not exists" CRLF + /* This is required for normal signals processing */ static GList *global_evbases = NULL; static struct event_base *main_base = NULL; @@ -148,11 +155,311 @@ config_kvstorage_worker (struct rspamd_worker *worker) return TRUE; } +/* + * Free kvstorage session + */ +static void +free_kvstorage_session (struct kvstorage_session *session) +{ + rspamd_remove_dispatcher (session->dispather); + memory_pool_delete (session->pool); + close (session->sock); +} + /** - * Accept function + * Parse kvstorage command + */ +static gboolean +parse_kvstorage_command (struct kvstorage_session *session, f_str_t *in) +{ + gchar *p, *c, *end; + gint state = 0, next_state; + + p = in->begin; + end = in->begin + in->len; + c = p; + + /* State machine for parsing */ + while (p <= end) { + switch (state) { + case 0: + /* At this state we try to read identifier of storage */ + if (g_ascii_isdigit (*p)) { + p ++; + } + else { + if (g_ascii_isspace (*p) && p != c) { + /* We have some digits, so parse id */ + session->id = strtoul (c, NULL, 10); + state = 99; + next_state = 1; + } + else if (c == p) { + /* We have some character, so assume id as 0 and parse command */ + session->id = 0; + state = 1; + } + else { + /* We have something wrong here (like some digits and then come non-digits) */ + return FALSE; + } + } + break; + case 1: + /* At this state we parse command */ + if (g_ascii_isalpha (*p) && p != end) { + p ++; + } + else { + if ((g_ascii_isspace (*p) || p == end) && p != c) { + /* We got some command, try to parse it */ + if (p - c == 3) { + /* Set or get command */ + if (memcmp (c, "get", 3) == 0) { + session->command = KVSTORAGE_CMD_GET; + } + else if (memcmp (c, "set", 3) == 0) { + session->command = KVSTORAGE_CMD_SET; + } + else { + /* Error */ + return FALSE; + } + } + else if (p - c == 4) { + if (memcmp (c, "quit", 4) == 0) { + session->command = KVSTORAGE_CMD_QUIT; + state = 100; + continue; + } + } + else if (p - c == 6) { + if (memcmp (c, "delete", 6) == 0) { + session->command = KVSTORAGE_CMD_DELETE; + } + else { + return FALSE; + } + } + else { + return FALSE; + } + /* Skip spaces and try to parse key */ + state = 99; + next_state = 2; + } + else { + /* Some error */ + return FALSE; + } + } + break; + case 2: + /* Read and store key */ + if (!g_ascii_isspace (*p) && end != p) { + p ++; + } + else { + if (p == c) { + return FALSE; + } + else { + session->key = memory_pool_alloc (session->pool, p - c + 1); + rspamd_strlcpy (session->key, c, p - c + 1); + /* Now we must select next state based on command */ + if (session->command == KVSTORAGE_CMD_SET) { + /* Read flags */ + state = 99; + next_state = 3; + } + else { + /* Nothing to read for other commands */ + state = 100; + } + } + } + break; + case 3: + /* Read flags */ + if (g_ascii_isdigit (*p)) { + p ++; + } + else { + if (g_ascii_isspace (*p)) { + session->flags = strtoul (c, NULL, 10); + state = 99; + next_state = 4; + } + else { + return FALSE; + } + } + break; + case 4: + /* Read exptime */ + if (g_ascii_isdigit (*p)) { + p ++; + } + else { + if (g_ascii_isspace (*p)) { + session->expire = strtoul (c, NULL, 10); + state = 99; + next_state = 5; + } + else { + return FALSE; + } + } + break; + case 5: + /* Read size */ + if (g_ascii_isdigit (*p)) { + p ++; + } + else { + if (g_ascii_isspace (*p) || end == p) { + session->length = strtoul (c, NULL, 10); + state = 100; + } + else { + return FALSE; + } + } + break; + case 99: + /* Skip spaces state */ + if (g_ascii_isspace (*p)) { + p ++; + } + else { + c = p; + state = next_state; + } + break; + case 100: + /* Successful state */ + return TRUE; + break; + } + } + + return state == 100; +} + +/** + * Dispatcher callbacks + */ +/* + * Callback that is called when there is data to read in buffer */ +static gboolean +kvstorage_read_socket (f_str_t * in, void *arg) +{ + struct kvstorage_session *session = (struct kvstorage_session *) arg; + struct kvstorage_worker_thread *thr; + struct rspamd_kv_element *elt; + gint r; + gchar outbuf[BUFSIZ]; + + if (in->len == 0) { + /* Skip empty commands */ + return TRUE; + } + thr = session->thr; + switch (session->state) { + case KVSTORAGE_STATE_READ_CMD: + /* Update timestamp */ + session->now = time (NULL); + if (! parse_kvstorage_command (session, in)) { + thr_info ("%ud: unknown command: %V", thr->id, in); + return rspamd_dispatcher_write (session->dispather, ERROR_UNKNOWN_COMMAND, + sizeof (ERROR_UNKNOWN_COMMAND) - 1, FALSE, TRUE); + } + else { + session->cf = get_kvstorage_config (session->id); + if (session->cf == NULL) { + thr_info ("%ud: bad keystorage: %ud", thr->id, session->id); + return rspamd_dispatcher_write (session->dispather, ERROR_INVALID_KEYSTORAGE, + sizeof (ERROR_INVALID_KEYSTORAGE) - 1, FALSE, TRUE); + } + if (session->command == KVSTORAGE_CMD_SET) { + session->state = KVSTORAGE_STATE_READ_DATA; + rspamd_set_dispatcher_policy (session->dispather, BUFFER_CHARACTER, session->length); + } + else if (session->command == KVSTORAGE_CMD_GET) { + elt = rspamd_kv_storage_lookup (session->cf->storage, session->key, session->now); + if (elt == NULL) { + return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND, + sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE); + } + else { + r = rspamd_snprintf (outbuf, sizeof (outbuf), "VALUE %s %ud %ud" CRLF, + elt->key, elt->flags, elt->size); + if (!rspamd_dispatcher_write (session->dispather, outbuf, + r, FALSE, FALSE)) { + return FALSE; + } + if (!rspamd_dispatcher_write (session->dispather, elt->data, elt->size, FALSE, TRUE)) { + return FALSE; + } + return rspamd_dispatcher_write (session->dispather, CRLF "END" CRLF, + sizeof (CRLF "END" CRLF) - 1, FALSE, TRUE); + } + } + else if (session->command == KVSTORAGE_CMD_DELETE) { + if (rspamd_kv_storage_delete (session->cf->storage, session->key)) { + return rspamd_dispatcher_write (session->dispather, "DELETED" CRLF, + sizeof ("DELETED" CRLF) - 1, FALSE, TRUE); + } + else { + return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND, + sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE); + } + } + else if (session->command == KVSTORAGE_CMD_QUIT) { + /* Quit session */ + free_kvstorage_session (session); + return FALSE; + } + } + break; + case KVSTORAGE_STATE_READ_DATA: + session->state = KVSTORAGE_STATE_READ_CMD; + rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1); + if (rspamd_kv_storage_insert (session->cf->storage, session->key, in->begin, in->len, + session->flags, session->expire)) { + return rspamd_dispatcher_write (session->dispather, "STORED" CRLF, + sizeof ("STORED" CRLF) - 1, FALSE, TRUE); + } + else { + return rspamd_dispatcher_write (session->dispather, ERROR_NOT_STORED, + sizeof (ERROR_NOT_STORED) - 1, FALSE, TRUE); + } + + break; + } + + return TRUE; +} + /* - * Accept new connection and construct task + * Called if something goes wrong + */ +static void +kvstorage_err_socket (GError * err, void *arg) +{ + struct kvstorage_session *session = (struct kvstorage_session *) arg; + struct kvstorage_worker_thread *thr; + + thr = session->thr; + thr_info ("%ud: abnormally closing connection from: %s, error: %s", + thr->id, inet_ntoa (session->client_addr), err->message); + g_error_free (err); + free_kvstorage_session (session); +} + +/** + * Accept function */ static void thr_accept_socket (gint fd, short what, void *arg) @@ -161,6 +468,7 @@ thr_accept_socket (gint fd, short what, void *arg) union sa_union su; socklen_t addrlen = sizeof (su.ss); gint nfd; + struct kvstorage_session *session; if ((nfd = accept_from_socket (fd, (struct sockaddr *)&su.ss, &addrlen)) == -1) { thr_warn ("%ud: accept failed: %s", thr->id, strerror (errno)); @@ -171,15 +479,26 @@ thr_accept_socket (gint fd, short what, void *arg) return; } + session = g_malloc (sizeof (struct kvstorage_session)); + session->pool = memory_pool_new (memory_pool_get_size ()); + session->state = KVSTORAGE_STATE_READ_CMD; + session->thr = thr; + session->sock = nfd; + session->dispather = rspamd_create_dispatcher (thr->ctx->ev_base, nfd, BUFFER_LINE, + kvstorage_read_socket, NULL, + kvstorage_err_socket, thr->tv, session); + session->dispather->strip_eol = TRUE; + if (su.ss.ss_family == AF_UNIX) { thr_info ("%ud: accepted connection from unix socket", thr->id); + session->client_addr.s_addr = INADDR_NONE; } else if (su.ss.ss_family == AF_INET) { thr_info ("%ud: accepted connection from %s port %d", thr->id, inet_ntoa (su.s4.sin_addr), ntohs (su.s4.sin_port)); + memcpy (&session->client_addr, &su.s4.sin_addr, + sizeof (struct in_addr)); } - /* XXX: write the logic */ - close (nfd); } /** @@ -253,6 +572,12 @@ start_kvstorage_worker (struct rspamd_worker *worker) ctx->threads = NULL; g_thread_init (NULL); +#if _EVENT_NUMERIC_VERSION > 0x02000000 + if (evthread_use_pthreads () == -1) { + msg_err ("threads support is not supported in your libevent so kvstorage is not functionable"); + exit (EXIT_SUCCESS); + } +#endif main_base = ctx->ev_base; /* Set kvstorage options */ diff --git a/src/kvstorage_server.h b/src/kvstorage_server.h index 29107101c..6f22b08c2 100644 --- a/src/kvstorage_server.h +++ b/src/kvstorage_server.h @@ -27,6 +27,7 @@ #include "config.h" #include "mem_pool.h" +#include "buffer.h" /* Configuration context for kvstorage worker */ struct kvstorage_worker_ctx { @@ -50,6 +51,31 @@ struct kvstorage_worker_thread { guint id; }; +struct kvstorage_session { + rspamd_io_dispatcher_t *dispather; + enum { + KVSTORAGE_STATE_READ_CMD, + KVSTORAGE_STATE_READ_DATA + } state; + enum { + KVSTORAGE_CMD_SET, + KVSTORAGE_CMD_GET, + KVSTORAGE_CMD_DELETE, + KVSTORAGE_CMD_QUIT + } command; + guint id; + memory_pool_t *pool; + gchar *key; + struct kvstorage_config *cf; + struct kvstorage_worker_thread *thr; + struct in_addr client_addr; + gint sock; + guint flags; + guint expire; + guint length; + time_t now; +}; + gpointer init_kvstorage_worker (void); void start_kvstorage_worker (struct rspamd_worker *worker); |