summaryrefslogtreecommitdiffstats
path: root/perl
diff options
context:
space:
mode:
authorcebka@lenovo-laptop <cebka@lenovo-laptop>2010-03-03 19:34:51 +0300
committercebka@lenovo-laptop <cebka@lenovo-laptop>2010-03-03 19:34:51 +0300
commit9097fc043e38a27241381e383b97e00254fbf711 (patch)
tree39dd2467542a289d22ad43daa6f80c572e8262e6 /perl
parentdda6a0c72bcfc98fd151df83f99cd3fbebb4ba76 (diff)
downloadrspamd-9097fc043e38a27241381e383b97e00254fbf711.tar.gz
rspamd-9097fc043e38a27241381e383b97e00254fbf711.zip
* New Mail::Rspamd::Client
things TODO: - improve interaction with rspamc - improve documentation - test all features - test clustering - write CGI front-end
Diffstat (limited to 'perl')
-rw-r--r--perl/lib/Mail/Rspamd/Client.pm1121
1 files changed, 856 insertions, 265 deletions
diff --git a/perl/lib/Mail/Rspamd/Client.pm b/perl/lib/Mail/Rspamd/Client.pm
index 1a1d6d095..d850a093a 100644
--- a/perl/lib/Mail/Rspamd/Client.pm
+++ b/perl/lib/Mail/Rspamd/Client.pm
@@ -6,12 +6,10 @@ Mail::Rspamd::Client - Client for rspamd Protocol
=head1 SYNOPSIS
- my $client = new Mail::Rspamd::Client({port => 11333,
- host => 'localhost',
- ip => '127.0.0.1'});
+ my $client = new Mail::Rspamd::Client($config);
if ($client->ping()) {
- print "Ping is ok\n";
+ $self->{error} = "Ping is ok\n";
}
my $result = $client->check($testmsg);
@@ -32,7 +30,7 @@ package Mail::Rspamd::Client;
use IO::Socket;
use vars qw($VERSION);
-$VERSION = "1.00";
+$VERSION = "1.01";
my $EOL = "\015\012";
my $BLANK = $EOL x 2;
@@ -50,51 +48,539 @@ This method creates a new Mail::Rspamd::Client object.
=cut
sub new {
- my ($class, $args) = @_;
+ my ($class, $args) = @_;
+
+ $class = ref($class) || $class;
+
+ my $self = {};
+
+ # with a sockets_path set then it makes no sense to set host and port
+ if ($args->{hosts}) {
+ $self->{hosts} = $args->{hosts};
+ $self->{alive_hosts} = $self->{hosts};
+ }
+
+ if ($args->{username}) {
+ $self->{username} = $args->{username};
+ }
+ if ($args->{ip}) {
+ $self->{ip} = $args->{ip};
+ }
+ if ($args->{from}) {
+ $self->{from} = $args->{from};
+ }
+ if ($args->{subject}) {
+ $self->{subject} = $args->{subject};
+ }
+ if ($args->{rcpt}) {
+ $self->{rcpt} = $args->{rcpt};
+ }
+ if ($args->{timeout}) {
+ $self->{timeout} = $args->{timeout};
+ }
+ else {
+ $self->{timeout} = 5;
+ }
+ if ($args->{password}) {
+ $self->{password} = $args->{password};
+ }
+ if ($args->{statfile}) {
+ $self->{statfile} = $args->{statfile};
+ }
+ if ($args->{weight}) {
+ $self->{weight} = $args->{weight};
+ }
+ else {
+ $self->{weight} = 1;
+ }
+ if ($args->{imap_search}) {
+ $self->{imap_search} = $args->{imap_search};
+ }
+ else {
+ $self->{imap_search} = 'ALL';
+ }
+
+ if ($args->{command}) {
+ if ($args->{command} =~ /(SYMBOLS|PROCESS|CHECK|URLS|EMAILS)/i) {
+ $self->{'command'} = $1;
+ $self->{'control'} = 0;
+ }
+ elsif ($args->{command} =~ /(STAT|LEARN|SHUTDOWN|RELOAD|UPTIME|COUNTERS|FUZZY_ADD|FUZZY_DEL|WEIGHTS)/i) {
+ $self->{'command'} = $1;
+ $self->{'control'} = 1;
+ }
+ }
- $class = ref($class) || $class;
+ $self->{error} = "";
- my $self = {};
+ bless($self, $class);
- # with a sockets_path set then it makes no sense to set host and port
- if ($args->{socketpath}) {
- $self->{socketpath} = $args->{socketpath};
- }
- else {
- $self->{hosts} = $args->{hosts};
- $self->{alive_hosts} = $self->{hosts};
- }
+ $self;
+}
- if ($args->{username}) {
- $self->{username} = $args->{username};
- }
- if ($args->{ip}) {
- $self->{ip} = $args->{ip};
- }
- if ($args->{from}) {
- $self->{from} = $args->{from};
- }
- if ($args->{subject}) {
- $self->{subject} = $args->{subject};
- }
- if ($args->{rcpt}) {
- $self->{rcpt} = $args->{rcpt};
- }
- if ($args->{timeout}) {
- $self->{timeout} = $args->{timeout};
- }
- else {
- $self->{timeout} = 5;
- }
- if ($args->{password}) {
- $self->{password} = $args->{password};
- }
- bless($self, $class);
+sub make_ssl_socket {
+ my ($host, $port) = @_;
+
+ eval {
+ use IO::Socket::SSL;
+ } or $self->{error} = "IO::Socket::SSL required for imaps";
+
+ return IO::Socket::SSL->new("$host:$port");
+}
+
+
+# Currently just read stdin for user's message and pass it to rspamd
+sub _do_rspamc_command {
+ my ($self, $remote, $msg) = @_;
+
+ my %metrics;
+
+
+ my $msgsize = length($msg.$EOL);
+
+ local $SIG{PIPE} = 'IGNORE';
+
+ if (!(syswrite($remote, "$self->{command} $PROTOVERSION$EOL"))) {
+ $self->_mark_dead($remote);
+ return 0;
+ }
+ syswrite $remote, "Content-length: $msgsize$EOL";
+ syswrite $remote, "User: $self->{username}$EOL" if ($self->{username});
+ syswrite $remote, "From: $self->{from}$EOL" if ($self->{from});
+ syswrite $remote, "IP: $self->{ip}$EOL" if ($self->{ip});
+ syswrite $remote, "Subject: $self->{subject}$EOL" if ($self->{subject});
+ if (ref $self->{rcpt} eq "ARRAY") {
+ foreach ($self->{rcpt}) {
+ syswrite $remote, "Rcpt: $_ $EOL";
+ }
+ }
+ syswrite $remote, $EOL;
+ syswrite $remote, $msg;
+ syswrite $remote, $EOL;
+
+ return undef unless $self->_get_io_readiness($remote, 0);
+
+ my ($in, $res);
+ my $offset = 0;
+ do {
+ $res = sysread($remote, $in, 512, $offset);
+ if ($res > 0 && $res < 512) {
+ $self->_get_io_readiness($remote, 0);
+ }
+ $offset += $res;
+ } while ($res > 0);
+
+ my ($version, $resp_code, $resp_msg) = $self->_parse_response_line($in);
+
+ $self->{resp_code} = $resp_code;
+ $self->{resp_msg} = $resp_msg;
+
+ return undef unless (defined($resp_code) && $resp_code == 0);
+
+ my $cur_metric;
+ my @lines = split (/^/, $in);
+ foreach my $line (@lines) {
+ if ($line =~ m!Metric: (\S+); (\S+); (\S+) / (\S+)!) {
+ $metrics{$1} = {
+ isspam => $2,
+ score => $3 + 0,
+ threshold => $4 + 0,
+ symbols => [],
+ urls => [],
+ messages => [],
+ };
+ $cur_metric = $1;
+ }
+ elsif ($line =~ /^Symbol: (\S+)/ && $cur_metric) {
+ my $symref = $metrics{$cur_metric}->{'symbols'};
+ push(@$symref, $1);
+ }
+ elsif ($line =~ /^Urls: (\S+)/ && $cur_metric) {
+ @{ $metrics{$cur_metric}->{'urls'} } = split /\s/, $1;
+ }
+ elsif ($line =~ /^Message: (\S+)/ && $cur_metric) {
+ my $symref = $metrics{$cur_metric}->{'messages'};
+ push(@$symref, $1);
+ }
+ elsif ($line =~ /^${EOL}$/) {
+ last;
+ }
+ }
+
+ close $remote;
+
+ return \%metrics;
+
+}
+
+
+sub _do_control_command {
+ my ($self, $remote, $msg) = @_;
+
+ local $SIG{PIPE} = 'IGNORE';
+ my %res;
+
+ $res{error_code} = 0;
+
+ # Read greeting first
+ if (defined (my $greeting = <$remote>)) {
+ if ($greeting !~ /^Rspamd version/) {
+ $res{error} = "Not rspamd greeting line $greeting";
+ $res{error_code} = 500;
+ return \%res;
+ }
+ }
+
+ if ($self->{'command'} =~ /^learn$/i) {
+ if (!$self->{'statfile'}) {
+ $res{error} = "Statfile is not specified to learn command";
+ $res{error_code} = 500;
+ return \%res;
+ }
+
+ if ($self->_auth ($remote)) {
+ my $len = length ($msg);
+ syswrite $remote, "learn $self->{statfile} $len -w $self->{weight}" . $EOL;
+ syswrite $remote, $input . $EOL;
+ return undef unless $self->_get_io_readiness($remote, 0);
+ if (defined (my $reply = <$remote>)) {
+ if ($reply =~ /^learn ok, sum weight: ([0-9.]+)/) {
+ $res{error} = "Learn succeed. Sum weight: $1\n";
+ return \%res;
+ }
+ else {
+ $res{error_code} = 500;
+ $res{error} = "Learn failed\n";
+ return \%res;
+ }
+ }
+ }
+ else {
+ $res{error_code} = 403;
+ $res{error} = "Authentication failed\n";
+ return \%res;
+ }
+ }
+ elsif ($self->{'command'} =~ /^weights$/i) {
+ if (!$self->{'statfile'}) {
+ $res{error_code} = 500;
+ $res{error} = "Statfile is not specified to weights command";
+ return \%res;
+ }
+
+ my $len = length ($input);
+ $res{error} = "Sending $len bytes...\n";
+ syswrite $remote, "weights $self->{'statfile'} $len" . $EOL;
+ syswrite $remote, $input . $EOL;
+ return undef unless $self->_get_io_readiness($remote, 0);
+ while (defined (my $reply = <$remote>)) {
+ last if $reply =~ /^END/;
+ $res{error} .= $reply;
+ }
+ }
+ elsif ($self->{'command'} =~ /(reload|shutdown)/i) {
+ if ($self->_auth ($remote)) {
+ syswrite $remote, $self->{'command'} . $EOL;
+ while (defined (my $line = <$remote>)) {
+ last if $line =~ /^END/;
+ $res{error} .= $line;
+ }
+ }
+ else {
+ $res{error_code} = 403;
+ $res{error} = "Authentication failed\n";
+ return \%res;
+ }
+ }
+ elsif ($self->{'command'} =~ /(fuzzy_add|fuzzy_del)/i) {
+ if ($self->_auth ($remote)) {
+ my $len = length ($input);
+ syswrite $remote, $self->{'command'} . " $len $self->{'weight'}" . $EOL;
+ syswrite $remote, $input . $EOL;
+ return undef unless $self->_get_io_readiness($remote, 0);
+ if (defined (my $reply = <$remote>)) {
+ if ($reply =~ /^OK/) {
+ $res{error} = $self->{'command'} . " succeed\n";
+ return \%res;
+ }
+ else {
+ $res{error_code} = 500;
+ $res{error} = $self->{'command'} . " failed\n";
+ return \%res;
+ }
+ }
+ }
+ else {
+ $res{error_code} = 403;
+ $res{error} = "Authentication failed\n";
+ return \%res;
+ }
+
+ }
+ else {
+ syswrite $remote, $self->{'command'} . $EOL;
+ while (defined (my $line = <$remote>)) {
+ last if $line =~ /^END/;
+ $res{error} .= $line;
+ }
+ }
+
+ return \%res;
+}
+
+sub _process_file {
+ my $self = shift;
+ my $file = shift;
+
+ open(FILE, "< $file") or return;
+
+ my $input;
+ while (defined (my $line = <FILE>)) {
+ $input .= $line;
+ }
+
+ close FILE;
+ $self->do_all_cmd ($input);
+}
+
+sub _process_directory {
+ my $self = shift;
+ my $dir = shift;
+
+ opendir (DIR, $dir) or return;
+
+ while (defined (my $file = readdir (DIR))) {
+ $file = "$dir/$file";
+ if (-f $file) {
+ $self->_process_file ($file);
+ }
+ }
+ closedir (DIR);
+}
+
+sub _check_imap_reply {
+ my $self = shift;
+ my $sock = shift;
+ my $seq = shift;
+
+ my $input;
+
+ while (defined ($input = <$sock>)) {
+ chomp $input;
+ if ($input =~ /BAD|NO (.+)$/) {
+ $_[0] = $1;
+ return 0;
+ }
+ next if ($input =~ /^\*/);
+ if ($input =~ /^$seq OK/) {
+ return 1;
+ }
+
+ $_[0] = $input;
+ return 0;
+ }
- $self;
+ $_[0] = "timeout";
+
+ return 0;
}
+sub _parse_imap_body {
+ my $self = shift;
+ my $sock = shift;
+ my $seq = shift;
+ my $input;
+ my $got_body = 0;
+
+ while (defined (my $line = <$sock>)) {
+ if (!$got_body && $line =~ /^\*/) {
+ $got_body = 1;
+ next;
+ }
+ if ($line =~ /^$seq OK/) {
+ return $input;
+ }
+ elsif ($got_body) {
+ $input .= $line;
+ next;
+ }
+
+ return undef;
+ }
+
+ return undef;
+
+}
+
+sub _parse_imap_sequences {
+ my $self = shift;
+ my $sock = shift;
+ my $seq = shift;
+ my $input;
+
+ while (defined ($input = <$sock>)) {
+ chomp $input;
+ if ($input =~ /^\* SEARCH (.+)$/) {
+ @res = split (/\s/, $1);
+ next;
+ }
+ elsif ($input =~ /^$seq OK/) {
+ return \@res;
+ }
+ return undef;
+ }
+
+}
+
+sub process_imap {
+ my ($self, $ssl, $user, $password, $host, $mbox) = @_;
+ my $seq = 1;
+ my $sock;
+
+ if (!$password) {
+ eval {
+ use Term::ReadKey;
+ $self->{error} = "Enter IMAP password: ";
+ ReadMode 'noecho';
+ $password = ReadLine 0;
+ chomp $password;
+ ReadMode 'normal';
+ $self->{error} = "\n";
+ } or die "cannot get password. Check that Term::ReadKey is installed";
+ }
+
+ # Stupid code that does not take care of timeouts etc, just trying to extract messages
+ if ($ssl) {
+ $sock = $self->_make_ssl_socket ($host, 'imaps');
+ }
+ else {
+ $sock = $self->_make_tcp_socket ($host, 143);
+ }
+ my $reply = <$sock>;
+ if (!defined ($reply) || $reply !~ /^\* OK/) {
+ $self->{error} = "Imap server is not ready";
+ return;
+ }
+ syswrite $sock, "$seq LOGIN $user $password$EOL";
+ if (!$self->_check_imap_reply ($sock, $seq, $reply)) {
+ $self->{error} = "Cannot login to imap server: $reply";
+ return;
+ }
+ $seq ++;
+ syswrite $sock, "$seq SELECT $mbox$EOL";
+ if (!$self->_check_imap_reply ($sock, $seq, $reply)) {
+ $self->{error} = "Cannot select mbox $mbox: $reply";
+ return;
+ }
+ $seq ++;
+ syswrite $sock, "$seq SEARCH $self->{imap_search}$EOL";
+ my $messages;
+ if (!defined ($messages = $self->_parse_imap_sequences ($sock, $seq))) {
+ $self->{error} = "Cannot make search";
+ return;
+ }
+ $seq ++;
+ foreach my $message (@{ $messages }){
+ syswrite $sock, "$seq FETCH $message body[]$EOL";
+ if (defined (my $input = $self->_parse_imap_body ($sock, $seq))) {
+ $self->do_all_cmd ($input);
+ }
+ $seq ++;
+ }
+ syswrite $sock, "$seq LOGOUT$EOL";
+ close $sock;
+}
+
+=head2 process_item
+
+public instance (\%) process_item (String $item)
+
+Description:
+Do specified command for a single file, path or IMAP folder
+
+The return value is a hash reference containing results of each command for each server from cluster
+
+=cut
+
+sub process_item {
+ my $item = shift;
+
+ if (defined ($item)) {
+ if ($item =~ qr|^imap(s?):user:([^:]+):password:([^:]*):host:([^:]+):mbox:(.+)$|) {
+ return $self->_process_imap ($1, $2, $3, $4, $5);
+ }
+ elsif (-f $item) {
+ return $self->_process_file ($item);
+ }
+ elsif (-d $item) {
+ return $self->_process_directory ($item);
+ }
+ else {
+ warn "urecognized argument: $item";
+ }
+ }
+ undef;
+}
+
+=head2 process_path
+
+public instance (\%) process_path ()
+
+Description:
+Do specified command for each file in path or message in IMAP folder
+
+The return value is a hash reference containing results of each command for each server from cluster
+
+=cut
+sub process_path {
+ my %res;
+ foreach (@_) {
+ $res{$_} = $self->process_item($_);
+ }
+
+ return \%res;
+}
+
+=head2 do_all_cmd
+
+public instance (\%) do_all_cmd (String $msg)
+
+Description:
+This method makes a call to the the whole rspamd cluster and call specified command
+(in $self->{command}).
+
+The return value is a hash reference containing results of each command for each server from cluster
+
+=cut
+
+sub do_all_cmd {
+ my ($self, $input) = @_;
+
+ my %res;
+
+ foreach my $hostdef (@{ $self->{'hosts'} }) {
+ $self->_clear_errors();
+
+ my $remote = $self->_create_connection($hostdef);
+
+ if (! $remote) {
+ $res{$hostdef}->{error} = "Cannot connect to $hostdef";
+ }
+ else {
+ if ($self->{'control'}) {
+ $res{$hostdef} = $self->_do_control_command ($remote, $input);
+ }
+ else {
+ $res{$hostdef} = $self->_do_rspamc_command ($remote, $input);
+ }
+ }
+ }
+
+ return \%res;
+}
+
+
=head2 check
public instance (\%) check (String $msg)
@@ -117,107 +603,109 @@ symbols - array of symbols
=cut
sub check {
- my ($self, $msg) = @_;
+ my ($self, $msg) = @_;
+
+ $self->{command} = 'CHECK';
- my %metrics;
+ return $self->_do_rspamc_command ($self, $msg);
+}
- my $command = 'SYMBOLS';
+=head2 symbols
- $self->_clear_errors();
+public instance (\%) symbols (String $msg)
- my $remote = $self->_create_connection();
+Description:
+This method makes a call to the spamd server
- return 0 unless ($remote);
+The return value is a hash reference containing metrics indexed by name. Each metric
+is hash that contains data:
- my $msgsize = length($msg.$EOL);
+isspam
- local $SIG{PIPE} = 'IGNORE';
+score
- if (!(syswrite($remote, "$command $PROTOVERSION$EOL"))) {
- $self->_mark_dead($remote);
- return 0;
- }
- syswrite $remote, "Content-length: $msgsize$EOL";
- syswrite $remote, "User: $self->{username}$EOL" if ($self->{username});
- syswrite $remote, "From: $self->{from}$EOL" if ($self->{from});
- syswrite $remote, "IP: $self->{ip}$EOL" if ($self->{ip});
- syswrite $remote, "Subject: $self->{subject}$EOL" if ($self->{subject});
- if (ref $self->{rcpt} eq "ARRAY") {
- foreach ($self->{rcpt}) {
- syswrite $remote, "Rcpt: $_ $EOL";
- }
- }
- syswrite $remote, $EOL;
- syswrite $remote, $msg;
- syswrite $remote, $EOL;
-
- return undef unless $self->_get_io_readiness($remote, 0);
-
- my ($in, $res);
- my $offset = 0;
- do {
- $res = sysread($remote, $in, 512, $offset);
- if ($res > 0 && $res < 512) {
- $self->_get_io_readiness($remote, 0);
- }
- $offset += $res;
- } while ($res > 0);
-
- my ($version, $resp_code, $resp_msg) = $self->_parse_response_line($in);
-
- $self->{resp_code} = $resp_code;
- $self->{resp_msg} = $resp_msg;
-
- return undef unless (defined($resp_code) && $resp_code == 0);
-
- my $cur_metric;
- my @lines = split (/^/, $in);
- foreach my $line (@lines) {
- if ($line =~ m!Metric: (\S+); (\S+); (\S+) / (\S+)!) {
- $metrics{$1} = {
- isspam => $2,
- score => $3 + 0,
- threshold => $4 + 0,
- symbols => [],
- };
- $cur_metric = $1;
- }
- elsif ($line =~ /^Symbol: (\S+)/ && $cur_metric) {
- my $symref = $metrics{$cur_metric}->{'symbols'};
- push(@$symref, $1);
- }
- elsif ($line =~ /^${EOL}$/) {
- last;
- }
- }
+threshold
+
+symbols - array of symbols
+
+=cut
- close $remote;
+sub symbols {
+ my ($self, $msg) = @_;
+
+ $self->{command} = 'SYMBOLS';
- return \%metrics;
+ return $self->_do_rspamc_command ($self, $msg);
}
-sub _auth {
- my ($self, $sock) = @_;
+=head2 process
- local $SIG{PIPE} = 'IGNORE';
+public instance (\%) process (String $msg)
- if (!(syswrite($sock, "PASSWORD $self->{password}$EOL"))) {
- $self->_mark_dead($remote);
- return 0;
- }
+Description:
+This method makes a call to the spamd server
- return 0 unless $self->_get_io_readiness($sock, 0);
+The return value is a hash reference containing metrics indexed by name. Each metric
+is hash that contains data:
- if (sysread($sock, $reply, 255)) {
- if ($reply =~ /^password accepted/) {
- return 1;
- }
- }
+isspam
- return 0;
-
+score
+
+threshold
+
+symbols - array of symbols
+
+=cut
+sub process {
+ my ($self, $msg) = @_;
+
+ $self->{command} = 'PROCESS';
+
+ return $self->_do_rspamc_command ($self, $msg);
}
+=head2 emails
+
+public instance (\%) emails (String $msg)
+
+Description:
+This method makes a call to the spamd server
+
+The return value is a hash reference containing metrics indexed by name. Each metric
+is hash that contains data:
+
+emails - list of all emails in message
+=cut
+sub emails {
+ my ($self, $msg) = @_;
+
+ $self->{command} = 'EMAILS';
+
+ return $self->_do_rspamc_command ($self, $msg);
+}
+
+=head2 urls
+
+public instance (\%) urls (String $msg)
+
+Description:
+This method makes a call to the spamd server
+
+The return value is a hash reference containing metrics indexed by name. Each metric
+is hash that contains data:
+
+urls - list of all urls in message
+=cut
+sub urls {
+ my ($self, $msg) = @_;
+
+ $self->{command} = 'URLS';
+
+ return $self->_do_rspamc_command ($self, $msg);
+}
+
+
=head2 learn
public instance (\%) check (String $msg, String $statfile, Boolean in_class)
@@ -228,42 +716,56 @@ This method makes a call to the spamd learning a statfile with message.
=cut
sub learn {
- my ($self, $msg, $statfile, $in_class) = @_;
-
- my %metrics;
+ my ($self, $msg) = @_;
+
+ $self->{command} = 'LEARN';
- my $command = 'LEARN';
+ return $self->_do_control_command ($self, $msg);
+}
- $self->_clear_errors();
+sub weights {
+ my ($self, $msg) = @_;
+
+ $self->{command} = 'WEIGHTS';
- my $remote = $self->_create_connection();
+ return $self->_do_control_command ($self, $msg);
+}
- return 0 unless ($remote);
+sub fuzzy_add {
+ my ($self, $msg) = @_;
+
+ $self->{command} = 'FUZZY_ADD';
- return 0 unless $self->_auth($remote);
+ return $self->_do_control_command ($self, $msg);
+}
+sub fuzzy_del {
+ my ($self, $msg) = @_;
+
+ $self->{command} = 'FUZZY_DEL';
- my $msgsize = length($msg.$EOL);
+ return $self->_do_control_command ($self, $msg);
+}
- local $SIG{PIPE} = 'IGNORE';
+sub stat {
+ my ($self) = @_;
+
+ $self->{command} = 'STAT';
- if (!(syswrite ($remote, "$command $statfile $msgsize$EOL"))) {
- $self->_mark_dead($remote);
- return 0;
- }
+ return $self->_do_control_command ($self, undef);
+}
+sub uptime {
+ my ($self) = @_;
+
+ $self->{command} = 'UPTIME';
- syswrite($remote, $msg);
- syswrite($remote, $EOL);
-
- return undef unless $self->_get_io_readiness($remote, 0);
- if (sysread ($remote, $reply, 255)) {
- if ($reply =~ /^learn ok/) {
- close $remote;
- return 1;
- }
- }
+ return $self->_do_control_command ($self, undef);
+}
+sub counters {
+ my ($self) = @_;
+
+ $self->{command} = 'UPTIME';
- close $remote;
- return 0;
+ return $self->_do_control_command ($self, undef);
}
=head2 ping
@@ -277,29 +779,29 @@ if the server responded correctly.
=cut
sub ping {
- my ($self) = @_;
+ my ($self) = @_;
- my $remote = $self->_create_connection();
+ my $remote = $self->_create_connection();
- return 0 unless ($remote);
- local $SIG{PIPE} = 'IGNORE';
+ return 0 unless ($remote);
+ local $SIG{PIPE} = 'IGNORE';
- if (!(syswrite($remote, "PING $PROTOVERSION$EOL"))) {
- $self->_mark_dead($remote);
- return 0;
- }
- syswrite($remote, $EOL);
+ if (!(syswrite($remote, "PING $PROTOVERSION$EOL"))) {
+ $self->_mark_dead($remote);
+ return 0;
+ }
+ syswrite($remote, $EOL);
- return undef unless $self->_get_io_readiness($remote, 0);
+ return undef unless $self->_get_io_readiness($remote, 0);
my $line;
- sysread ($remote, $line, 255);
- close $remote;
- return undef unless $line;
+ sysread ($remote, $line, 255);
+ close $remote;
+ return undef unless $line;
- my ($version, $resp_code, $resp_msg) = $self->_parse_response_line($line);
- return 0 unless (defined($resp_msg) && $resp_msg eq 'PONG');
+ my ($version, $resp_code, $resp_msg) = $self->_parse_response_line($line);
+ return 0 unless (defined($resp_msg) && $resp_msg eq 'PONG');
- return 1;
+ return 1;
}
=head1 PRIVATE METHODS
@@ -317,117 +819,207 @@ On failure, it sets an internal error code and returns undef.
=cut
sub _create_connection {
- my ($self) = @_;
-
- my $remote;
- my $tries = 0;
-
- if ($self->{socketpath}) {
- $remote = IO::Socket::UNIX->new( Peer => $self->{socketpath},
- Type => SOCK_STREAM,
- Blocking => 0,
- );
- # Get write readiness
- if ($self->_get_io_readiness($remote, 1) == 0) {
- print "Connection timed out: $!\n";
- return undef;
+ my ($self, $hostdef) = @_;
+
+ my $remote;
+ my $tries = 0;
+
+ if (!defined ($hostdef)) {
+ my $server;
+
+ do {
+ $server = $self->_select_server();
+ $tries ++;
+
+ $remote = IO::Socket::INET->new( Proto => "tcp",
+ PeerAddr => $server->{host},
+ PeerPort => $server->{port},
+ Blocking => 0,
+ );
+ # Get write readiness
+ if (defined ($remote)) {
+ if ($self->_get_io_readiness($remote, 1) != 0) {
+ return $remote;
+ }
+ else {
+ close ($remote);
+ }
+ }
+ } while ($tries < 5);
+
+ return undef unless $server;
+ }
+
+ if ($hostdef =~ /^\//) {
+ if (! socket ($remote, PF_UNIX, SOCK_STREAM, 0)) {
+ print "Cannot create unix socket\n";
+ return undef;
+ }
+ my $sun = sockaddr_un($hostdef);
+ if (!connect ($remote, $sun)) {
+ print "Cannot connect to socket $hostdef\n";
+ close $remote;
+ return undef;
+ }
}
- }
- else {
- my $server;
-
- do {
- $server = $self->_select_server();
- $tries ++;
-
- $remote = IO::Socket::INET->new( Proto => "tcp",
- PeerAddr => $server->{host},
- PeerPort => $server->{port},
- Blocking => 0,
- );
- # Get write readiness
- if ($self->_get_io_readiness($remote, 1) != 0) {
- return $remote;
- }
- } while ($tries < 5);
-
- return undef unless $server;
- }
- unless ($remote) {
- print "Failed to create connection to spamd daemon: $!\n";
- return undef;
- }
- $remote;
+ elsif ($hostdef =~ /^\s*(([^:]+):(\d+))\s*$/) {
+ $remote = IO::Socket::INET->new( Proto => "tcp",
+ PeerAddr => $server->{host},
+ PeerPort => $server->{port},
+ Blocking => 0,
+ );
+ # Get write readiness
+ if (defined ($remote)) {
+ if ($self->_get_io_readiness($remote, 1) != 0) {
+ return $remote;
+ }
+ else {
+ close ($remote);
+ return undef;
+ }
+ }
+ }
+
+
+ unless ($remote) {
+ $self->{error} = "Failed to create connection to spamd daemon: $!\n";
+ return undef;
+ }
+ $remote;
+}
+
+=head2 _auth
+
+private instance (IO::Socket) _auth (Socket sock)
+
+Description:
+This method do control auth.
+
+On failure this method returns 0
+
+=cut
+sub _auth {
+ my ($self, $sock) = @_;
+
+ local $SIG{PIPE} = 'IGNORE';
+
+ if (!(syswrite($sock, "PASSWORD $self->{password}$EOL"))) {
+ $self->_mark_dead($remote);
+ return 0;
+ }
+
+ return 0 unless $self->_get_io_readiness($sock, 0);
+
+ if (sysread($sock, $reply, 255)) {
+ if ($reply =~ /^password accepted/) {
+ return 1;
+ }
+ }
+
+ return 0;
+
}
+=head2 _revive_dead
+
+private instance (IO::Socket) _revive_dead ()
+
+Description:
+This method marks dead upstreams as alive
+
+=cut
sub _revive_dead {
- my ($self) = @_;
-
- my $now = time();
- foreach my $s ($self->{dead_hosts}) {
- # revive after minute of downtime
- if (defined($s->{dead}) && $s->{dead} == 1 && $now - $s->{t} > 60) {
- $s->{dead} = 0;
- push(@{$self->{alive_hosts}}, $s->{host});
- }
- }
+ my ($self) = @_;
+
+ my $now = time();
+ foreach my $s ($self->{dead_hosts}) {
+ # revive after minute of downtime
+ if (defined($s->{dead}) && $s->{dead} == 1 && $now - $s->{t} > 60) {
+ $s->{dead} = 0;
+ push(@{$self->{alive_hosts}}, $s->{host});
+ }
+ }
1;
}
+=head2 _select_server
+
+private instance (IO::Socket) _select_server ()
+
+Description:
+This method returns one server from rspamd cluster or undef if there are no suitable ones
+
+=cut
sub _select_server {
- my($self) = @_;
-
- $self->_revive_dead();
- my $alive_num = scalar(@{$self->{alive_hosts}});
- if (!$alive_num) {
- $self->{alive_hosts} = $self->{hosts};
- $self->{dead_hosts} = ();
- $alive_num = scalar($self->{alive_hosts});
- }
-
- my $selected = $self->{alive_hosts}[int(rand($alive_num))];
- if ($selected =~ /^(\S+):(\d+)$/) {
- my $server = {
- host => $1,
- port => $2,
- };
- return $server;
- }
+ my($self) = @_;
+
+ $self->_revive_dead();
+ my $alive_num = scalar(@{$self->{alive_hosts}});
+ if (!$alive_num) {
+ $self->{alive_hosts} = $self->{hosts};
+ $self->{dead_hosts} = ();
+ $alive_num = scalar($self->{alive_hosts});
+ }
+
+ my $selected = $self->{alive_hosts}[int(rand($alive_num))];
+ if ($selected =~ /^(\S+):(\d+)$/) {
+ my $server = {
+ host => $1,
+ port => $2,
+ };
+ return $server;
+ }
- undef;
+ undef;
}
+=head2 _select_server
+
+private instance (IO::Socket) _mark_dead (String server)
+
+Description:
+This method marks upstream as dead for some time. It can be revived by _revive_dead method
+
+=cut
sub _mark_dead {
- my ($self, $server) = @_;
-
- my $now = time();
- $self->{dead_hosts}->{$server} = {
- host => $server,
- dead => 1,
- t => $now,
- };
- for (my $i = 0; $i < scalar (@{$self->{alive_hosts}}); $i ++) {
- if ($self->{alive_hosts} == $server) {
- splice(@{$self->{alive_hosts}}, $i, 1);
- last;
- }
- }
+ my ($self, $server) = @_;
+
+ my $now = time();
+ $self->{dead_hosts}->{$server} = {
+ host => $server,
+ dead => 1,
+ t => $now,
+ };
+ for (my $i = 0; $i < scalar (@{$self->{alive_hosts}}); $i ++) {
+ if ($self->{alive_hosts} == $server) {
+ splice(@{$self->{alive_hosts}}, $i, 1);
+ last;
+ }
+ }
}
+=head2 _get_io_readiness
+
+private instance (IO::Socket) _mark_dead (String server)
+
+Description:
+This method marks upstream as dead for some time. It can be revived by _revive_dead method
+
+=cut
sub _get_io_readiness {
- my ($self, $sock, $is_write) = @_;
+ my ($self, $sock, $is_write) = @_;
my $w = '';
vec($w, fileno($sock), 1) = 1;
- if ($is_write) {
- return select(undef, $w, undef, $self->{timeout});
- }
- else {
- return select($w, undef,undef, $self->{timeout});
- }
-
+ if ($is_write) {
+ return select(undef, $w, undef, $self->{timeout});
+ }
+ else {
+ return select($w, undef,undef, $self->{timeout});
+ }
+
undef;
}
@@ -445,10 +1037,10 @@ with the response line.
=cut
sub _parse_response_line {
- my ($self, $line) = @_;
+ my ($self, $line) = @_;
- $line =~ s/\r?\n$//;
- return split(/\s+/, $line, 3);
+ $line =~ s/\r?\n$//;
+ return split(/\s+/, $line, 3);
}
=head2 _clear_errors
@@ -461,12 +1053,11 @@ This method clears out any current errors.
=cut
sub _clear_errors {
- my ($self) = @_;
+ my ($self) = @_;
- $self->{resp_code} = undef;
- $self->{resp_msg} = undef;
+ $self->{resp_code} = undef;
+ $self->{resp_msg} = undef;
+ $self->{error} = undef;
}
1;
-
-