]> source.dussan.org Git - rspamd.git/commitdiff
* Fix redirector connection procedure
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Tue, 10 Mar 2009 16:54:20 +0000 (19:54 +0300)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Tue, 10 Mar 2009 16:54:20 +0000 (19:54 +0300)
* Add more strict login
* Add new header Queue-ID to protocol
* Log message id or queue id
* Add config file for redirector
* Add ability to set regexp and domains list to check with redirector

src/main.h
src/message.c
src/plugins/surbl.c
src/protocol.c
src/worker.c
utils/redirector.pl.in

index 8661346f186b72d92e7fbf657718124a63074a57..0a727bcce3a523d2baf346b4f4bd9ecfa00bbb8b 100644 (file)
@@ -174,6 +174,7 @@ struct worker_task {
        int sock;                                                                                                       /**< socket descriptor                                                          */
        char *helo;                                                                                                     /**< helo header value                                                          */
        char *from;                                                                                                     /**< from header value                                                          */
+       char *queue_id;                                                                                         /**< queue id if specified                                                      */
        GList *rcpt;                                                                                            /**< recipients list                                                            */
        unsigned int nrcpt;                                                                                     /**< number of recipients                                                       */
        struct in_addr from_addr;                                                                       /**< client addr in numeric form                                        */
index d1f79ebfb37ec639719142c1000f7552851574d4..61f89317e4df6cdfd9802fefba1f67e276d953e9 100644 (file)
@@ -362,7 +362,10 @@ process_message (struct worker_task *task)
        g_mime_message_foreach_part (message, mime_foreach_callback, task);
 #endif
        
-       msg_info ("process_message: found %d parts in message", task->parts_count);
+       msg_debug ("process_message: found %d parts in message", task->parts_count);
+       if (task->queue_id == NULL) {
+               task->queue_id = (char *)g_mime_message_get_message_id (task->message);
+       }
 
        task->worker->srv->stat->messages_scanned ++;
 
index 8b3928f287d2ea383d70ce13b74da0d072a82bb8..a5ea46865a6121b4d3faf86023bf41260044eacd 100644 (file)
@@ -386,11 +386,13 @@ dns_callback (int result, char type, int count, int ttl, void *addresses, void *
        *(param->url->host + param->url->hostlen) = 0;
        /* If we have result from DNS server, this url exists in SURBL, so increase score */
        if (result == DNS_ERR_NONE && type == DNS_IPv4_A) {
-               msg_info ("surbl_check: url %s is in surbl %s", param->url->host, param->suffix->suffix);
+               msg_info ("surbl_check: <%s> url %s is in surbl %s", 
+                                       param->task->queue_id, param->url->host, param->suffix->suffix);
                process_dns_results (param->task, param->suffix, param->url->host, (uint32_t)(((in_addr_t *)addresses)[0]));
        }
        else {
-               msg_debug ("surbl_check: url %s is not in surbl %s", param->url->host, param->suffix->suffix);
+               msg_debug ("surbl_check: <%s> url %s is not in surbl %s", 
+                                       param->task->queue_id, param->url->host, param->suffix->suffix);
        }
        *(param->url->host + param->url->hostlen) = c;
        
@@ -553,7 +555,8 @@ redirector_callback (int fd, short what, void *arg)
                        }
                        else {
                                event_del (&param->ev);
-                               msg_info ("redirector_callback: connection to redirector timed out while waiting for write");
+                               msg_info ("redirector_callback: <%s> connection to redirector timed out while waiting for write",
+                                                       param->task->queue_id);
                                param->task->save.saved --;
                                make_surbl_requests (param->url, param->task, param->tree);
 
@@ -577,7 +580,8 @@ redirector_callback (int fd, short what, void *arg)
                                                }
                                        }
                                        if (*p == '\0') {
-                                               msg_info ("redirector_callback: got reply from redirector: '%s' -> '%s'", struri (param->url), c);
+                                               msg_info ("redirector_callback: <%s> got reply from redirector: '%s' -> '%s'", 
+                                                                       param->task->queue_id, struri (param->url), c);
                                                parse_uri (param->url, c, param->task->task_pool);
                                        }
                                }
