|
|
@@ -149,6 +149,7 @@ struct fuzzy_session { |
|
|
|
}; |
|
|
|
|
|
|
|
struct fuzzy_peer_cmd { |
|
|
|
gboolean is_shingle; |
|
|
|
union { |
|
|
|
struct rspamd_fuzzy_cmd normal; |
|
|
|
struct rspamd_fuzzy_shingle_cmd shingle; |
|
|
@@ -210,19 +211,30 @@ static void |
|
|
|
rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx) |
|
|
|
{ |
|
|
|
GList *cur; |
|
|
|
struct fuzzy_peer_cmd *cmd; |
|
|
|
struct fuzzy_peer_cmd *io_cmd; |
|
|
|
struct rspamd_fuzzy_cmd *cmd; |
|
|
|
gpointer ptr; |
|
|
|
guint nupdates = 0; |
|
|
|
|
|
|
|
if (rspamd_fuzzy_backend_prepare_update (ctx->backend)) { |
|
|
|
cur = ctx->updates_pending->head; |
|
|
|
while (cur) { |
|
|
|
cmd = cur->data; |
|
|
|
io_cmd = cur->data; |
|
|
|
|
|
|
|
if (io_cmd->is_shingle) { |
|
|
|
cmd = &io_cmd->cmd.shingle.basic; |
|
|
|
ptr = &io_cmd->cmd.shingle; |
|
|
|
} |
|
|
|
else { |
|
|
|
cmd = &io_cmd->cmd.normal; |
|
|
|
ptr = &io_cmd->cmd.normal; |
|
|
|
} |
|
|
|
|
|
|
|
if (cmd->cmd.normal.cmd == FUZZY_WRITE) { |
|
|
|
rspamd_fuzzy_backend_add (ctx->backend, &cmd->cmd.normal); |
|
|
|
if (cmd->cmd == FUZZY_WRITE) { |
|
|
|
rspamd_fuzzy_backend_add (ctx->backend, ptr); |
|
|
|
} |
|
|
|
else { |
|
|
|
rspamd_fuzzy_backend_del (ctx->backend, &cmd->cmd.normal); |
|
|
|
rspamd_fuzzy_backend_del (ctx->backend, ptr); |
|
|
|
} |
|
|
|
|
|
|
|
nupdates++; |
|
|
@@ -234,8 +246,8 @@ rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx) |
|
|
|
cur = ctx->updates_pending->head; |
|
|
|
|
|
|
|
while (cur) { |
|
|
|
cmd = cur->data; |
|
|
|
g_slice_free1 (sizeof (*cmd), cmd); |
|
|
|
io_cmd = cur->data; |
|
|
|
g_slice_free1 (sizeof (*io_cmd), io_cmd); |
|
|
|
cur = g_list_next (cur); |
|
|
|
} |
|
|
|
|
|
|
@@ -387,6 +399,7 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session) |
|
|
|
struct fuzzy_peer_request *up_req; |
|
|
|
struct fuzzy_key_stat *ip_stat = NULL; |
|
|
|
rspamd_inet_addr_t *naddr; |
|
|
|
gpointer ptr; |
|
|
|
gsize up_len; |
|
|
|
|
|
|
|
switch (session->cmd_type) { |
|
|
@@ -441,14 +454,22 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session) |
|
|
|
|
|
|
|
if (session->worker->index == 0 || session->ctx->peer_fd == -1) { |
|
|
|
/* Just add to the queue */ |
|
|
|
up_cmd = g_slice_alloc (sizeof (*up_cmd)); |
|
|
|
memcpy (up_cmd, cmd, up_len); |
|
|
|
up_cmd = g_slice_alloc0 (sizeof (*up_cmd)); |
|
|
|
up_cmd->is_shingle = is_shingle; |
|
|
|
ptr = is_shingle ? |
|
|
|
(gpointer)&up_cmd->cmd.shingle : |
|
|
|
(gpointer)&up_cmd->cmd.normal; |
|
|
|
memcpy (ptr, cmd, up_len); |
|
|
|
g_queue_push_tail (session->ctx->updates_pending, up_cmd); |
|
|
|
} |
|
|
|
else { |
|
|
|
/* We need to send request to the peer */ |
|
|
|
up_req = g_slice_alloc (sizeof (*up_req)); |
|
|
|
memcpy (&up_req->cmd, cmd, up_len); |
|
|
|
up_req = g_slice_alloc0 (sizeof (*up_req)); |
|
|
|
up_req->cmd.is_shingle = is_shingle; |
|
|
|
ptr = is_shingle ? |
|
|
|
(gpointer)&up_req->cmd.cmd.shingle : |
|
|
|
(gpointer)&up_req->cmd.cmd.normal; |
|
|
|
memcpy (ptr, cmd, up_len); |
|
|
|
event_set (&up_req->io_ev, session->ctx->peer_fd, EV_WRITE, |
|
|
|
fuzzy_peer_send_io, up_req); |
|
|
|
event_base_set (session->ctx->ev_base, &up_req->io_ev); |