Bläddra i källkod

* Add initial implementation of proxy object

* Add simple smtp proxy worker
tags/0.5.0
Vsevolod Stakhov 12 år sedan
förälder
incheckning
9b8ecfb892
11 ändrade filer med 980 tillägg och 81 borttagningar
  1. 2
    1
      CMakeLists.txt
  2. 1
    0
      lib/CMakeLists.txt
  3. 238
    0
      src/proxy.c
  4. 66
    0
      src/proxy.h
  5. 3
    61
      src/smtp.c
  6. 3
    12
      src/smtp.h
  7. 2
    2
      src/smtp_proto.c
  8. 5
    1
      src/smtp_proto.h
  9. 581
    0
      src/smtp_proxy.c
  10. 57
    3
      src/smtp_utils.c
  11. 22
    1
      src/smtp_utils.h

+ 2
- 1
CMakeLists.txt Visa fil

@@ -848,6 +848,7 @@ SET(RSPAMDSRC src/modules.c
src/main.c
src/map.c
src/smtp.c
src/smtp_proxy.c
src/worker.c)

SET(TOKENIZERSSRC src/tokenizers/tokenizers.c
@@ -864,7 +865,7 @@ SET(PLUGINSSRC src/plugins/surbl.c
src/plugins/spf.c)
SET(MODULES_LIST surbl regexp chartable fuzzy_check spf)
SET(WORKERS_LIST normal controller smtp lmtp fuzzy keystorage)
SET(WORKERS_LIST normal controller smtp smtp_proxy lmtp fuzzy keystorage)

AddModules(MODULES_LIST WORKERS_LIST)


+ 1
- 0
lib/CMakeLists.txt Visa fil

@@ -63,6 +63,7 @@ SET(RSPAMDLIBSRC
../src/message.c
../src/printf.c
../src/protocol.c
../src/proxy.c
../src/radix.c
../src/settings.c
../src/spf.c

+ 238
- 0
src/proxy.c Visa fil

@@ -0,0 +1,238 @@
/* Copyright (c) 2010-2012, Vsevolod Stakhov
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

#include "config.h"
#include "main.h"
#include "proxy.h"

static void rspamd_proxy_backend_handler (gint fd, gshort what, gpointer data);
static void rspamd_proxy_client_handler (gint fd, gshort what, gpointer data);

static inline GQuark
proxy_error_quark (void)
{
return g_quark_from_static_string ("proxy-error");
}

static void
rspamd_proxy_close (rspamd_proxy_t *proxy)
{
close (proxy->cfd);
close (proxy->bfd);

event_del (&proxy->client_ev);
event_del (&proxy->backend_ev);
}

static void
rspamd_proxy_client_handler (gint fd, gshort what, gpointer data)
{
rspamd_proxy_t *proxy = data;
gint r;
GError *err = NULL;

if (what == EV_READ) {
/* Got data from client */
event_del (&proxy->client_ev);
r = read (proxy->cfd, proxy->buf, proxy->bufsize);
if (r > 0) {
/* Write this buffer to backend */
proxy->read_len = r;
proxy->buf_offset = 0;
event_del (&proxy->backend_ev);
event_set (&proxy->backend_ev, proxy->bfd, EV_WRITE, rspamd_proxy_backend_handler, proxy);
event_add (&proxy->backend_ev, proxy->tv);
}
else {
/* Error case or zero reply */
if (r < 0) {
/* Error case */
g_set_error (&err, proxy_error_quark(), r, "Client read error: %s", strerror (errno));
rspamd_proxy_close (proxy);
proxy->err_cb (err, proxy->user_data);
}
else {
/* Client closes connection */
rspamd_proxy_close (proxy);
proxy->err_cb (NULL, proxy->user_data);
}
}
}
else if (what == EV_WRITE) {
/* Can write to client */
r = write (proxy->cfd, proxy->buf + proxy->buf_offset, proxy->read_len - proxy->buf_offset);
if (r > 0) {
/* We wrote something */
proxy->buf_offset +=r;
if (proxy->buf_offset == proxy->read_len) {
/* We wrote everything */
event_del (&proxy->client_ev);
event_set (&proxy->client_ev, proxy->bfd, EV_READ, rspamd_proxy_client_handler, proxy);
event_add (&proxy->client_ev, proxy->tv);
event_del (&proxy->backend_ev);
event_set (&proxy->backend_ev, proxy->bfd, EV_READ, rspamd_proxy_client_handler, proxy);
event_add (&proxy->backend_ev, proxy->tv);
}
else {
/* Plan another write event */
event_add (&proxy->backend_ev, proxy->tv);
}
}
else {
/* Error case or zero reply */
if (r < 0) {
/* Error case */
g_set_error (&err, proxy_error_quark(), r, "Client write error: %s", strerror (errno));
rspamd_proxy_close (proxy);
proxy->err_cb (err, proxy->user_data);
}
else {
/* Client closes connection */
rspamd_proxy_close (proxy);
proxy->err_cb (NULL, proxy->user_data);
}
}
}
else {
/* Got timeout */
g_set_error (&err, proxy_error_quark(), ETIMEDOUT, "Client timeout");
rspamd_proxy_close (proxy);
proxy->err_cb (err, proxy->user_data);
}
}

