Przeglądaj źródła

* Add initial LMTP support and LDA delivery to rspamd

tags/0.2.7
Vsevolod Stakhov 15 lat temu
rodzic
commit
1cd34f5283
17 zmienionych plików z 1032 dodań i 66 usunięć
  1. 4
    2
      CMakeLists.txt
  2. 9
    0
      config.h.in
  3. 10
    0
      rspamd.conf.sample
  4. 28
    2
      src/cfg_file.h
  5. 4
    0
      src/cfg_file.l
  6. 84
    2
      src/cfg_file.y
  7. 81
    53
      src/cfg_utils.c
  8. 1
    1
      src/controller.c
  9. 31
    0
      src/fstring.c
  10. 5
    1
      src/fstring.h
  11. 314
    0
      src/lmtp.c
  12. 20
    0
      src/lmtp.h
  13. 380
    0
      src/lmtp_proto.c
  14. 44
    0
      src/lmtp_proto.h
  15. 14
    3
      src/main.c
  16. 3
    0
      src/main.h
  17. 0
    2
      src/worker.c

+ 4
- 2
CMakeLists.txt Wyświetl plik

@@ -145,7 +145,7 @@ CHECK_INCLUDE_FILES(netinet/in.h HAVE_NETINET_IN_H)
CHECK_INCLUDE_FILES(arpa/inet.h HAVE_ARPA_INET_H)
CHECK_INCLUDE_FILES(netdb.h HAVE_NETDB_H)
CHECK_INCLUDE_FILES(syslog.h HAVE_SYSLOG_H)
CHECK_INCLUDE_FILES(libgen.h HAVE_LIBGEN_H)

CHECK_FUNCTION_EXISTS(setproctitle HAVE_SETPROCTITLE)
CHECK_FUNCTION_EXISTS(getpagesize HAVE_GETPAGESIZE)
@@ -212,7 +212,9 @@ SET(RSPAMDSRC src/modules.c
src/filter.c
src/controller.c
src/cfg_utils.c
src/buffer.c)
src/buffer.c
src/lmtp.c
src/lmtp_proto.c)

SET(TOKENIZERSSRC src/tokenizers/tokenizers.c
src/tokenizers/osb.c)

+ 9
- 0
config.h.in Wyświetl plik

@@ -37,6 +37,8 @@

#cmakedefine HAVE_LIBUTIL_H 1

#cmakedefine HAVE_LIBGEN_H 1

#cmakedefine HAVE_ENDIAN_H 1
#cmakedefine HAVE_SYS_ENDIAN_H 1
#cmakedefine HAVE_MACHINE_ENDIAN_H 1
@@ -192,14 +194,21 @@
#include <math.h>
#endif

/* libutil */
#ifdef HAVE_LIBUTIL_H
#include <libutil.h>
#endif

/* syslog */
#ifdef HAVE_SYSLOG_H
#include <syslog.h>
#endif

#ifdef HAVE_LIBGEN_H
#include <libgen.h>
#define HAVE_DIRNAME 1
#endif

#include <errno.h>
#include <signal.h>
#include <event.h>

+ 10
- 0
rspamd.conf.sample Wyświetl plik

@@ -76,4 +76,14 @@ factors {
"winnow" = 5.5;
};

lmtp {
enabled = yes;
bind_socket = localhost:11335;
};

delivery {
enabled = yes;
agent = "/dev/null";
};

url_filters = "surbl";

+ 28
- 2
src/cfg_file.h Wyświetl plik

@@ -14,6 +14,7 @@

#define DEFAULT_BIND_PORT 768
#define DEFAULT_CONTROL_PORT 7608
#define DEFAULT_LMTP_PORT 7609
#define MAX_MEMCACHED_SERVERS 48
#define DEFAULT_MEMCACHED_PORT 11211
/* Memcached timeouts */
@@ -38,6 +39,16 @@ struct classifier;

enum { VAL_UNDEF=0, VAL_TRUE, VAL_FALSE };

/**
* Types of rspamd bind lines
*/
enum rspamd_cred_type {
CRED_NORMAL,
CRED_CONTROL,
CRED_LMTP,
CRED_DELIVERY,
};

