diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2010-05-26 18:54:44 +0400 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2010-05-26 18:54:44 +0400 |
commit | 2a5690e7c12ac78c8ba8bd9f6e7d0a60c577201b (patch) | |
tree | afaef5e7bd71e05102901dd5e6ef267a4bdead29 /src | |
parent | d608ddf90823ba1fb60bed510751fb219f440f0e (diff) | |
download | rspamd-2a5690e7c12ac78c8ba8bd9f6e7d0a60c577201b.tar.gz rspamd-2a5690e7c12ac78c8ba8bd9f6e7d0a60c577201b.zip |
* Continue implementing smtp proxy
Diffstat (limited to 'src')
-rw-r--r-- | src/buffer.c | 5 | ||||
-rw-r--r-- | src/cfg_xml.c | 7 | ||||
-rw-r--r-- | src/logger.c | 3 | ||||
-rw-r--r-- | src/main.c | 9 | ||||
-rw-r--r-- | src/main.h | 1 | ||||
-rw-r--r-- | src/smtp.c | 194 | ||||
-rw-r--r-- | src/smtp.h | 13 |
7 files changed, 222 insertions, 10 deletions
diff --git a/src/buffer.c b/src/buffer.c index 1749cb624..ec435bc83 100644 --- a/src/buffer.c +++ b/src/buffer.c @@ -384,6 +384,11 @@ rspamd_dispatcher_write (rspamd_io_dispatcher_t * d, void *data, size_t len, gbo rspamd_buffer_t *newbuf; newbuf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t)); + if (len == 0) { + /* Assume NULL terminated */ + len = strlen ((char *)data); + } + if (!allocated) { newbuf->data = fstralloc (d->pool, len); diff --git a/src/cfg_xml.c b/src/cfg_xml.c index 02f2519f0..18314106b 100644 --- a/src/cfg_xml.c +++ b/src/cfg_xml.c @@ -633,6 +633,10 @@ worker_handle_type (struct config_file *cfg, struct rspamd_xml_userdata *ctx, GH wrk->type = TYPE_LMTP; wrk->has_socket = TRUE; } + else if (g_ascii_strcasecmp (data, "smtp") == 0) { + wrk->type = TYPE_SMTP; + wrk->has_socket = TRUE; + } else if (g_ascii_strcasecmp (data, "fuzzy") == 0) { wrk->type = TYPE_FUZZY; wrk->has_socket = FALSE; @@ -1713,6 +1717,9 @@ xml_dump_workers (struct config_file *cfg, FILE *f) case TYPE_LMTP: fprintf (f, " <type>lmtp</type>" EOL); break; + case TYPE_SMTP: + fprintf (f, " <type>smtp</type>" EOL); + break; } escaped_str = g_markup_escape_text (wrk->bind_host, -1); fprintf (f, " <bind_socket>%s</bind_socket>" EOL, escaped_str); diff --git a/src/logger.c b/src/logger.c index 72bb9da73..a27cc27fc 100644 --- a/src/logger.c +++ b/src/logger.c @@ -496,6 +496,9 @@ file_log_function (const gchar * log_domain, const gchar *function, GLogLevelFla case TYPE_LMTP: cptype = "lmtp"; break; + case TYPE_SMTP: + cptype = "smtp"; + break; case TYPE_FUZZY: cptype = "fuzzy"; break; diff --git a/src/main.c b/src/main.c index 33cc130d3..844a76925 100644 --- a/src/main.c +++ b/src/main.c @@ -27,6 +27,7 @@ #include "cfg_file.h" #include "util.h" #include "lmtp.h" +#include "smtp.h" #include "fuzzy_storage.h" #include "cfg_xml.h" @@ -355,6 +356,12 @@ fork_worker (struct rspamd_main *rspamd, struct worker_conf *cf) msg_info ("starting lmtp process %P", getpid ()); start_lmtp_worker (cur); break; + case TYPE_SMTP: + setproctitle ("smtp process"); + pidfile_close (rspamd->pfh); + msg_info ("starting smtp process %P", getpid ()); + start_smtp_worker (cur); + break; case TYPE_FUZZY: setproctitle ("fuzzy storage"); pidfile_close (rspamd->pfh); @@ -559,6 +566,8 @@ get_process_type (enum process_type type) return "controller"; case TYPE_LMTP: return "lmtp"; + case TYPE_SMTP: + return "smtp"; } return NULL; diff --git a/src/main.h b/src/main.h index 5a30273d7..49d18aad2 100644 --- a/src/main.h +++ b/src/main.h @@ -47,6 +47,7 @@ enum process_type { TYPE_WORKER, TYPE_CONTROLLER, TYPE_LMTP, + TYPE_SMTP, TYPE_FUZZY }; diff --git a/src/smtp.c b/src/smtp.c index de50f21e4..116089ee1 100644 --- a/src/smtp.c +++ b/src/smtp.c @@ -33,6 +33,9 @@ /* Max line size as it is defined in rfc2822 */ #define OUTBUFSIZ 1000 +/* SMTP error messages */ + + static sig_atomic_t wanna_die = 0; @@ -63,6 +66,41 @@ sig_handler (int signo, siginfo_t *info, void *unused) } } +char * +make_smtp_error (struct smtp_session *session, int error_code, const char *format, ...) +{ + va_list vp; + char *result = NULL, *p; + size_t len; + + va_start (vp, format); + len = g_printf_string_upper_bound (format, vp); + result = memory_pool_alloc (session->pool, len + sizeof ("65535 ")); + p = result + snprintf (result, len, "%d ", error_code); + vsnprintf (p, len - (p - result), format, vp); + va_end (vp); + + return result; +} + +static void +free_smtp_session (gpointer arg) +{ + struct smtp_session *session = arg; + + if (session) { + if (session->task) { + free_task (session->task, FALSE); + } + if (session->dispatcher) { + rspamd_remove_dispatcher (session->dispatcher); + } + memory_pool_delete (session->pool); + close (session->sock); + g_free (session); + } +} + /* * Config reload is designed by sending sigusr to active workers and pending shutdown of them */ @@ -84,6 +122,128 @@ sigusr_handler (int fd, short what, void *arg) return; } +static gboolean +read_smtp_command (struct smtp_session *session, f_str_t *line) +{ + /* XXX: write dialog implementation */ + + return FALSE; +} + +/* + * Callback that is called when there is data to read in buffer + */ +static gboolean +smtp_read_socket (f_str_t * in, void *arg) +{ + struct smtp_session *session = arg; + + switch (session->state) { + case SMTP_STATE_RESOLVE_REVERSE: + case SMTP_STATE_RESOLVE_NORMAL: + case SMTP_STATE_DELAY: + session->error = make_smtp_error (session, 550, "%s Improper use of SMTP command pipelining"); + session->state = SMTP_STATE_ERROR; + break; + case SMTP_STATE_GREETING: + case SMTP_STATE_HELO: + case SMTP_STATE_FROM: + case SMTP_STATE_RCPT: + case SMTP_STATE_DATA: + return read_smtp_command (session, in); + break; + default: + session->error = make_smtp_error (session, 550, "%s Internal error"); + session->state = SMTP_STATE_ERROR; + break; + } + + return TRUE; +} + +/* + * Callback for socket writing + */ +static gboolean +smtp_write_socket (void *arg) +{ + struct smtp_session *session = arg; + + if (session->state == SMTP_STATE_WRITE_ERROR) { + rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE); + destroy_session (session->s); + return FALSE; + } + + return TRUE; +} + +/* + * Called if something goes wrong + */ +static void +smtp_err_socket (GError * err, void *arg) +{ + struct smtp_session *session = arg; + + msg_info ("abnormally closing connection, error: %s", err->message); + /* Free buffers */ + destroy_session (session->s); +} + +/* + * Write greeting to client + */ +static void +write_smtp_greeting (struct smtp_session *session) +{ + if (session->ctx->smtp_banner) { + rspamd_dispatcher_write (session->dispatcher, session->ctx->smtp_banner, 0, FALSE, TRUE); + } +} + +/* + * Return from a delay + */ +static void +smtp_delay_handler (int fd, short what, void *arg) +{ + struct smtp_session *session = arg; + + if (session->state == SMTP_STATE_DELAY) { + session->state = SMTP_STATE_GREETING; + write_smtp_greeting (session); + } + else { + session->state = SMTP_STATE_WRITE_ERROR; + smtp_write_socket (session); + } +} + +/* + * Make delay for a client + */ +static void +smtp_make_delay (struct smtp_session *session) +{ + struct event *tev; + struct timeval *tv; + + if (session->ctx->smtp_delay != 0 && session->state == SMTP_STATE_DELAY) { + tev = memory_pool_alloc (session->pool, sizeof (struct event)); + tv = memory_pool_alloc (session->pool, sizeof (struct timeval)); + tv->tv_sec = session->ctx->smtp_delay / 1000; + tv->tv_usec = session->ctx->smtp_delay - tv->tv_sec * 1000; + + evtimer_set (tev, smtp_delay_handler, session); + evtimer_add (tev, tv); + } + else if (session->state == SMTP_STATE_DELAY) { + session->state = SMTP_STATE_GREETING; + write_smtp_greeting (session); + } +} + /* * Handle DNS replies */ @@ -106,7 +266,7 @@ smtp_dns_cb (int result, char type, int count, int ttl, void *addresses, void *a session->hostname = memory_pool_strdup (session->pool, "tempfail"); } session->state = SMTP_STATE_DELAY; - /* XXX: make_delay (session); */ + smtp_make_delay (session); } else { if (addresses) { @@ -126,7 +286,7 @@ smtp_dns_cb (int result, char type, int count, int ttl, void *addresses, void *a session->hostname = memory_pool_strdup (session->pool, "tempfail"); } session->state = SMTP_STATE_DELAY; - /* XXX: make_delay (session); */ + smtp_make_delay (session); } else { res = 0; @@ -143,13 +303,18 @@ smtp_dns_cb (int result, char type, int count, int ttl, void *addresses, void *a session->hostname = memory_pool_strdup (session->pool, "unknown"); } session->state = SMTP_STATE_DELAY; - /* XXX: make_delay (session); */ + smtp_make_delay (session); } break; + case SMTP_STATE_ERROR: + session->state = SMTP_STATE_WRITE_ERROR; + smtp_write_socket (session); + break; default: - msg_info ("this callback is called on unknown state: %d", session->state); - session->state = SMTP_STATE_DELAY; - /* XXX: make_delay (session); */ + /* + * This callback is called on unknown state, usually this indicates + * an error (invalid pipelining) + */ break; } } @@ -201,6 +366,13 @@ accept_socket (int fd, short what, void *arg) g_free (session); close (nfd); } + + /* Set up dispatcher */ + session->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, + smtp_read_socket, smtp_write_socket, smtp_err_socket, &session->ctx->smtp_timeout, session); + session->dispatcher->peer_addr = session->client_addr.s_addr; + /* Set up async session */ + session->s = new_async_session (session->pool, free_smtp_session, session); } @@ -343,12 +515,14 @@ config_smtp_worker (struct rspamd_worker *worker) { struct smtp_worker_ctx *ctx; char *value, *err_str; + uint32_t timeout; ctx = g_malloc0 (sizeof (struct smtp_worker_ctx)); ctx->pool = memory_pool_new (memory_pool_get_size ()); /* Set default values */ - ctx->smtp_timeout = 300 * 1000; + ctx->smtp_timeout.tv_sec = 300; + ctx->smtp_timeout.tv_usec = 0; ctx->smtp_delay = 0; ctx->smtp_banner = "220 ESMTP Ready." CRLF; @@ -365,10 +539,14 @@ config_smtp_worker (struct rspamd_worker *worker) } if ((value = g_hash_table_lookup (worker->cf->params, "smtp_timeout")) != NULL) { errno = 0; - ctx->smtp_timeout = strtoul (value, &err_str, 10); + timeout = strtoul (value, &err_str, 10); if (errno != 0 || (err_str && *err_str != '\0')) { msg_warn ("cannot parse timeout, invalid number: %s: %s", value, strerror (errno)); } + else { + ctx->smtp_timeout.tv_sec = timeout / 1000; + ctx->smtp_timeout.tv_usec = timeout - ctx->smtp_timeout.tv_sec * 1000; + } } if ((value = g_hash_table_lookup (worker->cf->params, "smtp_delay")) != NULL) { errno = 0; diff --git a/src/smtp.h b/src/smtp.h index d03bb1631..6d9d7555f 100644 --- a/src/smtp.h +++ b/src/smtp.h @@ -23,7 +23,7 @@ struct smtp_worker_ctx { memory_pool_t *pool; char *smtp_banner; uint32_t smtp_delay; - uint32_t smtp_timeout; + struct timeval smtp_timeout; gboolean use_xclient; gboolean helo_required; @@ -40,7 +40,9 @@ enum rspamd_smtp_state { SMTP_STATE_RCPT, SMTP_STATE_DATA, SMTP_STATE_EOD, - SMTP_STATE_END + SMTP_STATE_END, + SMTP_STATE_ERROR, + SMTP_STATE_WRITE_ERROR }; struct smtp_session { @@ -51,10 +53,17 @@ struct smtp_session { struct worker_task *task; struct in_addr client_addr; char *hostname; + char *error; int sock; + + struct rspamd_async_session *s; + rspamd_io_dispatcher_t *dispatcher; + struct smtp_upstream *upstream; int upstream_sock; gboolean resolved; }; +void start_smtp_worker (struct rspamd_worker *worker); + #endif |