]> source.dussan.org Git - rspamd.git/commitdiff
[Project] Adopt normal worker and contorller
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Wed, 19 Jun 2019 16:07:56 +0000 (17:07 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 22 Jun 2019 09:57:29 +0000 (10:57 +0100)
src/controller.c
src/libserver/task.c
src/libserver/task.h
src/libserver/worker_util.c
src/libstat/backends/redis_backend.c
src/rspamadm/control.c
src/worker.c
src/worker_private.h

index 8510879450b034f9c870a983c4e49e63e0455aa1..374880952bf3b8a8ddff55e9a8e605480325d50c 100644 (file)
@@ -134,8 +134,7 @@ struct rspamd_controller_worker_ctx {
        /* Config */
        struct rspamd_config *cfg;
        /* END OF COMMON PART */
-       guint32 timeout;
-       struct timeval io_tv;
+       ev_tstamp timeout;
        /* Whether we use ssl for this server */
        gboolean use_ssl;
        /* Webui password */
@@ -728,7 +727,7 @@ rspamd_controller_handle_auth (struct rspamd_http_connection_entry *conn_ent,
        data[4] = st->actions_stat[METRIC_ACTION_SOFT_REJECT];
 
        /* Get uptime */
-       uptime = time (NULL) - session->ctx->start_time;
+       uptime = ev_time () - session->ctx->start_time;
 
        ucl_object_insert_key (obj, ucl_object_fromstring (
                        RVERSION),                         "version",  0, false);
@@ -996,7 +995,7 @@ rspamd_controller_handle_get_map (struct rspamd_http_connection_entry *conn_ent,
        struct rspamd_controller_session *session = conn_ent->ud;
        GList *cur;
        struct rspamd_map *map;
-       struct rspamd_map_backend *bk;
+       struct rspamd_map_backend *bk = NULL;
        const rspamd_ftok_t *idstr;
        struct stat st;
        gint fd;
@@ -1037,7 +1036,7 @@ rspamd_controller_handle_get_map (struct rspamd_http_connection_entry *conn_ent,
                cur = g_list_next (cur);
        }
 
-       if (!found) {
+       if (!found || bk == NULL) {
                msg_info_session ("map not found");
                rspamd_controller_send_error (conn_ent, 404, "Map not found");
                return 0;
@@ -1075,7 +1074,7 @@ rspamd_controller_handle_get_map (struct rspamd_http_connection_entry *conn_ent,
        rspamd_http_router_insert_headers (conn_ent->rt, reply);
        rspamd_http_connection_write_message (conn_ent->conn, reply, NULL,
                        "text/plain", conn_ent,
-                       conn_ent->rt->ptv);
+                       conn_ent->rt->timeout);
        conn_ent->is_reply = TRUE;
 
        return 0;
@@ -1385,13 +1384,13 @@ rspamd_controller_handle_legacy_history (
                row = &copied_rows[row_num];
                /* Get only completed rows */
                if (row->completed) {
-                       rspamd_localtime (row->tv.tv_sec, &tm);
+                       rspamd_localtime (row->timestamp, &tm);
                        strftime (timebuf, sizeof (timebuf) - 1, "%Y-%m-%d %H:%M:%S", &tm);
                        obj = ucl_object_typed_new (UCL_OBJECT);
                        ucl_object_insert_key (obj, ucl_object_fromstring (
                                        timebuf),                 "time", 0, false);
                        ucl_object_insert_key (obj, ucl_object_fromint (
-                                       row->tv.tv_sec), "unix_time", 0, false);
+                                       row->timestamp), "unix_time", 0, false);
                        ucl_object_insert_key (obj, ucl_object_fromstring (
                                        row->message_id), "id",   0, false);
                        ucl_object_insert_key (obj, ucl_object_fromstring (row->from_addr),
@@ -1935,7 +1934,7 @@ rspamd_controller_scan_reply (struct rspamd_task *task)
        rspamd_http_connection_reset (conn_ent->conn);
        rspamd_http_router_insert_headers (conn_ent->rt, msg);
        rspamd_http_connection_write_message (conn_ent->conn, msg, NULL,
-                       "application/json", conn_ent, conn_ent->rt->ptv);
+                       "application/json", conn_ent, conn_ent->rt->timeout);
        conn_ent->is_reply = TRUE;
 }
 
@@ -2125,13 +2124,10 @@ rspamd_controller_handle_scan (struct rspamd_http_connection_entry *conn_ent,
        }
 
        if (ctx->task_timeout > 0.0) {
-               struct timeval task_tv;
-
-               event_set (&task->timeout_ev, -1, EV_TIMEOUT, rspamd_task_timeout,
-                               task);
-               event_base_set (ctx->event_loop, &task->timeout_ev);
-               double_to_tv (ctx->task_timeout, &task_tv);
-               event_add (&task->timeout_ev, &task_tv);
+               task->timeout_ev.data = task;
+               ev_timer_init (&task->timeout_ev, rspamd_task_timeout,
+                               ctx->task_timeout, 0.0);
+               ev_timer_start (task->event_loop, &task->timeout_ev);
        }
 
 end:
@@ -2210,6 +2206,7 @@ rspamd_controller_handle_saveactions (
 
                switch (i) {
                case 0:
+               default:
                        act = METRIC_ACTION_REJECT;
                        break;
                case 1:
@@ -2404,7 +2401,7 @@ rspamd_controller_handle_savemap (struct rspamd_http_connection_entry *conn_ent,
 {
        struct rspamd_controller_session *session = conn_ent->ud;
        GList *cur;
-       struct rspamd_map *map;
+       struct rspamd_map *map = NULL;
        struct rspamd_map_backend *bk;
        struct rspamd_controller_worker_ctx *ctx;
        const rspamd_ftok_t *idstr;
@@ -2903,7 +2900,7 @@ rspamd_controller_handle_ping (struct rspamd_http_connection_entry *conn_ent,
                        NULL,
                        "text/plain",
                        conn_ent,
-                       conn_ent->rt->ptv);
+                       conn_ent->rt->timeout);
        conn_ent->is_reply = TRUE;
 
        return 0;
@@ -2937,7 +2934,7 @@ rspamd_controller_handle_unknown (struct rspamd_http_connection_entry *conn_ent,
                                NULL,
                                "text/plain",
                                conn_ent,
-                               conn_ent->rt->ptv);
+                               conn_ent->rt->timeout);
                conn_ent->is_reply = TRUE;
        }
        else {
@@ -2953,7 +2950,7 @@ rspamd_controller_handle_unknown (struct rspamd_http_connection_entry *conn_ent,
                                NULL,
                                "text/plain",
                                conn_ent,
-                               conn_ent->rt->ptv);
+                               conn_ent->rt->timeout);
                conn_ent->is_reply = TRUE;
        }
 
@@ -3077,9 +3074,9 @@ rspamd_controller_finish_handler (struct rspamd_http_connection_entry *conn_ent)
 }
 
 static void
-rspamd_controller_accept_socket (gint fd, short what, void *arg)
+rspamd_controller_accept_socket (EV_P_ ev_io *w, int revents)
 {
-       struct rspamd_worker *worker = (struct rspamd_worker *) arg;
+       struct rspamd_worker *worker = (struct rspamd_worker *)w->data;
        struct rspamd_controller_worker_ctx *ctx;
        struct rspamd_controller_session *session;
        rspamd_inet_addr_t *addr;
@@ -3088,7 +3085,8 @@ rspamd_controller_accept_socket (gint fd, short what, void *arg)
        ctx = worker->ctx;
 
        if ((nfd =
-               rspamd_accept_from_socket (fd, &addr, worker->accept_events)) == -1) {
+               rspamd_accept_from_socket (w->fd, &addr,
+                               rspamd_worker_throttle_accept_events, worker->accept_events)) == -1) {
                msg_warn_ctx ("accept failed: %s", strerror (errno));
                return;
        }
@@ -3113,9 +3111,10 @@ rspamd_controller_accept_socket (gint fd, short what, void *arg)
 }
 
 static void
-rspamd_controller_rrd_update (gint fd, short what, void *arg)
+rspamd_controller_rrd_update (EV_P_ ev_timer *w, int revents)
 {
-       struct rspamd_controller_worker_ctx *ctx = arg;
+       struct rspamd_controller_worker_ctx *ctx =
+                       (struct rspamd_controller_worker_ctx *)w->data;
        struct rspamd_stat *stat;
        GArray ar;
        gdouble points[METRIC_ACTION_MAX];
@@ -3139,8 +3138,7 @@ rspamd_controller_rrd_update (gint fd, short what, void *arg)
        }
 
        /* Plan new event */
-       event_del (ctx->rrd_event);
-       evtimer_add (ctx->rrd_event, &rrd_update_time);
+       ev_timer_again (ctx->event_loop, &ctx->rrd_event);
 }
 
 static void
@@ -3278,11 +3276,13 @@ rspamd_controller_store_saved_stats (struct rspamd_controller_worker_ctx *ctx)
 }
 
 static void
-rspamd_controller_stats_save_periodic (int fd, short what, gpointer ud)
+rspamd_controller_stats_save_periodic (EV_P_ ev_timer *w, int revents)
 {
-       struct rspamd_controller_worker_ctx *ctx = ud;
+       struct rspamd_controller_worker_ctx *ctx =
+                       (struct rspamd_controller_worker_ctx *)w->data;
 
        rspamd_controller_store_saved_stats (ctx);
+       ev_timer_again (EV_A_ w);
 }
 
 static void
@@ -3375,7 +3375,7 @@ init_controller_worker (struct rspamd_config *cfg)
                        ctx,
                        G_STRUCT_OFFSET (struct rspamd_controller_worker_ctx,
                                        timeout),
-                       RSPAMD_CL_FLAG_TIME_INTEGER,
+                       RSPAMD_CL_FLAG_TIME_FLOAT,
                        "Protocol timeout");
 
        rspamd_rcl_register_worker_option (cfg,
@@ -3573,7 +3573,7 @@ rspamd_controller_on_terminate (struct rspamd_worker *worker)
 
        if (ctx->rrd) {
                msg_info ("closing rrd file: %s", ctx->rrd->filename);
-               event_del (ctx->rrd_event);
+               ev_timer_stop (ctx->event_loop, &ctx->rrd_event);
                rspamd_rrd_close (ctx->rrd);
        }
 
@@ -3694,16 +3694,14 @@ start_controller_worker (struct rspamd_worker *worker)
        GHashTableIter iter;
        gpointer key, value;
        guint i;
-       struct timeval stv;
-       const guint save_stats_interval = 60 * 1000; /* 1 minute */
+       const ev_tstamp save_stats_interval = 60; /* 1 minute */
        gpointer m;
 
        ctx->event_loop = rspamd_prepare_worker (worker,
                        "controller",
                        rspamd_controller_accept_socket);
-       msec_to_tv (ctx->timeout, &ctx->io_tv);
 
-       ctx->start_time = time (NULL);
+       ctx->start_time = ev_time ();
        ctx->worker = worker;
        ctx->cfg = worker->srv->cfg;
        ctx->srv = worker->srv;
@@ -3746,10 +3744,10 @@ start_controller_worker (struct rspamd_worker *worker)
                ctx->rrd = rspamd_rrd_file_default (ctx->cfg->rrd_file, &rrd_err);
 
                if (ctx->rrd) {
-                       ctx->rrd_event = g_malloc0 (sizeof (*ctx->rrd_event));
-                       evtimer_set (ctx->rrd_event, rspamd_controller_rrd_update, ctx);
-                       event_base_set (ctx->event_loop, ctx->rrd_event);
-                       event_add (ctx->rrd_event, &rrd_update_time);
+                       ctx->rrd_event.data = ctx;
+                       ev_timer_init (&ctx->rrd_event, rspamd_controller_rrd_update,
+                                       rrd_update_time, rrd_update_time);
+                       ev_timer_start (ctx->event_loop, &ctx->rrd_event);
                }
                else if (rrd_err) {
                        msg_err ("cannot load rrd from %s: %e", ctx->cfg->rrd_file,
@@ -3772,7 +3770,7 @@ start_controller_worker (struct rspamd_worker *worker)
        ctx->http_ctx = rspamd_http_context_create (ctx->cfg, ctx->event_loop,
                        ctx->cfg->ups_ctx);
        ctx->http = rspamd_http_router_new (rspamd_controller_error_handler,
-                       rspamd_controller_finish_handler, &ctx->io_tv,
+                       rspamd_controller_finish_handler, ctx->timeout,
                        ctx->static_files_dir, ctx->http_ctx);
 
        /* Add callbacks for different methods */
@@ -3903,12 +3901,11 @@ start_controller_worker (struct rspamd_worker *worker)
                                ctx->resolver, worker, TRUE);
 
                /* Schedule periodic stats saving, see #1823 */
-               event_set (&ctx->save_stats_event, -1, EV_PERSIST,
+               ctx->save_stats_event.data = ctx;
+               ev_timer_init (&ctx->save_stats_event,
                                rspamd_controller_stats_save_periodic,
-                               ctx);
-               event_base_set (ctx->event_loop, &ctx->save_stats_event);
-               msec_to_tv (save_stats_interval, &stv);
-               evtimer_add (&ctx->save_stats_event, &stv);
+                               save_stats_interval, save_stats_interval);
+               ev_timer_start (ctx->event_loop, &ctx->save_stats_event);
        }
        else {
                rspamd_map_watch (worker->srv->cfg, ctx->event_loop,
@@ -3918,7 +3915,7 @@ start_controller_worker (struct rspamd_worker *worker)
        rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->event_loop, worker);
 
        /* Start event loop */
-       event_base_loop (ctx->event_loop, 0);
+       ev_loop (ctx->event_loop, 0);
        rspamd_worker_block_signals ();
 
        rspamd_stat_close ();
index 84ea1417a9eb1af1d9cec18a42d8262aef964085..3c92e05b9e1ac6bc9110b990014115d866225621 100644 (file)
@@ -316,13 +316,8 @@ rspamd_task_free (struct rspamd_task *task)
                        g_error_free (task->err);
                }
 
-               if (rspamd_event_pending (&task->timeout_ev, EV_TIMEOUT)) {
-                       event_del (&task->timeout_ev);
-               }
-
-               if (task->guard_ev) {
-                       event_del (task->guard_ev);
-               }
+               ev_timer_stop (task->event_loop, &task->timeout_ev);
+               ev_io_stop (task->event_loop, &task->guard_ev);
 
                if (task->sock != -1) {
                        close (task->sock);
index a73102424432dc926e6ccbf15a139475b09f1c2f..7b30f97cd0fad4bab4b5d409d26fb5e0284df76b 100644 (file)
@@ -200,7 +200,7 @@ struct rspamd_task {
        struct rspamd_dns_resolver *resolver;                   /**< DNS resolver                                                                       */
        struct ev_loop *event_loop;                                             /**< Event base                                                                         */
        struct ev_timer timeout_ev;                                             /**< Global task timeout                                                        */
-       struct ev_io *guard_ev;                                                 /**< Event for input sanity guard                                       */
+       struct ev_io guard_ev;                                                  /**< Event for input sanity guard                                       */
 
        gpointer checkpoint;                                                    /**< Opaque checkpoint data                                                     */
        ucl_object_t *settings;                                                 /**< Settings applied to task                                           */
index 0aa0c9cf32dc95b1bbfec5ace09a6c6512183989..70d349c2c4c1f7b81aaff6f5192bf812b5126351 100644 (file)
@@ -274,10 +274,6 @@ void
 rspamd_worker_init_signals (struct rspamd_worker *worker, struct ev_loop *base)
 {
        struct sigaction signals;
-       /* We ignore these signals in the worker */
-       rspamd_worker_ignore_signal (SIGPIPE);
-       rspamd_worker_ignore_signal (SIGALRM);
-       rspamd_worker_ignore_signal (SIGCHLD);
 
        /* A set of terminating signals */
        rspamd_worker_set_signal_handler (SIGTERM, worker, base,
@@ -298,11 +294,8 @@ rspamd_worker_init_signals (struct rspamd_worker *worker, struct ev_loop *base)
        sigaddset (&signals.sa_mask, SIGTERM);
        sigaddset (&signals.sa_mask, SIGINT);
        sigaddset (&signals.sa_mask, SIGHUP);
-       sigaddset (&signals.sa_mask, SIGCHLD);
        sigaddset (&signals.sa_mask, SIGUSR1);
        sigaddset (&signals.sa_mask, SIGUSR2);
-       sigaddset (&signals.sa_mask, SIGALRM);
-       sigaddset (&signals.sa_mask, SIGPIPE);
 
        sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
 }
@@ -345,6 +338,7 @@ rspamd_prepare_worker (struct rspamd_worker *worker, const char *name,
                        if (ls->fd != -1) {
                                accept_ev = g_malloc0 (sizeof (*accept_ev));
                                accept_ev->event_loop = event_loop;
+                               accept_ev->accept_ev.data = worker;
                                ev_io_init (&accept_ev->accept_ev, hdl, ls->fd, EV_READ);
                                ev_io_start (event_loop, &accept_ev->accept_ev);
 
index 4e0d806f99716b4bf5c088f04cd27d1d865a6738..5d8ccc065b6560d49484cf56b9ec757f40811712 100644 (file)
@@ -1039,9 +1039,7 @@ rspamd_redis_fin_learn (gpointer data)
 
        rt->has_event = FALSE;
        /* Stop timeout */
-       if (rspamd_event_pending (&rt->timeout_event, EV_TIMEOUT)) {
-               event_del (&rt->timeout_event);
-       }
+       ev_timer_stop (rt->task->event_loop, &rt->timeout_event);
 
        if (rt->redis) {
                redis = rt->redis;
@@ -1654,7 +1652,6 @@ rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens,
        struct upstream *up;
        struct upstream_list *ups;
        rspamd_inet_addr_t *addr;
-       struct timeval tv;
        rspamd_fstring_t *query;
        const gchar *redis_cmd;
        rspamd_token_t *tok;
index 8a42bdac121b94a9c55a92194e65ce70d91ad8ba..754d874a23c05fd80b796b56f22edcd3326b137c 100644 (file)
@@ -173,7 +173,6 @@ rspamadm_control (gint argc, gchar **argv, const struct rspamadm_command *_cmd)
        struct rspamd_http_connection *conn;
        struct rspamd_http_message *msg;
        rspamd_inet_addr_t *addr;
-       struct timeval tv;
        static struct rspamadm_control_cbdata cbdata;
 
        context = g_option_context_new (
@@ -239,16 +238,15 @@ rspamadm_control (gint argc, gchar **argv, const struct rspamadm_command *_cmd)
                        addr);
        msg = rspamd_http_new_message (HTTP_REQUEST);
        msg->url = rspamd_fstring_new_init (path, strlen (path));
-       double_to_tv (timeout, &tv);
 
        cbdata.argc = argc;
        cbdata.argv = argv;
        cbdata.path = path;
 
        rspamd_http_connection_write_message (conn, msg, NULL, NULL, &cbdata,
-                       &tv);
+                       timeout);
 
-       event_base_loop (rspamd_main->event_loop, 0);
+       ev_loop (rspamd_main->event_loop, 0);
 
        rspamd_http_connection_unref (conn);
        rspamd_inet_address_free (addr);
index 5a4adb325c7174f7e7f3cead5a686e925c48af88..ad0782b1716792abb608e2c7029a233f6a17c0e6 100644 (file)
@@ -42,7 +42,7 @@
 #include "lua/lua_common.h"
 
 /* 60 seconds for worker's IO */
-#define DEFAULT_WORKER_IO_TIMEOUT 60000
+#define DEFAULT_WORKER_IO_TIMEOUT 60.0
 
 gpointer init_worker (struct rspamd_config *cfg);
 void start_worker (struct rspamd_worker *worker);
@@ -73,11 +73,10 @@ static gboolean
 rspamd_worker_finalize (gpointer user_data)
 {
        struct rspamd_task *task = user_data;
-       struct timeval tv = {.tv_sec = 0, .tv_usec = 0};
 
        if (!(task->flags & RSPAMD_TASK_FLAG_PROCESSING)) {
                msg_info_task ("finishing actions has been processed, terminating");
-               event_base_loopexit (task->event_loop, &tv);
+               ev_break (task->event_loop, EVBREAK_ALL);
                rspamd_session_destroy (task->s);
 
                return TRUE;
@@ -137,9 +136,9 @@ reduce_tasks_count (gpointer arg)
 }
 
 void
-rspamd_task_timeout (gint fd, short what, gpointer ud)
+rspamd_task_timeout (EV_P_ ev_timer *w, int revents)
 {
-       struct rspamd_task *task = (struct rspamd_task *) ud;
+       struct rspamd_task *task = (struct rspamd_task *)w->data;
 
        if (!(task->processed_stages & RSPAMD_TASK_STAGE_FILTERS)) {
                msg_info_task ("processing of task timed out, forced processing");
@@ -176,32 +175,13 @@ rspamd_task_timeout (gint fd, short what, gpointer ud)
 }
 
 void
-rspamd_worker_guard_handler (gint fd, short what, void *data)
+rspamd_worker_guard_handler (EV_P_ ev_io *w, int revents)
 {
-       struct rspamd_task *task = data;
+       struct rspamd_task *task = (struct rspamd_task *)w->data;
        gchar fake_buf[1024];
        gssize r;
 
-#ifdef EV_CLOSED
-       if (what == EV_CLOSED) {
-               if (!(task->flags & RSPAMD_TASK_FLAG_JSON) &&
-                               task->cfg->enable_shutdown_workaround) {
-                       msg_info_task ("workaround for shutdown enabled, please update "
-                                       "your client, this support might be removed in future");
-                       shutdown (fd, SHUT_RD);
-                       event_del (task->guard_ev);
-                       task->guard_ev = NULL;
-               }
-               else {
-                       msg_err_task ("the peer has closed connection unexpectedly");
-                       rspamd_session_destroy (task->s);
-               }
-
-               return;
-       }
-#endif
-
-       r = read (fd, fake_buf, sizeof (fake_buf));
+       r = read (w->fd, fake_buf, sizeof (fake_buf));
 
        if (r > 0) {
                msg_warn_task ("received extra data after task is loaded, ignoring");
@@ -218,9 +198,8 @@ rspamd_worker_guard_handler (gint fd, short what, void *data)
                                        task->cfg->enable_shutdown_workaround) {
                                msg_info_task ("workaround for shutdown enabled, please update "
                                                "your client, this support might be removed in future");
-                               shutdown (fd, SHUT_RD);
-                               event_del (task->guard_ev);
-                               task->guard_ev = NULL;
+                               shutdown (w->fd, SHUT_RD);
+                               ev_io_stop (task->event_loop, &task->guard_ev);
                        }
                        else {
                                msg_err_task ("the peer has closed connection unexpectedly");
@@ -245,8 +224,6 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
 {
        struct rspamd_task *task = (struct rspamd_task *) conn->ud;
        struct rspamd_worker_ctx *ctx;
-       struct timeval task_tv;
-       struct event *guard_ev;
 
        ctx = task->worker->ctx;
 
@@ -268,25 +245,16 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
 
        /* Set global timeout for the task */
        if (ctx->task_timeout > 0.0) {
-               event_set (&task->timeout_ev, -1, EV_TIMEOUT, rspamd_task_timeout,
-                               task);
-               event_base_set (ctx->ev_base, &task->timeout_ev);
-               double_to_tv (ctx->task_timeout, &task_tv);
-               event_add (&task->timeout_ev, &task_tv);
+               task->timeout_ev.data = task;
+               ev_timer_init (&task->timeout_ev, rspamd_task_timeout,
+                               ctx->task_timeout, 0.0);
+               ev_timer_start (task->event_loop, &task->timeout_ev);
        }
 
        /* Set socket guard */
-       guard_ev = rspamd_mempool_alloc (task->task_pool, sizeof (*guard_ev));
-#ifdef EV_CLOSED
-       event_set (guard_ev, task->sock, EV_READ|EV_PERSIST|EV_CLOSED,
-                               rspamd_worker_guard_handler, task);
-#else
-       event_set (guard_ev, task->sock, EV_READ|EV_PERSIST,
-                       rspamd_worker_guard_handler, task);
-#endif
-       event_base_set (task->event_loop, guard_ev);
-       event_add (guard_ev, NULL);
-       task->guard_ev = guard_ev;
+       task->guard_ev.data = task;
+       ev_io_init (&task->guard_ev, rspamd_worker_guard_handler, task->sock, EV_READ);
+       ev_io_start (task->event_loop, &task->guard_ev);
 
        rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL);
 
@@ -359,9 +327,9 @@ rspamd_worker_finish_handler (struct rspamd_http_connection *conn,
  * Accept new connection and construct task
  */
 static void
-accept_socket (gint fd, short what, void *arg)
+accept_socket (EV_P_ ev_io *w, int revents)
 {
-       struct rspamd_worker *worker = (struct rspamd_worker *) arg;
+       struct rspamd_worker *worker = (struct rspamd_worker *) w->data;
        struct rspamd_worker_ctx *ctx;
        struct rspamd_task *task;
        rspamd_inet_addr_t *addr;
@@ -377,7 +345,8 @@ accept_socket (gint fd, short what, void *arg)
        }
 
        if ((nfd =
-               rspamd_accept_from_socket (fd, &addr, worker->accept_events)) == -1) {
+               rspamd_accept_from_socket (w->fd, &addr,
+                               rspamd_worker_throttle_accept_events, worker->accept_events)) == -1) {
                msg_warn_ctx ("accept failed: %s", strerror (errno));
                return;
        }
@@ -386,7 +355,7 @@ accept_socket (gint fd, short what, void *arg)
                return;
        }
 
-       task = rspamd_task_new (worker, ctx->cfg, NULL, ctx->lang_det, ctx->ev_base);
+       task = rspamd_task_new (worker, ctx->cfg, NULL, ctx->lang_det, ctx->event_loop);
 
        msg_info_task ("accepted connection from %s port %d, task ptr: %p",
                rspamd_inet_address_to_string (addr),
@@ -435,7 +404,7 @@ accept_socket (gint fd, short what, void *arg)
 
        rspamd_http_connection_read_message (task->http_conn,
                        task,
-                       &ctx->io_tv);
+                       ctx->timeout);
 }
 
 #ifdef WITH_HYPERSCAN
@@ -587,7 +556,7 @@ init_worker (struct rspamd_config *cfg)
                        ctx,
                        G_STRUCT_OFFSET (struct rspamd_worker_ctx,
                                                timeout),
-                       RSPAMD_CL_FLAG_TIME_INTEGER,
+                       RSPAMD_CL_FLAG_TIME_FLOAT,
                        "Protocol IO timeout");
 
        rspamd_rcl_register_worker_option (cfg,
@@ -672,9 +641,8 @@ start_worker (struct rspamd_worker *worker)
        struct rspamd_worker_ctx *ctx = worker->ctx;
 
        ctx->cfg = worker->srv->cfg;
-       ctx->ev_base = rspamd_prepare_worker (worker, "normal", accept_socket);
-       msec_to_tv (ctx->timeout, &ctx->io_tv);
-       rspamd_symcache_start_refresh (worker->srv->cfg->cache, ctx->ev_base,
+       ctx->event_loop = rspamd_prepare_worker (worker, "normal", accept_socket);
+       rspamd_symcache_start_refresh (worker->srv->cfg->cache, ctx->event_loop,
                        worker);
 
        if (isnan (ctx->task_timeout)) {
@@ -687,20 +655,20 @@ start_worker (struct rspamd_worker *worker)
        }
 
        ctx->resolver = rspamd_dns_resolver_init (worker->srv->logger,
-                       ctx->ev_base,
+                       ctx->event_loop,
                        worker->srv->cfg);
-       rspamd_map_watch (worker->srv->cfg, ctx->ev_base, ctx->resolver, worker, 0);
+       rspamd_map_watch (worker->srv->cfg, ctx->event_loop, ctx->resolver, worker, 0);
        rspamd_upstreams_library_config (worker->srv->cfg, ctx->cfg->ups_ctx,
-                       ctx->ev_base, ctx->resolver->r);
+                       ctx->event_loop, ctx->resolver->r);
 
-       ctx->http_ctx = rspamd_http_context_create (ctx->cfg, ctx->ev_base,
+       ctx->http_ctx = rspamd_http_context_create (ctx->cfg, ctx->event_loop,
                        ctx->cfg->ups_ctx);
-       rspamd_worker_init_scanner (worker, ctx->ev_base, ctx->resolver,
+       rspamd_worker_init_scanner (worker, ctx->event_loop, ctx->resolver,
                        &ctx->lang_det);
-       rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->ev_base,
+       rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->event_loop,
                        worker);
 
-       event_base_loop (ctx->ev_base, 0);
+       ev_loop (ctx->event_loop, 0);
        rspamd_worker_block_signals ();
 
        rspamd_stat_close ();
index 35a2b465be798cb123fdde8e28dd52082be9077e..6d0e763aa39a23c17754a4c73e30642aac093285 100644 (file)
@@ -30,14 +30,13 @@ struct rspamd_lang_detector;
 struct rspamd_worker_ctx {
        guint64 magic;
        /* Events base */
-       struct ev_loop *ev_base;
+       struct ev_loop *event_loop;
        /* DNS resolver */
        struct rspamd_dns_resolver *resolver;
        /* Config */
        struct rspamd_config *cfg;
 
-       guint32 timeout;
-       struct timeval io_tv;
+       ev_tstamp timeout;
        /* Detect whether this worker is mime worker    */
        gboolean is_mime;
        /* Allow encrypted requests only using network */
@@ -45,7 +44,7 @@ struct rspamd_worker_ctx {
        /* Limit of tasks */
        guint32 max_tasks;
        /* Maximum time for task processing */
-       gdouble task_timeout;
+       ev_tstamp task_timeout;
        /* Encryption key */
        struct rspamd_cryptobox_keypair *key;
        /* Keys cache */
@@ -64,11 +63,11 @@ void rspamd_worker_init_scanner (struct rspamd_worker *worker,
 /*
  * Called on forced timeout
  */
-void rspamd_task_timeout (gint fd, short what, gpointer ud);
+void rspamd_task_timeout (EV_P_ ev_timer *w, int revents);
 
 /*
  * Called on unexpected IO error (e.g. ECONNRESET)
  */
-void rspamd_worker_guard_handler (gint fd, short what, void *data);
+void rspamd_worker_guard_handler (EV_P_ ev_io *w, int revents);
 
 #endif