diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2009-03-10 19:54:20 +0300 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2009-03-10 19:54:20 +0300 |
commit | e7017a518bbec3f2de37275ac6195ea55cb6b19e (patch) | |
tree | 084f1c81a1d0606e575abd2156e5c4314615652c | |
parent | 9cfc5813c0a07ddce2bd0c611a9de84f67a4a910 (diff) | |
download | rspamd-e7017a518bbec3f2de37275ac6195ea55cb6b19e.tar.gz rspamd-e7017a518bbec3f2de37275ac6195ea55cb6b19e.zip |
* Fix redirector connection procedure
* Add more strict login
* Add new header Queue-ID to protocol
* Log message id or queue id
* Add config file for redirector
* Add ability to set regexp and domains list to check with redirector
-rw-r--r-- | src/main.h | 1 | ||||
-rw-r--r-- | src/message.c | 5 | ||||
-rw-r--r-- | src/plugins/surbl.c | 24 | ||||
-rw-r--r-- | src/protocol.c | 13 | ||||
-rw-r--r-- | src/worker.c | 9 | ||||
-rwxr-xr-x | utils/redirector.pl.in | 49 |
6 files changed, 89 insertions, 12 deletions
diff --git a/src/main.h b/src/main.h index 8661346f1..0a727bcce 100644 --- a/src/main.h +++ b/src/main.h @@ -174,6 +174,7 @@ struct worker_task { int sock; /**< socket descriptor */ char *helo; /**< helo header value */ char *from; /**< from header value */ + char *queue_id; /**< queue id if specified */ GList *rcpt; /**< recipients list */ unsigned int nrcpt; /**< number of recipients */ struct in_addr from_addr; /**< client addr in numeric form */ diff --git a/src/message.c b/src/message.c index d1f79ebfb..61f89317e 100644 --- a/src/message.c +++ b/src/message.c @@ -362,7 +362,10 @@ process_message (struct worker_task *task) g_mime_message_foreach_part (message, mime_foreach_callback, task); #endif - msg_info ("process_message: found %d parts in message", task->parts_count); + msg_debug ("process_message: found %d parts in message", task->parts_count); + if (task->queue_id == NULL) { + task->queue_id = (char *)g_mime_message_get_message_id (task->message); + } task->worker->srv->stat->messages_scanned ++; diff --git a/src/plugins/surbl.c b/src/plugins/surbl.c index 8b3928f28..a5ea46865 100644 --- a/src/plugins/surbl.c +++ b/src/plugins/surbl.c @@ -386,11 +386,13 @@ dns_callback (int result, char type, int count, int ttl, void *addresses, void * *(param->url->host + param->url->hostlen) = 0; /* If we have result from DNS server, this url exists in SURBL, so increase score */ if (result == DNS_ERR_NONE && type == DNS_IPv4_A) { - msg_info ("surbl_check: url %s is in surbl %s", param->url->host, param->suffix->suffix); + msg_info ("surbl_check: <%s> url %s is in surbl %s", + param->task->queue_id, param->url->host, param->suffix->suffix); process_dns_results (param->task, param->suffix, param->url->host, (uint32_t)(((in_addr_t *)addresses)[0])); } else { - msg_debug ("surbl_check: url %s is not in surbl %s", param->url->host, param->suffix->suffix); + msg_debug ("surbl_check: <%s> url %s is not in surbl %s", + param->task->queue_id, param->url->host, param->suffix->suffix); } *(param->url->host + param->url->hostlen) = c; @@ -553,7 +555,8 @@ redirector_callback (int fd, short what, void *arg) } else { event_del (¶m->ev); - msg_info ("redirector_callback: connection to redirector timed out while waiting for write"); + msg_info ("redirector_callback: <%s> connection to redirector timed out while waiting for write", + param->task->queue_id); param->task->save.saved --; make_surbl_requests (param->url, param->task, param->tree); @@ -577,7 +580,8 @@ redirector_callback (int fd, short what, void *arg) } } if (*p == '\0') { - msg_info ("redirector_callback: got reply from redirector: '%s' -> '%s'", struri (param->url), c); + msg_info ("redirector_callback: <%s> got reply from redirector: '%s' -> '%s'", + param->task->queue_id, struri (param->url), c); parse_uri (param->url, c, param->task->task_pool); } } @@ -592,7 +596,8 @@ redirector_callback (int fd, short what, void *arg) } else { event_del (¶m->ev); - msg_info ("redirector_callback: reading redirector timed out, while waiting for read"); + msg_info ("redirector_callback: <%s> reading redirector timed out, while waiting for read", + param->task->queue_id); param->task->save.saved --; make_surbl_requests (param->url, param->task, param->tree); if (param->task->save.saved == 0) { @@ -613,10 +618,11 @@ register_redirector_call (struct uri *url, struct worker_task *task, GTree *url_ struct redirector_param *param; struct timeval timeout; - s = make_tcp_socket (&surbl_module_ctx->redirector_addr, htons (surbl_module_ctx->redirector_port), FALSE); + s = make_tcp_socket (&surbl_module_ctx->redirector_addr, surbl_module_ctx->redirector_port, FALSE); if (s == -1) { - msg_info ("register_redirector_call: cannot create tcp socket failed: %s", strerror (errno)); + msg_info ("register_redirector_call: <%s> cannot create tcp socket failed: %s", + task->queue_id, strerror (errno)); task->save.saved --; make_surbl_requests (url, task, url_tree); return; @@ -641,7 +647,7 @@ surbl_test_url (struct worker_task *task) struct memcached_param *param; GTree *url_tree; - url_tree = g_tree_new ((GCompareFunc)g_strcasecmp); + url_tree = g_tree_new ((GCompareFunc)g_ascii_strcasecmp); TAILQ_FOREACH (url, &task->urls, next) { msg_debug ("surbl_test_url: check url %s", struri (url)); @@ -660,7 +666,7 @@ surbl_test_url (struct worker_task *task) } } - g_tree_destroy (url_tree); + memory_pool_add_destructor (task->task_pool, (pool_destruct_func)g_tree_destroy, url_tree); return 0; } diff --git a/src/protocol.c b/src/protocol.c index 598c851c6..257426813 100644 --- a/src/protocol.c +++ b/src/protocol.c @@ -77,6 +77,7 @@ #define IP_ADDR_HEADER "IP" #define NRCPT_HEADER "Recipient-Number" #define RCPT_HEADER "Rcpt" +#define QUEUE_ID_HEADER "Queue-ID" #define ERROR_HEADER "Error" /* * Reply messages @@ -247,6 +248,18 @@ parse_header (struct worker_task *task, char *line) return -1; } break; + case 'q': + case 'Q': + /* Queue id */ + if (strncasecmp (headern, QUEUE_ID_HEADER, sizeof (QUEUE_ID_HEADER) - 1) == 0) { + task->queue_id = memory_pool_strdup (task->task_pool, line); + msg_debug ("parse_header: read queue_id header, value: %s", task->queue_id); + } + else { + msg_info ("parse_header: wrong header: %s", headern); + return -1; + } + break; case 'r': case 'R': /* rcpt */ diff --git a/src/worker.c b/src/worker.c index 6c3b245c5..ee10d9a84 100644 --- a/src/worker.c +++ b/src/worker.c @@ -220,6 +220,7 @@ accept_socket (int fd, short what, void *arg) { struct rspamd_worker *worker = (struct rspamd_worker *)arg; struct sockaddr_storage ss; + struct sockaddr_in *sin; struct worker_task *new_task; socklen_t addrlen = sizeof(ss); int nfd; @@ -228,6 +229,14 @@ accept_socket (int fd, short what, void *arg) msg_warn ("accept_socket: accept failed: %s", strerror (errno)); return; } + + if (ss.ss_family == AF_UNIX) { + msg_info ("accept_socket: accepted connection from unix socket"); + } + else if (ss.ss_family == AF_INET) { + sin = (struct sockaddr_in *) &ss; + msg_info ("accept_socket: accepted connection from %s port %d", inet_ntoa (sin->sin_addr), ntohs (sin->sin_port)); + } new_task = g_malloc (sizeof (struct worker_task)); diff --git a/utils/redirector.pl.in b/utils/redirector.pl.in index c406b48a3..f1a7f3196 100755 --- a/utils/redirector.pl.in +++ b/utils/redirector.pl.in @@ -42,10 +42,13 @@ my %cfg = ( logfile => '/var/log/rspamd-redirector.log', do_log => 0, debug => 0, + check_regexp => 'http://[^/]+/', + check_domains => [ 'narod.ru', 'test.ru' ], digest_bits => 256, cache_expire => 3600, user => '@RSPAMD_USER@', group => '@RSPAMD_GROUP@', + cfg_file => '@CMAKE_INSTALL_PREFIX@/etc/rspamd-redirector.conf', ); our $do_reopen_log = 0; @@ -59,8 +62,8 @@ Proc::Daemon::Init if !$cfg{debug}; if ($> == 0) { my $uid = getpwnam($cfg{user}) or die "user $cfg{user} unknown"; my $gid = getgrnam($cfg{group}) or die "group $cfg{group} unknown"; - $< = $uid; - $) = $gid; + $< = $> = $uid; + $) = $( = $gid; } die "Cannot write to pidfile $cfg{pidfile}" if ! open(PID, "> $cfg{pidfile}"); @@ -88,6 +91,7 @@ my $memd = new Cache::Memcached::Fast({ $SIG{USR1} = sub { $do_reopen_log = 1; }; $SIG{INT} = sub { $poe_kernel->stop(); }; $SIG{QUIT} = sub { $poe_kernel->stop(); }; +$SIG{PIPE} = 'IGNORE'; write_log ("", "Starting URL resolver"); @@ -104,6 +108,17 @@ POE::Component::Client::HTTP->spawn( ), ); +sub read_file { + my ($file) = @_; + + open(IN, $file) or die "Can't open $file: $!"; + local $/; + my $content = <IN>; + close IN; + + return $content; +} + sub reopen_log { if ($cfg{do_log}) { close (LOG); @@ -403,6 +418,30 @@ sub process_input { return; } + my $domain; + if ($request->uri =~ /^http:\/\/([^\/]+)\//) { + my @parts = split(/\./, $1); + my $c1 = pop @parts; + my $c2 = pop @parts; + $domain = "$c2.$c1"; + } + + if ((defined($cfg{check_regexp}) && $request->uri !~ $cfg{check_regexp}) || + (defined($cfg{check_domains}) && ($_ = grep(/$domain/, $cfg{check_domains})) == 0)) { + write_log ($heap->{remote_ip}, "Uri is not checked: " . $request->uri); + my $new_response = HTTP::Response->new(200); + $new_response->header("Uri", $request->uri); + $new_response->header("Connection", "close"); + $new_response->header("Proxy-Connection", "close"); + + # Avoid sending the response if the client has gone away. + $heap->{client}->put($new_response) if defined $heap->{client}; + $kernel->yield("shutdown"); + + # Shut down the client's connection when the response is sent. + return; + } + # Check cache first my $redirect = memcached_check_url($request->uri); if ($redirect) { @@ -426,6 +465,12 @@ sub process_input { $kernel->post( "cl", "request", "got_response", $new_request, [0, ""]); } +# Try to eval config file +if (-f $cfg{cfg_file}) { + my $config = read_file ($cfg{cfg_file}); + eval $config; +} + POE::Component::Server::TCP->new ( Alias => "", Port => $cfg{port}, |