struct rspamd_keypair_cache *keypair_cache;
rspamd_lru_hash_t *errors_ips;
struct rspamd_fuzzy_backend *backend;
- GQueue *updates_pending;
+ GArray *updates_pending;
guint updates_failed;
guint updates_maxfail;
guint32 collection_id;
fuzzy_mirror_updates_version_cb (guint64 rev64, void *ud)
{
struct rspamd_fuzzy_updates_cbdata *cbdata = ud;
- GList *cur;
struct fuzzy_peer_cmd *io_cmd;
guint32 rev32 = rev64, len;
const gchar *p;
struct rspamd_http_message *msg;
struct rspamd_fuzzy_mirror *m;
struct timeval tv;
+ guint i;
conn = cbdata->conn;
ctx = cbdata->ctx;
rev32 = GUINT32_TO_LE (rev32);
len = sizeof (guint32) * 2; /* revision + last chunk */
- for (cur = ctx->updates_pending->head; cur != NULL; cur = g_list_next (cur)) {
- io_cmd = cur->data;
+ for (i = 0; i < ctx->updates_pending->len; i ++) {
+ io_cmd = &g_array_index (ctx->updates_pending, struct fuzzy_peer_cmd, i);
if (io_cmd->is_shingle) {
len += sizeof (guint32) + sizeof (guint32) +
reply = rspamd_fstring_append (reply, (const char *)&rev32,
sizeof (rev32));
- for (cur = ctx->updates_pending->head; cur != NULL; cur = g_list_next (cur)) {
- io_cmd = cur->data;
+ for (i = 0; i < ctx->updates_pending->len; i ++) {
+ io_cmd = &g_array_index (ctx->updates_pending, struct fuzzy_peer_cmd, i);
if (io_cmd->is_shingle) {
len = sizeof (guint32) +
guint i;
struct rspamd_fuzzy_storage_ctx *ctx;
const gchar *source;
- GList *cur;
- struct fuzzy_peer_cmd *io_cmd;
ctx = cbdata->ctx;
source = cbdata->source;
if (success) {
rspamd_fuzzy_backend_count (ctx->backend, fuzzy_count_callback, ctx);
- if (g_queue_get_length (ctx->updates_pending) > 0) {
+ if (ctx->updates_pending->len > 0) {
for (i = 0; i < ctx->mirrors->len; i ++) {
m = g_ptr_array_index (ctx->mirrors, i);
}
/* Clear updates */
- cur = ctx->updates_pending->head;
-
- while (cur) {
- io_cmd = cur->data;
- g_slice_free1 (sizeof (*io_cmd), io_cmd);
- cur = g_list_next (cur);
- }
-
- g_queue_clear (ctx->updates_pending);
+ ctx->updates_pending->len = 0;
rspamd_fuzzy_backend_version (ctx->backend, source,
fuzzy_update_version_callback, g_strdup (source));
ctx->updates_failed = 0;
if (++ctx->updates_failed > ctx->updates_maxfail) {
msg_err ("cannot commit update transaction to fuzzy backend, discard "
"%ud updates after %d retries",
- g_queue_get_length (ctx->updates_pending),
+ ctx->updates_pending->len,
ctx->updates_maxfail);
ctx->updates_failed = 0;
- cur = ctx->updates_pending->head;
-
- while (cur) {
- io_cmd = cur->data;
- g_slice_free1 (sizeof (*io_cmd), io_cmd);
- cur = g_list_next (cur);
- }
-
- g_queue_clear (ctx->updates_pending);
+ ctx->updates_pending->len = 0;
}
else {
msg_err ("cannot commit update transaction to fuzzy backend, "
"%ud updates are still pending, %d updates left",
- g_queue_get_length (ctx->updates_pending),
+ ctx->updates_pending->len,
ctx->updates_maxfail - ctx->updates_failed);
}
}
struct rspamd_updates_cbdata *cbdata;
- if (ctx->updates_pending &&
- (forced || g_queue_get_length (ctx->updates_pending) > 0)) {
+ if ((forced ||ctx->updates_pending->len > 0)) {
cbdata = g_slice_alloc (sizeof (*cbdata));
cbdata->ctx = ctx;
cbdata->source = g_strdup (source);
gboolean encrypted = FALSE, is_shingle = FALSE;
struct rspamd_fuzzy_cmd *cmd = NULL;
struct rspamd_fuzzy_reply result;
- struct fuzzy_peer_cmd *up_cmd;
+ struct fuzzy_peer_cmd up_cmd;
struct fuzzy_peer_request *up_req;
struct fuzzy_key_stat *ip_stat = NULL;
gchar hexbuf[rspamd_cryptobox_HASHBYTES * 2 + 1];
if (session->worker->index == 0 || session->ctx->peer_fd == -1) {
/* Just add to the queue */
- up_cmd = g_slice_alloc0 (sizeof (*up_cmd));
- up_cmd->is_shingle = is_shingle;
+ up_cmd.is_shingle = is_shingle;
ptr = is_shingle ?
- (gpointer)&up_cmd->cmd.shingle :
- (gpointer)&up_cmd->cmd.normal;
+ (gpointer)&up_cmd.cmd.shingle :
+ (gpointer)&up_cmd.cmd.normal;
memcpy (ptr, cmd, up_len);
- g_queue_push_tail (session->ctx->updates_pending, up_cmd);
+ g_array_append_val (session->ctx->updates_pending, up_cmd);
}
else {
/* We need to send request to the peer */
gsize remain;
gint32 revision;
guint32 len = 0, cnt = 0;
- struct fuzzy_peer_cmd cmd, *pcmd;
+ struct fuzzy_peer_cmd cmd;
enum {
read_len = 0,
read_data,
finish_processing
} state = read_len;
- GList *updates = NULL, *cur;
+
gpointer flag_ptr;
/*
goto err;
}
- pcmd = g_slice_alloc (sizeof (cmd));
- memcpy (pcmd, &cmd, len);
- updates = g_list_prepend (updates, pcmd);
+ if (cmd.is_shingle) {
+ if ((flag_ptr = g_hash_table_lookup (session->ctx->master_flags,
+ GUINT_TO_POINTER (cmd.cmd.shingle.basic.flag))) != NULL) {
+ cmd.cmd.shingle.basic.flag = GPOINTER_TO_UINT (flag_ptr);
+ }
+ }
+ else {
+ if ((flag_ptr = g_hash_table_lookup (session->ctx->master_flags,
+ GUINT_TO_POINTER (cmd.cmd.normal.flag))) != NULL) {
+ cmd.cmd.normal.flag = GPOINTER_TO_UINT (flag_ptr);
+ }
+ }
+
+ g_array_append_val (session->ctx->updates_pending, cmd);
p += len;
remain -= len;
}
}
- /* Insert elements to the updates from head */
- for (cur = updates; cur != NULL; cur = g_list_next (cur)) {
- pcmd = cur->data;
-
- if (pcmd->is_shingle) {
- if ((flag_ptr = g_hash_table_lookup (session->ctx->master_flags,
- GUINT_TO_POINTER (pcmd->cmd.shingle.basic.flag))) != NULL) {
- pcmd->cmd.shingle.basic.flag = GPOINTER_TO_UINT (flag_ptr);
- }
- }
- else {
- if ((flag_ptr = g_hash_table_lookup (session->ctx->master_flags,
- GUINT_TO_POINTER (pcmd->cmd.normal.flag))) != NULL) {
- pcmd->cmd.normal.flag = GPOINTER_TO_UINT (flag_ptr);
- }
- }
-
-
- g_queue_push_head (session->ctx->updates_pending, cur->data);
- cur->data = NULL;
- }
rspamd_fuzzy_process_updates_queue (session->ctx, session->src, TRUE);
msg_info_fuzzy_update ("processed updates from the master %s, "
cnt, revision, our_rev);
err:
- if (updates) {
- /* We still need to clear queue */
- for (cur = updates; cur != NULL; cur = g_list_next (cur)) {
- if (cur->data) {
- g_slice_free1 (sizeof (cmd), cur->data);
- }
- }
-
- g_list_free (updates);
- }
+ return;
}
struct rspamd_fuzzy_collection_session *session = conn_ent->ud;
const rspamd_ftok_t *sign_header;
struct rspamd_fuzzy_storage_ctx *ctx;
- GList *cur;
+ guint i;
struct fuzzy_peer_cmd *io_cmd;
rspamd_fstring_t *reply;
GError *err = NULL;
ottery_rand_bytes (ctx->cookie, sizeof (ctx->cookie));
/* Send&Clear updates */
- cur = ctx->updates_pending->head;
reply = rspamd_fstring_sized_new (8192);
/*
* Message format:
*/
reply = rspamd_fstring_append (reply, (const gchar *)&ctx->collection_id,
sizeof (ctx->collection_id));
- cur = ctx->updates_pending->head;
- while (cur) {
- io_cmd = cur->data;
+ for (i = 0; i < ctx->updates_pending->len; i ++) {
+ io_cmd = &g_array_index (ctx->updates_pending, struct fuzzy_peer_cmd, i);
if (io_cmd->is_shingle) {
cmdlen = sizeof (io_cmd->cmd.shingle) + sizeof (guint32);
sizeof (cmdlen));
reply = rspamd_fstring_append (reply, (const gchar *)io_cmd,
cmdlen);
- g_slice_free1 (sizeof (*io_cmd), io_cmd);
nupdates ++;
- cur = g_list_next (cur);
}
msg_info_fuzzy_collection ("collection %d done, send %d updates",
reply = rspamd_fstring_append (reply, (const gchar *)&cmdlen,
sizeof (cmdlen));
- g_queue_clear (ctx->updates_pending);
+ ctx->updates_pending->len = 0;
/* Clear failed attempts counter */
ctx->updates_failed = 0;
ctx->collection_id ++;
rspamd_fuzzy_collection_periodic (gint fd, gshort what, gpointer ud)
{
struct rspamd_fuzzy_storage_ctx *ctx = ud;
- GList *cur;
- struct fuzzy_peer_cmd *io_cmd;
if (++ctx->updates_failed > ctx->updates_maxfail) {
msg_err ("cannot store more data in workqueue, discard "
"%ud updates after %d missed collection points",
- g_queue_get_length (ctx->updates_pending),
+ ctx->updates_pending->len,
ctx->updates_maxfail);
ctx->updates_failed = 0;
- cur = ctx->updates_pending->head;
-
- while (cur) {
- io_cmd = cur->data;
- g_slice_free1 (sizeof (*io_cmd), io_cmd);
- cur = g_list_next (cur);
- }
-
- g_queue_clear (ctx->updates_pending);
+ ctx->updates_pending->len = 0;
/* Regenerate cookie */
ottery_rand_bytes (ctx->cookie, sizeof (ctx->cookie));
}
else {
msg_err ("fuzzy data has not been collected in time, "
"%ud updates are still pending, %d updates left",
- g_queue_get_length (ctx->updates_pending),
+ ctx->updates_pending->len,
ctx->updates_maxfail - ctx->updates_failed);
}
{
struct rspamd_fuzzy_storage_ctx *ctx = ud;
- if (g_queue_get_length (ctx->updates_pending) > 0) {
+ if (ctx->updates_pending->len > 0) {
rspamd_fuzzy_process_updates_queue (ctx, local_db_name, FALSE);
return TRUE;
static void
rspamd_fuzzy_peer_io (gint fd, gshort what, gpointer d)
{
- struct fuzzy_peer_cmd cmd, *pcmd;
+ struct fuzzy_peer_cmd cmd;
struct rspamd_fuzzy_storage_ctx *ctx = d;
gssize r;
}
}
else {
- pcmd = g_slice_alloc (sizeof (*pcmd));
- memcpy (pcmd, &cmd, sizeof (cmd));
- g_queue_push_tail (ctx->updates_pending, pcmd);
+ g_array_append_val (ctx->updates_pending, cmd);
}
}
if (worker->index == 0) {
- ctx->updates_pending = g_queue_new ();
+ ctx->updates_pending = g_array_sized_new (FALSE, FALSE,
+ sizeof (struct fuzzy_peer_cmd), 1024);
rspamd_fuzzy_backend_start_update (ctx->backend, ctx->sync_timeout,
rspamd_fuzzy_storage_periodic_callback, ctx);
}
* we collect fuzzy hashes in the updates queue and ignore all read commands
*/
if (worker->index == 0) {
- ctx->updates_pending = g_queue_new ();
+ ctx->updates_pending = g_array_sized_new (FALSE, FALSE,
+ sizeof (struct fuzzy_peer_cmd), 1024);
double_to_tv (ctx->sync_timeout, &ctx->stat_tv);
event_set (&ctx->stat_ev, -1, EV_TIMEOUT|EV_PERSIST,
rspamd_fuzzy_collection_periodic, ctx);
event_base_loop (ctx->ev_base, 0);
rspamd_worker_block_signals ();
- if (worker->index == 0 && g_queue_get_length (ctx->updates_pending) > 0) {
+ if (worker->index == 0 && ctx->updates_pending->len > 0) {
if (!ctx->collection_mode) {
rspamd_fuzzy_process_updates_queue (ctx, local_db_name, FALSE);
event_base_loop (ctx->ev_base, 0);
close (fd);
}
+
+ g_array_free (ctx->updates_pending, TRUE);
}
rspamd_log_close (worker->srv->logger);
rspamd_fuzzy_check_cb cb, void *ud,
void *subr_ud);
static void rspamd_fuzzy_backend_update_sqlite (struct rspamd_fuzzy_backend *bk,
- GQueue *updates, const gchar *src,
+ GArray *updates, const gchar *src,
rspamd_fuzzy_update_cb cb, void *ud,
void *subr_ud);
static void rspamd_fuzzy_backend_count_sqlite (struct rspamd_fuzzy_backend *bk,
rspamd_fuzzy_check_cb cb, void *ud,
void *subr_ud);
void (*update) (struct rspamd_fuzzy_backend *bk,
- GQueue *updates, const gchar *src,
+ GArray *updates, const gchar *src,
rspamd_fuzzy_update_cb cb, void *ud,
void *subr_ud);
void (*count) (struct rspamd_fuzzy_backend *bk,
static void
rspamd_fuzzy_backend_update_sqlite (struct rspamd_fuzzy_backend *bk,
- GQueue *updates, const gchar *src,
+ GArray *updates, const gchar *src,
rspamd_fuzzy_update_cb cb, void *ud,
void *subr_ud)
{
struct rspamd_fuzzy_backend_sqlite *sq = subr_ud;
gboolean success = FALSE;
- GList *cur;
+ guint i;
struct fuzzy_peer_cmd *io_cmd;
struct rspamd_fuzzy_cmd *cmd;
gpointer ptr;
guint nupdates = 0;
if (rspamd_fuzzy_backend_sqlite_prepare_update (sq, src)) {
- cur = updates->head;
-
- while (cur) {
- io_cmd = cur->data;
+ for (i = 0; i < updates->len; i ++) {
+ io_cmd = &g_array_index (updates, struct fuzzy_peer_cmd, i);
if (io_cmd->is_shingle) {
cmd = &io_cmd->cmd.shingle.basic;
}
nupdates ++;
- cur = g_list_next (cur);
}
if (rspamd_fuzzy_backend_sqlite_finish_update (sq, src,
void
rspamd_fuzzy_backend_process_updates (struct rspamd_fuzzy_backend *bk,
- GQueue *updates, const gchar *src, rspamd_fuzzy_update_cb cb,
+ GArray *updates, const gchar *src, rspamd_fuzzy_update_cb cb,
void *ud)
{
g_assert (bk != NULL);