/* Copyright (c) 2010-2012, Vsevolod Stakhov
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *       * Redistributions of source code must retain the above copyright
 *         notice, this list of conditions and the following disclaimer.
 *       * Redistributions in binary form must reproduce the above copyright
 *         notice, this list of conditions and the following disclaimer in the
 *         documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY
 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "config.h"
#include "rspamd.h"
#include "proxy.h"
#include "unix-std.h"

static void rspamd_proxy_backend_handler (gint fd, gshort what, gpointer data);
static void rspamd_proxy_client_handler (gint fd, gshort what, gpointer data);

/* Error domain used for every GError produced in this file */
static inline GQuark
proxy_error_quark (void)
{
	return g_quark_from_static_string ("proxy-error");
}

/**
 * Stop a proxy: remove both events and close both descriptors.
 * Guarded by proxy->closed, so repeated calls are no-ops.
 * @param proxy proxy object to shut down
 */
void
rspamd_proxy_close (rspamd_proxy_t *proxy)
{
	if (!proxy->closed) {
		/*
		 * Unregister the events before closing the descriptors so the
		 * event backend is never asked to drop an already closed fd.
		 */
		event_del (&proxy->client_ev);
		event_del (&proxy->backend_ev);
		close (proxy->cfd);
		close (proxy->bfd);
		proxy->closed = TRUE;
	}
}

/*
 * Re-register `ev` on `fd` for `what` with `handler` and the proxy timeout.
 * event_set() re-initialises the event, so the base is bound again exactly
 * as rspamd_create_proxy() does for the initial registration.
 */
static void
rspamd_proxy_rearm (rspamd_proxy_t *proxy, struct event *ev, gint fd,
	gshort what, void (*handler) (gint, gshort, gpointer))
{
	event_del (ev);
	event_set (ev, fd, what, handler, proxy);
	event_base_set (proxy->base, ev);
	event_add (ev, proxy->tv);
}

/* The shared buffer has been fully flushed: wait for input on both sides */
static void
rspamd_proxy_wait_for_input (rspamd_proxy_t *proxy)
{
	rspamd_proxy_rearm (proxy, &proxy->client_ev, proxy->cfd, EV_READ,
		rspamd_proxy_client_handler);
	rspamd_proxy_rearm (proxy, &proxy->backend_ev, proxy->bfd, EV_READ,
		rspamd_proxy_backend_handler);
}

/*
 * Finish the proxy after read()/write() returned `r` <= 0.
 * r < 0: err_cb gets a GError whose code is `saved_errno` and whose message
 *        is "<op> error: <strerror>".
 * r == 0: err_cb gets NULL.
 * The GError is handed to err_cb and is not freed in this file.
 */
static void
rspamd_proxy_finish_io (rspamd_proxy_t *proxy, gssize r, gint saved_errno,
	const gchar *op)
{
	GError *err = NULL;

	if (r < 0) {
		g_set_error (&err, proxy_error_quark (), saved_errno,
			"%s error: %s", op, strerror (saved_errno));
	}

	rspamd_proxy_close (proxy);
	proxy->err_cb (err, proxy->user_data);
}

/* Finish the proxy with an ETIMEDOUT error carrying `msg` */
static void
rspamd_proxy_finish_timeout (rspamd_proxy_t *proxy, const gchar *msg)
{
	GError *err = NULL;

	g_set_error (&err, proxy_error_quark (), ETIMEDOUT, "%s", msg);
	rspamd_proxy_close (proxy);
	proxy->err_cb (err, proxy->user_data);
}

/*
 * Event callback for the client descriptor (proxy->cfd).
 * EV_READ:  read from the client into the shared buffer and schedule a
 *           write of that buffer to the backend.
 * EV_WRITE: flush the shared buffer (filled from the backend) to the client.
 * Any other value of `what` is treated as a timeout.
 */
static void
rspamd_proxy_client_handler (gint fd, gshort what, gpointer data)
{
	rspamd_proxy_t *proxy = data;
	gssize r;
	gint saved_errno;

	if (what == EV_READ) {
		/* Got data from client */
		event_del (&proxy->client_ev);
		r = read (proxy->cfd, proxy->buf, proxy->bufsize);
		/* Keep errno before any other call can overwrite it */
		saved_errno = errno;

		if (r > 0) {
			/* Write this buffer to backend */
			proxy->read_len = r;
			proxy->buf_offset = 0;
			rspamd_proxy_rearm (proxy, &proxy->backend_ev, proxy->bfd,
				EV_WRITE, rspamd_proxy_backend_handler);
		}
		else {
			/* Read error or client closed the connection */
			rspamd_proxy_finish_io (proxy, r, saved_errno, "Client read");
		}
	}
	else if (what == EV_WRITE) {
		/* Can write to client */
		r = write (proxy->cfd, proxy->buf + proxy->buf_offset,
				proxy->read_len - proxy->buf_offset);
		saved_errno = errno;

		if (r > 0) {
			/* We wrote something */
			proxy->buf_offset += r;

			if (proxy->buf_offset == proxy->read_len) {
				/* We wrote everything */
				rspamd_proxy_wait_for_input (proxy);
			}
			else {
				/*
				 * Partial write: the event is not persistent, so queue
				 * the client write event again for the remaining bytes.
				 */
				event_add (&proxy->client_ev, proxy->tv);
			}
		}
		else {
			/* Write error or zero-length write */
			rspamd_proxy_finish_io (proxy, r, saved_errno, "Client write");
		}
	}
	else {
		/* Got timeout */
		rspamd_proxy_finish_timeout (proxy, "Client timeout");
	}
}

