]> source.dussan.org Git - rspamd.git/commitdiff
[Project] Make it compileable again...
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Wed, 19 Jun 2019 17:23:46 +0000 (18:23 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 22 Jun 2019 09:57:29 +0000 (10:57 +0100)
CMakeLists.txt
contrib/libev/CMakeLists.txt
src/client/rspamc.c
src/hs_helper.c
src/libutil/upstream.c
src/lua/lua_config.c
src/rspamadm/control.c
src/rspamadm/lua_repl.c
src/rspamadm/rspamadm.c
src/rspamd_proxy.c

index d5f37900a815ec37bc52b534feb59436f503648b..fec771663b828f109309878cb1855351446ca7d3 100644 (file)
@@ -864,11 +864,6 @@ CHECK_INCLUDE_FILES(readpassphrase.h HAVE_READPASSPHRASE_H)
 CHECK_INCLUDE_FILES(termios.h HAVE_TERMIOS_H)
 CHECK_INCLUDE_FILES(paths.h HAVE_PATHS_H)
 CHECK_INCLUDE_FILES(ctype.h HAVE_CTYPE_H)
-CHECK_INCLUDE_FILES(sys/sendfile.h HAVE_SYS_SENDFILE_H)
-CHECK_INCLUDE_FILES(linux/falloc.h HAVE_LINUX_FALLOC_H)
-CHECK_INCLUDE_FILES(sys/eventfd.h HAVE_SYS_EVENTFD_H)
-CHECK_INCLUDE_FILES(aio.h HAVE_AIO_H)
-CHECK_INCLUDE_FILES(libaio.h HAVE_LIBAIO_H)
 CHECK_INCLUDE_FILES(unistd.h HAVE_UNISTD_H)
 CHECK_INCLUDE_FILES(cpuid.h HAVE_CPUID_H)
 CHECK_INCLUDE_FILES(dirent.h HAVE_DIRENT_H)
@@ -1185,7 +1180,6 @@ LIST(APPEND RSPAMD_REQUIRED_LIBRARIES "${LUA_LIBRARY}")
 LIST(APPEND RSPAMD_REQUIRED_LIBRARIES ucl)
 LIST(APPEND RSPAMD_REQUIRED_LIBRARIES rdns)
 LIST(APPEND RSPAMD_REQUIRED_LIBRARIES ottery)
-LIST(APPEND RSPAMD_REQUIRED_LIBRARIES event)
 LIST(APPEND RSPAMD_REQUIRED_LIBRARIES xxhash)
 
 IF(GLIB_COMPAT)
@@ -1219,6 +1213,7 @@ LIST(APPEND RSPAMD_REQUIRED_LIBRARIES rspamd-hiredis)
 
 LIST(APPEND RSPAMD_REQUIRED_LIBRARIES rspamd-actrie)
 LIST(APPEND RSPAMD_REQUIRED_LIBRARIES rspamd-t1ha)
+LIST(APPEND RSPAMD_REQUIRED_LIBRARIES rspamd-ev)
 
 IF(ENABLE_CLANG_PLUGIN MATCHES "ON")
        ADD_SUBDIRECTORY(clang-plugin)
index c99b4dd3245c8341ddcae6c1f2d6fa1aa963749d..d363c3dbc8c32f55396f9b11ea8c4585de1b0d4a 100644 (file)
@@ -55,7 +55,7 @@ ENDIF()
 
 CONFIGURE_FILE(config.h.in libev-config.h)
 
-ADD_LIBRARY(rspamd-libev STATIC ${LIBEVSRC})
+ADD_LIBRARY(rspamd-ev STATIC ${LIBEVSRC})
 ADD_DEFINITIONS("-DEV_CONFIG_H=\"${CMAKE_CURRENT_BINARY_DIR}/libev-config.h\""
                -DEV_MULTIPLICITY=1
                -DEV_USE_FLOOR=1
index 3bfc785d84d0e3a6521c3d8b33637fc23f97cfc4..cc339ef7a99fd05b7599b6ccb8d665bcaa4beddd 100644 (file)
@@ -1829,7 +1829,7 @@ rspamc_process_dir (struct ev_loop *ev_base, struct rspamc_command *cmd,
                                if (cur_req >= max_requests) {
                                        cur_req = 0;
                                        /* Wait for completion */
-                                       event_base_loop (ev_base, 0);
+                                       ev_loop (ev_base, 0);
                                }
                        }
                }
@@ -1840,7 +1840,7 @@ rspamc_process_dir (struct ev_loop *ev_base, struct rspamc_command *cmd,
        }
 
        closedir (d);
-       event_base_loop (ev_base, 0);
+       ev_loop (ev_base, 0);
 }
 
 
