You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

fuzzy_backend_redis.c 42KB


  1. /*-
  2. * Copyright 2016 Vsevolod Stakhov
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "config.h"
  17. #include "ref.h"
  18. #include "fuzzy_backend.h"
  19. #include "fuzzy_backend_redis.h"
  20. #include "redis_pool.h"
  21. #include "cryptobox.h"
  22. #include "str_util.h"
  23. #include "upstream.h"
  24. #include "contrib/hiredis/hiredis.h"
  25. #include "contrib/hiredis/async.h"
  26. #include "lua/lua_common.h"
/*
 * Built-in defaults. REDIS_DEFAULT_OBJECT is the key prefix used when the
 * configuration has no string "prefix" option; REDIS_DEFAULT_TIMEOUT is the
 * per-request timeout (handed to ev_timer_init) used when the redis
 * configuration table has no numeric "timeout".
 * REDIS_DEFAULT_PORT is not referenced in the visible part of this file.
 */
#define REDIS_DEFAULT_PORT 6379
#define REDIS_DEFAULT_OBJECT "fuzzy"
#define REDIS_DEFAULT_TIMEOUT 2.0

/*
 * Logging helpers. Every macro below expands to an expression that reads
 * `session->backend->id`, so they may only be used where a variable named
 * `session` (struct rspamd_fuzzy_redis_session *) is in scope.
 */
#define msg_err_redis_session(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
	"fuzzy_redis", session->backend->id, \
	G_STRFUNC, \
	__VA_ARGS__)
#define msg_warn_redis_session(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \
	"fuzzy_redis", session->backend->id, \
	G_STRFUNC, \
	__VA_ARGS__)
#define msg_info_redis_session(...) rspamd_default_log_function (G_LOG_LEVEL_INFO, \
	"fuzzy_redis", session->backend->id, \
	G_STRFUNC, \
	__VA_ARGS__)
/* Debug variant: routed through the module log id declared by INIT_LOG_MODULE below */
#define msg_debug_redis_session(...) rspamd_conditional_debug_fast (NULL, NULL, \
	rspamd_fuzzy_redis_log_id, "fuzzy_redis", session->backend->id, \
	G_STRFUNC, \
	__VA_ARGS__)

INIT_LOG_MODULE(fuzzy_redis)
/*
 * Per-backend state created by rspamd_fuzzy_backend_init_redis() and shared
 * (via reference counting) by every in-flight session.
 */
struct rspamd_fuzzy_backend_redis {
	lua_State *L;               /* Lua state taken from cfg->lua_state */
	const gchar *redis_object;  /* key prefix: "prefix" option or REDIS_DEFAULT_OBJECT */
	const gchar *password;      /* "password" from the redis config table (cfg pool copy), may be NULL */
	const gchar *dbname;        /* "db" from the redis config table (cfg pool copy), may be NULL */
	gchar *id;                  /* base32 of a hash over prefix/db/password; freed in the dtor */
	struct rspamd_redis_pool *pool; /* connection pool, borrowed from cfg->redis_pool */
	gdouble timeout;            /* request timeout passed to ev_timer_init */
	gint conf_ref;              /* Lua registry reference to the redis config table */
	ref_entry_t ref;            /* refcount; destructor is rspamd_fuzzy_backend_redis_dtor */
};
/*
 * Kind of operation a session performs; stored in
 * rspamd_fuzzy_redis_session.command when the session is created.
 */
enum rspamd_fuzzy_redis_command {
	RSPAMD_FUZZY_REDIS_COMMAND_COUNT,   /* set by rspamd_fuzzy_backend_count_redis */
	RSPAMD_FUZZY_REDIS_COMMAND_VERSION, /* set by rspamd_fuzzy_backend_version_redis */
	RSPAMD_FUZZY_REDIS_COMMAND_UPDATES, /* presumably set by the update path (not in view) */
	RSPAMD_FUZZY_REDIS_COMMAND_CHECK    /* set by rspamd_fuzzy_backend_check_redis */
};
/*
 * State of one asynchronous request to redis. Allocated with g_malloc0 by the
 * public entry points and released by rspamd_fuzzy_redis_session_dtor().
 */
