summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2009-02-19 21:16:30 +0300
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2009-02-19 21:16:30 +0300
commitbcece60fa1bfd4bbb09a64c058835fe3245e1d18 (patch)
tree813faac7ee96029e604a8a73c88ffee4e10a579a
parent2dca592dce2fdde806af89e5cf036bd11bd4df61 (diff)
downloadrspamd-bcece60fa1bfd4bbb09a64c058835fe3245e1d18.tar.gz
rspamd-bcece60fa1bfd4bbb09a64c058835fe3245e1d18.zip
* Implement rspamd IO with IO dispatcher (TODO: still some issues with timeouts must be resolved)
-rw-r--r--CMakeLists.txt3
-rw-r--r--src/buffer.c355
-rw-r--r--src/buffer.h99
-rw-r--r--src/controller.c210
-rw-r--r--src/filter.c3
-rw-r--r--src/fstring.h5
-rw-r--r--src/main.h9
-rw-r--r--src/message.c4
-rw-r--r--src/plugins/regexp.c2
-rw-r--r--src/protocol.c50
-rw-r--r--src/protocol.h2
-rw-r--r--src/util.c45
-rw-r--r--src/util.h4
-rw-r--r--src/worker.c87
14 files changed, 615 insertions, 263 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 70e14b472..8fedd0bff 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -211,7 +211,8 @@ SET(RSPAMDSRC src/modules.c
src/fstring.c
src/filter.c
src/controller.c
- src/cfg_utils.c)
+ src/cfg_utils.c
+ src/buffer.c)
SET(TOKENIZERSSRC src/tokenizers/tokenizers.c
src/tokenizers/osb.c)
diff --git a/src/buffer.c b/src/buffer.c
new file mode 100644
index 000000000..46ef98850
--- /dev/null
+++ b/src/buffer.c
@@ -0,0 +1,355 @@
+/*
+ * Copyright (c) 2009, Rambler media
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY Rambler media ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL Rambler BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "config.h"
+#include "buffer.h"
+
+#define G_DISPATCHER_ERROR dispatcher_error_quark()
+
+static void dispatcher_cb (int fd, short what, void *arg);
+
+static inline GQuark
+dispatcher_error_quark (void)
+{
+ return g_quark_from_static_string ("g-dispatcher-error-quark");
+}
+
+#define BUFREMAIN(x) (x)->data->size - ((x)->pos - (x)->data->begin)
+
+static void
+write_buffers (int fd, rspamd_io_dispatcher_t *d)
+{
+ GList *cur;
+ GError *err;
+ rspamd_buffer_t *buf;
+ ssize_t r;
+
+ /* Fix order */
+ if (d->out_buffers) {
+ d->out_buffers = g_list_reverse (d->out_buffers);
+ }
+ cur = g_list_first (d->out_buffers);
+ while (cur) {
+ buf = (rspamd_buffer_t *)cur->data;
+ if (BUFREMAIN (buf) == 0) {
+ /* Skip empty buffers */
+ cur = g_list_next (cur);
+ continue;
+ }
+
+ r = write (fd, buf->pos, BUFREMAIN (buf));
+ if (r == -1 && errno != EAGAIN) {
+ if (d->err_callback) {
+ err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
+ d->err_callback (err, d->user_data);
+ return;
+ }
+ }
+ else if (r > 0) {
+ buf->pos += r;
+ if (BUFREMAIN (buf) != 0) {
+ /* Continue with this buffer */
+ continue;
+ }
+ }
+ else if (r == 0) {
+ /* Got EOF while we wait for data */
+ if (d->err_callback) {
+ err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF");
+ d->err_callback (err, d->user_data);
+ return;
+ }
+ }
+ else if (errno == EAGAIN) {
+ /* Wait for other event */
+ return;
+ }
+ cur = g_list_next (cur);
+ }
+
+ if (cur == NULL) {
+ /* Disable write event for this time */
+ g_list_free (d->out_buffers);
+ d->out_buffers = NULL;
+
+ if (d->write_callback) {
+ d->write_callback (d->user_data);
+ }
+
+ event_del (d->ev);
+ event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
+ event_add (d->ev, d->tv);
+ }
+}
+
+static void
+read_buffers (int fd, rspamd_io_dispatcher_t *d)
+{
+ ssize_t r, len;
+ GError *err;
+ f_str_t res;
+ char *c;
+
+ if (d->in_buf == NULL) {
+ d->in_buf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t));
+ if (d->policy == BUFFER_LINE) {
+ d->in_buf->data = fstralloc (d->pool, BUFSIZ);
+ }
+ else {
+ d->in_buf->data = fstralloc (d->pool, d->nchars);
+ }
+ d->in_buf->pos = d->in_buf->data->begin;
+ }
+
+ if (BUFREMAIN (d->in_buf) == 0) {
+ /* Buffer is full, try to call callback with overflow error */
+ if (d->err_callback) {
+ err = g_error_new (G_DISPATCHER_ERROR, E2BIG, "buffer overflow");
+ d->err_callback (err, d->user_data);
+ return;
+ }
+ }
+ else {
+ /* Try to read the whole buffer */
+ r = read (fd, d->in_buf->pos, BUFREMAIN (d->in_buf));
+ if (r == -1 && errno != EAGAIN) {
+ if (d->err_callback) {
+ err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
+ d->err_callback (err, d->user_data);
+ return;
+ }
+ }
+ else if (r == 0) {
+ /* Got EOF while we wait for data */
+ if (d->err_callback) {
+ err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF");
+ d->err_callback (err, d->user_data);
+ return;
+ }
+ }
+ else if (errno == EAGAIN) {
+ return;
+ }
+ else {
+ d->in_buf->pos += r;
+ d->in_buf->data->len += r;
+ }
+
+ }
+
+ c = d->in_buf->data->begin;
+ r = 0;
+ len = d->in_buf->data->len;
+
+ switch (d->policy) {
+ case BUFFER_LINE:
+ while (r < len) {
+ if (*c == '\r' || *c == '\n') {
+ res.begin = d->in_buf->data->begin;
+ res.len = r;
+ if (d->read_callback) {
+ d->read_callback (&res, d->user_data);
+ if (r < len - 1 && *(c + 1) == '\n') {
+ r ++;
+ c ++;
+ }
+ /* Move remaining string to begin of buffer (draining) */
+ memmove (d->in_buf->data->begin, c, len - r);
+ c = d->in_buf->data->begin;
+ d->in_buf->data->len -= r + 1;
+ d->in_buf->pos -= r + 1;
+ r = 0;
+ len = d->in_buf->data->len;
+ continue;
+ }
+ }
+ r ++;
+ c ++;
+ }
+ break;
+ case BUFFER_CHARACTER:
+ while (r < len) {
+ if (r == d->nchars) {
+ res.begin = d->in_buf->data->begin;
+ res.len = r;
+ if (d->read_callback) {
+ d->read_callback (&res, d->user_data);
+ /* Move remaining string to begin of buffer (draining) */
+ memmove (d->in_buf->data->begin, c, len - r);
+ c = d->in_buf->data->begin;
+ d->in_buf->data->len -= r;
+ d->in_buf->pos -= r;
+ r = 0;
+ len = d->in_buf->data->len;
+ continue;
+ }
+
+ }
+ r ++;
+ c ++;
+ }
+ break;
+ }
+}
+
+#undef BUFREMAIN
+
+static void
+dispatcher_cb (int fd, short what, void *arg)
+{
+ rspamd_io_dispatcher_t *d = (rspamd_io_dispatcher_t *)arg;
+ GError *err;
+
+ switch (what) {
+ case EV_TIMEOUT:
+ if (d->err_callback) {
+ err = g_error_new (G_DISPATCHER_ERROR, ETIMEDOUT, "IO timeout");
+ d->err_callback (err, d->user_data);
+ }
+ break;
+ case EV_WRITE:
+ /* No data to write, disable further EV_WRITE to this fd */
+ if (d->out_buffers == NULL) {
+ event_del (d->ev);
+ event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
+ event_add (d->ev, d->tv);
+ }
+ else {
+ write_buffers (fd, d);
+ }
+ break;
+ case EV_READ:
+ read_buffers (fd, d);
+ break;
+ }
+}
+
+
+rspamd_io_dispatcher_t*
+rspamd_create_dispatcher (int fd, enum io_policy policy,
+ dispatcher_read_callback_t read_cb,
+ dispatcher_write_callback_t write_cb,
+ dispatcher_err_callback_t err_cb,
+ struct timeval *tv, void *user_data)
+{
+ rspamd_io_dispatcher_t *new;
+
+ if (fd == -1) {
+ return NULL;
+ }
+
+ new = g_malloc (sizeof (rspamd_io_dispatcher_t));
+ bzero (new, sizeof (rspamd_io_dispatcher_t));
+
+ new->pool = memory_pool_new (memory_pool_get_size ());
+ new->tv = memory_pool_alloc (new->pool, sizeof (struct timeval));
+ memcpy (new->tv, tv, sizeof (struct timeval));
+ new->policy = policy;
+ new->read_callback = read_cb;
+ new->write_callback = write_cb;
+ new->err_callback = err_cb;
+ new->user_data = user_data;
+
+ new->ev = memory_pool_alloc0 (new->pool, sizeof (struct event));
+ new->fd = fd;
+
+ event_set (new->ev, fd, EV_READ | EV_WRITE | EV_PERSIST, dispatcher_cb, (void *)new);
+ event_add (new->ev, new->tv);
+
+ return new;
+}
+
+void
+rspamd_remove_dispatcher (rspamd_io_dispatcher_t *dispatcher)
+{
+ if (dispatcher != NULL) {
+ event_del (dispatcher->ev);
+ memory_pool_delete (dispatcher->pool);
+ g_free (dispatcher);
+ }
+}
+
+void
+rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t *d,
+ enum io_policy policy,
+ size_t nchars)
+{
+ f_str_t *tmp;
+
+ if (d->policy != policy || d->nchars != nchars) {
+ d->policy = policy;
+ d->nchars = nchars ? nchars : BUFSIZ;
+ /* Resize input buffer if needed */
+ if (policy == BUFFER_CHARACTER && nchars != 0) {
+ if (d->in_buf && d->in_buf->data->size < nchars) {
+ tmp = fstralloc (d->pool, d->nchars);
+ memcpy (tmp->begin, d->in_buf->data->begin, d->in_buf->data->len);
+ tmp->len = d->in_buf->data->len;
+ d->in_buf->data = tmp;
+ }
+ }
+ else if (policy == BUFFER_LINE) {
+ if (d->in_buf && d->nchars < BUFSIZ) {
+ tmp = fstralloc (d->pool, BUFSIZ);
+ memcpy (tmp->begin, d->in_buf->data->begin, d->in_buf->data->len);
+ tmp->len = d->in_buf->data->len;
+ d->in_buf->data = tmp;
+ }
+ }
+ }
+}
+
+void
+rspamd_dispatcher_write (rspamd_io_dispatcher_t *d,
+ void *data,
+ size_t len, gboolean delayed)
+{
+ rspamd_buffer_t *newbuf;
+
+ newbuf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t));
+ newbuf->data = memory_pool_alloc (d->pool, sizeof (f_str_t));
+
+ newbuf->data->begin = memory_pool_alloc (d->pool, len);
+ memcpy (newbuf->data->begin, data, len);
+ newbuf->data->size = len;
+ newbuf->pos = newbuf->data->begin;
+
+ d->out_buffers = g_list_prepend (d->out_buffers, newbuf);
+
+ if (!delayed) {
+ event_del (d->ev);
+ event_set (d->ev, d->fd, EV_READ | EV_WRITE | EV_PERSIST, dispatcher_cb, (void *)d);
+ event_add (d->ev, d->tv);
+ }
+}
+
+void
+rspamd_dispatcher_pause (rspamd_io_dispatcher_t *d)
+{
+ event_del (d->ev);
+}
+
+/*
+ * vi:ts=4
+ */
diff --git a/src/buffer.h b/src/buffer.h
new file mode 100644
index 000000000..93c7818dd
--- /dev/null
+++ b/src/buffer.h
@@ -0,0 +1,99 @@
+/**
+ * @file buffer.h
+ * Implements buffered IO
+ */
+
+#ifndef RSPAMD_BUFFER_H
+#define RSPAMD_BUFFER_H
+
+#include "config.h"
+#include "mem_pool.h"
+#include "fstring.h"
+
+typedef void (*dispatcher_read_callback_t)(f_str_t *in, void *user_data);
+typedef void (*dispatcher_write_callback_t)(void *user_data);
+typedef void (*dispatcher_err_callback_t)(GError *err, void *user_data);
+
+/**
+ * Types of IO handling
+ */
+enum io_policy {
+ BUFFER_LINE, /**< call handler when we have line ready */
+ BUFFER_CHARACTER, /**< call handler when we have some characters */
+};
+
+/**
+ * Buffer structure
+ */
+typedef struct rspamd_buffer_s {
+ f_str_t *data; /**< buffer logic */
+ char *pos; /**< current position */
+} rspamd_buffer_t;
+
+typedef struct rspamd_io_dispatcher_s {
+ rspamd_buffer_t *in_buf; /**< input buffer */
+ GList *out_buffers; /**< out buffers chain */
+ struct timeval *tv; /**< io timeout */
+ struct event *ev; /**< libevent io event */
+ memory_pool_t *pool; /**< where to store data */
+ enum io_policy policy; /**< IO policy */
+ size_t nchars; /**< how many chars to read */
+ int fd; /**< descriptor */
+ dispatcher_read_callback_t read_callback; /**< read callback */
+ dispatcher_write_callback_t write_callback; /**< write callback */
+ dispatcher_err_callback_t err_callback; /**< error callback */
+ void *user_data; /**< user's data for callbacks */
+} rspamd_io_dispatcher_t;
+
+/**
+ * Creates rspamd IO dispatcher for specified descriptor
+ * @param fd descriptor to IO
+ * @param policy IO policy
+ * @param read_cb read callback handler
+ * @param write_cb write callback handler
+ * @param err_cb error callback handler
+ * @param tv IO timeout
+ * @param user_data pointer to user's data
+ * @return new dispatcher object or NULL in case of failure
+ */
+rspamd_io_dispatcher_t* rspamd_create_dispatcher (int fd,
+ enum io_policy policy,
+ dispatcher_read_callback_t read_cb,
+ dispatcher_write_callback_t write_cb,
+ dispatcher_err_callback_t err_cb,
+ struct timeval *tv,
+ void *user_data);
+
+/**
+ * Set new policy for dispatcher
+ * @param d pointer to dispatcher's object
+ * @param policy IO policy
+ * @param nchars number of characters in buffer for character policy
+ */
+void rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t *d,
+ enum io_policy policy,
+ size_t nchars);
+
+/**
+ * Write data when it would be possible
+ * @param d pointer to dispatcher's object
+ * @param data data to write
+ * @param len length of data
+ */
+void rspamd_dispatcher_write (rspamd_io_dispatcher_t *d,
+ void *data,
+ size_t len, gboolean delayed);
+
+/**
+ * Pause IO events on dispatcher
+ * @param d pointer to dispatcher's object
+ */
+void rspamd_dispatcher_pause (rspamd_io_dispatcher_t *d);
+
+/**
+ * Frees dispatcher object
+ * @param dispatcher pointer to dispatcher's object
+ */
+void rspamd_remove_dispatcher (rspamd_io_dispatcher_t *dispatcher);
+
+#endif
diff --git a/src/controller.c b/src/controller.c
index 41f8a9649..7733bc924 100644
--- a/src/controller.c
+++ b/src/controller.c
@@ -36,6 +36,9 @@
#define CRLF "\r\n"
#define END "END" CRLF
+/* 120 seconds for controller's IO */
+#define CONTROLLER_IO_TIMEOUT 120
+
enum command_type {
COMMAND_PASSWORD,
COMMAND_QUIT,
@@ -68,6 +71,7 @@ static GCompletion *comp;
static time_t start_time;
static char greetingbuf[1024];
+static struct timeval io_tv;
static
void sig_handler (int signo)
@@ -110,8 +114,7 @@ free_session (struct controller_session *session)
struct mime_part *p;
msg_debug ("free_session: freeing session %p", session);
- bufferevent_disable (session->bev, EV_READ | EV_WRITE);
- bufferevent_free (session->bev);
+ rspamd_remove_dispatcher (session->dispatcher);
while ((part = g_list_first (session->parts))) {
session->parts = g_list_remove_link (session->parts, part);
@@ -132,7 +135,7 @@ check_auth (struct controller_command *cmd, struct controller_session *session)
if (cmd->privilleged && !session->authorized) {
r = snprintf (out_buf, sizeof (out_buf), "not authorized" CRLF);
- bufferevent_write (session->bev, out_buf, r);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
return 0;
}
@@ -156,18 +159,18 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
if (!arg || *arg == '\0') {
msg_debug ("process_command: empty password passed");
r = snprintf (out_buf, sizeof (out_buf), "password command requires one argument" CRLF);
- bufferevent_write (session->bev, out_buf, r);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
return;
}
if (strncmp (arg, session->cfg->control_password, strlen (arg)) == 0) {
session->authorized = 1;
r = snprintf (out_buf, sizeof (out_buf), "password accepted" CRLF);
- bufferevent_write (session->bev, out_buf, r);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
}
else {
session->authorized = 0;
r = snprintf (out_buf, sizeof (out_buf), "password NOT accepted" CRLF);
- bufferevent_write (session->bev, out_buf, r);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
}
break;
case COMMAND_QUIT:
@@ -176,7 +179,7 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
case COMMAND_RELOAD:
if (check_auth (cmd, session)) {
r = snprintf (out_buf, sizeof (out_buf), "reload request sent" CRLF);
- bufferevent_write (session->bev, out_buf, r);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
kill (getppid (), SIGHUP);
}
break;
@@ -199,13 +202,13 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
mem_st.shared_chunks_allocated);
r += snprintf (out_buf + r, sizeof (out_buf) - r, "Chunks freed: %zd" CRLF,
mem_st.chunks_freed);
- bufferevent_write (session->bev, out_buf, r);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
}
break;
case COMMAND_SHUTDOWN:
if (check_auth (cmd, session)) {
r = snprintf (out_buf, sizeof (out_buf), "shutdown request sent" CRLF);
- bufferevent_write (session->bev, out_buf, r);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
kill (getppid (), SIGTERM);
}
break;
@@ -235,7 +238,7 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
minutes, minutes > 1 ? "s" : " ",
(int)uptime, uptime > 1 ? "s" : " ");
}
- bufferevent_write (session->bev, out_buf, r);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
}
break;
case COMMAND_LEARN:
@@ -244,37 +247,28 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
if (!arg || *arg == '\0') {
msg_debug ("process_command: no statfile specified in learn command");
r = snprintf (out_buf, sizeof (out_buf), "learn command requires at least two arguments: stat filename and its size" CRLF);
- bufferevent_write (session->bev, out_buf, r);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
return;
}
arg = *(cmd_args + 1);
if (arg == NULL || *arg == '\0') {
msg_debug ("process_command: no statfile size specified in learn command");
r = snprintf (out_buf, sizeof (out_buf), "learn command requires at least two arguments: stat filename and its size" CRLF);
- bufferevent_write (session->bev, out_buf, r);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
return;
}
size = strtoul (arg, &err_str, 10);
if (err_str && *err_str != '\0') {
msg_debug ("process_command: statfile size is invalid: %s", arg);
r = snprintf (out_buf, sizeof (out_buf), "learn size is invalid" CRLF);
- bufferevent_write (session->bev, out_buf, r);
- return;
- }
- session->learn_buf = memory_pool_alloc0 (session->session_pool, sizeof (f_str_buf_t));
- session->learn_buf->buf = fstralloc (session->session_pool, size);
- if (session->learn_buf->buf == NULL) {
- r = snprintf (out_buf, sizeof (out_buf), "allocating buffer for learn failed" CRLF);
- bufferevent_write (session->bev, out_buf, r);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
return;
}
- session->learn_buf->pos = session->learn_buf->buf->begin;
- update_buf_size (session->learn_buf);
statfile = g_hash_table_lookup (session->cfg->statfiles, *cmd_args);
if (statfile == NULL) {
r = snprintf (out_buf, sizeof (out_buf), "statfile %s is not defined" CRLF, *cmd_args);
- bufferevent_write (session->bev, out_buf, r);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
return;
}
@@ -302,7 +296,7 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
arg = *(cmd_args + 1);
if (!arg || *arg == '\0') {
r = snprintf (out_buf, sizeof (out_buf), "recipient is not defined" CRLF, arg);
- bufferevent_write (session->bev, out_buf, r);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
return;
}
session->learn_rcpt = memory_pool_strdup (session->session_pool, arg);
@@ -311,7 +305,7 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
arg = *(cmd_args + 1);
if (!arg || *arg == '\0') {
r = snprintf (out_buf, sizeof (out_buf), "from is not defined" CRLF, arg);
- bufferevent_write (session->bev, out_buf, r);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
return;
}
session->learn_from = memory_pool_strdup (session->session_pool, arg);
@@ -321,7 +315,7 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
break;
default:
r = snprintf (out_buf, sizeof (out_buf), "tokenizer is not defined" CRLF, arg);
- bufferevent_write (session->bev, out_buf, r);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
return;
}
}
@@ -333,15 +327,16 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
if (statfile_pool_create (session->worker->srv->statfile_pool,
session->learn_filename, statfile->size / sizeof (struct stat_file_block)) == -1) {
r = snprintf (out_buf, sizeof (out_buf), "cannot create statfile %s" CRLF, session->learn_filename);
- bufferevent_write (session->bev, out_buf, r);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
return;
}
if (statfile_pool_open (session->worker->srv->statfile_pool, session->learn_filename) == -1) {
r = snprintf (out_buf, sizeof (out_buf), "cannot open statfile %s" CRLF, session->learn_filename);
- bufferevent_write (session->bev, out_buf, r);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
return;
}
}
+ rspamd_set_dispatcher_policy (session->dispatcher, BUFFER_CHARACTER, size);
session->state = STATE_LEARN;
}
break;
@@ -355,13 +350,13 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
"(*) shutdown - shutdown rspamd" CRLF
" stat - show different rspamd stat" CRLF
" uptime - rspamd uptime" CRLF);
- bufferevent_write (session->bev, out_buf, r);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
break;
}
}
static void
-read_socket (struct bufferevent *bev, void *arg)
+read_socket (f_str_t *in, void *arg)
{
struct controller_session *session = (struct controller_session *)arg;
struct classifier_ctx *cls_ctx;
@@ -375,101 +370,73 @@ read_socket (struct bufferevent *bev, void *arg)
switch (session->state) {
case STATE_COMMAND:
- s = buffer_readline (session->session_pool, EVBUFFER_INPUT (bev));
- msg_debug ("read_socket: got '%s' string from user", s);
- if (s != NULL && *s != 0) {
- len = strlen (s);
- /* Remove end of line characters from string */
- if (s[len - 1] == '\n') {
- if (s[len - 2] == '\r') {
- s[len - 2] = 0;
- }
- s[len - 1] = 0;
- }
- params = g_strsplit (s, " ", -1);
- len = g_strv_length (params);
- if (len > 0) {
- cmd = g_strstrip (params[0]);
- comp_list = g_completion_complete (comp, cmd, NULL);
- switch (g_list_length (comp_list)) {
- case 1:
- process_command ((struct controller_command *)comp_list->data, &params[1], session);
- break;
- case 0:
- msg_debug ("Unknown command: '%s'", cmd);
- i = snprintf (out_buf, sizeof (out_buf), "Unknown command" CRLF);
- bufferevent_write (bev, out_buf, i);
- break;
- default:
- msg_debug ("Ambigious command: '%s'", cmd);
- i = snprintf (out_buf, sizeof (out_buf), "Ambigious command" CRLF);
- bufferevent_write (bev, out_buf, i);
- break;
- }
+ s = fstrcstr (in, session->session_pool);
+ params = g_strsplit (s, " ", -1);
+ len = g_strv_length (params);
+ if (len > 0) {
+ cmd = g_strstrip (params[0]);
+ comp_list = g_completion_complete (comp, cmd, NULL);
+ switch (g_list_length (comp_list)) {
+ case 1:
+ process_command ((struct controller_command *)comp_list->data, &params[1], session);
+ break;
+ case 0:
+ msg_debug ("Unknown command: '%s'", cmd);
+ i = snprintf (out_buf, sizeof (out_buf), "Unknown command" CRLF);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE);
+ break;
+ default:
+ msg_debug ("Ambigious command: '%s'", cmd);
+ i = snprintf (out_buf, sizeof (out_buf), "Ambigious command" CRLF);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE);
+ break;
}
- if (session->state == STATE_COMMAND) {
- session->state = STATE_REPLY;
- }
- if (session->state != STATE_LEARN) {
- bufferevent_write (bev, END, sizeof (END) - 1);
- bufferevent_enable (bev, EV_WRITE);
- }
- g_strfreev (params);
}
- else {
- bufferevent_enable (bev, EV_WRITE);
- }
+ if (session->state == STATE_COMMAND) {
+ session->state = STATE_REPLY;
+ }
+ if (session->state != STATE_LEARN) {
+ rspamd_dispatcher_write (session->dispatcher, END, sizeof (END) - 1, FALSE);
+ }
+
+ g_strfreev (params);
break;
case STATE_LEARN:
- i = bufferevent_read (bev, session->learn_buf->pos, session->learn_buf->free);
- if (i > 0) {
- session->learn_buf->pos += i;
- update_buf_size (session->learn_buf);
- if (session->learn_buf->free == 0) {
- process_learn (session);
- while ((content = get_next_text_part (session->session_pool, session->parts, &cur)) != NULL) {
- c.begin = content->data;
- c.len = content->len;
- if (!session->learn_tokenizer->tokenize_func (session->learn_tokenizer,
- session->session_pool, &c, &tokens)) {
- i = snprintf (out_buf, sizeof (out_buf), "learn fail, tokenizer error" CRLF);
- bufferevent_write (bev, out_buf, i);
- session->state = STATE_REPLY;
- return;
- }
- }
- cls_ctx = session->learn_classifier->init_func (session->session_pool);
- session->learn_classifier->learn_func (cls_ctx, session->worker->srv->statfile_pool,
- session->learn_filename, tokens, session->in_class);
- session->worker->srv->stat->messages_learned ++;
- i = snprintf (out_buf, sizeof (out_buf), "learn ok" CRLF);
- bufferevent_write (bev, out_buf, i);
- bufferevent_enable (bev, EV_WRITE);
-
- /* Clean learned parts */
- while ((cur = g_list_first (session->parts))) {
- session->parts = g_list_remove_link (session->parts, cur);
- p = (struct mime_part *)cur->data;
- g_byte_array_free (p->content, FALSE);
- g_list_free_1 (cur);
- }
-
+ session->learn_buf = in;
+ process_learn (session);
+ while ((content = get_next_text_part (session->session_pool, session->parts, &cur)) != NULL) {
+ c.begin = content->data;
+ c.len = content->len;
+ if (!session->learn_tokenizer->tokenize_func (session->learn_tokenizer,
+ session->session_pool, &c, &tokens)) {
+ i = snprintf (out_buf, sizeof (out_buf), "learn fail, tokenizer error" CRLF);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE);
session->state = STATE_REPLY;
- break;
+ return;
}
}
- else {
- i = snprintf (out_buf, sizeof (out_buf), "read error: %d" CRLF, i);
- bufferevent_write (bev, out_buf, i);
- bufferevent_enable (bev, EV_WRITE);
- session->state = STATE_REPLY;
+ cls_ctx = session->learn_classifier->init_func (session->session_pool);
+ session->learn_classifier->learn_func (cls_ctx, session->worker->srv->statfile_pool,
+ session->learn_filename, tokens, session->in_class);
+ session->worker->srv->stat->messages_learned ++;
+ i = snprintf (out_buf, sizeof (out_buf), "learn ok" CRLF);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE);
+
+ /* Clean learned parts */
+ while ((cur = g_list_first (session->parts))) {
+ session->parts = g_list_remove_link (session->parts, cur);
+ p = (struct mime_part *)cur->data;
+ g_byte_array_free (p->content, FALSE);
+ g_list_free_1 (cur);
}
+
+ session->state = STATE_REPLY;
break;
}
}
static void
-write_socket (struct bufferevent *bev, void *arg)
+write_socket (void *arg)
{
struct controller_session *session = (struct controller_session *)arg;
@@ -482,16 +449,15 @@ write_socket (struct bufferevent *bev, void *arg)
}
else if (session->state == STATE_REPLY) {
session->state = STATE_COMMAND;
+ rspamd_set_dispatcher_policy (session->dispatcher, BUFFER_LINE, BUFSIZ);
}
- bufferevent_disable (bev, EV_WRITE);
- bufferevent_enable (bev, EV_READ);
}
static void
-err_socket (struct bufferevent *bev, short what, void *arg)
+err_socket (GError *err, void *arg)
{
struct controller_session *session = (struct controller_session *)arg;
- msg_info ("closing control connection");
+ msg_info ("err_socket: abnormally closing control connection, error: %s", err->message);
/* Free buffers */
free_session (session);
}
@@ -525,10 +491,11 @@ accept_socket (int fd, short what, void *arg)
new_session->session_pool = memory_pool_new (memory_pool_get_size () - 1);
worker->srv->stat->control_connections_count ++;
- /* Read event */
- new_session->bev = bufferevent_new (nfd, read_socket, write_socket, err_socket, (void *)new_session);
- bufferevent_write (new_session->bev, greetingbuf, strlen (greetingbuf));
- bufferevent_enable (new_session->bev, EV_WRITE);
+ /* Set up dispatcher */
+ new_session->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, read_socket,
+ write_socket, err_socket, &io_tv,
+ (void *)new_session);
+ rspamd_dispatcher_write (new_session->dispatcher, greetingbuf, strlen (greetingbuf), FALSE);
}
void
@@ -592,6 +559,9 @@ start_controller (struct rspamd_worker *worker)
/* Send SIGUSR2 to parent */
kill (getppid (), SIGUSR2);
+
+ io_tv.tv_sec = CONTROLLER_IO_TIMEOUT;
+ io_tv.tv_usec = 0;
event_loop (0);
}
diff --git a/src/filter.c b/src/filter.c
index 088ac4376..a1d1e43fa 100644
--- a/src/filter.c
+++ b/src/filter.c
@@ -234,9 +234,6 @@ continue_process_filters (struct worker_task *task)
}
/* Process all metrics */
g_hash_table_foreach (task->results, metric_process_callback, task);
- /* All done */
- bufferevent_enable (task->bev, EV_WRITE);
- evbuffer_drain (task->bev->output, EVBUFFER_LENGTH (task->bev->output));
process_statfiles (task);
return 1;
}
diff --git a/src/fstring.h b/src/fstring.h
index 896cd8dcf..000ba74c6 100644
--- a/src/fstring.h
+++ b/src/fstring.h
@@ -5,12 +5,7 @@
#ifndef FSTRING_H
#define FSTRING_H
-#include <sys/types.h>
#include "config.h"
-
-#ifdef HAVE_STDINT_H
-#include <stdint.h>
-#endif
#include "mem_pool.h"
#define update_buf_size(x) (x)->free = (x)->buf->size - ((x)->pos - (x)->buf->begin); (x)->buf->len = (x)->pos - (x)->buf->begin
diff --git a/src/main.h b/src/main.h
index e1244d0ba..ec74ad03b 100644
--- a/src/main.h
+++ b/src/main.h
@@ -14,6 +14,7 @@
#include "memcached.h"
#include "protocol.h"
#include "filter.h"
+#include "buffer.h"
/* Default values */
#define FIXED_CONFIG_FILE "./rspamd.conf"
@@ -133,14 +134,14 @@ struct controller_session {
/* Access to authorized commands */
int authorized; /**< whether this session is authorized */
memory_pool_t *session_pool; /**< memory pool for session */
- struct bufferevent *bev; /**< buffered event for IO */
struct config_file *cfg; /**< pointer to config file */
char *learn_rcpt; /**< recipient for learning */
char *learn_from; /**< from address for learning */
struct tokenizer *learn_tokenizer; /**< tokenizer for learning */
struct classifier *learn_classifier; /**< classifier for learning */
char *learn_filename; /**< real filename for learning */
- f_str_buf_t *learn_buf; /**< learn input */
+ rspamd_io_dispatcher_t *dispatcher; /**< IO dispatcher object */
+ f_str_t *learn_buf; /**< learn input */
GList *parts; /**< extracted mime parts */
int in_class; /**< positive or negative learn */
};
@@ -168,8 +169,8 @@ struct worker_task {
GList *rcpt; /**< recipients list */
unsigned int nrcpt; /**< number of recipients */
struct in_addr from_addr; /**< client addr in numeric form */
- f_str_buf_t *msg; /**< message buffer */
- struct bufferevent *bev; /**< buffered event for IO */
+ f_str_t *msg; /**< message buffer */
+ rspamd_io_dispatcher_t *dispatcher; /**< IO dispatcher object */
memcached_ctx_t *memc_ctx; /**< memcached context associated with task */
int parts_count; /**< mime parts count */
GMimeMessage *message; /**< message, parsed with GMime */
diff --git a/src/message.c b/src/message.c
index 5c0f15cf9..d5ec43653 100644
--- a/src/message.c
+++ b/src/message.c
@@ -304,7 +304,7 @@ process_message (struct worker_task *task)
GMimeParser *parser;
GMimeStream *stream;
- stream = g_mime_stream_mem_new_with_buffer (task->msg->buf->begin, task->msg->buf->len);
+ stream = g_mime_stream_mem_new_with_buffer (task->msg->begin, task->msg->len);
/* create a new parser object to parse the stream */
parser = g_mime_parser_new_with_stream (stream);
@@ -393,7 +393,7 @@ process_learn (struct controller_session *session)
GMimeParser *parser;
GMimeStream *stream;
- stream = g_mime_stream_mem_new_with_buffer (session->learn_buf->buf->begin, session->learn_buf->buf->len);
+ stream = g_mime_stream_mem_new_with_buffer (session->learn_buf->begin, session->learn_buf->len);
/* create a new parser object to parse the stream */
parser = g_mime_parser_new_with_stream (stream);
diff --git a/src/plugins/regexp.c b/src/plugins/regexp.c
index 1ecfcb36d..9cc37ea7e 100644
--- a/src/plugins/regexp.c
+++ b/src/plugins/regexp.c
@@ -180,7 +180,7 @@ process_regexp (struct rspamd_regexp *re, struct worker_task *task)
return 0;
case REGEXP_MESSAGE:
msg_debug ("process_message: checking message regexp: /%s/", re->regexp_text);
- if (g_regex_match_full (re->regexp, task->msg->buf->begin, task->msg->buf->len, 0, 0, NULL, NULL) == TRUE) {
+ if (g_regex_match_full (re->regexp, task->msg->begin, task->msg->len, 0, 0, NULL, NULL) == TRUE) {
return 1;
}
return 0;
diff --git a/src/protocol.c b/src/protocol.c
index 92b331ac2..ab0db5f78 100644
--- a/src/protocol.c
+++ b/src/protocol.c
@@ -174,14 +174,17 @@ static int
parse_header (struct worker_task *task, char *line)
{
char *headern, *err, *tmp;
-
+
+ msg_debug ("parse_header: got line from worker: %s", line);
/* Check end of headers */
if (*line == '\0') {
+ msg_debug ("parse_header: got empty line, assume it as end of headers");
if (task->cmd == CMD_PING || task->cmd == CMD_SKIP) {
task->state = WRITE_REPLY;
}
else {
if (task->content_length > 0) {
+ rspamd_set_dispatcher_policy (task->dispatcher, BUFFER_CHARACTER, task->content_length);
task->state = READ_MESSAGE;
}
else {
@@ -209,14 +212,7 @@ parse_header (struct worker_task *task, char *line)
if (strncasecmp (headern, CONTENT_LENGTH_HEADER, sizeof (CONTENT_LENGTH_HEADER) - 1) == 0) {
if (task->content_length == 0) {
task->content_length = strtoul (line, &err, 10);
- task->msg = memory_pool_alloc0 (task->task_pool, sizeof (f_str_buf_t));
- task->msg->buf = fstralloc (task->task_pool, task->content_length);
- if (task->msg->buf == NULL) {
- msg_err ("read_socket: cannot allocate memory for message buffer");
- return -1;
- }
- task->msg->pos = task->msg->buf->begin;
- update_buf_size (task->msg);
+ msg_debug ("parse_header: read Content-Length header, value: %lu", (unsigned long int)task->content_length);
}
}
else {
@@ -229,6 +225,7 @@ parse_header (struct worker_task *task, char *line)
/* helo */
if (strncasecmp (headern, HELO_HEADER, sizeof (HELO_HEADER) - 1) == 0) {
task->helo = memory_pool_strdup (task->task_pool, line);
+ msg_debug ("parse_header: read helo header, value: %s", task->helo);
}
else {
msg_info ("parse_header: wrong header: %s", headern);
@@ -240,6 +237,7 @@ parse_header (struct worker_task *task, char *line)
/* from */
if (strncasecmp (headern, FROM_HEADER, sizeof (FROM_HEADER) - 1) == 0) {
task->from = memory_pool_strdup (task->task_pool, line);
+ msg_debug ("parse_header: read from header, value: %s", task->from);
}
else {
msg_info ("parse_header: wrong header: %s", headern);
@@ -252,6 +250,7 @@ parse_header (struct worker_task *task, char *line)
if (strncasecmp (headern, RCPT_HEADER, sizeof (RCPT_HEADER) - 1) == 0) {
tmp = memory_pool_strdup (task->task_pool, line);
task->rcpt = g_list_prepend (task->rcpt, tmp);
+ msg_debug ("parse_header: read rcpt header, value: %s", tmp);
}
else {
msg_info ("parse_header: wrong header: %s", headern);
@@ -263,6 +262,7 @@ parse_header (struct worker_task *task, char *line)
/* nrcpt */
if (strncasecmp (headern, NRCPT_HEADER, sizeof (NRCPT_HEADER) - 1) == 0) {
task->nrcpt = strtoul (line, &err, 10);
+ msg_debug ("parse_header: read rcpt header, value: %d", (int)task->nrcpt);
}
else {
msg_info ("parse_header: wrong header: %s", headern);
@@ -277,6 +277,7 @@ parse_header (struct worker_task *task, char *line)
msg_info ("parse_header: bad ip header: '%s'", line);
return -1;
}
+ msg_debug ("parse_header: read IP header, value: %s", line);
}
else {
msg_info ("parse_header: wrong header: %s", headern);
@@ -292,14 +293,14 @@ parse_header (struct worker_task *task, char *line)
}
int
-read_rspamd_input_line (struct worker_task *task, char *line)
+read_rspamd_input_line (struct worker_task *task, f_str_t *line)
{
switch (task->state) {
case READ_COMMAND:
- return parse_command (task, line);
+ return parse_command (task, fstrcstr (line, task->task_pool));
break;
case READ_HEADER:
- return parse_header (task, line);
+ return parse_header (task, fstrcstr (line, task->task_pool));
break;
}
}
@@ -323,7 +324,7 @@ show_url_header (struct worker_task *task)
/* Do header folding */
if (host.len + r >= OUTBUFSIZ - 3) {
outbuf[r ++] = '\r'; outbuf[r ++] = '\n'; outbuf[r] = ' ';
- bufferevent_write (task->bev, outbuf, r);
+ rspamd_dispatcher_write (task->dispatcher, outbuf, r, TRUE);
r = 0;
}
/* Write url host to buf */
@@ -340,7 +341,7 @@ show_url_header (struct worker_task *task)
*(host.begin + host.len) = c;
}
}
- bufferevent_write (task->bev, outbuf, r);
+ rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE);
}
static void
@@ -363,7 +364,7 @@ show_metric_result (gpointer metric_name, gpointer metric_value, void *user_data
r = snprintf (outbuf, sizeof (outbuf), "%s: %s ; %.2f / %.2f" CRLF, (char *)metric_name,
(is_spam) ? "True" : "False", metric_res->score, metric_res->metric->required_score);
}
- bufferevent_write (task->bev, outbuf, r);
+ rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE);
}
static int
@@ -374,7 +375,7 @@ write_check_reply (struct worker_task *task)
struct metric_result *metric_res;
r = snprintf (outbuf, sizeof (outbuf), "%s 0 %s" CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER, "OK");
- bufferevent_write (task->bev, outbuf, r);
+ rspamd_dispatcher_write (task->dispatcher, outbuf, r, TRUE);
if (task->proto == SPAMC_PROTO) {
/* Ignore metrics, just write report for 'default' metric */
metric_res = g_hash_table_lookup (task->results, "default");
@@ -391,7 +392,7 @@ write_check_reply (struct worker_task *task)
/* URL stat */
show_url_header (task);
}
- bufferevent_write (task->bev, CRLF, sizeof (CRLF) - 1);
+ rspamd_dispatcher_write (task->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE);
return 0;
}
@@ -423,7 +424,7 @@ show_metric_symbols (gpointer metric_name, gpointer metric_value, void *user_dat
g_list_free (symbols);
msg_debug ("show_metric_symbols: write symbols line: %s", outbuf);
outbuf[r++] = '\r'; outbuf[r++] = '\n';
- bufferevent_write (task->bev, outbuf, r);
+ rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE);
}
static int
@@ -450,6 +451,7 @@ write_symbols_reply (struct worker_task *task)
/* Write result for each metric separately */
g_hash_table_foreach (task->results, show_metric_symbols, task);
}
+
return 0;
}
@@ -460,9 +462,9 @@ write_process_reply (struct worker_task *task)
char outbuf[OUTBUFSIZ];
r = snprintf (outbuf, sizeof (outbuf), "%s 0 %s" CRLF "Content-Length: %zd" CRLF CRLF,
- (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER, "OK", task->msg->buf->len);
- bufferevent_write (task->bev, outbuf, r);
- bufferevent_write (task->bev, task->msg->buf->begin, task->msg->buf->len);
+ (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER, "OK", task->msg->len);
+ rspamd_dispatcher_write (task->dispatcher, outbuf, r, TRUE);
+ rspamd_dispatcher_write (task->dispatcher, task->msg->begin, task->msg->len, FALSE);
return 0;
}
@@ -486,7 +488,7 @@ write_reply (struct worker_task *task)
msg_debug ("write_reply: writing error: %s", outbuf);
}
/* Write to bufferevent error message */
- bufferevent_write (task->bev, outbuf, r);
+ rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE);
}
else {
switch (task->cmd) {
@@ -504,11 +506,11 @@ write_reply (struct worker_task *task)
case CMD_SKIP:
r = snprintf (outbuf, sizeof (outbuf), "%s 0 %s" CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER,
SPAMD_OK);
- bufferevent_write (task->bev, outbuf, r);
+ rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE);
break;
case CMD_PING:
r = snprintf (outbuf, sizeof (outbuf), "%s 0 PONG" CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER);
- bufferevent_write (task->bev, outbuf, r);
+ rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE);
break;
}
}
diff --git a/src/protocol.h b/src/protocol.h
index 243e216aa..74e7f7f98 100644
--- a/src/protocol.h
+++ b/src/protocol.h
@@ -36,7 +36,7 @@ enum rspamd_command {
* @param line line of user's input
* @return 0 if line was successfully parsed and -1 if we have protocol error
*/
-int read_rspamd_input_line (struct worker_task *task, char *line);
+int read_rspamd_input_line (struct worker_task *task, f_str_t *line);
/**
* Write reply for specified task command
diff --git a/src/util.c b/src/util.c
index 952c3a93e..cffa5e06b 100644
--- a/src/util.c
+++ b/src/util.c
@@ -901,50 +901,5 @@ resolve_stat_filename (memory_pool_t *pool, char *pattern, char *rcpt, char *fro
}
/*
- * These functions are from libevent where they are static and not exported anywhere
- * XXX: think how to avoid this
- */
-
-char *
-buffer_readline (memory_pool_t *pool, struct evbuffer *buf)
-{
- u_char *data = EVBUFFER_DATA (buf);
- size_t len = EVBUFFER_LENGTH (buf);
- char *line, fch, sch;
- unsigned int i;
-
- for (i = 0; i < len; i++) {
- if (data[i] == '\r' || data[i] == '\n') {
- break;
- }
- }
-
- if (i == len) {
- return (NULL);
- }
-
- line = memory_pool_alloc (pool, i + 1);
-
- memcpy (line, data, i);
- line[i] = '\0';
-
- /*
- * Some protocols terminate a line with '\r\n', so check for
- * that, too.
- */
- if ( i < len - 1 ) {
- fch = data[i], sch = data[i+1];
-
- /* Drain one more character if needed */
- if ( (sch == '\r' || sch == '\n') && sch != fch )
- i += 1;
- }
-
- evbuffer_drain (buf, i + 1);
-
- return (line);
-}
-
-/*
* vi:ts=4
*/
diff --git a/src/util.h b/src/util.h
index fa5bbfbdb..69ae0fe92 100644
--- a/src/util.h
+++ b/src/util.h
@@ -55,8 +55,4 @@ void file_log_function (const gchar *log_domain, GLogLevelFlags log_level, const
/* Replace %r with rcpt value and %f with from value, new string is allocated in pool */
char* resolve_stat_filename (memory_pool_t *pool, char *pattern, char *rcpt, char *from);
-/* Replace libevent evbuffer_readline with memory_pool variant */
-char* buffer_readline (memory_pool_t *pool, struct evbuffer *buf);
-
-
#endif
diff --git a/src/worker.c b/src/worker.c
index caacda838..6d0c413cd 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -40,6 +40,8 @@
#include <perl.h> /* from the Perl distribution */
#define TASK_POOL_SIZE 4095
+/* 2 seconds for worker's IO */
+#define WORKER_IO_TIMEOUT 2
const f_str_t CRLF = {
/* begin */"\r\n",
@@ -47,8 +49,12 @@ const f_str_t CRLF = {
/* size */2
};
+static struct timeval io_tv;
+
extern PerlInterpreter *perl_interpreter;
+static void write_socket (void *arg);
+
static
void sig_handler (int signo)
{
@@ -113,8 +119,7 @@ free_task (struct worker_task *task)
g_list_free_1 (part);
}
memory_pool_delete (task->task_pool);
- bufferevent_disable (task->bev, EV_READ | EV_WRITE);
- bufferevent_free (task->bev);
+ rspamd_remove_dispatcher (task->dispatcher);
close (task->sock);
g_free (task);
}
@@ -124,66 +129,40 @@ free_task (struct worker_task *task)
* Callback that is called when there is data to read in buffer
*/
static void
-read_socket (struct bufferevent *bev, void *arg)
+read_socket (f_str_t *in, void *arg)
{
struct worker_task *task = (struct worker_task *)arg;
ssize_t r;
- char *s;
switch (task->state) {
case READ_COMMAND:
case READ_HEADER:
- s = buffer_readline (task->task_pool, EVBUFFER_INPUT (bev));
- if (s == NULL) {
- msg_debug ("read_socket: got incomplete line from user");
- return;
- }
- if (read_rspamd_input_line (task, s) != 0) {
+ if (read_rspamd_input_line (task, in) != 0) {
task->last_error = "Read error";
task->error_code = RSPAMD_NETWORK_ERROR;
task->state = WRITE_ERROR;
- }
- if (task->state == WRITE_ERROR || task->state == WRITE_REPLY) {
- bufferevent_enable (bev, EV_WRITE);
- bufferevent_disable (bev, EV_READ);
+ write_socket (task);
}
break;
case READ_MESSAGE:
- r = bufferevent_read (bev, task->msg->pos, task->msg->free);
- if (r > 0) {
- task->msg->pos += r;
- msg_debug ("read_socket: read %zd bytes from socket, %zd bytes left", r, task->msg->free);
- update_buf_size (task->msg);
- if (task->msg->free == 0) {
- r = process_message (task);
- r = process_filters (task);
- if (r == -1) {
- task->last_error = "Filter processing error";
- task->error_code = RSPAMD_FILTER_ERROR;
- task->state = WRITE_ERROR;
- }
- else if (r == 0) {
- task->state = WAIT_FILTER;
- }
- else {
- process_statfiles (task);
- }
- }
- if (task->state == WRITE_ERROR || task->state == WRITE_REPLY) {
- bufferevent_enable (bev, EV_WRITE);
- bufferevent_disable (bev, EV_READ);
- evbuffer_drain (bev->output, EVBUFFER_LENGTH (bev->output));
- }
+ task->msg = in;
+ r = process_message (task);
+ r = process_filters (task);
+ if (r == -1) {
+ task->last_error = "Filter processing error";
+ task->error_code = RSPAMD_FILTER_ERROR;
+ task->state = WRITE_ERROR;
+ write_socket (task);
+ }
+ else if (r == 0) {
+ task->state = WAIT_FILTER;
+ rspamd_dispatcher_pause (task->dispatcher);
}
else {
- msg_warn ("read_socket: cannot read data to buffer (free space: %zd): %ld", task->msg->free, (long int)r);
- bufferevent_disable (bev, EV_READ);
- free_task (task);
+ process_statfiles (task);
+ write_socket (task);
}
break;
- case WAIT_FILTER:
- bufferevent_disable (bev, EV_READ);
- break;
}
}
@@ -191,7 +170,7 @@ read_socket (struct bufferevent *bev, void *arg)
* Callback for socket writing
*/
static void
-write_socket (struct bufferevent *bev, void *arg)
+write_socket (void *arg)
{
struct worker_task *task = (struct worker_task *)arg;
@@ -199,12 +178,10 @@ write_socket (struct bufferevent *bev, void *arg)
case WRITE_REPLY:
write_reply (task);
task->state = CLOSING_CONNECTION;
- bufferevent_disable (bev, EV_READ);
break;
case WRITE_ERROR:
write_reply (task);
task->state = CLOSING_CONNECTION;
- bufferevent_disable (bev, EV_READ);
break;
case CLOSING_CONNECTION:
msg_debug ("write_socket: normally closing connection");
@@ -221,10 +198,10 @@ write_socket (struct bufferevent *bev, void *arg)
* Called if something goes wrong
*/
static void
-err_socket (struct bufferevent *bev, short what, void *arg)
+err_socket (GError *err, void *arg)
{
struct worker_task *task = (struct worker_task *)arg;
- msg_info ("err_socket: abnormally closing connection");
+ msg_info ("err_socket: abnormally closing connection, error: %s", err->message);
/* Free buffers */
free_task (task);
}
@@ -266,9 +243,10 @@ accept_socket (int fd, short what, void *arg)
memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)g_hash_table_destroy, new_task->results);
worker->srv->stat->connections_count ++;
- /* Read event */
- new_task->bev = bufferevent_new (nfd, read_socket, write_socket, err_socket, (void *)new_task);
- bufferevent_enable (new_task->bev, EV_READ);
+ /* Set up dispatcher */
+ new_task->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, read_socket,
+ write_socket, err_socket, &io_tv,
+ (void *)new_task);
}
/*
@@ -304,6 +282,9 @@ start_worker (struct rspamd_worker *worker, int listen_sock)
/* Send SIGUSR2 to parent */
kill (getppid (), SIGUSR2);
+ io_tv.tv_sec = WORKER_IO_TIMEOUT;
+ io_tv.tv_usec = 0;
+
event_loop (0);
}