]> source.dussan.org Git - rspamd.git/commitdiff
* Continue implementing smtp proxy
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Wed, 26 May 2010 14:54:44 +0000 (18:54 +0400)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Wed, 26 May 2010 14:54:44 +0000 (18:54 +0400)
src/buffer.c
src/cfg_xml.c
src/logger.c
src/main.c
src/main.h
src/smtp.c
src/smtp.h

index 1749cb624a8d3d4233cc64268f4da0c068982467..ec435bc836d248f7a92d7c7b1415f0b8bb978b88 100644 (file)
@@ -384,6 +384,11 @@ rspamd_dispatcher_write (rspamd_io_dispatcher_t * d, void *data, size_t len, gbo
        rspamd_buffer_t                *newbuf;
 
        newbuf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t));
+       if (len == 0) {
+               /* Assume NULL terminated */
+               len = strlen ((char *)data);
+       }
+
        if (!allocated) {
                newbuf->data = fstralloc (d->pool, len);
 
index 02f2519f09a4463a861d84049a40e77462a0ef81..18314106b67c31f7314388f97996107543b9d911 100644 (file)
@@ -633,6 +633,10 @@ worker_handle_type (struct config_file *cfg, struct rspamd_xml_userdata *ctx, GH
                wrk->type = TYPE_LMTP;
                wrk->has_socket = TRUE;
        }
+       else if (g_ascii_strcasecmp (data, "smtp") == 0) {
+               wrk->type = TYPE_SMTP;
+               wrk->has_socket = TRUE;
+       }
        else if (g_ascii_strcasecmp (data, "fuzzy") == 0) {
                wrk->type = TYPE_FUZZY;
                wrk->has_socket = FALSE;
@@ -1713,6 +1717,9 @@ xml_dump_workers (struct config_file *cfg, FILE *f)
                        case TYPE_LMTP:
                                fprintf (f, "  <type>lmtp</type>" EOL);
                                break;
+                       case TYPE_SMTP:
+                               fprintf (f, "  <type>smtp</type>" EOL);
+                               break;
                }
                escaped_str = g_markup_escape_text (wrk->bind_host, -1); 
                fprintf (f, "  <bind_socket>%s</bind_socket>" EOL, escaped_str);
index 72bb9da736d4219ed716d29febbe3e126de9232a..a27cc27fc0fc929ca386560329ad2546e90daa03 100644 (file)
@@ -496,6 +496,9 @@ file_log_function (const gchar * log_domain, const gchar *function, GLogLevelFla
                        case TYPE_LMTP:
                                cptype = "lmtp";
                                break;
+                       case TYPE_SMTP:
+                               cptype = "smtp";
+                               break;
                        case TYPE_FUZZY:
                                cptype = "fuzzy";
                                break;
index 33cc130d33253c30aa0c98bf6969476c96be1617..844a7692527ae336599c04e835eef903b722dd2a 100644 (file)
@@ -27,6 +27,7 @@
 #include "cfg_file.h"
 #include "util.h"
 #include "lmtp.h"
+#include "smtp.h"
 #include "fuzzy_storage.h"
 #include "cfg_xml.h"
 
@@ -355,6 +356,12 @@ fork_worker (struct rspamd_main *rspamd, struct worker_conf *cf)
                                msg_info ("starting lmtp process %P", getpid ());
                                start_lmtp_worker (cur);
                                break;
+                       case TYPE_SMTP:
+                               setproctitle ("smtp process");
+                               pidfile_close (rspamd->pfh);
+                               msg_info ("starting smtp process %P", getpid ());
+                               start_smtp_worker (cur);
+                               break;
                        case TYPE_FUZZY:
                                setproctitle ("fuzzy storage");
                                pidfile_close (rspamd->pfh);
@@ -559,6 +566,8 @@ get_process_type (enum process_type type)
                return "controller";
        case TYPE_LMTP:
                return "lmtp";
+       case TYPE_SMTP:
+               return "smtp";
        }
 
        return NULL;
index 5a30273d75312af8b3e5775d26819a664d85dcb3..49d18aad260500d6443bcdb30fa5af6d7d26fc41 100644 (file)
@@ -47,6 +47,7 @@ enum process_type {
        TYPE_WORKER,
        TYPE_CONTROLLER,
        TYPE_LMTP,
+       TYPE_SMTP,
        TYPE_FUZZY
 };
 
index de50f21e4d898b12157fc47573e2814cf7b3ddab..116089ee1ac7dcf12fc84691b2354ec05dcefb43 100644 (file)
@@ -33,6 +33,9 @@
 /* Max line size as it is defined in rfc2822 */
 #define OUTBUFSIZ 1000
 
+/* SMTP error messages */
+
+
 static sig_atomic_t                    wanna_die = 0;
 
 
@@ -63,6 +66,41 @@ sig_handler (int signo, siginfo_t *info, void *unused)
        }
 }
 
