/*- * Copyright 2016 Vsevolod Stakhov * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "config.h" #include #include "aio_event.h" #include "rspamd.h" #include "unix-std.h" #ifdef HAVE_SYS_EVENTFD_H #include #endif #ifdef HAVE_AIO_H #include #endif /* Linux syscall numbers */ #if defined(__i386__) # define SYS_io_setup 245 # define SYS_io_destroy 246 # define SYS_io_getevents 247 # define SYS_io_submit 248 # define SYS_io_cancel 249 #elif defined(__x86_64__) # define SYS_io_setup 206 # define SYS_io_destroy 207 # define SYS_io_getevents 208 # define SYS_io_submit 209 # define SYS_io_cancel 210 #else # warning \ "aio is not supported on this platform, please contact author for details" # define SYS_io_setup 0 # define SYS_io_destroy 0 # define SYS_io_getevents 0 # define SYS_io_submit 0 # define SYS_io_cancel 0 #endif #define SYS_eventfd 323 #define MAX_AIO_EV 64 struct io_cbdata { gint fd; rspamd_aio_cb cb; guint64 len; gpointer buf; gpointer io_buf; gpointer ud; }; #ifdef LINUX /* Linux specific mappings and utilities to avoid using of libaio */ typedef unsigned long aio_context_t; typedef enum io_iocb_cmd { IO_CMD_PREAD = 0, IO_CMD_PWRITE = 1, IO_CMD_FSYNC = 2, IO_CMD_FDSYNC = 3, IO_CMD_POLL = 5, IO_CMD_NOOP = 6, } io_iocb_cmd_t; #if defined(__LITTLE_ENDIAN) #define PADDED(x,y) x, y #elif defined(__BIG_ENDIAN) #define PADDED(x,y) y, x #else #error edit for your odd byteorder. #endif /* * we always use a 64bit off_t when communicating * with userland. its up to libraries to do the * proper padding and aio_error abstraction */ struct iocb { /* these are internal to the kernel/libc. */ guint64 aio_data; /* data to be returned in event's data */ guint32 PADDED (aio_key, aio_reserved1); /* the kernel sets aio_key to the req # */ /* common fields */ guint16 aio_lio_opcode; /* see IOCB_CMD_ above */ gint16 aio_reqprio; guint32 aio_fildes; guint64 aio_buf; guint64 aio_nbytes; gint64 aio_offset; /* extra parameters */ guint64 aio_reserved2; /* TODO: use this for a (struct sigevent *) */ /* flags for the "struct iocb" */ guint32 aio_flags; /* * if the IOCB_FLAG_RESFD flag of "aio_flags" is set, this is an * eventfd to signal AIO readiness to */ guint32 aio_resfd; }; struct io_event { guint64 data; /* the data field from the iocb */ guint64 obj; /* what iocb this event came from */ gint64 res; /* result code for this event */ gint64 res2; /* secondary result */ }; /* Linux specific io calls */ static int io_setup (guint nr_reqs, aio_context_t *ctx) { return syscall (SYS_io_setup, nr_reqs, ctx); } static int io_destroy (aio_context_t ctx) { return syscall (SYS_io_destroy, ctx); } static int io_getevents (aio_context_t ctx, long min_nr, long nr, struct io_event *events, struct timespec *tmo) { return syscall (SYS_io_getevents, ctx, min_nr, nr, events, tmo); } static int io_submit (aio_context_t ctx, long n, struct iocb **paiocb) { return syscall (SYS_io_submit, ctx, n, paiocb); } static int io_cancel (aio_context_t ctx, struct iocb *iocb, struct io_event *result) { return syscall (SYS_io_cancel, ctx, iocb, result); } # ifndef HAVE_SYS_EVENTFD_H static int eventfd (guint initval, guint flags) { return syscall (SYS_eventfd, initval); } # endif #endif /** * AIO context */ struct aio_context { struct event_base *base; gboolean has_aio; /**< Whether we have aio support on a system */ #ifdef LINUX /* Eventfd variant */ gint event_fd; struct event eventfd_ev; aio_context_t io_ctx; #elif defined(HAVE_AIO_H) /* POSIX aio */ struct event rtsigs[128]; #endif }; #ifdef LINUX /* Eventfd read callback */ static void rspamd_eventfdcb (gint fd, gshort what, gpointer ud) { struct aio_context *ctx = ud; guint64 ready; gint done, i; struct io_event event[32]; struct timespec ts; struct io_cbdata *ev_data; /* Eventfd returns number of events ready got from kernel */ if (read (fd, &ready, 8) != 8) { if (errno == EAGAIN) { return; } msg_err ("eventfd read returned error: %s", strerror (errno)); } ts.tv_sec = 0; ts.tv_nsec = 0; while (ready) { /* Get events ready */ done = io_getevents (ctx->io_ctx, 1, 32, event, &ts); if (done > 0) { ready -= done; for (i = 0; i < done; i++) { ev_data = (struct io_cbdata *) (uintptr_t) event[i].data; /* Call this callback */ ev_data->cb (ev_data->fd, event[i].res, ev_data->len, ev_data->buf, ev_data->ud); if (ev_data->io_buf) { free (ev_data->io_buf); } g_free (ev_data); } } else if (done == 0) { /* No more events are ready */ return; } else { msg_err ("io_getevents failed: %s", strerror (errno)); return; } } } #endif /** * Initialize aio with specified event base */ struct aio_context * rspamd_aio_init (struct event_base *base) { struct aio_context *new; /* First of all we need to detect which type of aio we can try to use */ new = g_malloc0 (sizeof (struct aio_context)); new->base = base; #ifdef LINUX /* On linux we are trying to use io (3) and eventfd for notifying */ new->event_fd = eventfd (0, 0); if (new->event_fd == -1) { msg_err ("eventfd failed: %s", strerror (errno)); } else { /* Set this socket non-blocking */ if (rspamd_socket_nonblocking (new->event_fd) == -1) { msg_err ("non blocking for eventfd failed: %s", strerror (errno)); close (new->event_fd); } else { event_set (&new->eventfd_ev, new->event_fd, EV_READ | EV_PERSIST, rspamd_eventfdcb, new); event_base_set (new->base, &new->eventfd_ev); event_add (&new->eventfd_ev, NULL); if (io_setup (MAX_AIO_EV, &new->io_ctx) == -1) { msg_err ("io_setup failed: %s", strerror (errno)); close (new->event_fd); } else { new->has_aio = TRUE; } } } #elif defined(HAVE_AIO_H) /* TODO: implement this */ #endif return new; } /** * Open file for aio */ gint rspamd_aio_open (struct aio_context *ctx, const gchar *path, int flags) { gint fd = -1; /* Fallback */ if (!ctx->has_aio) { return open (path, flags); } #ifdef LINUX fd = open (path, flags | O_DIRECT); return fd; #elif defined(HAVE_AIO_H) fd = open (path, flags); #endif return fd; } /** * Asynchronous read of file */ gint rspamd_aio_read (gint fd, gpointer buf, guint64 len, guint64 offset, struct aio_context *ctx, rspamd_aio_cb cb, gpointer ud) { gint r = -1; if (ctx->has_aio) { #ifdef LINUX struct iocb *iocb[1]; struct io_cbdata *cbdata; cbdata = g_malloc0 (sizeof (struct io_cbdata)); cbdata->cb = cb; cbdata->buf = buf; cbdata->len = len; cbdata->ud = ud; cbdata->fd = fd; cbdata->io_buf = NULL; iocb[0] = alloca (sizeof (struct iocb)); memset (iocb[0], 0, sizeof (struct iocb)); iocb[0]->aio_fildes = fd; iocb[0]->aio_lio_opcode = IO_CMD_PREAD; iocb[0]->aio_reqprio = 0; iocb[0]->aio_buf = (guint64)((uintptr_t)buf); iocb[0]->aio_nbytes = len; iocb[0]->aio_offset = offset; iocb[0]->aio_flags |= (1 << 0) /* IOCB_FLAG_RESFD */; iocb[0]->aio_resfd = ctx->event_fd; iocb[0]->aio_data = (guint64)((uintptr_t)cbdata); /* Iocb is copied to kernel internally, so it is safe to put it on stack */ if (io_submit (ctx->io_ctx, 1, iocb) == 1) { return len; } else { if (errno == EAGAIN || errno == ENOSYS) { /* Fall back to sync read */ goto blocking; } return -1; } #elif defined(HAVE_AIO_H) #endif } else { /* Blocking variant */ goto blocking; blocking: #ifdef _LARGEFILE64_SOURCE r = lseek64 (fd, offset, SEEK_SET); #else r = lseek (fd, offset, SEEK_SET); #endif if (r > 0) { r = read (fd, buf, len); if (r >= 0) { cb (fd, 0, r, buf, ud); } else { cb (fd, r, -1, buf, ud); } } } return r; } /** * Asynchronous write of file */ gint rspamd_aio_write (gint fd, gpointer buf, guint64 len, guint64 offset, struct aio_context *ctx, rspamd_aio_cb cb, gpointer ud) { gint r = -1; if (ctx->has_aio) { #ifdef LINUX struct iocb *iocb[1]; struct io_cbdata *cbdata; cbdata = g_malloc0 (sizeof (struct io_cbdata)); cbdata->cb = cb; cbdata->buf = buf; cbdata->len = len; cbdata->ud = ud; cbdata->fd = fd; /* We need to align pointer on boundary of 512 bytes here */ if (posix_memalign (&cbdata->io_buf, 512, len) != 0) { return -1; } memcpy (cbdata->io_buf, buf, len); iocb[0] = alloca (sizeof (struct iocb)); memset (iocb[0], 0, sizeof (struct iocb)); iocb[0]->aio_fildes = fd; iocb[0]->aio_lio_opcode = IO_CMD_PWRITE; iocb[0]->aio_reqprio = 0; iocb[0]->aio_buf = (guint64)((uintptr_t)cbdata->io_buf); iocb[0]->aio_nbytes = len; iocb[0]->aio_offset = offset; iocb[0]->aio_flags |= (1 << 0) /* IOCB_FLAG_RESFD */; iocb[0]->aio_resfd = ctx->event_fd; iocb[0]->aio_data = (guint64)((uintptr_t)cbdata); /* Iocb is copied to kernel internally, so it is safe to put it on stack */ if (io_submit (ctx->io_ctx, 1, iocb) == 1) { return len; } else { if (errno == EAGAIN || errno == ENOSYS) { /* Fall back to sync read */ goto blocking; } return -1; } #elif defined(HAVE_AIO_H) #endif } else { /* Blocking variant */ goto blocking; blocking: #ifdef _LARGEFILE64_SOURCE r = lseek64 (fd, offset, SEEK_SET); #else r = lseek (fd, offset, SEEK_SET); #endif if (r > 0) { r = write (fd, buf, len); if (r >= 0) { cb (fd, 0, r, buf, ud); } else { cb (fd, r, -1, buf, ud); } } } return r; } /** * Close of aio operations */ gint rspamd_aio_close (gint fd, struct aio_context *ctx) { gint r = -1; if (ctx->has_aio) { #ifdef LINUX struct iocb iocb; struct io_event ev; memset (&iocb, 0, sizeof (struct iocb)); iocb.aio_fildes = fd; iocb.aio_lio_opcode = IO_CMD_NOOP; /* Iocb is copied to kernel internally, so it is safe to put it on stack */ r = io_cancel (ctx->io_ctx, &iocb, &ev); close (fd); return r; #elif defined(HAVE_AIO_H) #endif } r = close (fd); return r; }