diff options
-rw-r--r-- | src/controller.c | 51 | ||||
-rw-r--r-- | src/events.c | 12 | ||||
-rw-r--r-- | src/events.h | 5 | ||||
-rw-r--r-- | src/kvstorage_server.c | 33 | ||||
-rw-r--r-- | src/lua/lua_redis.c | 23 | ||||
-rw-r--r-- | src/main.h | 1 | ||||
-rw-r--r-- | src/smtp.c | 2 | ||||
-rw-r--r-- | src/worker.c | 32 |
8 files changed, 126 insertions, 33 deletions
diff --git a/src/controller.c b/src/controller.c index fc26b36d9..1b78b82f5 100644 --- a/src/controller.c +++ b/src/controller.c @@ -832,26 +832,47 @@ process_normal_command (const gchar *line) return NULL; } +/* + * Called if all filters are processed + */ static void fin_learn_task (void *arg) { - struct worker_task *task = (struct worker_task *)arg; - - /* XXX: this is bad logic in fact */ - /* Process all statfiles */ - process_statfiles (task); - /* Call post filters */ - lua_call_post_filters (task); - task->state = WRITE_REPLY; - - if (task->fin_callback) { - task->fin_callback (task->fin_arg); + struct worker_task *task = (struct worker_task *) arg; + + if (task->state != WRITING_REPLY) { + task->state = WRITE_REPLY; + /* Process all statfiles */ + process_statfiles (task); + /* Call post filters */ + lua_call_post_filters (task); } - else { - rspamd_dispatcher_restore (task->dispatcher); + + /* Check if we have all events finished */ + if (task->state != WRITING_REPLY) { + if (task->fin_callback) { + task->fin_callback (task->fin_arg); + } + else { + rspamd_dispatcher_restore (task->dispatcher); + } } } +/* + * Called if session was restored inside fin callback + */ +static void +restore_learn_task (void *arg) +{ + struct worker_task *task = (struct worker_task *) arg; + + /* Special state */ + task->state = WRITING_REPLY; + + rspamd_dispatcher_pause (task->dispatcher); +} + static gboolean controller_read_socket (f_str_t * in, void *arg) { @@ -971,7 +992,7 @@ controller_read_socket (f_str_t * in, void *arg) return FALSE; } /* Set up async session */ - task->s = new_async_session (task->task_pool, fin_learn_task, free_task_hard, task); + task->s = new_async_session (task->task_pool, fin_learn_task, restore_learn_task, free_task_hard, task); r = process_filters (task); if (r == -1) { session->state = STATE_REPLY; @@ -1187,7 +1208,7 @@ accept_socket (gint fd, short what, void *arg) io_tv->tv_sec = ctx->timeout / 1000; io_tv->tv_usec = ctx->timeout - io_tv->tv_sec * 1000; - new_session->s = new_async_session (new_session->session_pool, NULL, free_session, new_session); + new_session->s = new_async_session (new_session->session_pool, NULL, NULL, free_session, new_session); new_session->dispatcher = rspamd_create_dispatcher (ctx->ev_base, nfd, BUFFER_LINE, controller_read_socket, controller_write_socket, controller_err_socket, io_tv, (void *)new_session); diff --git a/src/events.c b/src/events.c index 6d848ca2d..bb5a0a0a8 100644 --- a/src/events.c +++ b/src/events.c @@ -58,13 +58,15 @@ rspamd_event_hash (gconstpointer a) } struct rspamd_async_session * -new_async_session (memory_pool_t * pool, event_finalizer_t fin, event_finalizer_t cleanup, void *user_data) +new_async_session (memory_pool_t * pool, event_finalizer_t fin, + event_finalizer_t restore, event_finalizer_t cleanup, void *user_data) { struct rspamd_async_session *new; new = memory_pool_alloc (pool, sizeof (struct rspamd_async_session)); new->pool = pool; new->fin = fin; + new->restore = restore; new->cleanup = cleanup; new->user_data = user_data; new->wanna_die = FALSE; @@ -207,7 +209,13 @@ check_session_pending (struct rspamd_async_session *session) if (session->fin != NULL) { session->fin (session->user_data); } - /* No more events */ + /* Check events count again */ + if (g_hash_table_size (session->events) != 0) { + if (session->restore != NULL) { + session->restore (session->user_data); + } + return TRUE; + } return FALSE; } diff --git a/src/events.h b/src/events.h index 434c39d80..2aba3eb2e 100644 --- a/src/events.h +++ b/src/events.h @@ -16,6 +16,7 @@ struct rspamd_async_event { struct rspamd_async_session { event_finalizer_t fin; + event_finalizer_t restore; event_finalizer_t cleanup; GHashTable *events; GQueue *forced_events; @@ -28,12 +29,14 @@ struct rspamd_async_session { * Make new async session * @param pool pool to alloc memory from * @param fin a callback called when no events are found in session + * @param restore a callback is called to restore processing of session * @param cleanup a callback called when session is forcefully destroyed * @param user_data abstract user data * @return */ struct rspamd_async_session *new_async_session (memory_pool_t *pool, - event_finalizer_t fin, event_finalizer_t cleanup, void *user_data); + event_finalizer_t fin, event_finalizer_t restore, + event_finalizer_t cleanup, void *user_data); /** * Insert new event to the session diff --git a/src/kvstorage_server.c b/src/kvstorage_server.c index 683c5a2e4..7cf53f1a0 100644 --- a/src/kvstorage_server.c +++ b/src/kvstorage_server.c @@ -580,7 +580,7 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis) longval); } else { - r = rspamd_snprintf (outbuf, sizeof (outbuf), "$%l" CRLF, + r = rspamd_snprintf (outbuf, sizeof (outbuf), ":%l" CRLF, longval); } if (!rspamd_dispatcher_write (session->dispather, outbuf, @@ -691,10 +691,10 @@ kvstorage_check_argnum (struct kvstorage_session *session) case KVSTORAGE_CMD_SYNC: return session->argc == 1; case KVSTORAGE_CMD_SET: - return session->argc == 3; + return session->argc == 3 || session->argc == 4; case KVSTORAGE_CMD_INCR: case KVSTORAGE_CMD_DECR: - return session->argc == 1 || session->argc == 2; + return session->argc == 2 || session->argc == 3; default: return session->argc == 2; } @@ -831,7 +831,7 @@ kvstorage_read_socket (f_str_t * in, void *arg) } else if (session->argnum == 2) { /* We get datablock for set command */ - if (session->command == KVSTORAGE_CMD_SET) { + if (session->command == KVSTORAGE_CMD_SET && session->argc == 3) { session->state = KVSTORAGE_STATE_READ_CMD; rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1); if (rspamd_kv_storage_insert (session->cf->storage, session->key, session->keylen, @@ -845,6 +845,14 @@ kvstorage_read_socket (f_str_t * in, void *arg) sizeof ("-ERR not stored" CRLF) - 1, FALSE, TRUE); } } + else if (session->command == KVSTORAGE_CMD_SET && session->argc == 4) { + /* It is expire argument */ + session->state = KVSTORAGE_STATE_READ_CMD; + rspamd_strtol (in->begin, in->len, (glong *)&session->expire); + session->argnum ++; + session->state = KVSTORAGE_STATE_READ_ARGLEN; + rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1); + } else { session->state = KVSTORAGE_STATE_READ_CMD; rspamd_strtol (in->begin, in->len, &session->arg_data.value); @@ -855,6 +863,23 @@ kvstorage_read_socket (f_str_t * in, void *arg) return kvstorage_process_command (session, TRUE); } } + else if (session->argnum == 3) { + /* We get datablock for set command */ + if (session->command == KVSTORAGE_CMD_SET && session->argc == 4) { + session->state = KVSTORAGE_STATE_READ_CMD; + rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1); + if (rspamd_kv_storage_insert (session->cf->storage, session->key, session->keylen, + in->begin, in->len, + session->flags, session->expire)) { + return rspamd_dispatcher_write (session->dispather, "+OK" CRLF, + sizeof ("+OK" CRLF) - 1, FALSE, TRUE); + } + else { + return rspamd_dispatcher_write (session->dispather, "-ERR not stored" CRLF, + sizeof ("-ERR not stored" CRLF) - 1, FALSE, TRUE); + } + } + } break; case KVSTORAGE_STATE_READ_DATA: session->state = KVSTORAGE_STATE_READ_CMD; diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c index 49a7e9279..c427e60ed 100644 --- a/src/lua/lua_redis.c +++ b/src/lua/lua_redis.c @@ -134,7 +134,19 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_userdata *ud) /* Error is nil */ lua_pushnil (ud->L); /* Data */ - lua_pushlstring (ud->L, r->str, r->len); + if (r->type == REDIS_REPLY_STRING) { + lua_pushlstring (ud->L, r->str, r->len); + } + else if (r->type == REDIS_REPLY_INTEGER) { + lua_pushnumber (ud->L, r->integer); + } + else if (r->type == REDIS_REPLY_STATUS) { + lua_pushlstring (ud->L, r->str, r->len); + } + else { + msg_info ("bad type is passed: %d", r->type); + lua_pushnil (ud->L); + } if (lua_pcall (ud->L, 3, 0, 0) != 0) { msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1)); @@ -157,10 +169,15 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv) if (c->err == 0) { if (r != NULL) { - lua_redis_push_data (reply, ud); + if (reply->type != REDIS_REPLY_ERROR) { + lua_redis_push_data (reply, ud); + } + else { + lua_redis_push_error (reply->str, ud, TRUE); + } } else { - lua_redis_push_error ("received no data from server", ud, TRUE); + lua_redis_push_error ("received no data from server", ud, FALSE); } } else { diff --git a/src/main.h b/src/main.h index 8a680451a..ada9d97e4 100644 --- a/src/main.h +++ b/src/main.h @@ -177,6 +177,7 @@ struct worker_task { WRITE_REPLY, WRITE_ERROR, WAIT_FILTER, + WAIT_POST_FILTER, CLOSING_CONNECTION, WRITING_REPLY } state; /**< current session state */ diff --git a/src/smtp.c b/src/smtp.c index 8f3706a6b..cc0e93391 100644 --- a/src/smtp.c +++ b/src/smtp.c @@ -700,7 +700,7 @@ accept_socket (gint fd, short what, void *arg) /* Resolve client's addr */ /* Set up async session */ - session->s = new_async_session (session->pool, NULL, free_smtp_session, session); + session->s = new_async_session (session->pool, NULL, NULL, free_smtp_session, session); session->state = SMTP_STATE_RESOLVE_REVERSE; if (! make_dns_request (session->resolver, session->s, session->pool, smtp_dns_cb, session, DNS_REQUEST_PTR, &session->client_addr)) { diff --git a/src/worker.c b/src/worker.c index 2411906b0..151045c19 100644 --- a/src/worker.c +++ b/src/worker.c @@ -494,6 +494,7 @@ read_socket (f_str_t * in, void *arg) return write_socket (task); break; case WAIT_FILTER: + case WAIT_POST_FILTER: msg_info ("ignoring trailing garbadge of size %z", in->len); break; default: @@ -547,10 +548,12 @@ write_socket (void *arg) return FALSE; break; case WRITING_REPLY: + case WAIT_FILTER: + case WAIT_POST_FILTER: /* Do nothing here */ break; default: - msg_info ("abnormally closing connection"); + msg_info ("abnormally closing connection at state: %d", task->state); if (ctx->is_custom) { fin_custom_filters (task); } @@ -589,12 +592,15 @@ fin_task (void *arg) { struct worker_task *task = (struct worker_task *) arg; - /* Process all statfiles */ - process_statfiles (task); - /* Call post filters */ - lua_call_post_filters (task); - task->state = WRITE_REPLY; + if (task->state != WAIT_POST_FILTER) { + /* Process all statfiles */ + process_statfiles (task); + /* Call post filters */ + lua_call_post_filters (task); + } + /* Check if we have all events finished */ + task->state = WRITE_REPLY; if (task->fin_callback) { task->fin_callback (task->fin_arg); } @@ -604,6 +610,18 @@ fin_task (void *arg) } /* + * Called if session was restored inside fin callback + */ +static void +restore_task (void *arg) +{ + struct worker_task *task = (struct worker_task *) arg; + + /* Special state */ + task->state = WAIT_POST_FILTER; +} + +/* * Reduce number of tasks proceeded */ static void @@ -682,7 +700,7 @@ accept_socket (gint fd, short what, void *arg) /* Set up async session */ new_task->s = - new_async_session (new_task->task_pool, fin_task, free_task_hard, new_task); + new_async_session (new_task->task_pool, fin_task, restore_task, free_task_hard, new_task); /* Init custom filters */ #ifndef BUILD_STATIC |