/**
* Regexp type: /H - header, /M - mime, /U - url
*/
@@ -161,6 +172,21 @@ struct config_file {
unsigned int memcached_maxerrors; /**< maximum number of errors */
unsigned int memcached_connect_timeout; /**< connection timeout */

gboolean lmtp_enable; /**< is lmtp agent is enabled */
char *lmtp_host; /**< host for lmtp agent */
struct in_addr lmtp_addr; /**< bind address for lmtp */
uint16_t lmtp_port; /**< bind port for lmtp agent */
uint16_t lmtp_family; /**< bind family for lmtp agent */
char *lmtp_metric; /**< metric to use in lmtp module */

gboolean delivery_enable; /**< is delivery agent is enabled */
char *deliver_host; /**< host for mail deliviring */
struct in_addr deliver_addr; /**< its address */
uint16_t deliver_port; /**< port for deliviring */
uint16_t deliver_family; /**< socket family for delivirnig */
char *deliver_agent_path; /**< deliver to pipe instead of socket */
gboolean deliver_lmtp; /**< use LMTP instead of SMTP */

LIST_HEAD (modulesq, perl_module) perl_modules; /**< linked list of perl modules to load */

LIST_HEAD (headersq, filter) header_filters; /**< linked list of all header's filters */
@@ -193,10 +219,10 @@ int add_memcached_server (struct config_file *cf, char *str);
* Parse bind credits
* @param cf config file to use
* @param str line that presents bind line
* @param is_control flag that defines whether this credits are for controller
* @param type type of credits
* @return 1 if line was successfully parsed and 0 in case of error
*/
int parse_bind_line (struct config_file *cf, char *str, char is_control);
int parse_bind_line (struct config_file *cf, char *str, enum rspamd_cred_type type);

/**
* Init default values

+ 4
- 0
src/cfg_file.l Wyświetl plik

@@ -47,6 +47,10 @@ required_score return REQUIRED_SCORE;
function return FUNCTION;
control return CONTROL;
password return PASSWORD;
lmtp return LMTP;
enabled return ENABLED;
delivery return DELIVERY;
agent return AGENT;

statfile return STATFILE;
alias return ALIAS;

+ 84
- 2
src/cfg_file.y Wyświetl plik

@@ -43,6 +43,7 @@ struct statfile *cur_statfile = NULL;
%token LOGGING LOG_TYPE LOG_TYPE_CONSOLE LOG_TYPE_SYSLOG LOG_TYPE_FILE
%token LOG_LEVEL LOG_LEVEL_DEBUG LOG_LEVEL_INFO LOG_LEVEL_WARNING LOG_LEVEL_ERROR LOG_FACILITY LOG_FILENAME
%token STATFILE ALIAS PATTERN WEIGHT STATFILE_POOL_SIZE SIZE TOKENIZER CLASSIFIER
%token DELIVERY LMTP ENABLED AGENT

%type <string> STRING
%type <string> VARIABLE
@@ -84,6 +85,8 @@ command :
| logging
| statfile
| statfile_pool_size
| lmtp
| delivery
;

tempdir :
@@ -125,7 +128,7 @@ controlcmd:

controlsock:
BINDSOCK EQSIGN bind_cred {
if (!parse_bind_line (cfg, $3, 1)) {
if (!parse_bind_line (cfg, $3, CRED_CONTROL)) {
yyerror ("yyparse: parse_bind_line");
YYERROR;
}
@@ -141,7 +144,7 @@ controlpassword:

bindsock:
BINDSOCK EQSIGN bind_cred {
if (!parse_bind_line (cfg, $3, 0)) {
if (!parse_bind_line (cfg, $3, CRED_NORMAL)) {
yyerror ("yyparse: parse_bind_line");
YYERROR;
}
@@ -659,6 +662,85 @@ statfile_pool_size:
cfg->max_statfile_size = $3;
}
;

lmtp:
LMTP OBRACE lmtpbody EBRACE
;

lmtpbody:
lmtpcmd SEMICOLON
| lmtpbody lmtpcmd SEMICOLON
;

lmtpcmd:
lmtpenabled
| lmtpsock
| lmtpmetric
;

lmtpenabled:
ENABLED EQSIGN FLAG {
cfg->lmtp_enable = $3;
}
;

lmtpsock:
BINDSOCK EQSIGN bind_cred {
if (!parse_bind_line (cfg, $3, CRED_LMTP)) {
yyerror ("yyparse: parse_bind_line");
YYERROR;
}
free ($3);
}
;
lmtpmetric:
METRIC EQSIGN QUOTEDSTRING {
cfg->lmtp_metric = memory_pool_strdup (cfg->cfg_pool, $3);
}
;

delivery:
DELIVERY OBRACE deliverybody EBRACE
;

deliverybody:
deliverycmd SEMICOLON
| deliverybody deliverycmd SEMICOLON
;

deliverycmd:
deliveryenabled
| deliverysock
| deliveryagent
| deliverylmtp
;

deliveryenabled:
ENABLED EQSIGN FLAG {
cfg->delivery_enable = $3;
}
;

deliverysock:
BINDSOCK EQSIGN bind_cred {
if (!parse_bind_line (cfg, $3, CRED_DELIVERY)) {
yyerror ("yyparse: parse_bind_line");
YYERROR;
}
free ($3);
}
;
deliverylmtp:
LMTP EQSIGN FLAG {
cfg->deliver_lmtp = $3;
}
;
deliveryagent:
AGENT EQSIGN QUOTEDSTRING {
cfg->deliver_agent_path = memory_pool_strdup (cfg->cfg_pool, $3);
}
;

%%
/*
* vi:ts=4

+ 81
- 53
src/cfg_utils.c Wyświetl plik

@@ -81,85 +81,107 @@ add_memcached_server (struct config_file *cf, char *str)
}

int
parse_bind_line (struct config_file *cf, char *str, char is_control)
parse_bind_line (struct config_file *cf, char *str, enum rspamd_cred_type type)
{
char *cur_tok, *err_str;
struct hostent *hent;
size_t s;
char **host;
int16_t *family, *port;
struct in_addr *addr;
if (str == NULL) return 0;
cur_tok = strsep (&str, ":");

switch (type) {
case CRED_NORMAL:
host = &cf->bind_host;
port = &cf->bind_port;
*port = DEFAULT_BIND_PORT;
family = &cf->bind_family;
addr = &cf->bind_addr;
break;
case CRED_CONTROL:
host = &cf->control_host;
port = &cf->control_port;
*port = DEFAULT_CONTROL_PORT;
family = &cf->control_family;
addr = &cf->control_addr;
break;
case CRED_LMTP:
host = &cf->lmtp_host;
port = &cf->lmtp_port;
*port = DEFAULT_LMTP_PORT;
family = &cf->lmtp_family;
addr = &cf->lmtp_addr;
break;
case CRED_DELIVERY:
host = &cf->deliver_host;
port = &cf->deliver_port;
*port = 25;
family = &cf->deliver_family;
addr = &cf->deliver_addr;
break;
}
if (cur_tok[0] == '/' || cur_tok[0] == '.') {
if (is_control) {
cf->control_host = memory_pool_strdup (cf->cfg_pool, cur_tok);
cf->control_family = AF_UNIX;
}
else {
cf->bind_host = memory_pool_strdup (cf->cfg_pool, cur_tok);
cf->bind_family = AF_UNIX;
}
return 1;

} else {
if (str == '\0') {
if (is_control) {
cf->control_port = DEFAULT_CONTROL_PORT;
#ifdef HAVE_DIRNAME
/* Try to check path of bind credit */
struct stat st;
int fd;
char *copy = memory_pool_strdup (cf->cfg_pool, cur_tok);
if (stat (copy, &st) == -1) {
if (errno == ENOENT) {
if ((fd = open (cur_tok, O_RDWR | O_TRUNC | O_CREAT, S_IWUSR | S_IRUSR)) == -1) {
yyerror ("parse_bind_line: cannot open path %s for making socket, %m", cur_tok);
return 0;
}
else {
close (fd);
unlink (cur_tok);
}
}
else {
cf->bind_port = DEFAULT_BIND_PORT;
yyerror ("parse_bind_line: cannot stat path %s for making socket, %m", cur_tok);
return 0;
}
}
else {
if (is_control) {
cf->control_port = (uint16_t)strtoul (str, &err_str, 10);
}
else {
cf->bind_port = (uint16_t)strtoul (str, &err_str, 10);
if (unlink (cur_tok) == -1) {
yyerror ("parse_bind_line: cannot remove path %s for making socket, %m", cur_tok);
return 0;
}
}
#endif
*host = memory_pool_strdup (cf->cfg_pool, cur_tok);
*family = AF_UNIX;
return 1;

} else {
if (*str != '\0') {
*port = (uint16_t)strtoul (str, &err_str, 10);
if (*err_str != '\0') {
yyerror ("parse_bind_line: cannot read numeric value: %s", err_str);
return 0;
}
}
if (is_control) {
if (!inet_aton (cur_tok, &cf->control_addr)) {
/* Try to call gethostbyname */
hent = gethostbyname (cur_tok);
if (hent == NULL) {
return 0;
}
else {
cf->control_host = memory_pool_strdup (cf->cfg_pool, cur_tok);
memcpy((char *)&cf->control_addr, hent->h_addr, sizeof(struct in_addr));
s = strlen (cur_tok) + 1;
}
if (!inet_aton (cur_tok, addr)) {
/* Try to call gethostbyname */
hent = gethostbyname (cur_tok);
if (hent == NULL) {
return 0;
}
else {
cf->control_host = memory_pool_strdup (cf->cfg_pool, cur_tok);
*host = memory_pool_strdup (cf->cfg_pool, cur_tok);
memcpy((char *)addr, hent->h_addr, sizeof(struct in_addr));
s = strlen (cur_tok) + 1;
}

cf->control_family = AF_INET;
}
else {
if (!inet_aton (cur_tok, &cf->bind_addr)) {
/* Try to call gethostbyname */
hent = gethostbyname (cur_tok);
if (hent == NULL) {
return 0;
}
else {
cf->bind_host = memory_pool_strdup (cf->cfg_pool, cur_tok);
memcpy((char *)&cf->bind_addr, hent->h_addr, sizeof(struct in_addr));
s = strlen (cur_tok) + 1;
}
}
else {
cf->bind_host = memory_pool_strdup (cf->cfg_pool, cur_tok);
}

cf->bind_family = AF_INET;
*host = memory_pool_strdup (cf->cfg_pool, cur_tok);
}
*family = AF_INET;

return 1;
}
@@ -191,6 +213,7 @@ init_defaults (struct config_file *cfg)
cfg->composite_symbols = g_hash_table_new (g_str_hash, g_str_equal);
cfg->statfiles = g_hash_table_new (g_str_hash, g_str_equal);
cfg->cfg_params = g_hash_table_new (g_str_hash, g_str_equal);
cfg->lmtp_metric = "default";

def_metric = memory_pool_alloc (cfg->cfg_pool, sizeof (struct metric));
def_metric->name = "default";
@@ -512,6 +535,11 @@ fill_cfg_params (struct config_file *cfg)
void
post_load_config (struct config_file *cfg)
{
if (cfg->lmtp_enable && !cfg->delivery_enable) {
yywarn ("post_load_config: lmtp is enabled, but delivery is not enabled, disabling lmtp");
cfg->lmtp_enable = FALSE;
}

g_hash_table_foreach (cfg->variables, substitute_all_variables, cfg);
g_hash_table_foreach (cfg->modules_opts, substitute_module_variables, cfg);
parse_filters_str (cfg, cfg->header_filters_str, SCRIPT_HEADER);

+ 1
- 1
src/controller.c Wyświetl plik

@@ -534,7 +534,7 @@ start_controller (struct rspamd_worker *worker)
}
else {
un_addr = (struct sockaddr_un *) alloca (sizeof (struct sockaddr_un));
if (!un_addr || (listen_sock = make_unix_socket (worker->srv->cfg->bind_host, un_addr)) == -1) {
if (!un_addr || (listen_sock = make_unix_socket (worker->srv->cfg->control_host, un_addr)) == -1) {
msg_err ("start_controller: cannot create unix listen socket. %m");
exit(-errno);
}

+ 31
- 0
src/fstring.c Wyświetl plik

@@ -91,6 +91,37 @@ fstrstr (f_str_t *orig, f_str_t *pattern)

}

/*
* Search for pattern in orig ignoring case
*/
ssize_t
fstrstri (f_str_t *orig, f_str_t *pattern)
{
register ssize_t cur = 0, pcur = 0;

if (pattern->len > orig->len) {
return -1;
}

while (cur < orig->len) {
if (tolower (*(orig->begin + cur)) == tolower (*pattern->begin)) {
while (cur < orig->len && pcur < pattern->len) {
if (tolower (*(orig->begin + cur)) != tolower (*(pattern->begin + pcur))) {
pcur = 0;
break;
}
cur ++;
pcur ++;
}
return cur - pattern->len;
}
cur ++;
}

return -1;

}

/*
* Split string by tokens
* word contains parsed word

+ 5
- 1
src/fstring.h Wyświetl plik

@@ -42,6 +42,11 @@ ssize_t fstrrchr (f_str_t *src, char c);
*/
ssize_t fstrstr (f_str_t *orig, f_str_t *pattern);

/*
* Search for pattern in orig ignoring case
*/
ssize_t fstrstri (f_str_t *orig, f_str_t *pattern);

/*
* Split string by tokens
* word contains parsed word
@@ -88,7 +93,6 @@ f_str_t* fstrgrow (memory_pool_t *pool, f_str_t *orig, size_t newlen);
*/
uint32_t fstrhash (f_str_t *str);


/*
* Make copy of string to 0-terminated string
*/

+ 314
- 0
src/lmtp.c Wyświetl plik

@@ -0,0 +1,314 @@
/*
* Copyright (c) 2009, Rambler media
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY Rambler media ''AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL Rambler BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

#include "config.h"
#include "buffer.h"
#include "main.h"
#include "lmtp.h"
#include "lmtp_proto.h"
#include "cfg_file.h"
#include "url.h"
#include "modules.h"
#include "message.h"

static char greetingbuf[1024];
static struct timeval io_tv;

static void write_socket (void *arg);

static
void sig_handler (int signo)
{
switch (signo) {
case SIGINT:
case SIGTERM:
_exit (1);
break;
}
}

/*
* Config reload is designed by sending sigusr to active workers and pending shutdown of them
*/
static void
sigusr_handler (int fd, short what, void *arg)
{
struct rspamd_worker *worker = (struct rspamd_worker *)arg;
/* Do not accept new connections, preparing to end worker's process */
struct timeval tv;
tv.tv_sec = SOFT_SHUTDOWN_TIME;
tv.tv_usec = 0;
event_del (&worker->sig_ev);
event_del (&worker->bind_ev);
do_reopen_log = 1;
msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
event_loopexit (&tv);
return;
}

/*
* Destructor for recipients list
*/
static void
rcpt_destruct (void *pointer)
{
struct worker_task *task = (struct worker_task *)pointer;

if (task->rcpt) {
g_list_free (task->rcpt);
}
}

/*
* Free all structures of lmtp proto
*/
static void
free_task (struct rspamd_lmtp_proto *lmtp)
{
GList *part;
struct mime_part *p;

if (lmtp) {
msg_debug ("free_task: free pointer %p", lmtp->task);
if (lmtp->task->memc_ctx) {
memc_close_ctx (lmtp->task->memc_ctx);
}
while ((part = g_list_first (lmtp->task->parts))) {
lmtp->task->parts = g_list_remove_link (lmtp->task->parts, part);
p = (struct mime_part *)part->data;
g_byte_array_free (p->content, FALSE);
g_list_free_1 (part);
}
memory_pool_delete (lmtp->task->task_pool);
/* Plan dispatcher shutdown */
lmtp->task->dispatcher->wanna_die = 1;
close (lmtp->task->sock);
g_free (lmtp->task);
g_free (lmtp);
}
}

/*
* Callback that is called when there is data to read in buffer
*/
static void
read_socket (f_str_t *in, void *arg)
{
struct rspamd_lmtp_proto *lmtp = (struct rspamd_lmtp_proto *)arg;
struct worker_task *task = lmtp->task;
ssize_t r;

switch (task->state) {
case READ_COMMAND:
case READ_HEADER:
if (read_lmtp_input_line (lmtp, in) != 0) {
msg_info ("read_lmtp_socket: closing lmtp connection due to protocol error");
lmtp->task->state = CLOSING_CONNECTION;
}
/* Task was read, recall read handler once more with new state to process message and write reply */
if (task->state == READ_MESSAGE) {
read_socket (in, arg);
}
break;
case READ_MESSAGE:
r = process_message (lmtp->task);
r = process_filters (lmtp->task);
if (r == -1) {
task->last_error = "Filter processing error";
task->error_code = LMTP_FAILURE;
task->state = WRITE_ERROR;
write_socket (lmtp);
}
else if (r == 0) {
task->state = WAIT_FILTER;
rspamd_dispatcher_pause (lmtp->task->dispatcher);
}
else {
process_statfiles (lmtp->task);
task->state = WRITE_REPLY;
write_socket (lmtp);
}
break;
}
}

/*
* Callback for socket writing
*/
static void
write_socket (void *arg)
{
struct rspamd_lmtp_proto *lmtp = (struct rspamd_lmtp_proto *)arg;
switch (lmtp->task->state) {
case WRITE_REPLY:
write_lmtp_reply (lmtp);
lmtp->task->state = CLOSING_CONNECTION;
break;
case WRITE_ERROR:
write_lmtp_reply (lmtp);
lmtp->task->state = CLOSING_CONNECTION;
break;
case CLOSING_CONNECTION:
msg_debug ("lmtp_write_socket: normally closing connection");
free_task (lmtp);
break;
}
}

/*
* Called if something goes wrong
*/
static void
err_socket (GError *err, void *arg)
{
struct rspamd_lmtp_proto *lmtp = (struct rspamd_lmtp_proto *)arg;
msg_info ("lmtp_err_socket: abnormally closing connection, error: %s", err->message);
/* Free buffers */
free_task (lmtp);
}

/*
* Accept new connection and construct task
*/
static void
accept_socket (int fd, short what, void *arg)
{
struct rspamd_worker *worker = (struct rspamd_worker *)arg;
struct sockaddr_storage ss;
struct worker_task *new_task;
struct rspamd_lmtp_proto *lmtp;
socklen_t addrlen = sizeof(ss);
int nfd, on = 1;
struct linger linger;

if ((nfd = accept (fd, (struct sockaddr *)&ss, &addrlen)) == -1) {
return;
}
if (event_make_socket_nonblocking(fd) < 0) {
return;
}

/* Socket options */
setsockopt (nfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&on, sizeof(on));
setsockopt (nfd, SOL_SOCKET, SO_REUSEADDR, (void *)&on, sizeof(on));
linger.l_onoff = 1;
linger.l_linger = 2;
setsockopt (nfd, SOL_SOCKET, SO_LINGER, (void *)&linger, sizeof(linger));

lmtp = g_malloc (sizeof (struct rspamd_lmtp_proto));
new_task = g_malloc (sizeof (struct worker_task));
bzero (new_task, sizeof (struct worker_task));
new_task->worker = worker;
new_task->state = READ_COMMAND;
new_task->sock = nfd;
new_task->cfg = worker->srv->cfg;
TAILQ_INIT (&new_task->urls);
new_task->task_pool = memory_pool_new (memory_pool_get_size ());
/* Add destructor for recipients list (it would be better to use anonymous function here */
memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)rcpt_destruct, new_task);
new_task->results = g_hash_table_new (g_str_hash, g_str_equal);
memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)g_hash_table_destroy, new_task->results);
worker->srv->stat->connections_count ++;
lmtp->task = new_task;
lmtp->state = LMTP_READ_LHLO;

/* Set up dispatcher */
new_task->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, read_socket,
write_socket, err_socket, &io_tv,
(void *)lmtp);
rspamd_dispatcher_write (lmtp->task->dispatcher, greetingbuf, strlen (greetingbuf), FALSE);
}