@@ -1863,7 +1863,7 @@ main (gint argc, gchar **argv, gchar **env)
        GPid cld;
        struct rspamc_command *cmd;
        FILE *in = NULL;
-       struct ev_loop *ev_base;
+       struct ev_loop *event_loop;
        struct stat st;
        struct sigaction sigpipe_act;
        gchar **exclude_pattern;
@@ -1884,6 +1884,7 @@ main (gint argc, gchar **argv, gchar **env)
        npatterns = 0;
 
        while (exclude_pattern && *exclude_pattern) {
+               exclude_pattern ++;
                npatterns ++;
        }
 
@@ -1902,7 +1903,7 @@ main (gint argc, gchar **argv, gchar **env)
        }
 
        rspamd_init_libs ();
-       ev_base = event_base_new ();
+       event_loop = ev_default_loop (EVFLAG_SIGNALFD);
 
        struct rspamd_http_context_cfg http_config;
 
@@ -1911,7 +1912,7 @@ main (gint argc, gchar **argv, gchar **env)
        http_config.kp_cache_size_server = 0;
        http_config.user_agent = user_agent;
        http_ctx = rspamd_http_context_create_config (&http_config,
-                       ev_base, NULL);
+                       event_loop, NULL);
 
        /* Ignore sigpipe */
        sigemptyset (&sigpipe_act.sa_mask);
@@ -1972,10 +1973,10 @@ main (gint argc, gchar **argv, gchar **env)
        if (start_argc == argc) {
                /* Do command without input or with stdin */
                if (empty_input) {
-                       rspamc_process_input (ev_base, cmd, NULL, "empty", kwattrs);
+                       rspamc_process_input (event_loop, cmd, NULL, "empty", kwattrs);
                }
                else {
-                       rspamc_process_input (ev_base, cmd, in, "stdin", kwattrs);
+                       rspamc_process_input (event_loop, cmd, in, "stdin", kwattrs);
                }
        }
        else {
@@ -1990,7 +1991,7 @@ main (gint argc, gchar **argv, gchar **env)
                                }
                                if (S_ISDIR (st.st_mode)) {
                                        /* Directories are processed with a separate limit */
-                                       rspamc_process_dir (ev_base, cmd, argv[i], kwattrs);
+                                       rspamc_process_dir (event_loop, cmd, argv[i], kwattrs);
                                        cur_req = 0;
                                }
                                else {
@@ -1999,24 +2000,24 @@ main (gint argc, gchar **argv, gchar **env)
                                                fprintf (stderr, "cannot open file %s\n", argv[i]);
                                                exit (EXIT_FAILURE);
                                        }
-                                       rspamc_process_input (ev_base, cmd, in, argv[i], kwattrs);
+                                       rspamc_process_input (event_loop, cmd, in, argv[i], kwattrs);
                                        cur_req++;
                                        fclose (in);
                                }
                                if (cur_req >= max_requests) {
                                        cur_req = 0;
                                        /* Wait for completion */
-                                       event_base_loop (ev_base, 0);
+                                       ev_loop (event_loop, 0);
                                }
                        }
                }
 
                if (cmd->cmd == RSPAMC_COMMAND_FUZZY_DELHASH) {
-                       rspamc_process_input (ev_base, cmd, NULL, "hashes", kwattrs);
+                       rspamc_process_input (event_loop, cmd, NULL, "hashes", kwattrs);
                }
        }
 
-       event_base_loop (ev_base, 0);
+       ev_loop (event_loop, 0);
 
        g_queue_free_full (kwattrs, rspamc_kwattr_free);
 
