From 0d64c808b7310b6e233ec570649fbb281a3f2b13 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Mon, 13 Feb 2012 19:09:27 +0400 Subject: [PATCH] Add quarks to events to determine source of event inside rspamd. Another try to fix threading. --- src/dns.c | 8 ++++---- src/events.c | 21 ++++++++++++++------- src/events.h | 3 ++- src/logger.c | 14 ++++++++++++++ src/lua/lua_http.c | 2 +- src/lua/lua_redis.c | 2 +- src/main.c | 3 +++ src/plugins/fuzzy_check.c | 4 ++-- src/plugins/surbl.c | 2 +- src/smtp.c | 2 +- src/smtp_utils.c | 2 +- src/worker.c | 20 +++++++++++--------- 12 files changed, 55 insertions(+), 28 deletions(-) diff --git a/src/dns.c b/src/dns.c index bd048c4ac..fe087da08 100644 --- a/src/dns.c +++ b/src/dns.c @@ -744,7 +744,7 @@ send_dns_request (struct rspamd_dns_request *req) event_set (&req->io_event, req->sock, EV_WRITE, dns_retransmit_handler, req); event_base_set (req->resolver->ev_base, &req->io_event); event_add (&req->io_event, &req->tv); - register_async_event (req->session, (event_finalizer_t)event_del, &req->io_event, FALSE); + register_async_event (req->session, (event_finalizer_t)event_del, &req->io_event, g_quark_from_static_string ("dns resolver")); return 0; } else { @@ -757,7 +757,7 @@ send_dns_request (struct rspamd_dns_request *req) event_set (&req->io_event, req->sock, EV_WRITE, dns_retransmit_handler, req); event_base_set (req->resolver->ev_base, &req->io_event); event_add (&req->io_event, &req->tv); - register_async_event (req->session, (event_finalizer_t)event_del, &req->io_event, FALSE); + register_async_event (req->session, (event_finalizer_t)event_del, &req->io_event, g_quark_from_static_string ("dns resolver")); return 0; } @@ -1343,7 +1343,7 @@ dns_retransmit_handler (gint fd, short what, void *arg) /* Add request to hash table */ g_hash_table_insert (req->resolver->requests, &req->id, req); - register_async_event (req->session, (event_finalizer_t)dns_fin_cb, req, FALSE); + register_async_event (req->session, (event_finalizer_t)dns_fin_cb, req, g_quark_from_static_string ("dns resolver")); } } } @@ -1450,7 +1450,7 @@ make_dns_request (struct rspamd_dns_resolver *resolver, req->id = header->qid; } g_hash_table_insert (resolver->requests, &req->id, req); - register_async_event (session, (event_finalizer_t)dns_fin_cb, req, FALSE); + register_async_event (session, (event_finalizer_t)dns_fin_cb, req, g_quark_from_static_string ("dns resolver")); } else if (r == -1) { return FALSE; diff --git a/src/events.c b/src/events.c index 98b492ffb..b658f48a9 100644 --- a/src/events.c +++ b/src/events.c @@ -87,7 +87,7 @@ new_async_session (memory_pool_t * pool, event_finalizer_t fin, } void -register_async_event (struct rspamd_async_session *session, event_finalizer_t fin, void *user_data, gboolean forced) +register_async_event (struct rspamd_async_session *session, event_finalizer_t fin, void *user_data, GQuark subsystem) { struct rspamd_async_event *new; @@ -99,32 +99,39 @@ register_async_event (struct rspamd_async_session *session, event_finalizer_t fi new = memory_pool_alloc (session->pool, sizeof (struct rspamd_async_event)); new->fin = fin; new->user_data = user_data; + new->subsystem = subsystem; g_hash_table_insert (session->events, new, new); #ifdef RSPAMD_EVENTS_DEBUG - msg_info ("added event: %p, pending %d events", user_data, g_hash_table_size (session->events)); + msg_info ("added event: %p, pending %d events, subsystem: %s", user_data, g_hash_table_size (session->events), + g_quark_to_string (subsystem)); #endif } void remove_normal_event (struct rspamd_async_session *session, event_finalizer_t fin, void *ud) { - struct rspamd_async_event search_ev; + struct rspamd_async_event search_ev, *found_ev; if (session == NULL) { msg_info ("session is NULL"); return; } + /* Search for event */ search_ev.fin = fin; search_ev.user_data = ud; - if (g_hash_table_remove (session->events, &search_ev)) { + if ((found_ev = g_hash_table_lookup (session->events, &search_ev)) != NULL) { + g_hash_table_remove (session->events, found_ev); +#ifdef RSPAMD_EVENTS_DEBUG + msg_info ("removed event: %p, subsystem: %s, pending %d events", ud, + g_quark_to_string (found_ev->subsystem), g_hash_table_size (session->events)); +#endif + /* Remove event */ fin (ud); } -#ifdef RSPAMD_EVENTS_DEBUG - msg_info ("removed event: %p, pending %d events", ud, g_hash_table_size (session->events)); -#endif + check_session_pending (session); } diff --git a/src/events.h b/src/events.h index af41cc84a..2c4ea2c91 100644 --- a/src/events.h +++ b/src/events.h @@ -8,6 +8,7 @@ struct rspamd_async_event; typedef void (*event_finalizer_t)(void *user_data); struct rspamd_async_event { + GQuark subsystem; event_finalizer_t fin; void *user_data; guint ref; @@ -46,7 +47,7 @@ struct rspamd_async_session *new_async_session (memory_pool_t *pool, * @param forced unused */ void register_async_event (struct rspamd_async_session *session, - event_finalizer_t fin, void *user_data, gboolean forced); + event_finalizer_t fin, void *user_data, GQuark subsystem); /** * Remove normal event diff --git a/src/logger.c b/src/logger.c index 2abdf0dfc..dfdb5b1b5 100644 --- a/src/logger.c +++ b/src/logger.c @@ -60,6 +60,7 @@ struct rspamd_logger_s { guint32 repeats; gchar *saved_message; gchar *saved_function; + GMutex *mtx; }; static const gchar lf_chr = '\n'; @@ -268,6 +269,13 @@ rspamd_set_logger (enum rspamd_log_type type, GQuark ptype, struct rspamd_main * rspamd->logger->pid = getpid (); rspamd->logger->process_type = ptype; +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) + rspamd->logger->mtx = g_mutex_new (); +#else + rspamd->logger->mtx = g_malloc (sizeof (GMutex)); + g_mutex_init (rspamd->logger->mtx); +#endif + switch (type) { case RSPAMD_LOG_CONSOLE: rspamd->logger->log_func = file_log_function; @@ -369,12 +377,14 @@ rspamd_common_log_function (rspamd_logger_t *rspamd_log, GLogLevelFlags log_leve u_char *end; if (log_level <= rspamd_log->cfg->log_level) { + g_mutex_lock (rspamd_log->mtx); va_start (vp, fmt); end = rspamd_vsnprintf (logbuf, sizeof (logbuf), fmt, vp); *end = '\0'; (void)rspamd_escape_string (escaped_logbuf, logbuf, sizeof (escaped_logbuf)); va_end (vp); rspamd_log->log_func (NULL, function, log_level, escaped_logbuf, FALSE, rspamd_log); + g_mutex_unlock (rspamd_log->mtx); } } @@ -643,12 +653,14 @@ rspamd_conditional_debug (rspamd_logger_t *rspamd_log, guint32 addr, const gchar if (rspamd_log->cfg->log_level >= G_LOG_LEVEL_DEBUG || rspamd_log->is_debug || (rspamd_log->debug_ip != NULL && radix32tree_find (rspamd_log->debug_ip, ntohl (addr)) != RADIX_NO_VALUE)) { + g_mutex_lock (rspamd_log->mtx); va_start (vp, fmt); end = rspamd_vsnprintf (logbuf, sizeof (logbuf), fmt, vp); *end = '\0'; (void)rspamd_escape_string (escaped_logbuf, logbuf, sizeof (escaped_logbuf)); va_end (vp); rspamd_log->log_func (NULL, function, G_LOG_LEVEL_DEBUG, escaped_logbuf, TRUE, rspamd_log); + g_mutex_unlock (rspamd_log->mtx); } } @@ -662,8 +674,10 @@ rspamd_glib_log_function (const gchar *log_domain, GLogLevelFlags log_level, con rspamd_logger_t *rspamd_log = arg; if (rspamd_log->enabled) { + g_mutex_lock (rspamd_log->mtx); (void)rspamd_escape_string (escaped_logbuf, message, sizeof (escaped_logbuf)); rspamd_log->log_func (log_domain, NULL, log_level, escaped_logbuf, FALSE, rspamd_log); + g_mutex_unlock (rspamd_log->mtx); } } diff --git a/src/lua/lua_http.c b/src/lua/lua_http.c index 16f237a36..68faee2e7 100644 --- a/src/lua/lua_http.c +++ b/src/lua/lua_http.c @@ -295,7 +295,7 @@ lua_http_dns_callback (struct rspamd_dns_reply *reply, gpointer arg) ud->io_dispatcher = rspamd_create_dispatcher (ud->task->ev_base, ud->fd, BUFFER_LINE, lua_http_read_cb, NULL, lua_http_err_cb, &tv, ud); /* Write request */ - register_async_event (ud->task->s, lua_http_fin, ud, FALSE); + register_async_event (ud->task->s, lua_http_fin, ud, g_quark_from_static_string ("lua http")); if (!rspamd_dispatcher_write (ud->io_dispatcher, ud->req_buf, ud->req_len, TRUE, TRUE)) { lua_http_push_error (450, ud); diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c index 4d2469e5f..a37396b7b 100644 --- a/src/lua/lua_redis.c +++ b/src/lua/lua_redis.c @@ -206,7 +206,7 @@ lua_redis_make_request_real (struct lua_redis_userdata *ud) return FALSE; } else { - register_async_event (ud->task->s, lua_redis_fin, ud, FALSE); + register_async_event (ud->task->s, lua_redis_fin, ud, g_quark_from_static_string ("lua redis")); } redisLibeventAttach (ud->ctx, ud->task->ev_base); /* Make a request now */ diff --git a/src/main.c b/src/main.c index bb6209974..502b1f0e1 100644 --- a/src/main.c +++ b/src/main.c @@ -382,6 +382,9 @@ fork_worker (struct rspamd_main *rspamd, struct worker_conf *cf) set_worker_limits (cf); setproctitle ("%s process", cf->worker->name); rspamd_pidfile_close (rspamd->pfh); + /* Do silent log reopen to avoid collisions */ + close_log (rspamd->logger); + open_log (rspamd->logger); msg_info ("starting %s process %P", cf->worker->name, getpid ()); cf->worker->worker_start_func (cur); break; diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c index ef88cab6b..4f66a5731 100644 --- a/src/plugins/fuzzy_check.c +++ b/src/plugins/fuzzy_check.c @@ -640,7 +640,7 @@ register_fuzzy_call (struct worker_task *task, fuzzy_hash_t *h) session->fd = sock; session->server = selected; event_add (&session->ev, &session->tv); - register_async_event (task->s, fuzzy_io_fin, session, FALSE); + register_async_event (task->s, fuzzy_io_fin, session, g_quark_from_static_string ("fuzzy check")); } } } @@ -773,7 +773,7 @@ register_fuzzy_controller_call (struct controller_session *session, struct worke s->fd = sock; event_add (&s->ev, &s->tv); (*saved)++; - register_async_event (session->s, fuzzy_learn_fin, s, FALSE); + register_async_event (session->s, fuzzy_learn_fin, s, g_quark_from_static_string ("fuzzy check")); return TRUE; } } diff --git a/src/plugins/surbl.c b/src/plugins/surbl.c index 6e4cad8ce..3c4c09363 100644 --- a/src/plugins/surbl.c +++ b/src/plugins/surbl.c @@ -939,7 +939,7 @@ register_redirector_call (struct uri *url, struct worker_task *task, timeout->tv_usec = (surbl_module_ctx->connect_timeout - timeout->tv_sec * 1000) * 1000; event_set (¶m->ev, s, EV_WRITE, redirector_callback, (void *)param); event_add (¶m->ev, timeout); - register_async_event (task->s, free_redirector_session, param, FALSE); + register_async_event (task->s, free_redirector_session, param, g_quark_from_static_string ("surbl")); msg_info ("<%s> registered redirector call for %s to %s, according to rule: %s", task->message_id, struri (url), selected->name, rule); diff --git a/src/smtp.c b/src/smtp.c index 46df67b36..605a680cd 100644 --- a/src/smtp.c +++ b/src/smtp.c @@ -570,7 +570,7 @@ smtp_make_delay (struct smtp_session *session) evtimer_set (tev, smtp_delay_handler, session); evtimer_add (tev, tv); - register_async_event (session->s, (event_finalizer_t)event_del, tev, FALSE); + register_async_event (session->s, (event_finalizer_t)event_del, tev, g_quark_from_static_string ("smtp proxy")); session->delay_timer = tev; } else if (session->state == SMTP_STATE_DELAY) { diff --git a/src/smtp_utils.c b/src/smtp_utils.c index c56397d17..3a9d62cbb 100644 --- a/src/smtp_utils.c +++ b/src/smtp_utils.c @@ -94,7 +94,7 @@ create_smtp_upstream_connection (struct smtp_session *session) &session->ctx->smtp_timeout, session); session->state = SMTP_STATE_WAIT_UPSTREAM; session->upstream_state = SMTP_STATE_GREETING; - register_async_event (session->s, (event_finalizer_t)smtp_upstream_finalize_connection, session, FALSE); + register_async_event (session->s, (event_finalizer_t)smtp_upstream_finalize_connection, session, g_quark_from_static_string ("smtp proxy")); return TRUE; } diff --git a/src/worker.c b/src/worker.c index 28bdb7100..f113e04d9 100644 --- a/src/worker.c +++ b/src/worker.c @@ -506,12 +506,11 @@ read_socket (f_str_t * in, void *arg) } /* Add task to classify to classify pool */ if (ctx->classify_pool) { + register_async_thread (task->s); g_thread_pool_push (ctx->classify_pool, task, &err); if (err != NULL) { msg_err ("cannot pull task to the pool: %s", err->message); - } - else { - register_async_thread (task->s); + remove_async_thread (task->s); } } } @@ -636,12 +635,14 @@ fin_task (void *arg) } /* Check if we have all events finished */ - task->state = WRITE_REPLY; - if (task->fin_callback) { - task->fin_callback (task->fin_arg); - } - else { - rspamd_dispatcher_restore (task->dispatcher); + if (g_hash_table_size (task->s->events) == 0 && task->s->threads == 0) { + task->state = WRITE_REPLY; + if (task->fin_callback) { + task->fin_callback (task->fin_arg); + } + else { + rspamd_dispatcher_restore (task->dispatcher); + } } } @@ -655,6 +656,7 @@ restore_task (void *arg) /* Special state */ task->state = WAIT_POST_FILTER; + task->s->wanna_die = TRUE; } /* -- 2.39.5