From bcece60fa1bfd4bbb09a64c058835fe3245e1d18 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Thu, 19 Feb 2009 21:16:30 +0300 Subject: * Implement rspamd IO with IO dispatcher (TODO: still some issues with timeouts must be resolved) --- CMakeLists.txt | 3 +- src/buffer.c | 355 +++++++++++++++++++++++++++++++++++++++++++++++++++ src/buffer.h | 99 ++++++++++++++ src/controller.c | 210 +++++++++++++----------------- src/filter.c | 3 - src/fstring.h | 5 - src/main.h | 9 +- src/message.c | 4 +- src/plugins/regexp.c | 2 +- src/protocol.c | 50 ++++---- src/protocol.h | 2 +- src/util.c | 45 ------- src/util.h | 4 - src/worker.c | 87 +++++-------- 14 files changed, 615 insertions(+), 263 deletions(-) create mode 100644 src/buffer.c create mode 100644 src/buffer.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 70e14b472..8fedd0bff 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -211,7 +211,8 @@ SET(RSPAMDSRC src/modules.c src/fstring.c src/filter.c src/controller.c - src/cfg_utils.c) + src/cfg_utils.c + src/buffer.c) SET(TOKENIZERSSRC src/tokenizers/tokenizers.c src/tokenizers/osb.c) diff --git a/src/buffer.c b/src/buffer.c new file mode 100644 index 000000000..46ef98850 --- /dev/null +++ b/src/buffer.c @@ -0,0 +1,355 @@ +/* + * Copyright (c) 2009, Rambler media + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY Rambler media ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL Rambler BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "config.h" +#include "buffer.h" + +#define G_DISPATCHER_ERROR dispatcher_error_quark() + +static void dispatcher_cb (int fd, short what, void *arg); + +static inline GQuark +dispatcher_error_quark (void) +{ + return g_quark_from_static_string ("g-dispatcher-error-quark"); +} + +#define BUFREMAIN(x) (x)->data->size - ((x)->pos - (x)->data->begin) + +static void +write_buffers (int fd, rspamd_io_dispatcher_t *d) +{ + GList *cur; + GError *err; + rspamd_buffer_t *buf; + ssize_t r; + + /* Fix order */ + if (d->out_buffers) { + d->out_buffers = g_list_reverse (d->out_buffers); + } + cur = g_list_first (d->out_buffers); + while (cur) { + buf = (rspamd_buffer_t *)cur->data; + if (BUFREMAIN (buf) == 0) { + /* Skip empty buffers */ + cur = g_list_next (cur); + continue; + } + + r = write (fd, buf->pos, BUFREMAIN (buf)); + if (r == -1 && errno != EAGAIN) { + if (d->err_callback) { + err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno)); + d->err_callback (err, d->user_data); + return; + } + } + else if (r > 0) { + buf->pos += r; + if (BUFREMAIN (buf) != 0) { + /* Continue with this buffer */ + continue; + } + } + else if (r == 0) { + /* Got EOF while we wait for data */ + if (d->err_callback) { + err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF"); + d->err_callback (err, d->user_data); + return; + } + } + else if (errno == EAGAIN) { + /* Wait for other event */ + return; + } + cur = g_list_next (cur); + } + + if (cur == NULL) { + /* Disable write event for this time */ + g_list_free (d->out_buffers); + d->out_buffers = NULL; + + if (d->write_callback) { + d->write_callback (d->user_data); + } + + event_del (d->ev); + event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d); + event_add (d->ev, d->tv); + } +} + +static void +read_buffers (int fd, rspamd_io_dispatcher_t *d) +{ + ssize_t r, len; + GError *err; + f_str_t res; + char *c; + + if (d->in_buf == NULL) { + d->in_buf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t)); + if (d->policy == BUFFER_LINE) { + d->in_buf->data = fstralloc (d->pool, BUFSIZ); + } + else { + d->in_buf->data = fstralloc (d->pool, d->nchars); + } + d->in_buf->pos = d->in_buf->data->begin; + } + + if (BUFREMAIN (d->in_buf) == 0) { + /* Buffer is full, try to call callback with overflow error */ + if (d->err_callback) { + err = g_error_new (G_DISPATCHER_ERROR, E2BIG, "buffer overflow"); + d->err_callback (err, d->user_data); + return; + } + } + else { + /* Try to read the whole buffer */ + r = read (fd, d->in_buf->pos, BUFREMAIN (d->in_buf)); + if (r == -1 && errno != EAGAIN) { + if (d->err_callback) { + err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno)); + d->err_callback (err, d->user_data); + return; + } + } + else if (r == 0) { + /* Got EOF while we wait for data */ + if (d->err_callback) { + err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF"); + d->err_callback (err, d->user_data); + return; + } + } + else if (errno == EAGAIN) { + return; + } + else { + d->in_buf->pos += r; + d->in_buf->data->len += r; + } + + } + + c = d->in_buf->data->begin; + r = 0; + len = d->in_buf->data->len; + + switch (d->policy) { + case BUFFER_LINE: + while (r < len) { + if (*c == '\r' || *c == '\n') { + res.begin = d->in_buf->data->begin; + res.len = r; + if (d->read_callback) { + d->read_callback (&res, d->user_data); + if (r < len - 1 && *(c + 1) == '\n') { + r ++; + c ++; + } + /* Move remaining string to begin of buffer (draining) */ + memmove (d->in_buf->data->begin, c, len - r); + c = d->in_buf->data->begin; + d->in_buf->data->len -= r + 1; + d->in_buf->pos -= r + 1; + r = 0; + len = d->in_buf->data->len; + continue; + } + } + r ++; + c ++; + } + break; + case BUFFER_CHARACTER: + while (r < len) { + if (r == d->nchars) { + res.begin = d->in_buf->data->begin; + res.len = r; + if (d->read_callback) { + d->read_callback (&res, d->user_data); + /* Move remaining string to begin of buffer (draining) */ + memmove (d->in_buf->data->begin, c, len - r); + c = d->in_buf->data->begin; + d->in_buf->data->len -= r; + d->in_buf->pos -= r; + r = 0; + len = d->in_buf->data->len; + continue; + } + + } + r ++; + c ++; + } + break; + } +} + +#undef BUFREMAIN + +static void +dispatcher_cb (int fd, short what, void *arg) +{ + rspamd_io_dispatcher_t *d = (rspamd_io_dispatcher_t *)arg; + GError *err; + + switch (what) { + case EV_TIMEOUT: + if (d->err_callback) { + err = g_error_new (G_DISPATCHER_ERROR, ETIMEDOUT, "IO timeout"); + d->err_callback (err, d->user_data); + } + break; + case EV_WRITE: + /* No data to write, disable further EV_WRITE to this fd */ + if (d->out_buffers == NULL) { + event_del (d->ev); + event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d); + event_add (d->ev, d->tv); + } + else { + write_buffers (fd, d); + } + break; + case EV_READ: + read_buffers (fd, d); + break; + } +} + + +rspamd_io_dispatcher_t* +rspamd_create_dispatcher (int fd, enum io_policy policy, + dispatcher_read_callback_t read_cb, + dispatcher_write_callback_t write_cb, + dispatcher_err_callback_t err_cb, + struct timeval *tv, void *user_data) +{ + rspamd_io_dispatcher_t *new; + + if (fd == -1) { + return NULL; + } + + new = g_malloc (sizeof (rspamd_io_dispatcher_t)); + bzero (new, sizeof (rspamd_io_dispatcher_t)); + + new->pool = memory_pool_new (memory_pool_get_size ()); + new->tv = memory_pool_alloc (new->pool, sizeof (struct timeval)); + memcpy (new->tv, tv, sizeof (struct timeval)); + new->policy = policy; + new->read_callback = read_cb; + new->write_callback = write_cb; + new->err_callback = err_cb; + new->user_data = user_data; + + new->ev = memory_pool_alloc0 (new->pool, sizeof (struct event)); + new->fd = fd; + + event_set (new->ev, fd, EV_READ | EV_WRITE | EV_PERSIST, dispatcher_cb, (void *)new); + event_add (new->ev, new->tv); + + return new; +} + +void +rspamd_remove_dispatcher (rspamd_io_dispatcher_t *dispatcher) +{ + if (dispatcher != NULL) { + event_del (dispatcher->ev); + memory_pool_delete (dispatcher->pool); + g_free (dispatcher); + } +} + +void +rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t *d, + enum io_policy policy, + size_t nchars) +{ + f_str_t *tmp; + + if (d->policy != policy || d->nchars != nchars) { + d->policy = policy; + d->nchars = nchars ? nchars : BUFSIZ; + /* Resize input buffer if needed */ + if (policy == BUFFER_CHARACTER && nchars != 0) { + if (d->in_buf && d->in_buf->data->size < nchars) { + tmp = fstralloc (d->pool, d->nchars); + memcpy (tmp->begin, d->in_buf->data->begin, d->in_buf->data->len); + tmp->len = d->in_buf->data->len; + d->in_buf->data = tmp; + } + } + else if (policy == BUFFER_LINE) { + if (d->in_buf && d->nchars < BUFSIZ) { + tmp = fstralloc (d->pool, BUFSIZ); + memcpy (tmp->begin, d->in_buf->data->begin, d->in_buf->data->len); + tmp->len = d->in_buf->data->len; + d->in_buf->data = tmp; + } + } + } +} + +void +rspamd_dispatcher_write (rspamd_io_dispatcher_t *d, + void *data, + size_t len, gboolean delayed) +{ + rspamd_buffer_t *newbuf; + + newbuf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t)); + newbuf->data = memory_pool_alloc (d->pool, sizeof (f_str_t)); + + newbuf->data->begin = memory_pool_alloc (d->pool, len); + memcpy (newbuf->data->begin, data, len); + newbuf->data->size = len; + newbuf->pos = newbuf->data->begin; + + d->out_buffers = g_list_prepend (d->out_buffers, newbuf); + + if (!delayed) { + event_del (d->ev); + event_set (d->ev, d->fd, EV_READ | EV_WRITE | EV_PERSIST, dispatcher_cb, (void *)d); + event_add (d->ev, d->tv); + } +} + +void +rspamd_dispatcher_pause (rspamd_io_dispatcher_t *d) +{ + event_del (d->ev); +} + +/* + * vi:ts=4 + */ diff --git a/src/buffer.h b/src/buffer.h new file mode 100644 index 000000000..93c7818dd --- /dev/null +++ b/src/buffer.h @@ -0,0 +1,99 @@ +/** + * @file buffer.h + * Implements buffered IO + */ + +#ifndef RSPAMD_BUFFER_H +#define RSPAMD_BUFFER_H + +#include "config.h" +#include "mem_pool.h" +#include "fstring.h" + +typedef void (*dispatcher_read_callback_t)(f_str_t *in, void *user_data); +typedef void (*dispatcher_write_callback_t)(void *user_data); +typedef void (*dispatcher_err_callback_t)(GError *err, void *user_data); + +/** + * Types of IO handling + */ +enum io_policy { + BUFFER_LINE, /**< call handler when we have line ready */ + BUFFER_CHARACTER, /**< call handler when we have some characters */ +}; + +/** + * Buffer structure + */ +typedef struct rspamd_buffer_s { + f_str_t *data; /**< buffer logic */ + char *pos; /**< current position */ +} rspamd_buffer_t; + +typedef struct rspamd_io_dispatcher_s { + rspamd_buffer_t *in_buf; /**< input buffer */ + GList *out_buffers; /**< out buffers chain */ + struct timeval *tv; /**< io timeout */ + struct event *ev; /**< libevent io event */ + memory_pool_t *pool; /**< where to store data */ + enum io_policy policy; /**< IO policy */ + size_t nchars; /**< how many chars to read */ + int fd; /**< descriptor */ + dispatcher_read_callback_t read_callback; /**< read callback */ + dispatcher_write_callback_t write_callback; /**< write callback */ + dispatcher_err_callback_t err_callback; /**< error callback */ + void *user_data; /**< user's data for callbacks */ +} rspamd_io_dispatcher_t; + +/** + * Creates rspamd IO dispatcher for specified descriptor + * @param fd descriptor to IO + * @param policy IO policy + * @param read_cb read callback handler + * @param write_cb write callback handler + * @param err_cb error callback handler + * @param tv IO timeout + * @param user_data pointer to user's data + * @return new dispatcher object or NULL in case of failure + */ +rspamd_io_dispatcher_t* rspamd_create_dispatcher (int fd, + enum io_policy policy, + dispatcher_read_callback_t read_cb, + dispatcher_write_callback_t write_cb, + dispatcher_err_callback_t err_cb, + struct timeval *tv, + void *user_data); + +/** + * Set new policy for dispatcher + * @param d pointer to dispatcher's object + * @param policy IO policy + * @param nchars number of characters in buffer for character policy + */ +void rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t *d, + enum io_policy policy, + size_t nchars); + +/** + * Write data when it would be possible + * @param d pointer to dispatcher's object + * @param data data to write + * @param len length of data + */ +void rspamd_dispatcher_write (rspamd_io_dispatcher_t *d, + void *data, + size_t len, gboolean delayed); + +/** + * Pause IO events on dispatcher + * @param d pointer to dispatcher's object + */ +void rspamd_dispatcher_pause (rspamd_io_dispatcher_t *d); + +/** + * Frees dispatcher object + * @param dispatcher pointer to dispatcher's object + */ +void rspamd_remove_dispatcher (rspamd_io_dispatcher_t *dispatcher); + +#endif diff --git a/src/controller.c b/src/controller.c index 41f8a9649..7733bc924 100644 --- a/src/controller.c +++ b/src/controller.c @@ -36,6 +36,9 @@ #define CRLF "\r\n" #define END "END" CRLF +/* 120 seconds for controller's IO */ +#define CONTROLLER_IO_TIMEOUT 120 + enum command_type { COMMAND_PASSWORD, COMMAND_QUIT, @@ -68,6 +71,7 @@ static GCompletion *comp; static time_t start_time; static char greetingbuf[1024]; +static struct timeval io_tv; static void sig_handler (int signo) @@ -110,8 +114,7 @@ free_session (struct controller_session *session) struct mime_part *p; msg_debug ("free_session: freeing session %p", session); - bufferevent_disable (session->bev, EV_READ | EV_WRITE); - bufferevent_free (session->bev); + rspamd_remove_dispatcher (session->dispatcher); while ((part = g_list_first (session->parts))) { session->parts = g_list_remove_link (session->parts, part); @@ -132,7 +135,7 @@ check_auth (struct controller_command *cmd, struct controller_session *session) if (cmd->privilleged && !session->authorized) { r = snprintf (out_buf, sizeof (out_buf), "not authorized" CRLF); - bufferevent_write (session->bev, out_buf, r); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE); return 0; } @@ -156,18 +159,18 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control if (!arg || *arg == '\0') { msg_debug ("process_command: empty password passed"); r = snprintf (out_buf, sizeof (out_buf), "password command requires one argument" CRLF); - bufferevent_write (session->bev, out_buf, r); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE); return; } if (strncmp (arg, session->cfg->control_password, strlen (arg)) == 0) { session->authorized = 1; r = snprintf (out_buf, sizeof (out_buf), "password accepted" CRLF); - bufferevent_write (session->bev, out_buf, r); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE); } else { session->authorized = 0; r = snprintf (out_buf, sizeof (out_buf), "password NOT accepted" CRLF); - bufferevent_write (session->bev, out_buf, r); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE); } break; case COMMAND_QUIT: @@ -176,7 +179,7 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control case COMMAND_RELOAD: if (check_auth (cmd, session)) { r = snprintf (out_buf, sizeof (out_buf), "reload request sent" CRLF); - bufferevent_write (session->bev, out_buf, r); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE); kill (getppid (), SIGHUP); } break; @@ -199,13 +202,13 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control mem_st.shared_chunks_allocated); r += snprintf (out_buf + r, sizeof (out_buf) - r, "Chunks freed: %zd" CRLF, mem_st.chunks_freed); - bufferevent_write (session->bev, out_buf, r); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE); } break; case COMMAND_SHUTDOWN: if (check_auth (cmd, session)) { r = snprintf (out_buf, sizeof (out_buf), "shutdown request sent" CRLF); - bufferevent_write (session->bev, out_buf, r); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE); kill (getppid (), SIGTERM); } break; @@ -235,7 +238,7 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control minutes, minutes > 1 ? "s" : " ", (int)uptime, uptime > 1 ? "s" : " "); } - bufferevent_write (session->bev, out_buf, r); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE); } break; case COMMAND_LEARN: @@ -244,37 +247,28 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control if (!arg || *arg == '\0') { msg_debug ("process_command: no statfile specified in learn command"); r = snprintf (out_buf, sizeof (out_buf), "learn command requires at least two arguments: stat filename and its size" CRLF); - bufferevent_write (session->bev, out_buf, r); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE); return; } arg = *(cmd_args + 1); if (arg == NULL || *arg == '\0') { msg_debug ("process_command: no statfile size specified in learn command"); r = snprintf (out_buf, sizeof (out_buf), "learn command requires at least two arguments: stat filename and its size" CRLF); - bufferevent_write (session->bev, out_buf, r); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE); return; } size = strtoul (arg, &err_str, 10); if (err_str && *err_str != '\0') { msg_debug ("process_command: statfile size is invalid: %s", arg); r = snprintf (out_buf, sizeof (out_buf), "learn size is invalid" CRLF); - bufferevent_write (session->bev, out_buf, r); - return; - } - session->learn_buf = memory_pool_alloc0 (session->session_pool, sizeof (f_str_buf_t)); - session->learn_buf->buf = fstralloc (session->session_pool, size); - if (session->learn_buf->buf == NULL) { - r = snprintf (out_buf, sizeof (out_buf), "allocating buffer for learn failed" CRLF); - bufferevent_write (session->bev, out_buf, r); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE); return; } - session->learn_buf->pos = session->learn_buf->buf->begin; - update_buf_size (session->learn_buf); statfile = g_hash_table_lookup (session->cfg->statfiles, *cmd_args); if (statfile == NULL) { r = snprintf (out_buf, sizeof (out_buf), "statfile %s is not defined" CRLF, *cmd_args); - bufferevent_write (session->bev, out_buf, r); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE); return; } @@ -302,7 +296,7 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control arg = *(cmd_args + 1); if (!arg || *arg == '\0') { r = snprintf (out_buf, sizeof (out_buf), "recipient is not defined" CRLF, arg); - bufferevent_write (session->bev, out_buf, r); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE); return; } session->learn_rcpt = memory_pool_strdup (session->session_pool, arg); @@ -311,7 +305,7 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control arg = *(cmd_args + 1); if (!arg || *arg == '\0') { r = snprintf (out_buf, sizeof (out_buf), "from is not defined" CRLF, arg); - bufferevent_write (session->bev, out_buf, r); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE); return; } session->learn_from = memory_pool_strdup (session->session_pool, arg); @@ -321,7 +315,7 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control break; default: r = snprintf (out_buf, sizeof (out_buf), "tokenizer is not defined" CRLF, arg); - bufferevent_write (session->bev, out_buf, r); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE); return; } } @@ -333,15 +327,16 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control if (statfile_pool_create (session->worker->srv->statfile_pool, session->learn_filename, statfile->size / sizeof (struct stat_file_block)) == -1) { r = snprintf (out_buf, sizeof (out_buf), "cannot create statfile %s" CRLF, session->learn_filename); - bufferevent_write (session->bev, out_buf, r); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE); return; } if (statfile_pool_open (session->worker->srv->statfile_pool, session->learn_filename) == -1) { r = snprintf (out_buf, sizeof (out_buf), "cannot open statfile %s" CRLF, session->learn_filename); - bufferevent_write (session->bev, out_buf, r); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE); return; } } + rspamd_set_dispatcher_policy (session->dispatcher, BUFFER_CHARACTER, size); session->state = STATE_LEARN; } break; @@ -355,13 +350,13 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control "(*) shutdown - shutdown rspamd" CRLF " stat - show different rspamd stat" CRLF " uptime - rspamd uptime" CRLF); - bufferevent_write (session->bev, out_buf, r); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE); break; } } static void -read_socket (struct bufferevent *bev, void *arg) +read_socket (f_str_t *in, void *arg) { struct controller_session *session = (struct controller_session *)arg; struct classifier_ctx *cls_ctx; @@ -375,101 +370,73 @@ read_socket (struct bufferevent *bev, void *arg) switch (session->state) { case STATE_COMMAND: - s = buffer_readline (session->session_pool, EVBUFFER_INPUT (bev)); - msg_debug ("read_socket: got '%s' string from user", s); - if (s != NULL && *s != 0) { - len = strlen (s); - /* Remove end of line characters from string */ - if (s[len - 1] == '\n') { - if (s[len - 2] == '\r') { - s[len - 2] = 0; - } - s[len - 1] = 0; - } - params = g_strsplit (s, " ", -1); - len = g_strv_length (params); - if (len > 0) { - cmd = g_strstrip (params[0]); - comp_list = g_completion_complete (comp, cmd, NULL); - switch (g_list_length (comp_list)) { - case 1: - process_command ((struct controller_command *)comp_list->data, ¶ms[1], session); - break; - case 0: - msg_debug ("Unknown command: '%s'", cmd); - i = snprintf (out_buf, sizeof (out_buf), "Unknown command" CRLF); - bufferevent_write (bev, out_buf, i); - break; - default: - msg_debug ("Ambigious command: '%s'", cmd); - i = snprintf (out_buf, sizeof (out_buf), "Ambigious command" CRLF); - bufferevent_write (bev, out_buf, i); - break; - } + s = fstrcstr (in, session->session_pool); + params = g_strsplit (s, " ", -1); + len = g_strv_length (params); + if (len > 0) { + cmd = g_strstrip (params[0]); + comp_list = g_completion_complete (comp, cmd, NULL); + switch (g_list_length (comp_list)) { + case 1: + process_command ((struct controller_command *)comp_list->data, ¶ms[1], session); + break; + case 0: + msg_debug ("Unknown command: '%s'", cmd); + i = snprintf (out_buf, sizeof (out_buf), "Unknown command" CRLF); + rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE); + break; + default: + msg_debug ("Ambigious command: '%s'", cmd); + i = snprintf (out_buf, sizeof (out_buf), "Ambigious command" CRLF); + rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE); + break; } - if (session->state == STATE_COMMAND) { - session->state = STATE_REPLY; - } - if (session->state != STATE_LEARN) { - bufferevent_write (bev, END, sizeof (END) - 1); - bufferevent_enable (bev, EV_WRITE); - } - g_strfreev (params); } - else { - bufferevent_enable (bev, EV_WRITE); - } + if (session->state == STATE_COMMAND) { + session->state = STATE_REPLY; + } + if (session->state != STATE_LEARN) { + rspamd_dispatcher_write (session->dispatcher, END, sizeof (END) - 1, FALSE); + } + + g_strfreev (params); break; case STATE_LEARN: - i = bufferevent_read (bev, session->learn_buf->pos, session->learn_buf->free); - if (i > 0) { - session->learn_buf->pos += i; - update_buf_size (session->learn_buf); - if (session->learn_buf->free == 0) { - process_learn (session); - while ((content = get_next_text_part (session->session_pool, session->parts, &cur)) != NULL) { - c.begin = content->data; - c.len = content->len; - if (!session->learn_tokenizer->tokenize_func (session->learn_tokenizer, - session->session_pool, &c, &tokens)) { - i = snprintf (out_buf, sizeof (out_buf), "learn fail, tokenizer error" CRLF); - bufferevent_write (bev, out_buf, i); - session->state = STATE_REPLY; - return; - } - } - cls_ctx = session->learn_classifier->init_func (session->session_pool); - session->learn_classifier->learn_func (cls_ctx, session->worker->srv->statfile_pool, - session->learn_filename, tokens, session->in_class); - session->worker->srv->stat->messages_learned ++; - i = snprintf (out_buf, sizeof (out_buf), "learn ok" CRLF); - bufferevent_write (bev, out_buf, i); - bufferevent_enable (bev, EV_WRITE); - - /* Clean learned parts */ - while ((cur = g_list_first (session->parts))) { - session->parts = g_list_remove_link (session->parts, cur); - p = (struct mime_part *)cur->data; - g_byte_array_free (p->content, FALSE); - g_list_free_1 (cur); - } - + session->learn_buf = in; + process_learn (session); + while ((content = get_next_text_part (session->session_pool, session->parts, &cur)) != NULL) { + c.begin = content->data; + c.len = content->len; + if (!session->learn_tokenizer->tokenize_func (session->learn_tokenizer, + session->session_pool, &c, &tokens)) { + i = snprintf (out_buf, sizeof (out_buf), "learn fail, tokenizer error" CRLF); + rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE); session->state = STATE_REPLY; - break; + return; } } - else { - i = snprintf (out_buf, sizeof (out_buf), "read error: %d" CRLF, i); - bufferevent_write (bev, out_buf, i); - bufferevent_enable (bev, EV_WRITE); - session->state = STATE_REPLY; + cls_ctx = session->learn_classifier->init_func (session->session_pool); + session->learn_classifier->learn_func (cls_ctx, session->worker->srv->statfile_pool, + session->learn_filename, tokens, session->in_class); + session->worker->srv->stat->messages_learned ++; + i = snprintf (out_buf, sizeof (out_buf), "learn ok" CRLF); + rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE); + + /* Clean learned parts */ + while ((cur = g_list_first (session->parts))) { + session->parts = g_list_remove_link (session->parts, cur); + p = (struct mime_part *)cur->data; + g_byte_array_free (p->content, FALSE); + g_list_free_1 (cur); } + + session->state = STATE_REPLY; break; } } static void -write_socket (struct bufferevent *bev, void *arg) +write_socket (void *arg) { struct controller_session *session = (struct controller_session *)arg; @@ -482,16 +449,15 @@ write_socket (struct bufferevent *bev, void *arg) } else if (session->state == STATE_REPLY) { session->state = STATE_COMMAND; + rspamd_set_dispatcher_policy (session->dispatcher, BUFFER_LINE, BUFSIZ); } - bufferevent_disable (bev, EV_WRITE); - bufferevent_enable (bev, EV_READ); } static void -err_socket (struct bufferevent *bev, short what, void *arg) +err_socket (GError *err, void *arg) { struct controller_session *session = (struct controller_session *)arg; - msg_info ("closing control connection"); + msg_info ("err_socket: abnormally closing control connection, error: %s", err->message); /* Free buffers */ free_session (session); } @@ -525,10 +491,11 @@ accept_socket (int fd, short what, void *arg) new_session->session_pool = memory_pool_new (memory_pool_get_size () - 1); worker->srv->stat->control_connections_count ++; - /* Read event */ - new_session->bev = bufferevent_new (nfd, read_socket, write_socket, err_socket, (void *)new_session); - bufferevent_write (new_session->bev, greetingbuf, strlen (greetingbuf)); - bufferevent_enable (new_session->bev, EV_WRITE); + /* Set up dispatcher */ + new_session->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, read_socket, + write_socket, err_socket, &io_tv, + (void *)new_session); + rspamd_dispatcher_write (new_session->dispatcher, greetingbuf, strlen (greetingbuf), FALSE); } void @@ -592,6 +559,9 @@ start_controller (struct rspamd_worker *worker) /* Send SIGUSR2 to parent */ kill (getppid (), SIGUSR2); + + io_tv.tv_sec = CONTROLLER_IO_TIMEOUT; + io_tv.tv_usec = 0; event_loop (0); } diff --git a/src/filter.c b/src/filter.c index 088ac4376..a1d1e43fa 100644 --- a/src/filter.c +++ b/src/filter.c @@ -234,9 +234,6 @@ continue_process_filters (struct worker_task *task) } /* Process all metrics */ g_hash_table_foreach (task->results, metric_process_callback, task); - /* All done */ - bufferevent_enable (task->bev, EV_WRITE); - evbuffer_drain (task->bev->output, EVBUFFER_LENGTH (task->bev->output)); process_statfiles (task); return 1; } diff --git a/src/fstring.h b/src/fstring.h index 896cd8dcf..000ba74c6 100644 --- a/src/fstring.h +++ b/src/fstring.h @@ -5,12 +5,7 @@ #ifndef FSTRING_H #define FSTRING_H -#include #include "config.h" - -#ifdef HAVE_STDINT_H -#include -#endif #include "mem_pool.h" #define update_buf_size(x) (x)->free = (x)->buf->size - ((x)->pos - (x)->buf->begin); (x)->buf->len = (x)->pos - (x)->buf->begin diff --git a/src/main.h b/src/main.h index e1244d0ba..ec74ad03b 100644 --- a/src/main.h +++ b/src/main.h @@ -14,6 +14,7 @@ #include "memcached.h" #include "protocol.h" #include "filter.h" +#include "buffer.h" /* Default values */ #define FIXED_CONFIG_FILE "./rspamd.conf" @@ -133,14 +134,14 @@ struct controller_session { /* Access to authorized commands */ int authorized; /**< whether this session is authorized */ memory_pool_t *session_pool; /**< memory pool for session */ - struct bufferevent *bev; /**< buffered event for IO */ struct config_file *cfg; /**< pointer to config file */ char *learn_rcpt; /**< recipient for learning */ char *learn_from; /**< from address for learning */ struct tokenizer *learn_tokenizer; /**< tokenizer for learning */ struct classifier *learn_classifier; /**< classifier for learning */ char *learn_filename; /**< real filename for learning */ - f_str_buf_t *learn_buf; /**< learn input */ + rspamd_io_dispatcher_t *dispatcher; /**< IO dispatcher object */ + f_str_t *learn_buf; /**< learn input */ GList *parts; /**< extracted mime parts */ int in_class; /**< positive or negative learn */ }; @@ -168,8 +169,8 @@ struct worker_task { GList *rcpt; /**< recipients list */ unsigned int nrcpt; /**< number of recipients */ struct in_addr from_addr; /**< client addr in numeric form */ - f_str_buf_t *msg; /**< message buffer */ - struct bufferevent *bev; /**< buffered event for IO */ + f_str_t *msg; /**< message buffer */ + rspamd_io_dispatcher_t *dispatcher; /**< IO dispatcher object */ memcached_ctx_t *memc_ctx; /**< memcached context associated with task */ int parts_count; /**< mime parts count */ GMimeMessage *message; /**< message, parsed with GMime */ diff --git a/src/message.c b/src/message.c index 5c0f15cf9..d5ec43653 100644 --- a/src/message.c +++ b/src/message.c @@ -304,7 +304,7 @@ process_message (struct worker_task *task) GMimeParser *parser; GMimeStream *stream; - stream = g_mime_stream_mem_new_with_buffer (task->msg->buf->begin, task->msg->buf->len); + stream = g_mime_stream_mem_new_with_buffer (task->msg->begin, task->msg->len); /* create a new parser object to parse the stream */ parser = g_mime_parser_new_with_stream (stream); @@ -393,7 +393,7 @@ process_learn (struct controller_session *session) GMimeParser *parser; GMimeStream *stream; - stream = g_mime_stream_mem_new_with_buffer (session->learn_buf->buf->begin, session->learn_buf->buf->len); + stream = g_mime_stream_mem_new_with_buffer (session->learn_buf->begin, session->learn_buf->len); /* create a new parser object to parse the stream */ parser = g_mime_parser_new_with_stream (stream); diff --git a/src/plugins/regexp.c b/src/plugins/regexp.c index 1ecfcb36d..9cc37ea7e 100644 --- a/src/plugins/regexp.c +++ b/src/plugins/regexp.c @@ -180,7 +180,7 @@ process_regexp (struct rspamd_regexp *re, struct worker_task *task) return 0; case REGEXP_MESSAGE: msg_debug ("process_message: checking message regexp: /%s/", re->regexp_text); - if (g_regex_match_full (re->regexp, task->msg->buf->begin, task->msg->buf->len, 0, 0, NULL, NULL) == TRUE) { + if (g_regex_match_full (re->regexp, task->msg->begin, task->msg->len, 0, 0, NULL, NULL) == TRUE) { return 1; } return 0; diff --git a/src/protocol.c b/src/protocol.c index 92b331ac2..ab0db5f78 100644 --- a/src/protocol.c +++ b/src/protocol.c @@ -174,14 +174,17 @@ static int parse_header (struct worker_task *task, char *line) { char *headern, *err, *tmp; - + + msg_debug ("parse_header: got line from worker: %s", line); /* Check end of headers */ if (*line == '\0') { + msg_debug ("parse_header: got empty line, assume it as end of headers"); if (task->cmd == CMD_PING || task->cmd == CMD_SKIP) { task->state = WRITE_REPLY; } else { if (task->content_length > 0) { + rspamd_set_dispatcher_policy (task->dispatcher, BUFFER_CHARACTER, task->content_length); task->state = READ_MESSAGE; } else { @@ -209,14 +212,7 @@ parse_header (struct worker_task *task, char *line) if (strncasecmp (headern, CONTENT_LENGTH_HEADER, sizeof (CONTENT_LENGTH_HEADER) - 1) == 0) { if (task->content_length == 0) { task->content_length = strtoul (line, &err, 10); - task->msg = memory_pool_alloc0 (task->task_pool, sizeof (f_str_buf_t)); - task->msg->buf = fstralloc (task->task_pool, task->content_length); - if (task->msg->buf == NULL) { - msg_err ("read_socket: cannot allocate memory for message buffer"); - return -1; - } - task->msg->pos = task->msg->buf->begin; - update_buf_size (task->msg); + msg_debug ("parse_header: read Content-Length header, value: %lu", (unsigned long int)task->content_length); } } else { @@ -229,6 +225,7 @@ parse_header (struct worker_task *task, char *line) /* helo */ if (strncasecmp (headern, HELO_HEADER, sizeof (HELO_HEADER) - 1) == 0) { task->helo = memory_pool_strdup (task->task_pool, line); + msg_debug ("parse_header: read helo header, value: %s", task->helo); } else { msg_info ("parse_header: wrong header: %s", headern); @@ -240,6 +237,7 @@ parse_header (struct worker_task *task, char *line) /* from */ if (strncasecmp (headern, FROM_HEADER, sizeof (FROM_HEADER) - 1) == 0) { task->from = memory_pool_strdup (task->task_pool, line); + msg_debug ("parse_header: read from header, value: %s", task->from); } else { msg_info ("parse_header: wrong header: %s", headern); @@ -252,6 +250,7 @@ parse_header (struct worker_task *task, char *line) if (strncasecmp (headern, RCPT_HEADER, sizeof (RCPT_HEADER) - 1) == 0) { tmp = memory_pool_strdup (task->task_pool, line); task->rcpt = g_list_prepend (task->rcpt, tmp); + msg_debug ("parse_header: read rcpt header, value: %s", tmp); } else { msg_info ("parse_header: wrong header: %s", headern); @@ -263,6 +262,7 @@ parse_header (struct worker_task *task, char *line) /* nrcpt */ if (strncasecmp (headern, NRCPT_HEADER, sizeof (NRCPT_HEADER) - 1) == 0) { task->nrcpt = strtoul (line, &err, 10); + msg_debug ("parse_header: read rcpt header, value: %d", (int)task->nrcpt); } else { msg_info ("parse_header: wrong header: %s", headern); @@ -277,6 +277,7 @@ parse_header (struct worker_task *task, char *line) msg_info ("parse_header: bad ip header: '%s'", line); return -1; } + msg_debug ("parse_header: read IP header, value: %s", line); } else { msg_info ("parse_header: wrong header: %s", headern); @@ -292,14 +293,14 @@ parse_header (struct worker_task *task, char *line) } int -read_rspamd_input_line (struct worker_task *task, char *line) +read_rspamd_input_line (struct worker_task *task, f_str_t *line) { switch (task->state) { case READ_COMMAND: - return parse_command (task, line); + return parse_command (task, fstrcstr (line, task->task_pool)); break; case READ_HEADER: - return parse_header (task, line); + return parse_header (task, fstrcstr (line, task->task_pool)); break; } } @@ -323,7 +324,7 @@ show_url_header (struct worker_task *task) /* Do header folding */ if (host.len + r >= OUTBUFSIZ - 3) { outbuf[r ++] = '\r'; outbuf[r ++] = '\n'; outbuf[r] = ' '; - bufferevent_write (task->bev, outbuf, r); + rspamd_dispatcher_write (task->dispatcher, outbuf, r, TRUE); r = 0; } /* Write url host to buf */ @@ -340,7 +341,7 @@ show_url_header (struct worker_task *task) *(host.begin + host.len) = c; } } - bufferevent_write (task->bev, outbuf, r); + rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE); } static void @@ -363,7 +364,7 @@ show_metric_result (gpointer metric_name, gpointer metric_value, void *user_data r = snprintf (outbuf, sizeof (outbuf), "%s: %s ; %.2f / %.2f" CRLF, (char *)metric_name, (is_spam) ? "True" : "False", metric_res->score, metric_res->metric->required_score); } - bufferevent_write (task->bev, outbuf, r); + rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE); } static int @@ -374,7 +375,7 @@ write_check_reply (struct worker_task *task) struct metric_result *metric_res; r = snprintf (outbuf, sizeof (outbuf), "%s 0 %s" CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER, "OK"); - bufferevent_write (task->bev, outbuf, r); + rspamd_dispatcher_write (task->dispatcher, outbuf, r, TRUE); if (task->proto == SPAMC_PROTO) { /* Ignore metrics, just write report for 'default' metric */ metric_res = g_hash_table_lookup (task->results, "default"); @@ -391,7 +392,7 @@ write_check_reply (struct worker_task *task) /* URL stat */ show_url_header (task); } - bufferevent_write (task->bev, CRLF, sizeof (CRLF) - 1); + rspamd_dispatcher_write (task->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE); return 0; } @@ -423,7 +424,7 @@ show_metric_symbols (gpointer metric_name, gpointer metric_value, void *user_dat g_list_free (symbols); msg_debug ("show_metric_symbols: write symbols line: %s", outbuf); outbuf[r++] = '\r'; outbuf[r++] = '\n'; - bufferevent_write (task->bev, outbuf, r); + rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE); } static int @@ -450,6 +451,7 @@ write_symbols_reply (struct worker_task *task) /* Write result for each metric separately */ g_hash_table_foreach (task->results, show_metric_symbols, task); } + return 0; } @@ -460,9 +462,9 @@ write_process_reply (struct worker_task *task) char outbuf[OUTBUFSIZ]; r = snprintf (outbuf, sizeof (outbuf), "%s 0 %s" CRLF "Content-Length: %zd" CRLF CRLF, - (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER, "OK", task->msg->buf->len); - bufferevent_write (task->bev, outbuf, r); - bufferevent_write (task->bev, task->msg->buf->begin, task->msg->buf->len); + (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER, "OK", task->msg->len); + rspamd_dispatcher_write (task->dispatcher, outbuf, r, TRUE); + rspamd_dispatcher_write (task->dispatcher, task->msg->begin, task->msg->len, FALSE); return 0; } @@ -486,7 +488,7 @@ write_reply (struct worker_task *task) msg_debug ("write_reply: writing error: %s", outbuf); } /* Write to bufferevent error message */ - bufferevent_write (task->bev, outbuf, r); + rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE); } else { switch (task->cmd) { @@ -504,11 +506,11 @@ write_reply (struct worker_task *task) case CMD_SKIP: r = snprintf (outbuf, sizeof (outbuf), "%s 0 %s" CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER, SPAMD_OK); - bufferevent_write (task->bev, outbuf, r); + rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE); break; case CMD_PING: r = snprintf (outbuf, sizeof (outbuf), "%s 0 PONG" CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER); - bufferevent_write (task->bev, outbuf, r); + rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE); break; } } diff --git a/src/protocol.h b/src/protocol.h index 243e216aa..74e7f7f98 100644 --- a/src/protocol.h +++ b/src/protocol.h @@ -36,7 +36,7 @@ enum rspamd_command { * @param line line of user's input * @return 0 if line was successfully parsed and -1 if we have protocol error */ -int read_rspamd_input_line (struct worker_task *task, char *line); +int read_rspamd_input_line (struct worker_task *task, f_str_t *line); /** * Write reply for specified task command diff --git a/src/util.c b/src/util.c index 952c3a93e..cffa5e06b 100644 --- a/src/util.c +++ b/src/util.c @@ -900,51 +900,6 @@ resolve_stat_filename (memory_pool_t *pool, char *pattern, char *rcpt, char *fro return new; } -/* - * These functions are from libevent where they are static and not exported anywhere - * XXX: think how to avoid this - */ - -char * -buffer_readline (memory_pool_t *pool, struct evbuffer *buf) -{ - u_char *data = EVBUFFER_DATA (buf); - size_t len = EVBUFFER_LENGTH (buf); - char *line, fch, sch; - unsigned int i; - - for (i = 0; i < len; i++) { - if (data[i] == '\r' || data[i] == '\n') { - break; - } - } - - if (i == len) { - return (NULL); - } - - line = memory_pool_alloc (pool, i + 1); - - memcpy (line, data, i); - line[i] = '\0'; - - /* - * Some protocols terminate a line with '\r\n', so check for - * that, too. - */ - if ( i < len - 1 ) { - fch = data[i], sch = data[i+1]; - - /* Drain one more character if needed */ - if ( (sch == '\r' || sch == '\n') && sch != fch ) - i += 1; - } - - evbuffer_drain (buf, i + 1); - - return (line); -} - /* * vi:ts=4 */ diff --git a/src/util.h b/src/util.h index fa5bbfbdb..69ae0fe92 100644 --- a/src/util.h +++ b/src/util.h @@ -55,8 +55,4 @@ void file_log_function (const gchar *log_domain, GLogLevelFlags log_level, const /* Replace %r with rcpt value and %f with from value, new string is allocated in pool */ char* resolve_stat_filename (memory_pool_t *pool, char *pattern, char *rcpt, char *from); -/* Replace libevent evbuffer_readline with memory_pool variant */ -char* buffer_readline (memory_pool_t *pool, struct evbuffer *buf); - - #endif diff --git a/src/worker.c b/src/worker.c index caacda838..6d0c413cd 100644 --- a/src/worker.c +++ b/src/worker.c @@ -40,6 +40,8 @@ #include /* from the Perl distribution */ #define TASK_POOL_SIZE 4095 +/* 2 seconds for worker's IO */ +#define WORKER_IO_TIMEOUT 2 const f_str_t CRLF = { /* begin */"\r\n", @@ -47,8 +49,12 @@ const f_str_t CRLF = { /* size */2 }; +static struct timeval io_tv; + extern PerlInterpreter *perl_interpreter; +static void write_socket (void *arg); + static void sig_handler (int signo) { @@ -113,8 +119,7 @@ free_task (struct worker_task *task) g_list_free_1 (part); } memory_pool_delete (task->task_pool); - bufferevent_disable (task->bev, EV_READ | EV_WRITE); - bufferevent_free (task->bev); + rspamd_remove_dispatcher (task->dispatcher); close (task->sock); g_free (task); } @@ -124,66 +129,40 @@ free_task (struct worker_task *task) * Callback that is called when there is data to read in buffer */ static void -read_socket (struct bufferevent *bev, void *arg) +read_socket (f_str_t *in, void *arg) { struct worker_task *task = (struct worker_task *)arg; ssize_t r; - char *s; switch (task->state) { case READ_COMMAND: case READ_HEADER: - s = buffer_readline (task->task_pool, EVBUFFER_INPUT (bev)); - if (s == NULL) { - msg_debug ("read_socket: got incomplete line from user"); - return; - } - if (read_rspamd_input_line (task, s) != 0) { + if (read_rspamd_input_line (task, in) != 0) { task->last_error = "Read error"; task->error_code = RSPAMD_NETWORK_ERROR; task->state = WRITE_ERROR; - } - if (task->state == WRITE_ERROR || task->state == WRITE_REPLY) { - bufferevent_enable (bev, EV_WRITE); - bufferevent_disable (bev, EV_READ); + write_socket (task); } break; case READ_MESSAGE: - r = bufferevent_read (bev, task->msg->pos, task->msg->free); - if (r > 0) { - task->msg->pos += r; - msg_debug ("read_socket: read %zd bytes from socket, %zd bytes left", r, task->msg->free); - update_buf_size (task->msg); - if (task->msg->free == 0) { - r = process_message (task); - r = process_filters (task); - if (r == -1) { - task->last_error = "Filter processing error"; - task->error_code = RSPAMD_FILTER_ERROR; - task->state = WRITE_ERROR; - } - else if (r == 0) { - task->state = WAIT_FILTER; - } - else { - process_statfiles (task); - } - } - if (task->state == WRITE_ERROR || task->state == WRITE_REPLY) { - bufferevent_enable (bev, EV_WRITE); - bufferevent_disable (bev, EV_READ); - evbuffer_drain (bev->output, EVBUFFER_LENGTH (bev->output)); - } + task->msg = in; + r = process_message (task); + r = process_filters (task); + if (r == -1) { + task->last_error = "Filter processing error"; + task->error_code = RSPAMD_FILTER_ERROR; + task->state = WRITE_ERROR; + write_socket (task); + } + else if (r == 0) { + task->state = WAIT_FILTER; + rspamd_dispatcher_pause (task->dispatcher); } else { - msg_warn ("read_socket: cannot read data to buffer (free space: %zd): %ld", task->msg->free, (long int)r); - bufferevent_disable (bev, EV_READ); - free_task (task); + process_statfiles (task); + write_socket (task); } break; - case WAIT_FILTER: - bufferevent_disable (bev, EV_READ); - break; } } @@ -191,7 +170,7 @@ read_socket (struct bufferevent *bev, void *arg) * Callback for socket writing */ static void -write_socket (struct bufferevent *bev, void *arg) +write_socket (void *arg) { struct worker_task *task = (struct worker_task *)arg; @@ -199,12 +178,10 @@ write_socket (struct bufferevent *bev, void *arg) case WRITE_REPLY: write_reply (task); task->state = CLOSING_CONNECTION; - bufferevent_disable (bev, EV_READ); break; case WRITE_ERROR: write_reply (task); task->state = CLOSING_CONNECTION; - bufferevent_disable (bev, EV_READ); break; case CLOSING_CONNECTION: msg_debug ("write_socket: normally closing connection"); @@ -221,10 +198,10 @@ write_socket (struct bufferevent *bev, void *arg) * Called if something goes wrong */ static void -err_socket (struct bufferevent *bev, short what, void *arg) +err_socket (GError *err, void *arg) { struct worker_task *task = (struct worker_task *)arg; - msg_info ("err_socket: abnormally closing connection"); + msg_info ("err_socket: abnormally closing connection, error: %s", err->message); /* Free buffers */ free_task (task); } @@ -266,9 +243,10 @@ accept_socket (int fd, short what, void *arg) memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)g_hash_table_destroy, new_task->results); worker->srv->stat->connections_count ++; - /* Read event */ - new_task->bev = bufferevent_new (nfd, read_socket, write_socket, err_socket, (void *)new_task); - bufferevent_enable (new_task->bev, EV_READ); + /* Set up dispatcher */ + new_task->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, read_socket, + write_socket, err_socket, &io_tv, + (void *)new_task); } /* @@ -304,6 +282,9 @@ start_worker (struct rspamd_worker *worker, int listen_sock) /* Send SIGUSR2 to parent */ kill (getppid (), SIGUSR2); + io_tv.tv_sec = WORKER_IO_TIMEOUT; + io_tv.tv_usec = 0; + event_loop (0); } -- cgit v1.2.3