Browse Source

[Minor] Fix periodic updates

tags/1.4.0
Vsevolod Stakhov 7 years ago
parent
commit
99c25fad76
3 changed files with 74 additions and 27 deletions
  1. 23
    5
      src/fuzzy_storage.c
  2. 46
    20
      src/libserver/fuzzy_backend.c
  3. 5
    2
      src/libserver/fuzzy_backend.h

+ 23
- 5
src/fuzzy_storage.c View File

@@ -451,7 +451,7 @@ rspamd_fuzzy_updates_cb (gboolean success, void *ud)
{
struct rspamd_updates_cbdata *cbdata = ud;
struct rspamd_fuzzy_mirror *m;
guint nupdates = 0, i;
guint i;
struct rspamd_fuzzy_storage_ctx *ctx;
const gchar *source;
GList *cur;
@@ -463,7 +463,7 @@ rspamd_fuzzy_updates_cb (gboolean success, void *ud)
if (success) {
rspamd_fuzzy_backend_count (ctx->backend, fuzzy_count_callback, ctx);

if (nupdates > 0) {
if (g_queue_get_length (ctx->updates_pending) > 0) {
for (i = 0; i < ctx->mirrors->len; i ++) {
m = g_ptr_array_index (ctx->mirrors, i);

@@ -1458,6 +1458,20 @@ accept_fuzzy_socket (gint fd, short what, void *arg)
}
}

static gboolean
rspamd_fuzzy_storage_periodic_callback (void *ud)
{
struct rspamd_fuzzy_storage_ctx *ctx = ud;

if (g_queue_get_length (ctx->updates_pending) > 0) {
rspamd_fuzzy_process_updates_queue (ctx, local_db_name);

return TRUE;
}

return FALSE;
}

static gboolean
rspamd_fuzzy_storage_sync (struct rspamd_main *rspamd_main,
struct rspamd_worker *worker, gint fd,
@@ -1471,7 +1485,9 @@ rspamd_fuzzy_storage_sync (struct rspamd_main *rspamd_main,
rep.reply.fuzzy_sync.status = 0;

if (ctx->backend && worker->index == 0) {
rspamd_fuzzy_backend_start_expire (ctx->backend, ctx->sync_timeout);
rspamd_fuzzy_process_updates_queue (ctx, local_db_name);
rspamd_fuzzy_backend_start_update (ctx->backend, ctx->sync_timeout,
rspamd_fuzzy_storage_periodic_callback, ctx);
}

if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
@@ -1515,7 +1531,8 @@ rspamd_fuzzy_storage_reload (struct rspamd_main *rspamd_main,
}

if (ctx->backend && worker->index == 0) {
rspamd_fuzzy_backend_start_expire (ctx->backend, ctx->sync_timeout);
rspamd_fuzzy_backend_start_update (ctx->backend, ctx->sync_timeout,
rspamd_fuzzy_storage_periodic_callback, ctx);
}

if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
@@ -2255,7 +2272,8 @@ start_fuzzy (struct rspamd_worker *worker)
}

if (worker->index == 0) {
rspamd_fuzzy_backend_start_expire (ctx->backend, ctx->sync_timeout);
rspamd_fuzzy_backend_start_update (ctx->backend, ctx->sync_timeout,
rspamd_fuzzy_storage_periodic_callback, ctx);
}

if (ctx->mirrors && ctx->mirrors->len != 0) {

+ 46
- 20
src/libserver/fuzzy_backend.c View File

@@ -68,7 +68,7 @@ struct rspamd_fuzzy_backend_subr {
rspamd_fuzzy_version_cb cb, void *ud,
void *subr_ud);
const gchar* (*id) (struct rspamd_fuzzy_backend *bk, void *subr_ud);
void (*expire) (struct rspamd_fuzzy_backend *bk, void *subr_ud);
void (*periodic) (struct rspamd_fuzzy_backend *bk, void *subr_ud);
void (*close) (struct rspamd_fuzzy_backend *bk, void *subr_ud);
};

@@ -80,7 +80,7 @@ static const struct rspamd_fuzzy_backend_subr fuzzy_subrs[] = {
.count = rspamd_fuzzy_backend_count_sqlite,
.version = rspamd_fuzzy_backend_version_sqlite,
.id = rspamd_fuzzy_backend_id_sqlite,
.expire = rspamd_fuzzy_backend_expire_sqlite,
.periodic = rspamd_fuzzy_backend_expire_sqlite,
.close = rspamd_fuzzy_backend_close_sqlite,
}
};
@@ -90,9 +90,11 @@ struct rspamd_fuzzy_backend {
gdouble expire;
gdouble sync;
struct event_base *ev_base;
rspamd_fuzzy_periodic_cb periodic_cb;
void *periodic_ud;
const struct rspamd_fuzzy_backend_subr *subr;
void *subr_ud;
struct event expire_event;
struct event periodic_event;
};

static GQuark
@@ -349,8 +351,25 @@ rspamd_fuzzy_backend_id (struct rspamd_fuzzy_backend *bk)
return NULL;
}

static inline void
rspamd_fuzzy_backend_periodic_sync (struct rspamd_fuzzy_backend *bk)
{
if (bk->periodic_cb) {
if (bk->periodic_cb (bk->periodic_ud)) {
if (bk->subr->periodic) {
bk->subr->periodic (bk, bk->subr_ud);
}
}
}
else {
if (bk->subr->periodic) {
bk->subr->periodic (bk, bk->subr_ud);
}
}
}

static void
rspamd_fuzzy_backend_expire_cb (gint fd, short what, void *ud)
rspamd_fuzzy_backend_periodic_cb (gint fd, short what, void *ud)
{
struct rspamd_fuzzy_backend *bk = ud;
gdouble jittered;
@@ -358,33 +377,40 @@ rspamd_fuzzy_backend_expire_cb (gint fd, short what, void *ud)

jittered = rspamd_time_jitter (bk->sync, bk->sync / 2.0);
double_to_tv (jittered, &tv);
event_del (&bk->expire_event);
bk->subr->expire (bk, bk->subr_ud);
event_add (&bk->expire_event, &tv);
event_del (&bk->periodic_event);
rspamd_fuzzy_backend_periodic_sync (bk);
event_add (&bk->periodic_event, &tv);
}

void
rspamd_fuzzy_backend_start_expire (struct rspamd_fuzzy_backend *bk,
gdouble timeout)
rspamd_fuzzy_backend_start_update (struct rspamd_fuzzy_backend *bk,
gdouble timeout,
rspamd_fuzzy_periodic_cb cb,
void *ud)
{
gdouble jittered;
struct timeval tv;

g_assert (bk != NULL);

if (bk->subr->expire) {
if (bk->subr->periodic) {
if (bk->sync > 0.0) {
event_del (&bk->expire_event);
event_del (&bk->periodic_event);
}

if (cb) {
bk->periodic_cb = cb;
bk->periodic_ud = ud;
}

bk->subr->expire (bk, bk->subr_ud);
rspamd_fuzzy_backend_periodic_sync (bk);
bk->sync = timeout;
jittered = rspamd_time_jitter (timeout, timeout / 2.0);
double_to_tv (jittered, &tv);
event_set (&bk->expire_event, -1, EV_TIMEOUT,
rspamd_fuzzy_backend_expire_cb, bk);
event_base_set (bk->ev_base, &bk->expire_event);
event_add (&bk->expire_event, &tv);
event_set (&bk->periodic_event, -1, EV_TIMEOUT,
rspamd_fuzzy_backend_periodic_cb, bk);
event_base_set (bk->ev_base, &bk->periodic_event);
event_add (&bk->periodic_event, &tv);
}
}

@@ -393,12 +419,12 @@ rspamd_fuzzy_backend_close (struct rspamd_fuzzy_backend *bk)
{
g_assert (bk != NULL);

bk->subr->close (bk, bk->subr_ud);

if (bk->sync > 0.0) {
bk->subr->expire (bk, bk->subr_ud);
event_del (&bk->expire_event);
rspamd_fuzzy_backend_periodic_sync (bk);
event_del (&bk->periodic_event);
}

bk->subr->close (bk, bk->subr_ud);

g_slice_free1 (sizeof (*bk), bk);
}

+ 5
- 2
src/libserver/fuzzy_backend.h View File

@@ -29,6 +29,7 @@ typedef void (*rspamd_fuzzy_check_cb) (struct rspamd_fuzzy_reply *rep, void *ud)
typedef void (*rspamd_fuzzy_update_cb) (gboolean success, void *ud);
typedef void (*rspamd_fuzzy_version_cb) (guint64 rev, void *ud);
typedef void (*rspamd_fuzzy_count_cb) (guint64 count, void *ud);
typedef gboolean (*rspamd_fuzzy_periodic_cb) (void *ud);

/**
* Open fuzzy backend
@@ -92,8 +93,10 @@ const gchar * rspamd_fuzzy_backend_id (struct rspamd_fuzzy_backend *backend);
* Starts expire process for the backend
* @param backend
*/
void rspamd_fuzzy_backend_start_expire (struct rspamd_fuzzy_backend *backend,
gdouble timeout);
void rspamd_fuzzy_backend_start_update (struct rspamd_fuzzy_backend *backend,
gdouble timeout,
rspamd_fuzzy_periodic_cb cb,
void *ud);

/**
* Closes backend

Loading…
Cancel
Save