Browse Source

[Rework] Adopt fuzzy storage for abstract backend

tags/1.4.0
Vsevolod Stakhov 7 years ago
parent
commit
e8abb0bd7c
2 changed files with 259 additions and 212 deletions
  1. 259
    211
      src/fuzzy_storage.c
  2. 0
    1
      src/rspamd.c

+ 259
- 211
src/fuzzy_storage.c View File

@@ -36,15 +36,11 @@
#include "libutil/http_private.h"
#include "unix-std.h"

/* This number is used as expire time in seconds for cache items (2 days) */
#define DEFAULT_EXPIRE 172800L
/* Resync value in seconds */
#define DEFAULT_SYNC_TIMEOUT 60.0
#define DEFAULT_KEYPAIR_CACHE_SIZE 512
#define DEFAULT_MASTER_TIMEOUT 10.0

#define INVALID_NODE_TIME (guint64) - 1

static const gchar *local_db_name = "local";

#define msg_err_fuzzy_update(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
@@ -77,10 +73,6 @@ worker_t fuzzy_worker = {
RSPAMD_WORKER_VER /* Version info */
};

/* For evtimer */
static struct timeval tmv;
static struct event tev;

struct fuzzy_global_stat {
guint64 fuzzy_hashes;
/**< number of fuzzy hashes stored */
@@ -139,7 +131,7 @@ struct rspamd_fuzzy_storage_ctx {
gboolean encrypted_only;
struct rspamd_keypair_cache *keypair_cache;
rspamd_lru_hash_t *errors_ips;
struct rspamd_fuzzy_backend_sqlite *backend;
struct rspamd_fuzzy_backend *backend;
GQueue *updates_pending;
struct rspamd_dns_resolver *resolver;
struct rspamd_config *cfg;
@@ -165,6 +157,7 @@ struct fuzzy_session {
} cmd;

struct rspamd_fuzzy_encrypted_reply reply;
struct fuzzy_key_stat *ip_stat;

enum rspamd_fuzzy_epoch epoch;
enum fuzzy_cmd_type cmd_type;
@@ -191,7 +184,10 @@ struct fuzzy_master_update_session {
const gchar *name;
gchar uid[16];
struct rspamd_http_connection *conn;
struct rspamd_http_message *msg;
struct rspamd_fuzzy_storage_ctx *ctx;
const gchar *src;
gchar *psrc;
rspamd_inet_addr_t *addr;
gboolean replied;
gint sock;
@@ -242,6 +238,14 @@ fuzzy_key_dtor (gpointer p)
g_slice_free1 (sizeof (*key), key);
}

static void
fuzzy_count_callback (guint64 count, void *ud)
{
struct rspamd_fuzzy_storage_ctx *ctx = ud;

ctx->stat.fuzzy_hashes = count;
}

struct fuzzy_slave_connection {
struct rspamd_cryptobox_keypair *local_key;
struct rspamd_cryptobox_pubkey *remote_key;
@@ -266,19 +270,34 @@ fuzzy_mirror_close_connection (struct fuzzy_slave_connection *conn)
}
}

struct rspamd_fuzzy_updates_cbdata {
struct rspamd_fuzzy_storage_ctx *ctx;
struct rspamd_http_message *msg;
struct fuzzy_slave_connection *conn;
struct rspamd_fuzzy_mirror *m;
};

static void
fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_storage_ctx *ctx,
struct rspamd_http_message *msg)
fuzzy_mirror_updates_version_cb (guint64 rev64, void *ud)
{
struct rspamd_fuzzy_updates_cbdata *cbdata = ud;
GList *cur;
struct fuzzy_peer_cmd *io_cmd;
guint32 len;
guint32 rev;
guint32 rev32 = rev64, len;
const gchar *p;
rspamd_fstring_t *reply;
struct fuzzy_slave_connection *conn;
struct rspamd_fuzzy_storage_ctx *ctx;
struct rspamd_http_message *msg;
struct rspamd_fuzzy_mirror *m;
struct timeval tv;

rev = rspamd_fuzzy_backend_sqlite_version (ctx->backend, local_db_name);
rev = GUINT32_TO_LE (rev);
conn = cbdata->conn;
ctx = cbdata->ctx;
msg = cbdata->msg;
m = cbdata->m;
g_slice_free1 (sizeof (*cbdata), cbdata);
rev32 = GUINT32_TO_LE (rev32);
len = sizeof (guint32) * 2; /* revision + last chunk */

for (cur = ctx->updates_pending->head; cur != NULL; cur = g_list_next (cur)) {
@@ -295,8 +314,8 @@ fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_storage_ctx *ctx,
}

reply = rspamd_fstring_sized_new (len);
reply = rspamd_fstring_append (reply, (const char *)&rev,
sizeof (rev));
reply = rspamd_fstring_append (reply, (const char *)&rev32,
sizeof (rev32));

for (cur = ctx->updates_pending->head; cur != NULL; cur = g_list_next (cur)) {
io_cmd = cur->data;
@@ -320,6 +339,30 @@ fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_storage_ctx *ctx,
len = 0;
reply = rspamd_fstring_append (reply, (const char *)&len, sizeof (len));
rspamd_http_message_set_body_from_fstring_steal (msg, reply);
double_to_tv (ctx->sync_timeout, &tv);
rspamd_http_connection_write_message (conn->http_conn,
msg, NULL, NULL, conn,
conn->sock,
&tv, ctx->ev_base);
msg_info ("send update request to %s", m->name);
}

static void
fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_mirror *m,
struct fuzzy_slave_connection *conn,
struct rspamd_fuzzy_storage_ctx *ctx,
struct rspamd_http_message *msg)
{

struct rspamd_fuzzy_updates_cbdata *cbdata;

cbdata = g_slice_alloc (sizeof (*cbdata));
cbdata->ctx = ctx;
cbdata->msg = msg;
cbdata->conn = conn;
cbdata->m = m;
rspamd_fuzzy_backend_version (ctx->backend, local_db_name,
fuzzy_mirror_updates_version_cb, cbdata);
}

