Browse Source

[Feature] Implement fuzzy updates push protocol

tags/1.3.0
Vsevolod Stakhov 8 years ago
parent
commit
7b5cad0989
1 changed files with 172 additions and 2 deletions
  1. 172
    2
      src/fuzzy_storage.c

+ 172
- 2
src/fuzzy_storage.c View File

@@ -225,6 +225,162 @@ fuzzy_key_dtor (gpointer p)
g_slice_free1 (sizeof (*key), key);
}

struct fuzzy_slave_connection {
struct rspamd_cryptobox_keypair *local_key;
struct rspamd_cryptobox_pubkey *remote_key;
struct upstream *up;
struct rspamd_http_connection *http_conn;
struct rspamd_fuzzy_mirror *mirror;
gint sock;
};

static void
fuzzy_mirror_close_connection (struct fuzzy_slave_connection *conn)
{
if (conn) {
if (conn->http_conn) {
rspamd_http_connection_reset (conn->http_conn);
rspamd_http_connection_unref (conn->http_conn);
}

close (conn->sock);

g_slice_free1 (sizeof (*conn), conn);
}
}

static void
fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_storage_ctx *ctx,
struct rspamd_http_message *msg)
{
GList *cur;
struct fuzzy_peer_cmd *io_cmd;
gsize len;
guint32 rev;
const gchar *p;

rev = rspamd_fuzzy_backend_version (ctx->backend);
rev = GUINT32_TO_LE (rev);
len = sizeof (guint32) * 2; /* revision + last chunk */

for (cur = ctx->updates_pending->head; cur != NULL; cur = g_list_next (cur)) {
io_cmd = cur->data;

if (io_cmd->is_shingle) {
len += sizeof (guint32) + sizeof (gboolean) +
sizeof (struct rspamd_fuzzy_shingle_cmd);
}
else {
len += sizeof (guint32) + sizeof (gboolean) +
sizeof (struct rspamd_fuzzy_cmd);
}
}

msg->body = rspamd_fstring_sized_new (len);
msg->body = rspamd_fstring_append (msg->body, (const char *)&rev,
sizeof (rev));

for (cur = ctx->updates_pending->head; cur != NULL; cur = g_list_next (cur)) {
io_cmd = cur->data;

if (io_cmd->is_shingle) {
len = sizeof (gboolean) +
sizeof (struct rspamd_fuzzy_shingle_cmd);
}
else {
len = sizeof (gboolean) +
sizeof (struct rspamd_fuzzy_cmd);
}

p = (const char *)io_cmd;
msg->body = rspamd_fstring_append (msg->body, (const char *)&len,
sizeof (len));
msg->body = rspamd_fstring_append (msg->body, p, len);
}

/* Last chunk */
len = 0;
msg->body = rspamd_fstring_append (msg->body, (const char *)&len,
sizeof (len));
}

static void
fuzzy_mirror_error_handler (struct rspamd_http_connection *conn, GError *err)
{
struct fuzzy_slave_connection *bk_conn = conn->ud;
msg_info ("abnormally closing connection from backend: %s:%s, "
"error: %e",
bk_conn->mirror->name,
rspamd_inet_address_to_string (rspamd_upstream_addr (bk_conn->up)),
err);

fuzzy_mirror_close_connection (bk_conn);
}

static gint
fuzzy_mirror_finish_handler (struct rspamd_http_connection *conn,
struct rspamd_http_message *msg)
{
struct fuzzy_slave_connection *bk_conn = conn->ud;

msg_info ("finished mirror connection to %s", bk_conn->mirror->name);
fuzzy_mirror_close_connection (bk_conn);

return 0;
}

static void
rspamd_fuzzy_send_update_mirror (struct rspamd_fuzzy_storage_ctx *ctx,
struct rspamd_fuzzy_mirror *m)
{
struct fuzzy_slave_connection *conn;
struct rspamd_http_message *msg;
struct timeval tv;

conn = g_slice_alloc0 (sizeof (*conn));
conn->up = rspamd_upstream_get (m->u,
RSPAMD_UPSTREAM_MASTER_SLAVE, NULL, 0);
conn->mirror = m;

if (conn->up == NULL) {
msg_err ("cannot select upstream for %s", m->name);
return;
}

conn->sock = rspamd_inet_address_connect (
rspamd_upstream_addr (conn->up),
SOCK_STREAM, TRUE);

if (conn->sock == -1) {
msg_err ("cannot connect upstream for %s", m->name);
rspamd_upstream_fail (conn->up);
return;
}

msg = rspamd_http_new_message (HTTP_REQUEST);
rspamd_printf_fstring (&msg->url, "/update_v1");

conn->http_conn = rspamd_http_connection_new (
NULL,
fuzzy_mirror_error_handler,
fuzzy_mirror_finish_handler,
RSPAMD_HTTP_CLIENT_SIMPLE,
RSPAMD_HTTP_CLIENT,
ctx->keypair_cache);

rspamd_http_connection_set_key (conn->http_conn,
ctx->sync_keypair);
msg->peer_key = rspamd_pubkey_ref (m->key);
double_to_tv (ctx->sync_timeout, &tv);
fuzzy_mirror_updates_to_http (ctx, msg);

rspamd_http_connection_write_message (conn->http_conn,
msg, NULL, NULL, conn,
conn->sock,
&tv, ctx->ev_base);
msg_info ("send update request to %s", m->name);
}

static void
rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx)
{
@@ -232,13 +388,15 @@ rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx)
struct fuzzy_peer_cmd *io_cmd;
struct rspamd_fuzzy_cmd *cmd;
gpointer ptr;
guint nupdates = 0;
struct rspamd_fuzzy_mirror *m;
guint nupdates = 0, i;
time_t now = time (NULL);

if (ctx->updates_pending &&
g_queue_get_length (ctx->updates_pending) > 0 &&
rspamd_fuzzy_backend_prepare_update (ctx->backend)) {
cur = ctx->updates_pending->head;

while (cur) {
io_cmd = cur->data;

@@ -264,6 +422,14 @@ rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx)

if (rspamd_fuzzy_backend_finish_update (ctx->backend)) {
ctx->stat.fuzzy_hashes = rspamd_fuzzy_backend_count (ctx->backend);

for (i = 0; i < ctx->mirrors->len; i ++) {
m = g_ptr_array_index (ctx->mirrors, i);

rspamd_fuzzy_send_update_mirror (ctx, m);
}

/* Clear updates */
cur = ctx->updates_pending->head;

while (cur) {
@@ -722,7 +888,7 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
{
const guchar *p;
gsize remain;
guint32 revision, our_rev, len;
guint32 revision, our_rev, len, cnt = 0;
struct fuzzy_peer_cmd cmd, *pcmd;
enum {
read_len = 0,
@@ -825,6 +991,7 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
remain -= len;
len = 0;
state = read_len;
cnt ++;
break;
case finish_processing:
/* Do nothing */
@@ -840,6 +1007,8 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
}

rspamd_fuzzy_process_updates_queue (session->ctx);
msg_info ("processed updates from the master, %ud operations processed,"
" revision: %ud", cnt, revision);

err:
if (updates) {
@@ -1595,6 +1764,7 @@ init_fuzzy (struct rspamd_config *cfg)
(GDestroyNotify) rspamd_inet_address_destroy, g_free,
rspamd_inet_address_hash, rspamd_inet_address_equal);
ctx->cfg = cfg;
ctx->mirrors = g_ptr_array_new ();

rspamd_rcl_register_worker_option (cfg,
type,

Loading…
Cancel
Save