#!/usr/bin/env perl use warnings; use strict; use JSON::XS; use AnyEvent; use AnyEvent::HTTP; use AnyEvent::IO; use EV; use Pod::Usage; use Getopt::Long; use File::stat; my $rspamd_host = "localhost:11333"; my $man = 0; my $help = 0; my $local = 0; my $header = "X-Spam: yes"; my $max_size = 10 * 1024 * 1024; # 10 MB my $request_timeout = 15; # 15 seconds by default my $reject_message = "Spam message rejected"; GetOptions( "host=s" => \$rspamd_host, "header=s" => \$header, "reject-message=s" => \$reject_message, "max-size=i" => \$max_size, "timeout=f" => \$request_timeout, "help|?" => \$help, "man" => \$man ) or pod2usage(2); pod2usage(1) if $help; pod2usage( -exitval => 0, -verbose => 2 ) if $man; my $scanned = 0; # Turn off bufferization as required by CGP $| = 1; sub cgp_string { my ($in) = @_; $in =~ s/\"/\\"/g; $in =~ s/\n/\\n/gms; $in =~ s/\r/\\r/mgs; $in =~ s/\t/ /g; return "\"$in\""; } sub rspamd_scan { my ( $tag, $file ) = @_; my $http_callback = sub { my ( $body, $hdr ) = @_; if ( $hdr && $hdr->{Status} =~ /^2/ ) { my $js = eval('decode_json($body)'); $scanned++; if ( !$js ) { print "* Rspamd: Bad response for $file: invalid JSON: parse error\n"; print "$tag FAILURE\n"; } else { my $def = $js; my $headers = ""; if ( !$def ) { print "* Rspamd: Bad response for $file: invalid JSON: default is missing\n"; print "$tag FAILURE\n"; } else { my $action = $def->{'action'}; my $id = $js->{'message-id'}; my $symbols = ""; while ( my ( $k, $s ) = each( %{$def->{'symbols'}}) ) { $symbols .= sprintf "%s(%.2f);", $k, $s->{'score'}; } printf "* Rspamd: Scanned %s; id: <%s>; Score: %.2f / %.2f; Symbols: [%s]\n", $file, $id, $def->{'score'}, $def->{'required_score'}, $symbols; if ( $js->{'dkim-signature'} ) { $headers .= "DKIM-Signature: " . $js->{'dkim-signature'}; } if ( $js->{'milter'} ) { my $block = $js->{'milter'}; if ( $block->{'add_headers'} ) { while ( my ( $h, $v ) = each( %{ $block->{'add_headers'} } ) ) { if (ref($v) eq 'HASH') { if ($headers eq "") { $headers .= "$h: $v->{value}"; } else { $headers .= "\\e$h: $v->{value}"; } } else { if ($headers eq "") { $headers .= "$h: $v"; } else { $headers .= "\\e$h: $v"; } } } } } if ( $action eq 'reject' ) { print "$tag DISCARD\n"; return; } elsif ( $action eq 'add header' || $action eq 'rewrite subject' ) { if ( $headers eq "" ) { $headers .= "$header"; } else { $headers .= "\\e$header"; } } elsif ( $action eq 'soft reject' ) { print "$tag REJECT Try again later\n"; return; } if ( $headers eq "" ) { print "$tag OK\n"; } else { print "$tag ADDHEADER " . cgp_string($headers) . " OK\n"; } } } } else { if ($hdr) { print "* Rspamd: Bad response for $file: HTTP error: $hdr->{Status} $hdr->{Reason}\n"; } else { print "* Rspamd: Bad response for $file: IO error: $!\n"; } print "$tag FAILURE\n"; } }; if ($local) { # Use file scan # XXX: not implemented now due to CGP queue format http_get( "http://$rspamd_host/symbols?file=$file", timeout => $request_timeout, $http_callback ); } else { my $sb = stat($file); if ( !$sb || $sb->size > $max_size ) { if ($sb) { print "* File $file is too large: " . $sb->size . "\n$tag FAILURE\n"; } else { print "* Cannot stat $file: $!\n$tag FAILURE\n"; } return; } aio_load( $file, sub { my ($data) = @_; if ( !$data ) { print "* Cannot open $file: $!\n$tag FAILURE\n"; return; } # Parse CGP format $data =~ s/^((?:[^\n]*\n)*?)\n(.*)$/$2/ms; my @envelope = split /\n/, $1; chomp(@envelope); my $from; my @rcpts; my $ip; foreach my $elt (@envelope) { if ( $elt =~ /^P\s[^<]*(<[^>]*>).*$/ ) { $from = $1; } elsif ( $elt =~ /^R\s[^<]*(<[^>]*>).*$/ ) { push @rcpts, $1; } elsif ( $elt =~ /^S .*\[(.+)\]/ ) { $ip = $1; } } my $headers = {}; if ( $file =~ /\/([^\/.]+)\.msg$/ ) { $headers->{'Queue-ID'} = $1; } if ($from) { $headers->{From} = $from; } if ( scalar(@rcpts) > 0 ) { # XXX: Anyevent cannot parse headers with multiple values $headers->{Rcpt} = join(',', @rcpts); } if ($ip) { $headers->{IP} = $ip; } http_post( "http://$rspamd_host/checkv2", $data, timeout => $request_timeout, headers => $headers, $http_callback ); } ); } } # Show informational message print "* Rspamd CGP filter has been started\n"; my $w = AnyEvent->io( fh => \*STDIN, poll => 'r', cb => sub { chomp( my $input = ); if ( $input =~ /^(\d+)\s+(\S+)(\s+(\S+)\s*)?$/ ) { my $tag = $1; my $cmd = $2; if ( $cmd eq "INTF" ) { print "$input\n"; } elsif ( $cmd eq "FILE" && $4 ) { my $file = $4; print "* Scanning file $file\n"; rspamd_scan $tag, $file; } elsif ( $cmd eq "QUIT" ) { print "* Terminating after scanning of $scanned files\n"; print "$tag OK\n"; exit 0; } else { print "* Unknown command $cmd\n"; print "$tag FAILURE\n"; } } } ); EV::run; __END__ =head1 NAME cgp_rspamd - implements Rspamd filter for CommunigatePro MTA =head1 SYNOPSIS cgp_rspamd [options] Options: --host=hostport Rspamd host to connect (localhost:11333 by default) --header Add specific header for a spam message ("X-Spam: yes" by default) --reject-message Rejection message for spam mail ("Spam message rejected" by default) --timeout Timeout to read response from Rspamd (15 seconds by default) --max-size Maximum size of message to scan (10 megabytes by default) --help brief help message --man full documentation =head1 OPTIONS =over 8 =item B<--host> Specifies Rspamd host to use for scanning =item B<--header> Specifies the header that should be added when Rspamd action is B or B. =item B<--reject-message> Specifies the rejection message for spam. =item B<--timeout> Sets timeout in seconds for waiting Rspamd reply for a message. =item B<--max-size> Define the maximum messages size to be processed by Rspamd in bytes. =item B<--help> Print a brief help message and exits. =item B<--man> Prints the manual page and exits. =back =head1 DESCRIPTION B is intended to scan messages processed with B MTA on some Rspamd scanner. It reads standard input and parses CGP helpers protocol. On scan requests, this filter can query Rspamd to process a message. B can tell CGP to add header or reject SPAM messages depending on Rspamd scan result. =back =cut