]> source.dussan.org Git - rspamd.git/commitdiff
Add quarks to events to determine source of event inside rspamd.
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Mon, 13 Feb 2012 15:09:27 +0000 (19:09 +0400)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Mon, 13 Feb 2012 15:09:27 +0000 (19:09 +0400)
Another try to fix threading.

12 files changed:
src/dns.c
src/events.c
src/events.h
src/logger.c
src/lua/lua_http.c
src/lua/lua_redis.c
src/main.c
src/plugins/fuzzy_check.c
src/plugins/surbl.c
src/smtp.c
src/smtp_utils.c
src/worker.c

index bd048c4ac01c54c85d6dc0a8e3ef07ab614a5edc..fe087da08bed7f091fbb53998d87f5a986089c2d 100644 (file)
--- a/src/dns.c
+++ b/src/dns.c
@@ -744,7 +744,7 @@ send_dns_request (struct rspamd_dns_request *req)
                        event_set (&req->io_event, req->sock, EV_WRITE, dns_retransmit_handler, req);
                        event_base_set (req->resolver->ev_base, &req->io_event);
                        event_add (&req->io_event, &req->tv);
-                       register_async_event (req->session, (event_finalizer_t)event_del, &req->io_event, FALSE);
+                       register_async_event (req->session, (event_finalizer_t)event_del, &req->io_event, g_quark_from_static_string ("dns resolver"));
                        return 0;
                } 
                else {
@@ -757,7 +757,7 @@ send_dns_request (struct rspamd_dns_request *req)
                event_set (&req->io_event, req->sock, EV_WRITE, dns_retransmit_handler, req);
                event_base_set (req->resolver->ev_base, &req->io_event);
                event_add (&req->io_event, &req->tv);
-               register_async_event (req->session, (event_finalizer_t)event_del, &req->io_event, FALSE);
+               register_async_event (req->session, (event_finalizer_t)event_del, &req->io_event, g_quark_from_static_string ("dns resolver"));
                return 0;
        }
        
@@ -1343,7 +1343,7 @@ dns_retransmit_handler (gint fd, short what, void *arg)
 
                        /* Add request to hash table */
                        g_hash_table_insert (req->resolver->requests, &req->id, req);
-                       register_async_event (req->session, (event_finalizer_t)dns_fin_cb, req, FALSE);
+                       register_async_event (req->session, (event_finalizer_t)dns_fin_cb, req, g_quark_from_static_string ("dns resolver"));
                }
        }
 }
@@ -1450,7 +1450,7 @@ make_dns_request (struct rspamd_dns_resolver *resolver,
                        req->id = header->qid;
                }
                g_hash_table_insert (resolver->requests, &req->id, req);