struct rspamd_fuzzy_redis_session {
	struct rspamd_fuzzy_backend_redis *backend; /* retained for the session lifetime */
	redisAsyncContext *ctx;        /* pooled connection; NULL once released */
	ev_timer timeout;              /* fires rspamd_fuzzy_redis_timeout */
	const struct rspamd_fuzzy_cmd *cmd; /* command being checked (borrowed from the caller) */
	struct ev_loop *event_loop;    /* loop the timeout timer runs on */
	float prob;                    /* match probability: 1.0 for exact, lower for shingles */
	gboolean shingles_checked;     /* TRUE once the shingles lookup has been issued */
	enum rspamd_fuzzy_redis_command command;
	guint nargs;                   /* number of entries in argv / argv_lens */
	/* Update counters; presumably maintained by the update path (not in view) */
	guint nadded;
	guint ndeleted;
	guint nextended;
	guint nignored;
	/* User callback; which member is valid depends on how the session was created */
	union {
		rspamd_fuzzy_check_cb cb_check;
		rspamd_fuzzy_update_cb cb_update;
		rspamd_fuzzy_version_cb cb_version;
		rspamd_fuzzy_count_cb cb_count;
	} callback;
	void *cbdata;                  /* opaque pointer passed back to the callback */
	gchar **argv;                  /* heap-allocated redis command arguments */
	gsize *argv_lens;              /* byte length of each argv entry */
	struct upstream *up;           /* upstream selected for this request */
	guchar found_digest[rspamd_cryptobox_HASHBYTES]; /* digest reported back on a hit */
};
  90. static inline struct upstream_list *
  91. rspamd_redis_get_servers (struct rspamd_fuzzy_backend_redis *ctx,
  92. const gchar *what)
  93. {
  94. lua_State *L = ctx->L;
  95. struct upstream_list *res;
  96. lua_rawgeti (L, LUA_REGISTRYINDEX, ctx->conf_ref);
  97. lua_pushstring (L, what);
  98. lua_gettable (L, -2);
  99. res = *((struct upstream_list**)lua_touserdata (L, -1));
  100. lua_settop (L, 0);
  101. return res;
  102. }
  103. static inline void
  104. rspamd_fuzzy_redis_session_free_args (struct rspamd_fuzzy_redis_session *session)
  105. {
  106. guint i;
  107. if (session->argv) {
  108. for (i = 0; i < session->nargs; i ++) {
  109. g_free (session->argv[i]);
  110. }
  111. g_free (session->argv);
  112. g_free (session->argv_lens);
  113. }
  114. }
  115. static void
  116. rspamd_fuzzy_redis_session_dtor (struct rspamd_fuzzy_redis_session *session,
  117. gboolean is_fatal)
  118. {
  119. redisAsyncContext *ac;
  120. if (session->ctx) {
  121. ac = session->ctx;
  122. session->ctx = NULL;
  123. rspamd_redis_pool_release_connection (session->backend->pool,
  124. ac,
  125. is_fatal ? RSPAMD_REDIS_RELEASE_FATAL : RSPAMD_REDIS_RELEASE_DEFAULT);
  126. }
  127. ev_timer_stop (session->event_loop, &session->timeout);
  128. rspamd_fuzzy_redis_session_free_args (session);
  129. REF_RELEASE (session->backend);
  130. g_free (session);
  131. }
  132. static void
  133. rspamd_fuzzy_backend_redis_dtor (struct rspamd_fuzzy_backend_redis *backend)
  134. {
  135. lua_State *L = backend->L;
  136. if (backend->conf_ref) {
  137. luaL_unref (L, LUA_REGISTRYINDEX, backend->conf_ref);
  138. }
  139. if (backend->id) {
  140. g_free (backend->id);
  141. }
  142. g_free (backend);
  143. }
  144. void*
  145. rspamd_fuzzy_backend_init_redis (struct rspamd_fuzzy_backend *bk,
  146. const ucl_object_t *obj, struct rspamd_config *cfg, GError **err)
  147. {
  148. struct rspamd_fuzzy_backend_redis *backend;
  149. const ucl_object_t *elt;
  150. gboolean ret = FALSE;
  151. guchar id_hash[rspamd_cryptobox_HASHBYTES];
  152. rspamd_cryptobox_hash_state_t st;
  153. lua_State *L = (lua_State *)cfg->lua_state;
  154. gint conf_ref = -1;
  155. backend = g_malloc0 (sizeof (*backend));
  156. backend->timeout = REDIS_DEFAULT_TIMEOUT;
  157. backend->redis_object = REDIS_DEFAULT_OBJECT;
  158. backend->L = L;
  159. ret = rspamd_lua_try_load_redis (L, obj, cfg, &conf_ref);
  160. /* Now try global redis settings */
  161. if (!ret) {
  162. elt = ucl_object_lookup (cfg->rcl_obj, "redis");
  163. if (elt) {
  164. const ucl_object_t *specific_obj;
  165. specific_obj = ucl_object_lookup_any (elt, "fuzzy", "fuzzy_storage",
  166. NULL);
  167. if (specific_obj) {
  168. ret = rspamd_lua_try_load_redis (L, specific_obj, cfg, &conf_ref);
  169. }
  170. else {
  171. ret = rspamd_lua_try_load_redis (L, elt, cfg, &conf_ref);
  172. }
  173. }
  174. }
  175. if (!ret) {
  176. msg_err_config ("cannot init redis backend for fuzzy storage");
  177. g_free (backend);
  178. return NULL;
  179. }
  180. elt = ucl_object_lookup (obj, "prefix");
  181. if (elt == NULL || ucl_object_type (elt) != UCL_STRING) {
  182. backend->redis_object = REDIS_DEFAULT_OBJECT;
  183. }
  184. else {
  185. backend->redis_object = ucl_object_tostring (elt);
  186. }
  187. backend->conf_ref = conf_ref;
  188. /* Check some common table values */
  189. lua_rawgeti (L, LUA_REGISTRYINDEX, conf_ref);
  190. lua_pushstring (L, "timeout");
  191. lua_gettable (L, -2);
  192. if (lua_type (L, -1) == LUA_TNUMBER) {
  193. backend->timeout = lua_tonumber (L, -1);
  194. }
  195. lua_pop (L, 1);
  196. lua_pushstring (L, "db");
  197. lua_gettable (L, -2);
  198. if (lua_type (L, -1) == LUA_TSTRING) {
  199. backend->dbname = rspamd_mempool_strdup (cfg->cfg_pool,
  200. lua_tostring (L, -1));
  201. }
  202. lua_pop (L, 1);
  203. lua_pushstring (L, "password");
  204. lua_gettable (L, -2);
  205. if (lua_type (L, -1) == LUA_TSTRING) {
  206. backend->password = rspamd_mempool_strdup (cfg->cfg_pool,
  207. lua_tostring (L, -1));
  208. }
  209. lua_pop (L, 1);
  210. lua_settop (L, 0);
  211. REF_INIT_RETAIN (backend, rspamd_fuzzy_backend_redis_dtor);
  212. backend->pool = cfg->redis_pool;
  213. rspamd_cryptobox_hash_init (&st, NULL, 0);
  214. rspamd_cryptobox_hash_update (&st, backend->redis_object,
  215. strlen (backend->redis_object));
  216. if (backend->dbname) {
  217. rspamd_cryptobox_hash_update (&st, backend->dbname,
  218. strlen (backend->dbname));
  219. }
  220. if (backend->password) {
  221. rspamd_cryptobox_hash_update (&st, backend->password,
  222. strlen (backend->password));
  223. }
  224. rspamd_cryptobox_hash_final (&st, id_hash);
  225. backend->id = rspamd_encode_base32 (id_hash, sizeof (id_hash));
  226. return backend;
  227. }
  228. static void
  229. rspamd_fuzzy_redis_timeout (EV_P_ ev_timer *w, int revents)
  230. {
  231. struct rspamd_fuzzy_redis_session *session =
  232. (struct rspamd_fuzzy_redis_session *)w->data;
  233. redisAsyncContext *ac;
  234. static char errstr[128];
  235. if (session->ctx) {
  236. ac = session->ctx;
  237. session->ctx = NULL;
  238. ac->err = REDIS_ERR_IO;
  239. /* Should be safe as in hiredis it is char[128] */
  240. rspamd_snprintf (errstr, sizeof (errstr), "%s", strerror (ETIMEDOUT));
  241. ac->errstr = errstr;
  242. /* This will cause session closing */
  243. rspamd_redis_pool_release_connection (session->backend->pool,
  244. ac, RSPAMD_REDIS_RELEASE_FATAL);
  245. }
  246. }
/* Defined below; declared here because the shingles callback re-issues a check */
static void rspamd_fuzzy_redis_check_callback (redisAsyncContext *c, gpointer r,
		gpointer priv);

