aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/controller.c51
-rw-r--r--src/events.c12
-rw-r--r--src/events.h5
-rw-r--r--src/kvstorage_server.c33
-rw-r--r--src/lua/lua_redis.c23
-rw-r--r--src/main.h1
-rw-r--r--src/smtp.c2
-rw-r--r--src/worker.c32
8 files changed, 126 insertions, 33 deletions
diff --git a/src/controller.c b/src/controller.c
index fc26b36d9..1b78b82f5 100644
--- a/src/controller.c
+++ b/src/controller.c
@@ -832,26 +832,47 @@ process_normal_command (const gchar *line)
return NULL;
}
+/*
+ * Called if all filters are processed
+ */
static void
fin_learn_task (void *arg)
{
- struct worker_task *task = (struct worker_task *)arg;
-
- /* XXX: this is bad logic in fact */
- /* Process all statfiles */
- process_statfiles (task);
- /* Call post filters */
- lua_call_post_filters (task);
- task->state = WRITE_REPLY;
-
- if (task->fin_callback) {
- task->fin_callback (task->fin_arg);
+ struct worker_task *task = (struct worker_task *) arg;
+
+ if (task->state != WRITING_REPLY) {
+ task->state = WRITE_REPLY;
+ /* Process all statfiles */
+ process_statfiles (task);
+ /* Call post filters */
+ lua_call_post_filters (task);
}
- else {
- rspamd_dispatcher_restore (task->dispatcher);
+
+ /* Check if we have all events finished */
+ if (task->state != WRITING_REPLY) {
+ if (task->fin_callback) {
+ task->fin_callback (task->fin_arg);
+ }
+ else {
+ rspamd_dispatcher_restore (task->dispatcher);
+ }
}
}
+/*
+ * Called if session was restored inside fin callback
+ */
+static void
+restore_learn_task (void *arg)
+{
+ struct worker_task *task = (struct worker_task *) arg;
+
+ /* Special state */
+ task->state = WRITING_REPLY;
+
+ rspamd_dispatcher_pause (task->dispatcher);
+}
+
static gboolean
controller_read_socket (f_str_t * in, void *arg)
{
@@ -971,7 +992,7 @@ controller_read_socket (f_str_t * in, void *arg)
return FALSE;
}
/* Set up async session */
- task->s = new_async_session (task->task_pool, fin_learn_task, free_task_hard, task);
+ task->s = new_async_session (task->task_pool, fin_learn_task, restore_learn_task, free_task_hard, task);
r = process_filters (task);
if (r == -1) {
session->state = STATE_REPLY;
@@ -1187,7 +1208,7 @@ accept_socket (gint fd, short what, void *arg)
io_tv->tv_sec = ctx->timeout / 1000;
io_tv->tv_usec = ctx->timeout - io_tv->tv_sec * 1000;
- new_session->s = new_async_session (new_session->session_pool, NULL, free_session, new_session);
+ new_session->s = new_async_session (new_session->session_pool, NULL, NULL, free_session, new_session);
new_session->dispatcher = rspamd_create_dispatcher (ctx->ev_base, nfd, BUFFER_LINE, controller_read_socket,
controller_write_socket, controller_err_socket, io_tv, (void *)new_session);
diff --git a/src/events.c b/src/events.c
index 6d848ca2d..bb5a0a0a8 100644
--- a/src/events.c
+++ b/src/events.c
@@ -58,13 +58,15 @@ rspamd_event_hash (gconstpointer a)
}
struct rspamd_async_session *
-new_async_session (memory_pool_t * pool, event_finalizer_t fin, event_finalizer_t cleanup, void *user_data)
+new_async_session (memory_pool_t * pool, event_finalizer_t fin,
+ event_finalizer_t restore, event_finalizer_t cleanup, void *user_data)
{
struct rspamd_async_session *new;
new = memory_pool_alloc (pool, sizeof (struct rspamd_async_session));
new->pool = pool;
new->fin = fin;
+ new->restore = restore;
new->cleanup = cleanup;
new->user_data = user_data;
new->wanna_die = FALSE;
@@ -207,7 +209,13 @@ check_session_pending (struct rspamd_async_session *session)
if (session->fin != NULL) {
session->fin (session->user_data);
}
- /* No more events */
+ /* Check events count again */
+ if (g_hash_table_size (session->events) != 0) {
+ if (session->restore != NULL) {
+ session->restore (session->user_data);
+ }
+ return TRUE;
+ }
return FALSE;
}
diff --git a/src/events.h b/src/events.h
index 434c39d80..2aba3eb2e 100644
--- a/src/events.h
+++ b/src/events.h
@@ -16,6 +16,7 @@ struct rspamd_async_event {
struct rspamd_async_session {
event_finalizer_t fin;
+ event_finalizer_t restore;
event_finalizer_t cleanup;
GHashTable *events;
GQueue *forced_events;
@@ -28,12 +29,14 @@ struct rspamd_async_session {
* Make new async session
* @param pool pool to alloc memory from
* @param fin a callback called when no events are found in session
+ * @param restore a callback is called to restore processing of session
* @param cleanup a callback called when session is forcefully destroyed
* @param user_data abstract user data
* @return
*/
struct rspamd_async_session *new_async_session (memory_pool_t *pool,
- event_finalizer_t fin, event_finalizer_t cleanup, void *user_data);
+ event_finalizer_t fin, event_finalizer_t restore,
+ event_finalizer_t cleanup, void *user_data);
/**
* Insert new event to the session
diff --git a/src/kvstorage_server.c b/src/kvstorage_server.c
index 683c5a2e4..7cf53f1a0 100644
--- a/src/kvstorage_server.c
+++ b/src/kvstorage_server.c
@@ -580,7 +580,7 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
longval);
}
else {
- r = rspamd_snprintf (outbuf, sizeof (outbuf), "$%l" CRLF,
+ r = rspamd_snprintf (outbuf, sizeof (outbuf), ":%l" CRLF,
longval);
}
if (!rspamd_dispatcher_write (session->dispather, outbuf,
@@ -691,10 +691,10 @@ kvstorage_check_argnum (struct kvstorage_session *session)
case KVSTORAGE_CMD_SYNC:
return session->argc == 1;
case KVSTORAGE_CMD_SET:
- return session->argc == 3;
+ return session->argc == 3 || session->argc == 4;
case KVSTORAGE_CMD_INCR:
case KVSTORAGE_CMD_DECR:
- return session->argc == 1 || session->argc == 2;
+ return session->argc == 2 || session->argc == 3;
default:
return session->argc == 2;
}
@@ -831,7 +831,7 @@ kvstorage_read_socket (f_str_t * in, void *arg)
}
else if (session->argnum == 2) {
/* We get datablock for set command */
- if (session->command == KVSTORAGE_CMD_SET) {
+ if (session->command == KVSTORAGE_CMD_SET && session->argc == 3) {
session->state = KVSTORAGE_STATE_READ_CMD;
rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
if (rspamd_kv_storage_insert (session->cf->storage, session->key, session->keylen,
@@ -845,6 +845,14 @@ kvstorage_read_socket (f_str_t * in, void *arg)
sizeof ("-ERR not stored" CRLF) - 1, FALSE, TRUE);
}
}
+ else if (session->command == KVSTORAGE_CMD_SET && session->argc == 4) {
+ /* It is expire argument */
+ session->state = KVSTORAGE_STATE_READ_CMD;
+ rspamd_strtol (in->begin, in->len, (glong *)&session->expire);
+ session->argnum ++;
+ session->state = KVSTORAGE_STATE_READ_ARGLEN;
+ rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
+ }
else {
session->state = KVSTORAGE_STATE_READ_CMD;
rspamd_strtol (in->begin, in->len, &session->arg_data.value);
@@ -855,6 +863,23 @@ kvstorage_read_socket (f_str_t * in, void *arg)
return kvstorage_process_command (session, TRUE);
}
}
+ else if (session->argnum == 3) {
+ /* We get datablock for set command */
+ if (session->command == KVSTORAGE_CMD_SET && session->argc == 4) {
+ session->state = KVSTORAGE_STATE_READ_CMD;
+ rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
+ if (rspamd_kv_storage_insert (session->cf->storage, session->key, session->keylen,
+ in->begin, in->len,
+ session->flags, session->expire)) {
+ return rspamd_dispatcher_write (session->dispather, "+OK" CRLF,
+ sizeof ("+OK" CRLF) - 1, FALSE, TRUE);
+ }
+ else {
+ return rspamd_dispatcher_write (session->dispather, "-ERR not stored" CRLF,
+ sizeof ("-ERR not stored" CRLF) - 1, FALSE, TRUE);
+ }
+ }
+ }
break;
case KVSTORAGE_STATE_READ_DATA:
session->state = KVSTORAGE_STATE_READ_CMD;
diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c
index 49a7e9279..c427e60ed 100644
--- a/src/lua/lua_redis.c
+++ b/src/lua/lua_redis.c
@@ -134,7 +134,19 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_userdata *ud)
/* Error is nil */
lua_pushnil (ud->L);
/* Data */
- lua_pushlstring (ud->L, r->str, r->len);
+ if (r->type == REDIS_REPLY_STRING) {
+ lua_pushlstring (ud->L, r->str, r->len);
+ }
+ else if (r->type == REDIS_REPLY_INTEGER) {
+ lua_pushnumber (ud->L, r->integer);
+ }
+ else if (r->type == REDIS_REPLY_STATUS) {
+ lua_pushlstring (ud->L, r->str, r->len);
+ }
+ else {
+ msg_info ("bad type is passed: %d", r->type);
+ lua_pushnil (ud->L);
+ }
if (lua_pcall (ud->L, 3, 0, 0) != 0) {
msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
@@ -157,10 +169,15 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv)
if (c->err == 0) {
if (r != NULL) {
- lua_redis_push_data (reply, ud);
+ if (reply->type != REDIS_REPLY_ERROR) {
+ lua_redis_push_data (reply, ud);
+ }
+ else {
+ lua_redis_push_error (reply->str, ud, TRUE);
+ }
}
else {
- lua_redis_push_error ("received no data from server", ud, TRUE);
+ lua_redis_push_error ("received no data from server", ud, FALSE);
}
}
else {
diff --git a/src/main.h b/src/main.h
index 8a680451a..ada9d97e4 100644
--- a/src/main.h
+++ b/src/main.h
@@ -177,6 +177,7 @@ struct worker_task {
WRITE_REPLY,
WRITE_ERROR,
WAIT_FILTER,
+ WAIT_POST_FILTER,
CLOSING_CONNECTION,
WRITING_REPLY
} state; /**< current session state */
diff --git a/src/smtp.c b/src/smtp.c
index 8f3706a6b..cc0e93391 100644
--- a/src/smtp.c
+++ b/src/smtp.c
@@ -700,7 +700,7 @@ accept_socket (gint fd, short what, void *arg)
/* Resolve client's addr */
/* Set up async session */
- session->s = new_async_session (session->pool, NULL, free_smtp_session, session);
+ session->s = new_async_session (session->pool, NULL, NULL, free_smtp_session, session);
session->state = SMTP_STATE_RESOLVE_REVERSE;
if (! make_dns_request (session->resolver, session->s, session->pool,
smtp_dns_cb, session, DNS_REQUEST_PTR, &session->client_addr)) {
diff --git a/src/worker.c b/src/worker.c
index 2411906b0..151045c19 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -494,6 +494,7 @@ read_socket (f_str_t * in, void *arg)
return write_socket (task);
break;
case WAIT_FILTER:
+ case WAIT_POST_FILTER:
msg_info ("ignoring trailing garbadge of size %z", in->len);
break;
default:
@@ -547,10 +548,12 @@ write_socket (void *arg)
return FALSE;
break;
case WRITING_REPLY:
+ case WAIT_FILTER:
+ case WAIT_POST_FILTER:
/* Do nothing here */
break;
default:
- msg_info ("abnormally closing connection");
+ msg_info ("abnormally closing connection at state: %d", task->state);
if (ctx->is_custom) {
fin_custom_filters (task);
}
@@ -589,12 +592,15 @@ fin_task (void *arg)
{
struct worker_task *task = (struct worker_task *) arg;
- /* Process all statfiles */
- process_statfiles (task);
- /* Call post filters */
- lua_call_post_filters (task);
- task->state = WRITE_REPLY;
+ if (task->state != WAIT_POST_FILTER) {
+ /* Process all statfiles */
+ process_statfiles (task);
+ /* Call post filters */
+ lua_call_post_filters (task);
+ }
+ /* Check if we have all events finished */
+ task->state = WRITE_REPLY;
if (task->fin_callback) {
task->fin_callback (task->fin_arg);
}
@@ -604,6 +610,18 @@ fin_task (void *arg)
}
/*
+ * Called if session was restored inside fin callback
+ */
+static void
+restore_task (void *arg)
+{
+ struct worker_task *task = (struct worker_task *) arg;
+
+ /* Special state */
+ task->state = WAIT_POST_FILTER;
+}
+
+/*
* Reduce number of tasks proceeded
*/
static void
@@ -682,7 +700,7 @@ accept_socket (gint fd, short what, void *arg)
/* Set up async session */
new_task->s =
- new_async_session (new_task->task_pool, fin_task, free_task_hard, new_task);
+ new_async_session (new_task->task_pool, fin_task, restore_task, free_task_hard, new_task);
/* Init custom filters */
#ifndef BUILD_STATIC