]> source.dussan.org Git - rspamd.git/commitdiff
* Implement timeouts in rspamd perl module
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Sat, 24 Oct 2009 00:51:53 +0000 (04:51 +0400)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Sat, 24 Oct 2009 00:51:53 +0000 (04:51 +0400)
* Implement upstream logics
* Add learn command
* Small fixes to other parts of client's API

perl/Client.pm

index 13c8e2a8020d16a3b38f1339c19611b693231d6c..b82756131e0f2136cd08dec292ed9851218876da 100644 (file)
@@ -30,6 +30,7 @@ the spamd protocol.
 package Mail::Rspamd::Client;
 
 use IO::Socket;
+use IO::Select;
 
 my $EOL = "\015\012";
 my $BLANK = $EOL x 2;
@@ -58,8 +59,8 @@ sub new {
     $self->{socketpath} = $args->{socketpath};
   }
   else {
-    $self->{port} = $args->{port};
-    $self->{host} = $args->{host};
+    $self->{hosts} = $args->{hosts};
+    $self->{alive_hosts} = $self->{hosts};
   }
 
   if ($args->{username}) {
@@ -77,6 +78,15 @@ sub new {
   if ($args->{rcpt}) {
     $self->{rcpt} = $args->{rcpt};
   }
+  if ($args->{timeout}) {
+    $self->{timeout} = $args->{timeout};
+  }
+  else {
+    $self->{timeout} = 5;
+  }
+  if ($args->{password}) {
+    $self->{password} = $args->{password};
+  }
 
   bless($self, $class);
 
@@ -119,7 +129,12 @@ sub check {
 
   my $msgsize = length($msg.$EOL);
 
-  print $remote "$command $PROTOVERSION$EOL";
+  local $SIG{PIPE} = 'IGNORE';
+
+  if (!(print $remote "$command $PROTOVERSION$EOL")) {
+    $self->_mark_dead($remote);
+    return 0;
+  }
   print $remote "Content-length: $msgsize$EOL";
   print $remote "User: $self->{username}$EOL" if ($self->{username});
   print $remote "From: $self->{from}$EOL" if ($self->{from});
@@ -133,6 +148,8 @@ sub check {
   print $remote "$EOL";
   print $remote $msg;
   print $remote "$EOL";
+  
+  return undef unless $self->_get_io_readiness($remote, 0);
 
   my $line = <$remote>;
   return undef unless (defined $line);
@@ -145,7 +162,8 @@ sub check {
   return undef unless ($resp_code == 0);
 
   my $cur_metric;
-  while ($line = <$remote>) {
+
+  while ($self->_get_io_readiness($remote, 0) && ($line = <$remote>)) {
     if ($line =~ m!Metric: (\S+); (\S+); (\S+) / (\S+)!) {
       $metrics{$1} = {
         isspam => $2,
@@ -164,16 +182,82 @@ sub check {
     }
   }
 
-  my $return_msg;
-  while(<$remote>) {
-    $return_msg .= $_;
-  }
-
   close $remote;
 
   return \%metrics;
 }
 
+sub _auth {
+  my ($self, $sock) = @_;
+
+  local $SIG{PIPE} = 'IGNORE';
+
+  if (!(print $sock "PASSWORD $self->{password}$EOL")) {
+    $self->_mark_dead($remote);
+    return 0;
+  }
+
+  return 0 unless $self->_get_io_readiness($sock, 0);
+
+  if (defined (my $reply = <$sock>)) {
+    my $end = <$sock>;
+    if ($reply =~ /^password accepted/) {
+      return 1;
+    }
+  }
+
+  return 0;
+  
+}
+
+=head2 learn
+
+public instance (\%) check (String $msg, String $statfile, Boolean in_class)
+
+Description:
+This method makes a call to the spamd learning a statfile with message.
+
+=cut
+
+sub learn {
+  my ($self, $msg, $statfile, $in_class) = @_;
+
+  my %metrics;
+
+  my $command = 'LEARN';
+
+  $self->_clear_errors();
+
+  my $remote = $self->_create_connection();
+
+  return 0 unless ($remote);
+
+  return 0 unless $self->_auth($remote);
+
+  my $msgsize = length($msg.$EOL);
+
+  local $SIG{PIPE} = 'IGNORE';
+
+  if (!(print $remote "$command $statfile $msgsize$EOL")) {
+    $self->_mark_dead($remote);
+    return 0;
+  }
+
+  print $remote $msg;
+  print $remote "$EOL";
+  
+  return undef unless $self->_get_io_readiness($remote, 0);
+  if (defined (my $reply = <$sock>)) {
+    if ($reply =~ /^learn ok/) {
+      close $remote;
+      return 1;
+    }
+  }
+
+  close $remote;
+  return 0;
+}
+
 =head2 ping
 
 public instance (Boolean) ping ()
@@ -190,10 +274,15 @@ sub ping {
   my $remote = $self->_create_connection();
 
   return 0 unless ($remote);
+  local $SIG{PIPE} = 'IGNORE';
 
-  print $remote "PING $PROTOVERSION$EOL";
+  if (!(print $remote "PING $PROTOVERSION$EOL")) {
+    $self->_mark_dead($remote);
+    return 0;
+  }
   print $remote "$EOL";
 
+  return undef unless $self->_get_io_readiness($remote, 0);
   my $line = <$remote>;
   close $remote;
   return undef unless (defined $line);
@@ -222,27 +311,121 @@ sub _create_connection {
   my ($self) = @_;
 
   my $remote;
+  my $tries = 0;
 
   if ($self->{socketpath}) {
     $remote = IO::Socket::UNIX->new( Peer => $self->{socketpath},
                                     Type => SOCK_STREAM,
+             Blocking  => 0,
                                   );
+    # Get write readiness
+    if ($self->_get_io_readiness($remote, 1) == 0) {
+      print "Connection timed out: $!\n";
+      return undef;
+    }
   }
   else {
-    $remote = IO::Socket::INET->new( Proto     => "tcp",
-                                    PeerAddr  => $self->{host},
-                                    PeerPort  => $self->{port},
-                                  );
+    my $server;
+
+    do {
+      $server = $self->_select_server();
+      $tries ++;
+
+      $remote = IO::Socket::INET->new( Proto     => "tcp",
+                PeerAddr  => $server->{host},
+                PeerPort  => $server->{port},
+                Blocking  => 0,
+              );
+      # Get write readiness
+      if ($self->_get_io_readiness($remote, 1) != 0) {
+        return $remote;
+      }
+      else {
+        next;
+      }
+    } while ($tries < 5);
+
+    return undef unless $server;
   }
-
   unless ($remote) {
     print "Failed to create connection to spamd daemon: $!\n";
     return undef;
   }
-
   $remote;
 }
 
+sub _revive_dead {
+  my ($self) = @_;
+
+  my $now = time();
+  foreach my $s ($self->{dead_hosts}) {
+    # revive after minute of downtime
+    if ($s->{dead} == 1 && $now - $s->{t} > 60) {
+      $s->{dead} = 0;
+      push(@{$self->{alive_hosts}}, $s->{host});
+    }
+  }
+
+  1;
+}
+
+sub _select_server {
+  my($self) = @_;
+    
+  $self->_revive_dead();
+  my $alive_num = scalar(@{$self->{alive_hosts}});
+  if (!$alive_num) {
+    $self->{alive_hosts} = $self->{hosts};
+    $self->{dead_hosts} = ();
+    $alive_num = scalar($self->{alive_hosts});
+  }
+  
+  my $selected = $self->{alive_hosts}[int(rand($alive_num))];
+  if ($selected =~ /^(\S+):(\d+)$/) {
+    my $server = {
+        host => $1,
+        port => $2,
+    };
+    return $server;
+  }
+
+  undef;
+}
+
+
+sub _mark_dead {
+  my ($self, $server) = @_;
+  
+  my $now = time();
+  $self->{dead_hosts}->{$server} = {
+    host => $server,
+    dead => 1,
+    t => $now,
+  };
+  for (my $i = 0; $i < scalar (@{$self->{alive_hosts}}); $i ++) {
+    if ($self->{alive_hosts} == $server) {
+      splice(@{$self->{alive_hosts}}, $i, 1);
+      last;
+    }
+  }
+}
+
+sub _get_io_readiness {
+  my ($self, $sock, $is_write) = @_;
+  my $s = IO::Select->new();
+  $s->add($sock);
+
+  if ($is_write) {
+    @ready = $s->can_write($self->{timeout});
+  }
+  else {
+    @ready = $s->can_read($self->{timeout});
+  }
+  
+
+  scalar(@ready);
+}
+
 =head2 _parse_response_line
 
 private instance (@) _parse_response_line (String $line)