From def7ea2c8ba62a067b63f3be65bfd44147da315f Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Tue, 11 Jun 2013 00:16:27 +0100 Subject: [PATCH] Update fuzzy storage in a separate thread. --- src/fuzzy_storage.c | 243 +++++++++++++++++++++++++++++--------------- src/util.h | 10 ++ 2 files changed, 170 insertions(+), 83 deletions(-) diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c index bedf473ac..53bde37bf 100644 --- a/src/fuzzy_storage.c +++ b/src/fuzzy_storage.c @@ -73,7 +73,7 @@ worker_t fuzzy_worker = { start_fuzzy, /* Start function */ FALSE, /* No socket */ TRUE, /* Unique */ - FALSE, /* Non threaded */ + TRUE, /* Threaded */ FALSE /* Non killable */ }; @@ -90,6 +90,7 @@ static guint32 mods = 0; static struct timeval tmv; static struct event tev; static struct rspamd_stat *server_stat; +static sig_atomic_t wanna_die = 0; struct rspamd_fuzzy_storage_ctx { gboolean use_judy; @@ -100,6 +101,10 @@ struct rspamd_fuzzy_storage_ctx { radix_tree_t *update_ips; gchar *update_map; struct event_base *ev_base; + rspamd_rwlock_t *tree_lock; + rspamd_mutex_t *update_mtx; + GCond *update_cond; + GThread *update_thread; }; struct rspamd_fuzzy_node { @@ -149,113 +154,142 @@ compare_nodes (gconstpointer a, gconstpointer b, gpointer unused) return n1->value - n2->value; } -static void -sync_cache (struct rspamd_worker *wrk) +static gpointer +sync_cache (gpointer ud) { + struct rspamd_worker *wrk = ud; gint fd, i; gchar *filename, header[4]; GList *cur, *tmp; struct rspamd_fuzzy_node *node; guint64 expire, now; - struct rspamd_fuzzy_storage_ctx *ctx = wrk->ctx; + struct rspamd_fuzzy_storage_ctx *ctx; #ifdef WITH_JUDY PPvoid_t pvalue; gchar indexbuf[1024], tmpindex[1024]; #endif - /* Check for modifications */ - if (mods < ctx->max_mods) { - return; - } + ctx = wrk->ctx; - msg_info ("syncing fuzzy hash storage"); - filename = ctx->hashfile; - if (filename == NULL) { - return; - } - expire = ctx->expire; - - if ((fd = open (filename, O_WRONLY | O_TRUNC | O_CREAT, S_IWUSR | S_IRUSR | S_IRGRP | S_IROTH)) == -1) { - msg_err ("cannot create hash file %s: %s", filename, strerror (errno)); - return; - } + for (;;) { - (void)lock_file (fd, FALSE); + rspamd_mutex_lock (ctx->update_mtx); - now = (guint64) time (NULL); - - /* Fill header */ - memcpy (header, FUZZY_FILE_MAGIC, 3); - header[3] = (gchar)CURRENT_FUZZY_VERSION; - if (write (fd, header, sizeof (header)) == -1) { - msg_err ("cannot write file %s while writing header: %s", filename, strerror (errno)); - goto end; - } + /* Check for modifications */ + while (mods < ctx->max_mods && !wanna_die) { + rspamd_cond_wait (ctx->update_cond, ctx->update_mtx); + } + + msg_info ("syncing fuzzy hash storage"); + filename = ctx->hashfile; + if (filename == NULL ) { + rspamd_mutex_unlock (ctx->update_mtx); + continue; + } + expire = ctx->expire; + + if ((fd = open (filename, O_WRONLY | O_TRUNC | O_CREAT, + S_IWUSR | S_IRUSR | S_IRGRP | S_IROTH)) == -1) { + msg_err( + "cannot create hash file %s: %s", filename, strerror (errno)); + rspamd_mutex_unlock (ctx->update_mtx); + continue; + } + + (void) lock_file (fd, FALSE); + + now = (guint64) time (NULL ); + + /* Fill header */ + memcpy (header, FUZZY_FILE_MAGIC, 3); + header[3] = (gchar) CURRENT_FUZZY_VERSION; + if (write (fd, header, sizeof(header)) == -1) { + msg_err( + "cannot write file %s while writing header: %s", filename, strerror (errno)); + goto end; + } #ifdef WITH_JUDY - if (ctx->use_judy) { - indexbuf[0] = '\0'; - pvalue = JudySLFirst (jtree, indexbuf, PJE0); - while (pvalue) { - node = *((struct rspamd_fuzzy_node **)pvalue); - if (now - node->time > expire) { - /* Remove expired item */ - rspamd_strlcpy (tmpindex, indexbuf, sizeof (tmpindex)); - pvalue = JudySLNext (jtree, tmpindex, PJE0); - JudySLDel (&jtree, indexbuf, PJE0); - rspamd_strlcpy (indexbuf, tmpindex, sizeof (indexbuf)); - bloom_del (bf, node->h.hash_pipe); - server_stat->fuzzy_hashes_expired ++; - server_stat->fuzzy_hashes --; - g_slice_free1 (sizeof (struct rspamd_fuzzy_node), node); - continue; - } - if (write (fd, node, sizeof (struct rspamd_fuzzy_node)) == -1) { - msg_err ("cannot write file %s: %s", filename, strerror (errno)); - goto end; + if (ctx->use_judy) { + rspamd_rwlock_reader_lock (ctx->tree_lock); + indexbuf[0] = '\0'; + pvalue = JudySLFirst (jtree, indexbuf, PJE0); + while (pvalue) { + node = *((struct rspamd_fuzzy_node **)pvalue); + if (now - node->time > expire) { +#if 0 + /* Remove expired item */ + rspamd_strlcpy (tmpindex, indexbuf, sizeof (tmpindex)); + pvalue = JudySLNext (jtree, tmpindex, PJE0); + JudySLDel (&jtree, indexbuf, PJE0); + rspamd_strlcpy (indexbuf, tmpindex, sizeof (indexbuf)); + bloom_del (bf, node->h.hash_pipe); + server_stat->fuzzy_hashes_expired ++; + server_stat->fuzzy_hashes --; + g_slice_free1 (sizeof (struct rspamd_fuzzy_node), node); +#endif + continue; + } + if (write (fd, node, sizeof (struct rspamd_fuzzy_node)) == -1) { + msg_err ("cannot write file %s: %s", filename, strerror (errno)); + goto end; + } + pvalue = JudySLNext (jtree, indexbuf, PJE0); } - pvalue = JudySLNext (jtree, indexbuf, PJE0); + rspamd_rwlock_reader_unlock (ctx->tree_lock); } - } - else { + else { #endif - cur = frequent->head; - while (cur) { - node = cur->data; - if (write (fd, node, sizeof (struct rspamd_fuzzy_node)) == -1) { - msg_err ("cannot write file %s: %s", filename, strerror (errno)); - } - cur = g_list_next (cur); - } - for (i = 0; i < BUCKETS; i++) { - cur = hashes[i]->head; + rspamd_rwlock_reader_lock (ctx->tree_lock); + cur = frequent->head; while (cur) { node = cur->data; - if (now - node->time > expire) { - /* Remove expired item */ - tmp = cur; - cur = g_list_next (cur); - g_queue_delete_link (hashes[i], tmp); - bloom_del (bf, node->h.hash_pipe); - server_stat->fuzzy_hashes_expired ++; - server_stat->fuzzy_hashes --; - g_slice_free1 (sizeof (struct rspamd_fuzzy_node), node); - continue; - } - if (write (fd, node, sizeof (struct rspamd_fuzzy_node)) == -1) { - msg_err ("cannot write file %s: %s", filename, strerror (errno)); - goto end; + if (write (fd, node, sizeof(struct rspamd_fuzzy_node)) == -1) { + msg_err("cannot write file %s: %s", filename, strerror (errno)); } cur = g_list_next (cur); } - } + for (i = 0; i < BUCKETS; i++) { + cur = hashes[i]->head; + while (cur) { + node = cur->data; + if (now - node->time > expire) { +#if 0 + /* Remove expired item */ + tmp = cur; + cur = g_list_next (cur); + g_queue_delete_link (hashes[i], tmp); + bloom_del (bf, node->h.hash_pipe); + server_stat->fuzzy_hashes_expired++; + server_stat->fuzzy_hashes--; + g_slice_free1 (sizeof(struct rspamd_fuzzy_node), node); +#endif + continue; + } + if (write (fd, node, sizeof(struct rspamd_fuzzy_node)) == -1) { + msg_err( + "cannot write file %s: %s", filename, strerror (errno)); + goto end; + } + cur = g_list_next (cur); + } + } + rspamd_rwlock_reader_unlock (ctx->tree_lock); #ifdef WITH_JUDY } #endif end: - (void)unlock_file (fd, FALSE); - close (fd); + (void) unlock_file (fd, FALSE); + close (fd); + + rspamd_mutex_unlock (ctx->update_mtx); + if (wanna_die) { + break; + } + } + + return NULL; } static void @@ -273,8 +307,13 @@ sigterm_handler (gint fd, short what, void *arg) event_del (&worker->sig_ev_usr2); event_del (&worker->bind_ev); close (worker->cf->listen_sock); + + rspamd_mutex_lock (ctx->update_mtx); mods = ctx->max_mods + 1; - sync_cache (worker); + wanna_die = 1; + g_cond_signal (ctx->update_cond); + rspamd_mutex_unlock (ctx->update_mtx); + (void)event_base_loopexit (ctx->ev_base, &tv); } @@ -297,8 +336,11 @@ sigusr2_handler (gint fd, short what, void *arg) event_del (&worker->bind_ev); close (worker->cf->listen_sock); msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME); + rspamd_mutex_lock (ctx->update_mtx); mods = ctx->max_mods + 1; - sync_cache (worker); + wanna_die = 1; + g_cond_signal (ctx->update_cond); + rspamd_mutex_unlock (ctx->update_mtx); event_base_loopexit (ctx->ev_base, &tv); return; @@ -457,6 +499,7 @@ check_hash_node (GQueue *hash, fuzzy_hash_t *s, gint update_value, struct rspamd GList *cur; struct rspamd_fuzzy_node *h; gint prob = 0; + #ifdef WITH_JUDY PPvoid_t pvalue; @@ -529,7 +572,9 @@ process_check_command (struct fuzzy_cmd *cmd, gint *flag, struct rspamd_fuzzy_st memcpy (s.hash_pipe, cmd->hash, sizeof (s.hash_pipe)); s.block_size = cmd->blocksize; + rspamd_rwlock_reader_lock (ctx->tree_lock); h = check_hash_node (hashes[cmd->blocksize % BUCKETS], &s, 0, ctx); + rspamd_rwlock_reader_unlock (ctx->tree_lock); if (h == NULL) { return 0; @@ -544,12 +589,17 @@ static gboolean update_hash (struct fuzzy_cmd *cmd, struct rspamd_fuzzy_storage_ctx *ctx) { fuzzy_hash_t s; + gboolean r; memcpy (s.hash_pipe, cmd->hash, sizeof (s.hash_pipe)); s.block_size = cmd->blocksize; mods ++; - return check_hash_node (hashes[cmd->blocksize % BUCKETS], &s, cmd->value, ctx) != NULL; + rspamd_rwlock_writer_lock (ctx->tree_lock); + r = check_hash_node (hashes[cmd->blocksize % BUCKETS], &s, cmd->value, ctx) != NULL; + rspamd_rwlock_writer_unlock (ctx->tree_lock); + + return r; } static gboolean @@ -566,6 +616,7 @@ process_write_command (struct fuzzy_cmd *cmd, struct rspamd_fuzzy_storage_ctx *c } } + rspamd_rwlock_writer_lock (ctx->tree_lock); h = g_slice_alloc (sizeof (struct rspamd_fuzzy_node)); memcpy (&h->h.hash_pipe, &cmd->hash, sizeof (cmd->hash)); h->h.block_size = cmd->blocksize; @@ -584,6 +635,7 @@ process_write_command (struct fuzzy_cmd *cmd, struct rspamd_fuzzy_storage_ctx *c #ifdef WITH_JUDY } #endif + rspamd_rwlock_writer_unlock (ctx->tree_lock); bloom_add (bf, cmd->hash); mods++; server_stat->fuzzy_hashes ++; @@ -603,6 +655,7 @@ delete_hash (GQueue *hash, fuzzy_hash_t *s, struct rspamd_fuzzy_storage_ctx *ctx gpointer data; if (ctx->use_judy) { + rspamd_rwlock_writer_lock (ctx->tree_lock); pvalue = JudySLGet (jtree, s->hash_pipe, PJE0); if (pvalue) { data = *pvalue; @@ -613,9 +666,11 @@ delete_hash (GQueue *hash, fuzzy_hash_t *s, struct rspamd_fuzzy_storage_ctx *ctx server_stat->fuzzy_hashes --; mods++; } + rspamd_rwlock_writer_unlock (ctx->tree_lock); } else { #endif + rspamd_rwlock_writer_lock (ctx->tree_lock); cur = hash->head; /* XXX: too slow way */ @@ -635,6 +690,7 @@ delete_hash (GQueue *hash, fuzzy_hash_t *s, struct rspamd_fuzzy_storage_ctx *ctx } cur = g_list_next (cur); } + rspamd_rwlock_writer_unlock (ctx->tree_lock); #ifdef WITH_JUDY } #endif @@ -837,7 +893,9 @@ sync_callback (gint fd, short what, void *arg) tmv.tv_usec = 0; evtimer_add (&tev, &tmv); - sync_cache (worker); + rspamd_mutex_lock (ctx->update_mtx); + g_cond_signal (ctx->update_cond); + rspamd_mutex_unlock (ctx->update_mtx); } static gboolean @@ -877,6 +935,14 @@ init_fuzzy (void) ctx->max_mods = DEFAULT_MOD_LIMIT; ctx->frequent_score = DEFAULT_FREQUENT_SCORE; ctx->expire = DEFAULT_EXPIRE; + ctx->tree_lock = rspamd_rwlock_new (); + ctx->update_mtx = rspamd_mutex_new (); +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION > 30)) + ctx->update_cond = g_malloc0 (sizeof (GCond)); + g_cond_init (ctx->update_cond); +#else + ctx->update_cond = g_cond_new (); +#endif register_worker_opt (type, "hashfile", xml_handle_string, ctx, G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, hashfile)); @@ -904,6 +970,7 @@ start_fuzzy (struct rspamd_worker *worker) struct event sev; gint retries = 0; struct rspamd_fuzzy_storage_ctx *ctx = worker->ctx; + GError *err = NULL; worker->srv->pid = getpid (); @@ -971,7 +1038,17 @@ start_fuzzy (struct rspamd_worker *worker) gperf_profiler_init (worker->srv->cfg, "fuzzy"); + ctx->update_thread = rspamd_create_thread ("fuzzy update", sync_cache, worker, &err); + if (ctx->update_thread == NULL) { + msg_err ("error creating update thread: %s", err->message); + } + event_base_loop (ctx->ev_base, 0); + + if (ctx->update_thread != NULL) { + g_thread_join (ctx->update_thread); + } + close_log (rspamd_main->logger); exit (EXIT_SUCCESS); } diff --git a/src/util.h b/src/util.h index 39473a85e..228fb7445 100644 --- a/src/util.h +++ b/src/util.h @@ -315,6 +315,16 @@ void rspamd_rwlock_reader_unlock (rspamd_rwlock_t *mtx); */ void rspamd_rwlock_free (rspamd_rwlock_t *mtx); +static inline void +rspamd_cond_wait (GCond *cond, rspamd_mutex_t *mtx) +{ +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION > 30)) + g_cond_wait (cond, &mtx->mtx); +#else + g_cond_wait (cond, g_static_mutex_get_mutex (&mtx->mtx)); +#endif +} + /** * Create new named thread * @param name name pattern -- 2.39.5