static void
@@ -353,7 +396,6 @@ rspamd_fuzzy_send_update_mirror (struct rspamd_fuzzy_storage_ctx *ctx,
{
struct fuzzy_slave_connection *conn;
struct rspamd_http_message *msg;
struct timeval tv;

conn = g_slice_alloc0 (sizeof (*conn));
conn->up = rspamd_upstream_get (m->u,
@@ -389,84 +431,85 @@ rspamd_fuzzy_send_update_mirror (struct rspamd_fuzzy_storage_ctx *ctx,
rspamd_http_connection_set_key (conn->http_conn,
ctx->sync_keypair);
msg->peer_key = rspamd_pubkey_ref (m->key);
double_to_tv (ctx->sync_timeout, &tv);
fuzzy_mirror_updates_to_http (ctx, msg);
fuzzy_mirror_updates_to_http (m, conn, ctx, msg);
}

rspamd_http_connection_write_message (conn->http_conn,
msg, NULL, NULL, conn,
conn->sock,
&tv, ctx->ev_base);
msg_info ("send update request to %s", m->name);
struct rspamd_updates_cbdata {
struct rspamd_fuzzy_storage_ctx *ctx;
const gchar *source;
};

static void
fuzzy_update_version_callback (guint64 ver, void *ud)
{
msg_info ("updated fuzzy storage from %s: version: %d",
(const char *)ud, (gint)ver);
}

static void
rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx,
const gchar *source)
rspamd_fuzzy_updates_cb (gboolean success, void *ud)
{
GList *cur;
struct fuzzy_peer_cmd *io_cmd;
struct rspamd_fuzzy_cmd *cmd;
gpointer ptr;
struct rspamd_updates_cbdata *cbdata = ud;
struct rspamd_fuzzy_mirror *m;
guint nupdates = 0, i;
struct rspamd_fuzzy_storage_ctx *ctx;
const gchar *source;
GList *cur;
struct fuzzy_peer_cmd *io_cmd;

if (ctx->updates_pending &&
g_queue_get_length (ctx->updates_pending) > 0 &&
rspamd_fuzzy_backend_sqlite_prepare_update (ctx->backend, source)) {
cur = ctx->updates_pending->head;
ctx = cbdata->ctx;
source = cbdata->source;

while (cur) {
io_cmd = cur->data;
if (success) {
rspamd_fuzzy_backend_count (ctx->backend, fuzzy_count_callback, ctx);

if (io_cmd->is_shingle) {
cmd = &io_cmd->cmd.shingle.basic;
ptr = &io_cmd->cmd.shingle;
}
else {
cmd = &io_cmd->cmd.normal;
ptr = &io_cmd->cmd.normal;
}
if (nupdates > 0) {
for (i = 0; i < ctx->mirrors->len; i ++) {
m = g_ptr_array_index (ctx->mirrors, i);

if (cmd->cmd == FUZZY_WRITE) {
rspamd_fuzzy_backend_sqlite_add (ctx->backend, ptr);
}
else {
rspamd_fuzzy_backend_sqlite_del (ctx->backend, ptr);
rspamd_fuzzy_send_update_mirror (ctx, m);
}
}

/* Clear updates */
cur = ctx->updates_pending->head;

nupdates++;
while (cur) {
io_cmd = cur->data;
g_slice_free1 (sizeof (*io_cmd), io_cmd);
cur = g_list_next (cur);
}

if (rspamd_fuzzy_backend_sqlite_finish_update (ctx->backend, source, nupdates > 0)) {
ctx->stat.fuzzy_hashes = rspamd_fuzzy_backend_sqlite_count (ctx->backend);
g_queue_clear (ctx->updates_pending);
rspamd_fuzzy_backend_version (ctx->backend, source,
fuzzy_update_version_callback, (void *)source);

if (nupdates > 0) {
for (i = 0; i < ctx->mirrors->len; i ++) {
m = g_ptr_array_index (ctx->mirrors, i);
}
else {
msg_err ("cannot commit update transaction to fuzzy backend, "
"%ud updates are still pending",
g_queue_get_length (ctx->updates_pending));
}

rspamd_fuzzy_send_update_mirror (ctx, m);
}
}
g_slice_free1 (sizeof (*cbdata), cbdata);
}

/* Clear updates */
cur = ctx->updates_pending->head;
static void
rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx,
const gchar *source)
{

struct rspamd_updates_cbdata *cbdata;

if (ctx->updates_pending &&
g_queue_get_length (ctx->updates_pending) > 0) {
cbdata = g_slice_alloc (sizeof (*cbdata));
cbdata->ctx = ctx;
cbdata->source = source;
rspamd_fuzzy_backend_process_updates (ctx->backend, ctx->updates_pending,
source, rspamd_fuzzy_updates_cb, cbdata);

while (cur) {
io_cmd = cur->data;
g_slice_free1 (sizeof (*io_cmd), io_cmd);
cur = g_list_next (cur);
}

g_queue_clear (ctx->updates_pending);
msg_info ("updated fuzzy storage: %ud updates processed, version: %d",
nupdates, rspamd_fuzzy_backend_sqlite_version (ctx->backend, source));
}
else {
msg_err ("cannot commit update transaction to fuzzy backend, "
"%ud updates are still pending",
g_queue_get_length (ctx->updates_pending));
}
}
else if (ctx->updates_pending &&
g_queue_get_length (ctx->updates_pending) > 0) {
@@ -598,6 +641,77 @@ rspamd_fuzzy_update_stats (struct rspamd_fuzzy_storage_ctx *ctx,
}
}

static void
rspamd_fuzzy_make_reply (struct rspamd_fuzzy_cmd *cmd,
struct rspamd_fuzzy_reply *result,
struct fuzzy_session *session,
gboolean encrypted, gboolean is_shingle)
{
if (cmd) {
result->tag = cmd->tag;

memcpy (&session->reply.rep, result, sizeof (*result));

rspamd_fuzzy_update_stats (session->ctx,
session->epoch,
result->prob > 0.5,
is_shingle,
session->key_stat,
session->ip_stat,
cmd->cmd,
result->value);

if (encrypted) {
/* We need also to encrypt reply */
ottery_rand_bytes (session->reply.hdr.nonce,
sizeof (session->reply.hdr.nonce));
rspamd_cryptobox_encrypt_nm_inplace ((guchar *)&session->reply.rep,
sizeof (session->reply.rep),
session->reply.hdr.nonce,
session->nm,
session->reply.hdr.mac,
RSPAMD_CRYPTOBOX_MODE_25519);
}
}

rspamd_fuzzy_write_reply (session);
}

static void
rspamd_fuzzy_check_callback (struct rspamd_fuzzy_reply *result, void *ud)
{
struct fuzzy_session *session = ud;
gboolean encrypted = FALSE, is_shingle = FALSE;
struct rspamd_fuzzy_cmd *cmd = NULL;
gsize up_len = 0;

switch (session->cmd_type) {
case CMD_NORMAL:
cmd = &session->cmd.normal;
up_len = sizeof (session->cmd.normal);
break;
case CMD_SHINGLE:
cmd = &session->cmd.shingle.basic;
up_len = sizeof (session->cmd.shingle);
is_shingle = TRUE;
break;
case CMD_ENCRYPTED_NORMAL:
cmd = &session->cmd.enc_normal.cmd;
up_len = sizeof (session->cmd.normal);
encrypted = TRUE;
break;
case CMD_ENCRYPTED_SHINGLE:
cmd = &session->cmd.enc_shingle.cmd.basic;
up_len = sizeof (session->cmd.shingle);
encrypted = TRUE;
is_shingle = TRUE;
break;
}

rspamd_fuzzy_make_reply (cmd, result, session, encrypted, is_shingle);
REF_RELEASE (session);
}

static void
rspamd_fuzzy_process_command (struct fuzzy_session *session)
{
@@ -637,14 +751,16 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session)
if (G_UNLIKELY (cmd == NULL || up_len == 0)) {
result.value = 500;
result.prob = 0.0;
goto reply;
rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle);
return;
}

if (session->ctx->encrypted_only && !encrypted) {
/* Do not accept unencrypted commands */
result.value = 403;
result.prob = 0.0;
goto reply;
rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle);
return;
}

if (session->key_stat) {
@@ -657,17 +773,21 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session)
rspamd_lru_hash_insert (session->key_stat->last_ips,
naddr, ip_stat, -1, 0);
}

session->ip_stat = ip_stat;
}

result.flag = cmd->flag;
if (cmd->cmd == FUZZY_CHECK) {
result = rspamd_fuzzy_backend_sqlite_check (session->ctx->backend, cmd,
session->ctx->expire);
REF_RETAIN (session);
rspamd_fuzzy_backend_check (session->ctx->backend, cmd,
rspamd_fuzzy_check_callback, session);
}
else if (cmd->cmd == FUZZY_STAT) {
result.prob = 1.0;
result.value = 0;
result.flag = rspamd_fuzzy_backend_sqlite_count (session->ctx->backend);
result.flag = session->ctx->stat.fuzzy_hashes;
rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle);
}
else {
if (rspamd_fuzzy_check_client (session)) {
@@ -703,36 +823,9 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session)
result.value = 403;
result.prob = 0.0;
}
}

