Explorar el Código

Add lmdb initialization.

tags/0.8.0
Vsevolod Stakhov hace 9 años
padre
commit
e10dd0b3e9
Se han modificado 2 ficheros con 144 adiciones y 79 borrados
  1. 1
    0
      src/CMakeLists.txt
  2. 143
    79
      src/fuzzy_storage.c

+ 1
- 0
src/CMakeLists.txt Ver fichero

@@ -109,6 +109,7 @@ ENDIF(HAVE_LIBEVENT2)
IF(WITH_DB)
TARGET_LINK_LIBRARIES(rspamd db)
ENDIF(WITH_DB)
TARGET_LINK_LIBRARIES(rspamd lmdb)

IF(OPENSSL_FOUND)
TARGET_LINK_LIBRARIES(rspamd ${OPENSSL_LIBRARIES})

+ 143
- 79
src/fuzzy_storage.c Ver fichero

@@ -101,6 +101,9 @@ struct rspamd_fuzzy_storage_ctx {
rspamd_mutex_t *update_mtx;
GCond *update_cond;
GThread *update_thread;

/* lmdb interface */
MDB_env *env;
};

struct rspamd_legacy_fuzzy_node {
@@ -122,6 +125,12 @@ struct fuzzy_session {

extern sig_atomic_t wanna_die;

static GQuark
rspamd_fuzzy_quark(void)
{
return g_quark_from_static_string ("fuzzy-storage");
}

static void
legacy_fuzzy_node_free (gpointer n)
{
@@ -156,7 +165,7 @@ legacy_expire_nodes (gpointer *to_expire, gint expired_num,
}

static gpointer
legacy_sync_cache (gpointer ud)
rspamd_fuzzy_storage_sync_cb (gpointer ud)
{
static const int max_expired = 8192;
struct rspamd_worker *wrk = ud;
@@ -170,7 +179,7 @@ legacy_sync_cache (gpointer ud)

ctx = wrk->ctx;

for (;; ) {
for (;;) {

rspamd_mutex_lock (ctx->update_mtx);

@@ -180,79 +189,86 @@ legacy_sync_cache (gpointer ud)
}

msg_info ("syncing fuzzy hash storage");
filename = ctx->hashfile;
if (filename == NULL ) {
rspamd_mutex_unlock (ctx->update_mtx);
if (wanna_die) {
return NULL;
if (ctx->legacy) {
filename = ctx->hashfile;
if (filename == NULL ) {
rspamd_mutex_unlock (ctx->update_mtx);
if (wanna_die) {
return NULL;
}
continue;
}
continue;
}
expire = ctx->expire;
expire = ctx->expire;

if ((fd = open (filename, O_WRONLY | O_TRUNC | O_CREAT,
S_IWUSR | S_IRUSR | S_IRGRP | S_IROTH)) == -1) {
msg_err (
"cannot create hash file %s: %s", filename, strerror (errno));
rspamd_mutex_unlock (ctx->update_mtx);
if (wanna_die) {
return NULL;
if ((fd = open (filename, O_WRONLY | O_TRUNC | O_CREAT,
S_IWUSR | S_IRUSR | S_IRGRP | S_IROTH)) == -1) {
msg_err (
"cannot create hash file %s: %s", filename,
strerror (errno));
rspamd_mutex_unlock (ctx->update_mtx);
if (wanna_die) {
return NULL;
}
continue;
}
continue;
}

(void) rspamd_file_lock (fd, FALSE);
(void) rspamd_file_lock (fd, FALSE);

now = (guint64) time (NULL );
now = (guint64) time (NULL );

/* Fill header */
memcpy (header, FUZZY_FILE_MAGIC, 3);
header[3] = (gchar) CURRENT_FUZZY_VERSION;
if (write (fd, header, sizeof(header)) == -1) {
msg_err (
"cannot write file %s while writing header: %s",
filename,
strerror (errno));
goto end;
}

rspamd_rwlock_reader_lock (ctx->tree_lock);
g_hash_table_iter_init (&iter, static_hash);
/* Fill header */
memcpy (header, FUZZY_FILE_MAGIC, 3);
header[3] = (gchar) CURRENT_FUZZY_VERSION;
if (write (fd, header, sizeof(header)) == -1) {
msg_err (
"cannot write file %s while writing header: %s",
filename,
strerror (errno));
goto end;
}

while (g_hash_table_iter_next (&iter, NULL, (void **)&node)) {
if (node->time == INVALID_NODE_TIME || now - node->time >
expire) {
if (nodes_expired == NULL) {
nodes_expired = g_malloc (
max_expired * sizeof (gpointer));
rspamd_rwlock_reader_lock (ctx->tree_lock);
g_hash_table_iter_init (&iter, static_hash);

while (g_hash_table_iter_next (&iter, NULL, (void **)&node)) {
if (node->time == INVALID_NODE_TIME ||
now - node->time > expire) {
if (nodes_expired == NULL) {
nodes_expired = g_malloc (
max_expired * sizeof (gpointer));
}

if (expired_num < max_expired) {
nodes_expired[expired_num++] = node;
}
continue;
}

if (expired_num < max_expired) {
nodes_expired[expired_num++] = node;
if (write (fd, node, sizeof (struct rspamd_legacy_fuzzy_node))
== -1) {
msg_err ("cannot write file %s: %s", filename,
strerror (errno));
goto end;
}
continue;
}
if (write (fd, node, sizeof (struct rspamd_legacy_fuzzy_node)) == -1) {
msg_err ("cannot write file %s: %s", filename,
strerror (errno));
goto end;
}
}
rspamd_rwlock_reader_unlock (ctx->tree_lock);
rspamd_rwlock_reader_unlock (ctx->tree_lock);

/* Now try to expire some nodes */
if (expired_num > 0) {
rspamd_rwlock_writer_lock (ctx->tree_lock);
legacy_expire_nodes (nodes_expired, expired_num, ctx);
rspamd_rwlock_writer_unlock (ctx->tree_lock);
/* Now try to expire some nodes */
if (expired_num > 0) {
rspamd_rwlock_writer_lock (ctx->tree_lock);
legacy_expire_nodes (nodes_expired, expired_num, ctx);
rspamd_rwlock_writer_unlock (ctx->tree_lock);
}
mods = 0;
end:
if (nodes_expired != NULL) {
g_free (nodes_expired);
}
(void) rspamd_file_unlock (fd, FALSE);
close (fd);
}
mods = 0;
end:
if (nodes_expired != NULL) {
g_free (nodes_expired);
else {
mdb_env_sync (ctx->env, 0);
}
(void) rspamd_file_unlock (fd, FALSE);
close (fd);

rspamd_mutex_unlock (ctx->update_mtx);
if (wanna_die) {
@@ -651,6 +667,38 @@ legacy_fuzzy_cmd (struct fuzzy_session *session)

#undef LEGACY_CMD_PROCESS

/*
* MDB Interface
*/

static gboolean
rspamd_fuzzy_storage_open_db (struct rspamd_fuzzy_storage_ctx *ctx, GError **err)
{
gchar *dir;
gint rc;

if (ctx->hashfile == NULL) {
g_set_error (err, rspamd_fuzzy_quark(), 500, "Cannot work without file");
return FALSE;
}

dir = g_path_get_dirname (ctx->hashfile);
if (dir == NULL || access (dir, W_OK) == -1) {
g_set_error (err, rspamd_fuzzy_quark(), errno, "Cannot access directory: %s",
strerror (errno));
return FALSE;
}

mdb_env_create (&ctx->env);

if ((rc = mdb_env_open (ctx->env, dir, MDB_NOSYNC, 0600)) != 0) {
g_set_error (err, rspamd_fuzzy_quark(), errno, "Cannot open mdb_env: %s",
mdb_strerror (rc));
return FALSE;
}

return TRUE;
}

/*
* Accept new connection and construct task
@@ -744,16 +792,7 @@ init_fuzzy (struct rspamd_config *cfg)
ctx = g_malloc0 (sizeof (struct rspamd_fuzzy_storage_ctx));

ctx->max_mods = DEFAULT_MOD_LIMIT;
ctx->frequent_score = DEFAULT_FREQUENT_SCORE;
ctx->expire = DEFAULT_EXPIRE;
ctx->tree_lock = rspamd_rwlock_new ();
ctx->update_mtx = rspamd_mutex_new ();
#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION > 30))
ctx->update_cond = g_malloc0 (sizeof (GCond));
g_cond_init (ctx->update_cond);
#else
ctx->update_cond = g_cond_new ();
#endif

rspamd_rcl_register_worker_option (cfg, type, "hashfile",
rspamd_rcl_parse_struct_string, ctx,
@@ -771,6 +810,10 @@ init_fuzzy (struct rspamd_config *cfg)
rspamd_rcl_parse_struct_string, ctx,
G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, hashfile), 0);

rspamd_rcl_register_worker_option (cfg, type, "legacy",
rspamd_rcl_parse_struct_boolean, ctx,
G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, legacy), 0);

/* Legacy options */
rspamd_rcl_register_worker_option (cfg, type, "max_mods",
rspamd_rcl_parse_struct_integer, ctx,
@@ -827,15 +870,32 @@ start_fuzzy (struct rspamd_worker *worker)
sigh->post_handler = sigterm_handler;
sigh->handler_data = worker;

static_hash = g_hash_table_new_full (rspamd_fuzzy_hash, rspamd_fuzzy_equal,
if (ctx->legacy) {
ctx->tree_lock = rspamd_rwlock_new ();
ctx->update_mtx = rspamd_mutex_new ();
#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION > 30))
ctx->update_cond = g_malloc0 (sizeof (GCond));
g_cond_init (ctx->update_cond);
#else
ctx->update_cond = g_cond_new ();
#endif
static_hash = g_hash_table_new_full (rspamd_fuzzy_hash, rspamd_fuzzy_equal,
NULL, legacy_fuzzy_node_free);

/* Init bloom filter */
bf = rspamd_bloom_create (2000000L, RSPAMD_DEFAULT_BLOOM_HASHES);
/* Try to read hashes from file */
if (!legacy_read_db (worker)) {
msg_err (
"cannot read hashes file, it can be created after save procedure");
/* Init bloom filter */
bf = rspamd_bloom_create (2000000L, RSPAMD_DEFAULT_BLOOM_HASHES);
/* Try to read hashes from file */
if (!legacy_read_db (worker)) {
msg_err (
"cannot read hashes file, it can be created after save procedure");
}
}
else {
if (!rspamd_fuzzy_storage_open_db (ctx, &err)) {
msg_err (err->message);
g_error_free (err);
exit (EXIT_FAILURE);
}
}

/* Timer event */
@@ -863,7 +923,7 @@ start_fuzzy (struct rspamd_worker *worker)
rspamd_map_watch (worker->srv->cfg, ctx->ev_base);

ctx->update_thread = rspamd_create_thread ("fuzzy update",
legacy_sync_cache,
rspamd_fuzzy_storage_sync_cb,
worker,
&err);
if (ctx->update_thread == NULL) {
@@ -876,6 +936,10 @@ start_fuzzy (struct rspamd_worker *worker)
g_thread_join (ctx->update_thread);
}

if (!ctx->legacy) {
mdb_env_close (ctx->env);
}

rspamd_log_close (rspamd_main->logger);
exit (EXIT_SUCCESS);
}

Cargando…
Cancelar
Guardar