struct rspamd_config *cfg;
lua_State *L;
gdouble timeout;
- struct event ev;
+ ev_timer ev;
gint cbref;
gboolean need_jitter;
};
static void
-lua_periodic_callback (gint unused_fd, short what, gpointer ud)
+lua_periodic_callback (struct ev_loop *loop, ev_timer *w, int revents)
{
- struct rspamd_lua_periodic *periodic = ud;
+ struct rspamd_lua_periodic *periodic = (struct rspamd_lua_periodic *)w->data;
struct rspamd_config **pcfg, *cfg;
struct ev_loop **pev_base;
struct thread_entry *thread;
rspamd_lua_setclass (L, "rspamd{ev_base}", -1);
*pev_base = periodic->ev_base;
- event_del (&periodic->ev);
lua_thread_call (thread, 2);
}
lua_State *L;
struct rspamd_lua_periodic *periodic = thread->cd;
gboolean plan_more = FALSE;
- struct timeval tv;
gdouble timeout = 0.0;
L = thread->lua_state;
timeout = rspamd_time_jitter (timeout, 0.0);
}
- double_to_tv (timeout, &tv);
- event_add (&periodic->ev, &tv);
+ periodic->ev.repeat = timeout;
+ ev_timer_again (periodic->ev_base, &periodic->ev);
}
else {
luaL_unref (L, LUA_REGISTRYINDEX, periodic->cbref);
+ ev_timer_stop (periodic->ev_base, &periodic->ev);
g_free (periodic);
}
}
msg_err_config ("call to finishing script failed: %s", msg);
- lua_periodic_callback_finish(thread, ret);
+ lua_periodic_callback_finish (thread, ret);
}
struct rspamd_config *cfg = lua_check_config (L, 1);
struct ev_loop *ev_base = lua_check_ev_base (L, 2);
gdouble timeout = lua_tonumber (L, 3);
- struct timeval tv;
struct rspamd_lua_periodic *periodic;
gboolean need_jitter = FALSE;
periodic->need_jitter = need_jitter;
lua_pushvalue (L, 4);
periodic->cbref = luaL_ref (L, LUA_REGISTRYINDEX);
- event_set (&periodic->ev, -1, EV_TIMEOUT, lua_periodic_callback, periodic);
- event_base_set (ev_base, &periodic->ev);
if (need_jitter) {
timeout = rspamd_time_jitter (timeout, 0.0);
}
- double_to_tv (timeout, &tv);
- event_add (&periodic->ev, &tv);
+ ev_timer_init (&periodic->ev, lua_periodic_callback, timeout, 0.0);
+ periodic->ev.data = periodic;
+ ev_timer_start (ev_base, &periodic->ev);
return 0;
}
struct rspamd_async_session *session;
struct rspamd_symcache_item *item;
struct rspamd_http_message *msg;
- struct ev_loop *ev_base;
+ struct ev_loop *event_loop;
struct rspamd_config *cfg;
struct rspamd_task *task;
- struct timeval tv;
+ ev_tstamp timeout;
struct rspamd_cryptobox_keypair *local_kp;
struct rspamd_cryptobox_pubkey *peer_pk;
rspamd_inet_addr_t *addr;
ref_entry_t ref;
};
-static const int default_http_timeout = 5000;
+static const gdouble default_http_timeout = 5.0;
static struct rspamd_dns_resolver *
lua_http_global_resolver (struct ev_loop *ev_base)
rspamd_http_connection_write_message (cbd->conn, msg,
cbd->host, cbd->mime_type, cbd,
- &cbd->tv);
+ cbd->timeout);
return TRUE;
}
lua_pushstring (L, "timeout");
lua_gettable (L, 1);
if (lua_type (L, -1) == LUA_TNUMBER) {
- timeout = lua_tonumber (L, -1) * 1000.;
+ timeout = lua_tonumber (L, -1);
}
lua_pop (L, 1);
lua_gettable (L, 1);
if (lua_type (L, -1) == LUA_TNUMBER) {
- max_size = lua_tonumber (L, -1);
+ max_size = lua_tointeger (L, -1);
}
lua_pop (L, 1);
cbd = g_malloc0 (sizeof (*cbd));
cbd->cbref = cbref;
cbd->msg = msg;
- cbd->ev_base = ev_base;
+ cbd->event_loop = ev_base;
cbd->mime_type = mime_type;
- msec_to_tv (timeout, &cbd->tv);
+ cbd->timeout = timeout;
cbd->fd = -1;
cbd->cfg = cfg;
cbd->peer_pk = peer_key;
struct rspamd_task *task;
struct rspamd_symcache_item *item;
struct rspamd_async_session *s;
- struct ev_loop *ev_base;
+ struct ev_loop *event_loop;
struct rspamd_config *cfg;
struct rspamd_redis_pool *pool;
gchar *server;
struct lua_redis_userdata *c;
struct lua_redis_ctx *ctx;
struct lua_redis_request_specific_userdata *next;
- struct event timeout;
+ ev_timer timeout_ev;
guint flags;
};
if (ud->ctx) {
LL_FOREACH_SAFE (ud->specific, cur, tmp) {
- if (rspamd_event_pending (&cur->timeout, EV_TIMEOUT)) {
- event_del (&cur->timeout);
- }
+ ev_timer_stop (ud->event_loop, &cur->timeout_ev);
if (!(cur->flags & LUA_REDIS_SPECIFIC_REPLIED)) {
is_successful = FALSE;
ctx = sp_ud->ctx;
- if (rspamd_event_pending (&sp_ud->timeout, EV_TIMEOUT)) {
- event_del (&sp_ud->timeout);
- }
+ ev_timer_stop (sp_ud->ctx->async.event_loop, &sp_ud->timeout_ev);
msg_debug ("finished redis query %p from session %p", sp_ud, ctx);
sp_ud->flags |= LUA_REDIS_SPECIFIC_FINISHED;
return;
}
- if (rspamd_event_pending (&sp_ud->timeout, EV_TIMEOUT)) {
- event_del (&sp_ud->timeout);
- }
-
+ ev_timer_stop (ud->event_loop, &sp_ud->timeout_ev);
msg_debug ("got reply from redis: %p for query %p", ac, sp_ud);
struct lua_redis_result *result = g_malloc0 (sizeof *result);
}
static void
-lua_redis_timeout_sync (int fd, short what, gpointer priv)
+lua_redis_timeout_sync (EV_P_ ev_timer *w, int revents)
{
- struct lua_redis_request_specific_userdata *sp_ud = priv;
+ struct lua_redis_request_specific_userdata *sp_ud =
+ (struct lua_redis_request_specific_userdata *)w->data;
struct lua_redis_ctx *ctx = sp_ud->ctx;
redisAsyncContext *ac;
}
static void
-lua_redis_timeout (int fd, short what, gpointer u)
+lua_redis_timeout (EV_P_ ev_timer *w, int revents)
{
- struct lua_redis_request_specific_userdata *sp_ud = u;
+ struct lua_redis_request_specific_userdata *sp_ud =
+ (struct lua_redis_request_specific_userdata *)w->data;
struct lua_redis_ctx *ctx;
redisAsyncContext *ac;
static struct lua_redis_ctx *
rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref, gboolean is_async)
{
- struct lua_redis_ctx *ctx;
+ struct lua_redis_ctx *ctx = NULL;
rspamd_inet_addr_t *ip = NULL;
- struct lua_redis_userdata *ud;
+ struct lua_redis_userdata *ud = NULL;
struct rspamd_lua_ip *addr = NULL;
struct rspamd_task *task = NULL;
const gchar *host;
ud->s = session;
ud->cfg = cfg;
ud->pool = cfg->redis_pool;
- ud->ev_base = ev_base;
+ ud->event_loop = ev_base;
ud->task = task;
if (task) {
struct lua_redis_userdata *ud;
struct lua_redis_ctx *ctx, **pctx;
const gchar *cmd = NULL;
- struct timeval tv;
gdouble timeout = REDIS_DEFAULT_TIMEOUT;
gint cbref = -1;
gboolean ret = FALSE;
REDIS_RETAIN (ctx); /* Cleared by fin event */
ctx->cmds_pending ++;
- double_to_tv (timeout, &tv);
- event_set (&sp_ud->timeout, -1, EV_TIMEOUT, lua_redis_timeout, sp_ud);
- event_base_set (ud->ev_base, &sp_ud->timeout);
- event_add (&sp_ud->timeout, &tv);
+ sp_ud->timeout_ev.data = sp_ud;
+ ev_timer_init (&sp_ud->timeout_ev, lua_redis_timeout, timeout, 0.0);
+ ev_timer_start (ud->event_loop, &sp_ud->timeout_ev);
ret = TRUE;
}
else {
const gchar *cmd = NULL;
gint args_pos = 2;
gint cbref = -1, ret;
- struct timeval tv;
if (ctx) {
if (ctx->flags & LUA_REDIS_TERMINATED) {
}
}
- double_to_tv (sp_ud->c->timeout, &tv);
+ sp_ud->timeout_ev.data = sp_ud;
if (IS_ASYNC (ctx)) {
- event_set (&sp_ud->timeout, -1, EV_TIMEOUT,
- lua_redis_timeout, sp_ud);
+ ev_timer_init (&sp_ud->timeout_ev, lua_redis_timeout,
+ sp_ud->c->timeout, 0.0);
}
else {
- event_set (&sp_ud->timeout, -1, EV_TIMEOUT,
- lua_redis_timeout_sync, sp_ud);
+ ev_timer_init (&sp_ud->timeout_ev, lua_redis_timeout_sync,
+ sp_ud->c->timeout, 0.0);
}
- event_base_set (ud->ev_base, &sp_ud->timeout);
- event_add (&sp_ud->timeout, &tv);
+ ev_timer_start (ud->event_loop, &sp_ud->timeout_ev);
REDIS_RETAIN (ctx);
ctx->cmds_pending ++;
}
* Closes TCP connection
*/
LUA_FUNCTION_DEF (tcp, close);
-/***
- * @method tcp:set_timeout(seconds)
- *
- * Sets new timeout for a TCP connection in **seconds**
- * @param {number} seconds floating point value that specifies new timeout
- */
-LUA_FUNCTION_DEF (tcp, set_timeout);
/***
* @method tcp:add_read(callback, [pattern])
static const struct luaL_reg tcp_libm[] = {
LUA_INTERFACE_DEF (tcp, close),
- LUA_INTERFACE_DEF (tcp, set_timeout),
LUA_INTERFACE_DEF (tcp, add_read),
LUA_INTERFACE_DEF (tcp, add_write),
LUA_INTERFACE_DEF (tcp, shift_callback),
*/
LUA_FUNCTION_DEF (tcp_sync, close);
-/***
- * @method set_timeout(seconds)
- *
- * Sets timeout for IO operations
- */
-LUA_FUNCTION_DEF (tcp_sync, set_timeout);
-
/***
* @method read_once()
*
static const struct luaL_reg tcp_sync_libm[] = {
LUA_INTERFACE_DEF (tcp_sync, close),
- LUA_INTERFACE_DEF (tcp_sync, set_timeout),
LUA_INTERFACE_DEF (tcp_sync, read_once),
LUA_INTERFACE_DEF (tcp_sync, write),
LUA_INTERFACE_DEF (tcp_sync, eof),
struct lua_tcp_cbdata {
struct rspamd_async_session *session;
struct rspamd_async_event *async_ev;
- struct ev_loop *ev_base;
- struct timeval tv;
+ struct ev_loop *event_loop;
rspamd_inet_addr_t *addr;
GByteArray *in;
GQueue *handlers;
guint port;
guint flags;
gchar tag[7];
- struct event ev;
+ struct rspamd_io_ev ev;
struct lua_tcp_dtor *dtors;
ref_entry_t ref;
struct rspamd_task *task;
static void
lua_tcp_void_finalyser (gpointer arg) {}
-static const int default_tcp_timeout = 5000;
+static const gdouble default_tcp_timeout = 5.0;
static struct rspamd_dns_resolver *
lua_tcp_global_resolver (struct ev_loop *ev_base,
}
if (cbd->fd != -1) {
- event_del (&cbd->ev);
+ rspamd_ev_watcher_stop (cbd->event_loop, &cbd->ev);
close (cbd->fd);
cbd->fd = -1;
}
static void
lua_tcp_plan_read (struct lua_tcp_cbdata *cbd)
{
- event_del (&cbd->ev);
-#ifdef EV_CLOSED
- event_set (&cbd->ev, cbd->fd, EV_READ|EV_CLOSED,
- lua_tcp_handler, cbd);
-#else
- event_set (&cbd->ev, cbd->fd, EV_READ, lua_tcp_handler, cbd);
-#endif
- event_base_set (cbd->ev_base, &cbd->ev);
- event_add (&cbd->ev, &cbd->tv);
+ rspamd_ev_watcher_reschedule (cbd->event_loop, &cbd->ev, EV_READ);
}
static void
}
else {
/* Want to write more */
- event_add (&cbd->ev, &cbd->tv);
}
return;
msg_debug_tcp ("plan new read");
if (can_read) {
/* We need to plan a new event */
- event_set (&cbd->ev, cbd->fd, EV_READ, lua_tcp_handler, cbd);
- event_base_set (cbd->ev_base, &cbd->ev);
- event_add (&cbd->ev, &cbd->tv);
+ rspamd_ev_watcher_reschedule (cbd->event_loop, &cbd->ev,
+ EV_READ);
}
else {
/* Cannot read more */
if (hdl->h.w.pos < hdl->h.w.total_bytes) {
msg_debug_tcp ("plan new write");
if (can_write) {
- event_set (&cbd->ev, cbd->fd, EV_WRITE, lua_tcp_handler, cbd);
- event_base_set (cbd->ev_base, &cbd->ev);
- event_add (&cbd->ev, &cbd->tv);
+ rspamd_ev_watcher_reschedule (cbd->event_loop, &cbd->ev,
+ EV_WRITE);
}
else {
/* Cannot write more */
}
else { /* LUA_WANT_CONNECT */
msg_debug_tcp ("plan new connect");
- event_set (&cbd->ev, cbd->fd, EV_WRITE, lua_tcp_handler, cbd);
- event_base_set (cbd->ev_base, &cbd->ev);
- event_add (&cbd->ev, &cbd->tv);
+ rspamd_ev_watcher_reschedule (cbd->event_loop, &cbd->ev,
+ EV_WRITE);
}
}
}
verify_peer = TRUE;
}
- event_base_set (cbd->ev_base, &cbd->ev);
cbd->ssl_conn =
- rspamd_ssl_connection_new (ssl_ctx, cbd->ev_base, verify_peer);
+ rspamd_ssl_connection_new (ssl_ctx, cbd->event_loop, verify_peer);
if (!rspamd_ssl_connect_fd (cbd->ssl_conn, fd, cbd->hostname, &cbd->ev,
- &cbd->tv, lua_tcp_handler, lua_tcp_ssl_on_error, cbd)) {
+ cbd->ev.timeout, lua_tcp_handler, lua_tcp_ssl_on_error, cbd)) {
lua_tcp_push_error (cbd, TRUE, "ssl connection failed: %s",
strerror (errno));
guint port;
gint cbref, tp, conn_cbref = -1;
gsize plen = 0;
- struct ev_loop *ev_base;
+ struct ev_loop *event_loop = NULL;
struct lua_tcp_cbdata *cbd;
struct rspamd_dns_resolver *resolver = NULL;
struct rspamd_async_session *session = NULL;
lua_pushstring (L, "port");
lua_gettable (L, -2);
if (lua_type (L, -1) == LUA_TNUMBER) {
- port = luaL_checknumber (L, -1);
+ port = lua_tointeger (L, -1);
}
else {
/* We assume that it is a unix socket */
lua_gettable (L, -2);
if (lua_type (L, -1) == LUA_TUSERDATA) {
task = lua_check_task (L, -1);
- ev_base = task->event_loop;
+ event_loop = task->event_loop;
resolver = task->resolver;
session = task->s;
cfg = task->cfg;
lua_pushstring (L, "ev_base");
lua_gettable (L, -2);
if (rspamd_lua_check_udata_maybe (L, -1, "rspamd{ev_base}")) {
- ev_base = *(struct ev_loop **)lua_touserdata (L, -1);
+ event_loop = *(struct ev_loop **)lua_touserdata (L, -1);
}
else {
- ev_base = NULL;
+ event_loop = ev_default_loop (0);
}
lua_pop (L, 1);
resolver = *(struct rspamd_dns_resolver **)lua_touserdata (L, -1);
}
else {
- resolver = lua_tcp_global_resolver (ev_base, cfg);
+ resolver = lua_tcp_global_resolver (event_loop, cfg);
}
lua_pop (L, 1);
}
lua_pushstring (L, "timeout");
lua_gettable (L, -2);
if (lua_type (L, -1) == LUA_TNUMBER) {
- timeout = lua_tonumber (L, -1) * 1000.;
+ timeout = lua_tonumber (L, -1);
}
lua_pop (L, 1);
g_queue_push_tail (cbd->handlers, wh);
}
- cbd->ev_base = ev_base;
- msec_to_tv (timeout, &cbd->tv);
+ cbd->event_loop = event_loop;
cbd->fd = -1;
cbd->port = port;
+ cbd->ev.timeout = timeout;
if (ssl) {
cbd->flags |= LUA_TCP_FLAG_SSL;
rspamd_snprintf (cbd->tag, sizeof (cbd->tag), "%uxL", h);
cbd->handlers = g_queue_new ();
- cbd->ev_base = ev_base;
+ cbd->event_loop = ev_base;
cbd->flags |= LUA_TCP_FLAG_SYNC;
- double_to_tv (timeout, &cbd->tv);
cbd->fd = -1;
cbd->port = (guint16)port;
return 0;
}
-static gint
-lua_tcp_set_timeout (lua_State *L)
-{
- LUA_TRACE_POINT;
- struct lua_tcp_cbdata *cbd = lua_check_tcp (L, 1);
- gdouble seconds = lua_tonumber (L, 2);
-
- if (cbd == NULL) {
- return luaL_error (L, "invalid arguments");
- }
- if (!lua_isnumber (L, 2)) {
- return luaL_error (L, "invalid arguments: 'seconds' is expected to be number");
- }
-
- double_to_tv (seconds, &cbd->tv);
-
- return 0;
-}
-
static gint
lua_tcp_add_read (lua_State *L)
{
cbd->flags |= LUA_TCP_FLAG_FINISHED;
if (cbd->fd != -1) {
- event_del (&cbd->ev);
+ rspamd_ev_watcher_stop (cbd->event_loop, &cbd->ev);
close (cbd->fd);
cbd->fd = -1;
}
if (cbd->fd != -1) {
msg_debug ("closing sync TCP connection");
- event_del (&cbd->ev);
+ rspamd_ev_watcher_stop (cbd->event_loop, &cbd->ev);
close (cbd->fd);
cbd->fd = -1;
}
cbd->async_ev = NULL;
}
-static int
-lua_tcp_sync_set_timeout (lua_State *L)
-{
- LUA_TRACE_POINT;
- struct lua_tcp_cbdata *cbd = lua_check_sync_tcp (L, 1);
- gdouble seconds = lua_tonumber (L, 2);
-
- if (cbd == NULL) {
- return luaL_error (L, "invalid arguments: self is not rspamd{tcp_sync}");
- }
- if (lua_type (L, 2) != LUA_TNUMBER) {
- return luaL_error (L, "invalid arguments: second parameter is expected to be number");
- }
-
- double_to_tv (seconds, &cbd->tv);
-
- return 0;
-}
-
static int
lua_tcp_sync_read_once (lua_State *L)
{
verify_peer = TRUE;
}
- event_base_set (cbd->ev_base, &cbd->ev);
cbd->ssl_conn =
- rspamd_ssl_connection_new (ssl_ctx, cbd->ev_base, verify_peer);
+ rspamd_ssl_connection_new (ssl_ctx, cbd->event_loop, verify_peer);
if (!rspamd_ssl_connect_fd (cbd->ssl_conn, cbd->fd, cbd->hostname, &cbd->ev,
- &cbd->tv, lua_tcp_handler, lua_tcp_ssl_on_error, cbd)) {
+ cbd->ev.timeout, lua_tcp_handler, lua_tcp_ssl_on_error, cbd)) {
lua_tcp_push_error (cbd, TRUE, "ssl connection failed: %s",
strerror (errno));
}
#include "utlist.h"
#include "unix-std.h"
#include <math.h>
+#include <src/libutil/libev_helper.h>
static const gchar *M = "rspamd lua udp";
};
struct lua_udp_cbdata {
- struct event io;
- struct timeval tv;
- struct ev_loop *ev_base;
+ struct ev_loop *event_loop;
+ struct rspamd_io_ev ev;
struct rspamd_async_event *async_ev;
struct rspamd_task *task;
rspamd_mempool_t *pool;
struct lua_udp_cbdata *cbd = (struct lua_udp_cbdata *)p;
if (cbd->sock != -1) {
- if (cbd->io.ev_base != NULL) {
- event_del (&cbd->io);
- }
-
+ rspamd_ev_watcher_stop (cbd->event_loop, &cbd->ev);
close (cbd->sock);
}
L = cbd->L;
- event_del (&cbd->io);
-
if (what == EV_TIMEOUT) {
if (cbd->sent && cbd->retransmits > 0) {
r = lua_try_send_request (cbd);
if (r == RSPAMD_SENT_OK) {
- event_set (&cbd->io, cbd->sock, EV_READ, lua_udp_io_handler, cbd);
- event_base_set (cbd->ev_base, &cbd->io);
- event_add (&cbd->io, &cbd->tv);
+ rspamd_ev_watcher_reschedule (cbd->event_loop, &cbd->ev, EV_READ);
lua_udp_maybe_register_event (cbd);
cbd->retransmits --;
}
}
else {
cbd->retransmits --;
- event_set (&cbd->io, cbd->sock, EV_WRITE, lua_udp_io_handler, cbd);
- event_base_set (cbd->ev_base, &cbd->io);
- event_add (&cbd->io, &cbd->tv);
+ rspamd_ev_watcher_reschedule (cbd->event_loop, &cbd->ev, EV_WRITE);
}
}
else {
if (r == RSPAMD_SENT_OK) {
if (cbd->cbref != -1) {
- event_set (&cbd->io, cbd->sock, EV_READ, lua_udp_io_handler, cbd);
- event_base_set (cbd->ev_base, &cbd->io);
- event_add (&cbd->io, &cbd->tv);
+ rspamd_ev_watcher_reschedule (cbd->event_loop, &cbd->ev, EV_READ);
cbd->sent = TRUE;
}
else {
}
else {
cbd->retransmits --;
- event_set (&cbd->io, cbd->sock, EV_WRITE, lua_udp_io_handler, cbd);
- event_base_set (cbd->ev_base, &cbd->io);
- event_add (&cbd->io, &cbd->tv);
+ rspamd_ev_watcher_reschedule (cbd->event_loop, &cbd->ev, EV_WRITE);
}
}
else if (what == EV_READ) {
lua_gettable (L, -2);
if (lua_type (L, -1) == LUA_TNUMBER) {
- port = luaL_checknumber (L, -1);
+ port = lua_tointeger (L, -1);
}
else {
/* We assume that it is a unix socket */
}
-
- if (!ev_base || !pool) {
- rspamd_inet_address_free (addr);
-
- return luaL_error (L, "invalid arguments");
- }
-
cbd = rspamd_mempool_alloc0 (pool, sizeof (*cbd));
- cbd->ev_base = ev_base;
+ cbd->event_loop = ev_base;
cbd->pool = pool;
cbd->s = session;
cbd->addr = addr;
cbd->sock = rspamd_socket_create (rspamd_inet_address_get_af (addr),
SOCK_DGRAM, 0, TRUE);
cbd->cbref = -1;
- double_to_tv (timeout, &cbd->tv);
+ cbd->ev.timeout = timeout;
if (cbd->sock == -1) {
rspamd_inet_address_free (addr);
return 2;
}
- event_set (&cbd->io, cbd->sock, EV_READ, lua_udp_io_handler, cbd);
- event_base_set (cbd->ev_base, &cbd->io);
- event_add (&cbd->io, &cbd->tv);
+ rspamd_ev_watcher_init (&cbd->ev, cbd->sock, EV_READ,
+ lua_udp_io_handler, cbd);
+ rspamd_ev_watcher_start (cbd->event_loop, &cbd->ev, timeout);
cbd->sent = TRUE;
}
return 2;
}
else {
- event_set (&cbd->io, cbd->sock, EV_WRITE, lua_udp_io_handler, cbd);
- event_base_set (cbd->ev_base, &cbd->io);
- event_add (&cbd->io, &cbd->tv);
+ rspamd_ev_watcher_init (&cbd->ev, cbd->sock, EV_WRITE,
+ lua_udp_io_handler, cbd);
+ rspamd_ev_watcher_start (cbd->event_loop, &cbd->ev, timeout);
if (!lua_udp_maybe_register_event (cbd)) {
lua_pushboolean (L, false);
pev_base = lua_newuserdata (L, sizeof (struct ev_loop *));
rspamd_lua_setclass (L, "rspamd{ev_base}", -1);
- *pev_base = event_init ();
+ *pev_base = ev_default_loop (EVFLAG_SIGNALFD);
return 1;
}
message = luaL_checklstring (L, 2, &mlen);
if (cfg != NULL && message != NULL) {
- base = event_init ();
+ base = ev_loop_new (EVFLAG_SIGNALFD);
rspamd_init_filters (cfg, FALSE);
task = rspamd_task_new (NULL, cfg, NULL, NULL, base);
task->msg.begin = rspamd_mempool_alloc (task->task_pool, mlen);
}
else {
if (rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL)) {
- event_base_loop (base, 0);
+ ev_loop (base, 0);
if (res != NULL) {
ucl_object_push_lua (L, res, true);
}
}
- event_base_free (base);
+ ev_loop_destroy (base);
}
else {
lua_pushnil (L);
ev_base = lua_check_ev_base (L, 1);
if (lua_isnumber (L, 2)) {
- flags = lua_tonumber (L, 2);
+ flags = lua_tointeger (L, 2);
}
- int ret = event_base_loop (ev_base, flags);
+ int ret = ev_run (ev_base, flags);
lua_pushinteger (L, ret);
return 1;
GString *out_buf;
goffset out_pos;
struct rspamd_worker *wrk;
- struct ev_loop *ev_base;
- struct event ev;
+ struct ev_loop *event_loop;
+ ev_io ev;
};
static void
if (!cbdata->replied) {
/* We still need to call on_complete callback */
+ ev_io_stop (cbdata->event_loop, &cbdata->ev);
rspamd_lua_call_on_complete (cbdata->L, cbdata,
"Worker has died without reply", NULL, 0);
- event_del (&cbdata->ev);
}
/* Free structures */
srv_cmd.cmd.on_fork.state = child_dead;
srv_cmd.cmd.on_fork.cpid = cbdata->cpid;
srv_cmd.cmd.on_fork.ppid = getpid ();
- rspamd_srv_send_command (cbdata->wrk, cbdata->ev_base, &srv_cmd, -1,
+ rspamd_srv_send_command (cbdata->wrk, cbdata->event_loop, &srv_cmd, -1,
NULL, NULL);
g_free (cbdata);
}
static void
-rspamd_lua_subprocess_io (gint fd, short what, gpointer ud)
+rspamd_lua_subprocess_io (EV_P_ ev_io *w, int revents)
{
- struct rspamd_lua_process_cbdata *cbdata = ud;
+ struct rspamd_lua_process_cbdata *cbdata =
+ (struct rspamd_lua_process_cbdata *)w->data;
gssize r;
if (cbdata->sz == (guint64)-1) {
sizeof (guint64) - cbdata->io_buf->len);
if (r == 0) {
+ ev_io_stop (cbdata->event_loop, &cbdata->ev);
rspamd_lua_call_on_complete (cbdata->L, cbdata,
"Unexpected EOF", NULL, 0);
- event_del (&cbdata->ev);
cbdata->replied = TRUE;
kill (cbdata->cpid, SIGTERM);
return;
}
else {
+ ev_io_stop (cbdata->event_loop, &cbdata->ev);
rspamd_lua_call_on_complete (cbdata->L, cbdata,
strerror (errno), NULL, 0);
- event_del (&cbdata->ev);
cbdata->replied = TRUE;
kill (cbdata->cpid, SIGTERM);
cbdata->sz - cbdata->io_buf->len);
if (r == 0) {
+ ev_io_stop (cbdata->event_loop, &cbdata->ev);
rspamd_lua_call_on_complete (cbdata->L, cbdata,
"Unexpected EOF", NULL, 0);
- event_del (&cbdata->ev);
cbdata->replied = TRUE;
kill (cbdata->cpid, SIGTERM);
return;
}
else {
+ ev_io_stop (cbdata->event_loop, &cbdata->ev);
rspamd_lua_call_on_complete (cbdata->L, cbdata,
strerror (errno), NULL, 0);
- event_del (&cbdata->ev);
cbdata->replied = TRUE;
kill (cbdata->cpid, SIGTERM);
if (cbdata->io_buf->len == cbdata->sz) {
gchar rep[4];
+ ev_io_stop (cbdata->event_loop, &cbdata->ev);
/* Finished reading data */
if (cbdata->is_error) {
cbdata->io_buf->str[cbdata->io_buf->len] = '\0';
NULL, cbdata->io_buf->str, cbdata->io_buf->len);
}
- event_del (&cbdata->ev);
cbdata->replied = TRUE;
/* Write reply to the child */
actx = w->ctx;
cbdata->wrk = w;
cbdata->L = L;
- cbdata->ev_base = actx->ev_base;
+ cbdata->event_loop = actx->event_loop;
cbdata->sz = (guint64)-1;
pid = fork ();
close (cbdata->sp[0]);
/* Here we assume that we can block on writing results */
rspamd_socket_blocking (cbdata->sp[1]);
- event_reinit (cbdata->ev_base);
+ ev_loop_destroy (EV_DEFAULT);
+ cbdata->event_loop = ev_default_loop (EVFLAG_SIGNALFD);
g_hash_table_remove_all (w->signal_events);
rspamd_worker_unblock_signals ();
rspamd_lua_execute_lua_subprocess (L, cbdata);
srv_cmd.cmd.on_fork.state = child_create;
srv_cmd.cmd.on_fork.cpid = pid;
srv_cmd.cmd.on_fork.ppid = getpid ();
- rspamd_srv_send_command (w, cbdata->ev_base, &srv_cmd, -1, NULL, NULL);
+ rspamd_srv_send_command (w, cbdata->event_loop, &srv_cmd, -1, NULL, NULL);
close (cbdata->sp[1]);
rspamd_socket_nonblocking (cbdata->sp[0]);
/* Parent */
- rspamd_worker_set_signal_handler (SIGCHLD, w, cbdata->ev_base,
+ rspamd_worker_set_signal_handler (SIGCHLD, w, cbdata->event_loop,
rspamd_lua_cld_handler,
cbdata);
/* Add result pipe waiting */
- event_set (&cbdata->ev, cbdata->sp[0], EV_READ | EV_PERSIST,
- rspamd_lua_subprocess_io, cbdata);
- event_base_set (cbdata->ev_base, &cbdata->ev);
- /* TODO: maybe add timeout? */
- event_add (&cbdata->ev, NULL);
+ ev_io_init (&cbdata->ev, rspamd_lua_subprocess_io, cbdata->sp[0], EV_READ);
+ cbdata->ev.data = cbdata;
+ ev_io_start (cbdata->event_loop, &cbdata->ev);
return 0;
}
struct rspamd_abstract_worker_ctx {
guint64 magic;
/* Events base */
- struct ev_loop *ev_base;
+ struct ev_loop *event_loop;
/* DNS resolver */
struct rspamd_dns_resolver *resolver;
/* Config */
if (cfg->on_term_scripts) {
ctx = worker->ctx;
/* Create a fake task object for async events */
- task = rspamd_task_new (worker, cfg, NULL, NULL, ctx->ev_base);
+ task = rspamd_task_new (worker, cfg, NULL, NULL, ctx->event_loop);
task->resolver = ctx->resolver;
task->flags |= RSPAMD_TASK_FLAG_PROCESSING;
task->s = rspamd_session_create (task->task_pool,