diff options
Diffstat (limited to 'src/libutil')
-rw-r--r-- | src/libutil/CMakeLists.txt | 2 | ||||
-rw-r--r-- | src/libutil/addr.c | 39 | ||||
-rw-r--r-- | src/libutil/addr.h | 10 | ||||
-rw-r--r-- | src/libutil/aio_event.c | 508 | ||||
-rw-r--r-- | src/libutil/aio_event.h | 59 | ||||
-rw-r--r-- | src/libutil/http_connection.c | 94 | ||||
-rw-r--r-- | src/libutil/http_connection.h | 10 | ||||
-rw-r--r-- | src/libutil/http_context.c | 52 | ||||
-rw-r--r-- | src/libutil/http_context.h | 8 | ||||
-rw-r--r-- | src/libutil/http_private.h | 4 | ||||
-rw-r--r-- | src/libutil/http_router.c | 46 | ||||
-rw-r--r-- | src/libutil/http_router.h | 7 | ||||
-rw-r--r-- | src/libutil/libev_helper.c | 119 | ||||
-rw-r--r-- | src/libutil/libev_helper.h | 78 | ||||
-rw-r--r-- | src/libutil/map.c | 233 | ||||
-rw-r--r-- | src/libutil/map.h | 4 | ||||
-rw-r--r-- | src/libutil/map_private.h | 24 | ||||
-rw-r--r-- | src/libutil/ssl_util.c | 77 | ||||
-rw-r--r-- | src/libutil/ssl_util.h | 5 | ||||
-rw-r--r-- | src/libutil/str_util.h | 12 | ||||
-rw-r--r-- | src/libutil/upstream.c | 32 | ||||
-rw-r--r-- | src/libutil/upstream.h | 2 | ||||
-rw-r--r-- | src/libutil/util.c | 54 | ||||
-rw-r--r-- | src/libutil/util.h | 28 |
24 files changed, 506 insertions, 1001 deletions
diff --git a/src/libutil/CMakeLists.txt b/src/libutil/CMakeLists.txt index f86d650f0..5a94a732c 100644 --- a/src/libutil/CMakeLists.txt +++ b/src/libutil/CMakeLists.txt @@ -1,7 +1,7 @@ # Librspamd-util SET(LIBRSPAMDUTILSRC ${CMAKE_CURRENT_SOURCE_DIR}/addr.c - ${CMAKE_CURRENT_SOURCE_DIR}/aio_event.c + ${CMAKE_CURRENT_SOURCE_DIR}/libev_helper.c ${CMAKE_CURRENT_SOURCE_DIR}/bloom.c ${CMAKE_CURRENT_SOURCE_DIR}/expression.c ${CMAKE_CURRENT_SOURCE_DIR}/fstring.c diff --git a/src/libutil/addr.c b/src/libutil/addr.c index 30a9ce66a..112c5d2cd 100644 --- a/src/libutil/addr.c +++ b/src/libutil/addr.c @@ -203,41 +203,10 @@ rspamd_ip_is_valid (const rspamd_inet_addr_t *addr) return ret; } -static void -rspamd_enable_accept_event (gint fd, short what, gpointer d) -{ - struct event *events = d; - - event_del (&events[1]); - event_add (&events[0], NULL); -} - -static void -rspamd_disable_accept_events (gint sock, GList *accept_events) -{ - GList *cur; - struct event *events; - const gdouble throttling = 0.5; - struct timeval tv; - struct event_base *ev_base; - - double_to_tv (throttling, &tv); - - for (cur = accept_events; cur != NULL; cur = g_list_next (cur)) { - events = cur->data; - - ev_base = event_get_base (&events[0]); - event_del (&events[0]); - event_set (&events[1], sock, EV_TIMEOUT, rspamd_enable_accept_event, - events); - event_base_set (ev_base, &events[1]); - event_add (&events[1], &tv); - } -} - gint rspamd_accept_from_socket (gint sock, rspamd_inet_addr_t **target, - GList *accept_events) + rspamd_accept_throttling_handler hdl, + void *hdl_data) { gint nfd, serrno; union sa_union su; @@ -254,7 +223,9 @@ rspamd_accept_from_socket (gint sock, rspamd_inet_addr_t **target, } else if (errno == EMFILE || errno == ENFILE) { /* Temporary disable accept event */ - rspamd_disable_accept_events (sock, accept_events); + if (hdl) { + hdl (sock, hdl_data); + } return 0; } diff --git a/src/libutil/addr.h b/src/libutil/addr.h index bfe586ad1..7efa5e318 100644 --- a/src/libutil/addr.h +++ b/src/libutil/addr.h @@ -221,15 +221,17 @@ int rspamd_inet_address_listen (const rspamd_inet_addr_t *addr, gint type, */ gboolean rspamd_ip_is_valid (const rspamd_inet_addr_t *addr); +typedef void (*rspamd_accept_throttling_handler)(gint, void *); /** * Accept from listening socket filling addr structure * @param sock listening socket - * @param addr allocated inet addr structure - * @param accept_events events for accepting new sockets + * @param target allocated inet addr structure * @return */ -gint rspamd_accept_from_socket (gint sock, rspamd_inet_addr_t **addr, - GList *accept_events); +gint rspamd_accept_from_socket (gint sock, + rspamd_inet_addr_t **target, + rspamd_accept_throttling_handler hdl, + void *hdl_data); /** * Parse host[:port[:priority]] line diff --git a/src/libutil/aio_event.c b/src/libutil/aio_event.c deleted file mode 100644 index 584feb501..000000000 --- a/src/libutil/aio_event.c +++ /dev/null @@ -1,508 +0,0 @@ -/*- - * Copyright 2016 Vsevolod Stakhov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "config.h" -#include <event.h> -#include "aio_event.h" -#include "rspamd.h" -#include "unix-std.h" - -#ifdef HAVE_SYS_EVENTFD_H -#include <sys/eventfd.h> -#endif - -#ifdef HAVE_AIO_H -#include <aio.h> -#endif - -/* Linux syscall numbers */ -#if defined(__i386__) -# define SYS_io_setup 245 -# define SYS_io_destroy 246 -# define SYS_io_getevents 247 -# define SYS_io_submit 248 -# define SYS_io_cancel 249 -#elif defined(__x86_64__) -# define SYS_io_setup 206 -# define SYS_io_destroy 207 -# define SYS_io_getevents 208 -# define SYS_io_submit 209 -# define SYS_io_cancel 210 -#else -# warning \ - "aio is not supported on this platform, please contact author for details" -# define SYS_io_setup 0 -# define SYS_io_destroy 0 -# define SYS_io_getevents 0 -# define SYS_io_submit 0 -# define SYS_io_cancel 0 -#endif - -#define SYS_eventfd 323 -#define MAX_AIO_EV 64 - -struct io_cbdata { - gint fd; - rspamd_aio_cb cb; - guint64 len; - gpointer buf; - gpointer io_buf; - gpointer ud; -}; - -#ifdef LINUX - -/* Linux specific mappings and utilities to avoid using of libaio */ - -typedef unsigned long aio_context_t; - -typedef enum io_iocb_cmd { - IO_CMD_PREAD = 0, - IO_CMD_PWRITE = 1, - - IO_CMD_FSYNC = 2, - IO_CMD_FDSYNC = 3, - - IO_CMD_POLL = 5, - IO_CMD_NOOP = 6, -} io_iocb_cmd_t; - -#if defined(__LITTLE_ENDIAN) -#define PADDED(x,y) x, y -#elif defined(__BIG_ENDIAN) -#define PADDED(x,y) y, x -#else -#error edit for your odd byteorder. -#endif - -/* - * we always use a 64bit off_t when communicating - * with userland. its up to libraries to do the - * proper padding and aio_error abstraction - */ - -struct iocb { - /* these are internal to the kernel/libc. */ - guint64 aio_data; /* data to be returned in event's data */ - guint32 PADDED (aio_key, aio_reserved1); - /* the kernel sets aio_key to the req # */ - - /* common fields */ - guint16 aio_lio_opcode; /* see IOCB_CMD_ above */ - gint16 aio_reqprio; - guint32 aio_fildes; - - guint64 aio_buf; - guint64 aio_nbytes; - gint64 aio_offset; - - /* extra parameters */ - guint64 aio_reserved2; /* TODO: use this for a (struct sigevent *) */ - - /* flags for the "struct iocb" */ - guint32 aio_flags; - - /* - * if the IOCB_FLAG_RESFD flag of "aio_flags" is set, this is an - * eventfd to signal AIO readiness to - */ - guint32 aio_resfd; -}; - -struct io_event { - guint64 data; /* the data field from the iocb */ - guint64 obj; /* what iocb this event came from */ - gint64 res; /* result code for this event */ - gint64 res2; /* secondary result */ -}; - -/* Linux specific io calls */ -static int -io_setup (guint nr_reqs, aio_context_t *ctx) -{ - return syscall (SYS_io_setup, nr_reqs, ctx); -} - -static int -io_destroy (aio_context_t ctx) -{ - return syscall (SYS_io_destroy, ctx); -} - -static int -io_getevents (aio_context_t ctx, - long min_nr, - long nr, - struct io_event *events, - struct timespec *tmo) -{ - return syscall (SYS_io_getevents, ctx, min_nr, nr, events, tmo); -} - -static int -io_submit (aio_context_t ctx, long n, struct iocb **paiocb) -{ - return syscall (SYS_io_submit, ctx, n, paiocb); -} - -static int -io_cancel (aio_context_t ctx, struct iocb *iocb, struct io_event *result) -{ - return syscall (SYS_io_cancel, ctx, iocb, result); -} - -# ifndef HAVE_SYS_EVENTFD_H -static int -eventfd (guint initval, guint flags) -{ - return syscall (SYS_eventfd, initval); -} -# endif - -#endif - -/** - * AIO context - */ -struct aio_context { - struct event_base *base; - gboolean has_aio; /**< Whether we have aio support on a system */ -#ifdef LINUX - /* Eventfd variant */ - gint event_fd; - struct event eventfd_ev; - aio_context_t io_ctx; -#elif defined(HAVE_AIO_H) - /* POSIX aio */ - struct event rtsigs[128]; -#endif -}; - -#ifdef LINUX -/* Eventfd read callback */ -static void -rspamd_eventfdcb (gint fd, gshort what, gpointer ud) -{ - struct aio_context *ctx = ud; - guint64 ready; - gint done, i; - struct io_event event[32]; - struct timespec ts; - struct io_cbdata *ev_data; - - /* Eventfd returns number of events ready got from kernel */ - if (read (fd, &ready, 8) != 8) { - if (errno == EAGAIN) { - return; - } - msg_err ("eventfd read returned error: %s", strerror (errno)); - } - - ts.tv_sec = 0; - ts.tv_nsec = 0; - - while (ready) { - /* Get events ready */ - done = io_getevents (ctx->io_ctx, 1, 32, event, &ts); - - if (done > 0) { - ready -= done; - - for (i = 0; i < done; i++) { - ev_data = (struct io_cbdata *) (uintptr_t) event[i].data; - /* Call this callback */ - ev_data->cb (ev_data->fd, - event[i].res, - ev_data->len, - ev_data->buf, - ev_data->ud); - if (ev_data->io_buf) { - free (ev_data->io_buf); - } - g_free (ev_data); - } - } - else if (done == 0) { - /* No more events are ready */ - return; - } - else { - msg_err ("io_getevents failed: %s", strerror (errno)); - return; - } - } -} - -#endif - -/** - * Initialize aio with specified event base - */ -struct aio_context * -rspamd_aio_init (struct event_base *base) -{ - struct aio_context *new; - - /* First of all we need to detect which type of aio we can try to use */ - new = g_malloc0 (sizeof (struct aio_context)); - new->base = base; - -#ifdef LINUX - /* On linux we are trying to use io (3) and eventfd for notifying */ - new->event_fd = eventfd (0, 0); - if (new->event_fd == -1) { - msg_err ("eventfd failed: %s", strerror (errno)); - } - else { - /* Set this socket non-blocking */ - if (rspamd_socket_nonblocking (new->event_fd) == -1) { - msg_err ("non blocking for eventfd failed: %s", strerror (errno)); - close (new->event_fd); - } - else { - event_set (&new->eventfd_ev, - new->event_fd, - EV_READ | EV_PERSIST, - rspamd_eventfdcb, - new); - event_base_set (new->base, &new->eventfd_ev); - event_add (&new->eventfd_ev, NULL); - if (io_setup (MAX_AIO_EV, &new->io_ctx) == -1) { - msg_err ("io_setup failed: %s", strerror (errno)); - close (new->event_fd); - } - else { - new->has_aio = TRUE; - } - } - } -#elif defined(HAVE_AIO_H) - /* TODO: implement this */ -#endif - - return new; -} - -/** - * Open file for aio - */ -gint -rspamd_aio_open (struct aio_context *ctx, const gchar *path, int flags) -{ - gint fd = -1; - /* Fallback */ - if (!ctx->has_aio) { - return open (path, flags); - } -#ifdef LINUX - - fd = open (path, flags | O_DIRECT); - - return fd; -#elif defined(HAVE_AIO_H) - fd = open (path, flags); -#endif - - return fd; -} - -/** - * Asynchronous read of file - */ -gint -rspamd_aio_read (gint fd, - gpointer buf, - guint64 len, - guint64 offset, - struct aio_context *ctx, - rspamd_aio_cb cb, - gpointer ud) -{ - gint r = -1; - - if (ctx->has_aio) { -#ifdef LINUX - struct iocb *iocb[1]; - struct io_cbdata *cbdata; - - cbdata = g_malloc0 (sizeof (struct io_cbdata)); - cbdata->cb = cb; - cbdata->buf = buf; - cbdata->len = len; - cbdata->ud = ud; - cbdata->fd = fd; - cbdata->io_buf = NULL; - - iocb[0] = alloca (sizeof (struct iocb)); - memset (iocb[0], 0, sizeof (struct iocb)); - iocb[0]->aio_fildes = fd; - iocb[0]->aio_lio_opcode = IO_CMD_PREAD; - iocb[0]->aio_reqprio = 0; - iocb[0]->aio_buf = (guint64)((uintptr_t)buf); - iocb[0]->aio_nbytes = len; - iocb[0]->aio_offset = offset; - iocb[0]->aio_flags |= (1 << 0) /* IOCB_FLAG_RESFD */; - iocb[0]->aio_resfd = ctx->event_fd; - iocb[0]->aio_data = (guint64)((uintptr_t)cbdata); - - /* Iocb is copied to kernel internally, so it is safe to put it on stack */ - if (io_submit (ctx->io_ctx, 1, iocb) == 1) { - return len; - } - else { - if (errno == EAGAIN || errno == ENOSYS) { - /* Fall back to sync read */ - goto blocking; - } - return -1; - } - -#elif defined(HAVE_AIO_H) -#endif - } - else { - /* Blocking variant */ - goto blocking; -blocking: -#ifdef _LARGEFILE64_SOURCE - r = lseek64 (fd, offset, SEEK_SET); -#else - r = lseek (fd, offset, SEEK_SET); -#endif - if (r > 0) { - r = read (fd, buf, len); - if (r >= 0) { - cb (fd, 0, r, buf, ud); - } - else { - cb (fd, r, -1, buf, ud); - } - } - } - - return r; -} - -/** - * Asynchronous write of file - */ -gint -rspamd_aio_write (gint fd, - gpointer buf, - guint64 len, - guint64 offset, - struct aio_context *ctx, - rspamd_aio_cb cb, - gpointer ud) -{ - gint r = -1; - - if (ctx->has_aio) { -#ifdef LINUX - struct iocb *iocb[1]; - struct io_cbdata *cbdata; - - cbdata = g_malloc0 (sizeof (struct io_cbdata)); - cbdata->cb = cb; - cbdata->buf = buf; - cbdata->len = len; - cbdata->ud = ud; - cbdata->fd = fd; - /* We need to align pointer on boundary of 512 bytes here */ - if (posix_memalign (&cbdata->io_buf, 512, len) != 0) { - return -1; - } - memcpy (cbdata->io_buf, buf, len); - - iocb[0] = alloca (sizeof (struct iocb)); - memset (iocb[0], 0, sizeof (struct iocb)); - iocb[0]->aio_fildes = fd; - iocb[0]->aio_lio_opcode = IO_CMD_PWRITE; - iocb[0]->aio_reqprio = 0; - iocb[0]->aio_buf = (guint64)((uintptr_t)cbdata->io_buf); - iocb[0]->aio_nbytes = len; - iocb[0]->aio_offset = offset; - iocb[0]->aio_flags |= (1 << 0) /* IOCB_FLAG_RESFD */; - iocb[0]->aio_resfd = ctx->event_fd; - iocb[0]->aio_data = (guint64)((uintptr_t)cbdata); - - /* Iocb is copied to kernel internally, so it is safe to put it on stack */ - if (io_submit (ctx->io_ctx, 1, iocb) == 1) { - return len; - } - else { - if (errno == EAGAIN || errno == ENOSYS) { - /* Fall back to sync read */ - goto blocking; - } - return -1; - } - -#elif defined(HAVE_AIO_H) -#endif - } - else { - /* Blocking variant */ - goto blocking; -blocking: -#ifdef _LARGEFILE64_SOURCE - r = lseek64 (fd, offset, SEEK_SET); -#else - r = lseek (fd, offset, SEEK_SET); -#endif - if (r > 0) { - r = write (fd, buf, len); - if (r >= 0) { - cb (fd, 0, r, buf, ud); - } - else { - cb (fd, r, -1, buf, ud); - } - } - } - - return r; -} - -/** - * Close of aio operations - */ -gint -rspamd_aio_close (gint fd, struct aio_context *ctx) -{ - gint r = -1; - - if (ctx->has_aio) { -#ifdef LINUX - struct iocb iocb; - struct io_event ev; - - memset (&iocb, 0, sizeof (struct iocb)); - iocb.aio_fildes = fd; - iocb.aio_lio_opcode = IO_CMD_NOOP; - - /* Iocb is copied to kernel internally, so it is safe to put it on stack */ - r = io_cancel (ctx->io_ctx, &iocb, &ev); - close (fd); - return r; - -#elif defined(HAVE_AIO_H) -#endif - } - - r = close (fd); - - return r; -} diff --git a/src/libutil/aio_event.h b/src/libutil/aio_event.h deleted file mode 100644 index cccbed4e2..000000000 --- a/src/libutil/aio_event.h +++ /dev/null @@ -1,59 +0,0 @@ -/*- - * Copyright 2016 Vsevolod Stakhov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef AIO_EVENT_H_ -#define AIO_EVENT_H_ - -#include "config.h" - -/** - * AIO context - */ -struct aio_context; - -/** - * Callback for notifying - */ -typedef void (*rspamd_aio_cb) (gint fd, gint res, guint64 len, gpointer data, - gpointer ud); - -/** - * Initialize aio with specified event base - */ -struct aio_context * rspamd_aio_init (struct event_base *base); - -/** - * Open file for aio - */ -gint rspamd_aio_open (struct aio_context *ctx, const gchar *path, int flags); - -/** - * Asynchronous read of file - */ -gint rspamd_aio_read (gint fd, gpointer buf, guint64 len, guint64 offset, - struct aio_context *ctx, rspamd_aio_cb cb, gpointer ud); - -/** - * Asynchronous write of file - */ -gint rspamd_aio_write (gint fd, gpointer buf, guint64 len, guint64 offset, - struct aio_context *ctx, rspamd_aio_cb cb, gpointer ud); - -/** - * Close of aio operations - */ -gint rspamd_aio_close (gint fd, struct aio_context *ctx); - -#endif /* AIO_EVENT_H_ */ diff --git a/src/libutil/http_connection.c b/src/libutil/http_connection.c index aa964f417..9bf7456e7 100644 --- a/src/libutil/http_connection.c +++ b/src/libutil/http_connection.c @@ -25,6 +25,7 @@ #include "ottery.h" #include "keypair_private.h" #include "cryptobox.h" +#include "libutil/libev_helper.h" #include "libutil/ssl_util.h" #include "libserver/url.h" @@ -67,9 +68,8 @@ struct rspamd_http_connection_private { struct rspamd_http_header *header; struct http_parser parser; struct http_parser_settings parser_cb; - struct event ev; - struct timeval tv; - struct timeval *ptv; + struct rspamd_io_ev ev; + ev_tstamp timeout; struct rspamd_http_message *msg; struct iovec *out; guint outlen; @@ -348,9 +348,7 @@ rspamd_http_on_headers_complete (http_parser * parser) if (msg->method == HTTP_HEAD) { /* We don't care about the rest */ - if (rspamd_event_pending (&priv->ev, EV_READ)) { - event_del (&priv->ev); - } + rspamd_ev_watcher_stop (priv->ctx->event_loop, &priv->ev); msg->code = parser->status_code; rspamd_http_connection_ref (conn); @@ -358,7 +356,7 @@ rspamd_http_on_headers_complete (http_parser * parser) if (conn->opts & RSPAMD_HTTP_CLIENT_KEEP_ALIVE) { rspamd_http_context_push_keepalive (conn->priv->ctx, conn, - msg, conn->priv->ctx->ev_base); + msg, conn->priv->ctx->event_loop); rspamd_http_connection_reset (conn); } else { @@ -532,17 +530,14 @@ rspamd_http_on_headers_complete_decrypted (http_parser *parser) if (msg->method == HTTP_HEAD) { /* We don't care about the rest */ - if (rspamd_event_pending (&priv->ev, EV_READ)) { - event_del (&priv->ev); - } - + rspamd_ev_watcher_stop (priv->ctx->event_loop, &priv->ev); msg->code = parser->status_code; rspamd_http_connection_ref (conn); ret = conn->finish_handler (conn, msg); if (conn->opts & RSPAMD_HTTP_CLIENT_KEEP_ALIVE) { rspamd_http_context_push_keepalive (conn->priv->ctx, conn, - msg, conn->priv->ctx->ev_base); + msg, conn->priv->ctx->event_loop); rspamd_http_connection_reset (conn); } else { @@ -692,16 +687,13 @@ rspamd_http_on_message_complete (http_parser * parser) } if (ret == 0) { - if (rspamd_event_pending (&priv->ev, EV_READ)) { - event_del (&priv->ev); - } - + rspamd_ev_watcher_stop (priv->ctx->event_loop, &priv->ev); rspamd_http_connection_ref (conn); ret = conn->finish_handler (conn, priv->msg); if (conn->opts & RSPAMD_HTTP_CLIENT_KEEP_ALIVE) { rspamd_http_context_push_keepalive (conn->priv->ctx, conn, - priv->msg, conn->priv->ctx->ev_base); + priv->msg, conn->priv->ctx->event_loop); rspamd_http_connection_reset (conn); } else { @@ -741,11 +733,11 @@ rspamd_http_simple_client_helper (struct rspamd_http_connection *conn) if (conn->opts & RSPAMD_HTTP_CLIENT_SHARED) { rspamd_http_connection_read_message_shared (conn, conn->ud, - conn->priv->ptv); + conn->priv->timeout); } else { rspamd_http_connection_read_message (conn, conn->ud, - conn->priv->ptv); + conn->priv->timeout); } if (priv->msg) { @@ -835,7 +827,6 @@ rspamd_http_write_helper (struct rspamd_http_connection *conn) else { /* Want to write more */ priv->flags &= ~RSPAMD_HTTP_CONN_FLAG_RESETED; - event_add (&priv->ev, priv->ptv); } return; @@ -1269,10 +1260,7 @@ rspamd_http_connection_reset (struct rspamd_http_connection *conn) if (!(priv->flags & RSPAMD_HTTP_CONN_FLAG_RESETED)) { - if (rspamd_event_pending (&priv->ev, EV_READ|EV_WRITE|EV_TIMEOUT)) { - event_del (&priv->ev); - } - + rspamd_ev_watcher_stop (priv->ctx->event_loop, &priv->ev); rspamd_http_parser_reset (conn); } @@ -1468,7 +1456,7 @@ rspamd_http_connection_free (struct rspamd_http_connection *conn) static void rspamd_http_connection_read_message_common (struct rspamd_http_connection *conn, - gpointer ud, struct timeval *timeout, + gpointer ud, ev_tstamp timeout, gint flags) { struct rspamd_http_connection_private *priv = conn->priv; @@ -1490,42 +1478,30 @@ rspamd_http_connection_read_message_common (struct rspamd_http_connection *conn, priv->flags |= RSPAMD_HTTP_CONN_FLAG_ENCRYPTED; } - if (timeout == NULL) { - priv->ptv = NULL; - } - else { - memmove (&priv->tv, timeout, sizeof (struct timeval)); - priv->ptv = &priv->tv; - } - + priv->timeout = timeout; priv->header = NULL; priv->buf = g_malloc0 (sizeof (*priv->buf)); REF_INIT_RETAIN (priv->buf, rspamd_http_privbuf_dtor); priv->buf->data = rspamd_fstring_sized_new (8192); priv->flags |= RSPAMD_HTTP_CONN_FLAG_NEW_HEADER; - event_set (&priv->ev, - conn->fd, - EV_READ | EV_PERSIST, - rspamd_http_event_handler, - conn); - - event_base_set (priv->ctx->ev_base, &priv->ev); + rspamd_ev_watcher_init (&priv->ev, conn->fd, EV_READ, + rspamd_http_event_handler, conn); + rspamd_ev_watcher_start (priv->ctx->event_loop, &priv->ev, priv->timeout); priv->flags &= ~RSPAMD_HTTP_CONN_FLAG_RESETED; - event_add (&priv->ev, priv->ptv); } void rspamd_http_connection_read_message (struct rspamd_http_connection *conn, - gpointer ud, struct timeval *timeout) + gpointer ud, ev_tstamp timeout) { rspamd_http_connection_read_message_common (conn, ud, timeout, 0); } void rspamd_http_connection_read_message_shared (struct rspamd_http_connection *conn, - gpointer ud, struct timeval *timeout) + gpointer ud, ev_tstamp timeout) { rspamd_http_connection_read_message_common (conn, ud, timeout, RSPAMD_HTTP_FLAG_SHMEM); @@ -1912,7 +1888,7 @@ rspamd_http_connection_write_message_common (struct rspamd_http_connection *conn const gchar *host, const gchar *mime_type, gpointer ud, - struct timeval *timeout, + ev_tstamp timeout, gboolean allow_shared) { struct rspamd_http_connection_private *priv = conn->priv; @@ -1930,14 +1906,7 @@ rspamd_http_connection_write_message_common (struct rspamd_http_connection *conn conn->ud = ud; priv->msg = msg; - - if (timeout == NULL) { - priv->ptv = NULL; - } - else if (timeout != &priv->tv) { - memcpy (&priv->tv, timeout, sizeof (struct timeval)); - priv->ptv = &priv->tv; - } + priv->timeout = timeout; priv->header = NULL; priv->buf = g_malloc0 (sizeof (*priv->buf)); @@ -2224,16 +2193,12 @@ rspamd_http_connection_write_message_common (struct rspamd_http_connection *conn msg->flags &=~ RSPAMD_HTTP_FLAG_SSL; } - if (rspamd_event_pending (&priv->ev, EV_TIMEOUT|EV_WRITE|EV_READ)) { - event_del (&priv->ev); - } + rspamd_ev_watcher_stop (priv->ctx->event_loop, &priv->ev); if (msg->flags & RSPAMD_HTTP_FLAG_SSL) { gpointer ssl_ctx = (msg->flags & RSPAMD_HTTP_FLAG_SSL_NOVERIFY) ? priv->ctx->ssl_ctx_noverify : priv->ctx->ssl_ctx; - event_base_set (priv->ctx->ev_base, &priv->ev); - if (!ssl_ctx) { err = g_error_new (HTTP_ERROR, errno, "ssl message requested " "with no ssl ctx"); @@ -2249,12 +2214,12 @@ rspamd_http_connection_write_message_common (struct rspamd_http_connection *conn rspamd_ssl_connection_free (priv->ssl); } - priv->ssl = rspamd_ssl_connection_new (ssl_ctx, priv->ctx->ev_base, + priv->ssl = rspamd_ssl_connection_new (ssl_ctx, priv->ctx->event_loop, !(msg->flags & RSPAMD_HTTP_FLAG_SSL_NOVERIFY)); g_assert (priv->ssl != NULL); if (!rspamd_ssl_connect_fd (priv->ssl, conn->fd, host, &priv->ev, - priv->ptv, rspamd_http_event_handler, + priv->timeout, rspamd_http_event_handler, rspamd_http_ssl_err_handler, conn)) { err = g_error_new (HTTP_ERROR, errno, @@ -2270,10 +2235,9 @@ rspamd_http_connection_write_message_common (struct rspamd_http_connection *conn } } else { - event_set (&priv->ev, conn->fd, EV_WRITE, rspamd_http_event_handler, conn); - event_base_set (priv->ctx->ev_base, &priv->ev); - - event_add (&priv->ev, priv->ptv); + rspamd_ev_watcher_init (&priv->ev, conn->fd, EV_WRITE, + rspamd_http_event_handler, conn); + rspamd_ev_watcher_start (priv->ctx->event_loop, &priv->ev, priv->timeout); } } @@ -2283,7 +2247,7 @@ rspamd_http_connection_write_message (struct rspamd_http_connection *conn, const gchar *host, const gchar *mime_type, gpointer ud, - struct timeval *timeout) + ev_tstamp timeout) { rspamd_http_connection_write_message_common (conn, msg, host, mime_type, ud, timeout, FALSE); @@ -2295,7 +2259,7 @@ rspamd_http_connection_write_message_shared (struct rspamd_http_connection *conn const gchar *host, const gchar *mime_type, gpointer ud, - struct timeval *timeout) + ev_tstamp timeout) { rspamd_http_connection_write_message_common (conn, msg, host, mime_type, ud, timeout, TRUE); diff --git a/src/libutil/http_connection.h b/src/libutil/http_connection.h index 6240772da..fc1303446 100644 --- a/src/libutil/http_connection.h +++ b/src/libutil/http_connection.h @@ -31,7 +31,7 @@ #include "http_util.h" #include "addr.h" -#include <event.h> +#include "contrib/libev/ev.h" enum rspamd_http_connection_type { RSPAMD_HTTP_SERVER, @@ -221,12 +221,12 @@ gboolean rspamd_http_connection_is_encrypted (struct rspamd_http_connection *con void rspamd_http_connection_read_message ( struct rspamd_http_connection *conn, gpointer ud, - struct timeval *timeout); + ev_tstamp timeout); void rspamd_http_connection_read_message_shared ( struct rspamd_http_connection *conn, gpointer ud, - struct timeval *timeout); + ev_tstamp timeout); /** * Send reply using initialised connection @@ -241,7 +241,7 @@ void rspamd_http_connection_write_message ( const gchar *host, const gchar *mime_type, gpointer ud, - struct timeval *timeout); + ev_tstamp timeout); void rspamd_http_connection_write_message_shared ( struct rspamd_http_connection *conn, @@ -249,7 +249,7 @@ void rspamd_http_connection_write_message_shared ( const gchar *host, const gchar *mime_type, gpointer ud, - struct timeval *timeout); + ev_tstamp timeout); /** * Free connection structure diff --git a/src/libutil/http_context.c b/src/libutil/http_context.c index 95500aaad..95ab7021c 100644 --- a/src/libutil/http_context.c +++ b/src/libutil/http_context.c @@ -23,6 +23,7 @@ #include "contrib/libottery/ottery.h" #include "contrib/http-parser/http_parser.h" #include "rspamd.h" +#include "libev_helper.h" INIT_LOG_MODULE(http_context) @@ -38,7 +39,7 @@ struct rspamd_http_keepalive_cbdata { struct rspamd_http_context *ctx; GQueue *queue; GList *link; - struct event ev; + struct rspamd_io_ev ev; }; static void @@ -64,20 +65,16 @@ rspamd_http_keepalive_queue_cleanup (GQueue *conns) } static void -rspamd_http_context_client_rotate_ev (gint fd, short what, void *arg) +rspamd_http_context_client_rotate_ev (struct ev_loop *loop, ev_timer *w, int revents) { - struct timeval rot_tv; - struct rspamd_http_context *ctx = arg; + struct rspamd_http_context *ctx = (struct rspamd_http_context *)w->data; gpointer kp; - double_to_tv (ctx->config.client_key_rotate_time, &rot_tv); - rot_tv.tv_sec += ottery_rand_range (rot_tv.tv_sec); + w->repeat = rspamd_time_jitter (ctx->config.client_key_rotate_time, 0); + msg_debug_http_context ("rotate local keypair, next rotate in %.0f seconds", + w->repeat); - msg_debug_http_context ("rotate local keypair, next rotate in %d seconds", - (int)rot_tv.tv_sec); - - event_del (&ctx->client_rotate_ev); - event_add (&ctx->client_rotate_ev, &rot_tv); + ev_timer_again (loop, w); kp = ctx->client_kp; ctx->client_kp = rspamd_keypair_new (RSPAMD_KEYPAIR_KEX, @@ -87,7 +84,7 @@ rspamd_http_context_client_rotate_ev (gint fd, short what, void *arg) static struct rspamd_http_context* rspamd_http_context_new_default (struct rspamd_config *cfg, - struct event_base *ev_base, + struct ev_loop *ev_base, struct upstream_ctx *ups_ctx) { struct rspamd_http_context *ctx; @@ -114,7 +111,7 @@ rspamd_http_context_new_default (struct rspamd_config *cfg, ctx->ssl_ctx_noverify = rspamd_init_ssl_ctx_noverify (); } - ctx->ev_base = ev_base; + ctx->event_loop = ev_base; ctx->keep_alive_hash = kh_init (rspamd_keep_alive_hash); @@ -186,16 +183,14 @@ rspamd_http_context_init (struct rspamd_http_context *ctx) ctx->server_kp_cache = rspamd_keypair_cache_new (ctx->config.kp_cache_size_server); } - if (ctx->config.client_key_rotate_time > 0 && ctx->ev_base) { - struct timeval tv; + if (ctx->config.client_key_rotate_time > 0 && ctx->event_loop) { double jittered = rspamd_time_jitter (ctx->config.client_key_rotate_time, 0); - double_to_tv (jittered, &tv); - event_set (&ctx->client_rotate_ev, -1, EV_TIMEOUT, - rspamd_http_context_client_rotate_ev, ctx); - event_base_set (ctx->ev_base, &ctx->client_rotate_ev); - event_add (&ctx->client_rotate_ev, &tv); + ev_timer_init (&ctx->client_rotate_ev, + rspamd_http_context_client_rotate_ev, jittered, 0); + ev_timer_start (ctx->event_loop, &ctx->client_rotate_ev); + ctx->client_rotate_ev.data = ctx; } if (ctx->config.http_proxy) { @@ -208,7 +203,7 @@ rspamd_http_context_init (struct rspamd_http_context *ctx) struct rspamd_http_context* rspamd_http_context_create (struct rspamd_config *cfg, - struct event_base *ev_base, + struct ev_loop *ev_base, struct upstream_ctx *ups_ctx) { struct rspamd_http_context *ctx; @@ -337,7 +332,7 @@ rspamd_http_context_free (struct rspamd_http_context *ctx) struct rspamd_http_context* rspamd_http_context_create_config (struct rspamd_http_context_cfg *cfg, - struct event_base *ev_base, + struct ev_loop *ev_base, struct upstream_ctx *ups_ctx) { struct rspamd_http_context *ctx; @@ -412,7 +407,7 @@ rspamd_http_context_check_keepalive (struct rspamd_http_context *ctx, struct rspamd_http_connection *conn; cbd = g_queue_pop_head (conns); - event_del (&cbd->ev); + rspamd_ev_watcher_stop (ctx->event_loop, &cbd->ev); conn = cbd->conn; g_free (cbd); @@ -491,6 +486,7 @@ rspamd_http_keepalive_handler (gint fd, short what, gpointer ud) cbdata->conn->keepalive_hash_key->host, cbdata->queue->length); rspamd_http_connection_unref (cbdata->conn); + rspamd_ev_watcher_stop (cbdata->ctx->event_loop, &cbdata->ev); g_free (cbdata); } @@ -498,10 +494,9 @@ void rspamd_http_context_push_keepalive (struct rspamd_http_context *ctx, struct rspamd_http_connection *conn, struct rspamd_http_message *msg, - struct event_base *ev_base) + struct ev_loop *event_loop) { struct rspamd_http_keepalive_cbdata *cbdata; - struct timeval tv; gdouble timeout = ctx->config.keepalive_interval; g_assert (conn->keepalive_hash_key != NULL); @@ -571,17 +566,14 @@ rspamd_http_context_push_keepalive (struct rspamd_http_context *ctx, cbdata->ctx = ctx; conn->finished = FALSE; - event_set (&cbdata->ev, conn->fd, EV_READ|EV_TIMEOUT, + rspamd_ev_watcher_init (&cbdata->ev, conn->fd, EV_READ, rspamd_http_keepalive_handler, cbdata); + rspamd_ev_watcher_start (event_loop, &cbdata->ev, timeout); msg_debug_http_context ("push keepalive element %s (%s), %d connections queued, %.1f timeout", rspamd_inet_address_to_string_pretty (cbdata->conn->keepalive_hash_key->addr), cbdata->conn->keepalive_hash_key->host, cbdata->queue->length, timeout); - - double_to_tv (timeout, &tv); - event_base_set (ev_base, &cbdata->ev); - event_add (&cbdata->ev, &tv); }
\ No newline at end of file diff --git a/src/libutil/http_context.h b/src/libutil/http_context.h index 4cf07fb48..c610ffbbd 100644 --- a/src/libutil/http_context.h +++ b/src/libutil/http_context.h @@ -21,7 +21,7 @@ #include "ucl.h" #include "addr.h" -#include <event.h> +#include "contrib/libev/ev.h" struct rspamd_http_context; struct rspamd_config; @@ -45,11 +45,11 @@ struct rspamd_http_context_cfg { * @return new context used for both client and server HTTP connections */ struct rspamd_http_context* rspamd_http_context_create (struct rspamd_config *cfg, - struct event_base *ev_base, struct upstream_ctx *ctx); + struct ev_loop *ev_base, struct upstream_ctx *ctx); struct rspamd_http_context* rspamd_http_context_create_config ( struct rspamd_http_context_cfg *cfg, - struct event_base *ev_base, + struct ev_loop *ev_base, struct upstream_ctx *ctx); /** * Destroys context @@ -93,6 +93,6 @@ void rspamd_http_context_prepare_keepalive (struct rspamd_http_context *ctx, void rspamd_http_context_push_keepalive (struct rspamd_http_context *ctx, struct rspamd_http_connection *conn, struct rspamd_http_message *msg, - struct event_base *ev_base); + struct ev_loop *ev_base); #endif diff --git a/src/libutil/http_private.h b/src/libutil/http_private.h index 368715891..f5a7dd9cc 100644 --- a/src/libutil/http_private.h +++ b/src/libutil/http_private.h @@ -100,8 +100,8 @@ struct rspamd_http_context { struct upstream_list *http_proxies; gpointer ssl_ctx; gpointer ssl_ctx_noverify; - struct event_base *ev_base; - struct event client_rotate_ev; + struct ev_loop *event_loop; + ev_timer client_rotate_ev; khash_t (rspamd_keep_alive_hash) *keep_alive_hash; }; diff --git a/src/libutil/http_router.c b/src/libutil/http_router.c index ec0eeb7b4..8d5913f0d 100644 --- a/src/libutil/http_router.c +++ b/src/libutil/http_router.c @@ -92,7 +92,7 @@ rspamd_http_router_error_handler (struct rspamd_http_connection *conn, NULL, "text/plain", entry, - entry->rt->ptv); + entry->rt->timeout); entry->is_reply = TRUE; } } @@ -210,7 +210,7 @@ rspamd_http_router_try_file (struct rspamd_http_connection_entry *entry, msg_debug ("requested file %s", realbuf); rspamd_http_connection_write_message (entry->conn, reply_msg, NULL, rspamd_http_router_detect_ct (realbuf), entry, - entry->rt->ptv); + entry->rt->timeout); return TRUE; } @@ -235,7 +235,7 @@ rspamd_http_router_send_error (GError *err, NULL, "text/plain", entry, - entry->rt->ptv); + entry->rt->timeout); } @@ -369,33 +369,25 @@ rspamd_http_router_finish_handler (struct rspamd_http_connection *conn, struct rspamd_http_connection_router * rspamd_http_router_new (rspamd_http_router_error_handler_t eh, rspamd_http_router_finish_handler_t fh, - struct timeval *timeout, + ev_tstamp timeout, const char *default_fs_path, struct rspamd_http_context *ctx) { - struct rspamd_http_connection_router * new; + struct rspamd_http_connection_router *nrouter; struct stat st; - new = g_malloc0 (sizeof (struct rspamd_http_connection_router)); - new->paths = g_hash_table_new_full (rspamd_ftok_icase_hash, + nrouter = g_malloc0 (sizeof (struct rspamd_http_connection_router)); + nrouter->paths = g_hash_table_new_full (rspamd_ftok_icase_hash, rspamd_ftok_icase_equal, rspamd_fstring_mapped_ftok_free, NULL); - new->regexps = g_ptr_array_new (); - new->conns = NULL; - new->error_handler = eh; - new->finish_handler = fh; - new->response_headers = g_hash_table_new_full (rspamd_strcase_hash, + nrouter->regexps = g_ptr_array_new (); + nrouter->conns = NULL; + nrouter->error_handler = eh; + nrouter->finish_handler = fh; + nrouter->response_headers = g_hash_table_new_full (rspamd_strcase_hash, rspamd_strcase_equal, g_free, g_free); - new->ev_base = ctx->ev_base; - - if (timeout) { - new->tv = *timeout; - new->ptv = &new->tv; - } - else { - new->ptv = NULL; - } - - new->default_fs_path = NULL; + nrouter->event_loop = ctx->event_loop; + nrouter->timeout = timeout; + nrouter->default_fs_path = NULL; if (default_fs_path != NULL) { if (stat (default_fs_path, &st) == -1) { @@ -406,14 +398,14 @@ rspamd_http_router_new (rspamd_http_router_error_handler_t eh, msg_err ("path %s is not a directory", default_fs_path); } else { - new->default_fs_path = realpath (default_fs_path, NULL); + nrouter->default_fs_path = realpath (default_fs_path, NULL); } } } - new->ctx = ctx; + nrouter->ctx = ctx; - return new; + return nrouter; } void @@ -517,7 +509,7 @@ rspamd_http_router_handle_socket (struct rspamd_http_connection_router *router, rspamd_http_connection_set_key (conn->conn, router->key); } - rspamd_http_connection_read_message (conn->conn, conn, router->ptv); + rspamd_http_connection_read_message (conn->conn, conn, router->timeout); DL_PREPEND (router->conns, conn); } diff --git a/src/libutil/http_router.h b/src/libutil/http_router.h index 8e8056240..b946067b7 100644 --- a/src/libutil/http_router.h +++ b/src/libutil/http_router.h @@ -44,9 +44,8 @@ struct rspamd_http_connection_router { GHashTable *paths; GHashTable *response_headers; GPtrArray *regexps; - struct timeval tv; - struct timeval *ptv; - struct event_base *ev_base; + ev_tstamp timeout; + struct ev_loop *event_loop; struct rspamd_http_context *ctx; gchar *default_fs_path; rspamd_http_router_handler_t unknown_method_handler; @@ -66,7 +65,7 @@ struct rspamd_http_connection_router { struct rspamd_http_connection_router * rspamd_http_router_new ( rspamd_http_router_error_handler_t eh, rspamd_http_router_finish_handler_t fh, - struct timeval *timeout, + ev_tstamp timeout, const char *default_fs_path, struct rspamd_http_context *ctx); diff --git a/src/libutil/libev_helper.c b/src/libutil/libev_helper.c new file mode 100644 index 000000000..81a23dea6 --- /dev/null +++ b/src/libutil/libev_helper.c @@ -0,0 +1,119 @@ +/*- + * Copyright 2019 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "libev_helper.h" + +static void +rspamd_ev_watcher_io_cb (EV_P_ struct ev_io *w, int revents) +{ + struct rspamd_io_ev *ev = (struct rspamd_io_ev *)w->data; + + ev->last_activity = ev_now (EV_A); + ev->cb (ev->io.fd, revents, ev->ud); +} + +static void +rspamd_ev_watcher_timer_cb (EV_P_ struct ev_timer *w, int revents) +{ + struct rspamd_io_ev *ev = (struct rspamd_io_ev *)w->data; + + ev_tstamp after = ev->last_activity - ev_now (EV_A) + ev->timeout; + + if (after < 0.) { + /* Real timeout */ + ev->cb (ev->io.fd, EV_TIMER, ev->ud); + } + else { + /* Start another cycle as there was some activity */ + w->repeat = after; + ev_timer_again (EV_A_ w); + } +} + + +void +rspamd_ev_watcher_init (struct rspamd_io_ev *ev, + int fd, + short what, + rspamd_ev_cb cb, + void *ud) +{ + ev_io_init (&ev->io, rspamd_ev_watcher_io_cb, fd, what); + ev->io.data = ev; + ev_init (&ev->tm, rspamd_ev_watcher_timer_cb); + ev->tm.data = ev; + ev->ud = ud; + ev->cb = cb; +} + +void +rspamd_ev_watcher_start (struct ev_loop *loop, + struct rspamd_io_ev *ev, + ev_tstamp timeout) +{ + g_assert (ev->cb != NULL); + + ev->last_activity = ev_now (EV_A); + ev_io_start (EV_A_ &ev->io); + + if (timeout > 0) { + ev->timeout = timeout; + ev_timer_set (&ev->tm, timeout, 0.0); + ev_timer_start (EV_A_ &ev->tm); + } +} + +void +rspamd_ev_watcher_stop (struct ev_loop *loop, + struct rspamd_io_ev *ev) +{ + if (ev_is_pending (&ev->io) || ev_is_active (&ev->io)) { + ev_io_stop (EV_A_ &ev->io); + } + + if (ev->timeout > 0) { + ev_timer_stop (EV_A_ &ev->tm); + } +} + +void +rspamd_ev_watcher_reschedule (struct ev_loop *loop, + struct rspamd_io_ev *ev, + short what) +{ + g_assert (ev->cb != NULL); + + if (ev_is_pending (&ev->io) || ev_is_active (&ev->io)) { + ev_io_stop (EV_A_ &ev->io); + ev_io_set (&ev->io, ev->io.fd, what); + ev_io_start (EV_A_ &ev->io); + } + else { + ev->io.data = ev; + ev_io_init (&ev->io, rspamd_ev_watcher_io_cb, ev->io.fd, what); + ev_io_start (EV_A_ &ev->io); + } + + if (ev->timeout > 0) { + if (!(ev_is_active (&ev->tm) || ev_is_pending (&ev->tm))) { + ev->tm.data = ev; + ev_timer_init (&ev->tm, rspamd_ev_watcher_timer_cb, ev->timeout, 0.0); + ev_timer_start (EV_A_ &ev->tm); + } + } + + ev->last_activity = ev_now (EV_A); +}
\ No newline at end of file diff --git a/src/libutil/libev_helper.h b/src/libutil/libev_helper.h new file mode 100644 index 000000000..cf52db557 --- /dev/null +++ b/src/libutil/libev_helper.h @@ -0,0 +1,78 @@ +/*- + * Copyright 2019 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef RSPAMD_LIBEV_HELPER_H +#define RSPAMD_LIBEV_HELPER_H + +#include "config.h" +#include "contrib/libev/ev.h" + +/* + * This module is a little helper to simplify libevent->libev transition + * It allows to create timed IO watchers utilising both + */ + +typedef void (*rspamd_ev_cb)(int fd, short what, void *ud); + +struct rspamd_io_ev { + ev_io io; + ev_timer tm; + rspamd_ev_cb cb; + void *ud; + ev_tstamp last_activity; + ev_tstamp timeout; +}; + +/** + * Initialize watcher similar to event_init + * @param ev + * @param fd + * @param what + * @param cb + * @param ud + */ +void rspamd_ev_watcher_init (struct rspamd_io_ev *ev, + int fd, short what, rspamd_ev_cb cb, void *ud); + +/** + * Start watcher with the specific timeout + * @param loop + * @param ev + * @param timeout + */ +void rspamd_ev_watcher_start (struct ev_loop *loop, + struct rspamd_io_ev *ev, + ev_tstamp timeout); + +/** + * Stops watcher and clean it up + * @param loop + * @param ev + */ +void rspamd_ev_watcher_stop (struct ev_loop *loop, + struct rspamd_io_ev *ev); + +/** + * Convenience function to reschedule watcher with different events + * @param loop + * @param ev + * @param what + */ +void rspamd_ev_watcher_reschedule (struct ev_loop *loop, + struct rspamd_io_ev *ev, + short what); + +#endif diff --git a/src/libutil/map.c b/src/libutil/map.c index fc414ab00..9f43fa253 100644 --- a/src/libutil/map.c +++ b/src/libutil/map.c @@ -23,6 +23,7 @@ #include "http_private.h" #include "rspamd.h" #include "contrib/zstd/zstd.h" +#include "contrib/libev/ev.h" #undef MAP_DEBUG_REFS #ifdef MAP_DEBUG_REFS @@ -44,7 +45,7 @@ static void free_http_cbdata_common (struct http_callback_data *cbd, gboolean plan_new); static void free_http_cbdata_dtor (gpointer p); static void free_http_cbdata (struct http_callback_data *cbd); -static void rspamd_map_periodic_callback (gint fd, short what, void *ud); +static void rspamd_map_process_periodic (struct map_periodic_cbdata *cbd); static void rspamd_map_schedule_periodic (struct rspamd_map *map, gboolean locked, gboolean initial, gboolean errored); static gboolean read_map_file_chunks (struct rspamd_map *map, @@ -130,7 +131,7 @@ write_http_request (struct http_callback_data *cbd) cbd->data->host, NULL, cbd, - &cbd->tv); + cbd->timeout); } static gboolean @@ -274,7 +275,12 @@ free_http_cbdata_common (struct http_callback_data *cbd, gboolean plan_new) MAP_RELEASE (cbd->bk, "rspamd_map_backend"); - MAP_RELEASE (periodic, "periodic"); + + if (periodic) { + /* Detached in case of HTTP error */ + MAP_RELEASE (periodic, "periodic"); + } + g_free (cbd); } @@ -325,17 +331,21 @@ http_map_error (struct rspamd_http_connection *conn, cbd->bk->uri, cbd->addr ? rspamd_inet_address_to_string_pretty (cbd->addr) : "", err); - rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic); + MAP_RETAIN (cbd->periodic, "periodic"); + rspamd_map_process_periodic (cbd->periodic); + MAP_RELEASE (cbd->periodic, "periodic"); + /* Detach periodic as rspamd_map_process_periodic will destroy it */ + cbd->periodic = NULL; MAP_RELEASE (cbd, "http_callback_data"); } static void -rspamd_map_cache_cb (gint fd, short what, gpointer ud) +rspamd_map_cache_cb (struct ev_loop *loop, ev_timer *w, int revents) { - struct rspamd_http_map_cached_cbdata *cache_cbd = ud; + struct rspamd_http_map_cached_cbdata *cache_cbd = (struct rspamd_http_map_cached_cbdata *) + w->data; struct rspamd_map *map; struct http_map_data *data; - struct timeval tv; map = cache_cbd->map; data = cache_cbd->data; @@ -349,7 +359,7 @@ rspamd_map_cache_cb (gint fd, short what, gpointer ud) msg_info_map ("cached data is now expired (gen mismatch %L != %L) for %s", cache_cbd->gen, cache_cbd->data->gen, map->name); MAP_RELEASE (cache_cbd->shm, "rspamd_http_map_cached_cbdata"); - event_del (&cache_cbd->timeout); + ev_timer_stop (loop, &cache_cbd->timeout); g_free (cache_cbd); } else if (cache_cbd->data->last_checked >= cache_cbd->last_checked) { @@ -357,17 +367,25 @@ rspamd_map_cache_cb (gint fd, short what, gpointer ud) * We checked map but we have not found anything more recent, * reschedule cache check */ + if (cache_cbd->map->poll_timeout > + ev_now (loop) - cache_cbd->data->last_checked) { + w->repeat = cache_cbd->map->poll_timeout - + (ev_now (loop) - cache_cbd->data->last_checked); + } + else { + w->repeat = cache_cbd->map->poll_timeout; + } + cache_cbd->last_checked = cache_cbd->data->last_checked; msg_debug_map ("cached data is up to date for %s", map->name); - double_to_tv (map->poll_timeout * 2, &tv); - event_add (&cache_cbd->timeout, &tv); + ev_timer_again (loop, &cache_cbd->timeout); } else { data->cur_cache_cbd = NULL; g_atomic_int_set (&data->cache->available, 0); MAP_RELEASE (cache_cbd->shm, "rspamd_http_map_cached_cbdata"); msg_info_map ("cached data is now expired for %s", map->name); - event_del (&cache_cbd->timeout); + ev_timer_stop (loop, &cache_cbd->timeout); g_free (cache_cbd); } } @@ -436,7 +454,6 @@ http_map_finish (struct rspamd_http_connection *conn, struct rspamd_map_backend *bk; struct http_map_data *data; struct rspamd_http_map_cached_cbdata *cache_cbd; - struct timeval tv; const rspamd_ftok_t *expires_hdr, *etag_hdr; char next_check_date[128]; guchar *aux_data, *in = NULL; @@ -456,7 +473,7 @@ http_map_finish (struct rspamd_http_connection *conn, g_atomic_int_set (&data->cache->available, 0); data->cur_cache_cbd = NULL; - rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic); + rspamd_map_process_periodic (cbd->periodic); MAP_RELEASE (cbd, "http_callback_data"); return 0; @@ -622,6 +639,8 @@ read_data: } /* Check for expires */ + double cached_timeout = map->poll_timeout * 2; + expires_hdr = rspamd_http_message_find_header (msg, "Expires"); if (expires_hdr) { @@ -635,19 +654,12 @@ read_data: hdate = MIN (map->next_check, hdate); } - double cached_timeout = map->next_check - msg->date + - map->poll_timeout * 2; + cached_timeout = map->next_check - msg->date + + map->poll_timeout * 2; map->next_check = hdate; - double_to_tv (cached_timeout, &tv); - } - else { - double_to_tv (map->poll_timeout * 2, &tv); } } - else { - double_to_tv (map->poll_timeout * 2, &tv); - } /* Check for etag */ etag_hdr = rspamd_http_message_find_header (msg, "ETag"); @@ -682,16 +694,17 @@ read_data: data->cache->last_modified = cbd->data->last_modified; cache_cbd = g_malloc0 (sizeof (*cache_cbd)); cache_cbd->shm = cbd->shmem_data; + cache_cbd->event_loop = cbd->event_loop; cache_cbd->map = map; cache_cbd->data = cbd->data; cache_cbd->last_checked = cbd->data->last_checked; cache_cbd->gen = cbd->data->gen; MAP_RETAIN (cache_cbd->shm, "shmem_data"); - event_set (&cache_cbd->timeout, -1, EV_TIMEOUT, rspamd_map_cache_cb, - cache_cbd); - event_base_set (cbd->ev_base, &cache_cbd->timeout); - event_add (&cache_cbd->timeout, &tv); + ev_timer_init (&cache_cbd->timeout, rspamd_map_cache_cb, cached_timeout, + 0.0); + ev_timer_start (cbd->event_loop, &cache_cbd->timeout); + cache_cbd->timeout.data = cache_cbd; data->cur_cache_cbd = cache_cbd; if (map->next_check) { @@ -700,7 +713,7 @@ read_data: } else { rspamd_http_date_format (next_check_date, sizeof (next_check_date), - time (NULL) + map->poll_timeout); + ev_now (cbd->event_loop) + map->poll_timeout); } @@ -773,7 +786,7 @@ read_data: cbd->periodic->cur_backend ++; munmap (in, dlen); - rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic); + rspamd_map_process_periodic (cbd->periodic); } else if (msg->code == 304 && (cbd->check && cbd->stage == map_load_file)) { cbd->data->last_checked = msg->date; @@ -819,13 +832,13 @@ read_data: } else { rspamd_http_date_format (next_check_date, sizeof (next_check_date), - time (NULL) + map->poll_timeout); + ev_now (cbd->event_loop) + map->poll_timeout); } msg_info_map ("data is not modified for server %s, next check at %s", cbd->data->host, next_check_date); cbd->periodic->cur_backend ++; - rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic); + rspamd_map_process_periodic (cbd->periodic); } else { msg_info_map ("cannot load map %s from %s: HTTP error %d", @@ -838,7 +851,7 @@ read_data: err: cbd->periodic->errored = 1; - rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic); + rspamd_map_process_periodic (cbd->periodic); MAP_RELEASE (cbd, "http_callback_data"); return 0; @@ -951,6 +964,7 @@ read_map_file (struct rspamd_map *map, struct file_map_data *data, } } + ev_stat_stat (map->event_loop, &data->st_ev); len = st.st_size; if (bk->is_signed) { @@ -1045,9 +1059,6 @@ read_map_file (struct rspamd_map *map, struct file_map_data *data, map->read_callback (NULL, 0, &periodic->cbdata, TRUE); } - /* Also update at the read time */ - memcpy (&data->st, &st, sizeof (struct stat)); - return TRUE; } @@ -1143,7 +1154,6 @@ rspamd_map_periodic_dtor (struct map_periodic_cbdata *periodic) map = periodic->map; msg_debug_map ("periodic dtor %p", periodic); - event_del (&periodic->ev); if (periodic->need_modify) { /* We are done */ @@ -1162,6 +1172,16 @@ rspamd_map_periodic_dtor (struct map_periodic_cbdata *periodic) g_free (periodic); } +/* Called on timer execution */ +static void +rspamd_map_periodic_callback (struct ev_loop *loop, ev_timer *w, int revents) +{ + struct map_periodic_cbdata *cbd = (struct map_periodic_cbdata *)w->data; + + ev_timer_stop (loop, w); + rspamd_map_process_periodic (cbd); +} + static void rspamd_map_schedule_periodic (struct rspamd_map *map, gboolean locked, gboolean initial, gboolean errored) @@ -1221,17 +1241,15 @@ rspamd_map_schedule_periodic (struct rspamd_map *map, cbd->cbdata.cur_data = NULL; cbd->cbdata.map = map; cbd->map = map; - map->scheduled_check = TRUE; + map->scheduled_check = cbd; REF_INIT_RETAIN (cbd, rspamd_map_periodic_dtor); - evtimer_set (&cbd->ev, rspamd_map_periodic_callback, cbd); - event_base_set (map->ev_base, &cbd->ev); - + cbd->ev.data = cbd; + ev_timer_init (&cbd->ev, rspamd_map_periodic_callback, jittered_sec, 0.0); + ev_timer_start (map->event_loop, &cbd->ev); msg_debug_map ("schedule new periodic event %p in %.2f seconds", cbd, jittered_sec); - double_to_tv (jittered_sec, &map->tv); - evtimer_add (&cbd->ev, &map->tv); } static void @@ -1286,7 +1304,7 @@ rspamd_map_dns_callback (struct rdns_reply *reply, void *arg) msg_err_map ("cannot resolve %s: %s", cbd->data->host, rdns_strerror (reply->code)); cbd->periodic->errored = 1; - rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic); + rspamd_map_process_periodic (cbd->periodic); } } @@ -1567,7 +1585,7 @@ rspamd_map_common_http_callback (struct rspamd_map *map, periodic->need_modify = TRUE; /* Reset the whole chain */ periodic->cur_backend = 0; - rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic); + rspamd_map_process_periodic (periodic); } else { if (map->active_http) { @@ -1577,7 +1595,7 @@ rspamd_map_common_http_callback (struct rspamd_map *map, else { /* Switch to the next backend */ periodic->cur_backend++; - rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic); + rspamd_map_process_periodic (periodic); } } @@ -1592,7 +1610,7 @@ rspamd_map_common_http_callback (struct rspamd_map *map, /* Switch to the next backend */ periodic->cur_backend++; data->last_modified = data->cache->last_modified; - rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic); + rspamd_map_process_periodic (periodic); return; } @@ -1601,7 +1619,7 @@ rspamd_map_common_http_callback (struct rspamd_map *map, else if (!map->active_http) { /* Switch to the next backend */ periodic->cur_backend ++; - rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic); + rspamd_map_process_periodic (periodic); return; } @@ -1609,7 +1627,7 @@ rspamd_map_common_http_callback (struct rspamd_map *map, check: cbd = g_malloc0 (sizeof (struct http_callback_data)); - cbd->ev_base = map->ev_base; + cbd->event_loop = map->event_loop; cbd->map = map; cbd->data = data; cbd->check = check; @@ -1618,7 +1636,6 @@ check: cbd->bk = bk; MAP_RETAIN (bk, "rspamd_map_backend"); cbd->stage = map_resolve_host2; - double_to_tv (map->cfg->map_timeout, &cbd->tv); REF_INIT_RETAIN (cbd, free_http_cbdata); msg_debug_map ("%s map data from %s", check ? "checking" : "reading", @@ -1673,9 +1690,8 @@ check: } static void -rspamd_map_http_check_callback (gint fd, short what, void *ud) +rspamd_map_http_check_callback (struct map_periodic_cbdata *cbd) { - struct map_periodic_cbdata *cbd = ud; struct rspamd_map *map; struct rspamd_map_backend *bk; @@ -1686,9 +1702,8 @@ rspamd_map_http_check_callback (gint fd, short what, void *ud) } static void -rspamd_map_http_read_callback (gint fd, short what, void *ud) +rspamd_map_http_read_callback (struct map_periodic_cbdata *cbd) { - struct map_periodic_cbdata *cbd = ud; struct rspamd_map *map; struct rspamd_map_backend *bk; @@ -1698,43 +1713,36 @@ rspamd_map_http_read_callback (gint fd, short what, void *ud) } static void -rspamd_map_file_check_callback (gint fd, short what, void *ud) +rspamd_map_file_check_callback (struct map_periodic_cbdata *periodic) { struct rspamd_map *map; - struct map_periodic_cbdata *periodic = ud; struct file_map_data *data; struct rspamd_map_backend *bk; - struct stat st; map = periodic->map; - bk = g_ptr_array_index (map->backends, periodic->cur_backend); data = bk->data.fd; - if (stat (data->filename, &st) != -1 && - (st.st_mtime > data->st.st_mtime || data->st.st_mtime == -1)) { - /* File was modified since last check */ - msg_info_map ("old mtime is %t, new mtime is %t for map file %s", - data->st.st_mtime, st.st_mtime, data->filename); - memcpy (&data->st, &st, sizeof (struct stat)); + if (data->need_modify) { periodic->need_modify = TRUE; periodic->cur_backend = 0; + data->need_modify = FALSE; - rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic); + rspamd_map_process_periodic (periodic); return; } - /* Switch to the next backend */ + map = periodic->map; + /* Switch to the next backend as the rest is handled by ev_stat */ periodic->cur_backend ++; - rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic); + rspamd_map_process_periodic (periodic); } static void -rspamd_map_static_check_callback (gint fd, short what, void *ud) +rspamd_map_static_check_callback (struct map_periodic_cbdata *periodic) { struct rspamd_map *map; - struct map_periodic_cbdata *periodic = ud; struct static_map_data *data; struct rspamd_map_backend *bk; @@ -1746,21 +1754,20 @@ rspamd_map_static_check_callback (gint fd, short what, void *ud) periodic->need_modify = TRUE; periodic->cur_backend = 0; - rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic); + rspamd_map_process_periodic (periodic); return; } /* Switch to the next backend */ periodic->cur_backend ++; - rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic); + rspamd_map_process_periodic (periodic); } static void -rspamd_map_file_read_callback (gint fd, short what, void *ud) +rspamd_map_file_read_callback (struct map_periodic_cbdata *periodic) { struct rspamd_map *map; - struct map_periodic_cbdata *periodic = ud; struct file_map_data *data; struct rspamd_map_backend *bk; @@ -1777,14 +1784,13 @@ rspamd_map_file_read_callback (gint fd, short what, void *ud) /* Switch to the next backend */ periodic->cur_backend ++; - rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic); + rspamd_map_process_periodic (periodic); } static void -rspamd_map_static_read_callback (gint fd, short what, void *ud) +rspamd_map_static_read_callback (struct map_periodic_cbdata *periodic) { struct rspamd_map *map; - struct map_periodic_cbdata *periodic = ud; struct static_map_data *data; struct rspamd_map_backend *bk; @@ -1801,18 +1807,17 @@ rspamd_map_static_read_callback (gint fd, short what, void *ud) /* Switch to the next backend */ periodic->cur_backend ++; - rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic); + rspamd_map_process_periodic (periodic); } static void -rspamd_map_periodic_callback (gint fd, short what, void *ud) +rspamd_map_process_periodic (struct map_periodic_cbdata *cbd) { struct rspamd_map_backend *bk; - struct map_periodic_cbdata *cbd = ud; struct rspamd_map *map; map = cbd->map; - map->scheduled_check = FALSE; + map->scheduled_check = NULL; if (!cbd->locked) { if (!g_atomic_int_compare_and_exchange (cbd->map->locked, 0, 1)) { @@ -1863,13 +1868,13 @@ rspamd_map_periodic_callback (gint fd, short what, void *ud) switch (bk->protocol) { case MAP_PROTO_HTTP: case MAP_PROTO_HTTPS: - rspamd_map_http_read_callback (fd, what, cbd); + rspamd_map_http_read_callback (cbd); break; case MAP_PROTO_FILE: - rspamd_map_file_read_callback (fd, what, cbd); + rspamd_map_file_read_callback (cbd); break; case MAP_PROTO_STATIC: - rspamd_map_static_read_callback (fd, what, cbd); + rspamd_map_static_read_callback (cbd); break; } } else { @@ -1877,34 +1882,70 @@ rspamd_map_periodic_callback (gint fd, short what, void *ud) switch (bk->protocol) { case MAP_PROTO_HTTP: case MAP_PROTO_HTTPS: - rspamd_map_http_check_callback (fd, what, cbd); + rspamd_map_http_check_callback (cbd); break; case MAP_PROTO_FILE: - rspamd_map_file_check_callback (fd, what, cbd); + rspamd_map_file_check_callback (cbd); break; case MAP_PROTO_STATIC: - rspamd_map_static_check_callback (fd, what, cbd); + rspamd_map_static_check_callback (cbd); break; } } } } +static void +rspamd_map_on_stat (struct ev_loop *loop, ev_stat *w, int revents) +{ + struct rspamd_map *map = (struct rspamd_map *)w->data; + + if (w->attr.st_nlink > 0) { + + if (w->attr.st_mtime > w->prev.st_mtime) { + msg_info_map ("old mtime is %t, new mtime is %t for map file %s", + w->prev.st_mtime, w->attr.st_mtime, w->path); + + /* Fire need modify flag */ + struct rspamd_map_backend *bk; + guint i; + + PTR_ARRAY_FOREACH (map->backends, i, bk) { + if (bk->protocol == MAP_PROTO_FILE) { + bk->data.fd->need_modify = TRUE; + } + } + + map->next_check = 0; + + if (map->scheduled_check) { + ev_timer_stop (map->event_loop, &map->scheduled_check->ev); + MAP_RELEASE (map->scheduled_check, "rspamd_map_on_stat"); + map->scheduled_check = NULL; + } + + rspamd_map_schedule_periodic (map, FALSE, TRUE, FALSE); + } + } +} + /* Start watching event for all maps */ void rspamd_map_watch (struct rspamd_config *cfg, - struct event_base *ev_base, + struct ev_loop *event_loop, struct rspamd_dns_resolver *resolver, struct rspamd_worker *worker, gboolean active_http) { GList *cur = cfg->maps; struct rspamd_map *map; + struct rspamd_map_backend *bk; + guint i; /* First of all do synced read of data */ while (cur) { map = cur->data; - map->ev_base = ev_base; + map->event_loop = event_loop; map->r = resolver; map->wrk = worker; @@ -1922,6 +1963,21 @@ rspamd_map_watch (struct rspamd_config *cfg, } } + PTR_ARRAY_FOREACH (map->backends, i, bk) { + bk->event_loop = event_loop; + + if (bk->protocol == MAP_PROTO_FILE) { + struct file_map_data *data; + + data = bk->data.fd; + + ev_stat_init (&data->st_ev, rspamd_map_on_stat, + data->filename, map->poll_timeout * cfg->map_file_watch_multiplier); + data->st_ev.data = map; + ev_stat_start (event_loop, &data->st_ev); + } + } + rspamd_map_schedule_periodic (map, FALSE, TRUE, FALSE); cur = g_list_next (cur); @@ -2215,6 +2271,7 @@ rspamd_map_backend_dtor (struct rspamd_map_backend *bk) switch (bk->protocol) { case MAP_PROTO_FILE: if (bk->data.fd) { + ev_stat_stop (bk->event_loop, &bk->data.fd->st_ev); g_free (bk->data.fd->filename); g_free (bk->data.fd); } @@ -2249,7 +2306,8 @@ rspamd_map_backend_dtor (struct rspamd_map_backend *bk) if (data->cur_cache_cbd) { MAP_RELEASE (data->cur_cache_cbd->shm, "rspamd_http_map_cached_cbdata"); - event_del (&data->cur_cache_cbd->timeout); + ev_timer_stop (data->cur_cache_cbd->event_loop, + &data->cur_cache_cbd->timeout); g_free (data->cur_cache_cbd); data->cur_cache_cbd = NULL; } @@ -2308,7 +2366,6 @@ rspamd_map_parse_backend (struct rspamd_config *cfg, const gchar *map_line) /* Now check for each proto separately */ if (bk->protocol == MAP_PROTO_FILE) { fdata = g_malloc0 (sizeof (struct file_map_data)); - fdata->st.st_mtime = -1; if (access (bk->uri, R_OK) == -1) { if (errno != ENOENT) { diff --git a/src/libutil/map.h b/src/libutil/map.h index acf6eea4e..9f04d4c6c 100644 --- a/src/libutil/map.h +++ b/src/libutil/map.h @@ -2,7 +2,7 @@ #define RSPAMD_MAP_H #include "config.h" -#include <event.h> +#include "contrib/libev/ev.h" #include "ucl.h" #include "mem_pool.h" @@ -79,7 +79,7 @@ struct rspamd_map* rspamd_map_add_from_ucl (struct rspamd_config *cfg, * Start watching of maps by adding events to libevent event loop */ void rspamd_map_watch (struct rspamd_config *cfg, - struct event_base *ev_base, + struct ev_loop *event_loop, struct rspamd_dns_resolver *resolver, struct rspamd_worker *worker, gboolean active_http); diff --git a/src/libutil/map_private.h b/src/libutil/map_private.h index b32f0e390..e08c2dce3 100644 --- a/src/libutil/map_private.h +++ b/src/libutil/map_private.h @@ -54,14 +54,16 @@ enum fetch_proto { */ struct file_map_data { gchar *filename; - struct stat st; + gboolean need_modify; + ev_stat st_ev; }; struct http_map_data; struct rspamd_http_map_cached_cbdata { - struct event timeout; + ev_timer timeout; + struct ev_loop *event_loop; struct rspamd_storage_shmem *shm; struct rspamd_map *map; struct http_map_data *data; @@ -114,6 +116,7 @@ struct rspamd_map_backend { gboolean is_signed; gboolean is_compressed; gboolean is_fallback; + struct ev_loop *event_loop; guint32 id; struct rspamd_cryptobox_pubkey *trusted_pubkey; union rspamd_map_backend_data data; @@ -121,6 +124,8 @@ struct rspamd_map_backend { ref_entry_t ref; }; +struct map_periodic_cbdata; + struct rspamd_map { struct rspamd_dns_resolver *r; struct rspamd_config *cfg; @@ -130,12 +135,12 @@ struct rspamd_map { map_fin_cb_t fin_callback; map_dtor_t dtor; void **user_data; - struct event_base *ev_base; + struct ev_loop *event_loop; struct rspamd_worker *wrk; gchar *description; gchar *name; guint32 id; - gboolean scheduled_check; + struct map_periodic_cbdata *scheduled_check; rspamd_map_tmp_dtor tmp_dtor; gpointer tmp_dtor_data; rspamd_map_traverse_function traverse_function; @@ -143,7 +148,7 @@ struct rspamd_map { gsize nelts; guint64 digest; /* Should we check HTTP or just load cached data */ - struct timeval tv; + ev_tstamp timeout; gdouble poll_timeout; time_t next_check; gboolean active_http; @@ -164,7 +169,7 @@ enum rspamd_map_http_stage { struct map_periodic_cbdata { struct rspamd_map *map; struct map_cb_data cbdata; - struct event ev; + ev_timer ev; gboolean need_modify; gboolean errored; gboolean locked; @@ -183,7 +188,7 @@ struct rspamd_http_file_data { }; struct http_callback_data { - struct event_base *ev_base; + struct ev_loop *event_loop; struct rspamd_http_connection *conn; rspamd_inet_addr_t *addr; struct rspamd_map *map; @@ -191,16 +196,15 @@ struct http_callback_data { struct http_map_data *data; struct map_periodic_cbdata *periodic; struct rspamd_cryptobox_pubkey *pk; - gboolean check; struct rspamd_storage_shmem *shmem_data; struct rspamd_storage_shmem *shmem_sig; struct rspamd_storage_shmem *shmem_pubkey; gsize data_len; gsize sig_len; gsize pubkey_len; - + gboolean check; enum rspamd_map_http_stage stage; - struct timeval tv; + ev_tstamp timeout; ref_entry_t ref; }; diff --git a/src/libutil/ssl_util.c b/src/libutil/ssl_util.c index 95245aa4c..7d4612b3d 100644 --- a/src/libutil/ssl_util.c +++ b/src/libutil/ssl_util.c @@ -45,9 +45,8 @@ struct rspamd_ssl_connection { gboolean verify_peer; SSL *ssl; gchar *hostname; - struct event *ev; - struct event_base *ev_base; - struct timeval *tv; + struct rspamd_io_ev *ev; + struct ev_loop *event_loop; rspamd_ssl_handler_t handler; rspamd_ssl_error_handler_t err_handler; gpointer handler_data; @@ -407,7 +406,7 @@ rspamd_ssl_event_handler (gint fd, short what, gpointer ud) gint ret; GError *err = NULL; - if (what == EV_TIMEOUT) { + if (what == EV_TIMER) { c->shut = ssl_shut_unclean; } @@ -417,7 +416,7 @@ rspamd_ssl_event_handler (gint fd, short what, gpointer ud) ret = SSL_connect (c->ssl); if (ret == 1) { - event_del (c->ev); + rspamd_ev_watcher_stop (c->event_loop, c->ev); /* Verify certificate */ if ((!c->verify_peer) || rspamd_ssl_peer_verify (c)) { c->state = ssl_conn_connected; @@ -437,40 +436,30 @@ rspamd_ssl_event_handler (gint fd, short what, gpointer ud) what = EV_WRITE; } else { + rspamd_ev_watcher_stop (c->event_loop, c->ev); rspamd_tls_set_error (ret, "connect", &err); c->err_handler (c->handler_data, err); g_error_free (err); return; } - event_del (c->ev); - event_set (c->ev, fd, what, rspamd_ssl_event_handler, c); - event_base_set (c->ev_base, c->ev); - event_add (c->ev, c->tv); + rspamd_ev_watcher_reschedule (c->event_loop, c->ev, what); + } break; case ssl_next_read: - event_del (c->ev); - /* Restore handler */ - event_set (c->ev, c->fd, EV_READ|EV_PERSIST, - c->handler, c->handler_data); - event_base_set (c->ev_base, c->ev); - event_add (c->ev, c->tv); + rspamd_ev_watcher_reschedule (c->event_loop, c->ev, EV_READ); c->state = ssl_conn_connected; c->handler (fd, EV_READ, c->handler_data); break; case ssl_next_write: case ssl_conn_connected: - event_del (c->ev); - /* Restore handler */ - event_set (c->ev, c->fd, EV_WRITE, - c->handler, c->handler_data); - event_base_set (c->ev_base, c->ev); - event_add (c->ev, c->tv); + rspamd_ev_watcher_reschedule (c->event_loop, c->ev, what); c->state = ssl_conn_connected; - c->handler (fd, EV_WRITE, c->handler_data); + c->handler (fd, what, c->handler_data); break; default: + rspamd_ev_watcher_stop (c->event_loop, c->ev); g_set_error (&err, rspamd_ssl_quark (), EINVAL, "ssl bad state error: %d", c->state); c->err_handler (c->handler_data, err); @@ -480,7 +469,7 @@ rspamd_ssl_event_handler (gint fd, short what, gpointer ud) } struct rspamd_ssl_connection * -rspamd_ssl_connection_new (gpointer ssl_ctx, struct event_base *ev_base, +rspamd_ssl_connection_new (gpointer ssl_ctx, struct ev_loop *ev_base, gboolean verify_peer) { struct rspamd_ssl_connection *c; @@ -488,7 +477,7 @@ rspamd_ssl_connection_new (gpointer ssl_ctx, struct event_base *ev_base, g_assert (ssl_ctx != NULL); c = g_malloc0 (sizeof (*c)); c->ssl = SSL_new (ssl_ctx); - c->ev_base = ev_base; + c->event_loop = ev_base; c->verify_peer = verify_peer; return c; @@ -497,7 +486,7 @@ rspamd_ssl_connection_new (gpointer ssl_ctx, struct event_base *ev_base, gboolean rspamd_ssl_connect_fd (struct rspamd_ssl_connection *conn, gint fd, - const gchar *hostname, struct event *ev, struct timeval *tv, + const gchar *hostname, struct rspamd_io_ev *ev, ev_tstamp timeout, rspamd_ssl_handler_t handler, rspamd_ssl_error_handler_t err_handler, gpointer handler_data) { @@ -534,17 +523,9 @@ rspamd_ssl_connect_fd (struct rspamd_ssl_connection *conn, gint fd, if (ret == 1) { conn->state = ssl_conn_connected; - if (rspamd_event_pending (ev, EV_TIMEOUT|EV_WRITE|EV_READ)) { - event_del (ev); - } - - event_set (ev, fd, EV_WRITE, rspamd_ssl_event_handler, conn); - - if (conn->ev_base) { - event_base_set (conn->ev_base, ev); - } - - event_add (ev, tv); + rspamd_ev_watcher_stop (conn->event_loop, ev); + rspamd_ev_watcher_init (ev, fd, EV_WRITE, rspamd_ssl_event_handler, conn); + rspamd_ev_watcher_start (conn->event_loop, ev, timeout); } else { ret = SSL_get_error (conn->ssl, ret); @@ -561,13 +542,10 @@ rspamd_ssl_connect_fd (struct rspamd_ssl_connection *conn, gint fd, return FALSE; } - if (rspamd_event_pending (ev, EV_TIMEOUT|EV_WRITE|EV_READ)) { - event_del (ev); - } - - event_set (ev, fd, what, rspamd_ssl_event_handler, conn); - event_base_set (conn->ev_base, ev); - event_add (ev, tv); + rspamd_ev_watcher_stop (conn->event_loop, ev); + rspamd_ev_watcher_init (ev, fd, EV_WRITE|EV_READ, + rspamd_ssl_event_handler, conn); + rspamd_ev_watcher_start (conn->event_loop, ev, timeout); } return TRUE; @@ -638,13 +616,8 @@ rspamd_ssl_read (struct rspamd_ssl_connection *conn, gpointer buf, return -1; } - event_del (conn->ev); - event_set (conn->ev, conn->fd, what, rspamd_ssl_event_handler, conn); - event_base_set (conn->ev_base, conn->ev); - event_add (conn->ev, conn->tv); - + rspamd_ev_watcher_reschedule (conn->event_loop, conn->ev, what); errno = EAGAIN; - } return -1; @@ -713,11 +686,7 @@ rspamd_ssl_write (struct rspamd_ssl_connection *conn, gconstpointer buf, return -1; } - event_del (conn->ev); - event_set (conn->ev, conn->fd, what, rspamd_ssl_event_handler, conn); - event_base_set (conn->ev_base, conn->ev); - event_add (conn->ev, conn->tv); - + rspamd_ev_watcher_reschedule (conn->event_loop, conn->ev, what); errno = EAGAIN; } diff --git a/src/libutil/ssl_util.h b/src/libutil/ssl_util.h index 73a940e00..f7f1652de 100644 --- a/src/libutil/ssl_util.h +++ b/src/libutil/ssl_util.h @@ -18,6 +18,7 @@ #include "config.h" #include "libutil/addr.h" +#include "libutil/libev_helper.h" struct rspamd_ssl_connection; @@ -30,7 +31,7 @@ typedef void (*rspamd_ssl_error_handler_t)(gpointer d, GError *err); * @return opaque connection data */ struct rspamd_ssl_connection * rspamd_ssl_connection_new (gpointer ssl_ctx, - struct event_base *ev_base, gboolean verify_peer); + struct ev_loop *ev_base, gboolean verify_peer); /** * Connects SSL session using the specified (connected) FD @@ -44,7 +45,7 @@ struct rspamd_ssl_connection * rspamd_ssl_connection_new (gpointer ssl_ctx, * @return TRUE if a session has been connected */ gboolean rspamd_ssl_connect_fd (struct rspamd_ssl_connection *conn, gint fd, - const gchar *hostname, struct event *ev, struct timeval *tv, + const gchar *hostname, struct rspamd_io_ev *ev, ev_tstamp timeout, rspamd_ssl_handler_t handler, rspamd_ssl_error_handler_t err_handler, gpointer handler_data); diff --git a/src/libutil/str_util.h b/src/libutil/str_util.h index 8e8898a32..6fbb11ccf 100644 --- a/src/libutil/str_util.h +++ b/src/libutil/str_util.h @@ -83,10 +83,18 @@ gsize rspamd_strlcpy_safe (gchar *dst, const gchar *src, gsize siz); # if __has_feature(address_sanitizer) # define rspamd_strlcpy rspamd_strlcpy_safe # else -# define rspamd_strlcpy rspamd_strlcpy_fast +# ifdef __SANITIZE_ADDRESS__ +# define rspamd_strlcpy rspamd_strlcpy_safe +# else +# define rspamd_strlcpy rspamd_strlcpy_fast +# endif # endif #else -# define rspamd_strlcpy rspamd_strlcpy_fast +# ifdef __SANITIZE_ADDRESS__ +# define rspamd_strlcpy rspamd_strlcpy_safe +# else +# define rspamd_strlcpy rspamd_strlcpy_fast +# endif #endif /** diff --git a/src/libutil/upstream.c b/src/libutil/upstream.c index 3e04e68e9..c445751b4 100644 --- a/src/libutil/upstream.c +++ b/src/libutil/upstream.c @@ -50,7 +50,7 @@ struct upstream { guint dns_requests; gint active_idx; gchar *name; - struct event ev; + ev_timer ev; gdouble last_fail; gpointer ud; struct upstream_list *ls; @@ -92,7 +92,7 @@ struct upstream_list { struct upstream_ctx { struct rdns_resolver *res; - struct event_base *ev_base; + struct ev_loop *event_loop; struct upstream_limits limits; GQueue *upstreams; gboolean configured; @@ -119,7 +119,7 @@ static guint default_dns_retransmits = 2; void rspamd_upstreams_library_config (struct rspamd_config *cfg, struct upstream_ctx *ctx, - struct event_base *ev_base, + struct ev_loop *event_loop, struct rdns_resolver *resolver) { g_assert (ctx != NULL); @@ -141,7 +141,7 @@ rspamd_upstreams_library_config (struct rspamd_config *cfg, ctx->limits.dns_timeout = cfg->dns_timeout; } - ctx->ev_base = ev_base; + ctx->event_loop = event_loop; ctx->res = resolver; ctx->configured = TRUE; } @@ -366,12 +366,12 @@ rspamd_upstream_dns_cb (struct rdns_reply *reply, void *arg) } static void -rspamd_upstream_revive_cb (int fd, short what, void *arg) +rspamd_upstream_revive_cb (struct ev_loop *loop, ev_timer *w, int revents) { - struct upstream *up = (struct upstream *)arg; + struct upstream *up = (struct upstream *)w->data; RSPAMD_UPSTREAM_LOCK (up->lock); - event_del (&up->ev); + ev_timer_stop (loop, w); if (up->ls) { rspamd_upstream_set_active (up->ls, up); } @@ -414,7 +414,6 @@ rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up) gdouble ntim; guint i; struct upstream *cur; - struct timeval tv; struct upstream_list_watcher *w; RSPAMD_UPSTREAM_LOCK (ls->lock); @@ -431,15 +430,14 @@ rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up) rspamd_upstream_resolve_addrs (ls, up); REF_RETAIN (up); - evtimer_set (&up->ev, rspamd_upstream_revive_cb, up); - if (up->ctx->ev_base != NULL && up->ctx->configured) { - event_base_set (up->ctx->ev_base, &up->ev); - } - ntim = rspamd_time_jitter (ls->limits.revive_time, ls->limits.revive_jitter); - double_to_tv (ntim, &tv); - event_add (&up->ev, &tv); + ev_timer_init (&up->ev, rspamd_upstream_revive_cb, ntim, 0); + up->ev.data = up; + + if (up->ctx->event_loop != NULL && up->ctx->configured) { + ev_timer_start (up->ctx->event_loop, &up->ev); + } } DL_FOREACH (up->ls->watchers, w) { @@ -915,9 +913,7 @@ rspamd_upstream_restore_cb (gpointer elt, gpointer ls) /* Here the upstreams list is already locked */ RSPAMD_UPSTREAM_LOCK (up->lock); - if (rspamd_event_pending (&up->ev, EV_TIMEOUT)) { - event_del (&up->ev); - } + ev_timer_stop (up->ctx->event_loop, &up->ev); g_ptr_array_add (ups->alive, up); up->active_idx = ups->alive->len - 1; RSPAMD_UPSTREAM_UNLOCK (up->lock); diff --git a/src/libutil/upstream.h b/src/libutil/upstream.h index 75d840ce2..89ac0ee9e 100644 --- a/src/libutil/upstream.h +++ b/src/libutil/upstream.h @@ -41,7 +41,7 @@ void rspamd_upstreams_library_unref (struct upstream_ctx *ctx); * @param cfg */ void rspamd_upstreams_library_config (struct rspamd_config *cfg, - struct upstream_ctx *ctx, struct event_base *ev_base, + struct upstream_ctx *ctx, struct ev_loop *event_loop, struct rdns_resolver *resolver); /** diff --git a/src/libutil/util.c b/src/libutil/util.c index df10bf912..e7a5c2601 100644 --- a/src/libutil/util.c +++ b/src/libutil/util.c @@ -1612,42 +1612,6 @@ rspamd_thread_func (gpointer ud) return ud; } -/** - * Create new named thread - * @param name name pattern - * @param func function to start - * @param data data to pass to function - * @param err error pointer - * @return new thread object that can be joined - */ -GThread * -rspamd_create_thread (const gchar *name, - GThreadFunc func, - gpointer data, - GError **err) -{ - GThread *new; - struct rspamd_thread_data *td; - static gint32 id; - guint r; - - r = strlen (name); - td = g_malloc (sizeof (struct rspamd_thread_data)); - td->id = ++id; - td->name = g_malloc (r + sizeof ("4294967296")); - td->func = func; - td->data = data; - - rspamd_snprintf (td->name, r + sizeof ("4294967296"), "%s-%d", name, id); -#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION > 32)) - new = g_thread_try_new (td->name, rspamd_thread_func, td, err); -#else - new = g_thread_create (rspamd_thread_func, td, TRUE, err); -#endif - - return new; -} - struct hash_copy_callback_data { gpointer (*key_copy_func)(gconstpointer data, gpointer ud); gpointer (*value_copy_func)(gconstpointer data, gpointer ud); @@ -2570,24 +2534,6 @@ rspamd_constant_memcmp (const guchar *a, const guchar *b, gsize len) return (((gint32)(guint16)((guint32)r + 0x8000) - 0x8000) == 0); } -#if !defined(LIBEVENT_VERSION_NUMBER) || LIBEVENT_VERSION_NUMBER < 0x02000000UL -struct event_base * -event_get_base (struct event *ev) -{ - return ev->ev_base; -} -#endif - -int -rspamd_event_pending (struct event *ev, short what) -{ - if (ev->ev_base == NULL) { - return 0; - } - - return event_pending (ev, what, NULL); -} - int rspamd_file_xopen (const char *fname, int oflags, guint mode, gboolean allow_symlink) diff --git a/src/libutil/util.h b/src/libutil/util.h index 9d12285d4..21e4b320e 100644 --- a/src/libutil/util.h +++ b/src/libutil/util.h @@ -12,7 +12,7 @@ #include <netdb.h> #endif -#include <event.h> +#include "contrib/libev/ev.h" #include <time.h> struct rspamd_config; @@ -263,19 +263,6 @@ void rspamd_mutex_unlock (rspamd_mutex_t *mtx); void rspamd_mutex_free (rspamd_mutex_t *mtx); /** - * Create new named thread - * @param name name pattern - * @param func function to start - * @param data data to pass to function - * @param err error pointer - * @return new thread object that can be joined - */ -GThread * rspamd_create_thread (const gchar *name, - GThreadFunc func, - gpointer data, - GError **err); - -/** * Deep copy of one hash table to another * @param src source hash * @param dst destination hash @@ -426,19 +413,6 @@ void rspamd_random_seed_fast (void); */ gboolean rspamd_constant_memcmp (const guchar *a, const guchar *b, gsize len); -/* Special case for ancient libevent */ -#if !defined(LIBEVENT_VERSION_NUMBER) || LIBEVENT_VERSION_NUMBER < 0x02000000UL -struct event_base * event_get_base (struct event *ev); -#endif -/* CentOS libevent */ -#ifndef evsignal_set -#define evsignal_set(ev, x, cb, arg) \ - event_set((ev), (x), EV_SIGNAL|EV_PERSIST, (cb), (arg)) -#endif - -/* Avoid stupidity in libevent > 1.4 */ -int rspamd_event_pending (struct event *ev, short what); - /** * Open file without following symlinks or special stuff * @param fname filename |