]> source.dussan.org Git - rspamd.git/commitdiff
[Project] Rework HTTP IO
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Sun, 16 Jun 2019 17:32:55 +0000 (18:32 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 22 Jun 2019 09:57:29 +0000 (10:57 +0100)
15 files changed:
src/libutil/CMakeLists.txt
src/libutil/aio_event.c [deleted file]
src/libutil/aio_event.h [deleted file]
src/libutil/http_connection.c
src/libutil/http_connection.h
src/libutil/http_context.c
src/libutil/http_private.h
src/libutil/http_router.c
src/libutil/http_router.h
src/libutil/libev_helper.c
src/libutil/map_private.h
src/libutil/ssl_util.c
src/libutil/ssl_util.h
src/plugins/fuzzy_check.c
src/rspamd.h

index fd29c551214d9fd8cd92ce3c3e38140d8aa8a8da..5a94a732c5463c2dae5bf46d3af1bf4fa304d851 100644 (file)
@@ -2,7 +2,6 @@
 SET(LIBRSPAMDUTILSRC
                                ${CMAKE_CURRENT_SOURCE_DIR}/addr.c
                                ${CMAKE_CURRENT_SOURCE_DIR}/libev_helper.c
-                               ${CMAKE_CURRENT_SOURCE_DIR}/aio_event.c
                                ${CMAKE_CURRENT_SOURCE_DIR}/bloom.c
                                ${CMAKE_CURRENT_SOURCE_DIR}/expression.c
                                ${CMAKE_CURRENT_SOURCE_DIR}/fstring.c
diff --git a/src/libutil/aio_event.c b/src/libutil/aio_event.c
deleted file mode 100644 (file)
index d0c8d3f..0000000
+++ /dev/null
@@ -1,508 +0,0 @@
-/*-
- * Copyright 2016 Vsevolod Stakhov
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "config.h"
-#include "contrib/libev/ev.h"
-#include "aio_event.h"
-#include "rspamd.h"
-#include "unix-std.h"
-
-#ifdef HAVE_SYS_EVENTFD_H
-#include <sys/eventfd.h>
-#endif
-
-#ifdef HAVE_AIO_H
-#include <aio.h>
-#endif
-
-/* Linux syscall numbers */
-#if defined(__i386__)
-# define SYS_io_setup      245
-# define SYS_io_destroy    246
-# define SYS_io_getevents  247
-# define SYS_io_submit     248
-# define SYS_io_cancel     249
-#elif defined(__x86_64__)
-# define SYS_io_setup       206
-# define SYS_io_destroy     207
-# define SYS_io_getevents   208
-# define SYS_io_submit      209
-# define SYS_io_cancel      210
-#else
-# warning \
-       "aio is not supported on this platform, please contact author for details"
-# define SYS_io_setup       0
-# define SYS_io_destroy     0
-# define SYS_io_getevents   0
-# define SYS_io_submit      0
-# define SYS_io_cancel      0
-#endif
-
-#define SYS_eventfd       323
-#define MAX_AIO_EV        64
-
-struct io_cbdata {
-       gint fd;
-       rspamd_aio_cb cb;
-       guint64 len;
-       gpointer buf;
-       gpointer io_buf;
-       gpointer ud;
-};
-
-#ifdef LINUX
-
-/* Linux specific mappings and utilities to avoid using of libaio */
-
-typedef unsigned long aio_context_t;
-
-typedef enum io_iocb_cmd {
-       IO_CMD_PREAD = 0,
-       IO_CMD_PWRITE = 1,
-
-       IO_CMD_FSYNC = 2,
-       IO_CMD_FDSYNC = 3,
-
-       IO_CMD_POLL = 5,
-       IO_CMD_NOOP = 6,
-} io_iocb_cmd_t;
-
-#if defined(__LITTLE_ENDIAN)
-#define PADDED(x,y)     x, y
-#elif defined(__BIG_ENDIAN)
-#define PADDED(x,y)     y, x
-#else
-#error edit for your odd byteorder.
-#endif
-
-/*
- * we always use a 64bit off_t when communicating
- * with userland.  its up to libraries to do the
- * proper padding and aio_error abstraction
- */
-
-struct iocb {
-       /* these are internal to the kernel/libc. */
-       guint64 aio_data; /* data to be returned in event's data */
-       guint32 PADDED (aio_key, aio_reserved1);
-       /* the kernel sets aio_key to the req # */
-
-       /* common fields */
-       guint16 aio_lio_opcode; /* see IOCB_CMD_ above */
-       gint16 aio_reqprio;
-       guint32 aio_fildes;
-
-       guint64 aio_buf;
-       guint64 aio_nbytes;
-       gint64 aio_offset;
-
-       /* extra parameters */
-       guint64 aio_reserved2; /* TODO: use this for a (struct sigevent *) */
-
-       /* flags for the "struct iocb" */
-       guint32 aio_flags;
-
-       /*
-        * if the IOCB_FLAG_RESFD flag of "aio_flags" is set, this is an
-        * eventfd to signal AIO readiness to
-        */
-       guint32 aio_resfd;
-};
-
-struct io_event {
-       guint64 data;   /* the data field from the iocb */
-       guint64 obj;    /* what iocb this event came from */
-       gint64 res;     /* result code for this event */
-       gint64 res2;    /* secondary result */
-};
-
-/* Linux specific io calls */
-static int
-io_setup (guint nr_reqs, aio_context_t *ctx)
-{
-       return syscall (SYS_io_setup, nr_reqs, ctx);
-}
-
-static int
-io_destroy (aio_context_t ctx)
-{
-       return syscall (SYS_io_destroy, ctx);
-}
-
-static int
-io_getevents (aio_context_t ctx,
-       long min_nr,
-       long nr,
-       struct io_event *events,
-       struct timespec *tmo)
-{
-       return syscall (SYS_io_getevents, ctx, min_nr, nr, events, tmo);
-}
-
-static int
-io_submit (aio_context_t ctx, long n, struct iocb **paiocb)
-{
-       return syscall (SYS_io_submit, ctx, n, paiocb);
-}
-
-static int
-io_cancel (aio_context_t ctx, struct iocb *iocb, struct io_event *result)
-{
-       return syscall (SYS_io_cancel, ctx, iocb, result);
-}
-
-# ifndef HAVE_SYS_EVENTFD_H
-static int
-eventfd (guint initval, guint flags)
-{
-       return syscall (SYS_eventfd, initval);
-}
-# endif
-
-#endif
-
-/**
- * AIO context
- */
-struct aio_context {
-       struct ev_loop *base;
-       gboolean has_aio;       /**< Whether we have aio support on a system */
-#ifdef LINUX
-       /* Eventfd variant */
-       gint event_fd;
-       struct event eventfd_ev;
-       aio_context_t io_ctx;
-#elif defined(HAVE_AIO_H)
-       /* POSIX aio */
-       struct event rtsigs[128];
-#endif
-};
-
-#ifdef LINUX
-/* Eventfd read callback */
-static void
-rspamd_eventfdcb (gint fd, gshort what, gpointer ud)
-{
-       struct aio_context *ctx = ud;
-       guint64 ready;
-       gint done, i;
-       struct io_event event[32];
-       struct timespec ts;
-       struct io_cbdata *ev_data;
-
-       /* Eventfd returns number of events ready got from kernel */
-       if (read (fd, &ready, 8) != 8) {
-               if (errno == EAGAIN) {
-                       return;
-               }
-               msg_err ("eventfd read returned error: %s", strerror (errno));
-       }
-
-       ts.tv_sec = 0;
-       ts.tv_nsec = 0;
-
-       while (ready) {
-               /* Get events ready */
-               done = io_getevents (ctx->io_ctx, 1, 32, event, &ts);
-
-               if (done > 0) {
-                       ready -= done;
-
-                       for (i = 0; i < done; i++) {
-                               ev_data = (struct io_cbdata *) (uintptr_t) event[i].data;
-                               /* Call this callback */
-                               ev_data->cb (ev_data->fd,
-                                       event[i].res,
-                                       ev_data->len,
-                                       ev_data->buf,
-                                       ev_data->ud);
-                               if (ev_data->io_buf) {
-                                       free (ev_data->io_buf);
-                               }
-                               g_free (ev_data);
-                       }
-               }
-               else if (done == 0) {
-                       /* No more events are ready */
-                       return;
-               }
-               else {
-                       msg_err ("io_getevents failed: %s", strerror (errno));
-                       return;
-               }
-       }
-}
-
-#endif
-
-/**
- * Initialize aio with specified event base
- */
-struct aio_context *
-rspamd_aio_init (struct ev_loop *base)
-{
-       struct aio_context *new;
-
-       /* First of all we need to detect which type of aio we can try to use */
-       new = g_malloc0 (sizeof (struct aio_context));
-       new->base = base;
-
-#ifdef LINUX
-       /* On linux we are trying to use io (3) and eventfd for notifying */
-       new->event_fd = eventfd (0, 0);
-       if (new->event_fd == -1) {
-               msg_err ("eventfd failed: %s", strerror (errno));
-       }
-       else {
-               /* Set this socket non-blocking */
-               if (rspamd_socket_nonblocking (new->event_fd) == -1) {
-                       msg_err ("non blocking for eventfd failed: %s", strerror (errno));
-                       close (new->event_fd);
-               }
-               else {
-                       event_set (&new->eventfd_ev,
-                               new->event_fd,
-                               EV_READ | EV_PERSIST,
-                               rspamd_eventfdcb,
-                               new);
-                       event_base_set (new->base, &new->eventfd_ev);
-                       event_add (&new->eventfd_ev, NULL);
-                       if (io_setup (MAX_AIO_EV, &new->io_ctx) == -1) {
-                               msg_err ("io_setup failed: %s", strerror (errno));
-                               close (new->event_fd);
-                       }
-                       else {
-                               new->has_aio = TRUE;
-                       }
-               }
-       }
-#elif defined(HAVE_AIO_H)
-       /* TODO: implement this */
-#endif
-
-       return new;
-}
-
-/**
- * Open file for aio
- */
-gint
-rspamd_aio_open (struct aio_context *ctx, const gchar *path, int flags)
-{
-       gint fd = -1;
-       /* Fallback */
-       if (!ctx->has_aio) {
-               return open (path, flags);
-       }
-#ifdef LINUX
-
-       fd = open (path, flags | O_DIRECT);
-
-       return fd;
-#elif defined(HAVE_AIO_H)
-       fd = open (path, flags);
-#endif
-
-       return fd;
-}
-
-/**
- * Asynchronous read of file
- */
-gint
-rspamd_aio_read (gint fd,
-       gpointer buf,
-       guint64 len,
-       guint64 offset,
-       struct aio_context *ctx,
-       rspamd_aio_cb cb,
-       gpointer ud)
-{
-       gint r = -1;
-
-       if (ctx->has_aio) {
-#ifdef LINUX
-               struct iocb *iocb[1];
-               struct io_cbdata *cbdata;
-
-               cbdata = g_malloc0 (sizeof (struct io_cbdata));
-               cbdata->cb = cb;
-               cbdata->buf = buf;
-               cbdata->len = len;
-               cbdata->ud = ud;
-               cbdata->fd = fd;
-               cbdata->io_buf = NULL;
-
-               iocb[0] = alloca (sizeof (struct iocb));
-               memset (iocb[0], 0, sizeof (struct iocb));
-               iocb[0]->aio_fildes = fd;
-               iocb[0]->aio_lio_opcode = IO_CMD_PREAD;
-               iocb[0]->aio_reqprio = 0;
-               iocb[0]->aio_buf = (guint64)((uintptr_t)buf);
-               iocb[0]->aio_nbytes = len;
-               iocb[0]->aio_offset = offset;
-               iocb[0]->aio_flags |= (1 << 0) /* IOCB_FLAG_RESFD */;
-               iocb[0]->aio_resfd = ctx->event_fd;
-               iocb[0]->aio_data = (guint64)((uintptr_t)cbdata);
-
-               /* Iocb is copied to kernel internally, so it is safe to put it on stack */
-               if (io_submit (ctx->io_ctx, 1, iocb) == 1) {
-                       return len;
-               }
-               else {
-                       if (errno == EAGAIN || errno == ENOSYS) {
-                               /* Fall back to sync read */
-                               goto blocking;
-                       }
-                       return -1;
-               }
-
-#elif defined(HAVE_AIO_H)
-#endif
-       }
-       else {
-               /* Blocking variant */
-               goto blocking;
-blocking:
-#ifdef _LARGEFILE64_SOURCE
-               r = lseek64 (fd, offset, SEEK_SET);
-#else
-               r = lseek (fd, offset, SEEK_SET);
-#endif
-               if (r > 0) {
-                       r = read (fd, buf, len);
-                       if (r >= 0) {
-                               cb (fd, 0, r, buf, ud);
-                       }
-                       else {
-                               cb (fd, r, -1, buf, ud);
-                       }
-               }
-       }
-
-       return r;
-}
-
-/**
- * Asynchronous write of file
- */
-gint
-rspamd_aio_write (gint fd,
-       gpointer buf,
-       guint64 len,
-       guint64 offset,
-       struct aio_context *ctx,
-       rspamd_aio_cb cb,
-       gpointer ud)
-{
-       gint r = -1;
-
-       if (ctx->has_aio) {
-#ifdef LINUX
-               struct iocb *iocb[1];
-               struct io_cbdata *cbdata;
-
-               cbdata = g_malloc0 (sizeof (struct io_cbdata));
-               cbdata->cb = cb;
-               cbdata->buf = buf;
-               cbdata->len = len;
-               cbdata->ud = ud;
-               cbdata->fd = fd;
-               /* We need to align pointer on boundary of 512 bytes here */
-               if (posix_memalign (&cbdata->io_buf, 512, len) != 0) {
-                       return -1;
-               }
-               memcpy (cbdata->io_buf, buf, len);
-
-               iocb[0] = alloca (sizeof (struct iocb));
-               memset (iocb[0], 0, sizeof (struct iocb));
-               iocb[0]->aio_fildes = fd;
-               iocb[0]->aio_lio_opcode = IO_CMD_PWRITE;
-               iocb[0]->aio_reqprio = 0;
-               iocb[0]->aio_buf = (guint64)((uintptr_t)cbdata->io_buf);
-               iocb[0]->aio_nbytes = len;
-               iocb[0]->aio_offset = offset;
-               iocb[0]->aio_flags |= (1 << 0) /* IOCB_FLAG_RESFD */;
-               iocb[0]->aio_resfd = ctx->event_fd;
-               iocb[0]->aio_data = (guint64)((uintptr_t)cbdata);
-
-               /* Iocb is copied to kernel internally, so it is safe to put it on stack */
-               if (io_submit (ctx->io_ctx, 1, iocb) == 1) {
-                       return len;
-               }
-               else {
-                       if (errno == EAGAIN || errno == ENOSYS) {
-                               /* Fall back to sync read */
-                               goto blocking;
-                       }
-                       return -1;
-               }
-
-#elif defined(HAVE_AIO_H)
-#endif
-       }
-       else {
-               /* Blocking variant */
-               goto blocking;
-blocking:
-#ifdef _LARGEFILE64_SOURCE
-               r = lseek64 (fd, offset, SEEK_SET);
-#else
-               r = lseek (fd, offset, SEEK_SET);
-#endif
-               if (r > 0) {
-                       r = write (fd, buf, len);
-                       if (r >= 0) {
-                               cb (fd, 0, r, buf, ud);
-                       }
-                       else {
-                               cb (fd, r, -1, buf, ud);
-                       }
-               }
-       }
-
-       return r;
-}
-
-/**
- * Close of aio operations
- */
-gint
-rspamd_aio_close (gint fd, struct aio_context *ctx)
-{
-       gint r = -1;
-
-       if (ctx->has_aio) {
-#ifdef LINUX
-               struct iocb iocb;
-               struct io_event ev;
-
-               memset (&iocb, 0, sizeof (struct iocb));
-               iocb.aio_fildes = fd;
-               iocb.aio_lio_opcode = IO_CMD_NOOP;
-
-               /* Iocb is copied to kernel internally, so it is safe to put it on stack */
-               r = io_cancel (ctx->io_ctx, &iocb, &ev);
-               close (fd);
-               return r;
-
-#elif defined(HAVE_AIO_H)
-#endif
-       }
-
-       r = close (fd);
-
-       return r;
-}
diff --git a/src/libutil/aio_event.h b/src/libutil/aio_event.h
deleted file mode 100644 (file)
index ededd96..0000000
+++ /dev/null
@@ -1,59 +0,0 @@
-/*-
- * Copyright 2016 Vsevolod Stakhov
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef AIO_EVENT_H_
-#define AIO_EVENT_H_
-
-#include "config.h"
-
-/**
- * AIO context
- */
-struct aio_context;
-
-/**
- * Callback for notifying
- */
-typedef void (*rspamd_aio_cb) (gint fd, gint res, guint64 len, gpointer data,
-       gpointer ud);
-
-/**
- * Initialize aio with specified event base
- */
-struct aio_context * rspamd_aio_init (struct ev_loop *base);
-
-/**
- * Open file for aio
- */
-gint rspamd_aio_open (struct aio_context *ctx, const gchar *path, int flags);
-
-/**
- * Asynchronous read of file
- */
-gint rspamd_aio_read (gint fd, gpointer buf, guint64 len, guint64 offset,
-       struct aio_context *ctx, rspamd_aio_cb cb, gpointer ud);
-
-/**
- * Asynchronous write of file
- */
-gint rspamd_aio_write (gint fd, gpointer buf, guint64 len, guint64 offset,
-       struct aio_context *ctx, rspamd_aio_cb cb, gpointer ud);
-
-/**
- * Close of aio operations
- */
-gint rspamd_aio_close (gint fd, struct aio_context *ctx);
-
-#endif /* AIO_EVENT_H_ */
index aa964f417ab112be4a33ba3f65b5cc5d66cd9087..9bf7456e7f461ed60f625807d00d40fc4154dfc4 100644 (file)
@@ -25,6 +25,7 @@
 #include "ottery.h"
 #include "keypair_private.h"
 #include "cryptobox.h"
+#include "libutil/libev_helper.h"
 #include "libutil/ssl_util.h"
 #include "libserver/url.h"
 
@@ -67,9 +68,8 @@ struct rspamd_http_connection_private {
        struct rspamd_http_header *header;
        struct http_parser parser;
        struct http_parser_settings parser_cb;
-       struct event ev;
-       struct timeval tv;
-       struct timeval *ptv;
+       struct rspamd_io_ev ev;
+       ev_tstamp timeout;
        struct rspamd_http_message *msg;
        struct iovec *out;
        guint outlen;
@@ -348,9 +348,7 @@ rspamd_http_on_headers_complete (http_parser * parser)
 
        if (msg->method == HTTP_HEAD) {
                /* We don't care about the rest */
-               if (rspamd_event_pending (&priv->ev, EV_READ)) {
-                       event_del (&priv->ev);
-               }
+               rspamd_ev_watcher_stop (priv->ctx->event_loop, &priv->ev);
 
                msg->code = parser->status_code;
                rspamd_http_connection_ref (conn);
@@ -358,7 +356,7 @@ rspamd_http_on_headers_complete (http_parser * parser)
 
                if (conn->opts & RSPAMD_HTTP_CLIENT_KEEP_ALIVE) {
                        rspamd_http_context_push_keepalive (conn->priv->ctx, conn,
-                                       msg, conn->priv->ctx->ev_base);
+                                       msg, conn->priv->ctx->event_loop);
                        rspamd_http_connection_reset (conn);
                }
                else {
@@ -532,17 +530,14 @@ rspamd_http_on_headers_complete_decrypted (http_parser *parser)
 
        if (msg->method == HTTP_HEAD) {
                /* We don't care about the rest */
-               if (rspamd_event_pending (&priv->ev, EV_READ)) {
-                       event_del (&priv->ev);
-               }
-
+               rspamd_ev_watcher_stop (priv->ctx->event_loop, &priv->ev);
                msg->code = parser->status_code;
                rspamd_http_connection_ref (conn);
                ret = conn->finish_handler (conn, msg);
 
                if (conn->opts & RSPAMD_HTTP_CLIENT_KEEP_ALIVE) {
                        rspamd_http_context_push_keepalive (conn->priv->ctx, conn,
-                                       msg, conn->priv->ctx->ev_base);
+                                       msg, conn->priv->ctx->event_loop);
                        rspamd_http_connection_reset (conn);
                }
                else {
@@ -692,16 +687,13 @@ rspamd_http_on_message_complete (http_parser * parser)
        }
 
        if (ret == 0) {
-               if (rspamd_event_pending (&priv->ev, EV_READ)) {
-                       event_del (&priv->ev);
-               }
-
+               rspamd_ev_watcher_stop (priv->ctx->event_loop, &priv->ev);
                rspamd_http_connection_ref (conn);
                ret = conn->finish_handler (conn, priv->msg);
 
                if (conn->opts & RSPAMD_HTTP_CLIENT_KEEP_ALIVE) {
                        rspamd_http_context_push_keepalive (conn->priv->ctx, conn,
-                                       priv->msg, conn->priv->ctx->ev_base);
+                                       priv->msg, conn->priv->ctx->event_loop);
                        rspamd_http_connection_reset (conn);
                }
                else {
@@ -741,11 +733,11 @@ rspamd_http_simple_client_helper (struct rspamd_http_connection *conn)
 
        if (conn->opts & RSPAMD_HTTP_CLIENT_SHARED) {
                rspamd_http_connection_read_message_shared (conn, conn->ud,
-                               conn->priv->ptv);
+                               conn->priv->timeout);
        }
        else {
                rspamd_http_connection_read_message (conn, conn->ud,
-                               conn->priv->ptv);
+                               conn->priv->timeout);
        }
 
        if (priv->msg) {
@@ -835,7 +827,6 @@ rspamd_http_write_helper (struct rspamd_http_connection *conn)
        else {
                /* Want to write more */
                priv->flags &= ~RSPAMD_HTTP_CONN_FLAG_RESETED;
-               event_add (&priv->ev, priv->ptv);
        }
 
        return;
@@ -1269,10 +1260,7 @@ rspamd_http_connection_reset (struct rspamd_http_connection *conn)
 
        if (!(priv->flags & RSPAMD_HTTP_CONN_FLAG_RESETED)) {
 
-               if (rspamd_event_pending (&priv->ev, EV_READ|EV_WRITE|EV_TIMEOUT)) {
-                       event_del (&priv->ev);
-               }
-
+               rspamd_ev_watcher_stop (priv->ctx->event_loop, &priv->ev);
                rspamd_http_parser_reset (conn);
        }
 
@@ -1468,7 +1456,7 @@ rspamd_http_connection_free (struct rspamd_http_connection *conn)
 
 static void
 rspamd_http_connection_read_message_common (struct rspamd_http_connection *conn,
-               gpointer ud, struct timeval *timeout,
+               gpointer ud, ev_tstamp timeout,
                gint flags)
 {
        struct rspamd_http_connection_private *priv = conn->priv;
@@ -1490,42 +1478,30 @@ rspamd_http_connection_read_message_common (struct rspamd_http_connection *conn,
                priv->flags |= RSPAMD_HTTP_CONN_FLAG_ENCRYPTED;
        }
 
-       if (timeout == NULL) {
-               priv->ptv = NULL;
-       }
-       else {
-               memmove (&priv->tv, timeout, sizeof (struct timeval));
-               priv->ptv = &priv->tv;
-       }
-
+       priv->timeout = timeout;
        priv->header = NULL;
        priv->buf = g_malloc0 (sizeof (*priv->buf));
        REF_INIT_RETAIN (priv->buf, rspamd_http_privbuf_dtor);
        priv->buf->data = rspamd_fstring_sized_new (8192);
        priv->flags |= RSPAMD_HTTP_CONN_FLAG_NEW_HEADER;
 
-       event_set (&priv->ev,
-               conn->fd,
-               EV_READ | EV_PERSIST,
-               rspamd_http_event_handler,
-               conn);
-
-       event_base_set (priv->ctx->ev_base, &priv->ev);
+       rspamd_ev_watcher_init (&priv->ev, conn->fd, EV_READ,
+                       rspamd_http_event_handler, conn);
+       rspamd_ev_watcher_start (priv->ctx->event_loop, &priv->ev, priv->timeout);
 
        priv->flags &= ~RSPAMD_HTTP_CONN_FLAG_RESETED;
-       event_add (&priv->ev, priv->ptv);
 }
 
 void
 rspamd_http_connection_read_message (struct rspamd_http_connection *conn,
-               gpointer ud, struct timeval *timeout)
+               gpointer ud, ev_tstamp timeout)
 {
        rspamd_http_connection_read_message_common (conn, ud, timeout, 0);
 }
 
 void
 rspamd_http_connection_read_message_shared (struct rspamd_http_connection *conn,
-               gpointer ud, struct timeval *timeout)
+               gpointer ud, ev_tstamp timeout)
 {
        rspamd_http_connection_read_message_common (conn, ud, timeout,
                        RSPAMD_HTTP_FLAG_SHMEM);
@@ -1912,7 +1888,7 @@ rspamd_http_connection_write_message_common (struct rspamd_http_connection *conn
                                                                                         const gchar *host,
                                                                                         const gchar *mime_type,
                                                                                         gpointer ud,
-                                                                                        struct timeval *timeout,
+                                                                                        ev_tstamp timeout,
                                                                                         gboolean allow_shared)
 {
        struct rspamd_http_connection_private *priv = conn->priv;
@@ -1930,14 +1906,7 @@ rspamd_http_connection_write_message_common (struct rspamd_http_connection *conn
 
        conn->ud = ud;
        priv->msg = msg;
-
-       if (timeout == NULL) {
-               priv->ptv = NULL;
-       }
-       else if (timeout != &priv->tv) {
-               memcpy (&priv->tv, timeout, sizeof (struct timeval));
-               priv->ptv = &priv->tv;
-       }
+       priv->timeout = timeout;
 
        priv->header = NULL;
        priv->buf = g_malloc0 (sizeof (*priv->buf));
@@ -2224,16 +2193,12 @@ rspamd_http_connection_write_message_common (struct rspamd_http_connection *conn
                msg->flags &=~ RSPAMD_HTTP_FLAG_SSL;
        }
 
-       if (rspamd_event_pending (&priv->ev, EV_TIMEOUT|EV_WRITE|EV_READ)) {
-               event_del (&priv->ev);
-       }
+       rspamd_ev_watcher_stop (priv->ctx->event_loop, &priv->ev);
 
        if (msg->flags & RSPAMD_HTTP_FLAG_SSL) {
                gpointer ssl_ctx = (msg->flags & RSPAMD_HTTP_FLAG_SSL_NOVERIFY) ?
                                priv->ctx->ssl_ctx_noverify : priv->ctx->ssl_ctx;
 
-               event_base_set (priv->ctx->ev_base, &priv->ev);
-
                if (!ssl_ctx) {
                        err = g_error_new (HTTP_ERROR, errno, "ssl message requested "
                                        "with no ssl ctx");
@@ -2249,12 +2214,12 @@ rspamd_http_connection_write_message_common (struct rspamd_http_connection *conn
                                rspamd_ssl_connection_free (priv->ssl);
                        }
 
-                       priv->ssl = rspamd_ssl_connection_new (ssl_ctx, priv->ctx->ev_base,
+                       priv->ssl = rspamd_ssl_connection_new (ssl_ctx, priv->ctx->event_loop,
                                        !(msg->flags & RSPAMD_HTTP_FLAG_SSL_NOVERIFY));
                        g_assert (priv->ssl != NULL);
 
                        if (!rspamd_ssl_connect_fd (priv->ssl, conn->fd, host, &priv->ev,
-                                       priv->ptv, rspamd_http_event_handler,
+                                       priv->timeout, rspamd_http_event_handler,
                                        rspamd_http_ssl_err_handler, conn)) {
 
                                err = g_error_new (HTTP_ERROR, errno,
@@ -2270,10 +2235,9 @@ rspamd_http_connection_write_message_common (struct rspamd_http_connection *conn
                }
        }
        else {
-               event_set (&priv->ev, conn->fd, EV_WRITE, rspamd_http_event_handler, conn);
-               event_base_set (priv->ctx->ev_base, &priv->ev);
-
-               event_add (&priv->ev, priv->ptv);
+               rspamd_ev_watcher_init (&priv->ev, conn->fd, EV_WRITE,
+                               rspamd_http_event_handler, conn);
+               rspamd_ev_watcher_start (priv->ctx->event_loop, &priv->ev, priv->timeout);
        }
 }
 
@@ -2283,7 +2247,7 @@ rspamd_http_connection_write_message (struct rspamd_http_connection *conn,
                                                                          const gchar *host,
                                                                          const gchar *mime_type,
                                                                          gpointer ud,
-                                                                         struct timeval *timeout)
+                                                                         ev_tstamp timeout)
 {
        rspamd_http_connection_write_message_common (conn, msg, host, mime_type,
                        ud, timeout, FALSE);
@@ -2295,7 +2259,7 @@ rspamd_http_connection_write_message_shared (struct rspamd_http_connection *conn
                                                                                         const gchar *host,
                                                                                         const gchar *mime_type,
                                                                                         gpointer ud,
-                                                                                        struct timeval *timeout)
+                                                                                        ev_tstamp timeout)
 {
        rspamd_http_connection_write_message_common (conn, msg, host, mime_type,
                        ud, timeout, TRUE);
index b4b401ecbb38f59595032f6b87d44d15cc97f1cc..fc130344666f961e6f73184a3edb4357c5bf8962 100644 (file)
@@ -221,12 +221,12 @@ gboolean rspamd_http_connection_is_encrypted (struct rspamd_http_connection *con
 void rspamd_http_connection_read_message (
                struct rspamd_http_connection *conn,
                gpointer ud,
-               struct timeval *timeout);
+               ev_tstamp timeout);
 
 void rspamd_http_connection_read_message_shared (
                struct rspamd_http_connection *conn,
                gpointer ud,
-               struct timeval *timeout);
+               ev_tstamp timeout);
 
 /**
  * Send reply using initialised connection
@@ -241,7 +241,7 @@ void rspamd_http_connection_write_message (
                const gchar *host,
                const gchar *mime_type,
                gpointer ud,
-               struct timeval *timeout);
+               ev_tstamp timeout);
 
 void rspamd_http_connection_write_message_shared (
                struct rspamd_http_connection *conn,
@@ -249,7 +249,7 @@ void rspamd_http_connection_write_message_shared (
                const gchar *host,
                const gchar *mime_type,
                gpointer ud,
-               struct timeval *timeout);
+               ev_tstamp timeout);
 
 /**
  * Free connection structure
index b9add9ac92f8349c0df9356faf653e17a647b305..f5d766d8869f796770bea797c3eb6e7805f98833 100644 (file)
@@ -114,7 +114,7 @@ rspamd_http_context_new_default (struct rspamd_config *cfg,
                ctx->ssl_ctx_noverify = rspamd_init_ssl_ctx_noverify ();
        }
 
-       ctx->ev_base = ev_base;
+       ctx->event_loop = ev_base;
 
        ctx->keep_alive_hash = kh_init (rspamd_keep_alive_hash);
 
@@ -186,7 +186,7 @@ rspamd_http_context_init (struct rspamd_http_context *ctx)
                ctx->server_kp_cache = rspamd_keypair_cache_new (ctx->config.kp_cache_size_server);
        }
 
-       if (ctx->config.client_key_rotate_time > 0 && ctx->ev_base) {
+       if (ctx->config.client_key_rotate_time > 0 && ctx->event_loop) {
                struct timeval tv;
                double jittered = rspamd_time_jitter (ctx->config.client_key_rotate_time,
                                0);
@@ -194,7 +194,7 @@ rspamd_http_context_init (struct rspamd_http_context *ctx)
                double_to_tv (jittered, &tv);
                event_set (&ctx->client_rotate_ev, -1, EV_TIMEOUT,
                                rspamd_http_context_client_rotate_ev, ctx);
-               event_base_set (ctx->ev_base, &ctx->client_rotate_ev);
+               event_base_set (ctx->event_loop, &ctx->client_rotate_ev);
                event_add (&ctx->client_rotate_ev, &tv);
        }
 
index e29152c77b4138d994669db6134c762214326c1f..e09dbef408365b39b1618e59e1786e8e57dc3a79 100644 (file)
@@ -100,8 +100,8 @@ struct rspamd_http_context {
        struct upstream_list *http_proxies;
        gpointer ssl_ctx;
        gpointer ssl_ctx_noverify;
-       struct ev_loop *ev_base;
-       struct event client_rotate_ev;
+       struct ev_loop *event_loop;
+       ev_periodic client_rotate_ev;
        khash_t (rspamd_keep_alive_hash) *keep_alive_hash;
 };
 
index ec0eeb7b486a76f5e7a9fd6ec03b4749dce5cca6..8d5913f0d249b36fddffd717fb0c8579f2ea85b5 100644 (file)
@@ -92,7 +92,7 @@ rspamd_http_router_error_handler (struct rspamd_http_connection *conn,
                                NULL,
                                "text/plain",
                                entry,
-                               entry->rt->ptv);
+                               entry->rt->timeout);
                entry->is_reply = TRUE;
        }
 }
@@ -210,7 +210,7 @@ rspamd_http_router_try_file (struct rspamd_http_connection_entry *entry,
        msg_debug ("requested file %s", realbuf);
        rspamd_http_connection_write_message (entry->conn, reply_msg, NULL,
                        rspamd_http_router_detect_ct (realbuf), entry,
-                       entry->rt->ptv);
+                       entry->rt->timeout);
 
        return TRUE;
 }
@@ -235,7 +235,7 @@ rspamd_http_router_send_error (GError *err,
                        NULL,
                        "text/plain",
                        entry,
-                       entry->rt->ptv);
+                       entry->rt->timeout);
 }
 
 
@@ -369,33 +369,25 @@ rspamd_http_router_finish_handler (struct rspamd_http_connection *conn,
 struct rspamd_http_connection_router *
 rspamd_http_router_new (rspamd_http_router_error_handler_t eh,
                                                rspamd_http_router_finish_handler_t fh,
-                                               struct timeval *timeout,
+                                               ev_tstamp timeout,
                                                const char *default_fs_path,
                                                struct rspamd_http_context *ctx)
 {
-       struct rspamd_http_connection_router * new;
+       struct rspamd_http_connection_router *nrouter;
        struct stat st;
 
-       new = g_malloc0 (sizeof (struct rspamd_http_connection_router));
-       new->paths = g_hash_table_new_full (rspamd_ftok_icase_hash,
+       nrouter = g_malloc0 (sizeof (struct rspamd_http_connection_router));
+       nrouter->paths = g_hash_table_new_full (rspamd_ftok_icase_hash,
                        rspamd_ftok_icase_equal, rspamd_fstring_mapped_ftok_free, NULL);
-       new->regexps = g_ptr_array_new ();
-       new->conns = NULL;
-       new->error_handler = eh;
-       new->finish_handler = fh;
-       new->response_headers = g_hash_table_new_full (rspamd_strcase_hash,
+       nrouter->regexps = g_ptr_array_new ();
+       nrouter->conns = NULL;
+       nrouter->error_handler = eh;
+       nrouter->finish_handler = fh;
+       nrouter->response_headers = g_hash_table_new_full (rspamd_strcase_hash,
                        rspamd_strcase_equal, g_free, g_free);
-       new->ev_base = ctx->ev_base;
-
-       if (timeout) {
-               new->tv = *timeout;
-               new->ptv = &new->tv;
-       }
-       else {
-               new->ptv = NULL;
-       }
-
-       new->default_fs_path = NULL;
+       nrouter->event_loop = ctx->event_loop;
+       nrouter->timeout = timeout;
+       nrouter->default_fs_path = NULL;
 
        if (default_fs_path != NULL) {
                if (stat (default_fs_path, &st) == -1) {
@@ -406,14 +398,14 @@ rspamd_http_router_new (rspamd_http_router_error_handler_t eh,
                                msg_err ("path %s is not a directory", default_fs_path);
                        }
                        else {
-                               new->default_fs_path = realpath (default_fs_path, NULL);
+                               nrouter->default_fs_path = realpath (default_fs_path, NULL);
                        }
                }
        }
 
-       new->ctx = ctx;
+       nrouter->ctx = ctx;
 
-       return new;
+       return nrouter;
 }
 
 void
@@ -517,7 +509,7 @@ rspamd_http_router_handle_socket (struct rspamd_http_connection_router *router,
                rspamd_http_connection_set_key (conn->conn, router->key);
        }
 
-       rspamd_http_connection_read_message (conn->conn, conn, router->ptv);
+       rspamd_http_connection_read_message (conn->conn, conn, router->timeout);
        DL_PREPEND (router->conns, conn);
 }
 
index 03886707af3b5c4c003ed698494b3354d1468cf0..b946067b7249839d531ea1bb73cc23f583b1e8ca 100644 (file)
@@ -44,9 +44,8 @@ struct rspamd_http_connection_router {
        GHashTable *paths;
        GHashTable *response_headers;
        GPtrArray *regexps;
-       struct timeval tv;
-       struct timeval *ptv;
-       struct ev_loop *ev_base;
+       ev_tstamp timeout;
+       struct ev_loop *event_loop;
        struct rspamd_http_context *ctx;
        gchar *default_fs_path;
        rspamd_http_router_handler_t unknown_method_handler;
@@ -66,7 +65,7 @@ struct rspamd_http_connection_router {
 struct rspamd_http_connection_router * rspamd_http_router_new (
                rspamd_http_router_error_handler_t eh,
                rspamd_http_router_finish_handler_t fh,
-               struct timeval *timeout,
+               ev_tstamp timeout,
                const char *default_fs_path,
                struct rspamd_http_context *ctx);
 
index a0a0c509bb26c4ba7ca784bea3c496c1c2b7c633..ac0f1fc0d5008251cfcd52f5f4ee5d56015e6dbc 100644 (file)
@@ -65,16 +65,26 @@ rspamd_ev_watcher_start (struct ev_loop *loop,
                                                 ev_tstamp timeout)
 {
        ev->last_activity = ev_now (EV_A);
-       ev_timer_set (&ev->tm, timeout, 0.0);
        ev_io_start (EV_A_ &ev->io);
-       ev_timer_start (EV_A_ &ev->tm);
+
+       if (timeout > 0) {
+               ev->timeout = timeout;
+               ev_timer_set (&ev->tm, timeout, 0.0);
+               ev_timer_start (EV_A_ &ev->tm);
+       }
 }
 
 void
 rspamd_ev_watcher_stop (struct ev_loop *loop,
                                                struct rspamd_io_ev *ev)
 {
-       ev_io_stop (EV_A_ &ev->io);
+       if (ev_is_pending (&ev->io) || ev_is_active (&ev->io)) {
+               ev_io_stop (EV_A_ &ev->io);
+       }
+
+       if (ev->timeout > 0) {
+               ev_timer_stop (EV_A_ &ev->tm);
+       }
 }
 
 void
index f7e0649e5bf8deda94f3339e23102a28dfb69dc9..8b45881b698b658b15346a208648b970a951037b 100644 (file)
@@ -61,7 +61,7 @@ struct file_map_data {
 struct http_map_data;
 
 struct rspamd_http_map_cached_cbdata {
-       struct event timeout;
+       ev_periodic timeout;
        struct rspamd_storage_shmem *shm;
        struct rspamd_map *map;
        struct http_map_data *data;
index b417efb74c2d95c576afb83aa79c9b544595d063..0fe6cc625d6ffc2edf3953613bcfffebcc59886a 100644 (file)
@@ -45,9 +45,8 @@ struct rspamd_ssl_connection {
        gboolean verify_peer;
        SSL *ssl;
        gchar *hostname;
-       struct event *ev;
-       struct ev_loop *ev_base;
-       struct timeval *tv;
+       struct rspamd_io_ev *ev;
+       struct ev_loop *event_loop;
        rspamd_ssl_handler_t handler;
        rspamd_ssl_error_handler_t err_handler;
        gpointer handler_data;
@@ -407,7 +406,7 @@ rspamd_ssl_event_handler (gint fd, short what, gpointer ud)
        gint ret;
        GError *err = NULL;
 
-       if (what == EV_TIMEOUT) {
+       if (what == EV_TIMER) {
                c->shut = ssl_shut_unclean;
        }
 
@@ -417,7 +416,7 @@ rspamd_ssl_event_handler (gint fd, short what, gpointer ud)
                ret = SSL_connect (c->ssl);
 
                if (ret == 1) {
-                       event_del (c->ev);
+                       rspamd_ev_watcher_stop (c->event_loop, c->ev);
                        /* Verify certificate */
                        if ((!c->verify_peer) || rspamd_ssl_peer_verify (c)) {
                                c->state = ssl_conn_connected;
@@ -443,30 +442,18 @@ rspamd_ssl_event_handler (gint fd, short what, gpointer ud)
                                return;
                        }
 
-                       event_del (c->ev);
-                       event_set (c->ev, fd, what, rspamd_ssl_event_handler, c);
-                       event_base_set (c->ev_base, c->ev);
-                       event_add (c->ev, c->tv);
+                       rspamd_ev_watcher_reschedule (c->event_loop, c->ev, what);
+
                }
                break;
        case ssl_next_read:
-               event_del (c->ev);
-               /* Restore handler */
-               event_set (c->ev, c->fd, EV_READ|EV_PERSIST,
-                               c->handler, c->handler_data);
-               event_base_set (c->ev_base, c->ev);
-               event_add (c->ev, c->tv);
+               rspamd_ev_watcher_reschedule (c->event_loop, c->ev, EV_READ);
                c->state = ssl_conn_connected;
                c->handler (fd, EV_READ, c->handler_data);
                break;
        case ssl_next_write:
        case ssl_conn_connected:
-               event_del (c->ev);
-               /* Restore handler */
-               event_set (c->ev, c->fd, EV_WRITE,
-                               c->handler, c->handler_data);
-               event_base_set (c->ev_base, c->ev);
-               event_add (c->ev, c->tv);
+               rspamd_ev_watcher_reschedule (c->event_loop, c->ev, EV_WRITE);
                c->state = ssl_conn_connected;
                c->handler (fd, EV_WRITE, c->handler_data);
                break;
@@ -488,7 +475,7 @@ rspamd_ssl_connection_new (gpointer ssl_ctx, struct ev_loop *ev_base,
        g_assert (ssl_ctx != NULL);
        c = g_malloc0 (sizeof (*c));
        c->ssl = SSL_new (ssl_ctx);
-       c->ev_base = ev_base;
+       c->event_loop = ev_base;
        c->verify_peer = verify_peer;
 
        return c;
@@ -497,7 +484,7 @@ rspamd_ssl_connection_new (gpointer ssl_ctx, struct ev_loop *ev_base,
 
 gboolean
 rspamd_ssl_connect_fd (struct rspamd_ssl_connection *conn, gint fd,
-               const gchar *hostname, struct event *ev, struct timeval *tv,
+               const gchar *hostname, struct rspamd_io_ev *ev, ev_tstamp timeout,
                rspamd_ssl_handler_t handler, rspamd_ssl_error_handler_t err_handler,
                gpointer handler_data)
 {
@@ -534,17 +521,9 @@ rspamd_ssl_connect_fd (struct rspamd_ssl_connection *conn, gint fd,
        if (ret == 1) {
                conn->state = ssl_conn_connected;
 
-               if (rspamd_event_pending (ev, EV_TIMEOUT|EV_WRITE|EV_READ)) {
-                       event_del (ev);
-               }
-
-               event_set (ev, fd, EV_WRITE, rspamd_ssl_event_handler, conn);
-
-               if (conn->ev_base) {
-                       event_base_set (conn->ev_base, ev);
-               }
-
-               event_add (ev, tv);
+               rspamd_ev_watcher_stop (conn->event_loop, ev);
+               rspamd_ev_watcher_init (ev, fd, EV_WRITE, rspamd_ssl_event_handler, conn);
+               rspamd_ev_watcher_start (conn->event_loop, ev, timeout);
        }
        else {
                ret = SSL_get_error (conn->ssl, ret);
@@ -561,13 +540,10 @@ rspamd_ssl_connect_fd (struct rspamd_ssl_connection *conn, gint fd,
                        return FALSE;
                }
 
-               if (rspamd_event_pending (ev, EV_TIMEOUT|EV_WRITE|EV_READ)) {
-                       event_del (ev);
-               }
-
-               event_set (ev, fd, what, rspamd_ssl_event_handler, conn);
-               event_base_set (conn->ev_base, ev);
-               event_add (ev, tv);
+               rspamd_ev_watcher_stop (conn->event_loop, ev);
+               rspamd_ev_watcher_init (ev, fd, EV_WRITE|EV_READ,
+                               rspamd_ssl_event_handler, conn);
+               rspamd_ev_watcher_start (conn->event_loop, ev, timeout);
        }
 
        return TRUE;
@@ -638,13 +614,8 @@ rspamd_ssl_read (struct rspamd_ssl_connection *conn, gpointer buf,
                        return -1;
                }
 
-               event_del (conn->ev);
-               event_set (conn->ev, conn->fd, what, rspamd_ssl_event_handler, conn);
-               event_base_set (conn->ev_base, conn->ev);
-               event_add (conn->ev, conn->tv);
-
+               rspamd_ev_watcher_reschedule (conn->event_loop, conn->ev, what);
                errno = EAGAIN;
-
        }
 
        return -1;
@@ -713,11 +684,7 @@ rspamd_ssl_write (struct rspamd_ssl_connection *conn, gconstpointer buf,
                        return -1;
                }
 
-               event_del (conn->ev);
-               event_set (conn->ev, conn->fd, what, rspamd_ssl_event_handler, conn);
-               event_base_set (conn->ev_base, conn->ev);
-               event_add (conn->ev, conn->tv);
-
+               rspamd_ev_watcher_reschedule (conn->event_loop, conn->ev, what);
                errno = EAGAIN;
        }
 
index 3bae1edc0935d952a303ab5082b2be1033661a2c..f7f1652dee06b2d386136bd73d2fdb1706cc3f69 100644 (file)
@@ -18,6 +18,7 @@
 
 #include "config.h"
 #include "libutil/addr.h"
+#include "libutil/libev_helper.h"
 
 struct rspamd_ssl_connection;
 
@@ -44,7 +45,7 @@ struct rspamd_ssl_connection * rspamd_ssl_connection_new (gpointer ssl_ctx,
  * @return TRUE if a session has been connected
  */
 gboolean rspamd_ssl_connect_fd (struct rspamd_ssl_connection *conn, gint fd,
-               const gchar *hostname, struct event *ev, struct timeval *tv,
+               const gchar *hostname, struct rspamd_io_ev *ev, ev_tstamp timeout,
                rspamd_ssl_handler_t handler, rspamd_ssl_error_handler_t err_handler,
                gpointer handler_data);
 
index ed5bc9132615f795ed75e9cf3e0179c3be349fd1..2c91869d66130069f178930dd945042f524937ca 100644 (file)
@@ -2912,7 +2912,7 @@ register_fuzzy_controller_call (struct rspamd_http_connection_entry *entry,
                        rspamd_http_connection_ref (entry->conn);
 
                        event_set (&s->ev, sock, EV_WRITE, fuzzy_controller_io_callback, s);
-                       event_base_set (entry->rt->ev_base, &s->ev);
+                       event_base_set (entry->rt->event_loop, &s->ev);
                        event_add (&s->ev, NULL);
 
                        evtimer_set (&s->timev, fuzzy_controller_timer_callback,
@@ -2946,7 +2946,7 @@ fuzzy_process_handler (struct rspamd_http_connection_entry *conn_ent,
 
        /* Prepare task */
        task = rspamd_task_new (session->wrk, session->cfg, NULL,
-                       session->lang_det, conn_ent->rt->ev_base);
+                       session->lang_det, conn_ent->rt->event_loop);
        task->cfg = ctx->cfg;
        saved = rspamd_mempool_alloc0 (session->pool, sizeof (gint));
        err = rspamd_mempool_alloc0 (session->pool, sizeof (GError *));
index 375cba13f9160c87d64a6251db595738550ca41a..4deb2f9337b5ea13c3e8b10b4648426fbc2a48a8 100644 (file)
@@ -121,7 +121,7 @@ struct rspamd_worker_signal_cb {
 struct rspamd_worker_signal_handler {
        gint signo;
        gboolean enabled;
-       struct event ev;
+       ev_signal ev_sig;
        struct ev_loop *base;
        struct rspamd_worker *worker;
        struct rspamd_worker_signal_cb *cb;