Browse Source

* Continue implementing smtp proxy

tags/0.3.1
Vsevolod Stakhov 14 years ago
parent
commit
2a5690e7c1
7 changed files with 222 additions and 10 deletions
  1. 5
    0
      src/buffer.c
  2. 7
    0
      src/cfg_xml.c
  3. 3
    0
      src/logger.c
  4. 9
    0
      src/main.c
  5. 1
    0
      src/main.h
  6. 186
    8
      src/smtp.c
  7. 11
    2
      src/smtp.h

+ 5
- 0
src/buffer.c View File

rspamd_buffer_t *newbuf; rspamd_buffer_t *newbuf;


newbuf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t)); newbuf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t));
if (len == 0) {
/* Assume NULL terminated */
len = strlen ((char *)data);
}

if (!allocated) { if (!allocated) {
newbuf->data = fstralloc (d->pool, len); newbuf->data = fstralloc (d->pool, len);



+ 7
- 0
src/cfg_xml.c View File

wrk->type = TYPE_LMTP; wrk->type = TYPE_LMTP;
wrk->has_socket = TRUE; wrk->has_socket = TRUE;
} }
else if (g_ascii_strcasecmp (data, "smtp") == 0) {
wrk->type = TYPE_SMTP;
wrk->has_socket = TRUE;
}
else if (g_ascii_strcasecmp (data, "fuzzy") == 0) { else if (g_ascii_strcasecmp (data, "fuzzy") == 0) {
wrk->type = TYPE_FUZZY; wrk->type = TYPE_FUZZY;
wrk->has_socket = FALSE; wrk->has_socket = FALSE;
case TYPE_LMTP: case TYPE_LMTP:
fprintf (f, " <type>lmtp</type>" EOL); fprintf (f, " <type>lmtp</type>" EOL);
break; break;
case TYPE_SMTP:
fprintf (f, " <type>smtp</type>" EOL);
break;
} }
escaped_str = g_markup_escape_text (wrk->bind_host, -1); escaped_str = g_markup_escape_text (wrk->bind_host, -1);
fprintf (f, " <bind_socket>%s</bind_socket>" EOL, escaped_str); fprintf (f, " <bind_socket>%s</bind_socket>" EOL, escaped_str);

+ 3
- 0
src/logger.c View File

case TYPE_LMTP: case TYPE_LMTP:
cptype = "lmtp"; cptype = "lmtp";
break; break;
case TYPE_SMTP:
cptype = "smtp";
break;
case TYPE_FUZZY: case TYPE_FUZZY:
cptype = "fuzzy"; cptype = "fuzzy";
break; break;

+ 9
- 0
src/main.c View File

#include "cfg_file.h" #include "cfg_file.h"
#include "util.h" #include "util.h"
#include "lmtp.h" #include "lmtp.h"
#include "smtp.h"
#include "fuzzy_storage.h" #include "fuzzy_storage.h"
#include "cfg_xml.h" #include "cfg_xml.h"


msg_info ("starting lmtp process %P", getpid ()); msg_info ("starting lmtp process %P", getpid ());
start_lmtp_worker (cur); start_lmtp_worker (cur);
break; break;
case TYPE_SMTP:
setproctitle ("smtp process");
pidfile_close (rspamd->pfh);
msg_info ("starting smtp process %P", getpid ());
start_smtp_worker (cur);
break;
case TYPE_FUZZY: case TYPE_FUZZY:
setproctitle ("fuzzy storage"); setproctitle ("fuzzy storage");
pidfile_close (rspamd->pfh); pidfile_close (rspamd->pfh);
return "controller"; return "controller";
case TYPE_LMTP: case TYPE_LMTP:
return "lmtp"; return "lmtp";
case TYPE_SMTP:
return "smtp";
} }


return NULL; return NULL;

+ 1
- 0
src/main.h View File

TYPE_WORKER, TYPE_WORKER,
TYPE_CONTROLLER, TYPE_CONTROLLER,
TYPE_LMTP, TYPE_LMTP,
TYPE_SMTP,
TYPE_FUZZY TYPE_FUZZY
}; };



+ 186
- 8
src/smtp.c View File

/* Max line size as it is defined in rfc2822 */ /* Max line size as it is defined in rfc2822 */
#define OUTBUFSIZ 1000 #define OUTBUFSIZ 1000


/* SMTP error messages */


static sig_atomic_t wanna_die = 0; static sig_atomic_t wanna_die = 0;




} }
} }


