aboutsummaryrefslogtreecommitdiffstats
path: root/src/libserver/proxy.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/libserver/proxy.c')
-rw-r--r--src/libserver/proxy.c241
1 files changed, 241 insertions, 0 deletions
diff --git a/src/libserver/proxy.c b/src/libserver/proxy.c
new file mode 100644
index 000000000..67c7665b8
--- /dev/null
+++ b/src/libserver/proxy.c
@@ -0,0 +1,241 @@
+/* Copyright (c) 2010-2012, Vsevolod Stakhov
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "config.h"
+#include "main.h"
+#include "proxy.h"
+
+static void rspamd_proxy_backend_handler (gint fd, gshort what, gpointer data);
+static void rspamd_proxy_client_handler (gint fd, gshort what, gpointer data);
+
+static inline GQuark
+proxy_error_quark (void)
+{
+ return g_quark_from_static_string ("proxy-error");
+}
+
+void
+rspamd_proxy_close (rspamd_proxy_t *proxy)
+{
+ if (!proxy->closed) {
+ close (proxy->cfd);
+ close (proxy->bfd);
+
+ event_del (&proxy->client_ev);
+ event_del (&proxy->backend_ev);
+ proxy->closed = TRUE;
+ }
+}
+
+static void
+rspamd_proxy_client_handler (gint fd, gshort what, gpointer data)
+{
+ rspamd_proxy_t *proxy = data;
+ gint r;
+ GError *err = NULL;
+
+ if (what == EV_READ) {
+ /* Got data from client */
+ event_del (&proxy->client_ev);
+ r = read (proxy->cfd, proxy->buf, proxy->bufsize);
+ if (r > 0) {
+ /* Write this buffer to backend */
+ proxy->read_len = r;
+ proxy->buf_offset = 0;
+ event_del (&proxy->backend_ev);
+ event_set (&proxy->backend_ev, proxy->bfd, EV_WRITE, rspamd_proxy_backend_handler, proxy);
+ event_add (&proxy->backend_ev, proxy->tv);
+ }
+ else {
+ /* Error case or zero reply */
+ if (r < 0) {
+ /* Error case */
+ g_set_error (&err, proxy_error_quark(), r, "Client read error: %s", strerror (errno));
+ rspamd_proxy_close (proxy);
+ proxy->err_cb (err, proxy->user_data);
+ }
+ else {
+ /* Client closes connection */
+ rspamd_proxy_close (proxy);
+ proxy->err_cb (NULL, proxy->user_data);
+ }
+ }
+ }
+ else if (what == EV_WRITE) {
+ /* Can write to client */
+ r = write (proxy->cfd, proxy->buf + proxy->buf_offset, proxy->read_len - proxy->buf_offset);
+ if (r > 0) {
+ /* We wrote something */
+ proxy->buf_offset +=r;
+ if (proxy->buf_offset == proxy->read_len) {
+ /* We wrote everything */
+ event_del (&proxy->client_ev);
+ event_set (&proxy->client_ev, proxy->cfd, EV_READ, rspamd_proxy_client_handler, proxy);
+ event_add (&proxy->client_ev, proxy->tv);
+ event_del (&proxy->backend_ev);
+ event_set (&proxy->backend_ev, proxy->bfd, EV_READ, rspamd_proxy_backend_handler, proxy);
+ event_add (&proxy->backend_ev, proxy->tv);
+ }
+ else {
+ /* Plan another write event */
+ event_add (&proxy->backend_ev, proxy->tv);
+ }
+ }
+ else {
+ /* Error case or zero reply */
+ if (r < 0) {
+ /* Error case */
+ g_set_error (&err, proxy_error_quark(), r, "Client write error: %s", strerror (errno));
+ rspamd_proxy_close (proxy);
+ proxy->err_cb (err, proxy->user_data);
+ }
+ else {
+ /* Client closes connection */
+ rspamd_proxy_close (proxy);
+ proxy->err_cb (NULL, proxy->user_data);
+ }
+ }
+ }
+ else {
+ /* Got timeout */
+ g_set_error (&err, proxy_error_quark(), ETIMEDOUT, "Client timeout");
+ rspamd_proxy_close (proxy);
+ proxy->err_cb (err, proxy->user_data);
+ }
+}
+
+static void
+rspamd_proxy_backend_handler (gint fd, gshort what, gpointer data)
+{
+ rspamd_proxy_t *proxy = data;
+ gint r;
+ GError *err = NULL;
+
+ if (what == EV_READ) {
+ /* Got data from backend */
+ event_del (&proxy->backend_ev);
+ r = read (proxy->bfd, proxy->buf, proxy->bufsize);
+ if (r > 0) {
+ /* Write this buffer to client */
+ proxy->read_len = r;
+ proxy->buf_offset = 0;
+ event_del (&proxy->client_ev);
+ event_set (&proxy->client_ev, proxy->bfd, EV_WRITE, rspamd_proxy_client_handler, proxy);
+ event_add (&proxy->client_ev, proxy->tv);
+ }
+ else {
+ /* Error case or zero reply */
+ if (r < 0) {
+ /* Error case */
+ g_set_error (&err, proxy_error_quark(), r, "Backend read error: %s", strerror (errno));
+ rspamd_proxy_close (proxy);
+ proxy->err_cb (err, proxy->user_data);
+ }
+ else {
+ /* Client closes connection */
+ rspamd_proxy_close (proxy);
+ proxy->err_cb (NULL, proxy->user_data);
+ }
+ }
+ }
+ else if (what == EV_WRITE) {
+ /* Can write to backend */
+ r = write (proxy->bfd, proxy->buf + proxy->buf_offset, proxy->read_len - proxy->buf_offset);
+ if (r > 0) {
+ /* We wrote something */
+ proxy->buf_offset +=r;
+ if (proxy->buf_offset == proxy->read_len) {
+ /* We wrote everything */
+ event_del (&proxy->backend_ev);
+ event_set (&proxy->backend_ev, proxy->bfd, EV_READ, rspamd_proxy_backend_handler, proxy);
+ event_add (&proxy->backend_ev, proxy->tv);
+ event_del (&proxy->client_ev);
+ event_set (&proxy->client_ev, proxy->cfd, EV_READ, rspamd_proxy_client_handler, proxy);
+ event_add (&proxy->client_ev, proxy->tv);
+ }
+ else {
+ /* Plan another write event */
+ event_add (&proxy->backend_ev, proxy->tv);
+ }
+ }
+ else {
+ /* Error case or zero reply */
+ if (r < 0) {
+ /* Error case */
+ g_set_error (&err, proxy_error_quark(), r, "Backend write error: %s", strerror (errno));
+ rspamd_proxy_close (proxy);
+ proxy->err_cb (err, proxy->user_data);
+ }
+ else {
+ /* Client closes connection */
+ rspamd_proxy_close (proxy);
+ proxy->err_cb (NULL, proxy->user_data);
+ }
+ }
+ }
+ else {
+ /* Got timeout */
+ g_set_error (&err, proxy_error_quark(), ETIMEDOUT, "Client timeout");
+ rspamd_proxy_close (proxy);
+ proxy->err_cb (err, proxy->user_data);
+ }
+}
+
+/**
+ * Create new proxy between cfd and bfd
+ * @param cfd client's socket
+ * @param bfd backend's socket
+ * @param bufsize size of exchange buffer
+ * @param err_cb callback for erorrs or completing
+ * @param ud user data for callback
+ * @return new proxy object
+ */
+rspamd_proxy_t*
+rspamd_create_proxy (gint cfd, gint bfd, rspamd_mempool_t *pool, struct event_base *base,
+ gsize bufsize, struct timeval *tv, dispatcher_err_callback_t err_cb, gpointer ud)
+{
+ rspamd_proxy_t *new;
+
+ new = rspamd_mempool_alloc0 (pool, sizeof (rspamd_proxy_t));
+
+ new->cfd = dup (cfd);
+ new->bfd = dup (bfd);
+ new->pool = pool;
+ new->base = base;
+ new->bufsize = bufsize;
+ new->buf = rspamd_mempool_alloc (pool, bufsize);
+ new->err_cb = err_cb;
+ new->user_data = ud;
+ new->tv = tv;
+
+ /* Set client's and backend's interfaces to read events */
+ event_set (&new->client_ev, new->cfd, EV_READ, rspamd_proxy_client_handler, new);
+ event_base_set (new->base, &new->client_ev);
+ event_add (&new->client_ev, new->tv);
+
+ event_set (&new->backend_ev, new->bfd, EV_READ, rspamd_proxy_backend_handler, new);
+ event_base_set (new->base, &new->backend_ev);
+ event_add (&new->backend_ev, new->tv);
+
+ return new;
+}