/*
* Start lmtp worker process
*/
void
start_lmtp_worker (struct rspamd_worker *worker)
{
struct sigaction signals;
int listen_sock, i;
struct sockaddr_un *un_addr;
char *hostbuf;
long int hostmax;

worker->srv->pid = getpid ();
worker->srv->type = TYPE_LMTP;
event_init ();
g_mime_init (0);

init_signals (&signals, sig_handler);
sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);

/* SIGUSR2 handler */
signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *) worker);
signal_add (&worker->sig_ev, NULL);
/* Create listen socket */
if (worker->srv->cfg->lmtp_family == AF_INET) {
if ((listen_sock = make_socket (&worker->srv->cfg->lmtp_addr, worker->srv->cfg->lmtp_port)) == -1) {
msg_err ("start_lmtp: cannot create tcp listen socket. %m");
exit(-errno);
}
}
else {
un_addr = (struct sockaddr_un *) alloca (sizeof (struct sockaddr_un));
if (!un_addr || (listen_sock = make_unix_socket (worker->srv->cfg->lmtp_host, un_addr)) == -1) {
msg_err ("start_lmtp: cannot create unix listen socket. %m");
exit(-errno);
}
}
if (listen (listen_sock, -1) == -1) {
msg_err ("start_lmtp: cannot listen on socket. %m");
exit(-errno);
}
/* Accept event */
event_set(&worker->bind_ev, listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
event_add(&worker->bind_ev, NULL);

/* Perform modules configuring */
for (i = 0; i < MODULES_NUM; i ++) {
modules[i].module_config_func (worker->srv->cfg);
}

/* Fill hostname buf */
hostmax = sysconf (_SC_HOST_NAME_MAX) + 1;
hostbuf = alloca (hostmax);
gethostname (hostbuf, hostmax);
hostbuf[hostmax - 1] = '\0';
snprintf (greetingbuf, sizeof (greetingbuf), "%d rspamd version %s LMTP on %s Ready\r\n", LMTP_OK, RVERSION, hostbuf);

/* Send SIGUSR2 to parent */
kill (getppid (), SIGUSR2);

io_tv.tv_sec = WORKER_IO_TIMEOUT;
io_tv.tv_usec = 0;

event_loop (0);
}

