]> source.dussan.org Git - rspamd.git/commitdiff
* Add AIO framework for linux io(3) interface.
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Mon, 16 Jan 2012 13:36:22 +0000 (17:36 +0400)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Mon, 16 Jan 2012 13:36:22 +0000 (17:36 +0400)
CMakeLists.txt
config.h.in
lib/CMakeLists.txt
src/aio_event.c [new file with mode: 0644]
src/aio_event.h [new file with mode: 0644]

index 5eae18c4614c300edbc0f6a7fa8827278709840a..c02fbab9ed26ab28ebd446ed2981d76d58b37044 100644 (file)
@@ -653,6 +653,9 @@ CHECK_INCLUDE_FILES(glob.h HAVE_GLOB_H)
 CHECK_INCLUDE_FILES(poll.h HAVE_POLL_H)
 CHECK_INCLUDE_FILES(sys/sendfile.h HAVE_SYS_SENDFILE_H)
 CHECK_INCLUDE_FILES(linux/falloc.h HAVE_LINUX_FALLOC_H)
+CHECK_INCLUDE_FILES(sys/eventfd.h HAVE_SYS_EVENTFD_H)
+CHECK_INCLUDE_FILES(aio.h HAVE_AIO_H)
+CHECK_INCLUDE_FILES(libaio.h HAVE_LIBAIO_H)
 
 # Some dependencies
 IF(HAVE_SYS_WAIT_H)
@@ -679,6 +682,8 @@ CHECK_FUNCTION_EXISTS(mkstemp HAVE_MKSTEMP)
 CHECK_FUNCTION_EXISTS(setitimer HAVE_SETITIMER)
 CHECK_FUNCTION_EXISTS(clock_gettime HAVE_CLOCK_GETTIME)
 
+# 
+
 # Check macros
 CHECK_SYMBOL_EXISTS(PATH_MAX limits.h HAVE_PATH_MAX)
 CHECK_SYMBOL_EXISTS(MAXPATHLEN sys/param.h HAVE_MAXPATHLEN)
@@ -876,6 +881,7 @@ IF(GMIME24)
 ELSE(GMIME24)
        TARGET_LINK_LIBRARIES(rspamd ${GMIME2_LIBRARIES})
 ENDIF(GMIME24)
+
 IF(ENABLE_STATIC MATCHES "ON")
        TARGET_LINK_LIBRARIES(rspamd ${PCRE_LIBRARIES})
 ENDIF(ENABLE_STATIC MATCHES "ON")
index aeefa9b0ab04099ac59e479dfecae39fc0c228f7..8eedae5c03e1b9ef464cf27197d0da7b85644070 100644 (file)
 
 #cmakedefine HAVE_SENDFILE       1
 #cmakedefine HAVE_SYS_SENDFILE_H 1
+#cmakedefine HAVE_SYS_EVENTFD_H  1
+#cmakedefine HAVE_AIO_H          1
+#cmakedefine HAVE_LIBAIO_H       1
 
 #cmakedefine HAVE_MKSTEMP        1
 
 #define HAVE_SETLOCALE 1
 #endif
 
+#ifdef HAVE_SYS_EVENTFD_H
+#include <sys/eventfd.h>
+#endif
+
+#ifdef HAVE_AIO_H
+#include <aio.h>
+#endif
+
 #ifdef HAVE_SYS_SENDFILE_H
 #include <sys/sendfile.h>
 #endif
