SET(LIBRSPAMDUTILSRC
${CMAKE_CURRENT_SOURCE_DIR}/addr.c
${CMAKE_CURRENT_SOURCE_DIR}/libev_helper.c
- ${CMAKE_CURRENT_SOURCE_DIR}/aio_event.c
${CMAKE_CURRENT_SOURCE_DIR}/bloom.c
${CMAKE_CURRENT_SOURCE_DIR}/expression.c
${CMAKE_CURRENT_SOURCE_DIR}/fstring.c
+++ /dev/null
-/*-
- * Copyright 2016 Vsevolod Stakhov
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "config.h"
-#include "contrib/libev/ev.h"
-#include "aio_event.h"
-#include "rspamd.h"
-#include "unix-std.h"
-
-#ifdef HAVE_SYS_EVENTFD_H
-#include <sys/eventfd.h>
-#endif
-
-#ifdef HAVE_AIO_H
-#include <aio.h>
-#endif
-
-/* Linux syscall numbers */
-#if defined(__i386__)
-# define SYS_io_setup 245
-# define SYS_io_destroy 246
-# define SYS_io_getevents 247
-# define SYS_io_submit 248
-# define SYS_io_cancel 249
-#elif defined(__x86_64__)
-# define SYS_io_setup 206
-# define SYS_io_destroy 207
-# define SYS_io_getevents 208
-# define SYS_io_submit 209
-# define SYS_io_cancel 210
-#else
-# warning \
- "aio is not supported on this platform, please contact author for details"
-# define SYS_io_setup 0
-# define SYS_io_destroy 0
-# define SYS_io_getevents 0
-# define SYS_io_submit 0
-# define SYS_io_cancel 0
-#endif
-
-#define SYS_eventfd 323
-#define MAX_AIO_EV 64
-
-struct io_cbdata {
- gint fd;
- rspamd_aio_cb cb;
- guint64 len;
- gpointer buf;
- gpointer io_buf;
- gpointer ud;
-};
-
-#ifdef LINUX
-
-/* Linux specific mappings and utilities to avoid using of libaio */
-
-typedef unsigned long aio_context_t;
-
-typedef enum io_iocb_cmd {
- IO_CMD_PREAD = 0,
- IO_CMD_PWRITE = 1,
-
- IO_CMD_FSYNC = 2,
- IO_CMD_FDSYNC = 3,
-
- IO_CMD_POLL = 5,
- IO_CMD_NOOP = 6,
-} io_iocb_cmd_t;
-
-#if defined(__LITTLE_ENDIAN)
-#define PADDED(x,y) x, y
-#elif defined(__BIG_ENDIAN)
-#define PADDED(x,y) y, x
-#else
-#error edit for your odd byteorder.
-#endif
-
-/*
- * we always use a 64bit off_t when communicating
- * with userland. its up to libraries to do the
- * proper padding and aio_error abstraction
- */
-
-struct iocb {
- /* these are internal to the kernel/libc. */
- guint64 aio_data; /* data to be returned in event's data */
- guint32 PADDED (aio_key, aio_reserved1);
- /* the kernel sets aio_key to the req # */
-
- /* common fields */
- guint16 aio_lio_opcode; /* see IOCB_CMD_ above */
- gint16 aio_reqprio;
- guint32 aio_fildes;
-
- guint64 aio_buf;
- guint64 aio_nbytes;
- gint64 aio_offset;
-
- /* extra parameters */
- guint64 aio_reserved2; /* TODO: use this for a (struct sigevent *) */
-
- /* flags for the "struct iocb" */
- guint32 aio_flags;
-
- /*
- * if the IOCB_FLAG_RESFD flag of "aio_flags" is set, this is an
- * eventfd to signal AIO readiness to
- */
- guint32 aio_resfd;
-};
-
-struct io_event {
- guint64 data; /* the data field from the iocb */
- guint64 obj; /* what iocb this event came from */
- gint64 res; /* result code for this event */
- gint64 res2; /* secondary result */
-};
-
-/* Linux specific io calls */
-static int
-io_setup (guint nr_reqs, aio_context_t *ctx)
-{
- return syscall (SYS_io_setup, nr_reqs, ctx);
-}
-
-static int
-io_destroy (aio_context_t ctx)
-{
- return syscall (SYS_io_destroy, ctx);
-}
-
-static int
-io_getevents (aio_context_t ctx,
- long min_nr,
- long nr,
- struct io_event *events,
- struct timespec *tmo)
-{
- return syscall (SYS_io_getevents, ctx, min_nr, nr, events, tmo);
-}
-
-static int
-io_submit (aio_context_t ctx, long n, struct iocb **paiocb)
-{
- return syscall (SYS_io_submit, ctx, n, paiocb);
-}
-
-static int
-io_cancel (aio_context_t ctx, struct iocb *iocb, struct io_event *result)
-{
- return syscall (SYS_io_cancel, ctx, iocb, result);
-}
-
-# ifndef HAVE_SYS_EVENTFD_H
-static int
-eventfd (guint initval, guint flags)
-{
- return syscall (SYS_eventfd, initval);
-}
-# endif
-
-#endif
-
-/**
- * AIO context
- */
-struct aio_context {
- struct ev_loop *base;
- gboolean has_aio; /**< Whether we have aio support on a system */
-#ifdef LINUX
- /* Eventfd variant */
- gint event_fd;
- struct event eventfd_ev;
- aio_context_t io_ctx;
-#elif defined(HAVE_AIO_H)
- /* POSIX aio */
- struct event rtsigs[128];
-#endif
-};
-
-#ifdef LINUX
-/* Eventfd read callback */
-static void
-rspamd_eventfdcb (gint fd, gshort what, gpointer ud)
-{
- struct aio_context *ctx = ud;
- guint64 ready;
- gint done, i;
- struct io_event event[32];
- struct timespec ts;
- struct io_cbdata *ev_data;
-
- /* Eventfd returns number of events ready got from kernel */
- if (read (fd, &ready, 8) != 8) {
- if (errno == EAGAIN) {
- return;
- }
- msg_err ("eventfd read returned error: %s", strerror (errno));
- }
-
- ts.tv_sec = 0;
- ts.tv_nsec = 0;
-
- while (ready) {
- /* Get events ready */
- done = io_getevents (ctx->io_ctx, 1, 32, event, &ts);
-
- if (done > 0) {
- ready -= done;
-
- for (i = 0; i < done; i++) {
- ev_data = (struct io_cbdata *) (uintptr_t) event[i].data;
- /* Call this callback */
- ev_data->cb (ev_data->fd,
- event[i].res,
- ev_data->len,
- ev_data->buf,
- ev_data->ud);
- if (ev_data->io_buf) {
- free (ev_data->io_buf);
- }
- g_free (ev_data);
- }
- }
- else if (done == 0) {
- /* No more events are ready */
- return;
- }
- else {
- msg_err ("io_getevents failed: %s", strerror (errno));
- return;
- }
- }
-}
-
-#endif
-
-/**
- * Initialize aio with specified event base
- */
-struct aio_context *
-rspamd_aio_init (struct ev_loop *base)
-{
- struct aio_context *new;
-
- /* First of all we need to detect which type of aio we can try to use */
- new = g_malloc0 (sizeof (struct aio_context));
- new->base = base;
-
-#ifdef LINUX
- /* On linux we are trying to use io (3) and eventfd for notifying */
- new->event_fd = eventfd (0, 0);
- if (new->event_fd == -1) {
- msg_err ("eventfd failed: %s", strerror (errno));
- }
- else {
- /* Set this socket non-blocking */
- if (rspamd_socket_nonblocking (new->event_fd) == -1) {
- msg_err ("non blocking for eventfd failed: %s", strerror (errno));
- close (new->event_fd);
- }
- else {
- event_set (&new->eventfd_ev,
- new->event_fd,
- EV_READ | EV_PERSIST,
- rspamd_eventfdcb,
- new);
- event_base_set (new->base, &new->eventfd_ev);
- event_add (&new->eventfd_ev, NULL);
- if (io_setup (MAX_AIO_EV, &new->io_ctx) == -1) {
- msg_err ("io_setup failed: %s", strerror (errno));
- close (new->event_fd);
- }
- else {
- new->has_aio = TRUE;
- }
- }
- }
-#elif defined(HAVE_AIO_H)
- /* TODO: implement this */
-#endif
-
- return new;
-}
-
-/**
- * Open file for aio
- */
-gint
-rspamd_aio_open (struct aio_context *ctx, const gchar *path, int flags)
-{
- gint fd = -1;
- /* Fallback */
- if (!ctx->has_aio) {
- return open (path, flags);
- }
-#ifdef LINUX
-
- fd = open (path, flags | O_DIRECT);
-
- return fd;
-#elif defined(HAVE_AIO_H)
- fd = open (path, flags);
-#endif
-
- return fd;
-}
-
-/**
- * Asynchronous read of file
- */
-gint
-rspamd_aio_read (gint fd,
- gpointer buf,
- guint64 len,
- guint64 offset,
- struct aio_context *ctx,
- rspamd_aio_cb cb,
- gpointer ud)
-{
- gint r = -1;
-
- if (ctx->has_aio) {
-#ifdef LINUX
- struct iocb *iocb[1];
- struct io_cbdata *cbdata;
-
- cbdata = g_malloc0 (sizeof (struct io_cbdata));
- cbdata->cb = cb;
- cbdata->buf = buf;
- cbdata->len = len;
- cbdata->ud = ud;
- cbdata->fd = fd;
- cbdata->io_buf = NULL;
-
- iocb[0] = alloca (sizeof (struct iocb));
- memset (iocb[0], 0, sizeof (struct iocb));
- iocb[0]->aio_fildes = fd;
- iocb[0]->aio_lio_opcode = IO_CMD_PREAD;
- iocb[0]->aio_reqprio = 0;
- iocb[0]->aio_buf = (guint64)((uintptr_t)buf);
- iocb[0]->aio_nbytes = len;
- iocb[0]->aio_offset = offset;
- iocb[0]->aio_flags |= (1 << 0) /* IOCB_FLAG_RESFD */;
- iocb[0]->aio_resfd = ctx->event_fd;
- iocb[0]->aio_data = (guint64)((uintptr_t)cbdata);
-
- /* Iocb is copied to kernel internally, so it is safe to put it on stack */
- if (io_submit (ctx->io_ctx, 1, iocb) == 1) {
- return len;
- }
- else {
- if (errno == EAGAIN || errno == ENOSYS) {
- /* Fall back to sync read */
- goto blocking;
- }
- return -1;
- }
-
-#elif defined(HAVE_AIO_H)
-#endif
- }
- else {
- /* Blocking variant */
- goto blocking;
-blocking:
-#ifdef _LARGEFILE64_SOURCE
- r = lseek64 (fd, offset, SEEK_SET);
-#else
- r = lseek (fd, offset, SEEK_SET);
-#endif
- if (r > 0) {
- r = read (fd, buf, len);
- if (r >= 0) {
- cb (fd, 0, r, buf, ud);
- }
- else {
- cb (fd, r, -1, buf, ud);
- }
- }
- }
-
- return r;
-}
-
-/**
- * Asynchronous write of file
- */
-gint
-rspamd_aio_write (gint fd,
- gpointer buf,
- guint64 len,
- guint64 offset,
- struct aio_context *ctx,
- rspamd_aio_cb cb,
- gpointer ud)
-{
- gint r = -1;
-
- if (ctx->has_aio) {
-#ifdef LINUX
- struct iocb *iocb[1];
- struct io_cbdata *cbdata;
-
- cbdata = g_malloc0 (sizeof (struct io_cbdata));
- cbdata->cb = cb;
- cbdata->buf = buf;
- cbdata->len = len;
- cbdata->ud = ud;
- cbdata->fd = fd;
- /* We need to align pointer on boundary of 512 bytes here */
- if (posix_memalign (&cbdata->io_buf, 512, len) != 0) {
- return -1;
- }
- memcpy (cbdata->io_buf, buf, len);
-
- iocb[0] = alloca (sizeof (struct iocb));
- memset (iocb[0], 0, sizeof (struct iocb));
- iocb[0]->aio_fildes = fd;
- iocb[0]->aio_lio_opcode = IO_CMD_PWRITE;
- iocb[0]->aio_reqprio = 0;
- iocb[0]->aio_buf = (guint64)((uintptr_t)cbdata->io_buf);
- iocb[0]->aio_nbytes = len;
- iocb[0]->aio_offset = offset;
- iocb[0]->aio_flags |= (1 << 0) /* IOCB_FLAG_RESFD */;
- iocb[0]->aio_resfd = ctx->event_fd;
- iocb[0]->aio_data = (guint64)((uintptr_t)cbdata);
-
- /* Iocb is copied to kernel internally, so it is safe to put it on stack */
- if (io_submit (ctx->io_ctx, 1, iocb) == 1) {
- return len;
- }
- else {
- if (errno == EAGAIN || errno == ENOSYS) {
- /* Fall back to sync read */
- goto blocking;
- }
- return -1;
- }
-
-#elif defined(HAVE_AIO_H)
-#endif
- }
- else {
- /* Blocking variant */
- goto blocking;
-blocking:
-#ifdef _LARGEFILE64_SOURCE
- r = lseek64 (fd, offset, SEEK_SET);
-#else
- r = lseek (fd, offset, SEEK_SET);
-#endif
- if (r > 0) {
- r = write (fd, buf, len);
- if (r >= 0) {
- cb (fd, 0, r, buf, ud);
- }
- else {
- cb (fd, r, -1, buf, ud);
- }
- }
- }
-
- return r;
-}
-
-/**
- * Close of aio operations
- */
-gint
-rspamd_aio_close (gint fd, struct aio_context *ctx)
-{
- gint r = -1;
-
- if (ctx->has_aio) {
-#ifdef LINUX
- struct iocb iocb;
- struct io_event ev;
-
- memset (&iocb, 0, sizeof (struct iocb));
- iocb.aio_fildes = fd;
- iocb.aio_lio_opcode = IO_CMD_NOOP;
-
- /* Iocb is copied to kernel internally, so it is safe to put it on stack */
- r = io_cancel (ctx->io_ctx, &iocb, &ev);
- close (fd);
- return r;
-
-#elif defined(HAVE_AIO_H)
-#endif
- }
-
- r = close (fd);
-
- return r;
-}
+++ /dev/null
-/*-
- * Copyright 2016 Vsevolod Stakhov
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef AIO_EVENT_H_
-#define AIO_EVENT_H_
-
-#include "config.h"
-
-/**
- * AIO context
- */
-struct aio_context;
-
-/**
- * Callback for notifying
- */
-typedef void (*rspamd_aio_cb) (gint fd, gint res, guint64 len, gpointer data,
- gpointer ud);
-
-/**
- * Initialize aio with specified event base
- */
-struct aio_context * rspamd_aio_init (struct ev_loop *base);
-
-/**
- * Open file for aio
- */
-gint rspamd_aio_open (struct aio_context *ctx, const gchar *path, int flags);
-
-/**
- * Asynchronous read of file
- */
-gint rspamd_aio_read (gint fd, gpointer buf, guint64 len, guint64 offset,
- struct aio_context *ctx, rspamd_aio_cb cb, gpointer ud);
-
-/**
- * Asynchronous write of file
- */
-gint rspamd_aio_write (gint fd, gpointer buf, guint64 len, guint64 offset,
- struct aio_context *ctx, rspamd_aio_cb cb, gpointer ud);
-
-/**
- * Close of aio operations
- */
-gint rspamd_aio_close (gint fd, struct aio_context *ctx);
-
-#endif /* AIO_EVENT_H_ */
#include "ottery.h"
#include "keypair_private.h"
#include "cryptobox.h"
+#include "libutil/libev_helper.h"
#include "libutil/ssl_util.h"
#include "libserver/url.h"
struct rspamd_http_header *header;
struct http_parser parser;
struct http_parser_settings parser_cb;
- struct event ev;
- struct timeval tv;
- struct timeval *ptv;
+ struct rspamd_io_ev ev;
+ ev_tstamp timeout;
struct rspamd_http_message *msg;
struct iovec *out;
guint outlen;
if (msg->method == HTTP_HEAD) {
/* We don't care about the rest */
- if (rspamd_event_pending (&priv->ev, EV_READ)) {
- event_del (&priv->ev);
- }
+ rspamd_ev_watcher_stop (priv->ctx->event_loop, &priv->ev);
msg->code = parser->status_code;
rspamd_http_connection_ref (conn);
if (conn->opts & RSPAMD_HTTP_CLIENT_KEEP_ALIVE) {
rspamd_http_context_push_keepalive (conn->priv->ctx, conn,
- msg, conn->priv->ctx->ev_base);
+ msg, conn->priv->ctx->event_loop);
rspamd_http_connection_reset (conn);
}
else {
if (msg->method == HTTP_HEAD) {
/* We don't care about the rest */
- if (rspamd_event_pending (&priv->ev, EV_READ)) {
- event_del (&priv->ev);
- }
-
+ rspamd_ev_watcher_stop (priv->ctx->event_loop, &priv->ev);
msg->code = parser->status_code;
rspamd_http_connection_ref (conn);
ret = conn->finish_handler (conn, msg);
if (conn->opts & RSPAMD_HTTP_CLIENT_KEEP_ALIVE) {
rspamd_http_context_push_keepalive (conn->priv->ctx, conn,
- msg, conn->priv->ctx->ev_base);
+ msg, conn->priv->ctx->event_loop);
rspamd_http_connection_reset (conn);
}
else {
}
if (ret == 0) {
- if (rspamd_event_pending (&priv->ev, EV_READ)) {
- event_del (&priv->ev);
- }
-
+ rspamd_ev_watcher_stop (priv->ctx->event_loop, &priv->ev);
rspamd_http_connection_ref (conn);
ret = conn->finish_handler (conn, priv->msg);
if (conn->opts & RSPAMD_HTTP_CLIENT_KEEP_ALIVE) {
rspamd_http_context_push_keepalive (conn->priv->ctx, conn,
- priv->msg, conn->priv->ctx->ev_base);
+ priv->msg, conn->priv->ctx->event_loop);
rspamd_http_connection_reset (conn);
}
else {
if (conn->opts & RSPAMD_HTTP_CLIENT_SHARED) {
rspamd_http_connection_read_message_shared (conn, conn->ud,
- conn->priv->ptv);
+ conn->priv->timeout);
}
else {
rspamd_http_connection_read_message (conn, conn->ud,
- conn->priv->ptv);
+ conn->priv->timeout);
}
if (priv->msg) {
else {
/* Want to write more */
priv->flags &= ~RSPAMD_HTTP_CONN_FLAG_RESETED;
- event_add (&priv->ev, priv->ptv);
}
return;
if (!(priv->flags & RSPAMD_HTTP_CONN_FLAG_RESETED)) {
- if (rspamd_event_pending (&priv->ev, EV_READ|EV_WRITE|EV_TIMEOUT)) {
- event_del (&priv->ev);
- }
-
+ rspamd_ev_watcher_stop (priv->ctx->event_loop, &priv->ev);
rspamd_http_parser_reset (conn);
}
static void
rspamd_http_connection_read_message_common (struct rspamd_http_connection *conn,
- gpointer ud, struct timeval *timeout,
+ gpointer ud, ev_tstamp timeout,
gint flags)
{
struct rspamd_http_connection_private *priv = conn->priv;
priv->flags |= RSPAMD_HTTP_CONN_FLAG_ENCRYPTED;
}
- if (timeout == NULL) {
- priv->ptv = NULL;
- }
- else {
- memmove (&priv->tv, timeout, sizeof (struct timeval));
- priv->ptv = &priv->tv;
- }
-
+ priv->timeout = timeout;
priv->header = NULL;
priv->buf = g_malloc0 (sizeof (*priv->buf));
REF_INIT_RETAIN (priv->buf, rspamd_http_privbuf_dtor);
priv->buf->data = rspamd_fstring_sized_new (8192);
priv->flags |= RSPAMD_HTTP_CONN_FLAG_NEW_HEADER;
- event_set (&priv->ev,
- conn->fd,
- EV_READ | EV_PERSIST,
- rspamd_http_event_handler,
- conn);
-
- event_base_set (priv->ctx->ev_base, &priv->ev);
+ rspamd_ev_watcher_init (&priv->ev, conn->fd, EV_READ,
+ rspamd_http_event_handler, conn);
+ rspamd_ev_watcher_start (priv->ctx->event_loop, &priv->ev, priv->timeout);
priv->flags &= ~RSPAMD_HTTP_CONN_FLAG_RESETED;
- event_add (&priv->ev, priv->ptv);
}
void
rspamd_http_connection_read_message (struct rspamd_http_connection *conn,
- gpointer ud, struct timeval *timeout)
+ gpointer ud, ev_tstamp timeout)
{
rspamd_http_connection_read_message_common (conn, ud, timeout, 0);
}
void
rspamd_http_connection_read_message_shared (struct rspamd_http_connection *conn,
- gpointer ud, struct timeval *timeout)
+ gpointer ud, ev_tstamp timeout)
{
rspamd_http_connection_read_message_common (conn, ud, timeout,
RSPAMD_HTTP_FLAG_SHMEM);
const gchar *host,
const gchar *mime_type,
gpointer ud,
- struct timeval *timeout,
+ ev_tstamp timeout,
gboolean allow_shared)
{
struct rspamd_http_connection_private *priv = conn->priv;
conn->ud = ud;
priv->msg = msg;
-
- if (timeout == NULL) {
- priv->ptv = NULL;
- }
- else if (timeout != &priv->tv) {
- memcpy (&priv->tv, timeout, sizeof (struct timeval));
- priv->ptv = &priv->tv;
- }
+ priv->timeout = timeout;
priv->header = NULL;
priv->buf = g_malloc0 (sizeof (*priv->buf));
msg->flags &=~ RSPAMD_HTTP_FLAG_SSL;
}
- if (rspamd_event_pending (&priv->ev, EV_TIMEOUT|EV_WRITE|EV_READ)) {
- event_del (&priv->ev);
- }
+ rspamd_ev_watcher_stop (priv->ctx->event_loop, &priv->ev);
if (msg->flags & RSPAMD_HTTP_FLAG_SSL) {
gpointer ssl_ctx = (msg->flags & RSPAMD_HTTP_FLAG_SSL_NOVERIFY) ?
priv->ctx->ssl_ctx_noverify : priv->ctx->ssl_ctx;
- event_base_set (priv->ctx->ev_base, &priv->ev);
-
if (!ssl_ctx) {
err = g_error_new (HTTP_ERROR, errno, "ssl message requested "
"with no ssl ctx");
rspamd_ssl_connection_free (priv->ssl);
}
- priv->ssl = rspamd_ssl_connection_new (ssl_ctx, priv->ctx->ev_base,
+ priv->ssl = rspamd_ssl_connection_new (ssl_ctx, priv->ctx->event_loop,
!(msg->flags & RSPAMD_HTTP_FLAG_SSL_NOVERIFY));
g_assert (priv->ssl != NULL);
if (!rspamd_ssl_connect_fd (priv->ssl, conn->fd, host, &priv->ev,
- priv->ptv, rspamd_http_event_handler,
+ priv->timeout, rspamd_http_event_handler,
rspamd_http_ssl_err_handler, conn)) {
err = g_error_new (HTTP_ERROR, errno,
}
}
else {
- event_set (&priv->ev, conn->fd, EV_WRITE, rspamd_http_event_handler, conn);
- event_base_set (priv->ctx->ev_base, &priv->ev);
-
- event_add (&priv->ev, priv->ptv);
+ rspamd_ev_watcher_init (&priv->ev, conn->fd, EV_WRITE,
+ rspamd_http_event_handler, conn);
+ rspamd_ev_watcher_start (priv->ctx->event_loop, &priv->ev, priv->timeout);
}
}
const gchar *host,
const gchar *mime_type,
gpointer ud,
- struct timeval *timeout)
+ ev_tstamp timeout)
{
rspamd_http_connection_write_message_common (conn, msg, host, mime_type,
ud, timeout, FALSE);
const gchar *host,
const gchar *mime_type,
gpointer ud,
- struct timeval *timeout)
+ ev_tstamp timeout)
{
rspamd_http_connection_write_message_common (conn, msg, host, mime_type,
ud, timeout, TRUE);
void rspamd_http_connection_read_message (
struct rspamd_http_connection *conn,
gpointer ud,
- struct timeval *timeout);
+ ev_tstamp timeout);
void rspamd_http_connection_read_message_shared (
struct rspamd_http_connection *conn,
gpointer ud,
- struct timeval *timeout);
+ ev_tstamp timeout);
/**
* Send reply using initialised connection
const gchar *host,
const gchar *mime_type,
gpointer ud,
- struct timeval *timeout);
+ ev_tstamp timeout);
void rspamd_http_connection_write_message_shared (
struct rspamd_http_connection *conn,
const gchar *host,
const gchar *mime_type,
gpointer ud,
- struct timeval *timeout);
+ ev_tstamp timeout);
/**
* Free connection structure
ctx->ssl_ctx_noverify = rspamd_init_ssl_ctx_noverify ();
}
- ctx->ev_base = ev_base;
+ ctx->event_loop = ev_base;
ctx->keep_alive_hash = kh_init (rspamd_keep_alive_hash);
ctx->server_kp_cache = rspamd_keypair_cache_new (ctx->config.kp_cache_size_server);
}
- if (ctx->config.client_key_rotate_time > 0 && ctx->ev_base) {
+ if (ctx->config.client_key_rotate_time > 0 && ctx->event_loop) {
struct timeval tv;
double jittered = rspamd_time_jitter (ctx->config.client_key_rotate_time,
0);
double_to_tv (jittered, &tv);
event_set (&ctx->client_rotate_ev, -1, EV_TIMEOUT,
rspamd_http_context_client_rotate_ev, ctx);
- event_base_set (ctx->ev_base, &ctx->client_rotate_ev);
+ event_base_set (ctx->event_loop, &ctx->client_rotate_ev);
event_add (&ctx->client_rotate_ev, &tv);
}
struct upstream_list *http_proxies;
gpointer ssl_ctx;
gpointer ssl_ctx_noverify;
- struct ev_loop *ev_base;
- struct event client_rotate_ev;
+ struct ev_loop *event_loop;
+ ev_periodic client_rotate_ev;
khash_t (rspamd_keep_alive_hash) *keep_alive_hash;
};
NULL,
"text/plain",
entry,
- entry->rt->ptv);
+ entry->rt->timeout);
entry->is_reply = TRUE;
}
}
msg_debug ("requested file %s", realbuf);
rspamd_http_connection_write_message (entry->conn, reply_msg, NULL,
rspamd_http_router_detect_ct (realbuf), entry,
- entry->rt->ptv);
+ entry->rt->timeout);
return TRUE;
}
NULL,
"text/plain",
entry,
- entry->rt->ptv);
+ entry->rt->timeout);
}
struct rspamd_http_connection_router *
rspamd_http_router_new (rspamd_http_router_error_handler_t eh,
rspamd_http_router_finish_handler_t fh,
- struct timeval *timeout,
+ ev_tstamp timeout,
const char *default_fs_path,
struct rspamd_http_context *ctx)
{
- struct rspamd_http_connection_router * new;
+ struct rspamd_http_connection_router *nrouter;
struct stat st;
- new = g_malloc0 (sizeof (struct rspamd_http_connection_router));
- new->paths = g_hash_table_new_full (rspamd_ftok_icase_hash,
+ nrouter = g_malloc0 (sizeof (struct rspamd_http_connection_router));
+ nrouter->paths = g_hash_table_new_full (rspamd_ftok_icase_hash,
rspamd_ftok_icase_equal, rspamd_fstring_mapped_ftok_free, NULL);
- new->regexps = g_ptr_array_new ();
- new->conns = NULL;
- new->error_handler = eh;
- new->finish_handler = fh;
- new->response_headers = g_hash_table_new_full (rspamd_strcase_hash,
+ nrouter->regexps = g_ptr_array_new ();
+ nrouter->conns = NULL;
+ nrouter->error_handler = eh;
+ nrouter->finish_handler = fh;
+ nrouter->response_headers = g_hash_table_new_full (rspamd_strcase_hash,
rspamd_strcase_equal, g_free, g_free);
- new->ev_base = ctx->ev_base;
-
- if (timeout) {
- new->tv = *timeout;
- new->ptv = &new->tv;
- }
- else {
- new->ptv = NULL;
- }
-
- new->default_fs_path = NULL;
+ nrouter->event_loop = ctx->event_loop;
+ nrouter->timeout = timeout;
+ nrouter->default_fs_path = NULL;
if (default_fs_path != NULL) {
if (stat (default_fs_path, &st) == -1) {
msg_err ("path %s is not a directory", default_fs_path);
}
else {
- new->default_fs_path = realpath (default_fs_path, NULL);
+ nrouter->default_fs_path = realpath (default_fs_path, NULL);
}
}
}
- new->ctx = ctx;
+ nrouter->ctx = ctx;
- return new;
+ return nrouter;
}
void
rspamd_http_connection_set_key (conn->conn, router->key);
}
- rspamd_http_connection_read_message (conn->conn, conn, router->ptv);
+ rspamd_http_connection_read_message (conn->conn, conn, router->timeout);
DL_PREPEND (router->conns, conn);
}
GHashTable *paths;
GHashTable *response_headers;
GPtrArray *regexps;
- struct timeval tv;
- struct timeval *ptv;
- struct ev_loop *ev_base;
+ ev_tstamp timeout;
+ struct ev_loop *event_loop;
struct rspamd_http_context *ctx;
gchar *default_fs_path;
rspamd_http_router_handler_t unknown_method_handler;
struct rspamd_http_connection_router * rspamd_http_router_new (
rspamd_http_router_error_handler_t eh,
rspamd_http_router_finish_handler_t fh,
- struct timeval *timeout,
+ ev_tstamp timeout,
const char *default_fs_path,
struct rspamd_http_context *ctx);
ev_tstamp timeout)
{
ev->last_activity = ev_now (EV_A);
- ev_timer_set (&ev->tm, timeout, 0.0);
ev_io_start (EV_A_ &ev->io);
- ev_timer_start (EV_A_ &ev->tm);
+
+ if (timeout > 0) {
+ ev->timeout = timeout;
+ ev_timer_set (&ev->tm, timeout, 0.0);
+ ev_timer_start (EV_A_ &ev->tm);
+ }
}
void
rspamd_ev_watcher_stop (struct ev_loop *loop,
struct rspamd_io_ev *ev)
{
- ev_io_stop (EV_A_ &ev->io);
+ if (ev_is_pending (&ev->io) || ev_is_active (&ev->io)) {
+ ev_io_stop (EV_A_ &ev->io);
+ }
+
+ if (ev->timeout > 0) {
+ ev_timer_stop (EV_A_ &ev->tm);
+ }
}
void
struct http_map_data;
struct rspamd_http_map_cached_cbdata {
- struct event timeout;
+ ev_periodic timeout;
struct rspamd_storage_shmem *shm;
struct rspamd_map *map;
struct http_map_data *data;
gboolean verify_peer;
SSL *ssl;
gchar *hostname;
- struct event *ev;
- struct ev_loop *ev_base;
- struct timeval *tv;
+ struct rspamd_io_ev *ev;
+ struct ev_loop *event_loop;
rspamd_ssl_handler_t handler;
rspamd_ssl_error_handler_t err_handler;
gpointer handler_data;
gint ret;
GError *err = NULL;
- if (what == EV_TIMEOUT) {
+ if (what == EV_TIMER) {
c->shut = ssl_shut_unclean;
}
ret = SSL_connect (c->ssl);
if (ret == 1) {
- event_del (c->ev);
+ rspamd_ev_watcher_stop (c->event_loop, c->ev);
/* Verify certificate */
if ((!c->verify_peer) || rspamd_ssl_peer_verify (c)) {
c->state = ssl_conn_connected;
return;
}
- event_del (c->ev);
- event_set (c->ev, fd, what, rspamd_ssl_event_handler, c);
- event_base_set (c->ev_base, c->ev);
- event_add (c->ev, c->tv);
+ rspamd_ev_watcher_reschedule (c->event_loop, c->ev, what);
+
}
break;
case ssl_next_read:
- event_del (c->ev);
- /* Restore handler */
- event_set (c->ev, c->fd, EV_READ|EV_PERSIST,
- c->handler, c->handler_data);
- event_base_set (c->ev_base, c->ev);
- event_add (c->ev, c->tv);
+ rspamd_ev_watcher_reschedule (c->event_loop, c->ev, EV_READ);
c->state = ssl_conn_connected;
c->handler (fd, EV_READ, c->handler_data);
break;
case ssl_next_write:
case ssl_conn_connected:
- event_del (c->ev);
- /* Restore handler */
- event_set (c->ev, c->fd, EV_WRITE,
- c->handler, c->handler_data);
- event_base_set (c->ev_base, c->ev);
- event_add (c->ev, c->tv);
+ rspamd_ev_watcher_reschedule (c->event_loop, c->ev, EV_WRITE);
c->state = ssl_conn_connected;
c->handler (fd, EV_WRITE, c->handler_data);
break;
g_assert (ssl_ctx != NULL);
c = g_malloc0 (sizeof (*c));
c->ssl = SSL_new (ssl_ctx);
- c->ev_base = ev_base;
+ c->event_loop = ev_base;
c->verify_peer = verify_peer;
return c;
gboolean
rspamd_ssl_connect_fd (struct rspamd_ssl_connection *conn, gint fd,
- const gchar *hostname, struct event *ev, struct timeval *tv,
+ const gchar *hostname, struct rspamd_io_ev *ev, ev_tstamp timeout,
rspamd_ssl_handler_t handler, rspamd_ssl_error_handler_t err_handler,
gpointer handler_data)
{
if (ret == 1) {
conn->state = ssl_conn_connected;
- if (rspamd_event_pending (ev, EV_TIMEOUT|EV_WRITE|EV_READ)) {
- event_del (ev);
- }
-
- event_set (ev, fd, EV_WRITE, rspamd_ssl_event_handler, conn);
-
- if (conn->ev_base) {
- event_base_set (conn->ev_base, ev);
- }
-
- event_add (ev, tv);
+ rspamd_ev_watcher_stop (conn->event_loop, ev);
+ rspamd_ev_watcher_init (ev, fd, EV_WRITE, rspamd_ssl_event_handler, conn);
+ rspamd_ev_watcher_start (conn->event_loop, ev, timeout);
}
else {
ret = SSL_get_error (conn->ssl, ret);
return FALSE;
}
- if (rspamd_event_pending (ev, EV_TIMEOUT|EV_WRITE|EV_READ)) {
- event_del (ev);
- }
-
- event_set (ev, fd, what, rspamd_ssl_event_handler, conn);
- event_base_set (conn->ev_base, ev);
- event_add (ev, tv);
+ rspamd_ev_watcher_stop (conn->event_loop, ev);
+ rspamd_ev_watcher_init (ev, fd, EV_WRITE|EV_READ,
+ rspamd_ssl_event_handler, conn);
+ rspamd_ev_watcher_start (conn->event_loop, ev, timeout);
}
return TRUE;
return -1;
}
- event_del (conn->ev);
- event_set (conn->ev, conn->fd, what, rspamd_ssl_event_handler, conn);
- event_base_set (conn->ev_base, conn->ev);
- event_add (conn->ev, conn->tv);
-
+ rspamd_ev_watcher_reschedule (conn->event_loop, conn->ev, what);
errno = EAGAIN;
-
}
return -1;
return -1;
}
- event_del (conn->ev);
- event_set (conn->ev, conn->fd, what, rspamd_ssl_event_handler, conn);
- event_base_set (conn->ev_base, conn->ev);
- event_add (conn->ev, conn->tv);
-
+ rspamd_ev_watcher_reschedule (conn->event_loop, conn->ev, what);
errno = EAGAIN;
}
#include "config.h"
#include "libutil/addr.h"
+#include "libutil/libev_helper.h"
struct rspamd_ssl_connection;
* @return TRUE if a session has been connected
*/
gboolean rspamd_ssl_connect_fd (struct rspamd_ssl_connection *conn, gint fd,
- const gchar *hostname, struct event *ev, struct timeval *tv,
+ const gchar *hostname, struct rspamd_io_ev *ev, ev_tstamp timeout,
rspamd_ssl_handler_t handler, rspamd_ssl_error_handler_t err_handler,
gpointer handler_data);
rspamd_http_connection_ref (entry->conn);
event_set (&s->ev, sock, EV_WRITE, fuzzy_controller_io_callback, s);
- event_base_set (entry->rt->ev_base, &s->ev);
+ event_base_set (entry->rt->event_loop, &s->ev);
event_add (&s->ev, NULL);
evtimer_set (&s->timev, fuzzy_controller_timer_callback,
/* Prepare task */
task = rspamd_task_new (session->wrk, session->cfg, NULL,
- session->lang_det, conn_ent->rt->ev_base);
+ session->lang_det, conn_ent->rt->event_loop);
task->cfg = ctx->cfg;
saved = rspamd_mempool_alloc0 (session->pool, sizeof (gint));
err = rspamd_mempool_alloc0 (session->pool, sizeof (GError *));
struct rspamd_worker_signal_handler {
gint signo;
gboolean enabled;
- struct event ev;
+ ev_signal ev_sig;
struct ev_loop *base;
struct rspamd_worker *worker;
struct rspamd_worker_signal_cb *cb;