]> source.dussan.org Git - rspamd.git/commitdiff
Simplify states of task. Disable broken code.
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Sun, 19 Jan 2014 00:27:54 +0000 (00:27 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Sun, 19 Jan 2014 00:27:54 +0000 (00:27 +0000)
src/controller.c
src/filter.c
src/http.c
src/main.h
src/protocol.c
src/protocol.h
src/webui.c
src/worker.c
src/worker_util.c

index 097ed0e7c69dc1ffc40e81038468bb53c071cd46..269ca6db2fdeee84b6b3daec1a7d7842c145ddf9 100644 (file)
@@ -1282,19 +1282,7 @@ fin_learn_task (void *arg)
 {
        struct worker_task             *task = (struct worker_task *) arg;
 
-       if (task->state != WRITING_REPLY) {
-               task->state = WRITE_REPLY;
-       }
-
-       /* Check if we have all events finished */
-       if (task->state != WRITING_REPLY) {
-               if (task->fin_callback) {
-                       task->fin_callback (task->fin_arg);
-               }
-               else {
-                       rspamd_dispatcher_restore (task->dispatcher);
-               }
-       }
+       /* XXX: needs to be reworked */
 
        return TRUE;
 }
@@ -1308,9 +1296,6 @@ restore_learn_task (void *arg)
        struct worker_task             *task = (struct worker_task *) arg;
 
        /* Special state */
-       task->state = WRITING_REPLY;
-
-       rspamd_dispatcher_pause (task->dispatcher);
 }
 
 static                          gboolean
index b1448f1732b23d40b974f02d0f0fa59f50f1c42b..20b924cc039536ca74f3ed5e48a1fa28d0b5686c 100644 (file)
@@ -271,9 +271,6 @@ process_filters (struct worker_task *task)
                task->is_skipped = TRUE;
                task->state = WRITE_REPLY;
                msg_info ("disable check for message id <%s>, user wants spam", task->message_id);
-               task->s->wanna_die = TRUE;
-               check_session_pending (task->s);
-
                return 1;
        }
 
@@ -286,15 +283,14 @@ process_filters (struct worker_task *task)
                        if (!task->pass_all_filters &&
                                                metric->actions[METRIC_ACTION_REJECT].score > 0 &&
                                                check_metric_is_spam (task, metric)) {
-                               task->s->wanna_die = TRUE;
-                               check_session_pending (task->s);
+                               task->state = WRITE_REPLY;
                                return 1;
                        }
                        cur = g_list_next (cur);
                }
        }
-       task->s->wanna_die = TRUE;
-       check_session_pending (task->s);
+
+       task->state = WAIT_FILTER;
 
        return 1;
 }
