summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author Vsevolod Stakhov <vsevolod@highsecure.ru> 2013-06-11 00:16:27 +0100
committer Vsevolod Stakhov <vsevolod@highsecure.ru> 2013-06-11 00:16:27 +0100
commit def7ea2c8ba62a067b63f3be65bfd44147da315f (patch)
tree 5245f48a459c3e76fce1cddce5ae64dfcc7d6a72
parent f06d122b502e7df46aa6860915ec8854348b5b77 (diff)
download rspamd-def7ea2c8ba62a067b63f3be65bfd44147da315f.tar.gz
rspamd-def7ea2c8ba62a067b63f3be65bfd44147da315f.zip
Update fuzzy storage in a separate thread.
-rw-r--r-- src/fuzzy_storage.c 243
-rw-r--r-- src/util.h 10
2 files changed, 170 insertions, 83 deletions
diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c
index bedf473ac..53bde37bf 100644
--- a/src/fuzzy_storage.c
+++ b/src/fuzzy_storage.c
@@ -73,7 +73,7 @@ worker_t fuzzy_worker = {
start_fuzzy, /* Start function */
FALSE, /* No socket */
TRUE, /* Unique */
- FALSE, /* Non threaded */
+ TRUE, /* Threaded */
FALSE /* Non killable */
};
@@ -90,6 +90,7 @@ static guint32 mods = 0;
static struct timeval tmv;
static struct event tev;
static struct rspamd_stat *server_stat;
+static sig_atomic_t wanna_die = 0;
struct rspamd_fuzzy_storage_ctx {
gboolean use_judy;
@@ -100,6 +101,10 @@ struct rspamd_fuzzy_storage_ctx {
radix_tree_t *update_ips;
gchar *update_map;
struct event_base *ev_base;
+ rspamd_rwlock_t *tree_lock;
+ rspamd_mutex_t *update_mtx;
+ GCond *update_cond;
+ GThread *update_thread;
};
struct rspamd_fuzzy_node {
@@ -149,113 +154,142 @@ compare_nodes (gconstpointer a, gconstpointer b, gpointer unused)
return n1->value - n2->value;
}
-static void
-sync_cache (struct rspamd_worker *wrk)
+static gpointer
+sync_cache (gpointer ud)
{
+ struct rspamd_worker *wrk = ud;
gint fd, i;
gchar *filename, header[4];
GList *cur, *tmp;
struct rspamd_fuzzy_node *node;
guint64 expire, now;
- struct rspamd_fuzzy_storage_ctx *ctx = wrk->ctx;
+ struct rspamd_fuzzy_storage_ctx *ctx;
#ifdef WITH_JUDY
PPvoid_t pvalue;
gchar indexbuf[1024], tmpindex[1024];
#endif
- /* Check for modifications */
- if (mods < ctx->max_mods) {
- return;
- }
+ ctx = wrk->ctx;
- msg_info ("syncing fuzzy hash storage");
- filename = ctx->hashfile;
- if (filename == NULL) {
- return;
- }
- expire = ctx->expire;
-
- if ((fd = open (filename, O_WRONLY | O_TRUNC | O_CREAT, S_IWUSR | S_IRUSR | S_IRGRP | S_IROTH)) == -1) {
- msg_err ("cannot create hash file %s: %s", filename, strerror (errno));
- return;
- }
+ for (;;) {
- (void)lock_file (fd, FALSE);
+ rspamd_mutex_lock (ctx->update_mtx);
- now = (guint64) time (NULL);
-
- /* Fill header */
- memcpy (header, FUZZY_FILE_MAGIC, 3);
- header[3] = (gchar)CURRENT_FUZZY_VERSION;
- if (write (fd, header, sizeof (header)) == -1) {
- msg_err ("cannot write file %s while writing header: %s", filename, strerror (errno));
- goto end;
- }
+ /* Check for modifications */
+ while (mods < ctx->max_mods && !wanna_die) {
+ rspamd_cond_wait (ctx->update_cond, ctx->update_mtx);
+ }
+
+ msg_info ("syncing fuzzy hash storage");
+ filename = ctx->hashfile;
+ if (filename == NULL ) {
+ rspamd_mutex_unlock (ctx->update_mtx);
+ continue;
+ }
+ expire = ctx->expire;
+
+ if ((fd = open (filename, O_WRONLY | O_TRUNC | O_CREAT,
+ S_IWUSR | S_IRUSR | S_IRGRP | S_IROTH)) == -1) {
+ msg_err(
+ "cannot create hash file %s: %s", filename, strerror (errno));
+ rspamd_mutex_unlock (ctx->update_mtx);
+ continue;
+ }
+
+ (void) lock_file (fd, FALSE);
+
+ now = (guint64) time (NULL );
+
+ /* Fill header */
+ memcpy (header, FUZZY_FILE_MAGIC, 3);
+ header[3] = (gchar) CURRENT_FUZZY_VERSION;
+ if (write (fd, header, sizeof(header)) == -1) {
+ msg_err(
+ "cannot write file %s while writing header: %s", filename, strerror (errno));
+ goto end;
+ }
#ifdef WITH_JUDY
- if (ctx->use_judy) {
- indexbuf[0] = '\0';
- pvalue = JudySLFirst (jtree, indexbuf, PJE0);
- while (pvalue) {
- node = *((struct rspamd_fuzzy_node **)pvalue);
- if (now - node->time > expire) {
- /* Remove expired item */
- rspamd_strlcpy (tmpindex, indexbuf, sizeof (tmpindex));
- pvalue = JudySLNext (jtree, tmpindex, PJE0);
- JudySLDel (&jtree, indexbuf, PJE0);
- rspamd_strlcpy (indexbuf, tmpindex, sizeof (indexbuf));
- bloom_del (bf, node->h.hash_pipe);
- server_stat->fuzzy_hashes_expired ++;
- server_stat->fuzzy_hashes --;
- g_slice_free1 (sizeof (struct rspamd_fuzzy_node), node);
- continue;
- }
- if (write (fd, node, sizeof (struct rspamd_fuzzy_node)) == -1) {
- msg_err ("cannot write file %s: %s", filename, strerror (errno));
- goto end;
+ if (ctx->use_judy) {
+ rspamd_rwlock_reader_lock (ctx->tree_lock);
+ indexbuf[0] = '\0';
+ pvalue = JudySLFirst (jtree, indexbuf, PJE0);
+ while (pvalue) {
+ node = *((struct rspamd_fuzzy_node **)pvalue);
+ if (now - node->time > expire) {
+#if 0
+ /* Remove expired item */
+ rspamd_strlcpy (tmpindex, indexbuf, sizeof (tmpindex));
+ pvalue = JudySLNext (jtree, tmpindex, PJE0);
+ JudySLDel (&jtree, indexbuf, PJE0);
+ rspamd_strlcpy (indexbuf, tmpindex, sizeof (indexbuf));
+ bloom_del (bf, node->h.hash_pipe);
+ server_stat->fuzzy_hashes_expired ++;
+ server_stat->fuzzy_hashes --;
+ g_slice_free1 (sizeof (struct rspamd_fuzzy_node), node);
+#endif
+ continue;
+ }
+ if (write (fd, node, sizeof (struct rspamd_fuzzy_node)) == -1) {
+ msg_err ("cannot write file %s: %s", filename, strerror (errno));
+ goto end;
+ }
+ pvalue = JudySLNext (jtree, indexbuf, PJE0);
}
- pvalue = JudySLNext (jtree, indexbuf, PJE0);
+ rspamd_rwlock_reader_unlock (ctx->tree_lock);
}
- }
- else {
+ else {
#endif
- cur = frequent->head;
- while (cur) {
- node = cur->data;
- if (write (fd, node, sizeof (struct rspamd_fuzzy_node)) == -1) {
- msg_err ("cannot write file %s: %s", filename, strerror (errno));
- }
- cur = g_list_next (cur);
- }
- for (i = 0; i < BUCKETS; i++) {
- cur = hashes[i]->head;
+ rspamd_rwlock_reader_lock (ctx->tree_lock);
+ cur = frequent->head;
while (cur) {
node = cur->data;
- if (now - node->time > expire) {
- /* Remove expired item */
- tmp = cur;
- cur = g_list_next (cur);
- g_queue_delete_link (hashes[i], tmp);
- bloom_del (bf, node->h.hash_pipe);
- server_stat->fuzzy_hashes_expired ++;
- server_stat->fuzzy_hashes --;
- g_slice_free1 (sizeof (struct rspamd_fuzzy_node), node);
- continue;
- }
- if (write (fd, node, sizeof (struct rspamd_fuzzy_node)) == -1) {
- msg_err ("cannot write file %s: %s", filename, strerror (errno));
- goto end;
+ if (write (fd, node, sizeof(struct rspamd_fuzzy_node)) == -1) {
+ msg_err("cannot write file %s: %s", filename, strerror (errno));
}
cur = g_list_next (cur);
}
- }
+ for (i = 0; i < BUCKETS; i++) {
+ cur = hashes[i]->head;
+ while (cur) {
+ node = cur->data;
+ if (now - node->time > expire) {
+#if 0
+ /* Remove expired item */
+ tmp = cur;
+ cur = g_list_next (cur);
+ g_queue_delete_link (hashes[i], tmp);
+ bloom_del (bf, node->h.hash_pipe);
+ server_stat->fuzzy_hashes_expired++;
+ server_stat->fuzzy_hashes--;
+ g_slice_free1 (sizeof(struct rspamd_fuzzy_node), node);
+#endif
+ continue;
+ }
+ if (write (fd, node, sizeof(struct rspamd_fuzzy_node)) == -1) {
+ msg_err(
+ "cannot write file %s: %s", filename, strerror (errno));
+ goto end;
+ }
+ cur = g_list_next (cur);
+ }
+ }
+ rspamd_rwlock_reader_unlock (ctx->tree_lock);
#ifdef WITH_JUDY
}
#endif
end:
- (void)unlock_file (fd, FALSE);
- close (fd);
+ (void) unlock_file (fd, FALSE);
+ close (fd);
+
+ rspamd_mutex_unlock (ctx->update_mtx);
+ if (wanna_die) {
+ break;
+ }
+ }
+
+ return NULL;
}
static void
@@ -273,8 +307,13 @@ sigterm_handler (gint fd, short what, void *arg)
event_del (&worker->sig_ev_usr2);
event_del (&worker->bind_ev);
close (worker->cf->listen_sock);
+
+ rspamd_mutex_lock (ctx->update_mtx);
mods = ctx->max_mods + 1;
- sync_cache (worker);
+ wanna_die = 1;
+ g_cond_signal (ctx->update_cond);
+ rspamd_mutex_unlock (ctx->update_mtx);
+
(void)event_base_loopexit (ctx->ev_base, &tv);
}
@@ -297,8 +336,11 @@ sigusr2_handler (gint fd, short what, void *arg)
event_del (&worker->bind_ev);
close (worker->cf->listen_sock);
msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
+ rspamd_mutex_lock (ctx->update_mtx);
mods = ctx->max_mods + 1;
- sync_cache (worker);
+ wanna_die = 1;
+ g_cond_signal (ctx->update_cond);
+ rspamd_mutex_unlock (ctx->update_mtx);
event_base_loopexit (ctx->ev_base, &tv);
return;
@@ -457,6 +499,7 @@ check_hash_node (GQueue *hash, fuzzy_hash_t *s, gint update_value, struct rspamd
GList *cur;
struct rspamd_fuzzy_node *h;
gint prob = 0;
+
#ifdef WITH_JUDY
PPvoid_t pvalue;
@@ -529,7 +572,9 @@ process_check_command (struct fuzzy_cmd *cmd, gint *flag, struct rspamd_fuzzy_st
memcpy (s.hash_pipe, cmd->hash, sizeof (s.hash_pipe));
s.block_size = cmd->blocksize;
+ rspamd_rwlock_reader_lock (ctx->tree_lock);
h = check_hash_node (hashes[cmd->blocksize % BUCKETS], &s, 0, ctx);
+ rspamd_rwlock_reader_unlock (ctx->tree_lock);
if (h == NULL) {
return 0;
@@ -544,12 +589,17 @@ static gboolean
update_hash (struct fuzzy_cmd *cmd, struct rspamd_fuzzy_storage_ctx *ctx)
{
fuzzy_hash_t s;
+ gboolean r;
memcpy (s.hash_pipe, cmd->hash, sizeof (s.hash_pipe));
s.block_size = cmd->blocksize;
mods ++;
- return check_hash_node (hashes[cmd->blocksize % BUCKETS], &s, cmd->value, ctx) != NULL;
+ rspamd_rwlock_writer_lock (ctx->tree_lock);
+ r = check_hash_node (hashes[cmd->blocksize % BUCKETS], &s, cmd->value, ctx) != NULL;
+ rspamd_rwlock_writer_unlock (ctx->tree_lock);
+
+ return r;
}
static gboolean
@@ -566,6 +616,7 @@ process_write_command (struct fuzzy_cmd *cmd, struct rspamd_fuzzy_storage_ctx *c
}
}
+ rspamd_rwlock_writer_lock (ctx->tree_lock);
h = g_slice_alloc (sizeof (struct rspamd_fuzzy_node));
memcpy (&h->h.hash_pipe, &cmd->hash, sizeof (cmd->hash));
h->h.block_size = cmd->blocksize;
@@ -584,6 +635,7 @@ process_write_command (struct fuzzy_cmd *cmd, struct rspamd_fuzzy_storage_ctx *c
#ifdef WITH_JUDY
}
#endif
+ rspamd_rwlock_writer_unlock (ctx->tree_lock);
bloom_add (bf, cmd->hash);
mods++;
server_stat->fuzzy_hashes ++;
@@ -603,6 +655,7 @@ delete_hash (GQueue *hash, fuzzy_hash_t *s, struct rspamd_fuzzy_storage_ctx *ctx
gpointer data;
if (ctx->use_judy) {
+ rspamd_rwlock_writer_lock (ctx->tree_lock);
pvalue = JudySLGet (jtree, s->hash_pipe, PJE0);
if (pvalue) {
data = *pvalue;
@@ -613,9 +666,11 @@ delete_hash (GQueue *hash, fuzzy_hash_t *s, struct rspamd_fuzzy_storage_ctx *ctx
server_stat->fuzzy_hashes --;
mods++;
}
+ rspamd_rwlock_writer_unlock (ctx->tree_lock);
}
else {
#endif
+ rspamd_rwlock_writer_lock (ctx->tree_lock);
cur = hash->head;
/* XXX: too slow way */
@@ -635,6 +690,7 @@ delete_hash (GQueue *hash, fuzzy_hash_t *s, struct rspamd_fuzzy_storage_ctx *ctx
}
cur = g_list_next (cur);
}
+ rspamd_rwlock_writer_unlock (ctx->tree_lock);
#ifdef WITH_JUDY
}
#endif
@@ -837,7 +893,9 @@ sync_callback (gint fd, short what, void *arg)
tmv.tv_usec = 0;
evtimer_add (&tev, &tmv);
- sync_cache (worker);
+ rspamd_mutex_lock (ctx->update_mtx);
+ g_cond_signal (ctx->update_cond);
+ rspamd_mutex_unlock (ctx->update_mtx);
}
static gboolean
@@ -877,6 +935,14 @@ init_fuzzy (void)
ctx->max_mods = DEFAULT_MOD_LIMIT;
ctx->frequent_score = DEFAULT_FREQUENT_SCORE;
ctx->expire = DEFAULT_EXPIRE;
+ ctx->tree_lock = rspamd_rwlock_new ();
+ ctx->update_mtx = rspamd_mutex_new ();
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION > 30))
+ ctx->update_cond = g_malloc0 (sizeof (GCond));
+ g_cond_init (ctx->update_cond);
+#else
+ ctx->update_cond = g_cond_new ();
+#endif
register_worker_opt (type, "hashfile", xml_handle_string, ctx,
G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, hashfile));
@@ -904,6 +970,7 @@ start_fuzzy (struct rspamd_worker *worker)
struct event sev;
gint retries = 0;
struct rspamd_fuzzy_storage_ctx *ctx = worker->ctx;
+ GError *err = NULL;
worker->srv->pid = getpid ();
@@ -971,7 +1038,17 @@ start_fuzzy (struct rspamd_worker *worker)
gperf_profiler_init (worker->srv->cfg, "fuzzy");
+ ctx->update_thread = rspamd_create_thread ("fuzzy update", sync_cache, worker, &err);
+ if (ctx->update_thread == NULL) {
+ msg_err ("error creating update thread: %s", err->message);
+ }
+
event_base_loop (ctx->ev_base, 0);
+
+ if (ctx->update_thread != NULL) {
+ g_thread_join (ctx->update_thread);
+ }
+
close_log (rspamd_main->logger);
exit (EXIT_SUCCESS);
}
diff --git a/src/util.h b/src/util.h
index 39473a85e..228fb7445 100644
--- a/src/util.h
+++ b/src/util.h
@@ -315,6 +315,16 @@ void rspamd_rwlock_reader_unlock (rspamd_rwlock_t *mtx);
*/
void rspamd_rwlock_free (rspamd_rwlock_t *mtx);
+static inline void
+rspamd_cond_wait (GCond *cond, rspamd_mutex_t *mtx)
+{
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION > 30))
+ g_cond_wait (cond, &mtx->mtx);
+#else
+ g_cond_wait (cond, g_static_mutex_get_mutex (&mtx->mtx));
+#endif
+}
+
/**
* Create new named thread
* @param name name pattern