new->user_data = user_data;
new->wanna_die = FALSE;
new->events = g_hash_table_new (rspamd_event_hash, rspamd_event_equal);
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+ new->mtx = g_mutex_new ();
+ memory_pool_add_destructor (pool, (pool_destruct_func) g_mutex_free, new->mtx);
+#else
+ new->mtx = memory_pool_alloc (pool, sizeof (GMutex));
+ g_mutex_init (new->mtx);
+#endif
new->threads = 0;
memory_pool_add_destructor (pool, (pool_destruct_func) g_hash_table_destroy, new->events);
gboolean
check_session_pending (struct rspamd_async_session *session)
{
- if (session->threads == 0 && g_hash_table_size (session->events) == 0) {
+ g_mutex_lock (session->mtx);
+ if (session->wanna_die && session->threads == 0 && g_hash_table_size (session->events) == 0) {
+ session->wanna_die = FALSE;
if (session->fin != NULL) {
session->fin (session->user_data);
}
if (session->restore != NULL) {
session->restore (session->user_data);
}
+ g_mutex_unlock (session->mtx);
return TRUE;
}
+ g_mutex_unlock (session->mtx);
return FALSE;
}
-
+ g_mutex_unlock (session->mtx);
return TRUE;
}
register_async_thread (struct rspamd_async_session *session)
{
g_atomic_int_inc (&session->threads);
+#ifdef RSPAMD_EVENTS_DEBUG
+ msg_info ("added thread: pending %d thread", session->threads);
+#endif
}
/**
if (g_atomic_int_dec_and_test (&session->threads)) {
(void) check_session_pending (session);
}
+#ifdef RSPAMD_EVENTS_DEBUG
+ msg_info ("removed thread: pending %d thread", session->threads);
+#endif
}
memory_pool_t *pool;
gboolean wanna_die;
guint threads;
+ GMutex *mtx;
};
/**
if (!task->pass_all_filters &&
metric->action == METRIC_ACTION_REJECT &&
check_metric_is_spam (task, metric)) {
+ task->s->wanna_die = TRUE;
check_session_pending (task->s);
return 1;
}
cur = g_list_next (cur);
}
}
+ task->s->wanna_die = TRUE;
check_session_pending (task->s);
return 1;
make_composites (task);
}
+void
+process_statfiles_threaded (gpointer data, gpointer user_data)
+{
+ struct worker_task *task = (struct worker_task *)data;
+
+ if (task->is_skipped) {
+ return;
+ }
+
+ if (task->tokens == NULL) {
+ task->tokens = g_hash_table_new (g_direct_hash, g_direct_equal);
+ memory_pool_add_destructor (task->task_pool, (pool_destruct_func)g_hash_table_unref, task->tokens);
+ }
+
+ g_list_foreach (task->cfg->classifiers, classifiers_callback, task);
+ remove_async_thread (task->s);
+}
+
static void
insert_metric_header (gpointer metric_name, gpointer metric_value, gpointer data)
{
*/
void process_statfiles (struct worker_task *task);
+/**
+ * Process message with statfiles threaded
+ * @param data worker's task that present message from user
+ */
+void process_statfiles_threaded (gpointer data, gpointer user_data);
+
/**
* Insert a result to task
* @param task worker's task that present message from user
memory_pool_t *regexp_pool;
memory_pool_t *dynamic_pool;
gsize max_size;
+ gsize max_threads;
GThreadPool *workers;
};
gchar *value;
gint res = TRUE;
struct regexp_json_buf *jb, **pjb;
- gsize thr;
- GError *err = NULL;
if ((value = get_module_opt (cfg, "regexp", "statfile_prefix")) != NULL) {
regexp_module_ctx->statfile_prefix = memory_pool_strdup (regexp_module_ctx->regexp_pool, value);
}
if ((value = get_module_opt (cfg, "regexp", "max_threads")) != NULL) {
if (g_thread_supported ()) {
- thr = parse_limit (value, -1);
- if (thr > 1) {
-#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
- g_thread_init (NULL);
- workers_mtx = g_mutex_new ();
-#else
- workers_mtx = memory_pool_alloc (regexp_module_ctx->regexp_pool, sizeof (GMutex));
- g_mutex_init (workers_mtx);
-#endif
- regexp_module_ctx->workers = g_thread_pool_new (process_regexp_item_threaded, regexp_module_ctx, thr, TRUE, &err);
- if (err != NULL) {
- msg_err ("thread pool creation failed: %s", err->message);
- regexp_module_ctx->workers = NULL;
- }
- }
+ regexp_module_ctx->max_threads = parse_limit (value, -1);
}
+ else {
+ regexp_module_ctx->max_threads = 0;
+ }
+ regexp_module_ctx->workers = NULL;
}
else {
+ regexp_module_ctx->max_threads = 0;
regexp_module_ctx->workers = NULL;
}
if ((value = get_module_opt (cfg, "regexp", "dynamic_rules")) != NULL) {
}
else if (it->type == EXPR_STR) {
/* This may be lua function, try to call it */
- cur = maybe_call_lua_function ((const gchar*)it->content.operand, task);
+ if (regexp_module_ctx->workers != NULL) {
+ g_mutex_lock (workers_mtx);
+ cur = maybe_call_lua_function ((const gchar*)it->content.operand, task);
+ g_mutex_unlock (workers_mtx);
+ }
debug_task ("function %s returned %s", (const gchar *)it->content.operand, cur ? "true" : "false");
if (try_optimize) {
try_optimize = optimize_regexp_expression (&it, stack, cur);
static void
process_regexp_item_threaded (gpointer data, gpointer user_data)
{
- struct regexp_threaded_ud *ud = user_data;
- gboolean res = FALSE;
+ struct regexp_threaded_ud *ud = data;
- if (ud->item->lua_function) {
- /* Just call function */
- if (lua_call_expression_func ("regexp", ud->item->lua_function, ud->task, NULL, &res) && res) {
- g_mutex_lock (workers_mtx);
- insert_result (ud->task, ud->item->symbol, 1, NULL);
- g_mutex_unlock (workers_mtx);
- }
- }
- else {
- /* Process expression */
- if (process_regexp_expression (ud->item->expr, ud->item->symbol, ud->task, NULL)) {
- g_mutex_lock (workers_mtx);
- insert_result (ud->task, ud->item->symbol, 1, NULL);
- g_mutex_unlock (workers_mtx);
- }
+ /* Process expression */
+ if (process_regexp_expression (ud->item->expr, ud->item->symbol, ud->task, NULL)) {
+ g_mutex_lock (workers_mtx);
+ insert_result (ud->task, ud->item->symbol, 1, NULL);
+ g_mutex_unlock (workers_mtx);
}
remove_async_thread (ud->task->s);
}
struct regexp_module_item *item = user_data;
gboolean res = FALSE;
struct regexp_threaded_ud *thr_ud;
+ GError *err = NULL;
+
- if (regexp_module_ctx->workers) {
+ if (!item->lua_function && regexp_module_ctx->max_threads > 1) {
+ if (regexp_module_ctx->workers == NULL) {
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+ g_thread_init (NULL);
+ workers_mtx = g_mutex_new ();
+#else
+ workers_mtx = memory_pool_alloc (regexp_module_ctx->regexp_pool, sizeof (GMutex));
+ g_mutex_init (workers_mtx);
+#endif
+ regexp_module_ctx->workers = g_thread_pool_new (process_regexp_item_threaded,
+ regexp_module_ctx, regexp_module_ctx->max_threads, TRUE, &err);
+ if (err != NULL) {
+ msg_err ("thread pool creation failed: %s", err->message);
+ regexp_module_ctx->max_threads = 0;
+ return;
+ }
+ }
thr_ud = memory_pool_alloc (task->task_pool, sizeof (struct regexp_threaded_ud));
thr_ud->item = item;
thr_ud->task = task;
- register_async_thread (task->s);
- g_thread_pool_push (regexp_module_ctx->workers, thr_ud, NULL);
+
+ g_thread_pool_push (regexp_module_ctx->workers, thr_ud, &err);
+ if (err == NULL) {
+ register_async_thread (task->s);
+ }
+ else {
+ msg_err ("error pushing task to the regexp thread pool: %s", err->message);
+ }
}
else {
/* Non-threaded version */
guint32 tasks;
/* Limit of tasks */
guint32 max_tasks;
+ /* Classify threads */
+ guint32 classify_threads;
+ /* Classify threads */
+ GThreadPool *classify_pool;
+ /* Events base */
struct event_base *ev_base;
};
task->state = WRITE_ERROR;
return write_socket (task);
}
+ /* Add task to classify to classify pool */
+ if (ctx->classify_pool) {
+ g_thread_pool_push (ctx->classify_pool, task, &err);
+ if (err != NULL) {
+ msg_err ("cannot pull task to the pool: %s", err->message);
+ }
+ else {
+ register_async_thread (task->s);
+ }
+ }
}
break;
case WRITE_REPLY:
fin_task (void *arg)
{
struct worker_task *task = (struct worker_task *) arg;
+ struct rspamd_worker_ctx *ctx;
+ ctx = task->worker->ctx;
if (task->state != WAIT_POST_FILTER) {
/* Process all statfiles */
- process_statfiles (task);
+ if (ctx->classify_pool == NULL) {
+ /* Non-threaded version */
+ process_statfiles (task);
+ }
+ else {
+ /* Just process composites */
+ make_composites (task);
+ }
/* Call post filters */
lua_call_post_filters (task);
}
ctx->is_mime = TRUE;
ctx->timeout = DEFAULT_WORKER_IO_TIMEOUT;
+ ctx->classify_threads = 1;
register_worker_opt (type, "mime", xml_handle_boolean, ctx, G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_mime));
register_worker_opt (type, "http", xml_handle_boolean, ctx, G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_http));
register_worker_opt (type, "allow_learn", xml_handle_boolean, ctx, G_STRUCT_OFFSET (struct rspamd_worker_ctx, allow_learn));
register_worker_opt (type, "timeout", xml_handle_seconds, ctx, G_STRUCT_OFFSET (struct rspamd_worker_ctx, timeout));
register_worker_opt (type, "max_tasks", xml_handle_uint32, ctx, G_STRUCT_OFFSET (struct rspamd_worker_ctx, max_tasks));
+ register_worker_opt (type, "classify_threads", xml_handle_uint32, ctx, G_STRUCT_OFFSET (struct rspamd_worker_ctx, classify_threads));
return ctx;
}
struct sigaction signals;
gchar *is_custom_str;
struct rspamd_worker_ctx *ctx = worker->ctx;
+ GError *err = NULL;
#ifdef WITH_PROFILER
extern void _start (void), etext (void);
ctx->resolver = dns_resolver_init (ctx->ev_base, worker->srv->cfg);
+ /* Create classify pool */
+ ctx->classify_pool = NULL;
+ if (ctx->classify_threads > 1) {
+ ctx->classify_pool = g_thread_pool_new (process_statfiles_threaded, ctx, ctx->classify_threads, TRUE, &err);
+ if (err != NULL) {
+ msg_err ("pool create failed: %s", err->message);
+ ctx->classify_pool = NULL;
+ }
+ }
+
event_base_loop (ctx->ev_base, 0);
#ifndef BUILD_STATIC