{
struct worker_task *task = (struct worker_task *) arg;
- if (task->state != WRITING_REPLY) {
- task->state = WRITE_REPLY;
- }
-
- /* Check if we have all events finished */
- if (task->state != WRITING_REPLY) {
- if (task->fin_callback) {
- task->fin_callback (task->fin_arg);
- }
- else {
- rspamd_dispatcher_restore (task->dispatcher);
- }
- }
+ /* XXX: needs to be reworked */
return TRUE;
}
struct worker_task *task = (struct worker_task *) arg;
/* Special state */
- task->state = WRITING_REPLY;
-
- rspamd_dispatcher_pause (task->dispatcher);
}
static gboolean
task->is_skipped = TRUE;
task->state = WRITE_REPLY;
msg_info ("disable check for message id <%s>, user wants spam", task->message_id);
- task->s->wanna_die = TRUE;
- check_session_pending (task->s);
-
return 1;
}
if (!task->pass_all_filters &&
metric->actions[METRIC_ACTION_REJECT].score > 0 &&
check_metric_is_spam (task, metric)) {
- task->s->wanna_die = TRUE;
- check_session_pending (task->s);
+ task->state = WRITE_REPLY;
return 1;
}
cur = g_list_next (cur);
}
}
- task->s->wanna_die = TRUE;
- check_session_pending (task->s);
+
+ task->state = WAIT_FILTER;
return 1;
}
rspamd_snprintf (datebuf, sizeof (datebuf), "%s, %02d %s %4d %02d:%02d:%02d GMT",
http_week[t.tm_wday],
t.tm_mday,
- http_month[t.tm_mon - 1],
- t.tm_year,
+ http_month[t.tm_mon],
+ t.tm_year + 1900,
t.tm_hour,
t.tm_min,
t.tm_sec);
struct worker_task {
struct rspamd_worker *worker; /**< pointer to worker object */
enum {
- READ_COMMAND,
- READ_HEADER,
READ_MESSAGE,
- WRITE_REPLY,
- WRITE_ERROR,
WAIT_PRE_FILTER,
WAIT_FILTER,
WAIT_POST_FILTER,
- CLOSING_CONNECTION,
- WRITING_REPLY
+ WRITE_REPLY,
+ CLOSING_CONNECTION
} state; /**< current session state */
enum rspamd_command cmd; /**< command */
struct custom_command *custom_cmd; /**< custom command if any */
return 0;
}
-static gboolean
+static void
write_check_reply (struct rspamd_http_message *msg, struct worker_task *task)
{
GString *logbuf;
/* Increase counters */
task->worker->srv->stat->messages_scanned++;
-
- rspamd_http_connection_write_message (task->http_conn, msg, NULL,
- "application/json", task, task->sock, &task->tv, task->ev_base);
- return TRUE;
}
-gboolean
+void
rspamd_protocol_write_reply (struct worker_task *task)
{
struct rspamd_http_message *msg;
+ const gchar *ctype = "text/plain";
- rspamd_http_connection_reset (task->http_conn);
msg = rspamd_http_new_message (HTTP_RESPONSE);
msg->date = time (NULL);
+ task->state = CLOSING_CONNECTION;
+
debug_task ("writing reply to client");
if (task->error_code != 0) {
msg->code = task->error_code;
- rspamd_http_connection_write_message (task->http_conn, msg, NULL,
- "text/plain", task, task->sock, &task->tv, task->ev_base);
- task->state = CLOSING_CONNECTION;
- return TRUE;
+ msg->body = g_string_new (task->last_error);
}
else {
switch (task->cmd) {
case CMD_SYMBOLS:
case CMD_PROCESS:
case CMD_SKIP:
- task->state = CLOSING_CONNECTION;
- return write_check_reply (msg, task);
+ ctype = "application/json";
+ write_check_reply (msg, task);
break;
case CMD_PING:
- rspamd_http_connection_write_message (task->http_conn, msg, NULL,
- "text/plain", task, task->sock, &task->tv, task->ev_base);
- task->state = CLOSING_CONNECTION;
+ msg->body = g_string_new ("pong");
break;
case CMD_OTHER:
- task->state = CLOSING_CONNECTION;
- return task->custom_cmd->func (task);
+ msg_err ("BROKEN");
+ break;
}
}
- return FALSE;
+ rspamd_http_connection_reset (task->http_conn);
+ rspamd_http_connection_write_message (task->http_conn, msg, NULL,
+ ctype, task, task->sock, &task->tv, task->ev_base);
}
void
#include "filter.h"
#include "http.h"
-#define RSPAMD_FILTER_ERROR 1
-#define RSPAMD_NETWORK_ERROR 2
-#define RSPAMD_PROTOCOL_ERROR 3
-#define RSPAMD_LENGTH_ERROR 4
-#define RSPAMD_STATFILE_ERROR 5
+#define RSPAMD_BASE_ERROR 500
+#define RSPAMD_FILTER_ERROR RSPAMD_BASE_ERROR + 1
+#define RSPAMD_NETWORK_ERROR RSPAMD_BASE_ERROR + 2
+#define RSPAMD_PROTOCOL_ERROR RSPAMD_BASE_ERROR + 3
+#define RSPAMD_LENGTH_ERROR RSPAMD_BASE_ERROR + 4
+#define RSPAMD_STATFILE_ERROR RSPAMD_BASE_ERROR + 5
struct worker_task;
struct metric;
* @param task task object
* @return 0 if we wrote reply and -1 if there was some error
*/
-gboolean rspamd_protocol_write_reply (struct worker_task *task);
+void rspamd_protocol_write_reply (struct worker_task *task);
/**
{
struct scan_callback_data *cbdata = arg;
static struct timeval tv = {.tv_sec = 0, .tv_usec = 0 };
-
+#if 0
if (cbdata->task->state != WRITING_REPLY) {
process_statfiles (cbdata->task);
cbdata->task->state = WRITE_REPLY;
event_base_once (cbdata->ctx->ev_base, -1, EV_TIMEOUT, http_scan_task_event_helper, cbdata, &tv);
}
}
-
+#endif
return TRUE;
}
struct scan_callback_data *cbdata = arg;
/* Special state */
+#if 0
cbdata->task->state = WRITING_REPLY;
+#endif
}
/* Prepare callback data for scan */
{
struct learn_callback_data *cbdata = arg;
static struct timeval tv = {.tv_sec = 0, .tv_usec = 0 };
-
+#if 0
if (cbdata->task->state != WRITING_REPLY) {
cbdata->task->state = WRITE_REPLY;
}
event_base_once (cbdata->ctx->ev_base, -1, EV_TIMEOUT, http_learn_task_event_helper, cbdata, &tv);
}
}
-
+#endif
return TRUE;
}
http_learn_task_restore (gpointer arg)
{
struct learn_callback_data *cbdata = arg;
-
+#if 0
/* Special state */
cbdata->task->state = WRITING_REPLY;
+#endif
}
/* Prepare callback data for learn */
/*
* Called if all filters are processed
+ * @return TRUE if session should be terminated
*/
static gboolean
fin_task (void *arg)
else {
rspamd_protocol_write_reply (task);
}
+ return TRUE;
}
else {
task->state = WAIT_FILTER;
if (r == -1) {
task->last_error = "Filter processing error";
task->error_code = RSPAMD_FILTER_ERROR;
- task->state = WRITE_ERROR;
+ task->state = WRITE_REPLY;
rspamd_protocol_write_reply (task);
+ return TRUE;
}
/* Add task to classify to classify pool */
if (!task->is_skipped && ctx->classify_pool) {
if (task->is_skipped) {
rspamd_protocol_write_reply (task);
}
+ else {
+ return FALSE;
+ }
}
}
struct worker_task *task = (struct worker_task *) arg;
/* Call post filters */
- lua_call_post_filters (task);
+ if (task->state == WAIT_POST_FILTER) {
+ lua_call_post_filters (task);
+ }
task->s->wanna_die = TRUE;
}
ctx = task->worker->ctx;
+ if (task->cmd == CMD_PING) {
+ task->state = WRITE_REPLY;
+ return 0;
+ }
+
if (msg->body->len == 0) {
msg_err ("got zero length body, cannot continue");
return 0;
}
if (!rspamd_protocol_handle_request (task, msg)) {
- task->state = WRITE_ERROR;
+ task->state = WRITE_REPLY;
return 0;
}
debug_task ("got string of length %z", task->msg->len);
+ /* We got body, set wanna_die flag */
+ task->s->wanna_die = TRUE;
+
r = process_message (task);
if (r == -1) {
msg_warn ("processing of message failed");
task->last_error = "MIME processing error";
task->error_code = RSPAMD_FILTER_ERROR;
- task->state = WRITE_ERROR;
+ task->state = WRITE_REPLY;
return 0;
}
if (task->cmd == CMD_OTHER) {
if (r == -1) {
task->last_error = "Filter processing error";
task->error_code = RSPAMD_FILTER_ERROR;
- task->state = WRITE_ERROR;
+ task->state = WRITE_REPLY;
return 0;
}
/* Add task to classify to classify pool */
{
struct worker_task *task = (struct worker_task *) conn->ud;
- msg_info ("abnormally closing connection from: %s, error: %s", inet_ntoa (task->client_addr), err->message);
- destroy_session (task->s);
+ msg_info ("abnormally closing connection from: %s, error: %s",
+ inet_ntoa (task->client_addr), err->message);
+ if (task->state != CLOSING_CONNECTION) {
+ /* We still need to write a reply */
+ task->error_code = err->code;
+ task->last_error = err->message;
+ task->state = WRITE_REPLY;
+ rspamd_protocol_write_reply (task);
+ }
+ else {
+ /* Terminate session immediately */
+ destroy_session (task->s);
+ }
}
static void
destroy_session (task->s);
}
else {
+ /*
+ * If all filters have finished their tasks, this function will trigger
+ * writing a reply.
+ */
+ task->s->wanna_die = TRUE;
check_session_pending (task->s);
}
}
new_task = g_slice_alloc0 (sizeof (struct worker_task));
new_task->worker = worker;
- new_task->state = READ_COMMAND;
+ new_task->state = READ_MESSAGE;
if (worker) {
new_task->cfg = worker->srv->cfg;
}