char *
make_smtp_error (struct smtp_session *session, int error_code, const char *format, ...)
{
va_list vp;
char *result = NULL, *p;
size_t len;
va_start (vp, format);
len = g_printf_string_upper_bound (format, vp);
result = memory_pool_alloc (session->pool, len + sizeof ("65535 "));
p = result + snprintf (result, len, "%d ", error_code);
vsnprintf (p, len - (p - result), format, vp);
va_end (vp);

return result;
}

static void
free_smtp_session (gpointer arg)
{
struct smtp_session *session = arg;
if (session) {
if (session->task) {
free_task (session->task, FALSE);
}
if (session->dispatcher) {
rspamd_remove_dispatcher (session->dispatcher);
}
memory_pool_delete (session->pool);
close (session->sock);
g_free (session);
}
}

/* /*
* Config reload is designed by sending sigusr to active workers and pending shutdown of them * Config reload is designed by sending sigusr to active workers and pending shutdown of them
*/ */
return; return;
} }


static gboolean
read_smtp_command (struct smtp_session *session, f_str_t *line)
{
/* XXX: write dialog implementation */

return FALSE;
}

/*
* Callback that is called when there is data to read in buffer
*/
static gboolean
smtp_read_socket (f_str_t * in, void *arg)
{
struct smtp_session *session = arg;

switch (session->state) {
case SMTP_STATE_RESOLVE_REVERSE:
case SMTP_STATE_RESOLVE_NORMAL:
case SMTP_STATE_DELAY:
session->error = make_smtp_error (session, 550, "%s Improper use of SMTP command pipelining");
session->state = SMTP_STATE_ERROR;
break;
case SMTP_STATE_GREETING:
case SMTP_STATE_HELO:
case SMTP_STATE_FROM:
case SMTP_STATE_RCPT:
case SMTP_STATE_DATA:
return read_smtp_command (session, in);
break;
default:
session->error = make_smtp_error (session, 550, "%s Internal error");
session->state = SMTP_STATE_ERROR;
break;
}

return TRUE;
}

/*
* Callback for socket writing
*/
static gboolean
smtp_write_socket (void *arg)
{
struct smtp_session *session = arg;

if (session->state == SMTP_STATE_WRITE_ERROR) {
rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
destroy_session (session->s);
return FALSE;
}
return TRUE;
}

/*
* Called if something goes wrong
*/
static void
smtp_err_socket (GError * err, void *arg)
{
struct smtp_session *session = arg;

msg_info ("abnormally closing connection, error: %s", err->message);
/* Free buffers */
destroy_session (session->s);
}

/*
* Write greeting to client
*/
static void
write_smtp_greeting (struct smtp_session *session)
{
if (session->ctx->smtp_banner) {
rspamd_dispatcher_write (session->dispatcher, session->ctx->smtp_banner, 0, FALSE, TRUE);
}
}

/*
* Return from a delay
*/
static void
smtp_delay_handler (int fd, short what, void *arg)
{
struct smtp_session *session = arg;

if (session->state == SMTP_STATE_DELAY) {
session->state = SMTP_STATE_GREETING;
write_smtp_greeting (session);
}
else {
session->state = SMTP_STATE_WRITE_ERROR;
smtp_write_socket (session);
}
}

/*
* Make delay for a client
*/
static void
smtp_make_delay (struct smtp_session *session)
{
struct event *tev;
struct timeval *tv;

if (session->ctx->smtp_delay != 0 && session->state == SMTP_STATE_DELAY) {
tev = memory_pool_alloc (session->pool, sizeof (struct event));
tv = memory_pool_alloc (session->pool, sizeof (struct timeval));
tv->tv_sec = session->ctx->smtp_delay / 1000;
tv->tv_usec = session->ctx->smtp_delay - tv->tv_sec * 1000;

evtimer_set (tev, smtp_delay_handler, session);
evtimer_add (tev, tv);
}
else if (session->state == SMTP_STATE_DELAY) {
session->state = SMTP_STATE_GREETING;
write_smtp_greeting (session);
}
}

