aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--CMakeLists.txt34
-rw-r--r--config.h.in2
-rw-r--r--contrib/libev/CMakeLists.txt1
-rw-r--r--src/CMakeLists.txt3
-rw-r--r--src/libserver/worker_util.c92
-rw-r--r--src/libserver/worker_util.h14
-rw-r--r--src/libutil/addr.c39
-rw-r--r--src/libutil/addr.h10
-rw-r--r--src/libutil/util.c36
-rw-r--r--src/libutil/util.h13
-rw-r--r--src/lua/CMakeLists.txt1
-rw-r--r--src/lua/lua_common.c1
-rw-r--r--src/lua/lua_common.h1
-rw-r--r--src/lua/lua_fann.c1032
-rw-r--r--src/rspamd.c77
-rw-r--r--src/rspamd.h10
16 files changed, 126 insertions, 1240 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 893cce7f6..d5f37900a 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -52,9 +52,7 @@ OPTION(WANT_SYSTEMD_UNITS "Install systemd unit files on Linux [default: OFF]"
OPTION(ENABLE_SNOWBALL "Enable snowball stemmer [default: ON]" ON)
OPTION(ENABLE_CLANG_PLUGIN "Enable clang static analysing plugin [default: OFF]" OFF)
OPTION(ENABLE_HYPERSCAN "Enable hyperscan for fast regexp processing [default: OFF]" OFF)
-OPTION(ENABLE_FANN "Enable fann for neural network plugin [default: OFF]" OFF)
OPTION(ENABLE_PCRE2 "Enable pcre2 instead of pcre [default: OFF]" OFF)
-OPTION(ENABLE_GD "Enable libgd for images processing [default: OFF]" OFF)
OPTION(ENABLE_JEMALLOC "Build rspamd with jemalloc allocator [default: OFF]" OFF)
OPTION(ENABLE_COVERAGE "Build rspamd with code coverage options [default: OFF]" OFF)
OPTION(ENABLE_FULL_DEBUG "Build rspamd with all possible debug [default: OFF]" OFF)
@@ -634,9 +632,7 @@ IF(ENABLE_LIBUNWIND MATCHES "ON")
ROOT ${LIBUNWIND_ROOT_DIR} MODULES libunwind)
SET(WITH_LIBUNWIND "1")
ENDIF()
-ProcessPackage(GTHREAD2 LIBRARY gthread-2.0 INCLUDE glib.h
- INCLUDE_SUFFIXES include/glib include/glib-2.0
- ROOT ${GLIB_ROOT_DIR} MODULES gthread-2.0>=2.28)
+
ProcessPackage(GLIB2 LIBRARY glib-2.0 INCLUDE glib.h
INCLUDE_SUFFIXES include/glib include/glib-2.0
ROOT ${GLIB_ROOT_DIR} MODULES glib-2.0>=2.28)
@@ -672,34 +668,6 @@ IF(ENABLE_HYPERSCAN MATCHES "ON")
ROOT ${HYPERSCAN_ROOT_DIR} MODULES libhs)
SET(WITH_HYPERSCAN 1)
ENDIF()
-IF (ENABLE_FANN MATCHES "ON")
- ProcessPackage(FANN LIBRARY fann INCLUDE fann.h INCLUDE_SUFFIXES
- include/fann
- ROOT ${FANN_ROOT_DIR} MODULES fann)
- SET(WITH_FANN 1)
-ENDIF ()
-
-IF (ENABLE_GD MATCHES "ON")
- ProcessPackage(GD LIBRARY gd INCLUDE gd.h INCLUDE_SUFFIXES
- include/gd include/libgd
- ROOT ${GD_ROOT_DIR} MODULES gd)
- LIST(APPEND CMAKE_REQUIRED_INCLUDES "${GD_INCLUDE}")
- LIST(APPEND CMAKE_REQUIRED_LIBRARIES "${GD_LIBRARY}")
-
- CHECK_SYMBOL_EXISTS(gdImageSetInterpolationMethod gd.h GD_INTERPOLATION)
- CHECK_SYMBOL_EXISTS(gdImageScale gd.h GD_SCALE)
- CHECK_SYMBOL_EXISTS(gdImageGrayScale gd.h GD_GRAYSCALE)
- CHECK_SYMBOL_EXISTS(gdImageCreateFromJpegPtr gd.h GD_JPEG)
- CHECK_SYMBOL_EXISTS(gdImageCreateFromPngPtr gd.h GD_PNG)
- CHECK_SYMBOL_EXISTS(gdImageCreateFromBmpPtr gd.h GD_BMP)
- CHECK_SYMBOL_EXISTS(gdImageCreateFromGifPtr gd.h GD_GIF)
-
- IF(GD_INTERPOLATION AND GD_SCALE AND GD_GRAYSCALE AND GD_JPEG AND GD_PNG AND GD_GIF AND GD_BMP)
- SET(USABLE_GD 1)
- ELSE()
- MESSAGE(STATUS "Libgd is found but it is unusable")
- ENDIF()
-ENDIF ()
#Check for openssl (required for dkim)
SET(HAVE_OPENSSL 1)
diff --git a/config.h.in b/config.h.in
index 0693cd55c..9dcf51a50 100644
--- a/config.h.in
+++ b/config.h.in
@@ -139,8 +139,6 @@
#cmakedefine LIBEVENT_EVHTTP 1
#cmakedefine PARAM_H_HAS_BITSET 1
#cmakedefine WITH_DB 1
-#cmakedefine WITH_FANN 1
-#cmakedefine USABLE_GD 1
#cmakedefine WITH_GPERF_TOOLS 1
#cmakedefine WITH_HIREDIS 1
#cmakedefine WITH_HYPERSCAN 1
diff --git a/contrib/libev/CMakeLists.txt b/contrib/libev/CMakeLists.txt
index ca216d180..e681aeb91 100644
--- a/contrib/libev/CMakeLists.txt
+++ b/contrib/libev/CMakeLists.txt
@@ -59,7 +59,6 @@ ADD_LIBRARY(rspamd-libev STATIC ${LIBEVSRC})
ADD_DEFINITIONS("-DEV_CONFIG_H=\"${CMAKE_CURRENT_BINARY_DIR}/libev-config.h\""
-DEV_MULTIPLICITY=1
-DEV_USE_FLOOR=1
- -DEV_USE_MONOTONIC=1
-DEV_NO_THREADS=1 # We do not have threads in Rspamd!
-DEV_FEATURES=127 # Enable all features
)
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 3e6abd4e8..da5c19e2d 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -101,8 +101,7 @@ SET(PLUGINSSRC plugins/surbl.c
plugins/fuzzy_check.c
plugins/spf.c
plugins/dkim_check.c
- libserver/rspamd_control.c
- lua/lua_fann.c)
+ libserver/rspamd_control.c)
SET(MODULES_LIST surbl regexp chartable fuzzy_check spf dkim)
SET(WORKERS_LIST normal controller fuzzy lua rspamd_proxy log_helper)
diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c
index 59698be32..0d4c4db17 100644
--- a/src/libserver/worker_util.c
+++ b/src/libserver/worker_util.c
@@ -299,12 +299,12 @@ rspamd_worker_init_signals (struct rspamd_worker *worker, struct ev_loop *base)
struct ev_loop *
rspamd_prepare_worker (struct rspamd_worker *worker, const char *name,
- void (*accept_handler)(int, short, void *))
+ rspamd_accept_handler hdl)
{
- struct ev_loop *ev_base;
- struct event *accept_events;
+ struct ev_loop *event_loop;
GList *cur;
struct rspamd_worker_listen_socket *ls;
+ struct rspamd_worker_accept_event *accept_ev;
#ifdef WITH_PROFILER
extern void _start (void), etext (void);
@@ -316,65 +316,58 @@ rspamd_prepare_worker (struct rspamd_worker *worker, const char *name,
worker->signal_events = g_hash_table_new_full (g_direct_hash, g_direct_equal,
NULL, rspamd_sigh_free);
- ev_base = event_init ();
+ event_loop = ev_default_loop (EVFLAG_SIGNALFD);
- rspamd_worker_init_signals (worker, ev_base);
- rspamd_control_worker_add_default_handler (worker, ev_base);
+ rspamd_worker_init_signals (worker, event_loop);
+ rspamd_control_worker_add_default_handler (worker, event_loop);
#ifdef WITH_HIREDIS
rspamd_redis_pool_config (worker->srv->cfg->redis_pool,
- worker->srv->cfg, ev_base);
+ worker->srv->cfg, event_loop);
#endif
/* Accept all sockets */
- if (accept_handler) {
+ if (hdl) {
cur = worker->cf->listen_socks;
while (cur) {
ls = cur->data;
if (ls->fd != -1) {
- accept_events = g_malloc0 (sizeof (struct event) * 2);
- event_set (&accept_events[0], ls->fd, EV_READ | EV_PERSIST,
- accept_handler, worker);
- event_base_set (ev_base, &accept_events[0]);
- event_add (&accept_events[0], NULL);
- worker->accept_events = g_list_prepend (worker->accept_events,
- accept_events);
+ accept_ev = g_malloc0 (sizeof (*accept_ev));
+ accept_ev->event_loop = event_loop;
+ ev_io_init (&accept_ev->accept_ev, hdl, ls->fd, EV_READ);
+ ev_io_start (event_loop, &accept_ev->accept_ev);
+
+ DL_APPEND (worker->accept_events, accept_ev);
}
cur = g_list_next (cur);
}
}
- return ev_base;
+ return event_loop;
}
void
rspamd_worker_stop_accept (struct rspamd_worker *worker)
{
- GList *cur;
- struct event *events;
+ struct rspamd_worker_accept_event *cur, *tmp;
/* Remove all events */
- cur = worker->accept_events;
- while (cur) {
- events = cur->data;
+ DL_FOREACH_SAFE (worker->accept_events, cur, tmp) {
- if (rspamd_event_pending (&events[0], EV_TIMEOUT|EV_READ|EV_WRITE)) {
- event_del (&events[0]);
+ if (ev_is_active (&cur->accept_ev) || ev_is_pending (&cur->accept_ev)) {
+ ev_io_stop (cur->event_loop, &cur->accept_ev);
}
- if (rspamd_event_pending (&events[1], EV_TIMEOUT|EV_READ|EV_WRITE)) {
- event_del (&events[1]);
+
+ if (ev_is_active (&cur->throttling_ev) || ev_is_pending (&cur->throttling_ev)) {
+ ev_timer_stop (cur->event_loop, &cur->throttling_ev);
}
- cur = g_list_next (cur);
- g_free (events);
+ g_free (cur);
}
- if (worker->accept_events != NULL) {
- g_list_free (worker->accept_events);
- }
/* XXX: we need to do it much later */
#if 0
g_hash_table_iter_init (&it, worker->signal_events);
@@ -721,16 +714,6 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main,
rspamd_log_open (rspamd_main->logger);
wrk->start_time = rspamd_get_calendar_ticks ();
-#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
- # if (GLIB_MINOR_VERSION > 20)
- /* Ugly hack for old glib */
- if (!g_thread_get_initialized ()) {
- g_thread_init (NULL);
- }
-# else
- g_thread_init (NULL);
-# endif
-#endif
if (cf->bind_conf) {
msg_info_main ("starting %s process %P (%d); listen on: %s",
cf->worker->name,
@@ -1129,4 +1112,33 @@ rspamd_set_crash_handler (struct rspamd_main *rspamd_main)
sigaction (SIGFPE, &sa, NULL);
sigaction (SIGSYS, &sa, NULL);
#endif
+}
+
+static void
+rspamd_enable_accept_event (gint fd, short what, gpointer d)
+{
+ struct event *events = d;
+
+ event_del (&events[1]);
+ event_add (&events[0], NULL);
+}
+
+void
+rspamd_worker_throttle_accept_events (gint sock, void *data)
+{
+ struct rspamd_worker_accept_event *head, *cur;
+ const gdouble throttling = 0.5;
+ struct ev_loop *ev_base;
+
+ head = (struct rspamd_worker_accept_event *)data;
+
+ DL_FOREACH (head, cur) {
+
+ ev_base = event_get_base (&events[0]);
+ event_del (&events[0]);
+ event_set (&events[1], sock, EV_TIMEOUT, rspamd_enable_accept_event,
+ events);
+ event_base_set (ev_base, &events[1]);
+ event_add (&events[1], &tv);
+ }
} \ No newline at end of file
diff --git a/src/libserver/worker_util.h b/src/libserver/worker_util.h
index 2dc78dfc7..67b54e5c9 100644
--- a/src/libserver/worker_util.h
+++ b/src/libserver/worker_util.h
@@ -36,6 +36,9 @@ struct rspamd_worker_signal_handler;
* @param base
*/
void rspamd_worker_init_signals (struct rspamd_worker *worker, struct ev_loop *base);
+
+typedef void (*rspamd_accept_handler)(struct ev_loop *loop, ev_io *w, int revents);
+
/**
* Prepare worker's startup
* @param worker worker structure
@@ -46,7 +49,7 @@ void rspamd_worker_init_signals (struct rspamd_worker *worker, struct ev_loop *b
*/
struct ev_loop *
rspamd_prepare_worker (struct rspamd_worker *worker, const char *name,
- void (*accept_handler)(int, short, void *));
+ rspamd_accept_handler hdl);
/**
* Set special signal handler for a worker
@@ -124,8 +127,6 @@ void rspamd_controller_send_ucl (struct rspamd_http_connection_entry *entry,
*/
worker_t * rspamd_get_worker_by_type (struct rspamd_config *cfg, GQuark type);
-void rspamd_worker_stop_accept (struct rspamd_worker *worker);
-
/**
* Block signals before terminations
*/
@@ -202,6 +203,13 @@ void rspamd_worker_init_monitored (struct rspamd_worker *worker,
struct ev_loop *ev_base,
struct rspamd_dns_resolver *resolver);
+/**
+ * Performs throttling for accept events
+ * @param sock
+ * @param data struct rspamd_worker_accept_event * list
+ */
+void rspamd_worker_throttle_accept_events (gint sock, void *data);
+
#define msg_err_main(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
rspamd_main->server_pool->tag.tagname, rspamd_main->server_pool->tag.uid, \
G_STRFUNC, \
diff --git a/src/libutil/addr.c b/src/libutil/addr.c
index 339f5facb..112c5d2cd 100644
--- a/src/libutil/addr.c
+++ b/src/libutil/addr.c
@@ -203,41 +203,10 @@ rspamd_ip_is_valid (const rspamd_inet_addr_t *addr)
return ret;
}
-static void
-rspamd_enable_accept_event (gint fd, short what, gpointer d)
-{
- struct event *events = d;
-
- event_del (&events[1]);
- event_add (&events[0], NULL);
-}
-
-static void
-rspamd_disable_accept_events (gint sock, GList *accept_events)
-{
- GList *cur;
- struct event *events;
- const gdouble throttling = 0.5;
- struct timeval tv;
- struct ev_loop *ev_base;
-
- double_to_tv (throttling, &tv);
-
- for (cur = accept_events; cur != NULL; cur = g_list_next (cur)) {
- events = cur->data;
-
- ev_base = event_get_base (&events[0]);
- event_del (&events[0]);
- event_set (&events[1], sock, EV_TIMEOUT, rspamd_enable_accept_event,
- events);
- event_base_set (ev_base, &events[1]);
- event_add (&events[1], &tv);
- }
-}
-
gint
rspamd_accept_from_socket (gint sock, rspamd_inet_addr_t **target,
- GList *accept_events)
+ rspamd_accept_throttling_handler hdl,
+ void *hdl_data)
{
gint nfd, serrno;
union sa_union su;
@@ -254,7 +223,9 @@ rspamd_accept_from_socket (gint sock, rspamd_inet_addr_t **target,
}
else if (errno == EMFILE || errno == ENFILE) {
/* Temporary disable accept event */
- rspamd_disable_accept_events (sock, accept_events);
+ if (hdl) {
+ hdl (sock, hdl_data);
+ }
return 0;
}
diff --git a/src/libutil/addr.h b/src/libutil/addr.h
index bfe586ad1..7efa5e318 100644
--- a/src/libutil/addr.h
+++ b/src/libutil/addr.h
@@ -221,15 +221,17 @@ int rspamd_inet_address_listen (const rspamd_inet_addr_t *addr, gint type,
*/
gboolean rspamd_ip_is_valid (const rspamd_inet_addr_t *addr);
+typedef void (*rspamd_accept_throttling_handler)(gint, void *);
/**
* Accept from listening socket filling addr structure
* @param sock listening socket
- * @param addr allocated inet addr structure
- * @param accept_events events for accepting new sockets
+ * @param target allocated inet addr structure
* @return
*/
-gint rspamd_accept_from_socket (gint sock, rspamd_inet_addr_t **addr,
- GList *accept_events);
+gint rspamd_accept_from_socket (gint sock,
+ rspamd_inet_addr_t **target,
+ rspamd_accept_throttling_handler hdl,
+ void *hdl_data);
/**
* Parse host[:port[:priority]] line
diff --git a/src/libutil/util.c b/src/libutil/util.c
index 665b6accb..e7a5c2601 100644
--- a/src/libutil/util.c
+++ b/src/libutil/util.c
@@ -1612,42 +1612,6 @@ rspamd_thread_func (gpointer ud)
return ud;
}
-/**
- * Create new named thread
- * @param name name pattern
- * @param func function to start
- * @param data data to pass to function
- * @param err error pointer
- * @return new thread object that can be joined
- */
-GThread *
-rspamd_create_thread (const gchar *name,
- GThreadFunc func,
- gpointer data,
- GError **err)
-{
- GThread *new;
- struct rspamd_thread_data *td;
- static gint32 id;
- guint r;
-
- r = strlen (name);
- td = g_malloc (sizeof (struct rspamd_thread_data));
- td->id = ++id;
- td->name = g_malloc (r + sizeof ("4294967296"));
- td->func = func;
- td->data = data;
-
- rspamd_snprintf (td->name, r + sizeof ("4294967296"), "%s-%d", name, id);
-#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION > 32))
- new = g_thread_try_new (td->name, rspamd_thread_func, td, err);
-#else
- new = g_thread_create (rspamd_thread_func, td, TRUE, err);
-#endif
-
- return new;
-}
-
struct hash_copy_callback_data {
gpointer (*key_copy_func)(gconstpointer data, gpointer ud);
gpointer (*value_copy_func)(gconstpointer data, gpointer ud);
diff --git a/src/libutil/util.h b/src/libutil/util.h
index b7d6055ce..21e4b320e 100644
--- a/src/libutil/util.h
+++ b/src/libutil/util.h
@@ -263,19 +263,6 @@ void rspamd_mutex_unlock (rspamd_mutex_t *mtx);
void rspamd_mutex_free (rspamd_mutex_t *mtx);
/**
- * Create new named thread
- * @param name name pattern
- * @param func function to start
- * @param data data to pass to function
- * @param err error pointer
- * @return new thread object that can be joined
- */
-GThread * rspamd_create_thread (const gchar *name,
- GThreadFunc func,
- gpointer data,
- GError **err);
-
-/**
* Deep copy of one hash table to another
* @param src source hash
* @param dst destination hash
diff --git a/src/lua/CMakeLists.txt b/src/lua/CMakeLists.txt
index c3f1a84c1..4a2003605 100644
--- a/src/lua/CMakeLists.txt
+++ b/src/lua/CMakeLists.txt
@@ -22,7 +22,6 @@ SET(LUASRC ${CMAKE_CURRENT_SOURCE_DIR}/lua_common.c
${CMAKE_CURRENT_SOURCE_DIR}/lua_util.c
${CMAKE_CURRENT_SOURCE_DIR}/lua_tcp.c
${CMAKE_CURRENT_SOURCE_DIR}/lua_html.c
- ${CMAKE_CURRENT_SOURCE_DIR}/lua_fann.c
${CMAKE_CURRENT_SOURCE_DIR}/lua_sqlite3.c
${CMAKE_CURRENT_SOURCE_DIR}/lua_cryptobox.c
${CMAKE_CURRENT_SOURCE_DIR}/lua_map.c
diff --git a/src/lua/lua_common.c b/src/lua/lua_common.c
index 5f31ebbb0..689dcd1c4 100644
--- a/src/lua/lua_common.c
+++ b/src/lua/lua_common.c
@@ -910,7 +910,6 @@ rspamd_lua_init (bool wipe_mem)
luaopen_util (L);
luaopen_tcp (L);
luaopen_html (L);
- luaopen_fann (L);
luaopen_sqlite3 (L);
luaopen_cryptobox (L);
luaopen_dns (L);
diff --git a/src/lua/lua_common.h b/src/lua/lua_common.h
index f3d006743..8919a46fd 100644
--- a/src/lua/lua_common.h
+++ b/src/lua/lua_common.h
@@ -281,7 +281,6 @@ void luaopen_text (lua_State *L);
void luaopen_util (lua_State * L);
void luaopen_tcp (lua_State * L);
void luaopen_html (lua_State * L);
-void luaopen_fann (lua_State *L);
void luaopen_sqlite3 (lua_State *L);
void luaopen_cryptobox (lua_State *L);
void luaopen_dns (lua_State *L);
diff --git a/src/lua/lua_fann.c b/src/lua/lua_fann.c
deleted file mode 100644
index 7eb493740..000000000
--- a/src/lua/lua_fann.c
+++ /dev/null
@@ -1,1032 +0,0 @@
-/*-
- * Copyright 2016 Vsevolod Stakhov
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "lua_common.h"
-
-#ifdef WITH_FANN
-#include <fann.h>
-#endif
-
-#include "unix-std.h"
-
-/***
- * @module rspamd_fann
- * This module enables [fann](http://libfann.github.io) interaction in rspamd
- * Please note, that this module works merely if you have `ENABLE_FANN=ON` option
- * definition when building rspamd
- */
-
-/*
- * Fann functions
- */
-LUA_FUNCTION_DEF (fann, is_enabled);
-LUA_FUNCTION_DEF (fann, create);
-LUA_FUNCTION_DEF (fann, create_full);
-LUA_FUNCTION_DEF (fann, load_file);
-LUA_FUNCTION_DEF (fann, load_data);
-
-/*
- * Fann methods
- */
-LUA_FUNCTION_DEF (fann, train);
-LUA_FUNCTION_DEF (fann, train_threaded);
-LUA_FUNCTION_DEF (fann, test);
-LUA_FUNCTION_DEF (fann, save);
-LUA_FUNCTION_DEF (fann, data);
-LUA_FUNCTION_DEF (fann, get_inputs);
-LUA_FUNCTION_DEF (fann, get_outputs);
-LUA_FUNCTION_DEF (fann, get_layers);
-LUA_FUNCTION_DEF (fann, get_mse);
-LUA_FUNCTION_DEF (fann, dtor);
-
-static const struct luaL_reg fannlib_f[] = {
- LUA_INTERFACE_DEF (fann, is_enabled),
- LUA_INTERFACE_DEF (fann, create),
- LUA_INTERFACE_DEF (fann, create_full),
- LUA_INTERFACE_DEF (fann, load_file),
- {"load", lua_fann_load_file},
- LUA_INTERFACE_DEF (fann, load_data),
- {NULL, NULL}
-};
-
-static const struct luaL_reg fannlib_m[] = {
- LUA_INTERFACE_DEF (fann, train),
- LUA_INTERFACE_DEF (fann, train_threaded),
- LUA_INTERFACE_DEF (fann, test),
- LUA_INTERFACE_DEF (fann, save),
- LUA_INTERFACE_DEF (fann, data),
- LUA_INTERFACE_DEF (fann, get_inputs),
- LUA_INTERFACE_DEF (fann, get_outputs),
- LUA_INTERFACE_DEF (fann, get_layers),
- LUA_INTERFACE_DEF (fann, get_mse),
- {"__gc", lua_fann_dtor},
- {"__tostring", rspamd_lua_class_tostring},
- {NULL, NULL}
-};
-
-#ifdef WITH_FANN
-struct fann *
-rspamd_lua_check_fann (lua_State *L, gint pos)
-{
- void *ud = rspamd_lua_check_udata (L, pos, "rspamd{fann}");
- luaL_argcheck (L, ud != NULL, pos, "'fann' expected");
- return ud ? *((struct fann **) ud) : NULL;
-}
-#endif
-
-/***
- * @function rspamd_fann.is_enabled()
- * Checks if fann is enabled for this rspamd build
- * @return {boolean} true if fann is enabled
- */
-static gint
-lua_fann_is_enabled (lua_State *L)
-{
-#ifdef WITH_FANN
- lua_pushboolean (L, true);
-#else
- lua_pushboolean (L, false);
-#endif
- return 1;
-}
-
-/***
- * @function rspamd_fann.create(nlayers, [layer1, ... layern])
- * Creates new neural network with `nlayers` that contains `layer1`...`layern`
- * neurons in each layer
- * @param {number} nlayers number of layers
- * @param {number} layerI number of neurons in each layer
- * @return {fann} fann object
- */
-static gint
-lua_fann_create (lua_State *L)
-{
-#ifndef WITH_FANN
- return 0;
-#else
- struct fann *f, **pfann;
- guint nlayers, *layers, i;
-
- nlayers = luaL_checknumber (L, 1);
-
- if (nlayers > 0) {
- layers = g_malloc (nlayers * sizeof (layers[0]));
-
- if (lua_type (L, 2) == LUA_TNUMBER) {
- for (i = 0; i < nlayers; i ++) {
- layers[i] = luaL_checknumber (L, i + 2);
- }
- }
- else if (lua_type (L, 2) == LUA_TTABLE) {
- for (i = 0; i < nlayers; i ++) {
- lua_rawgeti (L, 2, i + 1);
- layers[i] = luaL_checknumber (L, -1);
- lua_pop (L, 1);
- }
- }
-
- f = fann_create_standard_array (nlayers, layers);
- fann_set_activation_function_hidden (f, FANN_SIGMOID_SYMMETRIC);
- fann_set_activation_function_output (f, FANN_SIGMOID_SYMMETRIC);
- fann_set_training_algorithm (f, FANN_TRAIN_INCREMENTAL);
- fann_randomize_weights (f, 0, 1);
-
- if (f != NULL) {
- pfann = lua_newuserdata (L, sizeof (gpointer));
- *pfann = f;
- rspamd_lua_setclass (L, "rspamd{fann}", -1);
- }
- else {
- lua_pushnil (L);
- }
-
- g_free (layers);
- }
- else {
- lua_pushnil (L);
- }
-
- return 1;
-#endif
-}
-
-#ifdef WITH_FANN
-static enum fann_activationfunc_enum
-string_to_activation_func (const gchar *str)
-{
- if (str == NULL) {
- return FANN_SIGMOID_SYMMETRIC;
- }
- if (strcmp (str, "sigmoid") == 0) {
- return FANN_SIGMOID;
- }
- else if (strcmp (str, "elliot") == 0) {
- return FANN_ELLIOT;
- }
- else if (strcmp (str, "elliot_symmetric") == 0) {
- return FANN_ELLIOT_SYMMETRIC;
- }
- else if (strcmp (str, "linear") == 0) {
- return FANN_LINEAR;
- }
-
- return FANN_SIGMOID_SYMMETRIC;
-}
-
-static enum fann_train_enum
-string_to_learn_alg (const gchar *str)
-{
- if (str == NULL) {
- return FANN_TRAIN_INCREMENTAL;
- }
- if (strcmp (str, "rprop") == 0) {
- return FANN_TRAIN_RPROP;
- }
- else if (strcmp (str, "qprop") == 0) {
- return FANN_TRAIN_QUICKPROP;
- }
- else if (strcmp (str, "batch") == 0) {
- return FANN_TRAIN_BATCH;
- }
-
- return FANN_TRAIN_INCREMENTAL;
-}
-/*
- * This is needed since libfann provides no versioning macros...
- */
-static struct fann_train_data *
-rspamd_fann_create_train (guint num_data, guint num_input, guint num_output)
-{
- struct fann_train_data *t;
- fann_type *inp, *outp;
- guint i;
-
- g_assert (num_data > 0 && num_input > 0 && num_output > 0);
-
- t = calloc (1, sizeof (*t));
- g_assert (t != NULL);
-
- t->num_data = num_data;
- t->num_input = num_input;
- t->num_output = num_output;
-
- t->input = calloc (num_data, sizeof (fann_type *));
- g_assert (t->input != NULL);
-
- t->output = calloc (num_data, sizeof (fann_type *));
- g_assert (t->output != NULL);
-
- inp = calloc (num_data * num_input, sizeof (fann_type));
- g_assert (inp != NULL);
-
- outp = calloc (num_data * num_output, sizeof (fann_type));
- g_assert (outp != NULL);
-
- for (i = 0; i < num_data; i ++) {
- t->input[i] = inp;
- inp += num_input;
- t->output[i] = outp;
- outp += num_output;
- }
-
- return t;
-}
-
-
-#endif
-
-/***
- * @function rspamd_fann.create_full(params)
- * Creates new neural network with parameters:
- * - `layers` {table/numbers}: table of layers in form: {N1, N2, N3 ... Nn} where N is number of neurons in a layer
- * - `activation_hidden` {string}: activation function type for hidden layers (`tanh` by default)
- * - `activation_output` {string}: activation function type for output layer (`tanh` by default)
- * - `sparsed` {float}: create sparsed ANN, where number is a coefficient for sparsing
- * - `learn` {string}: learning algorithm (quickprop, rprop or incremental)
- * - `randomize` {boolean}: randomize weights (true by default)
- * @return {fann} fann object
- */
-static gint
-lua_fann_create_full (lua_State *L)
-{
-#ifndef WITH_FANN
- return 0;
-#else
- struct fann *f, **pfann;
- guint nlayers, *layers, i;
- const gchar *activation_hidden = NULL, *activation_output, *learn_alg = NULL;
- gdouble sparsed = 0.0;
- gboolean randomize_ann = TRUE;
- GError *err = NULL;
-
- if (lua_type (L, 1) == LUA_TTABLE) {
- lua_pushstring (L, "layers");
- lua_gettable (L, 1);
-
- if (lua_type (L, -1) != LUA_TTABLE) {
- return luaL_error (L, "bad layers attribute");
- }
-
- nlayers = rspamd_lua_table_size (L, -1);
- if (nlayers < 2) {
- return luaL_error (L, "bad layers attribute");
- }
-
- layers = g_new0 (guint, nlayers);
-
- for (i = 0; i < nlayers; i ++) {
- lua_rawgeti (L, -1, i + 1);
- layers[i] = luaL_checknumber (L, -1);
- lua_pop (L, 1);
- }
-
- lua_pop (L, 1); /* Table */
-
- if (!rspamd_lua_parse_table_arguments (L, 1, &err,
- "sparsed=N;randomize=B;learn=S;activation_hidden=S;activation_output=S",
- &sparsed, &randomize_ann, &learn_alg, &activation_hidden, &activation_output)) {
- g_free (layers);
-
- if (err) {
- gint r;
-
- r = luaL_error (L, "invalid arguments: %s", err->message);
- g_error_free (err);
- return r;
- }
- else {
- return luaL_error (L, "invalid arguments");
- }
- }
-
- if (sparsed != 0.0) {
- f = fann_create_standard_array (nlayers, layers);
- }
- else {
- f = fann_create_sparse_array (sparsed, nlayers, layers);
- }
-
- if (f != NULL) {
- pfann = lua_newuserdata (L, sizeof (gpointer));
- *pfann = f;
- rspamd_lua_setclass (L, "rspamd{fann}", -1);
- }
- else {
- g_free (layers);
- return luaL_error (L, "cannot create fann");
- }
-
- fann_set_activation_function_hidden (f,
- string_to_activation_func (activation_hidden));
- fann_set_activation_function_output (f,
- string_to_activation_func (activation_output));
- fann_set_training_algorithm (f, string_to_learn_alg (learn_alg));
-
- if (randomize_ann) {
- fann_randomize_weights (f, 0, 1);
- }
-
- g_free (layers);
- }
- else {
- return luaL_error (L, "bad arguments");
- }
-
- return 1;
-#endif
-}
-
-/***
- * @function rspamd_fann.load(file)
- * Loads neural network from the file
- * @param {string} file filename where fann is stored
- * @return {fann} fann object
- */
-static gint
-lua_fann_load_file (lua_State *L)
-{
-#ifndef WITH_FANN
- return 0;
-#else
- struct fann *f, **pfann;
- const gchar *fname;
-
- fname = luaL_checkstring (L, 1);
-
- if (fname != NULL) {
- f = fann_create_from_file (fname);
-
- if (f != NULL) {
- pfann = lua_newuserdata (L, sizeof (gpointer));
- *pfann = f;
- rspamd_lua_setclass (L, "rspamd{fann}", -1);
- }
- else {
- lua_pushnil (L);
- }
- }
- else {
- lua_pushnil (L);
- }
-
- return 1;
-#endif
-}
-
-/***
- * @function rspamd_fann.load_data(data)
- * Loads neural network from the data
- * @param {string} file filename where fann is stored
- * @return {fann} fann object
- */
-static gint
-lua_fann_load_data (lua_State *L)
-{
-#ifndef WITH_FANN
- return 0;
-#else
- struct fann *f, **pfann;
- gint fd;
- struct rspamd_lua_text *t;
- gchar fpath[PATH_MAX];
-
- if (lua_type (L, 1) == LUA_TUSERDATA) {
- t = lua_check_text (L, 1);
-
- if (!t) {
- return luaL_error (L, "text required");
- }
- }
- else {
- t = g_alloca (sizeof (*t));
- t->start = lua_tolstring (L, 1, (gsize *)&t->len);
- t->flags = 0;
- }
-
- /* We need to save data to file because of libfann stupidity */
- rspamd_strlcpy (fpath, "/tmp/rspamd-fannXXXXXXXXXX", sizeof (fpath));
- fd = mkstemp (fpath);
-
- if (fd == -1) {
- msg_warn ("cannot create tempfile: %s", strerror (errno));
- lua_pushnil (L);
- }
- else {
- if (write (fd, t->start, t->len) == -1) {
- msg_warn ("cannot write tempfile: %s", strerror (errno));
- lua_pushnil (L);
- unlink (fpath);
- close (fd);
-
- return 1;
- }
-
- f = fann_create_from_file (fpath);
- unlink (fpath);
- close (fd);
-
- if (f != NULL) {
- pfann = lua_newuserdata (L, sizeof (gpointer));
- *pfann = f;
- rspamd_lua_setclass (L, "rspamd{fann}", -1);
- }
- else {
- lua_pushnil (L);
- }
- }
-
- return 1;
-#endif
-}
-
-/***
- * @function rspamd_fann:data()
- * Returns serialized neural network
- * @return {rspamd_text} fann data
- */
-static gint
-lua_fann_data (lua_State *L)
-{
-#ifndef WITH_FANN
- return 0;
-#else
- struct fann *f = rspamd_lua_check_fann (L, 1);
- gint fd;
- struct rspamd_lua_text *res;
- gchar fpath[PATH_MAX];
- gpointer map;
- gsize sz;
-
- if (f == NULL) {
- return luaL_error (L, "invalid arguments");
- }
-
- /* We need to save data to file because of libfann stupidity */
- rspamd_strlcpy (fpath, "/tmp/rspamd-fannXXXXXXXXXX", sizeof (fpath));
- fd = mkstemp (fpath);
-
- if (fd == -1) {
- msg_warn ("cannot create tempfile: %s", strerror (errno));
- lua_pushnil (L);
- }
- else {
- if (fann_save (f, fpath) == -1) {
- msg_warn ("cannot write tempfile: %s", strerror (errno));
- lua_pushnil (L);
- unlink (fpath);
- close (fd);
-
- return 1;
- }
-
-
- (void)lseek (fd, 0, SEEK_SET);
- map = rspamd_file_xmap (fpath, PROT_READ, &sz, TRUE);
- unlink (fpath);
- close (fd);
-
- if (map != NULL) {
- res = lua_newuserdata (L, sizeof (*res));
- res->len = sz;
- res->start = map;
- res->flags = RSPAMD_TEXT_FLAG_OWN|RSPAMD_TEXT_FLAG_MMAPED;
- rspamd_lua_setclass (L, "rspamd{text}", -1);
- }
- else {
- lua_pushnil (L);
- }
-
- }
-
- return 1;
-#endif
-}
-
-
-/**
- * @method rspamd_fann:train(inputs, outputs)
- * Trains neural network with samples. Inputs and outputs should be tables of
- * equal size, each row in table should be N inputs and M outputs, e.g.
- * {0, 1, 1} -> {0}
- * @param {table} inputs input samples
- * @param {table} outputs output samples
- * @return {number} number of samples learned
- */
-static gint
-lua_fann_train (lua_State *L)
-{
-#ifndef WITH_FANN
- return 0;
-#else
- struct fann *f = rspamd_lua_check_fann (L, 1);
- guint ninputs, noutputs, j;
- fann_type *cur_input, *cur_output;
- gboolean ret = FALSE;
-
- if (f != NULL) {
- /* First check sanity, call for table.getn for that */
- ninputs = rspamd_lua_table_size (L, 2);
- noutputs = rspamd_lua_table_size (L, 3);
-
- if (ninputs != fann_get_num_input (f) ||
- noutputs != fann_get_num_output (f)) {
- msg_err ("bad number of inputs(%d, expected %d) and "
- "output(%d, expected %d) args for train",
- ninputs, fann_get_num_input (f),
- noutputs, fann_get_num_output (f));
- }
- else {
- cur_input = g_malloc (ninputs * sizeof (fann_type));
-
- for (j = 0; j < ninputs; j ++) {
- lua_rawgeti (L, 2, j + 1);
- cur_input[j] = lua_tonumber (L, -1);
- lua_pop (L, 1);
- }
-
- cur_output = g_malloc (noutputs * sizeof (fann_type));
-
- for (j = 0; j < noutputs; j++) {
- lua_rawgeti (L, 3, j + 1);
- cur_output[j] = lua_tonumber (L, -1);
- lua_pop (L, 1);
- }
-
- fann_train (f, cur_input, cur_output);
- g_free (cur_input);
- g_free (cur_output);
-
- ret = TRUE;
- }
- }
-
- lua_pushboolean (L, ret);
-
- return 1;
-#endif
-}
-
-#ifdef WITH_FANN
-struct lua_fann_train_cbdata {
- lua_State *L;
- gint pair[2];
- struct fann_train_data *train;
- struct fann *f;
- gint cbref;
- gdouble desired_mse;
- guint max_epochs;
- GThread *t;
- struct event io;
-};
-
-struct lua_fann_train_reply {
- gint errcode;
- float mse;
- gchar errmsg[128];
-};
-
-static void
-lua_fann_push_train_result (struct lua_fann_train_cbdata *cbdata,
- gint errcode, float mse, const gchar *errmsg)
-{
- lua_rawgeti (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref);
- lua_pushnumber (cbdata->L, errcode);
- lua_pushstring (cbdata->L, errmsg);
- lua_pushnumber (cbdata->L, mse);
-
- if (lua_pcall (cbdata->L, 3, 0, 0) != 0) {
- msg_err ("call to train callback failed: %s", lua_tostring (cbdata->L, -1));
- lua_pop (cbdata->L, 1);
- }
-}
-
-static void
-lua_fann_thread_notify (gint fd, short what, gpointer ud)
-{
- struct lua_fann_train_cbdata *cbdata = ud;
- struct lua_fann_train_reply rep;
-
- if (read (cbdata->pair[0], &rep, sizeof (rep)) == -1) {
- if (errno == EAGAIN || errno == EINTR) {
- event_add (&cbdata->io, NULL);
- return;
- }
-
- lua_fann_push_train_result (cbdata, errno, 0.0, strerror (errno));
- }
- else {
- lua_fann_push_train_result (cbdata, rep.errcode, rep.mse, rep.errmsg);
- }
-
- g_assert (write (cbdata->pair[0], "", 1) == 1);
- g_thread_join (cbdata->t);
- close (cbdata->pair[0]);
- close (cbdata->pair[1]);
-
- fann_destroy_train (cbdata->train);
- luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref);
- g_free (cbdata);
-}
-
-static void *
-lua_fann_train_thread (void *ud)
-{
- struct lua_fann_train_cbdata *cbdata = ud;
- struct lua_fann_train_reply rep;
- gchar repbuf[1];
-
- msg_info ("start learning ANN, %d epochs are possible",
- cbdata->max_epochs);
- rspamd_socket_blocking (cbdata->pair[1]);
- fann_train_on_data (cbdata->f, cbdata->train, cbdata->max_epochs, 0,
- cbdata->desired_mse);
- rep.errcode = 0;
- rspamd_strlcpy (rep.errmsg, "OK", sizeof (rep.errmsg));
- rep.mse = fann_get_MSE (cbdata->f);
-
- if (write (cbdata->pair[1], &rep, sizeof (rep)) == -1) {
- msg_err ("cannot write to socketpair: %s", strerror (errno));
-
- return NULL;
- }
-
- if (read (cbdata->pair[1], repbuf, sizeof (repbuf)) == -1) {
- msg_err ("cannot read from socketpair: %s", strerror (errno));
-
- return NULL;
- }
-
- return NULL;
-}
-#endif
-/**
- * @method rspamd_fann:train_threaded(inputs, outputs, callback, event_base, {params})
- * Trains neural network with batch of samples. Inputs and outputs should be tables of
- * equal size, each row in table should be N inputs and M outputs, e.g.
- * {{0, 1, 1}, ...} -> {{0}, {1} ...}
- * @param {table} inputs input samples
- * @param {table} outputs output samples
- * @param {callback} function that is called when train is completed
- */
-static gint
-lua_fann_train_threaded (lua_State *L)
-{
-#ifndef WITH_FANN
- return 0;
-#else
- struct fann *f = rspamd_lua_check_fann (L, 1);
- guint ninputs, noutputs, ndata, i, j;
- struct lua_fann_train_cbdata *cbdata;
- struct ev_loop *ev_base = lua_check_ev_base (L, 5);
- GError *err = NULL;
- const guint max_epochs_default = 1000;
- const gdouble desired_mse_default = 0.0001;
-
- if (f != NULL && lua_type (L, 2) == LUA_TTABLE &&
- lua_type (L, 3) == LUA_TTABLE && lua_type (L, 4) == LUA_TFUNCTION &&
- ev_base != NULL) {
- /* First check sanity, call for table.getn for that */
- ndata = rspamd_lua_table_size (L, 2);
- ninputs = fann_get_num_input (f);
- noutputs = fann_get_num_output (f);
- cbdata = g_malloc0 (sizeof (*cbdata));
- cbdata->L = L;
- cbdata->f = f;
- cbdata->train = rspamd_fann_create_train (ndata, ninputs, noutputs);
- lua_pushvalue (L, 4);
- cbdata->cbref = luaL_ref (L, LUA_REGISTRYINDEX);
-
- if (rspamd_socketpair (cbdata->pair, 0) == -1) {
- msg_err ("cannot open socketpair: %s", strerror (errno));
- cbdata->pair[0] = -1;
- cbdata->pair[1] = -1;
- goto err;
- }
-
- for (i = 0; i < ndata; i ++) {
- lua_rawgeti (L, 2, i + 1);
-
- if (rspamd_lua_table_size (L, -1) != ninputs) {
- msg_err ("invalid number of inputs: %d, %d expected",
- rspamd_lua_table_size (L, -1), ninputs);
- goto err;
- }
-
- for (j = 0; j < ninputs; j ++) {
- lua_rawgeti (L, -1, j + 1);
- cbdata->train->input[i][j] = lua_tonumber (L, -1);
- lua_pop (L, 1);
- }
-
- lua_pop (L, 1);
- lua_rawgeti (L, 3, i + 1);
-
- if (rspamd_lua_table_size (L, -1) != noutputs) {
- msg_err ("invalid number of outputs: %d, %d expected",
- rspamd_lua_table_size (L, -1), noutputs);
- goto err;
- }
-
- for (j = 0; j < noutputs; j++) {
- lua_rawgeti (L, -1, j + 1);
- cbdata->train->output[i][j] = lua_tonumber (L, -1);
- lua_pop (L, 1);
- }
- }
-
- cbdata->max_epochs = max_epochs_default;
- cbdata->desired_mse = desired_mse_default;
-
- if (lua_type (L, 5) == LUA_TTABLE) {
- rspamd_lua_parse_table_arguments (L, 5, NULL,
- "max_epochs=I;desired_mse=N",
- &cbdata->max_epochs, &cbdata->desired_mse);
- }
-
- /* Now we can call training in a separate thread */
- rspamd_socket_nonblocking (cbdata->pair[0]);
- event_set (&cbdata->io, cbdata->pair[0], EV_READ, lua_fann_thread_notify,
- cbdata);
- event_base_set (ev_base, &cbdata->io);
- /* TODO: add timeout */
- event_add (&cbdata->io, NULL);
- cbdata->t = rspamd_create_thread ("fann train", lua_fann_train_thread,
- cbdata, &err);
-
- if (cbdata->t == NULL) {
- msg_err ("cannot create training thread: %e", err);
-
- if (err) {
- g_error_free (err);
- }
-
- goto err;
- }
- }
- else {
- return luaL_error (L, "invalid arguments");
- }
-
- return 0;
-
-err:
- if (cbdata->pair[0] != -1) {
- close (cbdata->pair[0]);
- }
- if (cbdata->pair[1] != -1) {
- close (cbdata->pair[1]);
- }
-
- fann_destroy_train (cbdata->train);
- luaL_unref (L, LUA_REGISTRYINDEX, cbdata->cbref);
- g_free (cbdata);
- return luaL_error (L, "invalid arguments");
-#endif
-}
-
-/**
- * @method rspamd_fann:test(inputs)
- * Tests neural network with samples. Inputs is a single sample of input data.
- * The function returns table of results, e.g.:
- * {0, 1, 1} -> {0}
- * @param {table} inputs input sample
- * @return {table/number} outputs values
- */
-static gint
-lua_fann_test (lua_State *L)
-{
-#ifndef WITH_FANN
- return 0;
-#else
- struct fann *f = rspamd_lua_check_fann (L, 1);
- guint ninputs, noutputs, i, tbl_idx = 2;
- fann_type *cur_input, *cur_output;
-
- if (f != NULL) {
- /* First check sanity, call for table.getn for that */
- if (lua_isnumber (L, 2)) {
- ninputs = lua_tonumber (L, 2);
- tbl_idx = 3;
- }
- else {
- ninputs = rspamd_lua_table_size (L, 2);
-
- if (ninputs == 0) {
- msg_err ("empty inputs number");
- lua_pushnil (L);
-
- return 1;
- }
- }
-
- cur_input = g_malloc0 (ninputs * sizeof (fann_type));
-
- for (i = 0; i < ninputs; i++) {
- lua_rawgeti (L, tbl_idx, i + 1);
- cur_input[i] = lua_tonumber (L, -1);
- lua_pop (L, 1);
- }
-
- cur_output = fann_run (f, cur_input);
- noutputs = fann_get_num_output (f);
- lua_createtable (L, noutputs, 0);
-
- for (i = 0; i < noutputs; i ++) {
- lua_pushnumber (L, cur_output[i]);
- lua_rawseti (L, -2, i + 1);
- }
-
- g_free (cur_input);
- }
- else {
- lua_pushnil (L);
- }
-
- return 1;
-#endif
-}
-
-/***
- * @method rspamd_fann:get_inputs()
- * Returns number of inputs for neural network
- * @return {number} number of inputs
- */
-static gint
-lua_fann_get_inputs (lua_State *L)
-{
-#ifndef WITH_FANN
- return 0;
-#else
- struct fann *f = rspamd_lua_check_fann (L, 1);
-
- if (f != NULL) {
- lua_pushnumber (L, fann_get_num_input (f));
- }
- else {
- lua_pushnil (L);
- }
-
- return 1;
-#endif
-}
-
-/***
- * @method rspamd_fann:get_outputs()
- * Returns number of outputs for neural network
- * @return {number} number of outputs
- */
-static gint
-lua_fann_get_outputs (lua_State *L)
-{
-#ifndef WITH_FANN
- return 0;
-#else
- struct fann *f = rspamd_lua_check_fann (L, 1);
-
- if (f != NULL) {
- lua_pushnumber (L, fann_get_num_output (f));
- }
- else {
- lua_pushnil (L);
- }
-
- return 1;
-#endif
-}
-
-/***
- * @method rspamd_fann:get_mse()
- * Returns mean square error for ANN
- * @return {number} MSE value
- */
-static gint
-lua_fann_get_mse (lua_State *L)
-{
-#ifndef WITH_FANN
- return 0;
-#else
- struct fann *f = rspamd_lua_check_fann (L, 1);
-
- if (f != NULL) {
- lua_pushnumber (L, fann_get_MSE (f));
- }
- else {
- lua_pushnil (L);
- }
-
- return 1;
-#endif
-}
-
-/***
- * @method rspamd_fann:get_layers()
- * Returns array of neurons count for each layer
- * @return {table/number} table with number ofr neurons in each layer
- */
-static gint
-lua_fann_get_layers (lua_State *L)
-{
-#ifndef WITH_FANN
- return 0;
-#else
- struct fann *f = rspamd_lua_check_fann (L, 1);
- guint nlayers, i, *layers;
-
- if (f != NULL) {
- nlayers = fann_get_num_layers (f);
- layers = g_new (guint, nlayers);
- fann_get_layer_array (f, layers);
- lua_createtable (L, nlayers, 0);
-
- for (i = 0; i < nlayers; i ++) {
- lua_pushnumber (L, layers[i]);
- lua_rawseti (L, -2, i + 1);
- }
-
- g_free (layers);
- }
- else {
- lua_pushnil (L);
- }
-
- return 1;
-#endif
-}
-
-/***
- * @method rspamd_fann:save(fname)
- * Save fann to file named 'fname'
- * @param {string} fname filename to save fann into
- * @return {boolean} true if ann has been saved
- */
-static gint
-lua_fann_save (lua_State *L)
-{
-#ifndef WITH_FANN
- return 0;
-#else
- struct fann *f = rspamd_lua_check_fann (L, 1);
- const gchar *fname = luaL_checkstring (L, 2);
-
- if (f != NULL && fname != NULL) {
- if (fann_save (f, fname) == 0) {
- lua_pushboolean (L, true);
- }
- else {
- msg_err ("cannot save ANN to %s: %s", fname, strerror (errno));
- lua_pushboolean (L, false);
- }
- }
- else {
- lua_pushnil (L);
- }
-
- return 1;
-#endif
-}
-
-static gint
-lua_fann_dtor (lua_State *L)
-{
-#ifndef WITH_FANN
- return 0;
-#else
- struct fann *f = rspamd_lua_check_fann (L, 1);
-
- if (f) {
- fann_destroy (f);
- }
-
- return 0;
-#endif
-}
-
-static gint
-lua_load_fann (lua_State * L)
-{
- lua_newtable (L);
- luaL_register (L, NULL, fannlib_f);
-
- return 1;
-}
-
-void
-luaopen_fann (lua_State * L)
-{
- rspamd_lua_new_class (L, "rspamd{fann}", fannlib_m);
- lua_pop (L, 1);
-
- rspamd_lua_add_preload (L, "rspamd_fann", lua_load_fann);
-}
diff --git a/src/rspamd.c b/src/rspamd.c
index 685793ee8..a4a659a53 100644
--- a/src/rspamd.c
+++ b/src/rspamd.c
@@ -984,21 +984,21 @@ do_encrypt_password (void)
/* Signal handlers */
static void
-rspamd_term_handler (gint signo, short what, gpointer arg)
+rspamd_term_handler (struct ev_loop *loop, ev_signal *w, int revents)
{
- struct rspamd_main *rspamd_main = arg;
+ struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data;
msg_info_main ("catch termination signal, waiting for children");
rspamd_log_nolock (rspamd_main->logger);
- rspamd_pass_signal (rspamd_main->workers, signo);
+ rspamd_pass_signal (rspamd_main->workers, w->signum);
- event_base_loopexit (rspamd_main->event_loop, NULL);
+ ev_break (rspamd_main->event_loop, EVBREAK_ALL);
}
static void
-rspamd_usr1_handler (gint signo, short what, gpointer arg)
+rspamd_usr1_handler (struct ev_loop *loop, ev_signal *w, int revents)
{
- struct rspamd_main *rspamd_main = arg;
+ struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data;
rspamd_log_reopen_priv (rspamd_main->logger,
rspamd_main->workers_uid,
@@ -1008,9 +1008,9 @@ rspamd_usr1_handler (gint signo, short what, gpointer arg)
}
static void
-rspamd_hup_handler (gint signo, short what, gpointer arg)
+rspamd_hup_handler (struct ev_loop *loop, ev_signal *w, int revents)
{
- struct rspamd_main *rspamd_main = arg;
+ struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data;
msg_info_main ("rspamd "
RVERSION
@@ -1243,8 +1243,9 @@ main (gint argc, gchar **argv, gchar **env)
worker_t **pworker;
GQuark type;
rspamd_inet_addr_t *control_addr = NULL;
- struct ev_loop *ev_base;
- struct event term_ev, int_ev, cld_ev, hup_ev, usr1_ev, control_ev;
+ struct ev_loop *event_loop;
+ ev_signal term_ev, int_ev, cld_ev, hup_ev, usr1_ev;
+ ev_io control_ev;
struct timeval term_tv;
struct rspamd_main *rspamd_main;
gboolean skip_pid = FALSE, valgrind_mode = FALSE;
@@ -1498,47 +1499,53 @@ main (gint argc, gchar **argv, gchar **env)
rspamd_main->workers = g_hash_table_new (g_direct_hash, g_direct_equal);
/* Init event base */
- ev_base = event_init ();
- rspamd_main->event_loop = ev_base;
+ event_loop = ev_default_loop (EVFLAG_SIGNALFD);
+ rspamd_main->event_loop = event_loop;
/* Unblock signals */
sigemptyset (&signals.sa_mask);
sigprocmask (SIG_SETMASK, &signals.sa_mask, NULL);
/* Set events for signals */
- evsignal_set (&term_ev, SIGTERM, rspamd_term_handler, rspamd_main);
- event_base_set (ev_base, &term_ev);
- event_add (&term_ev, NULL);
- evsignal_set (&int_ev, SIGINT, rspamd_term_handler, rspamd_main);
- event_base_set (ev_base, &int_ev);
- event_add (&int_ev, NULL);
- evsignal_set (&hup_ev, SIGHUP, rspamd_hup_handler, rspamd_main);
- event_base_set (ev_base, &hup_ev);
- event_add (&hup_ev, NULL);
+ ev_signal_init (&term_ev, rspamd_term_handler, SIGTERM);
+ term_ev.data = rspamd_main;
+ ev_signal_start (event_loop, &term_ev);
+
+ ev_signal_init (&int_ev, rspamd_term_handler, SIGINT);
+ int_ev.data = rspamd_main;
+ ev_signal_start (event_loop, &int_ev);
+
+ ev_signal_init (&hup_ev, rspamd_hup_handler, SIGHUP);
+ hup_ev.data = rspamd_main;
+ ev_signal_start (event_loop, &hup_ev);
+
+ ev_signal_init (&usr1_ev, rspamd_usr1_handler, SIGUSR1);
+ usr1_ev.data = rspamd_main;
+ ev_signal_start (event_loop, &usr1_ev);
+
+
+ /* XXX: deal with children */
evsignal_set (&cld_ev, SIGCHLD, rspamd_cld_handler, rspamd_main);
- event_base_set (ev_base, &cld_ev);
+ event_base_set (event_loop, &cld_ev);
event_add (&cld_ev, NULL);
- evsignal_set (&usr1_ev, SIGUSR1, rspamd_usr1_handler, rspamd_main);
- event_base_set (ev_base, &usr1_ev);
- event_add (&usr1_ev, NULL);
+
rspamd_check_core_limits (rspamd_main);
rspamd_mempool_lock_mutex (rspamd_main->start_mtx);
- spawn_workers (rspamd_main, ev_base);
+ spawn_workers (rspamd_main, event_loop);
rspamd_mempool_unlock_mutex (rspamd_main->start_mtx);
rspamd_main->http_ctx = rspamd_http_context_create (rspamd_main->cfg,
- ev_base, rspamd_main->cfg->ups_ctx);
+ event_loop, rspamd_main->cfg->ups_ctx);
if (control_fd != -1) {
msg_info_main ("listening for control commands on %s",
rspamd_inet_address_to_string (control_addr));
- event_set (&control_ev, control_fd, EV_READ|EV_PERSIST,
- rspamd_control_handler, rspamd_main);
- event_base_set (ev_base, &control_ev);
- event_add (&control_ev, NULL);
+ ev_io_init (&control_ev, rspamd_control_handler, control_fd, EV_READ);
+ control_ev.data = rspamd_main;
+ ev_io_start (event_loop, &control_ev);
}
- event_base_loop (ev_base, 0);
+ event_base_loop (event_loop, 0);
/* We need to block signals unless children are waited for */
rspamd_worker_block_signals ();
@@ -1570,10 +1577,10 @@ main (gint argc, gchar **argv, gchar **env)
event_set (&term_ev, -1, EV_TIMEOUT|EV_PERSIST,
rspamd_final_term_handler, rspamd_main);
- event_base_set (ev_base, &term_ev);
+ event_base_set (event_loop, &term_ev);
event_add (&term_ev, &term_tv);
- event_base_loop (ev_base, 0);
+ event_base_loop (event_loop, 0);
event_del (&term_ev);
/* Maybe save roll history */
@@ -1595,7 +1602,7 @@ main (gint argc, gchar **argv, gchar **env)
}
g_free (rspamd_main);
- event_base_free (ev_base);
+ event_base_free (event_loop);
sqlite3_shutdown ();
if (control_addr) {
diff --git a/src/rspamd.h b/src/rspamd.h
index 9af78e904..375cba13f 100644
--- a/src/rspamd.h
+++ b/src/rspamd.h
@@ -62,6 +62,12 @@ enum rspamd_worker_flags {
RSPAMD_WORKER_CONTROLLER = (1 << 6),
};
+struct rspamd_worker_accept_event {
+ ev_io accept_ev;
+ ev_timer throttling_ev;
+ struct ev_loop *event_loop;
+ struct rspamd_worker_accept_event *prev, *next;
+};
/**
* Worker process structure
@@ -77,7 +83,7 @@ struct rspamd_worker {
struct rspamd_main *srv; /**< pointer to server structure */
GQuark type; /**< process type */
GHashTable *signal_events; /**< signal events */
- GList *accept_events; /**< socket events */
+ struct rspamd_worker_accept_event *accept_events; /**< socket events */
struct rspamd_worker_conf *cf; /**< worker config data */
gpointer ctx; /**< worker's specific data */
enum rspamd_worker_flags flags; /**< worker's flags */
@@ -85,7 +91,7 @@ struct rspamd_worker {
[1] is used by a worker */
gint srv_pipe[2]; /**< used by workers to request something from the
main process. [0] - main, [1] - worker */
- struct event srv_ev; /**< used by main for read workers' requests */
+ ev_io srv_ev; /**< used by main for read workers' requests */
gpointer control_data; /**< used by control protocol to handle commands */
gpointer tmp_data; /**< used to avoid race condition to deal with control messages */
GPtrArray *finish_actions; /**< called when worker is terminated */