-               register_async_event (session, (event_finalizer_t)dns_fin_cb, req, FALSE);
+               register_async_event (session, (event_finalizer_t)dns_fin_cb, req, g_quark_from_static_string ("dns resolver"));
        }
        else if (r == -1) {
                return FALSE;
index 98b492ffb3a918e495f34c8cd5e7e6b48ef11a3e..b658f48a9cede13a4160f6c589c7088e468d4296 100644 (file)
@@ -87,7 +87,7 @@ new_async_session (memory_pool_t * pool, event_finalizer_t fin,
 }
 
 void
-register_async_event (struct rspamd_async_session *session, event_finalizer_t fin, void *user_data, gboolean forced)
+register_async_event (struct rspamd_async_session *session, event_finalizer_t fin, void *user_data, GQuark subsystem)
 {
        struct rspamd_async_event      *new;
 
@@ -99,32 +99,39 @@ register_async_event (struct rspamd_async_session *session, event_finalizer_t fi
        new = memory_pool_alloc (session->pool, sizeof (struct rspamd_async_event));
        new->fin = fin;
        new->user_data = user_data;
+       new->subsystem = subsystem;
 
        g_hash_table_insert (session->events, new, new);
 #ifdef RSPAMD_EVENTS_DEBUG
-       msg_info ("added event: %p, pending %d events", user_data, g_hash_table_size (session->events));
+       msg_info ("added event: %p, pending %d events, subsystem: %s", user_data, g_hash_table_size (session->events),
+                       g_quark_to_string (subsystem));
 #endif
 }
 
 void
 remove_normal_event (struct rspamd_async_session *session, event_finalizer_t fin, void *ud)
 {
-       struct rspamd_async_event       search_ev;
+       struct rspamd_async_event       search_ev, *found_ev;
 
        if (session == NULL) {
                msg_info ("session is NULL");
                return;
        }
 
+       /* Search for event */
        search_ev.fin = fin;
        search_ev.user_data = ud;
-       if (g_hash_table_remove (session->events, &search_ev)) {
+       if ((found_ev = g_hash_table_lookup (session->events, &search_ev)) != NULL) {
+               g_hash_table_remove (session->events, found_ev);
+#ifdef RSPAMD_EVENTS_DEBUG
+               msg_info ("removed event: %p, subsystem: %s, pending %d events", ud,
+                       g_quark_to_string (found_ev->subsystem), g_hash_table_size (session->events));
+#endif
+               /* Remove event */
                fin (ud);
        }
 
-#ifdef RSPAMD_EVENTS_DEBUG
-       msg_info ("removed event: %p, pending %d events", ud, g_hash_table_size (session->events));
-#endif
+
 
        check_session_pending (session);
 }
index af41cc84ab525b340bd00b8e1d98e3908ee679f7..2c4ea2c917e5fb7594cf106b298e5f09f1113cd0 100644 (file)
@@ -8,6 +8,7 @@ struct rspamd_async_event;
 typedef void (*event_finalizer_t)(void *user_data);
 
 struct rspamd_async_event {
+       GQuark subsystem;
        event_finalizer_t fin;
        void *user_data;
        guint ref;
@@ -46,7 +47,7 @@ struct rspamd_async_session *new_async_session (memory_pool_t *pool,
  * @param forced unused
  */
 void register_async_event (struct rspamd_async_session *session,
-               event_finalizer_t fin, void *user_data, gboolean forced);
+               event_finalizer_t fin, void *user_data, GQuark subsystem);
 
 /**
  * Remove normal event
index 2abdf0dfc5e2f40e84ae45196296cf399b0b95a3..dfdb5b1b5bbc68fd869d911df5e065af00e849f6 100644 (file)
@@ -60,6 +60,7 @@ struct rspamd_logger_s {
        guint32                  repeats;
        gchar                   *saved_message;
        gchar                   *saved_function;
+       GMutex                                  *mtx;
 };
 
 static const gchar lf_chr = '\n';
@@ -268,6 +269,13 @@ rspamd_set_logger (enum rspamd_log_type type, GQuark ptype, struct rspamd_main *
        rspamd->logger->pid = getpid ();
        rspamd->logger->process_type = ptype;
 
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+       rspamd->logger->mtx = g_mutex_new ();
+#else
+       rspamd->logger->mtx = g_malloc (sizeof (GMutex));
+       g_mutex_init (rspamd->logger->mtx);
+#endif
+
        switch (type) {
                case RSPAMD_LOG_CONSOLE:
                        rspamd->logger->log_func = file_log_function;
@@ -369,12 +377,14 @@ rspamd_common_log_function (rspamd_logger_t *rspamd_log, GLogLevelFlags log_leve
     u_char                         *end;
 
        if (log_level <= rspamd_log->cfg->log_level) {
+               g_mutex_lock (rspamd_log->mtx);
                va_start (vp, fmt);
                end = rspamd_vsnprintf (logbuf, sizeof (logbuf), fmt, vp);
                *end = '\0';
                (void)rspamd_escape_string (escaped_logbuf, logbuf, sizeof (escaped_logbuf));
                va_end (vp);
                rspamd_log->log_func (NULL, function, log_level, escaped_logbuf, FALSE, rspamd_log);
+               g_mutex_unlock (rspamd_log->mtx);
        }
 }
 
@@ -643,12 +653,14 @@ rspamd_conditional_debug (rspamd_logger_t *rspamd_log, guint32 addr, const gchar
        if (rspamd_log->cfg->log_level >= G_LOG_LEVEL_DEBUG || rspamd_log->is_debug ||
                        (rspamd_log->debug_ip != NULL && radix32tree_find (rspamd_log->debug_ip, ntohl (addr)) != RADIX_NO_VALUE)) {
 
+               g_mutex_lock (rspamd_log->mtx);
                va_start (vp, fmt);
                end = rspamd_vsnprintf (logbuf, sizeof (logbuf), fmt, vp);
                *end = '\0';
                (void)rspamd_escape_string (escaped_logbuf, logbuf, sizeof (escaped_logbuf));
                va_end (vp);
                rspamd_log->log_func (NULL, function, G_LOG_LEVEL_DEBUG, escaped_logbuf, TRUE, rspamd_log);
+               g_mutex_unlock (rspamd_log->mtx);
        }
 } 
 
@@ -662,8 +674,10 @@ rspamd_glib_log_function (const gchar *log_domain, GLogLevelFlags log_level, con
        rspamd_logger_t              *rspamd_log = arg;
 
        if (rspamd_log->enabled) {
+               g_mutex_lock (rspamd_log->mtx);
                (void)rspamd_escape_string (escaped_logbuf, message, sizeof (escaped_logbuf));
                rspamd_log->log_func (log_domain, NULL, log_level, escaped_logbuf, FALSE, rspamd_log);
+               g_mutex_unlock (rspamd_log->mtx);
        }
 }
 
index 16f237a369a70fe52ad376affa456547b4e868d5..68faee2e77789b5f081406b27e4111b754ab1f60 100644 (file)
@@ -295,7 +295,7 @@ lua_http_dns_callback (struct rspamd_dns_reply *reply, gpointer arg)
        ud->io_dispatcher = rspamd_create_dispatcher (ud->task->ev_base, ud->fd, BUFFER_LINE, lua_http_read_cb, NULL,
                        lua_http_err_cb, &tv, ud);
        /* Write request */
-       register_async_event (ud->task->s, lua_http_fin, ud, FALSE);
+       register_async_event (ud->task->s, lua_http_fin, ud, g_quark_from_static_string ("lua http"));
 
        if (!rspamd_dispatcher_write (ud->io_dispatcher, ud->req_buf, ud->req_len, TRUE, TRUE)) {
                lua_http_push_error (450, ud);
index 4d2469e5f943779e0dd3f4e0fdfe858fd883cb6c..a37396b7bb3a3a3b23be1b429b4516cd9797fc14 100644 (file)
@@ -206,7 +206,7 @@ lua_redis_make_request_real (struct lua_redis_userdata *ud)
                return FALSE;
        }
        else {
-               register_async_event (ud->task->s, lua_redis_fin, ud, FALSE);
+               register_async_event (ud->task->s, lua_redis_fin, ud, g_quark_from_static_string ("lua redis"));
        }
        redisLibeventAttach (ud->ctx, ud->task->ev_base);
        /* Make a request now */
index bb62099749c1a4c3f9f109f06c2b338163fd41e2..502b1f0e19e90f57a6a5994d5962efa44cf3fab0 100644 (file)
@@ -382,6 +382,9 @@ fork_worker (struct rspamd_main *rspamd, struct worker_conf *cf)
                        set_worker_limits (cf);
                        setproctitle ("%s process", cf->worker->name);
                        rspamd_pidfile_close (rspamd->pfh);
+                       /* Do silent log reopen to avoid collisions */
+                       close_log (rspamd->logger);
+                       open_log (rspamd->logger);
                        msg_info ("starting %s process %P", cf->worker->name, getpid ());
                        cf->worker->worker_start_func (cur);
                        break;
index ef88cab6b3046f9416f198394ec6b1cd86553136..4f66a5731c3feb816fcb21927283eba5d251a643 100644 (file)
@@ -640,7 +640,7 @@ register_fuzzy_call (struct worker_task *task, fuzzy_hash_t *h)
                        session->fd = sock;
                        session->server = selected;
                        event_add (&session->ev, &session->tv);
-                       register_async_event (task->s, fuzzy_io_fin, session, FALSE);
+                       register_async_event (task->s, fuzzy_io_fin, session, g_quark_from_static_string ("fuzzy check"));
                }
        }
 }
@@ -773,7 +773,7 @@ register_fuzzy_controller_call (struct controller_session *session, struct worke
                        s->fd = sock;
                        event_add (&s->ev, &s->tv);
                        (*saved)++;
-                       register_async_event (session->s, fuzzy_learn_fin, s, FALSE);
+                       register_async_event (session->s, fuzzy_learn_fin, s, g_quark_from_static_string ("fuzzy check"));
                        return TRUE;
                }
        }