/* /*
* Handle DNS replies * Handle DNS replies
*/ */
session->hostname = memory_pool_strdup (session->pool, "tempfail"); session->hostname = memory_pool_strdup (session->pool, "tempfail");
} }
session->state = SMTP_STATE_DELAY; session->state = SMTP_STATE_DELAY;
/* XXX: make_delay (session); */
smtp_make_delay (session);
} }
else { else {
if (addresses) { if (addresses) {
session->hostname = memory_pool_strdup (session->pool, "tempfail"); session->hostname = memory_pool_strdup (session->pool, "tempfail");
} }
session->state = SMTP_STATE_DELAY; session->state = SMTP_STATE_DELAY;
/* XXX: make_delay (session); */
smtp_make_delay (session);
} }
else { else {
res = 0; res = 0;
session->hostname = memory_pool_strdup (session->pool, "unknown"); session->hostname = memory_pool_strdup (session->pool, "unknown");
} }
session->state = SMTP_STATE_DELAY; session->state = SMTP_STATE_DELAY;
/* XXX: make_delay (session); */
smtp_make_delay (session);
} }
break; break;
case SMTP_STATE_ERROR:
session->state = SMTP_STATE_WRITE_ERROR;
smtp_write_socket (session);
break;
default: default:
msg_info ("this callback is called on unknown state: %d", session->state);
session->state = SMTP_STATE_DELAY;
/* XXX: make_delay (session); */
/*
* This callback is called on unknown state, usually this indicates
* an error (invalid pipelining)
*/
break; break;
} }
} }
g_free (session); g_free (session);
close (nfd); close (nfd);
} }
/* Set up dispatcher */
session->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE,
smtp_read_socket, smtp_write_socket, smtp_err_socket, &session->ctx->smtp_timeout, session);
session->dispatcher->peer_addr = session->client_addr.s_addr;
/* Set up async session */
session->s = new_async_session (session->pool, free_smtp_session, session);


} }


{ {
struct smtp_worker_ctx *ctx; struct smtp_worker_ctx *ctx;
char *value, *err_str; char *value, *err_str;
uint32_t timeout;


ctx = g_malloc0 (sizeof (struct smtp_worker_ctx)); ctx = g_malloc0 (sizeof (struct smtp_worker_ctx));
ctx->pool = memory_pool_new (memory_pool_get_size ()); ctx->pool = memory_pool_new (memory_pool_get_size ());
/* Set default values */ /* Set default values */
ctx->smtp_timeout = 300 * 1000;
ctx->smtp_timeout.tv_sec = 300;
ctx->smtp_timeout.tv_usec = 0;
ctx->smtp_delay = 0; ctx->smtp_delay = 0;
ctx->smtp_banner = "220 ESMTP Ready." CRLF; ctx->smtp_banner = "220 ESMTP Ready." CRLF;


} }
if ((value = g_hash_table_lookup (worker->cf->params, "smtp_timeout")) != NULL) { if ((value = g_hash_table_lookup (worker->cf->params, "smtp_timeout")) != NULL) {
errno = 0; errno = 0;
ctx->smtp_timeout = strtoul (value, &err_str, 10);
timeout = strtoul (value, &err_str, 10);
if (errno != 0 || (err_str && *err_str != '\0')) { if (errno != 0 || (err_str && *err_str != '\0')) {
msg_warn ("cannot parse timeout, invalid number: %s: %s", value, strerror (errno)); msg_warn ("cannot parse timeout, invalid number: %s: %s", value, strerror (errno));
} }
else {
ctx->smtp_timeout.tv_sec = timeout / 1000;
ctx->smtp_timeout.tv_usec = timeout - ctx->smtp_timeout.tv_sec * 1000;
}
} }
if ((value = g_hash_table_lookup (worker->cf->params, "smtp_delay")) != NULL) { if ((value = g_hash_table_lookup (worker->cf->params, "smtp_delay")) != NULL) {
errno = 0; errno = 0;

+ 11
- 2
src/smtp.h View File

memory_pool_t *pool; memory_pool_t *pool;
char *smtp_banner; char *smtp_banner;
uint32_t smtp_delay; uint32_t smtp_delay;
uint32_t smtp_timeout;
struct timeval smtp_timeout;


gboolean use_xclient; gboolean use_xclient;
gboolean helo_required; gboolean helo_required;
SMTP_STATE_RCPT, SMTP_STATE_RCPT,
SMTP_STATE_DATA, SMTP_STATE_DATA,
SMTP_STATE_EOD, SMTP_STATE_EOD,
SMTP_STATE_END
SMTP_STATE_END,
SMTP_STATE_ERROR,
SMTP_STATE_WRITE_ERROR
}; };


struct smtp_session { struct smtp_session {
struct worker_task *task; struct worker_task *task;
struct in_addr client_addr; struct in_addr client_addr;
char *hostname; char *hostname;
char *error;
int sock; int sock;
struct rspamd_async_session *s;
rspamd_io_dispatcher_t *dispatcher;

struct smtp_upstream *upstream; struct smtp_upstream *upstream;
int upstream_sock; int upstream_sock;
gboolean resolved; gboolean resolved;
}; };


void start_smtp_worker (struct rspamd_worker *worker);

#endif #endif

Loading…
Cancel
Save