]> source.dussan.org Git - rspamd.git/commitdiff
Rework worker utilities.
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Wed, 30 Apr 2014 11:45:17 +0000 (12:45 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Wed, 30 Apr 2014 11:45:38 +0000 (12:45 +0100)
src/libmime/CMakeLists.txt
src/libmime/worker_util.c [deleted file]
src/libserver/CMakeLists.txt
src/libserver/worker_util.c [new file with mode: 0644]
src/libserver/worker_util.h [new file with mode: 0644]
src/libutil/util.h
src/main.h

index 303b7a088bbc2058efe475c76959f80bb681f50c..e612dce192f62c6e94c5fe4ea1380ad0662e0672 100644 (file)
@@ -6,8 +6,7 @@ SET(LIBRSPAMDMIMESRC
                                message.c
                                protocol.c
                                smtp_utils.c
-                               smtp_proto.c
-                               worker_util.c)
+                               smtp_proto.c)
 
 # Librspamdmime
 ADD_LIBRARY(rspamd-mime ${LINK_TYPE} ${LIBRSPAMDMIMESRC})
diff --git a/src/libmime/worker_util.c b/src/libmime/worker_util.c
deleted file mode 100644 (file)
index 5507cdb..0000000
+++ /dev/null
@@ -1,217 +0,0 @@
-/* Copyright (c) 2010-2011, Vsevolod Stakhov
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *       * Redistributions of source code must retain the above copyright
- *         notice, this list of conditions and the following disclaimer.
- *       * Redistributions in binary form must reproduce the above copyright
- *         notice, this list of conditions and the following disclaimer in the
- *         documentation and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY
- * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
- * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
- * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
- * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
- * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
- * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
- * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-#include "config.h"
-#include "main.h"
-#include "message.h"
-#include "lua/lua_common.h"
-
-extern struct rspamd_main                      *rspamd_main;
-
-/**
- * Return worker's control structure by its type
- * @param type
- * @return worker's control structure or NULL
- */
-worker_t*
-get_worker_by_type (GQuark type)
-{
-       worker_t                                                **cur;
-
-       cur = &workers[0];
-       while (*cur) {
-               if (g_quark_from_string ((*cur)->name) == type) {
-                       return *cur;
-               }
-               cur ++;
-       }
-
-       return NULL;
-}
-
-double
-set_counter (const gchar *name, guint32 value)
-{
-       struct counter_data            *cd;
-       double                          alpha;
-       gchar                           *key;
-
-       cd = rspamd_hash_lookup (rspamd_main->counters, (gpointer) name);
-
-       if (cd == NULL) {
-               cd = rspamd_mempool_alloc_shared (rspamd_main->counters->pool, sizeof (struct counter_data));
-               cd->value = value;
-               cd->number = 0;
-               key = rspamd_mempool_strdup_shared (rspamd_main->counters->pool, name);
-               rspamd_hash_insert (rspamd_main->counters, (gpointer) key, (gpointer) cd);
-       }
-       else {
-               /* Calculate new value */
-               rspamd_mempool_wlock_rwlock (rspamd_main->counters->lock);
-
-               alpha = 2. / (++cd->number + 1);
-               cd->value = cd->value * (1. - alpha) + value * alpha;
-
-               rspamd_mempool_wunlock_rwlock (rspamd_main->counters->lock);
-       }
-
-       return cd->value;
-}
-
-sig_atomic_t wanna_die = 0;
-
-#ifndef HAVE_SA_SIGINFO
-static void
-worker_sig_handler (gint signo)
-#else
-static void
-worker_sig_handler (gint signo, siginfo_t * info, void *unused)
-#endif
-{
-       struct timeval                  tv;
-
-       switch (signo) {
-       case SIGINT:
-       case SIGTERM:
-               if (!wanna_die) {
-                       wanna_die = 1;
-                       tv.tv_sec = 0;
-                       tv.tv_usec = 0;
-                       event_loopexit (&tv);
-
-#ifdef WITH_GPERF_TOOLS
-                       ProfilerStop ();
-#endif
-               }
-               break;
-       }
-}
-
-/*
- * Config reload is designed by sending sigusr2 to active workers and pending shutdown of them
- */
-static void
-worker_sigusr2_handler (gint fd, short what, void *arg)
-{
-       struct rspamd_worker           *worker = (struct rspamd_worker *) arg;
-       /* Do not accept new connections, preparing to end worker's process */
-       struct timeval                  tv;
-
-       if (!wanna_die) {
-               tv.tv_sec = SOFT_SHUTDOWN_TIME;
-               tv.tv_usec = 0;
-               event_del (&worker->sig_ev_usr1);
-               event_del (&worker->sig_ev_usr2);
-               worker_stop_accept (worker);
-               msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
-               event_loopexit (&tv);
-       }
-       return;
-}
-
-/*
- * Reopen log is designed by sending sigusr1 to active workers and pending shutdown of them
- */
-static void
-worker_sigusr1_handler (gint fd, short what, void *arg)
-{
-       struct rspamd_worker           *worker = (struct rspamd_worker *) arg;
-
-       reopen_log (worker->srv->logger);
-
-       return;
-}
-
-struct event_base *
-prepare_worker (struct rspamd_worker *worker, const char *name,
-               void (*accept_handler)(int, short, void *))
-{
-       struct event_base                *ev_base;
-       struct event                     *accept_event;
-       struct sigaction                  signals;
-       GList                             *cur;
-       gint                               listen_socket;
-
-#ifdef WITH_PROFILER
-       extern void                     _start (void), etext (void);
-       monstartup ((u_long) & _start, (u_long) & etext);
-#endif
-
-       gperf_profiler_init (worker->srv->cfg, name);
-
-       worker->srv->pid = getpid ();
-
-       ev_base = event_init ();
-
-       init_signals (&signals, worker_sig_handler);
-       sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
-
-       /* Accept all sockets */
-       cur = worker->cf->listen_socks;
-       while (cur) {
-               listen_socket = GPOINTER_TO_INT (cur->data);
-               if (listen_socket != -1) {
-                       accept_event = g_slice_alloc0 (sizeof (struct event));
-                       event_set (accept_event, listen_socket, EV_READ | EV_PERSIST,
-                                       accept_handler, worker);
-                       event_base_set (ev_base, accept_event);
-                       event_add (accept_event, NULL);
-                       worker->accept_events = g_list_prepend (worker->accept_events, accept_event);
-               }
-               cur = g_list_next (cur);
-       }
-
-       /* SIGUSR2 handler */
-       signal_set (&worker->sig_ev_usr2, SIGUSR2, worker_sigusr2_handler,
-                       (void *) worker);
-       event_base_set (ev_base, &worker->sig_ev_usr2);
-       signal_add (&worker->sig_ev_usr2, NULL);
-
-       /* SIGUSR1 handler */
-       signal_set (&worker->sig_ev_usr1, SIGUSR1, worker_sigusr1_handler,
-                       (void *) worker);
-       event_base_set (ev_base, &worker->sig_ev_usr1);
-       signal_add (&worker->sig_ev_usr1, NULL);
-
-       return ev_base;
-}
-
-void
-worker_stop_accept (struct rspamd_worker *worker)
-{
-       GList                             *cur;
-       struct event                     *event;
-
-       /* Remove all events */
-       cur = worker->accept_events;
-       while (cur) {
-               event = cur->data;
-               event_del (event);
-               cur = g_list_next (cur);
-               g_slice_free1 (sizeof (struct event), event);
-       }
-
-       if (worker->accept_events != NULL) {
-               g_list_free (worker->accept_events);
-       }
-}
index bd5df18b9e9aa5d7d79c2d5bdd52c4f09e093373..99a4debb2bf818920290625f615a9ce86827ab6a 100644 (file)
@@ -17,7 +17,9 @@ SET(LIBRSPAMDSERVERSRC
                                statfile_sync.c
                                symbols_cache.c
                                task.c
-                               url.c)
+                               url.c
+                               worker_util.c)
+
 SET(TOKENIZERSSRC  ../tokenizers/tokenizers.c
                                ../tokenizers/osb.c)
 
diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c
new file mode 100644 (file)
index 0000000..5507cdb
--- /dev/null
@@ -0,0 +1,217 @@
+/* Copyright (c) 2010-2011, Vsevolod Stakhov
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *       * Redistributions of source code must retain the above copyright
+ *         notice, this list of conditions and the following disclaimer.
+ *       * Redistributions in binary form must reproduce the above copyright
+ *         notice, this list of conditions and the following disclaimer in the
+ *         documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "config.h"
+#include "main.h"
+#include "message.h"
+#include "lua/lua_common.h"
+
+extern struct rspamd_main                      *rspamd_main;
+
+/**
+ * Return worker's control structure by its type
+ * @param type
+ * @return worker's control structure or NULL
+ */
+worker_t*
+get_worker_by_type (GQuark type)
+{
+       worker_t                                                **cur;
+
+       cur = &workers[0];
+       while (*cur) {
+               if (g_quark_from_string ((*cur)->name) == type) {
+                       return *cur;
+               }
+               cur ++;
+       }
+
+       return NULL;
+}
+
+double
+set_counter (const gchar *name, guint32 value)
+{
+       struct counter_data            *cd;
+       double                          alpha;
+       gchar                           *key;
+
+       cd = rspamd_hash_lookup (rspamd_main->counters, (gpointer) name);
+
+       if (cd == NULL) {
+               cd = rspamd_mempool_alloc_shared (rspamd_main->counters->pool, sizeof (struct counter_data));
+               cd->value = value;
+               cd->number = 0;
+               key = rspamd_mempool_strdup_shared (rspamd_main->counters->pool, name);
+               rspamd_hash_insert (rspamd_main->counters, (gpointer) key, (gpointer) cd);
+       }
+       else {
+               /* Calculate new value */
+               rspamd_mempool_wlock_rwlock (rspamd_main->counters->lock);
+
+               alpha = 2. / (++cd->number + 1);
+               cd->value = cd->value * (1. - alpha) + value * alpha;
+
+               rspamd_mempool_wunlock_rwlock (rspamd_main->counters->lock);
+       }
+
+       return cd->value;
+}
+
+sig_atomic_t wanna_die = 0;
+
+#ifndef HAVE_SA_SIGINFO
+static void
+worker_sig_handler (gint signo)
+#else
+static void
+worker_sig_handler (gint signo, siginfo_t * info, void *unused)
+#endif
+{
+       struct timeval                  tv;
+
+       switch (signo) {
+       case SIGINT:
+       case SIGTERM:
+               if (!wanna_die) {
+                       wanna_die = 1;
+                       tv.tv_sec = 0;
+                       tv.tv_usec = 0;
+                       event_loopexit (&tv);
+
+#ifdef WITH_GPERF_TOOLS
+                       ProfilerStop ();
+#endif
+               }
+               break;
+       }
+}
+
+/*
+ * Config reload is designed by sending sigusr2 to active workers and pending shutdown of them
+ */
+static void
+worker_sigusr2_handler (gint fd, short what, void *arg)
+{
+       struct rspamd_worker           *worker = (struct rspamd_worker *) arg;
+       /* Do not accept new connections, preparing to end worker's process */
+       struct timeval                  tv;
+
+       if (!wanna_die) {
+               tv.tv_sec = SOFT_SHUTDOWN_TIME;
+               tv.tv_usec = 0;
+               event_del (&worker->sig_ev_usr1);
+               event_del (&worker->sig_ev_usr2);
+               worker_stop_accept (worker);
+               msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
+               event_loopexit (&tv);
+       }
+       return;
+}
+
+/*
+ * Reopen log is designed by sending sigusr1 to active workers and pending shutdown of them
+ */
+static void
+worker_sigusr1_handler (gint fd, short what, void *arg)
+{
+       struct rspamd_worker           *worker = (struct rspamd_worker *) arg;
+
+       reopen_log (worker->srv->logger);
+
+       return;
+}
+
+struct event_base *
+prepare_worker (struct rspamd_worker *worker, const char *name,
+               void (*accept_handler)(int, short, void *))
+{
+       struct event_base                *ev_base;
+       struct event                     *accept_event;
+       struct sigaction                  signals;
+       GList                             *cur;
+       gint                               listen_socket;
+
+#ifdef WITH_PROFILER
+       extern void                     _start (void), etext (void);
+       monstartup ((u_long) & _start, (u_long) & etext);
+#endif
+
+       gperf_profiler_init (worker->srv->cfg, name);
+
+       worker->srv->pid = getpid ();
+
+       ev_base = event_init ();
+
+       init_signals (&signals, worker_sig_handler);
+       sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
+
+       /* Accept all sockets */
+       cur = worker->cf->listen_socks;
+       while (cur) {
+               listen_socket = GPOINTER_TO_INT (cur->data);
+               if (listen_socket != -1) {
+                       accept_event = g_slice_alloc0 (sizeof (struct event));
+                       event_set (accept_event, listen_socket, EV_READ | EV_PERSIST,
+                                       accept_handler, worker);
+                       event_base_set (ev_base, accept_event);
+                       event_add (accept_event, NULL);
+                       worker->accept_events = g_list_prepend (worker->accept_events, accept_event);
+               }
+               cur = g_list_next (cur);
+       }
+
+       /* SIGUSR2 handler */
+       signal_set (&worker->sig_ev_usr2, SIGUSR2, worker_sigusr2_handler,
+                       (void *) worker);
+       event_base_set (ev_base, &worker->sig_ev_usr2);
+       signal_add (&worker->sig_ev_usr2, NULL);
+
+       /* SIGUSR1 handler */
+       signal_set (&worker->sig_ev_usr1, SIGUSR1, worker_sigusr1_handler,
+                       (void *) worker);
+       event_base_set (ev_base, &worker->sig_ev_usr1);
+       signal_add (&worker->sig_ev_usr1, NULL);
+
+       return ev_base;
+}
+
+void
+worker_stop_accept (struct rspamd_worker *worker)
+{
+       GList                             *cur;
+       struct event                     *event;
+
+       /* Remove all events */
+       cur = worker->accept_events;
+       while (cur) {
+               event = cur->data;
+               event_del (event);
+               cur = g_list_next (cur);
+               g_slice_free1 (sizeof (struct event), event);
+       }
+
+       if (worker->accept_events != NULL) {
+               g_list_free (worker->accept_events);
+       }
+}
diff --git a/src/libserver/worker_util.h b/src/libserver/worker_util.h
new file mode 100644 (file)
index 0000000..d88b93a
--- /dev/null
@@ -0,0 +1,67 @@
+/* Copyright (c) 2014, Vsevolod Stakhov
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *       * Redistributions of source code must retain the above copyright
+ *         notice, this list of conditions and the following disclaimer.
+ *       * Redistributions in binary form must reproduce the above copyright
+ *         notice, this list of conditions and the following disclaimer in the
+ *         documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+#ifndef WORKER_UTIL_H_
+#define WORKER_UTIL_H_
+
+#include "config.h"
+#include "util.h"
+
+/**
+ * Return worker's control structure by its type
+ * @param type
+ * @return worker's control structure or NULL
+ */
+worker_t* get_worker_by_type (GQuark type);
+
+/**
+ * Set counter for a symbol
+ */
+double set_counter (const gchar *name, guint32 value);
+
+#ifndef HAVE_SA_SIGINFO
+typedef void (*rspamd_sig_handler_t) (gint);
+#else
+typedef void (*rspamd_sig_handler_t) (gint, siginfo_t *, void *);
+#endif
+
+struct rspamd_worker;
+
+/**
+ * Prepare worker's startup
+ * @param worker worker structure
+ * @param name name of the worker
+ * @param sig_handler handler of main signals
+ * @param accept_handler handler of accept event for listen sockets
+ * @return event base suitable for a worker
+ */
+struct event_base *
+prepare_worker (struct rspamd_worker *worker, const char *name,
+               void (*accept_handler)(int, short, void *));
+
+/**
+ * Stop accepting new connections for a worker
+ * @param worker
+ */
+void worker_stop_accept (struct rspamd_worker *worker);
+
+#endif /* WORKER_UTIL_H_ */
index 4359d4288b267787e5d494ed813c2e036b2570e5..dfa34a458e5fc267a65aafa6138fae03448a07c6 100644 (file)
@@ -259,13 +259,6 @@ gboolean rspamd_strtoul (const gchar *s, gsize len, gulong *value);
  */
 gint rspamd_fallocate (gint fd, off_t offset, off_t len);
 
