aboutsummaryrefslogtreecommitdiffstats
path: root/src/worker.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2014-01-12 15:20:50 +0000
committerVsevolod Stakhov <vsevolod@highsecure.ru>2014-01-12 15:20:50 +0000
commit67b932ad9786743dc032ff2adc3788c5aadd3933 (patch)
tree084872b519dc54beef83cbdade01af8bb639eafd /src/worker.c
parentd6b454eda97f6ffe8455ec83ab8da6293e7b9c07 (diff)
downloadrspamd-67b932ad9786743dc032ff2adc3788c5aadd3933.tar.gz
rspamd-67b932ad9786743dc032ff2adc3788c5aadd3933.zip
Start moving to HTTP world.
Diffstat (limited to 'src/worker.c')
-rw-r--r--src/worker.c107
1 files changed, 106 insertions, 1 deletions
diff --git a/src/worker.c b/src/worker.c
index 3eb91a289..a36a0ed72 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -155,7 +155,7 @@ sigusr1_handler (gint fd, short what, void *arg)
return;
}
-
+# if 0
/*
* Callback that is called when there is data to read in buffer
*/
@@ -389,6 +389,7 @@ err_socket (GError * err, void *arg)
g_error_free (err);
destroy_session (task->s);
}
+#endif
/*
* Called if all filters are processed
@@ -489,6 +490,104 @@ reduce_tasks_count (gpointer arg)
(*tasks) --;
}
+static gboolean
+rspamd_worker_body_handler (struct rspamd_http_connection *conn,
+ struct rspamd_http_message *msg,
+ const gchar *chunk, gsize len)
+{
+ struct worker_task *task = (struct worker_task *) conn->ud;
+ struct rspamd_worker_ctx *ctx;
+ ssize_t r;
+ GError *err = NULL;
+
+ ctx = task->worker->ctx;
+
+ if (msg->body->len == 0) {
+ msg_err ("got zero length body, cannot continue");
+ return FALSE;
+ }
+
+ task->msg = msg->body;
+
+ debug_task ("got string of length %z", task->msg->len);
+
+ r = process_message (task);
+ if (r == -1) {
+ msg_warn ("processing of message failed");
+ task->last_error = "MIME processing error";
+ task->error_code = RSPAMD_FILTER_ERROR;
+ task->state = WRITE_ERROR;
+ return FALSE;
+ }
+ if (task->cmd == CMD_OTHER) {
+ /* Skip filters */
+ task->state = WRITE_REPLY;
+ return FALSE;
+ }
+ else if (task->cmd == CMD_LEARN) {
+ if (!learn_task (task->statfile, task, &err)) {
+ task->last_error = memory_pool_strdup (task->task_pool, err->message);
+ task->error_code = err->code;
+ g_error_free (err);
+ task->state = WRITE_ERROR;
+ }
+ else {
+ task->last_error = "learn ok";
+ task->error_code = 0;
+ task->state = WRITE_REPLY;
+ }
+ return FALSE;
+ }
+ else {
+ if (task->cfg->pre_filters == NULL) {
+ r = process_filters (task);
+ if (r == -1) {
+ task->last_error = "Filter processing error";
+ task->error_code = RSPAMD_FILTER_ERROR;
+ task->state = WRITE_ERROR;
+ return FALSE;
+ }
+ /* Add task to classify to classify pool */
+ if (!task->is_skipped && ctx->classify_pool) {
+ register_async_thread (task->s);
+ g_thread_pool_push (ctx->classify_pool, task, &err);
+ if (err != NULL) {
+ msg_err ("cannot pull task to the pool: %s", err->message);
+ remove_async_thread (task->s);
+ }
+ }
+ if (task->is_skipped) {
+ /* Call write_socket to write reply and exit */
+ return TRUE;
+ }
+ }
+ else {
+ lua_call_pre_filters (task);
+ /* We want fin_task after pre filters are processed */
+ task->s->wanna_die = TRUE;
+ task->state = WAIT_PRE_FILTER;
+ check_session_pending (task->s);
+ }
+ }
+ return TRUE;
+}
+
+static void
+rspamd_worker_error_handler (struct rspamd_http_connection *conn, GError *err)
+{
+ struct worker_task *task = (struct worker_task *) conn->ud;
+
+ msg_info ("abnormally closing connection from: %s, error: %s", inet_ntoa (task->client_addr), err->message);
+ destroy_session (task->s);
+}
+
+static void
+rspamd_worker_finish_handler (struct rspamd_http_connection *conn,
+ struct rspamd_http_message *msg)
+{
+
+}
+
/*
* Accept new connection and construct task
*/
@@ -550,11 +649,15 @@ accept_socket (gint fd, short what, void *arg)
new_task->resolver = ctx->resolver;
msec_to_tv (ctx->timeout, &ctx->io_tv);
+#if 0
/* Set up dispatcher */
new_task->dispatcher =
rspamd_create_dispatcher (ctx->ev_base, nfd, BUFFER_LINE, read_socket, write_socket,
err_socket, &ctx->io_tv, (void *) new_task);
new_task->dispatcher->peer_addr = new_task->client_addr.s_addr;
+#endif
+ new_task->http_conn = rspamd_http_connection_new (rspamd_worker_body_handler,
+ rspamd_worker_error_handler, rspamd_worker_finish_handler, 0, RSPAMD_HTTP_SERVER);
new_task->ev_base = ctx->ev_base;
ctx->tasks ++;
memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)reduce_tasks_count, &ctx->tasks);
@@ -562,6 +665,8 @@ accept_socket (gint fd, short what, void *arg)
/* Set up async session */
new_task->s =
new_async_session (new_task->task_pool, fin_task, restore_task, free_task_hard, new_task);
+
+ rspamd_http_connection_read_message (new_task->http_conn, new_task, nfd, &ctx->io_tv, ctx->ev_base);
}
gpointer