Browse Source

[Rework] More cleanup actions

tags/1.4.0
Vsevolod Stakhov 7 years ago
parent
commit
ed77fe5856

+ 1
- 2
src/CMakeLists.txt View File

@@ -78,7 +78,6 @@ SET(RSPAMDSRC controller.c
fuzzy_storage.c
lua_worker.c
rspamd.c
smtp_proxy.c
worker.c
rspamd_proxy.c
log_helper.c)
@@ -93,7 +92,7 @@ SET(PLUGINSSRC plugins/surbl.c
lua/lua_fann.c)

SET(MODULES_LIST surbl regexp chartable fuzzy_check spf dkim)
SET(WORKERS_LIST normal controller smtp_proxy fuzzy lua rspamd_proxy log_helper)
SET(WORKERS_LIST normal controller fuzzy lua rspamd_proxy log_helper)
IF (ENABLE_HYPERSCAN MATCHES "ON")
LIST(APPEND WORKERS_LIST "hs_helper")
LIST(APPEND RSPAMDSRC "hs_helper.c")

+ 0
- 2
src/libmime/CMakeLists.txt View File

@@ -5,8 +5,6 @@ SET(LIBRSPAMDMIMESRC
${CMAKE_CURRENT_SOURCE_DIR}/filter.c
${CMAKE_CURRENT_SOURCE_DIR}/images.c
${CMAKE_CURRENT_SOURCE_DIR}/message.c
${CMAKE_CURRENT_SOURCE_DIR}/smtp_utils.c
${CMAKE_CURRENT_SOURCE_DIR}/smtp_proto.c
${CMAKE_CURRENT_SOURCE_DIR}/archives.c)

SET(RSPAMD_MIME ${LIBRSPAMDMIMESRC} PARENT_SCOPE)

+ 0
- 2
src/libserver/CMakeLists.txt View File

@@ -1,6 +1,5 @@
# Librspamdserver
SET(LIBRSPAMDSERVERSRC
${CMAKE_CURRENT_SOURCE_DIR}/buffer.c
${CMAKE_CURRENT_SOURCE_DIR}/cfg_utils.c
${CMAKE_CURRENT_SOURCE_DIR}/cfg_rcl.c
${CMAKE_CURRENT_SOURCE_DIR}/composites.c
@@ -14,7 +13,6 @@ SET(LIBRSPAMDSERVERSRC
${CMAKE_CURRENT_SOURCE_DIR}/html.c
${CMAKE_CURRENT_SOURCE_DIR}/monitored.c
${CMAKE_CURRENT_SOURCE_DIR}/protocol.c
${CMAKE_CURRENT_SOURCE_DIR}/proxy.c
${CMAKE_CURRENT_SOURCE_DIR}/redis_pool.c
${CMAKE_CURRENT_SOURCE_DIR}/re_cache.c
${CMAKE_CURRENT_SOURCE_DIR}/roll_history.c

+ 0
- 284
src/libserver/proxy.c View File

@@ -1,284 +0,0 @@
/*-
* Copyright 2016 Vsevolod Stakhov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "config.h"
#include "rspamd.h"
#include "proxy.h"
#include "unix-std.h"

static void rspamd_proxy_backend_handler (gint fd, gshort what, gpointer data);
static void rspamd_proxy_client_handler (gint fd, gshort what, gpointer data);

static inline GQuark
proxy_error_quark (void)
{
return g_quark_from_static_string ("proxy-error");
}

void
rspamd_proxy_close (rspamd_proxy_t *proxy)
{
if (!proxy->closed) {
close (proxy->cfd);
close (proxy->bfd);

event_del (&proxy->client_ev);
event_del (&proxy->backend_ev);
proxy->closed = TRUE;
}
}

static void
rspamd_proxy_client_handler (gint fd, gshort what, gpointer data)
{
rspamd_proxy_t *proxy = data;
gint r;
GError *err = NULL;

if (what == EV_READ) {
/* Got data from client */
event_del (&proxy->client_ev);
r = read (proxy->cfd, proxy->buf, proxy->bufsize);
if (r > 0) {
/* Write this buffer to backend */
proxy->read_len = r;
proxy->buf_offset = 0;
event_del (&proxy->backend_ev);
event_set (&proxy->backend_ev,
proxy->bfd,
EV_WRITE,
rspamd_proxy_backend_handler,
proxy);
event_add (&proxy->backend_ev, proxy->tv);
}
else {
/* Error case or zero reply */
if (r < 0) {
/* Error case */
g_set_error (&err,
proxy_error_quark (), r, "Client read error: %s",
strerror (errno));
rspamd_proxy_close (proxy);
proxy->err_cb (err, proxy->user_data);
}
else {
/* Client closes connection */
rspamd_proxy_close (proxy);
proxy->err_cb (NULL, proxy->user_data);
}
}
}
else if (what == EV_WRITE) {
/* Can write to client */
r = write (proxy->cfd,
proxy->buf + proxy->buf_offset,
proxy->read_len - proxy->buf_offset);
if (r > 0) {
/* We wrote something */
proxy->buf_offset += r;
if (proxy->buf_offset == proxy->read_len) {
/* We wrote everything */
event_del (&proxy->client_ev);
event_set (&proxy->client_ev,
proxy->cfd,
EV_READ,
rspamd_proxy_client_handler,
proxy);
event_add (&proxy->client_ev, proxy->tv);
event_del (&proxy->backend_ev);
event_set (&proxy->backend_ev,
proxy->bfd,
EV_READ,
rspamd_proxy_backend_handler,
proxy);
event_add (&proxy->backend_ev, proxy->tv);
}
else {
/* Plan another write event */
event_add (&proxy->backend_ev, proxy->tv);
}
}
else {
/* Error case or zero reply */
if (r < 0) {
/* Error case */
g_set_error (&err,
proxy_error_quark (), r, "Client write error: %s",
strerror (errno));
rspamd_proxy_close (proxy);
proxy->err_cb (err, proxy->user_data);
}
else {
/* Client closes connection */
rspamd_proxy_close (proxy);
proxy->err_cb (NULL, proxy->user_data);
}
}
}
else {
/* Got timeout */
g_set_error (&err, proxy_error_quark (), ETIMEDOUT, "Client timeout");
rspamd_proxy_close (proxy);
proxy->err_cb (err, proxy->user_data);
}
}

