diff options
Diffstat (limited to 'src/lua/lua_redis.c')
-rw-r--r-- | src/lua/lua_redis.c | 1082 |
1 files changed, 538 insertions, 544 deletions
diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c index 99fc383b5..2ac5a47b7 100644 --- a/src/lua/lua_redis.c +++ b/src/lua/lua_redis.c @@ -49,41 +49,41 @@ local function symbol_callback(task) end */ -LUA_FUNCTION_DEF (redis, make_request); -LUA_FUNCTION_DEF (redis, make_request_sync); -LUA_FUNCTION_DEF (redis, connect); -LUA_FUNCTION_DEF (redis, connect_sync); -LUA_FUNCTION_DEF (redis, add_cmd); -LUA_FUNCTION_DEF (redis, exec); -LUA_FUNCTION_DEF (redis, gc); +LUA_FUNCTION_DEF(redis, make_request); +LUA_FUNCTION_DEF(redis, make_request_sync); +LUA_FUNCTION_DEF(redis, connect); +LUA_FUNCTION_DEF(redis, connect_sync); +LUA_FUNCTION_DEF(redis, add_cmd); +LUA_FUNCTION_DEF(redis, exec); +LUA_FUNCTION_DEF(redis, gc); static const struct luaL_reg redislib_f[] = { - LUA_INTERFACE_DEF (redis, make_request), - LUA_INTERFACE_DEF (redis, make_request_sync), - LUA_INTERFACE_DEF (redis, connect), - LUA_INTERFACE_DEF (redis, connect_sync), - {NULL, NULL} -}; + LUA_INTERFACE_DEF(redis, make_request), + LUA_INTERFACE_DEF(redis, make_request_sync), + LUA_INTERFACE_DEF(redis, connect), + LUA_INTERFACE_DEF(redis, connect_sync), + {NULL, NULL}}; static const struct luaL_reg redislib_m[] = { - LUA_INTERFACE_DEF (redis, add_cmd), - LUA_INTERFACE_DEF (redis, exec), + LUA_INTERFACE_DEF(redis, add_cmd), + LUA_INTERFACE_DEF(redis, exec), {"__gc", lua_redis_gc}, {"__tostring", rspamd_lua_class_tostring}, - {NULL, NULL} -}; + {NULL, NULL}}; #undef REDIS_DEBUG_REFS #ifdef REDIS_DEBUG_REFS -#define REDIS_RETAIN(x) do { \ - msg_err ("retain ref %p, refcount: %d", (x), (x)->ref.refcount); \ - REF_RETAIN(x); \ -} while (0) - -#define REDIS_RELEASE(x) do { \ - msg_err ("release ref %p, refcount: %d", (x), (x)->ref.refcount); \ - REF_RELEASE(x); \ -} while (0) +#define REDIS_RETAIN(x) \ + do { \ + msg_err("retain ref %p, refcount: %d", (x), (x)->ref.refcount); \ + REF_RETAIN(x); \ + } while (0) + +#define REDIS_RELEASE(x) \ + do { \ + msg_err("release ref %p, refcount: %d", (x), (x)->ref.refcount); \ + REF_RELEASE(x); \ + } while (0) #else #define REDIS_RETAIN REF_RETAIN #define REDIS_RELEASE REF_RELEASE @@ -109,10 +109,10 @@ struct lua_redis_userdata { guint16 terminated; }; -#define msg_debug_lua_redis(...) rspamd_conditional_debug_fast (NULL, NULL, \ - rspamd_lua_redis_log_id, "lua_redis", ud->log_tag, \ - G_STRFUNC, \ - __VA_ARGS__) +#define msg_debug_lua_redis(...) rspamd_conditional_debug_fast(NULL, NULL, \ + rspamd_lua_redis_log_id, "lua_redis", ud->log_tag, \ + G_STRFUNC, \ + __VA_ARGS__) INIT_LOG_MODULE(lua_redis) #define LUA_REDIS_SPECIFIC_REPLIED (1 << 0) @@ -142,8 +142,8 @@ struct lua_redis_ctx { struct lua_redis_userdata async; guint cmds_pending; ref_entry_t ref; - GQueue *replies; /* for sync connection only */ - GQueue *events_cleanup; /* for sync connection only */ + GQueue *replies; /* for sync connection only */ + GQueue *events_cleanup; /* for sync connection only */ struct thread_entry *thread; /* for sync mode, set only if there was yield */ }; @@ -157,30 +157,30 @@ struct lua_redis_result { }; static struct lua_redis_ctx * -lua_check_redis (lua_State * L, gint pos) +lua_check_redis(lua_State *L, gint pos) { - void *ud = rspamd_lua_check_udata (L, pos, "rspamd{redis}"); - luaL_argcheck (L, ud != NULL, pos, "'redis' expected"); - return ud ? *((struct lua_redis_ctx **)ud) : NULL; + void *ud = rspamd_lua_check_udata(L, pos, "rspamd{redis}"); + luaL_argcheck(L, ud != NULL, pos, "'redis' expected"); + return ud ? *((struct lua_redis_ctx **) ud) : NULL; } static void -lua_redis_free_args (char **args, gsize *arglens, guint nargs) +lua_redis_free_args(char **args, gsize *arglens, guint nargs) { guint i; if (args) { - for (i = 0; i < nargs; i ++) { - g_free (args[i]); + for (i = 0; i < nargs; i++) { + g_free(args[i]); } - g_free (args); - g_free (arglens); + g_free(args); + g_free(arglens); } } static void -lua_redis_dtor (struct lua_redis_ctx *ctx) +lua_redis_dtor(struct lua_redis_ctx *ctx) { struct lua_redis_userdata *ud; struct lua_redis_request_specific_userdata *cur, *tmp; @@ -188,12 +188,13 @@ lua_redis_dtor (struct lua_redis_ctx *ctx) struct redisAsyncContext *ac; ud = &ctx->async; - msg_debug_lua_redis ("destructing %p", ctx); + msg_debug_lua_redis("destructing %p", ctx); if (ud->ctx) { - LL_FOREACH_SAFE (ud->specific, cur, tmp) { - ev_timer_stop (ud->event_loop, &cur->timeout_ev); + LL_FOREACH_SAFE(ud->specific, cur, tmp) + { + ev_timer_stop(ud->event_loop, &cur->timeout_ev); if (!(cur->flags & LUA_REDIS_SPECIFIC_REPLIED)) { is_successful = FALSE; @@ -209,53 +210,52 @@ lua_redis_dtor (struct lua_redis_ctx *ctx) ud->ctx = NULL; if (!is_successful) { - rspamd_redis_pool_release_connection (ud->pool, ac, - RSPAMD_REDIS_RELEASE_FATAL); + rspamd_redis_pool_release_connection(ud->pool, ac, + RSPAMD_REDIS_RELEASE_FATAL); } else { - rspamd_redis_pool_release_connection (ud->pool, ac, - (ctx->flags & LUA_REDIS_NO_POOL) ? - RSPAMD_REDIS_RELEASE_ENFORCE : RSPAMD_REDIS_RELEASE_DEFAULT); + rspamd_redis_pool_release_connection(ud->pool, ac, + (ctx->flags & LUA_REDIS_NO_POOL) ? RSPAMD_REDIS_RELEASE_ENFORCE : RSPAMD_REDIS_RELEASE_DEFAULT); } - } - LL_FOREACH_SAFE (ud->specific, cur, tmp) { - lua_redis_free_args (cur->args, cur->arglens, cur->nargs); + LL_FOREACH_SAFE(ud->specific, cur, tmp) + { + lua_redis_free_args(cur->args, cur->arglens, cur->nargs); if (cur->cbref != -1) { - luaL_unref (ud->cfg->lua_state, LUA_REGISTRYINDEX, cur->cbref); + luaL_unref(ud->cfg->lua_state, LUA_REGISTRYINDEX, cur->cbref); } - g_free (cur); + g_free(cur); } if (ctx->events_cleanup) { - g_queue_free (ctx->events_cleanup); + g_queue_free(ctx->events_cleanup); ctx->events_cleanup = NULL; } if (ctx->replies) { - g_queue_free (ctx->replies); + g_queue_free(ctx->replies); ctx->replies = NULL; } - g_free (ctx); + g_free(ctx); } static gint -lua_redis_gc (lua_State *L) +lua_redis_gc(lua_State *L) { - struct lua_redis_ctx *ctx = lua_check_redis (L, 1); + struct lua_redis_ctx *ctx = lua_check_redis(L, 1); if (ctx) { - REDIS_RELEASE (ctx); + REDIS_RELEASE(ctx); } return 0; } static void -lua_redis_fin (void *arg) +lua_redis_fin(void *arg) { struct lua_redis_request_specific_userdata *sp_ud = arg; struct lua_redis_userdata *ud; @@ -264,15 +264,15 @@ lua_redis_fin (void *arg) ctx = sp_ud->ctx; ud = sp_ud->c; - if (ev_can_stop (&sp_ud->timeout_ev)) { - ev_timer_stop (sp_ud->ctx->async.event_loop, &sp_ud->timeout_ev); + if (ev_can_stop(&sp_ud->timeout_ev)) { + ev_timer_stop(sp_ud->ctx->async.event_loop, &sp_ud->timeout_ev); } - msg_debug_lua_redis ("finished redis query %p from session %p; refcount=%d", - sp_ud, ctx, ctx->ref.refcount); + msg_debug_lua_redis("finished redis query %p from session %p; refcount=%d", + sp_ud, ctx, ctx->ref.refcount); sp_ud->flags |= LUA_REDIS_SPECIFIC_FINISHED; - REDIS_RELEASE (ctx); + REDIS_RELEASE(ctx); } /** @@ -281,93 +281,93 @@ lua_redis_fin (void *arg) * @param ud */ static void -lua_redis_push_error (const gchar *err, - struct lua_redis_ctx *ctx, - struct lua_redis_request_specific_userdata *sp_ud, - gboolean connected) +lua_redis_push_error(const gchar *err, + struct lua_redis_ctx *ctx, + struct lua_redis_request_specific_userdata *sp_ud, + gboolean connected) { struct lua_redis_userdata *ud = sp_ud->c; struct lua_callback_state cbs; lua_State *L; - if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED|LUA_REDIS_SPECIFIC_FINISHED))) { + if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED | LUA_REDIS_SPECIFIC_FINISHED))) { if (sp_ud->cbref != -1) { - lua_thread_pool_prepare_callback (ud->cfg->lua_thread_pool, &cbs); + lua_thread_pool_prepare_callback(ud->cfg->lua_thread_pool, &cbs); L = cbs.L; - lua_pushcfunction (L, &rspamd_lua_traceback); - int err_idx = lua_gettop (L); + lua_pushcfunction(L, &rspamd_lua_traceback); + int err_idx = lua_gettop(L); /* Push error */ - lua_rawgeti (cbs.L, LUA_REGISTRYINDEX, sp_ud->cbref); + lua_rawgeti(cbs.L, LUA_REGISTRYINDEX, sp_ud->cbref); /* String of error */ - lua_pushstring (cbs.L, err); + lua_pushstring(cbs.L, err); /* Data is nil */ - lua_pushnil (cbs.L); + lua_pushnil(cbs.L); if (ud->item) { - rspamd_symcache_set_cur_item (ud->task, ud->item); + rspamd_symcache_set_cur_item(ud->task, ud->item); } - if (lua_pcall (cbs.L, 2, 0, err_idx) != 0) { - msg_info ("call to callback failed: %s", lua_tostring (cbs.L, -1)); + if (lua_pcall(cbs.L, 2, 0, err_idx) != 0) { + msg_info("call to callback failed: %s", lua_tostring(cbs.L, -1)); } - lua_settop (L, err_idx - 1); - lua_thread_pool_restore_callback (&cbs); + lua_settop(L, err_idx - 1); + lua_thread_pool_restore_callback(&cbs); } sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED; if (connected && ud->s) { if (ud->item) { - rspamd_symcache_item_async_dec_check (ud->task, ud->item, M); + rspamd_symcache_item_async_dec_check(ud->task, ud->item, M); } - rspamd_session_remove_event (ud->s, lua_redis_fin, sp_ud); + rspamd_session_remove_event(ud->s, lua_redis_fin, sp_ud); } else { - lua_redis_fin (sp_ud); + lua_redis_fin(sp_ud); } } } static void -lua_redis_push_reply (lua_State *L, const redisReply *r, gboolean text_data) +lua_redis_push_reply(lua_State *L, const redisReply *r, gboolean text_data) { guint i; struct rspamd_lua_text *t; switch (r->type) { case REDIS_REPLY_INTEGER: - lua_pushinteger (L, r->integer); + lua_pushinteger(L, r->integer); break; case REDIS_REPLY_NIL: - lua_getfield (L, LUA_REGISTRYINDEX, "redis.null"); + lua_getfield(L, LUA_REGISTRYINDEX, "redis.null"); break; case REDIS_REPLY_STRING: case REDIS_REPLY_STATUS: if (text_data) { - t = lua_newuserdata (L, sizeof (*t)); - rspamd_lua_setclass (L, "rspamd{text}", -1); + t = lua_newuserdata(L, sizeof(*t)); + rspamd_lua_setclass(L, "rspamd{text}", -1); t->flags = 0; t->start = r->str; t->len = r->len; } else { - lua_pushlstring (L, r->str, r->len); + lua_pushlstring(L, r->str, r->len); } break; case REDIS_REPLY_ARRAY: - lua_createtable (L, r->elements, 0); + lua_createtable(L, r->elements, 0); for (i = 0; i < r->elements; ++i) { - lua_redis_push_reply (L, r->element[i], text_data); - lua_rawseti (L, -2, i + 1); /* Store sub-reply */ + lua_redis_push_reply(L, r->element[i], text_data); + lua_rawseti(L, -2, i + 1); /* Store sub-reply */ } break; default: /* should not happen */ - msg_info ("unknown reply type: %d", r->type); + msg_info("unknown reply type: %d", r->type); break; } } @@ -378,48 +378,48 @@ lua_redis_push_reply (lua_State *L, const redisReply *r, gboolean text_data) * @param ud */ static void -lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx, - struct lua_redis_request_specific_userdata *sp_ud) +lua_redis_push_data(const redisReply *r, struct lua_redis_ctx *ctx, + struct lua_redis_request_specific_userdata *sp_ud) { struct lua_redis_userdata *ud = sp_ud->c; struct lua_callback_state cbs; lua_State *L; - if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED|LUA_REDIS_SPECIFIC_FINISHED)) || - (sp_ud->flags & LUA_REDIS_SUBSCRIBED)) { + if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED | LUA_REDIS_SPECIFIC_FINISHED)) || + (sp_ud->flags & LUA_REDIS_SUBSCRIBED)) { if (sp_ud->cbref != -1) { - lua_thread_pool_prepare_callback (ud->cfg->lua_thread_pool, &cbs); + lua_thread_pool_prepare_callback(ud->cfg->lua_thread_pool, &cbs); L = cbs.L; - lua_pushcfunction (L, &rspamd_lua_traceback); - int err_idx = lua_gettop (L); + lua_pushcfunction(L, &rspamd_lua_traceback); + int err_idx = lua_gettop(L); /* Push error */ - lua_rawgeti (cbs.L, LUA_REGISTRYINDEX, sp_ud->cbref); + lua_rawgeti(cbs.L, LUA_REGISTRYINDEX, sp_ud->cbref); /* Error is nil */ - lua_pushnil (cbs.L); + lua_pushnil(cbs.L); /* Data */ - lua_redis_push_reply (cbs.L, r, ctx->flags & LUA_REDIS_TEXTDATA); + lua_redis_push_reply(cbs.L, r, ctx->flags & LUA_REDIS_TEXTDATA); if (ud->item) { - rspamd_symcache_set_cur_item (ud->task, ud->item); + rspamd_symcache_set_cur_item(ud->task, ud->item); } - gint ret = lua_pcall (cbs.L, 2, 0, err_idx); + gint ret = lua_pcall(cbs.L, 2, 0, err_idx); if (ret != 0) { - msg_info ("call to lua_redis callback failed (%d): %s", - ret, lua_tostring (cbs.L, -1)); + msg_info("call to lua_redis callback failed (%d): %s", + ret, lua_tostring(cbs.L, -1)); } - lua_settop (L, err_idx - 1); - lua_thread_pool_restore_callback (&cbs); + lua_settop(L, err_idx - 1); + lua_thread_pool_restore_callback(&cbs); } if (sp_ud->flags & LUA_REDIS_SUBSCRIBED) { if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_REPLIED)) { - if (ev_can_stop (&sp_ud->timeout_ev)) { - ev_timer_stop (sp_ud->ctx->async.event_loop, - &sp_ud->timeout_ev); + if (ev_can_stop(&sp_ud->timeout_ev)) { + ev_timer_stop(sp_ud->ctx->async.event_loop, + &sp_ud->timeout_ev); } } } @@ -429,14 +429,14 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx, if (!(sp_ud->flags & LUA_REDIS_SUBSCRIBED)) { if (ud->s) { if (ud->item) { - rspamd_symcache_item_async_dec_check (ud->task, - ud->item, M); + rspamd_symcache_item_async_dec_check(ud->task, + ud->item, M); } - rspamd_session_remove_event (ud->s, lua_redis_fin, sp_ud); + rspamd_session_remove_event(ud->s, lua_redis_fin, sp_ud); } else { - lua_redis_fin (sp_ud); + lua_redis_fin(sp_ud); } } } @@ -449,7 +449,7 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx, * @param priv userdata */ static void -lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv) +lua_redis_callback(redisAsyncContext *c, gpointer r, gpointer priv) { redisReply *reply = r; struct lua_redis_request_specific_userdata *sp_ud = priv; @@ -465,33 +465,33 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv) return; } - msg_debug_lua_redis ("got reply from redis %p for query %p", sp_ud->c->ctx, - sp_ud); + msg_debug_lua_redis("got reply from redis %p for query %p", sp_ud->c->ctx, + sp_ud); - REDIS_RETAIN (ctx); + REDIS_RETAIN(ctx); /* If session is finished, we cannot call lua callbacks */ if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED) || - (sp_ud->flags & LUA_REDIS_SUBSCRIBED)) { + (sp_ud->flags & LUA_REDIS_SUBSCRIBED)) { if (c->err == 0) { if (r != NULL) { if (reply->type != REDIS_REPLY_ERROR) { - lua_redis_push_data (reply, ctx, sp_ud); + lua_redis_push_data(reply, ctx, sp_ud); } else { - lua_redis_push_error (reply->str, ctx, sp_ud, TRUE); + lua_redis_push_error(reply->str, ctx, sp_ud, TRUE); } } else { - lua_redis_push_error ("received no data from server", ctx, sp_ud, TRUE); + lua_redis_push_error("received no data from server", ctx, sp_ud, TRUE); } } else { if (c->err == REDIS_ERR_IO) { - lua_redis_push_error (strerror (errno), ctx, sp_ud, TRUE); + lua_redis_push_error(strerror(errno), ctx, sp_ud, TRUE); } else { - lua_redis_push_error (c->errstr, ctx, sp_ud, TRUE); + lua_redis_push_error(c->errstr, ctx, sp_ud, TRUE); } } } @@ -506,73 +506,72 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv) ud->ctx = NULL; if (ac) { - msg_debug_lua_redis ("release redis connection ud=%p; ctx=%p; refcount=%d", - ud, ctx, ctx->ref.refcount); - rspamd_redis_pool_release_connection (ud->pool, ac, - (ctx->flags & LUA_REDIS_NO_POOL) ? - RSPAMD_REDIS_RELEASE_ENFORCE : RSPAMD_REDIS_RELEASE_DEFAULT); + msg_debug_lua_redis("release redis connection ud=%p; ctx=%p; refcount=%d", + ud, ctx, ctx->ref.refcount); + rspamd_redis_pool_release_connection(ud->pool, ac, + (ctx->flags & LUA_REDIS_NO_POOL) ? RSPAMD_REDIS_RELEASE_ENFORCE : RSPAMD_REDIS_RELEASE_DEFAULT); } } } - REDIS_RELEASE (ctx); + REDIS_RELEASE(ctx); } static gint -lua_redis_push_results (struct lua_redis_ctx *ctx, lua_State *L) +lua_redis_push_results(struct lua_redis_ctx *ctx, lua_State *L) { - gint results = g_queue_get_length (ctx->replies); + gint results = g_queue_get_length(ctx->replies); gint i; gboolean can_use_lua = TRUE; - results = g_queue_get_length (ctx->replies); + results = g_queue_get_length(ctx->replies); - if (!lua_checkstack (L, (results * 2) + 1)) { - luaL_error (L, "cannot resize stack to fit %d commands", - ctx->cmds_pending); + if (!lua_checkstack(L, (results * 2) + 1)) { + luaL_error(L, "cannot resize stack to fit %d commands", + ctx->cmds_pending); can_use_lua = FALSE; } - for (i = 0; i < results; i ++) { - struct lua_redis_result *result = g_queue_pop_head (ctx->replies); + for (i = 0; i < results; i++) { + struct lua_redis_result *result = g_queue_pop_head(ctx->replies); if (can_use_lua) { - lua_pushboolean (L, !result->is_error); - lua_rawgeti (L, LUA_REGISTRYINDEX, result->result_ref); + lua_pushboolean(L, !result->is_error); + lua_rawgeti(L, LUA_REGISTRYINDEX, result->result_ref); } - luaL_unref (L, LUA_REGISTRYINDEX, result->result_ref); + luaL_unref(L, LUA_REGISTRYINDEX, result->result_ref); - g_queue_push_tail (ctx->events_cleanup, result); + g_queue_push_tail(ctx->events_cleanup, result); } return can_use_lua ? results * 2 : 0; } static void -lua_redis_cleanup_events (struct lua_redis_ctx *ctx) +lua_redis_cleanup_events(struct lua_redis_ctx *ctx) { - REDIS_RETAIN (ctx); /* To avoid preliminary destruction */ + REDIS_RETAIN(ctx); /* To avoid preliminary destruction */ - while (!g_queue_is_empty (ctx->events_cleanup)) { - struct lua_redis_result *result = g_queue_pop_head (ctx->events_cleanup); + while (!g_queue_is_empty(ctx->events_cleanup)) { + struct lua_redis_result *result = g_queue_pop_head(ctx->events_cleanup); if (result->item) { - rspamd_symcache_item_async_dec_check (result->task, result->item, M); + rspamd_symcache_item_async_dec_check(result->task, result->item, M); } if (result->s) { - rspamd_session_remove_event (result->s, lua_redis_fin, result->sp_ud); + rspamd_session_remove_event(result->s, lua_redis_fin, result->sp_ud); } else { - lua_redis_fin (result->sp_ud); + lua_redis_fin(result->sp_ud); } - g_free (result); + g_free(result); } - REDIS_RELEASE (ctx); + REDIS_RELEASE(ctx); } /** @@ -582,14 +581,14 @@ lua_redis_cleanup_events (struct lua_redis_ctx *ctx) * @param priv userdata */ static void -lua_redis_callback_sync (redisAsyncContext *ac, gpointer r, gpointer priv) +lua_redis_callback_sync(redisAsyncContext *ac, gpointer r, gpointer priv) { redisReply *reply = r; struct lua_redis_request_specific_userdata *sp_ud = priv; struct lua_redis_ctx *ctx; struct lua_redis_userdata *ud; - struct thread_entry* thread; + struct thread_entry *thread; gint results; ctx = sp_ud->ctx; @@ -607,38 +606,38 @@ lua_redis_callback_sync (redisAsyncContext *ac, gpointer r, gpointer priv) return; } - if (ev_can_stop ( &sp_ud->timeout_ev)) { - ev_timer_stop (ud->event_loop, &sp_ud->timeout_ev); + if (ev_can_stop(&sp_ud->timeout_ev)) { + ev_timer_stop(ud->event_loop, &sp_ud->timeout_ev); } if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED)) { - msg_debug_lua_redis ("got reply from redis: %p for query %p", ac, sp_ud); + msg_debug_lua_redis("got reply from redis: %p for query %p", ac, sp_ud); - struct lua_redis_result *result = g_malloc0 (sizeof *result); + struct lua_redis_result *result = g_malloc0(sizeof *result); if (ac->err == 0) { if (r != NULL) { if (reply->type != REDIS_REPLY_ERROR) { result->is_error = FALSE; - lua_redis_push_reply (L, reply, ctx->flags & LUA_REDIS_TEXTDATA); + lua_redis_push_reply(L, reply, ctx->flags & LUA_REDIS_TEXTDATA); } else { result->is_error = TRUE; - lua_pushstring (L, reply->str); + lua_pushstring(L, reply->str); } } else { result->is_error = TRUE; - lua_pushliteral (L, "received no data from server"); + lua_pushliteral(L, "received no data from server"); } } else { result->is_error = TRUE; if (ac->err == REDIS_ERR_IO) { - lua_pushstring (L, strerror (errno)); + lua_pushstring(L, strerror(errno)); } else { - lua_pushstring (L, ac->errstr); + lua_pushstring(L, ac->errstr); } } @@ -655,21 +654,20 @@ lua_redis_callback_sync (redisAsyncContext *ac, gpointer r, gpointer priv) * This will call all callbacks pending so the entire context * will be destructed */ - rspamd_redis_pool_release_connection (sp_ud->c->pool, ac, - RSPAMD_REDIS_RELEASE_FATAL); + rspamd_redis_pool_release_connection(sp_ud->c->pool, ac, + RSPAMD_REDIS_RELEASE_FATAL); } - result->result_ref = luaL_ref (L, LUA_REGISTRYINDEX); + result->result_ref = luaL_ref(L, LUA_REGISTRYINDEX); result->s = ud->s; result->item = ud->item; result->task = ud->task; result->sp_ud = sp_ud; - g_queue_push_tail (ctx->replies, result); - + g_queue_push_tail(ctx->replies, result); } - ctx->cmds_pending --; + ctx->cmds_pending--; if (ctx->cmds_pending == 0) { if (ctx->thread) { @@ -681,28 +679,27 @@ lua_redis_callback_sync (redisAsyncContext *ac, gpointer r, gpointer priv) results = lua_redis_push_results(ctx, thread->lua_state); if (ud->item) { - rspamd_symcache_set_cur_item (ud->task, ud->item); + rspamd_symcache_set_cur_item(ud->task, ud->item); } - lua_thread_resume (thread, results); + lua_thread_resume(thread, results); lua_redis_cleanup_events(ctx); } else { /* We cannot resume the thread as the associated task has gone */ - lua_thread_pool_terminate_entry_full (ud->cfg->lua_thread_pool, - ctx->thread, G_STRLOC, true); + lua_thread_pool_terminate_entry_full(ud->cfg->lua_thread_pool, + ctx->thread, G_STRLOC, true); ctx->thread = NULL; } } } - } static void -lua_redis_timeout_sync (EV_P_ ev_timer *w, int revents) +lua_redis_timeout_sync(EV_P_ ev_timer *w, int revents) { struct lua_redis_request_specific_userdata *sp_ud = - (struct lua_redis_request_specific_userdata *)w->data; + (struct lua_redis_request_specific_userdata *) w->data; struct lua_redis_ctx *ctx; struct lua_redis_userdata *ud; redisAsyncContext *ac; @@ -713,8 +710,8 @@ lua_redis_timeout_sync (EV_P_ ev_timer *w, int revents) ud = sp_ud->c; ctx = sp_ud->ctx; - msg_debug_lua_redis ("timeout while querying redis server: %p, redis: %p", sp_ud, - sp_ud->c->ctx); + msg_debug_lua_redis("timeout while querying redis server: %p, redis: %p", sp_ud, + sp_ud->c->ctx); if (sp_ud->c->ctx) { ac = sp_ud->c->ctx; @@ -729,16 +726,16 @@ lua_redis_timeout_sync (EV_P_ ev_timer *w, int revents) * This will call all callbacks pending so the entire context * will be destructed */ - rspamd_redis_pool_release_connection (sp_ud->c->pool, ac, - RSPAMD_REDIS_RELEASE_FATAL); + rspamd_redis_pool_release_connection(sp_ud->c->pool, ac, + RSPAMD_REDIS_RELEASE_FATAL); } } static void -lua_redis_timeout (EV_P_ ev_timer *w, int revents) +lua_redis_timeout(EV_P_ ev_timer *w, int revents) { struct lua_redis_request_specific_userdata *sp_ud = - (struct lua_redis_request_specific_userdata *)w->data; + (struct lua_redis_request_specific_userdata *) w->data; struct lua_redis_userdata *ud; struct lua_redis_ctx *ctx; redisAsyncContext *ac; @@ -750,10 +747,10 @@ lua_redis_timeout (EV_P_ ev_timer *w, int revents) ctx = sp_ud->ctx; ud = sp_ud->c; - REDIS_RETAIN (ctx); - msg_debug_lua_redis ("timeout while querying redis server: %p, redis: %p", sp_ud, - sp_ud->c->ctx); - lua_redis_push_error ("timeout while connecting the server", ctx, sp_ud, TRUE); + REDIS_RETAIN(ctx); + msg_debug_lua_redis("timeout while querying redis server: %p, redis: %p", sp_ud, + sp_ud->c->ctx); + lua_redis_push_error("timeout while connecting the server", ctx, sp_ud, TRUE); if (sp_ud->c->ctx) { ac = sp_ud->c->ctx; @@ -765,102 +762,102 @@ lua_redis_timeout (EV_P_ ev_timer *w, int revents) * This will call all callbacks pending so the entire context * will be destructed */ - rspamd_redis_pool_release_connection (sp_ud->c->pool, ac, - RSPAMD_REDIS_RELEASE_FATAL); + rspamd_redis_pool_release_connection(sp_ud->c->pool, ac, + RSPAMD_REDIS_RELEASE_FATAL); } - REDIS_RELEASE (ctx); + REDIS_RELEASE(ctx); } static void -lua_redis_parse_args (lua_State *L, gint idx, const gchar *cmd, - gchar ***pargs, gsize **parglens, guint *nargs) +lua_redis_parse_args(lua_State *L, gint idx, const gchar *cmd, + gchar ***pargs, gsize **parglens, guint *nargs) { gchar **args = NULL; gsize *arglens; gint top; - if (idx != 0 && lua_type (L, idx) == LUA_TTABLE) { + if (idx != 0 && lua_type(L, idx) == LUA_TTABLE) { /* Get all arguments */ - lua_pushvalue (L, idx); - lua_pushnil (L); + lua_pushvalue(L, idx); + lua_pushnil(L); top = 0; - while (lua_next (L, -2) != 0) { - gint type = lua_type (L, -1); + while (lua_next(L, -2) != 0) { + gint type = lua_type(L, -1); if (type == LUA_TNUMBER || type == LUA_TSTRING || - type == LUA_TUSERDATA) { - top ++; + type == LUA_TUSERDATA) { + top++; } - lua_pop (L, 1); + lua_pop(L, 1); } - args = g_malloc ((top + 1) * sizeof (gchar *)); - arglens = g_malloc ((top + 1) * sizeof (gsize)); - arglens[0] = strlen (cmd); - args[0] = g_malloc (arglens[0]); - memcpy (args[0], cmd, arglens[0]); + args = g_malloc((top + 1) * sizeof(gchar *)); + arglens = g_malloc((top + 1) * sizeof(gsize)); + arglens[0] = strlen(cmd); + args[0] = g_malloc(arglens[0]); + memcpy(args[0], cmd, arglens[0]); top = 1; - lua_pushnil (L); + lua_pushnil(L); - while (lua_next (L, -2) != 0) { - gint type = lua_type (L, -1); + while (lua_next(L, -2) != 0) { + gint type = lua_type(L, -1); if (type == LUA_TSTRING) { const gchar *s; - s = lua_tolstring (L, -1, &arglens[top]); - args[top] = g_malloc (arglens[top]); - memcpy (args[top], s, arglens[top]); - top ++; + s = lua_tolstring(L, -1, &arglens[top]); + args[top] = g_malloc(arglens[top]); + memcpy(args[top], s, arglens[top]); + top++; } else if (type == LUA_TUSERDATA) { struct rspamd_lua_text *t; - t = lua_check_text (L, -1); + t = lua_check_text(L, -1); if (t && t->start) { arglens[top] = t->len; - args[top] = g_malloc (arglens[top]); - memcpy (args[top], t->start, arglens[top]); - top ++; + args[top] = g_malloc(arglens[top]); + memcpy(args[top], t->start, arglens[top]); + top++; } } else if (type == LUA_TNUMBER) { - gdouble val = lua_tonumber (L, -1); + gdouble val = lua_tonumber(L, -1); gint r; gchar numbuf[64]; - if (val == (gdouble)((gint64)val)) { - r = rspamd_snprintf (numbuf, sizeof (numbuf), "%L", - (gint64)val); + if (val == (gdouble) ((gint64) val)) { + r = rspamd_snprintf(numbuf, sizeof(numbuf), "%L", + (gint64) val); } else { - r = rspamd_snprintf (numbuf, sizeof (numbuf), "%f", - val); + r = rspamd_snprintf(numbuf, sizeof(numbuf), "%f", + val); } arglens[top] = r; - args[top] = g_malloc (arglens[top]); - memcpy (args[top], numbuf, arglens[top]); - top ++; + args[top] = g_malloc(arglens[top]); + memcpy(args[top], numbuf, arglens[top]); + top++; } - lua_pop (L, 1); + lua_pop(L, 1); } - lua_pop (L, 1); + lua_pop(L, 1); } else { /* Use merely cmd */ - args = g_malloc (sizeof (gchar *)); - arglens = g_malloc (sizeof (gsize)); - arglens[0] = strlen (cmd); - args[0] = g_malloc (arglens[0]); - memcpy (args[0], cmd, arglens[0]); + args = g_malloc(sizeof(gchar *)); + arglens = g_malloc(sizeof(gsize)); + arglens[0] = strlen(cmd); + args[0] = g_malloc(arglens[0]); + memcpy(args[0], cmd, arglens[0]); top = 1; } @@ -870,7 +867,7 @@ lua_redis_parse_args (lua_State *L, gint idx, const gchar *cmd, } static struct lua_redis_ctx * -rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref, gboolean is_async) +rspamd_lua_redis_prepare_connection(lua_State *L, gint *pcbref, gboolean is_async) { struct lua_redis_ctx *ctx = NULL; rspamd_inet_addr_t *ip = NULL; @@ -886,47 +883,47 @@ rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref, gboolean is_asy gboolean ret = FALSE; guint flags = 0; - if (lua_istable (L, 1)) { + if (lua_istable(L, 1)) { /* Table version */ - lua_pushvalue (L, 1); - lua_pushstring (L, "task"); - lua_gettable (L, -2); - if (lua_type (L, -1) == LUA_TUSERDATA) { - task = lua_check_task_maybe (L, -1); + lua_pushvalue(L, 1); + lua_pushstring(L, "task"); + lua_gettable(L, -2); + if (lua_type(L, -1) == LUA_TUSERDATA) { + task = lua_check_task_maybe(L, -1); } - lua_pop (L, 1); + lua_pop(L, 1); if (!task) { /* We need to get ev_base, config and session separately */ - lua_pushstring (L, "config"); - lua_gettable (L, -2); - if (lua_type (L, -1) == LUA_TUSERDATA) { - cfg = lua_check_config (L, -1); + lua_pushstring(L, "config"); + lua_gettable(L, -2); + if (lua_type(L, -1) == LUA_TUSERDATA) { + cfg = lua_check_config(L, -1); } - lua_pop (L, 1); + lua_pop(L, 1); - lua_pushstring (L, "session"); - lua_gettable (L, -2); - if (lua_type (L, -1) == LUA_TUSERDATA) { - session = lua_check_session (L, -1); + lua_pushstring(L, "session"); + lua_gettable(L, -2); + if (lua_type(L, -1) == LUA_TUSERDATA) { + session = lua_check_session(L, -1); } - lua_pop (L, 1); + lua_pop(L, 1); - lua_pushstring (L, "ev_base"); - lua_gettable (L, -2); - if (lua_type (L, -1) == LUA_TUSERDATA) { - ev_base = lua_check_ev_base (L, -1); + lua_pushstring(L, "ev_base"); + lua_gettable(L, -2); + if (lua_type(L, -1) == LUA_TUSERDATA) { + ev_base = lua_check_ev_base(L, -1); } - lua_pop (L, 1); + lua_pop(L, 1); if (cfg && ev_base) { ret = TRUE; } else if (!cfg) { - msg_err_task_check ("config is not passed"); + msg_err_task_check("config is not passed"); } else { - msg_err_task_check ("ev_base is not set"); + msg_err_task_check("ev_base is not set"); } } else { @@ -935,92 +932,90 @@ rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref, gboolean is_asy ev_base = task->event_loop; log_tag = task->task_pool->tag.uid; ret = TRUE; - } if (pcbref) { - lua_pushstring (L, "callback"); - lua_gettable (L, -2); - if (lua_type (L, -1) == LUA_TFUNCTION) { + lua_pushstring(L, "callback"); + lua_gettable(L, -2); + if (lua_type(L, -1) == LUA_TFUNCTION) { /* This also pops function from the stack */ - cbref = luaL_ref (L, LUA_REGISTRYINDEX); + cbref = luaL_ref(L, LUA_REGISTRYINDEX); *pcbref = cbref; } else { *pcbref = -1; - lua_pop (L, 1); + lua_pop(L, 1); } } - lua_pushstring (L, "host"); - lua_gettable (L, -2); + lua_pushstring(L, "host"); + lua_gettable(L, -2); - if (lua_type (L, -1) == LUA_TUSERDATA) { - addr = lua_check_ip (L, -1); - host = rspamd_inet_address_to_string_pretty (addr->addr); + if (lua_type(L, -1) == LUA_TUSERDATA) { + addr = lua_check_ip(L, -1); + host = rspamd_inet_address_to_string_pretty(addr->addr); } - else if (lua_type (L, -1) == LUA_TSTRING) { - host = lua_tostring (L, -1); + else if (lua_type(L, -1) == LUA_TSTRING) { + host = lua_tostring(L, -1); - if (rspamd_parse_inet_address (&ip, - host, strlen (host), RSPAMD_INET_ADDRESS_PARSE_DEFAULT)) { - addr = g_alloca (sizeof (*addr)); + if (rspamd_parse_inet_address(&ip, + host, strlen(host), RSPAMD_INET_ADDRESS_PARSE_DEFAULT)) { + addr = g_alloca(sizeof(*addr)); addr->addr = ip; - if (rspamd_inet_address_get_port (ip) == 0) { - rspamd_inet_address_set_port (ip, 6379); + if (rspamd_inet_address_get_port(ip) == 0) { + rspamd_inet_address_set_port(ip, 6379); } } } - lua_pop (L, 1); + lua_pop(L, 1); - lua_pushstring (L, "password"); - lua_gettable (L, -2); - if (lua_type (L, -1) == LUA_TSTRING) { - password = lua_tostring (L, -1); + lua_pushstring(L, "password"); + lua_gettable(L, -2); + if (lua_type(L, -1) == LUA_TSTRING) { + password = lua_tostring(L, -1); } - lua_pop (L, 1); + lua_pop(L, 1); - lua_pushstring (L, "dbname"); - lua_gettable (L, -2); - if (lua_type (L, -1) == LUA_TSTRING) { - dbname = lua_tostring (L, -1); + lua_pushstring(L, "dbname"); + lua_gettable(L, -2); + if (lua_type(L, -1) == LUA_TSTRING) { + dbname = lua_tostring(L, -1); } - lua_pop (L, 1); + lua_pop(L, 1); - lua_pushstring (L, "opaque_data"); - lua_gettable (L, -2); - if (!!lua_toboolean (L, -1)) { + lua_pushstring(L, "opaque_data"); + lua_gettable(L, -2); + if (!!lua_toboolean(L, -1)) { flags |= LUA_REDIS_TEXTDATA; } - lua_pop (L, 1); + lua_pop(L, 1); - lua_pushstring (L, "no_pool"); - lua_gettable (L, -2); - if (!!lua_toboolean (L, -1)) { + lua_pushstring(L, "no_pool"); + lua_gettable(L, -2); + if (!!lua_toboolean(L, -1)) { flags |= LUA_REDIS_NO_POOL; } - lua_pop (L, 1); + lua_pop(L, 1); - lua_pop (L, 1); /* table */ + lua_pop(L, 1); /* table */ - if (session && rspamd_session_blocked (session)) { - msg_err_task_check ("Session is being destroying"); + if (session && rspamd_session_blocked(session)) { + msg_err_task_check("Session is being destroying"); ret = FALSE; } if (ret && addr != NULL) { - ctx = g_malloc0 (sizeof (struct lua_redis_ctx)); - REF_INIT_RETAIN (ctx, lua_redis_dtor); + ctx = g_malloc0(sizeof(struct lua_redis_ctx)); + REF_INIT_RETAIN(ctx, lua_redis_dtor); if (is_async) { ctx->flags |= flags | LUA_REDIS_ASYNC; ud = &ctx->async; } else { ud = &ctx->async; - ctx->replies = g_queue_new (); - ctx->events_cleanup = g_queue_new (); - + ctx->replies = g_queue_new(); + ctx->events_cleanup = g_queue_new(); } ud->s = session; @@ -1030,68 +1025,68 @@ rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref, gboolean is_asy ud->task = task; if (log_tag) { - rspamd_strlcpy (ud->log_tag, log_tag, sizeof (ud->log_tag)); + rspamd_strlcpy(ud->log_tag, log_tag, sizeof(ud->log_tag)); } else { /* Use pointer itself as a tag */ - rspamd_snprintf (ud->log_tag, sizeof (ud->log_tag), - "%ud", - (int)rspamd_cryptobox_fast_hash (&ud, sizeof (ud), 0)); + rspamd_snprintf(ud->log_tag, sizeof(ud->log_tag), + "%ud", + (int) rspamd_cryptobox_fast_hash(&ud, sizeof(ud), 0)); } if (task) { - ud->item = rspamd_symcache_get_cur_item (task); + ud->item = rspamd_symcache_get_cur_item(task); } ret = TRUE; } else { if (cbref != -1) { - luaL_unref (L, LUA_REGISTRYINDEX, cbref); + luaL_unref(L, LUA_REGISTRYINDEX, cbref); } - msg_err_task_check ("incorrect function invocation"); + msg_err_task_check("incorrect function invocation"); ret = FALSE; } } if (ret) { ud->terminated = 0; - ud->ctx = rspamd_redis_pool_connect (ud->pool, - dbname, password, - rspamd_inet_address_to_string (addr->addr), - rspamd_inet_address_get_port (addr->addr)); + ud->ctx = rspamd_redis_pool_connect(ud->pool, + dbname, password, + rspamd_inet_address_to_string(addr->addr), + rspamd_inet_address_get_port(addr->addr)); if (ip) { - rspamd_inet_address_free (ip); + rspamd_inet_address_free(ip); } if (ud->ctx == NULL || ud->ctx->err) { if (ud->ctx) { - msg_err_task_check ("cannot connect to redis: %s", - ud->ctx->errstr); - rspamd_redis_pool_release_connection (ud->pool, ud->ctx, - RSPAMD_REDIS_RELEASE_FATAL); + msg_err_task_check("cannot connect to redis: %s", + ud->ctx->errstr); + rspamd_redis_pool_release_connection(ud->pool, ud->ctx, + RSPAMD_REDIS_RELEASE_FATAL); ud->ctx = NULL; } else { - msg_err_task_check ("cannot connect to redis (OS error): %s", - strerror (errno)); + msg_err_task_check("cannot connect to redis (OS error): %s", + strerror(errno)); } - REDIS_RELEASE (ctx); + REDIS_RELEASE(ctx); return NULL; } - msg_debug_lua_redis ("opened redis connection host=%s; ctx=%p; ud=%p", - host, ctx, ud); + msg_debug_lua_redis("opened redis connection host=%s; ctx=%p; ud=%p", + host, ctx, ud); return ctx; } if (ip) { - rspamd_inet_address_free (ip); + rspamd_inet_address_free(ip); } return NULL; @@ -1109,7 +1104,7 @@ rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref, gboolean is_asy * @return {boolean} `true` if a request has been scheduled */ static int -lua_redis_make_request (lua_State *L) +lua_redis_make_request(lua_State *L) { LUA_TRACE_POINT; struct lua_redis_request_specific_userdata *sp_ud; @@ -1120,94 +1115,94 @@ lua_redis_make_request (lua_State *L) gint cbref = -1; gboolean ret = FALSE; - ctx = rspamd_lua_redis_prepare_connection (L, &cbref, TRUE); + ctx = rspamd_lua_redis_prepare_connection(L, &cbref, TRUE); if (ctx) { ud = &ctx->async; - sp_ud = g_malloc0 (sizeof (*sp_ud)); + sp_ud = g_malloc0(sizeof(*sp_ud)); sp_ud->cbref = cbref; sp_ud->c = ud; sp_ud->ctx = ctx; - lua_pushstring (L, "cmd"); - lua_gettable (L, -2); - cmd = lua_tostring (L, -1); - lua_pop (L, 1); + lua_pushstring(L, "cmd"); + lua_gettable(L, -2); + cmd = lua_tostring(L, -1); + lua_pop(L, 1); - lua_pushstring (L, "timeout"); - lua_gettable (L, 1); - if (lua_type (L, -1) == LUA_TNUMBER) { - timeout = lua_tonumber (L, -1); + lua_pushstring(L, "timeout"); + lua_gettable(L, 1); + if (lua_type(L, -1) == LUA_TNUMBER) { + timeout = lua_tonumber(L, -1); } - lua_pop (L, 1); + lua_pop(L, 1); ud->timeout = timeout; - lua_pushstring (L, "args"); - lua_gettable (L, 1); - lua_redis_parse_args (L, -1, cmd, &sp_ud->args, &sp_ud->arglens, - &sp_ud->nargs); - lua_pop (L, 1); - LL_PREPEND (ud->specific, sp_ud); + lua_pushstring(L, "args"); + lua_gettable(L, 1); + lua_redis_parse_args(L, -1, cmd, &sp_ud->args, &sp_ud->arglens, + &sp_ud->nargs); + lua_pop(L, 1); + LL_PREPEND(ud->specific, sp_ud); - ret = redisAsyncCommandArgv (ud->ctx, - lua_redis_callback, - sp_ud, - sp_ud->nargs, - (const gchar **)sp_ud->args, - sp_ud->arglens); + ret = redisAsyncCommandArgv(ud->ctx, + lua_redis_callback, + sp_ud, + sp_ud->nargs, + (const gchar **) sp_ud->args, + sp_ud->arglens); if (ret == REDIS_OK) { if (ud->s) { - rspamd_session_add_event (ud->s, - lua_redis_fin, sp_ud, - M); + rspamd_session_add_event(ud->s, + lua_redis_fin, sp_ud, + M); if (ud->item) { - rspamd_symcache_item_async_inc (ud->task, ud->item, M); + rspamd_symcache_item_async_inc(ud->task, ud->item, M); } } - REDIS_RETAIN (ctx); /* Cleared by fin event */ - ctx->cmds_pending ++; + REDIS_RETAIN(ctx); /* Cleared by fin event */ + ctx->cmds_pending++; if (ud->ctx->c.flags & REDIS_SUBSCRIBED) { - msg_debug_lua_redis ("subscribe command, never unref/timeout"); + msg_debug_lua_redis("subscribe command, never unref/timeout"); sp_ud->flags |= LUA_REDIS_SUBSCRIBED; } sp_ud->timeout_ev.data = sp_ud; - ev_now_update_if_cheap ((struct ev_loop *)ud->event_loop); - ev_timer_init (&sp_ud->timeout_ev, lua_redis_timeout, timeout, 0.0); - ev_timer_start (ud->event_loop, &sp_ud->timeout_ev); + ev_now_update_if_cheap((struct ev_loop *) ud->event_loop); + ev_timer_init(&sp_ud->timeout_ev, lua_redis_timeout, timeout, 0.0); + ev_timer_start(ud->event_loop, &sp_ud->timeout_ev); ret = TRUE; } else { - msg_info ("call to redis failed: %s", ud->ctx->errstr); - rspamd_redis_pool_release_connection (ud->pool, ud->ctx, - RSPAMD_REDIS_RELEASE_FATAL); + msg_info("call to redis failed: %s", ud->ctx->errstr); + rspamd_redis_pool_release_connection(ud->pool, ud->ctx, + RSPAMD_REDIS_RELEASE_FATAL); ud->ctx = NULL; - REDIS_RELEASE (ctx); + REDIS_RELEASE(ctx); ret = FALSE; } } else { - lua_pushboolean (L, FALSE); - lua_pushnil (L); + lua_pushboolean(L, FALSE); + lua_pushnil(L); return 2; } - lua_pushboolean (L, ret); + lua_pushboolean(L, ret); if (ret) { - pctx = lua_newuserdata (L, sizeof (ctx)); + pctx = lua_newuserdata(L, sizeof(ctx)); *pctx = ctx; - rspamd_lua_setclass (L, "rspamd{redis}", -1); + rspamd_lua_setclass(L, "rspamd{redis}", -1); } else { - lua_pushnil (L); + lua_pushnil(L); } return 2; @@ -1223,7 +1218,7 @@ lua_redis_make_request (lua_State *L) * @return {boolean + result} `true` and a result if a request has been successful */ static int -lua_redis_make_request_sync (lua_State *L) +lua_redis_make_request_sync(lua_State *L) { LUA_TRACE_POINT; struct rspamd_lua_ip *addr = NULL; @@ -1238,56 +1233,56 @@ lua_redis_make_request_sync (lua_State *L) redisContext *ctx; redisReply *r; - if (lua_istable (L, 1)) { - lua_pushvalue (L, 1); + if (lua_istable(L, 1)) { + lua_pushvalue(L, 1); - lua_pushstring (L, "cmd"); - lua_gettable (L, -2); - cmd = lua_tostring (L, -1); - lua_pop (L, 1); + lua_pushstring(L, "cmd"); + lua_gettable(L, -2); + cmd = lua_tostring(L, -1); + lua_pop(L, 1); - lua_pushstring (L, "host"); - lua_gettable (L, -2); - if (lua_type (L, -1) == LUA_TUSERDATA) { - addr = lua_check_ip (L, -1); + lua_pushstring(L, "host"); + lua_gettable(L, -2); + if (lua_type(L, -1) == LUA_TUSERDATA) { + addr = lua_check_ip(L, -1); } - else if (lua_type (L, -1) == LUA_TSTRING) { - host = lua_tostring (L, -1); - if (rspamd_parse_inet_address (&ip, - host, strlen (host), RSPAMD_INET_ADDRESS_PARSE_DEFAULT)) { - addr = g_alloca (sizeof (*addr)); + else if (lua_type(L, -1) == LUA_TSTRING) { + host = lua_tostring(L, -1); + if (rspamd_parse_inet_address(&ip, + host, strlen(host), RSPAMD_INET_ADDRESS_PARSE_DEFAULT)) { + addr = g_alloca(sizeof(*addr)); addr->addr = ip; - if (rspamd_inet_address_get_port (ip) == 0) { - rspamd_inet_address_set_port (ip, 6379); + if (rspamd_inet_address_get_port(ip) == 0) { + rspamd_inet_address_set_port(ip, 6379); } } } - lua_pop (L, 1); + lua_pop(L, 1); - lua_pushstring (L, "timeout"); - lua_gettable (L, -2); - if (lua_type (L, -1) == LUA_TNUMBER) { - timeout = lua_tonumber (L, -1); + lua_pushstring(L, "timeout"); + lua_gettable(L, -2); + if (lua_type(L, -1) == LUA_TNUMBER) { + timeout = lua_tonumber(L, -1); } - lua_pop (L, 1); + lua_pop(L, 1); - lua_pushstring (L, "opaque_data"); - lua_gettable (L, -2); - if (!!lua_toboolean (L, -1)) { + lua_pushstring(L, "opaque_data"); + lua_gettable(L, -2); + if (!!lua_toboolean(L, -1)) { flags |= LUA_REDIS_TEXTDATA; } - lua_pop (L, 1); + lua_pop(L, 1); if (cmd) { - lua_pushstring (L, "args"); - lua_gettable (L, -2); - lua_redis_parse_args (L, -1, cmd, &args, &arglens, &nargs); - lua_pop (L, 1); + lua_pushstring(L, "args"); + lua_gettable(L, -2); + lua_redis_parse_args(L, -1, cmd, &args, &arglens, &nargs); + lua_pop(L, 1); } - lua_pop (L, 1); + lua_pop(L, 1); if (addr && cmd) { ret = TRUE; @@ -1295,66 +1290,66 @@ lua_redis_make_request_sync (lua_State *L) } if (ret) { - double_to_tv (timeout, &tv); + double_to_tv(timeout, &tv); - if (rspamd_inet_address_get_af (addr->addr) == AF_UNIX) { - ctx = redisConnectUnixWithTimeout ( - rspamd_inet_address_to_string (addr->addr), tv); + if (rspamd_inet_address_get_af(addr->addr) == AF_UNIX) { + ctx = redisConnectUnixWithTimeout( + rspamd_inet_address_to_string(addr->addr), tv); } else { - ctx = redisConnectWithTimeout ( - rspamd_inet_address_to_string (addr->addr), - rspamd_inet_address_get_port (addr->addr), tv); + ctx = redisConnectWithTimeout( + rspamd_inet_address_to_string(addr->addr), + rspamd_inet_address_get_port(addr->addr), tv); } if (ip) { - rspamd_inet_address_free (ip); + rspamd_inet_address_free(ip); } if (ctx == NULL || ctx->err) { - redisFree (ctx); - lua_redis_free_args (args, arglens, nargs); - lua_pushboolean (L, FALSE); + redisFree(ctx); + lua_redis_free_args(args, arglens, nargs); + lua_pushboolean(L, FALSE); return 1; } - r = redisCommandArgv (ctx, - nargs, - (const gchar **)args, - arglens); + r = redisCommandArgv(ctx, + nargs, + (const gchar **) args, + arglens); if (r != NULL) { if (r->type != REDIS_REPLY_ERROR) { - lua_pushboolean (L, TRUE); - lua_redis_push_reply (L, r, flags & LUA_REDIS_TEXTDATA); + lua_pushboolean(L, TRUE); + lua_redis_push_reply(L, r, flags & LUA_REDIS_TEXTDATA); } else { - lua_pushboolean (L, FALSE); - lua_pushstring (L, r->str); + lua_pushboolean(L, FALSE); + lua_pushstring(L, r->str); } - freeReplyObject (r); - redisFree (ctx); - lua_redis_free_args (args, arglens, nargs); + freeReplyObject(r); + redisFree(ctx); + lua_redis_free_args(args, arglens, nargs); return 2; } else { - msg_info ("call to redis failed: %s", ctx->errstr); - redisFree (ctx); - lua_redis_free_args (args, arglens, nargs); - lua_pushboolean (L, FALSE); + msg_info("call to redis failed: %s", ctx->errstr); + redisFree(ctx); + lua_redis_free_args(args, arglens, nargs); + lua_pushboolean(L, FALSE); } } else { if (ip) { - rspamd_inet_address_free (ip); + rspamd_inet_address_free(ip); } - msg_err ("bad arguments for redis request"); - lua_redis_free_args (args, arglens, nargs); + msg_err("bad arguments for redis request"); + lua_redis_free_args(args, arglens, nargs); - lua_pushboolean (L, FALSE); + lua_pushboolean(L, FALSE); } return 1; @@ -1369,38 +1364,38 @@ lua_redis_make_request_sync (lua_State *L) * @return {boolean,redis} new connection object or nil if connection failed */ static int -lua_redis_connect (lua_State *L) +lua_redis_connect(lua_State *L) { LUA_TRACE_POINT; struct lua_redis_userdata *ud; struct lua_redis_ctx *ctx, **pctx; gdouble timeout = REDIS_DEFAULT_TIMEOUT; - ctx = rspamd_lua_redis_prepare_connection (L, NULL, TRUE); + ctx = rspamd_lua_redis_prepare_connection(L, NULL, TRUE); if (ctx) { ud = &ctx->async; - lua_pushstring (L, "timeout"); - lua_gettable (L, 1); - if (lua_type (L, -1) == LUA_TNUMBER) { - timeout = lua_tonumber (L, -1); + lua_pushstring(L, "timeout"); + lua_gettable(L, 1); + if (lua_type(L, -1) == LUA_TNUMBER) { + timeout = lua_tonumber(L, -1); } - lua_pop (L, 1); + lua_pop(L, 1); ud->timeout = timeout; } else { - lua_pushboolean (L, FALSE); - lua_pushnil (L); + lua_pushboolean(L, FALSE); + lua_pushnil(L); return 2; } - lua_pushboolean (L, TRUE); - pctx = lua_newuserdata (L, sizeof (ctx)); + lua_pushboolean(L, TRUE); + pctx = lua_newuserdata(L, sizeof(ctx)); *pctx = ctx; - rspamd_lua_setclass (L, "rspamd{redis}", -1); + rspamd_lua_setclass(L, "rspamd{redis}", -1); return 2; } @@ -1413,34 +1408,34 @@ lua_redis_connect (lua_State *L) * @return {redis} redis object if a request has been successful */ static int -lua_redis_connect_sync (lua_State *L) +lua_redis_connect_sync(lua_State *L) { LUA_TRACE_POINT; gdouble timeout = REDIS_DEFAULT_TIMEOUT; struct lua_redis_ctx *ctx, **pctx; - ctx = rspamd_lua_redis_prepare_connection (L, NULL, FALSE); + ctx = rspamd_lua_redis_prepare_connection(L, NULL, FALSE); if (ctx) { - if (lua_istable (L, 1)) { - lua_pushstring (L, "timeout"); - lua_gettable (L, 1); - if (lua_type (L, -1) == LUA_TNUMBER) { - timeout = lua_tonumber (L, -1); + if (lua_istable(L, 1)) { + lua_pushstring(L, "timeout"); + lua_gettable(L, 1); + if (lua_type(L, -1) == LUA_TNUMBER) { + timeout = lua_tonumber(L, -1); } - lua_pop (L, 1); + lua_pop(L, 1); } ctx->async.timeout = timeout; - lua_pushboolean (L, TRUE); - pctx = lua_newuserdata (L, sizeof (ctx)); + lua_pushboolean(L, TRUE); + pctx = lua_newuserdata(L, sizeof(ctx)); *pctx = ctx; - rspamd_lua_setclass (L, "rspamd{redis}", -1); + rspamd_lua_setclass(L, "rspamd{redis}", -1); } else { - lua_pushboolean (L, FALSE); - lua_pushstring (L, "bad arguments for redis request"); + lua_pushboolean(L, FALSE); + lua_pushstring(L, "bad arguments for redis request"); return 2; } @@ -1455,10 +1450,10 @@ lua_redis_connect_sync (lua_State *L) * @return {boolean} `true` if a request has been successful */ static int -lua_redis_add_cmd (lua_State *L) +lua_redis_add_cmd(lua_State *L) { LUA_TRACE_POINT; - struct lua_redis_ctx *ctx = lua_check_redis (L, 1); + struct lua_redis_ctx *ctx = lua_check_redis(L, 1); struct lua_redis_request_specific_userdata *sp_ud; struct lua_redis_userdata *ud; const gchar *cmd = NULL; @@ -1467,30 +1462,30 @@ lua_redis_add_cmd (lua_State *L) if (ctx) { if (ctx->flags & LUA_REDIS_TERMINATED) { - lua_pushboolean (L, FALSE); - lua_pushstring (L, "Connection is terminated"); + lua_pushboolean(L, FALSE); + lua_pushstring(L, "Connection is terminated"); return 2; } /* Async version */ - if (lua_type (L, 2) == LUA_TSTRING) { + if (lua_type(L, 2) == LUA_TSTRING) { /* No callback version */ - cmd = lua_tostring (L, 2); + cmd = lua_tostring(L, 2); args_pos = 3; } - else if (lua_type (L, 2) == LUA_TFUNCTION) { - lua_pushvalue (L, 2); - cbref = luaL_ref (L, LUA_REGISTRYINDEX); - cmd = lua_tostring (L, 3); + else if (lua_type(L, 2) == LUA_TFUNCTION) { + lua_pushvalue(L, 2); + cbref = luaL_ref(L, LUA_REGISTRYINDEX); + cmd = lua_tostring(L, 3); args_pos = 4; } else { - return luaL_error (L, "invalid arguments"); + return luaL_error(L, "invalid arguments"); } - sp_ud = g_malloc0 (sizeof (*sp_ud)); - if (IS_ASYNC (ctx)) { + sp_ud = g_malloc0(sizeof(*sp_ud)); + if (IS_ASYNC(ctx)) { sp_ud->c = &ctx->async; ud = &ctx->async; sp_ud->cbref = cbref; @@ -1501,73 +1496,73 @@ lua_redis_add_cmd (lua_State *L) } sp_ud->ctx = ctx; - lua_redis_parse_args (L, args_pos, cmd, &sp_ud->args, - &sp_ud->arglens, &sp_ud->nargs); + lua_redis_parse_args(L, args_pos, cmd, &sp_ud->args, + &sp_ud->arglens, &sp_ud->nargs); - LL_PREPEND (sp_ud->c->specific, sp_ud); + LL_PREPEND(sp_ud->c->specific, sp_ud); - if (ud->s && rspamd_session_blocked (ud->s)) { - lua_pushboolean (L, 0); - lua_pushstring (L, "session is terminating"); + if (ud->s && rspamd_session_blocked(ud->s)) { + lua_pushboolean(L, 0); + lua_pushstring(L, "session is terminating"); return 2; } - if (IS_ASYNC (ctx)) { - ret = redisAsyncCommandArgv (sp_ud->c->ctx, - lua_redis_callback, - sp_ud, - sp_ud->nargs, - (const gchar **)sp_ud->args, - sp_ud->arglens); + if (IS_ASYNC(ctx)) { + ret = redisAsyncCommandArgv(sp_ud->c->ctx, + lua_redis_callback, + sp_ud, + sp_ud->nargs, + (const gchar **) sp_ud->args, + sp_ud->arglens); } else { - ret = redisAsyncCommandArgv (sp_ud->c->ctx, - lua_redis_callback_sync, - sp_ud, - sp_ud->nargs, - (const gchar **)sp_ud->args, - sp_ud->arglens); + ret = redisAsyncCommandArgv(sp_ud->c->ctx, + lua_redis_callback_sync, + sp_ud, + sp_ud->nargs, + (const gchar **) sp_ud->args, + sp_ud->arglens); } if (ret == REDIS_OK) { if (ud->s) { - rspamd_session_add_event (ud->s, - lua_redis_fin, - sp_ud, - M); + rspamd_session_add_event(ud->s, + lua_redis_fin, + sp_ud, + M); if (ud->item) { - rspamd_symcache_item_async_inc (ud->task, ud->item, M); + rspamd_symcache_item_async_inc(ud->task, ud->item, M); } } sp_ud->timeout_ev.data = sp_ud; - if (IS_ASYNC (ctx)) { - ev_timer_init (&sp_ud->timeout_ev, lua_redis_timeout, - sp_ud->c->timeout, 0.0); + if (IS_ASYNC(ctx)) { + ev_timer_init(&sp_ud->timeout_ev, lua_redis_timeout, + sp_ud->c->timeout, 0.0); } else { - ev_timer_init (&sp_ud->timeout_ev, lua_redis_timeout_sync, - sp_ud->c->timeout, 0.0); + ev_timer_init(&sp_ud->timeout_ev, lua_redis_timeout_sync, + sp_ud->c->timeout, 0.0); } - ev_timer_start (ud->event_loop, &sp_ud->timeout_ev); - REDIS_RETAIN (ctx); - ctx->cmds_pending ++; + ev_timer_start(ud->event_loop, &sp_ud->timeout_ev); + REDIS_RETAIN(ctx); + ctx->cmds_pending++; } else { - msg_info ("call to redis failed: %s", - sp_ud->c->ctx->errstr); - lua_pushboolean (L, 0); - lua_pushstring (L, sp_ud->c->ctx->errstr); + msg_info("call to redis failed: %s", + sp_ud->c->ctx->errstr); + lua_pushboolean(L, 0); + lua_pushstring(L, sp_ud->c->ctx->errstr); return 2; } } - lua_pushboolean (L, true); + lua_pushboolean(L, true); return 1; } @@ -1578,66 +1573,66 @@ lua_redis_add_cmd (lua_State *L) * @return {boolean}, {table}, ...: pairs in format [bool, result] for each request pending */ static int -lua_redis_exec (lua_State *L) +lua_redis_exec(lua_State *L) { LUA_TRACE_POINT; - struct lua_redis_ctx *ctx = lua_check_redis (L, 1); + struct lua_redis_ctx *ctx = lua_check_redis(L, 1); if (ctx == NULL) { - lua_error (L); + lua_error(L); return 1; } - if (IS_ASYNC (ctx)) { - lua_pushstring (L, "Async redis pipelining is not implemented"); - lua_error (L); + if (IS_ASYNC(ctx)) { + lua_pushstring(L, "Async redis pipelining is not implemented"); + lua_error(L); return 0; } else { - if (ctx->cmds_pending == 0 && g_queue_get_length (ctx->replies) == 0) { - lua_pushstring (L, "No pending commands to execute"); - lua_error (L); + if (ctx->cmds_pending == 0 && g_queue_get_length(ctx->replies) == 0) { + lua_pushstring(L, "No pending commands to execute"); + lua_error(L); } - if (ctx->cmds_pending == 0 && g_queue_get_length (ctx->replies) > 0) { - gint results = lua_redis_push_results (ctx, L); + if (ctx->cmds_pending == 0 && g_queue_get_length(ctx->replies) > 0) { + gint results = lua_redis_push_results(ctx, L); return results; } else { - ctx->thread = lua_thread_pool_get_running_entry (ctx->async.cfg->lua_thread_pool); - return lua_thread_yield (ctx->thread, 0); + ctx->thread = lua_thread_pool_get_running_entry(ctx->async.cfg->lua_thread_pool); + return lua_thread_yield(ctx->thread, 0); } } } static gint -lua_load_redis (lua_State * L) +lua_load_redis(lua_State *L) { - lua_newtable (L); - luaL_register (L, NULL, redislib_f); + lua_newtable(L); + luaL_register(L, NULL, redislib_f); return 1; } static gint -lua_redis_null_idx (lua_State *L) +lua_redis_null_idx(lua_State *L) { - lua_pushnil (L); + lua_pushnil(L); return 1; } static void -lua_redis_null_mt (lua_State *L) +lua_redis_null_mt(lua_State *L) { - luaL_newmetatable (L, "redis{null}"); + luaL_newmetatable(L, "redis{null}"); - lua_pushcfunction (L, lua_redis_null_idx); - lua_setfield (L, -2, "__index"); - lua_pushcfunction (L, lua_redis_null_idx); - lua_setfield (L, -2, "__tostring"); + lua_pushcfunction(L, lua_redis_null_idx); + lua_setfield(L, -2, "__index"); + lua_pushcfunction(L, lua_redis_null_idx); + lua_setfield(L, -2, "__tostring"); - lua_pop (L, 1); + lua_pop(L, 1); } /** @@ -1645,17 +1640,16 @@ lua_redis_null_mt (lua_State *L) * @param L lua stack * @return */ -void -luaopen_redis (lua_State * L) +void luaopen_redis(lua_State *L) { - rspamd_lua_new_class (L, "rspamd{redis}", redislib_m); - lua_pop (L, 1); - rspamd_lua_add_preload (L, "rspamd_redis", lua_load_redis); + rspamd_lua_new_class(L, "rspamd{redis}", redislib_m); + lua_pop(L, 1); + rspamd_lua_add_preload(L, "rspamd_redis", lua_load_redis); /* Set null element */ - lua_redis_null_mt (L); - redis_null = lua_newuserdata (L, 0); - luaL_getmetatable (L, "redis{null}"); - lua_setmetatable (L, -2); - lua_setfield (L, LUA_REGISTRYINDEX, "redis.null"); + lua_redis_null_mt(L); + redis_null = lua_newuserdata(L, 0); + luaL_getmetatable(L, "redis{null}"); + lua_setmetatable(L, -2); + lua_setfield(L, LUA_REGISTRYINDEX, "redis.null"); } |