summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2009-08-20 19:13:45 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2009-08-20 19:13:45 +0400
commitca72f2d5e8e90d6dfb61fe407e8b532894a885ac (patch)
treedb8af6c2c224aaac910eafde4e9e45c81f96853e /src
parent8b8b79dd4f5bb0e8fd7b5d6f5fcd0a8050502862 (diff)
downloadrspamd-ca72f2d5e8e90d6dfb61fe407e8b532894a885ac.tar.gz
rspamd-ca72f2d5e8e90d6dfb61fe407e8b532894a885ac.zip
* Fix process dispatcher
* Use bloom filter in fuzzy storage
Diffstat (limited to 'src')
-rw-r--r--src/fuzzy_storage.c21
-rw-r--r--src/main.c10
-rw-r--r--src/main.h5
3 files changed, 29 insertions, 7 deletions
diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c
index 01e7e8f44..97c33eed8 100644
--- a/src/fuzzy_storage.c
+++ b/src/fuzzy_storage.c
@@ -36,6 +36,7 @@
#include "modules.h"
#include "message.h"
#include "fuzzy.h"
+#include "bloom.h"
#include "fuzzy_storage.h"
/* This number is used as limit while comparing two fuzzy hashes, this value can vary from 0 to 100 */
@@ -50,6 +51,7 @@
#define BUCKETS 1024
static GQueue *hashes[BUCKETS];
+static bloom_filter_t *bf;
/* Number of cache modifications */
static uint32_t mods = 0;
@@ -121,6 +123,7 @@ sync_cache (struct rspamd_worker *wrk)
tmp = cur;
cur = g_list_next (cur);
g_queue_delete_link (hashes[i], tmp);
+ bloom_del (bf, node->h.hash_pipe);
g_free (node);
continue;
}
@@ -198,6 +201,7 @@ read_hashes_file (struct rspamd_worker *wrk)
break;
}
g_queue_push_head (hashes[node->h.block_size % BUCKETS], node);
+ bloom_add (bf, node->h.hash_pipe);
}
if (r > 0) {
@@ -229,6 +233,10 @@ process_check_command (struct fuzzy_cmd *cmd)
fuzzy_hash_t s;
int prob = 0;
+ if (!bloom_check (bf, cmd->hash)) {
+ return FALSE;
+ }
+
memcpy (s.hash_pipe, cmd->hash, sizeof (s.hash_pipe));
s.block_size = cmd->blocksize;
cur = hashes[cmd->blocksize % BUCKETS]->head;
@@ -252,11 +260,16 @@ process_write_command (struct fuzzy_cmd *cmd)
{
struct rspamd_fuzzy_node *h;
+ if (bloom_check (bf, cmd->hash)) {
+ return FALSE;
+ }
+
h = g_malloc (sizeof (struct rspamd_fuzzy_node));
memcpy (&h->h.hash_pipe, &cmd->hash, sizeof (cmd->hash));
h->h.block_size = cmd->blocksize;
h->time = (uint64_t)time (NULL);
g_queue_push_head (hashes[cmd->blocksize % BUCKETS], h);
+ bloom_add (bf, cmd->hash);
mods ++;
msg_info ("process_write_command: fuzzy hash was successfully added");
@@ -270,6 +283,10 @@ process_delete_command (struct fuzzy_cmd *cmd)
struct rspamd_fuzzy_node *h;
fuzzy_hash_t s;
gboolean res = FALSE;
+
+ if (!bloom_check (bf, cmd->hash)) {
+ return FALSE;
+ }
memcpy (s.hash_pipe, cmd->hash, sizeof (s.hash_pipe));
s.block_size = cmd->blocksize;
@@ -283,6 +300,7 @@ process_delete_command (struct fuzzy_cmd *cmd)
tmp = cur;
cur = g_list_next (cur);
g_queue_delete_link (hashes[cmd->blocksize % BUCKETS], tmp);
+ bloom_del (bf, cmd->hash);
msg_info ("process_delete_command: fuzzy hash was successfully deleted");
res = TRUE;
mods ++;
@@ -443,7 +461,8 @@ start_fuzzy_storage (struct rspamd_worker *worker)
/* Send SIGUSR2 to parent */
kill (getppid (), SIGUSR2);
-
+ /* Init bloom filter */
+ bf = bloom_create (20000000L, DEFAULT_BLOOM_HASHES);
/* Try to read hashes from file */
if (!read_hashes_file (worker)) {
msg_err ("read_hashes_file: cannot read hashes file, it can be created after save procedure");
diff --git a/src/main.c b/src/main.c
index 40c4c68f8..4353174f8 100644
--- a/src/main.c
+++ b/src/main.c
@@ -287,6 +287,7 @@ fork_worker (struct rspamd_main *rspamd, struct worker_conf *cf)
cur->type = cf->type;
cur->pid = fork();
cur->cf = cf;
+ cur->pending = FALSE;
switch (cur->pid) {
case 0:
/* Drop privilleges */
@@ -717,7 +718,7 @@ main (int argc, char **argv, char **env)
else {
if (WIFSIGNALED (res)) {
msg_warn ("main: %s process %d terminated abnormally by signal: %d",
- (cur->type != TYPE_WORKER) ? "controller" : "worker",
+ (cur->type == TYPE_CONTROLLER) ? "controller" : "worker",
cur->pid, WTERMSIG(res));
}
else {
@@ -735,13 +736,15 @@ main (int argc, char **argv, char **env)
do_restart = 0;
do_reopen_log = 1;
- msg_info ("main: rspamd "RVERSION " is restarting");
+ msg_info ("main: rspamd " RVERSION " is restarting");
if (active_worker == NULL) {
/* reread_config (rspamd); */
TAILQ_FOREACH_SAFE (cur, &rspamd->workers, next, cur_tmp) {
if (cur->type == TYPE_WORKER || cur->type == TYPE_LMTP || cur->type == TYPE_FUZZY) {
/* Start new workers that would reread configuration */
+ cur->pending = FALSE;
active_worker = fork_worker (rspamd, cur->cf);
+ active_worker->pending = TRUE;
}
/* Immideately send termination request to conroller and wait for SIGCHLD */
if (cur->type == TYPE_CONTROLLER) {
@@ -758,13 +761,12 @@ main (int argc, char **argv, char **env)
if (active_worker != NULL) {
msg_info ("main: worker process %d has been successfully started", active_worker->pid);
TAILQ_FOREACH_SAFE (cur, &rspamd->workers, next, cur_tmp) {
- if (cur != active_worker && !cur->is_dying && cur->type != TYPE_CONTROLLER) {
+ if (!cur->pending && !cur->is_dying && cur->type != TYPE_CONTROLLER) {
/* Send to old workers SIGUSR2 */
kill (cur->pid, SIGUSR2);
cur->is_dying = 1;
}
}
- active_worker = NULL;
}
}
if (got_alarm) {
diff --git a/src/main.h b/src/main.h
index ee34a2cae..ef9a29537 100644
--- a/src/main.h
+++ b/src/main.h
@@ -56,8 +56,9 @@ enum process_type {
*/
struct rspamd_worker {
pid_t pid; /**< pid of worker */
- char is_initialized; /**< is initialized */
- char is_dying; /**< if worker is going to shutdown */
+ gboolean is_initialized; /**< is initialized */
+ gboolean is_dying; /**< if worker is going to shutdown */
+ gboolean pending; /**< if worker is pending to run */
struct rspamd_main *srv; /**< pointer to server structure */
enum process_type type; /**< process type */
struct event sig_ev; /**< signals event */