diff options
author | cebka@lenovo-laptop <cebka@lenovo-laptop> | 2010-03-03 19:34:51 +0300 |
---|---|---|
committer | cebka@lenovo-laptop <cebka@lenovo-laptop> | 2010-03-03 19:34:51 +0300 |
commit | 9097fc043e38a27241381e383b97e00254fbf711 (patch) | |
tree | 39dd2467542a289d22ad43daa6f80c572e8262e6 | |
parent | dda6a0c72bcfc98fd151df83f99cd3fbebb4ba76 (diff) | |
download | rspamd-9097fc043e38a27241381e383b97e00254fbf711.tar.gz rspamd-9097fc043e38a27241381e383b97e00254fbf711.zip |
* New Mail::Rspamd::Client
things TODO:
- improve interaction with rspamc
- improve documentation
- test all features
- test clustering
- write CGI front-end
-rw-r--r-- | CMakeLists.txt | 10 | ||||
-rw-r--r-- | perl/lib/Mail/Rspamd/Client.pm | 1121 | ||||
-rwxr-xr-x | rspamc.pl.in | 490 |
3 files changed, 899 insertions, 722 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 22d2c21f0..dc6158b08 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -489,14 +489,13 @@ IF(PERL_EXECUTABLE) COMMAND ${PERL_EXECUTABLE} ./Makefile.PL WORKING_DIRECTORY perl) CONFIGURE_FILE(perl/Makefile.PL.in perl/Makefile.PL) -ENDIF(PERL_EXECUTABLE) -IF(ENABLE_PERL MATCHES "ON") - ADD_CUSTOM_TARGET(perlxs + ADD_CUSTOM_TARGET(perlmodule COMMAND make DEPENDS perl/Makefile WORKING_DIRECTORY perl VERBATIM) -ENDIF(ENABLE_PERL MATCHES "ON") + +ENDIF(PERL_EXECUTABLE) CONFIGURE_FILE(config.h.in src/config.h) CONFIGURE_FILE(rspamc.pl.in rspamc.pl @ONLY) @@ -520,6 +519,9 @@ IF(ENABLE_PERL MATCHES "ON") ADD_DEPENDENCIES(rspamd perlxs ${LEX_OUTPUT} ${YACC_OUTPUT}) ENDIF(ENABLE_PERL MATCHES "ON") +IF(PERL_EXECUTABLE) + ADD_DEPENDENCIES(rspamd perlmodule) +ENDIF(PERL_EXECUTABLE) IF(ENABLE_LUA MATCHES "ON") TARGET_LINK_LIBRARIES(rspamd rspamd_lua) diff --git a/perl/lib/Mail/Rspamd/Client.pm b/perl/lib/Mail/Rspamd/Client.pm index 1a1d6d095..d850a093a 100644 --- a/perl/lib/Mail/Rspamd/Client.pm +++ b/perl/lib/Mail/Rspamd/Client.pm @@ -6,12 +6,10 @@ Mail::Rspamd::Client - Client for rspamd Protocol =head1 SYNOPSIS - my $client = new Mail::Rspamd::Client({port => 11333, - host => 'localhost', - ip => '127.0.0.1'}); + my $client = new Mail::Rspamd::Client($config); if ($client->ping()) { - print "Ping is ok\n"; + $self->{error} = "Ping is ok\n"; } my $result = $client->check($testmsg); @@ -32,7 +30,7 @@ package Mail::Rspamd::Client; use IO::Socket; use vars qw($VERSION); -$VERSION = "1.00"; +$VERSION = "1.01"; my $EOL = "\015\012"; my $BLANK = $EOL x 2; @@ -50,51 +48,539 @@ This method creates a new Mail::Rspamd::Client object. =cut sub new { - my ($class, $args) = @_; + my ($class, $args) = @_; + + $class = ref($class) || $class; + + my $self = {}; + + # with a sockets_path set then it makes no sense to set host and port + if ($args->{hosts}) { + $self->{hosts} = $args->{hosts}; + $self->{alive_hosts} = $self->{hosts}; + } + + if ($args->{username}) { + $self->{username} = $args->{username}; + } + if ($args->{ip}) { + $self->{ip} = $args->{ip}; + } + if ($args->{from}) { + $self->{from} = $args->{from}; + } + if ($args->{subject}) { + $self->{subject} = $args->{subject}; + } + if ($args->{rcpt}) { + $self->{rcpt} = $args->{rcpt}; + } + if ($args->{timeout}) { + $self->{timeout} = $args->{timeout}; + } + else { + $self->{timeout} = 5; + } + if ($args->{password}) { + $self->{password} = $args->{password}; + } + if ($args->{statfile}) { + $self->{statfile} = $args->{statfile}; + } + if ($args->{weight}) { + $self->{weight} = $args->{weight}; + } + else { + $self->{weight} = 1; + } + if ($args->{imap_search}) { + $self->{imap_search} = $args->{imap_search}; + } + else { + $self->{imap_search} = 'ALL'; + } + + if ($args->{command}) { + if ($args->{command} =~ /(SYMBOLS|PROCESS|CHECK|URLS|EMAILS)/i) { + $self->{'command'} = $1; + $self->{'control'} = 0; + } + elsif ($args->{command} =~ /(STAT|LEARN|SHUTDOWN|RELOAD|UPTIME|COUNTERS|FUZZY_ADD|FUZZY_DEL|WEIGHTS)/i) { + $self->{'command'} = $1; + $self->{'control'} = 1; + } + } - $class = ref($class) || $class; + $self->{error} = ""; - my $self = {}; + bless($self, $class); - # with a sockets_path set then it makes no sense to set host and port - if ($args->{socketpath}) { - $self->{socketpath} = $args->{socketpath}; - } - else { - $self->{hosts} = $args->{hosts}; - $self->{alive_hosts} = $self->{hosts}; - } + $self; +} - if ($args->{username}) { - $self->{username} = $args->{username}; - } - if ($args->{ip}) { - $self->{ip} = $args->{ip}; - } - if ($args->{from}) { - $self->{from} = $args->{from}; - } - if ($args->{subject}) { - $self->{subject} = $args->{subject}; - } - if ($args->{rcpt}) { - $self->{rcpt} = $args->{rcpt}; - } - if ($args->{timeout}) { - $self->{timeout} = $args->{timeout}; - } - else { - $self->{timeout} = 5; - } - if ($args->{password}) { - $self->{password} = $args->{password}; - } - bless($self, $class); +sub make_ssl_socket { + my ($host, $port) = @_; + + eval { + use IO::Socket::SSL; + } or $self->{error} = "IO::Socket::SSL required for imaps"; + + return IO::Socket::SSL->new("$host:$port"); +} + + +# Currently just read stdin for user's message and pass it to rspamd +sub _do_rspamc_command { + my ($self, $remote, $msg) = @_; + + my %metrics; + + + my $msgsize = length($msg.$EOL); + + local $SIG{PIPE} = 'IGNORE'; + + if (!(syswrite($remote, "$self->{command} $PROTOVERSION$EOL"))) { + $self->_mark_dead($remote); + return 0; + } + syswrite $remote, "Content-length: $msgsize$EOL"; + syswrite $remote, "User: $self->{username}$EOL" if ($self->{username}); + syswrite $remote, "From: $self->{from}$EOL" if ($self->{from}); + syswrite $remote, "IP: $self->{ip}$EOL" if ($self->{ip}); + syswrite $remote, "Subject: $self->{subject}$EOL" if ($self->{subject}); + if (ref $self->{rcpt} eq "ARRAY") { + foreach ($self->{rcpt}) { + syswrite $remote, "Rcpt: $_ $EOL"; + } + } + syswrite $remote, $EOL; + syswrite $remote, $msg; + syswrite $remote, $EOL; + + return undef unless $self->_get_io_readiness($remote, 0); + + my ($in, $res); + my $offset = 0; + do { + $res = sysread($remote, $in, 512, $offset); + if ($res > 0 && $res < 512) { + $self->_get_io_readiness($remote, 0); + } + $offset += $res; + } while ($res > 0); + + my ($version, $resp_code, $resp_msg) = $self->_parse_response_line($in); + + $self->{resp_code} = $resp_code; + $self->{resp_msg} = $resp_msg; + + return undef unless (defined($resp_code) && $resp_code == 0); + + my $cur_metric; + my @lines = split (/^/, $in); + foreach my $line (@lines) { + if ($line =~ m!Metric: (\S+); (\S+); (\S+) / (\S+)!) { + $metrics{$1} = { + isspam => $2, + score => $3 + 0, + threshold => $4 + 0, + symbols => [], + urls => [], + messages => [], + }; + $cur_metric = $1; + } + elsif ($line =~ /^Symbol: (\S+)/ && $cur_metric) { + my $symref = $metrics{$cur_metric}->{'symbols'}; + push(@$symref, $1); + } + elsif ($line =~ /^Urls: (\S+)/ && $cur_metric) { + @{ $metrics{$cur_metric}->{'urls'} } = split /\s/, $1; + } + elsif ($line =~ /^Message: (\S+)/ && $cur_metric) { + my $symref = $metrics{$cur_metric}->{'messages'}; + push(@$symref, $1); + } + elsif ($line =~ /^${EOL}$/) { + last; + } + } + + close $remote; + + return \%metrics; + +} + + +sub _do_control_command { + my ($self, $remote, $msg) = @_; + + local $SIG{PIPE} = 'IGNORE'; + my %res; + + $res{error_code} = 0; + + # Read greeting first + if (defined (my $greeting = <$remote>)) { + if ($greeting !~ /^Rspamd version/) { + $res{error} = "Not rspamd greeting line $greeting"; + $res{error_code} = 500; + return \%res; + } + } + + if ($self->{'command'} =~ /^learn$/i) { + if (!$self->{'statfile'}) { + $res{error} = "Statfile is not specified to learn command"; + $res{error_code} = 500; + return \%res; + } + + if ($self->_auth ($remote)) { + my $len = length ($msg); + syswrite $remote, "learn $self->{statfile} $len -w $self->{weight}" . $EOL; + syswrite $remote, $input . $EOL; + return undef unless $self->_get_io_readiness($remote, 0); + if (defined (my $reply = <$remote>)) { + if ($reply =~ /^learn ok, sum weight: ([0-9.]+)/) { + $res{error} = "Learn succeed. Sum weight: $1\n"; + return \%res; + } + else { + $res{error_code} = 500; + $res{error} = "Learn failed\n"; + return \%res; + } + } + } + else { + $res{error_code} = 403; + $res{error} = "Authentication failed\n"; + return \%res; + } + } + elsif ($self->{'command'} =~ /^weights$/i) { + if (!$self->{'statfile'}) { + $res{error_code} = 500; + $res{error} = "Statfile is not specified to weights command"; + return \%res; + } + + my $len = length ($input); + $res{error} = "Sending $len bytes...\n"; + syswrite $remote, "weights $self->{'statfile'} $len" . $EOL; + syswrite $remote, $input . $EOL; + return undef unless $self->_get_io_readiness($remote, 0); + while (defined (my $reply = <$remote>)) { + last if $reply =~ /^END/; + $res{error} .= $reply; + } + } + elsif ($self->{'command'} =~ /(reload|shutdown)/i) { + if ($self->_auth ($remote)) { + syswrite $remote, $self->{'command'} . $EOL; + while (defined (my $line = <$remote>)) { + last if $line =~ /^END/; + $res{error} .= $line; + } + } + else { + $res{error_code} = 403; + $res{error} = "Authentication failed\n"; + return \%res; + } + } + elsif ($self->{'command'} =~ /(fuzzy_add|fuzzy_del)/i) { + if ($self->_auth ($remote)) { + my $len = length ($input); + syswrite $remote, $self->{'command'} . " $len $self->{'weight'}" . $EOL; + syswrite $remote, $input . $EOL; + return undef unless $self->_get_io_readiness($remote, 0); + if (defined (my $reply = <$remote>)) { + if ($reply =~ /^OK/) { + $res{error} = $self->{'command'} . " succeed\n"; + return \%res; + } + else { + $res{error_code} = 500; + $res{error} = $self->{'command'} . " failed\n"; + return \%res; + } + } + } + else { + $res{error_code} = 403; + $res{error} = "Authentication failed\n"; + return \%res; + } + + } + else { + syswrite $remote, $self->{'command'} . $EOL; + while (defined (my $line = <$remote>)) { + last if $line =~ /^END/; + $res{error} .= $line; + } + } + + return \%res; +} + +sub _process_file { + my $self = shift; + my $file = shift; + + open(FILE, "< $file") or return; + + my $input; + while (defined (my $line = <FILE>)) { + $input .= $line; + } + + close FILE; + $self->do_all_cmd ($input); +} + +sub _process_directory { + my $self = shift; + my $dir = shift; + + opendir (DIR, $dir) or return; + + while (defined (my $file = readdir (DIR))) { + $file = "$dir/$file"; + if (-f $file) { + $self->_process_file ($file); + } + } + closedir (DIR); +} + +sub _check_imap_reply { + my $self = shift; + my $sock = shift; + my $seq = shift; + + my $input; + + while (defined ($input = <$sock>)) { + chomp $input; + if ($input =~ /BAD|NO (.+)$/) { + $_[0] = $1; + return 0; + } + next if ($input =~ /^\*/); + if ($input =~ /^$seq OK/) { + return 1; + } + + $_[0] = $input; + return 0; + } - $self; + $_[0] = "timeout"; + + return 0; } +sub _parse_imap_body { + my $self = shift; + my $sock = shift; + my $seq = shift; + my $input; + my $got_body = 0; + + while (defined (my $line = <$sock>)) { + if (!$got_body && $line =~ /^\*/) { + $got_body = 1; + next; + } + if ($line =~ /^$seq OK/) { + return $input; + } + elsif ($got_body) { + $input .= $line; + next; + } + + return undef; + } + + return undef; + +} + +sub _parse_imap_sequences { + my $self = shift; + my $sock = shift; + my $seq = shift; + my $input; + + while (defined ($input = <$sock>)) { + chomp $input; + if ($input =~ /^\* SEARCH (.+)$/) { + @res = split (/\s/, $1); + next; + } + elsif ($input =~ /^$seq OK/) { + return \@res; + } + return undef; + } + +} + +sub process_imap { + my ($self, $ssl, $user, $password, $host, $mbox) = @_; + my $seq = 1; + my $sock; + + if (!$password) { + eval { + use Term::ReadKey; + $self->{error} = "Enter IMAP password: "; + ReadMode 'noecho'; + $password = ReadLine 0; + chomp $password; + ReadMode 'normal'; + $self->{error} = "\n"; + } or die "cannot get password. Check that Term::ReadKey is installed"; + } + + # Stupid code that does not take care of timeouts etc, just trying to extract messages + if ($ssl) { + $sock = $self->_make_ssl_socket ($host, 'imaps'); + } + else { + $sock = $self->_make_tcp_socket ($host, 143); + } + my $reply = <$sock>; + if (!defined ($reply) || $reply !~ /^\* OK/) { + $self->{error} = "Imap server is not ready"; + return; + } + syswrite $sock, "$seq LOGIN $user $password$EOL"; + if (!$self->_check_imap_reply ($sock, $seq, $reply)) { + $self->{error} = "Cannot login to imap server: $reply"; + return; + } + $seq ++; + syswrite $sock, "$seq SELECT $mbox$EOL"; + if (!$self->_check_imap_reply ($sock, $seq, $reply)) { + $self->{error} = "Cannot select mbox $mbox: $reply"; + return; + } + $seq ++; + syswrite $sock, "$seq SEARCH $self->{imap_search}$EOL"; + my $messages; + if (!defined ($messages = $self->_parse_imap_sequences ($sock, $seq))) { + $self->{error} = "Cannot make search"; + return; + } + $seq ++; + foreach my $message (@{ $messages }){ + syswrite $sock, "$seq FETCH $message body[]$EOL"; + if (defined (my $input = $self->_parse_imap_body ($sock, $seq))) { + $self->do_all_cmd ($input); + } + $seq ++; + } + syswrite $sock, "$seq LOGOUT$EOL"; + close $sock; +} + +=head2 process_item + +public instance (\%) process_item (String $item) + +Description: +Do specified command for a single file, path or IMAP folder + +The return value is a hash reference containing results of each command for each server from cluster + +=cut + +sub process_item { + my $item = shift; + + if (defined ($item)) { + if ($item =~ qr|^imap(s?):user:([^:]+):password:([^:]*):host:([^:]+):mbox:(.+)$|) { + return $self->_process_imap ($1, $2, $3, $4, $5); + } + elsif (-f $item) { + return $self->_process_file ($item); + } + elsif (-d $item) { + return $self->_process_directory ($item); + } + else { + warn "urecognized argument: $item"; + } + } + undef; +} + +=head2 process_path + +public instance (\%) process_path () + +Description: +Do specified command for each file in path or message in IMAP folder + +The return value is a hash reference containing results of each command for each server from cluster + +=cut +sub process_path { + my %res; + foreach (@_) { + $res{$_} = $self->process_item($_); + } + + return \%res; +} + +=head2 do_all_cmd + +public instance (\%) do_all_cmd (String $msg) + +Description: +This method makes a call to the the whole rspamd cluster and call specified command +(in $self->{command}). + +The return value is a hash reference containing results of each command for each server from cluster + +=cut + +sub do_all_cmd { + my ($self, $input) = @_; + + my %res; + + foreach my $hostdef (@{ $self->{'hosts'} }) { + $self->_clear_errors(); + + my $remote = $self->_create_connection($hostdef); + + if (! $remote) { + $res{$hostdef}->{error} = "Cannot connect to $hostdef"; + } + else { + if ($self->{'control'}) { + $res{$hostdef} = $self->_do_control_command ($remote, $input); + } + else { + $res{$hostdef} = $self->_do_rspamc_command ($remote, $input); + } + } + } + + return \%res; +} + + =head2 check public instance (\%) check (String $msg) @@ -117,107 +603,109 @@ symbols - array of symbols =cut sub check { - my ($self, $msg) = @_; + my ($self, $msg) = @_; + + $self->{command} = 'CHECK'; - my %metrics; + return $self->_do_rspamc_command ($self, $msg); +} - my $command = 'SYMBOLS'; +=head2 symbols - $self->_clear_errors(); +public instance (\%) symbols (String $msg) - my $remote = $self->_create_connection(); +Description: +This method makes a call to the spamd server - return 0 unless ($remote); +The return value is a hash reference containing metrics indexed by name. Each metric +is hash that contains data: - my $msgsize = length($msg.$EOL); +isspam - local $SIG{PIPE} = 'IGNORE'; +score - if (!(syswrite($remote, "$command $PROTOVERSION$EOL"))) { - $self->_mark_dead($remote); - return 0; - } - syswrite $remote, "Content-length: $msgsize$EOL"; - syswrite $remote, "User: $self->{username}$EOL" if ($self->{username}); - syswrite $remote, "From: $self->{from}$EOL" if ($self->{from}); - syswrite $remote, "IP: $self->{ip}$EOL" if ($self->{ip}); - syswrite $remote, "Subject: $self->{subject}$EOL" if ($self->{subject}); - if (ref $self->{rcpt} eq "ARRAY") { - foreach ($self->{rcpt}) { - syswrite $remote, "Rcpt: $_ $EOL"; - } - } - syswrite $remote, $EOL; - syswrite $remote, $msg; - syswrite $remote, $EOL; - - return undef unless $self->_get_io_readiness($remote, 0); - - my ($in, $res); - my $offset = 0; - do { - $res = sysread($remote, $in, 512, $offset); - if ($res > 0 && $res < 512) { - $self->_get_io_readiness($remote, 0); - } - $offset += $res; - } while ($res > 0); - - my ($version, $resp_code, $resp_msg) = $self->_parse_response_line($in); - - $self->{resp_code} = $resp_code; - $self->{resp_msg} = $resp_msg; - - return undef unless (defined($resp_code) && $resp_code == 0); - - my $cur_metric; - my @lines = split (/^/, $in); - foreach my $line (@lines) { - if ($line =~ m!Metric: (\S+); (\S+); (\S+) / (\S+)!) { - $metrics{$1} = { - isspam => $2, - score => $3 + 0, - threshold => $4 + 0, - symbols => [], - }; - $cur_metric = $1; - } - elsif ($line =~ /^Symbol: (\S+)/ && $cur_metric) { - my $symref = $metrics{$cur_metric}->{'symbols'}; - push(@$symref, $1); - } - elsif ($line =~ /^${EOL}$/) { - last; - } - } +threshold + +symbols - array of symbols + +=cut - close $remote; +sub symbols { + my ($self, $msg) = @_; + + $self->{command} = 'SYMBOLS'; - return \%metrics; + return $self->_do_rspamc_command ($self, $msg); } -sub _auth { - my ($self, $sock) = @_; +=head2 process - local $SIG{PIPE} = 'IGNORE'; +public instance (\%) process (String $msg) - if (!(syswrite($sock, "PASSWORD $self->{password}$EOL"))) { - $self->_mark_dead($remote); - return 0; - } +Description: +This method makes a call to the spamd server - return 0 unless $self->_get_io_readiness($sock, 0); +The return value is a hash reference containing metrics indexed by name. Each metric +is hash that contains data: - if (sysread($sock, $reply, 255)) { - if ($reply =~ /^password accepted/) { - return 1; - } - } +isspam - return 0; - +score + +threshold + +symbols - array of symbols + +=cut +sub process { + my ($self, $msg) = @_; + + $self->{command} = 'PROCESS'; + + return $self->_do_rspamc_command ($self, $msg); } +=head2 emails + +public instance (\%) emails (String $msg) + +Description: +This method makes a call to the spamd server + +The return value is a hash reference containing metrics indexed by name. Each metric +is hash that contains data: + +emails - list of all emails in message +=cut +sub emails { + my ($self, $msg) = @_; + + $self->{command} = 'EMAILS'; + + return $self->_do_rspamc_command ($self, $msg); +} + +=head2 urls + +public instance (\%) urls (String $msg) + +Description: +This method makes a call to the spamd server + +The return value is a hash reference containing metrics indexed by name. Each metric +is hash that contains data: + +urls - list of all urls in message +=cut +sub urls { + my ($self, $msg) = @_; + + $self->{command} = 'URLS'; + + return $self->_do_rspamc_command ($self, $msg); +} + + =head2 learn public instance (\%) check (String $msg, String $statfile, Boolean in_class) @@ -228,42 +716,56 @@ This method makes a call to the spamd learning a statfile with message. =cut sub learn { - my ($self, $msg, $statfile, $in_class) = @_; - - my %metrics; + my ($self, $msg) = @_; + + $self->{command} = 'LEARN'; - my $command = 'LEARN'; + return $self->_do_control_command ($self, $msg); +} - $self->_clear_errors(); +sub weights { + my ($self, $msg) = @_; + + $self->{command} = 'WEIGHTS'; - my $remote = $self->_create_connection(); + return $self->_do_control_command ($self, $msg); +} - return 0 unless ($remote); +sub fuzzy_add { + my ($self, $msg) = @_; + + $self->{command} = 'FUZZY_ADD'; - return 0 unless $self->_auth($remote); + return $self->_do_control_command ($self, $msg); +} +sub fuzzy_del { + my ($self, $msg) = @_; + + $self->{command} = 'FUZZY_DEL'; - my $msgsize = length($msg.$EOL); + return $self->_do_control_command ($self, $msg); +} - local $SIG{PIPE} = 'IGNORE'; +sub stat { + my ($self) = @_; + + $self->{command} = 'STAT'; - if (!(syswrite ($remote, "$command $statfile $msgsize$EOL"))) { - $self->_mark_dead($remote); - return 0; - } + return $self->_do_control_command ($self, undef); +} +sub uptime { + my ($self) = @_; + + $self->{command} = 'UPTIME'; - syswrite($remote, $msg); - syswrite($remote, $EOL); - - return undef unless $self->_get_io_readiness($remote, 0); - if (sysread ($remote, $reply, 255)) { - if ($reply =~ /^learn ok/) { - close $remote; - return 1; - } - } + return $self->_do_control_command ($self, undef); +} +sub counters { + my ($self) = @_; + + $self->{command} = 'UPTIME'; - close $remote; - return 0; + return $self->_do_control_command ($self, undef); } =head2 ping @@ -277,29 +779,29 @@ if the server responded correctly. =cut sub ping { - my ($self) = @_; + my ($self) = @_; - my $remote = $self->_create_connection(); + my $remote = $self->_create_connection(); - return 0 unless ($remote); - local $SIG{PIPE} = 'IGNORE'; + return 0 unless ($remote); + local $SIG{PIPE} = 'IGNORE'; - if (!(syswrite($remote, "PING $PROTOVERSION$EOL"))) { - $self->_mark_dead($remote); - return 0; - } - syswrite($remote, $EOL); + if (!(syswrite($remote, "PING $PROTOVERSION$EOL"))) { + $self->_mark_dead($remote); + return 0; + } + syswrite($remote, $EOL); - return undef unless $self->_get_io_readiness($remote, 0); + return undef unless $self->_get_io_readiness($remote, 0); my $line; - sysread ($remote, $line, 255); - close $remote; - return undef unless $line; + sysread ($remote, $line, 255); + close $remote; + return undef unless $line; - my ($version, $resp_code, $resp_msg) = $self->_parse_response_line($line); - return 0 unless (defined($resp_msg) && $resp_msg eq 'PONG'); + my ($version, $resp_code, $resp_msg) = $self->_parse_response_line($line); + return 0 unless (defined($resp_msg) && $resp_msg eq 'PONG'); - return 1; + return 1; } =head1 PRIVATE METHODS @@ -317,117 +819,207 @@ On failure, it sets an internal error code and returns undef. =cut sub _create_connection { - my ($self) = @_; - - my $remote; - my $tries = 0; - - if ($self->{socketpath}) { - $remote = IO::Socket::UNIX->new( Peer => $self->{socketpath}, - Type => SOCK_STREAM, - Blocking => 0, - ); - # Get write readiness - if ($self->_get_io_readiness($remote, 1) == 0) { - print "Connection timed out: $!\n"; - return undef; + my ($self, $hostdef) = @_; + + my $remote; + my $tries = 0; + + if (!defined ($hostdef)) { + my $server; + + do { + $server = $self->_select_server(); + $tries ++; + + $remote = IO::Socket::INET->new( Proto => "tcp", + PeerAddr => $server->{host}, + PeerPort => $server->{port}, + Blocking => 0, + ); + # Get write readiness + if (defined ($remote)) { + if ($self->_get_io_readiness($remote, 1) != 0) { + return $remote; + } + else { + close ($remote); + } + } + } while ($tries < 5); + + return undef unless $server; + } + + if ($hostdef =~ /^\//) { + if (! socket ($remote, PF_UNIX, SOCK_STREAM, 0)) { + print "Cannot create unix socket\n"; + return undef; + } + my $sun = sockaddr_un($hostdef); + if (!connect ($remote, $sun)) { + print "Cannot connect to socket $hostdef\n"; + close $remote; + return undef; + } } - } - else { - my $server; - - do { - $server = $self->_select_server(); - $tries ++; - - $remote = IO::Socket::INET->new( Proto => "tcp", - PeerAddr => $server->{host}, - PeerPort => $server->{port}, - Blocking => 0, - ); - # Get write readiness - if ($self->_get_io_readiness($remote, 1) != 0) { - return $remote; - } - } while ($tries < 5); - - return undef unless $server; - } - unless ($remote) { - print "Failed to create connection to spamd daemon: $!\n"; - return undef; - } - $remote; + elsif ($hostdef =~ /^\s*(([^:]+):(\d+))\s*$/) { + $remote = IO::Socket::INET->new( Proto => "tcp", + PeerAddr => $server->{host}, + PeerPort => $server->{port}, + Blocking => 0, + ); + # Get write readiness + if (defined ($remote)) { + if ($self->_get_io_readiness($remote, 1) != 0) { + return $remote; + } + else { + close ($remote); + return undef; + } + } + } + + + unless ($remote) { + $self->{error} = "Failed to create connection to spamd daemon: $!\n"; + return undef; + } + $remote; +} + +=head2 _auth + +private instance (IO::Socket) _auth (Socket sock) + +Description: +This method do control auth. + +On failure this method returns 0 + +=cut +sub _auth { + my ($self, $sock) = @_; + + local $SIG{PIPE} = 'IGNORE'; + + if (!(syswrite($sock, "PASSWORD $self->{password}$EOL"))) { + $self->_mark_dead($remote); + return 0; + } + + return 0 unless $self->_get_io_readiness($sock, 0); + + if (sysread($sock, $reply, 255)) { + if ($reply =~ /^password accepted/) { + return 1; + } + } + + return 0; + } +=head2 _revive_dead + +private instance (IO::Socket) _revive_dead () + +Description: +This method marks dead upstreams as alive + +=cut sub _revive_dead { - my ($self) = @_; - - my $now = time(); - foreach my $s ($self->{dead_hosts}) { - # revive after minute of downtime - if (defined($s->{dead}) && $s->{dead} == 1 && $now - $s->{t} > 60) { - $s->{dead} = 0; - push(@{$self->{alive_hosts}}, $s->{host}); - } - } + my ($self) = @_; + + my $now = time(); + foreach my $s ($self->{dead_hosts}) { + # revive after minute of downtime + if (defined($s->{dead}) && $s->{dead} == 1 && $now - $s->{t} > 60) { + $s->{dead} = 0; + push(@{$self->{alive_hosts}}, $s->{host}); + } + } 1; } +=head2 _select_server + +private instance (IO::Socket) _select_server () + +Description: +This method returns one server from rspamd cluster or undef if there are no suitable ones + +=cut sub _select_server { - my($self) = @_; - - $self->_revive_dead(); - my $alive_num = scalar(@{$self->{alive_hosts}}); - if (!$alive_num) { - $self->{alive_hosts} = $self->{hosts}; - $self->{dead_hosts} = (); - $alive_num = scalar($self->{alive_hosts}); - } - - my $selected = $self->{alive_hosts}[int(rand($alive_num))]; - if ($selected =~ /^(\S+):(\d+)$/) { - my $server = { - host => $1, - port => $2, - }; - return $server; - } + my($self) = @_; + + $self->_revive_dead(); + my $alive_num = scalar(@{$self->{alive_hosts}}); + if (!$alive_num) { + $self->{alive_hosts} = $self->{hosts}; + $self->{dead_hosts} = (); + $alive_num = scalar($self->{alive_hosts}); + } + + my $selected = $self->{alive_hosts}[int(rand($alive_num))]; + if ($selected =~ /^(\S+):(\d+)$/) { + my $server = { + host => $1, + port => $2, + }; + return $server; + } - undef; + undef; } +=head2 _select_server + +private instance (IO::Socket) _mark_dead (String server) + +Description: +This method marks upstream as dead for some time. It can be revived by _revive_dead method + +=cut sub _mark_dead { - my ($self, $server) = @_; - - my $now = time(); - $self->{dead_hosts}->{$server} = { - host => $server, - dead => 1, - t => $now, - }; - for (my $i = 0; $i < scalar (@{$self->{alive_hosts}}); $i ++) { - if ($self->{alive_hosts} == $server) { - splice(@{$self->{alive_hosts}}, $i, 1); - last; - } - } + my ($self, $server) = @_; + + my $now = time(); + $self->{dead_hosts}->{$server} = { + host => $server, + dead => 1, + t => $now, + }; + for (my $i = 0; $i < scalar (@{$self->{alive_hosts}}); $i ++) { + if ($self->{alive_hosts} == $server) { + splice(@{$self->{alive_hosts}}, $i, 1); + last; + } + } } +=head2 _get_io_readiness + +private instance (IO::Socket) _mark_dead (String server) + +Description: +This method marks upstream as dead for some time. It can be revived by _revive_dead method + +=cut sub _get_io_readiness { - my ($self, $sock, $is_write) = @_; + my ($self, $sock, $is_write) = @_; my $w = ''; vec($w, fileno($sock), 1) = 1; - if ($is_write) { - return select(undef, $w, undef, $self->{timeout}); - } - else { - return select($w, undef,undef, $self->{timeout}); - } - + if ($is_write) { + return select(undef, $w, undef, $self->{timeout}); + } + else { + return select($w, undef,undef, $self->{timeout}); + } + undef; } @@ -445,10 +1037,10 @@ with the response line. =cut sub _parse_response_line { - my ($self, $line) = @_; + my ($self, $line) = @_; - $line =~ s/\r?\n$//; - return split(/\s+/, $line, 3); + $line =~ s/\r?\n$//; + return split(/\s+/, $line, 3); } =head2 _clear_errors @@ -461,12 +1053,11 @@ This method clears out any current errors. =cut sub _clear_errors { - my ($self) = @_; + my ($self) = @_; - $self->{resp_code} = undef; - $self->{resp_msg} = undef; + $self->{resp_code} = undef; + $self->{resp_msg} = undef; + $self->{error} = undef; } 1; - - diff --git a/rspamc.pl.in b/rspamc.pl.in index f5a01d3ed..a8f410ba8 100755 --- a/rspamc.pl.in +++ b/rspamc.pl.in @@ -9,6 +9,8 @@ use Socket qw(:DEFAULT :crlf); use Getopt::Std; +use Data::Dumper; +use Mail::Rspamd::Client; my %cfg = ( 'conf_file' => '@CMAKE_INSTALL_PREFIX@/etc/rspamd.conf', @@ -64,14 +66,39 @@ EOD exit; }; +sub load_hosts_file { + my $file = shift; + + open (HOSTS, "< $file") or die "cannot open file $file"; + $cfg{'hosts'} = [ ]; + while (<HOSTS>) { + chomp; + next if $_ =~ /^\s*#/; + if ($_ =~ /^\s*(([^:]+):(\d+))\s*$/) { + push (@{ $cfg{'hosts'} }, $1); + } + elsif ($_ =~ /^\s*([^:]+)\s*$/) { + if ($cfg{'control'}) { + push (@{ $cfg{'hosts'} }, "$1:11334"); + } + else { + push (@{ $cfg{'hosts'} }, "$1:11333"); + } + } + elsif ($_ =~ /^\s*(\/\S*)\s*$/) { + push (@{ $cfg{'hosts'} }, "$1"); + } + } + close HOSTS; +} + # Load rspamd config params sub parse_config { my ($is_ctrl) = @_; if (! open CONF, "< $cfg{'conf_file'}") { print "Config file $cfg{'conf_file'} cannot be opened\n"; - main::HELP_MESSAGE(); - exit; + return; } my $ctrl = 0, $skip = 0; @@ -92,7 +119,6 @@ sub parse_config { && $_ =~ /^\s*bind_socket\s*=\s*((([^:]+):(\d+))|(\/\S*))/i) { if ($3 && $4) { $cfg{'hosts'} = [ "$3:$4" ]; - $cfg{'is_unix'} = 0; } else { $cfg{'hosts'} = [ "$5" ]; @@ -107,453 +133,6 @@ sub parse_config { } -sub make_tcp_socket { - my ($host, $port) = @_; - my $proto = getprotobyname('tcp'); - my $sin; - - if (!socket ($sock, PF_INET, SOCK_STREAM, $proto)) { - print "Cannot create tcp socket\n"; - return undef; - } - if ($host eq '*') { - $host = '127.0.0.1'; - } - if (inet_aton ($host)) { - $sin = sockaddr_in ($port, inet_aton($host)); - } - else { - my $addr = gethostbyname($host); - if (!$addr) { - print "Cannot resolve $host\n"; - close $sock; - return undef; - } - $sin = sockaddr_in ($port, $addr); - } - - if (! connect ($sock, $sin)) { - print "Cannot connect to socket $host:$port\n"; - close $sock; - return undef; - } - - return $sock; -} - -sub make_ssl_socket { - my ($host, $port) = @_; - - eval { - use IO::Socket::SSL; - } or die "IO::Socket::SSL required for imaps"; - - return IO::Socket::SSL->new("$host:$port"); -} - -sub connect_socket { - my $hostdef = shift; - my $sock; - - if ($hostdef =~ /^\//) { - if (! socket ($sock, PF_UNIX, SOCK_STREAM, 0)) { - print "Cannot create unix socket\n"; - return undef; - } - my $sun = sockaddr_un($hostdef); - if (!connect ($sock, $sun)) { - print "Cannot connect to socket $hostdef\n"; - close $sock; - return undef; - } - } - elsif ($hostdef =~ /^\s*(([^:]+):(\d+))\s*$/) { - $sock = make_tcp_socket ($2, $3); - } - - return $sock; -} - -# Currently just read stdin for user's message and pass it to rspamd -sub do_rspamc_command { - my ($sock, $input) = @_; - - print "Sending ". length ($input) ." bytes...\n"; - - syswrite $sock, "$cfg{'command'} RSPAMC/1.1 $CRLF"; - if ($cfg{'deliver_to'}) { - syswrite $sock, "Deliver-To: " . $cfg{'deliver_to'} . $CRLF; - } - syswrite $sock, "Content-Length: " . length ($input) . $CRLF . $CRLF; - syswrite $sock, $input; - syswrite $sock, $CRLF; - while (defined (my $line = <$sock>)) { - print $line; - } - - return 1; -} - -sub do_ctrl_auth { - my ($sock) = @_; - my $res = 0; - - syswrite $sock, "password $cfg{'password'}" . $CRLF; - if (defined (my $reply = <$sock>)) { - if ($reply =~ /^password accepted/) { - $res = 1; - } - } - - # END - return 0 unless <$sock>; - - return $res; -} - -sub do_control_command { - my ($sock, $input) = @_; - - # Read greeting first - if (defined (my $greeting = <$sock>)) { - if ($greeting !~ /^Rspamd version/) { - print "Not rspamd greeting line $greeting"; - return 0; - } - } - if ($cfg{'command'} =~ /^learn$/i) { - die "statfile is not specified to learn command" if !$cfg{'statfile'}; - - - if (do_ctrl_auth ($sock)) { - my $len = length ($input); - print "Sending $len bytes...\n"; - syswrite $sock, "learn $cfg{'statfile'} $len -w $cfg{weight}" . $CRLF; - syswrite $sock, $input . $CRLF; - if (defined (my $reply = <$sock>)) { - if ($reply =~ /^learn ok, sum weight: ([0-9.]+)/) { - print "Learn succeed. Sum weight: $1\n"; - return 1; - } - else { - print "Learn failed\n"; - return 0; - } - } - } - else { - print "Authentication failed\n"; - return 0; - } - } - if ($cfg{'command'} =~ /^weights$/i) { - die "statfile is not specified to weights command" if !$cfg{'statfile'}; - - - my $len = length ($input); - print "Sending $len bytes...\n"; - syswrite $sock, "weights $cfg{'statfile'} $len" . $CRLF; - syswrite $sock, $input . $CRLF; - while (defined (my $reply = <$sock>)) { - last if $reply =~ /^END/; - print $reply; - } - } - elsif ($cfg{'command'} =~ /(reload|shutdown)/i) { - if (do_ctrl_auth ($sock)) { - syswrite $sock, $cfg{'command'} . $CRLF; - while (defined (my $line = <$sock>)) { - last if $line =~ /^END/; - print $line; - } - } - else { - print "Authentication failed\n"; - return 0; - } - } - elsif ($cfg{'command'} =~ /(fuzzy_add|fuzzy_del)/i) { - if (do_ctrl_auth ($sock)) { - my $len = length ($input); - print "Sending $len bytes...\n"; - syswrite $sock, $cfg{'command'} . " $len $cfg{'weight'}" . $CRLF; - syswrite $sock, $input . $CRLF; - if (defined (my $reply = <$sock>)) { - if ($reply =~ /^OK/) { - print $cfg{'command'} . " succeed\n"; - return 1; - } - else { - print $cfg{'command'} . " failed\n"; - return 0; - } - } - } - else { - print "Authentication failed\n"; - return 0; - } - - } - else { - syswrite $sock, $cfg{'command'} . $CRLF; - while (defined (my $line = <$sock>)) { - last if $line =~ /^END/; - print $line; - } - } - - return 1; -} - -sub process_file { - my $file = shift; - - print "Process file: $file\n"; - open(FILE, "< $file") or return; - - my $input; - while (defined (my $line = <FILE>)) { - $input .= $line; - } - - close FILE; - do_cmd ($input); -} - -sub process_directory { - my $dir = shift; - - opendir (DIR, $dir) or return; - - while (defined (my $file = readdir (DIR))) { - $file = "$dir/$file"; - if (-f $file) { - process_file ($file); - } - } - closedir (DIR); -} - -sub check_imap_reply { - my $sock = shift; - my $seq = shift; - - my $input; - - while (defined ($input = <$sock>)) { - chomp $input; - if ($input =~ /BAD|NO (.+)$/) { - $_[0] = $1; - return 0; - } - next if ($input =~ /^\*/); - if ($input =~ /^$seq OK/) { - return 1; - } - - $_[0] = $input; - return 0; - } - - $_[0] = "timeout"; - - return 0; -} - -sub parse_imap_body { - my $sock = shift; - my $seq = shift; - my $input; - my $got_body = 0; - - while (defined (my $line = <$sock>)) { - if (!$got_body && $line =~ /^\*/) { - $got_body = 1; - next; - } - if ($line =~ /^$seq OK/) { - return $input; - } - elsif ($got_body) { - $input .= $line; - next; - } - - return undef; - } - - return undef; - -} - -sub parse_imap_sequences { - my $sock = shift; - my $seq = shift; - my $input; - - while (defined ($input = <$sock>)) { - chomp $input; - if ($input =~ /^\* SEARCH (.+)$/) { - @res = split (/\s/, $1); - next; - } - elsif ($input =~ /^$seq OK/) { - return \@res; - } - return undef; - } - -} - -sub process_imap { - my ($ssl, $user, $password, $host, $mbox) = @_; - my $seq = 1; - my $sock; - - if (!$password) { - eval { - use Term::ReadKey; - print "Enter IMAP password: "; - ReadMode 'noecho'; - $password = ReadLine 0; - chomp $password; - ReadMode 'normal'; - print "\n"; - } or die "cannot get password. Check that Term::ReadKey is installed"; - } - print "Process imap: host: $host, mbox: $mbox\n"; - - # Stupid code that does not take care of timeouts etc, just trying to extract messages - if ($ssl) { - $sock = make_ssl_socket ($host, 'imaps'); - } - else { - $sock = make_tcp_socket ($host, 143); - } - my $reply = <$sock>; - if (!defined ($reply) || $reply !~ /^\* OK/) { - print "Imap server is not ready\n"; - return; - } - syswrite $sock, "$seq LOGIN $user $password$CRLF"; - if (!check_imap_reply ($sock, $seq, $reply)) { - print "Cannot login to imap server: $reply\n"; - return; - } - $seq ++; - syswrite $sock, "$seq SELECT $mbox$CRLF"; - if (!check_imap_reply ($sock, $seq, $reply)) { - print "Cannot select mbox $mbox: $reply\n"; - return; - } - $seq ++; - syswrite $sock, "$seq SEARCH $cfg{imap_search}$CRLF"; - my $messages; - if (!defined ($messages = parse_imap_sequences ($sock, $seq))) { - print "Cannot make search\n"; - return; - } - $seq ++; - foreach my $message (@{ $messages }){ - syswrite $sock, "$seq FETCH $message body[]$CRLF"; - if (defined (my $input = parse_imap_body ($sock, $seq))) { - do_cmd ($input); - } - $seq ++; - } - syswrite $sock, "$seq LOGOUT$CRLF"; - close $sock; -} - -# Single item -sub process_item { - my $item = shift; - - print "Processing $item\n"; - if (defined ($item)) { - if ($item =~ qr|^imap(s?):user:([^:]+):password:([^:]*):host:([^:]+):mbox:(.+)$|) { - process_imap ($1, $2, $3, $4, $5); - } - elsif (-f $item) { - process_file ($item); - } - elsif (-d $item) { - process_directory ($item); - } - else { - warn "urecognized argument: $item"; - } - } -} - -# Do specified command for each file in path or -sub process_path { - foreach (@_) { - process_item($_); - } -} - -# Do specified command for specified input -sub do_cmd { - my $input = shift; - my $res; - - print "*" x 20 . "\n"; - foreach my $hostdef (@{ $cfg{'hosts'} }) { - print "Do $cfg{command} on $hostdef\n"; - my $sock = connect_socket ($hostdef); - - if (! $sock) { - print "Result: failed (on connect stage)\n"; - print "*" x 20 . "\n"; - next; - } - - if ($cfg{'control'}) { - $res = do_control_command ($sock, $input); - } - else { - $res = do_rspamc_command ($sock, $input); - } - - close ($sock); - if (! $res) { - print "Result: failed (on command stage)\n"; - } - else { - print "Result: OK\n"; - } - print "*" x 20 . "\n"; - } -} - -sub load_hosts_file { - my $file = shift; - - open (HOSTS, "< $file") or die "cannot open file $file"; - $cfg{'hosts'} = [ ]; - while (<HOSTS>) { - chomp; - next if $_ =~ /^\s*#/; - if ($_ =~ /^\s*(([^:]+):(\d+))\s*$/) { - push (@{ $cfg{'hosts'} }, $1); - } - elsif ($_ =~ /^\s*([^:]+)\s*$/) { - if ($cfg{'control'}) { - push (@{ $cfg{'hosts'} }, "$1:11334"); - } - else { - push (@{ $cfg{'hosts'} }, "$1:11333"); - } - } - elsif ($_ =~ /^\s*(\/\S*)\s*$/) { - push (@{ $cfg{'hosts'} }, "$1"); - } - } - close FILE; -} - ############################# Main part ########################################### my %args; @@ -605,7 +184,7 @@ if (defined ($args{w})) { $cfg{'weight'} = $args{w}; } -if ($cmd =~ /(SYMBOLS|SCAN|PROCESS|CHECK|REPORT_IFSPAM|REPORT|URLS|EMAILS)/i) { +if ($cmd =~ /(SYMBOLS|PROCESS|CHECK|URLS|EMAILS)/i) { $cfg{'command'} = $1; $cfg{'control'} = 0; } @@ -629,6 +208,8 @@ if (defined ($args{H})) { load_hosts_file ($args{H}); } +my $rspamd = Mail::Rspamd::Client->new(\%cfg); + if (!defined ($path[0]) || ! $cfg{'require_input'}) { my $input; if ($cfg{'require_input'}) { @@ -636,8 +217,11 @@ if (!defined ($path[0]) || ! $cfg{'require_input'}) { $input .= $line; } } - do_cmd ($input); + + my $res = $rspamd->do_all_cmd ($input); + print Dumper($res); } else { - process_path (@path); + my $res = $rspamd->process_path (@path); + print Dumper($res); } |