Another try to fix threading.
event_set (&req->io_event, req->sock, EV_WRITE, dns_retransmit_handler, req);
event_base_set (req->resolver->ev_base, &req->io_event);
event_add (&req->io_event, &req->tv);
- register_async_event (req->session, (event_finalizer_t)event_del, &req->io_event, FALSE);
+ register_async_event (req->session, (event_finalizer_t)event_del, &req->io_event, g_quark_from_static_string ("dns resolver"));
return 0;
}
else {
event_set (&req->io_event, req->sock, EV_WRITE, dns_retransmit_handler, req);
event_base_set (req->resolver->ev_base, &req->io_event);
event_add (&req->io_event, &req->tv);
- register_async_event (req->session, (event_finalizer_t)event_del, &req->io_event, FALSE);
+ register_async_event (req->session, (event_finalizer_t)event_del, &req->io_event, g_quark_from_static_string ("dns resolver"));
return 0;
}
/* Add request to hash table */
g_hash_table_insert (req->resolver->requests, &req->id, req);
- register_async_event (req->session, (event_finalizer_t)dns_fin_cb, req, FALSE);
+ register_async_event (req->session, (event_finalizer_t)dns_fin_cb, req, g_quark_from_static_string ("dns resolver"));
}
}
}
req->id = header->qid;
}
g_hash_table_insert (resolver->requests, &req->id, req);
- register_async_event (session, (event_finalizer_t)dns_fin_cb, req, FALSE);
+ register_async_event (session, (event_finalizer_t)dns_fin_cb, req, g_quark_from_static_string ("dns resolver"));
}
else if (r == -1) {
return FALSE;
}
void
-register_async_event (struct rspamd_async_session *session, event_finalizer_t fin, void *user_data, gboolean forced)
+register_async_event (struct rspamd_async_session *session, event_finalizer_t fin, void *user_data, GQuark subsystem)
{
struct rspamd_async_event *new;
new = memory_pool_alloc (session->pool, sizeof (struct rspamd_async_event));
new->fin = fin;
new->user_data = user_data;
+ new->subsystem = subsystem;
g_hash_table_insert (session->events, new, new);
#ifdef RSPAMD_EVENTS_DEBUG
- msg_info ("added event: %p, pending %d events", user_data, g_hash_table_size (session->events));
+ msg_info ("added event: %p, pending %d events, subsystem: %s", user_data, g_hash_table_size (session->events),
+ g_quark_to_string (subsystem));
#endif
}
void
remove_normal_event (struct rspamd_async_session *session, event_finalizer_t fin, void *ud)
{
- struct rspamd_async_event search_ev;
+ struct rspamd_async_event search_ev, *found_ev;
if (session == NULL) {
msg_info ("session is NULL");
return;
}
+ /* Search for event */
search_ev.fin = fin;
search_ev.user_data = ud;
- if (g_hash_table_remove (session->events, &search_ev)) {
+ if ((found_ev = g_hash_table_lookup (session->events, &search_ev)) != NULL) {
+ g_hash_table_remove (session->events, found_ev);
+#ifdef RSPAMD_EVENTS_DEBUG
+ msg_info ("removed event: %p, subsystem: %s, pending %d events", ud,
+ g_quark_to_string (found_ev->subsystem), g_hash_table_size (session->events));
+#endif
+ /* Remove event */
fin (ud);
}
-#ifdef RSPAMD_EVENTS_DEBUG
- msg_info ("removed event: %p, pending %d events", ud, g_hash_table_size (session->events));
-#endif
+
check_session_pending (session);
}
typedef void (*event_finalizer_t)(void *user_data);
struct rspamd_async_event {
+ GQuark subsystem;
event_finalizer_t fin;
void *user_data;
guint ref;
* @param forced unused
*/
void register_async_event (struct rspamd_async_session *session,
- event_finalizer_t fin, void *user_data, gboolean forced);
+ event_finalizer_t fin, void *user_data, GQuark subsystem);
/**
* Remove normal event
guint32 repeats;
gchar *saved_message;
gchar *saved_function;
+ GMutex *mtx;
};
static const gchar lf_chr = '\n';
rspamd->logger->pid = getpid ();
rspamd->logger->process_type = ptype;
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+ rspamd->logger->mtx = g_mutex_new ();
+#else
+ rspamd->logger->mtx = g_malloc (sizeof (GMutex));
+ g_mutex_init (rspamd->logger->mtx);
+#endif
+
switch (type) {
case RSPAMD_LOG_CONSOLE:
rspamd->logger->log_func = file_log_function;
u_char *end;
if (log_level <= rspamd_log->cfg->log_level) {
+ g_mutex_lock (rspamd_log->mtx);
va_start (vp, fmt);
end = rspamd_vsnprintf (logbuf, sizeof (logbuf), fmt, vp);
*end = '\0';
(void)rspamd_escape_string (escaped_logbuf, logbuf, sizeof (escaped_logbuf));
va_end (vp);
rspamd_log->log_func (NULL, function, log_level, escaped_logbuf, FALSE, rspamd_log);
+ g_mutex_unlock (rspamd_log->mtx);
}
}
if (rspamd_log->cfg->log_level >= G_LOG_LEVEL_DEBUG || rspamd_log->is_debug ||
(rspamd_log->debug_ip != NULL && radix32tree_find (rspamd_log->debug_ip, ntohl (addr)) != RADIX_NO_VALUE)) {
+ g_mutex_lock (rspamd_log->mtx);
va_start (vp, fmt);
end = rspamd_vsnprintf (logbuf, sizeof (logbuf), fmt, vp);
*end = '\0';
(void)rspamd_escape_string (escaped_logbuf, logbuf, sizeof (escaped_logbuf));
va_end (vp);
rspamd_log->log_func (NULL, function, G_LOG_LEVEL_DEBUG, escaped_logbuf, TRUE, rspamd_log);
+ g_mutex_unlock (rspamd_log->mtx);
}
}
rspamd_logger_t *rspamd_log = arg;
if (rspamd_log->enabled) {
+ g_mutex_lock (rspamd_log->mtx);
(void)rspamd_escape_string (escaped_logbuf, message, sizeof (escaped_logbuf));
rspamd_log->log_func (log_domain, NULL, log_level, escaped_logbuf, FALSE, rspamd_log);
+ g_mutex_unlock (rspamd_log->mtx);
}
}
ud->io_dispatcher = rspamd_create_dispatcher (ud->task->ev_base, ud->fd, BUFFER_LINE, lua_http_read_cb, NULL,
lua_http_err_cb, &tv, ud);
/* Write request */
- register_async_event (ud->task->s, lua_http_fin, ud, FALSE);
+ register_async_event (ud->task->s, lua_http_fin, ud, g_quark_from_static_string ("lua http"));
if (!rspamd_dispatcher_write (ud->io_dispatcher, ud->req_buf, ud->req_len, TRUE, TRUE)) {
lua_http_push_error (450, ud);
return FALSE;
}
else {
- register_async_event (ud->task->s, lua_redis_fin, ud, FALSE);
+ register_async_event (ud->task->s, lua_redis_fin, ud, g_quark_from_static_string ("lua redis"));
}
redisLibeventAttach (ud->ctx, ud->task->ev_base);
/* Make a request now */
set_worker_limits (cf);
setproctitle ("%s process", cf->worker->name);
rspamd_pidfile_close (rspamd->pfh);
+ /* Do silent log reopen to avoid collisions */
+ close_log (rspamd->logger);
+ open_log (rspamd->logger);
msg_info ("starting %s process %P", cf->worker->name, getpid ());
cf->worker->worker_start_func (cur);
break;
session->fd = sock;
session->server = selected;
event_add (&session->ev, &session->tv);
- register_async_event (task->s, fuzzy_io_fin, session, FALSE);
+ register_async_event (task->s, fuzzy_io_fin, session, g_quark_from_static_string ("fuzzy check"));
}
}
}
s->fd = sock;
event_add (&s->ev, &s->tv);
(*saved)++;
- register_async_event (session->s, fuzzy_learn_fin, s, FALSE);
+ register_async_event (session->s, fuzzy_learn_fin, s, g_quark_from_static_string ("fuzzy check"));
return TRUE;
}
}
timeout->tv_usec = (surbl_module_ctx->connect_timeout - timeout->tv_sec * 1000) * 1000;
event_set (¶m->ev, s, EV_WRITE, redirector_callback, (void *)param);
event_add (¶m->ev, timeout);
- register_async_event (task->s, free_redirector_session, param, FALSE);
+ register_async_event (task->s, free_redirector_session, param, g_quark_from_static_string ("surbl"));
msg_info ("<%s> registered redirector call for %s to %s, according to rule: %s",
task->message_id, struri (url), selected->name, rule);
evtimer_set (tev, smtp_delay_handler, session);
evtimer_add (tev, tv);
- register_async_event (session->s, (event_finalizer_t)event_del, tev, FALSE);
+ register_async_event (session->s, (event_finalizer_t)event_del, tev, g_quark_from_static_string ("smtp proxy"));
session->delay_timer = tev;
}
else if (session->state == SMTP_STATE_DELAY) {
&session->ctx->smtp_timeout, session);
session->state = SMTP_STATE_WAIT_UPSTREAM;
session->upstream_state = SMTP_STATE_GREETING;
- register_async_event (session->s, (event_finalizer_t)smtp_upstream_finalize_connection, session, FALSE);
+ register_async_event (session->s, (event_finalizer_t)smtp_upstream_finalize_connection, session, g_quark_from_static_string ("smtp proxy"));
return TRUE;
}
}
/* Add task to classify to classify pool */
if (ctx->classify_pool) {
+ register_async_thread (task->s);
g_thread_pool_push (ctx->classify_pool, task, &err);
if (err != NULL) {
msg_err ("cannot pull task to the pool: %s", err->message);
- }
- else {
- register_async_thread (task->s);
+ remove_async_thread (task->s);
}
}
}
}
/* Check if we have all events finished */
- task->state = WRITE_REPLY;
- if (task->fin_callback) {
- task->fin_callback (task->fin_arg);
- }
- else {
- rspamd_dispatcher_restore (task->dispatcher);
+ if (g_hash_table_size (task->s->events) == 0 && task->s->threads == 0) {
+ task->state = WRITE_REPLY;
+ if (task->fin_callback) {
+ task->fin_callback (task->fin_arg);
+ }
+ else {
+ rspamd_dispatcher_restore (task->dispatcher);
+ }
}
}
/* Special state */
task->state = WAIT_POST_FILTER;
+ task->s->wanna_die = TRUE;
}
/*