/*
* vi:ts=4
*/

+ 20
- 0
src/lmtp.h Wyświetl plik

@@ -0,0 +1,20 @@
#ifndef RSPAMD_LMTP_H
#define RSPAMD_LMTP_H

#include "config.h"
#include "main.h"

#define LMTP_GREETING 220
#define LMTP_QUIT 221
#define LMTP_OK 250
#define LMTP_DATA 354
#define LMTP_ERROR_PROCESS 500
#define LMTP_FAILURE 530
#define LMTP_AUTH_ERROR 503
#define LMTP_BAD_CMD 503
#define LMTP_NO_RCPT 554
#define LMTP_TEMP_FAIL 421

void start_lmtp_worker (struct rspamd_worker *worker);

#endif

+ 380
- 0
src/lmtp_proto.c Wyświetl plik

@@ -0,0 +1,380 @@
/*
* Copyright (c) 2009, Rambler media
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY Rambler media ''AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL Rambler BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

#include "config.h"
#include "main.h"
#include "cfg_file.h"
#include "lmtp.h"
#include "lmtp_proto.h"

/* Max line size as it is defined in rfc2822 */
#define OUTBUFSIZ 1000

/* LMTP commands */
static f_str_t lhlo_command = {
.begin = "LHLO",
.len = sizeof ("LHLO") - 1
};
static f_str_t mail_command = {
.begin = "MAIL FROM:",
.len = sizeof ("MAIL FROM:") - 1
};
static f_str_t rcpt_command = {
.begin = "RCPT TO:",
.len = sizeof ("RCPT TO:") - 1
};
static f_str_t data_command = {
.begin = "DATA",
.len = sizeof ("DATA") - 1
};
static f_str_t data_dot = {
.begin = ".\r\n",
.len = sizeof (".\r\n") - 1
};