static void
rspamd_proxy_backend_handler (gint fd, gshort what, gpointer data)
{
rspamd_proxy_t *proxy = data;
gint r;
GError *err;

if (what == EV_READ) {
/* Got data from backend */
event_del (&proxy->backend_ev);
r = read (proxy->cfd, proxy->buf, proxy->bufsize);
if (r > 0) {
/* Write this buffer to client */
proxy->read_len = r;
proxy->buf_offset = 0;
event_del (&proxy->client_ev);
event_set (&proxy->client_ev, proxy->bfd, EV_WRITE, rspamd_proxy_client_handler, proxy);
event_add (&proxy->client_ev, proxy->tv);
}
else {
/* Error case or zero reply */
if (r < 0) {
/* Error case */
g_set_error (&err, proxy_error_quark(), r, "Backend read error: %s", strerror (errno));
rspamd_proxy_close (proxy);
proxy->err_cb (err, proxy->user_data);
}
else {
/* Client closes connection */
rspamd_proxy_close (proxy);
proxy->err_cb (NULL, proxy->user_data);
}
}
}
else if (what == EV_WRITE) {
/* Can write to backend */
r = write (proxy->bfd, proxy->buf + proxy->buf_offset, proxy->read_len - proxy->buf_offset);
if (r > 0) {
/* We wrote something */
proxy->buf_offset +=r;
if (proxy->buf_offset == proxy->read_len) {
/* We wrote everything */
event_del (&proxy->backend_ev);
event_set (&proxy->backend_ev, proxy->bfd, EV_READ, rspamd_proxy_client_handler, proxy);
event_add (&proxy->backend_ev, proxy->tv);
event_del (&proxy->client_ev);
event_set (&proxy->client_ev, proxy->bfd, EV_READ, rspamd_proxy_client_handler, proxy);
event_add (&proxy->client_ev, proxy->tv);
}
else {
/* Plan another write event */
event_add (&proxy->backend_ev, proxy->tv);
}
}
else {
/* Error case or zero reply */
if (r < 0) {
/* Error case */
g_set_error (&err, proxy_error_quark(), r, "Backend write error: %s", strerror (errno));
rspamd_proxy_close (proxy);
proxy->err_cb (err, proxy->user_data);
}
else {
/* Client closes connection */
rspamd_proxy_close (proxy);
proxy->err_cb (NULL, proxy->user_data);
}
}
}
else {
/* Got timeout */
g_set_error (&err, proxy_error_quark(), ETIMEDOUT, "Client timeout");
rspamd_proxy_close (proxy);
proxy->err_cb (err, proxy->user_data);
}
}

/**
* Create new proxy between cfd and bfd
* @param cfd client's socket
* @param bfd backend's socket
* @param bufsize size of exchange buffer
* @param err_cb callback for erorrs or completing
* @param ud user data for callback
* @return new proxy object
*/
rspamd_proxy_t*
rspamd_create_proxy (gint cfd, gint bfd, memory_pool_t *pool, struct event_base *base,
gsize bufsize, struct timeval *tv, dispatcher_err_callback_t err_cb, gpointer ud)
{
rspamd_proxy_t *new;

new = memory_pool_alloc0 (pool, sizeof (rspamd_proxy_t));

new->cfd = cfd;
new->bfd = bfd;
new->pool = pool;
new->base = base;
new->bufsize = bufsize;
new->buf = memory_pool_alloc (pool, bufsize);
new->err_cb = err_cb;
new->user_data = ud;
new->tv = tv;

/* Set client's and backend's interfaces to read events */
event_set (&new->client_ev, new->cfd, EV_READ, rspamd_proxy_client_handler, new);
event_base_set (new->base, &new->client_ev);
event_add (&new->client_ev, new->tv);

event_set (&new->backend_ev, new->bfd, EV_READ, rspamd_proxy_backend_handler, new);
event_base_set (new->base, &new->backend_ev);
event_add (&new->backend_ev, new->tv);

return new;
}

+ 66
- 0
src/proxy.h Visa fil

@@ -0,0 +1,66 @@
/* Copyright (c) 2010-2012, Vsevolod Stakhov
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/


#ifndef PROXY_H_
#define PROXY_H_

#include "config.h"
#include "buffer.h"

/**
* @file proxy.h
* Direct asynchronous proxy implementation
*/

