You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

cgp_rspamd.pl 10KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357
  1. #!/usr/bin/env perl
  2. use warnings;
  3. use strict;
  4. use JSON::XS;
  5. use AnyEvent;
  6. use AnyEvent::HTTP;
  7. use AnyEvent::IO;
  8. use EV;
  9. use Pod::Usage;
  10. use Getopt::Long;
  11. use File::stat;
  # Runtime settings. GetOptions below may override the host, header,
  # reject message, size limit and timeout; $local has no command-line
  # switch and therefore keeps its initial value.
  my ( $help, $man, $local ) = ( 0, 0, 0 );
  my $rspamd_host     = "localhost:11333";
  my $header          = "X-Spam: yes";
  my $reject_message  = "Spam message rejected";
  my $max_size        = 10 * 1024 * 1024;    # bytes (10 MB)
  my $request_timeout = 15;                  # seconds

  # Option specification => variable receiving the parsed value.
  my %option_spec = (
      "host=s"           => \$rspamd_host,
      "header=s"         => \$header,
      "reject-message=s" => \$reject_message,
      "max-size=i"       => \$max_size,
      "timeout=f"        => \$request_timeout,
      "help|?"           => \$help,
      "man"              => \$man,
  );

  # Invalid command line: print usage and exit with status 2.
  unless ( GetOptions(%option_spec) ) {
      pod2usage(2);
  }
  if ($help) {
      pod2usage(1);
  }
  if ($man) {
      pod2usage( -exitval => 0, -verbose => 2 );
  }

  # Domain name read from the CGP settings file (may be undefined), and the
  # counter of Rspamd replies processed so far.
  my $main_domain = cgp_main_domain();
  my $scanned     = 0;

  # Enable autoflush on the selected output handle, as required by CGP
  $| = 1;
  # Look up the domain name configured in the CGP main settings file.
  #
  # Reads 'Settings/Main.settings' (relative to the current working
  # directory) and returns the value of the first line of the form
  # "<whitespace>DomainName = <value>;".
  #
  # Returns nothing (undef in scalar context, empty list in list context)
  # when the file cannot be opened or contains no such line.
  sub cgp_main_domain {
      my $settings_file = 'Settings/Main.settings';

      # A missing or unreadable settings file is not an error here: the
      # caller simply gets no domain name.
      open( my $fh, '<', $settings_file ) or return;

      my $domain;
      while ( my $line = <$fh> ) {
          if ( $line =~ /^\s+DomainName\s+=\s+([^;]+);/ ) {
              $domain = $1;
              last;
          }
      }
      close($fh);

      return $domain if defined $domain;
      return;
  }
  # Quote a string for use in a CGP helper response.
  #
  # Each double quote, newline and carriage return is replaced by the
  # two-character sequence \" \n or \r respectively, each tab becomes a single
  # space, and the result is wrapped in double quotes. Backslashes already
  # present in the input are passed through untouched.
  sub cgp_string {
      my ($in) = @_;

      # Single-pass translation table; none of the replacements contains a
      # character that is itself a key, so one pass is sufficient.
      my %replacement_for = (
          '"'  => '\\"',
          "\n" => '\\n',
          "\r" => '\\r',
          "\t" => ' ',
      );
      $in =~ s/(["\n\r\t])/$replacement_for{$1}/g;

      return '"' . $in . '"';
  }
  # Collect the header lines to add for a decoded Rspamd reply.
  #
  # $js is the decoded reply (hash reference). Returns a list of
  # "Name: value" strings: DKIM signature(s) first, then the entries of
  # milter.add_headers ordered by header name.
  sub _rspamd_reply_headers {
      my ($js) = @_;
      my @lines;

      # Accept a single value or an array of values, so that a list is never
      # stringified as "ARRAY(0x...)" into a mail header.
      my $dkim = $js->{'dkim-signature'};
      foreach my $sig ( ref($dkim) eq 'ARRAY' ? @{$dkim} : ($dkim) ) {
          push @lines, "DKIM-Signature: " . $sig if $sig;
      }

      my $milter = $js->{'milter'};
      my $add = ref($milter) eq 'HASH' ? $milter->{'add_headers'} : undef;
      if ( ref($add) eq 'HASH' ) {
          foreach my $name ( sort keys %{$add} ) {
              my $spec = $add->{$name};

              # Each entry is either a plain string or a hash with a
              # 'value' key; an array of such entries is accepted as well.
              foreach my $item ( ref($spec) eq 'ARRAY' ? @{$spec} : ($spec) ) {
                  my $value = ref($item) eq 'HASH' ? $item->{value} : $item;
                  $value = '' unless defined $value;
                  push @lines, "$name: $value";
              }
          }
      }

      return @lines;
  }

  # Extract sender, recipients, client IP and authenticated user from the
  # envelope lines of a CGP queue file.
  #
  # Takes the envelope lines (without trailing newlines) and returns a hash
  # reference with the keys 'from', 'ip', 'user' (each present only when
  # found) and 'rcpts' (array reference, possibly empty).
  sub _cgp_parse_envelope {
      my @lines = @_;
      my %env = ( rcpts => [] );

      foreach my $elt (@lines) {
          if ( $elt =~ /^P\s[^<]*(<[^>]*>).*$/ ) {
              $env{from} = $1;
          }
          elsif ( $elt =~ /^R\s[^<]*(<[^>]*>).*$/ ) {
              push @{ $env{rcpts} }, $1;
          }
          elsif ( $elt =~ /^S (?:<([^>]+)> )?(?:SMTP|HTTPU?|AIRSYNC|XIMSS) \[([0-9a-f.:]+)\]/ ) {
              $env{user} = $1 if $1;
              $env{ip}   = $2 if $2;
          }
          elsif ( $elt =~ /^S (?:<([^>]+)> )?(?:DSN|GROUP|LIST|PBX|PIPE|RULE) \[0\.0\.0\.0\]/ ) {
              $env{user} = $1 if $1;

              # Fixed placeholder address for sources recorded as 0.0.0.0.
              # NOTE(review): presumably matched by Rspamd-side settings;
              # confirm before changing.
              $env{ip} = '127.2.4.7';
          }
      }

      return \%env;
  }

  # Submit one queued message to Rspamd and print the CGP verdict for it.
  #
  # Parameters:
  #   $tag  - request tag from the CGP command; every response line for this
  #           request is prefixed with it
  #   $file - path of the CGP queue file to scan
  #
  # The function returns immediately; the verdict is printed to STDOUT later
  # from the HTTP callback as one of
  #   "$tag OK", "$tag ADDHEADER <quoted headers> OK", "$tag DISCARD",
  #   "$tag REJECTED Try again later" or "$tag FAILURE".
  # Lines starting with "* " are informational.
  #
  # Uses the file-scope settings $local, $rspamd_host, $request_timeout,
  # $max_size, $main_domain and $header, and increments $scanned for every
  # HTTP 2xx reply received.
  sub rspamd_scan {
      my ( $tag, $file ) = @_;

      my $http_callback = sub {
          my ( $body, $hdr ) = @_;

          if ( !( $hdr && $hdr->{Status} =~ /^2/ ) ) {
              if ($hdr) {
                  print "* Rspamd: Bad response for $file: HTTP error: $hdr->{Status} $hdr->{Reason}\n";
              }
              else {
                  print "* Rspamd: Bad response for $file: IO error: $!\n";
              }
              print "$tag FAILURE\n";
              return;
          }

          # Block eval: a malformed body must not kill the helper.
          my $js = eval { decode_json($body) };
          $scanned++;

          # Anything that is not a JSON object cannot carry a verdict.
          if ( ref($js) ne 'HASH' ) {
              print "* Rspamd: Bad response for $file: invalid JSON: parse error\n";
              print "$tag FAILURE\n";
              return;
          }

          # Missing fields are logged as empty / 0.00 instead of triggering
          # "uninitialized value" warnings.
          my $action = defined $js->{'action'}     ? $js->{'action'}     : '';
          my $id     = defined $js->{'message-id'} ? $js->{'message-id'} : '';

          my $symbols = "";
          if ( ref( $js->{'symbols'} ) eq 'HASH' ) {
              foreach my $name ( sort keys %{ $js->{'symbols'} } ) {
                  my $sym   = $js->{'symbols'}{$name};
                  my $score = ref($sym) eq 'HASH' ? $sym->{'score'} : undef;
                  $symbols .= sprintf( "%s(%.2f);", $name, $score || 0 );
              }
          }

          printf(
              "* Rspamd: Scanned %s; id: <%s>; Score: %.2f / %.2f; Symbols: [%s]\n",
              $file, $id,
              $js->{'score'}          || 0,
              $js->{'required_score'} || 0,
              $symbols
          );

          if ( $action eq 'reject' ) {
              print "$tag DISCARD\n";
              return;
          }
          if ( $action eq 'soft reject' ) {
              print "$tag REJECTED Try again later\n";
              return;
          }

          my @header_lines = _rspamd_reply_headers($js);
          if ( $action eq 'add header' || $action eq 'rewrite subject' ) {
              push @header_lines, $header;
          }

          # Header lines are separated by the two-character sequence \e.
          my $headers = join( '\\e', @header_lines );
          if ( $headers eq "" ) {
              print "$tag OK\n";
          }
          else {
              print "$tag ADDHEADER " . cgp_string($headers) . " OK\n";
          }
          return;
      };

      if ($local) {

          # Use file scan
          # XXX: not implemented now due to CGP queue format
          http_get(
              "http://$rspamd_host/symbols?file=$file",
              timeout => $request_timeout,
              $http_callback
          );

          # Explicit return keeps the request call in void context.
          return;
      }

      my $sb = stat($file);
      if ( !$sb ) {
          print "* Cannot stat $file: $!\n$tag FAILURE\n";
          return;
      }
      if ( $sb->size > $max_size ) {
          print "* File $file is too large: " . $sb->size . "\n$tag FAILURE\n";
          return;
      }

      aio_load(
          $file,
          sub {
              my ($data) = @_;
              if ( !$data ) {
                  print "* Cannot open $file: $!\n$tag FAILURE\n";
                  return;
              }

              # Parse CGP format: envelope lines, an empty line, then the
              # message itself. The envelope is only read from $1 when the
              # substitution actually matched; otherwise the data is sent
              # unchanged with an empty envelope.
              my @envelope;
              if ( $data =~ s/^((?:[^\n]*\n)*?)\n(.*)$/$2/ms ) {
                  @envelope = split /\n/, $1;
              }
              my $env = _cgp_parse_envelope(@envelope);

              my $headers = {};
              if ( $file =~ /\/([^\/.]+)\.msg$/ ) {
                  $headers->{'Queue-ID'} = $1;
              }
              if ( $env->{from} ) {
                  $headers->{From} = $env->{from};
              }
              if ( scalar( @{ $env->{rcpts} } ) > 0 ) {

                  # XXX: Anyevent cannot parse headers with multiple values
                  $headers->{Rcpt} = join( ',', @{ $env->{rcpts} } );
              }
              if ( $env->{ip} ) {
                  $headers->{IP} = $env->{ip};
              }
              if ( $env->{user} ) {
                  $headers->{User} = $env->{user};
              }
              if ($main_domain) {
                  $headers->{'MTA-Tag'} = $main_domain;
              }

              http_post(
                  "http://$rspamd_host/checkv2", $data,
                  timeout => $request_timeout,
                  headers => $headers,
                  $http_callback
              );

              # Explicit return keeps http_post in void context, so no
              # cancellation guard is created and immediately dropped.
              return;
          }
      );
      return;
  }
  # Show informational message
  print "* Rspamd CGP filter has been started\n";

  # Bytes received on STDIN that do not yet form a complete line.
  my $stdin_buffer = '';

  # Command loop: read helper commands from STDIN and answer on STDOUT.
  #
  # Input is read with sysread into our own buffer rather than with <STDIN>:
  # a buffered readline may swallow several lines at once while the watcher
  # only fires again when the descriptor itself becomes readable, which would
  # leave already-received commands unprocessed.
  #
  # Recognised commands ("<tag> <command> [<argument>]"):
  #   INTF        - echoed back unchanged
  #   FILE <path> - scan the given queue file with rspamd_scan
  #   QUIT        - report the number of scanned files, answer OK and exit
  # Any other command is answered with "<tag> FAILURE"; lines that do not
  # match the command syntax are ignored.
  my $w = AnyEvent->io(
      fh   => \*STDIN,
      poll => 'r',
      cb   => sub {
          my $nread =
            sysread( STDIN, $stdin_buffer, 65536, length($stdin_buffer) );

          if ( !defined $nread ) {

              # Transient conditions: wait for the next readiness event.
              return if $!{EINTR} || $!{EAGAIN};
              print "* Cannot read from STDIN: $!\n";
              exit 1;
          }
          if ( $nread == 0 ) {

              # End of input: without this check the watcher would fire
              # forever on the closed descriptor.
              print "* STDIN closed, terminating after scanning of $scanned files\n";
              exit 0;
          }

          # Process every complete line currently in the buffer.
          while ( $stdin_buffer =~ s/^([^\n]*)\n// ) {
              my $input = $1;
              next unless $input =~ /^(\d+)\s+(\S+)(\s+(\S+)\s*)?$/;
              my ( $tag, $cmd, $arg ) = ( $1, $2, $4 );

              if ( $cmd eq "INTF" ) {
                  print "$input\n";
              }
              elsif ( $cmd eq "FILE" && $arg ) {
                  print "* Scanning file $arg\n";
                  rspamd_scan( $tag, $arg );
              }
              elsif ( $cmd eq "QUIT" ) {
                  print "* Terminating after scanning of $scanned files\n";
                  print "$tag OK\n";
                  exit 0;
              }
              else {
                  print "* Unknown command $cmd\n";
                  print "$tag FAILURE\n";
              }
          }
          return;
      }
  );
  EV::run;
  260. __END__
  261. =head1 NAME
  262. cgp_rspamd - implements Rspamd filter for CommunigatePro MTA
  263. =head1 SYNOPSIS
  264. cgp_rspamd [options]
  265. Options:
  266. --host=hostport Rspamd host to connect (localhost:11333 by default)
  267. --header Add specific header for a spam message ("X-Spam: yes" by default)
  268. --reject-message Rejection message for spam mail ("Spam message rejected" by default)
  269. --timeout Timeout to read response from Rspamd (15 seconds by default)
  270. --max-size Maximum size of message to scan (10 megabytes by default)
  271. --help brief help message
  272. --man full documentation
  273. =head1 OPTIONS
  274. =over 8
  275. =item B<--host>
  276. Specifies Rspamd host to use for scanning
  277. =item B<--header>
  278. Specifies the header that should be added when Rspamd action is B<add header> or B<rewrite subject>.
  279. =item B<--reject-message>
  280. Specifies the rejection message for spam.
  281. =item B<--timeout>
  282. Sets the timeout, in seconds, to wait for an Rspamd reply for a message.
  283. =item B<--max-size>
  284. Defines the maximum message size, in bytes, to be processed by Rspamd.
  285. =item B<--help>
  286. Prints a brief help message and exits.
  287. =item B<--man>
  288. Prints the manual page and exits.
  289. =back
  290. =head1 DESCRIPTION
  291. B<cgp_rspamd> scans messages processed by the B<CommunigatePro> MTA using an Rspamd scanner. It reads
  292. standard input and parses the CGP helper protocol. On scan requests, this filter queries Rspamd to process a message.
  293. B<cgp_rspamd> can tell CGP to add a header or reject spam messages depending on the Rspamd scan result.
  294. =cut