You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

fuzzy_backend_redis.c 46KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666
  1. /*
  2. * Copyright 2024 Vsevolod Stakhov
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "config.h"
  17. #include "ref.h"
  18. #include "fuzzy_backend.h"
  19. #include "fuzzy_backend_redis.h"
  20. #include "redis_pool.h"
  21. #include "cryptobox.h"
  22. #include "str_util.h"
  23. #include "upstream.h"
  24. #include "contrib/hiredis/hiredis.h"
  25. #include "contrib/hiredis/async.h"
  26. #include "lua/lua_common.h"
  /* Default Redis port; NOTE(review): not referenced in the visible part of this file. */
#define REDIS_DEFAULT_PORT 6379
/* Key prefix used when the backend options have no string "prefix" entry */
#define REDIS_DEFAULT_OBJECT "fuzzy"
/* Per-request timeout in seconds, used unless the Lua options provide a numeric "timeout" */
#define REDIS_DEFAULT_TIMEOUT 2.0
/*
 * Session-scoped logging helpers.  They expand to a reference to a local
 * variable named `session` and tag each message with the owning backend's id.
 */
#define msg_err_redis_session(...) rspamd_default_log_function(G_LOG_LEVEL_CRITICAL, \
	"fuzzy_redis", session->backend->id, \
	G_STRFUNC, \
	__VA_ARGS__)
#define msg_warn_redis_session(...) rspamd_default_log_function(G_LOG_LEVEL_WARNING, \
	"fuzzy_redis", session->backend->id, \
	G_STRFUNC, \
	__VA_ARGS__)
#define msg_info_redis_session(...) rspamd_default_log_function(G_LOG_LEVEL_INFO, \
	"fuzzy_redis", session->backend->id, \
	G_STRFUNC, \
	__VA_ARGS__)
/* NOTE(review): rspamd_fuzzy_redis_log_id is presumably defined by INIT_LOG_MODULE below — confirm */
#define msg_debug_redis_session(...) rspamd_conditional_debug_fast(NULL, NULL, \
	rspamd_fuzzy_redis_log_id, "fuzzy_redis", session->backend->id, \
	G_STRFUNC, \
	__VA_ARGS__)
/* Registers the "fuzzy_redis" debug log module */
INIT_LOG_MODULE(fuzzy_redis)
  /*
 * Per-storage Redis backend state, created by rspamd_fuzzy_backend_init_redis()
 * and shared (by reference) with every request session.
 */
struct rspamd_fuzzy_backend_redis {
	lua_State *L;             /* the configuration's Lua state (cfg->lua_state) */
	const gchar *redis_object; /* key prefix: "prefix" option or REDIS_DEFAULT_OBJECT */
	const gchar *username;    /* optional, copied into the config pool */
	const gchar *password;    /* optional, copied into the config pool */
	const gchar *dbname;      /* optional "db" option, copied into the config pool */
	gchar *id;                /* base32 hash used to tag log messages; owned, freed by the dtor */
	struct rspamd_redis_pool *pool; /* connection pool taken from cfg->redis_pool */
	gdouble timeout;          /* per-request timeout in seconds */
	gint conf_ref;            /* Lua registry reference to the Redis options table */
	/* when set, the dtor skips luaL_unref; NOTE(review): where it is set is not visible here */
	bool terminated;
	ref_entry_t ref;          /* reference counter for REF_INIT_RETAIN / REF_RETAIN / REF_RELEASE */
};
  /* Kind of request a session performs; selects the active member of its callback union. */
enum rspamd_fuzzy_redis_command {
	RSPAMD_FUZZY_REDIS_COMMAND_COUNT,   /* GET <prefix>_count */
	RSPAMD_FUZZY_REDIS_COMMAND_VERSION,
	/* NOTE(review): the update path is not visible in this part of the file */
	RSPAMD_FUZZY_REDIS_COMMAND_UPDATES,
	RSPAMD_FUZZY_REDIS_COMMAND_CHECK    /* digest lookup, optionally followed by shingles */
};
  /*
 * State of one in-flight request against the Redis fuzzy backend.  Allocated
 * by the request entry points and released by rspamd_fuzzy_redis_session_dtor().
 */
struct rspamd_fuzzy_redis_session {
	struct rspamd_fuzzy_backend_redis *backend; /* a reference is held until the dtor */
	redisAsyncContext *ctx;        /* pooled connection; NULL once released */
	ev_timer timeout;              /* armed after each submitted command */
	const struct rspamd_fuzzy_cmd *cmd; /* command being checked (CHECK sessions) */
	struct ev_loop *event_loop;
	float prob;                    /* 1.0 for an exact digest hit, shingle ratio otherwise */
	gboolean shingles_checked;     /* TRUE once the shingles MGET has been issued */
	enum rspamd_fuzzy_redis_command command;
	guint nargs;                   /* number of entries in argv / argv_lens */
	/* NOTE(review): the four counters below are not used in the visible code; presumably for updates */
	guint nadded;
	guint ndeleted;
	guint nextended;
	guint nignored;
	/* Only the member matching `command` is set */
	union {
		rspamd_fuzzy_check_cb cb_check;
		rspamd_fuzzy_update_cb cb_update;
		rspamd_fuzzy_version_cb cb_version;
		rspamd_fuzzy_count_cb cb_count;
	} callback;
	void *cbdata;                  /* opaque user data passed to the callback */
	gchar **argv;                  /* current Redis command arguments, owned */
	gsize *argv_lens;              /* lengths of argv entries (keys may contain binary digests) */
	struct upstream *up;           /* referenced upstream serving this session */
	guchar found_digest[rspamd_cryptobox_HASHBYTES]; /* digest reported on a hit */
};
  92. static inline struct upstream_list *
  93. rspamd_redis_get_servers(struct rspamd_fuzzy_backend_redis *ctx,
  94. const gchar *what)
  95. {
  96. lua_State *L = ctx->L;
  97. struct upstream_list *res = NULL;
  98. lua_rawgeti(L, LUA_REGISTRYINDEX, ctx->conf_ref);
  99. lua_pushstring(L, what);
  100. lua_gettable(L, -2);
  101. if (lua_type(L, -1) == LUA_TUSERDATA) {
  102. res = *((struct upstream_list **) lua_touserdata(L, -1));
  103. }
  104. else {
  105. struct lua_logger_trace tr;
  106. gchar outbuf[8192];
  107. memset(&tr, 0, sizeof(tr));
  108. lua_logger_out_type(L, -2, outbuf, sizeof(outbuf) - 1, &tr,
  109. LUA_ESCAPE_UNPRINTABLE);
  110. msg_err("cannot get %s upstreams for Redis fuzzy storage %s; table content: %s",
  111. what, ctx->id, outbuf);
  112. }
  113. lua_settop(L, 0);
  114. return res;
  115. }
  116. static inline void
  117. rspamd_fuzzy_redis_session_free_args(struct rspamd_fuzzy_redis_session *session)
  118. {
  119. guint i;
  120. if (session->argv) {
  121. for (i = 0; i < session->nargs; i++) {
  122. g_free(session->argv[i]);
  123. }
  124. g_free(session->argv);
  125. g_free(session->argv_lens);
  126. }
  127. }
  /*
 * Tear down a session.  The steps run in this order:
 *  1. hand the Redis connection back to the pool, flagged fatal when
 *     is_fatal is TRUE (the callers do this after a failed connect or
 *     command submission);
 *  2. stop the timeout watcher;
 *  3. free the command arguments;
 *  4. drop the references held on the backend and the upstream;
 *  5. free the session itself.
 * `session` must not be used afterwards.
 */
