From 49d00da657e00abb9d83cfccb607e04f655ef9e5 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Sat, 20 Oct 2018 17:01:10 +0100 Subject: [PATCH] [Project] Adopt lua redis --- src/lua/lua_redis.c | 62 ++++++++++++++++++++++++++++++++------------- 1 file changed, 44 insertions(+), 18 deletions(-) diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c index 65c0609f9..8d884fab0 100644 --- a/src/lua/lua_redis.c +++ b/src/lua/lua_redis.c @@ -94,12 +94,12 @@ struct lua_redis_request_specific_userdata; struct lua_redis_userdata { redisAsyncContext *ctx; struct rspamd_task *task; + struct rspamd_symcache_item *item; struct rspamd_async_session *s; struct event_base *ev_base; struct rspamd_config *cfg; struct rspamd_redis_pool *pool; gchar *server; - gchar *reqline; struct lua_redis_request_specific_userdata *specific; gdouble timeout; guint16 port; @@ -119,7 +119,6 @@ struct lua_redis_request_specific_userdata { guint nargs; gchar **args; gsize *arglens; - struct rspamd_async_watcher *w; struct lua_redis_userdata *c; struct lua_redis_ctx *ctx; struct lua_redis_request_specific_userdata *next; @@ -140,8 +139,9 @@ struct lua_redis_ctx { struct lua_redis_result { gboolean is_error; gint result_ref; - struct rspamd_async_watcher *w; + struct rspamd_symcache_item *item; struct rspamd_async_session *s; + struct rspamd_task *task; struct lua_redis_request_specific_userdata *sp_ud; }; @@ -291,7 +291,10 @@ lua_redis_push_error (const gchar *err, sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED; if (connected && ud->s) { - rspamd_session_watcher_pop (ud->s, sp_ud->w); + if (ud->item) { + rspamd_symcache_item_async_dec_check (ud->task, ud->item); + } + rspamd_session_remove_event (ud->s, lua_redis_fin, sp_ud); } else { @@ -374,7 +377,10 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx, sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED; if (ud->s) { - rspamd_session_watcher_pop (ud->s, sp_ud->w); + if (ud->item) { + rspamd_symcache_item_async_dec_check (ud->task, ud->item); + } + rspamd_session_remove_event (ud->s, lua_redis_fin, sp_ud); } else { @@ -491,7 +497,10 @@ lua_redis_cleanup_events (struct lua_redis_ctx *ctx) while (!g_queue_is_empty (ctx->events_cleanup)) { struct lua_redis_result *result = g_queue_pop_head (ctx->events_cleanup); - rspamd_session_watcher_pop (result->s, result->w); + if (result->item) { + rspamd_symcache_item_async_dec_check (result->task, result->item); + } + rspamd_session_remove_event (result->s, lua_redis_fin, result->sp_ud); g_free (result); @@ -584,7 +593,8 @@ lua_redis_callback_sync (redisAsyncContext *ac, gpointer r, gpointer priv) result->result_ref = luaL_ref (L, LUA_REGISTRYINDEX); result->s = ud->s; - result->w = sp_ud->w; + result->item = ud->item; + result->task = ud->task; result->sp_ud = sp_ud; g_queue_push_tail (ctx->replies, result); @@ -911,12 +921,17 @@ rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref, gboolean is_asy ctx->events_cleanup = g_queue_new (); } + ud->s = session; ud->cfg = cfg; ud->pool = cfg->redis_pool; ud->ev_base = ev_base; ud->task = task; + if (task) { + ud->item = rspamd_symbols_cache_get_cur_item (task); + } + ret = TRUE; } else { @@ -1020,6 +1035,7 @@ lua_redis_make_request (lua_State *L) &sp_ud->nargs); lua_pop (L, 1); LL_PREPEND (ud->specific, sp_ud); + ret = redisAsyncCommandArgv (ud->ctx, lua_redis_callback, sp_ud, @@ -1029,12 +1045,13 @@ lua_redis_make_request (lua_State *L) if (ret == REDIS_OK) { if (ud->s) { - rspamd_session_add_event (ud->s, NULL, lua_redis_fin, sp_ud, g_quark_from_static_string ("lua redis")); - sp_ud->w = rspamd_session_get_watcher (ud->s); - rspamd_session_watcher_push (ud->s); - } - else { - sp_ud->w = NULL; + rspamd_session_add_event (ud->s, + lua_redis_fin, sp_ud, + g_quark_from_static_string ("lua redis")); + + if (ud->item) { + rspamd_symcache_item_async_inc (ud->task, ud->item); + } } REDIS_RETAIN (ctx); /* Cleared by fin event */ @@ -1396,18 +1413,27 @@ lua_redis_add_cmd (lua_State *L) if (ret == REDIS_OK) { if (ud->s) { - rspamd_session_add_event (ud->s, NULL, lua_redis_fin, sp_ud, g_quark_from_static_string ("lua redis")); - sp_ud->w = rspamd_session_get_watcher (ud->s); - rspamd_session_watcher_push (ud->s); + rspamd_session_add_event (ud->s, + lua_redis_fin, + sp_ud, + g_quark_from_static_string ("lua redis")); + + if (ud->item) { + rspamd_symcache_item_async_inc (ud->task, ud->item); + } } double_to_tv (sp_ud->c->timeout, &tv); + if (IS_ASYNC (ctx)) { - event_set (&sp_ud->timeout, -1, EV_TIMEOUT, lua_redis_timeout, sp_ud); + event_set (&sp_ud->timeout, -1, EV_TIMEOUT, + lua_redis_timeout, sp_ud); } else { - event_set (&sp_ud->timeout, -1, EV_TIMEOUT, lua_redis_timeout_sync, sp_ud); + event_set (&sp_ud->timeout, -1, EV_TIMEOUT, + lua_redis_timeout_sync, sp_ud); } + event_base_set (ud->ev_base, &sp_ud->timeout); event_add (&sp_ud->timeout, &tv); REDIS_RETAIN (ctx); -- 2.39.5