static void
out_lmtp_reply (struct rspamd_lmtp_proto *lmtp, int code, char *rcode, char *msg)
{
char outbuf[OUTBUFSIZ];
int r;
if (*rcode == '\0') {
r = snprintf (outbuf, OUTBUFSIZ, "%d %s\r\n", code, msg);
}
else {
r = snprintf (outbuf, OUTBUFSIZ, "%d %s %s\r\n", code, rcode, msg);
}
rspamd_dispatcher_write (lmtp->task->dispatcher, outbuf, r, FALSE);
}

int
read_lmtp_input_line (struct rspamd_lmtp_proto *lmtp, f_str_t *line)
{
char *c, *rcpt;
unsigned int i = 0, l = 0, size;

switch (lmtp->state) {
case LMTP_READ_LHLO:
/* Search LHLO line */
if ((i = fstrstri (line, &lhlo_command)) == -1) {
msg_info ("read_lmtp_input_line: LHLO expected but not found");
out_lmtp_reply (lmtp, LMTP_BAD_CMD, "5.0.0", "Need LHLO here");
return -1;
}
else {
i += lhlo_command.len;
c = line->begin + i;
/* Skip spaces */
while (isspace (*c) && i < line->len) {
i ++;
c ++;
}
lmtp->task->helo = memory_pool_alloc (lmtp->task->task_pool, line->len - i + 1);
/* Strlcpy makes string null terminated by design */
g_strlcpy (lmtp->task->helo, c, line->len - i + 1);
lmtp->state = LMTP_READ_FROM;
out_lmtp_reply (lmtp, LMTP_OK, "", "Ok");
return 0;
}
break;
case LMTP_READ_FROM:
/* Search MAIL FROM: line */
if ((i = fstrstri (line, &mail_command)) == -1) {
msg_info ("read_lmtp_input_line: MAIL expected but not found");
out_lmtp_reply (lmtp, LMTP_BAD_CMD, "5.0.0", "Need MAIL here");
return -1;
}
else {
i += mail_command.len;
c = line->begin + i;
/* Get data from brackets (<>)*/
while (*c++ != '<' && i < line->len) {
i ++;
}
while (*c != '>' && i < line->len) {
l ++;
c ++;
i ++;
}

lmtp->task->from = memory_pool_alloc (lmtp->task->task_pool, l + 1);
/* Strlcpy makes string null terminated by design */
g_strlcpy (lmtp->task->from, c - l, l + 1);
lmtp->state = LMTP_READ_RCPT;
out_lmtp_reply (lmtp, LMTP_OK, "2.1.0", "Sender ok");
return 0;
}
break;
case LMTP_READ_RCPT:
/* Search RCPT_TO: line */
if ((i = fstrstri (line, &rcpt_command)) == -1) {
msg_info ("read_lmtp_input_line: RCPT expected but not found");
out_lmtp_reply (lmtp, LMTP_BAD_CMD, "5.0.0", "Need RCPT here");
return -1;
}
else {
i += rcpt_command.len;
c = line->begin + i;
/* Get data from brackets (<>)*/
while (*c++ != '<' && i < line->len) {
i ++;
}
while (*c != '>' && i < line->len) {
l ++;
c ++;
i ++;
}
rcpt = memory_pool_alloc (lmtp->task->task_pool, l + 1);
/* Strlcpy makes string null terminated by design */
g_strlcpy (rcpt, c - l, l + 1);
lmtp->task->rcpt = g_list_prepend (lmtp->task->rcpt, rcpt);
lmtp->state = LMTP_READ_DATA;
out_lmtp_reply (lmtp, LMTP_OK, "2.1.0", "Recipient ok");
return 0;
}
break;
case LMTP_READ_DATA:
/* Search DATA line */
if ((i = fstrstri (line, &data_command)) == -1) {
msg_info ("read_lmtp_input_line: DATA expected but not found");
out_lmtp_reply (lmtp, LMTP_BAD_CMD, "5.0.0", "Need DATA here");
return -1;
}
else {
i += rcpt_command.len;
c = line->begin + i;
/* Skip spaces */
while (isspace (*c++)) {
i ++;
}
rcpt = memory_pool_alloc (lmtp->task->task_pool, line->len - i + 1);
/* Strlcpy makes string null terminated by design */
g_strlcpy (rcpt, c, line->len - i + 1);
lmtp->task->rcpt = g_list_prepend (lmtp->task->rcpt, rcpt);
lmtp->state = LMTP_READ_MESSAGE;
out_lmtp_reply (lmtp, LMTP_DATA, "", "Enter message, ending with \".\" on a line by itself");
lmtp->task->msg = fstralloc (lmtp->task->task_pool, BUFSIZ);
return 0;
}
break;
case LMTP_READ_MESSAGE:
if (strncmp (line->begin, data_dot.begin, line->len) == 0) {
lmtp->state = LMTP_READ_DOT;
lmtp->task->state = READ_MESSAGE;
return 0;
}
else {
l = lmtp->task->msg->len;
size = lmtp->task->msg->size;
if (l + line->len > size) {
/* Grow buffer */
if (line->len > size) {
size += line->len << 1;
}
else {
/* size *= 2 */
size <<= 1;
}
lmtp->task->msg = fstrgrow (lmtp->task->task_pool, lmtp->task->msg, size);
}
fstrcat (lmtp->task->msg, line);
return 0;
}
break;
case LMTP_READ_DOT:
/* We have some input after reading dot, close connection as we have no currently support of multiply
* messages per session
*/
out_lmtp_reply (lmtp, LMTP_QUIT, "", "Bye");
return 0;
break;
}
}

