浏览代码

* Add AIO framework for linux io(3) interface.

tags/0.4.7
Vsevolod Stakhov 12 年前
父节点
当前提交
9f2cb66ccb
共有 5 个文件被更改,包括 499 次插入1 次删除
  1. 6
    0
      CMakeLists.txt
  2. 11
    0
      config.h.in
  3. 3
    1
      lib/CMakeLists.txt
  4. 414
    0
      src/aio_event.c
  5. 65
    0
      src/aio_event.h

+ 6
- 0
CMakeLists.txt 查看文件

@@ -653,6 +653,9 @@ CHECK_INCLUDE_FILES(glob.h HAVE_GLOB_H)
CHECK_INCLUDE_FILES(poll.h HAVE_POLL_H)
CHECK_INCLUDE_FILES(sys/sendfile.h HAVE_SYS_SENDFILE_H)
CHECK_INCLUDE_FILES(linux/falloc.h HAVE_LINUX_FALLOC_H)
CHECK_INCLUDE_FILES(sys/eventfd.h HAVE_SYS_EVENTFD_H)
CHECK_INCLUDE_FILES(aio.h HAVE_AIO_H)
CHECK_INCLUDE_FILES(libaio.h HAVE_LIBAIO_H)

# Some dependencies
IF(HAVE_SYS_WAIT_H)
@@ -679,6 +682,8 @@ CHECK_FUNCTION_EXISTS(mkstemp HAVE_MKSTEMP)
CHECK_FUNCTION_EXISTS(setitimer HAVE_SETITIMER)
CHECK_FUNCTION_EXISTS(clock_gettime HAVE_CLOCK_GETTIME)

#

# Check macros
CHECK_SYMBOL_EXISTS(PATH_MAX limits.h HAVE_PATH_MAX)
CHECK_SYMBOL_EXISTS(MAXPATHLEN sys/param.h HAVE_MAXPATHLEN)
@@ -876,6 +881,7 @@ IF(GMIME24)
ELSE(GMIME24)
TARGET_LINK_LIBRARIES(rspamd ${GMIME2_LIBRARIES})
ENDIF(GMIME24)

IF(ENABLE_STATIC MATCHES "ON")
TARGET_LINK_LIBRARIES(rspamd ${PCRE_LIBRARIES})
ENDIF(ENABLE_STATIC MATCHES "ON")

+ 11
- 0
config.h.in 查看文件

@@ -175,6 +175,9 @@

#cmakedefine HAVE_SENDFILE 1
#cmakedefine HAVE_SYS_SENDFILE_H 1
#cmakedefine HAVE_SYS_EVENTFD_H 1
#cmakedefine HAVE_AIO_H 1
#cmakedefine HAVE_LIBAIO_H 1

#cmakedefine HAVE_MKSTEMP 1

@@ -378,6 +381,14 @@
#define HAVE_SETLOCALE 1
#endif

#ifdef HAVE_SYS_EVENTFD_H
#include <sys/eventfd.h>
#endif

#ifdef HAVE_AIO_H
#include <aio.h>
#endif

#ifdef HAVE_SYS_SENDFILE_H
#include <sys/sendfile.h>
#endif

+ 3
- 1
lib/CMakeLists.txt 查看文件