@@ -592,7 +596,8 @@ redirector_callback (int fd, short what, void *arg)
                        }
                        else {
                                event_del (&param->ev);
-                               msg_info ("redirector_callback: reading redirector timed out, while waiting for read");
+                               msg_info ("redirector_callback: <%s> reading redirector timed out, while waiting for read",
+                                                       param->task->queue_id);
                                param->task->save.saved --;
                                make_surbl_requests (param->url, param->task, param->tree);
                                if (param->task->save.saved == 0) {
@@ -613,10 +618,11 @@ register_redirector_call (struct uri *url, struct worker_task *task, GTree *url_
        struct redirector_param *param;
        struct timeval timeout;
 
-       s = make_tcp_socket (&surbl_module_ctx->redirector_addr, htons (surbl_module_ctx->redirector_port), FALSE);
+       s = make_tcp_socket (&surbl_module_ctx->redirector_addr, surbl_module_ctx->redirector_port, FALSE);
 
        if (s == -1) {
-               msg_info ("register_redirector_call: cannot create tcp socket failed: %s", strerror (errno));
+               msg_info ("register_redirector_call: <%s> cannot create tcp socket failed: %s", 
+                                       task->queue_id, strerror (errno));
                task->save.saved --;
                make_surbl_requests (url, task, url_tree);
                return; 
@@ -641,7 +647,7 @@ surbl_test_url (struct worker_task *task)
        struct memcached_param *param;
        GTree *url_tree;
 
-       url_tree = g_tree_new ((GCompareFunc)g_strcasecmp);
+       url_tree = g_tree_new ((GCompareFunc)g_ascii_strcasecmp);
 
        TAILQ_FOREACH (url, &task->urls, next) {
                msg_debug ("surbl_test_url: check url %s", struri (url));
@@ -660,7 +666,7 @@ surbl_test_url (struct worker_task *task)
                }
        }
 
-       g_tree_destroy (url_tree);
+       memory_pool_add_destructor (task->task_pool, (pool_destruct_func)g_tree_destroy, url_tree);
        return 0;
 }
 
index 598c851c6d50d42627b0e60bc7caa4a7b3c182dd..257426813deba378f3c77c7072a9e4b11a554e8d 100644 (file)
@@ -77,6 +77,7 @@
 #define IP_ADDR_HEADER "IP"
 #define NRCPT_HEADER "Recipient-Number"
 #define RCPT_HEADER "Rcpt"
+#define QUEUE_ID_HEADER "Queue-ID"
 #define ERROR_HEADER "Error"
 /*
  * Reply messages
@@ -247,6 +248,18 @@ parse_header (struct worker_task *task, char *line)
                                return -1;
                        }
                        break;
+               case 'q':
+               case 'Q':
+                       /* Queue id */
+                       if (strncasecmp (headern, QUEUE_ID_HEADER, sizeof (QUEUE_ID_HEADER) - 1) == 0) {
+                               task->queue_id = memory_pool_strdup (task->task_pool, line);
+                               msg_debug ("parse_header: read queue_id header, value: %s", task->queue_id);
+                       }
+                       else {
+                               msg_info ("parse_header: wrong header: %s", headern);
+                               return -1;
+                       }
+                       break;
                case 'r':
                case 'R':
                        /* rcpt */
index 6c3b245c5bcedd0c87e54ef5f2f34f8ea58b1356..ee10d9a848f84340ea9315c0e74f7228ba073533 100644 (file)
@@ -220,6 +220,7 @@ accept_socket (int fd, short what, void *arg)
 {
        struct rspamd_worker *worker = (struct rspamd_worker *)arg;
        struct sockaddr_storage ss;
+    struct sockaddr_in *sin;
        struct worker_task *new_task;
        socklen_t addrlen = sizeof(ss);
        int nfd;
@@ -228,6 +229,14 @@ accept_socket (int fd, short what, void *arg)
                msg_warn ("accept_socket: accept failed: %s", strerror (errno));
                return;
        }
+
+    if (ss.ss_family == AF_UNIX) {
+        msg_info ("accept_socket: accepted connection from unix socket");
+    }
+    else if (ss.ss_family == AF_INET) {
+        sin = (struct sockaddr_in *) &ss;
+        msg_info ("accept_socket: accepted connection from %s port %d", inet_ntoa (sin->sin_addr), ntohs (sin->sin_port));
+    }
        
        new_task = g_malloc (sizeof (struct worker_task));
 
index c406b48a3a042e1abd37abc451dd7206239e898a..f1a7f3196f4df2bb4b42caaff26d51ad8d4b209f 100755 (executable)
@@ -42,10 +42,13 @@ my %cfg = (
     logfile =>  '/var/log/rspamd-redirector.log',
     do_log  =>  0,
     debug   =>  0,
+    check_regexp => 'http://[^/]+/',
+    check_domains =>  [ 'narod.ru', 'test.ru' ],
     digest_bits => 256,
     cache_expire => 3600,
     user    => '@RSPAMD_USER@',
     group   => '@RSPAMD_GROUP@',
+    cfg_file => '@CMAKE_INSTALL_PREFIX@/etc/rspamd-redirector.conf',
 );
 
 our $do_reopen_log = 0;
@@ -59,8 +62,8 @@ Proc::Daemon::Init if !$cfg{debug};
 if ($> == 0) {
     my $uid = getpwnam($cfg{user}) or die "user $cfg{user} unknown";
     my $gid = getgrnam($cfg{group}) or die "group $cfg{group} unknown";
-    $< = $uid;
-    $) = $gid;
+    $< = $> = $uid;
+    $) = $( = $gid;
 }
 
 die "Cannot write to pidfile $cfg{pidfile}" if ! open(PID, "> $cfg{pidfile}");
@@ -88,6 +91,7 @@ my $memd = new Cache::Memcached::Fast({
 $SIG{USR1} = sub { $do_reopen_log = 1; };
 $SIG{INT} = sub { $poe_kernel->stop(); };
 $SIG{QUIT} = sub { $poe_kernel->stop(); };
+$SIG{PIPE} = 'IGNORE';
 
 write_log ("", "Starting URL resolver");
 
@@ -104,6 +108,17 @@ POE::Component::Client::HTTP->spawn(
     ), 
 );
 
+sub read_file {
+    my ($file) = @_;
+
+    open(IN, $file) or die "Can't open $file: $!";
+    local $/;
+    my $content = <IN>;
+    close IN;
+
+    return $content;
+}
+
 sub reopen_log {
     if ($cfg{do_log}) {
         close (LOG);
@@ -403,6 +418,30 @@ sub process_input {
         return;
     }
     
+    my $domain;
+    if ($request->uri =~ /^http:\/\/([^\/]+)\//) {
+        my @parts = split(/\./, $1);
+        my $c1 = pop @parts;
+        my $c2 = pop @parts;
+        $domain = "$c2.$c1";
+    }
+
+    if ((defined($cfg{check_regexp}) && $request->uri !~ $cfg{check_regexp}) ||
+        (defined($cfg{check_domains}) && ($_ = grep(/$domain/, $cfg{check_domains})) == 0)) {
+        write_log ($heap->{remote_ip}, "Uri is not checked: " . $request->uri);
+        my $new_response = HTTP::Response->new(200);
+        $new_response->header("Uri", $request->uri); 
+        $new_response->header("Connection", "close"); 
+        $new_response->header("Proxy-Connection", "close"); 
+
+        # Avoid sending the response if the client has gone away.
+        $heap->{client}->put($new_response) if defined $heap->{client};
+        $kernel->yield("shutdown");
+
+        # Shut down the client's connection when the response is sent.
+        return;
+    }
+
     # Check cache first
     my $redirect = memcached_check_url($request->uri);
     if ($redirect) {
@@ -426,6 +465,12 @@ sub process_input {
     $kernel->post( "cl", "request", "got_response", $new_request, [0, ""]);
 }
 
+# Try to eval config file
+if (-f $cfg{cfg_file}) {
+    my $config = read_file ($cfg{cfg_file});
+    eval $config;
+}
+
 POE::Component::Server::TCP->new
   ( Alias => "",
     Port         => $cfg{port},