static char*
format_lda_args (struct worker_task *task)
{
char *arg, *res, *c, *r;
size_t len;
GList *rcpt;
gboolean got_args = FALSE;

c = task->cfg->deliver_agent_path;
/* Find first arg */
if ((c = strchr (c, ' ')) == NULL) {
return task->cfg->deliver_agent_path;
}
/* Calculate length of result string */
len = strlen (task->cfg->deliver_agent_path);
while (*c) {
if (*c == '%') {
c++;
switch (*c) {
case 'f':
/* Insert from */
len += strlen (task->from) - 2;
break;
case 'r':
/* Insert list of recipients */
rcpt = g_list_first (task->rcpt);
len -= 2;
while (rcpt) {
len += strlen ((char *)rcpt->data) + 1;
rcpt = g_list_next (rcpt);
}
break;
}
}
c ++;
len ++;
}
res = memory_pool_alloc (task->task_pool, len + 1);
r = res;
c = task->cfg->deliver_agent_path;
while (*c) {
if (*c == ' ') {
got_args = TRUE;
}
if (got_args && *c == '%') {
switch (*(c + 1)) {
case 'f':
/* Insert from */
c += 2;
len = strlen (task->from);
memcpy (r, task->from, len);
r += len;
break;
case 'r':
/* Insert list of recipients */
c += 2;
rcpt = g_list_first (task->rcpt);
while (rcpt) {
len = strlen ((char *)rcpt->data) + 1;
memcpy (r, rcpt->data, len);
r += len;
*r++ = ' ';
rcpt = g_list_next (rcpt);
}
break;
default:
*r = *c;
r ++;
c ++;
break;
}
}
else {
*r = *c;
r ++;
c ++;
}
}

return res;
}