index 9f0a0ab33996b2b0cc9b08a9fad9429f64d3034a..f83a9d4292752f13954da468e11f0ceb866708bf 100644 (file)
@@ -47,7 +47,7 @@ static const guint64 rspamd_hs_helper_magic = 0x22d310157a2288a0ULL;
 struct hs_helper_ctx {
        guint64 magic;
        /* Events base */
-       struct ev_loop *ev_base;
+       struct ev_loop *event_loop;
        /* DNS resolver */
        struct rspamd_dns_resolver *resolver;
        /* Config */
@@ -57,7 +57,7 @@ struct hs_helper_ctx {
        gboolean loaded;
        gdouble max_time;
        gdouble recompile_time;
-       struct event recompile_timer;
+       ev_timer recompile_timer;
 };
 
 static gpointer
@@ -216,7 +216,7 @@ rspamd_rs_compile (struct hs_helper_ctx *ctx, struct rspamd_worker *worker,
         * XXX: now we just sleep for 5 seconds to ensure that
         */
        if (!ctx->loaded) {
-               sleep (5);
+               ev_sleep (5.0);
                ctx->loaded = TRUE;
        }
 
@@ -226,7 +226,7 @@ rspamd_rs_compile (struct hs_helper_ctx *ctx, struct rspamd_worker *worker,
                        sizeof (srv_cmd.cmd.hs_loaded.cache_dir));
        srv_cmd.cmd.hs_loaded.forced = forced;
 
-       rspamd_srv_send_command (worker, ctx->ev_base, &srv_cmd, -1, NULL, NULL);
+       rspamd_srv_send_command (worker, ctx->event_loop, &srv_cmd, -1, NULL, NULL);
 
        return TRUE;
 }
@@ -258,26 +258,23 @@ rspamd_hs_helper_reload (struct rspamd_main *rspamd_main,
 }
 
 static void
-rspamd_hs_helper_timer (gint fd, short what, gpointer ud)
+rspamd_hs_helper_timer (EV_P_ ev_timer *w, int revents)
 {
-       struct rspamd_worker *worker = ud;
+       struct rspamd_worker *worker = (struct rspamd_worker *)w->data;
        struct hs_helper_ctx *ctx;
-       struct timeval tv;
        double tim;
 
        ctx = worker->ctx;
        tim = rspamd_time_jitter (ctx->recompile_time, 0);
-       double_to_tv (tim, &tv);
-       event_del (&ctx->recompile_timer);
+       w->repeat = tim;
        rspamd_rs_compile (ctx, worker, FALSE);
-       event_add (&ctx->recompile_timer, &tv);
+       ev_timer_again (EV_A_ w);
 }
 
 static void
 start_hs_helper (struct rspamd_worker *worker)
 {
        struct hs_helper_ctx *ctx = worker->ctx;
-       struct timeval tv;
        double tim;
 
        ctx->cfg = worker->srv->cfg;
@@ -289,7 +286,7 @@ start_hs_helper (struct rspamd_worker *worker)
                ctx->hs_dir = RSPAMD_DBDIR "/";
        }
 
-       ctx->ev_base = rspamd_prepare_worker (worker,
+       ctx->event_loop = rspamd_prepare_worker (worker,
                        "hs_helper",
                        NULL);
 
@@ -301,13 +298,12 @@ start_hs_helper (struct rspamd_worker *worker)
        rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_RECOMPILE,
                        rspamd_hs_helper_reload, ctx);
 
-       event_set (&ctx->recompile_timer, -1, EV_TIMEOUT, rspamd_hs_helper_timer,
-                       worker);
-       event_base_set (ctx->ev_base, &ctx->recompile_timer);
+       ctx->recompile_timer.data = worker;
        tim = rspamd_time_jitter (ctx->recompile_time, 0);
-       double_to_tv (tim, &tv);
-       event_add (&ctx->recompile_timer, &tv);
-       event_base_loop (ctx->ev_base, 0);
+       ev_timer_init (&ctx->recompile_timer, rspamd_hs_helper_timer, tim, 0.0);
+       ev_timer_start (ctx->event_loop, &ctx->recompile_timer);
+
+       ev_loop (ctx->event_loop, 0);
        rspamd_worker_block_signals ();
 
        rspamd_log_close (worker->srv->logger, TRUE);
index 263f52511508c4779cf40446fdf09f218ca12c29..c445751b44afd49f90bc33e9bfd3abb42f90921a 100644 (file)
@@ -913,9 +913,7 @@ rspamd_upstream_restore_cb (gpointer elt, gpointer ls)
        /* Here the upstreams list is already locked */
        RSPAMD_UPSTREAM_LOCK (up->lock);
 
-       if (rspamd_event_pending (&up->ev, EV_TIMEOUT)) {
-               event_del (&up->ev);
-       }
+       ev_timer_stop (up->ctx->event_loop, &up->ev);
        g_ptr_array_add (ups->alive, up);
        up->active_idx = ups->alive->len - 1;
        RSPAMD_UPSTREAM_UNLOCK (up->lock);
index 05c38ff32d1497ac6f6ebe687de7a1266627a013..9f7952cc3ad522dbbddd3ef1de68f498edcbc31d 100644 (file)
@@ -3050,7 +3050,7 @@ static void lua_periodic_callback_finish (struct thread_entry *thread, int ret);
 static void lua_periodic_callback_error (struct thread_entry *thread, int ret, const char *msg);
 
 struct rspamd_lua_periodic {
-       struct ev_loop *ev_base;
+       struct ev_loop *event_loop;
        struct rspamd_config *cfg;
        lua_State *L;
        gdouble timeout;
@@ -3082,7 +3082,7 @@ lua_periodic_callback (struct ev_loop *loop, ev_timer *w, int revents)
        *pcfg = cfg;
        pev_base = lua_newuserdata (L, sizeof (*pev_base));
        rspamd_lua_setclass (L, "rspamd{ev_base}", -1);
-       *pev_base = periodic->ev_base;
+       *pev_base = periodic->event_loop;
 
        lua_thread_call (thread, 2);
 }
@@ -3097,6 +3097,7 @@ lua_periodic_callback_finish (struct thread_entry *thread, int ret)
 
        L = thread->lua_state;
 
+       ev_now_update (periodic->event_loop);
 #ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC
        event_base_update_cache_time (periodic->ev_base);
 #endif
@@ -3119,11 +3120,11 @@ lua_periodic_callback_finish (struct thread_entry *thread, int ret)
                }
 
                periodic->ev.repeat = timeout;