/* One MGET result slot used while looking for the most frequent digest */
struct _rspamd_fuzzy_shingles_helper {
	guchar digest[64]; /* digest stored under the shingle key (zeroed if absent) */
	guint found;       /* 1 if redis returned a string for this shingle, else 0 */
};
  253. static gint
  254. rspamd_fuzzy_backend_redis_shingles_cmp (const void *a, const void *b)
  255. {
  256. const struct _rspamd_fuzzy_shingles_helper *sha = a,
  257. *shb = b;
  258. return memcmp (sha->digest, shb->digest, sizeof (sha->digest));
  259. }
  260. static void
  261. rspamd_fuzzy_redis_shingles_callback (redisAsyncContext *c, gpointer r,
  262. gpointer priv)
  263. {
  264. struct rspamd_fuzzy_redis_session *session = priv;
  265. redisReply *reply = r, *cur;
  266. struct rspamd_fuzzy_reply rep;
  267. GString *key;
  268. struct _rspamd_fuzzy_shingles_helper *shingles, *prev = NULL, *sel = NULL;
  269. guint i, found = 0, max_found = 0, cur_found = 0;
  270. ev_timer_stop (session->event_loop, &session->timeout);
  271. memset (&rep, 0, sizeof (rep));
  272. if (c->err == 0) {
  273. rspamd_upstream_ok (session->up);
  274. if (reply->type == REDIS_REPLY_ARRAY &&
  275. reply->elements == RSPAMD_SHINGLE_SIZE) {
  276. shingles = g_alloca (sizeof (struct _rspamd_fuzzy_shingles_helper) *
  277. RSPAMD_SHINGLE_SIZE);
  278. for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) {
  279. cur = reply->element[i];
  280. if (cur->type == REDIS_REPLY_STRING) {
  281. shingles[i].found = 1;
  282. memcpy (shingles[i].digest, cur->str, MIN (64, cur->len));
  283. found ++;
  284. }
  285. else {
  286. memset (shingles[i].digest, 0, sizeof (shingles[i].digest));
  287. shingles[i].found = 0;
  288. }
  289. }
  290. if (found > RSPAMD_SHINGLE_SIZE / 2) {
  291. /* Now sort to find the most frequent element */
  292. qsort (shingles, RSPAMD_SHINGLE_SIZE,
  293. sizeof (struct _rspamd_fuzzy_shingles_helper),
  294. rspamd_fuzzy_backend_redis_shingles_cmp);
  295. prev = &shingles[0];
  296. for (i = 1; i < RSPAMD_SHINGLE_SIZE; i ++) {
  297. if (!shingles[i].found) {
  298. continue;
  299. }
  300. if (memcmp (shingles[i].digest, prev->digest, 64) == 0) {
  301. cur_found ++;
  302. if (cur_found > max_found) {
  303. max_found = cur_found;
  304. sel = &shingles[i];
  305. }
  306. }
  307. else {
  308. cur_found = 1;
  309. prev = &shingles[i];
  310. }
  311. }
  312. if (max_found > RSPAMD_SHINGLE_SIZE / 2) {
  313. session->prob = ((float)max_found) / RSPAMD_SHINGLE_SIZE;
  314. rep.v1.prob = session->prob;
  315. g_assert (sel != NULL);
  316. /* Prepare new check command */
  317. rspamd_fuzzy_redis_session_free_args (session);
  318. session->nargs = 5;
  319. session->argv = g_malloc (sizeof (gchar *) * session->nargs);
  320. session->argv_lens = g_malloc (sizeof (gsize) * session->nargs);
  321. key = g_string_new (session->backend->redis_object);
  322. g_string_append_len (key, sel->digest, sizeof (sel->digest));
  323. session->argv[0] = g_strdup ("HMGET");
  324. session->argv_lens[0] = 5;
  325. session->argv[1] = key->str;
  326. session->argv_lens[1] = key->len;
  327. session->argv[2] = g_strdup ("V");
  328. session->argv_lens[2] = 1;
  329. session->argv[3] = g_strdup ("F");
  330. session->argv_lens[3] = 1;
  331. session->argv[4] = g_strdup ("C");
  332. session->argv_lens[4] = 1;
  333. g_string_free (key, FALSE); /* Do not free underlying array */
  334. memcpy (session->found_digest, sel->digest,
  335. sizeof (session->cmd->digest));
  336. g_assert (session->ctx != NULL);
  337. if (redisAsyncCommandArgv (session->ctx,
  338. rspamd_fuzzy_redis_check_callback,
  339. session, session->nargs,
  340. (const gchar **)session->argv,
  341. session->argv_lens) != REDIS_OK) {
  342. if (session->callback.cb_check) {
  343. memset (&rep, 0, sizeof (rep));
  344. session->callback.cb_check (&rep, session->cbdata);
  345. }
  346. rspamd_fuzzy_redis_session_dtor (session, TRUE);
  347. }
  348. else {
  349. /* Add timeout */
  350. session->timeout.data = session;
  351. ev_timer_init (&session->timeout,
  352. rspamd_fuzzy_redis_timeout,
  353. session->backend->timeout, 0.0);
  354. ev_timer_start (session->event_loop, &session->timeout);
  355. }
  356. return;
  357. }
  358. }
  359. }
  360. if (session->callback.cb_check) {
  361. session->callback.cb_check (&rep, session->cbdata);
  362. }
  363. }
  364. else {
  365. if (session->callback.cb_check) {
  366. session->callback.cb_check (&rep, session->cbdata);
  367. }
  368. if (c->errstr) {
  369. msg_err_redis_session ("error getting shingles: %s", c->errstr);
  370. }
  371. rspamd_upstream_fail (session->up, FALSE, strerror (errno));
  372. }
  373. rspamd_fuzzy_redis_session_dtor (session, FALSE);
  374. }
  375. static void
  376. rspamd_fuzzy_backend_check_shingles (struct rspamd_fuzzy_redis_session *session)
  377. {
  378. struct rspamd_fuzzy_reply rep;
  379. const struct rspamd_fuzzy_shingle_cmd *shcmd;
  380. GString *key;
  381. guint i, init_len;
  382. rspamd_fuzzy_redis_session_free_args (session);
  383. /* First of all check digest */
  384. session->nargs = RSPAMD_SHINGLE_SIZE + 1;
  385. session->argv = g_malloc (sizeof (gchar *) * session->nargs);
  386. session->argv_lens = g_malloc (sizeof (gsize) * session->nargs);
  387. shcmd = (const struct rspamd_fuzzy_shingle_cmd *)session->cmd;
  388. session->argv[0] = g_strdup ("MGET");
  389. session->argv_lens[0] = 4;
  390. init_len = strlen (session->backend->redis_object);
  391. for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) {
  392. key = g_string_sized_new (init_len + 2 + 2 + sizeof ("18446744073709551616"));
  393. rspamd_printf_gstring (key, "%s_%d_%uL", session->backend->redis_object,
  394. i, shcmd->sgl.hashes[i]);
  395. session->argv[i + 1] = key->str;
  396. session->argv_lens[i + 1] = key->len;
  397. g_string_free (key, FALSE); /* Do not free underlying array */
  398. }
  399. session->shingles_checked = TRUE;
  400. g_assert (session->ctx != NULL);
  401. if (redisAsyncCommandArgv (session->ctx, rspamd_fuzzy_redis_shingles_callback,
  402. session, session->nargs,
  403. (const gchar **)session->argv, session->argv_lens) != REDIS_OK) {
  404. msg_err ("cannot execute redis command: %s", session->ctx->errstr);
  405. if (session->callback.cb_check) {
  406. memset (&rep, 0, sizeof (rep));
  407. session->callback.cb_check (&rep, session->cbdata);
  408. }
  409. rspamd_fuzzy_redis_session_dtor (session, TRUE);
  410. }
  411. else {
  412. /* Add timeout */
  413. session->timeout.data = session;
  414. ev_timer_init (&session->timeout,
  415. rspamd_fuzzy_redis_timeout,
  416. session->backend->timeout, 0.0);
  417. ev_timer_start (session->event_loop, &session->timeout);
  418. }
  419. }
  420. static void
  421. rspamd_fuzzy_redis_check_callback (redisAsyncContext *c, gpointer r,
  422. gpointer priv)
  423. {
  424. struct rspamd_fuzzy_redis_session *session = priv;
  425. redisReply *reply = r, *cur;
  426. struct rspamd_fuzzy_reply rep;
  427. gulong value;
  428. guint found_elts = 0;
  429. ev_timer_stop (session->event_loop, &session->timeout);
  430. memset (&rep, 0, sizeof (rep));
  431. if (c->err == 0) {
  432. rspamd_upstream_ok (session->up);
  433. if (reply->type == REDIS_REPLY_ARRAY && reply->elements >= 2) {
  434. cur = reply->element[0];
  435. if (cur->type == REDIS_REPLY_STRING) {
  436. value = strtoul (cur->str, NULL, 10);
  437. rep.v1.value = value;
  438. found_elts ++;
  439. }
  440. cur = reply->element[1];
  441. if (cur->type == REDIS_REPLY_STRING) {
  442. value = strtoul (cur->str, NULL, 10);
  443. rep.v1.flag = value;
  444. found_elts ++;
  445. }
  446. if (found_elts >= 2) {
  447. rep.v1.prob = session->prob;
  448. memcpy (rep.digest, session->found_digest, sizeof (rep.digest));
  449. }
  450. rep.ts = 0;
  451. if (reply->elements > 2) {
  452. cur = reply->element[2];
  453. if (cur->type == REDIS_REPLY_STRING) {
  454. rep.ts = strtoul (cur->str, NULL, 10);
  455. }
  456. }
  457. }
  458. if (found_elts != 2) {
  459. if (session->cmd->shingles_count > 0 && !session->shingles_checked) {
  460. /* We also need to check all shingles here */
  461. rspamd_fuzzy_backend_check_shingles (session);
  462. /* Do not free session */
  463. return;
  464. }
  465. else {
  466. if (session->callback.cb_check) {
  467. session->callback.cb_check (&rep, session->cbdata);
  468. }
  469. }
  470. }
  471. else {
  472. if (session->callback.cb_check) {
  473. session->callback.cb_check (&rep, session->cbdata);
  474. }
  475. }
  476. }
  477. else {
  478. if (session->callback.cb_check) {
  479. session->callback.cb_check (&rep, session->cbdata);
  480. }
  481. if (c->errstr) {
  482. msg_err_redis_session ("error getting hashes: %s", c->errstr);
  483. }
  484. rspamd_upstream_fail (session->up, FALSE, strerror (errno));
  485. }
  486. rspamd_fuzzy_redis_session_dtor (session, FALSE);
  487. }
  488. void
  489. rspamd_fuzzy_backend_check_redis (struct rspamd_fuzzy_backend *bk,
  490. const struct rspamd_fuzzy_cmd *cmd,
  491. rspamd_fuzzy_check_cb cb, void *ud,
  492. void *subr_ud)
  493. {
  494. struct rspamd_fuzzy_backend_redis *backend = subr_ud;
  495. struct rspamd_fuzzy_redis_session *session;
  496. struct upstream *up;
  497. struct upstream_list *ups;
  498. rspamd_inet_addr_t *addr;
  499. struct rspamd_fuzzy_reply rep;
  500. GString *key;
  501. g_assert (backend != NULL);
  502. session = g_malloc0 (sizeof (*session));
  503. session->backend = backend;
  504. REF_RETAIN (session->backend);
  505. session->callback.cb_check = cb;
  506. session->cbdata = ud;
  507. session->command = RSPAMD_FUZZY_REDIS_COMMAND_CHECK;
  508. session->cmd = cmd;
  509. session->prob = 1.0;
  510. memcpy (rep.digest, session->cmd->digest, sizeof (rep.digest));
  511. memcpy (session->found_digest, session->cmd->digest, sizeof (rep.digest));
  512. session->event_loop = rspamd_fuzzy_backend_event_base (bk);
  513. /* First of all check digest */
  514. session->nargs = 5;
  515. session->argv = g_malloc (sizeof (gchar *) * session->nargs);
  516. session->argv_lens = g_malloc (sizeof (gsize) * session->nargs);
  517. key = g_string_new (backend->redis_object);
  518. g_string_append_len (key, cmd->digest, sizeof (cmd->digest));
  519. session->argv[0] = g_strdup ("HMGET");
  520. session->argv_lens[0] = 5;
  521. session->argv[1] = key->str;
  522. session->argv_lens[1] = key->len;
  523. session->argv[2] = g_strdup ("V");
  524. session->argv_lens[2] = 1;
  525. session->argv[3] = g_strdup ("F");
  526. session->argv_lens[3] = 1;
  527. session->argv[4] = g_strdup ("C");
  528. session->argv_lens[4] = 1;
  529. g_string_free (key, FALSE); /* Do not free underlying array */
  530. ups = rspamd_redis_get_servers (backend, "read_servers");
  531. up = rspamd_upstream_get (ups,
  532. RSPAMD_UPSTREAM_ROUND_ROBIN,
  533. NULL,
  534. 0);
  535. session->up = up;
  536. addr = rspamd_upstream_addr_next (up);
  537. g_assert (addr != NULL);
  538. session->ctx = rspamd_redis_pool_connect (backend->pool,
  539. backend->dbname, backend->password,
  540. rspamd_inet_address_to_string (addr),
  541. rspamd_inet_address_get_port (addr));
  542. if (session->ctx == NULL) {
  543. rspamd_upstream_fail (up, TRUE, strerror (errno));
  544. rspamd_fuzzy_redis_session_dtor (session, TRUE);
  545. if (cb) {
  546. memset (&rep, 0, sizeof (rep));
  547. cb (&rep, ud);
  548. }
  549. }
  550. else {
  551. if (redisAsyncCommandArgv (session->ctx, rspamd_fuzzy_redis_check_callback,
  552. session, session->nargs,
  553. (const gchar **)session->argv, session->argv_lens) != REDIS_OK) {
  554. rspamd_fuzzy_redis_session_dtor (session, TRUE);
  555. if (cb) {
  556. memset (&rep, 0, sizeof (rep));
  557. cb (&rep, ud);
  558. }
  559. }
  560. else {
  561. /* Add timeout */
  562. session->timeout.data = session;
  563. ev_timer_init (&session->timeout,
  564. rspamd_fuzzy_redis_timeout,
  565. session->backend->timeout, 0.0);
  566. ev_timer_start (session->event_loop, &session->timeout);
  567. }
  568. }
  569. }
  570. static void
  571. rspamd_fuzzy_redis_count_callback (redisAsyncContext *c, gpointer r,
  572. gpointer priv)
  573. {
  574. struct rspamd_fuzzy_redis_session *session = priv;
  575. redisReply *reply = r;
  576. gulong nelts;
  577. ev_timer_stop (session->event_loop, &session->timeout);
  578. if (c->err == 0) {
  579. rspamd_upstream_ok (session->up);
  580. if (reply->type == REDIS_REPLY_INTEGER) {
  581. if (session->callback.cb_count) {
  582. session->callback.cb_count (reply->integer, session->cbdata);
  583. }
  584. }
  585. else if (reply->type == REDIS_REPLY_STRING) {
  586. nelts = strtoul (reply->str, NULL, 10);
  587. if (session->callback.cb_count) {
  588. session->callback.cb_count (nelts, session->cbdata);
  589. }
  590. }
  591. else {
  592. if (session->callback.cb_count) {
  593. session->callback.cb_count (0, session->cbdata);
  594. }
  595. }
  596. }
  597. else {
  598. if (session->callback.cb_count) {
  599. session->callback.cb_count (0, session->cbdata);
  600. }
  601. if (c->errstr) {
  602. msg_err_redis_session ("error getting count: %s", c->errstr);
  603. }
  604. rspamd_upstream_fail (session->up, FALSE, strerror (errno));
  605. }
  606. rspamd_fuzzy_redis_session_dtor (session, FALSE);
  607. }
  608. void
  609. rspamd_fuzzy_backend_count_redis (struct rspamd_fuzzy_backend *bk,
  610. rspamd_fuzzy_count_cb cb, void *ud,
  611. void *subr_ud)
  612. {
  613. struct rspamd_fuzzy_backend_redis *backend = subr_ud;
  614. struct rspamd_fuzzy_redis_session *session;
  615. struct upstream *up;
  616. struct upstream_list *ups;
  617. rspamd_inet_addr_t *addr;
  618. GString *key;
  619. g_assert (backend != NULL);
  620. session = g_malloc0 (sizeof (*session));
  621. session->backend = backend;
  622. REF_RETAIN (session->backend);
  623. session->callback.cb_count = cb;
  624. session->cbdata = ud;
  625. session->command = RSPAMD_FUZZY_REDIS_COMMAND_COUNT;
  626. session->event_loop = rspamd_fuzzy_backend_event_base (bk);
  627. session->nargs = 2;
  628. session->argv = g_malloc (sizeof (gchar *) * 2);
  629. session->argv_lens = g_malloc (sizeof (gsize) * 2);
  630. key = g_string_new (backend->redis_object);
  631. g_string_append (key, "_count");
  632. session->argv[0] = g_strdup ("GET");
  633. session->argv_lens[0] = 3;
  634. session->argv[1] = key->str;
  635. session->argv_lens[1] = key->len;
  636. g_string_free (key, FALSE); /* Do not free underlying array */
  637. ups = rspamd_redis_get_servers (backend, "read_servers");
  638. up = rspamd_upstream_get (ups,
  639. RSPAMD_UPSTREAM_ROUND_ROBIN,
  640. NULL,
  641. 0);
  642. session->up = up;
  643. addr = rspamd_upstream_addr_next (up);
  644. g_assert (addr != NULL);
  645. session->ctx = rspamd_redis_pool_connect (backend->pool,
  646. backend->dbname, backend->password,
  647. rspamd_inet_address_to_string (addr),
  648. rspamd_inet_address_get_port (addr));
  649. if (session->ctx == NULL) {
  650. rspamd_upstream_fail (up, TRUE, strerror (errno));
  651. rspamd_fuzzy_redis_session_dtor (session, TRUE);
  652. if (cb) {
  653. cb (0, ud);
  654. }
  655. }
  656. else {
  657. if (redisAsyncCommandArgv (session->ctx, rspamd_fuzzy_redis_count_callback,
  658. session, session->nargs,
  659. (const gchar **)session->argv, session->argv_lens) != REDIS_OK) {
  660. rspamd_fuzzy_redis_session_dtor (session, TRUE);
  661. if (cb) {
  662. cb (0, ud);
  663. }
  664. }
  665. else {
  666. /* Add timeout */
  667. session->timeout.data = session;
  668. ev_timer_init (&session->timeout,
  669. rspamd_fuzzy_redis_timeout,
  670. session->backend->timeout, 0.0);
  671. ev_timer_start (session->event_loop, &session->timeout);
  672. }
  673. }
  674. }
  675. static void
  676. rspamd_fuzzy_redis_version_callback (redisAsyncContext *c, gpointer r,
  677. gpointer priv)
  678. {
  679. struct rspamd_fuzzy_redis_session *session = priv;
  680. redisReply *reply = r;
  681. gulong nelts;
  682. ev_timer_stop (session->event_loop, &session->timeout);
  683. if (c->err == 0) {
  684. rspamd_upstream_ok (session->up);
  685. if (reply->type == REDIS_REPLY_INTEGER) {
  686. if (session->callback.cb_version) {
  687. session->callback.cb_version (reply->integer, session->cbdata);
  688. }
  689. }
  690. else if (reply->type == REDIS_REPLY_STRING) {
  691. nelts = strtoul (reply->str, NULL, 10);
  692. if (session->callback.cb_version) {
  693. session->callback.cb_version (nelts, session->cbdata);
  694. }
  695. }
  696. else {
  697. if (session->callback.cb_version) {
  698. session->callback.cb_version (0, session->cbdata);
  699. }
  700. }
  701. }
  702. else {
  703. if (session->callback.cb_version) {
  704. session->callback.cb_version (0, session->cbdata);
  705. }
  706. if (c->errstr) {
  707. msg_err_redis_session ("error getting version: %s", c->errstr);
  708. }
  709. rspamd_upstream_fail (session->up, FALSE, strerror (errno));
  710. }
  711. rspamd_fuzzy_redis_session_dtor (session, FALSE);
  712. }
  713. void
  714. rspamd_fuzzy_backend_version_redis (struct rspamd_fuzzy_backend *bk,
  715. const gchar *src,
  716. rspamd_fuzzy_version_cb cb, void *ud,
  717. void *subr_ud)
  718. {
  719. struct rspamd_fuzzy_backend_redis *backend = subr_ud;
  720. struct rspamd_fuzzy_redis_session *session;
  721. struct upstream *up;
  722. struct upstream_list *ups;
  723. rspamd_inet_addr_t *addr;
  724. GString *key;
  725. g_assert (backend != NULL);
  726. session = g_malloc0 (sizeof (*session));
  727. session->backend = backend;
  728. REF_RETAIN (session->backend);
  729. session->callback.cb_version = cb;
  730. session->cbdata = ud;
  731. session->command = RSPAMD_FUZZY_REDIS_COMMAND_VERSION;
  732. session->event_loop = rspamd_fuzzy_backend_event_base (bk);
  733. session->nargs = 2;
  734. session->argv = g_malloc (sizeof (gchar *) * 2);
  735. session->argv_lens = g_malloc (sizeof (gsize) * 2);
  736. key = g_string_new (backend->redis_object);
  737. g_string_append (key, src);
  738. session->argv[0] = g_strdup ("GET");
  739. session->argv_lens[0] = 3;
  740. session->argv[1] = key->str;
  741. session->argv_lens[1] = key->len;
  742. g_string_free (key, FALSE); /* Do not free underlying array */
  743. ups = rspamd_redis_get_servers (backend, "read_servers");
  744. up = rspamd_upstream_get (ups,
  745. RSPAMD_UPSTREAM_ROUND_ROBIN,
  746. NULL,
  747. 0);
  748. session->up = up;
  749. addr = rspamd_upstream_addr_next (up);
  750. g_assert (addr != NULL);
  751. session->ctx = rspamd_redis_pool_connect (backend->pool,
  752. backend->dbname, backend->password,
  753. rspamd_inet_address_to_string (addr),
  754. rspamd_inet_address_get_port (addr));
  755. if (session->ctx == NULL) {
  756. rspamd_upstream_fail (up, FALSE, strerror (errno));
  757. rspamd_fuzzy_redis_session_dtor (session, TRUE);
  758. if (cb) {
  759. cb (0, ud);
  760. }
  761. }
  762. else {
  763. if (redisAsyncCommandArgv (session->ctx, rspamd_fuzzy_redis_version_callback,
  764. session, session->nargs,
  765. (const gchar **)session->argv, session->argv_lens) != REDIS_OK) {
  766. rspamd_fuzzy_redis_session_dtor (session, TRUE);
  767. if (cb) {
  768. cb (0, ud);
  769. }
  770. }
  771. else {
  772. /* Add timeout */
  773. session->timeout.data = session;
  774. ev_timer_init (&session->timeout,
  775. rspamd_fuzzy_redis_timeout,
  776. session->backend->timeout, 0.0);
  777. ev_timer_start (session->event_loop, &session->timeout);
  778. }
  779. }
  780. }
  781. const gchar*
  782. rspamd_fuzzy_backend_id_redis (struct rspamd_fuzzy_backend *bk,
  783. void *subr_ud)
  784. {
  785. struct rspamd_fuzzy_backend_redis *backend = subr_ud;
  786. g_assert (backend != NULL);
  787. return backend->id;
  788. }
  789. void
  790. rspamd_fuzzy_backend_expire_redis (struct rspamd_fuzzy_backend *bk,
  791. void *subr_ud)
  792. {
  793. struct rspamd_fuzzy_backend_redis *backend = subr_ud;
  794. g_assert (backend != NULL);
  795. }
  796. static gboolean
  797. rspamd_fuzzy_update_append_command (struct rspamd_fuzzy_backend *bk,
  798. struct rspamd_fuzzy_redis_session *session,
  799. struct fuzzy_peer_cmd *io_cmd, guint *shift)
  800. {
  801. GString *key, *value;
  802. guint cur_shift = *shift;
  803. guint i, klen;
  804. struct rspamd_fuzzy_cmd *cmd;
  805. if (io_cmd->is_shingle) {
  806. cmd = &io_cmd->cmd.shingle.basic;
  807. }
  808. else {
  809. cmd = &io_cmd->cmd.normal;
  810. }
  811. if (cmd->cmd == FUZZY_WRITE) {
  812. /*
  813. * For each normal hash addition we do 5 redis commands:
  814. * HSET <key> F <flag>
  815. * HSETNX <key> C <time>
  816. * HINCRBY <key> V <weight>
  817. * EXPIRE <key> <expire>
  818. * Where <key> is <prefix> || <digest>
  819. */
  820. /* HSET */
  821. klen = strlen (session->backend->redis_object) +
  822. sizeof (cmd->digest) + 1;
  823. key = g_string_sized_new (klen);
  824. g_string_append (key, session->backend->redis_object);
  825. g_string_append_len (key, cmd->digest, sizeof (cmd->digest));
  826. value = g_string_sized_new (sizeof ("4294967296"));
  827. rspamd_printf_gstring (value, "%d", cmd->flag);
  828. session->argv[cur_shift] = g_strdup ("HSET");
  829. session->argv_lens[cur_shift++] = sizeof ("HSET") - 1;
  830. session->argv[cur_shift] = key->str;
  831. session->argv_lens[cur_shift++] = key->len;
  832. session->argv[cur_shift] = g_strdup ("F");
  833. session->argv_lens[cur_shift++] = sizeof ("F") - 1;
  834. session->argv[cur_shift] = value->str;
  835. session->argv_lens[cur_shift++] = value->len;
  836. g_string_free (key, FALSE);
  837. g_string_free (value, FALSE);
  838. if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
  839. 4,
  840. (const gchar **)&session->argv[cur_shift - 4],
  841. &session->argv_lens[cur_shift - 4]) != REDIS_OK) {
  842. return FALSE;
  843. }
  844. /* HSETNX */
  845. klen = strlen (session->backend->redis_object) +
  846. sizeof (cmd->digest) + 1;
  847. key = g_string_sized_new (klen);
  848. g_string_append (key, session->backend->redis_object);
  849. g_string_append_len (key, cmd->digest, sizeof (cmd->digest));
  850. value = g_string_sized_new (sizeof ("18446744073709551616"));
  851. rspamd_printf_gstring (value, "%L", (gint64)rspamd_get_calendar_ticks ());
  852. session->argv[cur_shift] = g_strdup ("HSETNX");
  853. session->argv_lens[cur_shift++] = sizeof ("HSETNX") - 1;
  854. session->argv[cur_shift] = key->str;
  855. session->argv_lens[cur_shift++] = key->len;
  856. session->argv[cur_shift] = g_strdup ("C");
  857. session->argv_lens[cur_shift++] = sizeof ("C") - 1;
  858. session->argv[cur_shift] = value->str;
  859. session->argv_lens[cur_shift++] = value->len;
  860. g_string_free (key, FALSE);
  861. g_string_free (value, FALSE);
  862. if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
  863. 4,
  864. (const gchar **)&session->argv[cur_shift - 4],
  865. &session->argv_lens[cur_shift - 4]) != REDIS_OK) {
  866. return FALSE;
  867. }
  868. /* HINCRBY */
  869. key = g_string_sized_new (klen);
  870. g_string_append (key, session->backend->redis_object);
  871. g_string_append_len (key, cmd->digest, sizeof (cmd->digest));
  872. value = g_string_sized_new (sizeof ("4294967296"));
  873. rspamd_printf_gstring (value, "%d", cmd->value);
  874. session->argv[cur_shift] = g_strdup ("HINCRBY");
  875. session->argv_lens[cur_shift++] = sizeof ("HINCRBY") - 1;
  876. session->argv[cur_shift] = key->str;
  877. session->argv_lens[cur_shift++] = key->len;
  878. session->argv[cur_shift] = g_strdup ("V");
  879. session->argv_lens[cur_shift++] = sizeof ("V") - 1;
  880. session->argv[cur_shift] = value->str;
  881. session->argv_lens[cur_shift++] = value->len;
  882. g_string_free (key, FALSE);
  883. g_string_free (value, FALSE);
  884. if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
  885. 4,
  886. (const gchar **)&session->argv[cur_shift - 4],
  887. &session->argv_lens[cur_shift - 4]) != REDIS_OK) {
  888. return FALSE;
  889. }
  890. /* EXPIRE */
  891. key = g_string_sized_new (klen);
  892. g_string_append (key, session->backend->redis_object);
  893. g_string_append_len (key, cmd->digest, sizeof (cmd->digest));
  894. value = g_string_sized_new (sizeof ("4294967296"));
  895. rspamd_printf_gstring (value, "%d",
  896. (gint)rspamd_fuzzy_backend_get_expire (bk));
  897. session->argv[cur_shift] = g_strdup ("EXPIRE");
  898. session->argv_lens[cur_shift++] = sizeof ("EXPIRE") - 1;
  899. session->argv[cur_shift] = key->str;
  900. session->argv_lens[cur_shift++] = key->len;
  901. session->argv[cur_shift] = value->str;
  902. session->argv_lens[cur_shift++] = value->len;
  903. g_string_free (key, FALSE);
  904. g_string_free (value, FALSE);
  905. if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
  906. 3,
  907. (const gchar **)&session->argv[cur_shift - 3],
  908. &session->argv_lens[cur_shift - 3]) != REDIS_OK) {
  909. return FALSE;
  910. }
  911. /* INCR */
  912. key = g_string_sized_new (klen);
  913. g_string_append (key, session->backend->redis_object);
  914. g_string_append (key, "_count");
  915. session->argv[cur_shift] = g_strdup ("INCR");
  916. session->argv_lens[cur_shift++] = sizeof ("INCR") - 1;
  917. session->argv[cur_shift] = key->str;
  918. session->argv_lens[cur_shift++] = key->len;
  919. g_string_free (key, FALSE);
  920. if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
  921. 2,
  922. (const gchar **)&session->argv[cur_shift - 2],
  923. &session->argv_lens[cur_shift - 2]) != REDIS_OK) {
  924. return FALSE;
  925. }
  926. }
  927. else if (cmd->cmd == FUZZY_DEL) {
  928. /* DEL */
  929. klen = strlen (session->backend->redis_object) +
  930. sizeof (cmd->digest) + 1;
  931. key = g_string_sized_new (klen);
  932. g_string_append (key, session->backend->redis_object);
  933. g_string_append_len (key, cmd->digest, sizeof (cmd->digest));
  934. session->argv[cur_shift] = g_strdup ("DEL");
  935. session->argv_lens[cur_shift++] = sizeof ("DEL") - 1;
  936. session->argv[cur_shift] = key->str;
  937. session->argv_lens[cur_shift++] = key->len;
  938. g_string_free (key, FALSE);
  939. if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
  940. 2,
  941. (const gchar **)&session->argv[cur_shift - 2],
  942. &session->argv_lens[cur_shift - 2]) != REDIS_OK) {
  943. return FALSE;
  944. }
  945. /* DECR */
  946. key = g_string_sized_new (klen);
  947. g_string_append (key, session->backend->redis_object);
  948. g_string_append (key, "_count");
  949. session->argv[cur_shift] = g_strdup ("DECR");
  950. session->argv_lens[cur_shift++] = sizeof ("DECR") - 1;
  951. session->argv[cur_shift] = key->str;
  952. session->argv_lens[cur_shift++] = key->len;
  953. g_string_free (key, FALSE);
  954. if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
  955. 2,
  956. (const gchar **)&session->argv[cur_shift - 2],
  957. &session->argv_lens[cur_shift - 2]) != REDIS_OK) {
  958. return FALSE;
  959. }
  960. }
  961. else if (cmd->cmd == FUZZY_REFRESH) {
  962. /*
  963. * Issue refresh command by just EXPIRE command
  964. * EXPIRE <key> <expire>
  965. * Where <key> is <prefix> || <digest>
  966. */
  967. klen = strlen (session->backend->redis_object) +
  968. sizeof (cmd->digest) + 1;
  969. /* EXPIRE */
  970. key = g_string_sized_new (klen);
  971. g_string_append (key, session->backend->redis_object);
  972. g_string_append_len (key, cmd->digest, sizeof (cmd->digest));
  973. value = g_string_sized_new (sizeof ("4294967296"));
  974. rspamd_printf_gstring (value, "%d",
  975. (gint)rspamd_fuzzy_backend_get_expire (bk));
  976. session->argv[cur_shift] = g_strdup ("EXPIRE");
  977. session->argv_lens[cur_shift++] = sizeof ("EXPIRE") - 1;
  978. session->argv[cur_shift] = key->str;
  979. session->argv_lens[cur_shift++] = key->len;
  980. session->argv[cur_shift] = value->str;
  981. session->argv_lens[cur_shift++] = value->len;
  982. g_string_free (key, FALSE);
  983. g_string_free (value, FALSE);
  984. if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
  985. 3,
  986. (const gchar **)&session->argv[cur_shift - 3],
  987. &session->argv_lens[cur_shift - 3]) != REDIS_OK) {
  988. return FALSE;
  989. }
  990. }
  991. else if (cmd->cmd == FUZZY_DUP) {
  992. /* Ignore */
  993. }
  994. else {
  995. g_assert_not_reached ();
  996. }
  997. if (io_cmd->is_shingle) {
  998. if (cmd->cmd == FUZZY_WRITE) {
  999. klen = strlen (session->backend->redis_object) +
  1000. 64 + 1;
  1001. for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) {
  1002. guchar *hval;
  1003. /*
  1004. * For each command with shingles we additionally emit 32 commands:
  1005. * SETEX <prefix>_<number>_<value> <expire> <digest>
  1006. */
  1007. /* SETEX */
  1008. key = g_string_sized_new (klen);
  1009. rspamd_printf_gstring (key, "%s_%d_%uL",
  1010. session->backend->redis_object,
  1011. i,
  1012. io_cmd->cmd.shingle.sgl.hashes[i]);
  1013. value = g_string_sized_new (sizeof ("4294967296"));
  1014. rspamd_printf_gstring (value, "%d",
  1015. (gint)rspamd_fuzzy_backend_get_expire (bk));
  1016. hval = g_malloc (sizeof (io_cmd->cmd.shingle.basic.digest));
  1017. memcpy (hval, io_cmd->cmd.shingle.basic.digest,
  1018. sizeof (io_cmd->cmd.shingle.basic.digest));
  1019. session->argv[cur_shift] = g_strdup ("SETEX");
  1020. session->argv_lens[cur_shift++] = sizeof ("SETEX") - 1;
  1021. session->argv[cur_shift] = key->str;
  1022. session->argv_lens[cur_shift++] = key->len;
  1023. session->argv[cur_shift] = value->str;
  1024. session->argv_lens[cur_shift++] = value->len;
  1025. session->argv[cur_shift] = hval;
  1026. session->argv_lens[cur_shift++] = sizeof (io_cmd->cmd.shingle.basic.digest);
  1027. g_string_free (key, FALSE);
  1028. g_string_free (value, FALSE);
  1029. if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
  1030. 4,
  1031. (const gchar **)&session->argv[cur_shift - 4],
  1032. &session->argv_lens[cur_shift - 4]) != REDIS_OK) {
  1033. return FALSE;
  1034. }
  1035. }
  1036. }
  1037. else if (cmd->cmd == FUZZY_DEL) {
  1038. klen = strlen (session->backend->redis_object) +
  1039. 64 + 1;
  1040. for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) {
  1041. key = g_string_sized_new (klen);
  1042. rspamd_printf_gstring (key, "%s_%d_%uL",
  1043. session->backend->redis_object,
  1044. i,
  1045. io_cmd->cmd.shingle.sgl.hashes[i]);
  1046. session->argv[cur_shift] = g_strdup ("DEL");
  1047. session->argv_lens[cur_shift++] = sizeof ("DEL") - 1;
  1048. session->argv[cur_shift] = key->str;
  1049. session->argv_lens[cur_shift++] = key->len;
  1050. g_string_free (key, FALSE);
  1051. if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
  1052. 2,
  1053. (const gchar **)&session->argv[cur_shift - 2],
  1054. &session->argv_lens[cur_shift - 2]) != REDIS_OK) {
  1055. return FALSE;
  1056. }
  1057. }
  1058. }
  1059. else if (cmd->cmd == FUZZY_REFRESH) {
  1060. klen = strlen (session->backend->redis_object) +
  1061. 64 + 1;
  1062. for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) {
  1063. /*
  1064. * For each command with shingles we additionally emit 32 commands:
  1065. * EXPIRE <prefix>_<number>_<value> <expire>
  1066. */
  1067. /* Expire */
  1068. key = g_string_sized_new (klen);
  1069. rspamd_printf_gstring (key, "%s_%d_%uL",
  1070. session->backend->redis_object,
  1071. i,
  1072. io_cmd->cmd.shingle.sgl.hashes[i]);
  1073. value = g_string_sized_new (sizeof ("18446744073709551616"));
  1074. rspamd_printf_gstring (value, "%d",
  1075. (gint)rspamd_fuzzy_backend_get_expire (bk));
  1076. session->argv[cur_shift] = g_strdup ("EXPIRE");
  1077. session->argv_lens[cur_shift++] = sizeof ("EXPIRE") - 1;
  1078. session->argv[cur_shift] = key->str;
  1079. session->argv_lens[cur_shift++] = key->len;
  1080. session->argv[cur_shift] = value->str;
  1081. session->argv_lens[cur_shift++] = value->len;
  1082. g_string_free (key, FALSE);
  1083. g_string_free (value, FALSE);
  1084. if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
  1085. 3,
  1086. (const gchar **)&session->argv[cur_shift - 3],
  1087. &session->argv_lens[cur_shift - 3]) != REDIS_OK) {
  1088. return FALSE;
  1089. }
  1090. }
  1091. }
  1092. else if (cmd->cmd == FUZZY_DUP) {
  1093. /* Ignore */
  1094. }
  1095. else {
  1096. g_assert_not_reached ();
  1097. }
  1098. }
  1099. *shift = cur_shift;
  1100. return TRUE;
  1101. }
  1102. static void
  1103. rspamd_fuzzy_redis_update_callback (redisAsyncContext *c, gpointer r,
  1104. gpointer priv)
  1105. {
  1106. struct rspamd_fuzzy_redis_session *session = priv;
  1107. redisReply *reply = r;
  1108. ev_timer_stop (session->event_loop, &session->timeout);
  1109. if (c->err == 0) {
  1110. rspamd_upstream_ok (session->up);
  1111. if (reply->type == REDIS_REPLY_ARRAY) {
  1112. /* TODO: check all replies somehow */
  1113. if (session->callback.cb_update) {
  1114. session->callback.cb_update (TRUE,
  1115. session->nadded,
  1116. session->ndeleted,
  1117. session->nextended,
  1118. session->nignored,
  1119. session->cbdata);
  1120. }
  1121. }
  1122. else {
  1123. if (session->callback.cb_update) {
  1124. session->callback.cb_update (FALSE, 0, 0, 0, 0, session->cbdata);
  1125. }
  1126. }
  1127. }
  1128. else {
  1129. if (session->callback.cb_update) {
  1130. session->callback.cb_update (FALSE, 0, 0, 0, 0, session->cbdata);
  1131. }
  1132. if (c->errstr) {
  1133. msg_err_redis_session ("error sending update to redis: %s", c->errstr);
  1134. }
  1135. rspamd_upstream_fail (session->up, FALSE, strerror (errno));
  1136. }
  1137. rspamd_fuzzy_redis_session_dtor (session, FALSE);
  1138. }
  1139. void
  1140. rspamd_fuzzy_backend_update_redis (struct rspamd_fuzzy_backend *bk,
  1141. GArray *updates, const gchar *src,
  1142. rspamd_fuzzy_update_cb cb, void *ud,
  1143. void *subr_ud)
  1144. {
  1145. struct rspamd_fuzzy_backend_redis *backend = subr_ud;
  1146. struct rspamd_fuzzy_redis_session *session;
  1147. struct upstream *up;
  1148. struct upstream_list *ups;
  1149. rspamd_inet_addr_t *addr;
  1150. guint i;
  1151. GString *key;
  1152. struct fuzzy_peer_cmd *io_cmd;
  1153. struct rspamd_fuzzy_cmd *cmd = NULL;
  1154. guint nargs, ncommands, cur_shift;
  1155. g_assert (backend != NULL);
  1156. session = g_malloc0 (sizeof (*session));
  1157. session->backend = backend;
  1158. REF_RETAIN (session->backend);
  1159. /*
  1160. * For each normal hash addition we do 3 redis commands:
  1161. * HSET <key> F <flag>
  1162. * HINCRBY <key> V <weight>
  1163. * EXPIRE <key> <expire>
  1164. * INCR <prefix||fuzzy_count>
  1165. *
  1166. * Where <key> is <prefix> || <digest>
  1167. *
  1168. * For each command with shingles we additionally emit 32 commands:
  1169. * SETEX <prefix>_<number>_<value> <expire> <digest>
  1170. *
  1171. * For each delete command we emit:
  1172. * DEL <key>
  1173. *
  1174. * For each delete command with shingles we emit also 32 commands:
  1175. * DEL <prefix>_<number>_<value>
  1176. * DECR <prefix||fuzzy_count>
  1177. */
  1178. ncommands = 3; /* For MULTI + EXEC + INCR <src> */
  1179. nargs = 4;
  1180. for (i = 0; i < updates->len; i ++) {
  1181. io_cmd = &g_array_index (updates, struct fuzzy_peer_cmd, i);
  1182. if (io_cmd->is_shingle) {
  1183. cmd = &io_cmd->cmd.shingle.basic;
  1184. }
  1185. else {
  1186. cmd = &io_cmd->cmd.normal;
  1187. }
  1188. if (cmd->cmd == FUZZY_WRITE) {
  1189. ncommands += 5;
  1190. nargs += 17;
  1191. session->nadded ++;
  1192. if (io_cmd->is_shingle) {
  1193. ncommands += RSPAMD_SHINGLE_SIZE;
  1194. nargs += RSPAMD_SHINGLE_SIZE * 4;
  1195. }
  1196. }
  1197. else if (cmd->cmd == FUZZY_DEL) {
  1198. ncommands += 2;
  1199. nargs += 4;
  1200. session->ndeleted ++;
  1201. if (io_cmd->is_shingle) {
  1202. ncommands += RSPAMD_SHINGLE_SIZE;
  1203. nargs += RSPAMD_SHINGLE_SIZE * 2;
  1204. }
  1205. }
  1206. else if (cmd->cmd == FUZZY_REFRESH) {
  1207. ncommands += 1;
  1208. nargs += 3;
  1209. session->nextended ++;
  1210. if (io_cmd->is_shingle) {
  1211. ncommands += RSPAMD_SHINGLE_SIZE;
  1212. nargs += RSPAMD_SHINGLE_SIZE * 3;
  1213. }
  1214. }
  1215. else {
  1216. session->nignored ++;
  1217. }
  1218. }
  1219. /* Now we need to create a new request */
  1220. session->callback.cb_update = cb;
  1221. session->cbdata = ud;
  1222. session->command = RSPAMD_FUZZY_REDIS_COMMAND_UPDATES;
  1223. session->cmd = cmd;
  1224. session->prob = 1.0;
  1225. session->event_loop = rspamd_fuzzy_backend_event_base (bk);
  1226. /* First of all check digest */
  1227. session->nargs = nargs;
  1228. session->argv = g_malloc0 (sizeof (gchar *) * session->nargs);
  1229. session->argv_lens = g_malloc0 (sizeof (gsize) * session->nargs);
  1230. ups = rspamd_redis_get_servers (backend, "write_servers");
  1231. up = rspamd_upstream_get (ups,
  1232. RSPAMD_UPSTREAM_MASTER_SLAVE,
  1233. NULL,
  1234. 0);
  1235. session->up = up;
  1236. addr = rspamd_upstream_addr_next (up);
  1237. g_assert (addr != NULL);
  1238. session->ctx = rspamd_redis_pool_connect (backend->pool,
  1239. backend->dbname, backend->password,
  1240. rspamd_inet_address_to_string (addr),
  1241. rspamd_inet_address_get_port (addr));
  1242. if (session->ctx == NULL) {
  1243. rspamd_upstream_fail (up, TRUE, strerror (errno));
  1244. rspamd_fuzzy_redis_session_dtor (session, TRUE);
  1245. if (cb) {
  1246. cb (FALSE, 0, 0, 0, 0, ud);
  1247. }
  1248. }
  1249. else {
  1250. /* Start with MULTI command */
  1251. session->argv[0] = g_strdup ("MULTI");
  1252. session->argv_lens[0] = 5;
  1253. if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
  1254. 1,
  1255. (const gchar **)session->argv,
  1256. session->argv_lens) != REDIS_OK) {
  1257. if (cb) {
  1258. cb (FALSE, 0, 0, 0, 0, ud);
  1259. }
  1260. rspamd_fuzzy_redis_session_dtor (session, TRUE);
  1261. return;
  1262. }
  1263. /* Now split the rest of commands in packs and emit them command by command */
  1264. cur_shift = 1;
  1265. for (i = 0; i < updates->len; i ++) {
  1266. io_cmd = &g_array_index (updates, struct fuzzy_peer_cmd, i);
  1267. if (!rspamd_fuzzy_update_append_command (bk, session, io_cmd,
  1268. &cur_shift)) {
  1269. if (cb) {
  1270. cb (FALSE, 0, 0, 0, 0, ud);
  1271. }
  1272. rspamd_fuzzy_redis_session_dtor (session, TRUE);
  1273. return;
  1274. }
  1275. }
  1276. /* Now INCR command for the source */
  1277. key = g_string_new (backend->redis_object);
  1278. g_string_append (key, src);
  1279. session->argv[cur_shift] = g_strdup ("INCR");
  1280. session->argv_lens[cur_shift ++] = 4;
  1281. session->argv[cur_shift] = key->str;
  1282. session->argv_lens[cur_shift ++] = key->len;
  1283. g_string_free (key, FALSE);
  1284. if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
  1285. 2,
  1286. (const gchar **)&session->argv[cur_shift - 2],
  1287. &session->argv_lens[cur_shift - 2]) != REDIS_OK) {
  1288. if (cb) {
  1289. cb (FALSE, 0, 0, 0, 0, ud);
  1290. }
  1291. rspamd_fuzzy_redis_session_dtor (session, TRUE);
  1292. return;
  1293. }
  1294. /* Finally we call EXEC with a specific callback */
  1295. session->argv[cur_shift] = g_strdup ("EXEC");
  1296. session->argv_lens[cur_shift] = 4;
  1297. if (redisAsyncCommandArgv (session->ctx,
  1298. rspamd_fuzzy_redis_update_callback, session,
  1299. 1,
  1300. (const gchar **)&session->argv[cur_shift],
  1301. &session->argv_lens[cur_shift]) != REDIS_OK) {
  1302. if (cb) {
  1303. cb (FALSE, 0, 0, 0, 0, ud);
  1304. }
  1305. rspamd_fuzzy_redis_session_dtor (session, TRUE);
  1306. return;
  1307. }
  1308. else {
  1309. /* Add timeout */
  1310. session->timeout.data = session;
  1311. ev_timer_init (&session->timeout,
  1312. rspamd_fuzzy_redis_timeout,
  1313. session->backend->timeout, 0.0);
  1314. ev_timer_start (session->event_loop, &session->timeout);
  1315. }
  1316. }
  1317. }
  1318. void
  1319. rspamd_fuzzy_backend_close_redis (struct rspamd_fuzzy_backend *bk,
  1320. void *subr_ud)
  1321. {
  1322. struct rspamd_fuzzy_backend_redis *backend = subr_ud;
  1323. g_assert (backend != NULL);
  1324. REF_RELEASE (backend);
  1325. }