diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2009-07-29 20:17:29 +0400 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2009-07-29 20:17:29 +0400 |
commit | c4621d35aebaf9c6d466bf7a15a9de340b20b0ce (patch) | |
tree | 75553d38928ccbb2b029bb7646b67ab8b0afac8f /src/worker.c | |
parent | eb9facb84cad4017db7ac877b5a0446472756308 (diff) | |
download | rspamd-c4621d35aebaf9c6d466bf7a15a9de340b20b0ce.tar.gz rspamd-c4621d35aebaf9c6d466bf7a15a9de340b20b0ce.zip |
* Add support for extending controller protocol by modules
* Add write support via controller to fuzzy storage
TODO: Add delete and check commands support to controller interface
Diffstat (limited to 'src/worker.c')
-rw-r--r-- | src/worker.c | 107 |
1 files changed, 61 insertions, 46 deletions
diff --git a/src/worker.c b/src/worker.c index 726f7fe00..4832c71fc 100644 --- a/src/worker.c +++ b/src/worker.c @@ -4,11 +4,11 @@ * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY Rambler media ''AS IS'' AND ANY * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED @@ -40,8 +40,8 @@ #include <evdns.h> #ifndef WITHOUT_PERL -#include <EXTERN.h> /* from the Perl distribution */ -#include <perl.h> /* from the Perl distribution */ +#include <EXTERN.h> /* from the Perl distribution */ +#include <perl.h> /* from the Perl distribution */ extern PerlInterpreter *perl_interpreter; #endif @@ -111,7 +111,7 @@ rcpt_destruct (void *pointer) /* * Free all structures of worker_task */ -static void +void free_task (struct worker_task *task, gboolean is_soft) { GList *part; @@ -135,14 +135,18 @@ free_task (struct worker_task *task, gboolean is_soft) g_list_free (task->urls); } memory_pool_delete (task->task_pool); - if (is_soft) { - /* Plan dispatcher shutdown */ - task->dispatcher->wanna_die = 1; + if (task->dispatcher) { + if (is_soft) { + /* Plan dispatcher shutdown */ + task->dispatcher->wanna_die = 1; + } + else { + rspamd_remove_dispatcher (task->dispatcher); + } } - else { - rspamd_remove_dispatcher (task->dispatcher); + if (task->sock != -1) { + close (task->sock); } - close (task->sock); g_free (task); } } @@ -174,13 +178,13 @@ read_socket (f_str_t *in, void *arg) task->msg->len = in->len; msg_debug ("read_socket: got string of length %ld", (long int)task->msg->len); r = process_message (task); - if (r == -1) { - msg_warn ("read_socket: processing of message failed"); + if (r == -1) { + msg_warn ("read_socket: processing of message failed"); task->last_error = "MIME processing error"; task->error_code = RSPAMD_FILTER_ERROR; task->state = WRITE_ERROR; write_socket (task); - } + } if (task->cmd == CMD_OTHER) { /* Skip filters */ task->state = WRITE_REPLY; @@ -249,36 +253,10 @@ err_socket (GError *err, void *arg) free_task (task, FALSE); } -/* - * Accept new connection and construct task - */ -static void -accept_socket (int fd, short what, void *arg) +struct worker_task * +construct_task (struct rspamd_worker *worker) { - struct rspamd_worker *worker = (struct rspamd_worker *)arg; - struct sockaddr_storage ss; - struct sockaddr_in *sin; struct worker_task *new_task; - socklen_t addrlen = sizeof(ss); - int nfd; - - if ((nfd = accept_from_socket (fd, (struct sockaddr *)&ss, &addrlen)) == -1) { - msg_warn ("accept_socket: accept failed: %s", strerror (errno)); - return; - } - /* Check for EAGAIN */ - if (nfd == 0) { - msg_debug ("accept_socket: cannot accept socket as it was already accepted by other worker"); - return; - } - - if (ss.ss_family == AF_UNIX) { - msg_info ("accept_socket: accepted connection from unix socket"); - } - else if (ss.ss_family == AF_INET) { - sin = (struct sockaddr_in *) &ss; - msg_info ("accept_socket: accepted connection from %s port %d", inet_ntoa (sin->sin_addr), ntohs (sin->sin_port)); - } new_task = g_malloc (sizeof (struct worker_task)); @@ -286,7 +264,6 @@ accept_socket (int fd, short what, void *arg) bzero (new_task, sizeof (struct worker_task)); new_task->worker = worker; new_task->state = READ_COMMAND; - new_task->sock = nfd; new_task->cfg = worker->srv->cfg; new_task->from_addr.s_addr = INADDR_NONE; new_task->view_checked = FALSE; @@ -307,12 +284,50 @@ accept_socket (int fd, short what, void *arg) new_task->re_cache = g_hash_table_new (g_str_hash, g_str_equal); memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)g_hash_table_destroy, new_task->re_cache); + return new_task; +} + +/* + * Accept new connection and construct task + */ +static void +accept_socket (int fd, short what, void *arg) +{ + struct rspamd_worker *worker = (struct rspamd_worker *)arg; + struct sockaddr_storage ss; + struct sockaddr_in *sin; + struct worker_task *new_task; + socklen_t addrlen = sizeof(ss); + int nfd; + + if ((nfd = accept_from_socket (fd, (struct sockaddr *)&ss, &addrlen)) == -1) { + msg_warn ("accept_socket: accept failed: %s", strerror (errno)); + return; + } + /* Check for EAGAIN */ + if (nfd == 0) { + msg_debug ("accept_socket: cannot accept socket as it was already accepted by other worker"); + return; + } + + if (ss.ss_family == AF_UNIX) { + msg_info ("accept_socket: accepted connection from unix socket"); + } + else if (ss.ss_family == AF_INET) { + sin = (struct sockaddr_in *) &ss; + msg_info ("accept_socket: accepted connection from %s port %d", inet_ntoa (sin->sin_addr), ntohs (sin->sin_port)); + } + + new_task = construct_task (worker); + + new_task->sock = nfd; worker->srv->stat->connections_count ++; /* Set up dispatcher */ new_task->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, read_socket, write_socket, err_socket, &io_tv, (void *)new_task); + } /* |