]> source.dussan.org Git - rspamd.git/commitdiff
Implement HTTP session for normal worker.
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 13 Jan 2014 17:40:15 +0000 (17:40 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 13 Jan 2014 17:40:15 +0000 (17:40 +0000)
src/events.c
src/http.c
src/protocol.c
src/worker.c

index da132c64b66a268bc0ae9b349781403f820ce60c..711c3b24d42252bddd25a3fe73170f75c1a3d212 100644 (file)
@@ -200,8 +200,8 @@ check_session_pending (struct rspamd_async_session *session)
                        g_cond_wait (session->cond, session->mtx);
                }
                if (session->fin != NULL) {
+                       g_mutex_unlock (session->mtx);
                        if (! session->fin (session->user_data)) {
-                               g_mutex_unlock (session->mtx);
                                /* Session finished incompletely, perform restoration */
                                if (session->restore != NULL) {
                                        session->restore (session->user_data);
@@ -210,6 +210,9 @@ check_session_pending (struct rspamd_async_session *session)
                                }
                                return TRUE;
                        }
+                       else {
+                               return FALSE;
+                       }
                }
                g_mutex_unlock (session->mtx);
                return FALSE;
index 2d76ed0922b88444ddc76797e9270511923828e2..fff780e0f55cb448308f866e5edb45fee41a0f6a 100644 (file)
@@ -639,11 +639,6 @@ rspamd_http_connection_reset (struct rspamd_http_connection *conn)
                g_slice_free1 (sizeof (struct iovec) * priv->outlen, priv->out);
                priv->out = NULL;
        }
-
-       /* Clear conn itself */
-       if (conn->fd != -1) {
-               close (conn->fd);
-       }
 }
 
 void
