summaryrefslogtreecommitdiffstats
path: root/src/libutil
diff options
context:
space:
mode:
Diffstat (limited to 'src/libutil')
-rw-r--r--src/libutil/CMakeLists.txt2
-rw-r--r--src/libutil/addr.c39
-rw-r--r--src/libutil/addr.h10
-rw-r--r--src/libutil/aio_event.c508
-rw-r--r--src/libutil/aio_event.h59
-rw-r--r--src/libutil/http_connection.c94
-rw-r--r--src/libutil/http_connection.h10
-rw-r--r--src/libutil/http_context.c52
-rw-r--r--src/libutil/http_context.h8
-rw-r--r--src/libutil/http_private.h4
-rw-r--r--src/libutil/http_router.c46
-rw-r--r--src/libutil/http_router.h7
-rw-r--r--src/libutil/libev_helper.c119
-rw-r--r--src/libutil/libev_helper.h78
-rw-r--r--src/libutil/map.c233
-rw-r--r--src/libutil/map.h4
-rw-r--r--src/libutil/map_private.h24
-rw-r--r--src/libutil/ssl_util.c77
-rw-r--r--src/libutil/ssl_util.h5
-rw-r--r--src/libutil/str_util.h12
-rw-r--r--src/libutil/upstream.c32
-rw-r--r--src/libutil/upstream.h2
-rw-r--r--src/libutil/util.c54
-rw-r--r--src/libutil/util.h28
24 files changed, 506 insertions, 1001 deletions
diff --git a/src/libutil/CMakeLists.txt b/src/libutil/CMakeLists.txt
index f86d650f0..5a94a732c 100644
--- a/src/libutil/CMakeLists.txt
+++ b/src/libutil/CMakeLists.txt
@@ -1,7 +1,7 @@
# Librspamd-util
SET(LIBRSPAMDUTILSRC
${CMAKE_CURRENT_SOURCE_DIR}/addr.c
- ${CMAKE_CURRENT_SOURCE_DIR}/aio_event.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/libev_helper.c
${CMAKE_CURRENT_SOURCE_DIR}/bloom.c
${CMAKE_CURRENT_SOURCE_DIR}/expression.c
${CMAKE_CURRENT_SOURCE_DIR}/fstring.c
diff --git a/src/libutil/addr.c b/src/libutil/addr.c
index 30a9ce66a..112c5d2cd 100644
--- a/src/libutil/addr.c
+++ b/src/libutil/addr.c
@@ -203,41 +203,10 @@ rspamd_ip_is_valid (const rspamd_inet_addr_t *addr)
return ret;
}
-static void
-rspamd_enable_accept_event (gint fd, short what, gpointer d)
-{
- struct event *events = d;
-
- event_del (&events[1]);
- event_add (&events[0], NULL);
-}
-
-static void
-rspamd_disable_accept_events (gint sock, GList *accept_events)
-{
- GList *cur;
- struct event *events;
- const gdouble throttling = 0.5;
- struct timeval tv;
- struct event_base *ev_base;
-
- double_to_tv (throttling, &tv);
-
- for (cur = accept_events; cur != NULL; cur = g_list_next (cur)) {
- events = cur->data;
-
- ev_base = event_get_base (&events[0]);
- event_del (&events[0]);
- event_set (&events[1], sock, EV_TIMEOUT, rspamd_enable_accept_event,
- events);
- event_base_set (ev_base, &events[1]);
- event_add (&events[1], &tv);
- }
-}
-
gint
rspamd_accept_from_socket (gint sock, rspamd_inet_addr_t **target,
- GList *accept_events)
+ rspamd_accept_throttling_handler hdl,
+ void *hdl_data)
{
gint nfd, serrno;
union sa_union su;
@@ -254,7 +223,9 @@ rspamd_accept_from_socket (gint sock, rspamd_inet_addr_t **target,
}
else if (errno == EMFILE || errno == ENFILE) {
/* Temporary disable accept event */
- rspamd_disable_accept_events (sock, accept_events);
+ if (hdl) {
+ hdl (sock, hdl_data);
+ }
return 0;
}
diff --git a/src/libutil/addr.h b/src/libutil/addr.h
index bfe586ad1..7efa5e318 100644
--- a/src/libutil/addr.h
+++ b/src/libutil/addr.h
@@ -221,15 +221,17 @@ int rspamd_inet_address_listen (const rspamd_inet_addr_t *addr, gint type,
*/
gboolean rspamd_ip_is_valid (const rspamd_inet_addr_t *addr);
+typedef void (*rspamd_accept_throttling_handler)(gint, void *);
/**
* Accept from listening socket filling addr structure
* @param sock listening socket
- * @param addr allocated inet addr structure
- * @param accept_events events for accepting new sockets
+ * @param target allocated inet addr structure
* @return
*/
-gint rspamd_accept_from_socket (gint sock, rspamd_inet_addr_t **addr,
- GList *accept_events);
+gint rspamd_accept_from_socket (gint sock,
+ rspamd_inet_addr_t **target,
+ rspamd_accept_throttling_handler hdl,
+ void *hdl_data);
/**
* Parse host[:port[:priority]] line
diff --git a/src/libutil/aio_event.c b/src/libutil/aio_event.c
deleted file mode 100644
index 584feb501..000000000
--- a/src/libutil/aio_event.c
+++ /dev/null
@@ -1,508 +0,0 @@
-/*-
- * Copyright 2016 Vsevolod Stakhov
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "config.h"
-#include <event.h>
-#include "aio_event.h"
-#include "rspamd.h"
-#include "unix-std.h"
-
-#ifdef HAVE_SYS_EVENTFD_H
-#include <sys/eventfd.h>
-#endif
-
-#ifdef HAVE_AIO_H
-#include <aio.h>
-#endif
-
-/* Linux syscall numbers */
-#if defined(__i386__)
-# define SYS_io_setup 245
-# define SYS_io_destroy 246
-# define SYS_io_getevents 247
-# define SYS_io_submit 248
-# define SYS_io_cancel 249
-#elif defined(__x86_64__)
-# define SYS_io_setup 206
-# define SYS_io_destroy 207
-# define SYS_io_getevents 208
-# define SYS_io_submit 209
-# define SYS_io_cancel 210
-#else
-# warning \
- "aio is not supported on this platform, please contact author for details"
-# define SYS_io_setup 0
-# define SYS_io_destroy 0
-# define SYS_io_getevents 0
-# define SYS_io_submit 0
-# define SYS_io_cancel 0
-#endif
-
-#define SYS_eventfd 323
-#define MAX_AIO_EV 64
-
-struct io_cbdata {
- gint fd;
- rspamd_aio_cb cb;
- guint64 len;
- gpointer buf;
- gpointer io_buf;
- gpointer ud;
-};
-
-#ifdef LINUX
-
-/* Linux specific mappings and utilities to avoid using of libaio */
-
-typedef unsigned long aio_context_t;
-
-typedef enum io_iocb_cmd {
- IO_CMD_PREAD = 0,
- IO_CMD_PWRITE = 1,
-
- IO_CMD_FSYNC = 2,
- IO_CMD_FDSYNC = 3,
-
- IO_CMD_POLL = 5,
- IO_CMD_NOOP = 6,
-} io_iocb_cmd_t;
-
-#if defined(__LITTLE_ENDIAN)
-#define PADDED(x,y) x, y
-#elif defined(__BIG_ENDIAN)
-#define PADDED(x,y) y, x
-#else
-#error edit for your odd byteorder.
-#endif
-
-/*
- * we always use a 64bit off_t when communicating
- * with userland. its up to libraries to do the
- * proper padding and aio_error abstraction
- */
-
-struct iocb {
- /* these are internal to the kernel/libc. */
- guint64 aio_data; /* data to be returned in event's data */
- guint32 PADDED (aio_key, aio_reserved1);
- /* the kernel sets aio_key to the req # */
-
- /* common fields */
- guint16 aio_lio_opcode; /* see IOCB_CMD_ above */
- gint16 aio_reqprio;
- guint32 aio_fildes;
-
- guint64 aio_buf;
- guint64 aio_nbytes;
- gint64 aio_offset;
-
- /* extra parameters */
- guint64 aio_reserved2; /* TODO: use this for a (struct sigevent *) */
-
- /* flags for the "struct iocb" */
- guint32 aio_flags;
-
- /*
- * if the IOCB_FLAG_RESFD flag of "aio_flags" is set, this is an
- * eventfd to signal AIO readiness to
- */
- guint32 aio_resfd;
-};
-
-struct io_event {
- guint64 data; /* the data field from the iocb */
- guint64 obj; /* what iocb this event came from */
- gint64 res; /* result code for this event */
- gint64 res2; /* secondary result */
-};
-
-/* Linux specific io calls */
-static int
-io_setup (guint nr_reqs, aio_context_t *ctx)
-{
- return syscall (SYS_io_setup, nr_reqs, ctx);
-}
-
-static int
-io_destroy (aio_context_t ctx)
-{
- return syscall (SYS_io_destroy, ctx);
-}
-
-static int
-io_getevents (aio_context_t ctx,
- long min_nr,
- long nr,
- struct io_event *events,
- struct timespec *tmo)
-{
- return syscall (SYS_io_getevents, ctx, min_nr, nr, events, tmo);
-}
-
-static int
-io_submit (aio_context_t ctx, long n, struct iocb **paiocb)
-{
- return syscall (SYS_io_submit, ctx, n, paiocb);
-}
-
-static int
-io_cancel (aio_context_t ctx, struct iocb *iocb, struct io_event *result)
-{
- return syscall (SYS_io_cancel, ctx, iocb, result);
-}
-
-# ifndef HAVE_SYS_EVENTFD_H
-static int
-eventfd (guint initval, guint flags)
-{
- return syscall (SYS_eventfd, initval);
-}
-# endif
-
-#endif
-
-/**
- * AIO context
- */
-struct aio_context {
- struct event_base *base;
- gboolean has_aio; /**< Whether we have aio support on a system */
-#ifdef LINUX
- /* Eventfd variant */
- gint event_fd;
- struct event eventfd_ev;
- aio_context_t io_ctx;
-#elif defined(HAVE_AIO_H)
- /* POSIX aio */
- struct event rtsigs[128];
-#endif
-};
-
-#ifdef LINUX
-/* Eventfd read callback */
-static void
-rspamd_eventfdcb (gint fd, gshort what, gpointer ud)
-{
- struct aio_context *ctx = ud;
- guint64 ready;
- gint done, i;
- struct io_event event[32];
- struct timespec ts;
- struct io_cbdata *ev_data;
-
- /* Eventfd returns number of events ready got from kernel */
- if (read (fd, &ready, 8) != 8) {
- if (errno == EAGAIN) {
- return;
- }
- msg_err ("eventfd read returned error: %s", strerror (errno));
- }
-
- ts.tv_sec = 0;
- ts.tv_nsec = 0;
-
- while (ready) {
- /* Get events ready */
- done = io_getevents (ctx->io_ctx, 1, 32, event, &ts);
-
- if (done > 0) {
- ready -= done;
-
- for (i = 0; i < done; i++) {
- ev_data = (struct io_cbdata *) (uintptr_t) event[i].data;
- /* Call this callback */
- ev_data->cb (ev_data->fd,
- event[i].res,
- ev_data->len,
- ev_data->buf,
- ev_data->ud);
- if (ev_data->io_buf) {
- free (ev_data->io_buf);
- }
- g_free (ev_data);
- }
- }
- else if (done == 0) {
- /* No more events are ready */
- return;
- }
- else {
- msg_err ("io_getevents failed: %s", strerror (errno));
- return;
- }
- }
-}
-
-#endif
-
-/**
- * Initialize aio with specified event base
- */
-struct aio_context *
-rspamd_aio_init (struct event_base *base)
-{
- struct aio_context *new;
-
- /* First of all we need to detect which type of aio we can try to use */
- new = g_malloc0 (sizeof (struct aio_context));
- new->base = base;
-
-#ifdef LINUX
- /* On linux we are trying to use io (3) and eventfd for notifying */
- new->event_fd = eventfd (0, 0);
- if (new->event_fd == -1) {
- msg_err ("eventfd failed: %s", strerror (errno));
- }
- else {
- /* Set this socket non-blocking */
- if (rspamd_socket_nonblocking (new->event_fd) == -1) {
- msg_err ("non blocking for eventfd failed: %s", strerror (errno));
- close (new->event_fd);
- }
- else {
- event_set (&new->eventfd_ev,
- new->event_fd,
- EV_READ | EV_PERSIST,
- rspamd_eventfdcb,
- new);
- event_base_set (new->base, &new->eventfd_ev);
- event_add (&new->eventfd_ev, NULL);
- if (io_setup (MAX_AIO_EV, &new->io_ctx) == -1) {
- msg_err ("io_setup failed: %s", strerror (errno));
- close (new->event_fd);
- }
- else {
- new->has_aio = TRUE;
- }
- }
- }
-#elif defined(HAVE_AIO_H)
- /* TODO: implement this */
-#endif
-
- return new;
-}
-
-/**
- * Open file for aio
- */
-gint
-rspamd_aio_open (struct aio_context *ctx, const gchar *path, int flags)
-{
- gint fd = -1;
- /* Fallback */
- if (!ctx->has_aio) {
- return open (path, flags);
- }
-#ifdef LINUX
-
- fd = open (path, flags | O_DIRECT);
-
- return fd;
-#elif defined(HAVE_AIO_H)
- fd = open (path, flags);
-#endif
-
- return fd;
-}
-
-/**
- * Asynchronous read of file
- */
-gint
-rspamd_aio_read (gint fd,
- gpointer buf,
- guint64 len,
- guint64 offset,
- struct aio_context *ctx,
- rspamd_aio_cb cb,
- gpointer ud)
-{
- gint r = -1;
-
- if (ctx->has_aio) {
-#ifdef LINUX
- struct iocb *iocb[1];
- struct io_cbdata *cbdata;
-
- cbdata = g_malloc0 (sizeof (struct io_cbdata));
- cbdata->cb = cb;
- cbdata->buf = buf;
- cbdata->len = len;
- cbdata->ud = ud;
- cbdata->fd = fd;
- cbdata->io_buf = NULL;
-
- iocb[0] = alloca (sizeof (struct iocb));
- memset (iocb[0], 0, sizeof (struct iocb));
- iocb[0]->aio_fildes = fd;
- iocb[0]->aio_lio_opcode = IO_CMD_PREAD;
- iocb[0]->aio_reqprio = 0;
- iocb[0]->aio_buf = (guint64)((uintptr_t)buf);
- iocb[0]->aio_nbytes = len;
- iocb[0]->aio_offset = offset;
- iocb[0]->aio_flags |= (1 << 0) /* IOCB_FLAG_RESFD */;
- iocb[0]->aio_resfd = ctx->event_fd;
- iocb[0]->aio_data = (guint64)((uintptr_t)cbdata);
-
- /* Iocb is copied to kernel internally, so it is safe to put it on stack */
- if (io_submit (ctx->io_ctx, 1, iocb) == 1) {
- return len;
- }
- else {
- if (errno == EAGAIN || errno == ENOSYS) {
- /* Fall back to sync read */
- goto blocking;
- }
- return -1;
- }
-
-#elif defined(HAVE_AIO_H)
-#endif
- }
- else {
- /* Blocking variant */
- goto blocking;
-blocking:
-#ifdef _LARGEFILE64_SOURCE
- r = lseek64 (fd, offset, SEEK_SET);
-#else
- r = lseek (fd, offset, SEEK_SET);
-#endif
- if (r > 0) {
- r = read (fd, buf, len);
- if (r >= 0) {
- cb (fd, 0, r, buf, ud);
- }
- else {
- cb (fd, r, -1, buf, ud);
- }
- }
- }
-
- return r;
-}
-
-/**
- * Asynchronous write of file
- */
-gint
-rspamd_aio_write (gint fd,
- gpointer buf,
- guint64 len,
- guint64 offset,
- struct aio_context *ctx,
- rspamd_aio_cb cb,
- gpointer ud)
-{
- gint r = -1;
-
- if (ctx->has_aio) {
-#ifdef LINUX
- struct iocb *iocb[1];
- struct io_cbdata *cbdata;
-
- cbdata = g_malloc0 (sizeof (struct io_cbdata));
- cbdata->cb = cb;
- cbdata->buf = buf;
- cbdata->len = len;
- cbdata->ud = ud;
- cbdata->fd = fd;
- /* We need to align pointer on boundary of 512 bytes here */
- if (posix_memalign (&cbdata->io_buf, 512, len) != 0) {
- return -1;
- }
- memcpy (cbdata->io_buf, buf, len);
-
- iocb[0] = alloca (sizeof (struct iocb));
- memset (iocb[0], 0, sizeof (struct iocb));
- iocb[0]->aio_fildes = fd;
- iocb[0]->aio_lio_opcode = IO_CMD_PWRITE;
- iocb[0]->aio_reqprio = 0;
- iocb[0]->aio_buf = (guint64)((uintptr_t)cbdata->io_buf);
- iocb[0]->aio_nbytes = len;
- iocb[0]->aio_offset = offset;
- iocb[0]->aio_flags |= (1 << 0) /* IOCB_FLAG_RESFD */;
- iocb[0]->aio_resfd = ctx->event_fd;
- iocb[0]->aio_data = (guint64)((uintptr_t)cbdata);
-
- /* Iocb is copied to kernel internally, so it is safe to put it on stack */
- if (io_submit (ctx->io_ctx, 1, iocb) == 1) {
- return len;
- }
- else {
- if (errno == EAGAIN || errno == ENOSYS) {
- /* Fall back to sync read */
- goto blocking;
- }
- return -1;
- }
-
-#elif defined(HAVE_AIO_H)
-#endif
- }
- else {
- /* Blocking variant */
- goto blocking;
-blocking:
-#ifdef _LARGEFILE64_SOURCE
- r = lseek64 (fd, offset, SEEK_SET);
-#else
- r = lseek (fd, offset, SEEK_SET);
-#endif
- if (r > 0) {
- r = write (fd, buf, len);
- if (r >= 0) {
- cb (fd, 0, r, buf, ud);
- }
- else {
- cb (fd, r, -1, buf, ud);
- }
- }
- }
-
- return r;
-}
-
-/**
- * Close of aio operations
- */
-gint
-rspamd_aio_close (gint fd, struct aio_context *ctx)
-{
- gint r = -1;
-
- if (ctx->has_aio) {
-#ifdef LINUX
- struct iocb iocb;
- struct io_event ev;
-
- memset (&iocb, 0, sizeof (struct iocb));
- iocb.aio_fildes = fd;
- iocb.aio_lio_opcode = IO_CMD_NOOP;
-
- /* Iocb is copied to kernel internally, so it is safe to put it on stack */
- r = io_cancel (ctx->io_ctx, &iocb, &ev);
- close (fd);
- return r;
-
-#elif defined(HAVE_AIO_H)
-#endif
- }
-
- r = close (fd);
-
- return r;
-}
diff --git a/src/libutil/aio_event.h b/src/libutil/aio_event.h
deleted file mode 100644
index cccbed4e2..000000000
--- a/src/libutil/aio_event.h
+++ /dev/null
@@ -1,59 +0,0 @@
-/*-
- * Copyright 2016 Vsevolod Stakhov
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef AIO_EVENT_H_
-#define AIO_EVENT_H_
-
-#include "config.h"
-
-/**
- * AIO context
- */
-struct aio_context;
-
-/**
- * Callback for notifying
- */
-typedef void (*rspamd_aio_cb) (gint fd, gint res, guint64 len, gpointer data,
- gpointer ud);
-
-/**
- * Initialize aio with specified event base
- */
-struct aio_context * rspamd_aio_init (struct event_base *base);
-
-/**
- * Open file for aio
- */
-gint rspamd_aio_open (struct aio_context *ctx, const gchar *path, int flags);
-
-/**
- * Asynchronous read of file
- */
-gint rspamd_aio_read (gint fd, gpointer buf, guint64 len, guint64 offset,
- struct aio_context *ctx, rspamd_aio_cb cb, gpointer ud);
-
-/**
- * Asynchronous write of file
- */
-gint rspamd_aio_write (gint fd, gpointer buf, guint64 len, guint64 offset,
- struct aio_context *ctx, rspamd_aio_cb cb, gpointer ud);
-
-/**
- * Close of aio operations
- */
-gint rspamd_aio_close (gint fd, struct aio_context *ctx);
-
-#endif /* AIO_EVENT_H_ */
diff --git a/src/libutil/http_connection.c b/src/libutil/http_connection.c
index aa964f417..9bf7456e7 100644
--- a/src/libutil/http_connection.c
+++ b/src/libutil/http_connection.c
@@ -25,6 +25,7 @@
#include "ottery.h"
#include "keypair_private.h"
#include "cryptobox.h"
+#include "libutil/libev_helper.h"
#include "libutil/ssl_util.h"
#include "libserver/url.h"
@@ -67,9 +68,8 @@ struct rspamd_http_connection_private {
struct rspamd_http_header *header;
struct http_parser parser;
struct http_parser_settings parser_cb;
- struct event ev;
- struct timeval tv;
- struct timeval *ptv;
+ struct rspamd_io_ev ev;
+ ev_tstamp timeout;
struct rspamd_http_message *msg;
struct iovec *out;
guint outlen;
@@ -348,9 +348,7 @@ rspamd_http_on_headers_complete (http_parser * parser)
if (msg->method == HTTP_HEAD) {
/* We don't care about the rest */
- if (rspamd_event_pending (&priv->ev, EV_READ)) {
- event_del (&priv->ev);
- }
+ rspamd_ev_watcher_stop (priv->ctx->event_loop, &priv->ev);
msg->code = parser->status_code;
rspamd_http_connection_ref (conn);
@@ -358,7 +356,7 @@ rspamd_http_on_headers_complete (http_parser * parser)
if (conn->opts & RSPAMD_HTTP_CLIENT_KEEP_ALIVE) {
rspamd_http_context_push_keepalive (conn->priv->ctx, conn,
- msg, conn->priv->ctx->ev_base);
+ msg, conn->priv->ctx->event_loop);
rspamd_http_connection_reset (conn);
}
else {
@@ -532,17 +530,14 @@ rspamd_http_on_headers_complete_decrypted (http_parser *parser)
if (msg->method == HTTP_HEAD) {
/* We don't care about the rest */
- if (rspamd_event_pending (&priv->ev, EV_READ)) {
- event_del (&priv->ev);
- }
-
+ rspamd_ev_watcher_stop (priv->ctx->event_loop, &priv->ev);
msg->code = parser->status_code;
rspamd_http_connection_ref (conn);
ret = conn->finish_handler (conn, msg);
if (conn->opts & RSPAMD_HTTP_CLIENT_KEEP_ALIVE) {
rspamd_http_context_push_keepalive (conn->priv->ctx, conn,
- msg, conn->priv->ctx->ev_base);
+ msg, conn->priv->ctx->event_loop);
rspamd_http_connection_reset (conn);
}
else {
@@ -692,16 +687,13 @@ rspamd_http_on_message_complete (http_parser * parser)
}
if (ret == 0) {
- if (rspamd_event_pending (&priv->ev, EV_READ)) {
- event_del (&priv->ev);
- }
-
+ rspamd_ev_watcher_stop (priv->ctx->event_loop, &priv->ev);
rspamd_http_connection_ref (conn);
ret = conn->finish_handler (conn, priv->msg);
if (conn->opts & RSPAMD_HTTP_CLIENT_KEEP_ALIVE) {
rspamd_http_context_push_keepalive (conn->priv->ctx, conn,
- priv->msg, conn->priv->ctx->ev_base);
+ priv->msg, conn->priv->ctx->event_loop);
rspamd_http_connection_reset (conn);
}
else {
@@ -741,11 +733,11 @@ rspamd_http_simple_client_helper (struct rspamd_http_connection *conn)
if (conn->opts & RSPAMD_HTTP_CLIENT_SHARED) {
rspamd_http_connection_read_message_shared (conn, conn->ud,
- conn->priv->ptv);
+ conn->priv->timeout);
}
else {
rspamd_http_connection_read_message (conn, conn->ud,
- conn->priv->ptv);
+ conn->priv->timeout);
}
if (priv->msg) {
@@ -835,7 +827,6 @@ rspamd_http_write_helper (struct rspamd_http_connection *conn)
else {
/* Want to write more */
priv->flags &= ~RSPAMD_HTTP_CONN_FLAG_RESETED;
- event_add (&priv->ev, priv->ptv);
}
return;
@@ -1269,10 +1260,7 @@ rspamd_http_connection_reset (struct rspamd_http_connection *conn)
if (!(priv->flags & RSPAMD_HTTP_CONN_FLAG_RESETED)) {
- if (rspamd_event_pending (&priv->ev, EV_READ|EV_WRITE|EV_TIMEOUT)) {
- event_del (&priv->ev);
- }
-
+ rspamd_ev_watcher_stop (priv->ctx->event_loop, &priv->ev);
rspamd_http_parser_reset (conn);
}
@@ -1468,7 +1456,7 @@ rspamd_http_connection_free (struct rspamd_http_connection *conn)
static void
rspamd_http_connection_read_message_common (struct rspamd_http_connection *conn,
- gpointer ud, struct timeval *timeout,
+ gpointer ud, ev_tstamp timeout,
gint flags)
{
struct rspamd_http_connection_private *priv = conn->priv;
@@ -1490,42 +1478,30 @@ rspamd_http_connection_read_message_common (struct rspamd_http_connection *conn,
priv->flags |= RSPAMD_HTTP_CONN_FLAG_ENCRYPTED;
}
- if (timeout == NULL) {
- priv->ptv = NULL;
- }
- else {
- memmove (&priv->tv, timeout, sizeof (struct timeval));
- priv->ptv = &priv->tv;
- }
-
+ priv->timeout = timeout;
priv->header = NULL;
priv->buf = g_malloc0 (sizeof (*priv->buf));
REF_INIT_RETAIN (priv->buf, rspamd_http_privbuf_dtor);
priv->buf->data = rspamd_fstring_sized_new (8192);
priv->flags |= RSPAMD_HTTP_CONN_FLAG_NEW_HEADER;
- event_set (&priv->ev,
- conn->fd,
- EV_READ | EV_PERSIST,
- rspamd_http_event_handler,
- conn);
-
- event_base_set (priv->ctx->ev_base, &priv->ev);
+ rspamd_ev_watcher_init (&priv->ev, conn->fd, EV_READ,
+ rspamd_http_event_handler, conn);
+ rspamd_ev_watcher_start (priv->ctx->event_loop, &priv->ev, priv->timeout);
priv->flags &= ~RSPAMD_HTTP_CONN_FLAG_RESETED;
- event_add (&priv->ev, priv->ptv);
}
void
rspamd_http_connection_read_message (struct rspamd_http_connection *conn,
- gpointer ud, struct timeval *timeout)
+ gpointer ud, ev_tstamp timeout)
{
rspamd_http_connection_read_message_common (conn, ud, timeout, 0);
}
void
rspamd_http_connection_read_message_shared (struct rspamd_http_connection *conn,
- gpointer ud, struct timeval *timeout)
+ gpointer ud, ev_tstamp timeout)
{
rspamd_http_connection_read_message_common (conn, ud, timeout,
RSPAMD_HTTP_FLAG_SHMEM);
@@ -1912,7 +1888,7 @@ rspamd_http_connection_write_message_common (struct rspamd_http_connection *conn
const gchar *host,
const gchar *mime_type,
gpointer ud,
- struct timeval *timeout,
+ ev_tstamp timeout,
gboolean allow_shared)
{
struct rspamd_http_connection_private *priv = conn->priv;
@@ -1930,14 +1906,7 @@ rspamd_http_connection_write_message_common (struct rspamd_http_connection *conn
conn->ud = ud;
priv->msg = msg;
-
- if (timeout == NULL) {
- priv->ptv = NULL;
- }
- else if (timeout != &priv->tv) {
- memcpy (&priv->tv, timeout, sizeof (struct timeval));
- priv->ptv = &priv->tv;
- }
+ priv->timeout = timeout;
priv->header = NULL;
priv->buf = g_malloc0 (sizeof (*priv->buf));
@@ -2224,16 +2193,12 @@ rspamd_http_connection_write_message_common (struct rspamd_http_connection *conn
msg->flags &=~ RSPAMD_HTTP_FLAG_SSL;
}
- if (rspamd_event_pending (&priv->ev, EV_TIMEOUT|EV_WRITE|EV_READ)) {
- event_del (&priv->ev);
- }
+ rspamd_ev_watcher_stop (priv->ctx->event_loop, &priv->ev);
if (msg->flags & RSPAMD_HTTP_FLAG_SSL) {
gpointer ssl_ctx = (msg->flags & RSPAMD_HTTP_FLAG_SSL_NOVERIFY) ?
priv->ctx->ssl_ctx_noverify : priv->ctx->ssl_ctx;
- event_base_set (priv->ctx->ev_base, &priv->ev);
-
if (!ssl_ctx) {
err = g_error_new (HTTP_ERROR, errno, "ssl message requested "
"with no ssl ctx");
@@ -2249,12 +2214,12 @@ rspamd_http_connection_write_message_common (struct rspamd_http_connection *conn
rspamd_ssl_connection_free (priv->ssl);
}
- priv->ssl = rspamd_ssl_connection_new (ssl_ctx, priv->ctx->ev_base,
+ priv->ssl = rspamd_ssl_connection_new (ssl_ctx, priv->ctx->event_loop,
!(msg->flags & RSPAMD_HTTP_FLAG_SSL_NOVERIFY));
g_assert (priv->ssl != NULL);
if (!rspamd_ssl_connect_fd (priv->ssl, conn->fd, host, &priv->ev,
- priv->ptv, rspamd_http_event_handler,
+ priv->timeout, rspamd_http_event_handler,
rspamd_http_ssl_err_handler, conn)) {
err = g_error_new (HTTP_ERROR, errno,
@@ -2270,10 +2235,9 @@ rspamd_http_connection_write_message_common (struct rspamd_http_connection *conn
}
}
else {
- event_set (&priv->ev, conn->fd, EV_WRITE, rspamd_http_event_handler, conn);
- event_base_set (priv->ctx->ev_base, &priv->ev);
-
- event_add (&priv->ev, priv->ptv);
+ rspamd_ev_watcher_init (&priv->ev, conn->fd, EV_WRITE,
+ rspamd_http_event_handler, conn);
+ rspamd_ev_watcher_start (priv->ctx->event_loop, &priv->ev, priv->timeout);
}
}
@@ -2283,7 +2247,7 @@ rspamd_http_connection_write_message (struct rspamd_http_connection *conn,
const gchar *host,
const gchar *mime_type,
gpointer ud,
- struct timeval *timeout)
+ ev_tstamp timeout)
{
rspamd_http_connection_write_message_common (conn, msg, host, mime_type,
ud, timeout, FALSE);
@@ -2295,7 +2259,7 @@ rspamd_http_connection_write_message_shared (struct rspamd_http_connection *conn
const gchar *host,
const gchar *mime_type,
gpointer ud,
- struct timeval *timeout)
+ ev_tstamp timeout)
{
rspamd_http_connection_write_message_common (conn, msg, host, mime_type,
ud, timeout, TRUE);
diff --git a/src/libutil/http_connection.h b/src/libutil/http_connection.h
index 6240772da..fc1303446 100644
--- a/src/libutil/http_connection.h
+++ b/src/libutil/http_connection.h
@@ -31,7 +31,7 @@
#include "http_util.h"
#include "addr.h"
-#include <event.h>
+#include "contrib/libev/ev.h"
enum rspamd_http_connection_type {
RSPAMD_HTTP_SERVER,
@@ -221,12 +221,12 @@ gboolean rspamd_http_connection_is_encrypted (struct rspamd_http_connection *con
void rspamd_http_connection_read_message (
struct rspamd_http_connection *conn,
gpointer ud,
- struct timeval *timeout);
+ ev_tstamp timeout);
void rspamd_http_connection_read_message_shared (
struct rspamd_http_connection *conn,
gpointer ud,
- struct timeval *timeout);
+ ev_tstamp timeout);
/**
* Send reply using initialised connection
@@ -241,7 +241,7 @@ void rspamd_http_connection_write_message (
const gchar *host,
const gchar *mime_type,
gpointer ud,
- struct timeval *timeout);
+ ev_tstamp timeout);
void rspamd_http_connection_write_message_shared (
struct rspamd_http_connection *conn,
@@ -249,7 +249,7 @@ void rspamd_http_connection_write_message_shared (
const gchar *host,
const gchar *mime_type,
gpointer ud,
- struct timeval *timeout);
+ ev_tstamp timeout);
/**
* Free connection structure
diff --git a/src/libutil/http_context.c b/src/libutil/http_context.c
index 95500aaad..95ab7021c 100644
--- a/src/libutil/http_context.c
+++ b/src/libutil/http_context.c
@@ -23,6 +23,7 @@
#include "contrib/libottery/ottery.h"
#include "contrib/http-parser/http_parser.h"
#include "rspamd.h"
+#include "libev_helper.h"
INIT_LOG_MODULE(http_context)
@@ -38,7 +39,7 @@ struct rspamd_http_keepalive_cbdata {
struct rspamd_http_context *ctx;
GQueue *queue;
GList *link;
- struct event ev;
+ struct rspamd_io_ev ev;
};
static void
@@ -64,20 +65,16 @@ rspamd_http_keepalive_queue_cleanup (GQueue *conns)
}
static void
-rspamd_http_context_client_rotate_ev (gint fd, short what, void *arg)
+rspamd_http_context_client_rotate_ev (struct ev_loop *loop, ev_timer *w, int revents)
{
- struct timeval rot_tv;
- struct rspamd_http_context *ctx = arg;
+ struct rspamd_http_context *ctx = (struct rspamd_http_context *)w->data;
gpointer kp;
- double_to_tv (ctx->config.client_key_rotate_time, &rot_tv);
- rot_tv.tv_sec += ottery_rand_range (rot_tv.tv_sec);
+ w->repeat = rspamd_time_jitter (ctx->config.client_key_rotate_time, 0);
+ msg_debug_http_context ("rotate local keypair, next rotate in %.0f seconds",
+ w->repeat);
- msg_debug_http_context ("rotate local keypair, next rotate in %d seconds",
- (int)rot_tv.tv_sec);
-
- event_del (&ctx->client_rotate_ev);
- event_add (&ctx->client_rotate_ev, &rot_tv);
+ ev_timer_again (loop, w);
kp = ctx->client_kp;
ctx->client_kp = rspamd_keypair_new (RSPAMD_KEYPAIR_KEX,
@@ -87,7 +84,7 @@ rspamd_http_context_client_rotate_ev (gint fd, short what, void *arg)
static struct rspamd_http_context*
rspamd_http_context_new_default (struct rspamd_config *cfg,
- struct event_base *ev_base,
+ struct ev_loop *ev_base,
struct upstream_ctx *ups_ctx)
{
struct rspamd_http_context *ctx;
@@ -114,7 +111,7 @@ rspamd_http_context_new_default (struct rspamd_config *cfg,
ctx->ssl_ctx_noverify = rspamd_init_ssl_ctx_noverify ();
}
- ctx->ev_base = ev_base;
+ ctx->event_loop = ev_base;
ctx->keep_alive_hash = kh_init (rspamd_keep_alive_hash);
@@ -186,16 +183,14 @@ rspamd_http_context_init (struct rspamd_http_context *ctx)
ctx->server_kp_cache = rspamd_keypair_cache_new (ctx->config.kp_cache_size_server);
}
- if (ctx->config.client_key_rotate_time > 0 && ctx->ev_base) {
- struct timeval tv;
+ if (ctx->config.client_key_rotate_time > 0 && ctx->event_loop) {
double jittered = rspamd_time_jitter (ctx->config.client_key_rotate_time,
0);
- double_to_tv (jittered, &tv);
- event_set (&ctx->client_rotate_ev, -1, EV_TIMEOUT,
- rspamd_http_context_client_rotate_ev, ctx);
- event_base_set (ctx->ev_base, &ctx->client_rotate_ev);
- event_add (&ctx->client_rotate_ev, &tv);
+ ev_timer_init (&ctx->client_rotate_ev,
+ rspamd_http_context_client_rotate_ev, jittered, 0);
+ ev_timer_start (ctx->event_loop, &ctx->client_rotate_ev);
+ ctx->client_rotate_ev.data = ctx;
}
if (ctx->config.http_proxy) {
@@ -208,7 +203,7 @@ rspamd_http_context_init (struct rspamd_http_context *ctx)
struct rspamd_http_context*
rspamd_http_context_create (struct rspamd_config *cfg,
- struct event_base *ev_base,
+ struct ev_loop *ev_base,
struct upstream_ctx *ups_ctx)
{
struct rspamd_http_context *ctx;
@@ -337,7 +332,7 @@ rspamd_http_context_free (struct rspamd_http_context *ctx)
struct rspamd_http_context*
rspamd_http_context_create_config (struct rspamd_http_context_cfg *cfg,
- struct event_base *ev_base,
+ struct ev_loop *ev_base,
struct upstream_ctx *ups_ctx)
{
struct rspamd_http_context *ctx;
@@ -412,7 +407,7 @@ rspamd_http_context_check_keepalive (struct rspamd_http_context *ctx,
struct rspamd_http_connection *conn;
cbd = g_queue_pop_head (conns);
- event_del (&cbd->ev);
+ rspamd_ev_watcher_stop (ctx->event_loop, &cbd->ev);
conn = cbd->conn;
g_free (cbd);
@@ -491,6 +486,7 @@ rspamd_http_keepalive_handler (gint fd, short what, gpointer ud)
cbdata->conn->keepalive_hash_key->host,
cbdata->queue->length);
rspamd_http_connection_unref (cbdata->conn);
+ rspamd_ev_watcher_stop (cbdata->ctx->event_loop, &cbdata->ev);
g_free (cbdata);
}
@@ -498,10 +494,9 @@ void
rspamd_http_context_push_keepalive (struct rspamd_http_context *ctx,
struct rspamd_http_connection *conn,
struct rspamd_http_message *msg,
- struct event_base *ev_base)
+ struct ev_loop *event_loop)
{
struct rspamd_http_keepalive_cbdata *cbdata;
- struct timeval tv;
gdouble timeout = ctx->config.keepalive_interval;
g_assert (conn->keepalive_hash_key != NULL);
@@ -571,17 +566,14 @@ rspamd_http_context_push_keepalive (struct rspamd_http_context *ctx,
cbdata->ctx = ctx;
conn->finished = FALSE;
- event_set (&cbdata->ev, conn->fd, EV_READ|EV_TIMEOUT,
+ rspamd_ev_watcher_init (&cbdata->ev, conn->fd, EV_READ,
rspamd_http_keepalive_handler,
cbdata);
+ rspamd_ev_watcher_start (event_loop, &cbdata->ev, timeout);
msg_debug_http_context ("push keepalive element %s (%s), %d connections queued, %.1f timeout",
rspamd_inet_address_to_string_pretty (cbdata->conn->keepalive_hash_key->addr),
cbdata->conn->keepalive_hash_key->host,
cbdata->queue->length,
timeout);
-
- double_to_tv (timeout, &tv);
- event_base_set (ev_base, &cbdata->ev);
- event_add (&cbdata->ev, &tv);
} \ No newline at end of file
diff --git a/src/libutil/http_context.h b/src/libutil/http_context.h
index 4cf07fb48..c610ffbbd 100644
--- a/src/libutil/http_context.h
+++ b/src/libutil/http_context.h
@@ -21,7 +21,7 @@
#include "ucl.h"
#include "addr.h"
-#include <event.h>
+#include "contrib/libev/ev.h"
struct rspamd_http_context;
struct rspamd_config;
@@ -45,11 +45,11 @@ struct rspamd_http_context_cfg {
* @return new context used for both client and server HTTP connections
*/
struct rspamd_http_context* rspamd_http_context_create (struct rspamd_config *cfg,
- struct event_base *ev_base, struct upstream_ctx *ctx);
+ struct ev_loop *ev_base, struct upstream_ctx *ctx);
struct rspamd_http_context* rspamd_http_context_create_config (
struct rspamd_http_context_cfg *cfg,
- struct event_base *ev_base,
+ struct ev_loop *ev_base,
struct upstream_ctx *ctx);
/**
* Destroys context
@@ -93,6 +93,6 @@ void rspamd_http_context_prepare_keepalive (struct rspamd_http_context *ctx,
void rspamd_http_context_push_keepalive (struct rspamd_http_context *ctx,
struct rspamd_http_connection *conn,
struct rspamd_http_message *msg,
- struct event_base *ev_base);
+ struct ev_loop *ev_base);
#endif
diff --git a/src/libutil/http_private.h b/src/libutil/http_private.h
index 368715891..f5a7dd9cc 100644
--- a/src/libutil/http_private.h
+++ b/src/libutil/http_private.h
@@ -100,8 +100,8 @@ struct rspamd_http_context {
struct upstream_list *http_proxies;
gpointer ssl_ctx;
gpointer ssl_ctx_noverify;
- struct event_base *ev_base;
- struct event client_rotate_ev;
+ struct ev_loop *event_loop;
+ ev_timer client_rotate_ev;
khash_t (rspamd_keep_alive_hash) *keep_alive_hash;
};
diff --git a/src/libutil/http_router.c b/src/libutil/http_router.c
index ec0eeb7b4..8d5913f0d 100644
--- a/src/libutil/http_router.c
+++ b/src/libutil/http_router.c
@@ -92,7 +92,7 @@ rspamd_http_router_error_handler (struct rspamd_http_connection *conn,
NULL,
"text/plain",
entry,
- entry->rt->ptv);
+ entry->rt->timeout);
entry->is_reply = TRUE;
}
}
@@ -210,7 +210,7 @@ rspamd_http_router_try_file (struct rspamd_http_connection_entry *entry,
msg_debug ("requested file %s", realbuf);
rspamd_http_connection_write_message (entry->conn, reply_msg, NULL,
rspamd_http_router_detect_ct (realbuf), entry,
- entry->rt->ptv);
+ entry->rt->timeout);
return TRUE;
}
@@ -235,7 +235,7 @@ rspamd_http_router_send_error (GError *err,
NULL,
"text/plain",
entry,
- entry->rt->ptv);
+ entry->rt->timeout);
}
@@ -369,33 +369,25 @@ rspamd_http_router_finish_handler (struct rspamd_http_connection *conn,
struct rspamd_http_connection_router *
rspamd_http_router_new (rspamd_http_router_error_handler_t eh,
rspamd_http_router_finish_handler_t fh,
- struct timeval *timeout,
+ ev_tstamp timeout,
const char *default_fs_path,
struct rspamd_http_context *ctx)
{
- struct rspamd_http_connection_router * new;
+ struct rspamd_http_connection_router *nrouter;
struct stat st;
- new = g_malloc0 (sizeof (struct rspamd_http_connection_router));
- new->paths = g_hash_table_new_full (rspamd_ftok_icase_hash,
+ nrouter = g_malloc0 (sizeof (struct rspamd_http_connection_router));
+ nrouter->paths = g_hash_table_new_full (rspamd_ftok_icase_hash,
rspamd_ftok_icase_equal, rspamd_fstring_mapped_ftok_free, NULL);
- new->regexps = g_ptr_array_new ();
- new->conns = NULL;
- new->error_handler = eh;
- new->finish_handler = fh;
- new->response_headers = g_hash_table_new_full (rspamd_strcase_hash,
+ nrouter->regexps = g_ptr_array_new ();
+ nrouter->conns = NULL;
+ nrouter->error_handler = eh;
+ nrouter->finish_handler = fh;
+ nrouter->response_headers = g_hash_table_new_full (rspamd_strcase_hash,
rspamd_strcase_equal, g_free, g_free);
- new->ev_base = ctx->ev_base;
-
- if (timeout) {
- new->tv = *timeout;
- new->ptv = &new->tv;
- }
- else {
- new->ptv = NULL;
- }
-
- new->default_fs_path = NULL;
+ nrouter->event_loop = ctx->event_loop;
+ nrouter->timeout = timeout;
+ nrouter->default_fs_path = NULL;
if (default_fs_path != NULL) {
if (stat (default_fs_path, &st) == -1) {
@@ -406,14 +398,14 @@ rspamd_http_router_new (rspamd_http_router_error_handler_t eh,
msg_err ("path %s is not a directory", default_fs_path);
}
else {
- new->default_fs_path = realpath (default_fs_path, NULL);
+ nrouter->default_fs_path = realpath (default_fs_path, NULL);
}
}
}
- new->ctx = ctx;
+ nrouter->ctx = ctx;
- return new;
+ return nrouter;
}
void
@@ -517,7 +509,7 @@ rspamd_http_router_handle_socket (struct rspamd_http_connection_router *router,
rspamd_http_connection_set_key (conn->conn, router->key);
}
- rspamd_http_connection_read_message (conn->conn, conn, router->ptv);
+ rspamd_http_connection_read_message (conn->conn, conn, router->timeout);
DL_PREPEND (router->conns, conn);
}
diff --git a/src/libutil/http_router.h b/src/libutil/http_router.h
index 8e8056240..b946067b7 100644
--- a/src/libutil/http_router.h
+++ b/src/libutil/http_router.h
@@ -44,9 +44,8 @@ struct rspamd_http_connection_router {
GHashTable *paths;
GHashTable *response_headers;
GPtrArray *regexps;
- struct timeval tv;
- struct timeval *ptv;
- struct event_base *ev_base;
+ ev_tstamp timeout;
+ struct ev_loop *event_loop;
struct rspamd_http_context *ctx;
gchar *default_fs_path;
rspamd_http_router_handler_t unknown_method_handler;
@@ -66,7 +65,7 @@ struct rspamd_http_connection_router {
struct rspamd_http_connection_router * rspamd_http_router_new (
rspamd_http_router_error_handler_t eh,
rspamd_http_router_finish_handler_t fh,
- struct timeval *timeout,
+ ev_tstamp timeout,
const char *default_fs_path,
struct rspamd_http_context *ctx);
diff --git a/src/libutil/libev_helper.c b/src/libutil/libev_helper.c
new file mode 100644
index 000000000..81a23dea6
--- /dev/null
+++ b/src/libutil/libev_helper.c
@@ -0,0 +1,119 @@
+/*-
+ * Copyright 2019 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "libev_helper.h"
+
+static void
+rspamd_ev_watcher_io_cb (EV_P_ struct ev_io *w, int revents)
+{
+ struct rspamd_io_ev *ev = (struct rspamd_io_ev *)w->data;
+
+ ev->last_activity = ev_now (EV_A);
+ ev->cb (ev->io.fd, revents, ev->ud);
+}
+
+static void
+rspamd_ev_watcher_timer_cb (EV_P_ struct ev_timer *w, int revents)
+{
+ struct rspamd_io_ev *ev = (struct rspamd_io_ev *)w->data;
+
+ ev_tstamp after = ev->last_activity - ev_now (EV_A) + ev->timeout;
+
+ if (after < 0.) {
+ /* Real timeout */
+ ev->cb (ev->io.fd, EV_TIMER, ev->ud);
+ }
+ else {
+ /* Start another cycle as there was some activity */
+ w->repeat = after;
+ ev_timer_again (EV_A_ w);
+ }
+}
+
+
+void
+rspamd_ev_watcher_init (struct rspamd_io_ev *ev,
+ int fd,
+ short what,
+ rspamd_ev_cb cb,
+ void *ud)
+{
+ ev_io_init (&ev->io, rspamd_ev_watcher_io_cb, fd, what);
+ ev->io.data = ev;
+ ev_init (&ev->tm, rspamd_ev_watcher_timer_cb);
+ ev->tm.data = ev;
+ ev->ud = ud;
+ ev->cb = cb;
+}
+
+void
+rspamd_ev_watcher_start (struct ev_loop *loop,
+ struct rspamd_io_ev *ev,
+ ev_tstamp timeout)
+{
+ g_assert (ev->cb != NULL);
+
+ ev->last_activity = ev_now (EV_A);
+ ev_io_start (EV_A_ &ev->io);
+
+ if (timeout > 0) {
+ ev->timeout = timeout;
+ ev_timer_set (&ev->tm, timeout, 0.0);
+ ev_timer_start (EV_A_ &ev->tm);
+ }
+}
+
+void
+rspamd_ev_watcher_stop (struct ev_loop *loop,
+ struct rspamd_io_ev *ev)
+{
+ if (ev_is_pending (&ev->io) || ev_is_active (&ev->io)) {
+ ev_io_stop (EV_A_ &ev->io);
+ }
+
+ if (ev->timeout > 0) {
+ ev_timer_stop (EV_A_ &ev->tm);
+ }
+}
+
+void
+rspamd_ev_watcher_reschedule (struct ev_loop *loop,
+ struct rspamd_io_ev *ev,
+ short what)
+{
+ g_assert (ev->cb != NULL);
+
+ if (ev_is_pending (&ev->io) || ev_is_active (&ev->io)) {
+ ev_io_stop (EV_A_ &ev->io);
+ ev_io_set (&ev->io, ev->io.fd, what);
+ ev_io_start (EV_A_ &ev->io);
+ }
+ else {
+ ev->io.data = ev;
+ ev_io_init (&ev->io, rspamd_ev_watcher_io_cb, ev->io.fd, what);
+ ev_io_start (EV_A_ &ev->io);
+ }
+
+ if (ev->timeout > 0) {
+ if (!(ev_is_active (&ev->tm) || ev_is_pending (&ev->tm))) {
+ ev->tm.data = ev;
+ ev_timer_init (&ev->tm, rspamd_ev_watcher_timer_cb, ev->timeout, 0.0);
+ ev_timer_start (EV_A_ &ev->tm);
+ }
+ }
+
+ ev->last_activity = ev_now (EV_A);
+} \ No newline at end of file
diff --git a/src/libutil/libev_helper.h b/src/libutil/libev_helper.h
new file mode 100644
index 000000000..cf52db557
--- /dev/null
+++ b/src/libutil/libev_helper.h
@@ -0,0 +1,78 @@
+/*-
+ * Copyright 2019 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef RSPAMD_LIBEV_HELPER_H
+#define RSPAMD_LIBEV_HELPER_H
+
+#include "config.h"
+#include "contrib/libev/ev.h"
+
+/*
+ * This module is a little helper to simplify libevent->libev transition
+ * It allows to create timed IO watchers utilising both
+ */
+
+typedef void (*rspamd_ev_cb)(int fd, short what, void *ud);
+
+struct rspamd_io_ev {
+ ev_io io;
+ ev_timer tm;
+ rspamd_ev_cb cb;
+ void *ud;
+ ev_tstamp last_activity;
+ ev_tstamp timeout;
+};
+
+/**
+ * Initialize watcher similar to event_init
+ * @param ev
+ * @param fd
+ * @param what
+ * @param cb
+ * @param ud
+ */
+void rspamd_ev_watcher_init (struct rspamd_io_ev *ev,
+ int fd, short what, rspamd_ev_cb cb, void *ud);
+
+/**
+ * Start watcher with the specific timeout
+ * @param loop
+ * @param ev
+ * @param timeout
+ */
+void rspamd_ev_watcher_start (struct ev_loop *loop,
+ struct rspamd_io_ev *ev,
+ ev_tstamp timeout);
+
+/**
+ * Stops watcher and clean it up
+ * @param loop
+ * @param ev
+ */
+void rspamd_ev_watcher_stop (struct ev_loop *loop,
+ struct rspamd_io_ev *ev);
+
+/**
+ * Convenience function to reschedule watcher with different events
+ * @param loop
+ * @param ev
+ * @param what
+ */
+void rspamd_ev_watcher_reschedule (struct ev_loop *loop,
+ struct rspamd_io_ev *ev,
+ short what);
+
+#endif
diff --git a/src/libutil/map.c b/src/libutil/map.c
index fc414ab00..9f43fa253 100644
--- a/src/libutil/map.c
+++ b/src/libutil/map.c
@@ -23,6 +23,7 @@
#include "http_private.h"
#include "rspamd.h"
#include "contrib/zstd/zstd.h"
+#include "contrib/libev/ev.h"
#undef MAP_DEBUG_REFS
#ifdef MAP_DEBUG_REFS
@@ -44,7 +45,7 @@ static void free_http_cbdata_common (struct http_callback_data *cbd,
gboolean plan_new);
static void free_http_cbdata_dtor (gpointer p);
static void free_http_cbdata (struct http_callback_data *cbd);
-static void rspamd_map_periodic_callback (gint fd, short what, void *ud);
+static void rspamd_map_process_periodic (struct map_periodic_cbdata *cbd);
static void rspamd_map_schedule_periodic (struct rspamd_map *map, gboolean locked,
gboolean initial, gboolean errored);
static gboolean read_map_file_chunks (struct rspamd_map *map,
@@ -130,7 +131,7 @@ write_http_request (struct http_callback_data *cbd)
cbd->data->host,
NULL,
cbd,
- &cbd->tv);
+ cbd->timeout);
}
static gboolean
@@ -274,7 +275,12 @@ free_http_cbdata_common (struct http_callback_data *cbd, gboolean plan_new)
MAP_RELEASE (cbd->bk, "rspamd_map_backend");
- MAP_RELEASE (periodic, "periodic");
+
+ if (periodic) {
+ /* Detached in case of HTTP error */
+ MAP_RELEASE (periodic, "periodic");
+ }
+
g_free (cbd);
}
@@ -325,17 +331,21 @@ http_map_error (struct rspamd_http_connection *conn,
cbd->bk->uri,
cbd->addr ? rspamd_inet_address_to_string_pretty (cbd->addr) : "",
err);
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic);
+ MAP_RETAIN (cbd->periodic, "periodic");
+ rspamd_map_process_periodic (cbd->periodic);
+ MAP_RELEASE (cbd->periodic, "periodic");
+ /* Detach periodic as rspamd_map_process_periodic will destroy it */
+ cbd->periodic = NULL;
MAP_RELEASE (cbd, "http_callback_data");
}
static void
-rspamd_map_cache_cb (gint fd, short what, gpointer ud)
+rspamd_map_cache_cb (struct ev_loop *loop, ev_timer *w, int revents)
{
- struct rspamd_http_map_cached_cbdata *cache_cbd = ud;
+ struct rspamd_http_map_cached_cbdata *cache_cbd = (struct rspamd_http_map_cached_cbdata *)
+ w->data;
struct rspamd_map *map;
struct http_map_data *data;
- struct timeval tv;
map = cache_cbd->map;
data = cache_cbd->data;
@@ -349,7 +359,7 @@ rspamd_map_cache_cb (gint fd, short what, gpointer ud)
msg_info_map ("cached data is now expired (gen mismatch %L != %L) for %s",
cache_cbd->gen, cache_cbd->data->gen, map->name);
MAP_RELEASE (cache_cbd->shm, "rspamd_http_map_cached_cbdata");
- event_del (&cache_cbd->timeout);
+ ev_timer_stop (loop, &cache_cbd->timeout);
g_free (cache_cbd);
}
else if (cache_cbd->data->last_checked >= cache_cbd->last_checked) {
@@ -357,17 +367,25 @@ rspamd_map_cache_cb (gint fd, short what, gpointer ud)
* We checked map but we have not found anything more recent,
* reschedule cache check
*/
+ if (cache_cbd->map->poll_timeout >
+ ev_now (loop) - cache_cbd->data->last_checked) {
+ w->repeat = cache_cbd->map->poll_timeout -
+ (ev_now (loop) - cache_cbd->data->last_checked);
+ }
+ else {
+ w->repeat = cache_cbd->map->poll_timeout;
+ }
+
cache_cbd->last_checked = cache_cbd->data->last_checked;
msg_debug_map ("cached data is up to date for %s", map->name);
- double_to_tv (map->poll_timeout * 2, &tv);
- event_add (&cache_cbd->timeout, &tv);
+ ev_timer_again (loop, &cache_cbd->timeout);
}
else {
data->cur_cache_cbd = NULL;
g_atomic_int_set (&data->cache->available, 0);
MAP_RELEASE (cache_cbd->shm, "rspamd_http_map_cached_cbdata");
msg_info_map ("cached data is now expired for %s", map->name);
- event_del (&cache_cbd->timeout);
+ ev_timer_stop (loop, &cache_cbd->timeout);
g_free (cache_cbd);
}
}
@@ -436,7 +454,6 @@ http_map_finish (struct rspamd_http_connection *conn,
struct rspamd_map_backend *bk;
struct http_map_data *data;
struct rspamd_http_map_cached_cbdata *cache_cbd;
- struct timeval tv;
const rspamd_ftok_t *expires_hdr, *etag_hdr;
char next_check_date[128];
guchar *aux_data, *in = NULL;
@@ -456,7 +473,7 @@ http_map_finish (struct rspamd_http_connection *conn,
g_atomic_int_set (&data->cache->available, 0);
data->cur_cache_cbd = NULL;
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic);
+ rspamd_map_process_periodic (cbd->periodic);
MAP_RELEASE (cbd, "http_callback_data");
return 0;
@@ -622,6 +639,8 @@ read_data:
}
/* Check for expires */
+ double cached_timeout = map->poll_timeout * 2;
+
expires_hdr = rspamd_http_message_find_header (msg, "Expires");
if (expires_hdr) {
@@ -635,19 +654,12 @@ read_data:
hdate = MIN (map->next_check, hdate);
}
- double cached_timeout = map->next_check - msg->date +
- map->poll_timeout * 2;
+ cached_timeout = map->next_check - msg->date +
+ map->poll_timeout * 2;
map->next_check = hdate;
- double_to_tv (cached_timeout, &tv);
- }
- else {
- double_to_tv (map->poll_timeout * 2, &tv);
}
}
- else {
- double_to_tv (map->poll_timeout * 2, &tv);
- }
/* Check for etag */
etag_hdr = rspamd_http_message_find_header (msg, "ETag");
@@ -682,16 +694,17 @@ read_data:
data->cache->last_modified = cbd->data->last_modified;
cache_cbd = g_malloc0 (sizeof (*cache_cbd));
cache_cbd->shm = cbd->shmem_data;
+ cache_cbd->event_loop = cbd->event_loop;
cache_cbd->map = map;
cache_cbd->data = cbd->data;
cache_cbd->last_checked = cbd->data->last_checked;
cache_cbd->gen = cbd->data->gen;
MAP_RETAIN (cache_cbd->shm, "shmem_data");
- event_set (&cache_cbd->timeout, -1, EV_TIMEOUT, rspamd_map_cache_cb,
- cache_cbd);
- event_base_set (cbd->ev_base, &cache_cbd->timeout);
- event_add (&cache_cbd->timeout, &tv);
+ ev_timer_init (&cache_cbd->timeout, rspamd_map_cache_cb, cached_timeout,
+ 0.0);
+ ev_timer_start (cbd->event_loop, &cache_cbd->timeout);
+ cache_cbd->timeout.data = cache_cbd;
data->cur_cache_cbd = cache_cbd;
if (map->next_check) {
@@ -700,7 +713,7 @@ read_data:
}
else {
rspamd_http_date_format (next_check_date, sizeof (next_check_date),
- time (NULL) + map->poll_timeout);
+ ev_now (cbd->event_loop) + map->poll_timeout);
}
@@ -773,7 +786,7 @@ read_data:
cbd->periodic->cur_backend ++;
munmap (in, dlen);
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic);
+ rspamd_map_process_periodic (cbd->periodic);
}
else if (msg->code == 304 && (cbd->check && cbd->stage == map_load_file)) {
cbd->data->last_checked = msg->date;
@@ -819,13 +832,13 @@ read_data:
}
else {
rspamd_http_date_format (next_check_date, sizeof (next_check_date),
- time (NULL) + map->poll_timeout);
+ ev_now (cbd->event_loop) + map->poll_timeout);
}
msg_info_map ("data is not modified for server %s, next check at %s",
cbd->data->host, next_check_date);
cbd->periodic->cur_backend ++;
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic);
+ rspamd_map_process_periodic (cbd->periodic);
}
else {
msg_info_map ("cannot load map %s from %s: HTTP error %d",
@@ -838,7 +851,7 @@ read_data:
err:
cbd->periodic->errored = 1;
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic);
+ rspamd_map_process_periodic (cbd->periodic);
MAP_RELEASE (cbd, "http_callback_data");
return 0;
@@ -951,6 +964,7 @@ read_map_file (struct rspamd_map *map, struct file_map_data *data,
}
}
+ ev_stat_stat (map->event_loop, &data->st_ev);
len = st.st_size;
if (bk->is_signed) {
@@ -1045,9 +1059,6 @@ read_map_file (struct rspamd_map *map, struct file_map_data *data,
map->read_callback (NULL, 0, &periodic->cbdata, TRUE);
}
- /* Also update at the read time */
- memcpy (&data->st, &st, sizeof (struct stat));
-
return TRUE;
}
@@ -1143,7 +1154,6 @@ rspamd_map_periodic_dtor (struct map_periodic_cbdata *periodic)
map = periodic->map;
msg_debug_map ("periodic dtor %p", periodic);
- event_del (&periodic->ev);
if (periodic->need_modify) {
/* We are done */
@@ -1162,6 +1172,16 @@ rspamd_map_periodic_dtor (struct map_periodic_cbdata *periodic)
g_free (periodic);
}
+/* Called on timer execution */
+static void
+rspamd_map_periodic_callback (struct ev_loop *loop, ev_timer *w, int revents)
+{
+ struct map_periodic_cbdata *cbd = (struct map_periodic_cbdata *)w->data;
+
+ ev_timer_stop (loop, w);
+ rspamd_map_process_periodic (cbd);
+}
+
static void
rspamd_map_schedule_periodic (struct rspamd_map *map,
gboolean locked, gboolean initial, gboolean errored)
@@ -1221,17 +1241,15 @@ rspamd_map_schedule_periodic (struct rspamd_map *map,
cbd->cbdata.cur_data = NULL;
cbd->cbdata.map = map;
cbd->map = map;
- map->scheduled_check = TRUE;
+ map->scheduled_check = cbd;
REF_INIT_RETAIN (cbd, rspamd_map_periodic_dtor);
- evtimer_set (&cbd->ev, rspamd_map_periodic_callback, cbd);
- event_base_set (map->ev_base, &cbd->ev);
-
+ cbd->ev.data = cbd;
+ ev_timer_init (&cbd->ev, rspamd_map_periodic_callback, jittered_sec, 0.0);
+ ev_timer_start (map->event_loop, &cbd->ev);
msg_debug_map ("schedule new periodic event %p in %.2f seconds",
cbd, jittered_sec);
- double_to_tv (jittered_sec, &map->tv);
- evtimer_add (&cbd->ev, &map->tv);
}
static void
@@ -1286,7 +1304,7 @@ rspamd_map_dns_callback (struct rdns_reply *reply, void *arg)
msg_err_map ("cannot resolve %s: %s", cbd->data->host,
rdns_strerror (reply->code));
cbd->periodic->errored = 1;
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic);
+ rspamd_map_process_periodic (cbd->periodic);
}
}
@@ -1567,7 +1585,7 @@ rspamd_map_common_http_callback (struct rspamd_map *map,
periodic->need_modify = TRUE;
/* Reset the whole chain */
periodic->cur_backend = 0;
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+ rspamd_map_process_periodic (periodic);
}
else {
if (map->active_http) {
@@ -1577,7 +1595,7 @@ rspamd_map_common_http_callback (struct rspamd_map *map,
else {
/* Switch to the next backend */
periodic->cur_backend++;
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+ rspamd_map_process_periodic (periodic);
}
}
@@ -1592,7 +1610,7 @@ rspamd_map_common_http_callback (struct rspamd_map *map,
/* Switch to the next backend */
periodic->cur_backend++;
data->last_modified = data->cache->last_modified;
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+ rspamd_map_process_periodic (periodic);
return;
}
@@ -1601,7 +1619,7 @@ rspamd_map_common_http_callback (struct rspamd_map *map,
else if (!map->active_http) {
/* Switch to the next backend */
periodic->cur_backend ++;
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+ rspamd_map_process_periodic (periodic);
return;
}
@@ -1609,7 +1627,7 @@ rspamd_map_common_http_callback (struct rspamd_map *map,
check:
cbd = g_malloc0 (sizeof (struct http_callback_data));
- cbd->ev_base = map->ev_base;
+ cbd->event_loop = map->event_loop;
cbd->map = map;
cbd->data = data;
cbd->check = check;
@@ -1618,7 +1636,6 @@ check:
cbd->bk = bk;
MAP_RETAIN (bk, "rspamd_map_backend");
cbd->stage = map_resolve_host2;
- double_to_tv (map->cfg->map_timeout, &cbd->tv);
REF_INIT_RETAIN (cbd, free_http_cbdata);
msg_debug_map ("%s map data from %s", check ? "checking" : "reading",
@@ -1673,9 +1690,8 @@ check:
}
static void
-rspamd_map_http_check_callback (gint fd, short what, void *ud)
+rspamd_map_http_check_callback (struct map_periodic_cbdata *cbd)
{
- struct map_periodic_cbdata *cbd = ud;
struct rspamd_map *map;
struct rspamd_map_backend *bk;
@@ -1686,9 +1702,8 @@ rspamd_map_http_check_callback (gint fd, short what, void *ud)
}
static void
-rspamd_map_http_read_callback (gint fd, short what, void *ud)
+rspamd_map_http_read_callback (struct map_periodic_cbdata *cbd)
{
- struct map_periodic_cbdata *cbd = ud;
struct rspamd_map *map;
struct rspamd_map_backend *bk;
@@ -1698,43 +1713,36 @@ rspamd_map_http_read_callback (gint fd, short what, void *ud)
}
static void
-rspamd_map_file_check_callback (gint fd, short what, void *ud)
+rspamd_map_file_check_callback (struct map_periodic_cbdata *periodic)
{
struct rspamd_map *map;
- struct map_periodic_cbdata *periodic = ud;
struct file_map_data *data;
struct rspamd_map_backend *bk;
- struct stat st;
map = periodic->map;
-
bk = g_ptr_array_index (map->backends, periodic->cur_backend);
data = bk->data.fd;
- if (stat (data->filename, &st) != -1 &&
- (st.st_mtime > data->st.st_mtime || data->st.st_mtime == -1)) {
- /* File was modified since last check */
- msg_info_map ("old mtime is %t, new mtime is %t for map file %s",
- data->st.st_mtime, st.st_mtime, data->filename);
- memcpy (&data->st, &st, sizeof (struct stat));
+ if (data->need_modify) {
periodic->need_modify = TRUE;
periodic->cur_backend = 0;
+ data->need_modify = FALSE;
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+ rspamd_map_process_periodic (periodic);
return;
}
- /* Switch to the next backend */
+ map = periodic->map;
+ /* Switch to the next backend as the rest is handled by ev_stat */
periodic->cur_backend ++;
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+ rspamd_map_process_periodic (periodic);
}
static void
-rspamd_map_static_check_callback (gint fd, short what, void *ud)
+rspamd_map_static_check_callback (struct map_periodic_cbdata *periodic)
{
struct rspamd_map *map;
- struct map_periodic_cbdata *periodic = ud;
struct static_map_data *data;
struct rspamd_map_backend *bk;
@@ -1746,21 +1754,20 @@ rspamd_map_static_check_callback (gint fd, short what, void *ud)
periodic->need_modify = TRUE;
periodic->cur_backend = 0;
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+ rspamd_map_process_periodic (periodic);
return;
}
/* Switch to the next backend */
periodic->cur_backend ++;
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+ rspamd_map_process_periodic (periodic);
}
static void
-rspamd_map_file_read_callback (gint fd, short what, void *ud)
+rspamd_map_file_read_callback (struct map_periodic_cbdata *periodic)
{
struct rspamd_map *map;
- struct map_periodic_cbdata *periodic = ud;
struct file_map_data *data;
struct rspamd_map_backend *bk;
@@ -1777,14 +1784,13 @@ rspamd_map_file_read_callback (gint fd, short what, void *ud)
/* Switch to the next backend */
periodic->cur_backend ++;
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+ rspamd_map_process_periodic (periodic);
}
static void
-rspamd_map_static_read_callback (gint fd, short what, void *ud)
+rspamd_map_static_read_callback (struct map_periodic_cbdata *periodic)
{
struct rspamd_map *map;
- struct map_periodic_cbdata *periodic = ud;
struct static_map_data *data;
struct rspamd_map_backend *bk;
@@ -1801,18 +1807,17 @@ rspamd_map_static_read_callback (gint fd, short what, void *ud)
/* Switch to the next backend */
periodic->cur_backend ++;
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+ rspamd_map_process_periodic (periodic);
}
static void
-rspamd_map_periodic_callback (gint fd, short what, void *ud)
+rspamd_map_process_periodic (struct map_periodic_cbdata *cbd)
{
struct rspamd_map_backend *bk;
- struct map_periodic_cbdata *cbd = ud;
struct rspamd_map *map;
map = cbd->map;
- map->scheduled_check = FALSE;
+ map->scheduled_check = NULL;
if (!cbd->locked) {
if (!g_atomic_int_compare_and_exchange (cbd->map->locked, 0, 1)) {
@@ -1863,13 +1868,13 @@ rspamd_map_periodic_callback (gint fd, short what, void *ud)
switch (bk->protocol) {
case MAP_PROTO_HTTP:
case MAP_PROTO_HTTPS:
- rspamd_map_http_read_callback (fd, what, cbd);
+ rspamd_map_http_read_callback (cbd);
break;
case MAP_PROTO_FILE:
- rspamd_map_file_read_callback (fd, what, cbd);
+ rspamd_map_file_read_callback (cbd);
break;
case MAP_PROTO_STATIC:
- rspamd_map_static_read_callback (fd, what, cbd);
+ rspamd_map_static_read_callback (cbd);
break;
}
} else {
@@ -1877,34 +1882,70 @@ rspamd_map_periodic_callback (gint fd, short what, void *ud)
switch (bk->protocol) {
case MAP_PROTO_HTTP:
case MAP_PROTO_HTTPS:
- rspamd_map_http_check_callback (fd, what, cbd);
+ rspamd_map_http_check_callback (cbd);
break;
case MAP_PROTO_FILE:
- rspamd_map_file_check_callback (fd, what, cbd);
+ rspamd_map_file_check_callback (cbd);
break;
case MAP_PROTO_STATIC:
- rspamd_map_static_check_callback (fd, what, cbd);
+ rspamd_map_static_check_callback (cbd);
break;
}
}
}
}
+static void
+rspamd_map_on_stat (struct ev_loop *loop, ev_stat *w, int revents)
+{
+ struct rspamd_map *map = (struct rspamd_map *)w->data;
+
+ if (w->attr.st_nlink > 0) {
+
+ if (w->attr.st_mtime > w->prev.st_mtime) {
+ msg_info_map ("old mtime is %t, new mtime is %t for map file %s",
+ w->prev.st_mtime, w->attr.st_mtime, w->path);
+
+ /* Fire need modify flag */
+ struct rspamd_map_backend *bk;
+ guint i;
+
+ PTR_ARRAY_FOREACH (map->backends, i, bk) {
+ if (bk->protocol == MAP_PROTO_FILE) {
+ bk->data.fd->need_modify = TRUE;
+ }
+ }
+
+ map->next_check = 0;
+
+ if (map->scheduled_check) {
+ ev_timer_stop (map->event_loop, &map->scheduled_check->ev);
+ MAP_RELEASE (map->scheduled_check, "rspamd_map_on_stat");
+ map->scheduled_check = NULL;
+ }
+
+ rspamd_map_schedule_periodic (map, FALSE, TRUE, FALSE);
+ }
+ }
+}
+
/* Start watching event for all maps */
void
rspamd_map_watch (struct rspamd_config *cfg,
- struct event_base *ev_base,
+ struct ev_loop *event_loop,
struct rspamd_dns_resolver *resolver,
struct rspamd_worker *worker,
gboolean active_http)
{
GList *cur = cfg->maps;
struct rspamd_map *map;
+ struct rspamd_map_backend *bk;
+ guint i;
/* First of all do synced read of data */
while (cur) {
map = cur->data;
- map->ev_base = ev_base;
+ map->event_loop = event_loop;
map->r = resolver;
map->wrk = worker;
@@ -1922,6 +1963,21 @@ rspamd_map_watch (struct rspamd_config *cfg,
}
}
+ PTR_ARRAY_FOREACH (map->backends, i, bk) {
+ bk->event_loop = event_loop;
+
+ if (bk->protocol == MAP_PROTO_FILE) {
+ struct file_map_data *data;
+
+ data = bk->data.fd;
+
+ ev_stat_init (&data->st_ev, rspamd_map_on_stat,
+ data->filename, map->poll_timeout * cfg->map_file_watch_multiplier);
+ data->st_ev.data = map;
+ ev_stat_start (event_loop, &data->st_ev);
+ }
+ }
+
rspamd_map_schedule_periodic (map, FALSE, TRUE, FALSE);
cur = g_list_next (cur);
@@ -2215,6 +2271,7 @@ rspamd_map_backend_dtor (struct rspamd_map_backend *bk)
switch (bk->protocol) {
case MAP_PROTO_FILE:
if (bk->data.fd) {
+ ev_stat_stop (bk->event_loop, &bk->data.fd->st_ev);
g_free (bk->data.fd->filename);
g_free (bk->data.fd);
}
@@ -2249,7 +2306,8 @@ rspamd_map_backend_dtor (struct rspamd_map_backend *bk)
if (data->cur_cache_cbd) {
MAP_RELEASE (data->cur_cache_cbd->shm,
"rspamd_http_map_cached_cbdata");
- event_del (&data->cur_cache_cbd->timeout);
+ ev_timer_stop (data->cur_cache_cbd->event_loop,
+ &data->cur_cache_cbd->timeout);
g_free (data->cur_cache_cbd);
data->cur_cache_cbd = NULL;
}
@@ -2308,7 +2366,6 @@ rspamd_map_parse_backend (struct rspamd_config *cfg, const gchar *map_line)
/* Now check for each proto separately */
if (bk->protocol == MAP_PROTO_FILE) {
fdata = g_malloc0 (sizeof (struct file_map_data));
- fdata->st.st_mtime = -1;
if (access (bk->uri, R_OK) == -1) {
if (errno != ENOENT) {
diff --git a/src/libutil/map.h b/src/libutil/map.h
index acf6eea4e..9f04d4c6c 100644
--- a/src/libutil/map.h
+++ b/src/libutil/map.h
@@ -2,7 +2,7 @@
#define RSPAMD_MAP_H
#include "config.h"
-#include <event.h>
+#include "contrib/libev/ev.h"
#include "ucl.h"
#include "mem_pool.h"
@@ -79,7 +79,7 @@ struct rspamd_map* rspamd_map_add_from_ucl (struct rspamd_config *cfg,
* Start watching of maps by adding events to libevent event loop
*/
void rspamd_map_watch (struct rspamd_config *cfg,
- struct event_base *ev_base,
+ struct ev_loop *event_loop,
struct rspamd_dns_resolver *resolver,
struct rspamd_worker *worker,
gboolean active_http);
diff --git a/src/libutil/map_private.h b/src/libutil/map_private.h
index b32f0e390..e08c2dce3 100644
--- a/src/libutil/map_private.h
+++ b/src/libutil/map_private.h
@@ -54,14 +54,16 @@ enum fetch_proto {
*/
struct file_map_data {
gchar *filename;
- struct stat st;
+ gboolean need_modify;
+ ev_stat st_ev;
};
struct http_map_data;
struct rspamd_http_map_cached_cbdata {
- struct event timeout;
+ ev_timer timeout;
+ struct ev_loop *event_loop;
struct rspamd_storage_shmem *shm;
struct rspamd_map *map;
struct http_map_data *data;
@@ -114,6 +116,7 @@ struct rspamd_map_backend {
gboolean is_signed;
gboolean is_compressed;
gboolean is_fallback;
+ struct ev_loop *event_loop;
guint32 id;
struct rspamd_cryptobox_pubkey *trusted_pubkey;
union rspamd_map_backend_data data;
@@ -121,6 +124,8 @@ struct rspamd_map_backend {
ref_entry_t ref;
};
+struct map_periodic_cbdata;
+
struct rspamd_map {
struct rspamd_dns_resolver *r;
struct rspamd_config *cfg;
@@ -130,12 +135,12 @@ struct rspamd_map {
map_fin_cb_t fin_callback;
map_dtor_t dtor;
void **user_data;
- struct event_base *ev_base;
+ struct ev_loop *event_loop;
struct rspamd_worker *wrk;
gchar *description;
gchar *name;
guint32 id;
- gboolean scheduled_check;
+ struct map_periodic_cbdata *scheduled_check;
rspamd_map_tmp_dtor tmp_dtor;
gpointer tmp_dtor_data;
rspamd_map_traverse_function traverse_function;
@@ -143,7 +148,7 @@ struct rspamd_map {
gsize nelts;
guint64 digest;
/* Should we check HTTP or just load cached data */
- struct timeval tv;
+ ev_tstamp timeout;
gdouble poll_timeout;
time_t next_check;
gboolean active_http;
@@ -164,7 +169,7 @@ enum rspamd_map_http_stage {
struct map_periodic_cbdata {
struct rspamd_map *map;
struct map_cb_data cbdata;
- struct event ev;
+ ev_timer ev;
gboolean need_modify;
gboolean errored;
gboolean locked;
@@ -183,7 +188,7 @@ struct rspamd_http_file_data {
};
struct http_callback_data {
- struct event_base *ev_base;
+ struct ev_loop *event_loop;
struct rspamd_http_connection *conn;
rspamd_inet_addr_t *addr;
struct rspamd_map *map;
@@ -191,16 +196,15 @@ struct http_callback_data {
struct http_map_data *data;
struct map_periodic_cbdata *periodic;
struct rspamd_cryptobox_pubkey *pk;
- gboolean check;
struct rspamd_storage_shmem *shmem_data;
struct rspamd_storage_shmem *shmem_sig;
struct rspamd_storage_shmem *shmem_pubkey;
gsize data_len;
gsize sig_len;
gsize pubkey_len;
-
+ gboolean check;
enum rspamd_map_http_stage stage;
- struct timeval tv;
+ ev_tstamp timeout;
ref_entry_t ref;
};
diff --git a/src/libutil/ssl_util.c b/src/libutil/ssl_util.c
index 95245aa4c..7d4612b3d 100644
--- a/src/libutil/ssl_util.c
+++ b/src/libutil/ssl_util.c
@@ -45,9 +45,8 @@ struct rspamd_ssl_connection {
gboolean verify_peer;
SSL *ssl;
gchar *hostname;
- struct event *ev;
- struct event_base *ev_base;
- struct timeval *tv;
+ struct rspamd_io_ev *ev;
+ struct ev_loop *event_loop;
rspamd_ssl_handler_t handler;
rspamd_ssl_error_handler_t err_handler;
gpointer handler_data;
@@ -407,7 +406,7 @@ rspamd_ssl_event_handler (gint fd, short what, gpointer ud)
gint ret;
GError *err = NULL;
- if (what == EV_TIMEOUT) {
+ if (what == EV_TIMER) {
c->shut = ssl_shut_unclean;
}
@@ -417,7 +416,7 @@ rspamd_ssl_event_handler (gint fd, short what, gpointer ud)
ret = SSL_connect (c->ssl);
if (ret == 1) {
- event_del (c->ev);
+ rspamd_ev_watcher_stop (c->event_loop, c->ev);
/* Verify certificate */
if ((!c->verify_peer) || rspamd_ssl_peer_verify (c)) {
c->state = ssl_conn_connected;
@@ -437,40 +436,30 @@ rspamd_ssl_event_handler (gint fd, short what, gpointer ud)
what = EV_WRITE;
}
else {
+ rspamd_ev_watcher_stop (c->event_loop, c->ev);
rspamd_tls_set_error (ret, "connect", &err);
c->err_handler (c->handler_data, err);
g_error_free (err);
return;
}
- event_del (c->ev);
- event_set (c->ev, fd, what, rspamd_ssl_event_handler, c);
- event_base_set (c->ev_base, c->ev);
- event_add (c->ev, c->tv);
+ rspamd_ev_watcher_reschedule (c->event_loop, c->ev, what);
+
}
break;
case ssl_next_read:
- event_del (c->ev);
- /* Restore handler */
- event_set (c->ev, c->fd, EV_READ|EV_PERSIST,
- c->handler, c->handler_data);
- event_base_set (c->ev_base, c->ev);
- event_add (c->ev, c->tv);
+ rspamd_ev_watcher_reschedule (c->event_loop, c->ev, EV_READ);
c->state = ssl_conn_connected;
c->handler (fd, EV_READ, c->handler_data);
break;
case ssl_next_write:
case ssl_conn_connected:
- event_del (c->ev);
- /* Restore handler */
- event_set (c->ev, c->fd, EV_WRITE,
- c->handler, c->handler_data);
- event_base_set (c->ev_base, c->ev);
- event_add (c->ev, c->tv);
+ rspamd_ev_watcher_reschedule (c->event_loop, c->ev, what);
c->state = ssl_conn_connected;
- c->handler (fd, EV_WRITE, c->handler_data);
+ c->handler (fd, what, c->handler_data);
break;
default:
+ rspamd_ev_watcher_stop (c->event_loop, c->ev);
g_set_error (&err, rspamd_ssl_quark (), EINVAL,
"ssl bad state error: %d", c->state);
c->err_handler (c->handler_data, err);
@@ -480,7 +469,7 @@ rspamd_ssl_event_handler (gint fd, short what, gpointer ud)
}
struct rspamd_ssl_connection *
-rspamd_ssl_connection_new (gpointer ssl_ctx, struct event_base *ev_base,
+rspamd_ssl_connection_new (gpointer ssl_ctx, struct ev_loop *ev_base,
gboolean verify_peer)
{
struct rspamd_ssl_connection *c;
@@ -488,7 +477,7 @@ rspamd_ssl_connection_new (gpointer ssl_ctx, struct event_base *ev_base,
g_assert (ssl_ctx != NULL);
c = g_malloc0 (sizeof (*c));
c->ssl = SSL_new (ssl_ctx);
- c->ev_base = ev_base;
+ c->event_loop = ev_base;
c->verify_peer = verify_peer;
return c;
@@ -497,7 +486,7 @@ rspamd_ssl_connection_new (gpointer ssl_ctx, struct event_base *ev_base,
gboolean
rspamd_ssl_connect_fd (struct rspamd_ssl_connection *conn, gint fd,
- const gchar *hostname, struct event *ev, struct timeval *tv,
+ const gchar *hostname, struct rspamd_io_ev *ev, ev_tstamp timeout,
rspamd_ssl_handler_t handler, rspamd_ssl_error_handler_t err_handler,
gpointer handler_data)
{
@@ -534,17 +523,9 @@ rspamd_ssl_connect_fd (struct rspamd_ssl_connection *conn, gint fd,
if (ret == 1) {
conn->state = ssl_conn_connected;
- if (rspamd_event_pending (ev, EV_TIMEOUT|EV_WRITE|EV_READ)) {
- event_del (ev);
- }
-
- event_set (ev, fd, EV_WRITE, rspamd_ssl_event_handler, conn);
-
- if (conn->ev_base) {
- event_base_set (conn->ev_base, ev);
- }
-
- event_add (ev, tv);
+ rspamd_ev_watcher_stop (conn->event_loop, ev);
+ rspamd_ev_watcher_init (ev, fd, EV_WRITE, rspamd_ssl_event_handler, conn);
+ rspamd_ev_watcher_start (conn->event_loop, ev, timeout);
}
else {
ret = SSL_get_error (conn->ssl, ret);
@@ -561,13 +542,10 @@ rspamd_ssl_connect_fd (struct rspamd_ssl_connection *conn, gint fd,
return FALSE;
}
- if (rspamd_event_pending (ev, EV_TIMEOUT|EV_WRITE|EV_READ)) {
- event_del (ev);
- }
-
- event_set (ev, fd, what, rspamd_ssl_event_handler, conn);
- event_base_set (conn->ev_base, ev);
- event_add (ev, tv);
+ rspamd_ev_watcher_stop (conn->event_loop, ev);
+ rspamd_ev_watcher_init (ev, fd, EV_WRITE|EV_READ,
+ rspamd_ssl_event_handler, conn);
+ rspamd_ev_watcher_start (conn->event_loop, ev, timeout);
}
return TRUE;
@@ -638,13 +616,8 @@ rspamd_ssl_read (struct rspamd_ssl_connection *conn, gpointer buf,
return -1;
}
- event_del (conn->ev);
- event_set (conn->ev, conn->fd, what, rspamd_ssl_event_handler, conn);
- event_base_set (conn->ev_base, conn->ev);
- event_add (conn->ev, conn->tv);
-
+ rspamd_ev_watcher_reschedule (conn->event_loop, conn->ev, what);
errno = EAGAIN;
-
}
return -1;
@@ -713,11 +686,7 @@ rspamd_ssl_write (struct rspamd_ssl_connection *conn, gconstpointer buf,
return -1;
}
- event_del (conn->ev);
- event_set (conn->ev, conn->fd, what, rspamd_ssl_event_handler, conn);
- event_base_set (conn->ev_base, conn->ev);
- event_add (conn->ev, conn->tv);
-
+ rspamd_ev_watcher_reschedule (conn->event_loop, conn->ev, what);
errno = EAGAIN;
}
diff --git a/src/libutil/ssl_util.h b/src/libutil/ssl_util.h
index 73a940e00..f7f1652de 100644
--- a/src/libutil/ssl_util.h
+++ b/src/libutil/ssl_util.h
@@ -18,6 +18,7 @@
#include "config.h"
#include "libutil/addr.h"
+#include "libutil/libev_helper.h"
struct rspamd_ssl_connection;
@@ -30,7 +31,7 @@ typedef void (*rspamd_ssl_error_handler_t)(gpointer d, GError *err);
* @return opaque connection data
*/
struct rspamd_ssl_connection * rspamd_ssl_connection_new (gpointer ssl_ctx,
- struct event_base *ev_base, gboolean verify_peer);
+ struct ev_loop *ev_base, gboolean verify_peer);
/**
* Connects SSL session using the specified (connected) FD
@@ -44,7 +45,7 @@ struct rspamd_ssl_connection * rspamd_ssl_connection_new (gpointer ssl_ctx,
* @return TRUE if a session has been connected
*/
gboolean rspamd_ssl_connect_fd (struct rspamd_ssl_connection *conn, gint fd,
- const gchar *hostname, struct event *ev, struct timeval *tv,
+ const gchar *hostname, struct rspamd_io_ev *ev, ev_tstamp timeout,
rspamd_ssl_handler_t handler, rspamd_ssl_error_handler_t err_handler,
gpointer handler_data);
diff --git a/src/libutil/str_util.h b/src/libutil/str_util.h
index 8e8898a32..6fbb11ccf 100644
--- a/src/libutil/str_util.h
+++ b/src/libutil/str_util.h
@@ -83,10 +83,18 @@ gsize rspamd_strlcpy_safe (gchar *dst, const gchar *src, gsize siz);
# if __has_feature(address_sanitizer)
# define rspamd_strlcpy rspamd_strlcpy_safe
# else
-# define rspamd_strlcpy rspamd_strlcpy_fast
+# ifdef __SANITIZE_ADDRESS__
+# define rspamd_strlcpy rspamd_strlcpy_safe
+# else
+# define rspamd_strlcpy rspamd_strlcpy_fast
+# endif
# endif
#else
-# define rspamd_strlcpy rspamd_strlcpy_fast
+# ifdef __SANITIZE_ADDRESS__
+# define rspamd_strlcpy rspamd_strlcpy_safe
+# else
+# define rspamd_strlcpy rspamd_strlcpy_fast
+# endif
#endif
/**
diff --git a/src/libutil/upstream.c b/src/libutil/upstream.c
index 3e04e68e9..c445751b4 100644
--- a/src/libutil/upstream.c
+++ b/src/libutil/upstream.c
@@ -50,7 +50,7 @@ struct upstream {
guint dns_requests;
gint active_idx;
gchar *name;
- struct event ev;
+ ev_timer ev;
gdouble last_fail;
gpointer ud;
struct upstream_list *ls;
@@ -92,7 +92,7 @@ struct upstream_list {
struct upstream_ctx {
struct rdns_resolver *res;
- struct event_base *ev_base;
+ struct ev_loop *event_loop;
struct upstream_limits limits;
GQueue *upstreams;
gboolean configured;
@@ -119,7 +119,7 @@ static guint default_dns_retransmits = 2;
void
rspamd_upstreams_library_config (struct rspamd_config *cfg,
struct upstream_ctx *ctx,
- struct event_base *ev_base,
+ struct ev_loop *event_loop,
struct rdns_resolver *resolver)
{
g_assert (ctx != NULL);
@@ -141,7 +141,7 @@ rspamd_upstreams_library_config (struct rspamd_config *cfg,
ctx->limits.dns_timeout = cfg->dns_timeout;
}
- ctx->ev_base = ev_base;
+ ctx->event_loop = event_loop;
ctx->res = resolver;
ctx->configured = TRUE;
}
@@ -366,12 +366,12 @@ rspamd_upstream_dns_cb (struct rdns_reply *reply, void *arg)
}
static void
-rspamd_upstream_revive_cb (int fd, short what, void *arg)
+rspamd_upstream_revive_cb (struct ev_loop *loop, ev_timer *w, int revents)
{
- struct upstream *up = (struct upstream *)arg;
+ struct upstream *up = (struct upstream *)w->data;
RSPAMD_UPSTREAM_LOCK (up->lock);
- event_del (&up->ev);
+ ev_timer_stop (loop, w);
if (up->ls) {
rspamd_upstream_set_active (up->ls, up);
}
@@ -414,7 +414,6 @@ rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up)
gdouble ntim;
guint i;
struct upstream *cur;
- struct timeval tv;
struct upstream_list_watcher *w;
RSPAMD_UPSTREAM_LOCK (ls->lock);
@@ -431,15 +430,14 @@ rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up)
rspamd_upstream_resolve_addrs (ls, up);
REF_RETAIN (up);
- evtimer_set (&up->ev, rspamd_upstream_revive_cb, up);
- if (up->ctx->ev_base != NULL && up->ctx->configured) {
- event_base_set (up->ctx->ev_base, &up->ev);
- }
-
ntim = rspamd_time_jitter (ls->limits.revive_time,
ls->limits.revive_jitter);
- double_to_tv (ntim, &tv);
- event_add (&up->ev, &tv);
+ ev_timer_init (&up->ev, rspamd_upstream_revive_cb, ntim, 0);
+ up->ev.data = up;
+
+ if (up->ctx->event_loop != NULL && up->ctx->configured) {
+ ev_timer_start (up->ctx->event_loop, &up->ev);
+ }
}
DL_FOREACH (up->ls->watchers, w) {
@@ -915,9 +913,7 @@ rspamd_upstream_restore_cb (gpointer elt, gpointer ls)
/* Here the upstreams list is already locked */
RSPAMD_UPSTREAM_LOCK (up->lock);
- if (rspamd_event_pending (&up->ev, EV_TIMEOUT)) {
- event_del (&up->ev);
- }
+ ev_timer_stop (up->ctx->event_loop, &up->ev);
g_ptr_array_add (ups->alive, up);
up->active_idx = ups->alive->len - 1;
RSPAMD_UPSTREAM_UNLOCK (up->lock);
diff --git a/src/libutil/upstream.h b/src/libutil/upstream.h
index 75d840ce2..89ac0ee9e 100644
--- a/src/libutil/upstream.h
+++ b/src/libutil/upstream.h
@@ -41,7 +41,7 @@ void rspamd_upstreams_library_unref (struct upstream_ctx *ctx);
* @param cfg
*/
void rspamd_upstreams_library_config (struct rspamd_config *cfg,
- struct upstream_ctx *ctx, struct event_base *ev_base,
+ struct upstream_ctx *ctx, struct ev_loop *event_loop,
struct rdns_resolver *resolver);
/**
diff --git a/src/libutil/util.c b/src/libutil/util.c
index df10bf912..e7a5c2601 100644
--- a/src/libutil/util.c
+++ b/src/libutil/util.c
@@ -1612,42 +1612,6 @@ rspamd_thread_func (gpointer ud)
return ud;
}
-/**
- * Create new named thread
- * @param name name pattern
- * @param func function to start
- * @param data data to pass to function
- * @param err error pointer
- * @return new thread object that can be joined
- */
-GThread *
-rspamd_create_thread (const gchar *name,
- GThreadFunc func,
- gpointer data,
- GError **err)
-{
- GThread *new;
- struct rspamd_thread_data *td;
- static gint32 id;
- guint r;
-
- r = strlen (name);
- td = g_malloc (sizeof (struct rspamd_thread_data));
- td->id = ++id;
- td->name = g_malloc (r + sizeof ("4294967296"));
- td->func = func;
- td->data = data;
-
- rspamd_snprintf (td->name, r + sizeof ("4294967296"), "%s-%d", name, id);
-#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION > 32))
- new = g_thread_try_new (td->name, rspamd_thread_func, td, err);
-#else
- new = g_thread_create (rspamd_thread_func, td, TRUE, err);
-#endif
-
- return new;
-}
-
struct hash_copy_callback_data {
gpointer (*key_copy_func)(gconstpointer data, gpointer ud);
gpointer (*value_copy_func)(gconstpointer data, gpointer ud);
@@ -2570,24 +2534,6 @@ rspamd_constant_memcmp (const guchar *a, const guchar *b, gsize len)
return (((gint32)(guint16)((guint32)r + 0x8000) - 0x8000) == 0);
}
-#if !defined(LIBEVENT_VERSION_NUMBER) || LIBEVENT_VERSION_NUMBER < 0x02000000UL
-struct event_base *
-event_get_base (struct event *ev)
-{
- return ev->ev_base;
-}
-#endif
-
-int
-rspamd_event_pending (struct event *ev, short what)
-{
- if (ev->ev_base == NULL) {
- return 0;
- }
-
- return event_pending (ev, what, NULL);
-}
-
int
rspamd_file_xopen (const char *fname, int oflags, guint mode,
gboolean allow_symlink)
diff --git a/src/libutil/util.h b/src/libutil/util.h
index 9d12285d4..21e4b320e 100644
--- a/src/libutil/util.h
+++ b/src/libutil/util.h
@@ -12,7 +12,7 @@
#include <netdb.h>
#endif
-#include <event.h>
+#include "contrib/libev/ev.h"
#include <time.h>
struct rspamd_config;
@@ -263,19 +263,6 @@ void rspamd_mutex_unlock (rspamd_mutex_t *mtx);
void rspamd_mutex_free (rspamd_mutex_t *mtx);
/**
- * Create new named thread
- * @param name name pattern
- * @param func function to start
- * @param data data to pass to function
- * @param err error pointer
- * @return new thread object that can be joined
- */
-GThread * rspamd_create_thread (const gchar *name,
- GThreadFunc func,
- gpointer data,
- GError **err);
-
-/**
* Deep copy of one hash table to another
* @param src source hash
* @param dst destination hash
@@ -426,19 +413,6 @@ void rspamd_random_seed_fast (void);
*/
gboolean rspamd_constant_memcmp (const guchar *a, const guchar *b, gsize len);
-/* Special case for ancient libevent */
-#if !defined(LIBEVENT_VERSION_NUMBER) || LIBEVENT_VERSION_NUMBER < 0x02000000UL
-struct event_base * event_get_base (struct event *ev);
-#endif
-/* CentOS libevent */
-#ifndef evsignal_set
-#define evsignal_set(ev, x, cb, arg) \
- event_set((ev), (x), EV_SIGNAL|EV_PERSIST, (cb), (arg))
-#endif
-
-/* Avoid stupidity in libevent > 1.4 */
-int rspamd_event_pending (struct event *ev, short what);
-
/**
* Open file without following symlinks or special stuff
* @param fname filename