Browse Source

Update fuzzy storage in a separate thread.

tags/0.6.0
Vsevolod Stakhov 11 years ago
parent
commit
def7ea2c8b
2 changed files with 170 additions and 83 deletions
  1. 160
    83
      src/fuzzy_storage.c
  2. 10
    0
      src/util.h

+ 160
- 83
src/fuzzy_storage.c View File

@@ -73,7 +73,7 @@ worker_t fuzzy_worker = {
start_fuzzy, /* Start function */
FALSE, /* No socket */
TRUE, /* Unique */
FALSE, /* Non threaded */
TRUE, /* Threaded */
FALSE /* Non killable */
};

@@ -90,6 +90,7 @@ static guint32 mods = 0;
static struct timeval tmv;
static struct event tev;
static struct rspamd_stat *server_stat;
static sig_atomic_t wanna_die = 0;

struct rspamd_fuzzy_storage_ctx {
gboolean use_judy;
@@ -100,6 +101,10 @@ struct rspamd_fuzzy_storage_ctx {
radix_tree_t *update_ips;
gchar *update_map;
struct event_base *ev_base;
rspamd_rwlock_t *tree_lock;
rspamd_mutex_t *update_mtx;
GCond *update_cond;
GThread *update_thread;
};

struct rspamd_fuzzy_node {
@@ -149,113 +154,142 @@ compare_nodes (gconstpointer a, gconstpointer b, gpointer unused)
return n1->value - n2->value;
}

static void
sync_cache (struct rspamd_worker *wrk)
static gpointer
sync_cache (gpointer ud)
{
struct rspamd_worker *wrk = ud;
gint fd, i;
gchar *filename, header[4];
GList *cur, *tmp;
struct rspamd_fuzzy_node *node;
guint64 expire, now;
struct rspamd_fuzzy_storage_ctx *ctx = wrk->ctx;
struct rspamd_fuzzy_storage_ctx *ctx;
#ifdef WITH_JUDY
PPvoid_t pvalue;
gchar indexbuf[1024], tmpindex[1024];
#endif

/* Check for modifications */
if (mods < ctx->max_mods) {
return;
}
ctx = wrk->ctx;

msg_info ("syncing fuzzy hash storage");
filename = ctx->hashfile;
if (filename == NULL) {
return;
}
expire = ctx->expire;
if ((fd = open (filename, O_WRONLY | O_TRUNC | O_CREAT, S_IWUSR | S_IRUSR | S_IRGRP | S_IROTH)) == -1) {
msg_err ("cannot create hash file %s: %s", filename, strerror (errno));
return;
}
for (;;) {

(void)lock_file (fd, FALSE);
rspamd_mutex_lock (ctx->update_mtx);

now = (guint64) time (NULL);
/* Fill header */
memcpy (header, FUZZY_FILE_MAGIC, 3);
header[3] = (gchar)CURRENT_FUZZY_VERSION;
if (write (fd, header, sizeof (header)) == -1) {
msg_err ("cannot write file %s while writing header: %s", filename, strerror (errno));
goto end;
}
/* Check for modifications */
while (mods < ctx->max_mods && !wanna_die) {
rspamd_cond_wait (ctx->update_cond, ctx->update_mtx);
}

msg_info ("syncing fuzzy hash storage");
filename = ctx->hashfile;
if (filename == NULL ) {
rspamd_mutex_unlock (ctx->update_mtx);
continue;
}
expire = ctx->expire;

if ((fd = open (filename, O_WRONLY | O_TRUNC | O_CREAT,
S_IWUSR | S_IRUSR | S_IRGRP | S_IROTH)) == -1) {
msg_err(
"cannot create hash file %s: %s", filename, strerror (errno));
rspamd_mutex_unlock (ctx->update_mtx);
continue;
}

(void) lock_file (fd, FALSE);

now = (guint64) time (NULL );

/* Fill header */
memcpy (header, FUZZY_FILE_MAGIC, 3);
header[3] = (gchar) CURRENT_FUZZY_VERSION;
if (write (fd, header, sizeof(header)) == -1) {
msg_err(
"cannot write file %s while writing header: %s", filename, strerror (errno));
goto end;
}

#ifdef WITH_JUDY
if (ctx->use_judy) {
indexbuf[0] = '\0';
pvalue = JudySLFirst (jtree, indexbuf, PJE0);
while (pvalue) {
node = *((struct rspamd_fuzzy_node **)pvalue);
if (now - node->time > expire) {
/* Remove expired item */
rspamd_strlcpy (tmpindex, indexbuf, sizeof (tmpindex));
pvalue = JudySLNext (jtree, tmpindex, PJE0);
JudySLDel (&jtree, indexbuf, PJE0);
rspamd_strlcpy (indexbuf, tmpindex, sizeof (indexbuf));
bloom_del (bf, node->h.hash_pipe);
server_stat->fuzzy_hashes_expired ++;
server_stat->fuzzy_hashes --;
g_slice_free1 (sizeof (struct rspamd_fuzzy_node), node);
continue;
}
if (write (fd, node, sizeof (struct rspamd_fuzzy_node)) == -1) {
msg_err ("cannot write file %s: %s", filename, strerror (errno));
goto end;
if (ctx->use_judy) {
rspamd_rwlock_reader_lock (ctx->tree_lock);
indexbuf[0] = '\0';
pvalue = JudySLFirst (jtree, indexbuf, PJE0);
while (pvalue) {
node = *((struct rspamd_fuzzy_node **)pvalue);
if (now - node->time > expire) {
#if 0
/* Remove expired item */
rspamd_strlcpy (tmpindex, indexbuf, sizeof (tmpindex));
pvalue = JudySLNext (jtree, tmpindex, PJE0);
JudySLDel (&jtree, indexbuf, PJE0);
rspamd_strlcpy (indexbuf, tmpindex, sizeof (indexbuf));
bloom_del (bf, node->h.hash_pipe);
server_stat->fuzzy_hashes_expired ++;
server_stat->fuzzy_hashes --;
g_slice_free1 (sizeof (struct rspamd_fuzzy_node), node);
#endif
continue;
}
if (write (fd, node, sizeof (struct rspamd_fuzzy_node)) == -1) {
msg_err ("cannot write file %s: %s", filename, strerror (errno));
goto end;
}
pvalue = JudySLNext (jtree, indexbuf, PJE0);
}
pvalue = JudySLNext (jtree, indexbuf, PJE0);
rspamd_rwlock_reader_unlock (ctx->tree_lock);
}
}
else {
else {
#endif
cur = frequent->head;
while (cur) {
node = cur->data;
if (write (fd, node, sizeof (struct rspamd_fuzzy_node)) == -1) {
msg_err ("cannot write file %s: %s", filename, strerror (errno));
}
cur = g_list_next (cur);
}
for (i = 0; i < BUCKETS; i++) {
cur = hashes[i]->head;
rspamd_rwlock_reader_lock (ctx->tree_lock);
cur = frequent->head;
while (cur) {
node = cur->data;
if (now - node->time > expire) {
/* Remove expired item */
tmp = cur;
cur = g_list_next (cur);
g_queue_delete_link (hashes[i], tmp);
bloom_del (bf, node->h.hash_pipe);
server_stat->fuzzy_hashes_expired ++;
server_stat->fuzzy_hashes --;
g_slice_free1 (sizeof (struct rspamd_fuzzy_node), node);
continue;
}
if (write (fd, node, sizeof (struct rspamd_fuzzy_node)) == -1) {
msg_err ("cannot write file %s: %s", filename, strerror (errno));
goto end;
if (write (fd, node, sizeof(struct rspamd_fuzzy_node)) == -1) {
msg_err("cannot write file %s: %s", filename, strerror (errno));
}
cur = g_list_next (cur);
}
}
for (i = 0; i < BUCKETS; i++) {
cur = hashes[i]->head;
while (cur) {
node = cur->data;
if (now - node->time > expire) {
#if 0
/* Remove expired item */
tmp = cur;
cur = g_list_next (cur);
g_queue_delete_link (hashes[i], tmp);
bloom_del (bf, node->h.hash_pipe);
server_stat->fuzzy_hashes_expired++;
server_stat->fuzzy_hashes--;
g_slice_free1 (sizeof(struct rspamd_fuzzy_node), node);
#endif
continue;
}
if (write (fd, node, sizeof(struct rspamd_fuzzy_node)) == -1) {
msg_err(
"cannot write file %s: %s", filename, strerror (errno));
goto end;
}
cur = g_list_next (cur);
}
}
rspamd_rwlock_reader_unlock (ctx->tree_lock);
#ifdef WITH_JUDY
}
#endif

end:
(void)unlock_file (fd, FALSE);
close (fd);
(void) unlock_file (fd, FALSE);
close (fd);

rspamd_mutex_unlock (ctx->update_mtx);
if (wanna_die) {
break;
}
}

return NULL;
}

static void
@@ -273,8 +307,13 @@ sigterm_handler (gint fd, short what, void *arg)
event_del (&worker->sig_ev_usr2);
event_del (&worker->bind_ev);
close (worker->cf->listen_sock);

rspamd_mutex_lock (ctx->update_mtx);
mods = ctx->max_mods + 1;
sync_cache (worker);
wanna_die = 1;
g_cond_signal (ctx->update_cond);
rspamd_mutex_unlock (ctx->update_mtx);

(void)event_base_loopexit (ctx->ev_base, &tv);
}

@@ -297,8 +336,11 @@ sigusr2_handler (gint fd, short what, void *arg)
event_del (&worker->bind_ev);
close (worker->cf->listen_sock);
msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
rspamd_mutex_lock (ctx->update_mtx);
mods = ctx->max_mods + 1;
sync_cache (worker);
wanna_die = 1;
g_cond_signal (ctx->update_cond);
rspamd_mutex_unlock (ctx->update_mtx);

event_base_loopexit (ctx->ev_base, &tv);
return;
@@ -457,6 +499,7 @@ check_hash_node (GQueue *hash, fuzzy_hash_t *s, gint update_value, struct rspamd
GList *cur;
struct rspamd_fuzzy_node *h;
gint prob = 0;

#ifdef WITH_JUDY
PPvoid_t pvalue;

@@ -529,7 +572,9 @@ process_check_command (struct fuzzy_cmd *cmd, gint *flag, struct rspamd_fuzzy_st
memcpy (s.hash_pipe, cmd->hash, sizeof (s.hash_pipe));
s.block_size = cmd->blocksize;

rspamd_rwlock_reader_lock (ctx->tree_lock);
h = check_hash_node (hashes[cmd->blocksize % BUCKETS], &s, 0, ctx);
rspamd_rwlock_reader_unlock (ctx->tree_lock);

if (h == NULL) {
return 0;
@@ -544,12 +589,17 @@ static gboolean
update_hash (struct fuzzy_cmd *cmd, struct rspamd_fuzzy_storage_ctx *ctx)
{
fuzzy_hash_t s;
gboolean r;

memcpy (s.hash_pipe, cmd->hash, sizeof (s.hash_pipe));
s.block_size = cmd->blocksize;
mods ++;

return check_hash_node (hashes[cmd->blocksize % BUCKETS], &s, cmd->value, ctx) != NULL;
rspamd_rwlock_writer_lock (ctx->tree_lock);
r = check_hash_node (hashes[cmd->blocksize % BUCKETS], &s, cmd->value, ctx) != NULL;
rspamd_rwlock_writer_unlock (ctx->tree_lock);

return r;
}

static gboolean
@@ -566,6 +616,7 @@ process_write_command (struct fuzzy_cmd *cmd, struct rspamd_fuzzy_storage_ctx *c
}
}

rspamd_rwlock_writer_lock (ctx->tree_lock);
h = g_slice_alloc (sizeof (struct rspamd_fuzzy_node));
memcpy (&h->h.hash_pipe, &cmd->hash, sizeof (cmd->hash));
h->h.block_size = cmd->blocksize;
@@ -584,6 +635,7 @@ process_write_command (struct fuzzy_cmd *cmd, struct rspamd_fuzzy_storage_ctx *c
#ifdef WITH_JUDY
}
#endif
rspamd_rwlock_writer_unlock (ctx->tree_lock);
bloom_add (bf, cmd->hash);
mods++;
server_stat->fuzzy_hashes ++;
@@ -603,6 +655,7 @@ delete_hash (GQueue *hash, fuzzy_hash_t *s, struct rspamd_fuzzy_storage_ctx *ctx
gpointer data;

if (ctx->use_judy) {
rspamd_rwlock_writer_lock (ctx->tree_lock);
pvalue = JudySLGet (jtree, s->hash_pipe, PJE0);
if (pvalue) {
data = *pvalue;
@@ -613,9 +666,11 @@ delete_hash (GQueue *hash, fuzzy_hash_t *s, struct rspamd_fuzzy_storage_ctx *ctx
server_stat->fuzzy_hashes --;
mods++;
}
rspamd_rwlock_writer_unlock (ctx->tree_lock);
}
else {
#endif
rspamd_rwlock_writer_lock (ctx->tree_lock);
cur = hash->head;

/* XXX: too slow way */
@@ -635,6 +690,7 @@ delete_hash (GQueue *hash, fuzzy_hash_t *s, struct rspamd_fuzzy_storage_ctx *ctx
}
cur = g_list_next (cur);
}
rspamd_rwlock_writer_unlock (ctx->tree_lock);
#ifdef WITH_JUDY
}
#endif
@@ -837,7 +893,9 @@ sync_callback (gint fd, short what, void *arg)
tmv.tv_usec = 0;
evtimer_add (&tev, &tmv);

sync_cache (worker);
rspamd_mutex_lock (ctx->update_mtx);
g_cond_signal (ctx->update_cond);
rspamd_mutex_unlock (ctx->update_mtx);
}

static gboolean
@@ -877,6 +935,14 @@ init_fuzzy (void)
ctx->max_mods = DEFAULT_MOD_LIMIT;
ctx->frequent_score = DEFAULT_FREQUENT_SCORE;
ctx->expire = DEFAULT_EXPIRE;
ctx->tree_lock = rspamd_rwlock_new ();
ctx->update_mtx = rspamd_mutex_new ();
#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION > 30))
ctx->update_cond = g_malloc0 (sizeof (GCond));
g_cond_init (ctx->update_cond);
#else
ctx->update_cond = g_cond_new ();
#endif

register_worker_opt (type, "hashfile", xml_handle_string, ctx,
G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, hashfile));
@@ -904,6 +970,7 @@ start_fuzzy (struct rspamd_worker *worker)
struct event sev;
gint retries = 0;
struct rspamd_fuzzy_storage_ctx *ctx = worker->ctx;
GError *err = NULL;

worker->srv->pid = getpid ();

@@ -971,7 +1038,17 @@ start_fuzzy (struct rspamd_worker *worker)

gperf_profiler_init (worker->srv->cfg, "fuzzy");

ctx->update_thread = rspamd_create_thread ("fuzzy update", sync_cache, worker, &err);
if (ctx->update_thread == NULL) {
msg_err ("error creating update thread: %s", err->message);
}

event_base_loop (ctx->ev_base, 0);

if (ctx->update_thread != NULL) {
g_thread_join (ctx->update_thread);
}

close_log (rspamd_main->logger);
exit (EXIT_SUCCESS);
}

+ 10
- 0
src/util.h View File

@@ -315,6 +315,16 @@ void rspamd_rwlock_reader_unlock (rspamd_rwlock_t *mtx);
*/
void rspamd_rwlock_free (rspamd_rwlock_t *mtx);

/**
 * Wait on a GLib condition variable using an rspamd mutex wrapper
 * @param cond condition to wait on
 * @param mtx rspamd mutex whose underlying GLib mutex is passed to g_cond_wait
 */
static inline void
rspamd_cond_wait (GCond *cond, rspamd_mutex_t *mtx)
{
	GMutex *native;

	/* Obtain the GMutex pointer in the way this GLib version requires */
#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION > 30))
	native = &mtx->mtx;
#else
	native = g_static_mutex_get_mutex (&mtx->mtx);
#endif

	g_cond_wait (cond, native);
}

/**
* Create new named thread
* @param name name pattern

Loading…
Cancel
Save