reply:
if (cmd) {
result.tag = cmd->tag;

memcpy (&session->reply.rep, &result, sizeof (result));

rspamd_fuzzy_update_stats (session->ctx,
session->epoch,
result.prob > 0.5,
is_shingle,
session->key_stat,
ip_stat, cmd->cmd,
result.value);

if (encrypted) {
/* We need also to encrypt reply */
ottery_rand_bytes (session->reply.hdr.nonce,
sizeof (session->reply.hdr.nonce));
rspamd_cryptobox_encrypt_nm_inplace ((guchar *)&session->reply.rep,
sizeof (session->reply.rep),
session->reply.hdr.nonce,
session->nm,
session->reply.hdr.mac,
RSPAMD_CRYPTOBOX_MODE_25519);
}
rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle);
}

rspamd_fuzzy_write_reply (session);
}


@@ -916,12 +1009,11 @@ rspamd_fuzzy_cmd_from_wire (guchar *buf, guint buflen, struct fuzzy_session *s)

static void
rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
struct rspamd_http_message *msg)
struct rspamd_http_message *msg, guint our_rev)
{
const guchar *p;
gchar *src = NULL, *psrc;
gsize remain;
gint32 revision, our_rev;
gint32 revision;
guint32 len = 0, cnt = 0;
struct fuzzy_peer_cmd cmd, *pcmd;
enum {
@@ -932,25 +1024,6 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
GList *updates = NULL, *cur;
gpointer flag_ptr;

if (!rspamd_http_message_get_body (msg, NULL) || !msg->url
|| msg->url->len == 0) {
msg_err_fuzzy_update ("empty update message, not processing");

return;
}

/* Detect source from url: /update_v1/<source>, so we look for the last '/' */
remain = msg->url->len;
psrc = rspamd_fstringdup (msg->url);
src = psrc;

while (remain--) {
if (src[remain] == '/') {
src = &src[remain + 1];
break;
}
}

/*
* Message format:
* <uint32_le> - revision
@@ -965,13 +1038,11 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
if (remain > sizeof (gint32) * 2) {
memcpy (&revision, p, sizeof (gint32));
revision = GINT32_TO_LE (revision);
our_rev = rspamd_fuzzy_backend_sqlite_version (session->ctx->backend, src);

if (revision <= our_rev) {
msg_err_fuzzy_update ("remote revision: %d is older than ours: %d, "
"refusing update",
revision, our_rev);
g_free (psrc);

return;
}
@@ -1076,7 +1147,7 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
cur->data = NULL;
}

rspamd_fuzzy_process_updates_queue (session->ctx, src);
rspamd_fuzzy_process_updates_queue (session->ctx, session->src);
msg_info_fuzzy_update ("processed updates from the master %s, "
"%ud operations processed,"
" revision: %d (local revision: %d)",
@@ -1084,8 +1155,6 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
cnt, revision, our_rev);

err:
g_free (psrc);

if (updates) {
/* We still need to clear queue */
for (cur = updates; cur != NULL; cur = g_list_next (cur)) {
@@ -1119,6 +1188,10 @@ rspamd_fuzzy_mirror_session_destroy (struct fuzzy_master_update_session *session
rspamd_http_connection_unref (session->conn);
rspamd_inet_address_destroy (session->addr);
close (session->sock);

if (session->psrc) {
g_free (session->psrc);
}
g_slice_free1 (sizeof (*session), session);
}
}
@@ -1151,6 +1224,15 @@ rspamd_fuzzy_mirror_send_reply (struct fuzzy_master_update_session *session,
session->ctx->ev_base);
}

static void
rspamd_fuzzy_update_version_callback (guint64 version, void *ud)
{
struct fuzzy_master_update_session *session = ud;

rspamd_fuzzy_mirror_process_update (session, session->msg, version);
rspamd_fuzzy_mirror_send_reply (session, 200, "OK");
}

static gint
rspamd_fuzzy_mirror_finish_handler (struct rspamd_http_connection *conn,
struct rspamd_http_message *msg)
@@ -1158,6 +1240,9 @@ rspamd_fuzzy_mirror_finish_handler (struct rspamd_http_connection *conn,
struct fuzzy_master_update_session *session = conn->ud;
const struct rspamd_cryptobox_pubkey *rk;
const gchar *err_str = NULL;
gchar *psrc;
const gchar *src = NULL;
gsize remain;

if (session->replied) {
rspamd_fuzzy_mirror_session_destroy (session);
@@ -1189,9 +1274,30 @@ rspamd_fuzzy_mirror_finish_handler (struct rspamd_http_connection *conn,
msg_warn_fuzzy_update ("no trusted key specified, accept any update from %s",
rspamd_inet_address_to_string (session->addr));
}
if (!rspamd_http_message_get_body (msg, NULL) || !msg->url
|| msg->url->len == 0) {
msg_err_fuzzy_update ("empty update message, not processing");

rspamd_fuzzy_mirror_process_update (session, msg);
rspamd_fuzzy_mirror_send_reply (session, 200, "OK");
goto end;
}

/* Detect source from url: /update_v1/<source>, so we look for the last '/' */
remain = msg->url->len;
psrc = rspamd_fstringdup (msg->url);
src = psrc;

while (remain--) {
if (src[remain] == '/') {
src = &src[remain + 1];
break;
}
}

session->src = src;
session->psrc = psrc;
session->msg = msg;
rspamd_fuzzy_backend_version (session->ctx->backend, src,
rspamd_fuzzy_update_version_callback, session);

return 0;
}
@@ -1352,38 +1458,6 @@ accept_fuzzy_socket (gint fd, short what, void *arg)
}
}