index d78b6f470cf7cffb8cd4f8a05a3d0c7038889fd4..02ba94e394074a90333e1923b39a20bee00329e2 100644 (file)
@@ -809,6 +809,7 @@ rspamd_metric_symbol_ucl (struct worker_task *task, struct metric *m,
        description = g_hash_table_lookup (m->descriptions, sym->name);
 
        obj = ucl_object_insert_key (obj, ucl_object_fromstring (sym->name), "name", 0, false);
+       obj = ucl_object_insert_key (obj, ucl_object_fromdouble (sym->score), "score", 0, false);
        if (description) {
                obj = ucl_object_insert_key (obj, ucl_object_fromstring (description), "description", 0, false);
        }
@@ -882,17 +883,84 @@ rspamd_metric_result_ucl (struct worker_task *task, struct metric_result *mres,
        return obj;
 }
 
+/*
+ * GString ucl emitting functions
+ */
+static int
+rspamd_gstring_append_character (unsigned char c, size_t len, void *ud)
+{
+       GString *buf = ud;
+
+       if (len == 1) {
+               g_string_append_c (buf, c);
+       }
+       else {
+               if (buf->allocated_len - buf->len <= len) {
+                       g_string_set_size (buf, buf->len + len + 1);
+               }
+               memset (&buf->str[buf->len], c, len);
+               buf->len += len;
+               buf->str[buf->len] = '\0';
+       }
+
+       return 0;
+}
+
+static int
+rspamd_gstring_append_len (const unsigned char *str, size_t len, void *ud)
+{
+       GString *buf = ud;
+
+       g_string_append_len (buf, str, len);
+
+       return 0;
+}
+
+static int
+rspamd_gstring_append_int (int64_t val, void *ud)
+{
+       GString *buf = ud;
+
+       rspamd_printf_gstring (buf, "%L", (intmax_t)val);
+       return 0;
+}
+
+static int
+rspamd_gstring_append_double (double val, void *ud)
+{
+       GString *buf = ud;
+       const double delta = 0.0000001;
+
+       if (val == (double)(int)val) {
+               rspamd_printf_gstring (buf, "%.1f", val);
+       }
+       else if (fabs (val - (double)(int)val) < delta) {
+               /* Write at maximum precision */
+               rspamd_printf_gstring (buf, "%.*g", DBL_DIG, val);
+       }
+       else {
+               rspamd_printf_gstring (buf, "%f", val);
+       }
+
+       return 0;
+}
+
 static gboolean
-write_check_reply (struct worker_task *task)
+write_check_reply (struct rspamd_http_message *msg, struct worker_task *task)
 {
        GString                         *logbuf;
        struct metric_result           *metric_res;
        GHashTableIter                   hiter;
        gpointer                         h, v;
        ucl_object_t                    *top = NULL, *obj;
+       struct ucl_emitter_functions func = {
+               .ucl_emitter_append_character = rspamd_gstring_append_character,
+               .ucl_emitter_append_len = rspamd_gstring_append_len,
+               .ucl_emitter_append_int = rspamd_gstring_append_int,
+               .ucl_emitter_append_double = rspamd_gstring_append_double
+       };
 
        /* Output the first line - check status */
-
        logbuf = g_string_sized_new (BUFSIZ);
        rspamd_printf_gstring (logbuf, "id: <%s>, qid: <%s>, ", task->message_id, task->queue_id);
 
@@ -925,39 +993,34 @@ write_check_reply (struct worker_task *task)
        write_hashes_to_log (task, logbuf);
        msg_info ("%v", logbuf);
 
+       msg->body = g_string_sized_new (BUFSIZ);
+       func.ud = msg->body;
+       ucl_object_emit_full (top, UCL_EMIT_JSON_COMPACT, &func);
+
        /* Increase counters */
        task->worker->srv->stat->messages_scanned++;
 
+       rspamd_http_connection_write_message (task->http_conn, msg, NULL,
+                       "application/json", task, task->sock, &task->tv, task->ev_base);
        return TRUE;
 }
 
 gboolean
 write_reply (struct worker_task *task)
 {
-       gint                            r;
-       gchar                           outbuf[OUTBUFSIZ];
+       struct rspamd_http_message    *msg;
+
+       rspamd_http_connection_reset (task->http_conn);
+       msg = rspamd_http_new_message (HTTP_RESPONSE);
+       msg->date = time (NULL);
 
        debug_task ("writing reply to client");
        if (task->error_code != 0) {
-               /* Write error message and error code to reply */
-               if (task->is_http) {
-                       r = rspamd_snprintf (outbuf, sizeof (outbuf), "HTTP/1.0 400 Bad request" CRLF
-                                       "Connection: close" CRLF CRLF "Error: %d - %s" CRLF, task->error_code, task->last_error);
-               }
-               else {
-                       if (task->proto == SPAMC_PROTO) {
-                               r = rspamd_snprintf (outbuf, sizeof (outbuf), "%s/%s %d %s" CRLF CRLF,
-                                               SPAMD_REPLY_BANNER, rspamc_proto_str (task->proto_ver), task->error_code, SPAMD_ERROR);
-                               debug_task ("writing error: %s", outbuf);
-                       }
-                       else {
-                               r = rspamd_snprintf (outbuf, sizeof (outbuf), "%s/%s %d %s" CRLF "%s: %s" CRLF CRLF,
-                                               RSPAMD_REPLY_BANNER, rspamc_proto_str (task->proto_ver), task->error_code, SPAMD_ERROR, ERROR_HEADER, task->last_error);
-                               debug_task ("writing error: %s", outbuf);
-                       }
-               }
-               /* Write to bufferevent error message */
-               return rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE);
+               msg->code = task->error_code;
+               rspamd_http_connection_write_message (task->http_conn, msg, NULL,
+                               "text/plain", task, task->sock, &task->tv, task->ev_base);
+               task->state = CLOSING_CONNECTION;
+               return TRUE;
        }
        else {
                switch (task->cmd) {
@@ -966,32 +1029,24 @@ write_reply (struct worker_task *task)
                case CMD_CHECK:
                case CMD_SYMBOLS:
                case CMD_PROCESS:
-                       return write_check_reply (task);
-                       break;
                case CMD_SKIP:
-                       r = rspamd_snprintf (outbuf, sizeof (outbuf), "%s/%s 0 %s" CRLF, 
-                                       (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER, rspamc_proto_str (task->proto_ver), SPAMD_OK);
-                       return rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE);
+                       task->state = CLOSING_CONNECTION;
+                       return write_check_reply (msg, task);
                        break;
                case CMD_PING:
-                       r = rspamd_snprintf (outbuf, sizeof (outbuf), "%s/%s 0 PONG" CRLF, 
-                                       (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER, rspamc_proto_str (task->proto_ver));
-                       return rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE);
+                       rspamd_http_connection_write_message (task->http_conn, msg, NULL,
+                                               "text/plain", task, task->sock, &task->tv, task->ev_base);
+                       task->state = CLOSING_CONNECTION;
                        break;
                case CMD_LEARN:
-                       if (task->is_http) {
-                               r = rspamd_snprintf (outbuf, sizeof (outbuf), "HTTP/1.0 200 Ok" CRLF
-                                                                       "Connection: close" CRLF CRLF "%s" CRLF, task->last_error);
-                       }
-                       else {
-                               r = rspamd_snprintf (outbuf, sizeof (outbuf), "%s/%s 0 LEARN" CRLF CRLF "%s" CRLF,
-                                               (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER,
-                                               rspamc_proto_str (task->proto_ver),
-                                               task->last_error);
-                       }
-                       return rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE);
+                       msg->code = task->error_code;
+                       rspamd_http_connection_write_message (task->http_conn, msg, NULL,
+                                               "text/plain", task, task->sock, &task->tv, task->ev_base);
+                       task->state = CLOSING_CONNECTION;
+                       return TRUE;
                        break;
                case CMD_OTHER:
+                       task->state = CLOSING_CONNECTION;
                        return task->custom_cmd->func (task);
                }
        }
index a36a0ed72468faf96eb8a967c30f937f93ed461a..f2c579fdca70a1da45337bbd88b82bcb4edc630c 100644 (file)
@@ -399,7 +399,8 @@ fin_task (void *arg)
 {
        struct worker_task              *task = (struct worker_task *) arg;
        struct rspamd_worker_ctx        *ctx;
-
+       gint r;
+       GError *err = NULL;
 
        ctx = task->worker->ctx;
 
@@ -409,7 +410,7 @@ fin_task (void *arg)
                        task->fin_callback (task->fin_arg);
                }
                else {
-                       rspamd_dispatcher_restore (task->dispatcher);
+                       write_reply (task);
                }
                return TRUE;
        }