+char *
+make_smtp_error (struct smtp_session *session, int error_code, const char *format, ...)
+{
+       va_list                         vp;
+       char                           *result = NULL, *p;
+       size_t                          len;
+       
+       va_start (vp, format);
+       len = g_printf_string_upper_bound (format, vp);
+       result = memory_pool_alloc (session->pool, len + sizeof ("65535 "));
+       p = result + snprintf (result, len, "%d ", error_code);
+       vsnprintf (p, len - (p - result), format, vp);
+       va_end (vp);
+
+       return result;
+}
+
+static void
+free_smtp_session (gpointer arg)
+{
+       struct smtp_session            *session = arg;
+       
+       if (session) {
+               if (session->task) {
+                       free_task (session->task, FALSE);
+               }
+               if (session->dispatcher) {
+                       rspamd_remove_dispatcher (session->dispatcher);
+               }
+               memory_pool_delete (session->pool);
+               close (session->sock);
+               g_free (session);
+       }
+}
+
 /*
  * Config reload is designed by sending sigusr to active workers and pending shutdown of them
  */
@@ -84,6 +122,128 @@ sigusr_handler (int fd, short what, void *arg)
        return;
 }
 
+static gboolean
+read_smtp_command (struct smtp_session *session, f_str_t *line)
+{
+       /* XXX: write dialog implementation */
+
+       return FALSE;
+}
+
+/*
+ * Callback that is called when there is data to read in buffer
+ */
+static                          gboolean
+smtp_read_socket (f_str_t * in, void *arg)
+{
+       struct smtp_session            *session = arg;
+
+       switch (session->state) {
+               case SMTP_STATE_RESOLVE_REVERSE:
+               case SMTP_STATE_RESOLVE_NORMAL:
+               case SMTP_STATE_DELAY:
+                       session->error = make_smtp_error (session, 550, "%s Improper use of SMTP command pipelining");
+                       session->state = SMTP_STATE_ERROR;
+                       break;
+               case SMTP_STATE_GREETING:
+               case SMTP_STATE_HELO:
+               case SMTP_STATE_FROM:
+               case SMTP_STATE_RCPT:
+               case SMTP_STATE_DATA:
+                       return read_smtp_command (session, in);
+                       break;
+               default:
+                       session->error = make_smtp_error (session, 550, "%s Internal error");
+                       session->state = SMTP_STATE_ERROR;
+                       break;
+       }
+
+       return TRUE;
+}
+
+/*
+ * Callback for socket writing
+ */
+static                          gboolean
+smtp_write_socket (void *arg)
+{
+       struct smtp_session            *session = arg;
+
+       if (session->state == SMTP_STATE_WRITE_ERROR) {
+               rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
+               destroy_session (session->s);
+               return FALSE;
+       }
+       
+       return TRUE;
+}
+
+/*
+ * Called if something goes wrong
+ */
+static void
+smtp_err_socket (GError * err, void *arg)
+{
+       struct smtp_session            *session = arg;
+
+       msg_info ("abnormally closing connection, error: %s", err->message);
+       /* Free buffers */
+       destroy_session (session->s);
+}
+
+/*
+ * Write greeting to client
+ */
+static void
+write_smtp_greeting (struct smtp_session *session)
+{
+       if (session->ctx->smtp_banner) {
+               rspamd_dispatcher_write (session->dispatcher, session->ctx->smtp_banner, 0, FALSE, TRUE);
+       }
+}
+
+/*
+ * Return from a delay
+ */
+static void
+smtp_delay_handler (int fd, short what, void *arg)
+{
+       struct smtp_session            *session = arg;
+
+       if (session->state == SMTP_STATE_DELAY) {
+               session->state = SMTP_STATE_GREETING;
+               write_smtp_greeting (session);
+       }
+       else {
+               session->state = SMTP_STATE_WRITE_ERROR;
+               smtp_write_socket (session);
+       }
+}
+
+/*
+ * Make delay for a client
+ */
+static void
+smtp_make_delay (struct smtp_session *session)
+{
+       struct event                  *tev;
+       struct timeval                *tv;
+
+       if (session->ctx->smtp_delay != 0 && session->state == SMTP_STATE_DELAY) {
+               tev = memory_pool_alloc (session->pool, sizeof (struct event));
+               tv = memory_pool_alloc (session->pool, sizeof (struct timeval));
+               tv->tv_sec = session->ctx->smtp_delay / 1000;
+               tv->tv_usec = session->ctx->smtp_delay - tv->tv_sec * 1000;
+
+               evtimer_set (tev, smtp_delay_handler, session);
+               evtimer_add (tev, tv);
+       }
+       else if (session->state == SMTP_STATE_DELAY) {
+               session->state = SMTP_STATE_GREETING;
+               write_smtp_greeting (session);
+       }
+}
+
 /*
  * Handle DNS replies
  */
@@ -106,7 +266,7 @@ smtp_dns_cb (int result, char type, int count, int ttl, void *addresses, void *a
                                        session->hostname = memory_pool_strdup (session->pool, "tempfail");
                                }
                                session->state = SMTP_STATE_DELAY;