static void
rspamd_proxy_backend_handler (gint fd, gshort what, gpointer data)
{
rspamd_proxy_t *proxy = data;
gint r;
GError *err = NULL;

if (what == EV_READ) {
/* Got data from backend */
event_del (&proxy->backend_ev);
r = read (proxy->bfd, proxy->buf, proxy->bufsize);
if (r > 0) {
/* Write this buffer to client */
proxy->read_len = r;
proxy->buf_offset = 0;
event_del (&proxy->client_ev);
event_set (&proxy->client_ev,
proxy->bfd,
EV_WRITE,
rspamd_proxy_client_handler,
proxy);
event_add (&proxy->client_ev, proxy->tv);
}
else {
/* Error case or zero reply */
if (r < 0) {
/* Error case */
g_set_error (&err,
proxy_error_quark (), r, "Backend read error: %s",
strerror (errno));
rspamd_proxy_close (proxy);
proxy->err_cb (err, proxy->user_data);
}
else {
/* Client closes connection */
rspamd_proxy_close (proxy);
proxy->err_cb (NULL, proxy->user_data);
}
}
}
else if (what == EV_WRITE) {
/* Can write to backend */
r = write (proxy->bfd,
proxy->buf + proxy->buf_offset,
proxy->read_len - proxy->buf_offset);
if (r > 0) {
/* We wrote something */
proxy->buf_offset += r;
if (proxy->buf_offset == proxy->read_len) {
/* We wrote everything */
event_del (&proxy->backend_ev);
event_set (&proxy->backend_ev,
proxy->bfd,
EV_READ,
rspamd_proxy_backend_handler,
proxy);
event_add (&proxy->backend_ev, proxy->tv);
event_del (&proxy->client_ev);
event_set (&proxy->client_ev,
proxy->cfd,
EV_READ,
rspamd_proxy_client_handler,
proxy);
event_add (&proxy->client_ev, proxy->tv);
}
else {
/* Plan another write event */
event_add (&proxy->backend_ev, proxy->tv);
}
}
else {
/* Error case or zero reply */
if (r < 0) {
/* Error case */
g_set_error (&err,
proxy_error_quark (), r, "Backend write error: %s",
strerror (errno));
rspamd_proxy_close (proxy);
proxy->err_cb (err, proxy->user_data);
}
else {
/* Client closes connection */
rspamd_proxy_close (proxy);
proxy->err_cb (NULL, proxy->user_data);
}
}
}
else {
/* Got timeout */
g_set_error (&err, proxy_error_quark (), ETIMEDOUT, "Client timeout");
rspamd_proxy_close (proxy);
proxy->err_cb (err, proxy->user_data);
}
}