static void
rspamd_fuzzy_redis_session_dtor(struct rspamd_fuzzy_redis_session *session,
								gboolean is_fatal)
{
	redisAsyncContext *ac;

	if (session->ctx) {
		ac = session->ctx;
		/*
		 * Clear the field before releasing.  NOTE(review): presumably this keeps
		 * anything re-entered from the release from seeing a stale ctx — confirm.
		 */
		session->ctx = NULL;
		rspamd_redis_pool_release_connection(session->backend->pool,
											 ac,
											 is_fatal ? RSPAMD_REDIS_RELEASE_FATAL : RSPAMD_REDIS_RELEASE_DEFAULT);
	}

	ev_timer_stop(session->event_loop, &session->timeout);
	rspamd_fuzzy_redis_session_free_args(session);

	REF_RELEASE(session->backend);
	rspamd_upstream_unref(session->up);
	g_free(session);
}
  146. static void
  147. rspamd_fuzzy_backend_redis_dtor(struct rspamd_fuzzy_backend_redis *backend)
  148. {
  149. if (!backend->terminated && backend->conf_ref != -1) {
  150. luaL_unref(backend->L, LUA_REGISTRYINDEX, backend->conf_ref);
  151. }
  152. if (backend->id) {
  153. g_free(backend->id);
  154. }
  155. g_free(backend);
  156. }
  157. void *
  158. rspamd_fuzzy_backend_init_redis(struct rspamd_fuzzy_backend *bk,
  159. const ucl_object_t *obj, struct rspamd_config *cfg, GError **err)
  160. {
  161. struct rspamd_fuzzy_backend_redis *backend;
  162. const ucl_object_t *elt;
  163. gboolean ret = FALSE;
  164. guchar id_hash[rspamd_cryptobox_HASHBYTES];
  165. rspamd_cryptobox_hash_state_t st;
  166. lua_State *L = (lua_State *) cfg->lua_state;
  167. gint conf_ref = -1;
  168. backend = g_malloc0(sizeof(*backend));
  169. backend->timeout = REDIS_DEFAULT_TIMEOUT;
  170. backend->redis_object = REDIS_DEFAULT_OBJECT;
  171. backend->L = L;
  172. ret = rspamd_lua_try_load_redis(L, obj, cfg, &conf_ref);
  173. /* Now try global redis settings */
  174. if (!ret) {
  175. elt = ucl_object_lookup(cfg->cfg_ucl_obj, "redis");
  176. if (elt) {
  177. const ucl_object_t *specific_obj;
  178. specific_obj = ucl_object_lookup_any(elt, "fuzzy", "fuzzy_storage",
  179. NULL);
  180. if (specific_obj) {
  181. ret = rspamd_lua_try_load_redis(L, specific_obj, cfg, &conf_ref);
  182. }
  183. else {
  184. ret = rspamd_lua_try_load_redis(L, elt, cfg, &conf_ref);
  185. }
  186. }
  187. }
  188. if (!ret) {
  189. msg_err_config("cannot init redis backend for fuzzy storage");
  190. g_free(backend);
  191. return NULL;
  192. }
  193. elt = ucl_object_lookup(obj, "prefix");
  194. if (elt == NULL || ucl_object_type(elt) != UCL_STRING) {
  195. backend->redis_object = REDIS_DEFAULT_OBJECT;
  196. }
  197. else {
  198. backend->redis_object = ucl_object_tostring(elt);
  199. }
  200. backend->conf_ref = conf_ref;
  201. /* Check some common table values */
  202. lua_rawgeti(L, LUA_REGISTRYINDEX, conf_ref);
  203. lua_pushstring(L, "timeout");
  204. lua_gettable(L, -2);
  205. if (lua_type(L, -1) == LUA_TNUMBER) {
  206. backend->timeout = lua_tonumber(L, -1);
  207. }
  208. lua_pop(L, 1);
  209. lua_pushstring(L, "db");
  210. lua_gettable(L, -2);
  211. if (lua_type(L, -1) == LUA_TSTRING) {
  212. backend->dbname = rspamd_mempool_strdup(cfg->cfg_pool,
  213. lua_tostring(L, -1));
  214. }
  215. lua_pop(L, 1);
  216. lua_pushstring(L, "username");
  217. lua_gettable(L, -2);
  218. if (lua_type(L, -1) == LUA_TSTRING) {
  219. backend->username = rspamd_mempool_strdup(cfg->cfg_pool,
  220. lua_tostring(L, -1));
  221. }
  222. lua_pop(L, 1);
  223. lua_pushstring(L, "password");
  224. lua_gettable(L, -2);
  225. if (lua_type(L, -1) == LUA_TSTRING) {
  226. backend->password = rspamd_mempool_strdup(cfg->cfg_pool,
  227. lua_tostring(L, -1));
  228. }
  229. lua_pop(L, 1);
  230. lua_settop(L, 0);
  231. REF_INIT_RETAIN(backend, rspamd_fuzzy_backend_redis_dtor);
  232. backend->pool = cfg->redis_pool;
  233. rspamd_cryptobox_hash_init(&st, NULL, 0);
  234. rspamd_cryptobox_hash_update(&st, backend->redis_object,
  235. strlen(backend->redis_object));
  236. if (backend->dbname) {
  237. rspamd_cryptobox_hash_update(&st, backend->dbname,
  238. strlen(backend->dbname));
  239. }
  240. if (backend->username) {
  241. rspamd_cryptobox_hash_update(&st, backend->username,
  242. strlen(backend->username));
  243. }
  244. if (backend->password) {
  245. rspamd_cryptobox_hash_update(&st, backend->password,
  246. strlen(backend->password));
  247. }
  248. rspamd_cryptobox_hash_final(&st, id_hash);
  249. backend->id = rspamd_encode_base32(id_hash, sizeof(id_hash), RSPAMD_BASE32_DEFAULT);
  250. return backend;
  251. }
  252. static void
  253. rspamd_fuzzy_redis_timeout(EV_P_ ev_timer *w, int revents)
  254. {
  255. struct rspamd_fuzzy_redis_session *session =
  256. (struct rspamd_fuzzy_redis_session *) w->data;
  257. redisAsyncContext *ac;
  258. static char errstr[128];
  259. if (session->ctx) {
  260. ac = session->ctx;
  261. session->ctx = NULL;
  262. ac->err = REDIS_ERR_IO;
  263. /* Should be safe as in hiredis it is char[128] */
  264. rspamd_snprintf(errstr, sizeof(errstr), "%s", strerror(ETIMEDOUT));
  265. ac->errstr = errstr;
  266. /* This will cause session closing */
  267. rspamd_redis_pool_release_connection(session->backend->pool,
  268. ac, RSPAMD_REDIS_RELEASE_FATAL);
  269. }
  270. }
  /* Forward declaration: the shingles callback below issues an HMGET whose reply this handles. */
