aboutsummaryrefslogtreecommitdiffstats
path: root/src/worker.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2009-02-19 21:16:30 +0300
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2009-02-19 21:16:30 +0300
commitbcece60fa1bfd4bbb09a64c058835fe3245e1d18 (patch)
tree813faac7ee96029e604a8a73c88ffee4e10a579a /src/worker.c
parent2dca592dce2fdde806af89e5cf036bd11bd4df61 (diff)
downloadrspamd-bcece60fa1bfd4bbb09a64c058835fe3245e1d18.tar.gz
rspamd-bcece60fa1bfd4bbb09a64c058835fe3245e1d18.zip
* Implement rspamd IO with IO dispatcher (TODO: still some issues with timeouts must be resolved)
Diffstat (limited to 'src/worker.c')
-rw-r--r--src/worker.c87
1 files changed, 34 insertions, 53 deletions
diff --git a/src/worker.c b/src/worker.c
index caacda838..6d0c413cd 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -40,6 +40,8 @@
#include <perl.h> /* from the Perl distribution */
#define TASK_POOL_SIZE 4095
+/* 2 seconds for worker's IO */
+#define WORKER_IO_TIMEOUT 2
const f_str_t CRLF = {
/* begin */"\r\n",
@@ -47,8 +49,12 @@ const f_str_t CRLF = {
/* size */2
};
+static struct timeval io_tv;
+
extern PerlInterpreter *perl_interpreter;
+static void write_socket (void *arg);
+
static
void sig_handler (int signo)
{
@@ -113,8 +119,7 @@ free_task (struct worker_task *task)
g_list_free_1 (part);
}
memory_pool_delete (task->task_pool);
- bufferevent_disable (task->bev, EV_READ | EV_WRITE);
- bufferevent_free (task->bev);
+ rspamd_remove_dispatcher (task->dispatcher);
close (task->sock);
g_free (task);
}
@@ -124,66 +129,40 @@ free_task (struct worker_task *task)
* Callback that is called when there is data to read in buffer
*/
static void
-read_socket (struct bufferevent *bev, void *arg)
+read_socket (f_str_t *in, void *arg)
{
struct worker_task *task = (struct worker_task *)arg;
ssize_t r;
- char *s;
switch (task->state) {
case READ_COMMAND:
case READ_HEADER:
- s = buffer_readline (task->task_pool, EVBUFFER_INPUT (bev));
- if (s == NULL) {
- msg_debug ("read_socket: got incomplete line from user");
- return;
- }
- if (read_rspamd_input_line (task, s) != 0) {
+ if (read_rspamd_input_line (task, in) != 0) {
task->last_error = "Read error";
task->error_code = RSPAMD_NETWORK_ERROR;
task->state = WRITE_ERROR;
- }
- if (task->state == WRITE_ERROR || task->state == WRITE_REPLY) {
- bufferevent_enable (bev, EV_WRITE);
- bufferevent_disable (bev, EV_READ);
+ write_socket (task);
}
break;
case READ_MESSAGE:
- r = bufferevent_read (bev, task->msg->pos, task->msg->free);
- if (r > 0) {
- task->msg->pos += r;
- msg_debug ("read_socket: read %zd bytes from socket, %zd bytes left", r, task->msg->free);
- update_buf_size (task->msg);
- if (task->msg->free == 0) {
- r = process_message (task);
- r = process_filters (task);
- if (r == -1) {
- task->last_error = "Filter processing error";
- task->error_code = RSPAMD_FILTER_ERROR;
- task->state = WRITE_ERROR;
- }
- else if (r == 0) {
- task->state = WAIT_FILTER;
- }
- else {
- process_statfiles (task);
- }
- }
- if (task->state == WRITE_ERROR || task->state == WRITE_REPLY) {
- bufferevent_enable (bev, EV_WRITE);
- bufferevent_disable (bev, EV_READ);
- evbuffer_drain (bev->output, EVBUFFER_LENGTH (bev->output));
- }
+ task->msg = in;
+ r = process_message (task);
+ r = process_filters (task);
+ if (r == -1) {
+ task->last_error = "Filter processing error";
+ task->error_code = RSPAMD_FILTER_ERROR;
+ task->state = WRITE_ERROR;
+ write_socket (task);
+ }
+ else if (r == 0) {
+ task->state = WAIT_FILTER;
+ rspamd_dispatcher_pause (task->dispatcher);
}
else {
- msg_warn ("read_socket: cannot read data to buffer (free space: %zd): %ld", task->msg->free, (long int)r);
- bufferevent_disable (bev, EV_READ);
- free_task (task);
+ process_statfiles (task);
+ write_socket (task);
}
break;
- case WAIT_FILTER:
- bufferevent_disable (bev, EV_READ);
- break;
}
}
@@ -191,7 +170,7 @@ read_socket (struct bufferevent *bev, void *arg)
* Callback for socket writing
*/
static void
-write_socket (struct bufferevent *bev, void *arg)
+write_socket (void *arg)
{
struct worker_task *task = (struct worker_task *)arg;
@@ -199,12 +178,10 @@ write_socket (struct bufferevent *bev, void *arg)
case WRITE_REPLY:
write_reply (task);
task->state = CLOSING_CONNECTION;
- bufferevent_disable (bev, EV_READ);
break;
case WRITE_ERROR:
write_reply (task);
task->state = CLOSING_CONNECTION;
- bufferevent_disable (bev, EV_READ);
break;
case CLOSING_CONNECTION:
msg_debug ("write_socket: normally closing connection");
@@ -221,10 +198,10 @@ write_socket (struct bufferevent *bev, void *arg)
* Called if something goes wrong
*/
static void
-err_socket (struct bufferevent *bev, short what, void *arg)
+err_socket (GError *err, void *arg)
{
struct worker_task *task = (struct worker_task *)arg;
- msg_info ("err_socket: abnormally closing connection");
+ msg_info ("err_socket: abnormally closing connection, error: %s", err->message);
/* Free buffers */
free_task (task);
}
@@ -266,9 +243,10 @@ accept_socket (int fd, short what, void *arg)
memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)g_hash_table_destroy, new_task->results);
worker->srv->stat->connections_count ++;
- /* Read event */
- new_task->bev = bufferevent_new (nfd, read_socket, write_socket, err_socket, (void *)new_task);
- bufferevent_enable (new_task->bev, EV_READ);
+ /* Set up dispatcher */
+ new_task->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, read_socket,
+ write_socket, err_socket, &io_tv,
+ (void *)new_task);
}
/*
@@ -304,6 +282,9 @@ start_worker (struct rspamd_worker *worker, int listen_sock)
/* Send SIGUSR2 to parent */
kill (getppid (), SIGUSR2);
+ io_tv.tv_sec = WORKER_IO_TIMEOUT;
+ io_tv.tv_usec = 0;
+
event_loop (0);
}