package Mail::Rspamd::Client;
use IO::Socket;
+use IO::Select;
my $EOL = "\015\012";
my $BLANK = $EOL x 2;
$self->{socketpath} = $args->{socketpath};
}
else {
- $self->{port} = $args->{port};
- $self->{host} = $args->{host};
+ $self->{hosts} = $args->{hosts};
+ $self->{alive_hosts} = $self->{hosts};
}
if ($args->{username}) {
if ($args->{rcpt}) {
$self->{rcpt} = $args->{rcpt};
}
+ if ($args->{timeout}) {
+ $self->{timeout} = $args->{timeout};
+ }
+ else {
+ $self->{timeout} = 5;
+ }
+ if ($args->{password}) {
+ $self->{password} = $args->{password};
+ }
bless($self, $class);
my $msgsize = length($msg.$EOL);
- print $remote "$command $PROTOVERSION$EOL";
+ local $SIG{PIPE} = 'IGNORE';
+
+ if (!(print $remote "$command $PROTOVERSION$EOL")) {
+ $self->_mark_dead($remote);
+ return 0;
+ }
print $remote "Content-length: $msgsize$EOL";
print $remote "User: $self->{username}$EOL" if ($self->{username});
print $remote "From: $self->{from}$EOL" if ($self->{from});
print $remote "$EOL";
print $remote $msg;
print $remote "$EOL";
+
+ return undef unless $self->_get_io_readiness($remote, 0);
my $line = <$remote>;
return undef unless (defined $line);
return undef unless ($resp_code == 0);
my $cur_metric;
- while ($line = <$remote>) {
+
+ while ($self->_get_io_readiness($remote, 0) && ($line = <$remote>)) {
if ($line =~ m!Metric: (\S+); (\S+); (\S+) / (\S+)!) {
$metrics{$1} = {
isspam => $2,
}
}
- my $return_msg;
- while(<$remote>) {
- $return_msg .= $_;
- }
-
close $remote;
return \%metrics;
}
+sub _auth {
+ my ($self, $sock) = @_;
+
+ local $SIG{PIPE} = 'IGNORE';
+
+ if (!(print $sock "PASSWORD $self->{password}$EOL")) {
+ $self->_mark_dead($remote);
+ return 0;
+ }
+
+ return 0 unless $self->_get_io_readiness($sock, 0);
+
+ if (defined (my $reply = <$sock>)) {
+ my $end = <$sock>;
+ if ($reply =~ /^password accepted/) {
+ return 1;
+ }
+ }
+
+ return 0;
+
+}
+
+=head2 learn
+
+public instance (\%) check (String $msg, String $statfile, Boolean in_class)
+
+Description:
+This method makes a call to the spamd learning a statfile with message.
+
+=cut
+
+sub learn {
+ my ($self, $msg, $statfile, $in_class) = @_;
+
+ my %metrics;
+
+ my $command = 'LEARN';
+
+ $self->_clear_errors();
+
+ my $remote = $self->_create_connection();
+
+ return 0 unless ($remote);
+
+ return 0 unless $self->_auth($remote);
+
+ my $msgsize = length($msg.$EOL);
+
+ local $SIG{PIPE} = 'IGNORE';
+
+ if (!(print $remote "$command $statfile $msgsize$EOL")) {
+ $self->_mark_dead($remote);
+ return 0;
+ }
+
+ print $remote $msg;
+ print $remote "$EOL";
+
+ return undef unless $self->_get_io_readiness($remote, 0);
+ if (defined (my $reply = <$sock>)) {
+ if ($reply =~ /^learn ok/) {
+ close $remote;
+ return 1;
+ }
+ }
+
+ close $remote;
+ return 0;
+}
+
=head2 ping
public instance (Boolean) ping ()
my $remote = $self->_create_connection();
return 0 unless ($remote);
+ local $SIG{PIPE} = 'IGNORE';
- print $remote "PING $PROTOVERSION$EOL";
+ if (!(print $remote "PING $PROTOVERSION$EOL")) {
+ $self->_mark_dead($remote);
+ return 0;
+ }
print $remote "$EOL";
+ return undef unless $self->_get_io_readiness($remote, 0);
my $line = <$remote>;
close $remote;
return undef unless (defined $line);
my ($self) = @_;
my $remote;
+ my $tries = 0;
if ($self->{socketpath}) {
$remote = IO::Socket::UNIX->new( Peer => $self->{socketpath},
Type => SOCK_STREAM,
+ Blocking => 0,
);
+ # Get write readiness
+ if ($self->_get_io_readiness($remote, 1) == 0) {
+ print "Connection timed out: $!\n";
+ return undef;
+ }
}
else {
- $remote = IO::Socket::INET->new( Proto => "tcp",
- PeerAddr => $self->{host},
- PeerPort => $self->{port},
- );
+ my $server;
+
+ do {
+ $server = $self->_select_server();
+ $tries ++;
+
+ $remote = IO::Socket::INET->new( Proto => "tcp",
+ PeerAddr => $server->{host},
+ PeerPort => $server->{port},
+ Blocking => 0,
+ );
+ # Get write readiness
+ if ($self->_get_io_readiness($remote, 1) != 0) {
+ return $remote;
+ }
+ else {
+ next;
+ }
+ } while ($tries < 5);
+
+ return undef unless $server;
}
-
unless ($remote) {
print "Failed to create connection to spamd daemon: $!\n";
return undef;
}
-
$remote;
}
+sub _revive_dead {
+ my ($self) = @_;
+
+ my $now = time();
+ foreach my $s ($self->{dead_hosts}) {
+ # revive after minute of downtime
+ if ($s->{dead} == 1 && $now - $s->{t} > 60) {
+ $s->{dead} = 0;
+ push(@{$self->{alive_hosts}}, $s->{host});
+ }
+ }
+
+ 1;
+}
+
+sub _select_server {
+ my($self) = @_;
+
+ $self->_revive_dead();
+ my $alive_num = scalar(@{$self->{alive_hosts}});
+ if (!$alive_num) {
+ $self->{alive_hosts} = $self->{hosts};
+ $self->{dead_hosts} = ();
+ $alive_num = scalar($self->{alive_hosts});
+ }
+
+ my $selected = $self->{alive_hosts}[int(rand($alive_num))];
+ if ($selected =~ /^(\S+):(\d+)$/) {
+ my $server = {
+ host => $1,
+ port => $2,
+ };
+ return $server;
+ }
+
+ undef;
+}
+
+
+sub _mark_dead {
+ my ($self, $server) = @_;
+
+ my $now = time();
+ $self->{dead_hosts}->{$server} = {
+ host => $server,
+ dead => 1,
+ t => $now,
+ };
+ for (my $i = 0; $i < scalar (@{$self->{alive_hosts}}); $i ++) {
+ if ($self->{alive_hosts} == $server) {
+ splice(@{$self->{alive_hosts}}, $i, 1);
+ last;
+ }
+ }
+}
+
+sub _get_io_readiness {
+ my ($self, $sock, $is_write) = @_;
+ my $s = IO::Select->new();
+ $s->add($sock);
+
+ if ($is_write) {
+ @ready = $s->can_write($self->{timeout});
+ }
+ else {
+ @ready = $s->can_read($self->{timeout});
+ }
+
+
+ scalar(@ready);
+}
+
=head2 _parse_response_line
private instance (@) _parse_response_line (String $line)