-                               /* XXX: make_delay (session); */
+                               smtp_make_delay (session);
                        }
                        else {
                                if (addresses) {
@@ -126,7 +286,7 @@ smtp_dns_cb (int result, char type, int count, int ttl, void *addresses, void *a
                                        session->hostname = memory_pool_strdup (session->pool, "tempfail");
                                }
                                session->state = SMTP_STATE_DELAY;
-                               /* XXX: make_delay (session); */
+                               smtp_make_delay (session);
                        }
                        else {
                                res = 0;
@@ -143,13 +303,18 @@ smtp_dns_cb (int result, char type, int count, int ttl, void *addresses, void *a
                                        session->hostname = memory_pool_strdup (session->pool, "unknown");
                                }
                                session->state = SMTP_STATE_DELAY;
-                               /* XXX: make_delay (session); */
+                               smtp_make_delay (session);
                        }
                        break;
+               case SMTP_STATE_ERROR:
+                       session->state = SMTP_STATE_WRITE_ERROR;
+                       smtp_write_socket (session);
+                       break;
                default:
-                       msg_info ("this callback is called on unknown state: %d", session->state);
-                       session->state = SMTP_STATE_DELAY;
-                       /* XXX: make_delay (session); */
+                       /* 
+                        * This callback is called on unknown state, usually this indicates
+                        * an error (invalid pipelining)
+                        */
                        break;
        }
 }
@@ -201,6 +366,13 @@ accept_socket (int fd, short what, void *arg)
                g_free (session);
                close (nfd);
        }
+       
+       /* Set up dispatcher */
+       session->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, 
+                                                       smtp_read_socket, smtp_write_socket, smtp_err_socket, &session->ctx->smtp_timeout, session);
+       session->dispatcher->peer_addr = session->client_addr.s_addr;
+       /* Set up async session */
+       session->s = new_async_session (session->pool, free_smtp_session, session);
 
 }
 
@@ -343,12 +515,14 @@ config_smtp_worker (struct rspamd_worker *worker)
 {
        struct smtp_worker_ctx         *ctx;
        char                           *value, *err_str;
+       uint32_t                        timeout;
 
        ctx = g_malloc0 (sizeof (struct smtp_worker_ctx));
        ctx->pool = memory_pool_new (memory_pool_get_size ());
        
        /* Set default values */
-       ctx->smtp_timeout = 300 * 1000;
+       ctx->smtp_timeout.tv_sec = 300;
+       ctx->smtp_timeout.tv_usec = 0;
        ctx->smtp_delay = 0;
        ctx->smtp_banner = "220 ESMTP Ready." CRLF;
 
@@ -365,10 +539,14 @@ config_smtp_worker (struct rspamd_worker *worker)
        }
        if ((value = g_hash_table_lookup (worker->cf->params, "smtp_timeout")) != NULL) {
                errno = 0;
-               ctx->smtp_timeout = strtoul (value, &err_str, 10);
+               timeout = strtoul (value, &err_str, 10);
                if (errno != 0 || (err_str && *err_str != '\0')) {
                        msg_warn ("cannot parse timeout, invalid number: %s: %s", value, strerror (errno));
                }
+               else {
+                       ctx->smtp_timeout.tv_sec = timeout / 1000;
+                       ctx->smtp_timeout.tv_usec = timeout - ctx->smtp_timeout.tv_sec * 1000;
+               }
        }
        if ((value = g_hash_table_lookup (worker->cf->params, "smtp_delay")) != NULL) {
                errno = 0;
index d03bb1631fc59244be5a9449107be4ede1e7bc83..6d9d7555f0bfec78e46fcd596e8ea3f3e8888355 100644 (file)
@@ -23,7 +23,7 @@ struct smtp_worker_ctx {
        memory_pool_t *pool;
        char *smtp_banner;
        uint32_t smtp_delay;
-       uint32_t smtp_timeout;
+       struct timeval smtp_timeout;
 
        gboolean use_xclient;
        gboolean helo_required;
@@ -40,7 +40,9 @@ enum rspamd_smtp_state {
        SMTP_STATE_RCPT,
        SMTP_STATE_DATA,
        SMTP_STATE_EOD,
-       SMTP_STATE_END
+       SMTP_STATE_END,
+       SMTP_STATE_ERROR,
+       SMTP_STATE_WRITE_ERROR
 };
 
 struct smtp_session {
@@ -51,10 +53,17 @@ struct smtp_session {
        struct worker_task *task;
        struct in_addr client_addr;
        char *hostname;
+       char *error;
        int sock;
+       
+       struct rspamd_async_session *s;
+       rspamd_io_dispatcher_t *dispatcher;
+
        struct smtp_upstream *upstream;
        int upstream_sock;
        gboolean resolved;
 };
 
+void start_smtp_worker (struct rspamd_worker *worker);
+
 #endif