summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2019-06-16 18:32:55 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2019-06-22 10:57:29 +0100
commite463ad556cb35ee39b92dbf7d3934d4187ab70d2 (patch)
treea016f5503e746b586616d862c68ad233f262c6a9
parent71e0dbf9608026c347279886097790e51e9e5506 (diff)
downloadrspamd-e463ad556cb35ee39b92dbf7d3934d4187ab70d2.tar.gz
rspamd-e463ad556cb35ee39b92dbf7d3934d4187ab70d2.zip
[Project] Rework HTTP IO
-rw-r--r--src/libutil/CMakeLists.txt1
-rw-r--r--src/libutil/aio_event.c508
-rw-r--r--src/libutil/aio_event.h59
-rw-r--r--src/libutil/http_connection.c94
-rw-r--r--src/libutil/http_connection.h8
-rw-r--r--src/libutil/http_context.c6
-rw-r--r--src/libutil/http_private.h4
-rw-r--r--src/libutil/http_router.c46
-rw-r--r--src/libutil/http_router.h7
-rw-r--r--src/libutil/libev_helper.c16
-rw-r--r--src/libutil/map_private.h2
-rw-r--r--src/libutil/ssl_util.c71
-rw-r--r--src/libutil/ssl_util.h3
-rw-r--r--src/plugins/fuzzy_check.c4
-rw-r--r--src/rspamd.h2
15 files changed, 98 insertions, 733 deletions
diff --git a/src/libutil/CMakeLists.txt b/src/libutil/CMakeLists.txt
index fd29c5512..5a94a732c 100644
--- a/src/libutil/CMakeLists.txt
+++ b/src/libutil/CMakeLists.txt
@@ -2,7 +2,6 @@
SET(LIBRSPAMDUTILSRC
${CMAKE_CURRENT_SOURCE_DIR}/addr.c
${CMAKE_CURRENT_SOURCE_DIR}/libev_helper.c
- ${CMAKE_CURRENT_SOURCE_DIR}/aio_event.c
${CMAKE_CURRENT_SOURCE_DIR}/bloom.c
${CMAKE_CURRENT_SOURCE_DIR}/expression.c
${CMAKE_CURRENT_SOURCE_DIR}/fstring.c
diff --git a/src/libutil/aio_event.c b/src/libutil/aio_event.c
deleted file mode 100644
index d0c8d3f63..000000000
--- a/src/libutil/aio_event.c
+++ /dev/null
@@ -1,508 +0,0 @@
-/*-
- * Copyright 2016 Vsevolod Stakhov
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "config.h"
-#include "contrib/libev/ev.h"
-#include "aio_event.h"
-#include "rspamd.h"
-#include "unix-std.h"
-
-#ifdef HAVE_SYS_EVENTFD_H
-#include <sys/eventfd.h>
-#endif
-
-#ifdef HAVE_AIO_H
-#include <aio.h>
-#endif
-
-/* Linux syscall numbers */
-#if defined(__i386__)
-# define SYS_io_setup 245
-# define SYS_io_destroy 246
-# define SYS_io_getevents 247
-# define SYS_io_submit 248
-# define SYS_io_cancel 249
-#elif defined(__x86_64__)
-# define SYS_io_setup 206
-# define SYS_io_destroy 207
-# define SYS_io_getevents 208
-# define SYS_io_submit 209
-# define SYS_io_cancel 210
-#else
-# warning \
- "aio is not supported on this platform, please contact author for details"
-# define SYS_io_setup 0
-# define SYS_io_destroy 0
-# define SYS_io_getevents 0
-# define SYS_io_submit 0
-# define SYS_io_cancel 0
-#endif
-
-#define SYS_eventfd 323
-#define MAX_AIO_EV 64
-
-struct io_cbdata {
- gint fd;
- rspamd_aio_cb cb;
- guint64 len;
- gpointer buf;
- gpointer io_buf;
- gpointer ud;
-};
-
-#ifdef LINUX
-
-/* Linux specific mappings and utilities to avoid using of libaio */
-
-typedef unsigned long aio_context_t;
-
-typedef enum io_iocb_cmd {
- IO_CMD_PREAD = 0,
- IO_CMD_PWRITE = 1,
-
- IO_CMD_FSYNC = 2,
- IO_CMD_FDSYNC = 3,
-
- IO_CMD_POLL = 5,
- IO_CMD_NOOP = 6,
-} io_iocb_cmd_t;
-
-#if defined(__LITTLE_ENDIAN)
-#define PADDED(x,y) x, y
-#elif defined(__BIG_ENDIAN)
-#define PADDED(x,y) y, x
-#else
-#error edit for your odd byteorder.
-#endif
-
-/*
- * we always use a 64bit off_t when communicating
- * with userland. its up to libraries to do the
- * proper padding and aio_error abstraction
- */
-
-struct iocb {
- /* these are internal to the kernel/libc. */
- guint64 aio_data; /* data to be returned in event's data */
- guint32 PADDED (aio_key, aio_reserved1);
- /* the kernel sets aio_key to the req # */
-
- /* common fields */
- guint16 aio_lio_opcode; /* see IOCB_CMD_ above */
- gint16 aio_reqprio;
- guint32 aio_fildes;
-
- guint64 aio_buf;
- guint64 aio_nbytes;
- gint64 aio_offset;
-
- /* extra parameters */
- guint64 aio_reserved2; /* TODO: use this for a (struct sigevent *) */
-
- /* flags for the "struct iocb" */
- guint32 aio_flags;
-
- /*
- * if the IOCB_FLAG_RESFD flag of "aio_flags" is set, this is an
- * eventfd to signal AIO readiness to
- */
- guint32 aio_resfd;
-};
-
-struct io_event {
- guint64 data; /* the data field from the iocb */
- guint64 obj; /* what iocb this event came from */
- gint64 res; /* result code for this event */
- gint64 res2; /* secondary result */
-};
-
-/* Linux specific io calls */
-static int
-io_setup (guint nr_reqs, aio_context_t *ctx)
-{
- return syscall (SYS_io_setup, nr_reqs, ctx);
-}
-
-static int
-io_destroy (aio_context_t ctx)
-{
- return syscall (SYS_io_destroy, ctx);
-}
-
-static int
-io_getevents (aio_context_t ctx,
- long min_nr,
- long nr,
- struct io_event *events,
- struct timespec *tmo)
-{
- return syscall (SYS_io_getevents, ctx, min_nr, nr, events, tmo);
-}
-
-static int
-io_submit (aio_context_t ctx, long n, struct iocb **paiocb)
-{
- return syscall (SYS_io_submit, ctx, n, paiocb);
-}
-
-static int
-io_cancel (aio_context_t ctx, struct iocb *iocb, struct io_event *result)
-{
- return syscall (SYS_io_cancel, ctx, iocb, result);
-}
-
-# ifndef HAVE_SYS_EVENTFD_H
-static int
-eventfd (guint initval, guint flags)
-{
- return syscall (SYS_eventfd, initval);
-}
-# endif
-
-#endif
-
-/**
- * AIO context
- */
-struct aio_context {
- struct ev_loop *base;
- gboolean has_aio; /**< Whether we have aio support on a system */
-#ifdef LINUX
- /* Eventfd variant */
- gint event_fd;
- struct event eventfd_ev;
- aio_context_t io_ctx;
-#elif defined(HAVE_AIO_H)
- /* POSIX aio */
- struct event rtsigs[128];
-#endif
-};
-
-#ifdef LINUX
-/* Eventfd read callback */
-static void
-rspamd_eventfdcb (gint fd, gshort what, gpointer ud)
-{
- struct aio_context *ctx = ud;
- guint64 ready;
- gint done, i;
- struct io_event event[32];
- struct timespec ts;
- struct io_cbdata *ev_data;
-
- /* Eventfd returns number of events ready got from kernel */
- if (read (fd, &ready, 8) != 8) {
- if (errno == EAGAIN) {
- return;
- }
- msg_err ("eventfd read returned error: %s", strerror (errno));
- }
-
- ts.tv_sec = 0;
- ts.tv_nsec = 0;
-
- while (ready) {
- /* Get events ready */
- done = io_getevents (ctx->io_ctx, 1, 32, event, &ts);
-
- if (done > 0) {
- ready -= done;
-
- for (i = 0; i < done; i++) {
- ev_data = (struct io_cbdata *) (uintptr_t) event[i].data;
- /* Call this callback */
- ev_data->cb (ev_data->fd,
- event[i].res,
- ev_data->len,
- ev_data->buf,
- ev_data->ud);
- if (ev_data->io_buf) {
- free (ev_data->io_buf);
- }
- g_free (ev_data);
- }
- }
- else if (done == 0) {
- /* No more events are ready */
- return;
- }
- else {
- msg_err ("io_getevents failed: %s", strerror (errno));
- return;
- }
- }
-}
-
-#endif
-
-/**
- * Initialize aio with specified event base
- */
-struct aio_context *
-rspamd_aio_init (struct ev_loop *base)
-{
- struct aio_context *new;
-
- /* First of all we need to detect which type of aio we can try to use */
- new = g_malloc0 (sizeof (struct aio_context));
- new->base = base;
-
-#ifdef LINUX
- /* On linux we are trying to use io (3) and eventfd for notifying */
- new->event_fd = eventfd (0, 0);
- if (new->event_fd == -1) {
- msg_err ("eventfd failed: %s", strerror (errno));
- }
- else {
- /* Set this socket non-blocking */
- if (rspamd_socket_nonblocking (new->event_fd) == -1) {
- msg_err ("non blocking for eventfd failed: %s", strerror (errno));
- close (new->event_fd);
- }
- else {
- event_set (&new->eventfd_ev,
- new->event_fd,
- EV_READ | EV_PERSIST,
- rspamd_eventfdcb,
- new);
- event_base_set (new->base, &new->eventfd_ev);
- event_add (&new->eventfd_ev, NULL);
- if (io_setup (MAX_AIO_EV, &new->io_ctx) == -1) {
- msg_err ("io_setup failed: %s", strerror (errno));
- close (new->event_fd);
- }
- else {
- new->has_aio = TRUE;
- }
- }
- }
-#elif defined(HAVE_AIO_H)
- /* TODO: implement this */
-#endif
-
- return new;
-}
-
-/**
- * Open file for aio
- */
-gint
-rspamd_aio_open (struct aio_context *ctx, const gchar *path, int flags)
-{
- gint fd = -1;
- /* Fallback */
- if (!ctx->has_aio) {
- return open (path, flags);
- }
-#ifdef LINUX
-
- fd = open (path, flags | O_DIRECT);
-
- return fd;
-#elif defined(HAVE_AIO_H)
- fd = open (path, flags);
-#endif
-
- return fd;
-}
-
-/**
- * Asynchronous read of file
- */
-gint
-rspamd_aio_read (gint fd,
- gpointer buf,
- guint64 len,
- guint64 offset,
- struct aio_context *ctx,
- rspamd_aio_cb cb,
- gpointer ud)
-{
- gint r = -1;
-
- if (ctx->has_aio) {
-#ifdef LINUX
- struct iocb *iocb[1];
- struct io_cbdata *cbdata;
-
- cbdata = g_malloc0 (sizeof (struct io_cbdata));
- cbdata->cb = cb;
- cbdata->buf = buf;
- cbdata->len = len;
- cbdata->ud = ud;
- cbdata->fd = fd;
- cbdata->io_buf = NULL;
-
- iocb[0] = alloca (sizeof (struct iocb));
- memset (iocb[0], 0, sizeof (struct iocb));
- iocb[0]->aio_fildes = fd;
- iocb[0]->aio_lio_opcode = IO_CMD_PREAD;
- iocb[0]->aio_reqprio = 0;
- iocb[0]->aio_buf = (guint64)((uintptr_t)buf);
- iocb[0]->aio_nbytes = len;
- iocb[0]->aio_offset = offset;
- iocb[0]->aio_flags |= (1 << 0) /* IOCB_FLAG_RESFD */;
- iocb[0]->aio_resfd = ctx->event_fd;
- iocb[0]->aio_data = (guint64)((uintptr_t)cbdata);
-
- /* Iocb is copied to kernel internally, so it is safe to put it on stack */
- if (io_submit (ctx->io_ctx, 1, iocb) == 1) {
- return len;
- }
- else {
- if (errno == EAGAIN || errno == ENOSYS) {
- /* Fall back to sync read */
- goto blocking;
- }
- return -1;
- }
-
-#elif defined(HAVE_AIO_H)
-#endif
- }
- else {
- /* Blocking variant */
- goto blocking;
-blocking:
-#ifdef _LARGEFILE64_SOURCE
- r = lseek64 (fd, offset, SEEK_SET);
-#else
- r = lseek (fd, offset, SEEK_SET);
-#endif
- if (r > 0) {
- r = read (fd, buf, len);
- if (r >= 0) {
- cb (fd, 0, r, buf, ud);
- }
- else {
- cb (fd, r, -1, buf, ud);
- }
- }
- }
-
- return r;
-}
-
-/**
- * Asynchronous write of file
- */
-gint
-rspamd_aio_write (gint fd,
- gpointer buf,
- guint64 len,
- guint64 offset,
- struct aio_context *ctx,
- rspamd_aio_cb cb,
- gpointer ud)
-{
- gint r = -1;
-
- if (ctx->has_aio) {
-#ifdef LINUX
- struct iocb *iocb[1];
- struct io_cbdata *cbdata;
-
- cbdata = g_malloc0 (sizeof (struct io_cbdata));
- cbdata->cb = cb;
- cbdata->buf = buf;
- cbdata->len = len;
- cbdata->ud = ud;
- cbdata->fd = fd;
- /* We need to align pointer on boundary of 512 bytes here */
- if (posix_memalign (&cbdata->io_buf, 512, len) != 0) {
- return -1;
- }
- memcpy (cbdata->io_buf, buf, len);
-
- iocb[0] = alloca (sizeof (struct iocb));
- memset (iocb[0], 0, sizeof (struct iocb));
- iocb[0]->aio_fildes = fd;
- iocb[0]->aio_lio_opcode = IO_CMD_PWRITE;
- iocb[0]->aio_reqprio = 0;
- iocb[0]->aio_buf = (guint64)((uintptr_t)cbdata->io_buf);
- iocb[0]->aio_nbytes = len;
- iocb[0]->aio_offset = offset;
- iocb[0]->aio_flags |= (1 << 0) /* IOCB_FLAG_RESFD */;
- iocb[0]->aio_resfd = ctx->event_fd;
- iocb[0]->aio_data = (guint64)((uintptr_t)cbdata);
-
- /* Iocb is copied to kernel internally, so it is safe to put it on stack */
- if (io_submit (ctx->io_ctx, 1, iocb) == 1) {
- return len;
- }
- else {
- if (errno == EAGAIN || errno == ENOSYS) {
- /* Fall back to sync read */
- goto blocking;
- }
- return -1;
- }
-
-#elif defined(HAVE_AIO_H)
-#endif
- }
- else {
- /* Blocking variant */
- goto blocking;
-blocking:
-#ifdef _LARGEFILE64_SOURCE
- r = lseek64 (fd, offset, SEEK_SET);
-#else
- r = lseek (fd, offset, SEEK_SET);
-#endif
- if (r > 0) {
- r = write (fd, buf, len);
- if (r >= 0) {
- cb (fd, 0, r, buf, ud);
- }
- else {
- cb (fd, r, -1, buf, ud);
- }
- }
- }
-
- return r;
-}
-
-/**
- * Close of aio operations
- */
-gint
-rspamd_aio_close (gint fd, struct aio_context *ctx)
-{
- gint r = -1;
-
- if (ctx->has_aio) {
-#ifdef LINUX
- struct iocb iocb;
- struct io_event ev;
-
- memset (&iocb, 0, sizeof (struct iocb));
- iocb.aio_fildes = fd;
- iocb.aio_lio_opcode = IO_CMD_NOOP;
-
- /* Iocb is copied to kernel internally, so it is safe to put it on stack */
- r = io_cancel (ctx->io_ctx, &iocb, &ev);
- close (fd);
- return r;
-
-#elif defined(HAVE_AIO_H)
-#endif
- }
-
- r = close (fd);
-
- return r;
-}
diff --git a/src/libutil/aio_event.h b/src/libutil/aio_event.h
deleted file mode 100644
index ededd96d4..000000000
--- a/src/libutil/aio_event.h
+++ /dev/null
@@ -1,59 +0,0 @@
-/*-
- * Copyright 2016 Vsevolod Stakhov
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef AIO_EVENT_H_
-#define AIO_EVENT_H_
-
-#include "config.h"
-
-/**
- * AIO context
- */
-struct aio_context;
-
-/**
- * Callback for notifying
- */
-typedef void (*rspamd_aio_cb) (gint fd, gint res, guint64 len, gpointer data,
- gpointer ud);
-
-/**
- * Initialize aio with specified event base
- */
-struct aio_context * rspamd_aio_init (struct ev_loop *base);
-
-/**
- * Open file for aio
- */
-gint rspamd_aio_open (struct aio_context *ctx, const gchar *path, int flags);
-
-/**
- * Asynchronous read of file
- */
-gint rspamd_aio_read (gint fd, gpointer buf, guint64 len, guint64 offset,
- struct aio_context *ctx, rspamd_aio_cb cb, gpointer ud);
-
-/**
- * Asynchronous write of file
- */
-gint rspamd_aio_write (gint fd, gpointer buf, guint64 len, guint64 offset,
- struct aio_context *ctx, rspamd_aio_cb cb, gpointer ud);
-
-/**
- * Close of aio operations
- */
-gint rspamd_aio_close (gint fd, struct aio_context *ctx);
-
-#endif /* AIO_EVENT_H_ */
diff --git a/src/libutil/http_connection.c b/src/libutil/http_connection.c
index aa964f417..9bf7456e7 100644
--- a/src/libutil/http_connection.c
+++ b/src/libutil/http_connection.c
@@ -25,6 +25,7 @@
#include "ottery.h"
#include "keypair_private.h"
#include "cryptobox.h"
+#include "libutil/libev_helper.h"
#include "libutil/ssl_util.h"
#include "libserver/url.h"
@@ -67,9 +68,8 @@ struct rspamd_http_connection_private {
struct rspamd_http_header *header;
struct http_parser parser;
struct http_parser_settings parser_cb;
- struct event ev;
- struct timeval tv;
- struct timeval *ptv;
+ struct rspamd_io_ev ev;
+ ev_tstamp timeout;
struct rspamd_http_message *msg;
struct iovec *out;
guint outlen;
@@ -348,9 +348,7 @@ rspamd_http_on_headers_complete (http_parser * parser)
if (msg->method == HTTP_HEAD) {
/* We don't care about the rest */
- if (rspamd_event_pending (&priv->ev, EV_READ)) {
- event_del (&priv->ev);
- }
+ rspamd_ev_watcher_stop (priv->ctx->event_loop, &priv->ev);
msg->code = parser->status_code;
rspamd_http_connection_ref (conn);
@@ -358,7 +356,7 @@ rspamd_http_on_headers_complete (http_parser * parser)
if (conn->opts & RSPAMD_HTTP_CLIENT_KEEP_ALIVE) {
rspamd_http_context_push_keepalive (conn->priv->ctx, conn,
- msg, conn->priv->ctx->ev_base);
+ msg, conn->priv->ctx->event_loop);
rspamd_http_connection_reset (conn);
}
else {
@@ -532,17 +530,14 @@ rspamd_http_on_headers_complete_decrypted (http_parser *parser)
if (msg->method == HTTP_HEAD) {
/* We don't care about the rest */
- if (rspamd_event_pending (&priv->ev, EV_READ)) {
- event_del (&priv->ev);
- }
-
+ rspamd_ev_watcher_stop (priv->ctx->event_loop, &priv->ev);
msg->code = parser->status_code;
rspamd_http_connection_ref (conn);
ret = conn->finish_handler (conn, msg);
if (conn->opts & RSPAMD_HTTP_CLIENT_KEEP_ALIVE) {
rspamd_http_context_push_keepalive (conn->priv->ctx, conn,
- msg, conn->priv->ctx->ev_base);
+ msg, conn->priv->ctx->event_loop);
rspamd_http_connection_reset (conn);
}
else {
@@ -692,16 +687,13 @@ rspamd_http_on_message_complete (http_parser * parser)
}
if (ret == 0) {
- if (rspamd_event_pending (&priv->ev, EV_READ)) {
- event_del (&priv->ev);
- }
-
+ rspamd_ev_watcher_stop (priv->ctx->event_loop, &priv->ev);
rspamd_http_connection_ref (conn);
ret = conn->finish_handler (conn, priv->msg);
if (conn->opts & RSPAMD_HTTP_CLIENT_KEEP_ALIVE) {
rspamd_http_context_push_keepalive (conn->priv->ctx, conn,
- priv->msg, conn->priv->ctx->ev_base);
+ priv->msg, conn->priv->ctx->event_loop);
rspamd_http_connection_reset (conn);
}
else {
@@ -741,11 +733,11 @@ rspamd_http_simple_client_helper (struct rspamd_http_connection *conn)
if (conn->opts & RSPAMD_HTTP_CLIENT_SHARED) {
rspamd_http_connection_read_message_shared (conn, conn->ud,
- conn->priv->ptv);
+ conn->priv->timeout);
}
else {
rspamd_http_connection_read_message (conn, conn->ud,
- conn->priv->ptv);
+ conn->priv->timeout);
}
if (priv->msg) {
@@ -835,7 +827,6 @@ rspamd_http_write_helper (struct rspamd_http_connection *conn)
else {
/* Want to write more */
priv->flags &= ~RSPAMD_HTTP_CONN_FLAG_RESETED;
- event_add (&priv->ev, priv->ptv);
}
return;
@@ -1269,10 +1260,7 @@ rspamd_http_connection_reset (struct rspamd_http_connection *conn)
if (!(priv->flags & RSPAMD_HTTP_CONN_FLAG_RESETED)) {
- if (rspamd_event_pending (&priv->ev, EV_READ|EV_WRITE|EV_TIMEOUT)) {
- event_del (&priv->ev);
- }
-
+ rspamd_ev_watcher_stop (priv->ctx->event_loop, &priv->ev);
rspamd_http_parser_reset (conn);
}
@@ -1468,7 +1456,7 @@ rspamd_http_connection_free (struct rspamd_http_connection *conn)
static void
rspamd_http_connection_read_message_common (struct rspamd_http_connection *conn,
- gpointer ud, struct timeval *timeout,
+ gpointer ud, ev_tstamp timeout,
gint flags)
{
struct rspamd_http_connection_private *priv = conn->priv;
@@ -1490,42 +1478,30 @@ rspamd_http_connection_read_message_common (struct rspamd_http_connection *conn,
priv->flags |= RSPAMD_HTTP_CONN_FLAG_ENCRYPTED;
}
- if (timeout == NULL) {
- priv->ptv = NULL;
- }
- else {
- memmove (&priv->tv, timeout, sizeof (struct timeval));
- priv->ptv = &priv->tv;
- }
-
+ priv->timeout = timeout;
priv->header = NULL;
priv->buf = g_malloc0 (sizeof (*priv->buf));
REF_INIT_RETAIN (priv->buf, rspamd_http_privbuf_dtor);
priv->buf->data = rspamd_fstring_sized_new (8192);
priv->flags |= RSPAMD_HTTP_CONN_FLAG_NEW_HEADER;
- event_set (&priv->ev,
- conn->fd,
- EV_READ | EV_PERSIST,
- rspamd_http_event_handler,
- conn);
-
- event_base_set (priv->ctx->ev_base, &priv->ev);
+ rspamd_ev_watcher_init (&priv->ev, conn->fd, EV_READ,
+ rspamd_http_event_handler, conn);
+ rspamd_ev_watcher_start (priv->ctx->event_loop, &priv->ev, priv->timeout);
priv->flags &= ~RSPAMD_HTTP_CONN_FLAG_RESETED;
- event_add (&priv->ev, priv->ptv);
}
void
rspamd_http_connection_read_message (struct rspamd_http_connection *conn,
- gpointer ud, struct timeval *timeout)
+ gpointer ud, ev_tstamp timeout)
{
rspamd_http_connection_read_message_common (conn, ud, timeout, 0);
}
void
rspamd_http_connection_read_message_shared (struct rspamd_http_connection *conn,
- gpointer ud, struct timeval *timeout)
+ gpointer ud, ev_tstamp timeout)
{
rspamd_http_connection_read_message_common (conn, ud, timeout,
RSPAMD_HTTP_FLAG_SHMEM);
@@ -1912,7 +1888,7 @@ rspamd_http_connection_write_message_common (struct rspamd_http_connection *conn
const gchar *host,
const gchar *mime_type,
gpointer ud,
- struct timeval *timeout,
+ ev_tstamp timeout,
gboolean allow_shared)
{
struct rspamd_http_connection_private *priv = conn->priv;
@@ -1930,14 +1906,7 @@ rspamd_http_connection_write_message_common (struct rspamd_http_connection *conn
conn->ud = ud;
priv->msg = msg;
-
- if (timeout == NULL) {
- priv->ptv = NULL;
- }
- else if (timeout != &priv->tv) {
- memcpy (&priv->tv, timeout, sizeof (struct timeval));
- priv->ptv = &priv->tv;
- }
+ priv->timeout = timeout;
priv->header = NULL;
priv->buf = g_malloc0 (sizeof (*priv->buf));
@@ -2224,16 +2193,12 @@ rspamd_http_connection_write_message_common (struct rspamd_http_connection *conn
msg->flags &=~ RSPAMD_HTTP_FLAG_SSL;
}
- if (rspamd_event_pending (&priv->ev, EV_TIMEOUT|EV_WRITE|EV_READ)) {
- event_del (&priv->ev);
- }
+ rspamd_ev_watcher_stop (priv->ctx->event_loop, &priv->ev);
if (msg->flags & RSPAMD_HTTP_FLAG_SSL) {
gpointer ssl_ctx = (msg->flags & RSPAMD_HTTP_FLAG_SSL_NOVERIFY) ?
priv->ctx->ssl_ctx_noverify : priv->ctx->ssl_ctx;
- event_base_set (priv->ctx->ev_base, &priv->ev);
-
if (!ssl_ctx) {
err = g_error_new (HTTP_ERROR, errno, "ssl message requested "
"with no ssl ctx");
@@ -2249,12 +2214,12 @@ rspamd_http_connection_write_message_common (struct rspamd_http_connection *conn
rspamd_ssl_connection_free (priv->ssl);
}
- priv->ssl = rspamd_ssl_connection_new (ssl_ctx, priv->ctx->ev_base,
+ priv->ssl = rspamd_ssl_connection_new (ssl_ctx, priv->ctx->event_loop,
!(msg->flags & RSPAMD_HTTP_FLAG_SSL_NOVERIFY));
g_assert (priv->ssl != NULL);
if (!rspamd_ssl_connect_fd (priv->ssl, conn->fd, host, &priv->ev,
- priv->ptv, rspamd_http_event_handler,
+ priv->timeout, rspamd_http_event_handler,
rspamd_http_ssl_err_handler, conn)) {
err = g_error_new (HTTP_ERROR, errno,
@@ -2270,10 +2235,9 @@ rspamd_http_connection_write_message_common (struct rspamd_http_connection *conn
}
}
else {
- event_set (&priv->ev, conn->fd, EV_WRITE, rspamd_http_event_handler, conn);
- event_base_set (priv->ctx->ev_base, &priv->ev);
-
- event_add (&priv->ev, priv->ptv);
+ rspamd_ev_watcher_init (&priv->ev, conn->fd, EV_WRITE,
+ rspamd_http_event_handler, conn);
+ rspamd_ev_watcher_start (priv->ctx->event_loop, &priv->ev, priv->timeout);
}
}
@@ -2283,7 +2247,7 @@ rspamd_http_connection_write_message (struct rspamd_http_connection *conn,
const gchar *host,
const gchar *mime_type,
gpointer ud,
- struct timeval *timeout)
+ ev_tstamp timeout)
{
rspamd_http_connection_write_message_common (conn, msg, host, mime_type,
ud, timeout, FALSE);
@@ -2295,7 +2259,7 @@ rspamd_http_connection_write_message_shared (struct rspamd_http_connection *conn
const gchar *host,
const gchar *mime_type,
gpointer ud,
- struct timeval *timeout)
+ ev_tstamp timeout)
{
rspamd_http_connection_write_message_common (conn, msg, host, mime_type,
ud, timeout, TRUE);
diff --git a/src/libutil/http_connection.h b/src/libutil/http_connection.h
index b4b401ecb..fc1303446 100644
--- a/src/libutil/http_connection.h
+++ b/src/libutil/http_connection.h
@@ -221,12 +221,12 @@ gboolean rspamd_http_connection_is_encrypted (struct rspamd_http_connection *con
void rspamd_http_connection_read_message (
struct rspamd_http_connection *conn,
gpointer ud,
- struct timeval *timeout);
+ ev_tstamp timeout);
void rspamd_http_connection_read_message_shared (
struct rspamd_http_connection *conn,
gpointer ud,
- struct timeval *timeout);
+ ev_tstamp timeout);
/**
* Send reply using initialised connection
@@ -241,7 +241,7 @@ void rspamd_http_connection_write_message (
const gchar *host,
const gchar *mime_type,
gpointer ud,
- struct timeval *timeout);
+ ev_tstamp timeout);
void rspamd_http_connection_write_message_shared (
struct rspamd_http_connection *conn,
@@ -249,7 +249,7 @@ void rspamd_http_connection_write_message_shared (
const gchar *host,
const gchar *mime_type,
gpointer ud,
- struct timeval *timeout);
+ ev_tstamp timeout);
/**
* Free connection structure
diff --git a/src/libutil/http_context.c b/src/libutil/http_context.c
index b9add9ac9..f5d766d88 100644
--- a/src/libutil/http_context.c
+++ b/src/libutil/http_context.c
@@ -114,7 +114,7 @@ rspamd_http_context_new_default (struct rspamd_config *cfg,
ctx->ssl_ctx_noverify = rspamd_init_ssl_ctx_noverify ();
}
- ctx->ev_base = ev_base;
+ ctx->event_loop = ev_base;
ctx->keep_alive_hash = kh_init (rspamd_keep_alive_hash);
@@ -186,7 +186,7 @@ rspamd_http_context_init (struct rspamd_http_context *ctx)
ctx->server_kp_cache = rspamd_keypair_cache_new (ctx->config.kp_cache_size_server);
}
- if (ctx->config.client_key_rotate_time > 0 && ctx->ev_base) {
+ if (ctx->config.client_key_rotate_time > 0 && ctx->event_loop) {
struct timeval tv;
double jittered = rspamd_time_jitter (ctx->config.client_key_rotate_time,
0);
@@ -194,7 +194,7 @@ rspamd_http_context_init (struct rspamd_http_context *ctx)
double_to_tv (jittered, &tv);
event_set (&ctx->client_rotate_ev, -1, EV_TIMEOUT,
rspamd_http_context_client_rotate_ev, ctx);
- event_base_set (ctx->ev_base, &ctx->client_rotate_ev);
+ event_base_set (ctx->event_loop, &ctx->client_rotate_ev);
event_add (&ctx->client_rotate_ev, &tv);
}
diff --git a/src/libutil/http_private.h b/src/libutil/http_private.h
index e29152c77..e09dbef40 100644
--- a/src/libutil/http_private.h
+++ b/src/libutil/http_private.h
@@ -100,8 +100,8 @@ struct rspamd_http_context {
struct upstream_list *http_proxies;
gpointer ssl_ctx;
gpointer ssl_ctx_noverify;
- struct ev_loop *ev_base;
- struct event client_rotate_ev;
+ struct ev_loop *event_loop;
+ ev_periodic client_rotate_ev;
khash_t (rspamd_keep_alive_hash) *keep_alive_hash;
};
diff --git a/src/libutil/http_router.c b/src/libutil/http_router.c
index ec0eeb7b4..8d5913f0d 100644
--- a/src/libutil/http_router.c
+++ b/src/libutil/http_router.c
@@ -92,7 +92,7 @@ rspamd_http_router_error_handler (struct rspamd_http_connection *conn,
NULL,
"text/plain",
entry,
- entry->rt->ptv);
+ entry->rt->timeout);
entry->is_reply = TRUE;
}
}
@@ -210,7 +210,7 @@ rspamd_http_router_try_file (struct rspamd_http_connection_entry *entry,
msg_debug ("requested file %s", realbuf);
rspamd_http_connection_write_message (entry->conn, reply_msg, NULL,
rspamd_http_router_detect_ct (realbuf), entry,
- entry->rt->ptv);
+ entry->rt->timeout);
return TRUE;
}
@@ -235,7 +235,7 @@ rspamd_http_router_send_error (GError *err,
NULL,
"text/plain",
entry,
- entry->rt->ptv);
+ entry->rt->timeout);
}
@@ -369,33 +369,25 @@ rspamd_http_router_finish_handler (struct rspamd_http_connection *conn,
struct rspamd_http_connection_router *
rspamd_http_router_new (rspamd_http_router_error_handler_t eh,
rspamd_http_router_finish_handler_t fh,
- struct timeval *timeout,
+ ev_tstamp timeout,
const char *default_fs_path,
struct rspamd_http_context *ctx)
{
- struct rspamd_http_connection_router * new;
+ struct rspamd_http_connection_router *nrouter;
struct stat st;
- new = g_malloc0 (sizeof (struct rspamd_http_connection_router));
- new->paths = g_hash_table_new_full (rspamd_ftok_icase_hash,
+ nrouter = g_malloc0 (sizeof (struct rspamd_http_connection_router));
+ nrouter->paths = g_hash_table_new_full (rspamd_ftok_icase_hash,
rspamd_ftok_icase_equal, rspamd_fstring_mapped_ftok_free, NULL);
- new->regexps = g_ptr_array_new ();
- new->conns = NULL;
- new->error_handler = eh;
- new->finish_handler = fh;
- new->response_headers = g_hash_table_new_full (rspamd_strcase_hash,
+ nrouter->regexps = g_ptr_array_new ();
+ nrouter->conns = NULL;
+ nrouter->error_handler = eh;
+ nrouter->finish_handler = fh;
+ nrouter->response_headers = g_hash_table_new_full (rspamd_strcase_hash,
rspamd_strcase_equal, g_free, g_free);
- new->ev_base = ctx->ev_base;
-
- if (timeout) {
- new->tv = *timeout;
- new->ptv = &new->tv;
- }
- else {
- new->ptv = NULL;
- }
-
- new->default_fs_path = NULL;
+ nrouter->event_loop = ctx->event_loop;
+ nrouter->timeout = timeout;
+ nrouter->default_fs_path = NULL;
if (default_fs_path != NULL) {
if (stat (default_fs_path, &st) == -1) {
@@ -406,14 +398,14 @@ rspamd_http_router_new (rspamd_http_router_error_handler_t eh,
msg_err ("path %s is not a directory", default_fs_path);
}
else {
- new->default_fs_path = realpath (default_fs_path, NULL);
+ nrouter->default_fs_path = realpath (default_fs_path, NULL);
}
}
}
- new->ctx = ctx;
+ nrouter->ctx = ctx;
- return new;
+ return nrouter;
}
void
@@ -517,7 +509,7 @@ rspamd_http_router_handle_socket (struct rspamd_http_connection_router *router,
rspamd_http_connection_set_key (conn->conn, router->key);
}
- rspamd_http_connection_read_message (conn->conn, conn, router->ptv);
+ rspamd_http_connection_read_message (conn->conn, conn, router->timeout);
DL_PREPEND (router->conns, conn);
}
diff --git a/src/libutil/http_router.h b/src/libutil/http_router.h
index 03886707a..b946067b7 100644
--- a/src/libutil/http_router.h
+++ b/src/libutil/http_router.h
@@ -44,9 +44,8 @@ struct rspamd_http_connection_router {
GHashTable *paths;
GHashTable *response_headers;
GPtrArray *regexps;
- struct timeval tv;
- struct timeval *ptv;
- struct ev_loop *ev_base;
+ ev_tstamp timeout;
+ struct ev_loop *event_loop;
struct rspamd_http_context *ctx;
gchar *default_fs_path;
rspamd_http_router_handler_t unknown_method_handler;
@@ -66,7 +65,7 @@ struct rspamd_http_connection_router {
struct rspamd_http_connection_router * rspamd_http_router_new (
rspamd_http_router_error_handler_t eh,
rspamd_http_router_finish_handler_t fh,
- struct timeval *timeout,
+ ev_tstamp timeout,
const char *default_fs_path,
struct rspamd_http_context *ctx);
diff --git a/src/libutil/libev_helper.c b/src/libutil/libev_helper.c
index a0a0c509b..ac0f1fc0d 100644
--- a/src/libutil/libev_helper.c
+++ b/src/libutil/libev_helper.c
@@ -65,16 +65,26 @@ rspamd_ev_watcher_start (struct ev_loop *loop,
ev_tstamp timeout)
{
ev->last_activity = ev_now (EV_A);
- ev_timer_set (&ev->tm, timeout, 0.0);
ev_io_start (EV_A_ &ev->io);
- ev_timer_start (EV_A_ &ev->tm);
+
+ if (timeout > 0) {
+ ev->timeout = timeout;
+ ev_timer_set (&ev->tm, timeout, 0.0);
+ ev_timer_start (EV_A_ &ev->tm);
+ }
}
void
rspamd_ev_watcher_stop (struct ev_loop *loop,
struct rspamd_io_ev *ev)
{
- ev_io_stop (EV_A_ &ev->io);
+ if (ev_is_pending (&ev->io) || ev_is_active (&ev->io)) {
+ ev_io_stop (EV_A_ &ev->io);
+ }
+
+ if (ev->timeout > 0) {
+ ev_timer_stop (EV_A_ &ev->tm);
+ }
}
void
diff --git a/src/libutil/map_private.h b/src/libutil/map_private.h
index f7e0649e5..8b45881b6 100644
--- a/src/libutil/map_private.h
+++ b/src/libutil/map_private.h
@@ -61,7 +61,7 @@ struct file_map_data {
struct http_map_data;
struct rspamd_http_map_cached_cbdata {
- struct event timeout;
+ ev_periodic timeout;
struct rspamd_storage_shmem *shm;
struct rspamd_map *map;
struct http_map_data *data;
diff --git a/src/libutil/ssl_util.c b/src/libutil/ssl_util.c
index b417efb74..0fe6cc625 100644
--- a/src/libutil/ssl_util.c
+++ b/src/libutil/ssl_util.c
@@ -45,9 +45,8 @@ struct rspamd_ssl_connection {
gboolean verify_peer;
SSL *ssl;
gchar *hostname;
- struct event *ev;
- struct ev_loop *ev_base;
- struct timeval *tv;
+ struct rspamd_io_ev *ev;
+ struct ev_loop *event_loop;
rspamd_ssl_handler_t handler;
rspamd_ssl_error_handler_t err_handler;
gpointer handler_data;
@@ -407,7 +406,7 @@ rspamd_ssl_event_handler (gint fd, short what, gpointer ud)
gint ret;
GError *err = NULL;
- if (what == EV_TIMEOUT) {
+ if (what == EV_TIMER) {
c->shut = ssl_shut_unclean;
}
@@ -417,7 +416,7 @@ rspamd_ssl_event_handler (gint fd, short what, gpointer ud)
ret = SSL_connect (c->ssl);
if (ret == 1) {
- event_del (c->ev);
+ rspamd_ev_watcher_stop (c->event_loop, c->ev);
/* Verify certificate */
if ((!c->verify_peer) || rspamd_ssl_peer_verify (c)) {
c->state = ssl_conn_connected;
@@ -443,30 +442,18 @@ rspamd_ssl_event_handler (gint fd, short what, gpointer ud)
return;
}
- event_del (c->ev);
- event_set (c->ev, fd, what, rspamd_ssl_event_handler, c);
- event_base_set (c->ev_base, c->ev);
- event_add (c->ev, c->tv);
+ rspamd_ev_watcher_reschedule (c->event_loop, c->ev, what);
+
}
break;
case ssl_next_read:
- event_del (c->ev);
- /* Restore handler */
- event_set (c->ev, c->fd, EV_READ|EV_PERSIST,
- c->handler, c->handler_data);
- event_base_set (c->ev_base, c->ev);
- event_add (c->ev, c->tv);
+ rspamd_ev_watcher_reschedule (c->event_loop, c->ev, EV_READ);
c->state = ssl_conn_connected;
c->handler (fd, EV_READ, c->handler_data);
break;
case ssl_next_write:
case ssl_conn_connected:
- event_del (c->ev);
- /* Restore handler */
- event_set (c->ev, c->fd, EV_WRITE,
- c->handler, c->handler_data);
- event_base_set (c->ev_base, c->ev);
- event_add (c->ev, c->tv);
+ rspamd_ev_watcher_reschedule (c->event_loop, c->ev, EV_WRITE);
c->state = ssl_conn_connected;
c->handler (fd, EV_WRITE, c->handler_data);
break;
@@ -488,7 +475,7 @@ rspamd_ssl_connection_new (gpointer ssl_ctx, struct ev_loop *ev_base,
g_assert (ssl_ctx != NULL);
c = g_malloc0 (sizeof (*c));
c->ssl = SSL_new (ssl_ctx);
- c->ev_base = ev_base;
+ c->event_loop = ev_base;
c->verify_peer = verify_peer;
return c;
@@ -497,7 +484,7 @@ rspamd_ssl_connection_new (gpointer ssl_ctx, struct ev_loop *ev_base,
gboolean
rspamd_ssl_connect_fd (struct rspamd_ssl_connection *conn, gint fd,
- const gchar *hostname, struct event *ev, struct timeval *tv,
+ const gchar *hostname, struct rspamd_io_ev *ev, ev_tstamp timeout,
rspamd_ssl_handler_t handler, rspamd_ssl_error_handler_t err_handler,
gpointer handler_data)
{
@@ -534,17 +521,9 @@ rspamd_ssl_connect_fd (struct rspamd_ssl_connection *conn, gint fd,
if (ret == 1) {
conn->state = ssl_conn_connected;
- if (rspamd_event_pending (ev, EV_TIMEOUT|EV_WRITE|EV_READ)) {
- event_del (ev);
- }
-
- event_set (ev, fd, EV_WRITE, rspamd_ssl_event_handler, conn);
-
- if (conn->ev_base) {
- event_base_set (conn->ev_base, ev);
- }
-
- event_add (ev, tv);
+ rspamd_ev_watcher_stop (conn->event_loop, ev);
+ rspamd_ev_watcher_init (ev, fd, EV_WRITE, rspamd_ssl_event_handler, conn);
+ rspamd_ev_watcher_start (conn->event_loop, ev, timeout);
}
else {
ret = SSL_get_error (conn->ssl, ret);
@@ -561,13 +540,10 @@ rspamd_ssl_connect_fd (struct rspamd_ssl_connection *conn, gint fd,
return FALSE;
}
- if (rspamd_event_pending (ev, EV_TIMEOUT|EV_WRITE|EV_READ)) {
- event_del (ev);
- }
-
- event_set (ev, fd, what, rspamd_ssl_event_handler, conn);
- event_base_set (conn->ev_base, ev);
- event_add (ev, tv);
+ rspamd_ev_watcher_stop (conn->event_loop, ev);
+ rspamd_ev_watcher_init (ev, fd, EV_WRITE|EV_READ,
+ rspamd_ssl_event_handler, conn);
+ rspamd_ev_watcher_start (conn->event_loop, ev, timeout);
}
return TRUE;
@@ -638,13 +614,8 @@ rspamd_ssl_read (struct rspamd_ssl_connection *conn, gpointer buf,
return -1;
}
- event_del (conn->ev);
- event_set (conn->ev, conn->fd, what, rspamd_ssl_event_handler, conn);
- event_base_set (conn->ev_base, conn->ev);
- event_add (conn->ev, conn->tv);
-
+ rspamd_ev_watcher_reschedule (conn->event_loop, conn->ev, what);
errno = EAGAIN;
-
}
return -1;
@@ -713,11 +684,7 @@ rspamd_ssl_write (struct rspamd_ssl_connection *conn, gconstpointer buf,
return -1;
}
- event_del (conn->ev);
- event_set (conn->ev, conn->fd, what, rspamd_ssl_event_handler, conn);
- event_base_set (conn->ev_base, conn->ev);
- event_add (conn->ev, conn->tv);
-
+ rspamd_ev_watcher_reschedule (conn->event_loop, conn->ev, what);
errno = EAGAIN;
}
diff --git a/src/libutil/ssl_util.h b/src/libutil/ssl_util.h
index 3bae1edc0..f7f1652de 100644
--- a/src/libutil/ssl_util.h
+++ b/src/libutil/ssl_util.h
@@ -18,6 +18,7 @@
#include "config.h"
#include "libutil/addr.h"
+#include "libutil/libev_helper.h"
struct rspamd_ssl_connection;
@@ -44,7 +45,7 @@ struct rspamd_ssl_connection * rspamd_ssl_connection_new (gpointer ssl_ctx,
* @return TRUE if a session has been connected
*/
gboolean rspamd_ssl_connect_fd (struct rspamd_ssl_connection *conn, gint fd,
- const gchar *hostname, struct event *ev, struct timeval *tv,
+ const gchar *hostname, struct rspamd_io_ev *ev, ev_tstamp timeout,
rspamd_ssl_handler_t handler, rspamd_ssl_error_handler_t err_handler,
gpointer handler_data);
diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c
index ed5bc9132..2c91869d6 100644
--- a/src/plugins/fuzzy_check.c
+++ b/src/plugins/fuzzy_check.c
@@ -2912,7 +2912,7 @@ register_fuzzy_controller_call (struct rspamd_http_connection_entry *entry,
rspamd_http_connection_ref (entry->conn);
event_set (&s->ev, sock, EV_WRITE, fuzzy_controller_io_callback, s);
- event_base_set (entry->rt->ev_base, &s->ev);
+ event_base_set (entry->rt->event_loop, &s->ev);
event_add (&s->ev, NULL);
evtimer_set (&s->timev, fuzzy_controller_timer_callback,
@@ -2946,7 +2946,7 @@ fuzzy_process_handler (struct rspamd_http_connection_entry *conn_ent,
/* Prepare task */
task = rspamd_task_new (session->wrk, session->cfg, NULL,
- session->lang_det, conn_ent->rt->ev_base);
+ session->lang_det, conn_ent->rt->event_loop);
task->cfg = ctx->cfg;
saved = rspamd_mempool_alloc0 (session->pool, sizeof (gint));
err = rspamd_mempool_alloc0 (session->pool, sizeof (GError *));
diff --git a/src/rspamd.h b/src/rspamd.h
index 375cba13f..4deb2f933 100644
--- a/src/rspamd.h
+++ b/src/rspamd.h
@@ -121,7 +121,7 @@ struct rspamd_worker_signal_cb {
struct rspamd_worker_signal_handler {
gint signo;
gboolean enabled;
- struct event ev;
+ ev_signal ev_sig;
struct ev_loop *base;
struct rspamd_worker *worker;
struct rspamd_worker_signal_cb *cb;