typedef struct rspamd_proxy_s {
struct event client_ev; /**< event for client's communication */
struct event backend_ev; /**< event for backend communication */
struct event_base *base; /**< base for event operations */
memory_pool_t *pool; /**< memory pool */
dispatcher_err_callback_t err_cb; /**< error callback */
struct event_base *ev_base; /**< event base */
gint cfd; /**< client's socket */
gint bfd; /**< backend's socket */
guint8 *buf; /**< exchange buffer */
gsize bufsize; /**< buffer size */
gint read_len; /**< read length */
gint buf_offset; /**< offset to write */
gpointer user_data; /**< user's data for callbacks */
struct timeval *tv; /**< timeout for communications */
} rspamd_proxy_t;

/**
* Create new proxy between cfd and bfd
* @param cfd client's socket
* @param bfd backend's socket
* @param bufsize size of exchange buffer
* @param err_cb callback for erorrs or completing
* @param ud user data for callback
* @return new proxy object
*/
rspamd_proxy_t* rspamd_create_proxy (gint cfd, gint bfd, memory_pool_t *pool,
struct event_base *base, gsize bufsize, struct timeval *tv,
dispatcher_err_callback_t err_cb, gpointer ud);

#endif /* PROXY_H_ */

+ 3
- 61
src/smtp.c Visa fil

@@ -45,9 +45,6 @@

#define DEFAULT_REJECT_MESSAGE "450 4.5.0 Spam message rejected"

#define XCLIENT_HOST_UNAVAILABLE "[UNAVAILABLE]"
#define XCLIENT_HOST_TEMPFAIL "[TEMPUNAVAIL]"

static gboolean smtp_write_socket (void *arg);

static sig_atomic_t wanna_die = 0;
@@ -397,7 +394,7 @@ smtp_read_socket (f_str_t * in, void *arg)
case SMTP_STATE_RESOLVE_REVERSE:
case SMTP_STATE_RESOLVE_NORMAL:
case SMTP_STATE_DELAY:
session->error = make_smtp_error (session, 550, "%s Improper use of SMTP command pipelining", "5.5.0");
session->error = make_smtp_error (session->pool, 550, "%s Improper use of SMTP command pipelining", "5.5.0");
session->state = SMTP_STATE_ERROR;
break;
case SMTP_STATE_GREETING:
@@ -444,7 +441,7 @@ smtp_read_socket (f_str_t * in, void *arg)
rspamd_dispatcher_pause (session->dispatcher);
break;
default:
session->error = make_smtp_error (session, 550, "%s Internal error", "5.5.0");
session->error = make_smtp_error (session->pool, 550, "%s Internal error", "5.5.0");
session->state = SMTP_STATE_ERROR;
break;
}
@@ -820,61 +817,6 @@ parse_smtp_banner (struct smtp_worker_ctx *ctx, const gchar *line)
}
}

static gboolean
parse_upstreams_line (struct smtp_worker_ctx *ctx, const gchar *line)
{
gchar **strv, *p, *t, *tt, *err_str;
guint32 num, i;
struct smtp_upstream *cur;
gchar resolved_path[PATH_MAX];
strv = g_strsplit_set (line, ",; ", -1);
num = g_strv_length (strv);

if (num >= MAX_UPSTREAM) {
msg_err ("cannot define %d upstreams %d is max", num, MAX_UPSTREAM);
return FALSE;
}

for (i = 0; i < num; i ++) {
p = strv[i];
cur = &ctx->upstreams[ctx->upstream_num];
if ((t = strrchr (p, ':')) != NULL && (tt = strchr (p, ':')) != t) {
/* Assume that after last `:' we have weigth */
*t = '\0';
t ++;
errno = 0;
cur->up.priority = strtoul (t, &err_str, 10);
if (errno != 0 || (err_str && *err_str != '\0')) {
msg_err ("cannot convert weight: %s, %s", t, strerror (errno));
g_strfreev (strv);
return FALSE;
}
}
if (*p == '/') {
cur->is_unix = TRUE;
if (realpath (p, resolved_path) == NULL) {
msg_err ("cannot resolve path: %s", resolved_path);
g_strfreev (strv);
return FALSE;
}
cur->name = memory_pool_strdup (ctx->pool, resolved_path);
ctx->upstream_num ++;
}
else {
if (! parse_host_port (p, &cur->addr, &cur->port)) {
g_strfreev (strv);
return FALSE;
}
cur->name = memory_pool_strdup (ctx->pool, p);
ctx->upstream_num ++;
}
}

g_strfreev (strv);
return TRUE;
}