static int
lmtp_deliver_lda (struct worker_task *task)
{
char *args;
FILE *lda;
GMimeStream *stream;
int rc, ecode;

if ((args = format_lda_args (task)) == NULL) {
return -1;
}

lda = popen (args, "w");
if (lda == NULL) {
msg_info ("lmtp_deliver_lda: cannot deliver to lda, %m");
return -1;
}

stream = g_mime_stream_file_new (lda);

if (g_mime_object_write_to_stream ((GMimeObject *)task->message, stream) == -1) {
msg_info ("lmtp_deliver_lda: cannot write stream to lda");
return -1;
}

rc = pclose (lda);
if (rc == -1) {
msg_info ("lmtp_deliver_lda: lda returned error code");
return -1;
}
else if (WIFEXITED (rc)) {
ecode = WEXITSTATUS (rc);
if (ecode == 0) {
return 0;
}
else {
msg_info ("lmtp_deliver_lda: lda returned error code %d", ecode);
return -1;
}
}
}

int
lmtp_deliver_message (struct worker_task *task)
{
if (task->cfg->deliver_agent_path != NULL) {
/* Do deliver to LDA */
return lmtp_deliver_lda (task);
}
else {
/* XXX: do lmtp/smtp client */
return -1;
}
}

int
write_lmtp_reply (struct rspamd_lmtp_proto *lmtp)
{
int r;
char outbuf[OUTBUFSIZ];

msg_debug ("write_lmtp_reply: writing reply to client");
if (lmtp->task->error_code != 0) {
out_lmtp_reply (lmtp, lmtp->task->error_code, "", lmtp->task->last_error);
}
else {
/* Do delivery */
if (lmtp_deliver_message (lmtp->task) == -1) {
out_lmtp_reply (lmtp, LMTP_FAILURE, "", "Delivery failure");
return -1;
}
else {
out_lmtp_reply (lmtp, LMTP_OK, "", "Delivery completed");
}
}

return 0;
}

