You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

cgp_rspamd.pl 8.6KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. #!/usr/bin/env perl
  2. use warnings;
  3. use strict;
  4. use JSON::XS;
  5. use AnyEvent;
  6. use AnyEvent::HTTP;
  7. use AnyEvent::IO;
  8. use EV;
  9. use Pod::Usage;
  10. use Getopt::Long;
  11. use File::stat;
  # ---------------------------------------------------------------------------
  # Runtime configuration: built-in defaults, then command-line overrides.
  # ---------------------------------------------------------------------------
  my $rspamd_host     = "localhost:11333";          # host:port of the Rspamd endpoint
  my $man             = 0;                          # --man: print the full manual
  my $help            = 0;                          # --help: print the short usage
  my $local           = 0;                          # no command-line switch changes this
  my $header          = "X-Spam: yes";              # header added to flagged messages
  my $max_size        = 10 * 1024 * 1024;           # 10 MB
  my $request_timeout = 15;                         # 15 seconds by default
  my $reject_message  = "Spam message rejected";

  # Option specification => variable that receives the parsed value.
  my %option_spec = (
      "host=s"           => \$rspamd_host,
      "header=s"         => \$header,
      "reject-message=s" => \$reject_message,
      "max-size=i"       => \$max_size,
      "timeout=f"        => \$request_timeout,
      "help|?"           => \$help,
      "man"              => \$man,
  );

  # Unknown or malformed options end the program with the usage text.
  GetOptions(%option_spec) or pod2usage(2);

  if ($help) {
      pod2usage(1);
  }
  if ($man) {
      pod2usage( -exitval => 0, -verbose => 2 );
  }

  # Main domain read from the CGP settings file (false when it cannot be found).
  my $main_domain = cgp_main_domain();

  # Number of Rspamd replies processed so far; reported on QUIT.
  my $scanned = 0;

  # Turn off output buffering as required by CGP
  $| = 1;
  # Look up the main domain name in the CGP settings file.
  #
  # Reads 'Settings/Main.settings' (relative to the current working directory)
  # and returns the value of the first "DomainName = <value>;" entry, i.e. the
  # text between the '=' and the terminating ';'.
  #
  # Returns undef when the file cannot be opened or contains no such entry.
  sub cgp_main_domain {
      my $domain;

      # Three-argument open: the file is only ever read.
      if ( open( my $fh, '<', 'Settings/Main.settings' ) ) {
          # A named loop variable keeps the caller's $_ untouched.
          while ( my $line = <$fh> ) {
              if ( $line =~ /^\s+DomainName\s+=\s+([^;]+);/ ) {
                  $domain = $1;
                  last;
              }
          }
          close($fh);
      }

      return $domain;
  }
  # Render a value as a double-quoted string for the CGP helper protocol.
  #
  # Double quotes, newlines and carriage returns are replaced by the two
  # character sequences \" \n and \r; a tab becomes a single space.
  # Backslashes already present in the input are passed through unchanged.
  sub cgp_string {
      my ($in) = @_;

      # Replacement text for every character that needs special treatment.
      my %replacement_for = (
          '"'  => '\\"',
          "\n" => '\\n',
          "\r" => '\\r',
          "\t" => ' ',
      );

      ( my $quoted = $in ) =~ s/(["\n\r\t])/$replacement_for{$1}/g;

      return '"' . $quoted . '"';
  }
  # Collect the header lines an Rspamd reply asks to be added to the message.
  # The DKIM signature(s) come first, followed by the milter "add_headers"
  # entries sorted by header name so the output order is stable.
  sub _rspamd_reply_headers {
      my ($js) = @_;
      my @lines;

      my $dkim = $js->{'dkim-signature'};
      if ($dkim) {
          # NOTE(review): a list of signatures is accepted as well as a single
          # string - confirm against the Rspamd protocol documentation.
          foreach my $sig ( ref($dkim) eq 'ARRAY' ? @{$dkim} : ($dkim) ) {
              push @lines, "DKIM-Signature: " . $sig;
          }
      }

      my $milter = $js->{'milter'};
      my $add    = ref($milter) eq 'HASH' ? $milter->{'add_headers'} : undef;
      if ( ref($add) eq 'HASH' ) {
          foreach my $name ( sort keys %{$add} ) {
              my $entry = $add->{$name};

              # An entry is a plain string, a { value => ... } hash, or a list
              # of either (several headers sharing one name).
              foreach my $item ( ref($entry) eq 'ARRAY' ? @{$entry} : ($entry) ) {
                  my $value = ref($item) eq 'HASH' ? $item->{value} : $item;
                  next if !defined $value;
                  push @lines, "$name: $value";
              }
          }
      }

      return @lines;
  }

  # Split the content of a CGP queue file at its first empty line.
  # Returns ( $envelope_text, $message ).  When no empty line exists the
  # envelope is empty and the whole content is treated as the message.
  sub _cgp_split_queue_file {
      my ($data) = @_;

      # Content that starts with an empty line has an empty envelope.
      if ( substr( $data, 0, 1 ) eq "\n" ) {
          return ( '', substr( $data, 1 ) );
      }

      my $boundary = index( $data, "\n\n" );
      if ( $boundary < 0 ) {
          return ( '', $data );
      }

      return ( substr( $data, 0, $boundary ), substr( $data, $boundary + 2 ) );
  }

  # Turn the envelope lines of a CGP queue file into the HTTP request headers
  # sent to Rspamd (From, Rcpt, IP, User) plus a Queue-ID taken from the name
  # of the queue file.  Returns a hash reference.
  sub _cgp_envelope_headers {
      my ( $envelope, $file ) = @_;

      my $from;
      my @rcpts;
      my $ip;
      my $user;

      foreach my $elt ( split /\n/, $envelope ) {
          if ( $elt =~ /^P\s[^<]*(<[^>]*>).*$/ ) {
              $from = $1;
          }
          elsif ( $elt =~ /^R\s[^<]*(<[^>]*>).*$/ ) {
              push @rcpts, $1;
          }
          elsif ( $elt =~ /^S (?:<([^>]+)> )?(?:SMTP|HTTPU?|AIRSYNC|XIMSS) \[([0-9a-f.:]+)\]/ ) {
              $user = $1 if $1;
              $ip   = $2 if $2;
          }
          elsif ( $elt =~ /^S (?:<([^>]+)> )?(?:DSN|GROUP|LIST|PBX|PIPE|RULE) \[0\.0\.0\.0\]/ ) {
              $user = $1 if $1;

              # These sources carry the address 0.0.0.0; a fixed address is
              # reported to Rspamd in its place.
              $ip = '127.2.4.7';
          }
      }

      my $headers = {};
      if ( $file =~ /\/([^\/.]+)\.msg$/ ) {
          $headers->{'Queue-ID'} = $1;
      }
      if ($from) {
          $headers->{From} = $from;
      }
      if ( scalar(@rcpts) > 0 ) {
          # XXX: Anyevent cannot parse headers with multiple values
          $headers->{Rcpt} = join( ',', @rcpts );
      }
      if ($ip) {
          $headers->{IP} = $ip;
      }
      if ($user) {
          $headers->{User} = $user;
      }

      return $headers;
  }

  # Scan one CGP queue file with Rspamd and answer CGP on standard output.
  #
  # Parameters:
  #   $tag  - sequence number of the CGP request; every answer starts with it
  #   $file - path of the queue file to scan
  #
  # The scan is asynchronous: this sub returns as soon as the file load or the
  # HTTP request has been started, and the verdict line ("$tag OK",
  # "$tag ADDHEADER ... OK", "$tag DISCARD", "$tag REJECT ..." or
  # "$tag FAILURE") is printed from a callback.  Lines beginning with "* " are
  # informational.
  sub rspamd_scan {
      my ( $tag, $file ) = @_;

      # Invoked with the HTTP response body and the response header hash.
      my $http_callback = sub {
          my ( $body, $hdr ) = @_;

          if ( !$hdr || $hdr->{Status} !~ /^2/ ) {
              if ($hdr) {
                  print
  "* Rspamd: Bad response for $file: HTTP error: $hdr->{Status} $hdr->{Reason}\n";
              }
              else {
                  print "* Rspamd: Bad response for $file: IO error: $!\n";
              }
              print "$tag FAILURE\n";
              return;
          }

          # Block eval: nothing is compiled at run time, and a decoding error
          # simply leaves $js undefined.
          my $js = eval { decode_json($body) };
          $scanned++;

          # Only a JSON object can carry a verdict.
          if ( ref($js) ne 'HASH' ) {
              print "* Rspamd: Bad response for $file: invalid JSON: parse error\n";
              print "$tag FAILURE\n";
              return;
          }

          # Missing fields fall back to neutral values instead of warning.
          my $action = defined $js->{'action'}     ? $js->{'action'}     : '';
          my $id     = defined $js->{'message-id'} ? $js->{'message-id'} : '';

          my $found   = ref( $js->{'symbols'} ) eq 'HASH' ? $js->{'symbols'} : {};
          my $symbols = '';
          foreach my $name ( sort keys %{$found} ) {
              my $sym   = $found->{$name};
              my $score = ref($sym) eq 'HASH' ? $sym->{'score'} : undef;
              $symbols .= sprintf "%s(%.2f);", $name, ( $score || 0 );
          }

          printf
            "* Rspamd: Scanned %s; id: <%s>; Score: %.2f / %.2f; Symbols: [%s]\n",
            $file, $id, ( $js->{'score'} || 0 ), ( $js->{'required_score'} || 0 ),
            $symbols;

          if ( $action eq 'reject' ) {
              # NOTE(review): the message is discarded; the configured reject
              # message is not used here.
              print "$tag DISCARD\n";
              return;
          }
          if ( $action eq 'soft reject' ) {
              print "$tag REJECT Try again later\n";
              return;
          }

          my @headers = _rspamd_reply_headers($js);
          if ( $action eq 'add header' || $action eq 'rewrite subject' ) {
              push @headers, $header if $header ne "";
          }

          if (@headers) {
              # Header lines are separated by a literal backslash + "e".
              print "$tag ADDHEADER " . cgp_string( join( "\\e", @headers ) ) . " OK\n";
          }
          else {
              print "$tag OK\n";
          }
      };

      if ($local) {
          # Use file scan
          # XXX: not implemented now due to CGP queue format
          http_get(
              "http://$rspamd_host/symbols?file=$file",
              timeout => $request_timeout,
              $http_callback
          );
          return;
      }

      my $sb = stat($file);
      if ( !$sb ) {
          print "* Cannot stat $file: $!\n$tag FAILURE\n";
          return;
      }
      if ( $sb->size > $max_size ) {
          print "* File $file is too large: " . $sb->size . "\n$tag FAILURE\n";
          return;
      }

      aio_load(
          $file,
          sub {
              my ($data) = @_;

              if ( !$data ) {
                  print "* Cannot open $file: $!\n$tag FAILURE\n";
                  return;
              }

              # Parse CGP format: envelope lines, an empty line, the message.
              my ( $envelope, $message ) = _cgp_split_queue_file($data);

              my $headers = _cgp_envelope_headers( $envelope, $file );
              if ($main_domain) {
                  $headers->{'MTA-Tag'} = $main_domain;
              }

              http_post(
                  "http://$rspamd_host/checkv2", $message,
                  timeout => $request_timeout,
                  headers => $headers,
                  $http_callback
              );
          }
      );
      return;
  }
  # Handle one line of the CGP helper protocol, "<tag> <command> [<argument>]".
  #
  #   INTF          - the request line is echoed back unchanged
  #   FILE <path>   - the file is handed to rspamd_scan
  #   QUIT          - prints the scan counter, answers OK and exits
  #   anything else - answered with FAILURE
  #
  # A line that does not have this shape carries no tag to answer and is
  # ignored.
  sub cgp_handle_line {
      my ($input) = @_;

      return if $input !~ /^(\d+)\s+(\S+)(?:\s+(\S+)\s*)?$/;
      my ( $tag, $cmd, $arg ) = ( $1, $2, $3 );

      if ( $cmd eq "INTF" ) {
          print "$input\n";
      }
      elsif ( $cmd eq "FILE" && $arg ) {
          print "* Scanning file $arg\n";
          rspamd_scan( $tag, $arg );
      }
      elsif ( $cmd eq "QUIT" ) {
          print "* Terminating after scanning of $scanned files\n";
          print "$tag OK\n";
          exit 0;
      }
      else {
          print "* Unknown command $cmd\n";
          print "$tag FAILURE\n";
      }
      return;
  }

  # Show informational message
  print "* Rspamd CGP filter has been started\n";

  # Bytes read from STDIN that do not yet form a complete line.
  my $stdin_pending = '';

  # Declared separately so the callback can drop the watcher itself.
  my $w;
  $w = AnyEvent->io(
      fh   => \*STDIN,
      poll => 'r',
      cb   => sub {
          # Unbuffered read: with a buffered readline, extra lines delivered in
          # the same read would sit in the buffer without waking the watcher.
          my $got = sysread( STDIN, $stdin_pending, 65536, length $stdin_pending );

          if ( !defined $got ) {
              return if $!{EINTR} || $!{EAGAIN} || $!{EWOULDBLOCK};
              print "* Cannot read commands: $!\n";
              undef $w;
              return;
          }
          if ( $got == 0 ) {
              # End of input: stop watching so the handle is not reported as
              # readable over and over again.
              undef $w;
              return;
          }

          # Process every complete line received so far.
          while ( $stdin_pending =~ s/^([^\n]*)\n// ) {
              my $line = $1;
              cgp_handle_line($line);
          }
      }
  );
  EV::run;
  262. __END__
  263. =head1 NAME
  264. cgp_rspamd - implements Rspamd filter for CommunigatePro MTA
  265. =head1 SYNOPSIS
  266. cgp_rspamd [options]
  267. Options:
  268. --host=hostport Rspamd host to connect (localhost:11333 by default)
  269. --header Add specific header for a spam message ("X-Spam: yes" by default)
  270. --reject-message Rejection message for spam mail ("Spam message rejected" by default)
  271. --timeout Timeout to read response from Rspamd (15 seconds by default)
  272. --max-size Maximum size of message to scan (10 megabytes by default)
  273. --help brief help message
  274. --man full documentation
  275. =head1 OPTIONS
  276. =over 8
  277. =item B<--host>
  278. Specifies Rspamd host to use for scanning
  279. =item B<--header>
  280. Specifies the header that should be added when Rspamd action is B<add header>
  281. or B<rewrite subject>.
  282. =item B<--reject-message>
  283. Specifies the rejection message for spam.
  284. =item B<--timeout>
  285. Sets the timeout, in seconds, for waiting for the Rspamd reply to a message.
  286. =item B<--max-size>
  287. Defines the maximum size, in bytes, of a message to be processed by Rspamd.
  288. =item B<--help>
  289. Prints a brief help message and exits.
  290. =item B<--man>
  291. Prints the manual page and exits.
  292. =back
  293. =head1 DESCRIPTION
  294. B<cgp_rspamd> is intended to scan messages processed with B<CommunigatePro> MTA
  295. on some Rspamd scanner. It reads standard input and parses CGP helpers
  296. protocol. On scan requests, this filter can query Rspamd to process a message.
  297. B<cgp_rspamd> can tell CGP to add header or reject SPAM messages depending on
  298. Rspamd scan result.
  299. =cut