index 6e4cad8ce5779d1a3a4f02899123d5813bf9708c..3c4c0936329b90deaa3fb045b9eb4115564e04cd 100644 (file)
@@ -939,7 +939,7 @@ register_redirector_call (struct uri *url, struct worker_task *task,
        timeout->tv_usec = (surbl_module_ctx->connect_timeout - timeout->tv_sec * 1000) * 1000;
        event_set (&param->ev, s, EV_WRITE, redirector_callback, (void *)param);
        event_add (&param->ev, timeout);
-       register_async_event (task->s, free_redirector_session, param, FALSE);
+       register_async_event (task->s, free_redirector_session, param, g_quark_from_static_string ("surbl"));
 
        msg_info ("<%s> registered redirector call for %s to %s, according to rule: %s",
                        task->message_id, struri (url), selected->name, rule);
index 46df67b3646fb7cba0d2c2312f54a705fc384f54..605a680cd570d76403231aaa1c05770f023ee743 100644 (file)
@@ -570,7 +570,7 @@ smtp_make_delay (struct smtp_session *session)
 
                evtimer_set (tev, smtp_delay_handler, session);
                evtimer_add (tev, tv);
-               register_async_event (session->s, (event_finalizer_t)event_del, tev, FALSE);
+               register_async_event (session->s, (event_finalizer_t)event_del, tev, g_quark_from_static_string ("smtp proxy"));
                session->delay_timer = tev;
        }
        else if (session->state == SMTP_STATE_DELAY) {
index c56397d171f9ad6d133385094f2813ae2bf7bf66..3a9d62cbbf4e405dd4d665f97c7a5cd933e80c9e 100644 (file)
@@ -94,7 +94,7 @@ create_smtp_upstream_connection (struct smtp_session *session)
                                                        &session->ctx->smtp_timeout, session);
        session->state = SMTP_STATE_WAIT_UPSTREAM;
        session->upstream_state = SMTP_STATE_GREETING;