-               ev_timer_again (periodic->ev_base, &periodic->ev);
+               ev_timer_again (periodic->event_loop, &periodic->ev);
        }
        else {
                luaL_unref (L, LUA_REGISTRYINDEX, periodic->cbref);
-               ev_timer_stop (periodic->ev_base, &periodic->ev);
+               ev_timer_stop (periodic->event_loop, &periodic->ev);
                g_free (periodic);
        }
 }
@@ -3163,7 +3164,7 @@ lua_config_add_periodic (lua_State *L)
        periodic->timeout = timeout;
        periodic->L = L;
        periodic->cfg = cfg;
-       periodic->ev_base = ev_base;
+       periodic->event_loop = ev_base;
        periodic->need_jitter = need_jitter;
        lua_pushvalue (L, 4);
        periodic->cbref = luaL_ref (L, LUA_REGISTRYINDEX);
index 754d874a23c05fd80b796b56f22edcd3326b137c..0aa995abf838198da27b8feed5f486322167bcb7 100644 (file)
@@ -111,7 +111,6 @@ rspamd_control_finish_handler (struct rspamd_http_connection *conn,
        const gchar *body;
        gsize body_len;
        struct rspamadm_control_cbdata *cbdata = conn->ud;
-       struct timeval exit_tv;
 
        body = rspamd_http_message_get_body (msg, &body_len);
        parser = ucl_parser_new (0);
@@ -157,9 +156,7 @@ rspamd_control_finish_handler (struct rspamd_http_connection *conn,
        }
 
 end:
-       exit_tv.tv_sec = 0;
-       exit_tv.tv_usec = 0;
-       event_base_loopexit (rspamd_main->event_loop, &exit_tv);
+       ev_break (rspamd_main->event_loop, EVBREAK_ALL);
 
        return 0;
 }
