summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2019-12-24 17:00:46 +0000
committerVsevolod Stakhov <vsevolod@highsecure.ru>2019-12-24 17:00:46 +0000
commitf8be5f536bda6428c8fdbe306b8ceab88dedcf58 (patch)
tree6bd7bdb2af1d6f97dbedcf57fc4fab56ef9ff473
parent14c1c342fadc78232bc4dfefb6b39d7f07bf456a (diff)
downloadrspamd-f8be5f536bda6428c8fdbe306b8ceab88dedcf58.tar.gz
rspamd-f8be5f536bda6428c8fdbe306b8ceab88dedcf58.zip
[Minor] Move functions
-rw-r--r--src/libserver/task.c121
-rw-r--r--src/libserver/task.h10
-rw-r--r--src/worker.c122
-rw-r--r--src/worker_private.h10
4 files changed, 132 insertions, 131 deletions
diff --git a/src/libserver/task.c b/src/libserver/task.c
index 86886b1e2..c1fcc752f 100644
--- a/src/libserver/task.c
+++ b/src/libserver/task.c
@@ -1857,4 +1857,125 @@ rspamd_task_stage_name (enum rspamd_task_stage stg)
}
return ret;
+}
+
+void
+rspamd_task_timeout (EV_P_ ev_timer *w, int revents)
+{
+ struct rspamd_task *task = (struct rspamd_task *)w->data;
+
+ if (!(task->processed_stages & RSPAMD_TASK_STAGE_FILTERS)) {
+ ev_now_update_if_cheap (task->event_loop);
+ msg_info_task ("processing of task time out: %.1fs spent; %.1fs limit; "
+ "forced processing",
+ ev_now (task->event_loop) - task->task_timestamp,
+ w->repeat);
+
+ if (task->cfg->soft_reject_on_timeout) {
+ struct rspamd_action *action, *soft_reject;
+
+ action = rspamd_check_action_metric (task);
+
+ if (action->action_type != METRIC_ACTION_REJECT) {
+ soft_reject = rspamd_config_get_action_by_type (task->cfg,
+ METRIC_ACTION_SOFT_REJECT);
+ rspamd_add_passthrough_result (task,
+ soft_reject,
+ 0,
+ NAN,
+ "timeout processing message",
+ "task timeout",
+ 0);
+
+ ucl_object_replace_key (task->messages,
+ ucl_object_fromstring_common ("timeout processing message",
+ 0, UCL_STRING_RAW),
+ "smtp_message", 0,
+ false);
+ }
+ }
+
+ ev_timer_again (EV_A_ w);
+ task->processed_stages |= RSPAMD_TASK_STAGE_FILTERS;
+ rspamd_session_cleanup (task->s);
+ rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL);
+ rspamd_session_pending (task->s);
+ }
+ else {
+ /* Postprocessing timeout */
+ msg_info_task ("post-processing of task time out: %.1f second spent; forced processing",
+ ev_now (task->event_loop) - task->task_timestamp);
+
+ if (task->cfg->soft_reject_on_timeout) {
+ struct rspamd_action *action, *soft_reject;
+
+ action = rspamd_check_action_metric (task);
+
+ if (action->action_type != METRIC_ACTION_REJECT) {
+ soft_reject = rspamd_config_get_action_by_type (task->cfg,
+ METRIC_ACTION_SOFT_REJECT);
+ rspamd_add_passthrough_result (task,
+ soft_reject,
+ 0,
+ NAN,
+ "timeout post-processing message",
+ "task timeout",
+ 0);
+
+ ucl_object_replace_key (task->messages,
+ ucl_object_fromstring_common ("timeout post-processing message",
+ 0, UCL_STRING_RAW),
+ "smtp_message", 0,
+ false);
+ }
+ }
+
+ ev_timer_stop (EV_A_ w);
+ task->processed_stages |= RSPAMD_TASK_STAGE_DONE;
+ rspamd_session_cleanup (task->s);
+ rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL);
+ rspamd_session_pending (task->s);
+ }
+}
+
+void
+rspamd_worker_guard_handler (EV_P_ ev_io *w, int revents)
+{
+ struct rspamd_task *task = (struct rspamd_task *)w->data;
+ gchar fake_buf[1024];
+ gssize r;
+
+ r = read (w->fd, fake_buf, sizeof (fake_buf));
+
+ if (r > 0) {
+ msg_warn_task ("received extra data after task is loaded, ignoring");
+ }
+ else {
+ if (r == 0) {
+ /*
+ * Poor man approach, that might break things in case of
+ * shutdown (SHUT_WR) but sockets are so bad that there's no
+ * reliable way to distinguish between shutdown(SHUT_WR) and
+ * close.
+ */
+ if (task->cmd != CMD_CHECK_V2 && task->cfg->enable_shutdown_workaround) {
+ msg_info_task ("workaround for shutdown enabled, please update "
+ "your client, this support might be removed in future");
+ shutdown (w->fd, SHUT_RD);
+ ev_io_stop (task->event_loop, &task->guard_ev);
+ }
+ else {
+ msg_err_task ("the peer has closed connection unexpectedly");
+ rspamd_session_destroy (task->s);
+ }
+ }
+ else if (errno != EAGAIN) {
+ msg_err_task ("the peer has closed connection unexpectedly: %s",
+ strerror (errno));
+ rspamd_session_destroy (task->s);
+ }
+ else {
+ return;
+ }
+ }
} \ No newline at end of file
diff --git a/src/libserver/task.h b/src/libserver/task.h
index feac456dd..50e07b23f 100644
--- a/src/libserver/task.h
+++ b/src/libserver/task.h
@@ -375,6 +375,16 @@ gboolean rspamd_task_set_finish_time (struct rspamd_task *task);
*/
const gchar *rspamd_task_stage_name (enum rspamd_task_stage stg);
+/*
+ * Called on forced timeout
+ */
+void rspamd_task_timeout (EV_P_ ev_timer *w, int revents);
+
+/*
+ * Called on unexpected IO error (e.g. ECONNRESET)
+ */
+void rspamd_worker_guard_handler (EV_P_ ev_io *w, int revents);
+
#ifdef __cplusplus
}
#endif
diff --git a/src/worker.c b/src/worker.c
index b825603e1..4f13db469 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -102,127 +102,6 @@ reduce_tasks_count (gpointer arg)
}
}
-void
-rspamd_task_timeout (EV_P_ ev_timer *w, int revents)
-{
- struct rspamd_task *task = (struct rspamd_task *)w->data;
-
- if (!(task->processed_stages & RSPAMD_TASK_STAGE_FILTERS)) {
- ev_now_update_if_cheap (task->event_loop);
- msg_info_task ("processing of task time out: %.1f (%.1f limit) second spent; "
- "forced processing",
- ev_now (task->event_loop) - task->task_timestamp,
- task->cfg->task_timeout);
-
- if (task->cfg->soft_reject_on_timeout) {
- struct rspamd_action *action, *soft_reject;
-
- action = rspamd_check_action_metric (task);
-
- if (action->action_type != METRIC_ACTION_REJECT) {
- soft_reject = rspamd_config_get_action_by_type (task->cfg,
- METRIC_ACTION_SOFT_REJECT);
- rspamd_add_passthrough_result (task,
- soft_reject,
- 0,
- NAN,
- "timeout processing message",
- "task timeout",
- 0);
-
- ucl_object_replace_key (task->messages,
- ucl_object_fromstring_common ("timeout processing message",
- 0, UCL_STRING_RAW),
- "smtp_message", 0,
- false);
- }
- }
-
- ev_timer_again (EV_A_ w);
- task->processed_stages |= RSPAMD_TASK_STAGE_FILTERS;
- rspamd_session_cleanup (task->s);
- rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL);
- rspamd_session_pending (task->s);
- }
- else {
- /* Postprocessing timeout */
- msg_info_task ("post-processing of task time out: %.1f second spent; forced processing",
- ev_now (task->event_loop) - task->task_timestamp);
-
- if (task->cfg->soft_reject_on_timeout) {
- struct rspamd_action *action, *soft_reject;
-
- action = rspamd_check_action_metric (task);
-
- if (action->action_type != METRIC_ACTION_REJECT) {
- soft_reject = rspamd_config_get_action_by_type (task->cfg,
- METRIC_ACTION_SOFT_REJECT);
- rspamd_add_passthrough_result (task,
- soft_reject,
- 0,
- NAN,
- "timeout post-processing message",
- "task timeout",
- 0);
-
- ucl_object_replace_key (task->messages,
- ucl_object_fromstring_common ("timeout post-processing message",
- 0, UCL_STRING_RAW),
- "smtp_message", 0,
- false);
- }
- }
-
- ev_timer_stop (EV_A_ w);
- task->processed_stages |= RSPAMD_TASK_STAGE_DONE;
- rspamd_session_cleanup (task->s);
- rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL);
- rspamd_session_pending (task->s);
- }
-}
-
-void
-rspamd_worker_guard_handler (EV_P_ ev_io *w, int revents)
-{
- struct rspamd_task *task = (struct rspamd_task *)w->data;
- gchar fake_buf[1024];
- gssize r;
-
- r = read (w->fd, fake_buf, sizeof (fake_buf));
-
- if (r > 0) {
- msg_warn_task ("received extra data after task is loaded, ignoring");
- }
- else {
- if (r == 0) {
- /*
- * Poor man approach, that might break things in case of
- * shutdown (SHUT_WR) but sockets are so bad that there's no
- * reliable way to distinguish between shutdown(SHUT_WR) and
- * close.
- */
- if (task->cmd != CMD_CHECK_V2 && task->cfg->enable_shutdown_workaround) {
- msg_info_task ("workaround for shutdown enabled, please update "
- "your client, this support might be removed in future");
- shutdown (w->fd, SHUT_RD);
- ev_io_stop (task->event_loop, &task->guard_ev);
- }
- else {
- msg_err_task ("the peer has closed connection unexpectedly");
- rspamd_session_destroy (task->s);
- }
- }
- else if (errno != EAGAIN) {
- msg_err_task ("the peer has closed connection unexpectedly: %s",
- strerror (errno));
- rspamd_session_destroy (task->s);
- }
- else {
- return;
- }
- }
-}
-
static gint
rspamd_worker_body_handler (struct rspamd_http_connection *conn,
struct rspamd_http_message *msg,
@@ -312,6 +191,7 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
ev_timer_init (&task->timeout_ev, rspamd_task_timeout,
ctx->task_timeout,
ctx->task_timeout);
+ ev_set_priority (&task->timeout_ev, EV_MAXPRI);
ev_timer_start (task->event_loop, &task->timeout_ev);
}
diff --git a/src/worker_private.h b/src/worker_private.h
index cef2c9a19..62fec96f1 100644
--- a/src/worker_private.h
+++ b/src/worker_private.h
@@ -65,16 +65,6 @@ void rspamd_worker_init_scanner (struct rspamd_worker *worker,
struct rspamd_dns_resolver *resolver,
struct rspamd_lang_detector **plang_det);
-/*
- * Called on forced timeout
- */
-void rspamd_task_timeout (EV_P_ ev_timer *w, int revents);
-
-/*
- * Called on unexpected IO error (e.g. ECONNRESET)
- */
-void rspamd_worker_guard_handler (EV_P_ ev_io *w, int revents);
-
#ifdef __cplusplus
}
#endif