/**
* Create new proxy between cfd and bfd
* @param cfd client's socket
* @param bfd backend's socket
* @param bufsize size of exchange buffer
* @param err_cb callback for erorrs or completing
* @param ud user data for callback
* @return new proxy object
*/
rspamd_proxy_t *
rspamd_create_proxy (gint cfd,
gint bfd,
rspamd_mempool_t *pool,
struct event_base *base,
gsize bufsize,
struct timeval *tv,
dispatcher_err_callback_t err_cb,
gpointer ud)
{
rspamd_proxy_t *new;

new = rspamd_mempool_alloc0 (pool, sizeof (rspamd_proxy_t));

new->cfd = dup (cfd);
new->bfd = dup (bfd);
new->pool = pool;
new->base = base;
new->bufsize = bufsize;
new->buf = rspamd_mempool_alloc (pool, bufsize);
new->err_cb = err_cb;
new->user_data = ud;
new->tv = tv;

/* Set client's and backend's interfaces to read events */
event_set (&new->client_ev,
new->cfd,
EV_READ,
rspamd_proxy_client_handler,
new);
event_base_set (new->base, &new->client_ev);
event_add (&new->client_ev, new->tv);

event_set (&new->backend_ev,
new->bfd,
EV_READ,
rspamd_proxy_backend_handler,
new);
event_base_set (new->base, &new->backend_ev);
event_add (&new->backend_ev, new->tv);

return new;
}

+ 0
- 66
src/libserver/proxy.h View File

@@ -1,66 +0,0 @@
/*-
* Copyright 2016 Vsevolod Stakhov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef PROXY_H_
#define PROXY_H_

#include "config.h"
#include "buffer.h"
#include <event.h>

/**
* @file proxy.h
* Direct asynchronous proxy implementation
*/

typedef struct rspamd_proxy_s {
struct event client_ev; /**< event for client's communication */
struct event backend_ev; /**< event for backend communication */
struct event_base *base; /**< base for event operations */
rspamd_mempool_t *pool; /**< memory pool */
dispatcher_err_callback_t err_cb; /**< error callback */
struct event_base *ev_base; /**< event base */
gint cfd; /**< client's socket */
gint bfd; /**< backend's socket */
guint8 *buf; /**< exchange buffer */
gsize bufsize; /**< buffer size */
gint read_len; /**< read length */
gint buf_offset; /**< offset to write */
gpointer user_data; /**< user's data for callbacks */
struct timeval *tv; /**< timeout for communications */
gboolean closed; /**< whether descriptors are closed */
} rspamd_proxy_t;

/**
* Create new proxy between cfd and bfd
* @param cfd client's socket
* @param bfd backend's socket
* @param bufsize size of exchange buffer
* @param err_cb callback for erorrs or completing
* @param ud user data for callback
* @return new proxy object
*/
rspamd_proxy_t * rspamd_create_proxy (gint cfd,
gint bfd,
rspamd_mempool_t *pool,
struct event_base *base,
gsize bufsize,
struct timeval *tv,
dispatcher_err_callback_t err_cb,
gpointer ud);

void rspamd_proxy_close (rspamd_proxy_t *proxy);

#endif /* PROXY_H_ */

+ 0
- 1
src/lua/CMakeLists.txt View File

@@ -12,7 +12,6 @@ SET(LUASRC ${CMAKE_CURRENT_SOURCE_DIR}/lua_common.c
${CMAKE_CURRENT_SOURCE_DIR}/lua_redis.c
${CMAKE_CURRENT_SOURCE_DIR}/lua_upstream.c
${CMAKE_CURRENT_SOURCE_DIR}/lua_mempool.c
${CMAKE_CURRENT_SOURCE_DIR}/lua_buffer.c
${CMAKE_CURRENT_SOURCE_DIR}/lua_dns.c
${CMAKE_CURRENT_SOURCE_DIR}/lua_rsa.c
${CMAKE_CURRENT_SOURCE_DIR}/lua_ip.c

+ 0
- 1
src/rspamadm/CMakeLists.txt View File

@@ -16,7 +16,6 @@ SET(RSPAMADMSRC rspamadm.c
${CMAKE_SOURCE_DIR}/src/controller.c
${CMAKE_SOURCE_DIR}/src/fuzzy_storage.c
${CMAKE_SOURCE_DIR}/src/lua_worker.c
${CMAKE_SOURCE_DIR}/src/smtp_proxy.c
${CMAKE_SOURCE_DIR}/src/worker.c
${CMAKE_SOURCE_DIR}/src/rspamd_proxy.c
${CMAKE_SOURCE_DIR}/src/log_helper.c)

+ 0
- 1
src/rspamd.h View File

@@ -16,7 +16,6 @@
#include "libutil/radix.h"
#include "libserver/url.h"
#include "libserver/protocol.h"
#include "libserver/buffer.h"
#include "libserver/events.h"
#include "libserver/roll_history.h"
#include "libserver/task.h"

Loading…
Cancel
Save