static void
sync_callback (gint fd, short what, void *arg)
{
struct rspamd_worker *worker = (struct rspamd_worker *)arg;
struct rspamd_fuzzy_storage_ctx *ctx;
gdouble next_check;
guint64 old_expired, new_expired;

ctx = worker->ctx;

if (ctx->backend) {
rspamd_fuzzy_process_updates_queue (ctx, local_db_name);
/* Call backend sync */
old_expired = rspamd_fuzzy_backend_sqlite_expired (ctx->backend);
rspamd_fuzzy_backend_sqlite_sync (ctx->backend, ctx->expire, TRUE);
new_expired = rspamd_fuzzy_backend_sqlite_expired (ctx->backend);

if (old_expired < new_expired) {
ctx->stat.fuzzy_hashes_expired += new_expired - old_expired;
}
}

/* Timer event */
event_del (&tev);
evtimer_set (&tev, sync_callback, worker);
event_base_set (ctx->ev_base, &tev);
/* Plan event with jitter */
next_check = rspamd_time_jitter (ctx->sync_timeout, 0);
double_to_tv (next_check, &tmv);
evtimer_add (&tev, &tmv);
}

static gboolean
rspamd_fuzzy_storage_sync (struct rspamd_main *rspamd_main,
struct rspamd_worker *worker, gint fd,
@@ -1392,29 +1466,13 @@ rspamd_fuzzy_storage_sync (struct rspamd_main *rspamd_main,
gpointer ud)
{
struct rspamd_fuzzy_storage_ctx *ctx = ud;
gdouble next_check;
guint64 old_expired, new_expired;
struct rspamd_control_reply rep;

if (ctx->backend) {
rspamd_fuzzy_process_updates_queue (ctx, local_db_name);
/* Call backend sync */
old_expired = rspamd_fuzzy_backend_sqlite_expired (ctx->backend);
rspamd_fuzzy_backend_sqlite_sync (ctx->backend, ctx->expire, TRUE);
new_expired = rspamd_fuzzy_backend_sqlite_expired (ctx->backend);

if (old_expired < new_expired) {
ctx->stat.fuzzy_hashes_expired += new_expired - old_expired;
}
}

rep.reply.fuzzy_sync.status = 0;

/* Handle next timer event */
event_del (&tev);
next_check = rspamd_time_jitter (ctx->sync_timeout, 0);
double_to_tv (next_check, &tmv);
evtimer_add (&tev, &tmv);
if (ctx->backend && worker->index == 0) {
rspamd_fuzzy_backend_start_expire (ctx->backend, ctx->sync_timeout);
}

if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
msg_err ("cannot write reply to the control socket: %s",
@@ -1439,14 +1497,14 @@ rspamd_fuzzy_storage_reload (struct rspamd_main *rspamd_main,

if (ctx->backend) {
/* Close backend and reopen it one more time */
rspamd_fuzzy_backend_sqlite_close (ctx->backend);
rspamd_fuzzy_backend_close (ctx->backend);
}

memset (&rep, 0, sizeof (rep));
rep.type = RSPAMD_CONTROL_RELOAD;

if ((ctx->backend = rspamd_fuzzy_backend_sqlite_open (ctx->hashfile,
TRUE,
if ((ctx->backend = rspamd_fuzzy_backend_create (ctx->ev_base,
worker->cf->options,
&err)) == NULL) {
msg_err ("cannot open backend after reload: %e", err);
g_error_free (err);
@@ -1456,6 +1514,10 @@ rspamd_fuzzy_storage_reload (struct rspamd_main *rspamd_main,
rep.reply.reload.status = 0;
}

if (ctx->backend && worker->index == 0) {
rspamd_fuzzy_backend_start_expire (ctx->backend, ctx->sync_timeout);
}

if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
msg_err ("cannot write reply to the control socket: %s",
strerror (errno));
@@ -1644,7 +1706,7 @@ rspamd_fuzzy_storage_stat (struct rspamd_main *rspamd_main,
rep.reply.fuzzy_stat.status = 0;

memcpy (rep.reply.fuzzy_stat.storage_id,
rspamd_fuzzy_sqlite_backend_id (ctx->backend),
rspamd_fuzzy_backend_id (ctx->backend),
sizeof (rep.reply.fuzzy_stat.storage_id));

obj = rspamd_fuzzy_stat_to_ucl (ctx, TRUE);
@@ -1901,7 +1963,6 @@ init_fuzzy (struct rspamd_config *cfg)
ctx->magic = rspamd_fuzzy_storage_magic;
ctx->sync_timeout = DEFAULT_SYNC_TIMEOUT;
ctx->master_timeout = DEFAULT_MASTER_TIMEOUT;
ctx->expire = DEFAULT_EXPIRE;
ctx->keypair_cache_size = DEFAULT_KEYPAIR_CACHE_SIZE;
ctx->keys = g_hash_table_new_full (fuzzy_kp_hash, fuzzy_kp_equal,
NULL, fuzzy_key_dtor);
@@ -2108,7 +2169,6 @@ fuzzy_peer_rep (struct rspamd_worker *worker,
GList *cur;
struct rspamd_worker_listen_socket *ls;
struct event *accept_events;
gdouble next_check;

ctx->peer_fd = rep_fd;

@@ -2157,14 +2217,6 @@ fuzzy_peer_rep (struct rspamd_worker *worker,
event_base_set (ctx->ev_base, &ctx->peer_ev);
event_add (&ctx->peer_ev, NULL);
ctx->updates_pending = g_queue_new ();

/* Timer event */
evtimer_set (&tev, sync_callback, worker);
event_base_set (ctx->ev_base, &tev);
/* Plan event with jitter */
next_check = rspamd_time_jitter (ctx->sync_timeout, 0);
double_to_tv (next_check, &tmv);
evtimer_add (&tev, &tmv);
}
}

@@ -2188,13 +2240,14 @@ start_fuzzy (struct rspamd_worker *worker)
/*
* Open DB and perform VACUUM
*/
if ((ctx->backend = rspamd_fuzzy_backend_sqlite_open (ctx->hashfile, TRUE, &err)) == NULL) {
if ((ctx->backend = rspamd_fuzzy_backend_create (ctx->ev_base,
worker->cf->options, &err)) == NULL) {
msg_err ("cannot open backend: %e", err);
g_error_free (err);
exit (EXIT_SUCCESS);
}

ctx->stat.fuzzy_hashes = rspamd_fuzzy_backend_sqlite_count (ctx->backend);
rspamd_fuzzy_backend_count (ctx->backend, fuzzy_count_callback, ctx);

if (ctx->keypair_cache_size > 0) {
/* Create keypairs cache */
@@ -2202,7 +2255,7 @@ start_fuzzy (struct rspamd_worker *worker)
}

if (worker->index == 0) {
rspamd_fuzzy_backend_sqlite_sync (ctx->backend, ctx->expire, TRUE);
rspamd_fuzzy_backend_start_expire (ctx->backend, ctx->sync_timeout);
}

if (ctx->mirrors && ctx->mirrors->len != 0) {
@@ -2260,12 +2313,7 @@ start_fuzzy (struct rspamd_worker *worker)
event_base_loop (ctx->ev_base, 0);
rspamd_worker_block_signals ();

if (worker->index == 0) {
rspamd_fuzzy_process_updates_queue (ctx, local_db_name);
rspamd_fuzzy_backend_sqlite_sync (ctx->backend, ctx->expire, TRUE);
}

rspamd_fuzzy_backend_sqlite_close (ctx->backend);
rspamd_fuzzy_backend_close (ctx->backend);
rspamd_log_close (worker->srv->logger);

if (ctx->peer_fd != -1) {

+ 0
- 1
src/rspamd.c View File

@@ -16,7 +16,6 @@
#include "config.h"
#include "rspamd.h"
#include "libutil/map.h"
#include "fuzzy_storage.h"
#include "lua/lua_common.h"
#include "libserver/worker_util.h"
#include "libserver/rspamd_control.h"

Loading…
Cancel
Save