From: Vsevolod Stakhov Date: Thu, 17 Aug 2017 07:31:17 +0000 (+0100) Subject: [Feature] Use array instead of queue to reduce memory fragmentation X-Git-Tag: 1.7.0~717 X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=0626200288019b3da207101dc31a3c83dfed8d76;p=rspamd.git [Feature] Use array instead of queue to reduce memory fragmentation --- diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c index ad77c2848..e08598969 100644 --- a/src/fuzzy_storage.c +++ b/src/fuzzy_storage.c @@ -162,7 +162,7 @@ struct rspamd_fuzzy_storage_ctx { struct rspamd_keypair_cache *keypair_cache; rspamd_lru_hash_t *errors_ips; struct rspamd_fuzzy_backend *backend; - GQueue *updates_pending; + GArray *updates_pending; guint updates_failed; guint updates_maxfail; guint32 collection_id; @@ -310,7 +310,6 @@ static void fuzzy_mirror_updates_version_cb (guint64 rev64, void *ud) { struct rspamd_fuzzy_updates_cbdata *cbdata = ud; - GList *cur; struct fuzzy_peer_cmd *io_cmd; guint32 rev32 = rev64, len; const gchar *p; @@ -320,6 +319,7 @@ fuzzy_mirror_updates_version_cb (guint64 rev64, void *ud) struct rspamd_http_message *msg; struct rspamd_fuzzy_mirror *m; struct timeval tv; + guint i; conn = cbdata->conn; ctx = cbdata->ctx; @@ -329,8 +329,8 @@ fuzzy_mirror_updates_version_cb (guint64 rev64, void *ud) rev32 = GUINT32_TO_LE (rev32); len = sizeof (guint32) * 2; /* revision + last chunk */ - for (cur = ctx->updates_pending->head; cur != NULL; cur = g_list_next (cur)) { - io_cmd = cur->data; + for (i = 0; i < ctx->updates_pending->len; i ++) { + io_cmd = &g_array_index (ctx->updates_pending, struct fuzzy_peer_cmd, i); if (io_cmd->is_shingle) { len += sizeof (guint32) + sizeof (guint32) + @@ -346,8 +346,8 @@ fuzzy_mirror_updates_version_cb (guint64 rev64, void *ud) reply = rspamd_fstring_append (reply, (const char *)&rev32, sizeof (rev32)); - for (cur = ctx->updates_pending->head; cur != NULL; cur = g_list_next (cur)) { - io_cmd = cur->data; + for (i = 0; i < ctx->updates_pending->len; i ++) { + io_cmd = &g_array_index (ctx->updates_pending, struct fuzzy_peer_cmd, i); if (io_cmd->is_shingle) { len = sizeof (guint32) + @@ -502,8 +502,6 @@ rspamd_fuzzy_updates_cb (gboolean success, void *ud) guint i; struct rspamd_fuzzy_storage_ctx *ctx; const gchar *source; - GList *cur; - struct fuzzy_peer_cmd *io_cmd; ctx = cbdata->ctx; source = cbdata->source; @@ -511,7 +509,7 @@ rspamd_fuzzy_updates_cb (gboolean success, void *ud) if (success) { rspamd_fuzzy_backend_count (ctx->backend, fuzzy_count_callback, ctx); - if (g_queue_get_length (ctx->updates_pending) > 0) { + if (ctx->updates_pending->len > 0) { for (i = 0; i < ctx->mirrors->len; i ++) { m = g_ptr_array_index (ctx->mirrors, i); @@ -520,15 +518,7 @@ rspamd_fuzzy_updates_cb (gboolean success, void *ud) } /* Clear updates */ - cur = ctx->updates_pending->head; - - while (cur) { - io_cmd = cur->data; - g_slice_free1 (sizeof (*io_cmd), io_cmd); - cur = g_list_next (cur); - } - - g_queue_clear (ctx->updates_pending); + ctx->updates_pending->len = 0; rspamd_fuzzy_backend_version (ctx->backend, source, fuzzy_update_version_callback, g_strdup (source)); ctx->updates_failed = 0; @@ -537,23 +527,15 @@ rspamd_fuzzy_updates_cb (gboolean success, void *ud) if (++ctx->updates_failed > ctx->updates_maxfail) { msg_err ("cannot commit update transaction to fuzzy backend, discard " "%ud updates after %d retries", - g_queue_get_length (ctx->updates_pending), + ctx->updates_pending->len, ctx->updates_maxfail); ctx->updates_failed = 0; - cur = ctx->updates_pending->head; - - while (cur) { - io_cmd = cur->data; - g_slice_free1 (sizeof (*io_cmd), io_cmd); - cur = g_list_next (cur); - } - - g_queue_clear (ctx->updates_pending); + ctx->updates_pending->len = 0; } else { msg_err ("cannot commit update transaction to fuzzy backend, " "%ud updates are still pending, %d updates left", - g_queue_get_length (ctx->updates_pending), + ctx->updates_pending->len, ctx->updates_maxfail - ctx->updates_failed); } } @@ -579,8 +561,7 @@ rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx, struct rspamd_updates_cbdata *cbdata; - if (ctx->updates_pending && - (forced || g_queue_get_length (ctx->updates_pending) > 0)) { + if ((forced ||ctx->updates_pending->len > 0)) { cbdata = g_slice_alloc (sizeof (*cbdata)); cbdata->ctx = ctx; cbdata->source = g_strdup (source); @@ -783,7 +764,7 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session) gboolean encrypted = FALSE, is_shingle = FALSE; struct rspamd_fuzzy_cmd *cmd = NULL; struct rspamd_fuzzy_reply result; - struct fuzzy_peer_cmd *up_cmd; + struct fuzzy_peer_cmd up_cmd; struct fuzzy_peer_request *up_req; struct fuzzy_key_stat *ip_stat = NULL; gchar hexbuf[rspamd_cryptobox_HASHBYTES * 2 + 1]; @@ -890,13 +871,12 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session) if (session->worker->index == 0 || session->ctx->peer_fd == -1) { /* Just add to the queue */ - up_cmd = g_slice_alloc0 (sizeof (*up_cmd)); - up_cmd->is_shingle = is_shingle; + up_cmd.is_shingle = is_shingle; ptr = is_shingle ? - (gpointer)&up_cmd->cmd.shingle : - (gpointer)&up_cmd->cmd.normal; + (gpointer)&up_cmd.cmd.shingle : + (gpointer)&up_cmd.cmd.normal; memcpy (ptr, cmd, up_len); - g_queue_push_tail (session->ctx->updates_pending, up_cmd); + g_array_append_val (session->ctx->updates_pending, up_cmd); } else { /* We need to send request to the peer */ @@ -1111,13 +1091,13 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session, gsize remain; gint32 revision; guint32 len = 0, cnt = 0; - struct fuzzy_peer_cmd cmd, *pcmd; + struct fuzzy_peer_cmd cmd; enum { read_len = 0, read_data, finish_processing } state = read_len; - GList *updates = NULL, *cur; + gpointer flag_ptr; /* @@ -1204,9 +1184,20 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session, goto err; } - pcmd = g_slice_alloc (sizeof (cmd)); - memcpy (pcmd, &cmd, len); - updates = g_list_prepend (updates, pcmd); + if (cmd.is_shingle) { + if ((flag_ptr = g_hash_table_lookup (session->ctx->master_flags, + GUINT_TO_POINTER (cmd.cmd.shingle.basic.flag))) != NULL) { + cmd.cmd.shingle.basic.flag = GPOINTER_TO_UINT (flag_ptr); + } + } + else { + if ((flag_ptr = g_hash_table_lookup (session->ctx->master_flags, + GUINT_TO_POINTER (cmd.cmd.normal.flag))) != NULL) { + cmd.cmd.normal.flag = GPOINTER_TO_UINT (flag_ptr); + } + } + + g_array_append_val (session->ctx->updates_pending, cmd); p += len; remain -= len; @@ -1221,27 +1212,6 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session, } } - /* Insert elements to the updates from head */ - for (cur = updates; cur != NULL; cur = g_list_next (cur)) { - pcmd = cur->data; - - if (pcmd->is_shingle) { - if ((flag_ptr = g_hash_table_lookup (session->ctx->master_flags, - GUINT_TO_POINTER (pcmd->cmd.shingle.basic.flag))) != NULL) { - pcmd->cmd.shingle.basic.flag = GPOINTER_TO_UINT (flag_ptr); - } - } - else { - if ((flag_ptr = g_hash_table_lookup (session->ctx->master_flags, - GUINT_TO_POINTER (pcmd->cmd.normal.flag))) != NULL) { - pcmd->cmd.normal.flag = GPOINTER_TO_UINT (flag_ptr); - } - } - - - g_queue_push_head (session->ctx->updates_pending, cur->data); - cur->data = NULL; - } rspamd_fuzzy_process_updates_queue (session->ctx, session->src, TRUE); msg_info_fuzzy_update ("processed updates from the master %s, " @@ -1251,16 +1221,7 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session, cnt, revision, our_rev); err: - if (updates) { - /* We still need to clear queue */ - for (cur = updates; cur != NULL; cur = g_list_next (cur)) { - if (cur->data) { - g_slice_free1 (sizeof (cmd), cur->data); - } - } - - g_list_free (updates); - } + return; } @@ -1513,7 +1474,7 @@ rspamd_fuzzy_collection_data (struct rspamd_http_connection_entry *conn_ent, struct rspamd_fuzzy_collection_session *session = conn_ent->ud; const rspamd_ftok_t *sign_header; struct rspamd_fuzzy_storage_ctx *ctx; - GList *cur; + guint i; struct fuzzy_peer_cmd *io_cmd; rspamd_fstring_t *reply; GError *err = NULL; @@ -1564,7 +1525,6 @@ rspamd_fuzzy_collection_data (struct rspamd_http_connection_entry *conn_ent, ottery_rand_bytes (ctx->cookie, sizeof (ctx->cookie)); /* Send&Clear updates */ - cur = ctx->updates_pending->head; reply = rspamd_fstring_sized_new (8192); /* * Message format: @@ -1577,10 +1537,9 @@ rspamd_fuzzy_collection_data (struct rspamd_http_connection_entry *conn_ent, */ reply = rspamd_fstring_append (reply, (const gchar *)&ctx->collection_id, sizeof (ctx->collection_id)); - cur = ctx->updates_pending->head; - while (cur) { - io_cmd = cur->data; + for (i = 0; i < ctx->updates_pending->len; i ++) { + io_cmd = &g_array_index (ctx->updates_pending, struct fuzzy_peer_cmd, i); if (io_cmd->is_shingle) { cmdlen = sizeof (io_cmd->cmd.shingle) + sizeof (guint32); @@ -1595,9 +1554,7 @@ rspamd_fuzzy_collection_data (struct rspamd_http_connection_entry *conn_ent, sizeof (cmdlen)); reply = rspamd_fstring_append (reply, (const gchar *)io_cmd, cmdlen); - g_slice_free1 (sizeof (*io_cmd), io_cmd); nupdates ++; - cur = g_list_next (cur); } msg_info_fuzzy_collection ("collection %d done, send %d updates", @@ -1607,7 +1564,7 @@ rspamd_fuzzy_collection_data (struct rspamd_http_connection_entry *conn_ent, reply = rspamd_fstring_append (reply, (const gchar *)&cmdlen, sizeof (cmdlen)); - g_queue_clear (ctx->updates_pending); + ctx->updates_pending->len = 0; /* Clear failed attempts counter */ ctx->updates_failed = 0; ctx->collection_id ++; @@ -1664,31 +1621,21 @@ static void rspamd_fuzzy_collection_periodic (gint fd, gshort what, gpointer ud) { struct rspamd_fuzzy_storage_ctx *ctx = ud; - GList *cur; - struct fuzzy_peer_cmd *io_cmd; if (++ctx->updates_failed > ctx->updates_maxfail) { msg_err ("cannot store more data in workqueue, discard " "%ud updates after %d missed collection points", - g_queue_get_length (ctx->updates_pending), + ctx->updates_pending->len, ctx->updates_maxfail); ctx->updates_failed = 0; - cur = ctx->updates_pending->head; - - while (cur) { - io_cmd = cur->data; - g_slice_free1 (sizeof (*io_cmd), io_cmd); - cur = g_list_next (cur); - } - - g_queue_clear (ctx->updates_pending); + ctx->updates_pending->len = 0; /* Regenerate cookie */ ottery_rand_bytes (ctx->cookie, sizeof (ctx->cookie)); } else { msg_err ("fuzzy data has not been collected in time, " "%ud updates are still pending, %d updates left", - g_queue_get_length (ctx->updates_pending), + ctx->updates_pending->len, ctx->updates_maxfail - ctx->updates_failed); } @@ -1859,7 +1806,7 @@ rspamd_fuzzy_storage_periodic_callback (void *ud) { struct rspamd_fuzzy_storage_ctx *ctx = ud; - if (g_queue_get_length (ctx->updates_pending) > 0) { + if (ctx->updates_pending->len > 0) { rspamd_fuzzy_process_updates_queue (ctx, local_db_name, FALSE); return TRUE; @@ -2612,7 +2559,7 @@ init_fuzzy (struct rspamd_config *cfg) static void rspamd_fuzzy_peer_io (gint fd, gshort what, gpointer d) { - struct fuzzy_peer_cmd cmd, *pcmd; + struct fuzzy_peer_cmd cmd; struct rspamd_fuzzy_storage_ctx *ctx = d; gssize r; @@ -2628,9 +2575,7 @@ rspamd_fuzzy_peer_io (gint fd, gshort what, gpointer d) } } else { - pcmd = g_slice_alloc (sizeof (*pcmd)); - memcpy (pcmd, &cmd, sizeof (cmd)); - g_queue_push_tail (ctx->updates_pending, pcmd); + g_array_append_val (ctx->updates_pending, cmd); } } @@ -2744,7 +2689,8 @@ start_fuzzy (struct rspamd_worker *worker) if (worker->index == 0) { - ctx->updates_pending = g_queue_new (); + ctx->updates_pending = g_array_sized_new (FALSE, FALSE, + sizeof (struct fuzzy_peer_cmd), 1024); rspamd_fuzzy_backend_start_update (ctx->backend, ctx->sync_timeout, rspamd_fuzzy_storage_periodic_callback, ctx); } @@ -2768,7 +2714,8 @@ start_fuzzy (struct rspamd_worker *worker) * we collect fuzzy hashes in the updates queue and ignore all read commands */ if (worker->index == 0) { - ctx->updates_pending = g_queue_new (); + ctx->updates_pending = g_array_sized_new (FALSE, FALSE, + sizeof (struct fuzzy_peer_cmd), 1024); double_to_tv (ctx->sync_timeout, &ctx->stat_tv); event_set (&ctx->stat_ev, -1, EV_TIMEOUT|EV_PERSIST, rspamd_fuzzy_collection_periodic, ctx); @@ -2880,7 +2827,7 @@ start_fuzzy (struct rspamd_worker *worker) event_base_loop (ctx->ev_base, 0); rspamd_worker_block_signals (); - if (worker->index == 0 && g_queue_get_length (ctx->updates_pending) > 0) { + if (worker->index == 0 && ctx->updates_pending->len > 0) { if (!ctx->collection_mode) { rspamd_fuzzy_process_updates_queue (ctx, local_db_name, FALSE); event_base_loop (ctx->ev_base, 0); @@ -2914,6 +2861,8 @@ start_fuzzy (struct rspamd_worker *worker) close (fd); } + + g_array_free (ctx->updates_pending, TRUE); } rspamd_log_close (worker->srv->logger); diff --git a/src/libserver/fuzzy_backend.c b/src/libserver/fuzzy_backend.c index f9b46c167..2b32d4705 100644 --- a/src/libserver/fuzzy_backend.c +++ b/src/libserver/fuzzy_backend.c @@ -34,7 +34,7 @@ static void rspamd_fuzzy_backend_check_sqlite (struct rspamd_fuzzy_backend *bk, rspamd_fuzzy_check_cb cb, void *ud, void *subr_ud); static void rspamd_fuzzy_backend_update_sqlite (struct rspamd_fuzzy_backend *bk, - GQueue *updates, const gchar *src, + GArray *updates, const gchar *src, rspamd_fuzzy_update_cb cb, void *ud, void *subr_ud); static void rspamd_fuzzy_backend_count_sqlite (struct rspamd_fuzzy_backend *bk, @@ -60,7 +60,7 @@ struct rspamd_fuzzy_backend_subr { rspamd_fuzzy_check_cb cb, void *ud, void *subr_ud); void (*update) (struct rspamd_fuzzy_backend *bk, - GQueue *updates, const gchar *src, + GArray *updates, const gchar *src, rspamd_fuzzy_update_cb cb, void *ud, void *subr_ud); void (*count) (struct rspamd_fuzzy_backend *bk, @@ -155,23 +155,21 @@ rspamd_fuzzy_backend_check_sqlite (struct rspamd_fuzzy_backend *bk, static void rspamd_fuzzy_backend_update_sqlite (struct rspamd_fuzzy_backend *bk, - GQueue *updates, const gchar *src, + GArray *updates, const gchar *src, rspamd_fuzzy_update_cb cb, void *ud, void *subr_ud) { struct rspamd_fuzzy_backend_sqlite *sq = subr_ud; gboolean success = FALSE; - GList *cur; + guint i; struct fuzzy_peer_cmd *io_cmd; struct rspamd_fuzzy_cmd *cmd; gpointer ptr; guint nupdates = 0; if (rspamd_fuzzy_backend_sqlite_prepare_update (sq, src)) { - cur = updates->head; - - while (cur) { - io_cmd = cur->data; + for (i = 0; i < updates->len; i ++) { + io_cmd = &g_array_index (updates, struct fuzzy_peer_cmd, i); if (io_cmd->is_shingle) { cmd = &io_cmd->cmd.shingle.basic; @@ -190,7 +188,6 @@ rspamd_fuzzy_backend_update_sqlite (struct rspamd_fuzzy_backend *bk, } nupdates ++; - cur = g_list_next (cur); } if (rspamd_fuzzy_backend_sqlite_finish_update (sq, src, @@ -326,7 +323,7 @@ rspamd_fuzzy_backend_check (struct rspamd_fuzzy_backend *bk, void rspamd_fuzzy_backend_process_updates (struct rspamd_fuzzy_backend *bk, - GQueue *updates, const gchar *src, rspamd_fuzzy_update_cb cb, + GArray *updates, const gchar *src, rspamd_fuzzy_update_cb cb, void *ud) { g_assert (bk != NULL); diff --git a/src/libserver/fuzzy_backend.h b/src/libserver/fuzzy_backend.h index 6c880d9c8..032784465 100644 --- a/src/libserver/fuzzy_backend.h +++ b/src/libserver/fuzzy_backend.h @@ -62,7 +62,7 @@ void rspamd_fuzzy_backend_check (struct rspamd_fuzzy_backend *bk, * @param src */ void rspamd_fuzzy_backend_process_updates (struct rspamd_fuzzy_backend *bk, - GQueue *updates, const gchar *src, rspamd_fuzzy_update_cb cb, + GArray *updates, const gchar *src, rspamd_fuzzy_update_cb cb, void *ud); /** diff --git a/src/libserver/fuzzy_backend_redis.c b/src/libserver/fuzzy_backend_redis.c index f8bec8882..7e5df5ff8 100644 --- a/src/libserver/fuzzy_backend_redis.c +++ b/src/libserver/fuzzy_backend_redis.c @@ -1223,7 +1223,7 @@ rspamd_fuzzy_redis_update_callback (redisAsyncContext *c, gpointer r, void rspamd_fuzzy_backend_update_redis (struct rspamd_fuzzy_backend *bk, - GQueue *updates, const gchar *src, + GArray *updates, const gchar *src, rspamd_fuzzy_update_cb cb, void *ud, void *subr_ud) { @@ -1232,7 +1232,7 @@ rspamd_fuzzy_backend_update_redis (struct rspamd_fuzzy_backend *bk, struct upstream *up; struct timeval tv; rspamd_inet_addr_t *addr; - GList *cur; + guint i; GString *key; struct fuzzy_peer_cmd *io_cmd; struct rspamd_fuzzy_cmd *cmd; @@ -1267,8 +1267,8 @@ rspamd_fuzzy_backend_update_redis (struct rspamd_fuzzy_backend *bk, ncommands = 3; /* For MULTI + EXEC + INCR */ nargs = 4; - for (cur = updates->head; cur != NULL; cur = g_list_next (cur)) { - io_cmd = cur->data; + for (i = 0; i < updates->len; i ++) { + io_cmd = &g_array_index (updates, struct fuzzy_peer_cmd, i); if (io_cmd->is_shingle) { cmd = &io_cmd->cmd.shingle.basic; @@ -1352,8 +1352,8 @@ rspamd_fuzzy_backend_update_redis (struct rspamd_fuzzy_backend *bk, /* Now split the rest of commands in packs and emit them command by command */ cur_shift = 1; - for (cur = updates->head; cur != NULL; cur = g_list_next (cur)) { - io_cmd = cur->data; + for (i = 0; i < updates->len; i ++) { + io_cmd = &g_array_index (updates, struct fuzzy_peer_cmd, i); if (!rspamd_fuzzy_update_append_command (bk, session, io_cmd, &cur_shift)) { diff --git a/src/libserver/fuzzy_backend_redis.h b/src/libserver/fuzzy_backend_redis.h index 083bccafe..b10ac332d 100644 --- a/src/libserver/fuzzy_backend_redis.h +++ b/src/libserver/fuzzy_backend_redis.h @@ -29,7 +29,7 @@ void rspamd_fuzzy_backend_check_redis (struct rspamd_fuzzy_backend *bk, rspamd_fuzzy_check_cb cb, void *ud, void *subr_ud); void rspamd_fuzzy_backend_update_redis (struct rspamd_fuzzy_backend *bk, - GQueue *updates, const gchar *src, + GArray *updates, const gchar *src, rspamd_fuzzy_update_cb cb, void *ud, void *subr_ud); void rspamd_fuzzy_backend_count_redis (struct rspamd_fuzzy_backend *bk,