aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2014-01-13 17:40:15 +0000
committerVsevolod Stakhov <vsevolod@highsecure.ru>2014-01-13 17:40:15 +0000
commitf16357c9ac57de02a6f3a2c67774aa4976329d71 (patch)
treedefa261880fb773943752dbfa63f94cf6e5816c4 /src
parent6e61846cdac04d690a1ebaf0ca7fc2862efb2b0c (diff)
downloadrspamd-f16357c9ac57de02a6f3a2c67774aa4976329d71.tar.gz
rspamd-f16357c9ac57de02a6f3a2c67774aa4976329d71.zip
Implement HTTP session for normal worker.
Diffstat (limited to 'src')
-rw-r--r--src/events.c5
-rw-r--r--src/http.c5
-rw-r--r--src/protocol.c139
-rw-r--r--src/worker.c37
4 files changed, 132 insertions, 54 deletions
diff --git a/src/events.c b/src/events.c
index da132c64b..711c3b24d 100644
--- a/src/events.c
+++ b/src/events.c
@@ -200,8 +200,8 @@ check_session_pending (struct rspamd_async_session *session)
g_cond_wait (session->cond, session->mtx);
}
if (session->fin != NULL) {
+ g_mutex_unlock (session->mtx);
if (! session->fin (session->user_data)) {
- g_mutex_unlock (session->mtx);
/* Session finished incompletely, perform restoration */
if (session->restore != NULL) {
session->restore (session->user_data);
@@ -210,6 +210,9 @@ check_session_pending (struct rspamd_async_session *session)
}
return TRUE;
}
+ else {
+ return FALSE;
+ }
}
g_mutex_unlock (session->mtx);
return FALSE;
diff --git a/src/http.c b/src/http.c
index 2d76ed092..fff780e0f 100644
--- a/src/http.c
+++ b/src/http.c
@@ -639,11 +639,6 @@ rspamd_http_connection_reset (struct rspamd_http_connection *conn)
g_slice_free1 (sizeof (struct iovec) * priv->outlen, priv->out);
priv->out = NULL;
}
-
- /* Clear conn itself */
- if (conn->fd != -1) {
- close (conn->fd);
- }
}
void
diff --git a/src/protocol.c b/src/protocol.c
index d78b6f470..02ba94e39 100644
--- a/src/protocol.c
+++ b/src/protocol.c
@@ -809,6 +809,7 @@ rspamd_metric_symbol_ucl (struct worker_task *task, struct metric *m,
description = g_hash_table_lookup (m->descriptions, sym->name);
obj = ucl_object_insert_key (obj, ucl_object_fromstring (sym->name), "name", 0, false);
+ obj = ucl_object_insert_key (obj, ucl_object_fromdouble (sym->score), "score", 0, false);
if (description) {
obj = ucl_object_insert_key (obj, ucl_object_fromstring (description), "description", 0, false);
}
@@ -882,17 +883,84 @@ rspamd_metric_result_ucl (struct worker_task *task, struct metric_result *mres,
return obj;
}
+/*
+ * GString ucl emitting functions
+ */
+static int
+rspamd_gstring_append_character (unsigned char c, size_t len, void *ud)
+{
+ GString *buf = ud;
+
+ if (len == 1) {
+ g_string_append_c (buf, c);
+ }
+ else {
+ if (buf->allocated_len - buf->len <= len) {
+ g_string_set_size (buf, buf->len + len + 1);
+ }
+ memset (&buf->str[buf->len], c, len);
+ buf->len += len;
+ buf->str[buf->len] = '\0';
+ }
+
+ return 0;
+}
+
+static int
+rspamd_gstring_append_len (const unsigned char *str, size_t len, void *ud)
+{
+ GString *buf = ud;
+
+ g_string_append_len (buf, str, len);
+
+ return 0;
+}
+
+static int
+rspamd_gstring_append_int (int64_t val, void *ud)
+{
+ GString *buf = ud;
+
+ rspamd_printf_gstring (buf, "%L", (intmax_t)val);
+ return 0;
+}
+
+static int
+rspamd_gstring_append_double (double val, void *ud)
+{
+ GString *buf = ud;
+ const double delta = 0.0000001;
+
+ if (val == (double)(int)val) {
+ rspamd_printf_gstring (buf, "%.1f", val);
+ }
+ else if (fabs (val - (double)(int)val) < delta) {
+ /* Write at maximum precision */
+ rspamd_printf_gstring (buf, "%.*g", DBL_DIG, val);
+ }
+ else {
+ rspamd_printf_gstring (buf, "%f", val);
+ }
+
+ return 0;
+}
+
static gboolean
-write_check_reply (struct worker_task *task)
+write_check_reply (struct rspamd_http_message *msg, struct worker_task *task)
{
GString *logbuf;
struct metric_result *metric_res;
GHashTableIter hiter;
gpointer h, v;
ucl_object_t *top = NULL, *obj;
+ struct ucl_emitter_functions func = {
+ .ucl_emitter_append_character = rspamd_gstring_append_character,
+ .ucl_emitter_append_len = rspamd_gstring_append_len,
+ .ucl_emitter_append_int = rspamd_gstring_append_int,
+ .ucl_emitter_append_double = rspamd_gstring_append_double
+ };
/* Output the first line - check status */
-
logbuf = g_string_sized_new (BUFSIZ);
rspamd_printf_gstring (logbuf, "id: <%s>, qid: <%s>, ", task->message_id, task->queue_id);
@@ -925,39 +993,34 @@ write_check_reply (struct worker_task *task)
write_hashes_to_log (task, logbuf);
msg_info ("%v", logbuf);
+ msg->body = g_string_sized_new (BUFSIZ);
+ func.ud = msg->body;
+ ucl_object_emit_full (top, UCL_EMIT_JSON_COMPACT, &func);
+
/* Increase counters */
task->worker->srv->stat->messages_scanned++;
+ rspamd_http_connection_write_message (task->http_conn, msg, NULL,
+ "application/json", task, task->sock, &task->tv, task->ev_base);
return TRUE;
}
gboolean
write_reply (struct worker_task *task)
{
- gint r;
- gchar outbuf[OUTBUFSIZ];
+ struct rspamd_http_message *msg;
+
+ rspamd_http_connection_reset (task->http_conn);
+ msg = rspamd_http_new_message (HTTP_RESPONSE);
+ msg->date = time (NULL);
debug_task ("writing reply to client");
if (task->error_code != 0) {
- /* Write error message and error code to reply */
- if (task->is_http) {
- r = rspamd_snprintf (outbuf, sizeof (outbuf), "HTTP/1.0 400 Bad request" CRLF
- "Connection: close" CRLF CRLF "Error: %d - %s" CRLF, task->error_code, task->last_error);
- }
- else {
- if (task->proto == SPAMC_PROTO) {
- r = rspamd_snprintf (outbuf, sizeof (outbuf), "%s/%s %d %s" CRLF CRLF,
- SPAMD_REPLY_BANNER, rspamc_proto_str (task->proto_ver), task->error_code, SPAMD_ERROR);
- debug_task ("writing error: %s", outbuf);
- }
- else {
- r = rspamd_snprintf (outbuf, sizeof (outbuf), "%s/%s %d %s" CRLF "%s: %s" CRLF CRLF,
- RSPAMD_REPLY_BANNER, rspamc_proto_str (task->proto_ver), task->error_code, SPAMD_ERROR, ERROR_HEADER, task->last_error);
- debug_task ("writing error: %s", outbuf);
- }
- }
- /* Write to bufferevent error message */
- return rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE);
+ msg->code = task->error_code;
+ rspamd_http_connection_write_message (task->http_conn, msg, NULL,
+ "text/plain", task, task->sock, &task->tv, task->ev_base);
+ task->state = CLOSING_CONNECTION;
+ return TRUE;
}
else {
switch (task->cmd) {
@@ -966,32 +1029,24 @@ write_reply (struct worker_task *task)
case CMD_CHECK:
case CMD_SYMBOLS:
case CMD_PROCESS:
- return write_check_reply (task);
- break;
case CMD_SKIP:
- r = rspamd_snprintf (outbuf, sizeof (outbuf), "%s/%s 0 %s" CRLF,
- (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER, rspamc_proto_str (task->proto_ver), SPAMD_OK);
- return rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE);
+ task->state = CLOSING_CONNECTION;
+ return write_check_reply (msg, task);
break;
case CMD_PING:
- r = rspamd_snprintf (outbuf, sizeof (outbuf), "%s/%s 0 PONG" CRLF,
- (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER, rspamc_proto_str (task->proto_ver));
- return rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE);
+ rspamd_http_connection_write_message (task->http_conn, msg, NULL,
+ "text/plain", task, task->sock, &task->tv, task->ev_base);
+ task->state = CLOSING_CONNECTION;
break;
case CMD_LEARN:
- if (task->is_http) {
- r = rspamd_snprintf (outbuf, sizeof (outbuf), "HTTP/1.0 200 Ok" CRLF
- "Connection: close" CRLF CRLF "%s" CRLF, task->last_error);
- }
- else {
- r = rspamd_snprintf (outbuf, sizeof (outbuf), "%s/%s 0 LEARN" CRLF CRLF "%s" CRLF,
- (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER,
- rspamc_proto_str (task->proto_ver),
- task->last_error);
- }
- return rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE);
+ msg->code = task->error_code;
+ rspamd_http_connection_write_message (task->http_conn, msg, NULL,
+ "text/plain", task, task->sock, &task->tv, task->ev_base);
+ task->state = CLOSING_CONNECTION;
+ return TRUE;
break;
case CMD_OTHER:
+ task->state = CLOSING_CONNECTION;
return task->custom_cmd->func (task);
}
}
diff --git a/src/worker.c b/src/worker.c
index a36a0ed72..f2c579fdc 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -399,7 +399,8 @@ fin_task (void *arg)
{
struct worker_task *task = (struct worker_task *) arg;
struct rspamd_worker_ctx *ctx;
-
+ gint r;
+ GError *err = NULL;
ctx = task->worker->ctx;
@@ -409,7 +410,7 @@ fin_task (void *arg)
task->fin_callback (task->fin_arg);
}
else {
- rspamd_dispatcher_restore (task->dispatcher);
+ write_reply (task);
}
return TRUE;
}
@@ -442,7 +443,7 @@ fin_task (void *arg)
task->fin_callback (task->fin_arg);
}
else {
- rspamd_dispatcher_restore (task->dispatcher);
+ write_reply (task);
}
}
else {
@@ -454,12 +455,31 @@ fin_task (void *arg)
task->fin_callback (task->fin_arg);
}
else {
- rspamd_dispatcher_restore (task->dispatcher);
+ write_reply (task);
}
}
else {
- /* Check normal filters in write callback */
- rspamd_dispatcher_restore (task->dispatcher);
+ task->state = WAIT_FILTER;
+ r = process_filters (task);
+ if (r == -1) {
+ task->last_error = "Filter processing error";
+ task->error_code = RSPAMD_FILTER_ERROR;
+ task->state = WRITE_ERROR;
+ write_reply (task);
+ }
+ /* Add task to classify to classify pool */
+ if (!task->is_skipped && ctx->classify_pool) {
+ register_async_thread (task->s);
+ g_thread_pool_push (ctx->classify_pool, task, &err);
+ if (err != NULL) {
+ msg_err ("cannot pull task to the pool: %s", err->message);
+ remove_async_thread (task->s);
+ g_error_free (err);
+ }
+ }
+ if (task->is_skipped) {
+ write_reply (task);
+ }
}
}
@@ -585,7 +605,12 @@ static void
rspamd_worker_finish_handler (struct rspamd_http_connection *conn,
struct rspamd_http_message *msg)
{
+ struct worker_task *task = (struct worker_task *) conn->ud;
+ if (task->state == CLOSING_CONNECTION) {
+ msg_debug ("normally closing connection from: %s", inet_ntoa (task->client_addr));
+ destroy_session (task->s);
+ }
}
/*