diff options
-rw-r--r-- | src/lua/lua_config.c | 23 | ||||
-rw-r--r-- | src/lua/lua_http.c | 16 | ||||
-rw-r--r-- | src/lua/lua_redis.c | 55 | ||||
-rw-r--r-- | src/lua/lua_tcp.c | 124 | ||||
-rw-r--r-- | src/lua/lua_udp.c | 54 | ||||
-rw-r--r-- | src/lua/lua_util.c | 12 | ||||
-rw-r--r-- | src/lua/lua_worker.c | 40 | ||||
-rw-r--r-- | src/rspamd.h | 2 | ||||
-rw-r--r-- | src/worker.c | 2 |
9 files changed, 113 insertions, 215 deletions
diff --git a/src/lua/lua_config.c b/src/lua/lua_config.c index 196f12e2c..05c38ff32 100644 --- a/src/lua/lua_config.c +++ b/src/lua/lua_config.c @@ -3054,15 +3054,15 @@ struct rspamd_lua_periodic { struct rspamd_config *cfg; lua_State *L; gdouble timeout; - struct event ev; + ev_timer ev; gint cbref; gboolean need_jitter; }; static void -lua_periodic_callback (gint unused_fd, short what, gpointer ud) +lua_periodic_callback (struct ev_loop *loop, ev_timer *w, int revents) { - struct rspamd_lua_periodic *periodic = ud; + struct rspamd_lua_periodic *periodic = (struct rspamd_lua_periodic *)w->data; struct rspamd_config **pcfg, *cfg; struct ev_loop **pev_base; struct thread_entry *thread; @@ -3084,7 +3084,6 @@ lua_periodic_callback (gint unused_fd, short what, gpointer ud) rspamd_lua_setclass (L, "rspamd{ev_base}", -1); *pev_base = periodic->ev_base; - event_del (&periodic->ev); lua_thread_call (thread, 2); } @@ -3094,7 +3093,6 @@ lua_periodic_callback_finish (struct thread_entry *thread, int ret) lua_State *L; struct rspamd_lua_periodic *periodic = thread->cd; gboolean plan_more = FALSE; - struct timeval tv; gdouble timeout = 0.0; L = thread->lua_state; @@ -3120,11 +3118,12 @@ lua_periodic_callback_finish (struct thread_entry *thread, int ret) timeout = rspamd_time_jitter (timeout, 0.0); } - double_to_tv (timeout, &tv); - event_add (&periodic->ev, &tv); + periodic->ev.repeat = timeout; + ev_timer_again (periodic->ev_base, &periodic->ev); } else { luaL_unref (L, LUA_REGISTRYINDEX, periodic->cbref); + ev_timer_stop (periodic->ev_base, &periodic->ev); g_free (periodic); } } @@ -3138,7 +3137,7 @@ lua_periodic_callback_error (struct thread_entry *thread, int ret, const char *m msg_err_config ("call to finishing script failed: %s", msg); - lua_periodic_callback_finish(thread, ret); + lua_periodic_callback_finish (thread, ret); } @@ -3149,7 +3148,6 @@ lua_config_add_periodic (lua_State *L) struct rspamd_config *cfg = lua_check_config (L, 1); struct ev_loop *ev_base = lua_check_ev_base (L, 2); gdouble timeout = lua_tonumber (L, 3); - struct timeval tv; struct rspamd_lua_periodic *periodic; gboolean need_jitter = FALSE; @@ -3169,15 +3167,14 @@ lua_config_add_periodic (lua_State *L) periodic->need_jitter = need_jitter; lua_pushvalue (L, 4); periodic->cbref = luaL_ref (L, LUA_REGISTRYINDEX); - event_set (&periodic->ev, -1, EV_TIMEOUT, lua_periodic_callback, periodic); - event_base_set (ev_base, &periodic->ev); if (need_jitter) { timeout = rspamd_time_jitter (timeout, 0.0); } - double_to_tv (timeout, &tv); - event_add (&periodic->ev, &tv); + ev_timer_init (&periodic->ev, lua_periodic_callback, timeout, 0.0); + periodic->ev.data = periodic; + ev_timer_start (ev_base, &periodic->ev); return 0; } diff --git a/src/lua/lua_http.c b/src/lua/lua_http.c index 38072e177..ec42ab39e 100644 --- a/src/lua/lua_http.c +++ b/src/lua/lua_http.c @@ -67,10 +67,10 @@ struct lua_http_cbdata { struct rspamd_async_session *session; struct rspamd_symcache_item *item; struct rspamd_http_message *msg; - struct ev_loop *ev_base; + struct ev_loop *event_loop; struct rspamd_config *cfg; struct rspamd_task *task; - struct timeval tv; + ev_tstamp timeout; struct rspamd_cryptobox_keypair *local_kp; struct rspamd_cryptobox_pubkey *peer_pk; rspamd_inet_addr_t *addr; @@ -86,7 +86,7 @@ struct lua_http_cbdata { ref_entry_t ref; }; -static const int default_http_timeout = 5000; +static const gdouble default_http_timeout = 5.0; static struct rspamd_dns_resolver * lua_http_global_resolver (struct ev_loop *ev_base) @@ -451,7 +451,7 @@ lua_http_make_connection (struct lua_http_cbdata *cbd) rspamd_http_connection_write_message (cbd->conn, msg, cbd->host, cbd->mime_type, cbd, - &cbd->tv); + cbd->timeout); return TRUE; } @@ -717,7 +717,7 @@ lua_http_request (lua_State *L) lua_pushstring (L, "timeout"); lua_gettable (L, 1); if (lua_type (L, -1) == LUA_TNUMBER) { - timeout = lua_tonumber (L, -1) * 1000.; + timeout = lua_tonumber (L, -1); } lua_pop (L, 1); @@ -860,7 +860,7 @@ lua_http_request (lua_State *L) lua_gettable (L, 1); if (lua_type (L, -1) == LUA_TNUMBER) { - max_size = lua_tonumber (L, -1); + max_size = lua_tointeger (L, -1); } lua_pop (L, 1); @@ -943,9 +943,9 @@ lua_http_request (lua_State *L) cbd = g_malloc0 (sizeof (*cbd)); cbd->cbref = cbref; cbd->msg = msg; - cbd->ev_base = ev_base; + cbd->event_loop = ev_base; cbd->mime_type = mime_type; - msec_to_tv (timeout, &cbd->tv); + cbd->timeout = timeout; cbd->fd = -1; cbd->cfg = cfg; cbd->peer_pk = peer_key; diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c index b96171d89..f39168a27 100644 --- a/src/lua/lua_redis.c +++ b/src/lua/lua_redis.c @@ -98,7 +98,7 @@ struct lua_redis_userdata { struct rspamd_task *task; struct rspamd_symcache_item *item; struct rspamd_async_session *s; - struct ev_loop *ev_base; + struct ev_loop *event_loop; struct rspamd_config *cfg; struct rspamd_redis_pool *pool; gchar *server; @@ -124,7 +124,7 @@ struct lua_redis_request_specific_userdata { struct lua_redis_userdata *c; struct lua_redis_ctx *ctx; struct lua_redis_request_specific_userdata *next; - struct event timeout; + ev_timer timeout_ev; guint flags; }; @@ -184,9 +184,7 @@ lua_redis_dtor (struct lua_redis_ctx *ctx) if (ud->ctx) { LL_FOREACH_SAFE (ud->specific, cur, tmp) { - if (rspamd_event_pending (&cur->timeout, EV_TIMEOUT)) { - event_del (&cur->timeout); - } + ev_timer_stop (ud->event_loop, &cur->timeout_ev); if (!(cur->flags & LUA_REDIS_SPECIFIC_REPLIED)) { is_successful = FALSE; @@ -245,9 +243,7 @@ lua_redis_fin (void *arg) ctx = sp_ud->ctx; - if (rspamd_event_pending (&sp_ud->timeout, EV_TIMEOUT)) { - event_del (&sp_ud->timeout); - } + ev_timer_stop (sp_ud->ctx->async.event_loop, &sp_ud->timeout_ev); msg_debug ("finished redis query %p from session %p", sp_ud, ctx); sp_ud->flags |= LUA_REDIS_SPECIFIC_FINISHED; @@ -556,10 +552,7 @@ lua_redis_callback_sync (redisAsyncContext *ac, gpointer r, gpointer priv) return; } - if (rspamd_event_pending (&sp_ud->timeout, EV_TIMEOUT)) { - event_del (&sp_ud->timeout); - } - + ev_timer_stop (ud->event_loop, &sp_ud->timeout_ev); msg_debug ("got reply from redis: %p for query %p", ac, sp_ud); struct lua_redis_result *result = g_malloc0 (sizeof *result); @@ -630,9 +623,10 @@ lua_redis_callback_sync (redisAsyncContext *ac, gpointer r, gpointer priv) } static void -lua_redis_timeout_sync (int fd, short what, gpointer priv) +lua_redis_timeout_sync (EV_P_ ev_timer *w, int revents) { - struct lua_redis_request_specific_userdata *sp_ud = priv; + struct lua_redis_request_specific_userdata *sp_ud = + (struct lua_redis_request_specific_userdata *)w->data; struct lua_redis_ctx *ctx = sp_ud->ctx; redisAsyncContext *ac; @@ -657,9 +651,10 @@ lua_redis_timeout_sync (int fd, short what, gpointer priv) } static void -lua_redis_timeout (int fd, short what, gpointer u) +lua_redis_timeout (EV_P_ ev_timer *w, int revents) { - struct lua_redis_request_specific_userdata *sp_ud = u; + struct lua_redis_request_specific_userdata *sp_ud = + (struct lua_redis_request_specific_userdata *)w->data; struct lua_redis_ctx *ctx; redisAsyncContext *ac; @@ -790,9 +785,9 @@ lua_redis_parse_args (lua_State *L, gint idx, const gchar *cmd, static struct lua_redis_ctx * rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref, gboolean is_async) { - struct lua_redis_ctx *ctx; + struct lua_redis_ctx *ctx = NULL; rspamd_inet_addr_t *ip = NULL; - struct lua_redis_userdata *ud; + struct lua_redis_userdata *ud = NULL; struct rspamd_lua_ip *addr = NULL; struct rspamd_task *task = NULL; const gchar *host; @@ -933,7 +928,7 @@ rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref, gboolean is_asy ud->s = session; ud->cfg = cfg; ud->pool = cfg->redis_pool; - ud->ev_base = ev_base; + ud->event_loop = ev_base; ud->task = task; if (task) { @@ -1009,7 +1004,6 @@ lua_redis_make_request (lua_State *L) struct lua_redis_userdata *ud; struct lua_redis_ctx *ctx, **pctx; const gchar *cmd = NULL; - struct timeval tv; gdouble timeout = REDIS_DEFAULT_TIMEOUT; gint cbref = -1; gboolean ret = FALSE; @@ -1064,10 +1058,9 @@ lua_redis_make_request (lua_State *L) REDIS_RETAIN (ctx); /* Cleared by fin event */ ctx->cmds_pending ++; - double_to_tv (timeout, &tv); - event_set (&sp_ud->timeout, -1, EV_TIMEOUT, lua_redis_timeout, sp_ud); - event_base_set (ud->ev_base, &sp_ud->timeout); - event_add (&sp_ud->timeout, &tv); + sp_ud->timeout_ev.data = sp_ud; + ev_timer_init (&sp_ud->timeout_ev, lua_redis_timeout, timeout, 0.0); + ev_timer_start (ud->event_loop, &sp_ud->timeout_ev); ret = TRUE; } else { @@ -1347,7 +1340,6 @@ lua_redis_add_cmd (lua_State *L) const gchar *cmd = NULL; gint args_pos = 2; gint cbref = -1, ret; - struct timeval tv; if (ctx) { if (ctx->flags & LUA_REDIS_TERMINATED) { @@ -1426,19 +1418,18 @@ lua_redis_add_cmd (lua_State *L) } } - double_to_tv (sp_ud->c->timeout, &tv); + sp_ud->timeout_ev.data = sp_ud; if (IS_ASYNC (ctx)) { - event_set (&sp_ud->timeout, -1, EV_TIMEOUT, - lua_redis_timeout, sp_ud); + ev_timer_init (&sp_ud->timeout_ev, lua_redis_timeout, + sp_ud->c->timeout, 0.0); } else { - event_set (&sp_ud->timeout, -1, EV_TIMEOUT, - lua_redis_timeout_sync, sp_ud); + ev_timer_init (&sp_ud->timeout_ev, lua_redis_timeout_sync, + sp_ud->c->timeout, 0.0); } - event_base_set (ud->ev_base, &sp_ud->timeout); - event_add (&sp_ud->timeout, &tv); + ev_timer_start (ud->event_loop, &sp_ud->timeout_ev); REDIS_RETAIN (ctx); ctx->cmds_pending ++; } diff --git a/src/lua/lua_tcp.c b/src/lua/lua_tcp.c index cad0e6556..4d1c205cf 100644 --- a/src/lua/lua_tcp.c +++ b/src/lua/lua_tcp.c @@ -159,13 +159,6 @@ LUA_FUNCTION_DEF (tcp, connect_sync); * Closes TCP connection */ LUA_FUNCTION_DEF (tcp, close); -/*** - * @method tcp:set_timeout(seconds) - * - * Sets new timeout for a TCP connection in **seconds** - * @param {number} seconds floating point value that specifies new timeout - */ -LUA_FUNCTION_DEF (tcp, set_timeout); /*** * @method tcp:add_read(callback, [pattern]) @@ -210,7 +203,6 @@ static const struct luaL_reg tcp_libf[] = { static const struct luaL_reg tcp_libm[] = { LUA_INTERFACE_DEF (tcp, close), - LUA_INTERFACE_DEF (tcp, set_timeout), LUA_INTERFACE_DEF (tcp, add_read), LUA_INTERFACE_DEF (tcp, add_write), LUA_INTERFACE_DEF (tcp, shift_callback), @@ -227,13 +219,6 @@ static const struct luaL_reg tcp_libm[] = { LUA_FUNCTION_DEF (tcp_sync, close); /*** - * @method set_timeout(seconds) - * - * Sets timeout for IO operations - */ -LUA_FUNCTION_DEF (tcp_sync, set_timeout); - -/*** * @method read_once() * * Performs one read operation. If syscall returned with EAGAIN/EINT, @@ -270,7 +255,6 @@ static void lua_tcp_sync_session_dtor (gpointer ud); static const struct luaL_reg tcp_sync_libm[] = { LUA_INTERFACE_DEF (tcp_sync, close), - LUA_INTERFACE_DEF (tcp_sync, set_timeout), LUA_INTERFACE_DEF (tcp_sync, read_once), LUA_INTERFACE_DEF (tcp_sync, write), LUA_INTERFACE_DEF (tcp_sync, eof), @@ -342,8 +326,7 @@ struct lua_tcp_dtor { struct lua_tcp_cbdata { struct rspamd_async_session *session; struct rspamd_async_event *async_ev; - struct ev_loop *ev_base; - struct timeval tv; + struct ev_loop *event_loop; rspamd_inet_addr_t *addr; GByteArray *in; GQueue *handlers; @@ -352,7 +335,7 @@ struct lua_tcp_cbdata { guint port; guint flags; gchar tag[7]; - struct event ev; + struct rspamd_io_ev ev; struct lua_tcp_dtor *dtors; ref_entry_t ref; struct rspamd_task *task; @@ -381,7 +364,7 @@ static void lua_tcp_unregister_event (struct lua_tcp_cbdata *cbd); static void lua_tcp_void_finalyser (gpointer arg) {} -static const int default_tcp_timeout = 5000; +static const gdouble default_tcp_timeout = 5.0; static struct rspamd_dns_resolver * lua_tcp_global_resolver (struct ev_loop *ev_base, @@ -467,7 +450,7 @@ lua_tcp_fin (gpointer arg) } if (cbd->fd != -1) { - event_del (&cbd->ev); + rspamd_ev_watcher_stop (cbd->event_loop, &cbd->ev); close (cbd->fd); cbd->fd = -1; } @@ -755,15 +738,7 @@ lua_tcp_resume_thread (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len) static void lua_tcp_plan_read (struct lua_tcp_cbdata *cbd) { - event_del (&cbd->ev); -#ifdef EV_CLOSED - event_set (&cbd->ev, cbd->fd, EV_READ|EV_CLOSED, - lua_tcp_handler, cbd); -#else - event_set (&cbd->ev, cbd->fd, EV_READ, lua_tcp_handler, cbd); -#endif - event_base_set (cbd->ev_base, &cbd->ev); - event_add (&cbd->ev, &cbd->tv); + rspamd_ev_watcher_reschedule (cbd->event_loop, &cbd->ev, EV_READ); } static void @@ -867,7 +842,6 @@ lua_tcp_write_helper (struct lua_tcp_cbdata *cbd) } else { /* Want to write more */ - event_add (&cbd->ev, &cbd->tv); } return; @@ -1149,9 +1123,8 @@ lua_tcp_plan_handler_event (struct lua_tcp_cbdata *cbd, gboolean can_read, msg_debug_tcp ("plan new read"); if (can_read) { /* We need to plan a new event */ - event_set (&cbd->ev, cbd->fd, EV_READ, lua_tcp_handler, cbd); - event_base_set (cbd->ev_base, &cbd->ev); - event_add (&cbd->ev, &cbd->tv); + rspamd_ev_watcher_reschedule (cbd->event_loop, &cbd->ev, + EV_READ); } else { /* Cannot read more */ @@ -1172,9 +1145,8 @@ lua_tcp_plan_handler_event (struct lua_tcp_cbdata *cbd, gboolean can_read, if (hdl->h.w.pos < hdl->h.w.total_bytes) { msg_debug_tcp ("plan new write"); if (can_write) { - event_set (&cbd->ev, cbd->fd, EV_WRITE, lua_tcp_handler, cbd); - event_base_set (cbd->ev_base, &cbd->ev); - event_add (&cbd->ev, &cbd->tv); + rspamd_ev_watcher_reschedule (cbd->event_loop, &cbd->ev, + EV_WRITE); } else { /* Cannot write more */ @@ -1192,9 +1164,8 @@ lua_tcp_plan_handler_event (struct lua_tcp_cbdata *cbd, gboolean can_read, } else { /* LUA_WANT_CONNECT */ msg_debug_tcp ("plan new connect"); - event_set (&cbd->ev, cbd->fd, EV_WRITE, lua_tcp_handler, cbd); - event_base_set (cbd->ev_base, &cbd->ev); - event_add (&cbd->ev, &cbd->tv); + rspamd_ev_watcher_reschedule (cbd->event_loop, &cbd->ev, + EV_WRITE); } } } @@ -1289,12 +1260,11 @@ lua_tcp_make_connection (struct lua_tcp_cbdata *cbd) verify_peer = TRUE; } - event_base_set (cbd->ev_base, &cbd->ev); cbd->ssl_conn = - rspamd_ssl_connection_new (ssl_ctx, cbd->ev_base, verify_peer); + rspamd_ssl_connection_new (ssl_ctx, cbd->event_loop, verify_peer); if (!rspamd_ssl_connect_fd (cbd->ssl_conn, fd, cbd->hostname, &cbd->ev, - &cbd->tv, lua_tcp_handler, lua_tcp_ssl_on_error, cbd)) { + cbd->ev.timeout, lua_tcp_handler, lua_tcp_ssl_on_error, cbd)) { lua_tcp_push_error (cbd, TRUE, "ssl connection failed: %s", strerror (errno)); @@ -1431,7 +1401,7 @@ lua_tcp_request (lua_State *L) guint port; gint cbref, tp, conn_cbref = -1; gsize plen = 0; - struct ev_loop *ev_base; + struct ev_loop *event_loop = NULL; struct lua_tcp_cbdata *cbd; struct rspamd_dns_resolver *resolver = NULL; struct rspamd_async_session *session = NULL; @@ -1453,7 +1423,7 @@ lua_tcp_request (lua_State *L) lua_pushstring (L, "port"); lua_gettable (L, -2); if (lua_type (L, -1) == LUA_TNUMBER) { - port = luaL_checknumber (L, -1); + port = lua_tointeger (L, -1); } else { /* We assume that it is a unix socket */ @@ -1478,7 +1448,7 @@ lua_tcp_request (lua_State *L) lua_gettable (L, -2); if (lua_type (L, -1) == LUA_TUSERDATA) { task = lua_check_task (L, -1); - ev_base = task->event_loop; + event_loop = task->event_loop; resolver = task->resolver; session = task->s; cfg = task->cfg; @@ -1489,10 +1459,10 @@ lua_tcp_request (lua_State *L) lua_pushstring (L, "ev_base"); lua_gettable (L, -2); if (rspamd_lua_check_udata_maybe (L, -1, "rspamd{ev_base}")) { - ev_base = *(struct ev_loop **)lua_touserdata (L, -1); + event_loop = *(struct ev_loop **)lua_touserdata (L, -1); } else { - ev_base = NULL; + event_loop = ev_default_loop (0); } lua_pop (L, 1); @@ -1522,7 +1492,7 @@ lua_tcp_request (lua_State *L) resolver = *(struct rspamd_dns_resolver **)lua_touserdata (L, -1); } else { - resolver = lua_tcp_global_resolver (ev_base, cfg); + resolver = lua_tcp_global_resolver (event_loop, cfg); } lua_pop (L, 1); } @@ -1530,7 +1500,7 @@ lua_tcp_request (lua_State *L) lua_pushstring (L, "timeout"); lua_gettable (L, -2); if (lua_type (L, -1) == LUA_TNUMBER) { - timeout = lua_tonumber (L, -1) * 1000.; + timeout = lua_tonumber (L, -1); } lua_pop (L, 1); @@ -1691,10 +1661,10 @@ lua_tcp_request (lua_State *L) g_queue_push_tail (cbd->handlers, wh); } - cbd->ev_base = ev_base; - msec_to_tv (timeout, &cbd->tv); + cbd->event_loop = event_loop; cbd->fd = -1; cbd->port = port; + cbd->ev.timeout = timeout; if (ssl) { cbd->flags |= LUA_TCP_FLAG_SSL; @@ -1881,9 +1851,8 @@ lua_tcp_connect_sync (lua_State *L) rspamd_snprintf (cbd->tag, sizeof (cbd->tag), "%uxL", h); cbd->handlers = g_queue_new (); - cbd->ev_base = ev_base; + cbd->event_loop = ev_base; cbd->flags |= LUA_TCP_FLAG_SYNC; - double_to_tv (timeout, &cbd->tv); cbd->fd = -1; cbd->port = (guint16)port; @@ -1980,25 +1949,6 @@ lua_tcp_close (lua_State *L) } static gint -lua_tcp_set_timeout (lua_State *L) -{ - LUA_TRACE_POINT; - struct lua_tcp_cbdata *cbd = lua_check_tcp (L, 1); - gdouble seconds = lua_tonumber (L, 2); - - if (cbd == NULL) { - return luaL_error (L, "invalid arguments"); - } - if (!lua_isnumber (L, 2)) { - return luaL_error (L, "invalid arguments: 'seconds' is expected to be number"); - } - - double_to_tv (seconds, &cbd->tv); - - return 0; -} - -static gint lua_tcp_add_read (lua_State *L) { LUA_TRACE_POINT; @@ -2159,7 +2109,7 @@ lua_tcp_sync_close (lua_State *L) cbd->flags |= LUA_TCP_FLAG_FINISHED; if (cbd->fd != -1) { - event_del (&cbd->ev); + rspamd_ev_watcher_stop (cbd->event_loop, &cbd->ev); close (cbd->fd); cbd->fd = -1; } @@ -2175,7 +2125,7 @@ lua_tcp_sync_session_dtor (gpointer ud) if (cbd->fd != -1) { msg_debug ("closing sync TCP connection"); - event_del (&cbd->ev); + rspamd_ev_watcher_stop (cbd->event_loop, &cbd->ev); close (cbd->fd); cbd->fd = -1; } @@ -2188,25 +2138,6 @@ lua_tcp_sync_session_dtor (gpointer ud) } static int -lua_tcp_sync_set_timeout (lua_State *L) -{ - LUA_TRACE_POINT; - struct lua_tcp_cbdata *cbd = lua_check_sync_tcp (L, 1); - gdouble seconds = lua_tonumber (L, 2); - - if (cbd == NULL) { - return luaL_error (L, "invalid arguments: self is not rspamd{tcp_sync}"); - } - if (lua_type (L, 2) != LUA_TNUMBER) { - return luaL_error (L, "invalid arguments: second parameter is expected to be number"); - } - - double_to_tv (seconds, &cbd->tv); - - return 0; -} - -static int lua_tcp_sync_read_once (lua_State *L) { LUA_TRACE_POINT; @@ -2363,12 +2294,11 @@ lua_tcp_starttls (lua_State * L) verify_peer = TRUE; } - event_base_set (cbd->ev_base, &cbd->ev); cbd->ssl_conn = - rspamd_ssl_connection_new (ssl_ctx, cbd->ev_base, verify_peer); + rspamd_ssl_connection_new (ssl_ctx, cbd->event_loop, verify_peer); if (!rspamd_ssl_connect_fd (cbd->ssl_conn, cbd->fd, cbd->hostname, &cbd->ev, - &cbd->tv, lua_tcp_handler, lua_tcp_ssl_on_error, cbd)) { + cbd->ev.timeout, lua_tcp_handler, lua_tcp_ssl_on_error, cbd)) { lua_tcp_push_error (cbd, TRUE, "ssl connection failed: %s", strerror (errno)); } diff --git a/src/lua/lua_udp.c b/src/lua/lua_udp.c index fdbc0f36d..94d27bf63 100644 --- a/src/lua/lua_udp.c +++ b/src/lua/lua_udp.c @@ -18,6 +18,7 @@ #include "utlist.h" #include "unix-std.h" #include <math.h> +#include <src/libutil/libev_helper.h> static const gchar *M = "rspamd lua udp"; @@ -59,9 +60,8 @@ static const struct luaL_reg udp_libf[] = { }; struct lua_udp_cbdata { - struct event io; - struct timeval tv; - struct ev_loop *ev_base; + struct ev_loop *event_loop; + struct rspamd_io_ev ev; struct rspamd_async_event *async_ev; struct rspamd_task *task; rspamd_mempool_t *pool; @@ -115,10 +115,7 @@ lua_udp_cbd_fin (gpointer p) struct lua_udp_cbdata *cbd = (struct lua_udp_cbdata *)p; if (cbd->sock != -1) { - if (cbd->io.ev_base != NULL) { - event_del (&cbd->io); - } - + rspamd_ev_watcher_stop (cbd->event_loop, &cbd->ev); close (cbd->sock); } @@ -264,16 +261,12 @@ lua_udp_io_handler (gint fd, short what, gpointer p) L = cbd->L; - event_del (&cbd->io); - if (what == EV_TIMEOUT) { if (cbd->sent && cbd->retransmits > 0) { r = lua_try_send_request (cbd); if (r == RSPAMD_SENT_OK) { - event_set (&cbd->io, cbd->sock, EV_READ, lua_udp_io_handler, cbd); - event_base_set (cbd->ev_base, &cbd->io); - event_add (&cbd->io, &cbd->tv); + rspamd_ev_watcher_reschedule (cbd->event_loop, &cbd->ev, EV_READ); lua_udp_maybe_register_event (cbd); cbd->retransmits --; } @@ -282,9 +275,7 @@ lua_udp_io_handler (gint fd, short what, gpointer p) } else { cbd->retransmits --; - event_set (&cbd->io, cbd->sock, EV_WRITE, lua_udp_io_handler, cbd); - event_base_set (cbd->ev_base, &cbd->io); - event_add (&cbd->io, &cbd->tv); + rspamd_ev_watcher_reschedule (cbd->event_loop, &cbd->ev, EV_WRITE); } } else { @@ -301,9 +292,7 @@ lua_udp_io_handler (gint fd, short what, gpointer p) if (r == RSPAMD_SENT_OK) { if (cbd->cbref != -1) { - event_set (&cbd->io, cbd->sock, EV_READ, lua_udp_io_handler, cbd); - event_base_set (cbd->ev_base, &cbd->io); - event_add (&cbd->io, &cbd->tv); + rspamd_ev_watcher_reschedule (cbd->event_loop, &cbd->ev, EV_READ); cbd->sent = TRUE; } else { @@ -315,9 +304,7 @@ lua_udp_io_handler (gint fd, short what, gpointer p) } else { cbd->retransmits --; - event_set (&cbd->io, cbd->sock, EV_WRITE, lua_udp_io_handler, cbd); - event_base_set (cbd->ev_base, &cbd->io); - event_add (&cbd->io, &cbd->tv); + rspamd_ev_watcher_reschedule (cbd->event_loop, &cbd->ev, EV_WRITE); } } else if (what == EV_READ) { @@ -371,7 +358,7 @@ lua_udp_sendto (lua_State *L) { lua_gettable (L, -2); if (lua_type (L, -1) == LUA_TNUMBER) { - port = luaL_checknumber (L, -1); + port = lua_tointeger (L, -1); } else { /* We assume that it is a unix socket */ @@ -472,22 +459,15 @@ lua_udp_sendto (lua_State *L) { } - - if (!ev_base || !pool) { - rspamd_inet_address_free (addr); - - return luaL_error (L, "invalid arguments"); - } - cbd = rspamd_mempool_alloc0 (pool, sizeof (*cbd)); - cbd->ev_base = ev_base; + cbd->event_loop = ev_base; cbd->pool = pool; cbd->s = session; cbd->addr = addr; cbd->sock = rspamd_socket_create (rspamd_inet_address_get_af (addr), SOCK_DGRAM, 0, TRUE); cbd->cbref = -1; - double_to_tv (timeout, &cbd->tv); + cbd->ev.timeout = timeout; if (cbd->sock == -1) { rspamd_inet_address_free (addr); @@ -555,9 +535,9 @@ lua_udp_sendto (lua_State *L) { return 2; } - event_set (&cbd->io, cbd->sock, EV_READ, lua_udp_io_handler, cbd); - event_base_set (cbd->ev_base, &cbd->io); - event_add (&cbd->io, &cbd->tv); + rspamd_ev_watcher_init (&cbd->ev, cbd->sock, EV_READ, + lua_udp_io_handler, cbd); + rspamd_ev_watcher_start (cbd->event_loop, &cbd->ev, timeout); cbd->sent = TRUE; } @@ -571,9 +551,9 @@ lua_udp_sendto (lua_State *L) { return 2; } else { - event_set (&cbd->io, cbd->sock, EV_WRITE, lua_udp_io_handler, cbd); - event_base_set (cbd->ev_base, &cbd->io); - event_add (&cbd->io, &cbd->tv); + rspamd_ev_watcher_init (&cbd->ev, cbd->sock, EV_WRITE, + lua_udp_io_handler, cbd); + rspamd_ev_watcher_start (cbd->event_loop, &cbd->ev, timeout); if (!lua_udp_maybe_register_event (cbd)) { lua_pushboolean (L, false); diff --git a/src/lua/lua_util.c b/src/lua/lua_util.c index 8ef0bc2fb..8e6403972 100644 --- a/src/lua/lua_util.c +++ b/src/lua/lua_util.c @@ -699,7 +699,7 @@ lua_util_create_event_base (lua_State *L) pev_base = lua_newuserdata (L, sizeof (struct ev_loop *)); rspamd_lua_setclass (L, "rspamd{ev_base}", -1); - *pev_base = event_init (); + *pev_base = ev_default_loop (EVFLAG_SIGNALFD); return 1; } @@ -848,7 +848,7 @@ lua_util_process_message (lua_State *L) message = luaL_checklstring (L, 2, &mlen); if (cfg != NULL && message != NULL) { - base = event_init (); + base = ev_loop_new (EVFLAG_SIGNALFD); rspamd_init_filters (cfg, FALSE); task = rspamd_task_new (NULL, cfg, NULL, NULL, base); task->msg.begin = rspamd_mempool_alloc (task->task_pool, mlen); @@ -865,7 +865,7 @@ lua_util_process_message (lua_State *L) } else { if (rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL)) { - event_base_loop (base, 0); + ev_loop (base, 0); if (res != NULL) { ucl_object_push_lua (L, res, true); @@ -885,7 +885,7 @@ lua_util_process_message (lua_State *L) } } - event_base_free (base); + ev_loop_destroy (base); } else { lua_pushnil (L); @@ -3799,10 +3799,10 @@ lua_ev_base_loop (lua_State *L) ev_base = lua_check_ev_base (L, 1); if (lua_isnumber (L, 2)) { - flags = lua_tonumber (L, 2); + flags = lua_tointeger (L, 2); } - int ret = event_base_loop (ev_base, flags); + int ret = ev_run (ev_base, flags); lua_pushinteger (L, ret); return 1; diff --git a/src/lua/lua_worker.c b/src/lua/lua_worker.c index d876d0879..7dbefc6be 100644 --- a/src/lua/lua_worker.c +++ b/src/lua/lua_worker.c @@ -282,8 +282,8 @@ struct rspamd_lua_process_cbdata { GString *out_buf; goffset out_pos; struct rspamd_worker *wrk; - struct ev_loop *ev_base; - struct event ev; + struct ev_loop *event_loop; + ev_io ev; }; static void @@ -393,9 +393,9 @@ rspamd_lua_cld_handler (struct rspamd_worker_signal_handler *sigh, void *ud) if (!cbdata->replied) { /* We still need to call on_complete callback */ + ev_io_stop (cbdata->event_loop, &cbdata->ev); rspamd_lua_call_on_complete (cbdata->L, cbdata, "Worker has died without reply", NULL, 0); - event_del (&cbdata->ev); } /* Free structures */ @@ -414,7 +414,7 @@ rspamd_lua_cld_handler (struct rspamd_worker_signal_handler *sigh, void *ud) srv_cmd.cmd.on_fork.state = child_dead; srv_cmd.cmd.on_fork.cpid = cbdata->cpid; srv_cmd.cmd.on_fork.ppid = getpid (); - rspamd_srv_send_command (cbdata->wrk, cbdata->ev_base, &srv_cmd, -1, + rspamd_srv_send_command (cbdata->wrk, cbdata->event_loop, &srv_cmd, -1, NULL, NULL); g_free (cbdata); @@ -423,9 +423,10 @@ rspamd_lua_cld_handler (struct rspamd_worker_signal_handler *sigh, void *ud) } static void -rspamd_lua_subprocess_io (gint fd, short what, gpointer ud) +rspamd_lua_subprocess_io (EV_P_ ev_io *w, int revents) { - struct rspamd_lua_process_cbdata *cbdata = ud; + struct rspamd_lua_process_cbdata *cbdata = + (struct rspamd_lua_process_cbdata *)w->data; gssize r; if (cbdata->sz == (guint64)-1) { @@ -436,9 +437,9 @@ rspamd_lua_subprocess_io (gint fd, short what, gpointer ud) sizeof (guint64) - cbdata->io_buf->len); if (r == 0) { + ev_io_stop (cbdata->event_loop, &cbdata->ev); rspamd_lua_call_on_complete (cbdata->L, cbdata, "Unexpected EOF", NULL, 0); - event_del (&cbdata->ev); cbdata->replied = TRUE; kill (cbdata->cpid, SIGTERM); @@ -449,9 +450,9 @@ rspamd_lua_subprocess_io (gint fd, short what, gpointer ud) return; } else { + ev_io_stop (cbdata->event_loop, &cbdata->ev); rspamd_lua_call_on_complete (cbdata->L, cbdata, strerror (errno), NULL, 0); - event_del (&cbdata->ev); cbdata->replied = TRUE; kill (cbdata->cpid, SIGTERM); @@ -481,9 +482,9 @@ rspamd_lua_subprocess_io (gint fd, short what, gpointer ud) cbdata->sz - cbdata->io_buf->len); if (r == 0) { + ev_io_stop (cbdata->event_loop, &cbdata->ev); rspamd_lua_call_on_complete (cbdata->L, cbdata, "Unexpected EOF", NULL, 0); - event_del (&cbdata->ev); cbdata->replied = TRUE; kill (cbdata->cpid, SIGTERM); @@ -494,9 +495,9 @@ rspamd_lua_subprocess_io (gint fd, short what, gpointer ud) return; } else { + ev_io_stop (cbdata->event_loop, &cbdata->ev); rspamd_lua_call_on_complete (cbdata->L, cbdata, strerror (errno), NULL, 0); - event_del (&cbdata->ev); cbdata->replied = TRUE; kill (cbdata->cpid, SIGTERM); @@ -509,6 +510,7 @@ rspamd_lua_subprocess_io (gint fd, short what, gpointer ud) if (cbdata->io_buf->len == cbdata->sz) { gchar rep[4]; + ev_io_stop (cbdata->event_loop, &cbdata->ev); /* Finished reading data */ if (cbdata->is_error) { cbdata->io_buf->str[cbdata->io_buf->len] = '\0'; @@ -520,7 +522,6 @@ rspamd_lua_subprocess_io (gint fd, short what, gpointer ud) NULL, cbdata->io_buf->str, cbdata->io_buf->len); } - event_del (&cbdata->ev); cbdata->replied = TRUE; /* Write reply to the child */ @@ -577,7 +578,7 @@ lua_worker_spawn_process (lua_State *L) actx = w->ctx; cbdata->wrk = w; cbdata->L = L; - cbdata->ev_base = actx->ev_base; + cbdata->event_loop = actx->event_loop; cbdata->sz = (guint64)-1; pid = fork (); @@ -612,7 +613,8 @@ lua_worker_spawn_process (lua_State *L) close (cbdata->sp[0]); /* Here we assume that we can block on writing results */ rspamd_socket_blocking (cbdata->sp[1]); - event_reinit (cbdata->ev_base); + ev_loop_destroy (EV_DEFAULT); + cbdata->event_loop = ev_default_loop (EVFLAG_SIGNALFD); g_hash_table_remove_all (w->signal_events); rspamd_worker_unblock_signals (); rspamd_lua_execute_lua_subprocess (L, cbdata); @@ -639,21 +641,19 @@ lua_worker_spawn_process (lua_State *L) srv_cmd.cmd.on_fork.state = child_create; srv_cmd.cmd.on_fork.cpid = pid; srv_cmd.cmd.on_fork.ppid = getpid (); - rspamd_srv_send_command (w, cbdata->ev_base, &srv_cmd, -1, NULL, NULL); + rspamd_srv_send_command (w, cbdata->event_loop, &srv_cmd, -1, NULL, NULL); close (cbdata->sp[1]); rspamd_socket_nonblocking (cbdata->sp[0]); /* Parent */ - rspamd_worker_set_signal_handler (SIGCHLD, w, cbdata->ev_base, + rspamd_worker_set_signal_handler (SIGCHLD, w, cbdata->event_loop, rspamd_lua_cld_handler, cbdata); /* Add result pipe waiting */ - event_set (&cbdata->ev, cbdata->sp[0], EV_READ | EV_PERSIST, - rspamd_lua_subprocess_io, cbdata); - event_base_set (cbdata->ev_base, &cbdata->ev); - /* TODO: maybe add timeout? */ - event_add (&cbdata->ev, NULL); + ev_io_init (&cbdata->ev, rspamd_lua_subprocess_io, cbdata->sp[0], EV_READ); + cbdata->ev.data = cbdata; + ev_io_start (cbdata->event_loop, &cbdata->ev); return 0; } diff --git a/src/rspamd.h b/src/rspamd.h index 4deb2f933..9048a26bd 100644 --- a/src/rspamd.h +++ b/src/rspamd.h @@ -100,7 +100,7 @@ struct rspamd_worker { struct rspamd_abstract_worker_ctx { guint64 magic; /* Events base */ - struct ev_loop *ev_base; + struct ev_loop *event_loop; /* DNS resolver */ struct rspamd_dns_resolver *resolver; /* Config */ diff --git a/src/worker.c b/src/worker.c index a202d0917..62c781c05 100644 --- a/src/worker.c +++ b/src/worker.c @@ -97,7 +97,7 @@ rspamd_worker_call_finish_handlers (struct rspamd_worker *worker) if (cfg->on_term_scripts) { ctx = worker->ctx; /* Create a fake task object for async events */ - task = rspamd_task_new (worker, cfg, NULL, NULL, ctx->ev_base); + task = rspamd_task_new (worker, cfg, NULL, NULL, ctx->event_loop); task->resolver = ctx->resolver; task->flags |= RSPAMD_TASK_FLAG_PROCESSING; task->s = rspamd_session_create (task->task_pool, |