index a95521bdbb0b7f2cb29ab3e4542857c3a2e5d85c..46fc342eafa896cd5f266cc1a1e62ad1b811fb44 100644 (file)
@@ -515,7 +515,7 @@ rspamadm_lua_run_repl (lua_State *L)
 {
        gchar *input;
        gboolean is_multiline = FALSE;
-       GString *tb;
+       GString *tb = NULL;
        guint i;
 
        for (;;) {
@@ -591,15 +591,16 @@ struct rspamadm_lua_repl_session {
 };
 
 static void
-rspamadm_lua_accept_cb (gint fd, short what, void *arg)
+rspamadm_lua_accept_cb (EV_P_ ev_io *w, int revents)
 {
-       struct rspamadm_lua_repl_context *ctx = arg;
+       struct rspamadm_lua_repl_context *ctx =
+                       (struct rspamadm_lua_repl_context *)w->data;
        rspamd_inet_addr_t *addr;
        struct rspamadm_lua_repl_session *session;
        gint nfd;
 
        if ((nfd =
-                       rspamd_accept_from_socket (fd, &addr, NULL)) == -1) {
+                       rspamd_accept_from_socket (w->fd, &addr, NULL, NULL)) == -1) {
                rspamd_fprintf (stderr, "accept failed: %s", strerror (errno));
                return;
        }
@@ -808,7 +809,7 @@ rspamadm_lua (gint argc, gchar **argv, const struct rspamadm_command *cmd)
                ctx = g_malloc0  (sizeof (*ctx));
                http = rspamd_http_router_new (rspamadm_lua_error_handler,
                                                rspamadm_lua_finish_handler,
-                                               NULL,
+                                               0.0,
                                                NULL,
                                                rspamd_main->http_ctx);
                ctx->L = L;
@@ -822,19 +823,17 @@ rspamadm_lua (gint argc, gchar **argv, const struct rspamadm_command *cmd)
 
                        fd = rspamd_inet_address_listen (addr, SOCK_STREAM, TRUE);
                        if (fd != -1) {
-                               struct event *ev;
+                               static ev_io ev;
 
-                               ev = g_malloc0 (sizeof (*ev));
-                               event_set (ev, fd, EV_READ|EV_PERSIST, rspamadm_lua_accept_cb,
-                                               ctx);
-                               event_base_set (ev_base, ev);
-                               event_add (ev, NULL);
+                               ev.data = ctx;
+                               ev_io_init (&ev, rspamadm_lua_accept_cb, fd, EV_READ);
+                               ev_io_start (ev_base, &ev);
                                rspamd_printf ("listen on %s\n",
                                                rspamd_inet_address_to_string_pretty (addr));
                        }
                }
 
-               event_base_loop (ev_base, 0);
+               ev_loop (ev_base, 0);
 
                exit (EXIT_SUCCESS);
        }
index 4320c24607565056db0cea0b2654f0653c7e6123..5908f77f98720854b0f32ca1ec17be11fb809078 100644 (file)
@@ -379,15 +379,6 @@ main (gint argc, gchar **argv, gchar **env)
        rspamd_main->server_pool = rspamd_mempool_new (rspamd_mempool_suggest_size (),
                        "rspamadm");
 
-#ifdef HAVE_EVENT_NO_CACHE_TIME_FLAG
-       struct event_config *ev_cfg;
-       ev_cfg = event_config_new ();
-       event_config_set_flag (ev_cfg, EVENT_BASE_FLAG_NO_CACHE_TIME);
-       rspamd_main->ev_base = event_base_new_with_config (ev_cfg);
-#else
-       rspamd_main->event_loop = event_init ();
-#endif
-
        rspamadm_fill_internal_commands (all_commands);
        help_command.command_data = all_commands;
 
@@ -565,10 +556,8 @@ main (gint argc, gchar **argv, gchar **env)
                cmd->run (0, NULL, cmd);
        }
 
-       event_base_loopexit (rspamd_main->event_loop, NULL);
-#ifdef HAVE_EVENT_NO_CACHE_TIME_FLAG
-       event_config_free (ev_cfg);
-#endif
+       ev_break (rspamd_main->event_loop, EVBREAK_ALL);
+
 
        REF_RELEASE (rspamd_main->cfg);
        rspamd_log_close (rspamd_main->logger, TRUE);
