]> source.dussan.org Git - rspamd.git/commitdiff
[Project] Adopt Lua API
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 17 Jun 2019 15:25:07 +0000 (16:25 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 22 Jun 2019 09:57:29 +0000 (10:57 +0100)
src/lua/lua_config.c
src/lua/lua_http.c
src/lua/lua_redis.c
src/lua/lua_tcp.c
src/lua/lua_udp.c
src/lua/lua_util.c
src/lua/lua_worker.c
src/rspamd.h
src/worker.c

index 196f12e2c2c9bef3243de8ef95c8fe1156b2e571..05c38ff32d1497ac6f6ebe687de7a1266627a013 100644 (file)
@@ -3054,15 +3054,15 @@ struct rspamd_lua_periodic {
        struct rspamd_config *cfg;
        lua_State *L;
        gdouble timeout;
-       struct event ev;
+       ev_timer ev;
        gint cbref;
        gboolean need_jitter;
 };
 
 static void
-lua_periodic_callback (gint unused_fd, short what, gpointer ud)
+lua_periodic_callback (struct ev_loop *loop, ev_timer *w, int revents)
 {
-       struct rspamd_lua_periodic *periodic = ud;
+       struct rspamd_lua_periodic *periodic = (struct rspamd_lua_periodic *)w->data;
        struct rspamd_config **pcfg, *cfg;
        struct ev_loop **pev_base;
        struct thread_entry *thread;
@@ -3084,7 +3084,6 @@ lua_periodic_callback (gint unused_fd, short what, gpointer ud)
        rspamd_lua_setclass (L, "rspamd{ev_base}", -1);
        *pev_base = periodic->ev_base;
 
-       event_del (&periodic->ev);
        lua_thread_call (thread, 2);
 }
 
@@ -3094,7 +3093,6 @@ lua_periodic_callback_finish (struct thread_entry *thread, int ret)
        lua_State *L;
        struct rspamd_lua_periodic *periodic = thread->cd;
        gboolean plan_more = FALSE;
-       struct timeval tv;
        gdouble timeout = 0.0;
 
        L = thread->lua_state;
@@ -3120,11 +3118,12 @@ lua_periodic_callback_finish (struct thread_entry *thread, int ret)
                        timeout = rspamd_time_jitter (timeout, 0.0);
                }
 
-               double_to_tv (timeout, &tv);
-               event_add (&periodic->ev, &tv);
+               periodic->ev.repeat = timeout;
+               ev_timer_again (periodic->ev_base, &periodic->ev);
        }
        else {
                luaL_unref (L, LUA_REGISTRYINDEX, periodic->cbref);
+               ev_timer_stop (periodic->ev_base, &periodic->ev);
                g_free (periodic);
        }
 }
@@ -3138,7 +3137,7 @@ lua_periodic_callback_error (struct thread_entry *thread, int ret, const char *m
 
        msg_err_config ("call to finishing script failed: %s", msg);
 
-       lua_periodic_callback_finish(thread, ret);
+       lua_periodic_callback_finish (thread, ret);
 }
 
 
@@ -3149,7 +3148,6 @@ lua_config_add_periodic (lua_State *L)
        struct rspamd_config *cfg = lua_check_config (L, 1);
        struct ev_loop *ev_base = lua_check_ev_base (L, 2);
        gdouble timeout = lua_tonumber (L, 3);
-       struct timeval tv;
        struct rspamd_lua_periodic *periodic;
        gboolean need_jitter = FALSE;
 
@@ -3169,15 +3167,14 @@ lua_config_add_periodic (lua_State *L)
        periodic->need_jitter = need_jitter;
        lua_pushvalue (L, 4);
        periodic->cbref = luaL_ref (L, LUA_REGISTRYINDEX);
-       event_set (&periodic->ev, -1, EV_TIMEOUT, lua_periodic_callback, periodic);
-       event_base_set (ev_base, &periodic->ev);
 
        if (need_jitter) {
                timeout = rspamd_time_jitter (timeout, 0.0);
        }
 
-       double_to_tv (timeout, &tv);
-       event_add (&periodic->ev, &tv);
+       ev_timer_init (&periodic->ev, lua_periodic_callback, timeout, 0.0);
+       periodic->ev.data = periodic;
+       ev_timer_start (ev_base, &periodic->ev);
 
        return 0;
 }
index 38072e1777db3608393b48a00fa36a1401c0a44a..ec42ab39e4e7dc7c90d36ceab5d04704f6cb7d3d 100644 (file)
@@ -67,10 +67,10 @@ struct lua_http_cbdata {
        struct rspamd_async_session *session;
        struct rspamd_symcache_item *item;
        struct rspamd_http_message *msg;
-       struct ev_loop *ev_base;
+       struct ev_loop *event_loop;
        struct rspamd_config *cfg;
        struct rspamd_task *task;
-       struct timeval tv;
+       ev_tstamp timeout;
        struct rspamd_cryptobox_keypair *local_kp;
        struct rspamd_cryptobox_pubkey *peer_pk;
        rspamd_inet_addr_t *addr;
@@ -86,7 +86,7 @@ struct lua_http_cbdata {
        ref_entry_t ref;
 };
 
-static const int default_http_timeout = 5000;
+static const gdouble default_http_timeout = 5.0;
 
 static struct rspamd_dns_resolver *
 lua_http_global_resolver (struct ev_loop *ev_base)
@@ -451,7 +451,7 @@ lua_http_make_connection (struct lua_http_cbdata *cbd)
 
                rspamd_http_connection_write_message (cbd->conn, msg,
                                cbd->host, cbd->mime_type, cbd,
-                               &cbd->tv);
+                               cbd->timeout);
 
                return TRUE;
        }