index fff780e0f55cb448308f866e5edb45fee41a0f6a..22c202b88a518d7a237112d908b98f642ad9c3f6 100644 (file)
@@ -712,8 +712,8 @@ rspamd_http_connection_write_message (struct rspamd_http_connection *conn,
                rspamd_snprintf (datebuf, sizeof (datebuf), "%s, %02d %s %4d %02d:%02d:%02d GMT",
                                http_week[t.tm_wday],
                                t.tm_mday,
-                               http_month[t.tm_mon - 1],
-                               t.tm_year,
+                               http_month[t.tm_mon],
+                               t.tm_year + 1900,
                                t.tm_hour,
                                t.tm_min,
                                t.tm_sec);
index ea1172468c4ba2d4363fc9e8275d47fc8fc6caa3..3acd5a8705c60d6187e538b446cedbe3dbd542e7 100644 (file)
@@ -176,16 +176,12 @@ struct controller_session {
 struct worker_task {
        struct rspamd_worker *worker;                                                           /**< pointer to worker object                                           */
        enum {
-               READ_COMMAND,
-               READ_HEADER,
                READ_MESSAGE,
-               WRITE_REPLY,
-               WRITE_ERROR,
                WAIT_PRE_FILTER,
                WAIT_FILTER,
                WAIT_POST_FILTER,
-               CLOSING_CONNECTION,
-               WRITING_REPLY
+               WRITE_REPLY,
+               CLOSING_CONNECTION
        } state;                                                                                                        /**< current session state                                                      */
        enum rspamd_command cmd;                                                                        /**< command                                                                            */
        struct custom_command *custom_cmd;                                                      /**< custom command if any                                                      */      
index 3351a51951f83f086654512a1c22acc80602081c..23ed859ad1fa793a2f9b92cb7614d8e01521807b 100644 (file)
@@ -687,7 +687,7 @@ rspamd_gstring_append_double (double val, void *ud)
        return 0;
 }
 
-static gboolean
+static void
 write_check_reply (struct rspamd_http_message *msg, struct worker_task *task)
 {
        GString                         *logbuf;
@@ -743,28 +743,23 @@ write_check_reply (struct rspamd_http_message *msg, struct worker_task *task)
 
        /* Increase counters */
        task->worker->srv->stat->messages_scanned++;
-
-       rspamd_http_connection_write_message (task->http_conn, msg, NULL,
-                       "application/json", task, task->sock, &task->tv, task->ev_base);
-       return TRUE;
 }
 
-gboolean
+void
 rspamd_protocol_write_reply (struct worker_task *task)
 {
        struct rspamd_http_message    *msg;
+       const gchar                   *ctype = "text/plain";
 
-       rspamd_http_connection_reset (task->http_conn);
        msg = rspamd_http_new_message (HTTP_RESPONSE);
        msg->date = time (NULL);
 
+       task->state = CLOSING_CONNECTION;
+
        debug_task ("writing reply to client");
        if (task->error_code != 0) {
                msg->code = task->error_code;
-               rspamd_http_connection_write_message (task->http_conn, msg, NULL,
-                               "text/plain", task, task->sock, &task->tv, task->ev_base);
-               task->state = CLOSING_CONNECTION;
-               return TRUE;
+               msg->body = g_string_new (task->last_error);
        }
        else {
                switch (task->cmd) {
@@ -774,21 +769,21 @@ rspamd_protocol_write_reply (struct worker_task *task)
                case CMD_SYMBOLS:
                case CMD_PROCESS:
                case CMD_SKIP:
-                       task->state = CLOSING_CONNECTION;
-                       return write_check_reply (msg, task);
+                       ctype = "application/json";
+                       write_check_reply (msg, task);
                        break;
                case CMD_PING:
-                       rspamd_http_connection_write_message (task->http_conn, msg, NULL,
-                                               "text/plain", task, task->sock, &task->tv, task->ev_base);
-                       task->state = CLOSING_CONNECTION;
+                       msg->body = g_string_new ("pong");
                        break;
                case CMD_OTHER:
-                       task->state = CLOSING_CONNECTION;
-                       return task->custom_cmd->func (task);
+                       msg_err ("BROKEN");
+                       break;
                }
        }
 
-       return FALSE;
+       rspamd_http_connection_reset (task->http_conn);
+       rspamd_http_connection_write_message (task->http_conn, msg, NULL,
+                                       ctype, task, task->sock, &task->tv, task->ev_base);
 }
 
 void
index 4b62fdfb219f37f3a9585a832d79e59677b8d73e..30d3c51dbb65aee90a508e9ac8b1b8539a67213c 100644 (file)
 #include "filter.h"
 #include "http.h"
 
-#define RSPAMD_FILTER_ERROR 1
-#define RSPAMD_NETWORK_ERROR 2
-#define RSPAMD_PROTOCOL_ERROR 3
-#define RSPAMD_LENGTH_ERROR 4
-#define RSPAMD_STATFILE_ERROR 5
+#define RSPAMD_BASE_ERROR 500
+#define RSPAMD_FILTER_ERROR RSPAMD_BASE_ERROR + 1
+#define RSPAMD_NETWORK_ERROR RSPAMD_BASE_ERROR + 2
+#define RSPAMD_PROTOCOL_ERROR RSPAMD_BASE_ERROR + 3
+#define RSPAMD_LENGTH_ERROR RSPAMD_BASE_ERROR + 4
+#define RSPAMD_STATFILE_ERROR RSPAMD_BASE_ERROR + 5
 
 struct worker_task;
 struct metric;
@@ -52,7 +53,7 @@ gboolean rspamd_protocol_handle_request (struct worker_task *task, struct rspamd
  * @param task task object
  * @return 0 if we wrote reply and -1 if there was some error
  */
-gboolean rspamd_protocol_write_reply (struct worker_task *task);
+void rspamd_protocol_write_reply (struct worker_task *task);
 
 
 /**
index 3553523f8d8cff87c776143d7d0d85e87e60eb23..4472e5d355c6610ae1a5819750a062562807d7f3 100644 (file)
@@ -406,7 +406,7 @@ http_scan_task_fin (gpointer arg)
 {
        struct scan_callback_data                               *cbdata = arg;
        static struct timeval                                    tv = {.tv_sec = 0, .tv_usec = 0 };
-
+#if 0
        if (cbdata->task->state != WRITING_REPLY) {
                process_statfiles (cbdata->task);
                cbdata->task->state = WRITE_REPLY;
@@ -426,7 +426,7 @@ http_scan_task_fin (gpointer arg)
                        event_base_once (cbdata->ctx->ev_base, -1, EV_TIMEOUT, http_scan_task_event_helper, cbdata, &tv);
                }
        }
-
+#endif
        return TRUE;
 }
 
@@ -439,7 +439,9 @@ http_scan_task_restore (gpointer arg)
        struct scan_callback_data                               *cbdata = arg;
 
        /* Special state */
+#if 0
        cbdata->task->state = WRITING_REPLY;
+#endif
 }
 
 /* Prepare callback data for scan */
@@ -565,7 +567,7 @@ http_learn_task_fin (gpointer arg)
 {
        struct learn_callback_data                              *cbdata = arg;
        static struct timeval                                    tv = {.tv_sec = 0, .tv_usec = 0 };
-
+#if 0
        if (cbdata->task->state != WRITING_REPLY) {
                cbdata->task->state = WRITE_REPLY;
        }
@@ -584,7 +586,7 @@ http_learn_task_fin (gpointer arg)
                        event_base_once (cbdata->ctx->ev_base, -1, EV_TIMEOUT, http_learn_task_event_helper, cbdata, &tv);
                }
        }
-
+#endif
        return TRUE;
 }
 
