diff options
Diffstat (limited to 'src/libserver/fuzzy_backend/fuzzy_backend_redis.c')
-rw-r--r-- | src/libserver/fuzzy_backend/fuzzy_backend_redis.c | 1289 |
1 files changed, 640 insertions, 649 deletions
diff --git a/src/libserver/fuzzy_backend/fuzzy_backend_redis.c b/src/libserver/fuzzy_backend/fuzzy_backend_redis.c index 390119fbc..2f9d1ed10 100644 --- a/src/libserver/fuzzy_backend/fuzzy_backend_redis.c +++ b/src/libserver/fuzzy_backend/fuzzy_backend_redis.c @@ -30,22 +30,22 @@ #define REDIS_DEFAULT_OBJECT "fuzzy" #define REDIS_DEFAULT_TIMEOUT 2.0 -#define msg_err_redis_session(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \ - "fuzzy_redis", session->backend->id, \ - G_STRFUNC, \ - __VA_ARGS__) -#define msg_warn_redis_session(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \ - "fuzzy_redis", session->backend->id, \ - G_STRFUNC, \ - __VA_ARGS__) -#define msg_info_redis_session(...) rspamd_default_log_function (G_LOG_LEVEL_INFO, \ - "fuzzy_redis", session->backend->id, \ - G_STRFUNC, \ - __VA_ARGS__) -#define msg_debug_redis_session(...) rspamd_conditional_debug_fast (NULL, NULL, \ - rspamd_fuzzy_redis_log_id, "fuzzy_redis", session->backend->id, \ - G_STRFUNC, \ - __VA_ARGS__) +#define msg_err_redis_session(...) rspamd_default_log_function(G_LOG_LEVEL_CRITICAL, \ + "fuzzy_redis", session->backend->id, \ + G_STRFUNC, \ + __VA_ARGS__) +#define msg_warn_redis_session(...) rspamd_default_log_function(G_LOG_LEVEL_WARNING, \ + "fuzzy_redis", session->backend->id, \ + G_STRFUNC, \ + __VA_ARGS__) +#define msg_info_redis_session(...) rspamd_default_log_function(G_LOG_LEVEL_INFO, \ + "fuzzy_redis", session->backend->id, \ + G_STRFUNC, \ + __VA_ARGS__) +#define msg_debug_redis_session(...) rspamd_conditional_debug_fast(NULL, NULL, \ + rspamd_fuzzy_redis_log_id, "fuzzy_redis", session->backend->id, \ + G_STRFUNC, \ + __VA_ARGS__) INIT_LOG_MODULE(fuzzy_redis) @@ -101,53 +101,53 @@ struct rspamd_fuzzy_redis_session { }; static inline struct upstream_list * -rspamd_redis_get_servers (struct rspamd_fuzzy_backend_redis *ctx, - const gchar *what) +rspamd_redis_get_servers(struct rspamd_fuzzy_backend_redis *ctx, + const gchar *what) { lua_State *L = ctx->L; struct upstream_list *res = NULL; - lua_rawgeti (L, LUA_REGISTRYINDEX, ctx->conf_ref); - lua_pushstring (L, what); - lua_gettable (L, -2); + lua_rawgeti(L, LUA_REGISTRYINDEX, ctx->conf_ref); + lua_pushstring(L, what); + lua_gettable(L, -2); - if (lua_type (L, -1) == LUA_TUSERDATA) { - res = *((struct upstream_list **) lua_touserdata (L, -1)); + if (lua_type(L, -1) == LUA_TUSERDATA) { + res = *((struct upstream_list **) lua_touserdata(L, -1)); } else { struct lua_logger_trace tr; gchar outbuf[8192]; - memset (&tr, 0, sizeof (tr)); - lua_logger_out_type (L, -2, outbuf, sizeof (outbuf) - 1, &tr, - LUA_ESCAPE_UNPRINTABLE); + memset(&tr, 0, sizeof(tr)); + lua_logger_out_type(L, -2, outbuf, sizeof(outbuf) - 1, &tr, + LUA_ESCAPE_UNPRINTABLE); - msg_err ("cannot get %s upstreams for Redis fuzzy storage %s; table content: %s", + msg_err("cannot get %s upstreams for Redis fuzzy storage %s; table content: %s", what, ctx->id, outbuf); } - lua_settop (L, 0); + lua_settop(L, 0); return res; } static inline void -rspamd_fuzzy_redis_session_free_args (struct rspamd_fuzzy_redis_session *session) +rspamd_fuzzy_redis_session_free_args(struct rspamd_fuzzy_redis_session *session) { guint i; if (session->argv) { - for (i = 0; i < session->nargs; i ++) { - g_free (session->argv[i]); + for (i = 0; i < session->nargs; i++) { + g_free(session->argv[i]); } - g_free (session->argv); - g_free (session->argv_lens); + g_free(session->argv); + g_free(session->argv_lens); } } static void -rspamd_fuzzy_redis_session_dtor (struct rspamd_fuzzy_redis_session *session, - gboolean is_fatal) +rspamd_fuzzy_redis_session_dtor(struct rspamd_fuzzy_redis_session *session, + gboolean is_fatal) { redisAsyncContext *ac; @@ -155,144 +155,144 @@ rspamd_fuzzy_redis_session_dtor (struct rspamd_fuzzy_redis_session *session, if (session->ctx) { ac = session->ctx; session->ctx = NULL; - rspamd_redis_pool_release_connection (session->backend->pool, - ac, - is_fatal ? RSPAMD_REDIS_RELEASE_FATAL : RSPAMD_REDIS_RELEASE_DEFAULT); + rspamd_redis_pool_release_connection(session->backend->pool, + ac, + is_fatal ? RSPAMD_REDIS_RELEASE_FATAL : RSPAMD_REDIS_RELEASE_DEFAULT); } - ev_timer_stop (session->event_loop, &session->timeout); - rspamd_fuzzy_redis_session_free_args (session); + ev_timer_stop(session->event_loop, &session->timeout); + rspamd_fuzzy_redis_session_free_args(session); - REF_RELEASE (session->backend); - rspamd_upstream_unref (session->up); - g_free (session); + REF_RELEASE(session->backend); + rspamd_upstream_unref(session->up); + g_free(session); } static void -rspamd_fuzzy_backend_redis_dtor (struct rspamd_fuzzy_backend_redis *backend) +rspamd_fuzzy_backend_redis_dtor(struct rspamd_fuzzy_backend_redis *backend) { if (!backend->terminated && backend->conf_ref != -1) { - luaL_unref (backend->L, LUA_REGISTRYINDEX, backend->conf_ref); + luaL_unref(backend->L, LUA_REGISTRYINDEX, backend->conf_ref); } if (backend->id) { - g_free (backend->id); + g_free(backend->id); } - g_free (backend); + g_free(backend); } -void* -rspamd_fuzzy_backend_init_redis (struct rspamd_fuzzy_backend *bk, - const ucl_object_t *obj, struct rspamd_config *cfg, GError **err) +void * +rspamd_fuzzy_backend_init_redis(struct rspamd_fuzzy_backend *bk, + const ucl_object_t *obj, struct rspamd_config *cfg, GError **err) { struct rspamd_fuzzy_backend_redis *backend; const ucl_object_t *elt; gboolean ret = FALSE; guchar id_hash[rspamd_cryptobox_HASHBYTES]; rspamd_cryptobox_hash_state_t st; - lua_State *L = (lua_State *)cfg->lua_state; + lua_State *L = (lua_State *) cfg->lua_state; gint conf_ref = -1; - backend = g_malloc0 (sizeof (*backend)); + backend = g_malloc0(sizeof(*backend)); backend->timeout = REDIS_DEFAULT_TIMEOUT; backend->redis_object = REDIS_DEFAULT_OBJECT; backend->L = L; - ret = rspamd_lua_try_load_redis (L, obj, cfg, &conf_ref); + ret = rspamd_lua_try_load_redis(L, obj, cfg, &conf_ref); /* Now try global redis settings */ if (!ret) { - elt = ucl_object_lookup (cfg->rcl_obj, "redis"); + elt = ucl_object_lookup(cfg->rcl_obj, "redis"); if (elt) { const ucl_object_t *specific_obj; - specific_obj = ucl_object_lookup_any (elt, "fuzzy", "fuzzy_storage", - NULL); + specific_obj = ucl_object_lookup_any(elt, "fuzzy", "fuzzy_storage", + NULL); if (specific_obj) { - ret = rspamd_lua_try_load_redis (L, specific_obj, cfg, &conf_ref); + ret = rspamd_lua_try_load_redis(L, specific_obj, cfg, &conf_ref); } else { - ret = rspamd_lua_try_load_redis (L, elt, cfg, &conf_ref); + ret = rspamd_lua_try_load_redis(L, elt, cfg, &conf_ref); } } } if (!ret) { - msg_err_config ("cannot init redis backend for fuzzy storage"); - g_free (backend); + msg_err_config("cannot init redis backend for fuzzy storage"); + g_free(backend); return NULL; } - elt = ucl_object_lookup (obj, "prefix"); - if (elt == NULL || ucl_object_type (elt) != UCL_STRING) { + elt = ucl_object_lookup(obj, "prefix"); + if (elt == NULL || ucl_object_type(elt) != UCL_STRING) { backend->redis_object = REDIS_DEFAULT_OBJECT; } else { - backend->redis_object = ucl_object_tostring (elt); + backend->redis_object = ucl_object_tostring(elt); } backend->conf_ref = conf_ref; /* Check some common table values */ - lua_rawgeti (L, LUA_REGISTRYINDEX, conf_ref); + lua_rawgeti(L, LUA_REGISTRYINDEX, conf_ref); - lua_pushstring (L, "timeout"); - lua_gettable (L, -2); - if (lua_type (L, -1) == LUA_TNUMBER) { - backend->timeout = lua_tonumber (L, -1); + lua_pushstring(L, "timeout"); + lua_gettable(L, -2); + if (lua_type(L, -1) == LUA_TNUMBER) { + backend->timeout = lua_tonumber(L, -1); } - lua_pop (L, 1); + lua_pop(L, 1); - lua_pushstring (L, "db"); - lua_gettable (L, -2); - if (lua_type (L, -1) == LUA_TSTRING) { - backend->dbname = rspamd_mempool_strdup (cfg->cfg_pool, - lua_tostring (L, -1)); + lua_pushstring(L, "db"); + lua_gettable(L, -2); + if (lua_type(L, -1) == LUA_TSTRING) { + backend->dbname = rspamd_mempool_strdup(cfg->cfg_pool, + lua_tostring(L, -1)); } - lua_pop (L, 1); + lua_pop(L, 1); - lua_pushstring (L, "password"); - lua_gettable (L, -2); - if (lua_type (L, -1) == LUA_TSTRING) { - backend->password = rspamd_mempool_strdup (cfg->cfg_pool, - lua_tostring (L, -1)); + lua_pushstring(L, "password"); + lua_gettable(L, -2); + if (lua_type(L, -1) == LUA_TSTRING) { + backend->password = rspamd_mempool_strdup(cfg->cfg_pool, + lua_tostring(L, -1)); } - lua_pop (L, 1); + lua_pop(L, 1); - lua_settop (L, 0); + lua_settop(L, 0); - REF_INIT_RETAIN (backend, rspamd_fuzzy_backend_redis_dtor); + REF_INIT_RETAIN(backend, rspamd_fuzzy_backend_redis_dtor); backend->pool = cfg->redis_pool; - rspamd_cryptobox_hash_init (&st, NULL, 0); - rspamd_cryptobox_hash_update (&st, backend->redis_object, - strlen (backend->redis_object)); + rspamd_cryptobox_hash_init(&st, NULL, 0); + rspamd_cryptobox_hash_update(&st, backend->redis_object, + strlen(backend->redis_object)); if (backend->dbname) { - rspamd_cryptobox_hash_update (&st, backend->dbname, - strlen (backend->dbname)); + rspamd_cryptobox_hash_update(&st, backend->dbname, + strlen(backend->dbname)); } if (backend->password) { - rspamd_cryptobox_hash_update (&st, backend->password, - strlen (backend->password)); + rspamd_cryptobox_hash_update(&st, backend->password, + strlen(backend->password)); } - rspamd_cryptobox_hash_final (&st, id_hash); - backend->id = rspamd_encode_base32 (id_hash, sizeof (id_hash), RSPAMD_BASE32_DEFAULT); + rspamd_cryptobox_hash_final(&st, id_hash); + backend->id = rspamd_encode_base32(id_hash, sizeof(id_hash), RSPAMD_BASE32_DEFAULT); return backend; } static void -rspamd_fuzzy_redis_timeout (EV_P_ ev_timer *w, int revents) +rspamd_fuzzy_redis_timeout(EV_P_ ev_timer *w, int revents) { struct rspamd_fuzzy_redis_session *session = - (struct rspamd_fuzzy_redis_session *)w->data; + (struct rspamd_fuzzy_redis_session *) w->data; redisAsyncContext *ac; static char errstr[128]; @@ -301,17 +301,17 @@ rspamd_fuzzy_redis_timeout (EV_P_ ev_timer *w, int revents) session->ctx = NULL; ac->err = REDIS_ERR_IO; /* Should be safe as in hiredis it is char[128] */ - rspamd_snprintf (errstr, sizeof (errstr), "%s", strerror (ETIMEDOUT)); + rspamd_snprintf(errstr, sizeof(errstr), "%s", strerror(ETIMEDOUT)); ac->errstr = errstr; /* This will cause session closing */ - rspamd_redis_pool_release_connection (session->backend->pool, - ac, RSPAMD_REDIS_RELEASE_FATAL); + rspamd_redis_pool_release_connection(session->backend->pool, + ac, RSPAMD_REDIS_RELEASE_FATAL); } } -static void rspamd_fuzzy_redis_check_callback (redisAsyncContext *c, gpointer r, - gpointer priv); +static void rspamd_fuzzy_redis_check_callback(redisAsyncContext *c, gpointer r, + gpointer priv); struct _rspamd_fuzzy_shingles_helper { guchar digest[64]; @@ -319,17 +319,17 @@ struct _rspamd_fuzzy_shingles_helper { }; static gint -rspamd_fuzzy_backend_redis_shingles_cmp (const void *a, const void *b) +rspamd_fuzzy_backend_redis_shingles_cmp(const void *a, const void *b) { const struct _rspamd_fuzzy_shingles_helper *sha = a, - *shb = b; + *shb = b; - return memcmp (sha->digest, shb->digest, sizeof (sha->digest)); + return memcmp(sha->digest, shb->digest, sizeof(sha->digest)); } static void -rspamd_fuzzy_redis_shingles_callback (redisAsyncContext *c, gpointer r, - gpointer priv) +rspamd_fuzzy_redis_shingles_callback(redisAsyncContext *c, gpointer r, + gpointer priv) { struct rspamd_fuzzy_redis_session *session = priv; redisReply *reply = r, *cur; @@ -338,46 +338,46 @@ rspamd_fuzzy_redis_shingles_callback (redisAsyncContext *c, gpointer r, struct _rspamd_fuzzy_shingles_helper *shingles, *prev = NULL, *sel = NULL; guint i, found = 0, max_found = 0, cur_found = 0; - ev_timer_stop (session->event_loop, &session->timeout); - memset (&rep, 0, sizeof (rep)); + ev_timer_stop(session->event_loop, &session->timeout); + memset(&rep, 0, sizeof(rep)); if (c->err == 0 && reply != NULL) { - rspamd_upstream_ok (session->up); + rspamd_upstream_ok(session->up); if (reply->type == REDIS_REPLY_ARRAY && - reply->elements == RSPAMD_SHINGLE_SIZE) { - shingles = g_alloca (sizeof (struct _rspamd_fuzzy_shingles_helper) * - RSPAMD_SHINGLE_SIZE); + reply->elements == RSPAMD_SHINGLE_SIZE) { + shingles = g_alloca(sizeof(struct _rspamd_fuzzy_shingles_helper) * + RSPAMD_SHINGLE_SIZE); - for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) { + for (i = 0; i < RSPAMD_SHINGLE_SIZE; i++) { cur = reply->element[i]; if (cur->type == REDIS_REPLY_STRING) { shingles[i].found = 1; - memcpy (shingles[i].digest, cur->str, MIN (64, cur->len)); - found ++; + memcpy(shingles[i].digest, cur->str, MIN(64, cur->len)); + found++; } else { - memset (shingles[i].digest, 0, sizeof (shingles[i].digest)); + memset(shingles[i].digest, 0, sizeof(shingles[i].digest)); shingles[i].found = 0; } } if (found > RSPAMD_SHINGLE_SIZE / 2) { /* Now sort to find the most frequent element */ - qsort (shingles, RSPAMD_SHINGLE_SIZE, - sizeof (struct _rspamd_fuzzy_shingles_helper), - rspamd_fuzzy_backend_redis_shingles_cmp); + qsort(shingles, RSPAMD_SHINGLE_SIZE, + sizeof(struct _rspamd_fuzzy_shingles_helper), + rspamd_fuzzy_backend_redis_shingles_cmp); prev = &shingles[0]; - for (i = 1; i < RSPAMD_SHINGLE_SIZE; i ++) { + for (i = 1; i < RSPAMD_SHINGLE_SIZE; i++) { if (!shingles[i].found) { continue; } - if (memcmp (shingles[i].digest, prev->digest, 64) == 0) { - cur_found ++; + if (memcmp(shingles[i].digest, prev->digest, 64) == 0) { + cur_found++; if (cur_found > max_found) { max_found = cur_found; @@ -391,55 +391,55 @@ rspamd_fuzzy_redis_shingles_callback (redisAsyncContext *c, gpointer r, } if (max_found > RSPAMD_SHINGLE_SIZE / 2) { - session->prob = ((float)max_found) / RSPAMD_SHINGLE_SIZE; + session->prob = ((float) max_found) / RSPAMD_SHINGLE_SIZE; rep.v1.prob = session->prob; - g_assert (sel != NULL); + g_assert(sel != NULL); /* Prepare new check command */ - rspamd_fuzzy_redis_session_free_args (session); + rspamd_fuzzy_redis_session_free_args(session); session->nargs = 5; - session->argv = g_malloc (sizeof (gchar *) * session->nargs); - session->argv_lens = g_malloc (sizeof (gsize) * session->nargs); + session->argv = g_malloc(sizeof(gchar *) * session->nargs); + session->argv_lens = g_malloc(sizeof(gsize) * session->nargs); - key = g_string_new (session->backend->redis_object); - g_string_append_len (key, sel->digest, sizeof (sel->digest)); - session->argv[0] = g_strdup ("HMGET"); + key = g_string_new(session->backend->redis_object); + g_string_append_len(key, sel->digest, sizeof(sel->digest)); + session->argv[0] = g_strdup("HMGET"); session->argv_lens[0] = 5; session->argv[1] = key->str; session->argv_lens[1] = key->len; - session->argv[2] = g_strdup ("V"); + session->argv[2] = g_strdup("V"); session->argv_lens[2] = 1; - session->argv[3] = g_strdup ("F"); + session->argv[3] = g_strdup("F"); session->argv_lens[3] = 1; - session->argv[4] = g_strdup ("C"); + session->argv[4] = g_strdup("C"); session->argv_lens[4] = 1; - g_string_free (key, FALSE); /* Do not free underlying array */ - memcpy (session->found_digest, sel->digest, - sizeof (session->cmd->digest)); + g_string_free(key, FALSE); /* Do not free underlying array */ + memcpy(session->found_digest, sel->digest, + sizeof(session->cmd->digest)); - g_assert (session->ctx != NULL); - if (redisAsyncCommandArgv (session->ctx, - rspamd_fuzzy_redis_check_callback, - session, session->nargs, - (const gchar **)session->argv, - session->argv_lens) != REDIS_OK) { + g_assert(session->ctx != NULL); + if (redisAsyncCommandArgv(session->ctx, + rspamd_fuzzy_redis_check_callback, + session, session->nargs, + (const gchar **) session->argv, + session->argv_lens) != REDIS_OK) { if (session->callback.cb_check) { - memset (&rep, 0, sizeof (rep)); - session->callback.cb_check (&rep, session->cbdata); + memset(&rep, 0, sizeof(rep)); + session->callback.cb_check(&rep, session->cbdata); } - rspamd_fuzzy_redis_session_dtor (session, TRUE); + rspamd_fuzzy_redis_session_dtor(session, TRUE); } else { /* Add timeout */ session->timeout.data = session; - ev_now_update_if_cheap ((struct ev_loop *)session->event_loop); - ev_timer_init (&session->timeout, - rspamd_fuzzy_redis_timeout, - session->backend->timeout, 0.0); - ev_timer_start (session->event_loop, &session->timeout); + ev_now_update_if_cheap((struct ev_loop *) session->event_loop); + ev_timer_init(&session->timeout, + rspamd_fuzzy_redis_timeout, + session->backend->timeout, 0.0); + ev_timer_start(session->event_loop, &session->timeout); } return; @@ -447,89 +447,89 @@ rspamd_fuzzy_redis_shingles_callback (redisAsyncContext *c, gpointer r, } } else if (reply->type == REDIS_REPLY_ERROR) { - msg_err_redis_session ("fuzzy backend redis error: \"%s\"", - reply->str); + msg_err_redis_session("fuzzy backend redis error: \"%s\"", + reply->str); } if (session->callback.cb_check) { - session->callback.cb_check (&rep, session->cbdata); + session->callback.cb_check(&rep, session->cbdata); } } else { if (session->callback.cb_check) { - session->callback.cb_check (&rep, session->cbdata); + session->callback.cb_check(&rep, session->cbdata); } if (c->errstr) { - msg_err_redis_session ("error getting shingles: %s", c->errstr); - rspamd_upstream_fail (session->up, FALSE, c->errstr); + msg_err_redis_session("error getting shingles: %s", c->errstr); + rspamd_upstream_fail(session->up, FALSE, c->errstr); } } - rspamd_fuzzy_redis_session_dtor (session, FALSE); + rspamd_fuzzy_redis_session_dtor(session, FALSE); } static void -rspamd_fuzzy_backend_check_shingles (struct rspamd_fuzzy_redis_session *session) +rspamd_fuzzy_backend_check_shingles(struct rspamd_fuzzy_redis_session *session) { struct rspamd_fuzzy_reply rep; const struct rspamd_fuzzy_shingle_cmd *shcmd; GString *key; guint i, init_len; - rspamd_fuzzy_redis_session_free_args (session); + rspamd_fuzzy_redis_session_free_args(session); /* First of all check digest */ session->nargs = RSPAMD_SHINGLE_SIZE + 1; - session->argv = g_malloc (sizeof (gchar *) * session->nargs); - session->argv_lens = g_malloc (sizeof (gsize) * session->nargs); - shcmd = (const struct rspamd_fuzzy_shingle_cmd *)session->cmd; + session->argv = g_malloc(sizeof(gchar *) * session->nargs); + session->argv_lens = g_malloc(sizeof(gsize) * session->nargs); + shcmd = (const struct rspamd_fuzzy_shingle_cmd *) session->cmd; - session->argv[0] = g_strdup ("MGET"); + session->argv[0] = g_strdup("MGET"); session->argv_lens[0] = 4; - init_len = strlen (session->backend->redis_object); + init_len = strlen(session->backend->redis_object); - for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) { + for (i = 0; i < RSPAMD_SHINGLE_SIZE; i++) { - key = g_string_sized_new (init_len + 2 + 2 + sizeof ("18446744073709551616")); - rspamd_printf_gstring (key, "%s_%d_%uL", session->backend->redis_object, - i, shcmd->sgl.hashes[i]); + key = g_string_sized_new(init_len + 2 + 2 + sizeof("18446744073709551616")); + rspamd_printf_gstring(key, "%s_%d_%uL", session->backend->redis_object, + i, shcmd->sgl.hashes[i]); session->argv[i + 1] = key->str; session->argv_lens[i + 1] = key->len; - g_string_free (key, FALSE); /* Do not free underlying array */ + g_string_free(key, FALSE); /* Do not free underlying array */ } session->shingles_checked = TRUE; - g_assert (session->ctx != NULL); + g_assert(session->ctx != NULL); - if (redisAsyncCommandArgv (session->ctx, rspamd_fuzzy_redis_shingles_callback, - session, session->nargs, - (const gchar **)session->argv, session->argv_lens) != REDIS_OK) { - msg_err ("cannot execute redis command on %s: %s", - rspamd_inet_address_to_string_pretty (rspamd_upstream_addr_cur (session->up)), + if (redisAsyncCommandArgv(session->ctx, rspamd_fuzzy_redis_shingles_callback, + session, session->nargs, + (const gchar **) session->argv, session->argv_lens) != REDIS_OK) { + msg_err("cannot execute redis command on %s: %s", + rspamd_inet_address_to_string_pretty(rspamd_upstream_addr_cur(session->up)), session->ctx->errstr); if (session->callback.cb_check) { - memset (&rep, 0, sizeof (rep)); - session->callback.cb_check (&rep, session->cbdata); + memset(&rep, 0, sizeof(rep)); + session->callback.cb_check(&rep, session->cbdata); } - rspamd_fuzzy_redis_session_dtor (session, TRUE); + rspamd_fuzzy_redis_session_dtor(session, TRUE); } else { /* Add timeout */ session->timeout.data = session; - ev_now_update_if_cheap ((struct ev_loop *)session->event_loop); - ev_timer_init (&session->timeout, - rspamd_fuzzy_redis_timeout, - session->backend->timeout, 0.0); - ev_timer_start (session->event_loop, &session->timeout); + ev_now_update_if_cheap((struct ev_loop *) session->event_loop); + ev_timer_init(&session->timeout, + rspamd_fuzzy_redis_timeout, + session->backend->timeout, 0.0); + ev_timer_start(session->event_loop, &session->timeout); } } static void -rspamd_fuzzy_redis_check_callback (redisAsyncContext *c, gpointer r, - gpointer priv) +rspamd_fuzzy_redis_check_callback(redisAsyncContext *c, gpointer r, + gpointer priv) { struct rspamd_fuzzy_redis_session *session = priv; redisReply *reply = r, *cur; @@ -537,32 +537,32 @@ rspamd_fuzzy_redis_check_callback (redisAsyncContext *c, gpointer r, gulong value; guint found_elts = 0; - ev_timer_stop (session->event_loop, &session->timeout); - memset (&rep, 0, sizeof (rep)); + ev_timer_stop(session->event_loop, &session->timeout); + memset(&rep, 0, sizeof(rep)); if (c->err == 0 && reply != NULL) { - rspamd_upstream_ok (session->up); + rspamd_upstream_ok(session->up); if (reply->type == REDIS_REPLY_ARRAY && reply->elements >= 2) { cur = reply->element[0]; if (cur->type == REDIS_REPLY_STRING) { - value = strtoul (cur->str, NULL, 10); + value = strtoul(cur->str, NULL, 10); rep.v1.value = value; - found_elts ++; + found_elts++; } cur = reply->element[1]; if (cur->type == REDIS_REPLY_STRING) { - value = strtoul (cur->str, NULL, 10); + value = strtoul(cur->str, NULL, 10); rep.v1.flag = value; - found_elts ++; + found_elts++; } if (found_elts >= 2) { rep.v1.prob = session->prob; - memcpy (rep.digest, session->found_digest, sizeof (rep.digest)); + memcpy(rep.digest, session->found_digest, sizeof(rep.digest)); } rep.ts = 0; @@ -571,55 +571,54 @@ rspamd_fuzzy_redis_check_callback (redisAsyncContext *c, gpointer r, cur = reply->element[2]; if (cur->type == REDIS_REPLY_STRING) { - rep.ts = strtoul (cur->str, NULL, 10); + rep.ts = strtoul(cur->str, NULL, 10); } } } else if (reply->type == REDIS_REPLY_ERROR) { - msg_err_redis_session ("fuzzy backend redis error: \"%s\"", - reply->str); + msg_err_redis_session("fuzzy backend redis error: \"%s\"", + reply->str); } if (found_elts < 2) { if (session->cmd->shingles_count > 0 && !session->shingles_checked) { /* We also need to check all shingles here */ - rspamd_fuzzy_backend_check_shingles (session); + rspamd_fuzzy_backend_check_shingles(session); /* Do not free session */ return; } else { if (session->callback.cb_check) { - session->callback.cb_check (&rep, session->cbdata); + session->callback.cb_check(&rep, session->cbdata); } } } else { if (session->callback.cb_check) { - session->callback.cb_check (&rep, session->cbdata); + session->callback.cb_check(&rep, session->cbdata); } } } else { if (session->callback.cb_check) { - session->callback.cb_check (&rep, session->cbdata); + session->callback.cb_check(&rep, session->cbdata); } if (c->errstr) { - msg_err_redis_session ("error getting hashes on %s: %s", - rspamd_inet_address_to_string_pretty (rspamd_upstream_addr_cur (session->up)), - c->errstr); - rspamd_upstream_fail (session->up, FALSE, c->errstr); + msg_err_redis_session("error getting hashes on %s: %s", + rspamd_inet_address_to_string_pretty(rspamd_upstream_addr_cur(session->up)), + c->errstr); + rspamd_upstream_fail(session->up, FALSE, c->errstr); } } - rspamd_fuzzy_redis_session_dtor (session, FALSE); + rspamd_fuzzy_redis_session_dtor(session, FALSE); } -void -rspamd_fuzzy_backend_check_redis (struct rspamd_fuzzy_backend *bk, - const struct rspamd_fuzzy_cmd *cmd, - rspamd_fuzzy_check_cb cb, void *ud, - void *subr_ud) +void rspamd_fuzzy_backend_check_redis(struct rspamd_fuzzy_backend *bk, + const struct rspamd_fuzzy_cmd *cmd, + rspamd_fuzzy_check_cb cb, void *ud, + void *subr_ud) { struct rspamd_fuzzy_backend_redis *backend = subr_ud; struct rspamd_fuzzy_redis_session *session; @@ -629,151 +628,149 @@ rspamd_fuzzy_backend_check_redis (struct rspamd_fuzzy_backend *bk, struct rspamd_fuzzy_reply rep; GString *key; - g_assert (backend != NULL); + g_assert(backend != NULL); - ups = rspamd_redis_get_servers (backend, "read_servers"); + ups = rspamd_redis_get_servers(backend, "read_servers"); if (!ups) { if (cb) { - memset (&rep, 0, sizeof (rep)); - cb (&rep, ud); + memset(&rep, 0, sizeof(rep)); + cb(&rep, ud); } return; } - session = g_malloc0 (sizeof (*session)); + session = g_malloc0(sizeof(*session)); session->backend = backend; - REF_RETAIN (session->backend); + REF_RETAIN(session->backend); session->callback.cb_check = cb; session->cbdata = ud; session->command = RSPAMD_FUZZY_REDIS_COMMAND_CHECK; session->cmd = cmd; session->prob = 1.0; - memcpy (rep.digest, session->cmd->digest, sizeof (rep.digest)); - memcpy (session->found_digest, session->cmd->digest, sizeof (rep.digest)); - session->event_loop = rspamd_fuzzy_backend_event_base (bk); + memcpy(rep.digest, session->cmd->digest, sizeof(rep.digest)); + memcpy(session->found_digest, session->cmd->digest, sizeof(rep.digest)); + session->event_loop = rspamd_fuzzy_backend_event_base(bk); /* First of all check digest */ session->nargs = 5; - session->argv = g_malloc (sizeof (gchar *) * session->nargs); - session->argv_lens = g_malloc (sizeof (gsize) * session->nargs); + session->argv = g_malloc(sizeof(gchar *) * session->nargs); + session->argv_lens = g_malloc(sizeof(gsize) * session->nargs); - key = g_string_new (backend->redis_object); - g_string_append_len (key, cmd->digest, sizeof (cmd->digest)); - session->argv[0] = g_strdup ("HMGET"); + key = g_string_new(backend->redis_object); + g_string_append_len(key, cmd->digest, sizeof(cmd->digest)); + session->argv[0] = g_strdup("HMGET"); session->argv_lens[0] = 5; session->argv[1] = key->str; session->argv_lens[1] = key->len; - session->argv[2] = g_strdup ("V"); + session->argv[2] = g_strdup("V"); session->argv_lens[2] = 1; - session->argv[3] = g_strdup ("F"); + session->argv[3] = g_strdup("F"); session->argv_lens[3] = 1; - session->argv[4] = g_strdup ("C"); + session->argv[4] = g_strdup("C"); session->argv_lens[4] = 1; - g_string_free (key, FALSE); /* Do not free underlying array */ + g_string_free(key, FALSE); /* Do not free underlying array */ - up = rspamd_upstream_get (ups, - RSPAMD_UPSTREAM_ROUND_ROBIN, - NULL, - 0); + up = rspamd_upstream_get(ups, + RSPAMD_UPSTREAM_ROUND_ROBIN, + NULL, + 0); - session->up = rspamd_upstream_ref (up); - addr = rspamd_upstream_addr_next (up); - g_assert (addr != NULL); - session->ctx = rspamd_redis_pool_connect (backend->pool, - backend->dbname, backend->password, - rspamd_inet_address_to_string (addr), - rspamd_inet_address_get_port (addr)); + session->up = rspamd_upstream_ref(up); + addr = rspamd_upstream_addr_next(up); + g_assert(addr != NULL); + session->ctx = rspamd_redis_pool_connect(backend->pool, + backend->dbname, backend->password, + rspamd_inet_address_to_string(addr), + rspamd_inet_address_get_port(addr)); if (session->ctx == NULL) { - rspamd_upstream_fail (up, TRUE, strerror (errno)); - rspamd_fuzzy_redis_session_dtor (session, TRUE); + rspamd_upstream_fail(up, TRUE, strerror(errno)); + rspamd_fuzzy_redis_session_dtor(session, TRUE); if (cb) { - memset (&rep, 0, sizeof (rep)); - cb (&rep, ud); + memset(&rep, 0, sizeof(rep)); + cb(&rep, ud); } } else { - if (redisAsyncCommandArgv (session->ctx, rspamd_fuzzy_redis_check_callback, - session, session->nargs, - (const gchar **)session->argv, session->argv_lens) != REDIS_OK) { - rspamd_fuzzy_redis_session_dtor (session, TRUE); + if (redisAsyncCommandArgv(session->ctx, rspamd_fuzzy_redis_check_callback, + session, session->nargs, + (const gchar **) session->argv, session->argv_lens) != REDIS_OK) { + rspamd_fuzzy_redis_session_dtor(session, TRUE); if (cb) { - memset (&rep, 0, sizeof (rep)); - cb (&rep, ud); + memset(&rep, 0, sizeof(rep)); + cb(&rep, ud); } } else { /* Add timeout */ session->timeout.data = session; - ev_now_update_if_cheap ((struct ev_loop *)session->event_loop); - ev_timer_init (&session->timeout, - rspamd_fuzzy_redis_timeout, - session->backend->timeout, 0.0); - ev_timer_start (session->event_loop, &session->timeout); + ev_now_update_if_cheap((struct ev_loop *) session->event_loop); + ev_timer_init(&session->timeout, + rspamd_fuzzy_redis_timeout, + session->backend->timeout, 0.0); + ev_timer_start(session->event_loop, &session->timeout); } } } static void -rspamd_fuzzy_redis_count_callback (redisAsyncContext *c, gpointer r, - gpointer priv) +rspamd_fuzzy_redis_count_callback(redisAsyncContext *c, gpointer r, + gpointer priv) { struct rspamd_fuzzy_redis_session *session = priv; redisReply *reply = r; gulong nelts; - ev_timer_stop (session->event_loop, &session->timeout); + ev_timer_stop(session->event_loop, &session->timeout); if (c->err == 0 && reply != NULL) { - rspamd_upstream_ok (session->up); + rspamd_upstream_ok(session->up); if (reply->type == REDIS_REPLY_INTEGER) { if (session->callback.cb_count) { - session->callback.cb_count (reply->integer, session->cbdata); + session->callback.cb_count(reply->integer, session->cbdata); } } else if (reply->type == REDIS_REPLY_STRING) { - nelts = strtoul (reply->str, NULL, 10); + nelts = strtoul(reply->str, NULL, 10); if (session->callback.cb_count) { - session->callback.cb_count (nelts, session->cbdata); + session->callback.cb_count(nelts, session->cbdata); } } else { if (reply->type == REDIS_REPLY_ERROR) { - msg_err_redis_session ("fuzzy backend redis error: \"%s\"", - reply->str); + msg_err_redis_session("fuzzy backend redis error: \"%s\"", + reply->str); } if (session->callback.cb_count) { - session->callback.cb_count (0, session->cbdata); + session->callback.cb_count(0, session->cbdata); } } } else { if (session->callback.cb_count) { - session->callback.cb_count (0, session->cbdata); + session->callback.cb_count(0, session->cbdata); } if (c->errstr) { - msg_err_redis_session ("error getting count on %s: %s", - rspamd_inet_address_to_string_pretty (rspamd_upstream_addr_cur (session->up)), - c->errstr); - rspamd_upstream_fail (session->up, FALSE, c->errstr); + msg_err_redis_session("error getting count on %s: %s", + rspamd_inet_address_to_string_pretty(rspamd_upstream_addr_cur(session->up)), + c->errstr); + rspamd_upstream_fail(session->up, FALSE, c->errstr); } - } - rspamd_fuzzy_redis_session_dtor (session, FALSE); + rspamd_fuzzy_redis_session_dtor(session, FALSE); } -void -rspamd_fuzzy_backend_count_redis (struct rspamd_fuzzy_backend *bk, - rspamd_fuzzy_count_cb cb, void *ud, - void *subr_ud) +void rspamd_fuzzy_backend_count_redis(struct rspamd_fuzzy_backend *bk, + rspamd_fuzzy_count_cb cb, void *ud, + void *subr_ud) { struct rspamd_fuzzy_backend_redis *backend = subr_ud; struct rspamd_fuzzy_redis_session *session; @@ -782,136 +779,135 @@ rspamd_fuzzy_backend_count_redis (struct rspamd_fuzzy_backend *bk, rspamd_inet_addr_t *addr; GString *key; - g_assert (backend != NULL); + g_assert(backend != NULL); - ups = rspamd_redis_get_servers (backend, "read_servers"); + ups = rspamd_redis_get_servers(backend, "read_servers"); if (!ups) { if (cb) { - cb (0, ud); + cb(0, ud); } return; } - session = g_malloc0 (sizeof (*session)); + session = g_malloc0(sizeof(*session)); session->backend = backend; - REF_RETAIN (session->backend); + REF_RETAIN(session->backend); session->callback.cb_count = cb; session->cbdata = ud; session->command = RSPAMD_FUZZY_REDIS_COMMAND_COUNT; - session->event_loop = rspamd_fuzzy_backend_event_base (bk); + session->event_loop = rspamd_fuzzy_backend_event_base(bk); session->nargs = 2; - session->argv = g_malloc (sizeof (gchar *) * 2); - session->argv_lens = g_malloc (sizeof (gsize) * 2); - key = g_string_new (backend->redis_object); - g_string_append (key, "_count"); - session->argv[0] = g_strdup ("GET"); + session->argv = g_malloc(sizeof(gchar *) * 2); + session->argv_lens = g_malloc(sizeof(gsize) * 2); + key = g_string_new(backend->redis_object); + g_string_append(key, "_count"); + session->argv[0] = g_strdup("GET"); session->argv_lens[0] = 3; session->argv[1] = key->str; session->argv_lens[1] = key->len; - g_string_free (key, FALSE); /* Do not free underlying array */ + g_string_free(key, FALSE); /* Do not free underlying array */ - up = rspamd_upstream_get (ups, - RSPAMD_UPSTREAM_ROUND_ROBIN, - NULL, - 0); + up = rspamd_upstream_get(ups, + RSPAMD_UPSTREAM_ROUND_ROBIN, + NULL, + 0); - session->up = rspamd_upstream_ref (up); - addr = rspamd_upstream_addr_next (up); - g_assert (addr != NULL); - session->ctx = rspamd_redis_pool_connect (backend->pool, - backend->dbname, backend->password, - rspamd_inet_address_to_string (addr), - rspamd_inet_address_get_port (addr)); + session->up = rspamd_upstream_ref(up); + addr = rspamd_upstream_addr_next(up); + g_assert(addr != NULL); + session->ctx = rspamd_redis_pool_connect(backend->pool, + backend->dbname, backend->password, + rspamd_inet_address_to_string(addr), + rspamd_inet_address_get_port(addr)); if (session->ctx == NULL) { - rspamd_upstream_fail (up, TRUE, strerror (errno)); - rspamd_fuzzy_redis_session_dtor (session, TRUE); + rspamd_upstream_fail(up, TRUE, strerror(errno)); + rspamd_fuzzy_redis_session_dtor(session, TRUE); if (cb) { - cb (0, ud); + cb(0, ud); } } else { - if (redisAsyncCommandArgv (session->ctx, rspamd_fuzzy_redis_count_callback, - session, session->nargs, - (const gchar **)session->argv, session->argv_lens) != REDIS_OK) { - rspamd_fuzzy_redis_session_dtor (session, TRUE); + if (redisAsyncCommandArgv(session->ctx, rspamd_fuzzy_redis_count_callback, + session, session->nargs, + (const gchar **) session->argv, session->argv_lens) != REDIS_OK) { + rspamd_fuzzy_redis_session_dtor(session, TRUE); if (cb) { - cb (0, ud); + cb(0, ud); } } else { /* Add timeout */ session->timeout.data = session; - ev_now_update_if_cheap ((struct ev_loop *)session->event_loop); - ev_timer_init (&session->timeout, - rspamd_fuzzy_redis_timeout, - session->backend->timeout, 0.0); - ev_timer_start (session->event_loop, &session->timeout); + ev_now_update_if_cheap((struct ev_loop *) session->event_loop); + ev_timer_init(&session->timeout, + rspamd_fuzzy_redis_timeout, + session->backend->timeout, 0.0); + ev_timer_start(session->event_loop, &session->timeout); } } } static void -rspamd_fuzzy_redis_version_callback (redisAsyncContext *c, gpointer r, - gpointer priv) +rspamd_fuzzy_redis_version_callback(redisAsyncContext *c, gpointer r, + gpointer priv) { struct rspamd_fuzzy_redis_session *session = priv; redisReply *reply = r; gulong nelts; - ev_timer_stop (session->event_loop, &session->timeout); + ev_timer_stop(session->event_loop, &session->timeout); if (c->err == 0 && reply != NULL) { - rspamd_upstream_ok (session->up); + rspamd_upstream_ok(session->up); if (reply->type == REDIS_REPLY_INTEGER) { if (session->callback.cb_version) { - session->callback.cb_version (reply->integer, session->cbdata); + session->callback.cb_version(reply->integer, session->cbdata); } } else if (reply->type == REDIS_REPLY_STRING) { - nelts = strtoul (reply->str, NULL, 10); + nelts = strtoul(reply->str, NULL, 10); if (session->callback.cb_version) { - session->callback.cb_version (nelts, session->cbdata); + session->callback.cb_version(nelts, session->cbdata); } } else { if (reply->type == REDIS_REPLY_ERROR) { - msg_err_redis_session ("fuzzy backend redis error: \"%s\"", - reply->str); + msg_err_redis_session("fuzzy backend redis error: \"%s\"", + reply->str); } if (session->callback.cb_version) { - session->callback.cb_version (0, session->cbdata); + session->callback.cb_version(0, session->cbdata); } } } else { if (session->callback.cb_version) { - session->callback.cb_version (0, session->cbdata); + session->callback.cb_version(0, session->cbdata); } if (c->errstr) { - msg_err_redis_session ("error getting version on %s: %s", - rspamd_inet_address_to_string_pretty (rspamd_upstream_addr_cur (session->up)), - c->errstr); - rspamd_upstream_fail (session->up, FALSE, c->errstr); + msg_err_redis_session("error getting version on %s: %s", + rspamd_inet_address_to_string_pretty(rspamd_upstream_addr_cur(session->up)), + c->errstr); + rspamd_upstream_fail(session->up, FALSE, c->errstr); } } - rspamd_fuzzy_redis_session_dtor (session, FALSE); + rspamd_fuzzy_redis_session_dtor(session, FALSE); } -void -rspamd_fuzzy_backend_version_redis (struct rspamd_fuzzy_backend *bk, - const gchar *src, - rspamd_fuzzy_version_cb cb, void *ud, - void *subr_ud) +void rspamd_fuzzy_backend_version_redis(struct rspamd_fuzzy_backend *bk, + const gchar *src, + rspamd_fuzzy_version_cb cb, void *ud, + void *subr_ud) { struct rspamd_fuzzy_backend_redis *backend = subr_ud; struct rspamd_fuzzy_redis_session *session; @@ -920,103 +916,102 @@ rspamd_fuzzy_backend_version_redis (struct rspamd_fuzzy_backend *bk, rspamd_inet_addr_t *addr; GString *key; - g_assert (backend != NULL); + g_assert(backend != NULL); - ups = rspamd_redis_get_servers (backend, "read_servers"); + ups = rspamd_redis_get_servers(backend, "read_servers"); if (!ups) { if (cb) { - cb (0, ud); + cb(0, ud); } return; } - session = g_malloc0 (sizeof (*session)); + session = g_malloc0(sizeof(*session)); session->backend = backend; - REF_RETAIN (session->backend); + REF_RETAIN(session->backend); session->callback.cb_version = cb; session->cbdata = ud; session->command = RSPAMD_FUZZY_REDIS_COMMAND_VERSION; - session->event_loop = rspamd_fuzzy_backend_event_base (bk); + session->event_loop = rspamd_fuzzy_backend_event_base(bk); session->nargs = 2; - session->argv = g_malloc (sizeof (gchar *) * 2); - session->argv_lens = g_malloc (sizeof (gsize) * 2); - key = g_string_new (backend->redis_object); - g_string_append (key, src); - session->argv[0] = g_strdup ("GET"); + session->argv = g_malloc(sizeof(gchar *) * 2); + session->argv_lens = g_malloc(sizeof(gsize) * 2); + key = g_string_new(backend->redis_object); + g_string_append(key, src); + session->argv[0] = g_strdup("GET"); session->argv_lens[0] = 3; session->argv[1] = key->str; session->argv_lens[1] = key->len; - g_string_free (key, FALSE); /* Do not free underlying array */ + g_string_free(key, FALSE); /* Do not free underlying array */ - up = rspamd_upstream_get (ups, - RSPAMD_UPSTREAM_ROUND_ROBIN, - NULL, - 0); + up = rspamd_upstream_get(ups, + RSPAMD_UPSTREAM_ROUND_ROBIN, + NULL, + 0); - session->up = rspamd_upstream_ref (up); - addr = rspamd_upstream_addr_next (up); - g_assert (addr != NULL); - session->ctx = rspamd_redis_pool_connect (backend->pool, - backend->dbname, backend->password, - rspamd_inet_address_to_string (addr), - rspamd_inet_address_get_port (addr)); + session->up = rspamd_upstream_ref(up); + addr = rspamd_upstream_addr_next(up); + g_assert(addr != NULL); + session->ctx = rspamd_redis_pool_connect(backend->pool, + backend->dbname, backend->password, + rspamd_inet_address_to_string(addr), + rspamd_inet_address_get_port(addr)); if (session->ctx == NULL) { - rspamd_upstream_fail (up, FALSE, strerror (errno)); - rspamd_fuzzy_redis_session_dtor (session, TRUE); + rspamd_upstream_fail(up, FALSE, strerror(errno)); + rspamd_fuzzy_redis_session_dtor(session, TRUE); if (cb) { - cb (0, ud); + cb(0, ud); } } else { - if (redisAsyncCommandArgv (session->ctx, rspamd_fuzzy_redis_version_callback, - session, session->nargs, - (const gchar **)session->argv, session->argv_lens) != REDIS_OK) { - rspamd_fuzzy_redis_session_dtor (session, TRUE); + if (redisAsyncCommandArgv(session->ctx, rspamd_fuzzy_redis_version_callback, + session, session->nargs, + (const gchar **) session->argv, session->argv_lens) != REDIS_OK) { + rspamd_fuzzy_redis_session_dtor(session, TRUE); if (cb) { - cb (0, ud); + cb(0, ud); } } else { /* Add timeout */ session->timeout.data = session; - ev_now_update_if_cheap ((struct ev_loop *)session->event_loop); - ev_timer_init (&session->timeout, - rspamd_fuzzy_redis_timeout, - session->backend->timeout, 0.0); - ev_timer_start (session->event_loop, &session->timeout); + ev_now_update_if_cheap((struct ev_loop *) session->event_loop); + ev_timer_init(&session->timeout, + rspamd_fuzzy_redis_timeout, + session->backend->timeout, 0.0); + ev_timer_start(session->event_loop, &session->timeout); } } } -const gchar* -rspamd_fuzzy_backend_id_redis (struct rspamd_fuzzy_backend *bk, - void *subr_ud) +const gchar * +rspamd_fuzzy_backend_id_redis(struct rspamd_fuzzy_backend *bk, + void *subr_ud) { struct rspamd_fuzzy_backend_redis *backend = subr_ud; - g_assert (backend != NULL); + g_assert(backend != NULL); return backend->id; } -void -rspamd_fuzzy_backend_expire_redis (struct rspamd_fuzzy_backend *bk, - void *subr_ud) +void rspamd_fuzzy_backend_expire_redis(struct rspamd_fuzzy_backend *bk, + void *subr_ud) { struct rspamd_fuzzy_backend_redis *backend = subr_ud; - g_assert (backend != NULL); + g_assert(backend != NULL); } static gboolean -rspamd_fuzzy_update_append_command (struct rspamd_fuzzy_backend *bk, - struct rspamd_fuzzy_redis_session *session, - struct fuzzy_peer_cmd *io_cmd, guint *shift) +rspamd_fuzzy_update_append_command(struct rspamd_fuzzy_backend *bk, + struct rspamd_fuzzy_redis_session *session, + struct fuzzy_peer_cmd *io_cmd, guint *shift) { GString *key, *value; guint cur_shift = *shift; @@ -1028,7 +1023,6 @@ rspamd_fuzzy_update_append_command (struct rspamd_fuzzy_backend *bk, } else { cmd = &io_cmd->cmd.normal; - } if (cmd->cmd == FUZZY_WRITE) { @@ -1042,170 +1036,170 @@ rspamd_fuzzy_update_append_command (struct rspamd_fuzzy_backend *bk, */ /* HSET */ - klen = strlen (session->backend->redis_object) + - sizeof (cmd->digest) + 1; - key = g_string_sized_new (klen); - g_string_append (key, session->backend->redis_object); - g_string_append_len (key, cmd->digest, sizeof (cmd->digest)); - value = g_string_sized_new (sizeof ("4294967296")); - rspamd_printf_gstring (value, "%d", cmd->flag); + klen = strlen(session->backend->redis_object) + + sizeof(cmd->digest) + 1; + key = g_string_sized_new(klen); + g_string_append(key, session->backend->redis_object); + g_string_append_len(key, cmd->digest, sizeof(cmd->digest)); + value = g_string_sized_new(sizeof("4294967296")); + rspamd_printf_gstring(value, "%d", cmd->flag); if (cmd->version & RSPAMD_FUZZY_FLAG_WEAK) { - session->argv[cur_shift] = g_strdup ("HSETNX"); - session->argv_lens[cur_shift++] = sizeof ("HSETNX") - 1; + session->argv[cur_shift] = g_strdup("HSETNX"); + session->argv_lens[cur_shift++] = sizeof("HSETNX") - 1; } else { - session->argv[cur_shift] = g_strdup ("HSET"); - session->argv_lens[cur_shift++] = sizeof ("HSET") - 1; + session->argv[cur_shift] = g_strdup("HSET"); + session->argv_lens[cur_shift++] = sizeof("HSET") - 1; } session->argv[cur_shift] = key->str; session->argv_lens[cur_shift++] = key->len; - session->argv[cur_shift] = g_strdup ("F"); - session->argv_lens[cur_shift++] = sizeof ("F") - 1; + session->argv[cur_shift] = g_strdup("F"); + session->argv_lens[cur_shift++] = sizeof("F") - 1; session->argv[cur_shift] = value->str; session->argv_lens[cur_shift++] = value->len; - g_string_free (key, FALSE); - g_string_free (value, FALSE); + g_string_free(key, FALSE); + g_string_free(value, FALSE); - if (redisAsyncCommandArgv (session->ctx, NULL, NULL, - 4, - (const gchar **)&session->argv[cur_shift - 4], - &session->argv_lens[cur_shift - 4]) != REDIS_OK) { + if (redisAsyncCommandArgv(session->ctx, NULL, NULL, + 4, + (const gchar **) &session->argv[cur_shift - 4], + &session->argv_lens[cur_shift - 4]) != REDIS_OK) { return FALSE; } /* HSETNX */ - klen = strlen (session->backend->redis_object) + - sizeof (cmd->digest) + 1; - key = g_string_sized_new (klen); - g_string_append (key, session->backend->redis_object); - g_string_append_len (key, cmd->digest, sizeof (cmd->digest)); - value = g_string_sized_new (sizeof ("18446744073709551616")); - rspamd_printf_gstring (value, "%L", (gint64)rspamd_get_calendar_ticks ()); - session->argv[cur_shift] = g_strdup ("HSETNX"); - session->argv_lens[cur_shift++] = sizeof ("HSETNX") - 1; + klen = strlen(session->backend->redis_object) + + sizeof(cmd->digest) + 1; + key = g_string_sized_new(klen); + g_string_append(key, session->backend->redis_object); + g_string_append_len(key, cmd->digest, sizeof(cmd->digest)); + value = g_string_sized_new(sizeof("18446744073709551616")); + rspamd_printf_gstring(value, "%L", (gint64) rspamd_get_calendar_ticks()); + session->argv[cur_shift] = g_strdup("HSETNX"); + session->argv_lens[cur_shift++] = sizeof("HSETNX") - 1; session->argv[cur_shift] = key->str; session->argv_lens[cur_shift++] = key->len; - session->argv[cur_shift] = g_strdup ("C"); - session->argv_lens[cur_shift++] = sizeof ("C") - 1; + session->argv[cur_shift] = g_strdup("C"); + session->argv_lens[cur_shift++] = sizeof("C") - 1; session->argv[cur_shift] = value->str; session->argv_lens[cur_shift++] = value->len; - g_string_free (key, FALSE); - g_string_free (value, FALSE); + g_string_free(key, FALSE); + g_string_free(value, FALSE); - if (redisAsyncCommandArgv (session->ctx, NULL, NULL, - 4, - (const gchar **)&session->argv[cur_shift - 4], - &session->argv_lens[cur_shift - 4]) != REDIS_OK) { + if (redisAsyncCommandArgv(session->ctx, NULL, NULL, + 4, + (const gchar **) &session->argv[cur_shift - 4], + &session->argv_lens[cur_shift - 4]) != REDIS_OK) { return FALSE; } /* HINCRBY */ - key = g_string_sized_new (klen); - g_string_append (key, session->backend->redis_object); - g_string_append_len (key, cmd->digest, sizeof (cmd->digest)); - value = g_string_sized_new (sizeof ("4294967296")); - rspamd_printf_gstring (value, "%d", cmd->value); - session->argv[cur_shift] = g_strdup ("HINCRBY"); - session->argv_lens[cur_shift++] = sizeof ("HINCRBY") - 1; + key = g_string_sized_new(klen); + g_string_append(key, session->backend->redis_object); + g_string_append_len(key, cmd->digest, sizeof(cmd->digest)); + value = g_string_sized_new(sizeof("4294967296")); + rspamd_printf_gstring(value, "%d", cmd->value); + session->argv[cur_shift] = g_strdup("HINCRBY"); + session->argv_lens[cur_shift++] = sizeof("HINCRBY") - 1; session->argv[cur_shift] = key->str; session->argv_lens[cur_shift++] = key->len; - session->argv[cur_shift] = g_strdup ("V"); - session->argv_lens[cur_shift++] = sizeof ("V") - 1; + session->argv[cur_shift] = g_strdup("V"); + session->argv_lens[cur_shift++] = sizeof("V") - 1; session->argv[cur_shift] = value->str; session->argv_lens[cur_shift++] = value->len; - g_string_free (key, FALSE); - g_string_free (value, FALSE); + g_string_free(key, FALSE); + g_string_free(value, FALSE); - if (redisAsyncCommandArgv (session->ctx, NULL, NULL, - 4, - (const gchar **)&session->argv[cur_shift - 4], - &session->argv_lens[cur_shift - 4]) != REDIS_OK) { + if (redisAsyncCommandArgv(session->ctx, NULL, NULL, + 4, + (const gchar **) &session->argv[cur_shift - 4], + &session->argv_lens[cur_shift - 4]) != REDIS_OK) { return FALSE; } /* EXPIRE */ - key = g_string_sized_new (klen); - g_string_append (key, session->backend->redis_object); - g_string_append_len (key, cmd->digest, sizeof (cmd->digest)); - value = g_string_sized_new (sizeof ("4294967296")); - rspamd_printf_gstring (value, "%d", - (gint)rspamd_fuzzy_backend_get_expire (bk)); - session->argv[cur_shift] = g_strdup ("EXPIRE"); - session->argv_lens[cur_shift++] = sizeof ("EXPIRE") - 1; + key = g_string_sized_new(klen); + g_string_append(key, session->backend->redis_object); + g_string_append_len(key, cmd->digest, sizeof(cmd->digest)); + value = g_string_sized_new(sizeof("4294967296")); + rspamd_printf_gstring(value, "%d", + (gint) rspamd_fuzzy_backend_get_expire(bk)); + session->argv[cur_shift] = g_strdup("EXPIRE"); + session->argv_lens[cur_shift++] = sizeof("EXPIRE") - 1; session->argv[cur_shift] = key->str; session->argv_lens[cur_shift++] = key->len; session->argv[cur_shift] = value->str; session->argv_lens[cur_shift++] = value->len; - g_string_free (key, FALSE); - g_string_free (value, FALSE); + g_string_free(key, FALSE); + g_string_free(value, FALSE); - if (redisAsyncCommandArgv (session->ctx, NULL, NULL, - 3, - (const gchar **)&session->argv[cur_shift - 3], - &session->argv_lens[cur_shift - 3]) != REDIS_OK) { + if (redisAsyncCommandArgv(session->ctx, NULL, NULL, + 3, + (const gchar **) &session->argv[cur_shift - 3], + &session->argv_lens[cur_shift - 3]) != REDIS_OK) { return FALSE; } /* INCR */ - key = g_string_sized_new (klen); - g_string_append (key, session->backend->redis_object); - g_string_append (key, "_count"); - session->argv[cur_shift] = g_strdup ("INCR"); - session->argv_lens[cur_shift++] = sizeof ("INCR") - 1; + key = g_string_sized_new(klen); + g_string_append(key, session->backend->redis_object); + g_string_append(key, "_count"); + session->argv[cur_shift] = g_strdup("INCR"); + session->argv_lens[cur_shift++] = sizeof("INCR") - 1; session->argv[cur_shift] = key->str; session->argv_lens[cur_shift++] = key->len; - g_string_free (key, FALSE); + g_string_free(key, FALSE); - if (redisAsyncCommandArgv (session->ctx, NULL, NULL, - 2, - (const gchar **)&session->argv[cur_shift - 2], - &session->argv_lens[cur_shift - 2]) != REDIS_OK) { + if (redisAsyncCommandArgv(session->ctx, NULL, NULL, + 2, + (const gchar **) &session->argv[cur_shift - 2], + &session->argv_lens[cur_shift - 2]) != REDIS_OK) { return FALSE; } } else if (cmd->cmd == FUZZY_DEL) { /* DEL */ - klen = strlen (session->backend->redis_object) + - sizeof (cmd->digest) + 1; - - key = g_string_sized_new (klen); - g_string_append (key, session->backend->redis_object); - g_string_append_len (key, cmd->digest, sizeof (cmd->digest)); - session->argv[cur_shift] = g_strdup ("DEL"); - session->argv_lens[cur_shift++] = sizeof ("DEL") - 1; + klen = strlen(session->backend->redis_object) + + sizeof(cmd->digest) + 1; + + key = g_string_sized_new(klen); + g_string_append(key, session->backend->redis_object); + g_string_append_len(key, cmd->digest, sizeof(cmd->digest)); + session->argv[cur_shift] = g_strdup("DEL"); + session->argv_lens[cur_shift++] = sizeof("DEL") - 1; session->argv[cur_shift] = key->str; session->argv_lens[cur_shift++] = key->len; - g_string_free (key, FALSE); + g_string_free(key, FALSE); - if (redisAsyncCommandArgv (session->ctx, NULL, NULL, - 2, - (const gchar **)&session->argv[cur_shift - 2], - &session->argv_lens[cur_shift - 2]) != REDIS_OK) { + if (redisAsyncCommandArgv(session->ctx, NULL, NULL, + 2, + (const gchar **) &session->argv[cur_shift - 2], + &session->argv_lens[cur_shift - 2]) != REDIS_OK) { return FALSE; } /* DECR */ - key = g_string_sized_new (klen); - g_string_append (key, session->backend->redis_object); - g_string_append (key, "_count"); - session->argv[cur_shift] = g_strdup ("DECR"); - session->argv_lens[cur_shift++] = sizeof ("DECR") - 1; + key = g_string_sized_new(klen); + g_string_append(key, session->backend->redis_object); + g_string_append(key, "_count"); + session->argv[cur_shift] = g_strdup("DECR"); + session->argv_lens[cur_shift++] = sizeof("DECR") - 1; session->argv[cur_shift] = key->str; session->argv_lens[cur_shift++] = key->len; - g_string_free (key, FALSE); + g_string_free(key, FALSE); - if (redisAsyncCommandArgv (session->ctx, NULL, NULL, - 2, - (const gchar **)&session->argv[cur_shift - 2], - &session->argv_lens[cur_shift - 2]) != REDIS_OK) { + if (redisAsyncCommandArgv(session->ctx, NULL, NULL, + 2, + (const gchar **) &session->argv[cur_shift - 2], + &session->argv_lens[cur_shift - 2]) != REDIS_OK) { return FALSE; } @@ -1217,29 +1211,29 @@ rspamd_fuzzy_update_append_command (struct rspamd_fuzzy_backend *bk, * Where <key> is <prefix> || <digest> */ - klen = strlen (session->backend->redis_object) + - sizeof (cmd->digest) + 1; + klen = strlen(session->backend->redis_object) + + sizeof(cmd->digest) + 1; /* EXPIRE */ - key = g_string_sized_new (klen); - g_string_append (key, session->backend->redis_object); - g_string_append_len (key, cmd->digest, sizeof (cmd->digest)); - value = g_string_sized_new (sizeof ("4294967296")); - rspamd_printf_gstring (value, "%d", - (gint)rspamd_fuzzy_backend_get_expire (bk)); - session->argv[cur_shift] = g_strdup ("EXPIRE"); - session->argv_lens[cur_shift++] = sizeof ("EXPIRE") - 1; + key = g_string_sized_new(klen); + g_string_append(key, session->backend->redis_object); + g_string_append_len(key, cmd->digest, sizeof(cmd->digest)); + value = g_string_sized_new(sizeof("4294967296")); + rspamd_printf_gstring(value, "%d", + (gint) rspamd_fuzzy_backend_get_expire(bk)); + session->argv[cur_shift] = g_strdup("EXPIRE"); + session->argv_lens[cur_shift++] = sizeof("EXPIRE") - 1; session->argv[cur_shift] = key->str; session->argv_lens[cur_shift++] = key->len; session->argv[cur_shift] = value->str; session->argv_lens[cur_shift++] = value->len; - g_string_free (key, FALSE); - g_string_free (value, FALSE); + g_string_free(key, FALSE); + g_string_free(value, FALSE); - if (redisAsyncCommandArgv (session->ctx, NULL, NULL, - 3, - (const gchar **)&session->argv[cur_shift - 3], - &session->argv_lens[cur_shift - 3]) != REDIS_OK) { + if (redisAsyncCommandArgv(session->ctx, NULL, NULL, + 3, + (const gchar **) &session->argv[cur_shift - 3], + &session->argv_lens[cur_shift - 3]) != REDIS_OK) { return FALSE; } @@ -1248,15 +1242,15 @@ rspamd_fuzzy_update_append_command (struct rspamd_fuzzy_backend *bk, /* Ignore */ } else { - g_assert_not_reached (); + g_assert_not_reached(); } if (io_cmd->is_shingle) { if (cmd->cmd == FUZZY_WRITE) { - klen = strlen (session->backend->redis_object) + - 64 + 1; + klen = strlen(session->backend->redis_object) + + 64 + 1; - for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) { + for (i = 0; i < RSPAMD_SHINGLE_SIZE; i++) { guchar *hval; /* * For each command with shingles we additionally emit 32 commands: @@ -1264,94 +1258,94 @@ rspamd_fuzzy_update_append_command (struct rspamd_fuzzy_backend *bk, */ /* SETEX */ - key = g_string_sized_new (klen); - rspamd_printf_gstring (key, "%s_%d_%uL", - session->backend->redis_object, - i, - io_cmd->cmd.shingle.sgl.hashes[i]); - value = g_string_sized_new (sizeof ("4294967296")); - rspamd_printf_gstring (value, "%d", - (gint)rspamd_fuzzy_backend_get_expire (bk)); - hval = g_malloc (sizeof (io_cmd->cmd.shingle.basic.digest)); - memcpy (hval, io_cmd->cmd.shingle.basic.digest, - sizeof (io_cmd->cmd.shingle.basic.digest)); - session->argv[cur_shift] = g_strdup ("SETEX"); - session->argv_lens[cur_shift++] = sizeof ("SETEX") - 1; + key = g_string_sized_new(klen); + rspamd_printf_gstring(key, "%s_%d_%uL", + session->backend->redis_object, + i, + io_cmd->cmd.shingle.sgl.hashes[i]); + value = g_string_sized_new(sizeof("4294967296")); + rspamd_printf_gstring(value, "%d", + (gint) rspamd_fuzzy_backend_get_expire(bk)); + hval = g_malloc(sizeof(io_cmd->cmd.shingle.basic.digest)); + memcpy(hval, io_cmd->cmd.shingle.basic.digest, + sizeof(io_cmd->cmd.shingle.basic.digest)); + session->argv[cur_shift] = g_strdup("SETEX"); + session->argv_lens[cur_shift++] = sizeof("SETEX") - 1; session->argv[cur_shift] = key->str; session->argv_lens[cur_shift++] = key->len; session->argv[cur_shift] = value->str; session->argv_lens[cur_shift++] = value->len; session->argv[cur_shift] = hval; - session->argv_lens[cur_shift++] = sizeof (io_cmd->cmd.shingle.basic.digest); - g_string_free (key, FALSE); - g_string_free (value, FALSE); + session->argv_lens[cur_shift++] = sizeof(io_cmd->cmd.shingle.basic.digest); + g_string_free(key, FALSE); + g_string_free(value, FALSE); - if (redisAsyncCommandArgv (session->ctx, NULL, NULL, - 4, - (const gchar **)&session->argv[cur_shift - 4], - &session->argv_lens[cur_shift - 4]) != REDIS_OK) { + if (redisAsyncCommandArgv(session->ctx, NULL, NULL, + 4, + (const gchar **) &session->argv[cur_shift - 4], + &session->argv_lens[cur_shift - 4]) != REDIS_OK) { return FALSE; } } } else if (cmd->cmd == FUZZY_DEL) { - klen = strlen (session->backend->redis_object) + - 64 + 1; - - for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) { - key = g_string_sized_new (klen); - rspamd_printf_gstring (key, "%s_%d_%uL", - session->backend->redis_object, - i, - io_cmd->cmd.shingle.sgl.hashes[i]); - session->argv[cur_shift] = g_strdup ("DEL"); - session->argv_lens[cur_shift++] = sizeof ("DEL") - 1; + klen = strlen(session->backend->redis_object) + + 64 + 1; + + for (i = 0; i < RSPAMD_SHINGLE_SIZE; i++) { + key = g_string_sized_new(klen); + rspamd_printf_gstring(key, "%s_%d_%uL", + session->backend->redis_object, + i, + io_cmd->cmd.shingle.sgl.hashes[i]); + session->argv[cur_shift] = g_strdup("DEL"); + session->argv_lens[cur_shift++] = sizeof("DEL") - 1; session->argv[cur_shift] = key->str; session->argv_lens[cur_shift++] = key->len; - g_string_free (key, FALSE); + g_string_free(key, FALSE); - if (redisAsyncCommandArgv (session->ctx, NULL, NULL, - 2, - (const gchar **)&session->argv[cur_shift - 2], - &session->argv_lens[cur_shift - 2]) != REDIS_OK) { + if (redisAsyncCommandArgv(session->ctx, NULL, NULL, + 2, + (const gchar **) &session->argv[cur_shift - 2], + &session->argv_lens[cur_shift - 2]) != REDIS_OK) { return FALSE; } } } else if (cmd->cmd == FUZZY_REFRESH) { - klen = strlen (session->backend->redis_object) + + klen = strlen(session->backend->redis_object) + 64 + 1; - for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) { + for (i = 0; i < RSPAMD_SHINGLE_SIZE; i++) { /* * For each command with shingles we additionally emit 32 commands: * EXPIRE <prefix>_<number>_<value> <expire> */ /* Expire */ - key = g_string_sized_new (klen); - rspamd_printf_gstring (key, "%s_%d_%uL", - session->backend->redis_object, - i, - io_cmd->cmd.shingle.sgl.hashes[i]); - value = g_string_sized_new (sizeof ("18446744073709551616")); - rspamd_printf_gstring (value, "%d", - (gint)rspamd_fuzzy_backend_get_expire (bk)); - session->argv[cur_shift] = g_strdup ("EXPIRE"); - session->argv_lens[cur_shift++] = sizeof ("EXPIRE") - 1; + key = g_string_sized_new(klen); + rspamd_printf_gstring(key, "%s_%d_%uL", + session->backend->redis_object, + i, + io_cmd->cmd.shingle.sgl.hashes[i]); + value = g_string_sized_new(sizeof("18446744073709551616")); + rspamd_printf_gstring(value, "%d", + (gint) rspamd_fuzzy_backend_get_expire(bk)); + session->argv[cur_shift] = g_strdup("EXPIRE"); + session->argv_lens[cur_shift++] = sizeof("EXPIRE") - 1; session->argv[cur_shift] = key->str; session->argv_lens[cur_shift++] = key->len; session->argv[cur_shift] = value->str; session->argv_lens[cur_shift++] = value->len; - g_string_free (key, FALSE); - g_string_free (value, FALSE); + g_string_free(key, FALSE); + g_string_free(value, FALSE); - if (redisAsyncCommandArgv (session->ctx, NULL, NULL, - 3, - (const gchar **)&session->argv[cur_shift - 3], - &session->argv_lens[cur_shift - 3]) != REDIS_OK) { + if (redisAsyncCommandArgv(session->ctx, NULL, NULL, + 3, + (const gchar **) &session->argv[cur_shift - 3], + &session->argv_lens[cur_shift - 3]) != REDIS_OK) { return FALSE; } @@ -1361,7 +1355,7 @@ rspamd_fuzzy_update_append_command (struct rspamd_fuzzy_backend *bk, /* Ignore */ } else { - g_assert_not_reached (); + g_assert_not_reached(); } } @@ -1371,59 +1365,58 @@ rspamd_fuzzy_update_append_command (struct rspamd_fuzzy_backend *bk, } static void -rspamd_fuzzy_redis_update_callback (redisAsyncContext *c, gpointer r, - gpointer priv) +rspamd_fuzzy_redis_update_callback(redisAsyncContext *c, gpointer r, + gpointer priv) { struct rspamd_fuzzy_redis_session *session = priv; redisReply *reply = r; - ev_timer_stop (session->event_loop, &session->timeout); + ev_timer_stop(session->event_loop, &session->timeout); if (c->err == 0 && reply != NULL) { - rspamd_upstream_ok (session->up); + rspamd_upstream_ok(session->up); if (reply->type == REDIS_REPLY_ARRAY) { /* TODO: check all replies somehow */ if (session->callback.cb_update) { - session->callback.cb_update (TRUE, - session->nadded, - session->ndeleted, - session->nextended, - session->nignored, - session->cbdata); + session->callback.cb_update(TRUE, + session->nadded, + session->ndeleted, + session->nextended, + session->nignored, + session->cbdata); } } else { if (reply->type == REDIS_REPLY_ERROR) { - msg_err_redis_session ("fuzzy backend redis error: \"%s\"", - reply->str); + msg_err_redis_session("fuzzy backend redis error: \"%s\"", + reply->str); } if (session->callback.cb_update) { - session->callback.cb_update (FALSE, 0, 0, 0, 0, session->cbdata); + session->callback.cb_update(FALSE, 0, 0, 0, 0, session->cbdata); } } } else { if (session->callback.cb_update) { - session->callback.cb_update (FALSE, 0, 0, 0, 0, session->cbdata); + session->callback.cb_update(FALSE, 0, 0, 0, 0, session->cbdata); } if (c->errstr) { - msg_err_redis_session ("error sending update to redis %s: %s", - rspamd_inet_address_to_string_pretty (rspamd_upstream_addr_cur (session->up)), - c->errstr); - rspamd_upstream_fail (session->up, FALSE, c->errstr); + msg_err_redis_session("error sending update to redis %s: %s", + rspamd_inet_address_to_string_pretty(rspamd_upstream_addr_cur(session->up)), + c->errstr); + rspamd_upstream_fail(session->up, FALSE, c->errstr); } } - rspamd_fuzzy_redis_session_dtor (session, FALSE); + rspamd_fuzzy_redis_session_dtor(session, FALSE); } -void -rspamd_fuzzy_backend_update_redis (struct rspamd_fuzzy_backend *bk, - GArray *updates, const gchar *src, - rspamd_fuzzy_update_cb cb, void *ud, - void *subr_ud) +void rspamd_fuzzy_backend_update_redis(struct rspamd_fuzzy_backend *bk, + GArray *updates, const gchar *src, + rspamd_fuzzy_update_cb cb, void *ud, + void *subr_ud) { struct rspamd_fuzzy_backend_redis *backend = subr_ud; struct rspamd_fuzzy_redis_session *session; @@ -1436,20 +1429,20 @@ rspamd_fuzzy_backend_update_redis (struct rspamd_fuzzy_backend *bk, struct rspamd_fuzzy_cmd *cmd = NULL; guint nargs, cur_shift; - g_assert (backend != NULL); + g_assert(backend != NULL); - ups = rspamd_redis_get_servers (backend, "write_servers"); + ups = rspamd_redis_get_servers(backend, "write_servers"); if (!ups) { if (cb) { - cb (FALSE, 0, 0, 0, 0, ud); + cb(FALSE, 0, 0, 0, 0, ud); } return; } - session = g_malloc0 (sizeof (*session)); + session = g_malloc0(sizeof(*session)); session->backend = backend; - REF_RETAIN (session->backend); + REF_RETAIN(session->backend); /* * For each normal hash addition we do 3 redis commands: @@ -1473,8 +1466,8 @@ rspamd_fuzzy_backend_update_redis (struct rspamd_fuzzy_backend *bk, nargs = 4; - for (i = 0; i < updates->len; i ++) { - io_cmd = &g_array_index (updates, struct fuzzy_peer_cmd, i); + for (i = 0; i < updates->len; i++) { + io_cmd = &g_array_index(updates, struct fuzzy_peer_cmd, i); if (io_cmd->is_shingle) { cmd = &io_cmd->cmd.shingle.basic; @@ -1485,16 +1478,15 @@ rspamd_fuzzy_backend_update_redis (struct rspamd_fuzzy_backend *bk, if (cmd->cmd == FUZZY_WRITE) { nargs += 17; - session->nadded ++; + session->nadded++; if (io_cmd->is_shingle) { nargs += RSPAMD_SHINGLE_SIZE * 4; } - } else if (cmd->cmd == FUZZY_DEL) { nargs += 4; - session->ndeleted ++; + session->ndeleted++; if (io_cmd->is_shingle) { nargs += RSPAMD_SHINGLE_SIZE * 2; @@ -1502,14 +1494,14 @@ rspamd_fuzzy_backend_update_redis (struct rspamd_fuzzy_backend *bk, } else if (cmd->cmd == FUZZY_REFRESH) { nargs += 3; - session->nextended ++; + session->nextended++; if (io_cmd->is_shingle) { nargs += RSPAMD_SHINGLE_SIZE * 3; } } else { - session->nignored ++; + session->nignored++; } } @@ -1519,48 +1511,48 @@ rspamd_fuzzy_backend_update_redis (struct rspamd_fuzzy_backend *bk, session->command = RSPAMD_FUZZY_REDIS_COMMAND_UPDATES; session->cmd = cmd; session->prob = 1.0f; - session->event_loop = rspamd_fuzzy_backend_event_base (bk); + session->event_loop = rspamd_fuzzy_backend_event_base(bk); /* First of all check digest */ session->nargs = nargs; - session->argv = g_malloc0 (sizeof (gchar *) * session->nargs); - session->argv_lens = g_malloc0 (sizeof (gsize) * session->nargs); - - up = rspamd_upstream_get (ups, - RSPAMD_UPSTREAM_MASTER_SLAVE, - NULL, - 0); - - session->up = rspamd_upstream_ref (up); - addr = rspamd_upstream_addr_next (up); - g_assert (addr != NULL); - session->ctx = rspamd_redis_pool_connect (backend->pool, - backend->dbname, backend->password, - rspamd_inet_address_to_string (addr), - rspamd_inet_address_get_port (addr)); + session->argv = g_malloc0(sizeof(gchar *) * session->nargs); + session->argv_lens = g_malloc0(sizeof(gsize) * session->nargs); + + up = rspamd_upstream_get(ups, + RSPAMD_UPSTREAM_MASTER_SLAVE, + NULL, + 0); + + session->up = rspamd_upstream_ref(up); + addr = rspamd_upstream_addr_next(up); + g_assert(addr != NULL); + session->ctx = rspamd_redis_pool_connect(backend->pool, + backend->dbname, backend->password, + rspamd_inet_address_to_string(addr), + rspamd_inet_address_get_port(addr)); if (session->ctx == NULL) { - rspamd_upstream_fail (up, TRUE, strerror (errno)); - rspamd_fuzzy_redis_session_dtor (session, TRUE); + rspamd_upstream_fail(up, TRUE, strerror(errno)); + rspamd_fuzzy_redis_session_dtor(session, TRUE); if (cb) { - cb (FALSE, 0, 0, 0, 0, ud); + cb(FALSE, 0, 0, 0, 0, ud); } } else { /* Start with MULTI command */ - session->argv[0] = g_strdup ("MULTI"); + session->argv[0] = g_strdup("MULTI"); session->argv_lens[0] = 5; - if (redisAsyncCommandArgv (session->ctx, NULL, NULL, - 1, - (const gchar **)session->argv, - session->argv_lens) != REDIS_OK) { + if (redisAsyncCommandArgv(session->ctx, NULL, NULL, + 1, + (const gchar **) session->argv, + session->argv_lens) != REDIS_OK) { if (cb) { - cb (FALSE, 0, 0, 0, 0, ud); + cb(FALSE, 0, 0, 0, 0, ud); } - rspamd_fuzzy_redis_session_dtor (session, TRUE); + rspamd_fuzzy_redis_session_dtor(session, TRUE); return; } @@ -1568,78 +1560,77 @@ rspamd_fuzzy_backend_update_redis (struct rspamd_fuzzy_backend *bk, /* Now split the rest of commands in packs and emit them command by command */ cur_shift = 1; - for (i = 0; i < updates->len; i ++) { - io_cmd = &g_array_index (updates, struct fuzzy_peer_cmd, i); + for (i = 0; i < updates->len; i++) { + io_cmd = &g_array_index(updates, struct fuzzy_peer_cmd, i); - if (!rspamd_fuzzy_update_append_command (bk, session, io_cmd, - &cur_shift)) { + if (!rspamd_fuzzy_update_append_command(bk, session, io_cmd, + &cur_shift)) { if (cb) { - cb (FALSE, 0, 0, 0, 0, ud); + cb(FALSE, 0, 0, 0, 0, ud); } - rspamd_fuzzy_redis_session_dtor (session, TRUE); + rspamd_fuzzy_redis_session_dtor(session, TRUE); return; } } /* Now INCR command for the source */ - key = g_string_new (backend->redis_object); - g_string_append (key, src); - session->argv[cur_shift] = g_strdup ("INCR"); - session->argv_lens[cur_shift ++] = 4; + key = g_string_new(backend->redis_object); + g_string_append(key, src); + session->argv[cur_shift] = g_strdup("INCR"); + session->argv_lens[cur_shift++] = 4; session->argv[cur_shift] = key->str; - session->argv_lens[cur_shift ++] = key->len; - g_string_free (key, FALSE); + session->argv_lens[cur_shift++] = key->len; + g_string_free(key, FALSE); - if (redisAsyncCommandArgv (session->ctx, NULL, NULL, - 2, - (const gchar **)&session->argv[cur_shift - 2], - &session->argv_lens[cur_shift - 2]) != REDIS_OK) { + if (redisAsyncCommandArgv(session->ctx, NULL, NULL, + 2, + (const gchar **) &session->argv[cur_shift - 2], + &session->argv_lens[cur_shift - 2]) != REDIS_OK) { if (cb) { - cb (FALSE, 0, 0, 0, 0, ud); + cb(FALSE, 0, 0, 0, 0, ud); } - rspamd_fuzzy_redis_session_dtor (session, TRUE); + rspamd_fuzzy_redis_session_dtor(session, TRUE); return; } /* Finally we call EXEC with a specific callback */ - session->argv[cur_shift] = g_strdup ("EXEC"); + session->argv[cur_shift] = g_strdup("EXEC"); session->argv_lens[cur_shift] = 4; - if (redisAsyncCommandArgv (session->ctx, - rspamd_fuzzy_redis_update_callback, session, - 1, - (const gchar **)&session->argv[cur_shift], - &session->argv_lens[cur_shift]) != REDIS_OK) { + if (redisAsyncCommandArgv(session->ctx, + rspamd_fuzzy_redis_update_callback, session, + 1, + (const gchar **) &session->argv[cur_shift], + &session->argv_lens[cur_shift]) != REDIS_OK) { if (cb) { - cb (FALSE, 0, 0, 0, 0, ud); + cb(FALSE, 0, 0, 0, 0, ud); } - rspamd_fuzzy_redis_session_dtor (session, TRUE); + rspamd_fuzzy_redis_session_dtor(session, TRUE); return; } else { /* Add timeout */ session->timeout.data = session; - ev_now_update_if_cheap ((struct ev_loop *)session->event_loop); - ev_timer_init (&session->timeout, - rspamd_fuzzy_redis_timeout, - session->backend->timeout, 0.0); - ev_timer_start (session->event_loop, &session->timeout); + ev_now_update_if_cheap((struct ev_loop *) session->event_loop); + ev_timer_init(&session->timeout, + rspamd_fuzzy_redis_timeout, + session->backend->timeout, 0.0); + ev_timer_start(session->event_loop, &session->timeout); } } } -void -rspamd_fuzzy_backend_close_redis (struct rspamd_fuzzy_backend *bk, - void *subr_ud) +void rspamd_fuzzy_backend_close_redis(struct rspamd_fuzzy_backend *bk, + void *subr_ud) { struct rspamd_fuzzy_backend_redis *backend = subr_ud; - g_assert (backend != NULL); + g_assert(backend != NULL); /* * XXX: we leak lua registry element there to avoid crashing @@ -1653,5 +1644,5 @@ rspamd_fuzzy_backend_close_redis (struct rspamd_fuzzy_backend *bk, if (backend->ref.refcount > 1) { backend->terminated = true; } - REF_RELEASE (backend); + REF_RELEASE(backend); } |