index 5d52e1ab974ea4a53663dd4cbb98ca6a67c93b68..0355fe3f910c36088cb30b9d10ef2b9010c582ad 100644 (file)
@@ -35,7 +35,9 @@ INSTALL(TARGETS rspamdclient rspamdclient_static LIBRARY PUBLIC_HEADER
 
     
 # Librspamdserver
-SET(RSPAMDLIBSRC ../src/binlog.c
+SET(RSPAMDLIBSRC
+                               ../src/aio_event.c 
+                               ../src/binlog.c
                 ../src/bloom.c
                                ../src/buffer.c
                                ../src/cfg_utils.c
diff --git a/src/aio_event.c b/src/aio_event.c
new file mode 100644 (file)
index 0000000..465ce4c
--- /dev/null
@@ -0,0 +1,414 @@
+/* Copyright (c) 2010-2011, Vsevolod Stakhov
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *       * Redistributions of source code must retain the above copyright
+ *         notice, this list of conditions and the following disclaimer.
+ *       * Redistributions in binary form must reproduce the above copyright
+ *         notice, this list of conditions and the following disclaimer in the
+ *         documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "config.h"
+#include "aio_event.h"
+#include "main.h"
+
+/* Linux syscall numbers */
+#define SYS_io_setup      245
+#define SYS_io_destroy    246
+#define SYS_io_getevents  247
+#define SYS_io_submit     248
+#define SYS_io_cancel     249
+#define SYS_eventfd       323
+#define MAX_AIO_EV        32768
+
+struct io_cbdata {
+       rspamd_aio_cb cb;
+       gsize len;
+       gpointer buf;
+       gpointer ud;
+};
+
+#ifdef LINUX
+
+/* Linux specific mappings and utilities to avoid using of libaio */
+
+typedef unsigned int aio_context_t;
+
+typedef enum io_iocb_cmd {
+       IO_CMD_PREAD = 0,
+       IO_CMD_PWRITE = 1,
+
+       IO_CMD_FSYNC = 2,
+       IO_CMD_FDSYNC = 3,
+
+       IO_CMD_POLL = 5,
+       IO_CMD_NOOP = 6,
+} io_iocb_cmd_t;
+
+struct io_iocb_common {
+       void *buf;
+       unsigned __pad1;
+       long nbytes;
+       unsigned __pad2;
+       long long offset;
+       long long __pad3;
+    unsigned flags;
+    unsigned resfd;
+}; /* result code is the amount read or -'ve errno */
+
+struct iocb {
+       void *data;
+       unsigned key;
+       short aio_lio_opcode;
+       short aio_reqprio;
+       int aio_fildes;
+       union {
+               struct io_iocb_common c;
+       } u;
+};
+
+struct io_event {
+       uint64_t  data;  /* the data field from the iocb */
+       uint64_t  obj;   /* what iocb this event came from */
+       int64_t   res;   /* result code for this event */
+       int64_t   res2;  /* secondary result */
+};
+
+/* Linux specific io calls */
+static int
+io_setup (guint nr_reqs, aio_context_t *ctx)
+{
+    return syscall (SYS_io_setup, nr_reqs, ctx);
+}
+
+static int
+io_destroy (aio_context_t ctx)
+{
+    return syscall (SYS_io_destroy, ctx);
+}
+
+static int
+io_getevents (aio_context_t ctx, long min_nr, long nr, struct io_event *events, struct timespec *tmo)
+{
+       return syscall (SYS_io_getevents, ctx, min_nr, nr, events, tmo);
+}
+
+static int
+io_submit (aio_context_t ctx, long n, struct iocb **paiocb)
+{
+    return syscall (SYS_io_submit, ctx, n, paiocb);
+}
+
+static int
+io_cancel (aio_context_t ctx, struct iocb *iocb, struct io_event *result)
+{
+    return syscall (SYS_io_cancel, ctx, iocb, result);
+}
+
+# ifndef HAVE_SYS_EVENTFD_H
+static int
+eventfd(guint initval, guint flags)
+{
+       return syscall (SYS_eventfd, initval);
+}
+# endif
+
+#endif
+
+/**
+ * AIO context
+ */
+struct aio_context {
+       struct event_base *base;
+       gboolean has_aio;               /**< Whether we have aio support on a system */
+#ifdef LINUX
+       /* Eventfd variant */
+       gint event_fd;
+       struct event eventfd_ev;
+       aio_context_t io_ctx;
+#elif defined(HAVE_AIO_H)
+       /* POSIX aio */
+       struct event rtsigs[SIGRTMAX - SIGRTMIN];
+#endif
+};
+
+#ifdef LINUX
+/* Eventfd read callback */
+static void
+rspamd_eventfdcb (gint fd, gshort what, gpointer ud)
+{
+       struct aio_context                                      *ctx = ud;
+       guint64                                                          ready;
+       gint                                                             done, i;
+       struct io_event                                          event[64];
+       struct timespec                                          ts;
+       struct io_cbdata                                        *ev_data;
+
+       /* Eventfd returns number of events ready got from kernel */
+       if (read (fd, &ready, 8) != 8) {
+               if (errno == EAGAIN) {
+                       return;
+               }
+               msg_err ("eventfd read returned error: %s", strerror (errno));
+       }
+
+       ts.tv_sec = 0;
+       ts.tv_nsec = 0;
+
+       while (ready) {
+               /* Get events ready */
+               done = io_getevents (ctx->io_ctx, 1, 64, event, &ts);
+
+               if (done > 0) {
+                       ready -= done;
+
+                       for (i = 0; i < done; i ++) {
+                               ev_data = (struct io_cbdata *) (uintptr_t) event[i].data;
+                               /* Call this callback */
+                               ev_data->cb (event[i].res, ev_data->len, ev_data->buf, ev_data->ud);
+                       }
+               }
+               else if (done == 0) {
+                       /* No more events are ready */
+                       return;
+               }
+               else {
+                       msg_err ("io_getevents failed: %s", strerror (errno));
+                       return;
+               }
+       }
+}
+
+#endif
+
+/**
+ * Initialize aio with specified event base
+ */
+struct aio_context*
+rspamd_aio_init (struct event_base *base)
+{
+       struct aio_context                                      *new;
+
+       /* First of all we need to detect which type of aio we can try to use */
+       new = g_malloc0 (sizeof (struct aio_context));
+       new->base = base;
+
+#ifdef LINUX
+       /* On linux we are trying to use io (3) and eventfd for notifying */
+       new->event_fd = eventfd (0, 0);
+       if (new->event_fd == -1) {
+               msg_err ("eventfd failed: %s", strerror (errno));
+       }
+       else {
+               /* Set this socket non-blocking */
+               if (make_socket_nonblocking (new->event_fd) == -1) {
+                       msg_err ("non blocking for eventfd failed: %s", strerror (errno));
+                       close (new->event_fd);
+               }
+               else {
+                       event_set (&new->eventfd_ev, new->event_fd, EV_READ|EV_PERSIST, rspamd_eventfdcb, new);
+                       event_base_set (new->base, &new->eventfd_ev);
+                       if (io_setup (MAX_AIO_EV, &new->io_ctx) == -1) {
+                               msg_err ("io_setup failed: %s", strerror (errno));
+                               close (new->event_fd);
+                       }
+                       else {
+                               new->has_aio = TRUE;
+                       }
+               }
+       }
+#elif defined(HAVE_AIO_H)
+       /* TODO: implement this */
+#endif
+
+       return new;
+}
+
+/**
+ * Open file for aio
+ */
+gint
+rspamd_aio_open (struct aio_context *ctx, const gchar *path, int flags)
+{
+       gint                                                                            fd = -1;
+       /* Fallback */
+       if (!ctx->has_aio) {
+               return open (path, flags);
+       }
+#ifdef LINUX
+
+       fd = open (path, flags | O_DIRECT | O_NONBLOCK);
+
+       return fd;
+#elif defined(HAVE_AIO_H)
+       fd = open (path, flags | O_NONBLOCK);
+#endif
+
+       return fd;
+}
+
+/**
+ * Asynchronous read of file
+ */
+gint
+rspamd_aio_read (gint fd, gpointer buf, gsize len, struct aio_context *ctx, rspamd_aio_cb cb, gpointer ud)
+{
+       struct io_cbdata                                                        *cbdata;
+       gint                                                                             r = -1;
+
+       if (ctx->has_aio) {
+#ifdef LINUX
+               struct iocb                                                             *iocb[1];
+
+               cbdata = g_slice_alloc (sizeof (struct io_cbdata));
+               cbdata->cb = cb;
+               cbdata->buf = buf;
+               cbdata->len = len;
+               cbdata->ud = ud;
+
+               iocb[0] = alloca (sizeof (struct iocb));
+               memset (iocb[0], 0, sizeof (struct iocb));
+               iocb[0]->aio_fildes = fd;
+               iocb[0]->aio_lio_opcode = IO_CMD_PREAD;
+               iocb[0]->aio_reqprio = 0;
+               iocb[0]->u.c.buf = buf;
+               iocb[0]->u.c.nbytes = len;
+               iocb[0]->u.c.offset = 0;
+               iocb[0]->u.c.flags |= (1 << 0) /* IOCB_FLAG_RESFD */;
+               iocb[0]->u.c.resfd = ctx->event_fd;
+               iocb[0]->data = cbdata;
+
+               /* Iocb is copied to kernel internally, so it is safe to put it on stack */
+               if (io_submit (ctx->io_ctx, 1, iocb) == 1) {
+                       return len;
+               }
+               else {
+                       if (errno == EAGAIN || errno == ENOSYS) {
+                               /* Fall back to sync read */
+                               goto blocking;
+                       }
+                       return -1;
+               }
+
+#elif defined(HAVE_AIO_H)
+#endif
+       }
+       else {
+               /* Blocking variant */
+blocking:
+               r = read (fd, buf, len);
+               if (r >= 0) {
+                       cb (0, r, buf, ud);
+               }
+               else {
+                       cb (r, -1, buf, ud);
+               }
+       }
+
+       return r;
+}
+
+/**
+ * Asynchronous write of file
+ */
+gint
+rspamd_aio_write (gint fd, gpointer buf, gsize len, struct aio_context *ctx, rspamd_aio_cb cb, gpointer ud)
+{
+       struct io_cbdata                                                        *cbdata;
+       gint                                                                             r = -1;
+
+       if (ctx->has_aio) {
+#ifdef LINUX
+               struct iocb                                                              *iocb[1];
+
+               cbdata = g_slice_alloc (sizeof (struct io_cbdata));
+               cbdata->cb = cb;
+               cbdata->buf = buf;
+               cbdata->len = len;
+               cbdata->ud = ud;
+
+               iocb[0] = alloca (sizeof (struct iocb));
+               memset (iocb[0], 0, sizeof (struct iocb));
+               iocb[0]->aio_fildes = fd;
+               iocb[0]->aio_lio_opcode = IO_CMD_PWRITE;
+               iocb[0]->aio_reqprio = 0;
+               iocb[0]->u.c.buf = buf;
+               iocb[0]->u.c.nbytes = len;
+               iocb[0]->u.c.offset = 0;
+               iocb[0]->u.c.flags |= (1 << 0) /* IOCB_FLAG_RESFD */;
+               iocb[0]->u.c.resfd = ctx->event_fd;
+               iocb[0]->data = cbdata;
+
+               /* Iocb is copied to kernel internally, so it is safe to put it on stack */
+               if (io_submit (ctx->io_ctx, 1, iocb) == 1) {
+                       return len;
+               }
+               else {
+                       if (errno == EAGAIN || errno == ENOSYS) {
+                               /* Fall back to sync read */
+                               goto blocking;
+                       }
+                       return -1;
+               }
+
+#elif defined(HAVE_AIO_H)
+#endif
+       }
+       else {
+               /* Blocking variant */
+blocking:
+               r = write (fd, buf, len);
+               if (r >= 0) {
+                       cb (0, r, buf, ud);
+               }
+               else {
+                       cb (r, -1, buf, ud);
+               }
+       }
+
+       return r;
+}
+
+/**
+ * Close of aio operations
+ */
+gint
+rspamd_aio_close (gint fd, struct aio_context *ctx)
+{
+       gint                                                                             r = -1;
+
+       if (ctx->has_aio) {
+#ifdef LINUX
+               struct iocb                                                              iocb;
+               struct io_event                                                  ev;
+
+               memset (&iocb, 0, sizeof (struct iocb));
+               iocb.aio_fildes = fd;
+               iocb.aio_lio_opcode = IO_CMD_NOOP;
+
+               /* Iocb is copied to kernel internally, so it is safe to put it on stack */
+               r = io_cancel (ctx->io_ctx, &iocb, &ev);
+               close (fd);
+               return r;
+
+#elif defined(HAVE_AIO_H)
+#endif
+       }
+
+       r = close (fd);
+
+       return r;
+}
diff --git a/src/aio_event.h b/src/aio_event.h
new file mode 100644 (file)
index 0000000..3149174
--- /dev/null
@@ -0,0 +1,65 @@
+/* Copyright (c) 2010-2011, Vsevolod Stakhov
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *       * Redistributions of source code must retain the above copyright
+ *         notice, this list of conditions and the following disclaimer.
+ *       * Redistributions in binary form must reproduce the above copyright
+ *         notice, this list of conditions and the following disclaimer in the
+ *         documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+#ifndef AIO_EVENT_H_
+#define AIO_EVENT_H_
+
+#include "config.h"
+
+/**
+ * AIO context
+ */
+struct aio_context;
+
+/**
+ * Callback for notifying
+ */
+typedef void (*rspamd_aio_cb) (gint res, gsize len, gpointer data, gpointer ud);
+
+/**
+ * Initialize aio with specified event base
+ */
+struct aio_context* rspamd_aio_init (struct event_base *base);
+
+/**
+ * Open file for aio
+ */
+gint rspamd_aio_open (struct aio_context *ctx, const gchar *path, int flags);
+
+/**
+ * Asynchronous read of file
+ */
+gint rspamd_aio_read (gint fd, gpointer buf, gsize len, struct aio_context *ctx, rspamd_aio_cb cb, gpointer ud);
+
+/**
+ * Asynchronous write of file
+ */
+gint rspamd_aio_write (gint fd, gpointer buf, gsize len, struct aio_context *ctx, rspamd_aio_cb cb, gpointer ud);
+
+/**
+ * Close of aio operations
+ */
+gint rspamd_aio_close (gint fd, struct aio_context *ctx);
+
+#endif /* AIO_EVENT_H_ */