/*
* vi:ts=4
*/

+ 44
- 0
src/lmtp_proto.h Wyświetl plik

@@ -0,0 +1,44 @@
#ifndef RSPAMD_LMTP_PROTO_H
#define RSPAMD_LMTP_PROTO_H

#include "config.h"

struct worker_task;

enum lmtp_state {
LMTP_READ_LHLO,
LMTP_READ_FROM,
LMTP_READ_RCPT,
LMTP_READ_DATA,
LMTP_READ_MESSAGE,
LMTP_READ_DOT,
};

struct rspamd_lmtp_proto {
struct worker_task *task;
enum lmtp_state state;
};

/**
* Read one line of user's input for specified task
* @param lmtp lmtp object
* @param line line of user's input
* @return 0 if line was successfully parsed and -1 if we have protocol error
*/
int read_lmtp_input_line (struct rspamd_lmtp_proto *lmtp, f_str_t *line);

/**
* Deliver message via lmtp/smtp or pipe to LDA
* @param task task object
* @return 0 if we wrote message and -1 if there was some error
*/
int lmtp_deliver_message (struct worker_task *task);

/**
* Write reply for specified lmtp object
* @param lmtp lmtp object
* @return 0 if we wrote reply and -1 if there was some error
*/
int write_lmtp_reply (struct rspamd_lmtp_proto *lmtp);

#endif

+ 14
- 3
src/main.c Wyświetl plik

@@ -27,6 +27,7 @@
#include "cfg_file.h"
#include "util.h"
#include "perl.h"
#include "lmtp.h"

/* 2 seconds to fork new process in place of dead one */
#define SOFT_FORK_TIME 2
@@ -178,6 +179,11 @@ fork_worker (struct rspamd_main *rspamd, int listen_sock, int reconfig, enum pro
msg_info ("fork_worker: starting controller process %d", getpid ());
start_controller (cur);
break;
case TYPE_LMTP:
setproctitle ("lmtp process");
pidfile_close (rspamd->pfh);
msg_info ("fork_worker: starting lmtp process %d", getpid ());
start_lmtp_worker (cur);
case TYPE_WORKER:
default:
setproctitle ("worker process");
@@ -368,6 +374,11 @@ main (int argc, char **argv, char **env)
if (cfg->controller_enabled) {
fork_worker (rspamd, listen_sock, 0, TYPE_CONTROLLER);
}
/* Start lmtp if enabled */
if (cfg->lmtp_enable) {
fork_worker (rspamd, listen_sock, 0, TYPE_LMTP);
}

/* Signal processing cycle */
for (;;) {
@@ -394,17 +405,17 @@ main (int argc, char **argv, char **env)
if (WIFEXITED (res) && WEXITSTATUS (res) == 0) {
/* Normal worker termination, do not fork one more */
msg_info ("main: %s process %d terminated normally",
(cur->type == TYPE_WORKER) ? "worker" : "controller", cur->pid);
(cur->type != TYPE_WORKER) ? "controller" : "worker", cur->pid);
}
else {
if (WIFSIGNALED (res)) {
msg_warn ("main: %s process %d terminated abnormally by signal: %d",
(cur->type == TYPE_WORKER) ? "worker" : "controller",
(cur->type != TYPE_WORKER) ? "controller" : "worker",
cur->pid, WTERMSIG(res));
}
else {
msg_warn ("main: %s process %d terminated abnormally",
(cur->type == TYPE_WORKER) ? "worker" : "controller", cur->pid);
(cur->type != TYPE_WORKER) ? "controller" : "worker", cur->pid);
}
/* Fork another worker in replace of dead one */
delay_fork (cur->type);

+ 3
- 0
src/main.h Wyświetl plik

@@ -22,6 +22,8 @@
#define SOFT_SHUTDOWN_TIME 60
/* Default metric name */
#define DEFAULT_METRIC "default"
/* 60 seconds for worker's IO */
#define WORKER_IO_TIMEOUT 60

/* Logging in postfix style */
#define msg_err g_critical
@@ -36,6 +38,7 @@ enum process_type {
TYPE_MAIN,
TYPE_WORKER,
TYPE_CONTROLLER,
TYPE_LMTP,
};

/**

+ 0
- 2
src/worker.c Wyświetl plik

@@ -40,8 +40,6 @@
#include <perl.h> /* from the Perl distribution */

#define TASK_POOL_SIZE 4095
/* 60 seconds for worker's IO */
#define WORKER_IO_TIMEOUT 60

const f_str_t CRLF = {
/* begin */"\r\n",

Ładowanie…
Anuluj
Zapisz