summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2012-02-13 19:09:27 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2012-02-13 19:09:27 +0400
commit0d64c808b7310b6e233ec570649fbb281a3f2b13 (patch)
treebec667c9c350cc53dcd05e7779988486778c64c9 /src
parentb5f2b43a8d1fc3361dc1665c62b2a44d66c01474 (diff)
downloadrspamd-0d64c808b7310b6e233ec570649fbb281a3f2b13.tar.gz
rspamd-0d64c808b7310b6e233ec570649fbb281a3f2b13.zip
Add quarks to events to determine source of event inside rspamd.
Another try to fix threading.
Diffstat (limited to 'src')
-rw-r--r--src/dns.c8
-rw-r--r--src/events.c21
-rw-r--r--src/events.h3
-rw-r--r--src/logger.c14
-rw-r--r--src/lua/lua_http.c2
-rw-r--r--src/lua/lua_redis.c2
-rw-r--r--src/main.c3
-rw-r--r--src/plugins/fuzzy_check.c4
-rw-r--r--src/plugins/surbl.c2
-rw-r--r--src/smtp.c2
-rw-r--r--src/smtp_utils.c2
-rw-r--r--src/worker.c20
12 files changed, 55 insertions, 28 deletions
diff --git a/src/dns.c b/src/dns.c
index bd048c4ac..fe087da08 100644
--- a/src/dns.c
+++ b/src/dns.c
@@ -744,7 +744,7 @@ send_dns_request (struct rspamd_dns_request *req)
event_set (&req->io_event, req->sock, EV_WRITE, dns_retransmit_handler, req);
event_base_set (req->resolver->ev_base, &req->io_event);
event_add (&req->io_event, &req->tv);
- register_async_event (req->session, (event_finalizer_t)event_del, &req->io_event, FALSE);
+ register_async_event (req->session, (event_finalizer_t)event_del, &req->io_event, g_quark_from_static_string ("dns resolver"));
return 0;
}
else {
@@ -757,7 +757,7 @@ send_dns_request (struct rspamd_dns_request *req)
event_set (&req->io_event, req->sock, EV_WRITE, dns_retransmit_handler, req);
event_base_set (req->resolver->ev_base, &req->io_event);
event_add (&req->io_event, &req->tv);
- register_async_event (req->session, (event_finalizer_t)event_del, &req->io_event, FALSE);
+ register_async_event (req->session, (event_finalizer_t)event_del, &req->io_event, g_quark_from_static_string ("dns resolver"));
return 0;
}
@@ -1343,7 +1343,7 @@ dns_retransmit_handler (gint fd, short what, void *arg)
/* Add request to hash table */
g_hash_table_insert (req->resolver->requests, &req->id, req);
- register_async_event (req->session, (event_finalizer_t)dns_fin_cb, req, FALSE);
+ register_async_event (req->session, (event_finalizer_t)dns_fin_cb, req, g_quark_from_static_string ("dns resolver"));
}
}
}
@@ -1450,7 +1450,7 @@ make_dns_request (struct rspamd_dns_resolver *resolver,
req->id = header->qid;
}
g_hash_table_insert (resolver->requests, &req->id, req);
- register_async_event (session, (event_finalizer_t)dns_fin_cb, req, FALSE);
+ register_async_event (session, (event_finalizer_t)dns_fin_cb, req, g_quark_from_static_string ("dns resolver"));
}
else if (r == -1) {
return FALSE;
diff --git a/src/events.c b/src/events.c
index 98b492ffb..b658f48a9 100644
--- a/src/events.c
+++ b/src/events.c
@@ -87,7 +87,7 @@ new_async_session (memory_pool_t * pool, event_finalizer_t fin,
}
void
-register_async_event (struct rspamd_async_session *session, event_finalizer_t fin, void *user_data, gboolean forced)
+register_async_event (struct rspamd_async_session *session, event_finalizer_t fin, void *user_data, GQuark subsystem)
{
struct rspamd_async_event *new;
@@ -99,32 +99,39 @@ register_async_event (struct rspamd_async_session *session, event_finalizer_t fi
new = memory_pool_alloc (session->pool, sizeof (struct rspamd_async_event));
new->fin = fin;
new->user_data = user_data;
+ new->subsystem = subsystem;
g_hash_table_insert (session->events, new, new);
#ifdef RSPAMD_EVENTS_DEBUG
- msg_info ("added event: %p, pending %d events", user_data, g_hash_table_size (session->events));
+ msg_info ("added event: %p, pending %d events, subsystem: %s", user_data, g_hash_table_size (session->events),
+ g_quark_to_string (subsystem));
#endif
}
void
remove_normal_event (struct rspamd_async_session *session, event_finalizer_t fin, void *ud)
{
- struct rspamd_async_event search_ev;
+ struct rspamd_async_event search_ev, *found_ev;
if (session == NULL) {
msg_info ("session is NULL");
return;
}
+ /* Search for event */
search_ev.fin = fin;
search_ev.user_data = ud;
- if (g_hash_table_remove (session->events, &search_ev)) {
+ if ((found_ev = g_hash_table_lookup (session->events, &search_ev)) != NULL) {
+ g_hash_table_remove (session->events, found_ev);
+#ifdef RSPAMD_EVENTS_DEBUG
+ msg_info ("removed event: %p, subsystem: %s, pending %d events", ud,
+ g_quark_to_string (found_ev->subsystem), g_hash_table_size (session->events));
+#endif
+ /* Remove event */
fin (ud);
}
-#ifdef RSPAMD_EVENTS_DEBUG
- msg_info ("removed event: %p, pending %d events", ud, g_hash_table_size (session->events));
-#endif
+
check_session_pending (session);
}
diff --git a/src/events.h b/src/events.h
index af41cc84a..2c4ea2c91 100644
--- a/src/events.h
+++ b/src/events.h
@@ -8,6 +8,7 @@ struct rspamd_async_event;
typedef void (*event_finalizer_t)(void *user_data);
struct rspamd_async_event {
+ GQuark subsystem;
event_finalizer_t fin;
void *user_data;
guint ref;
@@ -46,7 +47,7 @@ struct rspamd_async_session *new_async_session (memory_pool_t *pool,
* @param forced unused
*/
void register_async_event (struct rspamd_async_session *session,
- event_finalizer_t fin, void *user_data, gboolean forced);
+ event_finalizer_t fin, void *user_data, GQuark subsystem);
/**
* Remove normal event
diff --git a/src/logger.c b/src/logger.c
index 2abdf0dfc..dfdb5b1b5 100644
--- a/src/logger.c
+++ b/src/logger.c
@@ -60,6 +60,7 @@ struct rspamd_logger_s {
guint32 repeats;
gchar *saved_message;
gchar *saved_function;
+ GMutex *mtx;
};
static const gchar lf_chr = '\n';
@@ -268,6 +269,13 @@ rspamd_set_logger (enum rspamd_log_type type, GQuark ptype, struct rspamd_main *
rspamd->logger->pid = getpid ();
rspamd->logger->process_type = ptype;
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+ rspamd->logger->mtx = g_mutex_new ();
+#else
+ rspamd->logger->mtx = g_malloc (sizeof (GMutex));
+ g_mutex_init (rspamd->logger->mtx);
+#endif
+
switch (type) {
case RSPAMD_LOG_CONSOLE:
rspamd->logger->log_func = file_log_function;
@@ -369,12 +377,14 @@ rspamd_common_log_function (rspamd_logger_t *rspamd_log, GLogLevelFlags log_leve
u_char *end;
if (log_level <= rspamd_log->cfg->log_level) {
+ g_mutex_lock (rspamd_log->mtx);
va_start (vp, fmt);
end = rspamd_vsnprintf (logbuf, sizeof (logbuf), fmt, vp);
*end = '\0';
(void)rspamd_escape_string (escaped_logbuf, logbuf, sizeof (escaped_logbuf));
va_end (vp);
rspamd_log->log_func (NULL, function, log_level, escaped_logbuf, FALSE, rspamd_log);
+ g_mutex_unlock (rspamd_log->mtx);
}
}
@@ -643,12 +653,14 @@ rspamd_conditional_debug (rspamd_logger_t *rspamd_log, guint32 addr, const gchar
if (rspamd_log->cfg->log_level >= G_LOG_LEVEL_DEBUG || rspamd_log->is_debug ||
(rspamd_log->debug_ip != NULL && radix32tree_find (rspamd_log->debug_ip, ntohl (addr)) != RADIX_NO_VALUE)) {
+ g_mutex_lock (rspamd_log->mtx);
va_start (vp, fmt);
end = rspamd_vsnprintf (logbuf, sizeof (logbuf), fmt, vp);
*end = '\0';
(void)rspamd_escape_string (escaped_logbuf, logbuf, sizeof (escaped_logbuf));
va_end (vp);
rspamd_log->log_func (NULL, function, G_LOG_LEVEL_DEBUG, escaped_logbuf, TRUE, rspamd_log);
+ g_mutex_unlock (rspamd_log->mtx);
}
}
@@ -662,8 +674,10 @@ rspamd_glib_log_function (const gchar *log_domain, GLogLevelFlags log_level, con
rspamd_logger_t *rspamd_log = arg;
if (rspamd_log->enabled) {
+ g_mutex_lock (rspamd_log->mtx);
(void)rspamd_escape_string (escaped_logbuf, message, sizeof (escaped_logbuf));
rspamd_log->log_func (log_domain, NULL, log_level, escaped_logbuf, FALSE, rspamd_log);
+ g_mutex_unlock (rspamd_log->mtx);
}
}
diff --git a/src/lua/lua_http.c b/src/lua/lua_http.c
index 16f237a36..68faee2e7 100644
--- a/src/lua/lua_http.c
+++ b/src/lua/lua_http.c
@@ -295,7 +295,7 @@ lua_http_dns_callback (struct rspamd_dns_reply *reply, gpointer arg)
ud->io_dispatcher = rspamd_create_dispatcher (ud->task->ev_base, ud->fd, BUFFER_LINE, lua_http_read_cb, NULL,
lua_http_err_cb, &tv, ud);
/* Write request */
- register_async_event (ud->task->s, lua_http_fin, ud, FALSE);
+ register_async_event (ud->task->s, lua_http_fin, ud, g_quark_from_static_string ("lua http"));
if (!rspamd_dispatcher_write (ud->io_dispatcher, ud->req_buf, ud->req_len, TRUE, TRUE)) {
lua_http_push_error (450, ud);
diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c
index 4d2469e5f..a37396b7b 100644
--- a/src/lua/lua_redis.c
+++ b/src/lua/lua_redis.c
@@ -206,7 +206,7 @@ lua_redis_make_request_real (struct lua_redis_userdata *ud)
return FALSE;
}
else {
- register_async_event (ud->task->s, lua_redis_fin, ud, FALSE);
+ register_async_event (ud->task->s, lua_redis_fin, ud, g_quark_from_static_string ("lua redis"));
}
redisLibeventAttach (ud->ctx, ud->task->ev_base);
/* Make a request now */
diff --git a/src/main.c b/src/main.c
index bb6209974..502b1f0e1 100644
--- a/src/main.c
+++ b/src/main.c
@@ -382,6 +382,9 @@ fork_worker (struct rspamd_main *rspamd, struct worker_conf *cf)
set_worker_limits (cf);
setproctitle ("%s process", cf->worker->name);
rspamd_pidfile_close (rspamd->pfh);
+ /* Do silent log reopen to avoid collisions */
+ close_log (rspamd->logger);
+ open_log (rspamd->logger);
msg_info ("starting %s process %P", cf->worker->name, getpid ());
cf->worker->worker_start_func (cur);
break;
diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c
index ef88cab6b..4f66a5731 100644
--- a/src/plugins/fuzzy_check.c
+++ b/src/plugins/fuzzy_check.c
@@ -640,7 +640,7 @@ register_fuzzy_call (struct worker_task *task, fuzzy_hash_t *h)
session->fd = sock;
session->server = selected;
event_add (&session->ev, &session->tv);
- register_async_event (task->s, fuzzy_io_fin, session, FALSE);
+ register_async_event (task->s, fuzzy_io_fin, session, g_quark_from_static_string ("fuzzy check"));
}
}
}
@@ -773,7 +773,7 @@ register_fuzzy_controller_call (struct controller_session *session, struct worke
s->fd = sock;
event_add (&s->ev, &s->tv);
(*saved)++;
- register_async_event (session->s, fuzzy_learn_fin, s, FALSE);
+ register_async_event (session->s, fuzzy_learn_fin, s, g_quark_from_static_string ("fuzzy check"));
return TRUE;
}
}
diff --git a/src/plugins/surbl.c b/src/plugins/surbl.c
index 6e4cad8ce..3c4c09363 100644
--- a/src/plugins/surbl.c
+++ b/src/plugins/surbl.c
@@ -939,7 +939,7 @@ register_redirector_call (struct uri *url, struct worker_task *task,
timeout->tv_usec = (surbl_module_ctx->connect_timeout - timeout->tv_sec * 1000) * 1000;
event_set (&param->ev, s, EV_WRITE, redirector_callback, (void *)param);
event_add (&param->ev, timeout);
- register_async_event (task->s, free_redirector_session, param, FALSE);
+ register_async_event (task->s, free_redirector_session, param, g_quark_from_static_string ("surbl"));
msg_info ("<%s> registered redirector call for %s to %s, according to rule: %s",
task->message_id, struri (url), selected->name, rule);
diff --git a/src/smtp.c b/src/smtp.c
index 46df67b36..605a680cd 100644
--- a/src/smtp.c
+++ b/src/smtp.c
@@ -570,7 +570,7 @@ smtp_make_delay (struct smtp_session *session)
evtimer_set (tev, smtp_delay_handler, session);
evtimer_add (tev, tv);
- register_async_event (session->s, (event_finalizer_t)event_del, tev, FALSE);
+ register_async_event (session->s, (event_finalizer_t)event_del, tev, g_quark_from_static_string ("smtp proxy"));
session->delay_timer = tev;
}
else if (session->state == SMTP_STATE_DELAY) {
diff --git a/src/smtp_utils.c b/src/smtp_utils.c
index c56397d17..3a9d62cbb 100644
--- a/src/smtp_utils.c
+++ b/src/smtp_utils.c
@@ -94,7 +94,7 @@ create_smtp_upstream_connection (struct smtp_session *session)
&session->ctx->smtp_timeout, session);
session->state = SMTP_STATE_WAIT_UPSTREAM;
session->upstream_state = SMTP_STATE_GREETING;
- register_async_event (session->s, (event_finalizer_t)smtp_upstream_finalize_connection, session, FALSE);
+ register_async_event (session->s, (event_finalizer_t)smtp_upstream_finalize_connection, session, g_quark_from_static_string ("smtp proxy"));
return TRUE;
}
diff --git a/src/worker.c b/src/worker.c
index 28bdb7100..f113e04d9 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -506,12 +506,11 @@ read_socket (f_str_t * in, void *arg)
}
/* Add task to classify to classify pool */
if (ctx->classify_pool) {
+ register_async_thread (task->s);
g_thread_pool_push (ctx->classify_pool, task, &err);
if (err != NULL) {
msg_err ("cannot pull task to the pool: %s", err->message);
- }
- else {
- register_async_thread (task->s);
+ remove_async_thread (task->s);
}
}
}
@@ -636,12 +635,14 @@ fin_task (void *arg)
}
/* Check if we have all events finished */
- task->state = WRITE_REPLY;
- if (task->fin_callback) {
- task->fin_callback (task->fin_arg);
- }
- else {
- rspamd_dispatcher_restore (task->dispatcher);
+ if (g_hash_table_size (task->s->events) == 0 && task->s->threads == 0) {
+ task->state = WRITE_REPLY;
+ if (task->fin_callback) {
+ task->fin_callback (task->fin_arg);
+ }
+ else {
+ rspamd_dispatcher_restore (task->dispatcher);
+ }
}
}
@@ -655,6 +656,7 @@ restore_task (void *arg)
/* Special state */
task->state = WAIT_POST_FILTER;
+ task->s->wanna_die = TRUE;
}
/*