static void rspamd_fuzzy_redis_check_callback(redisAsyncContext *c, gpointer r,
											  gpointer priv);
  273. struct _rspamd_fuzzy_shingles_helper {
  274. guchar digest[64];
  275. guint found;
  276. };
  277. static gint
  278. rspamd_fuzzy_backend_redis_shingles_cmp(const void *a, const void *b)
  279. {
  280. const struct _rspamd_fuzzy_shingles_helper *sha = a,
  281. *shb = b;
  282. return memcmp(sha->digest, shb->digest, sizeof(sha->digest));
  283. }
  284. static void
  285. rspamd_fuzzy_redis_shingles_callback(redisAsyncContext *c, gpointer r,
  286. gpointer priv)
  287. {
  288. struct rspamd_fuzzy_redis_session *session = priv;
  289. redisReply *reply = r, *cur;
  290. struct rspamd_fuzzy_reply rep;
  291. GString *key;
  292. struct _rspamd_fuzzy_shingles_helper *shingles, *prev = NULL, *sel = NULL;
  293. guint i, found = 0, max_found = 0, cur_found = 0;
  294. ev_timer_stop(session->event_loop, &session->timeout);
  295. memset(&rep, 0, sizeof(rep));
  296. if (c->err == 0 && reply != NULL) {
  297. rspamd_upstream_ok(session->up);
  298. if (reply->type == REDIS_REPLY_ARRAY &&
  299. reply->elements == RSPAMD_SHINGLE_SIZE) {
  300. shingles = g_alloca(sizeof(struct _rspamd_fuzzy_shingles_helper) *
  301. RSPAMD_SHINGLE_SIZE);
  302. for (i = 0; i < RSPAMD_SHINGLE_SIZE; i++) {
  303. cur = reply->element[i];
  304. if (cur->type == REDIS_REPLY_STRING) {
  305. shingles[i].found = 1;
  306. memcpy(shingles[i].digest, cur->str, MIN(64, cur->len));
  307. found++;
  308. }
  309. else {
  310. memset(shingles[i].digest, 0, sizeof(shingles[i].digest));
  311. shingles[i].found = 0;
  312. }
  313. }
  314. if (found > RSPAMD_SHINGLE_SIZE / 2) {
  315. /* Now sort to find the most frequent element */
  316. qsort(shingles, RSPAMD_SHINGLE_SIZE,
  317. sizeof(struct _rspamd_fuzzy_shingles_helper),
  318. rspamd_fuzzy_backend_redis_shingles_cmp);
  319. prev = &shingles[0];
  320. for (i = 1; i < RSPAMD_SHINGLE_SIZE; i++) {
  321. if (!shingles[i].found) {
  322. continue;
  323. }
  324. if (memcmp(shingles[i].digest, prev->digest, 64) == 0) {
  325. cur_found++;
  326. if (cur_found > max_found) {
  327. max_found = cur_found;
  328. sel = &shingles[i];
  329. }
  330. }
  331. else {
  332. cur_found = 1;
  333. prev = &shingles[i];
  334. }
  335. }
  336. if (max_found > RSPAMD_SHINGLE_SIZE / 2) {
  337. session->prob = ((float) max_found) / RSPAMD_SHINGLE_SIZE;
  338. rep.v1.prob = session->prob;
  339. g_assert(sel != NULL);
  340. /* Prepare new check command */
  341. rspamd_fuzzy_redis_session_free_args(session);
  342. session->nargs = 5;
  343. session->argv = g_malloc(sizeof(gchar *) * session->nargs);
  344. session->argv_lens = g_malloc(sizeof(gsize) * session->nargs);
  345. key = g_string_new(session->backend->redis_object);
  346. g_string_append_len(key, sel->digest, sizeof(sel->digest));
  347. session->argv[0] = g_strdup("HMGET");
  348. session->argv_lens[0] = 5;
  349. session->argv[1] = key->str;
  350. session->argv_lens[1] = key->len;
  351. session->argv[2] = g_strdup("V");
  352. session->argv_lens[2] = 1;
  353. session->argv[3] = g_strdup("F");
  354. session->argv_lens[3] = 1;
  355. session->argv[4] = g_strdup("C");
  356. session->argv_lens[4] = 1;
  357. g_string_free(key, FALSE); /* Do not free underlying array */
  358. memcpy(session->found_digest, sel->digest,
  359. sizeof(session->cmd->digest));
  360. g_assert(session->ctx != NULL);
  361. if (redisAsyncCommandArgv(session->ctx,
  362. rspamd_fuzzy_redis_check_callback,
  363. session, session->nargs,
  364. (const gchar **) session->argv,
  365. session->argv_lens) != REDIS_OK) {
  366. if (session->callback.cb_check) {
  367. memset(&rep, 0, sizeof(rep));
  368. session->callback.cb_check(&rep, session->cbdata);
  369. }
  370. rspamd_fuzzy_redis_session_dtor(session, TRUE);
  371. }
  372. else {
  373. /* Add timeout */
  374. session->timeout.data = session;
  375. ev_now_update_if_cheap((struct ev_loop *) session->event_loop);
  376. ev_timer_init(&session->timeout,
  377. rspamd_fuzzy_redis_timeout,
  378. session->backend->timeout, 0.0);
  379. ev_timer_start(session->event_loop, &session->timeout);
  380. }
  381. return;
  382. }
  383. }
  384. }
  385. else if (reply->type == REDIS_REPLY_ERROR) {
  386. msg_err_redis_session("fuzzy backend redis error: \"%s\"",
  387. reply->str);
  388. }
  389. if (session->callback.cb_check) {
  390. session->callback.cb_check(&rep, session->cbdata);
  391. }
  392. }
  393. else {
  394. if (session->callback.cb_check) {
  395. session->callback.cb_check(&rep, session->cbdata);
  396. }
  397. if (c->errstr) {
  398. msg_err_redis_session("error getting shingles: %s", c->errstr);
  399. rspamd_upstream_fail(session->up, FALSE, c->errstr);
  400. }
  401. }
  402. rspamd_fuzzy_redis_session_dtor(session, FALSE);
  403. }
  404. static void
  405. rspamd_fuzzy_backend_check_shingles(struct rspamd_fuzzy_redis_session *session)
  406. {
  407. struct rspamd_fuzzy_reply rep;
  408. const struct rspamd_fuzzy_shingle_cmd *shcmd;
  409. GString *key;
  410. guint i, init_len;
  411. rspamd_fuzzy_redis_session_free_args(session);
  412. /* First of all check digest */
  413. session->nargs = RSPAMD_SHINGLE_SIZE + 1;
  414. session->argv = g_malloc(sizeof(gchar *) * session->nargs);
  415. session->argv_lens = g_malloc(sizeof(gsize) * session->nargs);
  416. shcmd = (const struct rspamd_fuzzy_shingle_cmd *) session->cmd;
  417. session->argv[0] = g_strdup("MGET");
  418. session->argv_lens[0] = 4;
  419. init_len = strlen(session->backend->redis_object);
  420. for (i = 0; i < RSPAMD_SHINGLE_SIZE; i++) {
  421. key = g_string_sized_new(init_len + 2 + 2 + sizeof("18446744073709551616"));
  422. rspamd_printf_gstring(key, "%s_%d_%uL", session->backend->redis_object,
  423. i, shcmd->sgl.hashes[i]);
  424. session->argv[i + 1] = key->str;
  425. session->argv_lens[i + 1] = key->len;
  426. g_string_free(key, FALSE); /* Do not free underlying array */
  427. }
  428. session->shingles_checked = TRUE;
  429. g_assert(session->ctx != NULL);
  430. if (redisAsyncCommandArgv(session->ctx, rspamd_fuzzy_redis_shingles_callback,
  431. session, session->nargs,
  432. (const gchar **) session->argv, session->argv_lens) != REDIS_OK) {
  433. msg_err("cannot execute redis command on %s: %s",
  434. rspamd_inet_address_to_string_pretty(rspamd_upstream_addr_cur(session->up)),
  435. session->ctx->errstr);
  436. if (session->callback.cb_check) {
  437. memset(&rep, 0, sizeof(rep));
  438. session->callback.cb_check(&rep, session->cbdata);
  439. }
  440. rspamd_fuzzy_redis_session_dtor(session, TRUE);
  441. }
  442. else {
  443. /* Add timeout */
  444. session->timeout.data = session;
  445. ev_now_update_if_cheap((struct ev_loop *) session->event_loop);
  446. ev_timer_init(&session->timeout,
  447. rspamd_fuzzy_redis_timeout,
  448. session->backend->timeout, 0.0);
  449. ev_timer_start(session->event_loop, &session->timeout);
  450. }
  451. }
  452. static void
  453. rspamd_fuzzy_redis_check_callback(redisAsyncContext *c, gpointer r,
  454. gpointer priv)
  455. {
  456. struct rspamd_fuzzy_redis_session *session = priv;
  457. redisReply *reply = r, *cur;
  458. struct rspamd_fuzzy_reply rep;
  459. gulong value;
  460. guint found_elts = 0;
  461. ev_timer_stop(session->event_loop, &session->timeout);
  462. memset(&rep, 0, sizeof(rep));
  463. if (c->err == 0 && reply != NULL) {
  464. rspamd_upstream_ok(session->up);
  465. if (reply->type == REDIS_REPLY_ARRAY && reply->elements >= 2) {
  466. cur = reply->element[0];
  467. if (cur->type == REDIS_REPLY_STRING) {
  468. value = strtoul(cur->str, NULL, 10);
  469. rep.v1.value = value;
  470. found_elts++;
  471. }
  472. cur = reply->element[1];
  473. if (cur->type == REDIS_REPLY_STRING) {
  474. value = strtoul(cur->str, NULL, 10);
  475. rep.v1.flag = value;
  476. found_elts++;
  477. }
  478. if (found_elts >= 2) {
  479. rep.v1.prob = session->prob;
  480. memcpy(rep.digest, session->found_digest, sizeof(rep.digest));
  481. }
  482. rep.ts = 0;
  483. if (reply->elements > 2) {
  484. cur = reply->element[2];
  485. if (cur->type == REDIS_REPLY_STRING) {
  486. rep.ts = strtoul(cur->str, NULL, 10);
  487. }
  488. }
  489. }
  490. else if (reply->type == REDIS_REPLY_ERROR) {
  491. msg_err_redis_session("fuzzy backend redis error: \"%s\"",
  492. reply->str);
  493. }
  494. if (found_elts < 2) {
  495. if (session->cmd->shingles_count > 0 && !session->shingles_checked) {
  496. /* We also need to check all shingles here */
  497. rspamd_fuzzy_backend_check_shingles(session);
  498. /* Do not free session */
  499. return;
  500. }
  501. else {
  502. if (session->callback.cb_check) {
  503. session->callback.cb_check(&rep, session->cbdata);
  504. }
  505. }
  506. }
  507. else {
  508. if (session->callback.cb_check) {
  509. session->callback.cb_check(&rep, session->cbdata);
  510. }
  511. }
  512. }
  513. else {
  514. if (session->callback.cb_check) {
  515. session->callback.cb_check(&rep, session->cbdata);
  516. }
  517. if (c->errstr) {
  518. msg_err_redis_session("error getting hashes on %s: %s",
  519. rspamd_inet_address_to_string_pretty(rspamd_upstream_addr_cur(session->up)),
  520. c->errstr);
  521. rspamd_upstream_fail(session->up, FALSE, c->errstr);
  522. }
  523. }
  524. rspamd_fuzzy_redis_session_dtor(session, FALSE);
  525. }
  526. void rspamd_fuzzy_backend_check_redis(struct rspamd_fuzzy_backend *bk,
  527. const struct rspamd_fuzzy_cmd *cmd,
  528. rspamd_fuzzy_check_cb cb, void *ud,
  529. void *subr_ud)
  530. {
  531. struct rspamd_fuzzy_backend_redis *backend = subr_ud;
  532. struct rspamd_fuzzy_redis_session *session;
  533. struct upstream *up;
  534. struct upstream_list *ups;
  535. rspamd_inet_addr_t *addr;
  536. struct rspamd_fuzzy_reply rep;
  537. GString *key;
  538. g_assert(backend != NULL);
  539. ups = rspamd_redis_get_servers(backend, "read_servers");
  540. if (!ups) {
  541. if (cb) {
  542. memset(&rep, 0, sizeof(rep));
  543. cb(&rep, ud);
  544. }
  545. return;
  546. }
  547. session = g_malloc0(sizeof(*session));
  548. session->backend = backend;
  549. REF_RETAIN(session->backend);
  550. session->callback.cb_check = cb;
  551. session->cbdata = ud;
  552. session->command = RSPAMD_FUZZY_REDIS_COMMAND_CHECK;
  553. session->cmd = cmd;
  554. session->prob = 1.0;
  555. memcpy(rep.digest, session->cmd->digest, sizeof(rep.digest));
  556. memcpy(session->found_digest, session->cmd->digest, sizeof(rep.digest));
  557. session->event_loop = rspamd_fuzzy_backend_event_base(bk);
  558. /* First of all check digest */
  559. session->nargs = 5;
  560. session->argv = g_malloc(sizeof(gchar *) * session->nargs);
  561. session->argv_lens = g_malloc(sizeof(gsize) * session->nargs);
  562. key = g_string_new(backend->redis_object);
  563. g_string_append_len(key, cmd->digest, sizeof(cmd->digest));
  564. session->argv[0] = g_strdup("HMGET");
  565. session->argv_lens[0] = 5;
  566. session->argv[1] = key->str;
  567. session->argv_lens[1] = key->len;
  568. session->argv[2] = g_strdup("V");
  569. session->argv_lens[2] = 1;
  570. session->argv[3] = g_strdup("F");
  571. session->argv_lens[3] = 1;
  572. session->argv[4] = g_strdup("C");
  573. session->argv_lens[4] = 1;
  574. g_string_free(key, FALSE); /* Do not free underlying array */
  575. up = rspamd_upstream_get(ups,
  576. RSPAMD_UPSTREAM_ROUND_ROBIN,
  577. NULL,
  578. 0);
  579. session->up = rspamd_upstream_ref(up);
  580. addr = rspamd_upstream_addr_next(up);
  581. g_assert(addr != NULL);
  582. session->ctx = rspamd_redis_pool_connect(backend->pool,
  583. backend->dbname,
  584. backend->username, backend->password,
  585. rspamd_inet_address_to_string(addr),
  586. rspamd_inet_address_get_port(addr));
  587. if (session->ctx == NULL) {
  588. rspamd_upstream_fail(up, TRUE, strerror(errno));
  589. rspamd_fuzzy_redis_session_dtor(session, TRUE);
  590. if (cb) {
  591. memset(&rep, 0, sizeof(rep));
  592. cb(&rep, ud);
  593. }
  594. }
  595. else {
  596. if (redisAsyncCommandArgv(session->ctx, rspamd_fuzzy_redis_check_callback,
  597. session, session->nargs,
  598. (const gchar **) session->argv, session->argv_lens) != REDIS_OK) {
  599. rspamd_fuzzy_redis_session_dtor(session, TRUE);
  600. if (cb) {
  601. memset(&rep, 0, sizeof(rep));
  602. cb(&rep, ud);
  603. }
  604. }
  605. else {
  606. /* Add timeout */
  607. session->timeout.data = session;
  608. ev_now_update_if_cheap((struct ev_loop *) session->event_loop);
  609. ev_timer_init(&session->timeout,
  610. rspamd_fuzzy_redis_timeout,
  611. session->backend->timeout, 0.0);
  612. ev_timer_start(session->event_loop, &session->timeout);
  613. }
  614. }
  615. }
  616. static void
  617. rspamd_fuzzy_redis_count_callback(redisAsyncContext *c, gpointer r,
  618. gpointer priv)
  619. {
  620. struct rspamd_fuzzy_redis_session *session = priv;
  621. redisReply *reply = r;
  622. gulong nelts;
  623. ev_timer_stop(session->event_loop, &session->timeout);
  624. if (c->err == 0 && reply != NULL) {
  625. rspamd_upstream_ok(session->up);
  626. if (reply->type == REDIS_REPLY_INTEGER) {
  627. if (session->callback.cb_count) {
  628. session->callback.cb_count(reply->integer, session->cbdata);
  629. }
  630. }
  631. else if (reply->type == REDIS_REPLY_STRING) {
  632. nelts = strtoul(reply->str, NULL, 10);
  633. if (session->callback.cb_count) {
  634. session->callback.cb_count(nelts, session->cbdata);
  635. }
  636. }
  637. else {
  638. if (reply->type == REDIS_REPLY_ERROR) {
  639. msg_err_redis_session("fuzzy backend redis error: \"%s\"",
  640. reply->str);
  641. }
  642. if (session->callback.cb_count) {
  643. session->callback.cb_count(0, session->cbdata);
  644. }
  645. }
  646. }
  647. else {
  648. if (session->callback.cb_count) {
  649. session->callback.cb_count(0, session->cbdata);
  650. }
  651. if (c->errstr) {
  652. msg_err_redis_session("error getting count on %s: %s",
  653. rspamd_inet_address_to_string_pretty(rspamd_upstream_addr_cur(session->up)),
  654. c->errstr);
  655. rspamd_upstream_fail(session->up, FALSE, c->errstr);
  656. }
  657. }
  658. rspamd_fuzzy_redis_session_dtor(session, FALSE);
  659. }
  660. void rspamd_fuzzy_backend_count_redis(struct rspamd_fuzzy_backend *bk,
  661. rspamd_fuzzy_count_cb cb, void *ud,
  662. void *subr_ud)
  663. {
  664. struct rspamd_fuzzy_backend_redis *backend = subr_ud;
  665. struct rspamd_fuzzy_redis_session *session;
  666. struct upstream *up;
  667. struct upstream_list *ups;
  668. rspamd_inet_addr_t *addr;
  669. GString *key;
  670. g_assert(backend != NULL);
  671. ups = rspamd_redis_get_servers(backend, "read_servers");
  672. if (!ups) {
  673. if (cb) {
  674. cb(0, ud);
  675. }
  676. return;
  677. }
  678. session = g_malloc0(sizeof(*session));
  679. session->backend = backend;
  680. REF_RETAIN(session->backend);
  681. session->callback.cb_count = cb;
  682. session->cbdata = ud;
  683. session->command = RSPAMD_FUZZY_REDIS_COMMAND_COUNT;
  684. session->event_loop = rspamd_fuzzy_backend_event_base(bk);
  685. session->nargs = 2;
  686. session->argv = g_malloc(sizeof(gchar *) * 2);
  687. session->argv_lens = g_malloc(sizeof(gsize) * 2);
  688. key = g_string_new(backend->redis_object);
  689. g_string_append(key, "_count");
  690. session->argv[0] = g_strdup("GET");
  691. session->argv_lens[0] = 3;
  692. session->argv[1] = key->str;
  693. session->argv_lens[1] = key->len;
  694. g_string_free(key, FALSE); /* Do not free underlying array */
  695. up = rspamd_upstream_get(ups,
  696. RSPAMD_UPSTREAM_ROUND_ROBIN,
  697. NULL,
  698. 0);
  699. session->up = rspamd_upstream_ref(up);
  700. addr = rspamd_upstream_addr_next(up);
  701. g_assert(addr != NULL);
  702. session->ctx = rspamd_redis_pool_connect(backend->pool,
  703. backend->dbname,
  704. backend->username, backend->password,
  705. rspamd_inet_address_to_string(addr),
  706. rspamd_inet_address_get_port(addr));
  707. if (session->ctx == NULL) {
  708. rspamd_upstream_fail(up, TRUE, strerror(errno));
  709. rspamd_fuzzy_redis_session_dtor(session, TRUE);
  710. if (cb) {
  711. cb(0, ud);
  712. }
  713. }
  714. else {
  715. if (redisAsyncCommandArgv(session->ctx, rspamd_fuzzy_redis_count_callback,
  716. session, session->nargs,
  717. (const gchar **) session->argv, session->argv_lens) != REDIS_OK) {
  718. rspamd_fuzzy_redis_session_dtor(session, TRUE);
  719. if (cb) {
  720. cb(0, ud);
  721. }
  722. }
  723. else {
  724. /* Add timeout */
  725. session->timeout.data = session;
  726. ev_now_update_if_cheap((struct ev_loop *) session->event_loop);
  727. ev_timer_init(&session->timeout,
  728. rspamd_fuzzy_redis_timeout,
  729. session->backend->timeout, 0.0);
  730. ev_timer_start(session->event_loop, &session->timeout);
  731. }
  732. }
  733. }
  734. static void
  735. rspamd_fuzzy_redis_version_callback(redisAsyncContext *c, gpointer r,
  736. gpointer priv)
  737. {
  738. struct rspamd_fuzzy_redis_session *session = priv;
  739. redisReply *reply = r;
  740. gulong nelts;
  741. ev_timer_stop(session->event_loop, &session->timeout);
  742. if (c->err == 0 && reply != NULL) {
  743. rspamd_upstream_ok(session->up);
  744. if (reply->type == REDIS_REPLY_INTEGER) {
  745. if (session->callback.cb_version) {
  746. session->callback.cb_version(reply->integer, session->cbdata);
  747. }
  748. }
  749. else if (reply->type == REDIS_REPLY_STRING) {
  750. nelts = strtoul(reply->str, NULL, 10);
  751. if (session->callback.cb_version) {
  752. session->callback.cb_version(nelts, session->cbdata);
  753. }
  754. }
  755. else {
  756. if (reply->type == REDIS_REPLY_ERROR) {
  757. msg_err_redis_session("fuzzy backend redis error: \"%s\"",
  758. reply->str);
  759. }
  760. if (session->callback.cb_version) {
  761. session->callback.cb_version(0, session->cbdata);
  762. }
  763. }
  764. }
  765. else {
  766. if (session->callback.cb_version) {
  767. session->callback.cb_version(0, session->cbdata);
  768. }
  769. if (c->errstr) {
  770. msg_err_redis_session("error getting version on %s: %s",
  771. rspamd_inet_address_to_string_pretty(rspamd_upstream_addr_cur(session->up)),
  772. c->errstr);
  773. rspamd_upstream_fail(session->up, FALSE, c->errstr);
  774. }
  775. }
  776. rspamd_fuzzy_redis_session_dtor(session, FALSE);
  777. }
  778. void rspamd_fuzzy_backend_version_redis(struct rspamd_fuzzy_backend *bk,
  779. const gchar *src,
  780. rspamd_fuzzy_version_cb cb, void *ud,
  781. void *subr_ud)
  782. {
  783. struct rspamd_fuzzy_backend_redis *backend = subr_ud;
  784. struct rspamd_fuzzy_redis_session *session;
  785. struct upstream *up;
  786. struct upstream_list *ups;
  787. rspamd_inet_addr_t *addr;
  788. GString *key;
  789. g_assert(backend != NULL);
  790. ups = rspamd_redis_get_servers(backend, "read_servers");
  791. if (!ups) {
  792. if (cb) {
  793. cb(0, ud);
  794. }
  795. return;
  796. }
  797. session = g_malloc0(sizeof(*session));
  798. session->backend = backend;
  799. REF_RETAIN(session->backend);
  800. session->callback.cb_version = cb;
  801. session->cbdata = ud;
  802. session->command = RSPAMD_FUZZY_REDIS_COMMAND_VERSION;
  803. session->event_loop = rspamd_fuzzy_backend_event_base(bk);
  804. session->nargs = 2;
  805. session->argv = g_malloc(sizeof(gchar *) * 2);
  806. session->argv_lens = g_malloc(sizeof(gsize) * 2);
  807. key = g_string_new(backend->redis_object);
  808. g_string_append(key, src);
  809. session->argv[0] = g_strdup("GET");
  810. session->argv_lens[0] = 3;
  811. session->argv[1] = key->str;
  812. session->argv_lens[1] = key->len;
  813. g_string_free(key, FALSE); /* Do not free underlying array */
  814. up = rspamd_upstream_get(ups,
  815. RSPAMD_UPSTREAM_ROUND_ROBIN,
  816. NULL,
  817. 0);
  818. session->up = rspamd_upstream_ref(up);
  819. addr = rspamd_upstream_addr_next(up);
  820. g_assert(addr != NULL);
  821. session->ctx = rspamd_redis_pool_connect(backend->pool,
  822. backend->dbname,
  823. backend->username, backend->password,
  824. rspamd_inet_address_to_string(addr),
  825. rspamd_inet_address_get_port(addr));
  826. if (session->ctx == NULL) {
  827. rspamd_upstream_fail(up, FALSE, strerror(errno));
  828. rspamd_fuzzy_redis_session_dtor(session, TRUE);
  829. if (cb) {
  830. cb(0, ud);
  831. }
  832. }
  833. else {
  834. if (redisAsyncCommandArgv(session->ctx, rspamd_fuzzy_redis_version_callback,
  835. session, session->nargs,
  836. (const gchar **) session->argv, session->argv_lens) != REDIS_OK) {
  837. rspamd_fuzzy_redis_session_dtor(session, TRUE);
  838. if (cb) {
  839. cb(0, ud);
  840. }
  841. }
  842. else {
  843. /* Add timeout */
  844. session->timeout.data = session;
  845. ev_now_update_if_cheap((struct ev_loop *) session->event_loop);
  846. ev_timer_init(&session->timeout,
  847. rspamd_fuzzy_redis_timeout,
  848. session->backend->timeout, 0.0);
  849. ev_timer_start(session->event_loop, &session->timeout);
  850. }
  851. }
  852. }
  853. const gchar *
  854. rspamd_fuzzy_backend_id_redis(struct rspamd_fuzzy_backend *bk,
  855. void *subr_ud)
  856. {
  857. struct rspamd_fuzzy_backend_redis *backend = subr_ud;
  858. g_assert(backend != NULL);
  859. return backend->id;
  860. }
  861. void rspamd_fuzzy_backend_expire_redis(struct rspamd_fuzzy_backend *bk,
  862. void *subr_ud)
  863. {
  864. struct rspamd_fuzzy_backend_redis *backend = subr_ud;
  865. g_assert(backend != NULL);
  866. }
  867. static gboolean
  868. rspamd_fuzzy_update_append_command(struct rspamd_fuzzy_backend *bk,
  869. struct rspamd_fuzzy_redis_session *session,
  870. struct fuzzy_peer_cmd *io_cmd, guint *shift)
  871. {
  872. GString *key, *value;
  873. guint cur_shift = *shift;
  874. guint i, klen;
  875. struct rspamd_fuzzy_cmd *cmd;
  876. if (io_cmd->is_shingle) {
  877. cmd = &io_cmd->cmd.shingle.basic;
  878. }
  879. else {
  880. cmd = &io_cmd->cmd.normal;
  881. }
  882. if (cmd->cmd == FUZZY_WRITE) {
  883. /*
  884. * For each normal hash addition we do 5 redis commands:
  885. * HSET <key> F <flag>
  886. * HSETNX <key> C <time>
  887. * HINCRBY <key> V <weight>
  888. * EXPIRE <key> <expire>
  889. * Where <key> is <prefix> || <digest>
  890. */
  891. /* HSET */
  892. klen = strlen(session->backend->redis_object) +
  893. sizeof(cmd->digest) + 1;
  894. key = g_string_sized_new(klen);
  895. g_string_append(key, session->backend->redis_object);
  896. g_string_append_len(key, cmd->digest, sizeof(cmd->digest));
  897. value = g_string_sized_new(sizeof("4294967296"));
  898. rspamd_printf_gstring(value, "%d", cmd->flag);
  899. if (cmd->version & RSPAMD_FUZZY_FLAG_WEAK) {
  900. session->argv[cur_shift] = g_strdup("HSETNX");
  901. session->argv_lens[cur_shift++] = sizeof("HSETNX") - 1;
  902. }
  903. else {
  904. session->argv[cur_shift] = g_strdup("HSET");
  905. session->argv_lens[cur_shift++] = sizeof("HSET") - 1;
  906. }
  907. session->argv[cur_shift] = key->str;
  908. session->argv_lens[cur_shift++] = key->len;
  909. session->argv[cur_shift] = g_strdup("F");
  910. session->argv_lens[cur_shift++] = sizeof("F") - 1;
  911. session->argv[cur_shift] = value->str;
  912. session->argv_lens[cur_shift++] = value->len;
  913. g_string_free(key, FALSE);
  914. g_string_free(value, FALSE);
  915. if (redisAsyncCommandArgv(session->ctx, NULL, NULL,
  916. 4,
  917. (const gchar **) &session->argv[cur_shift - 4],
  918. &session->argv_lens[cur_shift - 4]) != REDIS_OK) {
  919. return FALSE;
  920. }
  921. /* HSETNX */
  922. klen = strlen(session->backend->redis_object) +
  923. sizeof(cmd->digest) + 1;
  924. key = g_string_sized_new(klen);
  925. g_string_append(key, session->backend->redis_object);
  926. g_string_append_len(key, cmd->digest, sizeof(cmd->digest));
  927. value = g_string_sized_new(sizeof("18446744073709551616"));
  928. rspamd_printf_gstring(value, "%L", (int64_t) rspamd_get_calendar_ticks());
  929. session->argv[cur_shift] = g_strdup("HSETNX");
  930. session->argv_lens[cur_shift++] = sizeof("HSETNX") - 1;
  931. session->argv[cur_shift] = key->str;
  932. session->argv_lens[cur_shift++] = key->len;
  933. session->argv[cur_shift] = g_strdup("C");
  934. session->argv_lens[cur_shift++] = sizeof("C") - 1;
  935. session->argv[cur_shift] = value->str;
  936. session->argv_lens[cur_shift++] = value->len;
  937. g_string_free(key, FALSE);
  938. g_string_free(value, FALSE);
  939. if (redisAsyncCommandArgv(session->ctx, NULL, NULL,
  940. 4,
  941. (const gchar **) &session->argv[cur_shift - 4],
  942. &session->argv_lens[cur_shift - 4]) != REDIS_OK) {
  943. return FALSE;
  944. }
  945. /* HINCRBY */
  946. key = g_string_sized_new(klen);
  947. g_string_append(key, session->backend->redis_object);
  948. g_string_append_len(key, cmd->digest, sizeof(cmd->digest));
  949. value = g_string_sized_new(sizeof("4294967296"));
  950. rspamd_printf_gstring(value, "%d", cmd->value);
  951. session->argv[cur_shift] = g_strdup("HINCRBY");
  952. session->argv_lens[cur_shift++] = sizeof("HINCRBY") - 1;
  953. session->argv[cur_shift] = key->str;
  954. session->argv_lens[cur_shift++] = key->len;
  955. session->argv[cur_shift] = g_strdup("V");
  956. session->argv_lens[cur_shift++] = sizeof("V") - 1;
  957. session->argv[cur_shift] = value->str;
  958. session->argv_lens[cur_shift++] = value->len;
  959. g_string_free(key, FALSE);
  960. g_string_free(value, FALSE);
  961. if (redisAsyncCommandArgv(session->ctx, NULL, NULL,
  962. 4,
  963. (const gchar **) &session->argv[cur_shift - 4],
  964. &session->argv_lens[cur_shift - 4]) != REDIS_OK) {
  965. return FALSE;
  966. }
  967. /* EXPIRE */
  968. key = g_string_sized_new(klen);
  969. g_string_append(key, session->backend->redis_object);
  970. g_string_append_len(key, cmd->digest, sizeof(cmd->digest));
  971. value = g_string_sized_new(sizeof("4294967296"));
  972. rspamd_printf_gstring(value, "%d",
  973. (gint) rspamd_fuzzy_backend_get_expire(bk));
  974. session->argv[cur_shift] = g_strdup("EXPIRE");
  975. session->argv_lens[cur_shift++] = sizeof("EXPIRE") - 1;
  976. session->argv[cur_shift] = key->str;
  977. session->argv_lens[cur_shift++] = key->len;
  978. session->argv[cur_shift] = value->str;
  979. session->argv_lens[cur_shift++] = value->len;
  980. g_string_free(key, FALSE);
  981. g_string_free(value, FALSE);
  982. if (redisAsyncCommandArgv(session->ctx, NULL, NULL,
  983. 3,
  984. (const gchar **) &session->argv[cur_shift - 3],
  985. &session->argv_lens[cur_shift - 3]) != REDIS_OK) {
  986. return FALSE;
  987. }
  988. /* INCR */
  989. key = g_string_sized_new(klen);
  990. g_string_append(key, session->backend->redis_object);
  991. g_string_append(key, "_count");
  992. session->argv[cur_shift] = g_strdup("INCR");
  993. session->argv_lens[cur_shift++] = sizeof("INCR") - 1;
  994. session->argv[cur_shift] = key->str;
  995. session->argv_lens[cur_shift++] = key->len;
  996. g_string_free(key, FALSE);
  997. if (redisAsyncCommandArgv(session->ctx, NULL, NULL,
  998. 2,
  999. (const gchar **) &session->argv[cur_shift - 2],
  1000. &session->argv_lens[cur_shift - 2]) != REDIS_OK) {
  1001. return FALSE;
  1002. }
  1003. }
  1004. else if (cmd->cmd == FUZZY_DEL) {
  1005. /* DEL */
  1006. klen = strlen(session->backend->redis_object) +
  1007. sizeof(cmd->digest) + 1;
  1008. key = g_string_sized_new(klen);
  1009. g_string_append(key, session->backend->redis_object);
  1010. g_string_append_len(key, cmd->digest, sizeof(cmd->digest));
  1011. session->argv[cur_shift] = g_strdup("DEL");
  1012. session->argv_lens[cur_shift++] = sizeof("DEL") - 1;
  1013. session->argv[cur_shift] = key->str;
  1014. session->argv_lens[cur_shift++] = key->len;
  1015. g_string_free(key, FALSE);
  1016. if (redisAsyncCommandArgv(session->ctx, NULL, NULL,
  1017. 2,
  1018. (const gchar **) &session->argv[cur_shift - 2],
  1019. &session->argv_lens[cur_shift - 2]) != REDIS_OK) {
  1020. return FALSE;
  1021. }
  1022. /* DECR */
  1023. key = g_string_sized_new(klen);
  1024. g_string_append(key, session->backend->redis_object);
  1025. g_string_append(key, "_count");
  1026. session->argv[cur_shift] = g_strdup("DECR");
  1027. session->argv_lens[cur_shift++] = sizeof("DECR") - 1;
  1028. session->argv[cur_shift] = key->str;
  1029. session->argv_lens[cur_shift++] = key->len;
  1030. g_string_free(key, FALSE);
  1031. if (redisAsyncCommandArgv(session->ctx, NULL, NULL,
  1032. 2,
  1033. (const gchar **) &session->argv[cur_shift - 2],
  1034. &session->argv_lens[cur_shift - 2]) != REDIS_OK) {
  1035. return FALSE;
  1036. }
  1037. }
  1038. else if (cmd->cmd == FUZZY_REFRESH) {
  1039. /*
  1040. * Issue refresh command by just EXPIRE command
  1041. * EXPIRE <key> <expire>
  1042. * Where <key> is <prefix> || <digest>
  1043. */
  1044. klen = strlen(session->backend->redis_object) +
  1045. sizeof(cmd->digest) + 1;
  1046. /* EXPIRE */
  1047. key = g_string_sized_new(klen);
  1048. g_string_append(key, session->backend->redis_object);
  1049. g_string_append_len(key, cmd->digest, sizeof(cmd->digest));
  1050. value = g_string_sized_new(sizeof("4294967296"));
  1051. rspamd_printf_gstring(value, "%d",
  1052. (gint) rspamd_fuzzy_backend_get_expire(bk));
  1053. session->argv[cur_shift] = g_strdup("EXPIRE");
  1054. session->argv_lens[cur_shift++] = sizeof("EXPIRE") - 1;
  1055. session->argv[cur_shift] = key->str;
  1056. session->argv_lens[cur_shift++] = key->len;
  1057. session->argv[cur_shift] = value->str;
  1058. session->argv_lens[cur_shift++] = value->len;
  1059. g_string_free(key, FALSE);
  1060. g_string_free(value, FALSE);
  1061. if (redisAsyncCommandArgv(session->ctx, NULL, NULL,
  1062. 3,
  1063. (const gchar **) &session->argv[cur_shift - 3],
  1064. &session->argv_lens[cur_shift - 3]) != REDIS_OK) {
  1065. return FALSE;
  1066. }
  1067. }
  1068. else if (cmd->cmd == FUZZY_DUP) {
  1069. /* Ignore */
  1070. }
  1071. else {
  1072. g_assert_not_reached();
  1073. }
  1074. if (io_cmd->is_shingle) {
  1075. if (cmd->cmd == FUZZY_WRITE) {
  1076. klen = strlen(session->backend->redis_object) +
  1077. 64 + 1;
  1078. for (i = 0; i < RSPAMD_SHINGLE_SIZE; i++) {
  1079. guchar *hval;
  1080. /*
  1081. * For each command with shingles we additionally emit 32 commands:
  1082. * SETEX <prefix>_<number>_<value> <expire> <digest>
  1083. */
  1084. /* SETEX */
  1085. key = g_string_sized_new(klen);
  1086. rspamd_printf_gstring(key, "%s_%d_%uL",
  1087. session->backend->redis_object,
  1088. i,
  1089. io_cmd->cmd.shingle.sgl.hashes[i]);
  1090. value = g_string_sized_new(sizeof("4294967296"));
  1091. rspamd_printf_gstring(value, "%d",
  1092. (gint) rspamd_fuzzy_backend_get_expire(bk));
  1093. hval = g_malloc(sizeof(io_cmd->cmd.shingle.basic.digest));
  1094. memcpy(hval, io_cmd->cmd.shingle.basic.digest,
  1095. sizeof(io_cmd->cmd.shingle.basic.digest));
  1096. session->argv[cur_shift] = g_strdup("SETEX");
  1097. session->argv_lens[cur_shift++] = sizeof("SETEX") - 1;
  1098. session->argv[cur_shift] = key->str;
  1099. session->argv_lens[cur_shift++] = key->len;
  1100. session->argv[cur_shift] = value->str;
  1101. session->argv_lens[cur_shift++] = value->len;
  1102. session->argv[cur_shift] = hval;
  1103. session->argv_lens[cur_shift++] = sizeof(io_cmd->cmd.shingle.basic.digest);
  1104. g_string_free(key, FALSE);
  1105. g_string_free(value, FALSE);
  1106. if (redisAsyncCommandArgv(session->ctx, NULL, NULL,
  1107. 4,
  1108. (const gchar **) &session->argv[cur_shift - 4],
  1109. &session->argv_lens[cur_shift - 4]) != REDIS_OK) {
  1110. return FALSE;
  1111. }
  1112. }
  1113. }
  1114. else if (cmd->cmd == FUZZY_DEL) {
  1115. klen = strlen(session->backend->redis_object) +
  1116. 64 + 1;
  1117. for (i = 0; i < RSPAMD_SHINGLE_SIZE; i++) {
  1118. key = g_string_sized_new(klen);
  1119. rspamd_printf_gstring(key, "%s_%d_%uL",
  1120. session->backend->redis_object,
  1121. i,
  1122. io_cmd->cmd.shingle.sgl.hashes[i]);
  1123. session->argv[cur_shift] = g_strdup("DEL");
  1124. session->argv_lens[cur_shift++] = sizeof("DEL") - 1;
  1125. session->argv[cur_shift] = key->str;
  1126. session->argv_lens[cur_shift++] = key->len;
  1127. g_string_free(key, FALSE);
  1128. if (redisAsyncCommandArgv(session->ctx, NULL, NULL,
  1129. 2,
  1130. (const gchar **) &session->argv[cur_shift - 2],
  1131. &session->argv_lens[cur_shift - 2]) != REDIS_OK) {
  1132. return FALSE;
  1133. }
  1134. }
  1135. }
  1136. else if (cmd->cmd == FUZZY_REFRESH) {
  1137. klen = strlen(session->backend->redis_object) +
  1138. 64 + 1;
  1139. for (i = 0; i < RSPAMD_SHINGLE_SIZE; i++) {
  1140. /*
  1141. * For each command with shingles we additionally emit 32 commands:
  1142. * EXPIRE <prefix>_<number>_<value> <expire>
  1143. */
  1144. /* Expire */
  1145. key = g_string_sized_new(klen);
  1146. rspamd_printf_gstring(key, "%s_%d_%uL",
  1147. session->backend->redis_object,
  1148. i,
  1149. io_cmd->cmd.shingle.sgl.hashes[i]);
  1150. value = g_string_sized_new(sizeof("18446744073709551616"));
  1151. rspamd_printf_gstring(value, "%d",
  1152. (gint) rspamd_fuzzy_backend_get_expire(bk));
  1153. session->argv[cur_shift] = g_strdup("EXPIRE");
  1154. session->argv_lens[cur_shift++] = sizeof("EXPIRE") - 1;
  1155. session->argv[cur_shift] = key->str;
  1156. session->argv_lens[cur_shift++] = key->len;
  1157. session->argv[cur_shift] = value->str;
  1158. session->argv_lens[cur_shift++] = value->len;
  1159. g_string_free(key, FALSE);
  1160. g_string_free(value, FALSE);
  1161. if (redisAsyncCommandArgv(session->ctx, NULL, NULL,
  1162. 3,
  1163. (const gchar **) &session->argv[cur_shift - 3],
  1164. &session->argv_lens[cur_shift - 3]) != REDIS_OK) {
  1165. return FALSE;
  1166. }
  1167. }
  1168. }
  1169. else if (cmd->cmd == FUZZY_DUP) {
  1170. /* Ignore */
  1171. }
  1172. else {
  1173. g_assert_not_reached();
  1174. }
  1175. }
  1176. *shift = cur_shift;
  1177. return TRUE;
  1178. }
  1179. static void
  1180. rspamd_fuzzy_redis_update_callback(redisAsyncContext *c, gpointer r,
  1181. gpointer priv)
  1182. {
  1183. struct rspamd_fuzzy_redis_session *session = priv;
  1184. redisReply *reply = r;
  1185. ev_timer_stop(session->event_loop, &session->timeout);
  1186. if (c->err == 0 && reply != NULL) {
  1187. rspamd_upstream_ok(session->up);
  1188. if (reply->type == REDIS_REPLY_ARRAY) {
  1189. /* TODO: check all replies somehow */
  1190. if (session->callback.cb_update) {
  1191. session->callback.cb_update(TRUE,
  1192. session->nadded,
  1193. session->ndeleted,
  1194. session->nextended,
  1195. session->nignored,
  1196. session->cbdata);
  1197. }
  1198. }
  1199. else {
  1200. if (reply->type == REDIS_REPLY_ERROR) {
  1201. msg_err_redis_session("fuzzy backend redis error: \"%s\"",
  1202. reply->str);
  1203. }
  1204. if (session->callback.cb_update) {
  1205. session->callback.cb_update(FALSE, 0, 0, 0, 0, session->cbdata);
  1206. }
  1207. }
  1208. }
  1209. else {
  1210. if (session->callback.cb_update) {
  1211. session->callback.cb_update(FALSE, 0, 0, 0, 0, session->cbdata);
  1212. }
  1213. if (c->errstr) {
  1214. msg_err_redis_session("error sending update to redis %s: %s",
  1215. rspamd_inet_address_to_string_pretty(rspamd_upstream_addr_cur(session->up)),
  1216. c->errstr);
  1217. rspamd_upstream_fail(session->up, FALSE, c->errstr);
  1218. }
  1219. }
  1220. rspamd_fuzzy_redis_session_dtor(session, FALSE);
  1221. }
  1222. void rspamd_fuzzy_backend_update_redis(struct rspamd_fuzzy_backend *bk,
  1223. GArray *updates, const gchar *src,
  1224. rspamd_fuzzy_update_cb cb, void *ud,
  1225. void *subr_ud)
  1226. {
  1227. struct rspamd_fuzzy_backend_redis *backend = subr_ud;
  1228. struct rspamd_fuzzy_redis_session *session;
  1229. struct upstream *up;
  1230. struct upstream_list *ups;
  1231. rspamd_inet_addr_t *addr;
  1232. guint i;
  1233. GString *key;
  1234. struct fuzzy_peer_cmd *io_cmd;
  1235. struct rspamd_fuzzy_cmd *cmd = NULL;
  1236. guint nargs, cur_shift;
  1237. g_assert(backend != NULL);
  1238. ups = rspamd_redis_get_servers(backend, "write_servers");
  1239. if (!ups) {
  1240. if (cb) {
  1241. cb(FALSE, 0, 0, 0, 0, ud);
  1242. }
  1243. return;
  1244. }
  1245. session = g_malloc0(sizeof(*session));
  1246. session->backend = backend;
  1247. REF_RETAIN(session->backend);
  1248. /*
  1249. * For each normal hash addition we do 3 redis commands:
  1250. * HSET <key> F <flag> **OR** HSETNX <key> F <flag> when flag is weak
  1251. * HINCRBY <key> V <weight>
  1252. * EXPIRE <key> <expire>
  1253. * INCR <prefix||fuzzy_count>
  1254. *
  1255. * Where <key> is <prefix> || <digest>
  1256. *
  1257. * For each command with shingles we additionally emit 32 commands:
  1258. * SETEX <prefix>_<number>_<value> <expire> <digest>
  1259. *
  1260. * For each delete command we emit:
  1261. * DEL <key>
  1262. *
  1263. * For each delete command with shingles we emit also 32 commands:
  1264. * DEL <prefix>_<number>_<value>
  1265. * DECR <prefix||fuzzy_count>
  1266. */
  1267. nargs = 4;
  1268. for (i = 0; i < updates->len; i++) {
  1269. io_cmd = &g_array_index(updates, struct fuzzy_peer_cmd, i);
  1270. if (io_cmd->is_shingle) {
  1271. cmd = &io_cmd->cmd.shingle.basic;
  1272. }
  1273. else {
  1274. cmd = &io_cmd->cmd.normal;
  1275. }
  1276. if (cmd->cmd == FUZZY_WRITE) {
  1277. nargs += 17;
  1278. session->nadded++;
  1279. if (io_cmd->is_shingle) {
  1280. nargs += RSPAMD_SHINGLE_SIZE * 4;
  1281. }
  1282. }
  1283. else if (cmd->cmd == FUZZY_DEL) {
  1284. nargs += 4;
  1285. session->ndeleted++;
  1286. if (io_cmd->is_shingle) {
  1287. nargs += RSPAMD_SHINGLE_SIZE * 2;
  1288. }
  1289. }
  1290. else if (cmd->cmd == FUZZY_REFRESH) {
  1291. nargs += 3;
  1292. session->nextended++;
  1293. if (io_cmd->is_shingle) {
  1294. nargs += RSPAMD_SHINGLE_SIZE * 3;
  1295. }
  1296. }
  1297. else {
  1298. session->nignored++;
  1299. }
  1300. }
  1301. /* Now we need to create a new request */
  1302. session->callback.cb_update = cb;
  1303. session->cbdata = ud;
  1304. session->command = RSPAMD_FUZZY_REDIS_COMMAND_UPDATES;
  1305. session->cmd = cmd;
  1306. session->prob = 1.0f;
  1307. session->event_loop = rspamd_fuzzy_backend_event_base(bk);
  1308. /* First of all check digest */
  1309. session->nargs = nargs;
  1310. session->argv = g_malloc0(sizeof(gchar *) * session->nargs);
  1311. session->argv_lens = g_malloc0(sizeof(gsize) * session->nargs);
  1312. up = rspamd_upstream_get(ups,
  1313. RSPAMD_UPSTREAM_MASTER_SLAVE,
  1314. NULL,
  1315. 0);
  1316. session->up = rspamd_upstream_ref(up);
  1317. addr = rspamd_upstream_addr_next(up);
  1318. g_assert(addr != NULL);
  1319. session->ctx = rspamd_redis_pool_connect(backend->pool,
  1320. backend->dbname,
  1321. backend->username, backend->password,
  1322. rspamd_inet_address_to_string(addr),
  1323. rspamd_inet_address_get_port(addr));
  1324. if (session->ctx == NULL) {
  1325. rspamd_upstream_fail(up, TRUE, strerror(errno));
  1326. rspamd_fuzzy_redis_session_dtor(session, TRUE);
  1327. if (cb) {
  1328. cb(FALSE, 0, 0, 0, 0, ud);
  1329. }
  1330. }
  1331. else {
  1332. /* Start with MULTI command */
  1333. session->argv[0] = g_strdup("MULTI");
  1334. session->argv_lens[0] = 5;
  1335. if (redisAsyncCommandArgv(session->ctx, NULL, NULL,
  1336. 1,
  1337. (const gchar **) session->argv,
  1338. session->argv_lens) != REDIS_OK) {
  1339. if (cb) {
  1340. cb(FALSE, 0, 0, 0, 0, ud);
  1341. }
  1342. rspamd_fuzzy_redis_session_dtor(session, TRUE);
  1343. return;
  1344. }
  1345. /* Now split the rest of commands in packs and emit them command by command */
  1346. cur_shift = 1;
  1347. for (i = 0; i < updates->len; i++) {
  1348. io_cmd = &g_array_index(updates, struct fuzzy_peer_cmd, i);
  1349. if (!rspamd_fuzzy_update_append_command(bk, session, io_cmd,
  1350. &cur_shift)) {
  1351. if (cb) {
  1352. cb(FALSE, 0, 0, 0, 0, ud);
  1353. }
  1354. rspamd_fuzzy_redis_session_dtor(session, TRUE);
  1355. return;
  1356. }
  1357. }
  1358. /* Now INCR command for the source */
  1359. key = g_string_new(backend->redis_object);
  1360. g_string_append(key, src);
  1361. session->argv[cur_shift] = g_strdup("INCR");
  1362. session->argv_lens[cur_shift++] = 4;
  1363. session->argv[cur_shift] = key->str;
  1364. session->argv_lens[cur_shift++] = key->len;
  1365. g_string_free(key, FALSE);
  1366. if (redisAsyncCommandArgv(session->ctx, NULL, NULL,
  1367. 2,
  1368. (const gchar **) &session->argv[cur_shift - 2],
  1369. &session->argv_lens[cur_shift - 2]) != REDIS_OK) {
  1370. if (cb) {
  1371. cb(FALSE, 0, 0, 0, 0, ud);
  1372. }
  1373. rspamd_fuzzy_redis_session_dtor(session, TRUE);
  1374. return;
  1375. }
  1376. /* Finally we call EXEC with a specific callback */
  1377. session->argv[cur_shift] = g_strdup("EXEC");
  1378. session->argv_lens[cur_shift] = 4;
  1379. if (redisAsyncCommandArgv(session->ctx,
  1380. rspamd_fuzzy_redis_update_callback, session,
  1381. 1,
  1382. (const gchar **) &session->argv[cur_shift],
  1383. &session->argv_lens[cur_shift]) != REDIS_OK) {
  1384. if (cb) {
  1385. cb(FALSE, 0, 0, 0, 0, ud);
  1386. }
  1387. rspamd_fuzzy_redis_session_dtor(session, TRUE);
  1388. return;
  1389. }
  1390. else {
  1391. /* Add timeout */
  1392. session->timeout.data = session;
  1393. ev_now_update_if_cheap((struct ev_loop *) session->event_loop);
  1394. ev_timer_init(&session->timeout,
  1395. rspamd_fuzzy_redis_timeout,
  1396. session->backend->timeout, 0.0);
  1397. ev_timer_start(session->event_loop, &session->timeout);
  1398. }
  1399. }
  1400. }
  1401. void rspamd_fuzzy_backend_close_redis(struct rspamd_fuzzy_backend *bk,
  1402. void *subr_ud)
  1403. {
  1404. struct rspamd_fuzzy_backend_redis *backend = subr_ud;
  1405. g_assert(backend != NULL);
  1406. /*
  1407. * XXX: we leak lua registry element there to avoid crashing
  1408. * due to chicken-egg problem between lua state termination and
  1409. * redis pool termination.
  1410. * Here, we assume that redis pool is destroyed AFTER lua_state,
  1411. * so all connections pending will release references but due to
  1412. * `terminated` hack they will not try to access Lua stuff
  1413. * This is enabled merely if we have connections pending (e.g. refcount > 1)
  1414. */
  1415. if (backend->ref.refcount > 1) {
  1416. backend->terminated = true;
  1417. }
  1418. REF_RELEASE(backend);
  1419. }