aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2011-10-31 18:37:41 +0300
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2011-10-31 18:37:41 +0300
commit98bd93ffddbb1f40884c5c12269948f230dbd0d0 (patch)
treee2e0595db3dfb373b27c9304b4e81452ba27de6c
parentdee3bdecf7b338ded8df307703e2ebf6be209e30 (diff)
downloadrspamd-98bd93ffddbb1f40884c5c12269948f230dbd0d0.tar.gz
rspamd-98bd93ffddbb1f40884c5c12269948f230dbd0d0.zip
* Implement basic functionality of key value storage
-rw-r--r--src/cfg_utils.c2
-rw-r--r--src/cfg_xml.c18
-rw-r--r--src/cfg_xml.h3
-rw-r--r--src/kvstorage.c104
-rw-r--r--src/kvstorage.h3
-rw-r--r--src/kvstorage_config.c67
-rw-r--r--src/kvstorage_server.c333
-rw-r--r--src/kvstorage_server.h26
8 files changed, 486 insertions, 70 deletions
diff --git a/src/cfg_utils.c b/src/cfg_utils.c
index ffebd5c46..48e95cf59 100644
--- a/src/cfg_utils.c
+++ b/src/cfg_utils.c
@@ -32,6 +32,7 @@
#include "classifiers/classifiers.h"
#include "cfg_xml.h"
#include "lua/lua_common.h"
+#include "kvstorage_config.h"
#define DEFAULT_SCORE 10.0
@@ -960,6 +961,7 @@ read_xml_config (struct config_file *cfg, const gchar *filename)
ud.if_stack = g_queue_new ();
ctx = g_markup_parse_context_new (&xml_parser, G_MARKUP_TREAT_CDATA_AS_TEXT, &ud, NULL);
+ init_kvstorage_config ();
res = g_markup_parse_context_parse (ctx, data, st.st_size, &err);
if (g_queue_get_length (ud.if_stack) != 0) {
diff --git a/src/cfg_xml.c b/src/cfg_xml.c
index edd8c9a17..f77bdf3aa 100644
--- a/src/cfg_xml.c
+++ b/src/cfg_xml.c
@@ -97,6 +97,7 @@ struct xml_subparser {
enum xml_read_state state;
const GMarkupParser *parser;
gpointer user_data;
+ void (*fin_func)(gpointer ud);
};
/* Here we describes our basic grammar */
@@ -1736,6 +1737,7 @@ rspamd_xml_start_element (GMarkupParseContext *context, const gchar *element_nam
else if (subparsers != NULL && (subparser = g_hash_table_lookup (subparsers, element_name)) != NULL) {
ud->state = XML_SUBPARSER;
g_markup_parse_context_push (context, subparser->parser, subparser->user_data);
+ rspamd_strlcpy (ud->section_name, element_name, sizeof (ud->section_name));
}
else {
/* Extract other tags */
@@ -1812,6 +1814,7 @@ rspamd_xml_end_element (GMarkupParseContext *context, const gchar *element_name,
gboolean res;
gpointer tptr;
struct wrk_cbdata wcd;
+ struct xml_subparser *subparser;
if (g_ascii_strcasecmp (element_name, "if") == 0) {
tptr = g_queue_pop_head (ud->if_stack);
@@ -1929,6 +1932,18 @@ rspamd_xml_end_element (GMarkupParseContext *context, const gchar *element_name,
break;
case XML_SKIP_ELEMENTS:
return;
+ case XML_SUBPARSER:
+ CHECK_TAG (ud->section_name, TRUE);
+ if (subparsers != NULL && (subparser = g_hash_table_lookup (subparsers, element_name)) != NULL) {
+ if (subparser->fin_func) {
+ subparser->fin_func (g_markup_parse_context_pop (context));
+ }
+ else {
+ g_markup_parse_context_pop (context);
+ }
+ }
+ ud->state = XML_READ_PARAM;
+ break;
default:
ud->state = XML_ERROR;
break;
@@ -2256,7 +2271,7 @@ register_classifier_opt (const gchar *ctype, const gchar *optname)
}
void
-register_subparser (const gchar *tag, enum xml_read_state state, const GMarkupParser *parser, gpointer user_data)
+register_subparser (const gchar *tag, enum xml_read_state state, const GMarkupParser *parser, void (*fin_func)(gpointer ud), gpointer user_data)
{
struct xml_subparser *subparser;
@@ -2268,6 +2283,7 @@ register_subparser (const gchar *tag, enum xml_read_state state, const GMarkupPa
subparser->parser = parser;
subparser->state = state;
subparser->user_data = user_data;
+ subparser->fin_func = fin_func;
g_hash_table_replace (subparsers, g_strdup (tag), subparser);
}
diff --git a/src/cfg_xml.h b/src/cfg_xml.h
index 6740e135e..f1a1742d3 100644
--- a/src/cfg_xml.h
+++ b/src/cfg_xml.h
@@ -167,7 +167,8 @@ void register_worker_opt (gint wtype, const gchar *optname, element_handler_func
void register_classifier_opt (const gchar *ctype, const gchar *optname);
/* Register new xml subparser */
-void register_subparser (const gchar *tag, enum xml_read_state state, const GMarkupParser *parser, gpointer user_data);
+void register_subparser (const gchar *tag, enum xml_read_state state,
+ const GMarkupParser *parser, void (*fin_func)(gpointer ud), gpointer user_data);
/* Check validity of module option */
gboolean check_module_option (const gchar *mname, const gchar *optname, const gchar *data);
diff --git a/src/kvstorage.c b/src/kvstorage.c
index 2cfad9459..aed039040 100644
--- a/src/kvstorage.c
+++ b/src/kvstorage.c
@@ -57,6 +57,8 @@ rspamd_kv_storage_new (gint id, const gchar *name, struct rspamd_kv_cache *cache
rspamd_snprintf (new->name, sizeof ("18446744073709551616"), "%d", id);
}
+ g_static_rw_lock_init (&new->rwlock);
+
/* Init structures */
if (new->cache->init_func) {
new->cache->init_func (new->cache);
@@ -78,23 +80,26 @@ rspamd_kv_storage_insert_internal (struct rspamd_kv_storage *storage, struct rsp
gint steps = 0;
/* Hard limit */
- if (elt->size > storage->max_memory) {
- msg_info ("<%s>: trying to insert value of length %z while limit is %z", elt->size, storage->max_memory);
- return FALSE;
- }
-
- /* Now check limits */
- while (storage->memory + elt->size > storage->max_memory || storage->elts >= storage->max_elts) {
- if (storage->expire) {
- storage->expire->step_func (storage->expire, storage, time (NULL));
- }
- else {
- msg_warn ("<%s>: storage %s is full and no expire function is defined", storage->name);
- }
- if (++steps > MAX_EXPIRE_STEPS) {
- msg_warn ("<%s>: cannot expire enough keys in storage", storage->name);
+ if (storage->max_memory > 0) {
+ if (elt->size > storage->max_memory) {
+ msg_info ("<%s>: trying to insert value of length %z while limit is %z", storage->name,
+ elt->size, storage->max_memory);
return FALSE;
}
+
+ /* Now check limits */
+ while (storage->memory + elt->size > storage->max_memory || storage->elts >= storage->max_elts) {
+ if (storage->expire) {
+ storage->expire->step_func (storage->expire, storage, time (NULL));
+ }
+ else {
+ msg_warn ("<%s>: storage is full and no expire function is defined", storage->name);
+ }
+ if (++steps > MAX_EXPIRE_STEPS) {
+ msg_warn ("<%s>: cannot expire enough keys in storage", storage->name);
+ return FALSE;
+ }
+ }
}
/* Insert elt to the cache */
@@ -116,30 +121,33 @@ rspamd_kv_storage_insert_internal (struct rspamd_kv_storage *storage, struct rsp
/** Insert new element to the kv storage */
gboolean
-rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, gpointer data, gsize len, gint flags)
+rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, gpointer data, gsize len, gint flags, guint expire)
{
gint steps = 0;
struct rspamd_kv_element *elt;
gboolean res = TRUE;
/* Hard limit */
- if (len > storage->max_memory) {
- msg_info ("<%s>: trying to insert value of length %z while limit is %z", len, storage->max_memory);
- return FALSE;
- }
-
- /* Now check limits */
- while (storage->memory + len > storage->max_memory || storage->elts >= storage->max_elts) {
- if (storage->expire) {
- storage->expire->step_func (storage->expire, storage, time (NULL));
- }
- else {
- msg_warn ("<%s>: storage %s is full and no expire function is defined", storage->name);
- }
- if (++steps > MAX_EXPIRE_STEPS) {
- msg_warn ("<%s>: cannot expire enough keys in storage", storage->name);
+ if (storage->max_memory > 0) {
+ if (len > storage->max_memory) {
+ msg_info ("<%s>: trying to insert value of length %z while limit is %z", storage->name,
+ len, storage->max_memory);
return FALSE;
}
+
+ /* Now check limits */
+ while (storage->memory + len > storage->max_memory || storage->elts >= storage->max_elts) {
+ if (storage->expire) {
+ storage->expire->step_func (storage->expire, storage, time (NULL));
+ }
+ else {
+ msg_warn ("<%s>: storage is full and no expire function is defined", storage->name);
+ }
+ if (++steps > MAX_EXPIRE_STEPS) {
+ msg_warn ("<%s>: cannot expire enough keys in storage", storage->name);
+ return FALSE;
+ }
+ }
}
/* Insert elt to the cache */
@@ -149,6 +157,7 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, gpoin
}
elt->flags = flags;
elt->size = len;
+ elt->expire = expire;
/* Place to the backend */
if (storage->backend) {
@@ -174,23 +183,26 @@ rspamd_kv_storage_replace (struct rspamd_kv_storage *storage, gpointer key, stru
gint steps = 0;
/* Hard limit */
- if (elt->size > storage->max_memory) {
- msg_info ("<%s>: trying to replace value of length %z while limit is %z", elt->size, storage->max_memory);
- return FALSE;
- }
-
- /* Now check limits */
- while (storage->memory + elt->size > storage->max_memory) {
- if (storage->expire) {
- storage->expire->step_func (storage->expire, storage, time (NULL));
- }
- else {
- msg_warn ("<%s>: storage %s is full and no expire function is defined", storage->name);
- }
- if (++steps > MAX_EXPIRE_STEPS) {
- msg_warn ("<%s>: cannot expire enough keys in storage", storage->name);
+ if (storage->max_memory > 0) {
+ if (elt->size > storage->max_memory) {
+ msg_info ("<%s>: trying to replace value of length %z while limit is %z", storage->name,
+ elt->size, storage->max_memory);
return FALSE;
}
+
+ /* Now check limits */
+ while (storage->memory + elt->size > storage->max_memory) {
+ if (storage->expire) {
+ storage->expire->step_func (storage->expire, storage, time (NULL));
+ }
+ else {
+ msg_warn ("<%s>: storage is full and no expire function is defined", storage->name);
+ }
+ if (++steps > MAX_EXPIRE_STEPS) {
+ msg_warn ("<%s>: cannot expire enough keys in storage", storage->name);
+ return FALSE;
+ }
+ }
}
/* Insert elt to the cache */
diff --git a/src/kvstorage.h b/src/kvstorage.h
index 4bb7f20a7..5bacc7c6b 100644
--- a/src/kvstorage.h
+++ b/src/kvstorage.h
@@ -117,6 +117,7 @@ struct rspamd_kv_storage {
gint id; /* char ID */
gchar *name; /* numeric ID */
+ GStaticRWLock rwlock; /* rwlock for threaded access */
};
/** Create new kv storage */
@@ -125,7 +126,7 @@ struct rspamd_kv_storage *rspamd_kv_storage_new (gint id, const gchar *name,
gsize max_elts, gsize max_memory);
/** Insert new element to the kv storage */
-gboolean rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, gpointer data, gsize len, gint flags);
+gboolean rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, gpointer data, gsize len, gint flags, guint expire);
/** Replace an element in the kv storage */
gboolean rspamd_kv_storage_replace (struct rspamd_kv_storage *storage, gpointer key, struct rspamd_kv_element *elt);
diff --git a/src/kvstorage_config.c b/src/kvstorage_config.c
index 1fb158ec6..bbffb4799 100644
--- a/src/kvstorage_config.c
+++ b/src/kvstorage_config.c
@@ -115,37 +115,47 @@ void kvstorage_xml_start_element (GMarkupParseContext *context,
switch (kv_parser->state) {
case KVSTORAGE_STATE_INIT:
- /* Make temporary pool */
- if (kv_parser->pool != NULL) {
- memory_pool_delete (kv_parser->pool);
- }
- kv_parser->pool = memory_pool_new (memory_pool_get_size ());
-
- /* Create new kvstorage_config */
- kv_parser->current_storage = g_malloc0 (sizeof (struct kvstorage_config));
- kv_parser->current_storage->id = ++last_id;
+ /* XXX: never get this state */
break;
case KVSTORAGE_STATE_PARAM:
+ if (kv_parser->current_storage == NULL) {
+ /* Make temporary pool */
+ if (kv_parser->pool != NULL) {
+ memory_pool_delete (kv_parser->pool);
+ }
+ kv_parser->pool = memory_pool_new (memory_pool_get_size ());
+
+ /* Create new kvstorage_config */
+ kv_parser->current_storage = g_malloc0 (sizeof (struct kvstorage_config));
+ kv_parser->current_storage->id = last_id++;
+ }
if (g_ascii_strcasecmp (element_name, "type") == 0) {
kv_parser->state = KVSTORAGE_STATE_CACHE_TYPE;
+ kv_parser->cur_elt = "type";
}
else if (g_ascii_strcasecmp (element_name, "max_elements") == 0) {
kv_parser->state = KVSTORAGE_STATE_CACHE_MAX_ELTS;
+ kv_parser->cur_elt = "max_elements";
}
else if (g_ascii_strcasecmp (element_name, "max_memory") == 0) {
kv_parser->state = KVSTORAGE_STATE_CACHE_MAX_MEM;
+ kv_parser->cur_elt = "max_memory";
}
else if (g_ascii_strcasecmp (element_name, "id") == 0) {
kv_parser->state = KVSTORAGE_STATE_ID;
+ kv_parser->cur_elt = "id";
}
else if (g_ascii_strcasecmp (element_name, "name") == 0) {
kv_parser->state = KVSTORAGE_STATE_NAME;
+ kv_parser->cur_elt = "name";
}
else if (g_ascii_strcasecmp (element_name, "backend") == 0) {
kv_parser->state = KVSTORAGE_STATE_BACKEND;
+ kv_parser->cur_elt = "backend";
}
else if (g_ascii_strcasecmp (element_name, "expire") == 0) {
kv_parser->state = KVSTORAGE_STATE_EXPIRE;
+ kv_parser->cur_elt = "expire";
}
else {
if (*error == NULL) {
@@ -154,11 +164,11 @@ void kvstorage_xml_start_element (GMarkupParseContext *context,
}
kv_parser->state = KVSTORAGE_STATE_ERROR;
}
- kv_parser->cur_elt = memory_pool_strdup (kv_parser->pool, element_name);
break;
case KVSTORAGE_STATE_BACKEND:
if (g_ascii_strcasecmp (element_name, "type") == 0) {
kv_parser->state = KVSTORAGE_STATE_BACKEND_TYPE;
+ kv_parser->cur_elt = "type";
}
else {
if (*error == NULL) {
@@ -171,6 +181,7 @@ void kvstorage_xml_start_element (GMarkupParseContext *context,
case KVSTORAGE_STATE_EXPIRE:
if (g_ascii_strcasecmp (element_name, "type") == 0) {
kv_parser->state = KVSTORAGE_STATE_EXPIRE_TYPE;
+ kv_parser->cur_elt = "type";
}
else {
if (*error == NULL) {
@@ -211,10 +222,6 @@ void kvstorage_xml_end_element (GMarkupParseContext *context,
case KVSTORAGE_STATE_PARAM:
if (g_ascii_strcasecmp (element_name, "keystorage") == 0) {
/* XXX: Init actual storage */
- g_hash_table_insert (storages, &kv_parser->current_storage->id, kv_parser->current_storage);
- kv_parser->state = KVSTORAGE_STATE_INIT;
- g_markup_parse_context_pop (context);
- g_hash_table_foreach (storages, kvstorage_init_callback, NULL);
return;
}
if (*error == NULL) {
@@ -277,6 +284,15 @@ void kvstorage_xml_text (GMarkupParseContext *context,
struct kvstorage_config_parser *kv_parser = user_data;
gchar *err_str;
+ /* Strip space symbols */
+ while (*text && g_ascii_isspace (*text)) {
+ text ++;
+ }
+ if (*text == '\0') {
+ /* Skip empty text */
+ return;
+ }
+
switch (kv_parser->state) {
case KVSTORAGE_STATE_INIT:
case KVSTORAGE_STATE_PARAM:
@@ -294,7 +310,8 @@ void kvstorage_xml_text (GMarkupParseContext *context,
kv_parser->state = KVSTORAGE_STATE_ERROR;
}
else {
- last_id = kv_parser->current_storage->id;
+ last_id ++;
+ last_id = MAX (kv_parser->current_storage->id, last_id);
}
break;
case KVSTORAGE_STATE_NAME:
@@ -353,13 +370,28 @@ void kvstorage_xml_text (GMarkupParseContext *context,
/* Called on error, including one set by other
* methods in the vtable. The GError should not be freed.
*/
-void kvstorage_xml_error (GMarkupParseContext *context,
+void
+kvstorage_xml_error (GMarkupParseContext *context,
GError *error,
gpointer user_data)
{
msg_err ("kvstorage xml parser error: %s", error->message);
}
+/*
+ * Cleanup kvstorage after end tag was read
+ */
+static void
+kvstorage_cleanup (gpointer ud)
+{
+ struct kvstorage_config_parser *kv_parser = ud;
+
+ g_hash_table_insert (storages, &kv_parser->current_storage->id, kv_parser->current_storage);
+ kv_parser->state = KVSTORAGE_STATE_INIT;
+ g_hash_table_foreach (storages, kvstorage_init_callback, NULL);
+ kv_parser->current_storage = NULL;
+}
+
/** Public API */
/* Init subparser of kvstorage config */
@@ -387,8 +419,9 @@ init_kvstorage_config (void)
kv_parser = g_malloc0 (sizeof (struct kvstorage_config_parser));
kv_parser->state = KVSTORAGE_STATE_PARAM;
+ kv_parser->pool = memory_pool_new (memory_pool_get_size ());
- register_subparser ("keystorage", XML_READ_START, parser, kv_parser);
+ register_subparser ("keystorage", XML_READ_START, parser, kvstorage_cleanup, kv_parser);
}
/* Get configuration for kvstorage with specified ID */
diff --git a/src/kvstorage_server.c b/src/kvstorage_server.c
index 78c6c34e3..81d04d4c2 100644
--- a/src/kvstorage_server.c
+++ b/src/kvstorage_server.c
@@ -30,6 +30,13 @@
#include "cfg_xml.h"
#include "main.h"
+#define ERROR_COMMON "ERROR" CRLF
+#define ERROR_UNKNOWN_COMMAND "CLIENT_ERROR unknown command" CRLF
+#define ERROR_NOT_STORED "NOT_STORED" CRLF
+#define ERROR_EXISTS "EXISTS" CRLF
+#define ERROR_NOT_FOUND "NOT_FOUND" CRLF
+#define ERROR_INVALID_KEYSTORAGE "CLIENT_ERROR storage does not exists" CRLF
+
/* This is required for normal signals processing */
static GList *global_evbases = NULL;
static struct event_base *main_base = NULL;
@@ -148,11 +155,311 @@ config_kvstorage_worker (struct rspamd_worker *worker)
return TRUE;
}
+/*
+ * Free kvstorage session
+ */
+static void
+free_kvstorage_session (struct kvstorage_session *session)
+{
+ rspamd_remove_dispatcher (session->dispather);
+ memory_pool_delete (session->pool);
+ close (session->sock);
+}
+
/**
- * Accept function
+ * Parse kvstorage command
+ */
+static gboolean
+parse_kvstorage_command (struct kvstorage_session *session, f_str_t *in)
+{
+ gchar *p, *c, *end;
+ gint state = 0, next_state;
+
+ p = in->begin;
+ end = in->begin + in->len;
+ c = p;
+
+ /* State machine for parsing */
+ while (p <= end) {
+ switch (state) {
+ case 0:
+ /* At this state we try to read identifier of storage */
+ if (g_ascii_isdigit (*p)) {
+ p ++;
+ }
+ else {
+ if (g_ascii_isspace (*p) && p != c) {
+ /* We have some digits, so parse id */
+ session->id = strtoul (c, NULL, 10);
+ state = 99;
+ next_state = 1;
+ }
+ else if (c == p) {
+ /* We have some character, so assume id as 0 and parse command */
+ session->id = 0;
+ state = 1;
+ }
+ else {
+ /* We have something wrong here (like some digits and then come non-digits) */
+ return FALSE;
+ }
+ }
+ break;
+ case 1:
+ /* At this state we parse command */
+ if (g_ascii_isalpha (*p) && p != end) {
+ p ++;
+ }
+ else {
+ if ((g_ascii_isspace (*p) || p == end) && p != c) {
+ /* We got some command, try to parse it */
+ if (p - c == 3) {
+ /* Set or get command */
+ if (memcmp (c, "get", 3) == 0) {
+ session->command = KVSTORAGE_CMD_GET;
+ }
+ else if (memcmp (c, "set", 3) == 0) {
+ session->command = KVSTORAGE_CMD_SET;
+ }
+ else {
+ /* Error */
+ return FALSE;
+ }
+ }
+ else if (p - c == 4) {
+ if (memcmp (c, "quit", 4) == 0) {
+ session->command = KVSTORAGE_CMD_QUIT;
+ state = 100;
+ continue;
+ }
+ }
+ else if (p - c == 6) {
+ if (memcmp (c, "delete", 6) == 0) {
+ session->command = KVSTORAGE_CMD_DELETE;
+ }
+ else {
+ return FALSE;
+ }
+ }
+ else {
+ return FALSE;
+ }
+ /* Skip spaces and try to parse key */
+ state = 99;
+ next_state = 2;
+ }
+ else {
+ /* Some error */
+ return FALSE;
+ }
+ }
+ break;
+ case 2:
+ /* Read and store key */
+ if (!g_ascii_isspace (*p) && end != p) {
+ p ++;
+ }
+ else {
+ if (p == c) {
+ return FALSE;
+ }
+ else {
+ session->key = memory_pool_alloc (session->pool, p - c + 1);
+ rspamd_strlcpy (session->key, c, p - c + 1);
+ /* Now we must select next state based on command */
+ if (session->command == KVSTORAGE_CMD_SET) {
+ /* Read flags */
+ state = 99;
+ next_state = 3;
+ }
+ else {
+ /* Nothing to read for other commands */
+ state = 100;
+ }
+ }
+ }
+ break;
+ case 3:
+ /* Read flags */
+ if (g_ascii_isdigit (*p)) {
+ p ++;
+ }
+ else {
+ if (g_ascii_isspace (*p)) {
+ session->flags = strtoul (c, NULL, 10);
+ state = 99;
+ next_state = 4;
+ }
+ else {
+ return FALSE;
+ }
+ }
+ break;
+ case 4:
+ /* Read exptime */
+ if (g_ascii_isdigit (*p)) {
+ p ++;
+ }
+ else {
+ if (g_ascii_isspace (*p)) {
+ session->expire = strtoul (c, NULL, 10);
+ state = 99;
+ next_state = 5;
+ }
+ else {
+ return FALSE;
+ }
+ }
+ break;
+ case 5:
+ /* Read size */
+ if (g_ascii_isdigit (*p)) {
+ p ++;
+ }
+ else {
+ if (g_ascii_isspace (*p) || end == p) {
+ session->length = strtoul (c, NULL, 10);
+ state = 100;
+ }
+ else {
+ return FALSE;
+ }
+ }
+ break;
+ case 99:
+ /* Skip spaces state */
+ if (g_ascii_isspace (*p)) {
+ p ++;
+ }
+ else {
+ c = p;
+ state = next_state;
+ }
+ break;
+ case 100:
+ /* Successful state */
+ return TRUE;
+ break;
+ }
+ }
+
+ return state == 100;
+}
+
+/**
+ * Dispatcher callbacks
+ */
+/*
+ * Callback that is called when there is data to read in buffer
*/
+static gboolean
+kvstorage_read_socket (f_str_t * in, void *arg)
+{
+ struct kvstorage_session *session = (struct kvstorage_session *) arg;
+ struct kvstorage_worker_thread *thr;
+ struct rspamd_kv_element *elt;
+ gint r;
+ gchar outbuf[BUFSIZ];
+
+ if (in->len == 0) {
+ /* Skip empty commands */
+ return TRUE;
+ }
+ thr = session->thr;
+ switch (session->state) {
+ case KVSTORAGE_STATE_READ_CMD:
+ /* Update timestamp */
+ session->now = time (NULL);
+ if (! parse_kvstorage_command (session, in)) {
+ thr_info ("%ud: unknown command: %V", thr->id, in);
+ return rspamd_dispatcher_write (session->dispather, ERROR_UNKNOWN_COMMAND,
+ sizeof (ERROR_UNKNOWN_COMMAND) - 1, FALSE, TRUE);
+ }
+ else {
+ session->cf = get_kvstorage_config (session->id);
+ if (session->cf == NULL) {
+ thr_info ("%ud: bad keystorage: %ud", thr->id, session->id);
+ return rspamd_dispatcher_write (session->dispather, ERROR_INVALID_KEYSTORAGE,
+ sizeof (ERROR_INVALID_KEYSTORAGE) - 1, FALSE, TRUE);
+ }
+ if (session->command == KVSTORAGE_CMD_SET) {
+ session->state = KVSTORAGE_STATE_READ_DATA;
+ rspamd_set_dispatcher_policy (session->dispather, BUFFER_CHARACTER, session->length);
+ }
+ else if (session->command == KVSTORAGE_CMD_GET) {
+ elt = rspamd_kv_storage_lookup (session->cf->storage, session->key, session->now);
+ if (elt == NULL) {
+ return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND,
+ sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE);
+ }
+ else {
+ r = rspamd_snprintf (outbuf, sizeof (outbuf), "VALUE %s %ud %ud" CRLF,
+ elt->key, elt->flags, elt->size);
+ if (!rspamd_dispatcher_write (session->dispather, outbuf,
+ r, FALSE, FALSE)) {
+ return FALSE;
+ }
+ if (!rspamd_dispatcher_write (session->dispather, elt->data, elt->size, FALSE, TRUE)) {
+ return FALSE;
+ }
+ return rspamd_dispatcher_write (session->dispather, CRLF "END" CRLF,
+ sizeof (CRLF "END" CRLF) - 1, FALSE, TRUE);
+ }
+ }
+ else if (session->command == KVSTORAGE_CMD_DELETE) {
+ if (rspamd_kv_storage_delete (session->cf->storage, session->key)) {
+ return rspamd_dispatcher_write (session->dispather, "DELETED" CRLF,
+ sizeof ("DELETED" CRLF) - 1, FALSE, TRUE);
+ }
+ else {
+ return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND,
+ sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE);
+ }
+ }
+ else if (session->command == KVSTORAGE_CMD_QUIT) {
+ /* Quit session */
+ free_kvstorage_session (session);
+ return FALSE;
+ }
+ }
+ break;
+ case KVSTORAGE_STATE_READ_DATA:
+ session->state = KVSTORAGE_STATE_READ_CMD;
+ rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
+ if (rspamd_kv_storage_insert (session->cf->storage, session->key, in->begin, in->len,
+ session->flags, session->expire)) {
+ return rspamd_dispatcher_write (session->dispather, "STORED" CRLF,
+ sizeof ("STORED" CRLF) - 1, FALSE, TRUE);
+ }
+ else {
+ return rspamd_dispatcher_write (session->dispather, ERROR_NOT_STORED,
+ sizeof (ERROR_NOT_STORED) - 1, FALSE, TRUE);
+ }
+
+ break;
+ }
+
+ return TRUE;
+}
+
/*
- * Accept new connection and construct task
+ * Called if something goes wrong
+ */
+static void
+kvstorage_err_socket (GError * err, void *arg)
+{
+ struct kvstorage_session *session = (struct kvstorage_session *) arg;
+ struct kvstorage_worker_thread *thr;
+
+ thr = session->thr;
+ thr_info ("%ud: abnormally closing connection from: %s, error: %s",
+ thr->id, inet_ntoa (session->client_addr), err->message);
+ g_error_free (err);
+ free_kvstorage_session (session);
+}
+
+/**
+ * Accept function
*/
static void
thr_accept_socket (gint fd, short what, void *arg)
@@ -161,6 +468,7 @@ thr_accept_socket (gint fd, short what, void *arg)
union sa_union su;
socklen_t addrlen = sizeof (su.ss);
gint nfd;
+ struct kvstorage_session *session;
if ((nfd = accept_from_socket (fd, (struct sockaddr *)&su.ss, &addrlen)) == -1) {
thr_warn ("%ud: accept failed: %s", thr->id, strerror (errno));
@@ -171,15 +479,26 @@ thr_accept_socket (gint fd, short what, void *arg)
return;
}
+ session = g_malloc (sizeof (struct kvstorage_session));
+ session->pool = memory_pool_new (memory_pool_get_size ());
+ session->state = KVSTORAGE_STATE_READ_CMD;
+ session->thr = thr;
+ session->sock = nfd;
+ session->dispather = rspamd_create_dispatcher (thr->ctx->ev_base, nfd, BUFFER_LINE,
+ kvstorage_read_socket, NULL,
+ kvstorage_err_socket, thr->tv, session);
+ session->dispather->strip_eol = TRUE;
+
if (su.ss.ss_family == AF_UNIX) {
thr_info ("%ud: accepted connection from unix socket", thr->id);
+ session->client_addr.s_addr = INADDR_NONE;
}
else if (su.ss.ss_family == AF_INET) {
thr_info ("%ud: accepted connection from %s port %d", thr->id,
inet_ntoa (su.s4.sin_addr), ntohs (su.s4.sin_port));
+ memcpy (&session->client_addr, &su.s4.sin_addr,
+ sizeof (struct in_addr));
}
- /* XXX: write the logic */
- close (nfd);
}
/**
@@ -253,6 +572,12 @@ start_kvstorage_worker (struct rspamd_worker *worker)
ctx->threads = NULL;
g_thread_init (NULL);
+#if _EVENT_NUMERIC_VERSION > 0x02000000
+ if (evthread_use_pthreads () == -1) {
+ msg_err ("threads support is not supported in your libevent so kvstorage is not functionable");
+ exit (EXIT_SUCCESS);
+ }
+#endif
main_base = ctx->ev_base;
/* Set kvstorage options */
diff --git a/src/kvstorage_server.h b/src/kvstorage_server.h
index 29107101c..6f22b08c2 100644
--- a/src/kvstorage_server.h
+++ b/src/kvstorage_server.h
@@ -27,6 +27,7 @@
#include "config.h"
#include "mem_pool.h"
+#include "buffer.h"
/* Configuration context for kvstorage worker */
struct kvstorage_worker_ctx {
@@ -50,6 +51,31 @@ struct kvstorage_worker_thread {
guint id;
};
+struct kvstorage_session {
+ rspamd_io_dispatcher_t *dispather;
+ enum {
+ KVSTORAGE_STATE_READ_CMD,
+ KVSTORAGE_STATE_READ_DATA
+ } state;
+ enum {
+ KVSTORAGE_CMD_SET,
+ KVSTORAGE_CMD_GET,
+ KVSTORAGE_CMD_DELETE,
+ KVSTORAGE_CMD_QUIT
+ } command;
+ guint id;
+ memory_pool_t *pool;
+ gchar *key;
+ struct kvstorage_config *cf;
+ struct kvstorage_worker_thread *thr;
+ struct in_addr client_addr;
+ gint sock;
+ guint flags;
+ guint expire;
+ guint length;
+ time_t now;
+};
+
gpointer init_kvstorage_worker (void);
void start_kvstorage_worker (struct rspamd_worker *worker);