@@ -595,9 +597,10 @@ static void
 http_learn_task_restore (gpointer arg)
 {
        struct learn_callback_data                              *cbdata = arg;
-
+#if 0
        /* Special state */
        cbdata->task->state = WRITING_REPLY;
+#endif
 }
 
 /* Prepare callback data for learn */
index 4199a809f1aa2d664fd7609d66a7b673322d24bd..fc1d7a9047fa8455c280256e4795bb5db7d15109 100644 (file)
@@ -155,6 +155,7 @@ sigusr1_handler (gint fd, short what, void *arg)
 
 /*
  * Called if all filters are processed
+ * @return TRUE if session should be terminated
  */
 static gboolean
 fin_task (void *arg)
@@ -219,6 +220,7 @@ fin_task (void *arg)
                        else {
                                rspamd_protocol_write_reply (task);
                        }
+                       return TRUE;
                }
                else {
                        task->state = WAIT_FILTER;
@@ -226,8 +228,9 @@ fin_task (void *arg)
                        if (r == -1) {
                                task->last_error = "Filter processing error";
                                task->error_code = RSPAMD_FILTER_ERROR;
-                               task->state = WRITE_ERROR;
+                               task->state = WRITE_REPLY;
                                rspamd_protocol_write_reply (task);
+                               return TRUE;
                        }
                        /* Add task to classify to classify pool */
                        if (!task->is_skipped && ctx->classify_pool) {
@@ -242,6 +245,9 @@ fin_task (void *arg)
                        if (task->is_skipped) {
                                rspamd_protocol_write_reply (task);
                        }
+                       else {
+                               return FALSE;
+                       }
                }
        }
 
