123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508 |
- /*-
- * Copyright 2016 Vsevolod Stakhov
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- #include "config.h"
- #include <event.h>
- #include "aio_event.h"
- #include "rspamd.h"
- #include "unix-std.h"
-
- #ifdef HAVE_SYS_EVENTFD_H
- #include <sys/eventfd.h>
- #endif
-
- #ifdef HAVE_AIO_H
- #include <aio.h>
- #endif
-
/*
 * Linux aio syscall numbers.
 *
 * <sys/syscall.h> normally provides these for every architecture, so the
 * hard-coded i386/x86_64 values are only a fallback for when it does not.
 */
#ifndef SYS_io_setup
# if defined(__i386__)
#  define SYS_io_setup 245
#  define SYS_io_destroy 246
#  define SYS_io_getevents 247
#  define SYS_io_submit 248
#  define SYS_io_cancel 249
# elif defined(__x86_64__)
#  define SYS_io_setup 206
#  define SYS_io_destroy 207
#  define SYS_io_getevents 208
#  define SYS_io_submit 209
#  define SYS_io_cancel 210
# else
#  warning \
	"aio is not supported on this platform, please contact author for details"
/*
 * -1 is never a valid syscall number, so syscall() fails with ENOSYS
 * (io_setup failing leaves aio disabled) instead of invoking whatever
 * syscall number 0 happens to be on this architecture.
 */
#  define SYS_io_setup (-1)
#  define SYS_io_destroy (-1)
#  define SYS_io_getevents (-1)
#  define SYS_io_submit (-1)
#  define SYS_io_cancel (-1)
# endif
#endif

/*
 * Legacy eventfd(2) syscall number used by the fallback eventfd() wrapper.
 * It differs per architecture: 323 on i386 but 284 on x86_64 (where 323
 * is an unrelated syscall).
 */
#ifndef SYS_eventfd
# if defined(__i386__)
#  define SYS_eventfd 323
# elif defined(__x86_64__)
#  define SYS_eventfd 284
# else
#  define SYS_eventfd (-1)
# endif
#endif

/* Number of in-flight requests the kernel aio context is sized for */
#define MAX_AIO_EV 64
-
- struct io_cbdata {
- gint fd;
- rspamd_aio_cb cb;
- guint64 len;
- gpointer buf;
- gpointer io_buf;
- gpointer ud;
- };
-
- #ifdef LINUX
-
- /* Linux specific mappings and utilities to avoid using of libaio */
-
- typedef unsigned long aio_context_t;
-
- typedef enum io_iocb_cmd {
- IO_CMD_PREAD = 0,
- IO_CMD_PWRITE = 1,
-
- IO_CMD_FSYNC = 2,
- IO_CMD_FDSYNC = 3,
-
- IO_CMD_POLL = 5,
- IO_CMD_NOOP = 6,
- } io_iocb_cmd_t;
-
- #if defined(__LITTLE_ENDIAN)
- #define PADDED(x,y) x, y
- #elif defined(__BIG_ENDIAN)
- #define PADDED(x,y) y, x
- #else
- #error edit for your odd byteorder.
- #endif
-
/*
 * Userspace mirror of the kernel's struct iocb (see <linux/aio_abi.h>),
 * handed to io_submit(); field order and sizes must match the kernel ABI
 * exactly, so do not reorder or resize anything here.
 *
 * We always use a 64bit off_t when communicating with userland; it's up
 * to libraries to do the proper padding and aio_error abstraction.
 */

struct iocb {
	/* these are internal to the kernel/libc. */
	guint64 aio_data; /* data to be returned in event's data */
	guint32 PADDED (aio_key, aio_reserved1);
	/* the kernel sets aio_key to the req # */

	/* common fields */
	guint16 aio_lio_opcode; /* one of the IO_CMD_* values (io_iocb_cmd_t) */
	gint16 aio_reqprio;
	guint32 aio_fildes;

	guint64 aio_buf;    /* address of the user buffer */
	guint64 aio_nbytes; /* transfer length in bytes */
	gint64 aio_offset;  /* file offset of the transfer */

	/* extra parameters */
	guint64 aio_reserved2; /* TODO: use this for a (struct sigevent *) */

