io_cmd = cur->data;
if (io_cmd->is_shingle) {
- len += sizeof (guint32) + sizeof (gboolean) +
+ len += sizeof (guint32) + sizeof (guint32) +
sizeof (struct rspamd_fuzzy_shingle_cmd);
}
else {
- len += sizeof (guint32) + sizeof (gboolean) +
+ len += sizeof (guint32) + sizeof (guint32) +
sizeof (struct rspamd_fuzzy_cmd);
}
}
io_cmd = cur->data;
if (io_cmd->is_shingle) {
- len = sizeof (gboolean) +
+ len = sizeof (guint32) +
sizeof (struct rspamd_fuzzy_shingle_cmd);
}
else {
- len = sizeof (gboolean) +
+ len = sizeof (guint32) +
sizeof (struct rspamd_fuzzy_cmd);
}
struct rspamd_updates_cbdata {
struct rspamd_fuzzy_storage_ctx *ctx;
- const gchar *source;
+ gchar *source;
};
static void
{
msg_info ("updated fuzzy storage from %s: version: %d",
(const char *)ud, (gint)ver);
+ g_free (ud);
}
static void
g_queue_clear (ctx->updates_pending);
rspamd_fuzzy_backend_version (ctx->backend, source,
- fuzzy_update_version_callback, (void *)source);
+ fuzzy_update_version_callback, g_strdup (source));
ctx->updates_failed = 0;
}
else {
event_base_loopexit (ctx->ev_base, &tv);
}
+ g_free (cbdata->source);
g_slice_free1 (sizeof (*cbdata), cbdata);
}
static void
rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx,
- const gchar *source)
+ const gchar *source, gboolean forced)
{
struct rspamd_updates_cbdata *cbdata;
if (ctx->updates_pending &&
- g_queue_get_length (ctx->updates_pending) > 0) {
+ (forced || g_queue_get_length (ctx->updates_pending) > 0)) {
cbdata = g_slice_alloc (sizeof (*cbdata));
cbdata->ctx = ctx;
- cbdata->source = source;
+ cbdata->source = g_strdup (source);
rspamd_fuzzy_backend_process_updates (ctx->backend, ctx->updates_pending,
source, rspamd_fuzzy_updates_cb, cbdata);
}
*/
p = rspamd_http_message_get_body (msg, &remain);
- if (p && remain > sizeof (gint32) * 2) {
+ if (p && remain >= sizeof (gint32) * 2) {
memcpy (&revision, p, sizeof (gint32));
revision = GINT32_TO_LE (revision);
return;
}
- if (len < sizeof (struct rspamd_fuzzy_cmd) + sizeof (gboolean) ||
+ if (len < sizeof (struct rspamd_fuzzy_cmd) + sizeof (guint32) ||
len > sizeof (cmd)) {
/* Bad size command */
msg_err_fuzzy_update ("incorrect element size: %d, at least "
"%d expected", len,
- (gint)(sizeof (struct rspamd_fuzzy_cmd) + sizeof (gboolean)));
+ (gint)(sizeof (struct rspamd_fuzzy_cmd) + sizeof (guint32)));
goto err;
}
- memcpy (&cmd, p, sizeof (gboolean));
- if (cmd.is_shingle && len < sizeof (cmd)) {
+ memcpy (&cmd, p, len);
+ if (cmd.is_shingle && len != sizeof (cmd)) {
/* Short command */
msg_err_fuzzy_update ("incorrect element size: %d, at least "
"%d expected", len,
}
pcmd = g_slice_alloc (sizeof (cmd));
- memcpy (pcmd, p, len);
+ memcpy (pcmd, &cmd, len);
updates = g_list_prepend (updates, pcmd);
p += len;
cur->data = NULL;
}
- rspamd_fuzzy_process_updates_queue (session->ctx, session->src);
+ rspamd_fuzzy_process_updates_queue (session->ctx, session->src, TRUE);
msg_info_fuzzy_update ("processed updates from the master %s, "
"%ud operations processed,"
" revision: %d (local revision: %d)",
}
}
- /* This also update our version id */
g_list_free (updates);
}
}
}
static int
-rspamd_fuzzy_collection_cookie_handler (struct rspamd_http_connection_entry *conn_ent,
+rspamd_fuzzy_collection_cookie (struct rspamd_http_connection_entry *conn_ent,
struct rspamd_http_message *msg)
{
struct rspamd_fuzzy_collection_session *session = conn_ent->ud;
}
static int
-rspamd_fuzzy_collection_data_handler (struct rspamd_http_connection_entry *conn_ent,
+rspamd_fuzzy_collection_data (struct rspamd_http_connection_entry *conn_ent,
struct rspamd_http_message *msg)
{
struct rspamd_fuzzy_collection_session *session = conn_ent->ud;
GError *err = NULL;
guchar *decoded_signature;
gsize dec_len;
- guint32 cmdlen;
+ guint32 cmdlen, nupdates = 0;
sign_header = rspamd_http_message_find_header (msg, "Signature");
*/
reply = rspamd_fstring_append (reply, (const gchar *)&ctx->collection_id,
sizeof (ctx->collection_id));
+ cur = ctx->updates_pending->head;
+
while (cur) {
io_cmd = cur->data;
if (io_cmd->is_shingle) {
- cmdlen = sizeof (io_cmd->cmd.shingle);
+ cmdlen = sizeof (io_cmd->cmd.shingle) + sizeof (guint32);
}
else {
- cmdlen = sizeof (io_cmd->cmd.normal);
+ cmdlen = sizeof (io_cmd->cmd.normal) + sizeof (guint32);
}
cmdlen = GUINT32_TO_LE (cmdlen);
reply = rspamd_fstring_append (reply, (const gchar *)&cmdlen,
sizeof (cmdlen));
- reply = rspamd_fstring_append (reply, (const gchar *)&io_cmd->cmd,
+ reply = rspamd_fstring_append (reply, (const gchar *)io_cmd,
cmdlen);
g_slice_free1 (sizeof (*io_cmd), io_cmd);
+ nupdates ++;
cur = g_list_next (cur);
}
+ msg_info_fuzzy_collection ("collection %d done, send %d updates",
+ ctx->collection_id, nupdates);
/* Last command */
cmdlen = 0;
reply = rspamd_fstring_append (reply, (const gchar *)&cmdlen,
struct rspamd_fuzzy_storage_ctx *ctx = ud;
if (g_queue_get_length (ctx->updates_pending) > 0) {
- rspamd_fuzzy_process_updates_queue (ctx, local_db_name);
+ rspamd_fuzzy_process_updates_queue (ctx, local_db_name, FALSE);
return TRUE;
}
rep.reply.fuzzy_sync.status = 0;
if (ctx->backend && worker->index == 0) {
- rspamd_fuzzy_process_updates_queue (ctx, local_db_name);
+ rspamd_fuzzy_process_updates_queue (ctx, local_db_name, FALSE);
rspamd_fuzzy_backend_start_update (ctx->backend, ctx->sync_timeout,
rspamd_fuzzy_storage_periodic_callback, ctx);
}
/* Register paths */
rspamd_http_router_add_path (ctx->collection_rt,
"/cookie",
- rspamd_fuzzy_collection_cookie_handler);
+ rspamd_fuzzy_collection_cookie);
rspamd_http_router_add_path (ctx->collection_rt,
"/data",
- rspamd_fuzzy_collection_data_handler);
+ rspamd_fuzzy_collection_data);
}
}
if (worker->index == 0 && g_queue_get_length (ctx->updates_pending) > 0) {
if (!ctx->collection_mode) {
- rspamd_fuzzy_process_updates_queue (ctx, local_db_name);
+ rspamd_fuzzy_process_updates_queue (ctx, local_db_name, FALSE);
event_base_loop (ctx->ev_base, 0);
}
}