index 5907a7ba4779764722ab72945a46abd3fc6fd1df..b14bd086aa6fbb42f29285561d758549b18e3cd1 100644 (file)
@@ -86,7 +86,6 @@ struct rspamd_http_upstream {
        struct upstream_list *u;
        struct rspamd_cryptobox_pubkey *key;
        gdouble timeout;
-       struct timeval io_tv;
        gint parser_from_ref;
        gint parser_to_ref;
        gboolean local;
@@ -101,7 +100,6 @@ struct rspamd_http_mirror {
        struct rspamd_cryptobox_pubkey *key;
        gdouble prob;
        gdouble timeout;
-       struct timeval io_tv;
        gint parser_from_ref;
        gint parser_to_ref;
        gboolean local;
@@ -113,14 +111,13 @@ static const guint64 rspamd_rspamd_proxy_magic = 0xcdeb4fd1fc351980ULL;
 struct rspamd_proxy_ctx {
        guint64 magic;
        /* Events base */
-       struct ev_loop *ev_base;
+       struct ev_loop *event_loop;
        /* DNS resolver */
        struct rspamd_dns_resolver *resolver;
        /* Config */
        struct rspamd_config *cfg;
        /* END OF COMMON PART */
        gdouble timeout;
-       struct timeval io_tv;
        /* Encryption key for clients */
        struct rspamd_cryptobox_keypair *key;
        /* HTTP context */
@@ -174,8 +171,8 @@ struct rspamd_proxy_backend_connection {
        ucl_object_t *results;
        const gchar *err;
        struct rspamd_proxy_session *s;
-       struct timeval *io_tv;
        gint backend_sock;
+       ev_tstamp timeout;
        enum rspamd_backend_flags flags;
        gint parser_from_ref;
        gint parser_to_ref;
@@ -464,8 +461,6 @@ rspamd_proxy_parse_upstream (rspamd_mempool_t *pool,
                rspamd_lua_add_ref_dtor (L, pool, up->parser_to_ref);
        }
 
-       double_to_tv (up->timeout, &up->io_tv);
-
        g_hash_table_insert (ctx->upstreams, up->name, up);
 
        return TRUE;
@@ -617,8 +612,6 @@ rspamd_proxy_parse_mirror (rspamd_mempool_t *pool,
                up->settings_id = rspamd_mempool_strdup (pool, ucl_object_tostring (elt));
        }
 
-       double_to_tv (up->timeout, &up->io_tv);
-
        g_ptr_array_add (ctx->mirrors, up);
 
        return TRUE;
@@ -1144,8 +1137,6 @@ proxy_request_decompress (struct rspamd_http_message *msg)
                rspamd_http_message_set_body_from_fstring_steal (msg, body);
                rspamd_http_message_remove_header (msg, "Compression");
        }
-
-       return;
 }
 
 static struct rspamd_proxy_session *
@@ -1350,7 +1341,7 @@ proxy_open_mirror_connections (struct rspamd_proxy_session *session)
                                sizeof (*bk_conn));
                bk_conn->s = session;
                bk_conn->name = m->name;
-               bk_conn->io_tv = &m->io_tv;
+               bk_conn->timeout = m->timeout;
 
                bk_conn->up = rspamd_upstream_get (m->u,
                                RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0);
@@ -1415,7 +1406,7 @@ proxy_open_mirror_connections (struct rspamd_proxy_session *session)
                        msg->method = HTTP_GET;
                        rspamd_http_connection_write_message_shared (bk_conn->backend_conn,
                                        msg, NULL, NULL, bk_conn,
-                                       bk_conn->io_tv);
+                                       bk_conn->timeout);
                }
                else {
                        if (session->fname) {
@@ -1442,7 +1433,7 @@ proxy_open_mirror_connections (struct rspamd_proxy_session *session)
 
                        rspamd_http_connection_write_message (bk_conn->backend_conn,
                                        msg, NULL, NULL, bk_conn,
-                                       bk_conn->io_tv);
+                                       bk_conn->timeout);
                }
 
                g_ptr_array_add (session->mirror_conns, bk_conn);
