summaryrefslogtreecommitdiffstats
path: root/src/lua/lua_redis.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2018-10-20 17:01:10 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2018-10-20 19:43:32 +0100
commit49d00da657e00abb9d83cfccb607e04f655ef9e5 (patch)
tree5bf0327737f72b9521a72be7f4390f89787e49e6 /src/lua/lua_redis.c
parentf4086b26c873e291ff7bbfab08b7059344c22bce (diff)
downloadrspamd-49d00da657e00abb9d83cfccb607e04f655ef9e5.tar.gz
rspamd-49d00da657e00abb9d83cfccb607e04f655ef9e5.zip
[Project] Adopt lua redis
Diffstat (limited to 'src/lua/lua_redis.c')
-rw-r--r--src/lua/lua_redis.c62
1 files changed, 44 insertions, 18 deletions
diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c
index 65c0609f9..8d884fab0 100644
--- a/src/lua/lua_redis.c
+++ b/src/lua/lua_redis.c
@@ -94,12 +94,12 @@ struct lua_redis_request_specific_userdata;
struct lua_redis_userdata {
redisAsyncContext *ctx;
struct rspamd_task *task;
+ struct rspamd_symcache_item *item;
struct rspamd_async_session *s;
struct event_base *ev_base;
struct rspamd_config *cfg;
struct rspamd_redis_pool *pool;
gchar *server;
- gchar *reqline;
struct lua_redis_request_specific_userdata *specific;
gdouble timeout;
guint16 port;
@@ -119,7 +119,6 @@ struct lua_redis_request_specific_userdata {
guint nargs;
gchar **args;
gsize *arglens;
- struct rspamd_async_watcher *w;
struct lua_redis_userdata *c;
struct lua_redis_ctx *ctx;
struct lua_redis_request_specific_userdata *next;
@@ -140,8 +139,9 @@ struct lua_redis_ctx {
struct lua_redis_result {
gboolean is_error;
gint result_ref;
- struct rspamd_async_watcher *w;
+ struct rspamd_symcache_item *item;
struct rspamd_async_session *s;
+ struct rspamd_task *task;
struct lua_redis_request_specific_userdata *sp_ud;
};
@@ -291,7 +291,10 @@ lua_redis_push_error (const gchar *err,
sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED;
if (connected && ud->s) {
- rspamd_session_watcher_pop (ud->s, sp_ud->w);
+ if (ud->item) {
+ rspamd_symcache_item_async_dec_check (ud->task, ud->item);
+ }
+
rspamd_session_remove_event (ud->s, lua_redis_fin, sp_ud);
}
else {
@@ -374,7 +377,10 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx,
sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED;
if (ud->s) {
- rspamd_session_watcher_pop (ud->s, sp_ud->w);
+ if (ud->item) {
+ rspamd_symcache_item_async_dec_check (ud->task, ud->item);
+ }
+
rspamd_session_remove_event (ud->s, lua_redis_fin, sp_ud);
}
else {
@@ -491,7 +497,10 @@ lua_redis_cleanup_events (struct lua_redis_ctx *ctx)
while (!g_queue_is_empty (ctx->events_cleanup)) {
struct lua_redis_result *result = g_queue_pop_head (ctx->events_cleanup);
- rspamd_session_watcher_pop (result->s, result->w);
+ if (result->item) {
+ rspamd_symcache_item_async_dec_check (result->task, result->item);
+ }
+
rspamd_session_remove_event (result->s, lua_redis_fin, result->sp_ud);
g_free (result);
@@ -584,7 +593,8 @@ lua_redis_callback_sync (redisAsyncContext *ac, gpointer r, gpointer priv)
result->result_ref = luaL_ref (L, LUA_REGISTRYINDEX);
result->s = ud->s;
- result->w = sp_ud->w;
+ result->item = ud->item;
+ result->task = ud->task;
result->sp_ud = sp_ud;
g_queue_push_tail (ctx->replies, result);
@@ -911,12 +921,17 @@ rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref, gboolean is_asy
ctx->events_cleanup = g_queue_new ();
}
+
ud->s = session;
ud->cfg = cfg;
ud->pool = cfg->redis_pool;
ud->ev_base = ev_base;
ud->task = task;
+ if (task) {
+ ud->item = rspamd_symbols_cache_get_cur_item (task);
+ }
+
ret = TRUE;
}
else {
@@ -1020,6 +1035,7 @@ lua_redis_make_request (lua_State *L)
&sp_ud->nargs);
lua_pop (L, 1);
LL_PREPEND (ud->specific, sp_ud);
+
ret = redisAsyncCommandArgv (ud->ctx,
lua_redis_callback,
sp_ud,
@@ -1029,12 +1045,13 @@ lua_redis_make_request (lua_State *L)
if (ret == REDIS_OK) {
if (ud->s) {
- rspamd_session_add_event (ud->s, NULL, lua_redis_fin, sp_ud, g_quark_from_static_string ("lua redis"));
- sp_ud->w = rspamd_session_get_watcher (ud->s);
- rspamd_session_watcher_push (ud->s);
- }
- else {
- sp_ud->w = NULL;
+ rspamd_session_add_event (ud->s,
+ lua_redis_fin, sp_ud,
+ g_quark_from_static_string ("lua redis"));
+
+ if (ud->item) {
+ rspamd_symcache_item_async_inc (ud->task, ud->item);
+ }
}
REDIS_RETAIN (ctx); /* Cleared by fin event */
@@ -1396,18 +1413,27 @@ lua_redis_add_cmd (lua_State *L)
if (ret == REDIS_OK) {
if (ud->s) {
- rspamd_session_add_event (ud->s, NULL, lua_redis_fin, sp_ud, g_quark_from_static_string ("lua redis"));
- sp_ud->w = rspamd_session_get_watcher (ud->s);
- rspamd_session_watcher_push (ud->s);
+ rspamd_session_add_event (ud->s,
+ lua_redis_fin,
+ sp_ud,
+ g_quark_from_static_string ("lua redis"));
+
+ if (ud->item) {
+ rspamd_symcache_item_async_inc (ud->task, ud->item);
+ }
}
double_to_tv (sp_ud->c->timeout, &tv);
+
if (IS_ASYNC (ctx)) {
- event_set (&sp_ud->timeout, -1, EV_TIMEOUT, lua_redis_timeout, sp_ud);
+ event_set (&sp_ud->timeout, -1, EV_TIMEOUT,
+ lua_redis_timeout, sp_ud);
}
else {
- event_set (&sp_ud->timeout, -1, EV_TIMEOUT, lua_redis_timeout_sync, sp_ud);
+ event_set (&sp_ud->timeout, -1, EV_TIMEOUT,
+ lua_redis_timeout_sync, sp_ud);
}
+
event_base_set (ud->ev_base, &sp_ud->timeout);
event_add (&sp_ud->timeout, &tv);
REDIS_RETAIN (ctx);