summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/controller.c17
-rw-r--r--src/filter.c10
-rw-r--r--src/http.c4
-rw-r--r--src/main.h8
-rw-r--r--src/protocol.c33
-rw-r--r--src/protocol.h13
-rw-r--r--src/webui.c13
-rw-r--r--src/worker.c46
-rw-r--r--src/worker_util.c2
9 files changed, 77 insertions, 69 deletions
diff --git a/src/controller.c b/src/controller.c
index 097ed0e7c..269ca6db2 100644
--- a/src/controller.c
+++ b/src/controller.c
@@ -1282,19 +1282,7 @@ fin_learn_task (void *arg)
{
struct worker_task *task = (struct worker_task *) arg;
- if (task->state != WRITING_REPLY) {
- task->state = WRITE_REPLY;
- }
-
- /* Check if we have all events finished */
- if (task->state != WRITING_REPLY) {
- if (task->fin_callback) {
- task->fin_callback (task->fin_arg);
- }
- else {
- rspamd_dispatcher_restore (task->dispatcher);
- }
- }
+ /* XXX: needs to be reworked */
return TRUE;
}
@@ -1308,9 +1296,6 @@ restore_learn_task (void *arg)
struct worker_task *task = (struct worker_task *) arg;
/* Special state */
- task->state = WRITING_REPLY;
-
- rspamd_dispatcher_pause (task->dispatcher);
}
static gboolean
diff --git a/src/filter.c b/src/filter.c
index b1448f173..20b924cc0 100644
--- a/src/filter.c
+++ b/src/filter.c
@@ -271,9 +271,6 @@ process_filters (struct worker_task *task)
task->is_skipped = TRUE;
task->state = WRITE_REPLY;
msg_info ("disable check for message id <%s>, user wants spam", task->message_id);
- task->s->wanna_die = TRUE;
- check_session_pending (task->s);
-
return 1;
}
@@ -286,15 +283,14 @@ process_filters (struct worker_task *task)
if (!task->pass_all_filters &&
metric->actions[METRIC_ACTION_REJECT].score > 0 &&
check_metric_is_spam (task, metric)) {
- task->s->wanna_die = TRUE;
- check_session_pending (task->s);
+ task->state = WRITE_REPLY;
return 1;
}
cur = g_list_next (cur);
}
}
- task->s->wanna_die = TRUE;
- check_session_pending (task->s);
+
+ task->state = WAIT_FILTER;
return 1;
}
diff --git a/src/http.c b/src/http.c
index fff780e0f..22c202b88 100644
--- a/src/http.c
+++ b/src/http.c
@@ -712,8 +712,8 @@ rspamd_http_connection_write_message (struct rspamd_http_connection *conn,
rspamd_snprintf (datebuf, sizeof (datebuf), "%s, %02d %s %4d %02d:%02d:%02d GMT",
http_week[t.tm_wday],
t.tm_mday,
- http_month[t.tm_mon - 1],
- t.tm_year,
+ http_month[t.tm_mon],
+ t.tm_year + 1900,
t.tm_hour,
t.tm_min,
t.tm_sec);
diff --git a/src/main.h b/src/main.h
index ea1172468..3acd5a870 100644
--- a/src/main.h
+++ b/src/main.h
@@ -176,16 +176,12 @@ struct controller_session {
struct worker_task {
struct rspamd_worker *worker; /**< pointer to worker object */
enum {
- READ_COMMAND,
- READ_HEADER,
READ_MESSAGE,
- WRITE_REPLY,
- WRITE_ERROR,
WAIT_PRE_FILTER,
WAIT_FILTER,
WAIT_POST_FILTER,
- CLOSING_CONNECTION,
- WRITING_REPLY
+ WRITE_REPLY,
+ CLOSING_CONNECTION
} state; /**< current session state */
enum rspamd_command cmd; /**< command */
struct custom_command *custom_cmd; /**< custom command if any */
diff --git a/src/protocol.c b/src/protocol.c
index 3351a5195..23ed859ad 100644
--- a/src/protocol.c
+++ b/src/protocol.c
@@ -687,7 +687,7 @@ rspamd_gstring_append_double (double val, void *ud)
return 0;
}
-static gboolean
+static void
write_check_reply (struct rspamd_http_message *msg, struct worker_task *task)
{
GString *logbuf;
@@ -743,28 +743,23 @@ write_check_reply (struct rspamd_http_message *msg, struct worker_task *task)
/* Increase counters */
task->worker->srv->stat->messages_scanned++;
-
- rspamd_http_connection_write_message (task->http_conn, msg, NULL,
- "application/json", task, task->sock, &task->tv, task->ev_base);
- return TRUE;
}
-gboolean
+void
rspamd_protocol_write_reply (struct worker_task *task)
{
struct rspamd_http_message *msg;
+ const gchar *ctype = "text/plain";
- rspamd_http_connection_reset (task->http_conn);
msg = rspamd_http_new_message (HTTP_RESPONSE);
msg->date = time (NULL);
+ task->state = CLOSING_CONNECTION;
+
debug_task ("writing reply to client");
if (task->error_code != 0) {
msg->code = task->error_code;
- rspamd_http_connection_write_message (task->http_conn, msg, NULL,
- "text/plain", task, task->sock, &task->tv, task->ev_base);
- task->state = CLOSING_CONNECTION;
- return TRUE;
+ msg->body = g_string_new (task->last_error);
}
else {
switch (task->cmd) {
@@ -774,21 +769,21 @@ rspamd_protocol_write_reply (struct worker_task *task)
case CMD_SYMBOLS:
case CMD_PROCESS:
case CMD_SKIP:
- task->state = CLOSING_CONNECTION;
- return write_check_reply (msg, task);
+ ctype = "application/json";
+ write_check_reply (msg, task);
break;
case CMD_PING:
- rspamd_http_connection_write_message (task->http_conn, msg, NULL,
- "text/plain", task, task->sock, &task->tv, task->ev_base);
- task->state = CLOSING_CONNECTION;
+ msg->body = g_string_new ("pong");
break;
case CMD_OTHER:
- task->state = CLOSING_CONNECTION;
- return task->custom_cmd->func (task);
+ msg_err ("BROKEN");
+ break;
}
}
- return FALSE;
+ rspamd_http_connection_reset (task->http_conn);
+ rspamd_http_connection_write_message (task->http_conn, msg, NULL,
+ ctype, task, task->sock, &task->tv, task->ev_base);
}
void
diff --git a/src/protocol.h b/src/protocol.h
index 4b62fdfb2..30d3c51db 100644
--- a/src/protocol.h
+++ b/src/protocol.h
@@ -10,11 +10,12 @@
#include "filter.h"
#include "http.h"
-#define RSPAMD_FILTER_ERROR 1
-#define RSPAMD_NETWORK_ERROR 2
-#define RSPAMD_PROTOCOL_ERROR 3
-#define RSPAMD_LENGTH_ERROR 4
-#define RSPAMD_STATFILE_ERROR 5
+#define RSPAMD_BASE_ERROR 500
+#define RSPAMD_FILTER_ERROR RSPAMD_BASE_ERROR + 1
+#define RSPAMD_NETWORK_ERROR RSPAMD_BASE_ERROR + 2
+#define RSPAMD_PROTOCOL_ERROR RSPAMD_BASE_ERROR + 3
+#define RSPAMD_LENGTH_ERROR RSPAMD_BASE_ERROR + 4
+#define RSPAMD_STATFILE_ERROR RSPAMD_BASE_ERROR + 5
struct worker_task;
struct metric;
@@ -52,7 +53,7 @@ gboolean rspamd_protocol_handle_request (struct worker_task *task, struct rspamd
* @param task task object
* @return 0 if we wrote reply and -1 if there was some error
*/
-gboolean rspamd_protocol_write_reply (struct worker_task *task);
+void rspamd_protocol_write_reply (struct worker_task *task);
/**
diff --git a/src/webui.c b/src/webui.c
index 3553523f8..4472e5d35 100644
--- a/src/webui.c
+++ b/src/webui.c
@@ -406,7 +406,7 @@ http_scan_task_fin (gpointer arg)
{
struct scan_callback_data *cbdata = arg;
static struct timeval tv = {.tv_sec = 0, .tv_usec = 0 };
-
+#if 0
if (cbdata->task->state != WRITING_REPLY) {
process_statfiles (cbdata->task);
cbdata->task->state = WRITE_REPLY;
@@ -426,7 +426,7 @@ http_scan_task_fin (gpointer arg)
event_base_once (cbdata->ctx->ev_base, -1, EV_TIMEOUT, http_scan_task_event_helper, cbdata, &tv);
}
}
-
+#endif
return TRUE;
}
@@ -439,7 +439,9 @@ http_scan_task_restore (gpointer arg)
struct scan_callback_data *cbdata = arg;
/* Special state */
+#if 0
cbdata->task->state = WRITING_REPLY;
+#endif
}
/* Prepare callback data for scan */
@@ -565,7 +567,7 @@ http_learn_task_fin (gpointer arg)
{
struct learn_callback_data *cbdata = arg;
static struct timeval tv = {.tv_sec = 0, .tv_usec = 0 };
-
+#if 0
if (cbdata->task->state != WRITING_REPLY) {
cbdata->task->state = WRITE_REPLY;
}
@@ -584,7 +586,7 @@ http_learn_task_fin (gpointer arg)
event_base_once (cbdata->ctx->ev_base, -1, EV_TIMEOUT, http_learn_task_event_helper, cbdata, &tv);
}
}
-
+#endif
return TRUE;
}
@@ -595,9 +597,10 @@ static void
http_learn_task_restore (gpointer arg)
{
struct learn_callback_data *cbdata = arg;
-
+#if 0
/* Special state */
cbdata->task->state = WRITING_REPLY;
+#endif
}
/* Prepare callback data for learn */
diff --git a/src/worker.c b/src/worker.c
index 4199a809f..fc1d7a904 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -155,6 +155,7 @@ sigusr1_handler (gint fd, short what, void *arg)
/*
* Called if all filters are processed
+ * @return TRUE if session should be terminated
*/
static gboolean
fin_task (void *arg)
@@ -219,6 +220,7 @@ fin_task (void *arg)
else {
rspamd_protocol_write_reply (task);
}
+ return TRUE;
}
else {
task->state = WAIT_FILTER;
@@ -226,8 +228,9 @@ fin_task (void *arg)
if (r == -1) {
task->last_error = "Filter processing error";
task->error_code = RSPAMD_FILTER_ERROR;
- task->state = WRITE_ERROR;
+ task->state = WRITE_REPLY;
rspamd_protocol_write_reply (task);
+ return TRUE;
}
/* Add task to classify to classify pool */
if (!task->is_skipped && ctx->classify_pool) {
@@ -242,6 +245,9 @@ fin_task (void *arg)
if (task->is_skipped) {
rspamd_protocol_write_reply (task);
}
+ else {
+ return FALSE;
+ }
}
}
@@ -257,7 +263,9 @@ restore_task (void *arg)
struct worker_task *task = (struct worker_task *) arg;
/* Call post filters */
- lua_call_post_filters (task);
+ if (task->state == WAIT_POST_FILTER) {
+ lua_call_post_filters (task);
+ }
task->s->wanna_die = TRUE;
}
@@ -284,13 +292,18 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
ctx = task->worker->ctx;
+ if (task->cmd == CMD_PING) {
+ task->state = WRITE_REPLY;
+ return 0;
+ }
+
if (msg->body->len == 0) {
msg_err ("got zero length body, cannot continue");
return 0;
}
if (!rspamd_protocol_handle_request (task, msg)) {
- task->state = WRITE_ERROR;
+ task->state = WRITE_REPLY;
return 0;
}
@@ -298,12 +311,15 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
debug_task ("got string of length %z", task->msg->len);
+ /* We got body, set wanna_die flag */
+ task->s->wanna_die = TRUE;
+
r = process_message (task);
if (r == -1) {
msg_warn ("processing of message failed");
task->last_error = "MIME processing error";
task->error_code = RSPAMD_FILTER_ERROR;
- task->state = WRITE_ERROR;
+ task->state = WRITE_REPLY;
return 0;
}
if (task->cmd == CMD_OTHER) {
@@ -317,7 +333,7 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
if (r == -1) {
task->last_error = "Filter processing error";
task->error_code = RSPAMD_FILTER_ERROR;
- task->state = WRITE_ERROR;
+ task->state = WRITE_REPLY;
return 0;
}
/* Add task to classify to classify pool */
@@ -351,8 +367,19 @@ rspamd_worker_error_handler (struct rspamd_http_connection *conn, GError *err)
{
struct worker_task *task = (struct worker_task *) conn->ud;
- msg_info ("abnormally closing connection from: %s, error: %s", inet_ntoa (task->client_addr), err->message);
- destroy_session (task->s);
+ msg_info ("abnormally closing connection from: %s, error: %s",
+ inet_ntoa (task->client_addr), err->message);
+ if (task->state != CLOSING_CONNECTION) {
+ /* We still need to write a reply */
+ task->error_code = err->code;
+ task->last_error = err->message;
+ task->state = WRITE_REPLY;
+ rspamd_protocol_write_reply (task);
+ }
+ else {
+ /* Terminate session immediately */
+ destroy_session (task->s);
+ }
}
static void
@@ -366,6 +393,11 @@ rspamd_worker_finish_handler (struct rspamd_http_connection *conn,
destroy_session (task->s);
}
else {
+ /*
+ * If all filters have finished their tasks, this function will trigger
+ * writing a reply.
+ */
+ task->s->wanna_die = TRUE;
check_session_pending (task->s);
}
}
diff --git a/src/worker_util.c b/src/worker_util.c
index 3da6a26a5..c21de269e 100644
--- a/src/worker_util.c
+++ b/src/worker_util.c
@@ -51,7 +51,7 @@ construct_task (struct rspamd_worker *worker)
new_task = g_slice_alloc0 (sizeof (struct worker_task));
new_task->worker = worker;
- new_task->state = READ_COMMAND;
+ new_task->state = READ_MESSAGE;
if (worker) {
new_task->cfg = worker->srv->cfg;
}