]> source.dussan.org Git - rspamd.git/commitdiff
Allow for post filters to register async events.
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Fri, 16 Dec 2011 14:38:52 +0000 (17:38 +0300)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Fri, 16 Dec 2011 14:38:52 +0000 (17:38 +0300)
Some fixes to lua redis library.

src/controller.c
src/events.c
src/events.h
src/kvstorage_server.c
src/lua/lua_redis.c
src/main.h
src/smtp.c
src/worker.c

index fc26b36d974fc8c8909fbaca73020f01b25d2684..1b78b82f5558afd69b07228b3224b24a8ede37fd 100644 (file)
@@ -832,26 +832,47 @@ process_normal_command (const gchar *line)
        return NULL;
 }
 
+/*
+ * Called if all filters are processed
+ */
 static void
 fin_learn_task (void *arg)
 {
-       struct worker_task             *task = (struct worker_task *)arg;
-
-       /* XXX: this is bad logic in fact */
-       /* Process all statfiles */
-       process_statfiles (task);
-       /* Call post filters */
-       lua_call_post_filters (task);
-       task->state = WRITE_REPLY;
-
-       if (task->fin_callback) {
-               task->fin_callback (task->fin_arg);
+       struct worker_task             *task = (struct worker_task *) arg;
+
+       if (task->state != WRITING_REPLY) {
+               task->state = WRITE_REPLY;
+               /* Process all statfiles */
+               process_statfiles (task);
+               /* Call post filters */
+               lua_call_post_filters (task);
        }
-       else {
-               rspamd_dispatcher_restore (task->dispatcher);
+
+       /* Check if we have all events finished */
+       if (task->state != WRITING_REPLY) {
+               if (task->fin_callback) {
+                       task->fin_callback (task->fin_arg);
+               }
+               else {
+                       rspamd_dispatcher_restore (task->dispatcher);
+               }
        }
 }
 
+/*
+ * Called if session was restored inside fin callback
+ */
+static void
+restore_learn_task (void *arg)
+{
+       struct worker_task             *task = (struct worker_task *) arg;
+
+       /* Special state */
+       task->state = WRITING_REPLY;
+
+       rspamd_dispatcher_pause (task->dispatcher);
+}
+
 static                          gboolean
 controller_read_socket (f_str_t * in, void *arg)
 {
@@ -971,7 +992,7 @@ controller_read_socket (f_str_t * in, void *arg)
                        return FALSE;
                }
                /* Set up async session */
-               task->s = new_async_session (task->task_pool, fin_learn_task, free_task_hard, task);
+               task->s = new_async_session (task->task_pool, fin_learn_task, restore_learn_task, free_task_hard, task);
                r = process_filters (task);
                if (r == -1) {
                        session->state = STATE_REPLY;
@@ -1187,7 +1208,7 @@ accept_socket (gint fd, short what, void *arg)
        io_tv->tv_sec = ctx->timeout / 1000;
        io_tv->tv_usec = ctx->timeout - io_tv->tv_sec * 1000;
 
-       new_session->s = new_async_session (new_session->session_pool, NULL, free_session, new_session);
+       new_session->s = new_async_session (new_session->session_pool, NULL, NULL, free_session, new_session);
 
        new_session->dispatcher = rspamd_create_dispatcher (ctx->ev_base, nfd, BUFFER_LINE, controller_read_socket,
                        controller_write_socket, controller_err_socket, io_tv, (void *)new_session);
index 6d848ca2d0ccf32af1374f090274a6789d640eb6..bb5a0a0a83597cede5b562f96e0ff0a300372c5d 100644 (file)
@@ -58,13 +58,15 @@ rspamd_event_hash (gconstpointer a)
 }
 
 struct rspamd_async_session    *
-new_async_session (memory_pool_t * pool, event_finalizer_t fin, event_finalizer_t cleanup, void *user_data)
+new_async_session (memory_pool_t * pool, event_finalizer_t fin,
+               event_finalizer_t restore, event_finalizer_t cleanup, void *user_data)
 {
        struct rspamd_async_session    *new;
 
        new = memory_pool_alloc (pool, sizeof (struct rspamd_async_session));
        new->pool = pool;
        new->fin = fin;
+       new->restore = restore;
        new->cleanup = cleanup;
        new->user_data = user_data;
        new->wanna_die = FALSE;
@@ -207,7 +209,13 @@ check_session_pending (struct rspamd_async_session *session)
                if (session->fin != NULL) {
                        session->fin (session->user_data);
                }
-               /* No more events */
+               /* Check events count again */
+               if (g_hash_table_size (session->events) != 0) {
+                       if (session->restore != NULL) {
+                               session->restore (session->user_data);
+                       }
+                       return TRUE;
+               }
                return FALSE;
        }
 
index 434c39d803db950c4d021ab95bc161262d895d4d..2aba3eb2ea494702d27883c6d2e6df6276c59998 100644 (file)
@@ -16,6 +16,7 @@ struct rspamd_async_event {
 
 struct rspamd_async_session {
        event_finalizer_t fin;
+       event_finalizer_t restore;
        event_finalizer_t cleanup;
        GHashTable *events;
        GQueue *forced_events;
@@ -28,12 +29,14 @@ struct rspamd_async_session {
  * Make new async session
  * @param pool pool to alloc memory from
  * @param fin a callback called when no events are found in session
+ * @param restore a callback is called to restore processing of session
  * @param cleanup a callback called when session is forcefully destroyed
  * @param user_data abstract user data
  * @return
  */
 struct rspamd_async_session *new_async_session (memory_pool_t *pool,
-               event_finalizer_t fin, event_finalizer_t cleanup, void *user_data);
+               event_finalizer_t fin, event_finalizer_t restore,
+               event_finalizer_t cleanup, void *user_data);
 
 /**
  * Insert new event to the session
index 683c5a2e404e15ad56345279ec8ca68c717cd619..7cf53f1a09c5a3ed9e1721d25dcd4929fc9d8726 100644 (file)
@@ -580,7 +580,7 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
                                                longval);
                        }
                        else {
-                               r = rspamd_snprintf (outbuf, sizeof (outbuf), "$%l" CRLF,
+                               r = rspamd_snprintf (outbuf, sizeof (outbuf), ":%l" CRLF,
                                                longval);
                        }
                        if (!rspamd_dispatcher_write (session->dispather, outbuf,
@@ -691,10 +691,10 @@ kvstorage_check_argnum (struct kvstorage_session *session)
        case KVSTORAGE_CMD_SYNC:
                return session->argc == 1;
        case KVSTORAGE_CMD_SET:
-               return session->argc == 3;
+               return session->argc == 3 || session->argc == 4;
        case KVSTORAGE_CMD_INCR:
        case KVSTORAGE_CMD_DECR:
-               return session->argc == 1 || session->argc == 2;
+               return session->argc == 2 || session->argc == 3;
        default:
                return session->argc == 2;
        }
@@ -831,7 +831,7 @@ kvstorage_read_socket (f_str_t * in, void *arg)
                }
                else if (session->argnum == 2) {
                        /* We get datablock for set command */
-                       if (session->command == KVSTORAGE_CMD_SET) {
+                       if (session->command == KVSTORAGE_CMD_SET && session->argc == 3) {
                                session->state = KVSTORAGE_STATE_READ_CMD;
                                rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
                                if (rspamd_kv_storage_insert (session->cf->storage, session->key, session->keylen,
@@ -845,6 +845,14 @@ kvstorage_read_socket (f_str_t * in, void *arg)
                                                        sizeof ("-ERR not stored" CRLF) - 1, FALSE, TRUE);
                                }
                        }
+                       else if (session->command == KVSTORAGE_CMD_SET && session->argc == 4) {
+                               /* It is expire argument */
+                               session->state = KVSTORAGE_STATE_READ_CMD;
+                               rspamd_strtol (in->begin, in->len, (glong *)&session->expire);
+                               session->argnum ++;
+                               session->state = KVSTORAGE_STATE_READ_ARGLEN;
+                               rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
+                       }
                        else {
                                session->state = KVSTORAGE_STATE_READ_CMD;
                                rspamd_strtol (in->begin, in->len, &session->arg_data.value);
@@ -855,6 +863,23 @@ kvstorage_read_socket (f_str_t * in, void *arg)
                                return kvstorage_process_command (session, TRUE);
                        }
                }
+               else if (session->argnum == 3) {
+                       /* We get datablock for set command */
+                       if (session->command == KVSTORAGE_CMD_SET && session->argc == 4) {
+                               session->state = KVSTORAGE_STATE_READ_CMD;
+                               rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
+                               if (rspamd_kv_storage_insert (session->cf->storage, session->key, session->keylen,
+                                               in->begin, in->len,
+                                               session->flags, session->expire)) {
+                                       return rspamd_dispatcher_write (session->dispather, "+OK" CRLF,
+                                                       sizeof ("+OK" CRLF) - 1, FALSE, TRUE);
+                               }
+                               else {
+                                       return rspamd_dispatcher_write (session->dispather, "-ERR not stored" CRLF,
+                                                       sizeof ("-ERR not stored" CRLF) - 1, FALSE, TRUE);
+                               }
+                       }
+               }
                break;
        case KVSTORAGE_STATE_READ_DATA:
                session->state = KVSTORAGE_STATE_READ_CMD;
index 49a7e92799943cee05ee004c5b320d544ebd926b..c427e60ed68a014bbd1e5690ea2e992d32239b11 100644 (file)
@@ -134,7 +134,19 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_userdata *ud)
        /* Error is nil */
        lua_pushnil (ud->L);
        /* Data */
-       lua_pushlstring (ud->L, r->str, r->len);
+       if (r->type == REDIS_REPLY_STRING) {
+               lua_pushlstring (ud->L, r->str, r->len);
+       }
+       else if (r->type == REDIS_REPLY_INTEGER) {
+               lua_pushnumber (ud->L, r->integer);
+       }
+       else if (r->type == REDIS_REPLY_STATUS) {
+               lua_pushlstring (ud->L, r->str, r->len);
+       }
+       else {
+               msg_info ("bad type is passed: %d", r->type);
+               lua_pushnil (ud->L);
+       }
 
        if (lua_pcall (ud->L, 3, 0, 0) != 0) {
                msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
@@ -157,10 +169,15 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv)
 
        if (c->err == 0) {
                if (r != NULL) {
-                       lua_redis_push_data (reply, ud);
+                       if (reply->type != REDIS_REPLY_ERROR) {
+                               lua_redis_push_data (reply, ud);
+                       }
+                       else {
+                               lua_redis_push_error (reply->str, ud, TRUE);
+                       }
                }
                else {
-                       lua_redis_push_error ("received no data from server", ud, TRUE);
+                       lua_redis_push_error ("received no data from server", ud, FALSE);
                }
        }
        else {
index 8a680451a0d6d4dab936f27515f292df6760d0dc..ada9d97e481929acdbe2117ecac580f9339aadd7 100644 (file)
@@ -177,6 +177,7 @@ struct worker_task {
                WRITE_REPLY,
                WRITE_ERROR,
                WAIT_FILTER,
+               WAIT_POST_FILTER,
                CLOSING_CONNECTION,
                WRITING_REPLY
        } state;                                                                                                        /**< current session state                                                      */
index 8f3706a6bf9a2a6e6c109e266c9c920e97102b21..cc0e933917d4cb08538c72c2e9ff4467721b190c 100644 (file)
@@ -700,7 +700,7 @@ accept_socket (gint fd, short what, void *arg)
 
        /* Resolve client's addr */
        /* Set up async session */
-       session->s = new_async_session (session->pool, NULL, free_smtp_session, session);
+       session->s = new_async_session (session->pool, NULL, NULL, free_smtp_session, session);
        session->state = SMTP_STATE_RESOLVE_REVERSE;
        if (! make_dns_request (session->resolver, session->s, session->pool,
                        smtp_dns_cb, session, DNS_REQUEST_PTR, &session->client_addr)) {
index 2411906b048c8f161846e768f91d671dcfe644e0..151045c195177c20b88e68dc37d27b390029a958 100644 (file)
@@ -494,6 +494,7 @@ read_socket (f_str_t * in, void *arg)
                return write_socket (task);
                break;
        case WAIT_FILTER:
+       case WAIT_POST_FILTER:
                msg_info ("ignoring trailing garbadge of size %z", in->len);
                break;
        default:
@@ -547,10 +548,12 @@ write_socket (void *arg)
                return FALSE;
                break;
        case WRITING_REPLY:
+       case WAIT_FILTER:
+       case WAIT_POST_FILTER:
                /* Do nothing here */
                break;
        default:
-               msg_info ("abnormally closing connection");
+               msg_info ("abnormally closing connection at state: %d", task->state);
                if (ctx->is_custom) {
                        fin_custom_filters (task);
                }
@@ -589,12 +592,15 @@ fin_task (void *arg)
 {
        struct worker_task             *task = (struct worker_task *) arg;
 
-       /* Process all statfiles */
-       process_statfiles (task);
-       /* Call post filters */
-       lua_call_post_filters (task);
-       task->state = WRITE_REPLY;
+       if (task->state != WAIT_POST_FILTER) {
+               /* Process all statfiles */
+               process_statfiles (task);
+               /* Call post filters */
+               lua_call_post_filters (task);
+       }
 
+       /* Check if we have all events finished */
+       task->state = WRITE_REPLY;
        if (task->fin_callback) {
                task->fin_callback (task->fin_arg);
        }
@@ -603,6 +609,18 @@ fin_task (void *arg)
        }
 }
 
+/*
+ * Called if session was restored inside fin callback
+ */
+static void
+restore_task (void *arg)
+{
+       struct worker_task             *task = (struct worker_task *) arg;
+
+       /* Special state */
+       task->state = WAIT_POST_FILTER;
+}
+
 /*
  * Reduce number of tasks proceeded
  */
@@ -682,7 +700,7 @@ accept_socket (gint fd, short what, void *arg)
 
        /* Set up async session */
        new_task->s =
-                               new_async_session (new_task->task_pool, fin_task, free_task_hard, new_task);
+                               new_async_session (new_task->task_pool, fin_task, restore_task, free_task_hard, new_task);
 
        /* Init custom filters */
 #ifndef BUILD_STATIC