diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2009-10-24 04:51:53 +0400 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2009-10-24 04:51:53 +0400 |
commit | b2c710add941f2db703d28b1eaab2ef2d7a2aa46 (patch) | |
tree | 002ebe2935a8121e91844a616c6d24c8a3fc40e1 | |
parent | fa60a3c26a8aa956199583170c47f8f0347e8279 (diff) | |
download | rspamd-b2c710add941f2db703d28b1eaab2ef2d7a2aa46.tar.gz rspamd-b2c710add941f2db703d28b1eaab2ef2d7a2aa46.zip |
* Implement timeouts in rspamd perl module
* Implement upstream logics
* Add learn command
* Small fixes to other parts of client's API
-rw-r--r-- | perl/Client.pm | 215 |
1 files changed, 199 insertions, 16 deletions
diff --git a/perl/Client.pm b/perl/Client.pm index 13c8e2a80..b82756131 100644 --- a/perl/Client.pm +++ b/perl/Client.pm @@ -30,6 +30,7 @@ the spamd protocol. package Mail::Rspamd::Client; use IO::Socket; +use IO::Select; my $EOL = "\015\012"; my $BLANK = $EOL x 2; @@ -58,8 +59,8 @@ sub new { $self->{socketpath} = $args->{socketpath}; } else { - $self->{port} = $args->{port}; - $self->{host} = $args->{host}; + $self->{hosts} = $args->{hosts}; + $self->{alive_hosts} = $self->{hosts}; } if ($args->{username}) { @@ -77,6 +78,15 @@ sub new { if ($args->{rcpt}) { $self->{rcpt} = $args->{rcpt}; } + if ($args->{timeout}) { + $self->{timeout} = $args->{timeout}; + } + else { + $self->{timeout} = 5; + } + if ($args->{password}) { + $self->{password} = $args->{password}; + } bless($self, $class); @@ -119,7 +129,12 @@ sub check { my $msgsize = length($msg.$EOL); - print $remote "$command $PROTOVERSION$EOL"; + local $SIG{PIPE} = 'IGNORE'; + + if (!(print $remote "$command $PROTOVERSION$EOL")) { + $self->_mark_dead($remote); + return 0; + } print $remote "Content-length: $msgsize$EOL"; print $remote "User: $self->{username}$EOL" if ($self->{username}); print $remote "From: $self->{from}$EOL" if ($self->{from}); @@ -133,6 +148,8 @@ sub check { print $remote "$EOL"; print $remote $msg; print $remote "$EOL"; + + return undef unless $self->_get_io_readiness($remote, 0); my $line = <$remote>; return undef unless (defined $line); @@ -145,7 +162,8 @@ sub check { return undef unless ($resp_code == 0); my $cur_metric; - while ($line = <$remote>) { + + while ($self->_get_io_readiness($remote, 0) && ($line = <$remote>)) { if ($line =~ m!Metric: (\S+); (\S+); (\S+) / (\S+)!) { $metrics{$1} = { isspam => $2, @@ -164,16 +182,82 @@ sub check { } } - my $return_msg; - while(<$remote>) { - $return_msg .= $_; - } - close $remote; return \%metrics; } +sub _auth { + my ($self, $sock) = @_; + + local $SIG{PIPE} = 'IGNORE'; + + if (!(print $sock "PASSWORD $self->{password}$EOL")) { + $self->_mark_dead($remote); + return 0; + } + + return 0 unless $self->_get_io_readiness($sock, 0); + + if (defined (my $reply = <$sock>)) { + my $end = <$sock>; + if ($reply =~ /^password accepted/) { + return 1; + } + } + + return 0; + +} + +=head2 learn + +public instance (\%) check (String $msg, String $statfile, Boolean in_class) + +Description: +This method makes a call to the spamd learning a statfile with message. + +=cut + +sub learn { + my ($self, $msg, $statfile, $in_class) = @_; + + my %metrics; + + my $command = 'LEARN'; + + $self->_clear_errors(); + + my $remote = $self->_create_connection(); + + return 0 unless ($remote); + + return 0 unless $self->_auth($remote); + + my $msgsize = length($msg.$EOL); + + local $SIG{PIPE} = 'IGNORE'; + + if (!(print $remote "$command $statfile $msgsize$EOL")) { + $self->_mark_dead($remote); + return 0; + } + + print $remote $msg; + print $remote "$EOL"; + + return undef unless $self->_get_io_readiness($remote, 0); + if (defined (my $reply = <$sock>)) { + if ($reply =~ /^learn ok/) { + close $remote; + return 1; + } + } + + close $remote; + return 0; +} + =head2 ping public instance (Boolean) ping () @@ -190,10 +274,15 @@ sub ping { my $remote = $self->_create_connection(); return 0 unless ($remote); + local $SIG{PIPE} = 'IGNORE'; - print $remote "PING $PROTOVERSION$EOL"; + if (!(print $remote "PING $PROTOVERSION$EOL")) { + $self->_mark_dead($remote); + return 0; + } print $remote "$EOL"; + return undef unless $self->_get_io_readiness($remote, 0); my $line = <$remote>; close $remote; return undef unless (defined $line); @@ -222,27 +311,121 @@ sub _create_connection { my ($self) = @_; my $remote; + my $tries = 0; if ($self->{socketpath}) { $remote = IO::Socket::UNIX->new( Peer => $self->{socketpath}, Type => SOCK_STREAM, + Blocking => 0, ); + # Get write readiness + if ($self->_get_io_readiness($remote, 1) == 0) { + print "Connection timed out: $!\n"; + return undef; + } } else { - $remote = IO::Socket::INET->new( Proto => "tcp", - PeerAddr => $self->{host}, - PeerPort => $self->{port}, - ); + my $server; + + do { + $server = $self->_select_server(); + $tries ++; + + $remote = IO::Socket::INET->new( Proto => "tcp", + PeerAddr => $server->{host}, + PeerPort => $server->{port}, + Blocking => 0, + ); + # Get write readiness + if ($self->_get_io_readiness($remote, 1) != 0) { + return $remote; + } + else { + next; + } + } while ($tries < 5); + + return undef unless $server; } - unless ($remote) { print "Failed to create connection to spamd daemon: $!\n"; return undef; } - $remote; } +sub _revive_dead { + my ($self) = @_; + + my $now = time(); + foreach my $s ($self->{dead_hosts}) { + # revive after minute of downtime + if ($s->{dead} == 1 && $now - $s->{t} > 60) { + $s->{dead} = 0; + push(@{$self->{alive_hosts}}, $s->{host}); + } + } + + 1; +} + +sub _select_server { + my($self) = @_; + + $self->_revive_dead(); + my $alive_num = scalar(@{$self->{alive_hosts}}); + if (!$alive_num) { + $self->{alive_hosts} = $self->{hosts}; + $self->{dead_hosts} = (); + $alive_num = scalar($self->{alive_hosts}); + } + + my $selected = $self->{alive_hosts}[int(rand($alive_num))]; + if ($selected =~ /^(\S+):(\d+)$/) { + my $server = { + host => $1, + port => $2, + }; + return $server; + } + + undef; +} + + +sub _mark_dead { + my ($self, $server) = @_; + + my $now = time(); + $self->{dead_hosts}->{$server} = { + host => $server, + dead => 1, + t => $now, + }; + for (my $i = 0; $i < scalar (@{$self->{alive_hosts}}); $i ++) { + if ($self->{alive_hosts} == $server) { + splice(@{$self->{alive_hosts}}, $i, 1); + last; + } + } +} + +sub _get_io_readiness { + my ($self, $sock, $is_write) = @_; + my $s = IO::Select->new(); + $s->add($sock); + + if ($is_write) { + @ready = $s->can_write($self->{timeout}); + } + else { + @ready = $s->can_read($self->{timeout}); + } + + + scalar(@ready); +} + =head2 _parse_response_line private instance (@) _parse_response_line (String $line) |