@@ -442,7 +443,7 @@ fin_task (void *arg)
                        task->fin_callback (task->fin_arg);
                }
                else {
-                       rspamd_dispatcher_restore (task->dispatcher);
+                       write_reply (task);
                }
        }
        else {
@@ -454,12 +455,31 @@ fin_task (void *arg)
                                task->fin_callback (task->fin_arg);
                        }
                        else {
-                               rspamd_dispatcher_restore (task->dispatcher);
+                               write_reply (task);
                        }
                }
                else {
-                       /* Check normal filters in write callback */
-                       rspamd_dispatcher_restore (task->dispatcher);
+                       task->state = WAIT_FILTER;
+                       r = process_filters (task);
+                       if (r == -1) {
+                               task->last_error = "Filter processing error";
+                               task->error_code = RSPAMD_FILTER_ERROR;
+                               task->state = WRITE_ERROR;
+                               write_reply (task);
+                       }
+                       /* Add task to classify to classify pool */
+                       if (!task->is_skipped && ctx->classify_pool) {
+                               register_async_thread (task->s);
+                               g_thread_pool_push (ctx->classify_pool, task, &err);
+                               if (err != NULL) {
+                                       msg_err ("cannot pull task to the pool: %s", err->message);
+                                       remove_async_thread (task->s);
+                                       g_error_free (err);
+                               }
+                       }
+                       if (task->is_skipped) {
+                               write_reply (task);
+                       }
                }
        }
 
@@ -585,7 +605,12 @@ static void
 rspamd_worker_finish_handler (struct rspamd_http_connection *conn,
                struct rspamd_http_message *msg)
 {
+       struct worker_task             *task = (struct worker_task *) conn->ud;
 
+       if (task->state == CLOSING_CONNECTION) {
+               msg_debug ("normally closing connection from: %s", inet_ntoa (task->client_addr));
+               destroy_session (task->s);
+       }
 }
 
 /*