static void
make_capabilities (struct smtp_worker_ctx *ctx, const gchar *line)
{
@@ -973,7 +915,7 @@ config_smtp_worker (struct rspamd_worker *worker)

/* Init upstreams */
if ((value = ctx->upstreams_str) != NULL) {
if (!parse_upstreams_line (ctx, value)) {
if (!parse_upstreams_line (ctx->pool, ctx->upstreams, value, &ctx->upstream_num)) {
return FALSE;
}
}

+ 3
- 12
src/smtp.h Visa fil

@@ -4,19 +4,10 @@
#include "config.h"
#include "main.h"
#include "upstream.h"

struct smtp_upstream {
struct upstream up;
const gchar *name;
struct in_addr addr;
guint16 port;
gboolean is_unix;
};
#include "smtp_utils.h"

struct rspamd_dns_resolver;

#define MAX_UPSTREAM 128
#define DEFAULT_MAX_ERRORS 10

enum rspamd_smtp_stage {
@@ -29,8 +20,8 @@ enum rspamd_smtp_stage {
};

struct smtp_worker_ctx {
struct smtp_upstream upstreams[MAX_UPSTREAM];
size_t upstream_num;
struct smtp_upstream upstreams[MAX_SMTP_UPSTREAMS];
gsize upstream_num;
gchar *upstreams_str;
memory_pool_t *pool;

+ 2
- 2
src/smtp_proto.c Visa fil

@@ -31,7 +31,7 @@
#include "smtp_utils.h"

gchar *
make_smtp_error (struct smtp_session *session, gint error_code, const gchar *format, ...)
make_smtp_error (memory_pool_t *pool, gint error_code, const gchar *format, ...)
{
va_list vp;
gchar *result = NULL, *p;
@@ -42,7 +42,7 @@ make_smtp_error (struct smtp_session *session, gint error_code, const gchar *for
va_end (vp);
va_start (vp, format);
len += sizeof ("65535 ") + sizeof (CRLF) - 1;
result = memory_pool_alloc (session->pool, len);
result = memory_pool_alloc (pool, len);
p = result + rspamd_snprintf (result, len, "%d ", error_code);
p = rspamd_vsnprintf (p, len - (p - result), format, vp);
*p++ = CR; *p++ = LF; *p = '\0';

+ 5
- 1
src/smtp_proto.h Visa fil

@@ -18,6 +18,10 @@

#define DATA_END_TRAILER "." CRLF

#define XCLIENT_HOST_UNAVAILABLE "[UNAVAILABLE]"
#define XCLIENT_HOST_TEMPFAIL "[TEMPUNAVAIL]"

#define MAX_SMTP_UPSTREAMS 128

struct smtp_command {
enum {
@@ -39,7 +43,7 @@ struct smtp_command {
/*
* Generate SMTP error message
*/
gchar * make_smtp_error (struct smtp_session *session, gint error_code, const gchar *format, ...);
gchar * make_smtp_error (memory_pool_t *pool, gint error_code, const gchar *format, ...);

/*
* Parse a single SMTP command

+ 581
- 0
src/smtp_proxy.c Visa fil

@@ -0,0 +1,581 @@
/* Copyright (c) 2010-2012, Vsevolod Stakhov
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

#include "config.h"
#include "main.h"
#include "cfg_file.h"
#include "cfg_xml.h"
#include "util.h"
#include "smtp_proto.h"
#include "map.h"
#include "message.h"
#include "settings.h"
#include "dns.h"
#include "upstream.h"
#include "proxy.h"

/*
* SMTP proxy is a simple smtp proxy worker for dns resolving and
* load balancing. It uses XCLIENT command and is designed for MTA
* that supports that (postfix and exim).
*/

/* Upstream timeouts */
#define DEFAULT_UPSTREAM_ERROR_TIME 10
#define DEFAULT_UPSTREAM_DEAD_TIME 300
#define DEFAULT_UPSTREAM_MAXERRORS 10

static sig_atomic_t wanna_die = 0;

/* Init functions */
gpointer init_smtp_proxy ();
void start_smtp_proxy (struct rspamd_worker *worker);

worker_t smtp_proxy_worker = {
"smtp_proxy", /* Name */
init_smtp_proxy, /* Init function */
start_smtp_proxy, /* Start function */
TRUE, /* Has socket */
FALSE, /* Non unique */
FALSE, /* Non threaded */
TRUE /* Killable */
};

struct smtp_proxy_ctx {
struct smtp_upstream upstreams[MAX_SMTP_UPSTREAMS];
size_t upstream_num;
gchar *upstreams_str;

memory_pool_t *pool;
guint32 smtp_delay;
guint32 delay_jitter;
guint32 smtp_timeout_raw;
struct timeval smtp_timeout;

struct rspamd_dns_resolver *resolver;
struct event_base *ev_base;
};

enum rspamd_smtp_proxy_state {
SMTP_PROXY_STATE_RESOLVE_REVERSE = 0,
SMTP_PROXY_STATE_RESOLVE_NORMAL,
SMTP_PROXY_STATE_DELAY
};

struct smtp_proxy_session {
struct smtp_proxy_ctx *ctx;
memory_pool_t *pool;

enum rspamd_smtp_proxy_state state;
struct rspamd_worker *worker;
struct in_addr client_addr;
gchar *hostname;
gchar *error;
gchar *temp_name;
gint sock;
gint upstream_sock;

struct rspamd_async_session *s;
rspamd_io_dispatcher_t *dispatcher;

rspamd_proxy_t *proxy;

struct smtp_upstream *upstream;

struct event *delay_timer;

gboolean resolved;
struct rspamd_dns_resolver *resolver;
struct event_base *ev_base;
};

#ifndef HAVE_SA_SIGINFO
static void
sig_handler (gint signo)
#else
static void
sig_handler (gint signo, siginfo_t *info, void *unused)
#endif
{
struct timeval tv;

switch (signo) {
case SIGINT:
case SIGTERM:
if (!wanna_die) {
wanna_die = 1;
tv.tv_sec = 0;
tv.tv_usec = 0;
event_loopexit (&tv);

#ifdef WITH_GPERF_TOOLS
ProfilerStop ();
#endif
}
break;
}
}

/*
* Config reload is designed by sending sigusr to active workers and pending shutdown of them
*/
static void
sigusr2_handler (gint fd, short what, void *arg)
{
struct rspamd_worker *worker = (struct rspamd_worker *)arg;
/* Do not accept new connections, preparing to end worker's process */
struct timeval tv;
if (! wanna_die) {
tv.tv_sec = SOFT_SHUTDOWN_TIME;
tv.tv_usec = 0;
event_del (&worker->sig_ev_usr1);
event_del (&worker->sig_ev_usr2);
event_del (&worker->bind_ev);
msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
event_loopexit (&tv);
}
return;
}

/*
* Reopen log is designed by sending sigusr1 to active workers and pending shutdown of them
*/
static void
sigusr1_handler (gint fd, short what, void *arg)
{
struct rspamd_worker *worker = (struct rspamd_worker *) arg;

reopen_log (worker->srv->logger);

return;
}

static void
free_smtp_proxy_session (gpointer arg)
{
struct smtp_proxy_session *session = arg;

if (session) {
if (session->dispatcher) {
rspamd_remove_dispatcher (session->dispatcher);
}

close (session->sock);
memory_pool_delete (session->pool);
g_free (session);
}
}

static void
smtp_proxy_err_proxy (GError * err, void *arg)
{
struct smtp_proxy_session *session = arg;

msg_info ("abnormally closing connection, error: %s", err->message);
/* Free buffers */
destroy_session (session->s);
}

static gboolean
create_smtp_proxy_upstream_connection (struct smtp_proxy_session *session)
{
struct smtp_upstream *selected;
struct sockaddr_un *un;

/* Try to select upstream */
selected = (struct smtp_upstream *)get_upstream_round_robin (session->ctx->upstreams,
session->ctx->upstream_num, sizeof (struct smtp_upstream),
time (NULL), DEFAULT_UPSTREAM_ERROR_TIME, DEFAULT_UPSTREAM_DEAD_TIME, DEFAULT_UPSTREAM_MAXERRORS);
if (selected == NULL) {
msg_err ("no upstreams suitable found");
return FALSE;
}

session->upstream = selected;

/* Now try to create socket */
if (selected->is_unix) {
un = alloca (sizeof (struct sockaddr_un));
session->upstream_sock = make_unix_socket (selected->name, un, FALSE, TRUE);
}
else {
session->upstream_sock = make_tcp_socket (&selected->addr, selected->port, FALSE, TRUE);
}
if (session->upstream_sock == -1) {
msg_err ("cannot make a connection to %s", selected->name);
upstream_fail (&selected->up, time (NULL));
return FALSE;
}
/* Create a proxy for upstream connection */
rspamd_dispatcher_pause (session->dispatcher);
session->proxy = rspamd_create_proxy (session->sock, session->upstream_sock, session->pool,
session->ev_base, BUFSIZ, &session->ctx->smtp_timeout, smtp_proxy_err_proxy, session);

return TRUE;
}

/* Resolving and delay handlers */
/*
* Return from a delay
*/
static void
smtp_delay_handler (gint fd, short what, void *arg)
{
struct smtp_proxy_session *session = arg;

remove_normal_event (session->s, (event_finalizer_t) event_del,
session->delay_timer);
if (session->state == SMTP_PROXY_STATE_DELAY) {
/* TODO: Create upstream connection here */
create_smtp_proxy_upstream_connection (session);
}
else {
/* TODO: Write error here */
if (rspamd_dispatcher_write (session->dispatcher,
make_smtp_error (session->pool, 550, "%s Improper use of SMTP command pipelining", "5.5.0"),
0, FALSE, TRUE)) {
destroy_session (session->s);
}
}
}

/*
* Make delay for a client
*/
static void
smtp_make_delay (struct smtp_proxy_session *session)
{
struct event *tev;
struct timeval *tv;
gint32 jitter;

if (session->ctx->smtp_delay != 0 && session->state == SMTP_PROXY_STATE_DELAY) {
tev = memory_pool_alloc (session->pool, sizeof(struct event));
tv = memory_pool_alloc (session->pool, sizeof(struct timeval));
if (session->ctx->delay_jitter != 0) {
jitter = g_random_int_range (0, session->ctx->delay_jitter);
msec_to_tv (session->ctx->smtp_delay + jitter, tv);
}
else {
msec_to_tv (session->ctx->smtp_delay, tv);
}

evtimer_set (tev, smtp_delay_handler, session);
evtimer_add (tev, tv);
register_async_event (session->s, (event_finalizer_t) event_del, tev,
g_quark_from_static_string ("smtp proxy"));
session->delay_timer = tev;
}
else if (session->state == SMTP_PROXY_STATE_DELAY) {
/* TODO: Create upstream connection here */
create_smtp_proxy_upstream_connection (session);
}
}

/*
* Handle DNS replies
*/
static void
smtp_dns_cb (struct rspamd_dns_reply *reply, void *arg)
{
struct smtp_proxy_session *session = arg;
gint res = 0;
union rspamd_reply_element *elt;
GList *cur;

switch (session->state)
{
case SMTP_PROXY_STATE_RESOLVE_REVERSE:
/* Parse reverse reply and start resolve of this ip */
if (reply->code != DNS_RC_NOERROR) {
rspamd_conditional_debug (rspamd_main->logger,
session->client_addr.s_addr, __FUNCTION__, "DNS error: %s",
dns_strerror (reply->code));

if (reply->code == DNS_RC_NXDOMAIN) {
session->hostname = memory_pool_strdup (session->pool,
XCLIENT_HOST_UNAVAILABLE);
}
else {
session->hostname = memory_pool_strdup (session->pool,
XCLIENT_HOST_TEMPFAIL);
}
session->state = SMTP_PROXY_STATE_DELAY;
smtp_make_delay (session);
}
else {
if (reply->elements) {
elt = reply->elements->data;
session->hostname = memory_pool_strdup (session->pool,
elt->ptr.name);
session->state = SMTP_PROXY_STATE_RESOLVE_NORMAL;
make_dns_request (session->resolver, session->s, session->pool,
smtp_dns_cb, session, DNS_REQUEST_A, session->hostname);

}
}
break;
case SMTP_PROXY_STATE_RESOLVE_NORMAL:
if (reply->code != DNS_RC_NOERROR) {
rspamd_conditional_debug (rspamd_main->logger,
session->client_addr.s_addr, __FUNCTION__, "DNS error: %s",
dns_strerror (reply->code));

if (reply->code == DNS_RC_NXDOMAIN) {
session->hostname = memory_pool_strdup (session->pool,
XCLIENT_HOST_UNAVAILABLE);
}
else {
session->hostname = memory_pool_strdup (session->pool,
XCLIENT_HOST_TEMPFAIL);
}
session->state = SMTP_PROXY_STATE_DELAY;
smtp_make_delay (session);
}
else {
res = 0;
cur = reply->elements;
while (cur) {
elt = cur->data;
if (memcmp (&session->client_addr, &elt->a.addr[0],
sizeof(struct in_addr)) == 0) {
res = 1;
session->resolved = TRUE;
break;
}
cur = g_list_next (cur);
}

if (res == 0) {
msg_info(
"cannot find address for hostname: %s, ip: %s", session->hostname, inet_ntoa (session->client_addr));
session->hostname = memory_pool_strdup (session->pool,
XCLIENT_HOST_UNAVAILABLE);
}
session->state = SMTP_PROXY_STATE_DELAY;
smtp_make_delay (session);
}
break;
default:
/* TODO: write something about pipelining */
break;
}
}

/*
* Callback that is called when there is data to read in buffer
*/
static gboolean
smtp_proxy_read_socket (f_str_t * in, void *arg)
{
struct smtp_proxy_session *session = arg;

/* This can be called only if client is using invalid pipelining */
if (rspamd_dispatcher_write (session->dispatcher,
make_smtp_error (session->pool, 550, "%s Improper use of SMTP command pipelining", "5.5.0"),
0, FALSE, TRUE)) {
destroy_session (session->s);
}
else {
return FALSE;
}

return TRUE;
}

/*
* Called if something goes wrong
*/
static void
smtp_proxy_err_socket (GError * err, void *arg)
{
struct smtp_proxy_session *session = arg;

msg_info ("abnormally closing connection, error: %s", err->message);
/* Free buffers */
destroy_session (session->s);
}

/*
* Accept new connection and construct session
*/
static void
accept_socket (gint fd, short what, void *arg)
{
struct rspamd_worker *worker = (struct rspamd_worker *)arg;
union sa_union su;
struct smtp_proxy_session *session;
struct smtp_proxy_ctx *ctx;

socklen_t addrlen = sizeof (su.ss);
gint nfd;

if ((nfd = accept_from_socket (fd, (struct sockaddr *)&su.ss, &addrlen)) == -1) {
msg_warn ("accept failed: %s", strerror (errno));
return;
}
/* Check for EAGAIN */
if (nfd == 0) {
return;
}

ctx = worker->ctx;
session = g_slice_alloc0 (sizeof (struct smtp_proxy_session));
session->pool = memory_pool_new (memory_pool_get_size ());

if (su.ss.ss_family == AF_UNIX) {
msg_info ("accepted connection from unix socket");
session->client_addr.s_addr = INADDR_NONE;
}
else if (su.ss.ss_family == AF_INET) {
msg_info ("accepted connection from %s port %d", inet_ntoa (su.s4.sin_addr), ntohs (su.s4.sin_port));
memcpy (&session->client_addr, &su.s4.sin_addr, sizeof (struct in_addr));
}

session->sock = nfd;
session->worker = worker;
session->ctx = ctx;
session->resolver = ctx->resolver;
session->ev_base = ctx->ev_base;
worker->srv->stat->connections_count++;

/* Resolve client's addr */
/* Set up async session */
session->s = new_async_session (session->pool, NULL, NULL, free_smtp_proxy_session, session);
session->state = SMTP_PROXY_STATE_RESOLVE_REVERSE;
if (! make_dns_request (session->resolver, session->s, session->pool,
smtp_dns_cb, session, DNS_REQUEST_PTR, &session->client_addr)) {
msg_err ("cannot resolve %s", inet_ntoa (session->client_addr));
g_slice_free1 (sizeof (struct smtp_proxy_session), session);
close (nfd);
return;
}
else {
session->dispatcher = rspamd_create_dispatcher (session->ev_base, nfd, BUFFER_ANY,
smtp_proxy_read_socket, NULL, smtp_proxy_err_socket,
&session->ctx->smtp_timeout, session);
session->dispatcher->peer_addr = session->client_addr.s_addr;
}
}

gpointer
init_smtp_proxy (void)
{
struct smtp_proxy_ctx *ctx;
GQuark type;

type = g_quark_try_string ("smtp_proxy");

ctx = g_malloc0 (sizeof (struct smtp_worker_ctx));
ctx->pool = memory_pool_new (memory_pool_get_size ());

/* Set default values */
ctx->smtp_timeout_raw = 300000;
ctx->smtp_delay = 0;

register_worker_opt (type, "upstreams", xml_handle_string, ctx,
G_STRUCT_OFFSET (struct smtp_proxy_ctx, upstreams_str));
register_worker_opt (type, "timeout", xml_handle_seconds, ctx,
G_STRUCT_OFFSET (struct smtp_proxy_ctx, smtp_timeout_raw));
register_worker_opt (type, "delay", xml_handle_seconds, ctx,
G_STRUCT_OFFSET (struct smtp_proxy_ctx, smtp_delay));
register_worker_opt (type, "jitter", xml_handle_seconds, ctx,
G_STRUCT_OFFSET (struct smtp_proxy_ctx, delay_jitter));

return ctx;
}

/* Make post-init configuration */
static gboolean
config_smtp_proxy_worker (struct rspamd_worker *worker)
{
struct smtp_proxy_ctx *ctx = worker->ctx;
gchar *value;

/* Init timeval */
msec_to_tv (ctx->smtp_timeout_raw, &ctx->smtp_timeout);

/* Init upstreams */
if ((value = ctx->upstreams_str) != NULL) {
if (!parse_upstreams_line (ctx->pool, ctx->upstreams, value, &ctx->upstream_num)) {
return FALSE;
}
}
else {
msg_err ("no upstreams defined, don't know what to do");
return FALSE;
}

return TRUE;
}

/*
* Start worker process
*/
void
start_smtp_proxy (struct rspamd_worker *worker)
{
struct sigaction signals;
struct smtp_worker_ctx *ctx = worker->ctx;

gperf_profiler_init (worker->srv->cfg, "worker");

worker->srv->pid = getpid ();
ctx->ev_base = event_init ();

/* Set smtp options */
if ( !config_smtp_proxy_worker (worker)) {
msg_err ("cannot configure smtp worker, exiting");
exit (EXIT_SUCCESS);
}

init_signals (&signals, sig_handler);
sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);

/* SIGUSR2 handler */
signal_set (&worker->sig_ev_usr2, SIGUSR2, sigusr2_handler, (void *) worker);
event_base_set (ctx->ev_base, &worker->sig_ev_usr2);
signal_add (&worker->sig_ev_usr2, NULL);

/* SIGUSR1 handler */
signal_set (&worker->sig_ev_usr1, SIGUSR1, sigusr1_handler, (void *) worker);
event_base_set (ctx->ev_base, &worker->sig_ev_usr1);
signal_add (&worker->sig_ev_usr1, NULL);

/* Accept event */
event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
event_base_set (ctx->ev_base, &worker->bind_ev);
event_add (&worker->bind_ev, NULL);

/* DNS resolver */
ctx->resolver = dns_resolver_init (ctx->ev_base, worker->srv->cfg);

/* Set umask */
umask (S_IWGRP | S_IWOTH | S_IROTH | S_IRGRP);

event_base_loop (ctx->ev_base, 0);

close_log (rspamd_main->logger);
exit (EXIT_SUCCESS);
}


+ 57
- 3
src/smtp_utils.c Visa fil

@@ -28,7 +28,7 @@
#include "smtp.h"
#include "smtp_proto.h"

gboolean
void
free_smtp_session (gpointer arg)
{
struct smtp_session *session = arg;
@@ -56,8 +56,6 @@ free_smtp_session (gpointer arg)
memory_pool_delete (session->pool);
g_free (session);
}

return TRUE;
}

gboolean
@@ -313,3 +311,59 @@ err:
destroy_session (session->s);
return FALSE;
}

gboolean
parse_upstreams_line (memory_pool_t *pool, struct smtp_upstream *upstreams, const gchar *line, gsize *count)
{
gchar **strv, *p, *t, *tt, *err_str;
guint32 num, i;
struct smtp_upstream *cur;
gchar resolved_path[PATH_MAX];

strv = g_strsplit_set (line, ",; ", -1);
num = g_strv_length (strv);

if (num >= MAX_SMTP_UPSTREAMS) {
msg_err ("cannot define %d upstreams %d is max", num, MAX_SMTP_UPSTREAMS);
return FALSE;
}
*count = 0;

for (i = 0; i < num; i ++) {
p = strv[i];
cur = &upstreams[*count];
if ((t = strrchr (p, ':')) != NULL && (tt = strchr (p, ':')) != t) {
/* Assume that after last `:' we have weigth */
*t = '\0';
t ++;
errno = 0;
cur->up.priority = strtoul (t, &err_str, 10);
if (errno != 0 || (err_str && *err_str != '\0')) {
msg_err ("cannot convert weight: %s, %s", t, strerror (errno));
g_strfreev (strv);
return FALSE;
}
}
if (*p == '/') {
cur->is_unix = TRUE;
if (realpath (p, resolved_path) == NULL) {
msg_err ("cannot resolve path: %s", resolved_path);
g_strfreev (strv);
return FALSE;
}
cur->name = memory_pool_strdup (pool, resolved_path);
(*count) ++;
}
else {
if (! parse_host_port (p, &cur->addr, &cur->port)) {
g_strfreev (strv);
return FALSE;
}
cur->name = memory_pool_strdup (pool, p);
(*count) ++;
}
}

g_strfreev (strv);
return TRUE;
}

+ 22
- 1
src/smtp_utils.h Visa fil

@@ -4,13 +4,25 @@
#include "config.h"
#include "main.h"
#include "smtp.h"
#include "smtp_proto.h"

/**
* @file smtp_utils.h
* Contains utilities for smtp protocol handling
*/

struct smtp_upstream {
struct upstream up;

const gchar *name;
struct in_addr addr;
guint16 port;
gboolean is_unix;
};

#define MAX_SMTP_UPSTREAMS 128

struct smtp_session;

/**
* Send message to upstream
* @param session session object
@@ -39,4 +51,13 @@ gboolean write_smtp_reply (struct smtp_session *session);
*/
void free_smtp_session (gpointer arg);

/**
* Parse upstreams line
* @param upstreams pointer to the array of upstreams (must be at least MAX_SMTP_UPSTREAMS size)
* @param line description line
* @param count targeted count
* @return
*/
gboolean parse_upstreams_line (memory_pool_t *pool, struct smtp_upstream *upstreams, const gchar *line, gsize *count);

#endif /* SMTP_UTILS_H_ */

Laddar…
Avbryt
Spara