@@ -1468,7 +1459,7 @@ proxy_client_write_error (struct rspamd_proxy_session *session, gint code,
                reply->status = rspamd_fstring_new_init (status, strlen (status));
                rspamd_http_connection_write_message (session->client_conn,
                                reply, NULL, NULL, session,
-                               &session->ctx->io_tv);
+                               session->ctx->timeout);
        }
 }
 
@@ -1566,7 +1557,7 @@ proxy_backend_master_finish_handler (struct rspamd_http_connection *conn,
        else {
                rspamd_http_connection_write_message (session->client_conn,
                                msg, NULL, NULL, session,
-                               bk_conn->io_tv);
+                               bk_conn->timeout);
        }
 
        return 0;
@@ -1625,7 +1616,7 @@ rspamd_proxy_scan_self_reply (struct rspamd_task *task)
                                NULL,
                                ctype,
                                session,
-                               NULL);
+                               0);
        }
 }
 
@@ -1666,7 +1657,7 @@ rspamd_proxy_self_scan (struct rspamd_proxy_session *session)
        msg = session->client_message;
        task = rspamd_task_new (session->worker, session->ctx->cfg,
                        session->pool, session->ctx->lang_det,
-                       session->ctx->ev_base);
+                       session->ctx->event_loop);
        task->flags |= RSPAMD_TASK_FLAG_MIME;
        task->sock = -1;
 
@@ -1711,23 +1702,18 @@ rspamd_proxy_self_scan (struct rspamd_proxy_session *session)
 
        /* Set global timeout for the task */
        if (session->ctx->default_upstream->timeout > 0.0) {
-               struct timeval task_tv;
+               task->timeout_ev.data = task;
+               ev_timer_init (&task->timeout_ev, rspamd_task_timeout,
+                               session->ctx->default_upstream->timeout, 0.0);
+               ev_timer_start (task->event_loop, &task->timeout_ev);
 
-               event_set (&task->timeout_ev, -1, EV_TIMEOUT, rspamd_task_timeout,
-                               task);
-               event_base_set (session->ctx->ev_base, &task->timeout_ev);
-               double_to_tv (session->ctx->default_upstream->timeout, &task_tv);
-               event_add (&task->timeout_ev, &task_tv);
        }
        else if (session->ctx->has_self_scan) {
                if (session->ctx->cfg->task_timeout > 0) {
-                       struct timeval task_tv;
-
-                       event_set (&task->timeout_ev, -1, EV_TIMEOUT, rspamd_task_timeout,
-                                       task);
-                       event_base_set (session->ctx->ev_base, &task->timeout_ev);
-                       double_to_tv (session->ctx->cfg->task_timeout, &task_tv);
-                       event_add (&task->timeout_ev, &task_tv);
+                       task->timeout_ev.data = task;
+                       ev_timer_init (&task->timeout_ev, rspamd_task_timeout,
+                                       session->ctx->cfg->task_timeout, 0.0);
+                       ev_timer_start (task->event_loop, &task->timeout_ev);
                }
        }
 
@@ -1783,7 +1769,7 @@ retry:
 
                session->master_conn->up = rspamd_upstream_get (backend->u,
                                RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0);
-               session->master_conn->io_tv = &backend->io_tv;
+               session->master_conn->timeout = backend->timeout;
 
                if (session->master_conn->up == NULL) {
                        msg_err_session ("cannot select upstream for %s",
@@ -1853,7 +1839,7 @@ retry:
                        rspamd_http_connection_write_message_shared (
                                        session->master_conn->backend_conn,
                                        msg, NULL, NULL, session->master_conn,
-                                       session->master_conn->io_tv);
+                                       session->master_conn->timeout);
                }
                else {
                        if (session->fname) {
@@ -1881,7 +1867,7 @@ retry:
                        rspamd_http_connection_write_message (
                                        session->master_conn->backend_conn,
                                        msg, NULL, NULL, session->master_conn,
-                                       session->master_conn->io_tv);
+                                       session->master_conn->timeout);
                }
        }
 
@@ -2031,9 +2017,9 @@ proxy_milter_error_handler (gint fd,
 }
 
 static void
