aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2009-03-10 19:54:20 +0300
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2009-03-10 19:54:20 +0300
commite7017a518bbec3f2de37275ac6195ea55cb6b19e (patch)
tree084f1c81a1d0606e575abd2156e5c4314615652c
parent9cfc5813c0a07ddce2bd0c611a9de84f67a4a910 (diff)
downloadrspamd-e7017a518bbec3f2de37275ac6195ea55cb6b19e.tar.gz
rspamd-e7017a518bbec3f2de37275ac6195ea55cb6b19e.zip
* Fix redirector connection procedure
* Add more strict login * Add new header Queue-ID to protocol * Log message id or queue id * Add config file for redirector * Add ability to set regexp and domains list to check with redirector
-rw-r--r--src/main.h1
-rw-r--r--src/message.c5
-rw-r--r--src/plugins/surbl.c24
-rw-r--r--src/protocol.c13
-rw-r--r--src/worker.c9
-rwxr-xr-xutils/redirector.pl.in49
6 files changed, 89 insertions, 12 deletions
diff --git a/src/main.h b/src/main.h
index 8661346f1..0a727bcce 100644
--- a/src/main.h
+++ b/src/main.h
@@ -174,6 +174,7 @@ struct worker_task {
int sock; /**< socket descriptor */
char *helo; /**< helo header value */
char *from; /**< from header value */
+ char *queue_id; /**< queue id if specified */
GList *rcpt; /**< recipients list */
unsigned int nrcpt; /**< number of recipients */
struct in_addr from_addr; /**< client addr in numeric form */
diff --git a/src/message.c b/src/message.c
index d1f79ebfb..61f89317e 100644
--- a/src/message.c
+++ b/src/message.c
@@ -362,7 +362,10 @@ process_message (struct worker_task *task)
g_mime_message_foreach_part (message, mime_foreach_callback, task);
#endif
- msg_info ("process_message: found %d parts in message", task->parts_count);
+ msg_debug ("process_message: found %d parts in message", task->parts_count);
+ if (task->queue_id == NULL) {
+ task->queue_id = (char *)g_mime_message_get_message_id (task->message);
+ }
task->worker->srv->stat->messages_scanned ++;
diff --git a/src/plugins/surbl.c b/src/plugins/surbl.c
index 8b3928f28..a5ea46865 100644
--- a/src/plugins/surbl.c
+++ b/src/plugins/surbl.c
@@ -386,11 +386,13 @@ dns_callback (int result, char type, int count, int ttl, void *addresses, void *
*(param->url->host + param->url->hostlen) = 0;
/* If we have result from DNS server, this url exists in SURBL, so increase score */
if (result == DNS_ERR_NONE && type == DNS_IPv4_A) {
- msg_info ("surbl_check: url %s is in surbl %s", param->url->host, param->suffix->suffix);
+ msg_info ("surbl_check: <%s> url %s is in surbl %s",
+ param->task->queue_id, param->url->host, param->suffix->suffix);
process_dns_results (param->task, param->suffix, param->url->host, (uint32_t)(((in_addr_t *)addresses)[0]));
}
else {
- msg_debug ("surbl_check: url %s is not in surbl %s", param->url->host, param->suffix->suffix);
+ msg_debug ("surbl_check: <%s> url %s is not in surbl %s",
+ param->task->queue_id, param->url->host, param->suffix->suffix);
}
*(param->url->host + param->url->hostlen) = c;
@@ -553,7 +555,8 @@ redirector_callback (int fd, short what, void *arg)
}
else {
event_del (&param->ev);
- msg_info ("redirector_callback: connection to redirector timed out while waiting for write");
+ msg_info ("redirector_callback: <%s> connection to redirector timed out while waiting for write",
+ param->task->queue_id);
param->task->save.saved --;
make_surbl_requests (param->url, param->task, param->tree);
@@ -577,7 +580,8 @@ redirector_callback (int fd, short what, void *arg)
}
}
if (*p == '\0') {
- msg_info ("redirector_callback: got reply from redirector: '%s' -> '%s'", struri (param->url), c);
+ msg_info ("redirector_callback: <%s> got reply from redirector: '%s' -> '%s'",
+ param->task->queue_id, struri (param->url), c);
parse_uri (param->url, c, param->task->task_pool);
}
}
@@ -592,7 +596,8 @@ redirector_callback (int fd, short what, void *arg)
}
else {
event_del (&param->ev);
- msg_info ("redirector_callback: reading redirector timed out, while waiting for read");
+ msg_info ("redirector_callback: <%s> reading redirector timed out, while waiting for read",
+ param->task->queue_id);
param->task->save.saved --;
make_surbl_requests (param->url, param->task, param->tree);
if (param->task->save.saved == 0) {
@@ -613,10 +618,11 @@ register_redirector_call (struct uri *url, struct worker_task *task, GTree *url_
struct redirector_param *param;
struct timeval timeout;
- s = make_tcp_socket (&surbl_module_ctx->redirector_addr, htons (surbl_module_ctx->redirector_port), FALSE);
+ s = make_tcp_socket (&surbl_module_ctx->redirector_addr, surbl_module_ctx->redirector_port, FALSE);
if (s == -1) {
- msg_info ("register_redirector_call: cannot create tcp socket failed: %s", strerror (errno));
+ msg_info ("register_redirector_call: <%s> cannot create tcp socket failed: %s",
+ task->queue_id, strerror (errno));
task->save.saved --;
make_surbl_requests (url, task, url_tree);
return;
@@ -641,7 +647,7 @@ surbl_test_url (struct worker_task *task)
struct memcached_param *param;
GTree *url_tree;
- url_tree = g_tree_new ((GCompareFunc)g_strcasecmp);
+ url_tree = g_tree_new ((GCompareFunc)g_ascii_strcasecmp);
TAILQ_FOREACH (url, &task->urls, next) {
msg_debug ("surbl_test_url: check url %s", struri (url));
@@ -660,7 +666,7 @@ surbl_test_url (struct worker_task *task)
}
}
- g_tree_destroy (url_tree);
+ memory_pool_add_destructor (task->task_pool, (pool_destruct_func)g_tree_destroy, url_tree);
return 0;
}
diff --git a/src/protocol.c b/src/protocol.c
index 598c851c6..257426813 100644
--- a/src/protocol.c
+++ b/src/protocol.c
@@ -77,6 +77,7 @@
#define IP_ADDR_HEADER "IP"
#define NRCPT_HEADER "Recipient-Number"
#define RCPT_HEADER "Rcpt"
+#define QUEUE_ID_HEADER "Queue-ID"
#define ERROR_HEADER "Error"
/*
* Reply messages
@@ -247,6 +248,18 @@ parse_header (struct worker_task *task, char *line)
return -1;
}
break;
+ case 'q':
+ case 'Q':
+ /* Queue id */
+ if (strncasecmp (headern, QUEUE_ID_HEADER, sizeof (QUEUE_ID_HEADER) - 1) == 0) {
+ task->queue_id = memory_pool_strdup (task->task_pool, line);
+ msg_debug ("parse_header: read queue_id header, value: %s", task->queue_id);
+ }
+ else {
+ msg_info ("parse_header: wrong header: %s", headern);
+ return -1;
+ }
+ break;
case 'r':
case 'R':
/* rcpt */
diff --git a/src/worker.c b/src/worker.c
index 6c3b245c5..ee10d9a84 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -220,6 +220,7 @@ accept_socket (int fd, short what, void *arg)
{
struct rspamd_worker *worker = (struct rspamd_worker *)arg;
struct sockaddr_storage ss;
+ struct sockaddr_in *sin;
struct worker_task *new_task;
socklen_t addrlen = sizeof(ss);
int nfd;
@@ -228,6 +229,14 @@ accept_socket (int fd, short what, void *arg)
msg_warn ("accept_socket: accept failed: %s", strerror (errno));
return;
}
+
+ if (ss.ss_family == AF_UNIX) {
+ msg_info ("accept_socket: accepted connection from unix socket");
+ }
+ else if (ss.ss_family == AF_INET) {
+ sin = (struct sockaddr_in *) &ss;
+ msg_info ("accept_socket: accepted connection from %s port %d", inet_ntoa (sin->sin_addr), ntohs (sin->sin_port));
+ }
new_task = g_malloc (sizeof (struct worker_task));
diff --git a/utils/redirector.pl.in b/utils/redirector.pl.in
index c406b48a3..f1a7f3196 100755
--- a/utils/redirector.pl.in
+++ b/utils/redirector.pl.in
@@ -42,10 +42,13 @@ my %cfg = (
logfile => '/var/log/rspamd-redirector.log',
do_log => 0,
debug => 0,
+ check_regexp => 'http://[^/]+/',
+ check_domains => [ 'narod.ru', 'test.ru' ],
digest_bits => 256,
cache_expire => 3600,
user => '@RSPAMD_USER@',
group => '@RSPAMD_GROUP@',
+ cfg_file => '@CMAKE_INSTALL_PREFIX@/etc/rspamd-redirector.conf',
);
our $do_reopen_log = 0;
@@ -59,8 +62,8 @@ Proc::Daemon::Init if !$cfg{debug};
if ($> == 0) {
my $uid = getpwnam($cfg{user}) or die "user $cfg{user} unknown";
my $gid = getgrnam($cfg{group}) or die "group $cfg{group} unknown";
- $< = $uid;
- $) = $gid;
+ $< = $> = $uid;
+ $) = $( = $gid;
}
die "Cannot write to pidfile $cfg{pidfile}" if ! open(PID, "> $cfg{pidfile}");
@@ -88,6 +91,7 @@ my $memd = new Cache::Memcached::Fast({
$SIG{USR1} = sub { $do_reopen_log = 1; };
$SIG{INT} = sub { $poe_kernel->stop(); };
$SIG{QUIT} = sub { $poe_kernel->stop(); };
+$SIG{PIPE} = 'IGNORE';
write_log ("", "Starting URL resolver");
@@ -104,6 +108,17 @@ POE::Component::Client::HTTP->spawn(
),
);
+sub read_file {
+ my ($file) = @_;
+
+ open(IN, $file) or die "Can't open $file: $!";
+ local $/;
+ my $content = <IN>;
+ close IN;
+
+ return $content;
+}
+
sub reopen_log {
if ($cfg{do_log}) {
close (LOG);
@@ -403,6 +418,30 @@ sub process_input {
return;
}
+ my $domain;
+ if ($request->uri =~ /^http:\/\/([^\/]+)\//) {
+ my @parts = split(/\./, $1);
+ my $c1 = pop @parts;
+ my $c2 = pop @parts;
+ $domain = "$c2.$c1";
+ }
+
+ if ((defined($cfg{check_regexp}) && $request->uri !~ $cfg{check_regexp}) ||
+ (defined($cfg{check_domains}) && ($_ = grep(/$domain/, $cfg{check_domains})) == 0)) {
+ write_log ($heap->{remote_ip}, "Uri is not checked: " . $request->uri);
+ my $new_response = HTTP::Response->new(200);
+ $new_response->header("Uri", $request->uri);
+ $new_response->header("Connection", "close");
+ $new_response->header("Proxy-Connection", "close");
+
+ # Avoid sending the response if the client has gone away.
+ $heap->{client}->put($new_response) if defined $heap->{client};
+ $kernel->yield("shutdown");
+
+ # Shut down the client's connection when the response is sent.
+ return;
+ }
+
# Check cache first
my $redirect = memcached_check_url($request->uri);
if ($redirect) {
@@ -426,6 +465,12 @@ sub process_input {
$kernel->post( "cl", "request", "got_response", $new_request, [0, ""]);
}
+# Try to eval config file
+if (-f $cfg{cfg_file}) {
+ my $config = read_file ($cfg{cfg_file});
+ eval $config;
+}
+
POE::Component::Server::TCP->new
( Alias => "",
Port => $cfg{port},