Fix several issues in threads handling inside keystorage. Fix sigsuspend usage in keystorage.tags/0.4.7
} | } | ||||
/* Consume and discard vararg */ | /* Consume and discard vararg */ | ||||
va_arg(ap,void); | |||||
(void)va_arg(ap,int); | |||||
} | } | ||||
} | } | ||||
new->name = g_malloc (sizeof ("18446744073709551616")); | new->name = g_malloc (sizeof ("18446744073709551616")); | ||||
rspamd_snprintf (new->name, sizeof ("18446744073709551616"), "%d", id); | rspamd_snprintf (new->name, sizeof ("18446744073709551616"), "%d", id); | ||||
} | } | ||||
#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION > 30)) | |||||
g_rw_lock_init (&new->rwlock); | |||||
#else | |||||
g_static_rw_lock_init (&new->rwlock); | g_static_rw_lock_init (&new->rwlock); | ||||
#endif | |||||
/* Init structures */ | /* Init structures */ | ||||
if (new->cache->init_func) { | if (new->cache->init_func) { | ||||
gint steps = 0; | gint steps = 0; | ||||
struct rspamd_kv_element *elt; | struct rspamd_kv_element *elt; | ||||
g_static_rw_lock_writer_lock (&storage->rwlock); | |||||
RW_W_LOCK (&storage->rwlock); | |||||
/* Hard limit */ | /* Hard limit */ | ||||
if (storage->max_memory > 0) { | if (storage->max_memory > 0) { | ||||
if (len > storage->max_memory) { | if (len > storage->max_memory) { | ||||
msg_info ("<%s>: trying to insert value of length %z while limit is %z", storage->name, | msg_info ("<%s>: trying to insert value of length %z while limit is %z", storage->name, | ||||
len, storage->max_memory); | len, storage->max_memory); | ||||
g_static_rw_lock_writer_unlock (&storage->rwlock); | |||||
RW_W_UNLOCK (&storage->rwlock); | |||||
return FALSE; | return FALSE; | ||||
} | } | ||||
msg_warn ("<%s>: storage is full and no expire function is defined", storage->name); | msg_warn ("<%s>: storage is full and no expire function is defined", storage->name); | ||||
} | } | ||||
if (++steps > MAX_EXPIRE_STEPS) { | if (++steps > MAX_EXPIRE_STEPS) { | ||||
g_static_rw_lock_writer_unlock (&storage->rwlock); | |||||
RW_W_UNLOCK (&storage->rwlock); | |||||
msg_warn ("<%s>: cannot expire enough keys in storage", storage->name); | msg_warn ("<%s>: cannot expire enough keys in storage", storage->name); | ||||
return FALSE; | return FALSE; | ||||
} | } | ||||
storage->elts ++; | storage->elts ++; | ||||
storage->memory += ELT_SIZE (elt); | storage->memory += ELT_SIZE (elt); | ||||
g_static_rw_lock_writer_unlock (&storage->rwlock); | |||||
RW_W_UNLOCK (&storage->rwlock); | |||||
return TRUE; | return TRUE; | ||||
} | } | ||||
glong longval; | glong longval; | ||||
/* Hard limit */ | /* Hard limit */ | ||||
g_static_rw_lock_writer_lock (&storage->rwlock); | |||||
RW_W_LOCK (&storage->rwlock); | |||||
if (storage->max_memory > 0) { | if (storage->max_memory > 0) { | ||||
if (len + sizeof (struct rspamd_kv_element) + keylen >= storage->max_memory) { | if (len + sizeof (struct rspamd_kv_element) + keylen >= storage->max_memory) { | ||||
msg_warn ("<%s>: trying to insert value of length %z while limit is %z", storage->name, | msg_warn ("<%s>: trying to insert value of length %z while limit is %z", storage->name, | ||||
len, storage->max_memory); | len, storage->max_memory); | ||||
g_static_rw_lock_writer_unlock (&storage->rwlock); | |||||
RW_W_UNLOCK (&storage->rwlock); | |||||
return FALSE; | return FALSE; | ||||
} | } | ||||
msg_warn ("<%s>: storage is full and no expire function is defined", storage->name); | msg_warn ("<%s>: storage is full and no expire function is defined", storage->name); | ||||
} | } | ||||
if (++steps > MAX_EXPIRE_STEPS) { | if (++steps > MAX_EXPIRE_STEPS) { | ||||
g_static_rw_lock_writer_unlock (&storage->rwlock); | |||||
RW_W_UNLOCK (&storage->rwlock); | |||||
msg_warn ("<%s>: cannot expire enough keys in storage", storage->name); | msg_warn ("<%s>: cannot expire enough keys in storage", storage->name); | ||||
return FALSE; | return FALSE; | ||||
} | } | ||||
msg_warn ("<%s>: storage is full and no expire function is defined", storage->name); | msg_warn ("<%s>: storage is full and no expire function is defined", storage->name); | ||||
} | } | ||||
if (++steps > MAX_EXPIRE_STEPS) { | if (++steps > MAX_EXPIRE_STEPS) { | ||||
g_static_rw_lock_writer_unlock (&storage->rwlock); | |||||
RW_W_UNLOCK (&storage->rwlock); | |||||
msg_warn ("<%s>: cannot expire enough keys in storage", storage->name); | msg_warn ("<%s>: cannot expire enough keys in storage", storage->name); | ||||
return FALSE; | return FALSE; | ||||
} | } | ||||
/* Just do incref and nothing more */ | /* Just do incref and nothing more */ | ||||
if (storage->backend && storage->backend->incref_func) { | if (storage->backend && storage->backend->incref_func) { | ||||
if (storage->backend->incref_func (storage->backend, key, keylen)) { | if (storage->backend->incref_func (storage->backend, key, keylen)) { | ||||
g_static_rw_lock_writer_unlock (&storage->rwlock); | |||||
RW_W_UNLOCK (&storage->rwlock); | |||||
return TRUE; | return TRUE; | ||||
} | } | ||||
else { | else { | ||||
g_static_rw_lock_writer_unlock (&storage->rwlock); | |||||
RW_W_UNLOCK (&storage->rwlock); | |||||
return FALSE; | return FALSE; | ||||
} | } | ||||
} | } | ||||
else { | else { | ||||
elt = storage->cache->insert_func (storage->cache, key, keylen, data, len); | elt = storage->cache->insert_func (storage->cache, key, keylen, data, len); | ||||
if (elt == NULL) { | if (elt == NULL) { | ||||
g_static_rw_lock_writer_unlock (&storage->rwlock); | |||||
RW_W_UNLOCK (&storage->rwlock); | |||||
return FALSE; | return FALSE; | ||||
} | } | ||||
} | } | ||||
storage->elts ++; | storage->elts ++; | ||||
storage->memory += ELT_SIZE (elt); | storage->memory += ELT_SIZE (elt); | ||||
g_static_rw_lock_writer_unlock (&storage->rwlock); | |||||
RW_W_UNLOCK (&storage->rwlock); | |||||
return res; | return res; | ||||
} | } | ||||
/* Now check limits */ | /* Now check limits */ | ||||
while (storage->memory + ELT_SIZE (elt) > storage->max_memory) { | while (storage->memory + ELT_SIZE (elt) > storage->max_memory) { | ||||
if (storage->expire) { | if (storage->expire) { | ||||
g_static_rw_lock_writer_lock (&storage->rwlock); | |||||
RW_W_LOCK (&storage->rwlock); | |||||
storage->expire->step_func (storage->expire, storage, time (NULL), steps); | storage->expire->step_func (storage->expire, storage, time (NULL), steps); | ||||
g_static_rw_lock_writer_unlock (&storage->rwlock); | |||||
RW_W_UNLOCK (&storage->rwlock); | |||||
} | } | ||||
else { | else { | ||||
msg_warn ("<%s>: storage is full and no expire function is defined", storage->name); | msg_warn ("<%s>: storage is full and no expire function is defined", storage->name); | ||||
} | } | ||||
} | } | ||||
g_static_rw_lock_writer_lock (&storage->rwlock); | |||||
RW_W_LOCK (&storage->rwlock); | |||||
/* Insert elt to the cache */ | /* Insert elt to the cache */ | ||||
res = storage->cache->replace_func (storage->cache, key, keylen, elt); | res = storage->cache->replace_func (storage->cache, key, keylen, elt); | ||||
if (res && storage->backend) { | if (res && storage->backend) { | ||||
res = storage->backend->replace_func (storage->backend, key, keylen, elt); | res = storage->backend->replace_func (storage->backend, key, keylen, elt); | ||||
} | } | ||||
g_static_rw_lock_writer_unlock (&storage->rwlock); | |||||
RW_W_UNLOCK (&storage->rwlock); | |||||
return res; | return res; | ||||
} | } | ||||
glong *lp; | glong *lp; | ||||
/* First try to look at cache */ | /* First try to look at cache */ | ||||
g_static_rw_lock_writer_lock (&storage->rwlock); | |||||
RW_W_LOCK (&storage->rwlock); | |||||
elt = storage->cache->lookup_func (storage->cache, key, keylen); | elt = storage->cache->lookup_func (storage->cache, key, keylen); | ||||
if (elt == NULL && storage->backend) { | if (elt == NULL && storage->backend) { | ||||
if (belt) { | if (belt) { | ||||
/* Put this element into cache */ | /* Put this element into cache */ | ||||
if ((belt->flags & KV_ELT_INTEGER) != 0) { | if ((belt->flags & KV_ELT_INTEGER) != 0) { | ||||
g_static_rw_lock_writer_unlock (&storage->rwlock); | |||||
RW_W_UNLOCK (&storage->rwlock); | |||||
rspamd_kv_storage_insert_cache (storage, ELT_KEY (belt), keylen, ELT_DATA (belt), | rspamd_kv_storage_insert_cache (storage, ELT_KEY (belt), keylen, ELT_DATA (belt), | ||||
belt->size, belt->flags, | belt->size, belt->flags, | ||||
belt->expire, &elt); | belt->expire, &elt); | ||||
g_static_rw_lock_writer_lock (&storage->rwlock); | |||||
RW_W_LOCK (&storage->rwlock); | |||||
} | } | ||||
if ((belt->flags & KV_ELT_DIRTY) == 0) { | if ((belt->flags & KV_ELT_DIRTY) == 0) { | ||||
g_free (belt); | g_free (belt); | ||||
elt->age = time (NULL); | elt->age = time (NULL); | ||||
if (storage->backend) { | if (storage->backend) { | ||||
if (storage->backend->replace_func (storage->backend, key, keylen, elt)) { | if (storage->backend->replace_func (storage->backend, key, keylen, elt)) { | ||||
g_static_rw_lock_writer_unlock (&storage->rwlock); | |||||
RW_W_UNLOCK (&storage->rwlock); | |||||
return TRUE; | return TRUE; | ||||
} | } | ||||
else { | else { | ||||
g_static_rw_lock_writer_unlock (&storage->rwlock); | |||||
RW_W_UNLOCK (&storage->rwlock); | |||||
return FALSE; | return FALSE; | ||||
} | } | ||||
} | } | ||||
else { | else { | ||||
g_static_rw_lock_writer_unlock (&storage->rwlock); | |||||
RW_W_UNLOCK (&storage->rwlock); | |||||
return TRUE; | return TRUE; | ||||
} | } | ||||
} | } | ||||
g_static_rw_lock_writer_unlock (&storage->rwlock); | |||||
RW_W_UNLOCK (&storage->rwlock); | |||||
return FALSE; | return FALSE; | ||||
} | } | ||||
struct rspamd_kv_element *elt = NULL, *belt; | struct rspamd_kv_element *elt = NULL, *belt; | ||||
/* First try to look at cache */ | /* First try to look at cache */ | ||||
g_static_rw_lock_reader_lock (&storage->rwlock); | |||||
RW_R_LOCK (&storage->rwlock); | |||||
elt = storage->cache->lookup_func (storage->cache, key, keylen); | elt = storage->cache->lookup_func (storage->cache, key, keylen); | ||||
/* Next look at the backend */ | /* Next look at the backend */ | ||||
struct rspamd_kv_element *elt; | struct rspamd_kv_element *elt; | ||||
/* First delete key from cache */ | /* First delete key from cache */ | ||||
g_static_rw_lock_writer_lock (&storage->rwlock); | |||||
RW_W_LOCK (&storage->rwlock); | |||||
elt = storage->cache->delete_func (storage->cache, key, keylen); | elt = storage->cache->delete_func (storage->cache, key, keylen); | ||||
/* Now delete from backend */ | /* Now delete from backend */ | ||||
} | } | ||||
} | } | ||||
g_static_rw_lock_writer_unlock (&storage->rwlock); | |||||
RW_W_UNLOCK (&storage->rwlock); | |||||
return elt; | return elt; | ||||
} | } | ||||
void | void | ||||
rspamd_kv_storage_destroy (struct rspamd_kv_storage *storage) | rspamd_kv_storage_destroy (struct rspamd_kv_storage *storage) | ||||
{ | { | ||||
g_static_rw_lock_writer_lock (&storage->rwlock); | |||||
RW_W_LOCK (&storage->rwlock); | |||||
if (storage->backend && storage->backend->destroy_func) { | if (storage->backend && storage->backend->destroy_func) { | ||||
storage->backend->destroy_func (storage->backend); | storage->backend->destroy_func (storage->backend); | ||||
} | } | ||||
g_free (storage->name); | g_free (storage->name); | ||||
g_static_rw_lock_writer_unlock (&storage->rwlock); | |||||
RW_W_UNLOCK (&storage->rwlock); | |||||
g_slice_free1 (sizeof (struct rspamd_kv_storage), storage); | g_slice_free1 (sizeof (struct rspamd_kv_storage), storage); | ||||
} | } | ||||
struct rspamd_kv_expire; | struct rspamd_kv_expire; | ||||
struct rspamd_kv_element; | struct rspamd_kv_element; | ||||
/* Locking definitions */ | |||||
#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION > 30)) | |||||
#define RW_R_LOCK g_rw_lock_reader_lock | |||||
#define RW_R_UNLOCK g_rw_lock_reader_unlock | |||||
#define RW_W_LOCK g_rw_lock_writer_lock | |||||
#define RW_W_UNLOCK g_rw_lock_writer_unlock | |||||
#else | |||||
#define RW_R_LOCK g_static_rw_lock_reader_lock | |||||
#define RW_R_UNLOCK g_static_rw_lock_reader_unlock | |||||
#define RW_W_LOCK g_static_rw_lock_writer_lock | |||||
#define RW_W_UNLOCK g_static_rw_lock_writer_unlock | |||||
#endif | |||||
/* Callbacks for cache */ | /* Callbacks for cache */ | ||||
typedef void (*cache_init)(struct rspamd_kv_cache *cache); | typedef void (*cache_init)(struct rspamd_kv_cache *cache); | ||||
typedef struct rspamd_kv_element* (*cache_insert)(struct rspamd_kv_cache *cache, | typedef struct rspamd_kv_element* (*cache_insert)(struct rspamd_kv_cache *cache, | ||||
gchar *name; /* numeric ID */ | gchar *name; /* numeric ID */ | ||||
gboolean no_overwrite; /* do not overwrite data with the same keys */ | gboolean no_overwrite; /* do not overwrite data with the same keys */ | ||||
#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION > 30)) | |||||
GRWLock rwlock; /* rwlock in new glib */ | |||||
#else | |||||
GStaticRWLock rwlock; /* rwlock for threaded access */ | GStaticRWLock rwlock; /* rwlock for threaded access */ | ||||
#endif | |||||
}; | }; | ||||
/** Create new kv storage */ | /** Create new kv storage */ |
/* Logging functions */ | /* Logging functions */ | ||||
#define thr_err(...) do { \ | #define thr_err(...) do { \ | ||||
g_static_mutex_lock (thr->log_mtx); \ | |||||
g_mutex_lock (thr->log_mtx); \ | |||||
rspamd_common_log_function(rspamd_main->logger, G_LOG_LEVEL_CRITICAL, __FUNCTION__, __VA_ARGS__); \ | rspamd_common_log_function(rspamd_main->logger, G_LOG_LEVEL_CRITICAL, __FUNCTION__, __VA_ARGS__); \ | ||||
g_static_mutex_unlock (thr->log_mtx); \ | |||||
g_mutex_unlock (thr->log_mtx); \ | |||||
} while (0) | } while (0) | ||||
#define thr_warn(...) do { \ | #define thr_warn(...) do { \ | ||||
g_static_mutex_lock (thr->log_mtx); \ | |||||
g_mutex_lock (thr->log_mtx); \ | |||||
rspamd_common_log_function(rspamd_main->logger, G_LOG_LEVEL_WARNING, __FUNCTION__, __VA_ARGS__); \ | rspamd_common_log_function(rspamd_main->logger, G_LOG_LEVEL_WARNING, __FUNCTION__, __VA_ARGS__); \ | ||||
g_static_mutex_unlock (thr->log_mtx); \ | |||||
g_mutex_unlock (thr->log_mtx); \ | |||||
} while (0) | } while (0) | ||||
#define thr_info(...) do { \ | #define thr_info(...) do { \ | ||||
g_static_mutex_lock (thr->log_mtx); \ | |||||
g_mutex_lock (thr->log_mtx); \ | |||||
rspamd_common_log_function(rspamd_main->logger, G_LOG_LEVEL_INFO, __FUNCTION__, __VA_ARGS__); \ | rspamd_common_log_function(rspamd_main->logger, G_LOG_LEVEL_INFO, __FUNCTION__, __VA_ARGS__); \ | ||||
g_static_mutex_unlock (thr->log_mtx); \ | |||||
g_mutex_unlock (thr->log_mtx); \ | |||||
} while (0) | } while (0) | ||||
/* Init functions */ | /* Init functions */ | ||||
else if (session->command == KVSTORAGE_CMD_GET) { | else if (session->command == KVSTORAGE_CMD_GET) { | ||||
elt = rspamd_kv_storage_lookup (session->cf->storage, session->key, session->keylen, session->now); | elt = rspamd_kv_storage_lookup (session->cf->storage, session->key, session->keylen, session->now); | ||||
if (elt == NULL) { | if (elt == NULL) { | ||||
g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); | |||||
RW_R_UNLOCK (&session->cf->storage->rwlock); | |||||
if (!is_redis) { | if (!is_redis) { | ||||
return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND, | return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND, | ||||
sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE); | sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE); | ||||
} | } | ||||
if (!rspamd_dispatcher_write (session->dispather, outbuf, | if (!rspamd_dispatcher_write (session->dispather, outbuf, | ||||
r, TRUE, FALSE)) { | r, TRUE, FALSE)) { | ||||
g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); | |||||
RW_R_UNLOCK (&session->cf->storage->rwlock); | |||||
return FALSE; | return FALSE; | ||||
} | } | ||||
if (elt->flags & KV_ELT_INTEGER) { | if (elt->flags & KV_ELT_INTEGER) { | ||||
if (!rspamd_dispatcher_write (session->dispather, intbuf, eltlen, TRUE, TRUE)) { | if (!rspamd_dispatcher_write (session->dispather, intbuf, eltlen, TRUE, TRUE)) { | ||||
g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); | |||||
RW_R_UNLOCK (&session->cf->storage->rwlock); | |||||
return FALSE; | return FALSE; | ||||
} | } | ||||
} | } | ||||
else { | else { | ||||
if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), eltlen, TRUE, TRUE)) { | if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), eltlen, TRUE, TRUE)) { | ||||
g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); | |||||
RW_R_UNLOCK (&session->cf->storage->rwlock); | |||||
return FALSE; | return FALSE; | ||||
} | } | ||||
} | } | ||||
sizeof (CRLF) - 1, FALSE, TRUE); | sizeof (CRLF) - 1, FALSE, TRUE); | ||||
} | } | ||||
if (!res) { | if (!res) { | ||||
g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); | |||||
RW_R_UNLOCK (&session->cf->storage->rwlock); | |||||
} | } | ||||
return res; | return res; | ||||
if ((session->elt->flags & KV_ELT_NEED_INSERT) != 0) { | if ((session->elt->flags & KV_ELT_NEED_INSERT) != 0) { | ||||
/* Insert to cache and free element */ | /* Insert to cache and free element */ | ||||
session->elt->flags &= ~KV_ELT_NEED_INSERT; | session->elt->flags &= ~KV_ELT_NEED_INSERT; | ||||
g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); | |||||
RW_R_UNLOCK (&session->cf->storage->rwlock); | |||||
rspamd_kv_storage_insert_cache (session->cf->storage, ELT_KEY (session->elt), | rspamd_kv_storage_insert_cache (session->cf->storage, ELT_KEY (session->elt), | ||||
session->elt->keylen, ELT_DATA (session->elt), | session->elt->keylen, ELT_DATA (session->elt), | ||||
session->elt->size, session->elt->flags, | session->elt->size, session->elt->flags, | ||||
session->elt = NULL; | session->elt = NULL; | ||||
return TRUE; | return TRUE; | ||||
} | } | ||||
g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); | |||||
RW_R_UNLOCK (&session->cf->storage->rwlock); | |||||
session->elt = NULL; | session->elt = NULL; | ||||
} | } | ||||
} | } | ||||
if (session->elt) { | if (session->elt) { | ||||
g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); | |||||
RW_R_UNLOCK (&session->cf->storage->rwlock); | |||||
session->elt = NULL; | session->elt = NULL; | ||||
} | } | ||||
gint nfd; | gint nfd; | ||||
struct kvstorage_session *session; | struct kvstorage_session *session; | ||||
g_static_mutex_lock (thr->accept_mtx); | |||||
g_mutex_lock (thr->accept_mtx); | |||||
if ((nfd = accept_from_socket (fd, (struct sockaddr *)&su.ss, &addrlen)) == -1) { | if ((nfd = accept_from_socket (fd, (struct sockaddr *)&su.ss, &addrlen)) == -1) { | ||||
thr_warn ("%ud: accept failed: %s", thr->id, strerror (errno)); | thr_warn ("%ud: accept failed: %s", thr->id, strerror (errno)); | ||||
g_static_mutex_unlock (thr->accept_mtx); | |||||
g_mutex_unlock (thr->accept_mtx); | |||||
return; | return; | ||||
} | } | ||||
/* Check for EAGAIN */ | /* Check for EAGAIN */ | ||||
if (nfd == 0) { | if (nfd == 0) { | ||||
g_static_mutex_unlock (thr->accept_mtx); | |||||
g_mutex_unlock (thr->accept_mtx); | |||||
return; | return; | ||||
} | } | ||||
kvstorage_read_socket, kvstorage_write_socket, | kvstorage_read_socket, kvstorage_write_socket, | ||||
kvstorage_err_socket, thr->tv, session); | kvstorage_err_socket, thr->tv, session); | ||||
g_static_mutex_unlock (thr->accept_mtx); | |||||
g_mutex_unlock (thr->accept_mtx); | |||||
session->elt = NULL; | session->elt = NULL; | ||||
if (su.ss.ss_family == AF_UNIX) { | if (su.ss.ss_family == AF_UNIX) { | ||||
} | } | ||||
} | } | ||||
/** | |||||
* Handle termination | |||||
*/ | |||||
static void | |||||
thr_term_socket (gint fd, short what, void *arg) | |||||
{ | |||||
struct kvstorage_worker_thread *thr = (struct kvstorage_worker_thread *)arg; | |||||
struct timeval tv; | |||||
if (read (fd, &tv, sizeof (struct timeval)) != sizeof (struct timeval)) { | |||||
thr_err ("cannot read data from socket: %s", strerror (errno)); | |||||
tv.tv_sec = 0; | |||||
tv.tv_usec = 0; | |||||
} | |||||
event_base_loopexit (thr->ev_base, &tv); | |||||
event_del (&thr->bind_ev); | |||||
} | |||||
/** | /** | ||||
* Thread main worker function | * Thread main worker function | ||||
*/ | */ | ||||
event_base_set (thr->ev_base, &thr->bind_ev); | event_base_set (thr->ev_base, &thr->bind_ev); | ||||
event_add (&thr->bind_ev, NULL); | event_add (&thr->bind_ev, NULL); | ||||
event_set (&thr->term_ev, thr->term_sock[0], EV_READ | EV_PERSIST, thr_term_socket, (void *)thr); | |||||
event_base_set (thr->ev_base, &thr->term_ev); | |||||
event_add (&thr->term_ev, NULL); | |||||
event_base_loop (thr->ev_base, 0); | event_base_loop (thr->ev_base, 0); | ||||
return NULL; | return NULL; | ||||
new->ctx = ctx; | new->ctx = ctx; | ||||
new->worker = worker; | new->worker = worker; | ||||
new->tv = &ctx->io_timeout; | new->tv = &ctx->io_timeout; | ||||
new->log_mtx = &ctx->log_mtx; | |||||
new->accept_mtx = &ctx->accept_mtx; | |||||
new->log_mtx = ctx->log_mtx; | |||||
new->accept_mtx = ctx->accept_mtx; | |||||
new->id = id; | new->id = id; | ||||
/* Create and setup terminating socket */ | |||||
if (make_socketpair (new->term_sock) == -1) { | |||||
msg_err ("socket failed: %s", strerror (errno)); | |||||
return NULL; | |||||
} | |||||
make_socket_nonblocking (new->term_sock[0]); | |||||
#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) | |||||
new->thr = g_thread_create (kvstorage_thread, new, FALSE, &err); | new->thr = g_thread_create (kvstorage_thread, new, FALSE, &err); | ||||
#else | |||||
gchar *name; | |||||
name = memory_pool_alloc (ctx->pool, sizeof ("kvstorage_thread") + sizeof ("4294967296") - 1); | |||||
rspamd_snprintf (name, sizeof ("kvstorage_thread") + sizeof ("4294967296") - 1, "kvstorage_thread%d", id); | |||||
new->thr = g_thread_new (name, kvstorage_thread, new); | |||||
#endif | |||||
new->ev_base = NULL; | new->ev_base = NULL; | ||||
new->signals = signals; | new->signals = signals; | ||||
} | } | ||||
worker->srv->pid = getpid (); | worker->srv->pid = getpid (); | ||||
ctx->threads = NULL; | ctx->threads = NULL; | ||||
#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) | |||||
g_thread_init (NULL); | g_thread_init (NULL); | ||||
#endif | |||||
#if _EVENT_NUMERIC_VERSION > 0x02000000 | #if _EVENT_NUMERIC_VERSION > 0x02000000 | ||||
if (evthread_use_pthreads () == -1) { | if (evthread_use_pthreads () == -1) { | ||||
msg_err ("threads support is not supported in your libevent so kvstorage is not functionable"); | msg_err ("threads support is not supported in your libevent so kvstorage is not functionable"); | ||||
} | } | ||||
init_signals (&signals, sig_handler); | init_signals (&signals, sig_handler); | ||||
sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL); | |||||
/* Set umask */ | /* Set umask */ | ||||
umask (S_IWGRP | S_IWOTH | S_IROTH | S_IRGRP); | umask (S_IWGRP | S_IWOTH | S_IROTH | S_IRGRP); | ||||
/* Init mutexes */ | |||||
#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) | |||||
ctx->log_mtx = g_mutex_new (); | |||||
ctx->accept_mtx = g_mutex_new (); | |||||
#else | |||||
ctx->log_mtx = memory_pool_alloc (ctx->pool, sizeof (GMutex)); | |||||
ctx->accept_mtx = memory_pool_alloc (ctx->pool, sizeof (GMutex)); | |||||
g_mutex_init (ctx->log_mtx); | |||||
g_mutex_init (ctx->accept_mtx); | |||||
#endif | |||||
/* Start workers threads */ | /* Start workers threads */ | ||||
g_static_mutex_init (&ctx->log_mtx); | |||||
g_static_mutex_init (&ctx->accept_mtx); | |||||
for (i = 0; i < worker->cf->count; i ++) { | for (i = 0; i < worker->cf->count; i ++) { | ||||
thr = create_kvstorage_thread (worker, ctx, i, &signals.sa_mask); | thr = create_kvstorage_thread (worker, ctx, i, &signals.sa_mask); | ||||
ctx->threads = g_list_prepend (ctx->threads, thr); | |||||
if (thr != NULL) { | |||||
ctx->threads = g_list_prepend (ctx->threads, thr); | |||||
} | |||||
} | } | ||||
sigprocmask (SIG_BLOCK, &signals.sa_mask, NULL); | sigprocmask (SIG_BLOCK, &signals.sa_mask, NULL); | ||||
cur = ctx->threads; | cur = ctx->threads; | ||||
while (cur) { | while (cur) { | ||||
thr = cur->data; | thr = cur->data; | ||||
if (thr->ev_base != NULL) { | |||||
event_del (&thr->bind_ev); | |||||
event_base_loopexit (thr->ev_base, &tv); | |||||
while (write (thr->term_sock[1], &tv, sizeof (struct timeval)) == -1) { | |||||
if (errno != EAGAIN) { | |||||
msg_err ("write to term socket failed: %s", strerror (errno)); | |||||
abort (); | |||||
} | |||||
} | } | ||||
cur = g_list_next (cur); | cur = g_list_next (cur); | ||||
} | } | ||||
cur = ctx->threads; | cur = ctx->threads; | ||||
while (cur) { | while (cur) { | ||||
thr = cur->data; | thr = cur->data; | ||||
if (thr->ev_base != NULL) { | |||||
event_del (&thr->bind_ev); | |||||
event_base_loopexit (thr->ev_base, &tv); | |||||
while (write (thr->term_sock[1], &tv, sizeof (struct timeval)) == -1) { | |||||
if (errno != EAGAIN) { | |||||
msg_err ("write to term socket failed: %s", strerror (errno)); | |||||
abort (); | |||||
} | |||||
} | } | ||||
cur = g_list_next (cur); | cur = g_list_next (cur); | ||||
} | } | ||||
} | } | ||||
msg_info ("syncing storages"); | msg_info ("syncing storages"); | ||||
/* Wait for threads in the recent glib */ | |||||
#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION > 30)) | |||||
cur = ctx->threads; | |||||
while (cur) { | |||||
thr = cur->data; | |||||
(void)g_thread_join (thr->thr); | |||||
cur = g_list_next (cur); | |||||
} | |||||
#endif | |||||
destroy_kvstorage_config (); | destroy_kvstorage_config (); | ||||
close_log (rspamd_main->logger); | close_log (rspamd_main->logger); | ||||
exit (EXIT_SUCCESS); | exit (EXIT_SUCCESS); |
gboolean is_redis; | gboolean is_redis; | ||||
memory_pool_t *pool; | memory_pool_t *pool; | ||||
struct event_base *ev_base; | struct event_base *ev_base; | ||||
GStaticMutex log_mtx; | |||||
GStaticMutex accept_mtx; | |||||
GMutex *log_mtx; | |||||
GMutex *accept_mtx; | |||||
}; | }; | ||||
struct kvstorage_worker_thread { | struct kvstorage_worker_thread { | ||||
struct event bind_ev; | struct event bind_ev; | ||||
struct event term_ev; | |||||
struct timeval *tv; | struct timeval *tv; | ||||
struct kvstorage_worker_ctx *ctx; | struct kvstorage_worker_ctx *ctx; | ||||
struct rspamd_worker *worker; | struct rspamd_worker *worker; | ||||
GThread *thr; | GThread *thr; | ||||
struct event_base *ev_base; | struct event_base *ev_base; | ||||
GStaticMutex *log_mtx; | |||||
GStaticMutex *accept_mtx; | |||||
GMutex *log_mtx; | |||||
GMutex *accept_mtx; | |||||
guint id; | guint id; | ||||
sigset_t *signals; | sigset_t *signals; | ||||
gint term_sock[2]; | |||||
}; | }; | ||||
struct kvstorage_session { | struct kvstorage_session { |
{ | { | ||||
while (h) { | while (h) { | ||||
if (G_LIKELY (!strong)) { | if (G_LIKELY (!strong)) { | ||||
if (h->value && !g_strncasecmp (field, h->name, strlen (field))) { | |||||
if (h->value && !g_ascii_strncasecmp (field, h->name, strlen (field))) { | |||||
if (pool != NULL) { | if (pool != NULL) { | ||||
*ret = g_list_prepend (*ret, memory_pool_strdup (pool, h->value)); | *ret = g_list_prepend (*ret, memory_pool_strdup (pool, h->value)); | ||||
} | } | ||||
while (g_mime_header_iter_is_valid (iter)) { | while (g_mime_header_iter_is_valid (iter)) { | ||||
name = g_mime_header_iter_get_name (iter); | name = g_mime_header_iter_get_name (iter); | ||||
if (G_LIKELY (!strong)) { | if (G_LIKELY (!strong)) { | ||||
if (!g_strncasecmp (field, name, strlen (name))) { | |||||
if (!g_ascii_strncasecmp (field, name, strlen (name))) { | |||||
if (pool != NULL) { | if (pool != NULL) { | ||||
*ret = g_list_prepend (*ret, memory_pool_strdup (pool, g_mime_header_iter_get_value (iter))); | *ret = g_list_prepend (*ret, memory_pool_strdup (pool, g_mime_header_iter_get_value (iter))); | ||||
} | } | ||||
{ | { | ||||
gint i; | gint i; | ||||
if (!g_strcasecmp (field, "MIME-Version:") || !g_strncasecmp (field, "Content-", 8)) { | |||||
if (!g_ascii_strcasecmp (field, "MIME-Version:") || !g_ascii_strncasecmp (field, "Content-", 8)) { | |||||
return; | return; | ||||
} | } | ||||
for (i = 0; i <= HEADER_UNKNOWN; ++i) { | for (i = 0; i <= HEADER_UNKNOWN; ++i) { | ||||
if (!fieldfunc[i].name || !g_strncasecmp (field, fieldfunc[i].name, strlen (fieldfunc[i].name))) { | |||||
if (!fieldfunc[i].name || !g_ascii_strncasecmp (field, fieldfunc[i].name, strlen (fieldfunc[i].name))) { | |||||
switch (fieldfunc[i].functype) { | switch (fieldfunc[i].functype) { | ||||
case FUNC_CHARPTR: | case FUNC_CHARPTR: | ||||
(*(fieldfunc[i].setfunc)) (message, value); | (*(fieldfunc[i].setfunc)) (message, value); | ||||
InternetAddressList *ia_list = NULL, *ia; | InternetAddressList *ia_list = NULL, *ia; | ||||
for (i = 0; i <= HEADER_UNKNOWN; ++i) { | for (i = 0; i <= HEADER_UNKNOWN; ++i) { | ||||
if (!fieldfunc[i].name || !g_strncasecmp (field, fieldfunc[i].name, strlen (fieldfunc[i].name))) { | |||||
if (!fieldfunc[i].name || !g_ascii_strncasecmp (field, fieldfunc[i].name, strlen (fieldfunc[i].name))) { | |||||
switch (fieldfunc[i].functype) { | switch (fieldfunc[i].functype) { | ||||
case FUNC_CHARFREEPTR: | case FUNC_CHARFREEPTR: | ||||
ret = (gchar *)(*(fieldfunc[i].func)) (message); | ret = (gchar *)(*(fieldfunc[i].func)) (message); |
}; | }; | ||||
static struct regexp_ctx *regexp_module_ctx = NULL; | static struct regexp_ctx *regexp_module_ctx = NULL; | ||||
static GMutex *workers_mtx = NULL; | |||||
static gint regexp_common_filter (struct worker_task *task); | static gint regexp_common_filter (struct worker_task *task); | ||||
static void process_regexp_item_threaded (gpointer data, gpointer user_data); | static void process_regexp_item_threaded (gpointer data, gpointer user_data); | ||||
if (g_thread_supported ()) { | if (g_thread_supported ()) { | ||||
thr = parse_limit (value, -1); | thr = parse_limit (value, -1); | ||||
if (thr > 1) { | if (thr > 1) { | ||||
#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) | |||||
g_thread_init (NULL); | g_thread_init (NULL); | ||||
workers_mtx = g_mutex_new (); | |||||
#else | |||||
workers_mtx = memory_pool_alloc (regexp_module_ctx->regexp_pool, sizeof (GMutex)); | |||||
g_mutex_init (workers_mtx); | |||||
#endif | |||||
regexp_module_ctx->workers = g_thread_pool_new (process_regexp_item_threaded, regexp_module_ctx, thr, TRUE, &err); | regexp_module_ctx->workers = g_thread_pool_new (process_regexp_item_threaded, regexp_module_ctx, thr, TRUE, &err); | ||||
if (err != NULL) { | if (err != NULL) { | ||||
msg_err ("thread pool creation failed: %s", err->message); | msg_err ("thread pool creation failed: %s", err->message); | ||||
cur_opt = g_hash_table_lookup (cfg->modules_opts, "regexp"); | cur_opt = g_hash_table_lookup (cfg->modules_opts, "regexp"); | ||||
while (cur_opt) { | while (cur_opt) { | ||||
cur = cur_opt->data; | cur = cur_opt->data; | ||||
if (strcmp (cur->param, "metric") == 0 || strcmp (cur->param, "statfile_prefix") == 0) { | |||||
cur_opt = g_list_next (cur_opt); | |||||
continue; | |||||
} | |||||
else if (g_ascii_strncasecmp (cur->param, "autolearn", sizeof ("autolearn") - 1) == 0) { | |||||
/* Skip several options that are not regexp */ | |||||
if (g_ascii_strncasecmp (cur->param, "autolearn", sizeof ("autolearn") - 1) == 0) { | |||||
parse_autolearn_param (cur->param, cur->value, cfg); | parse_autolearn_param (cur->param, cur->value, cfg); | ||||
cur_opt = g_list_next (cur_opt); | cur_opt = g_list_next (cur_opt); | ||||
continue; | continue; | ||||
cur_opt = g_list_next (cur_opt); | cur_opt = g_list_next (cur_opt); | ||||
continue; | continue; | ||||
} | } | ||||
else if (g_ascii_strncasecmp (cur->param, "max_threads", sizeof ("max_threads") - 1) == 0) { | |||||
cur_opt = g_list_next (cur_opt); | |||||
continue; | |||||
} | |||||
/* Handle regexps */ | |||||
cur_item = memory_pool_alloc0 (regexp_module_ctx->regexp_pool, sizeof (struct regexp_module_item)); | cur_item = memory_pool_alloc0 (regexp_module_ctx->regexp_pool, sizeof (struct regexp_module_item)); | ||||
cur_item->symbol = cur->param; | cur_item->symbol = cur->param; | ||||
if (cur->is_lua && cur->lua_type == LUA_VAR_STRING) { | if (cur->is_lua && cur->lua_type == LUA_VAR_STRING) { | ||||
if (ud->item->lua_function) { | if (ud->item->lua_function) { | ||||
/* Just call function */ | /* Just call function */ | ||||
if (lua_call_expression_func ("regexp", ud->item->lua_function, ud->task, NULL, &res) && res) { | if (lua_call_expression_func ("regexp", ud->item->lua_function, ud->task, NULL, &res) && res) { | ||||
g_mutex_lock (workers_mtx); | |||||
insert_result (ud->task, ud->item->symbol, 1, NULL); | insert_result (ud->task, ud->item->symbol, 1, NULL); | ||||
g_mutex_unlock (workers_mtx); | |||||
} | } | ||||
} | } | ||||
else { | else { | ||||
/* Process expression */ | /* Process expression */ | ||||
if (process_regexp_expression (ud->item->expr, ud->item->symbol, ud->task, NULL)) { | if (process_regexp_expression (ud->item->expr, ud->item->symbol, ud->task, NULL)) { | ||||
g_mutex_lock (workers_mtx); | |||||
insert_result (ud->task, ud->item->symbol, 1, NULL); | insert_result (ud->task, ud->item->symbol, 1, NULL); | ||||
g_mutex_unlock (workers_mtx); | |||||
} | } | ||||
} | } | ||||
} | } |
cur_opt = g_hash_table_lookup (cfg->modules_opts, "surbl"); | cur_opt = g_hash_table_lookup (cfg->modules_opts, "surbl"); | ||||
while (cur_opt) { | while (cur_opt) { | ||||
cur = cur_opt->data; | cur = cur_opt->data; | ||||
if (!g_strncasecmp (cur->param, "suffix", sizeof ("suffix") - 1)) { | |||||
if (!g_ascii_strncasecmp (cur->param, "suffix", sizeof ("suffix") - 1)) { | |||||
if ((str = strchr (cur->param, '_')) != NULL) { | if ((str = strchr (cur->param, '_')) != NULL) { | ||||
new_suffix = memory_pool_alloc (surbl_module_ctx->surbl_pool, sizeof (struct suffix_item)); | new_suffix = memory_pool_alloc (surbl_module_ctx->surbl_pool, sizeof (struct suffix_item)); | ||||
*str = '\0'; | *str = '\0'; | ||||
} | } | ||||
} | } | ||||
/* Search for bits */ | /* Search for bits */ | ||||
else if (!g_strncasecmp (cur->param, "bit", sizeof ("bit") - 1)) { | |||||
else if (!g_ascii_strncasecmp (cur->param, "bit", sizeof ("bit") - 1)) { | |||||
if ((str = strchr (cur->param, '_')) != NULL) { | if ((str = strchr (cur->param, '_')) != NULL) { | ||||
bit = strtoul (str + 1, NULL, 10); | bit = strtoul (str + 1, NULL, 10); | ||||
if (bit != 0) { | if (bit != 0) { |