diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2009-02-19 21:16:30 +0300 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2009-02-19 21:16:30 +0300 |
commit | bcece60fa1bfd4bbb09a64c058835fe3245e1d18 (patch) | |
tree | 813faac7ee96029e604a8a73c88ffee4e10a579a /src/worker.c | |
parent | 2dca592dce2fdde806af89e5cf036bd11bd4df61 (diff) | |
download | rspamd-bcece60fa1bfd4bbb09a64c058835fe3245e1d18.tar.gz rspamd-bcece60fa1bfd4bbb09a64c058835fe3245e1d18.zip |
* Implement rspamd IO with IO dispatcher (TODO: still some issues with timeouts must be resolved)
Diffstat (limited to 'src/worker.c')
-rw-r--r-- | src/worker.c | 87 |
1 files changed, 34 insertions, 53 deletions
diff --git a/src/worker.c b/src/worker.c index caacda838..6d0c413cd 100644 --- a/src/worker.c +++ b/src/worker.c @@ -40,6 +40,8 @@ #include <perl.h> /* from the Perl distribution */ #define TASK_POOL_SIZE 4095 +/* 2 seconds for worker's IO */ +#define WORKER_IO_TIMEOUT 2 const f_str_t CRLF = { /* begin */"\r\n", @@ -47,8 +49,12 @@ const f_str_t CRLF = { /* size */2 }; +static struct timeval io_tv; + extern PerlInterpreter *perl_interpreter; +static void write_socket (void *arg); + static void sig_handler (int signo) { @@ -113,8 +119,7 @@ free_task (struct worker_task *task) g_list_free_1 (part); } memory_pool_delete (task->task_pool); - bufferevent_disable (task->bev, EV_READ | EV_WRITE); - bufferevent_free (task->bev); + rspamd_remove_dispatcher (task->dispatcher); close (task->sock); g_free (task); } @@ -124,66 +129,40 @@ free_task (struct worker_task *task) * Callback that is called when there is data to read in buffer */ static void -read_socket (struct bufferevent *bev, void *arg) +read_socket (f_str_t *in, void *arg) { struct worker_task *task = (struct worker_task *)arg; ssize_t r; - char *s; switch (task->state) { case READ_COMMAND: case READ_HEADER: - s = buffer_readline (task->task_pool, EVBUFFER_INPUT (bev)); - if (s == NULL) { - msg_debug ("read_socket: got incomplete line from user"); - return; - } - if (read_rspamd_input_line (task, s) != 0) { + if (read_rspamd_input_line (task, in) != 0) { task->last_error = "Read error"; task->error_code = RSPAMD_NETWORK_ERROR; task->state = WRITE_ERROR; - } - if (task->state == WRITE_ERROR || task->state == WRITE_REPLY) { - bufferevent_enable (bev, EV_WRITE); - bufferevent_disable (bev, EV_READ); + write_socket (task); } break; case READ_MESSAGE: - r = bufferevent_read (bev, task->msg->pos, task->msg->free); - if (r > 0) { - task->msg->pos += r; - msg_debug ("read_socket: read %zd bytes from socket, %zd bytes left", r, task->msg->free); - update_buf_size (task->msg); - if (task->msg->free == 0) { - r = process_message (task); - r = process_filters (task); - if (r == -1) { - task->last_error = "Filter processing error"; - task->error_code = RSPAMD_FILTER_ERROR; - task->state = WRITE_ERROR; - } - else if (r == 0) { - task->state = WAIT_FILTER; - } - else { - process_statfiles (task); - } - } - if (task->state == WRITE_ERROR || task->state == WRITE_REPLY) { - bufferevent_enable (bev, EV_WRITE); - bufferevent_disable (bev, EV_READ); - evbuffer_drain (bev->output, EVBUFFER_LENGTH (bev->output)); - } + task->msg = in; + r = process_message (task); + r = process_filters (task); + if (r == -1) { + task->last_error = "Filter processing error"; + task->error_code = RSPAMD_FILTER_ERROR; + task->state = WRITE_ERROR; + write_socket (task); + } + else if (r == 0) { + task->state = WAIT_FILTER; + rspamd_dispatcher_pause (task->dispatcher); } else { - msg_warn ("read_socket: cannot read data to buffer (free space: %zd): %ld", task->msg->free, (long int)r); - bufferevent_disable (bev, EV_READ); - free_task (task); + process_statfiles (task); + write_socket (task); } break; - case WAIT_FILTER: - bufferevent_disable (bev, EV_READ); - break; } } @@ -191,7 +170,7 @@ read_socket (struct bufferevent *bev, void *arg) * Callback for socket writing */ static void -write_socket (struct bufferevent *bev, void *arg) +write_socket (void *arg) { struct worker_task *task = (struct worker_task *)arg; @@ -199,12 +178,10 @@ write_socket (struct bufferevent *bev, void *arg) case WRITE_REPLY: write_reply (task); task->state = CLOSING_CONNECTION; - bufferevent_disable (bev, EV_READ); break; case WRITE_ERROR: write_reply (task); task->state = CLOSING_CONNECTION; - bufferevent_disable (bev, EV_READ); break; case CLOSING_CONNECTION: msg_debug ("write_socket: normally closing connection"); @@ -221,10 +198,10 @@ write_socket (struct bufferevent *bev, void *arg) * Called if something goes wrong */ static void -err_socket (struct bufferevent *bev, short what, void *arg) +err_socket (GError *err, void *arg) { struct worker_task *task = (struct worker_task *)arg; - msg_info ("err_socket: abnormally closing connection"); + msg_info ("err_socket: abnormally closing connection, error: %s", err->message); /* Free buffers */ free_task (task); } @@ -266,9 +243,10 @@ accept_socket (int fd, short what, void *arg) memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)g_hash_table_destroy, new_task->results); worker->srv->stat->connections_count ++; - /* Read event */ - new_task->bev = bufferevent_new (nfd, read_socket, write_socket, err_socket, (void *)new_task); - bufferevent_enable (new_task->bev, EV_READ); + /* Set up dispatcher */ + new_task->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, read_socket, + write_socket, err_socket, &io_tv, + (void *)new_task); } /* @@ -304,6 +282,9 @@ start_worker (struct rspamd_worker *worker, int listen_sock) /* Send SIGUSR2 to parent */ kill (getppid (), SIGUSR2); + io_tv.tv_sec = WORKER_IO_TIMEOUT; + io_tv.tv_usec = 0; + event_loop (0); } |