/*
 * Event callback for the backend descriptor (proxy->bfd).
 * EV_READ:  read from the backend into the shared buffer and schedule a
 *           write of that buffer to the client.
 * EV_WRITE: flush the shared buffer (filled from the client) to the backend.
 * Any other value of `what` is treated as a timeout.
 */
static void
rspamd_proxy_backend_handler (gint fd, gshort what, gpointer data)
{
	rspamd_proxy_t *proxy = data;
	gssize r;
	gint saved_errno;

	if (what == EV_READ) {
		/* Got data from backend */
		event_del (&proxy->backend_ev);
		r = read (proxy->bfd, proxy->buf, proxy->bufsize);
		/* Keep errno before any other call can overwrite it */
		saved_errno = errno;

		if (r > 0) {
			/* Write this buffer to client: wait on the client descriptor */
			proxy->read_len = r;
			proxy->buf_offset = 0;
			rspamd_proxy_rearm (proxy, &proxy->client_ev, proxy->cfd,
				EV_WRITE, rspamd_proxy_client_handler);
		}
		else {
			/* Read error or backend closed the connection */
			rspamd_proxy_finish_io (proxy, r, saved_errno, "Backend read");
		}
	}
	else if (what == EV_WRITE) {
		/* Can write to backend */
		r = write (proxy->bfd, proxy->buf + proxy->buf_offset,
				proxy->read_len - proxy->buf_offset);
		saved_errno = errno;

		if (r > 0) {
			/* We wrote something */
			proxy->buf_offset += r;

			if (proxy->buf_offset == proxy->read_len) {
				/* We wrote everything */
				rspamd_proxy_wait_for_input (proxy);
			}
			else {
				/* Partial write: queue the backend write event again */
				event_add (&proxy->backend_ev, proxy->tv);
			}
		}
		else {
			/* Write error or zero-length write */
			rspamd_proxy_finish_io (proxy, r, saved_errno, "Backend write");
		}
	}
	else {
		/* Got timeout */
		rspamd_proxy_finish_timeout (proxy, "Backend timeout");
	}
}

/**
 * Create new proxy between cfd and bfd
 * @param cfd client's socket (duplicated with dup(), the copy is used)
 * @param bfd backend's socket (duplicated with dup(), the copy is used)
 * @param pool memory pool used to allocate the proxy object and its buffer
 * @param base event base both events are attached to
 * @param bufsize size of exchange buffer
 * @param tv timeout passed to every event_add(); the pointer is stored,
 *           not copied
 * @param err_cb callback for errors or completion
 * @param ud user data for callback
 * @return new proxy object
 */
rspamd_proxy_t *
rspamd_create_proxy (gint cfd,
	gint bfd,
	rspamd_mempool_t *pool,
	struct event_base *base,
	gsize bufsize,
	struct timeval *tv,
	dispatcher_err_callback_t err_cb,
	gpointer ud)
{
	rspamd_proxy_t *new;

	new = rspamd_mempool_alloc0 (pool, sizeof (rspamd_proxy_t));

	/* NOTE(review): dup() results are not checked; -1 would be registered */
	new->cfd = dup (cfd);
	new->bfd = dup (bfd);
	new->pool = pool;
	new->base = base;
	new->bufsize = bufsize;
	new->buf = rspamd_mempool_alloc (pool, bufsize);
	new->err_cb = err_cb;
	new->user_data = ud;
	new->tv = tv;

	/* Set client's and backend's interfaces to read events */
	event_set (&new->client_ev, new->cfd, EV_READ,
		rspamd_proxy_client_handler, new);
	event_base_set (new->base, &new->client_ev);
	event_add (&new->client_ev, new->tv);

	event_set (&new->backend_ev, new->bfd, EV_READ,
		rspamd_proxy_backend_handler, new);
	event_base_set (new->base, &new->backend_ev);
	event_add (&new->backend_ev, new->tv);

	return new;
}