	/* flags for the "struct iocb"; bit 0 is IOCB_FLAG_RESFD */
	guint32 aio_flags;

	/*
	 * if the IOCB_FLAG_RESFD flag of "aio_flags" is set, this is an
	 * eventfd to signal AIO readiness to
	 */
	guint32 aio_resfd;
};
-
/*
 * Userspace mirror of the kernel's struct io_event, filled in by
 * io_getevents() for each completed request (kernel ABI layout).
 */
struct io_event {
	guint64 data; /* the data field (aio_data) from the iocb */
	guint64 obj; /* what iocb this event came from */
	gint64 res; /* result code for this event */
	gint64 res2; /* secondary result */
};
-
- /* Linux specific io calls */
- static int
- io_setup (guint nr_reqs, aio_context_t *ctx)
- {
- return syscall (SYS_io_setup, nr_reqs, ctx);
- }
-
- static int
- io_destroy (aio_context_t ctx)
- {
- return syscall (SYS_io_destroy, ctx);
- }
-
- static int
- io_getevents (aio_context_t ctx,
- long min_nr,
- long nr,
- struct io_event *events,
- struct timespec *tmo)
- {
- return syscall (SYS_io_getevents, ctx, min_nr, nr, events, tmo);
- }
-
- static int
- io_submit (aio_context_t ctx, long n, struct iocb **paiocb)
- {
- return syscall (SYS_io_submit, ctx, n, paiocb);
- }
-
- static int
- io_cancel (aio_context_t ctx, struct iocb *iocb, struct io_event *result)
- {
- return syscall (SYS_io_cancel, ctx, iocb, result);
- }
-
- # ifndef HAVE_SYS_EVENTFD_H
- static int
- eventfd (guint initval, guint flags)
- {
- return syscall (SYS_eventfd, initval);
- }
- # endif
-
- #endif
-
- /**
- * AIO context
- */
- struct aio_context {
- struct event_base *base;
- gboolean has_aio; /**< Whether we have aio support on a system */
- #ifdef LINUX
- /* Eventfd variant */
- gint event_fd;
- struct event eventfd_ev;
- aio_context_t io_ctx;
- #elif defined(HAVE_AIO_H)
- /* POSIX aio */
- struct event rtsigs[128];
- #endif
- };
-
#ifdef LINUX
/*
 * libevent read callback for the eventfd registered by rspamd_aio_init().
 *
 * Requests are submitted with IOCB_FLAG_RESFD pointing at this eventfd, so
 * reading it yields a count of completed requests.  They are reaped with a
 * zero-timeout io_getevents(); for each one the caller's callback runs and
 * the request's io_cbdata (plus any aligned bounce buffer) is released.
 *
 * NOTE(review): here callbacks get the kernel result (event res) as `res`
 * and the requested length as `len`, while the blocking fallbacks of
 * rspamd_aio_read/write pass res 0 and the transferred byte count --
 * confirm which convention callers rely on.
 */
static void
rspamd_eventfdcb (gint fd, gshort what, gpointer ud)
{
	struct aio_context *ctx = ud;
	guint64 ready;
	gint done, i;
	struct io_event event[32];
	struct timespec ts;
	struct io_cbdata *ev_data;

	(void) what;

	/* Eventfd returns number of events ready got from kernel */
	if (read (fd, &ready, sizeof (ready)) != (ssize_t) sizeof (ready)) {
		if (errno == EAGAIN) {
			return;
		}
		msg_err ("eventfd read returned error: %s", strerror (errno));
		/* `ready` was not filled in, so it must not drive the loop below */
		return;
	}

	/* Zero timeout: only collect completions that are already available */
	ts.tv_sec = 0;
	ts.tv_nsec = 0;

	while (ready > 0) {
		done = io_getevents (ctx->io_ctx, 1,
				(long) (sizeof (event) / sizeof (event[0])), event, &ts);

		if (done > 0) {
			/*
			 * Completions that landed after the eventfd read may be reaped
			 * here too, so `done` can exceed `ready`: clamp instead of
			 * letting the unsigned counter wrap around.
			 */
			ready = ((guint64) done >= ready) ? 0 : ready - (guint64) done;

			for (i = 0; i < done; i++) {
				ev_data = (struct io_cbdata *) (uintptr_t) event[i].data;
				/* Call this callback */
				ev_data->cb (ev_data->fd,
						event[i].res,
						ev_data->len,
						ev_data->buf,
						ev_data->ud);
				/* io_buf comes from posix_memalign(), hence free() */
				free (ev_data->io_buf);
				g_free (ev_data);
			}
		}
		else if (done == 0) {
			/* No more events are ready */
			return;
		}
		else if (errno == EINTR) {
			/* Interrupted by a signal before anything was reaped: retry */
			continue;
		}
		else {
			msg_err ("io_getevents failed: %s", strerror (errno));
			return;
		}
	}
}

