From f8be5f536bda6428c8fdbe306b8ceab88dedcf58 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Tue, 24 Dec 2019 17:00:46 +0000 Subject: [PATCH] [Minor] Move functions --- src/libserver/task.c | 121 ++++++++++++++++++++++++++++++++++++++++++ src/libserver/task.h | 10 ++++ src/worker.c | 122 +------------------------------------------ src/worker_private.h | 10 ---- 4 files changed, 132 insertions(+), 131 deletions(-) diff --git a/src/libserver/task.c b/src/libserver/task.c index 86886b1e2..c1fcc752f 100644 --- a/src/libserver/task.c +++ b/src/libserver/task.c @@ -1857,4 +1857,125 @@ rspamd_task_stage_name (enum rspamd_task_stage stg) } return ret; +} + +void +rspamd_task_timeout (EV_P_ ev_timer *w, int revents) +{ + struct rspamd_task *task = (struct rspamd_task *)w->data; + + if (!(task->processed_stages & RSPAMD_TASK_STAGE_FILTERS)) { + ev_now_update_if_cheap (task->event_loop); + msg_info_task ("processing of task time out: %.1fs spent; %.1fs limit; " + "forced processing", + ev_now (task->event_loop) - task->task_timestamp, + w->repeat); + + if (task->cfg->soft_reject_on_timeout) { + struct rspamd_action *action, *soft_reject; + + action = rspamd_check_action_metric (task); + + if (action->action_type != METRIC_ACTION_REJECT) { + soft_reject = rspamd_config_get_action_by_type (task->cfg, + METRIC_ACTION_SOFT_REJECT); + rspamd_add_passthrough_result (task, + soft_reject, + 0, + NAN, + "timeout processing message", + "task timeout", + 0); + + ucl_object_replace_key (task->messages, + ucl_object_fromstring_common ("timeout processing message", + 0, UCL_STRING_RAW), + "smtp_message", 0, + false); + } + } + + ev_timer_again (EV_A_ w); + task->processed_stages |= RSPAMD_TASK_STAGE_FILTERS; + rspamd_session_cleanup (task->s); + rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL); + rspamd_session_pending (task->s); + } + else { + /* Postprocessing timeout */ + msg_info_task ("post-processing of task time out: %.1f second spent; forced processing", + ev_now (task->event_loop) - task->task_timestamp); + + if (task->cfg->soft_reject_on_timeout) { + struct rspamd_action *action, *soft_reject; + + action = rspamd_check_action_metric (task); + + if (action->action_type != METRIC_ACTION_REJECT) { + soft_reject = rspamd_config_get_action_by_type (task->cfg, + METRIC_ACTION_SOFT_REJECT); + rspamd_add_passthrough_result (task, + soft_reject, + 0, + NAN, + "timeout post-processing message", + "task timeout", + 0); + + ucl_object_replace_key (task->messages, + ucl_object_fromstring_common ("timeout post-processing message", + 0, UCL_STRING_RAW), + "smtp_message", 0, + false); + } + } + + ev_timer_stop (EV_A_ w); + task->processed_stages |= RSPAMD_TASK_STAGE_DONE; + rspamd_session_cleanup (task->s); + rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL); + rspamd_session_pending (task->s); + } +} + +void +rspamd_worker_guard_handler (EV_P_ ev_io *w, int revents) +{ + struct rspamd_task *task = (struct rspamd_task *)w->data; + gchar fake_buf[1024]; + gssize r; + + r = read (w->fd, fake_buf, sizeof (fake_buf)); + + if (r > 0) { + msg_warn_task ("received extra data after task is loaded, ignoring"); + } + else { + if (r == 0) { + /* + * Poor man approach, that might break things in case of + * shutdown (SHUT_WR) but sockets are so bad that there's no + * reliable way to distinguish between shutdown(SHUT_WR) and + * close. + */ + if (task->cmd != CMD_CHECK_V2 && task->cfg->enable_shutdown_workaround) { + msg_info_task ("workaround for shutdown enabled, please update " + "your client, this support might be removed in future"); + shutdown (w->fd, SHUT_RD); + ev_io_stop (task->event_loop, &task->guard_ev); + } + else { + msg_err_task ("the peer has closed connection unexpectedly"); + rspamd_session_destroy (task->s); + } + } + else if (errno != EAGAIN) { + msg_err_task ("the peer has closed connection unexpectedly: %s", + strerror (errno)); + rspamd_session_destroy (task->s); + } + else { + return; + } + } } \ No newline at end of file diff --git a/src/libserver/task.h b/src/libserver/task.h index feac456dd..50e07b23f 100644 --- a/src/libserver/task.h +++ b/src/libserver/task.h @@ -375,6 +375,16 @@ gboolean rspamd_task_set_finish_time (struct rspamd_task *task); */ const gchar *rspamd_task_stage_name (enum rspamd_task_stage stg); +/* + * Called on forced timeout + */ +void rspamd_task_timeout (EV_P_ ev_timer *w, int revents); + +/* + * Called on unexpected IO error (e.g. ECONNRESET) + */ +void rspamd_worker_guard_handler (EV_P_ ev_io *w, int revents); + #ifdef __cplusplus } #endif diff --git a/src/worker.c b/src/worker.c index b825603e1..4f13db469 100644 --- a/src/worker.c +++ b/src/worker.c @@ -102,127 +102,6 @@ reduce_tasks_count (gpointer arg) } } -void -rspamd_task_timeout (EV_P_ ev_timer *w, int revents) -{ - struct rspamd_task *task = (struct rspamd_task *)w->data; - - if (!(task->processed_stages & RSPAMD_TASK_STAGE_FILTERS)) { - ev_now_update_if_cheap (task->event_loop); - msg_info_task ("processing of task time out: %.1f (%.1f limit) second spent; " - "forced processing", - ev_now (task->event_loop) - task->task_timestamp, - task->cfg->task_timeout); - - if (task->cfg->soft_reject_on_timeout) { - struct rspamd_action *action, *soft_reject; - - action = rspamd_check_action_metric (task); - - if (action->action_type != METRIC_ACTION_REJECT) { - soft_reject = rspamd_config_get_action_by_type (task->cfg, - METRIC_ACTION_SOFT_REJECT); - rspamd_add_passthrough_result (task, - soft_reject, - 0, - NAN, - "timeout processing message", - "task timeout", - 0); - - ucl_object_replace_key (task->messages, - ucl_object_fromstring_common ("timeout processing message", - 0, UCL_STRING_RAW), - "smtp_message", 0, - false); - } - } - - ev_timer_again (EV_A_ w); - task->processed_stages |= RSPAMD_TASK_STAGE_FILTERS; - rspamd_session_cleanup (task->s); - rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL); - rspamd_session_pending (task->s); - } - else { - /* Postprocessing timeout */ - msg_info_task ("post-processing of task time out: %.1f second spent; forced processing", - ev_now (task->event_loop) - task->task_timestamp); - - if (task->cfg->soft_reject_on_timeout) { - struct rspamd_action *action, *soft_reject; - - action = rspamd_check_action_metric (task); - - if (action->action_type != METRIC_ACTION_REJECT) { - soft_reject = rspamd_config_get_action_by_type (task->cfg, - METRIC_ACTION_SOFT_REJECT); - rspamd_add_passthrough_result (task, - soft_reject, - 0, - NAN, - "timeout post-processing message", - "task timeout", - 0); - - ucl_object_replace_key (task->messages, - ucl_object_fromstring_common ("timeout post-processing message", - 0, UCL_STRING_RAW), - "smtp_message", 0, - false); - } - } - - ev_timer_stop (EV_A_ w); - task->processed_stages |= RSPAMD_TASK_STAGE_DONE; - rspamd_session_cleanup (task->s); - rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL); - rspamd_session_pending (task->s); - } -} - -void -rspamd_worker_guard_handler (EV_P_ ev_io *w, int revents) -{ - struct rspamd_task *task = (struct rspamd_task *)w->data; - gchar fake_buf[1024]; - gssize r; - - r = read (w->fd, fake_buf, sizeof (fake_buf)); - - if (r > 0) { - msg_warn_task ("received extra data after task is loaded, ignoring"); - } - else { - if (r == 0) { - /* - * Poor man approach, that might break things in case of - * shutdown (SHUT_WR) but sockets are so bad that there's no - * reliable way to distinguish between shutdown(SHUT_WR) and - * close. - */ - if (task->cmd != CMD_CHECK_V2 && task->cfg->enable_shutdown_workaround) { - msg_info_task ("workaround for shutdown enabled, please update " - "your client, this support might be removed in future"); - shutdown (w->fd, SHUT_RD); - ev_io_stop (task->event_loop, &task->guard_ev); - } - else { - msg_err_task ("the peer has closed connection unexpectedly"); - rspamd_session_destroy (task->s); - } - } - else if (errno != EAGAIN) { - msg_err_task ("the peer has closed connection unexpectedly: %s", - strerror (errno)); - rspamd_session_destroy (task->s); - } - else { - return; - } - } -} - static gint rspamd_worker_body_handler (struct rspamd_http_connection *conn, struct rspamd_http_message *msg, @@ -312,6 +191,7 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn, ev_timer_init (&task->timeout_ev, rspamd_task_timeout, ctx->task_timeout, ctx->task_timeout); + ev_set_priority (&task->timeout_ev, EV_MAXPRI); ev_timer_start (task->event_loop, &task->timeout_ev); } diff --git a/src/worker_private.h b/src/worker_private.h index cef2c9a19..62fec96f1 100644 --- a/src/worker_private.h +++ b/src/worker_private.h @@ -65,16 +65,6 @@ void rspamd_worker_init_scanner (struct rspamd_worker *worker, struct rspamd_dns_resolver *resolver, struct rspamd_lang_detector **plang_det); -/* - * Called on forced timeout - */ -void rspamd_task_timeout (EV_P_ ev_timer *w, int revents); - -/* - * Called on unexpected IO error (e.g. ECONNRESET) - */ -void rspamd_worker_guard_handler (EV_P_ ev_io *w, int revents); - #ifdef __cplusplus } #endif -- 2.39.5