From: Vsevolod Stakhov Date: Sun, 16 Jun 2019 17:32:55 +0000 (+0100) Subject: [Project] Rework HTTP IO X-Git-Tag: 2.0~755^2~41 X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=e463ad556cb35ee39b92dbf7d3934d4187ab70d2;p=rspamd.git [Project] Rework HTTP IO --- diff --git a/src/libutil/CMakeLists.txt b/src/libutil/CMakeLists.txt index fd29c5512..5a94a732c 100644 --- a/src/libutil/CMakeLists.txt +++ b/src/libutil/CMakeLists.txt @@ -2,7 +2,6 @@ SET(LIBRSPAMDUTILSRC ${CMAKE_CURRENT_SOURCE_DIR}/addr.c ${CMAKE_CURRENT_SOURCE_DIR}/libev_helper.c - ${CMAKE_CURRENT_SOURCE_DIR}/aio_event.c ${CMAKE_CURRENT_SOURCE_DIR}/bloom.c ${CMAKE_CURRENT_SOURCE_DIR}/expression.c ${CMAKE_CURRENT_SOURCE_DIR}/fstring.c diff --git a/src/libutil/aio_event.c b/src/libutil/aio_event.c deleted file mode 100644 index d0c8d3f63..000000000 --- a/src/libutil/aio_event.c +++ /dev/null @@ -1,508 +0,0 @@ -/*- - * Copyright 2016 Vsevolod Stakhov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "config.h" -#include "contrib/libev/ev.h" -#include "aio_event.h" -#include "rspamd.h" -#include "unix-std.h" - -#ifdef HAVE_SYS_EVENTFD_H -#include -#endif - -#ifdef HAVE_AIO_H -#include -#endif - -/* Linux syscall numbers */ -#if defined(__i386__) -# define SYS_io_setup 245 -# define SYS_io_destroy 246 -# define SYS_io_getevents 247 -# define SYS_io_submit 248 -# define SYS_io_cancel 249 -#elif defined(__x86_64__) -# define SYS_io_setup 206 -# define SYS_io_destroy 207 -# define SYS_io_getevents 208 -# define SYS_io_submit 209 -# define SYS_io_cancel 210 -#else -# warning \ - "aio is not supported on this platform, please contact author for details" -# define SYS_io_setup 0 -# define SYS_io_destroy 0 -# define SYS_io_getevents 0 -# define SYS_io_submit 0 -# define SYS_io_cancel 0 -#endif - -#define SYS_eventfd 323 -#define MAX_AIO_EV 64 - -struct io_cbdata { - gint fd; - rspamd_aio_cb cb; - guint64 len; - gpointer buf; - gpointer io_buf; - gpointer ud; -}; - -#ifdef LINUX - -/* Linux specific mappings and utilities to avoid using of libaio */ - -typedef unsigned long aio_context_t; - -typedef enum io_iocb_cmd { - IO_CMD_PREAD = 0, - IO_CMD_PWRITE = 1, - - IO_CMD_FSYNC = 2, - IO_CMD_FDSYNC = 3, - - IO_CMD_POLL = 5, - IO_CMD_NOOP = 6, -} io_iocb_cmd_t; - -#if defined(__LITTLE_ENDIAN) -#define PADDED(x,y) x, y -#elif defined(__BIG_ENDIAN) -#define PADDED(x,y) y, x -#else -#error edit for your odd byteorder. -#endif - -/* - * we always use a 64bit off_t when communicating - * with userland. its up to libraries to do the - * proper padding and aio_error abstraction - */ - -struct iocb { - /* these are internal to the kernel/libc. */ - guint64 aio_data; /* data to be returned in event's data */ - guint32 PADDED (aio_key, aio_reserved1); - /* the kernel sets aio_key to the req # */ - - /* common fields */ - guint16 aio_lio_opcode; /* see IOCB_CMD_ above */ - gint16 aio_reqprio; - guint32 aio_fildes; - - guint64 aio_buf; - guint64 aio_nbytes; - gint64 aio_offset; - - /* extra parameters */ - guint64 aio_reserved2; /* TODO: use this for a (struct sigevent *) */ - - /* flags for the "struct iocb" */ - guint32 aio_flags; - - /* - * if the IOCB_FLAG_RESFD flag of "aio_flags" is set, this is an - * eventfd to signal AIO readiness to - */ - guint32 aio_resfd; -}; - -struct io_event { - guint64 data; /* the data field from the iocb */ - guint64 obj; /* what iocb this event came from */ - gint64 res; /* result code for this event */ - gint64 res2; /* secondary result */ -}; - -/* Linux specific io calls */ -static int -io_setup (guint nr_reqs, aio_context_t *ctx) -{ - return syscall (SYS_io_setup, nr_reqs, ctx); -} - -static int -io_destroy (aio_context_t ctx) -{ - return syscall (SYS_io_destroy, ctx); -} - -static int -io_getevents (aio_context_t ctx, - long min_nr, - long nr, - struct io_event *events, - struct timespec *tmo) -{ - return syscall (SYS_io_getevents, ctx, min_nr, nr, events, tmo); -} - -static int -io_submit (aio_context_t ctx, long n, struct iocb **paiocb) -{ - return syscall (SYS_io_submit, ctx, n, paiocb); -} - -static int -io_cancel (aio_context_t ctx, struct iocb *iocb, struct io_event *result) -{ - return syscall (SYS_io_cancel, ctx, iocb, result); -} - -# ifndef HAVE_SYS_EVENTFD_H -static int -eventfd (guint initval, guint flags) -{ - return syscall (SYS_eventfd, initval); -} -# endif - -#endif - -/** - * AIO context - */ -struct aio_context { - struct ev_loop *base; - gboolean has_aio; /**< Whether we have aio support on a system */ -#ifdef LINUX - /* Eventfd variant */ - gint event_fd; - struct event eventfd_ev; - aio_context_t io_ctx; -#elif defined(HAVE_AIO_H) - /* POSIX aio */ - struct event rtsigs[128]; -#endif -}; - -#ifdef LINUX -/* Eventfd read callback */ -static void -rspamd_eventfdcb (gint fd, gshort what, gpointer ud) -{ - struct aio_context *ctx = ud; - guint64 ready; - gint done, i; - struct io_event event[32]; - struct timespec ts; - struct io_cbdata *ev_data; - - /* Eventfd returns number of events ready got from kernel */ - if (read (fd, &ready, 8) != 8) { - if (errno == EAGAIN) { - return; - } - msg_err ("eventfd read returned error: %s", strerror (errno)); - } - - ts.tv_sec = 0; - ts.tv_nsec = 0; - - while (ready) { - /* Get events ready */ - done = io_getevents (ctx->io_ctx, 1, 32, event, &ts); - - if (done > 0) { - ready -= done; - - for (i = 0; i < done; i++) { - ev_data = (struct io_cbdata *) (uintptr_t) event[i].data; - /* Call this callback */ - ev_data->cb (ev_data->fd, - event[i].res, - ev_data->len, - ev_data->buf, - ev_data->ud); - if (ev_data->io_buf) { - free (ev_data->io_buf); - } - g_free (ev_data); - } - } - else if (done == 0) { - /* No more events are ready */ - return; - } - else { - msg_err ("io_getevents failed: %s", strerror (errno)); - return; - } - } -} - -#endif - -/** - * Initialize aio with specified event base - */ -struct aio_context * -rspamd_aio_init (struct ev_loop *base) -{ - struct aio_context *new; - - /* First of all we need to detect which type of aio we can try to use */ - new = g_malloc0 (sizeof (struct aio_context)); - new->base = base; - -#ifdef LINUX - /* On linux we are trying to use io (3) and eventfd for notifying */ - new->event_fd = eventfd (0, 0); - if (new->event_fd == -1) { - msg_err ("eventfd failed: %s", strerror (errno)); - } - else { - /* Set this socket non-blocking */ - if (rspamd_socket_nonblocking (new->event_fd) == -1) { - msg_err ("non blocking for eventfd failed: %s", strerror (errno)); - close (new->event_fd); - } - else { - event_set (&new->eventfd_ev, - new->event_fd, - EV_READ | EV_PERSIST, - rspamd_eventfdcb, - new); - event_base_set (new->base, &new->eventfd_ev); - event_add (&new->eventfd_ev, NULL); - if (io_setup (MAX_AIO_EV, &new->io_ctx) == -1) { - msg_err ("io_setup failed: %s", strerror (errno)); - close (new->event_fd); - } - else { - new->has_aio = TRUE; - } - } - } -#elif defined(HAVE_AIO_H) - /* TODO: implement this */ -#endif - - return new; -} - -/** - * Open file for aio - */ -gint -rspamd_aio_open (struct aio_context *ctx, const gchar *path, int flags) -{ - gint fd = -1; - /* Fallback */ - if (!ctx->has_aio) { - return open (path, flags); - } -#ifdef LINUX - - fd = open (path, flags | O_DIRECT); - - return fd; -#elif defined(HAVE_AIO_H) - fd = open (path, flags); -#endif - - return fd; -} - -/** - * Asynchronous read of file - */ -gint -rspamd_aio_read (gint fd, - gpointer buf, - guint64 len, - guint64 offset, - struct aio_context *ctx, - rspamd_aio_cb cb, - gpointer ud) -{ - gint r = -1; - - if (ctx->has_aio) { -#ifdef LINUX - struct iocb *iocb[1]; - struct io_cbdata *cbdata; - - cbdata = g_malloc0 (sizeof (struct io_cbdata)); - cbdata->cb = cb; - cbdata->buf = buf; - cbdata->len = len; - cbdata->ud = ud; - cbdata->fd = fd; - cbdata->io_buf = NULL; - - iocb[0] = alloca (sizeof (struct iocb)); - memset (iocb[0], 0, sizeof (struct iocb)); - iocb[0]->aio_fildes = fd; - iocb[0]->aio_lio_opcode = IO_CMD_PREAD; - iocb[0]->aio_reqprio = 0; - iocb[0]->aio_buf = (guint64)((uintptr_t)buf); - iocb[0]->aio_nbytes = len; - iocb[0]->aio_offset = offset; - iocb[0]->aio_flags |= (1 << 0) /* IOCB_FLAG_RESFD */; - iocb[0]->aio_resfd = ctx->event_fd; - iocb[0]->aio_data = (guint64)((uintptr_t)cbdata); - - /* Iocb is copied to kernel internally, so it is safe to put it on stack */ - if (io_submit (ctx->io_ctx, 1, iocb) == 1) { - return len; - } - else { - if (errno == EAGAIN || errno == ENOSYS) { - /* Fall back to sync read */ - goto blocking; - } - return -1; - } - -#elif defined(HAVE_AIO_H) -#endif - } - else { - /* Blocking variant */ - goto blocking; -blocking: -#ifdef _LARGEFILE64_SOURCE - r = lseek64 (fd, offset, SEEK_SET); -#else - r = lseek (fd, offset, SEEK_SET); -#endif - if (r > 0) { - r = read (fd, buf, len); - if (r >= 0) { - cb (fd, 0, r, buf, ud); - } - else { - cb (fd, r, -1, buf, ud); - } - } - } - - return r; -} - -/** - * Asynchronous write of file - */ -gint -rspamd_aio_write (gint fd, - gpointer buf, - guint64 len, - guint64 offset, - struct aio_context *ctx, - rspamd_aio_cb cb, - gpointer ud) -{ - gint r = -1; - - if (ctx->has_aio) { -#ifdef LINUX - struct iocb *iocb[1]; - struct io_cbdata *cbdata; - - cbdata = g_malloc0 (sizeof (struct io_cbdata)); - cbdata->cb = cb; - cbdata->buf = buf; - cbdata->len = len; - cbdata->ud = ud; - cbdata->fd = fd; - /* We need to align pointer on boundary of 512 bytes here */ - if (posix_memalign (&cbdata->io_buf, 512, len) != 0) { - return -1; - } - memcpy (cbdata->io_buf, buf, len); - - iocb[0] = alloca (sizeof (struct iocb)); - memset (iocb[0], 0, sizeof (struct iocb)); - iocb[0]->aio_fildes = fd; - iocb[0]->aio_lio_opcode = IO_CMD_PWRITE; - iocb[0]->aio_reqprio = 0; - iocb[0]->aio_buf = (guint64)((uintptr_t)cbdata->io_buf); - iocb[0]->aio_nbytes = len; - iocb[0]->aio_offset = offset; - iocb[0]->aio_flags |= (1 << 0) /* IOCB_FLAG_RESFD */; - iocb[0]->aio_resfd = ctx->event_fd; - iocb[0]->aio_data = (guint64)((uintptr_t)cbdata); - - /* Iocb is copied to kernel internally, so it is safe to put it on stack */ - if (io_submit (ctx->io_ctx, 1, iocb) == 1) { - return len; - } - else { - if (errno == EAGAIN || errno == ENOSYS) { - /* Fall back to sync read */ - goto blocking; - } - return -1; - } - -#elif defined(HAVE_AIO_H) -#endif - } - else { - /* Blocking variant */ - goto blocking; -blocking: -#ifdef _LARGEFILE64_SOURCE - r = lseek64 (fd, offset, SEEK_SET); -#else - r = lseek (fd, offset, SEEK_SET); -#endif - if (r > 0) { - r = write (fd, buf, len); - if (r >= 0) { - cb (fd, 0, r, buf, ud); - } - else { - cb (fd, r, -1, buf, ud); - } - } - } - - return r; -} - -/** - * Close of aio operations - */ -gint -rspamd_aio_close (gint fd, struct aio_context *ctx) -{ - gint r = -1; - - if (ctx->has_aio) { -#ifdef LINUX - struct iocb iocb; - struct io_event ev; - - memset (&iocb, 0, sizeof (struct iocb)); - iocb.aio_fildes = fd; - iocb.aio_lio_opcode = IO_CMD_NOOP; - - /* Iocb is copied to kernel internally, so it is safe to put it on stack */ - r = io_cancel (ctx->io_ctx, &iocb, &ev); - close (fd); - return r; - -#elif defined(HAVE_AIO_H) -#endif - } - - r = close (fd); - - return r; -} diff --git a/src/libutil/aio_event.h b/src/libutil/aio_event.h deleted file mode 100644 index ededd96d4..000000000 --- a/src/libutil/aio_event.h +++ /dev/null @@ -1,59 +0,0 @@ -/*- - * Copyright 2016 Vsevolod Stakhov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef AIO_EVENT_H_ -#define AIO_EVENT_H_ - -#include "config.h" - -/** - * AIO context - */ -struct aio_context; - -/** - * Callback for notifying - */ -typedef void (*rspamd_aio_cb) (gint fd, gint res, guint64 len, gpointer data, - gpointer ud); - -/** - * Initialize aio with specified event base - */ -struct aio_context * rspamd_aio_init (struct ev_loop *base); - -/** - * Open file for aio - */ -gint rspamd_aio_open (struct aio_context *ctx, const gchar *path, int flags); - -/** - * Asynchronous read of file - */ -gint rspamd_aio_read (gint fd, gpointer buf, guint64 len, guint64 offset, - struct aio_context *ctx, rspamd_aio_cb cb, gpointer ud); - -/** - * Asynchronous write of file - */ -gint rspamd_aio_write (gint fd, gpointer buf, guint64 len, guint64 offset, - struct aio_context *ctx, rspamd_aio_cb cb, gpointer ud); - -/** - * Close of aio operations - */ -gint rspamd_aio_close (gint fd, struct aio_context *ctx); - -#endif /* AIO_EVENT_H_ */ diff --git a/src/libutil/http_connection.c b/src/libutil/http_connection.c index aa964f417..9bf7456e7 100644 --- a/src/libutil/http_connection.c +++ b/src/libutil/http_connection.c @@ -25,6 +25,7 @@ #include "ottery.h" #include "keypair_private.h" #include "cryptobox.h" +#include "libutil/libev_helper.h" #include "libutil/ssl_util.h" #include "libserver/url.h" @@ -67,9 +68,8 @@ struct rspamd_http_connection_private { struct rspamd_http_header *header; struct http_parser parser; struct http_parser_settings parser_cb; - struct event ev; - struct timeval tv; - struct timeval *ptv; + struct rspamd_io_ev ev; + ev_tstamp timeout; struct rspamd_http_message *msg; struct iovec *out; guint outlen; @@ -348,9 +348,7 @@ rspamd_http_on_headers_complete (http_parser * parser) if (msg->method == HTTP_HEAD) { /* We don't care about the rest */ - if (rspamd_event_pending (&priv->ev, EV_READ)) { - event_del (&priv->ev); - } + rspamd_ev_watcher_stop (priv->ctx->event_loop, &priv->ev); msg->code = parser->status_code; rspamd_http_connection_ref (conn); @@ -358,7 +356,7 @@ rspamd_http_on_headers_complete (http_parser * parser) if (conn->opts & RSPAMD_HTTP_CLIENT_KEEP_ALIVE) { rspamd_http_context_push_keepalive (conn->priv->ctx, conn, - msg, conn->priv->ctx->ev_base); + msg, conn->priv->ctx->event_loop); rspamd_http_connection_reset (conn); } else { @@ -532,17 +530,14 @@ rspamd_http_on_headers_complete_decrypted (http_parser *parser) if (msg->method == HTTP_HEAD) { /* We don't care about the rest */ - if (rspamd_event_pending (&priv->ev, EV_READ)) { - event_del (&priv->ev); - } - + rspamd_ev_watcher_stop (priv->ctx->event_loop, &priv->ev); msg->code = parser->status_code; rspamd_http_connection_ref (conn); ret = conn->finish_handler (conn, msg); if (conn->opts & RSPAMD_HTTP_CLIENT_KEEP_ALIVE) { rspamd_http_context_push_keepalive (conn->priv->ctx, conn, - msg, conn->priv->ctx->ev_base); + msg, conn->priv->ctx->event_loop); rspamd_http_connection_reset (conn); } else { @@ -692,16 +687,13 @@ rspamd_http_on_message_complete (http_parser * parser) } if (ret == 0) { - if (rspamd_event_pending (&priv->ev, EV_READ)) { - event_del (&priv->ev); - } - + rspamd_ev_watcher_stop (priv->ctx->event_loop, &priv->ev); rspamd_http_connection_ref (conn); ret = conn->finish_handler (conn, priv->msg); if (conn->opts & RSPAMD_HTTP_CLIENT_KEEP_ALIVE) { rspamd_http_context_push_keepalive (conn->priv->ctx, conn, - priv->msg, conn->priv->ctx->ev_base); + priv->msg, conn->priv->ctx->event_loop); rspamd_http_connection_reset (conn); } else { @@ -741,11 +733,11 @@ rspamd_http_simple_client_helper (struct rspamd_http_connection *conn) if (conn->opts & RSPAMD_HTTP_CLIENT_SHARED) { rspamd_http_connection_read_message_shared (conn, conn->ud, - conn->priv->ptv); + conn->priv->timeout); } else { rspamd_http_connection_read_message (conn, conn->ud, - conn->priv->ptv); + conn->priv->timeout); } if (priv->msg) { @@ -835,7 +827,6 @@ rspamd_http_write_helper (struct rspamd_http_connection *conn) else { /* Want to write more */ priv->flags &= ~RSPAMD_HTTP_CONN_FLAG_RESETED; - event_add (&priv->ev, priv->ptv); } return; @@ -1269,10 +1260,7 @@ rspamd_http_connection_reset (struct rspamd_http_connection *conn) if (!(priv->flags & RSPAMD_HTTP_CONN_FLAG_RESETED)) { - if (rspamd_event_pending (&priv->ev, EV_READ|EV_WRITE|EV_TIMEOUT)) { - event_del (&priv->ev); - } - + rspamd_ev_watcher_stop (priv->ctx->event_loop, &priv->ev); rspamd_http_parser_reset (conn); } @@ -1468,7 +1456,7 @@ rspamd_http_connection_free (struct rspamd_http_connection *conn) static void rspamd_http_connection_read_message_common (struct rspamd_http_connection *conn, - gpointer ud, struct timeval *timeout, + gpointer ud, ev_tstamp timeout, gint flags) { struct rspamd_http_connection_private *priv = conn->priv; @@ -1490,42 +1478,30 @@ rspamd_http_connection_read_message_common (struct rspamd_http_connection *conn, priv->flags |= RSPAMD_HTTP_CONN_FLAG_ENCRYPTED; } - if (timeout == NULL) { - priv->ptv = NULL; - } - else { - memmove (&priv->tv, timeout, sizeof (struct timeval)); - priv->ptv = &priv->tv; - } - + priv->timeout = timeout; priv->header = NULL; priv->buf = g_malloc0 (sizeof (*priv->buf)); REF_INIT_RETAIN (priv->buf, rspamd_http_privbuf_dtor); priv->buf->data = rspamd_fstring_sized_new (8192); priv->flags |= RSPAMD_HTTP_CONN_FLAG_NEW_HEADER; - event_set (&priv->ev, - conn->fd, - EV_READ | EV_PERSIST, - rspamd_http_event_handler, - conn); - - event_base_set (priv->ctx->ev_base, &priv->ev); + rspamd_ev_watcher_init (&priv->ev, conn->fd, EV_READ, + rspamd_http_event_handler, conn); + rspamd_ev_watcher_start (priv->ctx->event_loop, &priv->ev, priv->timeout); priv->flags &= ~RSPAMD_HTTP_CONN_FLAG_RESETED; - event_add (&priv->ev, priv->ptv); } void rspamd_http_connection_read_message (struct rspamd_http_connection *conn, - gpointer ud, struct timeval *timeout) + gpointer ud, ev_tstamp timeout) { rspamd_http_connection_read_message_common (conn, ud, timeout, 0); } void rspamd_http_connection_read_message_shared (struct rspamd_http_connection *conn, - gpointer ud, struct timeval *timeout) + gpointer ud, ev_tstamp timeout) { rspamd_http_connection_read_message_common (conn, ud, timeout, RSPAMD_HTTP_FLAG_SHMEM); @@ -1912,7 +1888,7 @@ rspamd_http_connection_write_message_common (struct rspamd_http_connection *conn const gchar *host, const gchar *mime_type, gpointer ud, - struct timeval *timeout, + ev_tstamp timeout, gboolean allow_shared) { struct rspamd_http_connection_private *priv = conn->priv; @@ -1930,14 +1906,7 @@ rspamd_http_connection_write_message_common (struct rspamd_http_connection *conn conn->ud = ud; priv->msg = msg; - - if (timeout == NULL) { - priv->ptv = NULL; - } - else if (timeout != &priv->tv) { - memcpy (&priv->tv, timeout, sizeof (struct timeval)); - priv->ptv = &priv->tv; - } + priv->timeout = timeout; priv->header = NULL; priv->buf = g_malloc0 (sizeof (*priv->buf)); @@ -2224,16 +2193,12 @@ rspamd_http_connection_write_message_common (struct rspamd_http_connection *conn msg->flags &=~ RSPAMD_HTTP_FLAG_SSL; } - if (rspamd_event_pending (&priv->ev, EV_TIMEOUT|EV_WRITE|EV_READ)) { - event_del (&priv->ev); - } + rspamd_ev_watcher_stop (priv->ctx->event_loop, &priv->ev); if (msg->flags & RSPAMD_HTTP_FLAG_SSL) { gpointer ssl_ctx = (msg->flags & RSPAMD_HTTP_FLAG_SSL_NOVERIFY) ? priv->ctx->ssl_ctx_noverify : priv->ctx->ssl_ctx; - event_base_set (priv->ctx->ev_base, &priv->ev); - if (!ssl_ctx) { err = g_error_new (HTTP_ERROR, errno, "ssl message requested " "with no ssl ctx"); @@ -2249,12 +2214,12 @@ rspamd_http_connection_write_message_common (struct rspamd_http_connection *conn rspamd_ssl_connection_free (priv->ssl); } - priv->ssl = rspamd_ssl_connection_new (ssl_ctx, priv->ctx->ev_base, + priv->ssl = rspamd_ssl_connection_new (ssl_ctx, priv->ctx->event_loop, !(msg->flags & RSPAMD_HTTP_FLAG_SSL_NOVERIFY)); g_assert (priv->ssl != NULL); if (!rspamd_ssl_connect_fd (priv->ssl, conn->fd, host, &priv->ev, - priv->ptv, rspamd_http_event_handler, + priv->timeout, rspamd_http_event_handler, rspamd_http_ssl_err_handler, conn)) { err = g_error_new (HTTP_ERROR, errno, @@ -2270,10 +2235,9 @@ rspamd_http_connection_write_message_common (struct rspamd_http_connection *conn } } else { - event_set (&priv->ev, conn->fd, EV_WRITE, rspamd_http_event_handler, conn); - event_base_set (priv->ctx->ev_base, &priv->ev); - - event_add (&priv->ev, priv->ptv); + rspamd_ev_watcher_init (&priv->ev, conn->fd, EV_WRITE, + rspamd_http_event_handler, conn); + rspamd_ev_watcher_start (priv->ctx->event_loop, &priv->ev, priv->timeout); } } @@ -2283,7 +2247,7 @@ rspamd_http_connection_write_message (struct rspamd_http_connection *conn, const gchar *host, const gchar *mime_type, gpointer ud, - struct timeval *timeout) + ev_tstamp timeout) { rspamd_http_connection_write_message_common (conn, msg, host, mime_type, ud, timeout, FALSE); @@ -2295,7 +2259,7 @@ rspamd_http_connection_write_message_shared (struct rspamd_http_connection *conn const gchar *host, const gchar *mime_type, gpointer ud, - struct timeval *timeout) + ev_tstamp timeout) { rspamd_http_connection_write_message_common (conn, msg, host, mime_type, ud, timeout, TRUE); diff --git a/src/libutil/http_connection.h b/src/libutil/http_connection.h index b4b401ecb..fc1303446 100644 --- a/src/libutil/http_connection.h +++ b/src/libutil/http_connection.h @@ -221,12 +221,12 @@ gboolean rspamd_http_connection_is_encrypted (struct rspamd_http_connection *con void rspamd_http_connection_read_message ( struct rspamd_http_connection *conn, gpointer ud, - struct timeval *timeout); + ev_tstamp timeout); void rspamd_http_connection_read_message_shared ( struct rspamd_http_connection *conn, gpointer ud, - struct timeval *timeout); + ev_tstamp timeout); /** * Send reply using initialised connection @@ -241,7 +241,7 @@ void rspamd_http_connection_write_message ( const gchar *host, const gchar *mime_type, gpointer ud, - struct timeval *timeout); + ev_tstamp timeout); void rspamd_http_connection_write_message_shared ( struct rspamd_http_connection *conn, @@ -249,7 +249,7 @@ void rspamd_http_connection_write_message_shared ( const gchar *host, const gchar *mime_type, gpointer ud, - struct timeval *timeout); + ev_tstamp timeout); /** * Free connection structure diff --git a/src/libutil/http_context.c b/src/libutil/http_context.c index b9add9ac9..f5d766d88 100644 --- a/src/libutil/http_context.c +++ b/src/libutil/http_context.c @@ -114,7 +114,7 @@ rspamd_http_context_new_default (struct rspamd_config *cfg, ctx->ssl_ctx_noverify = rspamd_init_ssl_ctx_noverify (); } - ctx->ev_base = ev_base; + ctx->event_loop = ev_base; ctx->keep_alive_hash = kh_init (rspamd_keep_alive_hash); @@ -186,7 +186,7 @@ rspamd_http_context_init (struct rspamd_http_context *ctx) ctx->server_kp_cache = rspamd_keypair_cache_new (ctx->config.kp_cache_size_server); } - if (ctx->config.client_key_rotate_time > 0 && ctx->ev_base) { + if (ctx->config.client_key_rotate_time > 0 && ctx->event_loop) { struct timeval tv; double jittered = rspamd_time_jitter (ctx->config.client_key_rotate_time, 0); @@ -194,7 +194,7 @@ rspamd_http_context_init (struct rspamd_http_context *ctx) double_to_tv (jittered, &tv); event_set (&ctx->client_rotate_ev, -1, EV_TIMEOUT, rspamd_http_context_client_rotate_ev, ctx); - event_base_set (ctx->ev_base, &ctx->client_rotate_ev); + event_base_set (ctx->event_loop, &ctx->client_rotate_ev); event_add (&ctx->client_rotate_ev, &tv); } diff --git a/src/libutil/http_private.h b/src/libutil/http_private.h index e29152c77..e09dbef40 100644 --- a/src/libutil/http_private.h +++ b/src/libutil/http_private.h @@ -100,8 +100,8 @@ struct rspamd_http_context { struct upstream_list *http_proxies; gpointer ssl_ctx; gpointer ssl_ctx_noverify; - struct ev_loop *ev_base; - struct event client_rotate_ev; + struct ev_loop *event_loop; + ev_periodic client_rotate_ev; khash_t (rspamd_keep_alive_hash) *keep_alive_hash; }; diff --git a/src/libutil/http_router.c b/src/libutil/http_router.c index ec0eeb7b4..8d5913f0d 100644 --- a/src/libutil/http_router.c +++ b/src/libutil/http_router.c @@ -92,7 +92,7 @@ rspamd_http_router_error_handler (struct rspamd_http_connection *conn, NULL, "text/plain", entry, - entry->rt->ptv); + entry->rt->timeout); entry->is_reply = TRUE; } } @@ -210,7 +210,7 @@ rspamd_http_router_try_file (struct rspamd_http_connection_entry *entry, msg_debug ("requested file %s", realbuf); rspamd_http_connection_write_message (entry->conn, reply_msg, NULL, rspamd_http_router_detect_ct (realbuf), entry, - entry->rt->ptv); + entry->rt->timeout); return TRUE; } @@ -235,7 +235,7 @@ rspamd_http_router_send_error (GError *err, NULL, "text/plain", entry, - entry->rt->ptv); + entry->rt->timeout); } @@ -369,33 +369,25 @@ rspamd_http_router_finish_handler (struct rspamd_http_connection *conn, struct rspamd_http_connection_router * rspamd_http_router_new (rspamd_http_router_error_handler_t eh, rspamd_http_router_finish_handler_t fh, - struct timeval *timeout, + ev_tstamp timeout, const char *default_fs_path, struct rspamd_http_context *ctx) { - struct rspamd_http_connection_router * new; + struct rspamd_http_connection_router *nrouter; struct stat st; - new = g_malloc0 (sizeof (struct rspamd_http_connection_router)); - new->paths = g_hash_table_new_full (rspamd_ftok_icase_hash, + nrouter = g_malloc0 (sizeof (struct rspamd_http_connection_router)); + nrouter->paths = g_hash_table_new_full (rspamd_ftok_icase_hash, rspamd_ftok_icase_equal, rspamd_fstring_mapped_ftok_free, NULL); - new->regexps = g_ptr_array_new (); - new->conns = NULL; - new->error_handler = eh; - new->finish_handler = fh; - new->response_headers = g_hash_table_new_full (rspamd_strcase_hash, + nrouter->regexps = g_ptr_array_new (); + nrouter->conns = NULL; + nrouter->error_handler = eh; + nrouter->finish_handler = fh; + nrouter->response_headers = g_hash_table_new_full (rspamd_strcase_hash, rspamd_strcase_equal, g_free, g_free); - new->ev_base = ctx->ev_base; - - if (timeout) { - new->tv = *timeout; - new->ptv = &new->tv; - } - else { - new->ptv = NULL; - } - - new->default_fs_path = NULL; + nrouter->event_loop = ctx->event_loop; + nrouter->timeout = timeout; + nrouter->default_fs_path = NULL; if (default_fs_path != NULL) { if (stat (default_fs_path, &st) == -1) { @@ -406,14 +398,14 @@ rspamd_http_router_new (rspamd_http_router_error_handler_t eh, msg_err ("path %s is not a directory", default_fs_path); } else { - new->default_fs_path = realpath (default_fs_path, NULL); + nrouter->default_fs_path = realpath (default_fs_path, NULL); } } } - new->ctx = ctx; + nrouter->ctx = ctx; - return new; + return nrouter; } void @@ -517,7 +509,7 @@ rspamd_http_router_handle_socket (struct rspamd_http_connection_router *router, rspamd_http_connection_set_key (conn->conn, router->key); } - rspamd_http_connection_read_message (conn->conn, conn, router->ptv); + rspamd_http_connection_read_message (conn->conn, conn, router->timeout); DL_PREPEND (router->conns, conn); } diff --git a/src/libutil/http_router.h b/src/libutil/http_router.h index 03886707a..b946067b7 100644 --- a/src/libutil/http_router.h +++ b/src/libutil/http_router.h @@ -44,9 +44,8 @@ struct rspamd_http_connection_router { GHashTable *paths; GHashTable *response_headers; GPtrArray *regexps; - struct timeval tv; - struct timeval *ptv; - struct ev_loop *ev_base; + ev_tstamp timeout; + struct ev_loop *event_loop; struct rspamd_http_context *ctx; gchar *default_fs_path; rspamd_http_router_handler_t unknown_method_handler; @@ -66,7 +65,7 @@ struct rspamd_http_connection_router { struct rspamd_http_connection_router * rspamd_http_router_new ( rspamd_http_router_error_handler_t eh, rspamd_http_router_finish_handler_t fh, - struct timeval *timeout, + ev_tstamp timeout, const char *default_fs_path, struct rspamd_http_context *ctx); diff --git a/src/libutil/libev_helper.c b/src/libutil/libev_helper.c index a0a0c509b..ac0f1fc0d 100644 --- a/src/libutil/libev_helper.c +++ b/src/libutil/libev_helper.c @@ -65,16 +65,26 @@ rspamd_ev_watcher_start (struct ev_loop *loop, ev_tstamp timeout) { ev->last_activity = ev_now (EV_A); - ev_timer_set (&ev->tm, timeout, 0.0); ev_io_start (EV_A_ &ev->io); - ev_timer_start (EV_A_ &ev->tm); + + if (timeout > 0) { + ev->timeout = timeout; + ev_timer_set (&ev->tm, timeout, 0.0); + ev_timer_start (EV_A_ &ev->tm); + } } void rspamd_ev_watcher_stop (struct ev_loop *loop, struct rspamd_io_ev *ev) { - ev_io_stop (EV_A_ &ev->io); + if (ev_is_pending (&ev->io) || ev_is_active (&ev->io)) { + ev_io_stop (EV_A_ &ev->io); + } + + if (ev->timeout > 0) { + ev_timer_stop (EV_A_ &ev->tm); + } } void diff --git a/src/libutil/map_private.h b/src/libutil/map_private.h index f7e0649e5..8b45881b6 100644 --- a/src/libutil/map_private.h +++ b/src/libutil/map_private.h @@ -61,7 +61,7 @@ struct file_map_data { struct http_map_data; struct rspamd_http_map_cached_cbdata { - struct event timeout; + ev_periodic timeout; struct rspamd_storage_shmem *shm; struct rspamd_map *map; struct http_map_data *data; diff --git a/src/libutil/ssl_util.c b/src/libutil/ssl_util.c index b417efb74..0fe6cc625 100644 --- a/src/libutil/ssl_util.c +++ b/src/libutil/ssl_util.c @@ -45,9 +45,8 @@ struct rspamd_ssl_connection { gboolean verify_peer; SSL *ssl; gchar *hostname; - struct event *ev; - struct ev_loop *ev_base; - struct timeval *tv; + struct rspamd_io_ev *ev; + struct ev_loop *event_loop; rspamd_ssl_handler_t handler; rspamd_ssl_error_handler_t err_handler; gpointer handler_data; @@ -407,7 +406,7 @@ rspamd_ssl_event_handler (gint fd, short what, gpointer ud) gint ret; GError *err = NULL; - if (what == EV_TIMEOUT) { + if (what == EV_TIMER) { c->shut = ssl_shut_unclean; } @@ -417,7 +416,7 @@ rspamd_ssl_event_handler (gint fd, short what, gpointer ud) ret = SSL_connect (c->ssl); if (ret == 1) { - event_del (c->ev); + rspamd_ev_watcher_stop (c->event_loop, c->ev); /* Verify certificate */ if ((!c->verify_peer) || rspamd_ssl_peer_verify (c)) { c->state = ssl_conn_connected; @@ -443,30 +442,18 @@ rspamd_ssl_event_handler (gint fd, short what, gpointer ud) return; } - event_del (c->ev); - event_set (c->ev, fd, what, rspamd_ssl_event_handler, c); - event_base_set (c->ev_base, c->ev); - event_add (c->ev, c->tv); + rspamd_ev_watcher_reschedule (c->event_loop, c->ev, what); + } break; case ssl_next_read: - event_del (c->ev); - /* Restore handler */ - event_set (c->ev, c->fd, EV_READ|EV_PERSIST, - c->handler, c->handler_data); - event_base_set (c->ev_base, c->ev); - event_add (c->ev, c->tv); + rspamd_ev_watcher_reschedule (c->event_loop, c->ev, EV_READ); c->state = ssl_conn_connected; c->handler (fd, EV_READ, c->handler_data); break; case ssl_next_write: case ssl_conn_connected: - event_del (c->ev); - /* Restore handler */ - event_set (c->ev, c->fd, EV_WRITE, - c->handler, c->handler_data); - event_base_set (c->ev_base, c->ev); - event_add (c->ev, c->tv); + rspamd_ev_watcher_reschedule (c->event_loop, c->ev, EV_WRITE); c->state = ssl_conn_connected; c->handler (fd, EV_WRITE, c->handler_data); break; @@ -488,7 +475,7 @@ rspamd_ssl_connection_new (gpointer ssl_ctx, struct ev_loop *ev_base, g_assert (ssl_ctx != NULL); c = g_malloc0 (sizeof (*c)); c->ssl = SSL_new (ssl_ctx); - c->ev_base = ev_base; + c->event_loop = ev_base; c->verify_peer = verify_peer; return c; @@ -497,7 +484,7 @@ rspamd_ssl_connection_new (gpointer ssl_ctx, struct ev_loop *ev_base, gboolean rspamd_ssl_connect_fd (struct rspamd_ssl_connection *conn, gint fd, - const gchar *hostname, struct event *ev, struct timeval *tv, + const gchar *hostname, struct rspamd_io_ev *ev, ev_tstamp timeout, rspamd_ssl_handler_t handler, rspamd_ssl_error_handler_t err_handler, gpointer handler_data) { @@ -534,17 +521,9 @@ rspamd_ssl_connect_fd (struct rspamd_ssl_connection *conn, gint fd, if (ret == 1) { conn->state = ssl_conn_connected; - if (rspamd_event_pending (ev, EV_TIMEOUT|EV_WRITE|EV_READ)) { - event_del (ev); - } - - event_set (ev, fd, EV_WRITE, rspamd_ssl_event_handler, conn); - - if (conn->ev_base) { - event_base_set (conn->ev_base, ev); - } - - event_add (ev, tv); + rspamd_ev_watcher_stop (conn->event_loop, ev); + rspamd_ev_watcher_init (ev, fd, EV_WRITE, rspamd_ssl_event_handler, conn); + rspamd_ev_watcher_start (conn->event_loop, ev, timeout); } else { ret = SSL_get_error (conn->ssl, ret); @@ -561,13 +540,10 @@ rspamd_ssl_connect_fd (struct rspamd_ssl_connection *conn, gint fd, return FALSE; } - if (rspamd_event_pending (ev, EV_TIMEOUT|EV_WRITE|EV_READ)) { - event_del (ev); - } - - event_set (ev, fd, what, rspamd_ssl_event_handler, conn); - event_base_set (conn->ev_base, ev); - event_add (ev, tv); + rspamd_ev_watcher_stop (conn->event_loop, ev); + rspamd_ev_watcher_init (ev, fd, EV_WRITE|EV_READ, + rspamd_ssl_event_handler, conn); + rspamd_ev_watcher_start (conn->event_loop, ev, timeout); } return TRUE; @@ -638,13 +614,8 @@ rspamd_ssl_read (struct rspamd_ssl_connection *conn, gpointer buf, return -1; } - event_del (conn->ev); - event_set (conn->ev, conn->fd, what, rspamd_ssl_event_handler, conn); - event_base_set (conn->ev_base, conn->ev); - event_add (conn->ev, conn->tv); - + rspamd_ev_watcher_reschedule (conn->event_loop, conn->ev, what); errno = EAGAIN; - } return -1; @@ -713,11 +684,7 @@ rspamd_ssl_write (struct rspamd_ssl_connection *conn, gconstpointer buf, return -1; } - event_del (conn->ev); - event_set (conn->ev, conn->fd, what, rspamd_ssl_event_handler, conn); - event_base_set (conn->ev_base, conn->ev); - event_add (conn->ev, conn->tv); - + rspamd_ev_watcher_reschedule (conn->event_loop, conn->ev, what); errno = EAGAIN; } diff --git a/src/libutil/ssl_util.h b/src/libutil/ssl_util.h index 3bae1edc0..f7f1652de 100644 --- a/src/libutil/ssl_util.h +++ b/src/libutil/ssl_util.h @@ -18,6 +18,7 @@ #include "config.h" #include "libutil/addr.h" +#include "libutil/libev_helper.h" struct rspamd_ssl_connection; @@ -44,7 +45,7 @@ struct rspamd_ssl_connection * rspamd_ssl_connection_new (gpointer ssl_ctx, * @return TRUE if a session has been connected */ gboolean rspamd_ssl_connect_fd (struct rspamd_ssl_connection *conn, gint fd, - const gchar *hostname, struct event *ev, struct timeval *tv, + const gchar *hostname, struct rspamd_io_ev *ev, ev_tstamp timeout, rspamd_ssl_handler_t handler, rspamd_ssl_error_handler_t err_handler, gpointer handler_data); diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c index ed5bc9132..2c91869d6 100644 --- a/src/plugins/fuzzy_check.c +++ b/src/plugins/fuzzy_check.c @@ -2912,7 +2912,7 @@ register_fuzzy_controller_call (struct rspamd_http_connection_entry *entry, rspamd_http_connection_ref (entry->conn); event_set (&s->ev, sock, EV_WRITE, fuzzy_controller_io_callback, s); - event_base_set (entry->rt->ev_base, &s->ev); + event_base_set (entry->rt->event_loop, &s->ev); event_add (&s->ev, NULL); evtimer_set (&s->timev, fuzzy_controller_timer_callback, @@ -2946,7 +2946,7 @@ fuzzy_process_handler (struct rspamd_http_connection_entry *conn_ent, /* Prepare task */ task = rspamd_task_new (session->wrk, session->cfg, NULL, - session->lang_det, conn_ent->rt->ev_base); + session->lang_det, conn_ent->rt->event_loop); task->cfg = ctx->cfg; saved = rspamd_mempool_alloc0 (session->pool, sizeof (gint)); err = rspamd_mempool_alloc0 (session->pool, sizeof (GError *)); diff --git a/src/rspamd.h b/src/rspamd.h index 375cba13f..4deb2f933 100644 --- a/src/rspamd.h +++ b/src/rspamd.h @@ -121,7 +121,7 @@ struct rspamd_worker_signal_cb { struct rspamd_worker_signal_handler { gint signo; gboolean enabled; - struct event ev; + ev_signal ev_sig; struct ev_loop *base; struct rspamd_worker *worker; struct rspamd_worker_signal_cb *cb;