From dee3bdecf7b338ded8df307703e2ebf6be209e30 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Thu, 27 Oct 2011 18:19:32 +0300 Subject: [PATCH] * Initial implementation of threaded kvstorage worker. --- CMakeLists.txt | 1 + src/cfg_xml.c | 4 + src/kvstorage_server.c | 286 +++++++++++++++++++++++++++++++++++++++++ src/kvstorage_server.h | 56 ++++++++ src/main.c | 12 ++ src/util.c | 4 +- src/util.h | 1 + 7 files changed, 362 insertions(+), 2 deletions(-) create mode 100644 src/kvstorage_server.c create mode 100644 src/kvstorage_server.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 23425e86a..0d9fc18b6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -739,6 +739,7 @@ SET(RSPAMDSRC src/modules.c src/controller.c src/fuzzy_storage.c src/greylist_storage.c + src/kvstorage_server.c src/lmtp.c src/main.c src/map.c diff --git a/src/cfg_xml.c b/src/cfg_xml.c index d66912ab9..edd8c9a17 100644 --- a/src/cfg_xml.c +++ b/src/cfg_xml.c @@ -919,6 +919,10 @@ worker_handle_type (struct config_file *cfg, struct rspamd_xml_userdata *ctx, GH wrk->type = TYPE_GREYLIST; wrk->has_socket = FALSE; } + else if (g_ascii_strcasecmp (data, "keystorage") == 0) { + wrk->type = TYPE_KVSTORAGE; + wrk->has_socket = TRUE; + } else { msg_err ("unknown worker type: %s", data); return FALSE; diff --git a/src/kvstorage_server.c b/src/kvstorage_server.c new file mode 100644 index 000000000..78c6c34e3 --- /dev/null +++ b/src/kvstorage_server.c @@ -0,0 +1,286 @@ +/* Copyright (c) 2010, Vsevolod Stakhov + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + + +#include "config.h" +#include "kvstorage.h" +#include "kvstorage_config.h" +#include "kvstorage_server.h" +#include "cfg_file.h" +#include "cfg_xml.h" +#include "main.h" + +/* This is required for normal signals processing */ +static GList *global_evbases = NULL; +static struct event_base *main_base = NULL; +static sig_atomic_t wanna_die = 0; + +/* Logging functions */ +#define thr_err(...) do { \ + g_static_mutex_lock (thr->log_mtx); \ + rspamd_common_log_function(rspamd_main->logger, G_LOG_LEVEL_CRITICAL, __FUNCTION__, __VA_ARGS__); \ + g_static_mutex_unlock (thr->log_mtx); \ +} while (0) + +#define thr_warn(...) do { \ + g_static_mutex_lock (thr->log_mtx); \ + rspamd_common_log_function(rspamd_main->logger, G_LOG_LEVEL_WARNING, __FUNCTION__, __VA_ARGS__); \ + g_static_mutex_unlock (thr->log_mtx); \ +} while (0) + +#define thr_info(...) do { \ + g_static_mutex_lock (thr->log_mtx); \ + rspamd_common_log_function(rspamd_main->logger, G_LOG_LEVEL_INFO, __FUNCTION__, __VA_ARGS__); \ + g_static_mutex_unlock (thr->log_mtx); \ +} while (0) + +#ifndef HAVE_SA_SIGINFO +static void +sig_handler (gint signo) +#else +static void +sig_handler (gint signo, siginfo_t *info, void *unused) +#endif +{ + struct timeval tv; + GList *cur; + + switch (signo) { + case SIGUSR1: + reopen_log (rspamd_main->logger); + break; + case SIGINT: + case SIGTERM: + if (!wanna_die) { + wanna_die = 1; + tv.tv_sec = 0; + tv.tv_usec = 0; + cur = global_evbases; + while (cur) { + event_base_loopexit (cur->data, &tv); + } + event_base_loopexit (main_base, &tv); +#ifdef WITH_GPERF_TOOLS + ProfilerStop (); +#endif + } + break; + } +} + +/* + * Config reload is designed by sending sigusr to active workers and pending shutdown of them + */ +static void +sigusr_handler (gint fd, short what, void *arg) +{ + struct rspamd_worker *worker = (struct rspamd_worker *)arg; + /* Do not accept new connections, preparing to end worker's process */ + struct timeval tv; + GList *cur; + struct kvstorage_worker_ctx *ctx; + struct kvstorage_worker_thread *thr; + + ctx = worker->ctx; + if (! wanna_die) { + tv.tv_sec = SOFT_SHUTDOWN_TIME; + tv.tv_usec = 0; + event_del (&worker->sig_ev); + msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME); + cur = ctx->threads; + while (cur) { + thr = cur->data; + if (thr->ev_base != NULL) { + event_del (&thr->bind_ev); + event_base_loopexit (thr->ev_base, &tv); + } + } + event_base_loopexit (ctx->ev_base, &tv); + } + return; +} + +gpointer +init_kvstorage_worker (void) +{ + struct kvstorage_worker_ctx *ctx; + + ctx = g_malloc0 (sizeof (struct kvstorage_worker_ctx)); + ctx->pool = memory_pool_new (memory_pool_get_size ()); + + /* Set default values */ + ctx->timeout_raw = 300000; + + register_worker_opt (TYPE_SMTP, "timeout", xml_handle_seconds, ctx, + G_STRUCT_OFFSET (struct kvstorage_worker_ctx, timeout_raw)); + return ctx; +} + +/* Make post-init configuration */ +static gboolean +config_kvstorage_worker (struct rspamd_worker *worker) +{ + struct kvstorage_worker_ctx *ctx = worker->ctx; + + /* Init timeval */ + msec_to_tv (ctx->timeout_raw, &ctx->io_timeout); + + return TRUE; +} + +/** + * Accept function + */ +/* + * Accept new connection and construct task + */ +static void +thr_accept_socket (gint fd, short what, void *arg) +{ + struct kvstorage_worker_thread *thr = (struct kvstorage_worker_thread *)arg; + union sa_union su; + socklen_t addrlen = sizeof (su.ss); + gint nfd; + + if ((nfd = accept_from_socket (fd, (struct sockaddr *)&su.ss, &addrlen)) == -1) { + thr_warn ("%ud: accept failed: %s", thr->id, strerror (errno)); + return; + } + /* Check for EAGAIN */ + if (nfd == 0) { + return; + } + + if (su.ss.ss_family == AF_UNIX) { + thr_info ("%ud: accepted connection from unix socket", thr->id); + } + else if (su.ss.ss_family == AF_INET) { + thr_info ("%ud: accepted connection from %s port %d", thr->id, + inet_ntoa (su.s4.sin_addr), ntohs (su.s4.sin_port)); + } + /* XXX: write the logic */ + close (nfd); +} + +/** + * Thread main worker function + */ +static gpointer +kvstorage_thread (gpointer ud) +{ + struct kvstorage_worker_thread *thr = ud; + + /* Init thread specific events */ + thr->ev_base = event_init (); + event_set (&thr->bind_ev, thr->worker->cf->listen_sock, EV_READ | EV_PERSIST, thr_accept_socket, (void *)thr); + event_base_set (thr->ev_base, &thr->bind_ev); + event_add (&thr->bind_ev, NULL); + + event_base_loop (thr->ev_base, 0); + + return NULL; +} + +/** + * Create new thread, set it detached + */ +static struct kvstorage_worker_thread * +create_kvstorage_thread (struct rspamd_worker *worker, struct kvstorage_worker_ctx *ctx, guint id) +{ + struct kvstorage_worker_thread *new; + GError *err = NULL; + + new = memory_pool_alloc (ctx->pool, sizeof (struct kvstorage_worker_thread)); + new->ctx = ctx; + new->worker = worker; + new->tv = &ctx->io_timeout; + new->log_mtx = &ctx->log_mtx; + new->id = id; + new->thr = g_thread_create (kvstorage_thread, new, FALSE, &err); + new->ev_base = NULL; + + if (new->thr == NULL) { + msg_err ("cannot create thread: %s", err->message); + } + + return new; +} + +/* + * Start worker process + */ +void +start_kvstorage_worker (struct rspamd_worker *worker) +{ + struct sigaction signals; + struct kvstorage_worker_ctx *ctx = worker->ctx; + guint i; + struct kvstorage_worker_thread *thr; + + gperf_profiler_init (worker->srv->cfg, "kvstorage"); + + if (!g_thread_supported ()) { + msg_err ("threads support is not supported on your system so kvstorage is not functionable"); + exit (EXIT_SUCCESS); + } + /* Create socketpair */ + if (make_socketpair (ctx->s_pair) == -1) { + msg_err ("cannot create socketpair, exiting"); + exit (EXIT_SUCCESS); + } + worker->srv->pid = getpid (); + ctx->ev_base = event_init (); + ctx->threads = NULL; + + g_thread_init (NULL); + main_base = ctx->ev_base; + + /* Set kvstorage options */ + if ( !config_kvstorage_worker (worker)) { + msg_err ("cannot configure kvstorage worker, exiting"); + exit (EXIT_SUCCESS); + } + + init_signals (&signals, sig_handler); + sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL); + + /* SIGUSR2 handler */ + signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *)worker); + event_base_set (ctx->ev_base, &worker->sig_ev); + signal_add (&worker->sig_ev, NULL); + + /* Start workers threads */ + g_static_mutex_init (&ctx->log_mtx); + for (i = 0; i < worker->cf->count; i ++) { + thr = create_kvstorage_thread (worker, ctx, i); + ctx->threads = g_list_prepend (ctx->threads, thr); + } + + /* Set umask */ + umask (S_IWGRP | S_IWOTH | S_IROTH | S_IRGRP); + + event_base_loop (ctx->ev_base, 0); + + close_log (rspamd_main->logger); + exit (EXIT_SUCCESS); +} diff --git a/src/kvstorage_server.h b/src/kvstorage_server.h new file mode 100644 index 000000000..29107101c --- /dev/null +++ b/src/kvstorage_server.h @@ -0,0 +1,56 @@ +/* Copyright (c) 2010, Vsevolod Stakhov + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + + +#ifndef KVSTORAGE_SERVER_H_ +#define KVSTORAGE_SERVER_H_ + +#include "config.h" +#include "mem_pool.h" + +/* Configuration context for kvstorage worker */ +struct kvstorage_worker_ctx { + struct timeval io_timeout; + guint32 timeout_raw; + GList *threads; + gint s_pair[2]; + memory_pool_t *pool; + struct event_base *ev_base; + GStaticMutex log_mtx; +}; + +struct kvstorage_worker_thread { + struct event bind_ev; + struct timeval *tv; + struct kvstorage_worker_ctx *ctx; + struct rspamd_worker *worker; + GThread *thr; + struct event_base *ev_base; + GStaticMutex *log_mtx; + guint id; +}; + +gpointer init_kvstorage_worker (void); +void start_kvstorage_worker (struct rspamd_worker *worker); + +#endif /* KVSTORAGE_SERVER_H_ */ diff --git a/src/main.c b/src/main.c index e470a1e8b..e3e05503d 100644 --- a/src/main.c +++ b/src/main.c @@ -30,6 +30,7 @@ #include "smtp.h" #include "map.h" #include "fuzzy_storage.h" +#include "kvstorage_server.h" #include "cfg_xml.h" #include "symbols_cache.h" #include "lua/lua_common.h" @@ -379,6 +380,12 @@ fork_worker (struct rspamd_main *rspamd, struct worker_conf *cf) msg_info ("starting greylist storage process %P", getpid ()); start_greylist_storage (cur); break; + case TYPE_KVSTORAGE: + setproctitle ("kv storage"); + rspamd_pidfile_close (rspamd->pfh); + msg_info ("starting key-value storage process %P", getpid ()); + start_kvstorage_worker (cur); + break; case TYPE_WORKER: default: setproctitle ("worker process"); @@ -572,6 +579,9 @@ spawn_workers (struct rspamd_main *rspamd) } fork_worker (rspamd, cf); } + else if (cf->type == TYPE_KVSTORAGE) { + fork_worker (rspamd, cf); + } else { for (i = 0; i < cf->count; i++) { fork_worker (rspamd, cf); @@ -806,6 +816,8 @@ init_workers_ctx (enum process_type type) return init_fuzzy_storage (); case TYPE_SMTP: return init_smtp_worker (); + case TYPE_KVSTORAGE: + return init_kvstorage_worker (); default: return NULL; } diff --git a/src/util.c b/src/util.c index d127651e7..423ceb969 100644 --- a/src/util.c +++ b/src/util.c @@ -370,10 +370,10 @@ make_socketpair (gint pair[2]) { gint r; - r = socketpair (PF_LOCAL, SOCK_STREAM, 0, pair); + r = socketpair (AF_LOCAL, SOCK_STREAM, 0, pair); if (r == -1) { - msg_warn ("socketpair failed: %d, '%s'", errno, strerror (errno)); + msg_warn ("socketpair failed: %d, '%s'", errno, strerror (errno), pair[0], pair[1]); return -1; } /* Set close on exec */ diff --git a/src/util.h b/src/util.h index c6c721ed1..a3db2d642 100644 --- a/src/util.h +++ b/src/util.h @@ -26,6 +26,7 @@ enum process_type { TYPE_SMTP, TYPE_FUZZY, TYPE_GREYLIST, + TYPE_KVSTORAGE, TYPE_MAX=255 }; -- 2.39.5