@@ -35,7 +35,9 @@ INSTALL(TARGETS rspamdclient rspamdclient_static LIBRARY PUBLIC_HEADER

# Librspamdserver
SET(RSPAMDLIBSRC ../src/binlog.c
SET(RSPAMDLIBSRC
../src/aio_event.c
../src/binlog.c
../src/bloom.c
../src/buffer.c
../src/cfg_utils.c

+ 414
- 0
src/aio_event.c 查看文件

@@ -0,0 +1,414 @@
/* Copyright (c) 2010-2011, Vsevolod Stakhov
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

#include "config.h"
#include "aio_event.h"
#include "main.h"

/* Linux syscall numbers */
#define SYS_io_setup 245
#define SYS_io_destroy 246
#define SYS_io_getevents 247
#define SYS_io_submit 248
#define SYS_io_cancel 249
#define SYS_eventfd 323
#define MAX_AIO_EV 32768

struct io_cbdata {
rspamd_aio_cb cb;
gsize len;
gpointer buf;
gpointer ud;
};

#ifdef LINUX

/* Linux specific mappings and utilities to avoid using of libaio */

typedef unsigned int aio_context_t;

typedef enum io_iocb_cmd {
IO_CMD_PREAD = 0,
IO_CMD_PWRITE = 1,

IO_CMD_FSYNC = 2,
IO_CMD_FDSYNC = 3,

IO_CMD_POLL = 5,
IO_CMD_NOOP = 6,
} io_iocb_cmd_t;

struct io_iocb_common {
void *buf;
unsigned __pad1;
long nbytes;
unsigned __pad2;
long long offset;
long long __pad3;
unsigned flags;
unsigned resfd;
}; /* result code is the amount read or -'ve errno */

struct iocb {
void *data;
unsigned key;
short aio_lio_opcode;
short aio_reqprio;
int aio_fildes;
union {
struct io_iocb_common c;
} u;
};

struct io_event {
uint64_t data; /* the data field from the iocb */
uint64_t obj; /* what iocb this event came from */
int64_t res; /* result code for this event */
int64_t res2; /* secondary result */
};

/* Linux specific io calls */
static int
io_setup (guint nr_reqs, aio_context_t *ctx)
{
return syscall (SYS_io_setup, nr_reqs, ctx);
}

static int
io_destroy (aio_context_t ctx)
{
return syscall (SYS_io_destroy, ctx);
}

static int
io_getevents (aio_context_t ctx, long min_nr, long nr, struct io_event *events, struct timespec *tmo)
{
return syscall (SYS_io_getevents, ctx, min_nr, nr, events, tmo);
}

static int
io_submit (aio_context_t ctx, long n, struct iocb **paiocb)
{
return syscall (SYS_io_submit, ctx, n, paiocb);
}

static int
io_cancel (aio_context_t ctx, struct iocb *iocb, struct io_event *result)
{
return syscall (SYS_io_cancel, ctx, iocb, result);
}

# ifndef HAVE_SYS_EVENTFD_H
static int
eventfd(guint initval, guint flags)
{
return syscall (SYS_eventfd, initval);
}
# endif

#endif

/**
* AIO context
*/
struct aio_context {
struct event_base *base;
gboolean has_aio; /**< Whether we have aio support on a system */
#ifdef LINUX
/* Eventfd variant */
gint event_fd;
struct event eventfd_ev;
aio_context_t io_ctx;
#elif defined(HAVE_AIO_H)
/* POSIX aio */
struct event rtsigs[SIGRTMAX - SIGRTMIN];
#endif
};

#ifdef LINUX
/* Eventfd read callback */
static void
rspamd_eventfdcb (gint fd, gshort what, gpointer ud)
{
struct aio_context *ctx = ud;
guint64 ready;
gint done, i;
struct io_event event[64];
struct timespec ts;
struct io_cbdata *ev_data;

/* Eventfd returns number of events ready got from kernel */
if (read (fd, &ready, 8) != 8) {
if (errno == EAGAIN) {
return;
}
msg_err ("eventfd read returned error: %s", strerror (errno));
}

ts.tv_sec = 0;
ts.tv_nsec = 0;

while (ready) {
/* Get events ready */
done = io_getevents (ctx->io_ctx, 1, 64, event, &ts);

if (done > 0) {
ready -= done;

for (i = 0; i < done; i ++) {
ev_data = (struct io_cbdata *) (uintptr_t) event[i].data;
/* Call this callback */
ev_data->cb (event[i].res, ev_data->len, ev_data->buf, ev_data->ud);
}
}
else if (done == 0) {
/* No more events are ready */
return;
}
else {
msg_err ("io_getevents failed: %s", strerror (errno));
return;
}
}
}

#endif

/**
* Initialize aio with specified event base
*/
struct aio_context*
rspamd_aio_init (struct event_base *base)
{
struct aio_context *new;

/* First of all we need to detect which type of aio we can try to use */
new = g_malloc0 (sizeof (struct aio_context));
new->base = base;

#ifdef LINUX
/* On linux we are trying to use io (3) and eventfd for notifying */
new->event_fd = eventfd (0, 0);
if (new->event_fd == -1) {
msg_err ("eventfd failed: %s", strerror (errno));
}
else {
/* Set this socket non-blocking */
if (make_socket_nonblocking (new->event_fd) == -1) {
msg_err ("non blocking for eventfd failed: %s", strerror (errno));
close (new->event_fd);
}
else {
event_set (&new->eventfd_ev, new->event_fd, EV_READ|EV_PERSIST, rspamd_eventfdcb, new);
event_base_set (new->base, &new->eventfd_ev);
if (io_setup (MAX_AIO_EV, &new->io_ctx) == -1) {
msg_err ("io_setup failed: %s", strerror (errno));
close (new->event_fd);
}
else {
new->has_aio = TRUE;
}
}
}
#elif defined(HAVE_AIO_H)
/* TODO: implement this */
#endif

return new;
}

/**
* Open file for aio
*/
gint
rspamd_aio_open (struct aio_context *ctx, const gchar *path, int flags)
{
gint fd = -1;
/* Fallback */
if (!ctx->has_aio) {
return open (path, flags);
}
#ifdef LINUX

fd = open (path, flags | O_DIRECT | O_NONBLOCK);

return fd;
#elif defined(HAVE_AIO_H)
fd = open (path, flags | O_NONBLOCK);
#endif

return fd;
}

/**
* Asynchronous read of file
*/
gint
rspamd_aio_read (gint fd, gpointer buf, gsize len, struct aio_context *ctx, rspamd_aio_cb cb, gpointer ud)
{
struct io_cbdata *cbdata;
gint r = -1;

if (ctx->has_aio) {
#ifdef LINUX
struct iocb *iocb[1];

cbdata = g_slice_alloc (sizeof (struct io_cbdata));
cbdata->cb = cb;
cbdata->buf = buf;
cbdata->len = len;
cbdata->ud = ud;

iocb[0] = alloca (sizeof (struct iocb));
memset (iocb[0], 0, sizeof (struct iocb));
iocb[0]->aio_fildes = fd;
iocb[0]->aio_lio_opcode = IO_CMD_PREAD;
iocb[0]->aio_reqprio = 0;
iocb[0]->u.c.buf = buf;
iocb[0]->u.c.nbytes = len;
iocb[0]->u.c.offset = 0;
iocb[0]->u.c.flags |= (1 << 0) /* IOCB_FLAG_RESFD */;
iocb[0]->u.c.resfd = ctx->event_fd;
iocb[0]->data = cbdata;

/* Iocb is copied to kernel internally, so it is safe to put it on stack */
if (io_submit (ctx->io_ctx, 1, iocb) == 1) {
return len;
}
else {
if (errno == EAGAIN || errno == ENOSYS) {
/* Fall back to sync read */
goto blocking;
}
return -1;
}

#elif defined(HAVE_AIO_H)
#endif
}
else {
/* Blocking variant */
blocking:
r = read (fd, buf, len);
if (r >= 0) {
cb (0, r, buf, ud);
}
else {
cb (r, -1, buf, ud);
}
}

return r;
}

/**
* Asynchronous write of file
*/
gint
rspamd_aio_write (gint fd, gpointer buf, gsize len, struct aio_context *ctx, rspamd_aio_cb cb, gpointer ud)
{
struct io_cbdata *cbdata;
gint r = -1;

if (ctx->has_aio) {
#ifdef LINUX
struct iocb *iocb[1];

cbdata = g_slice_alloc (sizeof (struct io_cbdata));
cbdata->cb = cb;
cbdata->buf = buf;
cbdata->len = len;
cbdata->ud = ud;

iocb[0] = alloca (sizeof (struct iocb));
memset (iocb[0], 0, sizeof (struct iocb));
iocb[0]->aio_fildes = fd;
iocb[0]->aio_lio_opcode = IO_CMD_PWRITE;
iocb[0]->aio_reqprio = 0;
iocb[0]->u.c.buf = buf;
iocb[0]->u.c.nbytes = len;
iocb[0]->u.c.offset = 0;
iocb[0]->u.c.flags |= (1 << 0) /* IOCB_FLAG_RESFD */;
iocb[0]->u.c.resfd = ctx->event_fd;
iocb[0]->data = cbdata;

/* Iocb is copied to kernel internally, so it is safe to put it on stack */
if (io_submit (ctx->io_ctx, 1, iocb) == 1) {
return len;
}
else {
if (errno == EAGAIN || errno == ENOSYS) {
/* Fall back to sync read */
goto blocking;
}
return -1;
}

#elif defined(HAVE_AIO_H)
#endif
}
else {
/* Blocking variant */
blocking:
r = write (fd, buf, len);
if (r >= 0) {
cb (0, r, buf, ud);
}
else {
cb (r, -1, buf, ud);
}
}

return r;
}

/**
* Close of aio operations
*/
gint
rspamd_aio_close (gint fd, struct aio_context *ctx)
{
gint r = -1;

if (ctx->has_aio) {
#ifdef LINUX
struct iocb iocb;
struct io_event ev;

memset (&iocb, 0, sizeof (struct iocb));
iocb.aio_fildes = fd;
iocb.aio_lio_opcode = IO_CMD_NOOP;

/* Iocb is copied to kernel internally, so it is safe to put it on stack */
r = io_cancel (ctx->io_ctx, &iocb, &ev);
close (fd);
return r;

#elif defined(HAVE_AIO_H)
#endif
}

r = close (fd);

return r;
}

+ 65
- 0
src/aio_event.h 查看文件

@@ -0,0 +1,65 @@
/* Copyright (c) 2010-2011, Vsevolod Stakhov
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/


#ifndef AIO_EVENT_H_
#define AIO_EVENT_H_

#include "config.h"

/**
* AIO context
*/
struct aio_context;

/**
* Callback for notifying
*/
typedef void (*rspamd_aio_cb) (gint res, gsize len, gpointer data, gpointer ud);

/**
* Initialize aio with specified event base
*/
struct aio_context* rspamd_aio_init (struct event_base *base);

/**
* Open file for aio
*/
gint rspamd_aio_open (struct aio_context *ctx, const gchar *path, int flags);

/**
* Asynchronous read of file
*/
gint rspamd_aio_read (gint fd, gpointer buf, gsize len, struct aio_context *ctx, rspamd_aio_cb cb, gpointer ud);

/**
* Asynchronous write of file
*/
gint rspamd_aio_write (gint fd, gpointer buf, gsize len, struct aio_context *ctx, rspamd_aio_cb cb, gpointer ud);

/**
* Close of aio operations
*/
gint rspamd_aio_close (gint fd, struct aio_context *ctx);

#endif /* AIO_EVENT_H_ */

正在加载...
取消
保存