From: Vsevolod Stakhov Date: Mon, 13 Jan 2014 17:40:15 +0000 (+0000) Subject: Implement HTTP session for normal worker. X-Git-Tag: 0.7.0~478 X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=f16357c9ac57de02a6f3a2c67774aa4976329d71;p=rspamd.git Implement HTTP session for normal worker. --- diff --git a/src/events.c b/src/events.c index da132c64b..711c3b24d 100644 --- a/src/events.c +++ b/src/events.c @@ -200,8 +200,8 @@ check_session_pending (struct rspamd_async_session *session) g_cond_wait (session->cond, session->mtx); } if (session->fin != NULL) { + g_mutex_unlock (session->mtx); if (! session->fin (session->user_data)) { - g_mutex_unlock (session->mtx); /* Session finished incompletely, perform restoration */ if (session->restore != NULL) { session->restore (session->user_data); @@ -210,6 +210,9 @@ check_session_pending (struct rspamd_async_session *session) } return TRUE; } + else { + return FALSE; + } } g_mutex_unlock (session->mtx); return FALSE; diff --git a/src/http.c b/src/http.c index 2d76ed092..fff780e0f 100644 --- a/src/http.c +++ b/src/http.c @@ -639,11 +639,6 @@ rspamd_http_connection_reset (struct rspamd_http_connection *conn) g_slice_free1 (sizeof (struct iovec) * priv->outlen, priv->out); priv->out = NULL; } - - /* Clear conn itself */ - if (conn->fd != -1) { - close (conn->fd); - } } void diff --git a/src/protocol.c b/src/protocol.c index d78b6f470..02ba94e39 100644 --- a/src/protocol.c +++ b/src/protocol.c @@ -809,6 +809,7 @@ rspamd_metric_symbol_ucl (struct worker_task *task, struct metric *m, description = g_hash_table_lookup (m->descriptions, sym->name); obj = ucl_object_insert_key (obj, ucl_object_fromstring (sym->name), "name", 0, false); + obj = ucl_object_insert_key (obj, ucl_object_fromdouble (sym->score), "score", 0, false); if (description) { obj = ucl_object_insert_key (obj, ucl_object_fromstring (description), "description", 0, false); } @@ -882,17 +883,84 @@ rspamd_metric_result_ucl (struct worker_task *task, struct metric_result *mres, return obj; } +/* + * GString ucl emitting functions + */ +static int +rspamd_gstring_append_character (unsigned char c, size_t len, void *ud) +{ + GString *buf = ud; + + if (len == 1) { + g_string_append_c (buf, c); + } + else { + if (buf->allocated_len - buf->len <= len) { + g_string_set_size (buf, buf->len + len + 1); + } + memset (&buf->str[buf->len], c, len); + buf->len += len; + buf->str[buf->len] = '\0'; + } + + return 0; +} + +static int +rspamd_gstring_append_len (const unsigned char *str, size_t len, void *ud) +{ + GString *buf = ud; + + g_string_append_len (buf, str, len); + + return 0; +} + +static int +rspamd_gstring_append_int (int64_t val, void *ud) +{ + GString *buf = ud; + + rspamd_printf_gstring (buf, "%L", (intmax_t)val); + return 0; +} + +static int +rspamd_gstring_append_double (double val, void *ud) +{ + GString *buf = ud; + const double delta = 0.0000001; + + if (val == (double)(int)val) { + rspamd_printf_gstring (buf, "%.1f", val); + } + else if (fabs (val - (double)(int)val) < delta) { + /* Write at maximum precision */ + rspamd_printf_gstring (buf, "%.*g", DBL_DIG, val); + } + else { + rspamd_printf_gstring (buf, "%f", val); + } + + return 0; +} + static gboolean -write_check_reply (struct worker_task *task) +write_check_reply (struct rspamd_http_message *msg, struct worker_task *task) { GString *logbuf; struct metric_result *metric_res; GHashTableIter hiter; gpointer h, v; ucl_object_t *top = NULL, *obj; + struct ucl_emitter_functions func = { + .ucl_emitter_append_character = rspamd_gstring_append_character, + .ucl_emitter_append_len = rspamd_gstring_append_len, + .ucl_emitter_append_int = rspamd_gstring_append_int, + .ucl_emitter_append_double = rspamd_gstring_append_double + }; /* Output the first line - check status */ - logbuf = g_string_sized_new (BUFSIZ); rspamd_printf_gstring (logbuf, "id: <%s>, qid: <%s>, ", task->message_id, task->queue_id); @@ -925,39 +993,34 @@ write_check_reply (struct worker_task *task) write_hashes_to_log (task, logbuf); msg_info ("%v", logbuf); + msg->body = g_string_sized_new (BUFSIZ); + func.ud = msg->body; + ucl_object_emit_full (top, UCL_EMIT_JSON_COMPACT, &func); + /* Increase counters */ task->worker->srv->stat->messages_scanned++; + rspamd_http_connection_write_message (task->http_conn, msg, NULL, + "application/json", task, task->sock, &task->tv, task->ev_base); return TRUE; } gboolean write_reply (struct worker_task *task) { - gint r; - gchar outbuf[OUTBUFSIZ]; + struct rspamd_http_message *msg; + + rspamd_http_connection_reset (task->http_conn); + msg = rspamd_http_new_message (HTTP_RESPONSE); + msg->date = time (NULL); debug_task ("writing reply to client"); if (task->error_code != 0) { - /* Write error message and error code to reply */ - if (task->is_http) { - r = rspamd_snprintf (outbuf, sizeof (outbuf), "HTTP/1.0 400 Bad request" CRLF - "Connection: close" CRLF CRLF "Error: %d - %s" CRLF, task->error_code, task->last_error); - } - else { - if (task->proto == SPAMC_PROTO) { - r = rspamd_snprintf (outbuf, sizeof (outbuf), "%s/%s %d %s" CRLF CRLF, - SPAMD_REPLY_BANNER, rspamc_proto_str (task->proto_ver), task->error_code, SPAMD_ERROR); - debug_task ("writing error: %s", outbuf); - } - else { - r = rspamd_snprintf (outbuf, sizeof (outbuf), "%s/%s %d %s" CRLF "%s: %s" CRLF CRLF, - RSPAMD_REPLY_BANNER, rspamc_proto_str (task->proto_ver), task->error_code, SPAMD_ERROR, ERROR_HEADER, task->last_error); - debug_task ("writing error: %s", outbuf); - } - } - /* Write to bufferevent error message */ - return rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE); + msg->code = task->error_code; + rspamd_http_connection_write_message (task->http_conn, msg, NULL, + "text/plain", task, task->sock, &task->tv, task->ev_base); + task->state = CLOSING_CONNECTION; + return TRUE; } else { switch (task->cmd) { @@ -966,32 +1029,24 @@ write_reply (struct worker_task *task) case CMD_CHECK: case CMD_SYMBOLS: case CMD_PROCESS: - return write_check_reply (task); - break; case CMD_SKIP: - r = rspamd_snprintf (outbuf, sizeof (outbuf), "%s/%s 0 %s" CRLF, - (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER, rspamc_proto_str (task->proto_ver), SPAMD_OK); - return rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE); + task->state = CLOSING_CONNECTION; + return write_check_reply (msg, task); break; case CMD_PING: - r = rspamd_snprintf (outbuf, sizeof (outbuf), "%s/%s 0 PONG" CRLF, - (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER, rspamc_proto_str (task->proto_ver)); - return rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE); + rspamd_http_connection_write_message (task->http_conn, msg, NULL, + "text/plain", task, task->sock, &task->tv, task->ev_base); + task->state = CLOSING_CONNECTION; break; case CMD_LEARN: - if (task->is_http) { - r = rspamd_snprintf (outbuf, sizeof (outbuf), "HTTP/1.0 200 Ok" CRLF - "Connection: close" CRLF CRLF "%s" CRLF, task->last_error); - } - else { - r = rspamd_snprintf (outbuf, sizeof (outbuf), "%s/%s 0 LEARN" CRLF CRLF "%s" CRLF, - (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER, - rspamc_proto_str (task->proto_ver), - task->last_error); - } - return rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE); + msg->code = task->error_code; + rspamd_http_connection_write_message (task->http_conn, msg, NULL, + "text/plain", task, task->sock, &task->tv, task->ev_base); + task->state = CLOSING_CONNECTION; + return TRUE; break; case CMD_OTHER: + task->state = CLOSING_CONNECTION; return task->custom_cmd->func (task); } } diff --git a/src/worker.c b/src/worker.c index a36a0ed72..f2c579fdc 100644 --- a/src/worker.c +++ b/src/worker.c @@ -399,7 +399,8 @@ fin_task (void *arg) { struct worker_task *task = (struct worker_task *) arg; struct rspamd_worker_ctx *ctx; - + gint r; + GError *err = NULL; ctx = task->worker->ctx; @@ -409,7 +410,7 @@ fin_task (void *arg) task->fin_callback (task->fin_arg); } else { - rspamd_dispatcher_restore (task->dispatcher); + write_reply (task); } return TRUE; } @@ -442,7 +443,7 @@ fin_task (void *arg) task->fin_callback (task->fin_arg); } else { - rspamd_dispatcher_restore (task->dispatcher); + write_reply (task); } } else { @@ -454,12 +455,31 @@ fin_task (void *arg) task->fin_callback (task->fin_arg); } else { - rspamd_dispatcher_restore (task->dispatcher); + write_reply (task); } } else { - /* Check normal filters in write callback */ - rspamd_dispatcher_restore (task->dispatcher); + task->state = WAIT_FILTER; + r = process_filters (task); + if (r == -1) { + task->last_error = "Filter processing error"; + task->error_code = RSPAMD_FILTER_ERROR; + task->state = WRITE_ERROR; + write_reply (task); + } + /* Add task to classify to classify pool */ + if (!task->is_skipped && ctx->classify_pool) { + register_async_thread (task->s); + g_thread_pool_push (ctx->classify_pool, task, &err); + if (err != NULL) { + msg_err ("cannot pull task to the pool: %s", err->message); + remove_async_thread (task->s); + g_error_free (err); + } + } + if (task->is_skipped) { + write_reply (task); + } } } @@ -585,7 +605,12 @@ static void rspamd_worker_finish_handler (struct rspamd_http_connection *conn, struct rspamd_http_message *msg) { + struct worker_task *task = (struct worker_task *) conn->ud; + if (task->state == CLOSING_CONNECTION) { + msg_debug ("normally closing connection from: %s", inet_ntoa (task->client_addr)); + destroy_session (task->s); + } } /*