diff options
-rw-r--r-- | CMakeLists.txt | 34 | ||||
-rw-r--r-- | config.h.in | 2 | ||||
-rw-r--r-- | contrib/libev/CMakeLists.txt | 1 | ||||
-rw-r--r-- | src/CMakeLists.txt | 3 | ||||
-rw-r--r-- | src/libserver/worker_util.c | 92 | ||||
-rw-r--r-- | src/libserver/worker_util.h | 14 | ||||
-rw-r--r-- | src/libutil/addr.c | 39 | ||||
-rw-r--r-- | src/libutil/addr.h | 10 | ||||
-rw-r--r-- | src/libutil/util.c | 36 | ||||
-rw-r--r-- | src/libutil/util.h | 13 | ||||
-rw-r--r-- | src/lua/CMakeLists.txt | 1 | ||||
-rw-r--r-- | src/lua/lua_common.c | 1 | ||||
-rw-r--r-- | src/lua/lua_common.h | 1 | ||||
-rw-r--r-- | src/lua/lua_fann.c | 1032 | ||||
-rw-r--r-- | src/rspamd.c | 77 | ||||
-rw-r--r-- | src/rspamd.h | 10 |
16 files changed, 126 insertions, 1240 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 893cce7f6..d5f37900a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -52,9 +52,7 @@ OPTION(WANT_SYSTEMD_UNITS "Install systemd unit files on Linux [default: OFF]" OPTION(ENABLE_SNOWBALL "Enable snowball stemmer [default: ON]" ON) OPTION(ENABLE_CLANG_PLUGIN "Enable clang static analysing plugin [default: OFF]" OFF) OPTION(ENABLE_HYPERSCAN "Enable hyperscan for fast regexp processing [default: OFF]" OFF) -OPTION(ENABLE_FANN "Enable fann for neural network plugin [default: OFF]" OFF) OPTION(ENABLE_PCRE2 "Enable pcre2 instead of pcre [default: OFF]" OFF) -OPTION(ENABLE_GD "Enable libgd for images processing [default: OFF]" OFF) OPTION(ENABLE_JEMALLOC "Build rspamd with jemalloc allocator [default: OFF]" OFF) OPTION(ENABLE_COVERAGE "Build rspamd with code coverage options [default: OFF]" OFF) OPTION(ENABLE_FULL_DEBUG "Build rspamd with all possible debug [default: OFF]" OFF) @@ -634,9 +632,7 @@ IF(ENABLE_LIBUNWIND MATCHES "ON") ROOT ${LIBUNWIND_ROOT_DIR} MODULES libunwind) SET(WITH_LIBUNWIND "1") ENDIF() -ProcessPackage(GTHREAD2 LIBRARY gthread-2.0 INCLUDE glib.h - INCLUDE_SUFFIXES include/glib include/glib-2.0 - ROOT ${GLIB_ROOT_DIR} MODULES gthread-2.0>=2.28) + ProcessPackage(GLIB2 LIBRARY glib-2.0 INCLUDE glib.h INCLUDE_SUFFIXES include/glib include/glib-2.0 ROOT ${GLIB_ROOT_DIR} MODULES glib-2.0>=2.28) @@ -672,34 +668,6 @@ IF(ENABLE_HYPERSCAN MATCHES "ON") ROOT ${HYPERSCAN_ROOT_DIR} MODULES libhs) SET(WITH_HYPERSCAN 1) ENDIF() -IF (ENABLE_FANN MATCHES "ON") - ProcessPackage(FANN LIBRARY fann INCLUDE fann.h INCLUDE_SUFFIXES - include/fann - ROOT ${FANN_ROOT_DIR} MODULES fann) - SET(WITH_FANN 1) -ENDIF () - -IF (ENABLE_GD MATCHES "ON") - ProcessPackage(GD LIBRARY gd INCLUDE gd.h INCLUDE_SUFFIXES - include/gd include/libgd - ROOT ${GD_ROOT_DIR} MODULES gd) - LIST(APPEND CMAKE_REQUIRED_INCLUDES "${GD_INCLUDE}") - LIST(APPEND CMAKE_REQUIRED_LIBRARIES "${GD_LIBRARY}") - - CHECK_SYMBOL_EXISTS(gdImageSetInterpolationMethod gd.h GD_INTERPOLATION) - CHECK_SYMBOL_EXISTS(gdImageScale gd.h GD_SCALE) - CHECK_SYMBOL_EXISTS(gdImageGrayScale gd.h GD_GRAYSCALE) - CHECK_SYMBOL_EXISTS(gdImageCreateFromJpegPtr gd.h GD_JPEG) - CHECK_SYMBOL_EXISTS(gdImageCreateFromPngPtr gd.h GD_PNG) - CHECK_SYMBOL_EXISTS(gdImageCreateFromBmpPtr gd.h GD_BMP) - CHECK_SYMBOL_EXISTS(gdImageCreateFromGifPtr gd.h GD_GIF) - - IF(GD_INTERPOLATION AND GD_SCALE AND GD_GRAYSCALE AND GD_JPEG AND GD_PNG AND GD_GIF AND GD_BMP) - SET(USABLE_GD 1) - ELSE() - MESSAGE(STATUS "Libgd is found but it is unusable") - ENDIF() -ENDIF () #Check for openssl (required for dkim) SET(HAVE_OPENSSL 1) diff --git a/config.h.in b/config.h.in index 0693cd55c..9dcf51a50 100644 --- a/config.h.in +++ b/config.h.in @@ -139,8 +139,6 @@ #cmakedefine LIBEVENT_EVHTTP 1 #cmakedefine PARAM_H_HAS_BITSET 1 #cmakedefine WITH_DB 1 -#cmakedefine WITH_FANN 1 -#cmakedefine USABLE_GD 1 #cmakedefine WITH_GPERF_TOOLS 1 #cmakedefine WITH_HIREDIS 1 #cmakedefine WITH_HYPERSCAN 1 diff --git a/contrib/libev/CMakeLists.txt b/contrib/libev/CMakeLists.txt index ca216d180..e681aeb91 100644 --- a/contrib/libev/CMakeLists.txt +++ b/contrib/libev/CMakeLists.txt @@ -59,7 +59,6 @@ ADD_LIBRARY(rspamd-libev STATIC ${LIBEVSRC}) ADD_DEFINITIONS("-DEV_CONFIG_H=\"${CMAKE_CURRENT_BINARY_DIR}/libev-config.h\"" -DEV_MULTIPLICITY=1 -DEV_USE_FLOOR=1 - -DEV_USE_MONOTONIC=1 -DEV_NO_THREADS=1 # We do not have threads in Rspamd! -DEV_FEATURES=127 # Enable all features ) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 3e6abd4e8..da5c19e2d 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -101,8 +101,7 @@ SET(PLUGINSSRC plugins/surbl.c plugins/fuzzy_check.c plugins/spf.c plugins/dkim_check.c - libserver/rspamd_control.c - lua/lua_fann.c) + libserver/rspamd_control.c) SET(MODULES_LIST surbl regexp chartable fuzzy_check spf dkim) SET(WORKERS_LIST normal controller fuzzy lua rspamd_proxy log_helper) diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c index 59698be32..0d4c4db17 100644 --- a/src/libserver/worker_util.c +++ b/src/libserver/worker_util.c @@ -299,12 +299,12 @@ rspamd_worker_init_signals (struct rspamd_worker *worker, struct ev_loop *base) struct ev_loop * rspamd_prepare_worker (struct rspamd_worker *worker, const char *name, - void (*accept_handler)(int, short, void *)) + rspamd_accept_handler hdl) { - struct ev_loop *ev_base; - struct event *accept_events; + struct ev_loop *event_loop; GList *cur; struct rspamd_worker_listen_socket *ls; + struct rspamd_worker_accept_event *accept_ev; #ifdef WITH_PROFILER extern void _start (void), etext (void); @@ -316,65 +316,58 @@ rspamd_prepare_worker (struct rspamd_worker *worker, const char *name, worker->signal_events = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, rspamd_sigh_free); - ev_base = event_init (); + event_loop = ev_default_loop (EVFLAG_SIGNALFD); - rspamd_worker_init_signals (worker, ev_base); - rspamd_control_worker_add_default_handler (worker, ev_base); + rspamd_worker_init_signals (worker, event_loop); + rspamd_control_worker_add_default_handler (worker, event_loop); #ifdef WITH_HIREDIS rspamd_redis_pool_config (worker->srv->cfg->redis_pool, - worker->srv->cfg, ev_base); + worker->srv->cfg, event_loop); #endif /* Accept all sockets */ - if (accept_handler) { + if (hdl) { cur = worker->cf->listen_socks; while (cur) { ls = cur->data; if (ls->fd != -1) { - accept_events = g_malloc0 (sizeof (struct event) * 2); - event_set (&accept_events[0], ls->fd, EV_READ | EV_PERSIST, - accept_handler, worker); - event_base_set (ev_base, &accept_events[0]); - event_add (&accept_events[0], NULL); - worker->accept_events = g_list_prepend (worker->accept_events, - accept_events); + accept_ev = g_malloc0 (sizeof (*accept_ev)); + accept_ev->event_loop = event_loop; + ev_io_init (&accept_ev->accept_ev, hdl, ls->fd, EV_READ); + ev_io_start (event_loop, &accept_ev->accept_ev); + + DL_APPEND (worker->accept_events, accept_ev); } cur = g_list_next (cur); } } - return ev_base; + return event_loop; } void rspamd_worker_stop_accept (struct rspamd_worker *worker) { - GList *cur; - struct event *events; + struct rspamd_worker_accept_event *cur, *tmp; /* Remove all events */ - cur = worker->accept_events; - while (cur) { - events = cur->data; + DL_FOREACH_SAFE (worker->accept_events, cur, tmp) { - if (rspamd_event_pending (&events[0], EV_TIMEOUT|EV_READ|EV_WRITE)) { - event_del (&events[0]); + if (ev_is_active (&cur->accept_ev) || ev_is_pending (&cur->accept_ev)) { + ev_io_stop (cur->event_loop, &cur->accept_ev); } - if (rspamd_event_pending (&events[1], EV_TIMEOUT|EV_READ|EV_WRITE)) { - event_del (&events[1]); + + if (ev_is_active (&cur->throttling_ev) || ev_is_pending (&cur->throttling_ev)) { + ev_timer_stop (cur->event_loop, &cur->throttling_ev); } - cur = g_list_next (cur); - g_free (events); + g_free (cur); } - if (worker->accept_events != NULL) { - g_list_free (worker->accept_events); - } /* XXX: we need to do it much later */ #if 0 g_hash_table_iter_init (&it, worker->signal_events); @@ -721,16 +714,6 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main, rspamd_log_open (rspamd_main->logger); wrk->start_time = rspamd_get_calendar_ticks (); -#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) - # if (GLIB_MINOR_VERSION > 20) - /* Ugly hack for old glib */ - if (!g_thread_get_initialized ()) { - g_thread_init (NULL); - } -# else - g_thread_init (NULL); -# endif -#endif if (cf->bind_conf) { msg_info_main ("starting %s process %P (%d); listen on: %s", cf->worker->name, @@ -1129,4 +1112,33 @@ rspamd_set_crash_handler (struct rspamd_main *rspamd_main) sigaction (SIGFPE, &sa, NULL); sigaction (SIGSYS, &sa, NULL); #endif +} + +static void +rspamd_enable_accept_event (gint fd, short what, gpointer d) +{ + struct event *events = d; + + event_del (&events[1]); + event_add (&events[0], NULL); +} + +void +rspamd_worker_throttle_accept_events (gint sock, void *data) +{ + struct rspamd_worker_accept_event *head, *cur; + const gdouble throttling = 0.5; + struct ev_loop *ev_base; + + head = (struct rspamd_worker_accept_event *)data; + + DL_FOREACH (head, cur) { + + ev_base = event_get_base (&events[0]); + event_del (&events[0]); + event_set (&events[1], sock, EV_TIMEOUT, rspamd_enable_accept_event, + events); + event_base_set (ev_base, &events[1]); + event_add (&events[1], &tv); + } }
\ No newline at end of file diff --git a/src/libserver/worker_util.h b/src/libserver/worker_util.h index 2dc78dfc7..67b54e5c9 100644 --- a/src/libserver/worker_util.h +++ b/src/libserver/worker_util.h @@ -36,6 +36,9 @@ struct rspamd_worker_signal_handler; * @param base */ void rspamd_worker_init_signals (struct rspamd_worker *worker, struct ev_loop *base); + +typedef void (*rspamd_accept_handler)(struct ev_loop *loop, ev_io *w, int revents); + /** * Prepare worker's startup * @param worker worker structure @@ -46,7 +49,7 @@ void rspamd_worker_init_signals (struct rspamd_worker *worker, struct ev_loop *b */ struct ev_loop * rspamd_prepare_worker (struct rspamd_worker *worker, const char *name, - void (*accept_handler)(int, short, void *)); + rspamd_accept_handler hdl); /** * Set special signal handler for a worker @@ -124,8 +127,6 @@ void rspamd_controller_send_ucl (struct rspamd_http_connection_entry *entry, */ worker_t * rspamd_get_worker_by_type (struct rspamd_config *cfg, GQuark type); -void rspamd_worker_stop_accept (struct rspamd_worker *worker); - /** * Block signals before terminations */ @@ -202,6 +203,13 @@ void rspamd_worker_init_monitored (struct rspamd_worker *worker, struct ev_loop *ev_base, struct rspamd_dns_resolver *resolver); +/** + * Performs throttling for accept events + * @param sock + * @param data struct rspamd_worker_accept_event * list + */ +void rspamd_worker_throttle_accept_events (gint sock, void *data); + #define msg_err_main(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \ rspamd_main->server_pool->tag.tagname, rspamd_main->server_pool->tag.uid, \ G_STRFUNC, \ diff --git a/src/libutil/addr.c b/src/libutil/addr.c index 339f5facb..112c5d2cd 100644 --- a/src/libutil/addr.c +++ b/src/libutil/addr.c @@ -203,41 +203,10 @@ rspamd_ip_is_valid (const rspamd_inet_addr_t *addr) return ret; } -static void -rspamd_enable_accept_event (gint fd, short what, gpointer d) -{ - struct event *events = d; - - event_del (&events[1]); - event_add (&events[0], NULL); -} - -static void -rspamd_disable_accept_events (gint sock, GList *accept_events) -{ - GList *cur; - struct event *events; - const gdouble throttling = 0.5; - struct timeval tv; - struct ev_loop *ev_base; - - double_to_tv (throttling, &tv); - - for (cur = accept_events; cur != NULL; cur = g_list_next (cur)) { - events = cur->data; - - ev_base = event_get_base (&events[0]); - event_del (&events[0]); - event_set (&events[1], sock, EV_TIMEOUT, rspamd_enable_accept_event, - events); - event_base_set (ev_base, &events[1]); - event_add (&events[1], &tv); - } -} - gint rspamd_accept_from_socket (gint sock, rspamd_inet_addr_t **target, - GList *accept_events) + rspamd_accept_throttling_handler hdl, + void *hdl_data) { gint nfd, serrno; union sa_union su; @@ -254,7 +223,9 @@ rspamd_accept_from_socket (gint sock, rspamd_inet_addr_t **target, } else if (errno == EMFILE || errno == ENFILE) { /* Temporary disable accept event */ - rspamd_disable_accept_events (sock, accept_events); + if (hdl) { + hdl (sock, hdl_data); + } return 0; } diff --git a/src/libutil/addr.h b/src/libutil/addr.h index bfe586ad1..7efa5e318 100644 --- a/src/libutil/addr.h +++ b/src/libutil/addr.h @@ -221,15 +221,17 @@ int rspamd_inet_address_listen (const rspamd_inet_addr_t *addr, gint type, */ gboolean rspamd_ip_is_valid (const rspamd_inet_addr_t *addr); +typedef void (*rspamd_accept_throttling_handler)(gint, void *); /** * Accept from listening socket filling addr structure * @param sock listening socket - * @param addr allocated inet addr structure - * @param accept_events events for accepting new sockets + * @param target allocated inet addr structure * @return */ -gint rspamd_accept_from_socket (gint sock, rspamd_inet_addr_t **addr, - GList *accept_events); +gint rspamd_accept_from_socket (gint sock, + rspamd_inet_addr_t **target, + rspamd_accept_throttling_handler hdl, + void *hdl_data); /** * Parse host[:port[:priority]] line diff --git a/src/libutil/util.c b/src/libutil/util.c index 665b6accb..e7a5c2601 100644 --- a/src/libutil/util.c +++ b/src/libutil/util.c @@ -1612,42 +1612,6 @@ rspamd_thread_func (gpointer ud) return ud; } -/** - * Create new named thread - * @param name name pattern - * @param func function to start - * @param data data to pass to function - * @param err error pointer - * @return new thread object that can be joined - */ -GThread * -rspamd_create_thread (const gchar *name, - GThreadFunc func, - gpointer data, - GError **err) -{ - GThread *new; - struct rspamd_thread_data *td; - static gint32 id; - guint r; - - r = strlen (name); - td = g_malloc (sizeof (struct rspamd_thread_data)); - td->id = ++id; - td->name = g_malloc (r + sizeof ("4294967296")); - td->func = func; - td->data = data; - - rspamd_snprintf (td->name, r + sizeof ("4294967296"), "%s-%d", name, id); -#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION > 32)) - new = g_thread_try_new (td->name, rspamd_thread_func, td, err); -#else - new = g_thread_create (rspamd_thread_func, td, TRUE, err); -#endif - - return new; -} - struct hash_copy_callback_data { gpointer (*key_copy_func)(gconstpointer data, gpointer ud); gpointer (*value_copy_func)(gconstpointer data, gpointer ud); diff --git a/src/libutil/util.h b/src/libutil/util.h index b7d6055ce..21e4b320e 100644 --- a/src/libutil/util.h +++ b/src/libutil/util.h @@ -263,19 +263,6 @@ void rspamd_mutex_unlock (rspamd_mutex_t *mtx); void rspamd_mutex_free (rspamd_mutex_t *mtx); /** - * Create new named thread - * @param name name pattern - * @param func function to start - * @param data data to pass to function - * @param err error pointer - * @return new thread object that can be joined - */ -GThread * rspamd_create_thread (const gchar *name, - GThreadFunc func, - gpointer data, - GError **err); - -/** * Deep copy of one hash table to another * @param src source hash * @param dst destination hash diff --git a/src/lua/CMakeLists.txt b/src/lua/CMakeLists.txt index c3f1a84c1..4a2003605 100644 --- a/src/lua/CMakeLists.txt +++ b/src/lua/CMakeLists.txt @@ -22,7 +22,6 @@ SET(LUASRC ${CMAKE_CURRENT_SOURCE_DIR}/lua_common.c ${CMAKE_CURRENT_SOURCE_DIR}/lua_util.c ${CMAKE_CURRENT_SOURCE_DIR}/lua_tcp.c ${CMAKE_CURRENT_SOURCE_DIR}/lua_html.c - ${CMAKE_CURRENT_SOURCE_DIR}/lua_fann.c ${CMAKE_CURRENT_SOURCE_DIR}/lua_sqlite3.c ${CMAKE_CURRENT_SOURCE_DIR}/lua_cryptobox.c ${CMAKE_CURRENT_SOURCE_DIR}/lua_map.c diff --git a/src/lua/lua_common.c b/src/lua/lua_common.c index 5f31ebbb0..689dcd1c4 100644 --- a/src/lua/lua_common.c +++ b/src/lua/lua_common.c @@ -910,7 +910,6 @@ rspamd_lua_init (bool wipe_mem) luaopen_util (L); luaopen_tcp (L); luaopen_html (L); - luaopen_fann (L); luaopen_sqlite3 (L); luaopen_cryptobox (L); luaopen_dns (L); diff --git a/src/lua/lua_common.h b/src/lua/lua_common.h index f3d006743..8919a46fd 100644 --- a/src/lua/lua_common.h +++ b/src/lua/lua_common.h @@ -281,7 +281,6 @@ void luaopen_text (lua_State *L); void luaopen_util (lua_State * L); void luaopen_tcp (lua_State * L); void luaopen_html (lua_State * L); -void luaopen_fann (lua_State *L); void luaopen_sqlite3 (lua_State *L); void luaopen_cryptobox (lua_State *L); void luaopen_dns (lua_State *L); diff --git a/src/lua/lua_fann.c b/src/lua/lua_fann.c deleted file mode 100644 index 7eb493740..000000000 --- a/src/lua/lua_fann.c +++ /dev/null @@ -1,1032 +0,0 @@ -/*- - * Copyright 2016 Vsevolod Stakhov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "lua_common.h" - -#ifdef WITH_FANN -#include <fann.h> -#endif - -#include "unix-std.h" - -/*** - * @module rspamd_fann - * This module enables [fann](http://libfann.github.io) interaction in rspamd - * Please note, that this module works merely if you have `ENABLE_FANN=ON` option - * definition when building rspamd - */ - -/* - * Fann functions - */ -LUA_FUNCTION_DEF (fann, is_enabled); -LUA_FUNCTION_DEF (fann, create); -LUA_FUNCTION_DEF (fann, create_full); -LUA_FUNCTION_DEF (fann, load_file); -LUA_FUNCTION_DEF (fann, load_data); - -/* - * Fann methods - */ -LUA_FUNCTION_DEF (fann, train); -LUA_FUNCTION_DEF (fann, train_threaded); -LUA_FUNCTION_DEF (fann, test); -LUA_FUNCTION_DEF (fann, save); -LUA_FUNCTION_DEF (fann, data); -LUA_FUNCTION_DEF (fann, get_inputs); -LUA_FUNCTION_DEF (fann, get_outputs); -LUA_FUNCTION_DEF (fann, get_layers); -LUA_FUNCTION_DEF (fann, get_mse); -LUA_FUNCTION_DEF (fann, dtor); - -static const struct luaL_reg fannlib_f[] = { - LUA_INTERFACE_DEF (fann, is_enabled), - LUA_INTERFACE_DEF (fann, create), - LUA_INTERFACE_DEF (fann, create_full), - LUA_INTERFACE_DEF (fann, load_file), - {"load", lua_fann_load_file}, - LUA_INTERFACE_DEF (fann, load_data), - {NULL, NULL} -}; - -static const struct luaL_reg fannlib_m[] = { - LUA_INTERFACE_DEF (fann, train), - LUA_INTERFACE_DEF (fann, train_threaded), - LUA_INTERFACE_DEF (fann, test), - LUA_INTERFACE_DEF (fann, save), - LUA_INTERFACE_DEF (fann, data), - LUA_INTERFACE_DEF (fann, get_inputs), - LUA_INTERFACE_DEF (fann, get_outputs), - LUA_INTERFACE_DEF (fann, get_layers), - LUA_INTERFACE_DEF (fann, get_mse), - {"__gc", lua_fann_dtor}, - {"__tostring", rspamd_lua_class_tostring}, - {NULL, NULL} -}; - -#ifdef WITH_FANN -struct fann * -rspamd_lua_check_fann (lua_State *L, gint pos) -{ - void *ud = rspamd_lua_check_udata (L, pos, "rspamd{fann}"); - luaL_argcheck (L, ud != NULL, pos, "'fann' expected"); - return ud ? *((struct fann **) ud) : NULL; -} -#endif - -/*** - * @function rspamd_fann.is_enabled() - * Checks if fann is enabled for this rspamd build - * @return {boolean} true if fann is enabled - */ -static gint -lua_fann_is_enabled (lua_State *L) -{ -#ifdef WITH_FANN - lua_pushboolean (L, true); -#else - lua_pushboolean (L, false); -#endif - return 1; -} - -/*** - * @function rspamd_fann.create(nlayers, [layer1, ... layern]) - * Creates new neural network with `nlayers` that contains `layer1`...`layern` - * neurons in each layer - * @param {number} nlayers number of layers - * @param {number} layerI number of neurons in each layer - * @return {fann} fann object - */ -static gint -lua_fann_create (lua_State *L) -{ -#ifndef WITH_FANN - return 0; -#else - struct fann *f, **pfann; - guint nlayers, *layers, i; - - nlayers = luaL_checknumber (L, 1); - - if (nlayers > 0) { - layers = g_malloc (nlayers * sizeof (layers[0])); - - if (lua_type (L, 2) == LUA_TNUMBER) { - for (i = 0; i < nlayers; i ++) { - layers[i] = luaL_checknumber (L, i + 2); - } - } - else if (lua_type (L, 2) == LUA_TTABLE) { - for (i = 0; i < nlayers; i ++) { - lua_rawgeti (L, 2, i + 1); - layers[i] = luaL_checknumber (L, -1); - lua_pop (L, 1); - } - } - - f = fann_create_standard_array (nlayers, layers); - fann_set_activation_function_hidden (f, FANN_SIGMOID_SYMMETRIC); - fann_set_activation_function_output (f, FANN_SIGMOID_SYMMETRIC); - fann_set_training_algorithm (f, FANN_TRAIN_INCREMENTAL); - fann_randomize_weights (f, 0, 1); - - if (f != NULL) { - pfann = lua_newuserdata (L, sizeof (gpointer)); - *pfann = f; - rspamd_lua_setclass (L, "rspamd{fann}", -1); - } - else { - lua_pushnil (L); - } - - g_free (layers); - } - else { - lua_pushnil (L); - } - - return 1; -#endif -} - -#ifdef WITH_FANN -static enum fann_activationfunc_enum -string_to_activation_func (const gchar *str) -{ - if (str == NULL) { - return FANN_SIGMOID_SYMMETRIC; - } - if (strcmp (str, "sigmoid") == 0) { - return FANN_SIGMOID; - } - else if (strcmp (str, "elliot") == 0) { - return FANN_ELLIOT; - } - else if (strcmp (str, "elliot_symmetric") == 0) { - return FANN_ELLIOT_SYMMETRIC; - } - else if (strcmp (str, "linear") == 0) { - return FANN_LINEAR; - } - - return FANN_SIGMOID_SYMMETRIC; -} - -static enum fann_train_enum -string_to_learn_alg (const gchar *str) -{ - if (str == NULL) { - return FANN_TRAIN_INCREMENTAL; - } - if (strcmp (str, "rprop") == 0) { - return FANN_TRAIN_RPROP; - } - else if (strcmp (str, "qprop") == 0) { - return FANN_TRAIN_QUICKPROP; - } - else if (strcmp (str, "batch") == 0) { - return FANN_TRAIN_BATCH; - } - - return FANN_TRAIN_INCREMENTAL; -} -/* - * This is needed since libfann provides no versioning macros... - */ -static struct fann_train_data * -rspamd_fann_create_train (guint num_data, guint num_input, guint num_output) -{ - struct fann_train_data *t; - fann_type *inp, *outp; - guint i; - - g_assert (num_data > 0 && num_input > 0 && num_output > 0); - - t = calloc (1, sizeof (*t)); - g_assert (t != NULL); - - t->num_data = num_data; - t->num_input = num_input; - t->num_output = num_output; - - t->input = calloc (num_data, sizeof (fann_type *)); - g_assert (t->input != NULL); - - t->output = calloc (num_data, sizeof (fann_type *)); - g_assert (t->output != NULL); - - inp = calloc (num_data * num_input, sizeof (fann_type)); - g_assert (inp != NULL); - - outp = calloc (num_data * num_output, sizeof (fann_type)); - g_assert (outp != NULL); - - for (i = 0; i < num_data; i ++) { - t->input[i] = inp; - inp += num_input; - t->output[i] = outp; - outp += num_output; - } - - return t; -} - - -#endif - -/*** - * @function rspamd_fann.create_full(params) - * Creates new neural network with parameters: - * - `layers` {table/numbers}: table of layers in form: {N1, N2, N3 ... Nn} where N is number of neurons in a layer - * - `activation_hidden` {string}: activation function type for hidden layers (`tanh` by default) - * - `activation_output` {string}: activation function type for output layer (`tanh` by default) - * - `sparsed` {float}: create sparsed ANN, where number is a coefficient for sparsing - * - `learn` {string}: learning algorithm (quickprop, rprop or incremental) - * - `randomize` {boolean}: randomize weights (true by default) - * @return {fann} fann object - */ -static gint -lua_fann_create_full (lua_State *L) -{ -#ifndef WITH_FANN - return 0; -#else - struct fann *f, **pfann; - guint nlayers, *layers, i; - const gchar *activation_hidden = NULL, *activation_output, *learn_alg = NULL; - gdouble sparsed = 0.0; - gboolean randomize_ann = TRUE; - GError *err = NULL; - - if (lua_type (L, 1) == LUA_TTABLE) { - lua_pushstring (L, "layers"); - lua_gettable (L, 1); - - if (lua_type (L, -1) != LUA_TTABLE) { - return luaL_error (L, "bad layers attribute"); - } - - nlayers = rspamd_lua_table_size (L, -1); - if (nlayers < 2) { - return luaL_error (L, "bad layers attribute"); - } - - layers = g_new0 (guint, nlayers); - - for (i = 0; i < nlayers; i ++) { - lua_rawgeti (L, -1, i + 1); - layers[i] = luaL_checknumber (L, -1); - lua_pop (L, 1); - } - - lua_pop (L, 1); /* Table */ - - if (!rspamd_lua_parse_table_arguments (L, 1, &err, - "sparsed=N;randomize=B;learn=S;activation_hidden=S;activation_output=S", - &sparsed, &randomize_ann, &learn_alg, &activation_hidden, &activation_output)) { - g_free (layers); - - if (err) { - gint r; - - r = luaL_error (L, "invalid arguments: %s", err->message); - g_error_free (err); - return r; - } - else { - return luaL_error (L, "invalid arguments"); - } - } - - if (sparsed != 0.0) { - f = fann_create_standard_array (nlayers, layers); - } - else { - f = fann_create_sparse_array (sparsed, nlayers, layers); - } - - if (f != NULL) { - pfann = lua_newuserdata (L, sizeof (gpointer)); - *pfann = f; - rspamd_lua_setclass (L, "rspamd{fann}", -1); - } - else { - g_free (layers); - return luaL_error (L, "cannot create fann"); - } - - fann_set_activation_function_hidden (f, - string_to_activation_func (activation_hidden)); - fann_set_activation_function_output (f, - string_to_activation_func (activation_output)); - fann_set_training_algorithm (f, string_to_learn_alg (learn_alg)); - - if (randomize_ann) { - fann_randomize_weights (f, 0, 1); - } - - g_free (layers); - } - else { - return luaL_error (L, "bad arguments"); - } - - return 1; -#endif -} - -/*** - * @function rspamd_fann.load(file) - * Loads neural network from the file - * @param {string} file filename where fann is stored - * @return {fann} fann object - */ -static gint -lua_fann_load_file (lua_State *L) -{ -#ifndef WITH_FANN - return 0; -#else - struct fann *f, **pfann; - const gchar *fname; - - fname = luaL_checkstring (L, 1); - - if (fname != NULL) { - f = fann_create_from_file (fname); - - if (f != NULL) { - pfann = lua_newuserdata (L, sizeof (gpointer)); - *pfann = f; - rspamd_lua_setclass (L, "rspamd{fann}", -1); - } - else { - lua_pushnil (L); - } - } - else { - lua_pushnil (L); - } - - return 1; -#endif -} - -/*** - * @function rspamd_fann.load_data(data) - * Loads neural network from the data - * @param {string} file filename where fann is stored - * @return {fann} fann object - */ -static gint -lua_fann_load_data (lua_State *L) -{ -#ifndef WITH_FANN - return 0; -#else - struct fann *f, **pfann; - gint fd; - struct rspamd_lua_text *t; - gchar fpath[PATH_MAX]; - - if (lua_type (L, 1) == LUA_TUSERDATA) { - t = lua_check_text (L, 1); - - if (!t) { - return luaL_error (L, "text required"); - } - } - else { - t = g_alloca (sizeof (*t)); - t->start = lua_tolstring (L, 1, (gsize *)&t->len); - t->flags = 0; - } - - /* We need to save data to file because of libfann stupidity */ - rspamd_strlcpy (fpath, "/tmp/rspamd-fannXXXXXXXXXX", sizeof (fpath)); - fd = mkstemp (fpath); - - if (fd == -1) { - msg_warn ("cannot create tempfile: %s", strerror (errno)); - lua_pushnil (L); - } - else { - if (write (fd, t->start, t->len) == -1) { - msg_warn ("cannot write tempfile: %s", strerror (errno)); - lua_pushnil (L); - unlink (fpath); - close (fd); - - return 1; - } - - f = fann_create_from_file (fpath); - unlink (fpath); - close (fd); - - if (f != NULL) { - pfann = lua_newuserdata (L, sizeof (gpointer)); - *pfann = f; - rspamd_lua_setclass (L, "rspamd{fann}", -1); - } - else { - lua_pushnil (L); - } - } - - return 1; -#endif -} - -/*** - * @function rspamd_fann:data() - * Returns serialized neural network - * @return {rspamd_text} fann data - */ -static gint -lua_fann_data (lua_State *L) -{ -#ifndef WITH_FANN - return 0; -#else - struct fann *f = rspamd_lua_check_fann (L, 1); - gint fd; - struct rspamd_lua_text *res; - gchar fpath[PATH_MAX]; - gpointer map; - gsize sz; - - if (f == NULL) { - return luaL_error (L, "invalid arguments"); - } - - /* We need to save data to file because of libfann stupidity */ - rspamd_strlcpy (fpath, "/tmp/rspamd-fannXXXXXXXXXX", sizeof (fpath)); - fd = mkstemp (fpath); - - if (fd == -1) { - msg_warn ("cannot create tempfile: %s", strerror (errno)); - lua_pushnil (L); - } - else { - if (fann_save (f, fpath) == -1) { - msg_warn ("cannot write tempfile: %s", strerror (errno)); - lua_pushnil (L); - unlink (fpath); - close (fd); - - return 1; - } - - - (void)lseek (fd, 0, SEEK_SET); - map = rspamd_file_xmap (fpath, PROT_READ, &sz, TRUE); - unlink (fpath); - close (fd); - - if (map != NULL) { - res = lua_newuserdata (L, sizeof (*res)); - res->len = sz; - res->start = map; - res->flags = RSPAMD_TEXT_FLAG_OWN|RSPAMD_TEXT_FLAG_MMAPED; - rspamd_lua_setclass (L, "rspamd{text}", -1); - } - else { - lua_pushnil (L); - } - - } - - return 1; -#endif -} - - -/** - * @method rspamd_fann:train(inputs, outputs) - * Trains neural network with samples. Inputs and outputs should be tables of - * equal size, each row in table should be N inputs and M outputs, e.g. - * {0, 1, 1} -> {0} - * @param {table} inputs input samples - * @param {table} outputs output samples - * @return {number} number of samples learned - */ -static gint -lua_fann_train (lua_State *L) -{ -#ifndef WITH_FANN - return 0; -#else - struct fann *f = rspamd_lua_check_fann (L, 1); - guint ninputs, noutputs, j; - fann_type *cur_input, *cur_output; - gboolean ret = FALSE; - - if (f != NULL) { - /* First check sanity, call for table.getn for that */ - ninputs = rspamd_lua_table_size (L, 2); - noutputs = rspamd_lua_table_size (L, 3); - - if (ninputs != fann_get_num_input (f) || - noutputs != fann_get_num_output (f)) { - msg_err ("bad number of inputs(%d, expected %d) and " - "output(%d, expected %d) args for train", - ninputs, fann_get_num_input (f), - noutputs, fann_get_num_output (f)); - } - else { - cur_input = g_malloc (ninputs * sizeof (fann_type)); - - for (j = 0; j < ninputs; j ++) { - lua_rawgeti (L, 2, j + 1); - cur_input[j] = lua_tonumber (L, -1); - lua_pop (L, 1); - } - - cur_output = g_malloc (noutputs * sizeof (fann_type)); - - for (j = 0; j < noutputs; j++) { - lua_rawgeti (L, 3, j + 1); - cur_output[j] = lua_tonumber (L, -1); - lua_pop (L, 1); - } - - fann_train (f, cur_input, cur_output); - g_free (cur_input); - g_free (cur_output); - - ret = TRUE; - } - } - - lua_pushboolean (L, ret); - - return 1; -#endif -} - -#ifdef WITH_FANN -struct lua_fann_train_cbdata { - lua_State *L; - gint pair[2]; - struct fann_train_data *train; - struct fann *f; - gint cbref; - gdouble desired_mse; - guint max_epochs; - GThread *t; - struct event io; -}; - -struct lua_fann_train_reply { - gint errcode; - float mse; - gchar errmsg[128]; -}; - -static void -lua_fann_push_train_result (struct lua_fann_train_cbdata *cbdata, - gint errcode, float mse, const gchar *errmsg) -{ - lua_rawgeti (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref); - lua_pushnumber (cbdata->L, errcode); - lua_pushstring (cbdata->L, errmsg); - lua_pushnumber (cbdata->L, mse); - - if (lua_pcall (cbdata->L, 3, 0, 0) != 0) { - msg_err ("call to train callback failed: %s", lua_tostring (cbdata->L, -1)); - lua_pop (cbdata->L, 1); - } -} - -static void -lua_fann_thread_notify (gint fd, short what, gpointer ud) -{ - struct lua_fann_train_cbdata *cbdata = ud; - struct lua_fann_train_reply rep; - - if (read (cbdata->pair[0], &rep, sizeof (rep)) == -1) { - if (errno == EAGAIN || errno == EINTR) { - event_add (&cbdata->io, NULL); - return; - } - - lua_fann_push_train_result (cbdata, errno, 0.0, strerror (errno)); - } - else { - lua_fann_push_train_result (cbdata, rep.errcode, rep.mse, rep.errmsg); - } - - g_assert (write (cbdata->pair[0], "", 1) == 1); - g_thread_join (cbdata->t); - close (cbdata->pair[0]); - close (cbdata->pair[1]); - - fann_destroy_train (cbdata->train); - luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref); - g_free (cbdata); -} - -static void * -lua_fann_train_thread (void *ud) -{ - struct lua_fann_train_cbdata *cbdata = ud; - struct lua_fann_train_reply rep; - gchar repbuf[1]; - - msg_info ("start learning ANN, %d epochs are possible", - cbdata->max_epochs); - rspamd_socket_blocking (cbdata->pair[1]); - fann_train_on_data (cbdata->f, cbdata->train, cbdata->max_epochs, 0, - cbdata->desired_mse); - rep.errcode = 0; - rspamd_strlcpy (rep.errmsg, "OK", sizeof (rep.errmsg)); - rep.mse = fann_get_MSE (cbdata->f); - - if (write (cbdata->pair[1], &rep, sizeof (rep)) == -1) { - msg_err ("cannot write to socketpair: %s", strerror (errno)); - - return NULL; - } - - if (read (cbdata->pair[1], repbuf, sizeof (repbuf)) == -1) { - msg_err ("cannot read from socketpair: %s", strerror (errno)); - - return NULL; - } - - return NULL; -} -#endif -/** - * @method rspamd_fann:train_threaded(inputs, outputs, callback, event_base, {params}) - * Trains neural network with batch of samples. Inputs and outputs should be tables of - * equal size, each row in table should be N inputs and M outputs, e.g. - * {{0, 1, 1}, ...} -> {{0}, {1} ...} - * @param {table} inputs input samples - * @param {table} outputs output samples - * @param {callback} function that is called when train is completed - */ -static gint -lua_fann_train_threaded (lua_State *L) -{ -#ifndef WITH_FANN - return 0; -#else - struct fann *f = rspamd_lua_check_fann (L, 1); - guint ninputs, noutputs, ndata, i, j; - struct lua_fann_train_cbdata *cbdata; - struct ev_loop *ev_base = lua_check_ev_base (L, 5); - GError *err = NULL; - const guint max_epochs_default = 1000; - const gdouble desired_mse_default = 0.0001; - - if (f != NULL && lua_type (L, 2) == LUA_TTABLE && - lua_type (L, 3) == LUA_TTABLE && lua_type (L, 4) == LUA_TFUNCTION && - ev_base != NULL) { - /* First check sanity, call for table.getn for that */ - ndata = rspamd_lua_table_size (L, 2); - ninputs = fann_get_num_input (f); - noutputs = fann_get_num_output (f); - cbdata = g_malloc0 (sizeof (*cbdata)); - cbdata->L = L; - cbdata->f = f; - cbdata->train = rspamd_fann_create_train (ndata, ninputs, noutputs); - lua_pushvalue (L, 4); - cbdata->cbref = luaL_ref (L, LUA_REGISTRYINDEX); - - if (rspamd_socketpair (cbdata->pair, 0) == -1) { - msg_err ("cannot open socketpair: %s", strerror (errno)); - cbdata->pair[0] = -1; - cbdata->pair[1] = -1; - goto err; - } - - for (i = 0; i < ndata; i ++) { - lua_rawgeti (L, 2, i + 1); - - if (rspamd_lua_table_size (L, -1) != ninputs) { - msg_err ("invalid number of inputs: %d, %d expected", - rspamd_lua_table_size (L, -1), ninputs); - goto err; - } - - for (j = 0; j < ninputs; j ++) { - lua_rawgeti (L, -1, j + 1); - cbdata->train->input[i][j] = lua_tonumber (L, -1); - lua_pop (L, 1); - } - - lua_pop (L, 1); - lua_rawgeti (L, 3, i + 1); - - if (rspamd_lua_table_size (L, -1) != noutputs) { - msg_err ("invalid number of outputs: %d, %d expected", - rspamd_lua_table_size (L, -1), noutputs); - goto err; - } - - for (j = 0; j < noutputs; j++) { - lua_rawgeti (L, -1, j + 1); - cbdata->train->output[i][j] = lua_tonumber (L, -1); - lua_pop (L, 1); - } - } - - cbdata->max_epochs = max_epochs_default; - cbdata->desired_mse = desired_mse_default; - - if (lua_type (L, 5) == LUA_TTABLE) { - rspamd_lua_parse_table_arguments (L, 5, NULL, - "max_epochs=I;desired_mse=N", - &cbdata->max_epochs, &cbdata->desired_mse); - } - - /* Now we can call training in a separate thread */ - rspamd_socket_nonblocking (cbdata->pair[0]); - event_set (&cbdata->io, cbdata->pair[0], EV_READ, lua_fann_thread_notify, - cbdata); - event_base_set (ev_base, &cbdata->io); - /* TODO: add timeout */ - event_add (&cbdata->io, NULL); - cbdata->t = rspamd_create_thread ("fann train", lua_fann_train_thread, - cbdata, &err); - - if (cbdata->t == NULL) { - msg_err ("cannot create training thread: %e", err); - - if (err) { - g_error_free (err); - } - - goto err; - } - } - else { - return luaL_error (L, "invalid arguments"); - } - - return 0; - -err: - if (cbdata->pair[0] != -1) { - close (cbdata->pair[0]); - } - if (cbdata->pair[1] != -1) { - close (cbdata->pair[1]); - } - - fann_destroy_train (cbdata->train); - luaL_unref (L, LUA_REGISTRYINDEX, cbdata->cbref); - g_free (cbdata); - return luaL_error (L, "invalid arguments"); -#endif -} - -/** - * @method rspamd_fann:test(inputs) - * Tests neural network with samples. Inputs is a single sample of input data. - * The function returns table of results, e.g.: - * {0, 1, 1} -> {0} - * @param {table} inputs input sample - * @return {table/number} outputs values - */ -static gint -lua_fann_test (lua_State *L) -{ -#ifndef WITH_FANN - return 0; -#else - struct fann *f = rspamd_lua_check_fann (L, 1); - guint ninputs, noutputs, i, tbl_idx = 2; - fann_type *cur_input, *cur_output; - - if (f != NULL) { - /* First check sanity, call for table.getn for that */ - if (lua_isnumber (L, 2)) { - ninputs = lua_tonumber (L, 2); - tbl_idx = 3; - } - else { - ninputs = rspamd_lua_table_size (L, 2); - - if (ninputs == 0) { - msg_err ("empty inputs number"); - lua_pushnil (L); - - return 1; - } - } - - cur_input = g_malloc0 (ninputs * sizeof (fann_type)); - - for (i = 0; i < ninputs; i++) { - lua_rawgeti (L, tbl_idx, i + 1); - cur_input[i] = lua_tonumber (L, -1); - lua_pop (L, 1); - } - - cur_output = fann_run (f, cur_input); - noutputs = fann_get_num_output (f); - lua_createtable (L, noutputs, 0); - - for (i = 0; i < noutputs; i ++) { - lua_pushnumber (L, cur_output[i]); - lua_rawseti (L, -2, i + 1); - } - - g_free (cur_input); - } - else { - lua_pushnil (L); - } - - return 1; -#endif -} - -/*** - * @method rspamd_fann:get_inputs() - * Returns number of inputs for neural network - * @return {number} number of inputs - */ -static gint -lua_fann_get_inputs (lua_State *L) -{ -#ifndef WITH_FANN - return 0; -#else - struct fann *f = rspamd_lua_check_fann (L, 1); - - if (f != NULL) { - lua_pushnumber (L, fann_get_num_input (f)); - } - else { - lua_pushnil (L); - } - - return 1; -#endif -} - -/*** - * @method rspamd_fann:get_outputs() - * Returns number of outputs for neural network - * @return {number} number of outputs - */ -static gint -lua_fann_get_outputs (lua_State *L) -{ -#ifndef WITH_FANN - return 0; -#else - struct fann *f = rspamd_lua_check_fann (L, 1); - - if (f != NULL) { - lua_pushnumber (L, fann_get_num_output (f)); - } - else { - lua_pushnil (L); - } - - return 1; -#endif -} - -/*** - * @method rspamd_fann:get_mse() - * Returns mean square error for ANN - * @return {number} MSE value - */ -static gint -lua_fann_get_mse (lua_State *L) -{ -#ifndef WITH_FANN - return 0; -#else - struct fann *f = rspamd_lua_check_fann (L, 1); - - if (f != NULL) { - lua_pushnumber (L, fann_get_MSE (f)); - } - else { - lua_pushnil (L); - } - - return 1; -#endif -} - -/*** - * @method rspamd_fann:get_layers() - * Returns array of neurons count for each layer - * @return {table/number} table with number ofr neurons in each layer - */ -static gint -lua_fann_get_layers (lua_State *L) -{ -#ifndef WITH_FANN - return 0; -#else - struct fann *f = rspamd_lua_check_fann (L, 1); - guint nlayers, i, *layers; - - if (f != NULL) { - nlayers = fann_get_num_layers (f); - layers = g_new (guint, nlayers); - fann_get_layer_array (f, layers); - lua_createtable (L, nlayers, 0); - - for (i = 0; i < nlayers; i ++) { - lua_pushnumber (L, layers[i]); - lua_rawseti (L, -2, i + 1); - } - - g_free (layers); - } - else { - lua_pushnil (L); - } - - return 1; -#endif -} - -/*** - * @method rspamd_fann:save(fname) - * Save fann to file named 'fname' - * @param {string} fname filename to save fann into - * @return {boolean} true if ann has been saved - */ -static gint -lua_fann_save (lua_State *L) -{ -#ifndef WITH_FANN - return 0; -#else - struct fann *f = rspamd_lua_check_fann (L, 1); - const gchar *fname = luaL_checkstring (L, 2); - - if (f != NULL && fname != NULL) { - if (fann_save (f, fname) == 0) { - lua_pushboolean (L, true); - } - else { - msg_err ("cannot save ANN to %s: %s", fname, strerror (errno)); - lua_pushboolean (L, false); - } - } - else { - lua_pushnil (L); - } - - return 1; -#endif -} - -static gint -lua_fann_dtor (lua_State *L) -{ -#ifndef WITH_FANN - return 0; -#else - struct fann *f = rspamd_lua_check_fann (L, 1); - - if (f) { - fann_destroy (f); - } - - return 0; -#endif -} - -static gint -lua_load_fann (lua_State * L) -{ - lua_newtable (L); - luaL_register (L, NULL, fannlib_f); - - return 1; -} - -void -luaopen_fann (lua_State * L) -{ - rspamd_lua_new_class (L, "rspamd{fann}", fannlib_m); - lua_pop (L, 1); - - rspamd_lua_add_preload (L, "rspamd_fann", lua_load_fann); -} diff --git a/src/rspamd.c b/src/rspamd.c index 685793ee8..a4a659a53 100644 --- a/src/rspamd.c +++ b/src/rspamd.c @@ -984,21 +984,21 @@ do_encrypt_password (void) /* Signal handlers */ static void -rspamd_term_handler (gint signo, short what, gpointer arg) +rspamd_term_handler (struct ev_loop *loop, ev_signal *w, int revents) { - struct rspamd_main *rspamd_main = arg; + struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data; msg_info_main ("catch termination signal, waiting for children"); rspamd_log_nolock (rspamd_main->logger); - rspamd_pass_signal (rspamd_main->workers, signo); + rspamd_pass_signal (rspamd_main->workers, w->signum); - event_base_loopexit (rspamd_main->event_loop, NULL); + ev_break (rspamd_main->event_loop, EVBREAK_ALL); } static void -rspamd_usr1_handler (gint signo, short what, gpointer arg) +rspamd_usr1_handler (struct ev_loop *loop, ev_signal *w, int revents) { - struct rspamd_main *rspamd_main = arg; + struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data; rspamd_log_reopen_priv (rspamd_main->logger, rspamd_main->workers_uid, @@ -1008,9 +1008,9 @@ rspamd_usr1_handler (gint signo, short what, gpointer arg) } static void -rspamd_hup_handler (gint signo, short what, gpointer arg) +rspamd_hup_handler (struct ev_loop *loop, ev_signal *w, int revents) { - struct rspamd_main *rspamd_main = arg; + struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data; msg_info_main ("rspamd " RVERSION @@ -1243,8 +1243,9 @@ main (gint argc, gchar **argv, gchar **env) worker_t **pworker; GQuark type; rspamd_inet_addr_t *control_addr = NULL; - struct ev_loop *ev_base; - struct event term_ev, int_ev, cld_ev, hup_ev, usr1_ev, control_ev; + struct ev_loop *event_loop; + ev_signal term_ev, int_ev, cld_ev, hup_ev, usr1_ev; + ev_io control_ev; struct timeval term_tv; struct rspamd_main *rspamd_main; gboolean skip_pid = FALSE, valgrind_mode = FALSE; @@ -1498,47 +1499,53 @@ main (gint argc, gchar **argv, gchar **env) rspamd_main->workers = g_hash_table_new (g_direct_hash, g_direct_equal); /* Init event base */ - ev_base = event_init (); - rspamd_main->event_loop = ev_base; + event_loop = ev_default_loop (EVFLAG_SIGNALFD); + rspamd_main->event_loop = event_loop; /* Unblock signals */ sigemptyset (&signals.sa_mask); sigprocmask (SIG_SETMASK, &signals.sa_mask, NULL); /* Set events for signals */ - evsignal_set (&term_ev, SIGTERM, rspamd_term_handler, rspamd_main); - event_base_set (ev_base, &term_ev); - event_add (&term_ev, NULL); - evsignal_set (&int_ev, SIGINT, rspamd_term_handler, rspamd_main); - event_base_set (ev_base, &int_ev); - event_add (&int_ev, NULL); - evsignal_set (&hup_ev, SIGHUP, rspamd_hup_handler, rspamd_main); - event_base_set (ev_base, &hup_ev); - event_add (&hup_ev, NULL); + ev_signal_init (&term_ev, rspamd_term_handler, SIGTERM); + term_ev.data = rspamd_main; + ev_signal_start (event_loop, &term_ev); + + ev_signal_init (&int_ev, rspamd_term_handler, SIGINT); + int_ev.data = rspamd_main; + ev_signal_start (event_loop, &int_ev); + + ev_signal_init (&hup_ev, rspamd_hup_handler, SIGHUP); + hup_ev.data = rspamd_main; + ev_signal_start (event_loop, &hup_ev); + + ev_signal_init (&usr1_ev, rspamd_usr1_handler, SIGUSR1); + usr1_ev.data = rspamd_main; + ev_signal_start (event_loop, &usr1_ev); + + + /* XXX: deal with children */ evsignal_set (&cld_ev, SIGCHLD, rspamd_cld_handler, rspamd_main); - event_base_set (ev_base, &cld_ev); + event_base_set (event_loop, &cld_ev); event_add (&cld_ev, NULL); - evsignal_set (&usr1_ev, SIGUSR1, rspamd_usr1_handler, rspamd_main); - event_base_set (ev_base, &usr1_ev); - event_add (&usr1_ev, NULL); + rspamd_check_core_limits (rspamd_main); rspamd_mempool_lock_mutex (rspamd_main->start_mtx); - spawn_workers (rspamd_main, ev_base); + spawn_workers (rspamd_main, event_loop); rspamd_mempool_unlock_mutex (rspamd_main->start_mtx); rspamd_main->http_ctx = rspamd_http_context_create (rspamd_main->cfg, - ev_base, rspamd_main->cfg->ups_ctx); + event_loop, rspamd_main->cfg->ups_ctx); if (control_fd != -1) { msg_info_main ("listening for control commands on %s", rspamd_inet_address_to_string (control_addr)); - event_set (&control_ev, control_fd, EV_READ|EV_PERSIST, - rspamd_control_handler, rspamd_main); - event_base_set (ev_base, &control_ev); - event_add (&control_ev, NULL); + ev_io_init (&control_ev, rspamd_control_handler, control_fd, EV_READ); + control_ev.data = rspamd_main; + ev_io_start (event_loop, &control_ev); } - event_base_loop (ev_base, 0); + event_base_loop (event_loop, 0); /* We need to block signals unless children are waited for */ rspamd_worker_block_signals (); @@ -1570,10 +1577,10 @@ main (gint argc, gchar **argv, gchar **env) event_set (&term_ev, -1, EV_TIMEOUT|EV_PERSIST, rspamd_final_term_handler, rspamd_main); - event_base_set (ev_base, &term_ev); + event_base_set (event_loop, &term_ev); event_add (&term_ev, &term_tv); - event_base_loop (ev_base, 0); + event_base_loop (event_loop, 0); event_del (&term_ev); /* Maybe save roll history */ @@ -1595,7 +1602,7 @@ main (gint argc, gchar **argv, gchar **env) } g_free (rspamd_main); - event_base_free (ev_base); + event_base_free (event_loop); sqlite3_shutdown (); if (control_addr) { diff --git a/src/rspamd.h b/src/rspamd.h index 9af78e904..375cba13f 100644 --- a/src/rspamd.h +++ b/src/rspamd.h @@ -62,6 +62,12 @@ enum rspamd_worker_flags { RSPAMD_WORKER_CONTROLLER = (1 << 6), }; +struct rspamd_worker_accept_event { + ev_io accept_ev; + ev_timer throttling_ev; + struct ev_loop *event_loop; + struct rspamd_worker_accept_event *prev, *next; +}; /** * Worker process structure @@ -77,7 +83,7 @@ struct rspamd_worker { struct rspamd_main *srv; /**< pointer to server structure */ GQuark type; /**< process type */ GHashTable *signal_events; /**< signal events */ - GList *accept_events; /**< socket events */ + struct rspamd_worker_accept_event *accept_events; /**< socket events */ struct rspamd_worker_conf *cf; /**< worker config data */ gpointer ctx; /**< worker's specific data */ enum rspamd_worker_flags flags; /**< worker's flags */ @@ -85,7 +91,7 @@ struct rspamd_worker { [1] is used by a worker */ gint srv_pipe[2]; /**< used by workers to request something from the main process. [0] - main, [1] - worker */ - struct event srv_ev; /**< used by main for read workers' requests */ + ev_io srv_ev; /**< used by main for read workers' requests */ gpointer control_data; /**< used by control protocol to handle commands */ gpointer tmp_data; /**< used to avoid race condition to deal with control messages */ GPtrArray *finish_actions; /**< called when worker is terminated */ |