-proxy_accept_socket (gint fd, short what, void *arg)
+proxy_accept_socket (EV_P_ ev_io *w, int revents)
 {
-       struct rspamd_worker *worker = (struct rspamd_worker *) arg;
+       struct rspamd_worker *worker = (struct rspamd_worker *)w->data;
        struct rspamd_proxy_ctx *ctx;
        rspamd_inet_addr_t *addr;
        struct rspamd_proxy_session *session;
@@ -2042,7 +2028,8 @@ proxy_accept_socket (gint fd, short what, void *arg)
        ctx = worker->ctx;
 
        if ((nfd =
-               rspamd_accept_from_socket (fd, &addr, worker->accept_events)) == -1) {
+               rspamd_accept_from_socket (w->fd, &addr,
+                               rspamd_worker_throttle_accept_events, worker->accept_events)) == -1) {
                msg_warn ("accept failed: %s", strerror (errno));
                return;
        }
@@ -2086,7 +2073,7 @@ proxy_accept_socket (gint fd, short what, void *arg)
 
                rspamd_http_connection_read_message_shared (session->client_conn,
                                session,
-                               &ctx->io_tv);
+                               session->ctx->timeout);
        }
        else {
                msg_info_session ("accepted milter connection from %s port %d",
@@ -2110,9 +2097,9 @@ proxy_accept_socket (gint fd, short what, void *arg)
                }
 #endif
 
-               rspamd_milter_handle_socket (nfd, NULL,
+               rspamd_milter_handle_socket (nfd, 0.0,
                                session->pool,
-                               ctx->ev_base,
+                               ctx->event_loop,
                                proxy_milter_finish_handler,
                                proxy_milter_error_handler,
                                session);
@@ -2153,30 +2140,29 @@ start_rspamd_proxy (struct rspamd_worker *worker)
        struct rspamd_proxy_ctx *ctx = worker->ctx;
 
        ctx->cfg = worker->srv->cfg;
-       ctx->ev_base = rspamd_prepare_worker (worker, "rspamd_proxy",
+       ctx->event_loop = rspamd_prepare_worker (worker, "rspamd_proxy",
                        proxy_accept_socket);
 
        ctx->resolver = rspamd_dns_resolver_init (worker->srv->logger,
-                       ctx->ev_base,
+                       ctx->event_loop,
                        worker->srv->cfg);
-       double_to_tv (ctx->timeout, &ctx->io_tv);
-       rspamd_map_watch (worker->srv->cfg, ctx->ev_base, ctx->resolver, worker, 0);
+       rspamd_map_watch (worker->srv->cfg, ctx->event_loop, ctx->resolver, worker, 0);
 
        rspamd_upstreams_library_config (worker->srv->cfg, ctx->cfg->ups_ctx,
-                       ctx->ev_base, ctx->resolver->r);
+                       ctx->event_loop, ctx->resolver->r);
 
-       ctx->http_ctx = rspamd_http_context_create (ctx->cfg, ctx->ev_base,
+       ctx->http_ctx = rspamd_http_context_create (ctx->cfg, ctx->event_loop,
                        ctx->cfg->ups_ctx);
 
        if (ctx->has_self_scan) {
                /* Additional initialisation needed */
-               rspamd_worker_init_scanner (worker, ctx->ev_base, ctx->resolver,
+               rspamd_worker_init_scanner (worker, ctx->event_loop, ctx->resolver,
                                &ctx->lang_det);
        }
 
        if (worker->srv->cfg->enable_sessions_cache) {
                ctx->sessions_cache = rspamd_worker_session_cache_new (worker,
-                               ctx->ev_base);
+                               ctx->event_loop);
        }
 
        ctx->milter_ctx.spam_header = ctx->spam_header;
@@ -2188,11 +2174,11 @@ start_rspamd_proxy (struct rspamd_worker *worker)
        ctx->milter_ctx.cfg = ctx->cfg;
        rspamd_milter_init_library (&ctx->milter_ctx);
 
-       rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->ev_base,
+       rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->event_loop,
                        worker);
        adjust_upstreams_limits (ctx);
 
-       event_base_loop (ctx->ev_base, 0);
+       ev_loop (ctx->event_loop, 0);
        rspamd_worker_block_signals ();
 
        if (ctx->has_self_scan) {