You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

cgp_rspamd.pl 6.8KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297
  1. #!/usr/bin/env perl
  2. use warnings;
  3. use strict;
  4. use JSON::XS;
  5. use AnyEvent;
  6. use AnyEvent::HTTP;
  7. use AnyEvent::IO;
  8. use EV;
  9. use Pod::Usage;
  10. use Getopt::Long;
  11. use File::stat;
  # Runtime settings.  The values assigned here are the defaults; GetOptions()
  # below overwrites them with whatever was given on the command line.
  my ( $rspamd_host, $header, $reject_message ) =
    ( 'localhost:11333', 'X-Spam: yes', 'Spam message rejected' );
  my $max_size        = 10 * 1024 * 1024;    # 10 MB
  my $request_timeout = 15;                  # seconds

  # $local has no command-line switch in this option table, so it keeps the
  # value given here.
  my ( $help, $man, $local ) = ( 0, 0, 0 );

  # Incremented by the scanner, reported when QUIT is received.
  my $scanned = 0;

  # Option specification => variable that receives the parsed value.
  my %option_targets = (
    'host=s'           => \$rspamd_host,
    'header=s'         => \$header,
    'reject-message=s' => \$reject_message,
    'max-size=i'       => \$max_size,
    'timeout=f'        => \$request_timeout,
    'help|?'           => \$help,
    'man'              => \$man,
  );

  # An unparsable command line shows the usage text via pod2usage(2).
  unless ( GetOptions(%option_targets) ) {
    pod2usage(2);
  }
  if ($help) {
    pod2usage(1);
  }
  if ($man) {
    pod2usage( -verbose => 2, -exitval => 0 );
  }

  # Turn off bufferization as required by CGP
  $| = 1;
  # Quote a string for use in a CGP helper protocol response.
  #
  # Takes one scalar and returns it wrapped in double quotes, with every
  # double quote, line feed and carriage return replaced by the two-character
  # sequences \", \n and \r.  An undefined argument is treated as an empty
  # string.  Backslashes already present in the input are passed through
  # unchanged.
  sub cgp_string {
    my ($in) = @_;
    $in = '' unless defined $in;

    # /g matters: without it only the first occurrence of each character is
    # escaped, and a later quote or raw newline would end the quoted string
    # (or the whole protocol line) early.
    $in =~ s/"/\\"/g;
    $in =~ s/\n/\\n/g;
    $in =~ s/\r/\\r/g;

    return "\"$in\"";
  }
  # Split the contents of a CGP queue file into envelope and message.
  #
  # Returns a two-element list: a reference to an array of the lines that
  # precede the first empty line, and the text that follows that empty line.
  # When the data contains no empty line the envelope is empty and the data
  # is returned unchanged.
  sub _cgp_split_queue_file {
    my ($data) = @_;

    if ( $data =~ /^((?:[^\n]*\n)*?)\n(.*)$/ms ) {

      # Copy the captures before anything else can touch them.
      my ( $envelope, $message ) = ( $1, $2 );
      return ( [ split /\n/, $envelope ], $message );
    }

    return ( [], $data );
  }

  # Build the HTTP request headers sent along with a message.
  #
  # $file is the queue file name, $envelope the array reference returned by
  # _cgp_split_queue_file().  Returns a hash reference that may contain
  # Queue-ID (file base name without ".msg"), From (from a "P" line), Rcpt
  # (from "R" lines) and IP (bracketed part of an "S" line).
  sub _cgp_request_headers {
    my ( $file, $envelope ) = @_;

    my ( $from, $ip, @rcpts );
    foreach my $elt ( @{$envelope} ) {
      if ( $elt =~ /^P\s[^<]*(<[^>]*>).*$/ ) {
        $from = $1;
      }
      elsif ( $elt =~ /^R\s[^<]*(<[^>]*>).*$/ ) {
        push @rcpts, $1;
      }
      elsif ( $elt =~ /^S .*\[(.+)\]/ ) {
        $ip = $1;
      }
    }

    my $headers = {};
    if ( $file =~ /\/([^\/.]+)\.msg$/ ) {
      $headers->{'Queue-ID'} = $1;
    }
    $headers->{From} = $from if $from;

    # XXX: Anyevent cannot parse headers with multiple values, so only the
    # last recipient is passed on.
    $headers->{Rcpt} = $rcpts[-1] if @rcpts;
    $headers->{IP}   = $ip        if $ip;

    return $headers;
  }

  # Turn a decoded Rspamd reply into CGP protocol output.
  #
  # $tag is the CGP sequence number, $file the scanned file (used in log
  # lines only), $js the decoded reply (undef when decoding failed).
  # Prints "* ..." log lines followed by one "$tag ..." response line.
  sub _rspamd_print_verdict {
    my ( $tag, $file, $js ) = @_;

    # Anything that is not a JSON object cannot carry a result; checking the
    # reference type also keeps the hash dereferences below from dying.
    if ( ref($js) ne 'HASH' ) {
      print "* Rspamd: Bad response for $file: invalid JSON: parse error\n";
      print "$tag FAILURE\n";
      return;
    }

    my $def = $js->{'default'};
    if ( ref($def) ne 'HASH' ) {
      print
        "* Rspamd: Bad response for $file: invalid JSON: default is missing\n";
      print "$tag FAILURE\n";
      return;
    }

    # Missing fields are logged as empty/zero instead of raising
    # "uninitialized" warnings.
    my $action = defined $def->{'action'}    ? $def->{'action'}    : '';
    my $id     = defined $js->{'message-id'} ? $js->{'message-id'} : '';

    # Every hash-valued entry with a non-zero score is reported as a symbol.
    my $symbols = "";
    foreach my $k ( keys %{$def} ) {
      my $s = $def->{$k};
      if ( ref($s) eq "HASH" && $s->{'score'} ) {
        $symbols .= sprintf "%s(%.2f);", $k, $s->{'score'};
      }
    }

    printf
      "* Rspamd: Scanned %s; id: <%s>; Score: %.2f / %.2f; Symbols: [%s]\n",
      $file, $id, $def->{'score'} || 0, $def->{'required_score'} || 0,
      $symbols;

    if ( $action eq 'reject' ) {
      print "$tag ERROR " . cgp_string($reject_message) . "\n";
    }
    elsif ( $action eq 'add header' || $action eq 'rewrite subject' ) {
      print "$tag ADDHEADER " . cgp_string($header) . " OK\n";
    }
    elsif ( $action eq 'soft reject' ) {

      # NOTE(review): the CGP helper documentation appears to name this
      # verdict REJECTED rather than REJECT -- confirm before changing.
      print "$tag REJECT Try again later\n";
    }
    else {
      print "$tag OK\n";
    }
    return;
  }

  # Submit one queue file to Rspamd and answer CGP once the result is known.
  #
  # $tag  - CGP sequence number; prefixes the final response line.
  # $file - path of the queue file to scan.
  #
  # The call only starts the work; the response line is printed from the
  # HTTP callback, or immediately when the file cannot be used (stat
  # failure, larger than $max_size, unreadable).  Returns nothing.
  sub rspamd_scan {
    my ( $tag, $file ) = @_;

    my $http_callback = sub {
      my ( $body, $hdr ) = @_;

      if ( !( $hdr && $hdr->{Status} =~ /^2/ ) ) {
        if ($hdr) {
          print
  "* Rspamd: Bad response for $file: HTTP error: $hdr->{Status} $hdr->{Reason}\n";
        }
        else {
          print "* Rspamd: Bad response for $file: IO error: $!\n";
        }
        print "$tag FAILURE\n";
        return;
      }

      # Block eval: a malformed body leaves $js undefined instead of dying.
      my $js = eval { decode_json($body) };
      $scanned++;
      _rspamd_print_verdict( $tag, $file, $js );
      return;
    };

    if ($local) {

      # Use file scan
      # XXX: not implemented now due to CGP queue format
      http_get(
        "http://$rspamd_host/symbols?file=$file",
        timeout => $request_timeout,
        $http_callback
      );

      # The explicit return keeps http_get in void context; in any other
      # context AnyEvent::HTTP returns a guard whose destruction cancels the
      # request.
      return;
    }

    my $sb = stat($file);
    if ( !$sb ) {
      print "* Cannot stat $file: $!\n$tag FAILURE\n";
      return;
    }
    if ( $sb->size > $max_size ) {
      print "* File $file is too large: " . $sb->size . "\n$tag FAILURE\n";
      return;
    }

    aio_load(
      $file,
      sub {
        my ($data) = @_;
        if ( !$data ) {
          print "* Cannot open $file: $!\n$tag FAILURE\n";
          return;
        }

        # Parse CGP format
        my ( $envelope, $message ) = _cgp_split_queue_file($data);

        http_post(
          "http://$rspamd_host/symbols", $message,
          timeout => $request_timeout,
          headers => _cgp_request_headers( $file, $envelope ),
          $http_callback
        );

        # Void context for http_post, see the note on http_get above.
        return;
      }
    );
    return;
  }
  # Show informational message
  print "* Rspamd CGP filter has been started\n";

  # Execute one command line received from CGP (line terminator removed).
  #
  # Expected shape: <sequence number> <command> [<argument>].  Lines that do
  # not have this shape are ignored.  INTF is echoed back, FILE starts a
  # scan, QUIT answers OK and exits, anything else is answered with FAILURE.
  sub _cgp_handle_command {
    my ($input) = @_;

    return unless $input =~ /^(\d+)\s+(\S+)(\s+(\S+)\s*)?$/;
    my ( $tag, $cmd, $arg ) = ( $1, $2, $4 );

    if ( $cmd eq "INTF" ) {
      print "$input\n";
    }
    elsif ( $cmd eq "FILE" && defined $arg ) {
      print "* Scanning file $arg\n";
      rspamd_scan( $tag, $arg );
    }
    elsif ( $cmd eq "QUIT" ) {
      print "* Terminating after scanning of $scanned files\n";
      print "$tag OK\n";
      exit 0;
    }
    else {
      print "* Unknown command $cmd\n";
      print "$tag FAILURE\n";
    }
    return;
  }

  # Bytes read from STDIN that do not yet form a complete line.
  my $stdin_buf = '';

  my $w;
  $w = AnyEvent->io(
    fh   => \*STDIN,
    poll => 'r',
    cb   => sub {

      # sysread instead of <STDIN>: a buffered readline may pull several
      # lines into Perl's own buffer, where they would sit unnoticed because
      # the descriptor no longer polls as readable.
      my $got = sysread( STDIN, $stdin_buf, 65536, length($stdin_buf) );

      if ( !defined $got ) {

        # Transient conditions: simply wait for the next readiness event.
        return if $!{EINTR} || $!{EAGAIN};
        print "* Cannot read commands: $!\n";
      }

      if ( !$got ) {

        # End of input (or a hard read error).  A closed descriptor stays
        # readable forever, so the watcher is dropped to avoid a busy loop;
        # the event loop can then finish once pending work is done.
        undef $w;

        # A final line without a terminator is still processed.
        if ( length $stdin_buf ) {
          my $last = $stdin_buf;
          $stdin_buf = '';
          _cgp_handle_command($last);
        }
        return;
      }

      # Process every complete line received so far.
      while ( $stdin_buf =~ s/^([^\n]*)\n// ) {
        my $line = $1;
        _cgp_handle_command($line);
      }
      return;
    }
  );
  EV::run;
  200. __END__
  201. =head1 NAME
  202. cgp_rspamd - implements Rspamd filter for CommunigatePro MTA
  203. =head1 SYNOPSIS
  204. cgp_rspamd [options]
  205. Options:
  206. --host=hostport Rspamd host to connect (localhost:11333 by default)
  207. --header Add specific header for a spam message ("X-Spam: yes" by default)
  208. --reject-message Rejection message for spam mail ("Spam message rejected" by default)
  209. --timeout Timeout to read response from Rspamd (15 seconds by default)
  210. --max-size Maximum size of message to scan (10 megabytes by default)
  211. --help brief help message
  212. --man full documentation
  213. =head1 OPTIONS
  214. =over 8
  215. =item B<--host>
  216. Specifies Rspamd host to use for scanning
  217. =item B<--header>
  218. Specifies the header that should be added when Rspamd action is B<add header>
  219. or B<rewrite subject>.
  220. =item B<--reject-message>
  221. Specifies the rejection message for spam.
  222. =item B<--timeout>
  223. Sets the timeout, in seconds, for waiting for the Rspamd reply to a message.
  224. =item B<--max-size>
  225. Defines the maximum size, in bytes, of a message to be processed by Rspamd.
  226. =item B<--help>
  227. Prints a brief help message and exits.
  228. =item B<--man>
  229. Prints the manual page and exits.
  230. =back
  231. =head1 DESCRIPTION
  232. B<cgp_rspamd> is intended to scan messages processed with B<CommunigatePro> MTA
  233. on some Rspamd scanner. It reads standard input and parses CGP helpers
  234. protocol. On scan requests, this filter can query Rspamd to process a message.
  235. B<cgp_rspamd> can tell CGP to add header or reject SPAM messages depending on
  236. Rspamd scan result.
  238. =cut