#endif
-
- /**
- * Initialize aio with specified event base
- */
- struct aio_context *
- rspamd_aio_init (struct event_base *base)
- {
- struct aio_context *new;
-
- /* First of all we need to detect which type of aio we can try to use */
- new = g_malloc0 (sizeof (struct aio_context));
- new->base = base;
-
- #ifdef LINUX
- /* On linux we are trying to use io (3) and eventfd for notifying */
- new->event_fd = eventfd (0, 0);
- if (new->event_fd == -1) {
- msg_err ("eventfd failed: %s", strerror (errno));
- }
- else {
- /* Set this socket non-blocking */
- if (rspamd_socket_nonblocking (new->event_fd) == -1) {
- msg_err ("non blocking for eventfd failed: %s", strerror (errno));
- close (new->event_fd);
- }
- else {
- event_set (&new->eventfd_ev,
- new->event_fd,
- EV_READ | EV_PERSIST,
- rspamd_eventfdcb,
- new);
- event_base_set (new->base, &new->eventfd_ev);
- event_add (&new->eventfd_ev, NULL);
- if (io_setup (MAX_AIO_EV, &new->io_ctx) == -1) {
- msg_err ("io_setup failed: %s", strerror (errno));
- close (new->event_fd);
- }
- else {
- new->has_aio = TRUE;
- }
- }
- }
- #elif defined(HAVE_AIO_H)
- /* TODO: implement this */
- #endif
-
- return new;
- }
-
- /**
- * Open file for aio
- */
- gint
- rspamd_aio_open (struct aio_context *ctx, const gchar *path, int flags)
- {
- gint fd = -1;
- /* Fallback */
- if (!ctx->has_aio) {
- return open (path, flags);
- }
- #ifdef LINUX
-
- fd = open (path, flags | O_DIRECT);
-
- return fd;
- #elif defined(HAVE_AIO_H)
- fd = open (path, flags);
- #endif
-
- return fd;
- }
-
- /**
- * Asynchronous read of file
- */
- gint
- rspamd_aio_read (gint fd,
- gpointer buf,
- guint64 len,
- guint64 offset,
- struct aio_context *ctx,
- rspamd_aio_cb cb,
- gpointer ud)
- {
- gint r = -1;
-
- if (ctx->has_aio) {
- #ifdef LINUX
- struct iocb *iocb[1];
- struct io_cbdata *cbdata;
-
- cbdata = g_malloc0 (sizeof (struct io_cbdata));
- cbdata->cb = cb;
- cbdata->buf = buf;
- cbdata->len = len;
- cbdata->ud = ud;
- cbdata->fd = fd;
- cbdata->io_buf = NULL;
-
- iocb[0] = alloca (sizeof (struct iocb));
- memset (iocb[0], 0, sizeof (struct iocb));
- iocb[0]->aio_fildes = fd;
- iocb[0]->aio_lio_opcode = IO_CMD_PREAD;
- iocb[0]->aio_reqprio = 0;
- iocb[0]->aio_buf = (guint64)((uintptr_t)buf);
- iocb[0]->aio_nbytes = len;
- iocb[0]->aio_offset = offset;
- iocb[0]->aio_flags |= (1 << 0) /* IOCB_FLAG_RESFD */;
- iocb[0]->aio_resfd = ctx->event_fd;
- iocb[0]->aio_data = (guint64)((uintptr_t)cbdata);
-
- /* Iocb is copied to kernel internally, so it is safe to put it on stack */
- if (io_submit (ctx->io_ctx, 1, iocb) == 1) {
- return len;
- }
- else {
- if (errno == EAGAIN || errno == ENOSYS) {
- /* Fall back to sync read */
- goto blocking;
- }
- return -1;
- }
-
- #elif defined(HAVE_AIO_H)
- #endif
- }
- else {
- /* Blocking variant */
- goto blocking;
- blocking:
- #ifdef _LARGEFILE64_SOURCE
- r = lseek64 (fd, offset, SEEK_SET);
- #else
- r = lseek (fd, offset, SEEK_SET);
- #endif
- if (r > 0) {
- r = read (fd, buf, len);
- if (r >= 0) {
- cb (fd, 0, r, buf, ud);
- }
- else {
- cb (fd, r, -1, buf, ud);
- }
- }
- }
-
- return r;
- }
-
- /**
- * Asynchronous write of file
- */
- gint
- rspamd_aio_write (gint fd,
- gpointer buf,
- guint64 len,
- guint64 offset,
- struct aio_context *ctx,
- rspamd_aio_cb cb,
- gpointer ud)
- {
- gint r = -1;
-
- if (ctx->has_aio) {
- #ifdef LINUX
- struct iocb *iocb[1];
- struct io_cbdata *cbdata;
-
- cbdata = g_malloc0 (sizeof (struct io_cbdata));
- cbdata->cb = cb;
- cbdata->buf = buf;
- cbdata->len = len;
- cbdata->ud = ud;
- cbdata->fd = fd;
- /* We need to align pointer on boundary of 512 bytes here */
- if (posix_memalign (&cbdata->io_buf, 512, len) != 0) {
- return -1;
- }
- memcpy (cbdata->io_buf, buf, len);
-
- iocb[0] = alloca (sizeof (struct iocb));
- memset (iocb[0], 0, sizeof (struct iocb));
- iocb[0]->aio_fildes = fd;
- iocb[0]->aio_lio_opcode = IO_CMD_PWRITE;
- iocb[0]->aio_reqprio = 0;
- iocb[0]->aio_buf = (guint64)((uintptr_t)cbdata->io_buf);
- iocb[0]->aio_nbytes = len;
- iocb[0]->aio_offset = offset;
- iocb[0]->aio_flags |= (1 << 0) /* IOCB_FLAG_RESFD */;
- iocb[0]->aio_resfd = ctx->event_fd;
- iocb[0]->aio_data = (guint64)((uintptr_t)cbdata);
-
- /* Iocb is copied to kernel internally, so it is safe to put it on stack */
- if (io_submit (ctx->io_ctx, 1, iocb) == 1) {
- return len;
- }
- else {
- if (errno == EAGAIN || errno == ENOSYS) {
- /* Fall back to sync read */
- goto blocking;
- }
- return -1;
- }
-
- #elif defined(HAVE_AIO_H)
- #endif
- }
- else {
- /* Blocking variant */
- goto blocking;
- blocking:
- #ifdef _LARGEFILE64_SOURCE
- r = lseek64 (fd, offset, SEEK_SET);
- #else
- r = lseek (fd, offset, SEEK_SET);
- #endif
- if (r > 0) {
- r = write (fd, buf, len);
- if (r >= 0) {
- cb (fd, 0, r, buf, ud);
- }
- else {
- cb (fd, r, -1, buf, ud);
- }
- }
- }
-
- return r;
- }
-
- /**
- * Close of aio operations
- */
- gint
- rspamd_aio_close (gint fd, struct aio_context *ctx)
- {
- gint r = -1;
-
- if (ctx->has_aio) {
- #ifdef LINUX
- struct iocb iocb;
- struct io_event ev;
-
- memset (&iocb, 0, sizeof (struct iocb));
- iocb.aio_fildes = fd;
- iocb.aio_lio_opcode = IO_CMD_NOOP;
-
- /* Iocb is copied to kernel internally, so it is safe to put it on stack */
- r = io_cancel (ctx->io_ctx, &iocb, &ev);
- close (fd);
- return r;
-
- #elif defined(HAVE_AIO_H)
- #endif
- }
-
- r = close (fd);
-
- return r;
- }
|