-       register_async_event (session->s, (event_finalizer_t)smtp_upstream_finalize_connection, session, FALSE);
+       register_async_event (session->s, (event_finalizer_t)smtp_upstream_finalize_connection, session, g_quark_from_static_string ("smtp proxy"));
        return TRUE;
 }
 
index 28bdb7100e13c5f9f0ffec6cf35ed2332b1a6abd..f113e04d980702329a30283967e0729fd035d0ee 100644 (file)
@@ -506,12 +506,11 @@ read_socket (f_str_t * in, void *arg)
                        }
                        /* Add task to classify to classify pool */
                        if (ctx->classify_pool) {
+                               register_async_thread (task->s);
                                g_thread_pool_push (ctx->classify_pool, task, &err);
                                if (err != NULL) {
                                        msg_err ("cannot pull task to the pool: %s", err->message);
-                               }
-                               else {
-                                       register_async_thread (task->s);
+                                       remove_async_thread (task->s);
                                }
                        }
                }
@@ -636,12 +635,14 @@ fin_task (void *arg)
        }
 
        /* Check if we have all events finished */
-       task->state = WRITE_REPLY;
-       if (task->fin_callback) {
-               task->fin_callback (task->fin_arg);
-       }
-       else {
-               rspamd_dispatcher_restore (task->dispatcher);
+       if (g_hash_table_size (task->s->events) == 0 && task->s->threads == 0) {
+               task->state = WRITE_REPLY;
+               if (task->fin_callback) {
+                       task->fin_callback (task->fin_arg);
+               }
+               else {
+                       rspamd_dispatcher_restore (task->dispatcher);
+               }
        }
 }
 
@@ -655,6 +656,7 @@ restore_task (void *arg)
 
        /* Special state */
        task->state = WAIT_POST_FILTER;
+       task->s->wanna_die = TRUE;
 }
 
 /*