@@ -257,7 +263,9 @@ restore_task (void *arg)
        struct worker_task             *task = (struct worker_task *) arg;
 
        /* Call post filters */
-       lua_call_post_filters (task);
+       if (task->state == WAIT_POST_FILTER) {
+               lua_call_post_filters (task);
+       }
        task->s->wanna_die = TRUE;
 }
 
@@ -284,13 +292,18 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
 
        ctx = task->worker->ctx;
 
+       if (task->cmd == CMD_PING) {
+               task->state = WRITE_REPLY;
+               return 0;
+       }
+
        if (msg->body->len == 0) {
                msg_err ("got zero length body, cannot continue");
                return 0;
        }
 
        if (!rspamd_protocol_handle_request (task, msg)) {
-               task->state = WRITE_ERROR;
+               task->state = WRITE_REPLY;
                return 0;
        }
 
@@ -298,12 +311,15 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
 
        debug_task ("got string of length %z", task->msg->len);
 
+       /* We got body, set wanna_die flag */
+       task->s->wanna_die = TRUE;
+
        r = process_message (task);
        if (r == -1) {
                msg_warn ("processing of message failed");
                task->last_error = "MIME processing error";
                task->error_code = RSPAMD_FILTER_ERROR;
-               task->state = WRITE_ERROR;
+               task->state = WRITE_REPLY;
                return 0;
        }
        if (task->cmd == CMD_OTHER) {
@@ -317,7 +333,7 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
                        if (r == -1) {
                                task->last_error = "Filter processing error";
                                task->error_code = RSPAMD_FILTER_ERROR;
-                               task->state = WRITE_ERROR;
+                               task->state = WRITE_REPLY;
                                return 0;
                        }
                        /* Add task to classify to classify pool */
@@ -351,8 +367,19 @@ rspamd_worker_error_handler (struct rspamd_http_connection *conn, GError *err)
 {
        struct worker_task             *task = (struct worker_task *) conn->ud;
 
-       msg_info ("abnormally closing connection from: %s, error: %s", inet_ntoa (task->client_addr), err->message);
-       destroy_session (task->s);
+       msg_info ("abnormally closing connection from: %s, error: %s",
+                       inet_ntoa (task->client_addr), err->message);
+       if (task->state != CLOSING_CONNECTION) {
+               /* We still need to write a reply */
+               task->error_code = err->code;
+               task->last_error = err->message;
+               task->state = WRITE_REPLY;
+               rspamd_protocol_write_reply (task);
+       }
+       else {
+               /* Terminate session immediately */
+               destroy_session (task->s);
+       }
 }
 
 static void
@@ -366,6 +393,11 @@ rspamd_worker_finish_handler (struct rspamd_http_connection *conn,
                destroy_session (task->s);
        }
        else {
+               /*
+                * If all filters have finished their tasks, this function will trigger
+                * writing a reply.
+                */
+               task->s->wanna_die = TRUE;
                check_session_pending (task->s);
        }
 }
index 3da6a26a5f81063741ec1f1a178258e5bee51435..c21de269e84291f274b4058120f717cd230bcb12 100644 (file)
@@ -51,7 +51,7 @@ construct_task (struct rspamd_worker *worker)
        new_task = g_slice_alloc0 (sizeof (struct worker_task));
 
        new_task->worker = worker;
-       new_task->state = READ_COMMAND;
+       new_task->state = READ_MESSAGE;
        if (worker) {
                new_task->cfg = worker->srv->cfg;
        }