-/**
- * Return worker's control structure by its type
- * @param type
- * @return worker's control structure or NULL
- */
-extern worker_t* get_worker_by_type (GQuark type);
-
 /**
  * Utils for working with threads to be compatible with all glib versions
  */
index 57a7907e598202d4b1dcb5bc82cb821e0735a2d0..d93c00e09c71e75231feb32cb395fca8ae0e438e 100644 (file)
@@ -22,6 +22,7 @@
 #include "roll_history.h"
 #include "http.h"
 #include "task.h"
+#include "worker_util.h"
 
 /* Default values */
 #define FIXED_CONFIG_FILE RSPAMD_CONFDIR "/rspamd.conf"
@@ -195,37 +196,6 @@ void register_custom_controller_command (const gchar *name, controller_func_t ha
  */
 extern struct rspamd_main *rspamd_main;
 
-/* Worker task manipulations */
-
-/**
- * Set counter for a symbol
- */
-double set_counter (const gchar *name, guint32 value);
-
-#ifndef HAVE_SA_SIGINFO
-typedef void (*rspamd_sig_handler_t) (gint);
-#else
-typedef void (*rspamd_sig_handler_t) (gint, siginfo_t *, void *);
-#endif
-
-/**
- * Prepare worker's startup
- * @param worker worker structure
- * @param name name of the worker
- * @param sig_handler handler of main signals
- * @param accept_handler handler of accept event for listen sockets
- * @return event base suitable for a worker
- */
-struct event_base *
-prepare_worker (struct rspamd_worker *worker, const char *name,
-               void (*accept_handler)(int, short, void *));
-
-/**
- * Stop accepting new connections for a worker
- * @param worker
- */
-void worker_stop_accept (struct rspamd_worker *worker);
-
 #endif
 
 /*