@@ -717,7 +717,7 @@ lua_http_request (lua_State *L)
                lua_pushstring (L, "timeout");
                lua_gettable (L, 1);
                if (lua_type (L, -1) == LUA_TNUMBER) {
-                       timeout = lua_tonumber (L, -1) * 1000.;
+                       timeout = lua_tonumber (L, -1);
                }
                lua_pop (L, 1);
 
@@ -860,7 +860,7 @@ lua_http_request (lua_State *L)
                lua_gettable (L, 1);
 
                if (lua_type (L, -1) == LUA_TNUMBER) {
-                       max_size = lua_tonumber (L, -1);
+                       max_size = lua_tointeger (L, -1);
                }
 
                lua_pop (L, 1);
@@ -943,9 +943,9 @@ lua_http_request (lua_State *L)
        cbd = g_malloc0 (sizeof (*cbd));
        cbd->cbref = cbref;
        cbd->msg = msg;
-       cbd->ev_base = ev_base;
+       cbd->event_loop = ev_base;
        cbd->mime_type = mime_type;
-       msec_to_tv (timeout, &cbd->tv);
+       cbd->timeout = timeout;
        cbd->fd = -1;
        cbd->cfg = cfg;
        cbd->peer_pk = peer_key;
index b96171d8981bab476aecdc8f483c9c036d3000ed..f39168a276727f622c30e51c7630516cd9405beb 100644 (file)
@@ -98,7 +98,7 @@ struct lua_redis_userdata {
        struct rspamd_task *task;
        struct rspamd_symcache_item *item;
        struct rspamd_async_session *s;
-       struct ev_loop *ev_base;
+       struct ev_loop *event_loop;
        struct rspamd_config *cfg;
        struct rspamd_redis_pool *pool;
        gchar *server;
@@ -124,7 +124,7 @@ struct lua_redis_request_specific_userdata {
        struct lua_redis_userdata *c;
        struct lua_redis_ctx *ctx;
        struct lua_redis_request_specific_userdata *next;
-       struct event timeout;
+       ev_timer timeout_ev;
        guint flags;
 };
 
@@ -184,9 +184,7 @@ lua_redis_dtor (struct lua_redis_ctx *ctx)
        if (ud->ctx) {
 
                LL_FOREACH_SAFE (ud->specific, cur, tmp) {
-                       if (rspamd_event_pending (&cur->timeout, EV_TIMEOUT)) {
-                               event_del (&cur->timeout);
-                       }
+                       ev_timer_stop (ud->event_loop, &cur->timeout_ev);
 
                        if (!(cur->flags & LUA_REDIS_SPECIFIC_REPLIED)) {
                                is_successful = FALSE;
@@ -245,9 +243,7 @@ lua_redis_fin (void *arg)
 
        ctx = sp_ud->ctx;
 
-       if (rspamd_event_pending (&sp_ud->timeout, EV_TIMEOUT)) {
-               event_del (&sp_ud->timeout);
-       }
+       ev_timer_stop (sp_ud->ctx->async.event_loop, &sp_ud->timeout_ev);
 
        msg_debug ("finished redis query %p from session %p", sp_ud, ctx);
        sp_ud->flags |= LUA_REDIS_SPECIFIC_FINISHED;
@@ -556,10 +552,7 @@ lua_redis_callback_sync (redisAsyncContext *ac, gpointer r, gpointer priv)
                return;
        }
 
-       if (rspamd_event_pending (&sp_ud->timeout, EV_TIMEOUT)) {
-               event_del (&sp_ud->timeout);
-       }
-
+       ev_timer_stop (ud->event_loop, &sp_ud->timeout_ev);
        msg_debug ("got reply from redis: %p for query %p", ac, sp_ud);
 
        struct lua_redis_result *result = g_malloc0 (sizeof *result);
@@ -630,9 +623,10 @@ lua_redis_callback_sync (redisAsyncContext *ac, gpointer r, gpointer priv)
 }
 
 static void
-lua_redis_timeout_sync (int fd, short what, gpointer priv)
+lua_redis_timeout_sync (EV_P_ ev_timer *w, int revents)
 {
-       struct lua_redis_request_specific_userdata *sp_ud = priv;
+       struct lua_redis_request_specific_userdata *sp_ud =
+                       (struct lua_redis_request_specific_userdata *)w->data;
        struct lua_redis_ctx *ctx = sp_ud->ctx;
        redisAsyncContext *ac;
 
@@ -657,9 +651,10 @@ lua_redis_timeout_sync (int fd, short what, gpointer priv)
 }
 
 static void
-lua_redis_timeout (int fd, short what, gpointer u)
+lua_redis_timeout (EV_P_ ev_timer *w, int revents)
 {
-       struct lua_redis_request_specific_userdata *sp_ud = u;
+       struct lua_redis_request_specific_userdata *sp_ud =
+                       (struct lua_redis_request_specific_userdata *)w->data;
        struct lua_redis_ctx *ctx;
        redisAsyncContext *ac;
 
@@ -790,9 +785,9 @@ lua_redis_parse_args (lua_State *L, gint idx, const gchar *cmd,
 static struct lua_redis_ctx *
 rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref, gboolean is_async)
 {
-       struct lua_redis_ctx *ctx;
+       struct lua_redis_ctx *ctx = NULL;
        rspamd_inet_addr_t *ip = NULL;
-       struct lua_redis_userdata *ud;
+       struct lua_redis_userdata *ud = NULL;
        struct rspamd_lua_ip *addr = NULL;
        struct rspamd_task *task = NULL;
        const gchar *host;
@@ -933,7 +928,7 @@ rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref, gboolean is_asy
                        ud->s = session;
                        ud->cfg = cfg;
                        ud->pool = cfg->redis_pool;
-                       ud->ev_base = ev_base;
+                       ud->event_loop = ev_base;
                        ud->task = task;
 
                        if (task) {
@@ -1009,7 +1004,6 @@ lua_redis_make_request (lua_State *L)
        struct lua_redis_userdata *ud;
        struct lua_redis_ctx *ctx, **pctx;
        const gchar *cmd = NULL;
-       struct timeval tv;
        gdouble timeout = REDIS_DEFAULT_TIMEOUT;
        gint cbref = -1;
        gboolean ret = FALSE;
@@ -1064,10 +1058,9 @@ lua_redis_make_request (lua_State *L)
 
                        REDIS_RETAIN (ctx); /* Cleared by fin event */
                        ctx->cmds_pending ++;
-                       double_to_tv (timeout, &tv);
-                       event_set (&sp_ud->timeout, -1, EV_TIMEOUT, lua_redis_timeout, sp_ud);
-                       event_base_set (ud->ev_base, &sp_ud->timeout);
-                       event_add (&sp_ud->timeout, &tv);
+                       sp_ud->timeout_ev.data = sp_ud;
+                       ev_timer_init (&sp_ud->timeout_ev, lua_redis_timeout, timeout, 0.0);
+                       ev_timer_start (ud->event_loop, &sp_ud->timeout_ev);
                        ret = TRUE;
                }
                else {
@@ -1347,7 +1340,6 @@ lua_redis_add_cmd (lua_State *L)
        const gchar *cmd = NULL;
        gint args_pos = 2;
        gint cbref = -1, ret;
-       struct timeval tv;
 
        if (ctx) {
                if (ctx->flags & LUA_REDIS_TERMINATED) {
@@ -1426,19 +1418,18 @@ lua_redis_add_cmd (lua_State *L)
                                }
                        }
 
-                       double_to_tv (sp_ud->c->timeout, &tv);
+                       sp_ud->timeout_ev.data = sp_ud;
 
                        if (IS_ASYNC (ctx)) {
-                               event_set (&sp_ud->timeout, -1, EV_TIMEOUT,
-                                               lua_redis_timeout, sp_ud);
+                               ev_timer_init (&sp_ud->timeout_ev, lua_redis_timeout,
+                                               sp_ud->c->timeout, 0.0);
                        }
                        else {
-                               event_set (&sp_ud->timeout, -1, EV_TIMEOUT,
-                                               lua_redis_timeout_sync, sp_ud);
+                               ev_timer_init (&sp_ud->timeout_ev, lua_redis_timeout_sync,
+                                               sp_ud->c->timeout, 0.0);
                        }
 
-                       event_base_set (ud->ev_base, &sp_ud->timeout);
-                       event_add (&sp_ud->timeout, &tv);
+                       ev_timer_start (ud->event_loop, &sp_ud->timeout_ev);
                        REDIS_RETAIN (ctx);
                        ctx->cmds_pending ++;
                }
index cad0e655628541d079573b2e3303cb10b65e6a87..4d1c205cf27c2c8aa50ac724f9c58fbdde2e90af 100644 (file)
@@ -159,13 +159,6 @@ LUA_FUNCTION_DEF (tcp, connect_sync);
  * Closes TCP connection
  */
 LUA_FUNCTION_DEF (tcp, close);
-/***
- * @method tcp:set_timeout(seconds)
- *
- * Sets new timeout for a TCP connection in **seconds**
- * @param {number} seconds floating point value that specifies new timeout
- */
-LUA_FUNCTION_DEF (tcp, set_timeout);
 
 /***
  * @method tcp:add_read(callback, [pattern])
@@ -210,7 +203,6 @@ static const struct luaL_reg tcp_libf[] = {
 
 static const struct luaL_reg tcp_libm[] = {
        LUA_INTERFACE_DEF (tcp, close),
-       LUA_INTERFACE_DEF (tcp, set_timeout),
        LUA_INTERFACE_DEF (tcp, add_read),
        LUA_INTERFACE_DEF (tcp, add_write),
        LUA_INTERFACE_DEF (tcp, shift_callback),
@@ -226,13 +218,6 @@ static const struct luaL_reg tcp_libm[] = {
  */
 LUA_FUNCTION_DEF (tcp_sync, close);
 
-/***
- * @method set_timeout(seconds)
- *
- * Sets timeout for IO operations
- */
-LUA_FUNCTION_DEF (tcp_sync, set_timeout);
-
 /***
  * @method read_once()
  *
@@ -270,7 +255,6 @@ static void lua_tcp_sync_session_dtor (gpointer ud);
 
 static const struct luaL_reg tcp_sync_libm[] = {
        LUA_INTERFACE_DEF (tcp_sync, close),
-       LUA_INTERFACE_DEF (tcp_sync, set_timeout),
        LUA_INTERFACE_DEF (tcp_sync, read_once),
        LUA_INTERFACE_DEF (tcp_sync, write),
        LUA_INTERFACE_DEF (tcp_sync, eof),
@@ -342,8 +326,7 @@ struct lua_tcp_dtor {
 struct lua_tcp_cbdata {
        struct rspamd_async_session *session;
        struct rspamd_async_event *async_ev;
-       struct ev_loop *ev_base;
-       struct timeval tv;
+       struct ev_loop *event_loop;
        rspamd_inet_addr_t *addr;
        GByteArray *in;
        GQueue *handlers;
@@ -352,7 +335,7 @@ struct lua_tcp_cbdata {
        guint port;
        guint flags;
        gchar tag[7];
-       struct event ev;
+       struct rspamd_io_ev ev;
        struct lua_tcp_dtor *dtors;
        ref_entry_t ref;
        struct rspamd_task *task;
@@ -381,7 +364,7 @@ static void lua_tcp_unregister_event (struct lua_tcp_cbdata *cbd);
 static void
 lua_tcp_void_finalyser (gpointer arg) {}
 
-static const int default_tcp_timeout = 5000;
+static const gdouble default_tcp_timeout = 5.0;
 
 static struct rspamd_dns_resolver *
 lua_tcp_global_resolver (struct ev_loop *ev_base,
@@ -467,7 +450,7 @@ lua_tcp_fin (gpointer arg)
        }
 
        if (cbd->fd != -1) {
-               event_del (&cbd->ev);
+               rspamd_ev_watcher_stop (cbd->event_loop, &cbd->ev);
                close (cbd->fd);
                cbd->fd = -1;
        }
@@ -755,15 +738,7 @@ lua_tcp_resume_thread (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len)
 static void
 lua_tcp_plan_read (struct lua_tcp_cbdata *cbd)
 {
-       event_del (&cbd->ev);
-#ifdef EV_CLOSED
-       event_set (&cbd->ev, cbd->fd, EV_READ|EV_CLOSED,
-                               lua_tcp_handler, cbd);
-#else
-       event_set (&cbd->ev, cbd->fd, EV_READ, lua_tcp_handler, cbd);
-#endif
-       event_base_set (cbd->ev_base, &cbd->ev);
-       event_add (&cbd->ev, &cbd->tv);
+       rspamd_ev_watcher_reschedule (cbd->event_loop, &cbd->ev, EV_READ);
 }
 
 static void
@@ -867,7 +842,6 @@ lua_tcp_write_helper (struct lua_tcp_cbdata *cbd)
        }
        else {
                /* Want to write more */
-               event_add (&cbd->ev, &cbd->tv);
        }
 
        return;
@@ -1149,9 +1123,8 @@ lua_tcp_plan_handler_event (struct lua_tcp_cbdata *cbd, gboolean can_read,
                                msg_debug_tcp ("plan new read");
                                if (can_read) {
                                        /* We need to plan a new event */
-                                       event_set (&cbd->ev, cbd->fd, EV_READ, lua_tcp_handler, cbd);
-                                       event_base_set (cbd->ev_base, &cbd->ev);
-                                       event_add (&cbd->ev, &cbd->tv);
+                                       rspamd_ev_watcher_reschedule (cbd->event_loop, &cbd->ev,
+                                                       EV_READ);
                                }
                                else {
                                        /* Cannot read more */
@@ -1172,9 +1145,8 @@ lua_tcp_plan_handler_event (struct lua_tcp_cbdata *cbd, gboolean can_read,
                        if (hdl->h.w.pos < hdl->h.w.total_bytes) {
                                msg_debug_tcp ("plan new write");
                                if (can_write) {
-                                       event_set (&cbd->ev, cbd->fd, EV_WRITE, lua_tcp_handler, cbd);
-                                       event_base_set (cbd->ev_base, &cbd->ev);
-                                       event_add (&cbd->ev, &cbd->tv);
+                                       rspamd_ev_watcher_reschedule (cbd->event_loop, &cbd->ev,
+                                                       EV_WRITE);
                                }
                                else {
                                        /* Cannot write more */
@@ -1192,9 +1164,8 @@ lua_tcp_plan_handler_event (struct lua_tcp_cbdata *cbd, gboolean can_read,
                }
                else { /* LUA_WANT_CONNECT */
                        msg_debug_tcp ("plan new connect");
-                       event_set (&cbd->ev, cbd->fd, EV_WRITE, lua_tcp_handler, cbd);
-                       event_base_set (cbd->ev_base, &cbd->ev);
-                       event_add (&cbd->ev, &cbd->tv);
+                       rspamd_ev_watcher_reschedule (cbd->event_loop, &cbd->ev,
+                                       EV_WRITE);
                }
        }
 }
@@ -1289,12 +1260,11 @@ lua_tcp_make_connection (struct lua_tcp_cbdata *cbd)
                        verify_peer = TRUE;
                }
 
-               event_base_set (cbd->ev_base, &cbd->ev);
                cbd->ssl_conn =
-                               rspamd_ssl_connection_new (ssl_ctx, cbd->ev_base, verify_peer);
+                               rspamd_ssl_connection_new (ssl_ctx, cbd->event_loop, verify_peer);
 
                if (!rspamd_ssl_connect_fd (cbd->ssl_conn, fd, cbd->hostname, &cbd->ev,
-                               &cbd->tv, lua_tcp_handler, lua_tcp_ssl_on_error, cbd)) {
+                               cbd->ev.timeout, lua_tcp_handler, lua_tcp_ssl_on_error, cbd)) {
                        lua_tcp_push_error (cbd, TRUE, "ssl connection failed: %s",
                                        strerror (errno));
 
@@ -1431,7 +1401,7 @@ lua_tcp_request (lua_State *L)
        guint port;
        gint cbref, tp, conn_cbref = -1;
        gsize plen = 0;
-       struct ev_loop *ev_base;
+       struct ev_loop *event_loop = NULL;
        struct lua_tcp_cbdata *cbd;
        struct rspamd_dns_resolver *resolver = NULL;
        struct rspamd_async_session *session = NULL;
@@ -1453,7 +1423,7 @@ lua_tcp_request (lua_State *L)
                lua_pushstring (L, "port");
                lua_gettable (L, -2);
                if (lua_type (L, -1) == LUA_TNUMBER) {
-                       port = luaL_checknumber (L, -1);
+                       port = lua_tointeger (L, -1);
                }
                else {
                        /* We assume that it is a unix socket */
@@ -1478,7 +1448,7 @@ lua_tcp_request (lua_State *L)
                lua_gettable (L, -2);
                if (lua_type (L, -1) == LUA_TUSERDATA) {
                        task = lua_check_task (L, -1);
-                       ev_base = task->event_loop;
+                       event_loop = task->event_loop;
                        resolver = task->resolver;
                        session = task->s;
                        cfg = task->cfg;
@@ -1489,10 +1459,10 @@ lua_tcp_request (lua_State *L)
                        lua_pushstring (L, "ev_base");
                        lua_gettable (L, -2);
                        if (rspamd_lua_check_udata_maybe (L, -1, "rspamd{ev_base}")) {
-                               ev_base = *(struct ev_loop **)lua_touserdata (L, -1);
+                               event_loop = *(struct ev_loop **)lua_touserdata (L, -1);
                        }
                        else {
-                               ev_base = NULL;
+                               event_loop = ev_default_loop (0);
                        }
                        lua_pop (L, 1);
 
@@ -1522,7 +1492,7 @@ lua_tcp_request (lua_State *L)
                                resolver = *(struct rspamd_dns_resolver **)lua_touserdata (L, -1);
                        }
                        else {
-                               resolver = lua_tcp_global_resolver (ev_base, cfg);
+                               resolver = lua_tcp_global_resolver (event_loop, cfg);
                        }
                        lua_pop (L, 1);
                }
@@ -1530,7 +1500,7 @@ lua_tcp_request (lua_State *L)
                lua_pushstring (L, "timeout");
                lua_gettable (L, -2);
                if (lua_type (L, -1) == LUA_TNUMBER) {
-                       timeout = lua_tonumber (L, -1) * 1000.;
+                       timeout = lua_tonumber (L, -1);
                }
                lua_pop (L, 1);
 
@@ -1691,10 +1661,10 @@ lua_tcp_request (lua_State *L)
                g_queue_push_tail (cbd->handlers, wh);
        }
 
-       cbd->ev_base = ev_base;
-       msec_to_tv (timeout, &cbd->tv);
+       cbd->event_loop = event_loop;
        cbd->fd = -1;
        cbd->port = port;
+       cbd->ev.timeout = timeout;
 
        if (ssl) {
                cbd->flags |= LUA_TCP_FLAG_SSL;
@@ -1881,9 +1851,8 @@ lua_tcp_connect_sync (lua_State *L)
        rspamd_snprintf (cbd->tag, sizeof (cbd->tag), "%uxL", h);
        cbd->handlers = g_queue_new ();
 
-       cbd->ev_base = ev_base;
+       cbd->event_loop = ev_base;
        cbd->flags |= LUA_TCP_FLAG_SYNC;
-       double_to_tv (timeout, &cbd->tv);
        cbd->fd = -1;
        cbd->port = (guint16)port;
 
@@ -1979,25 +1948,6 @@ lua_tcp_close (lua_State *L)
        return 0;
 }
 
-static gint
-lua_tcp_set_timeout (lua_State *L)
-{
-       LUA_TRACE_POINT;
-       struct lua_tcp_cbdata *cbd = lua_check_tcp (L, 1);
-       gdouble seconds = lua_tonumber (L, 2);
-
-       if (cbd == NULL) {
-               return luaL_error (L, "invalid arguments");
-       }
-       if (!lua_isnumber (L, 2)) {
-               return luaL_error (L, "invalid arguments: 'seconds' is expected to be number");
-       }
-
-       double_to_tv (seconds, &cbd->tv);
-
-       return 0;
-}
-
 static gint
 lua_tcp_add_read (lua_State *L)
 {
@@ -2159,7 +2109,7 @@ lua_tcp_sync_close (lua_State *L)
        cbd->flags |= LUA_TCP_FLAG_FINISHED;
 
        if (cbd->fd != -1) {
-               event_del (&cbd->ev);
+               rspamd_ev_watcher_stop (cbd->event_loop, &cbd->ev);
                close (cbd->fd);
                cbd->fd = -1;
        }
@@ -2175,7 +2125,7 @@ lua_tcp_sync_session_dtor (gpointer ud)
 
        if (cbd->fd != -1) {
                msg_debug ("closing sync TCP connection");
-               event_del (&cbd->ev);
+               rspamd_ev_watcher_stop (cbd->event_loop, &cbd->ev);
                close (cbd->fd);
                cbd->fd = -1;
        }
@@ -2187,25 +2137,6 @@ lua_tcp_sync_session_dtor (gpointer ud)
        cbd->async_ev = NULL;
 }
 
-static int
-lua_tcp_sync_set_timeout (lua_State *L)
-{
-       LUA_TRACE_POINT;
-       struct lua_tcp_cbdata *cbd = lua_check_sync_tcp (L, 1);
-       gdouble seconds = lua_tonumber (L, 2);
-
-       if (cbd == NULL) {
-               return luaL_error (L, "invalid arguments: self is not rspamd{tcp_sync}");
-       }
-       if (lua_type (L, 2) != LUA_TNUMBER) {
-               return luaL_error (L, "invalid arguments: second parameter is expected to be number");
-       }
-
-       double_to_tv (seconds, &cbd->tv);
-
-       return 0;
-}
-
 static int
 lua_tcp_sync_read_once (lua_State *L)
 {
@@ -2363,12 +2294,11 @@ lua_tcp_starttls (lua_State * L)
                verify_peer = TRUE;
        }
 
-       event_base_set (cbd->ev_base, &cbd->ev);
        cbd->ssl_conn =
-                       rspamd_ssl_connection_new (ssl_ctx, cbd->ev_base, verify_peer);
+                       rspamd_ssl_connection_new (ssl_ctx, cbd->event_loop, verify_peer);
 
        if (!rspamd_ssl_connect_fd (cbd->ssl_conn, cbd->fd, cbd->hostname, &cbd->ev,
-                       &cbd->tv, lua_tcp_handler, lua_tcp_ssl_on_error, cbd)) {
+                       cbd->ev.timeout, lua_tcp_handler, lua_tcp_ssl_on_error, cbd)) {
                lua_tcp_push_error (cbd, TRUE, "ssl connection failed: %s",
                                strerror (errno));
        }
index fdbc0f36d86883fcfb2d7f335fbca8a40ca86bed..94d27bf63bbc7a5ab224e8cf58e2ec084e2e737b 100644 (file)
@@ -18,6 +18,7 @@
 #include "utlist.h"
 #include "unix-std.h"
 #include <math.h>
+#include <src/libutil/libev_helper.h>
 
 static const gchar *M = "rspamd lua udp";
 
@@ -59,9 +60,8 @@ static const struct luaL_reg udp_libf[] = {
 };
 
 struct lua_udp_cbdata {
-       struct event io;
-       struct timeval tv;
-       struct ev_loop *ev_base;
+       struct ev_loop *event_loop;
+       struct rspamd_io_ev ev;
        struct rspamd_async_event *async_ev;
        struct rspamd_task *task;
        rspamd_mempool_t *pool;
@@ -115,10 +115,7 @@ lua_udp_cbd_fin (gpointer p)
        struct lua_udp_cbdata *cbd = (struct lua_udp_cbdata *)p;
 
        if (cbd->sock != -1) {
-               if (cbd->io.ev_base != NULL) {
-                       event_del (&cbd->io);
-               }
-
+               rspamd_ev_watcher_stop (cbd->event_loop, &cbd->ev);
                close (cbd->sock);
        }
 
@@ -264,16 +261,12 @@ lua_udp_io_handler (gint fd, short what, gpointer p)
 
        L = cbd->L;
 
-       event_del (&cbd->io);
-
        if (what == EV_TIMEOUT) {
                if (cbd->sent && cbd->retransmits > 0) {
                        r = lua_try_send_request (cbd);
 
                        if (r == RSPAMD_SENT_OK) {
-                               event_set (&cbd->io, cbd->sock, EV_READ, lua_udp_io_handler, cbd);
-                               event_base_set (cbd->ev_base, &cbd->io);
-                               event_add (&cbd->io, &cbd->tv);
+                               rspamd_ev_watcher_reschedule (cbd->event_loop, &cbd->ev, EV_READ);
                                lua_udp_maybe_register_event (cbd);
                                cbd->retransmits --;
                        }
@@ -282,9 +275,7 @@ lua_udp_io_handler (gint fd, short what, gpointer p)
                        }
                        else {
                                cbd->retransmits --;
-                               event_set (&cbd->io, cbd->sock, EV_WRITE, lua_udp_io_handler, cbd);
-                               event_base_set (cbd->ev_base, &cbd->io);
-                               event_add (&cbd->io, &cbd->tv);
+                               rspamd_ev_watcher_reschedule (cbd->event_loop, &cbd->ev, EV_WRITE);
                        }
                }
                else {
@@ -301,9 +292,7 @@ lua_udp_io_handler (gint fd, short what, gpointer p)
 
                if (r == RSPAMD_SENT_OK) {
                        if (cbd->cbref != -1) {
-                               event_set (&cbd->io, cbd->sock, EV_READ, lua_udp_io_handler, cbd);
-                               event_base_set (cbd->ev_base, &cbd->io);
-                               event_add (&cbd->io, &cbd->tv);
+                               rspamd_ev_watcher_reschedule (cbd->event_loop, &cbd->ev, EV_READ);
                                cbd->sent = TRUE;
                        }
                        else {
@@ -315,9 +304,7 @@ lua_udp_io_handler (gint fd, short what, gpointer p)
                }
                else {
                        cbd->retransmits --;
-                       event_set (&cbd->io, cbd->sock, EV_WRITE, lua_udp_io_handler, cbd);
-                       event_base_set (cbd->ev_base, &cbd->io);
-                       event_add (&cbd->io, &cbd->tv);
+                       rspamd_ev_watcher_reschedule (cbd->event_loop, &cbd->ev, EV_WRITE);
                }
        }
        else if (what == EV_READ) {
@@ -371,7 +358,7 @@ lua_udp_sendto (lua_State *L) {
                lua_gettable (L, -2);
 
                if (lua_type (L, -1) == LUA_TNUMBER) {
-                       port = luaL_checknumber (L, -1);
+                       port = lua_tointeger (L, -1);
                }
                else {
                        /* We assume that it is a unix socket */
@@ -472,22 +459,15 @@ lua_udp_sendto (lua_State *L) {
                }
 
 
-
-               if (!ev_base || !pool) {
-                       rspamd_inet_address_free (addr);
-
-                       return luaL_error (L, "invalid arguments");
-               }
-
                cbd = rspamd_mempool_alloc0 (pool, sizeof (*cbd));
-               cbd->ev_base = ev_base;
+               cbd->event_loop = ev_base;
                cbd->pool = pool;
                cbd->s = session;
                cbd->addr = addr;
                cbd->sock = rspamd_socket_create (rspamd_inet_address_get_af (addr),
                                SOCK_DGRAM, 0, TRUE);
                cbd->cbref = -1;
-               double_to_tv (timeout, &cbd->tv);
+               cbd->ev.timeout = timeout;
 
                if (cbd->sock == -1) {
                        rspamd_inet_address_free (addr);
@@ -555,9 +535,9 @@ lua_udp_sendto (lua_State *L) {
                                        return 2;
                                }
 
-                               event_set (&cbd->io, cbd->sock, EV_READ, lua_udp_io_handler, cbd);
-                               event_base_set (cbd->ev_base, &cbd->io);
-                               event_add (&cbd->io, &cbd->tv);
+                               rspamd_ev_watcher_init (&cbd->ev, cbd->sock, EV_READ,
+                                               lua_udp_io_handler, cbd);
+                               rspamd_ev_watcher_start (cbd->event_loop, &cbd->ev, timeout);
                                cbd->sent = TRUE;
                        }
 
@@ -571,9 +551,9 @@ lua_udp_sendto (lua_State *L) {
                        return 2;
                }
                else {
-                       event_set (&cbd->io, cbd->sock, EV_WRITE, lua_udp_io_handler, cbd);
-                       event_base_set (cbd->ev_base, &cbd->io);
-                       event_add (&cbd->io, &cbd->tv);
+                       rspamd_ev_watcher_init (&cbd->ev, cbd->sock, EV_WRITE,
+                                       lua_udp_io_handler, cbd);
+                       rspamd_ev_watcher_start (cbd->event_loop, &cbd->ev, timeout);
 
                        if (!lua_udp_maybe_register_event (cbd)) {
                                lua_pushboolean (L, false);
index 8ef0bc2fbe9a3592a912a4f98b3c228e10bfefef..8e6403972a9c6bec059eeb72e9037b8dbc860983 100644 (file)
@@ -699,7 +699,7 @@ lua_util_create_event_base (lua_State *L)
 
        pev_base = lua_newuserdata (L, sizeof (struct ev_loop *));
        rspamd_lua_setclass (L, "rspamd{ev_base}", -1);
-       *pev_base = event_init ();
+       *pev_base = ev_default_loop (EVFLAG_SIGNALFD);
 
        return 1;
 }
@@ -848,7 +848,7 @@ lua_util_process_message (lua_State *L)
        message = luaL_checklstring (L, 2, &mlen);
 
        if (cfg != NULL && message != NULL) {
-               base = event_init ();
+               base = ev_loop_new (EVFLAG_SIGNALFD);
                rspamd_init_filters (cfg, FALSE);
                task = rspamd_task_new (NULL, cfg, NULL, NULL, base);
                task->msg.begin = rspamd_mempool_alloc (task->task_pool, mlen);
@@ -865,7 +865,7 @@ lua_util_process_message (lua_State *L)
                }
                else {
                        if (rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL)) {
-                               event_base_loop (base, 0);
+                               ev_loop (base, 0);
 
                                if (res != NULL) {
                                        ucl_object_push_lua (L, res, true);
@@ -885,7 +885,7 @@ lua_util_process_message (lua_State *L)
                        }
                }
 
-               event_base_free (base);
+               ev_loop_destroy (base);
        }
        else {
                lua_pushnil (L);
@@ -3799,10 +3799,10 @@ lua_ev_base_loop (lua_State *L)
 
        ev_base = lua_check_ev_base (L, 1);
        if (lua_isnumber (L, 2)) {
-               flags = lua_tonumber (L, 2);
+               flags = lua_tointeger (L, 2);
        }
 
-       int ret = event_base_loop (ev_base, flags);
+       int ret = ev_run (ev_base, flags);
        lua_pushinteger (L, ret);
 
        return 1;
index d876d0879d8b6078bcb7a00afc2893dcd32449cb..7dbefc6be0bf4ca8d714fbb74bb6300f6de31c26 100644 (file)
@@ -282,8 +282,8 @@ struct rspamd_lua_process_cbdata {
        GString *out_buf;
        goffset out_pos;
        struct rspamd_worker *wrk;
-       struct ev_loop *ev_base;
-       struct event ev;
+       struct ev_loop *event_loop;
+       ev_io ev;
 };
 
 static void
@@ -393,9 +393,9 @@ rspamd_lua_cld_handler (struct rspamd_worker_signal_handler *sigh, void *ud)
 
        if (!cbdata->replied) {
                /* We still need to call on_complete callback */
+               ev_io_stop (cbdata->event_loop, &cbdata->ev);
                rspamd_lua_call_on_complete (cbdata->L, cbdata,
                                "Worker has died without reply", NULL, 0);
-               event_del (&cbdata->ev);
        }
 
        /* Free structures */
@@ -414,7 +414,7 @@ rspamd_lua_cld_handler (struct rspamd_worker_signal_handler *sigh, void *ud)
        srv_cmd.cmd.on_fork.state = child_dead;
        srv_cmd.cmd.on_fork.cpid = cbdata->cpid;
        srv_cmd.cmd.on_fork.ppid = getpid ();
-       rspamd_srv_send_command (cbdata->wrk, cbdata->ev_base, &srv_cmd, -1,
+       rspamd_srv_send_command (cbdata->wrk, cbdata->event_loop, &srv_cmd, -1,
                        NULL, NULL);
        g_free (cbdata);
 
@@ -423,9 +423,10 @@ rspamd_lua_cld_handler (struct rspamd_worker_signal_handler *sigh, void *ud)
 }
 
 static void
-rspamd_lua_subprocess_io (gint fd, short what, gpointer ud)
+rspamd_lua_subprocess_io (EV_P_ ev_io *w, int revents)
 {
-       struct rspamd_lua_process_cbdata *cbdata = ud;
+       struct rspamd_lua_process_cbdata *cbdata =
+                       (struct rspamd_lua_process_cbdata *)w->data;
        gssize r;
 
        if (cbdata->sz == (guint64)-1) {
@@ -436,9 +437,9 @@ rspamd_lua_subprocess_io (gint fd, short what, gpointer ud)
                                sizeof (guint64) - cbdata->io_buf->len);
 
                if (r == 0) {
+                       ev_io_stop (cbdata->event_loop, &cbdata->ev);
                        rspamd_lua_call_on_complete (cbdata->L, cbdata,
                                        "Unexpected EOF", NULL, 0);
-                       event_del (&cbdata->ev);
                        cbdata->replied = TRUE;
                        kill (cbdata->cpid, SIGTERM);
 
@@ -449,9 +450,9 @@ rspamd_lua_subprocess_io (gint fd, short what, gpointer ud)
                                return;
                        }
                        else {
+                               ev_io_stop (cbdata->event_loop, &cbdata->ev);
                                rspamd_lua_call_on_complete (cbdata->L, cbdata,
                                                strerror (errno), NULL, 0);
-                               event_del (&cbdata->ev);
                                cbdata->replied = TRUE;
                                kill (cbdata->cpid, SIGTERM);
 
@@ -481,9 +482,9 @@ rspamd_lua_subprocess_io (gint fd, short what, gpointer ud)
                                cbdata->sz - cbdata->io_buf->len);
 
                if (r == 0) {
+                       ev_io_stop (cbdata->event_loop, &cbdata->ev);
                        rspamd_lua_call_on_complete (cbdata->L, cbdata,
                                        "Unexpected EOF", NULL, 0);
-                       event_del (&cbdata->ev);
                        cbdata->replied = TRUE;
                        kill (cbdata->cpid, SIGTERM);
 
@@ -494,9 +495,9 @@ rspamd_lua_subprocess_io (gint fd, short what, gpointer ud)
                                return;
                        }
                        else {
+                               ev_io_stop (cbdata->event_loop, &cbdata->ev);
                                rspamd_lua_call_on_complete (cbdata->L, cbdata,
                                                strerror (errno), NULL, 0);
-                               event_del (&cbdata->ev);
                                cbdata->replied = TRUE;
                                kill (cbdata->cpid, SIGTERM);
 
@@ -509,6 +510,7 @@ rspamd_lua_subprocess_io (gint fd, short what, gpointer ud)
                if (cbdata->io_buf->len == cbdata->sz) {
                        gchar rep[4];
 
+                       ev_io_stop (cbdata->event_loop, &cbdata->ev);
                        /* Finished reading data */
                        if (cbdata->is_error) {
                                cbdata->io_buf->str[cbdata->io_buf->len] = '\0';
@@ -520,7 +522,6 @@ rspamd_lua_subprocess_io (gint fd, short what, gpointer ud)
                                                NULL, cbdata->io_buf->str, cbdata->io_buf->len);
                        }
 
-                       event_del (&cbdata->ev);
                        cbdata->replied = TRUE;
 
                        /* Write reply to the child */
@@ -577,7 +578,7 @@ lua_worker_spawn_process (lua_State *L)
        actx = w->ctx;
        cbdata->wrk = w;
        cbdata->L = L;
-       cbdata->ev_base = actx->ev_base;
+       cbdata->event_loop = actx->event_loop;
        cbdata->sz = (guint64)-1;
 
        pid = fork ();
@@ -612,7 +613,8 @@ lua_worker_spawn_process (lua_State *L)
                close (cbdata->sp[0]);
                /* Here we assume that we can block on writing results */
                rspamd_socket_blocking (cbdata->sp[1]);
-               event_reinit (cbdata->ev_base);
+               ev_loop_destroy (EV_DEFAULT);
+               cbdata->event_loop = ev_default_loop (EVFLAG_SIGNALFD);
                g_hash_table_remove_all (w->signal_events);
                rspamd_worker_unblock_signals ();
                rspamd_lua_execute_lua_subprocess (L, cbdata);
@@ -639,21 +641,19 @@ lua_worker_spawn_process (lua_State *L)
        srv_cmd.cmd.on_fork.state = child_create;
        srv_cmd.cmd.on_fork.cpid = pid;
        srv_cmd.cmd.on_fork.ppid = getpid ();
-       rspamd_srv_send_command (w, cbdata->ev_base, &srv_cmd, -1, NULL, NULL);
+       rspamd_srv_send_command (w, cbdata->event_loop, &srv_cmd, -1, NULL, NULL);
 
        close (cbdata->sp[1]);
        rspamd_socket_nonblocking (cbdata->sp[0]);
        /* Parent */
-       rspamd_worker_set_signal_handler (SIGCHLD, w, cbdata->ev_base,
+       rspamd_worker_set_signal_handler (SIGCHLD, w, cbdata->event_loop,
                        rspamd_lua_cld_handler,
                        cbdata);
 
        /* Add result pipe waiting */
-       event_set (&cbdata->ev, cbdata->sp[0], EV_READ | EV_PERSIST,
-                       rspamd_lua_subprocess_io, cbdata);
-       event_base_set (cbdata->ev_base, &cbdata->ev);
-       /* TODO: maybe add timeout? */
-       event_add (&cbdata->ev, NULL);
+       ev_io_init (&cbdata->ev, rspamd_lua_subprocess_io, cbdata->sp[0], EV_READ);
+       cbdata->ev.data = cbdata;
+       ev_io_start (cbdata->event_loop, &cbdata->ev);
 
        return 0;
 }
index 4deb2f9337b5ea13c3e8b10b4648426fbc2a48a8..9048a26bd8618fc09838f6b4ee2c771e755ec341 100644 (file)
@@ -100,7 +100,7 @@ struct rspamd_worker {
 struct rspamd_abstract_worker_ctx {
        guint64 magic;
        /* Events base */
-       struct ev_loop *ev_base;
+       struct ev_loop *event_loop;
        /* DNS resolver */
        struct rspamd_dns_resolver *resolver;
        /* Config */
index a202d0917797b7d0619759cebd241023e55f0bce..62c781c0519d553cc513ff7338ffc6bffb94faf1 100644 (file)
@@ -97,7 +97,7 @@ rspamd_worker_call_finish_handlers (struct rspamd_worker *worker)
        if (cfg->on_term_scripts) {
                ctx = worker->ctx;
                /* Create a fake task object for async events */
-               task = rspamd_task_new (worker, cfg, NULL, NULL, ctx->ev_base);
+               task = rspamd_task_new (worker, cfg, NULL, NULL, ctx->event_loop);
                task->resolver = ctx->resolver;
                task->flags |= RSPAMD_TASK_FLAG_PROCESSING;
                task->s = rspamd_session_create (task->task_pool,