You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

redis_backend.c 43KB


  1. /*-
  2. * Copyright 2016 Vsevolod Stakhov
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "config.h"
  17. #include "rspamd.h"
  18. #include "stat_internal.h"
  19. #include "upstream.h"
  20. #include "lua/lua_common.h"
  21. #include "libserver/mempool_vars_internal.h"
  22. #ifdef WITH_HIREDIS
  23. #include "hiredis.h"
  24. #include "adapters/libevent.h"
  25. #include "ref.h"
  26. #define msg_debug_stat_redis(...) rspamd_conditional_debug_fast (NULL, NULL, \
  27. rspamd_stat_redis_log_id, "stat_redis", task->task_pool->tag.uid, \
  28. G_STRFUNC, \
  29. __VA_ARGS__)
  30. INIT_LOG_MODULE(stat_redis)
  31. #define REDIS_CTX(p) (struct redis_stat_ctx *)(p)
  32. #define REDIS_RUNTIME(p) (struct redis_stat_runtime *)(p)
  33. #define REDIS_BACKEND_TYPE "redis"
  34. #define REDIS_DEFAULT_PORT 6379
  35. #define REDIS_DEFAULT_OBJECT "%s%l"
  36. #define REDIS_DEFAULT_USERS_OBJECT "%s%l%r"
  37. #define REDIS_DEFAULT_TIMEOUT 0.5
  38. #define REDIS_STAT_TIMEOUT 30
  39. struct redis_stat_ctx {
  40. lua_State *L;
  41. struct rspamd_statfile_config *stcf;
  42. gint conf_ref;
  43. struct rspamd_stat_async_elt *stat_elt;
  44. const gchar *redis_object;
  45. const gchar *password;
  46. const gchar *dbname;
  47. gdouble timeout;
  48. gboolean enable_users;
  49. gboolean store_tokens;
  50. gboolean new_schema;
  51. gboolean enable_signatures;
  52. guint expiry;
  53. gint cbref_user;
  54. };
  55. enum rspamd_redis_connection_state {
  56. RSPAMD_REDIS_DISCONNECTED = 0,
  57. RSPAMD_REDIS_CONNECTED,
  58. RSPAMD_REDIS_REQUEST_SENT,
  59. RSPAMD_REDIS_TIMEDOUT,
  60. RSPAMD_REDIS_TERMINATED
  61. };
  62. struct redis_stat_runtime {
  63. struct redis_stat_ctx *ctx;
  64. struct rspamd_task *task;
  65. struct upstream *selected;
  66. struct event timeout_event;
  67. GArray *results;
  68. struct rspamd_statfile_config *stcf;
  69. gchar *redis_object_expanded;
  70. redisAsyncContext *redis;
  71. guint64 learned;
  72. gint id;
  73. gboolean has_event;
  74. GError *err;
  75. };
  76. /* Used to get statistics from redis */
  77. struct rspamd_redis_stat_cbdata;
  78. struct rspamd_redis_stat_elt {
  79. struct redis_stat_ctx *ctx;
  80. struct rspamd_stat_async_elt *async;
  81. struct event_base *ev_base;
  82. ucl_object_t *stat;
  83. struct rspamd_redis_stat_cbdata *cbdata;
  84. };
  85. struct rspamd_redis_stat_cbdata {
  86. struct rspamd_redis_stat_elt *elt;
  87. redisAsyncContext *redis;
  88. ucl_object_t *cur;
  89. GPtrArray *cur_keys;
  90. struct upstream *selected;
  91. guint inflight;
  92. gboolean wanna_die;
  93. };
  94. #define GET_TASK_ELT(task, elt) (task == NULL ? NULL : (task)->elt)
  95. static const gchar *M = "redis statistics";
  96. static GQuark
  97. rspamd_redis_stat_quark (void)
  98. {
  99. return g_quark_from_static_string (M);
  100. }
  101. static inline struct upstream_list *
  102. rspamd_redis_get_servers (struct redis_stat_ctx *ctx,
  103. const gchar *what)
  104. {
  105. lua_State *L = ctx->L;
  106. struct upstream_list *res;
  107. lua_rawgeti (L, LUA_REGISTRYINDEX, ctx->conf_ref);
  108. lua_pushstring (L, what);
  109. lua_gettable (L, -2);
  110. res = *((struct upstream_list**)lua_touserdata (L, -1));
  111. lua_settop (L, 0);
  112. return res;
  113. }
  114. /*
  115. * Non-static for lua unit testing
  116. */
  117. gsize
  118. rspamd_redis_expand_object (const gchar *pattern,
  119. struct redis_stat_ctx *ctx,
  120. struct rspamd_task *task,
  121. gchar **target)
  122. {
  123. gsize tlen = 0;
  124. const gchar *p = pattern, *elt;
  125. gchar *d, *end;
  126. enum {
  127. just_char,
  128. percent_char,
  129. mod_char
  130. } state = just_char;
  131. struct rspamd_statfile_config *stcf;
  132. lua_State *L = NULL;
  133. struct rspamd_task **ptask;
  134. GString *tb;
  135. const gchar *rcpt = NULL;
  136. gint err_idx;
  137. g_assert (ctx != NULL);
  138. stcf = ctx->stcf;
  139. L = task->cfg->lua_state;
  140. if (ctx->enable_users) {
  141. if (ctx->cbref_user == -1) {
  142. rcpt = rspamd_task_get_principal_recipient (task);
  143. }
  144. else if (L) {
  145. /* Execute lua function to get userdata */
  146. lua_pushcfunction (L, &rspamd_lua_traceback);
  147. err_idx = lua_gettop (L);
  148. lua_rawgeti (L, LUA_REGISTRYINDEX, ctx->cbref_user);
  149. ptask = lua_newuserdata (L, sizeof (struct rspamd_task *));
  150. *ptask = task;
  151. rspamd_lua_setclass (L, "rspamd{task}", -1);
  152. if (lua_pcall (L, 1, 1, err_idx) != 0) {
  153. tb = lua_touserdata (L, -1);
  154. msg_err_task ("call to user extraction script failed: %v", tb);
  155. g_string_free (tb, TRUE);
  156. }
  157. else {
  158. rcpt = rspamd_mempool_strdup (task->task_pool, lua_tostring (L, -1));
  159. }
  160. /* Result + error function */
  161. lua_pop (L, 2);
  162. }
  163. if (rcpt) {
  164. rspamd_mempool_set_variable (task->task_pool, "stat_user",
  165. (gpointer)rcpt, NULL);
  166. }
  167. }
  168. /* Length calculation */
  169. while (*p) {
  170. switch (state) {
  171. case just_char:
  172. if (*p == '%') {
  173. state = percent_char;
  174. }
  175. else {
  176. tlen ++;
  177. }
  178. p ++;
  179. break;
  180. case percent_char:
  181. switch (*p) {
  182. case '%':
  183. tlen ++;
  184. state = just_char;
  185. break;
  186. case 'u':
  187. elt = GET_TASK_ELT (task, user);
  188. if (elt) {
  189. tlen += strlen (elt);
  190. }
  191. break;
  192. case 'r':
  193. if (rcpt == NULL) {
  194. elt = rspamd_task_get_principal_recipient (task);
  195. }
  196. else {
  197. elt = rcpt;
  198. }
  199. if (elt) {
  200. tlen += strlen (elt);
  201. }
  202. break;
  203. case 'l':
  204. if (stcf->label) {
  205. tlen += strlen (stcf->label);
  206. }
  207. /* Label miss is OK */
  208. break;
  209. case 's':
  210. if (ctx->new_schema) {
  211. tlen += sizeof ("RS") - 1;
  212. }
  213. else {
  214. if (stcf->symbol) {
  215. tlen += strlen (stcf->symbol);
  216. }
  217. }
  218. break;
  219. default:
  220. state = just_char;
  221. tlen ++;
  222. break;
  223. }
  224. if (state == percent_char) {
  225. state = mod_char;
  226. }
  227. p ++;
  228. break;
  229. case mod_char:
  230. switch (*p) {
  231. case 'd':
  232. p ++;
  233. state = just_char;
  234. break;
  235. default:
  236. state = just_char;
  237. break;
  238. }
  239. break;
  240. }
  241. }
  242. if (target == NULL || task == NULL) {
  243. return -1;
  244. }
  245. *target = rspamd_mempool_alloc (task->task_pool, tlen + 1);
  246. d = *target;
  247. end = d + tlen + 1;
  248. d[tlen] = '\0';
  249. p = pattern;
  250. state = just_char;
  251. /* Expand string */
  252. while (*p && d < end) {
  253. switch (state) {
  254. case just_char:
  255. if (*p == '%') {
  256. state = percent_char;
  257. }
  258. else {
  259. *d++ = *p;
  260. }
  261. p ++;
  262. break;
  263. case percent_char:
  264. switch (*p) {
  265. case '%':
  266. *d++ = *p;
  267. state = just_char;
  268. break;
  269. case 'u':
  270. elt = GET_TASK_ELT (task, user);
  271. if (elt) {
  272. d += rspamd_strlcpy (d, elt, end - d);
  273. }
  274. break;
  275. case 'r':
  276. if (rcpt == NULL) {
  277. elt = rspamd_task_get_principal_recipient (task);
  278. }
  279. else {
  280. elt = rcpt;
  281. }
  282. if (elt) {
  283. d += rspamd_strlcpy (d, elt, end - d);
  284. }
  285. break;
  286. case 'l':
  287. if (stcf->label) {
  288. d += rspamd_strlcpy (d, stcf->label, end - d);
  289. }
  290. break;
  291. case 's':
  292. if (ctx->new_schema) {
  293. d += rspamd_strlcpy (d, "RS", end - d);
  294. }
  295. else {
  296. if (stcf->symbol) {
  297. d += rspamd_strlcpy (d, stcf->symbol, end - d);
  298. }
  299. }
  300. break;
  301. default:
  302. state = just_char;
  303. *d++ = *p;
  304. break;
  305. }
  306. if (state == percent_char) {
  307. state = mod_char;
  308. }
  309. p ++;
  310. break;
  311. case mod_char:
  312. switch (*p) {
  313. case 'd':
  314. /* TODO: not supported yet */
  315. p ++;
  316. state = just_char;
  317. break;
  318. default:
  319. state = just_char;
  320. break;
  321. }
  322. break;
  323. }
  324. }
  325. return tlen;
  326. }
  327. static void
  328. rspamd_redis_maybe_auth (struct redis_stat_ctx *ctx, redisAsyncContext *redis)
  329. {
  330. if (ctx->password) {
  331. redisAsyncCommand (redis, NULL, NULL, "AUTH %s", ctx->password);
  332. }
  333. if (ctx->dbname) {
  334. redisAsyncCommand (redis, NULL, NULL, "SELECT %s", ctx->dbname);
  335. }
  336. }
  337. static rspamd_fstring_t *
  338. rspamd_redis_tokens_to_query (struct rspamd_task *task,
  339. struct redis_stat_runtime *rt,
  340. GPtrArray *tokens,
  341. const gchar *command,
  342. const gchar *prefix,
  343. gboolean learn,
  344. gint idx,
  345. gboolean intvals)
  346. {
  347. rspamd_fstring_t *out;
  348. rspamd_token_t *tok;
  349. gchar n0[512], n1[64];
  350. guint i, l0, l1, cmd_len, prefix_len;
  351. gint ret;
  352. g_assert (tokens != NULL);
  353. cmd_len = strlen (command);
  354. prefix_len = strlen (prefix);
  355. out = rspamd_fstring_sized_new (1024);
  356. if (learn) {
  357. rspamd_printf_fstring (&out, "*1\r\n$5\r\nMULTI\r\n");
  358. ret = redisAsyncFormattedCommand (rt->redis, NULL, NULL,
  359. out->str, out->len);
  360. if (ret != REDIS_OK) {
  361. msg_err_task ("call to redis failed: %s", rt->redis->errstr);
  362. rspamd_fstring_free (out);
  363. return NULL;
  364. }
  365. out->len = 0;
  366. }
  367. else {
  368. if (rt->ctx->new_schema) {
  369. /* Multi + HGET */
  370. rspamd_printf_fstring (&out, "*1\r\n$5\r\nMULTI\r\n");
  371. ret = redisAsyncFormattedCommand (rt->redis, NULL, NULL,
  372. out->str, out->len);
  373. if (ret != REDIS_OK) {
  374. msg_err_task ("call to redis failed: %s", rt->redis->errstr);
  375. rspamd_fstring_free (out);
  376. return NULL;
  377. }
  378. out->len = 0;
  379. }
  380. else {
  381. rspamd_printf_fstring (&out, ""
  382. "*%d\r\n"
  383. "$%d\r\n"
  384. "%s\r\n"
  385. "$%d\r\n"
  386. "%s\r\n",
  387. (tokens->len + 2),
  388. cmd_len, command,
  389. prefix_len, prefix);
  390. }
  391. }
  392. for (i = 0; i < tokens->len; i ++) {
  393. tok = g_ptr_array_index (tokens, i);
  394. if (learn) {
  395. if (intvals) {
  396. l1 = rspamd_snprintf (n1, sizeof (n1), "%L",
  397. (gint64) tok->values[idx]);
  398. } else {
  399. l1 = rspamd_snprintf (n1, sizeof (n1), "%f",
  400. tok->values[idx]);
  401. }
  402. if (rt->ctx->new_schema) {
  403. /*
  404. * HINCRBY <prefix_token> <0|1> <value>
  405. */
  406. l0 = rspamd_snprintf (n0, sizeof (n0), "%*s_%uL",
  407. prefix_len, prefix,
  408. tok->data);
  409. rspamd_printf_fstring (&out, ""
  410. "*4\r\n"
  411. "$%d\r\n"
  412. "%s\r\n"
  413. "$%d\r\n"
  414. "%s\r\n"
  415. "$%d\r\n"
  416. "%s\r\n"
  417. "$%d\r\n"
  418. "%s\r\n",
  419. cmd_len, command,
  420. l0, n0,
  421. 1, rt->stcf->is_spam ? "S" : "H",
  422. l1, n1);
  423. }
  424. else {
  425. l0 = rspamd_snprintf (n0, sizeof (n0), "%uL", tok->data);
  426. /*
  427. * HINCRBY <prefix> <token> <value>
  428. */
  429. rspamd_printf_fstring (&out, ""
  430. "*4\r\n"
  431. "$%d\r\n"
  432. "%s\r\n"
  433. "$%d\r\n"
  434. "%s\r\n"
  435. "$%d\r\n"
  436. "%s\r\n"
  437. "$%d\r\n"
  438. "%s\r\n",
  439. cmd_len, command,
  440. prefix_len, prefix,
  441. l0, n0,
  442. l1, n1);
  443. }
  444. ret = redisAsyncFormattedCommand (rt->redis, NULL, NULL,
  445. out->str, out->len);
  446. if (ret != REDIS_OK) {
  447. msg_err_task ("call to redis failed: %s", rt->redis->errstr);
  448. rspamd_fstring_free (out);
  449. return NULL;
  450. }
  451. if (rt->ctx->store_tokens) {
  452. if (!rt->ctx->new_schema) {
  453. /*
  454. * We store tokens in form
  455. * HSET prefix_tokens <token_id> "token_string"
  456. * ZINCRBY prefix_z 1.0 <token_id>
  457. */
  458. if (tok->t1 && tok->t2) {
  459. redisAsyncCommand (rt->redis, NULL, NULL,
  460. "HSET %b_tokens %b %b:%b",
  461. prefix, (size_t) prefix_len,
  462. n0, (size_t) l0,
  463. tok->t1->stemmed.begin, tok->t1->stemmed.len,
  464. tok->t2->stemmed.begin, tok->t2->stemmed.len);
  465. } else if (tok->t1) {
  466. redisAsyncCommand (rt->redis, NULL, NULL,
  467. "HSET %b_tokens %b %b",
  468. prefix, (size_t) prefix_len,
  469. n0, (size_t) l0,
  470. tok->t1->stemmed.begin,
  471. tok->t1->stemmed.len);
  472. }
  473. }
  474. else {
  475. /*
  476. * We store tokens in form
  477. * HSET <token_id> "tokens" "token_string"
  478. * ZINCRBY prefix_z 1.0 <token_id>
  479. */
  480. if (tok->t1 && tok->t2) {
  481. redisAsyncCommand (rt->redis, NULL, NULL,
  482. "HSET %b %s %b:%b",
  483. n0, (size_t) l0,
  484. "tokens",
  485. tok->t1->stemmed.begin, tok->t1->stemmed.len,
  486. tok->t2->stemmed.begin, tok->t2->stemmed.len);
  487. } else if (tok->t1) {
  488. redisAsyncCommand (rt->redis, NULL, NULL,
  489. "HSET %b %s %b",
  490. n0, (size_t) l0,
  491. "tokens",
  492. tok->t1->stemmed.begin, tok->t1->stemmed.len);
  493. }
  494. }
  495. redisAsyncCommand (rt->redis, NULL, NULL,
  496. "ZINCRBY %b_z %b %b",
  497. prefix, (size_t)prefix_len,
  498. n1, (size_t)l1,
  499. n0, (size_t)l0);
  500. }
  501. if (rt->ctx->new_schema && rt->ctx->expiry > 0) {
  502. out->len = 0;
  503. l1 = rspamd_snprintf (n1, sizeof (n1), "%d",
  504. rt->ctx->expiry);
  505. rspamd_printf_fstring (&out, ""
  506. "*3\r\n"
  507. "$6\r\n"
  508. "EXPIRE\r\n"
  509. "$%d\r\n"
  510. "%s\r\n"
  511. "$%d\r\n"
  512. "%s\r\n",
  513. l0, n0,
  514. l1, n1);
  515. redisAsyncFormattedCommand (rt->redis, NULL, NULL,
  516. out->str, out->len);
  517. }
  518. out->len = 0;
  519. }
  520. else {
  521. if (rt->ctx->new_schema) {
  522. l0 = rspamd_snprintf (n0, sizeof (n0), "%*s_%uL",
  523. prefix_len, prefix,
  524. tok->data);
  525. rspamd_printf_fstring (&out, ""
  526. "*3\r\n"
  527. "$%d\r\n"
  528. "%s\r\n"
  529. "$%d\r\n"
  530. "%s\r\n"
  531. "$%d\r\n"
  532. "%s\r\n",
  533. cmd_len, command,
  534. l0, n0,
  535. 1, rt->stcf->is_spam ? "S" : "H");
  536. ret = redisAsyncFormattedCommand (rt->redis, NULL, NULL,
  537. out->str, out->len);
  538. if (ret != REDIS_OK) {
  539. msg_err_task ("call to redis failed: %s", rt->redis->errstr);
  540. rspamd_fstring_free (out);
  541. return NULL;
  542. }
  543. out->len = 0;
  544. }
  545. else {
  546. l0 = rspamd_snprintf (n0, sizeof (n0), "%uL", tok->data);
  547. rspamd_printf_fstring (&out, ""
  548. "$%d\r\n"
  549. "%s\r\n", l0, n0);
  550. }
  551. }
  552. }
  553. if (!learn && rt->ctx->new_schema) {
  554. rspamd_printf_fstring (&out, "*1\r\n$4\r\nEXEC\r\n");
  555. }
  556. return out;
  557. }
  558. static void
  559. rspamd_redis_store_stat_signature (struct rspamd_task *task,
  560. struct redis_stat_runtime *rt,
  561. GPtrArray *tokens,
  562. const gchar *prefix)
  563. {
  564. gchar *sig, keybuf[512], nbuf[64];
  565. rspamd_token_t *tok;
  566. guint i, blen, klen;
  567. rspamd_fstring_t *out;
  568. out = rspamd_fstring_sized_new (1024);
  569. sig = rspamd_mempool_get_variable (task->task_pool,
  570. RSPAMD_MEMPOOL_STAT_SIGNATURE);
  571. if (sig == NULL) {
  572. msg_err_task ("cannot get bayes signature");
  573. return;
  574. }
  575. klen = rspamd_snprintf (keybuf, sizeof (keybuf), "%s_%s_%s",
  576. prefix, sig, rt->stcf->is_spam ? "S" : "H");
  577. out->len = 0;
  578. /* Cleanup key */
  579. rspamd_printf_fstring (&out, ""
  580. "*2\r\n"
  581. "$3\r\n"
  582. "DEL\r\n"
  583. "$%d\r\n"
  584. "%s\r\n",
  585. klen, keybuf);
  586. redisAsyncFormattedCommand (rt->redis, NULL, NULL,
  587. out->str, out->len);
  588. out->len = 0;
  589. rspamd_printf_fstring (&out, ""
  590. "*%d\r\n"
  591. "$5\r\n"
  592. "LPUSH\r\n"
  593. "$%d\r\n"
  594. "%s\r\n",
  595. tokens->len + 2,
  596. klen, keybuf);
  597. PTR_ARRAY_FOREACH (tokens, i, tok) {
  598. blen = rspamd_snprintf (nbuf, sizeof (nbuf), "%uL", tok->data);
  599. rspamd_printf_fstring (&out, ""
  600. "$%d\r\n"
  601. "%s\r\n", blen, nbuf);
  602. }
  603. redisAsyncFormattedCommand (rt->redis, NULL, NULL,
  604. out->str, out->len);
  605. out->len = 0;
  606. if (rt->ctx->expiry > 0) {
  607. out->len = 0;
  608. blen = rspamd_snprintf (nbuf, sizeof (nbuf), "%d",
  609. rt->ctx->expiry);
  610. rspamd_printf_fstring (&out, ""
  611. "*3\r\n"
  612. "$6\r\n"
  613. "EXPIRE\r\n"
  614. "$%d\r\n"
  615. "%s\r\n"
  616. "$%d\r\n"
  617. "%s\r\n",
  618. klen, keybuf,
  619. blen, nbuf);
  620. redisAsyncFormattedCommand (rt->redis, NULL, NULL,
  621. out->str, out->len);
  622. }
  623. rspamd_fstring_free (out);
  624. }
  625. static void
  626. rspamd_redis_async_cbdata_cleanup (struct rspamd_redis_stat_cbdata *cbdata)
  627. {
  628. guint i;
  629. gchar *k;
  630. if (cbdata && !cbdata->wanna_die) {
  631. /* Avoid double frees */
  632. cbdata->wanna_die = TRUE;
  633. redisAsyncFree (cbdata->redis);
  634. for (i = 0; i < cbdata->cur_keys->len; i ++) {
  635. k = g_ptr_array_index (cbdata->cur_keys, i);
  636. g_free (k);
  637. }
  638. g_ptr_array_free (cbdata->cur_keys, TRUE);
  639. if (cbdata->elt) {
  640. cbdata->elt->cbdata = NULL;
  641. /* Re-enable parent event */
  642. cbdata->elt->async->enabled = TRUE;
  643. /* Replace ucl object */
  644. if (cbdata->cur) {
  645. if (cbdata->elt->stat) {
  646. ucl_object_unref (cbdata->elt->stat);
  647. }
  648. cbdata->elt->stat = cbdata->cur;
  649. cbdata->cur = NULL;
  650. }
  651. }
  652. if (cbdata->cur) {
  653. ucl_object_unref (cbdata->cur);
  654. }
  655. g_free (cbdata);
  656. }
  657. }
  658. /* Called when we get number of learns for a specific key */
  659. static void
  660. rspamd_redis_stat_learns (redisAsyncContext *c, gpointer r, gpointer priv)
  661. {
  662. struct rspamd_redis_stat_cbdata *cbdata = priv;
  663. redisReply *reply = r;
  664. ucl_object_t *obj;
  665. gulong num = 0;
  666. if (cbdata->wanna_die) {
  667. return;
  668. }
  669. cbdata->inflight --;
  670. if (c->err == 0 && r != NULL) {
  671. if (G_LIKELY (reply->type == REDIS_REPLY_INTEGER)) {
  672. num = reply->integer;
  673. }
  674. else if (reply->type == REDIS_REPLY_STRING) {
  675. rspamd_strtoul (reply->str, reply->len, &num);
  676. }
  677. obj = (ucl_object_t *) ucl_object_lookup (cbdata->cur, "revision");
  678. if (obj) {
  679. obj->value.iv += num;
  680. }
  681. }
  682. if (cbdata->inflight == 0) {
  683. rspamd_redis_async_cbdata_cleanup (cbdata);
  684. }
  685. }
  686. /* Called when we get number of elements for a specific key */
  687. static void
  688. rspamd_redis_stat_key (redisAsyncContext *c, gpointer r, gpointer priv)
  689. {
  690. struct rspamd_redis_stat_cbdata *cbdata = priv;
  691. redisReply *reply = r;
  692. ucl_object_t *obj;
  693. glong num = 0;
  694. if (cbdata->wanna_die) {
  695. return;
  696. }
  697. cbdata->inflight --;
  698. if (c->err == 0 && r != NULL) {
  699. if (G_LIKELY (reply->type == REDIS_REPLY_INTEGER)) {
  700. num = reply->integer;
  701. }
  702. else if (reply->type == REDIS_REPLY_STRING) {
  703. rspamd_strtol (reply->str, reply->len, &num);
  704. }
  705. if (num < 0) {
  706. msg_err ("bad learns count: %L", (gint64)num);
  707. num = 0;
  708. }
  709. obj = (ucl_object_t *)ucl_object_lookup (cbdata->cur, "used");
  710. if (obj) {
  711. obj->value.iv += num;
  712. }
  713. obj = (ucl_object_t *)ucl_object_lookup (cbdata->cur, "total");
  714. if (obj) {
  715. obj->value.iv += num;
  716. }
  717. obj = (ucl_object_t *)ucl_object_lookup (cbdata->cur, "size");
  718. if (obj) {
  719. /* Size of key + size of int64_t */
  720. obj->value.iv += num * (sizeof (G_STRINGIFY (G_MAXINT64)) +
  721. sizeof (guint64) + sizeof (gpointer));
  722. }
  723. }
  724. if (cbdata->inflight == 0) {
  725. rspamd_redis_async_cbdata_cleanup (cbdata);
  726. }
  727. }
  728. /* Called when we have connected to the redis server and got keys to check */
  729. static void
  730. rspamd_redis_stat_keys (redisAsyncContext *c, gpointer r, gpointer priv)
  731. {
  732. struct rspamd_redis_stat_cbdata *cbdata = priv;
  733. redisReply *reply = r, *elt;
  734. gchar **pk, *k;
  735. guint i, processed = 0;
  736. if (cbdata->wanna_die) {
  737. return;
  738. }
  739. cbdata->inflight --;
  740. if (c->err == 0 && r != NULL) {
  741. if (reply->type == REDIS_REPLY_ARRAY) {
  742. g_ptr_array_set_size (cbdata->cur_keys, reply->elements);
  743. for (i = 0; i < reply->elements; i ++) {
  744. elt = reply->element[i];
  745. if (elt->type == REDIS_REPLY_STRING) {
  746. pk = (gchar **)&g_ptr_array_index (cbdata->cur_keys, i);
  747. *pk = g_malloc (elt->len + 1);
  748. rspamd_strlcpy (*pk, elt->str, elt->len + 1);
  749. processed ++;
  750. }
  751. }
  752. if (processed) {
  753. for (i = 0; i < cbdata->cur_keys->len; i ++) {
  754. k = (gchar *)g_ptr_array_index (cbdata->cur_keys, i);
  755. if (k) {
  756. const gchar *learned_key = "learns";
  757. if (cbdata->elt->ctx->new_schema) {
  758. if (cbdata->elt->ctx->stcf->is_spam) {
  759. learned_key = "learns_spam";
  760. }
  761. else {
  762. learned_key = "learns_ham";
  763. }
  764. redisAsyncCommand (cbdata->redis,
  765. rspamd_redis_stat_learns,
  766. cbdata,
  767. "HGET %s %s",
  768. k, learned_key);
  769. cbdata->inflight += 1;
  770. }
  771. else {
  772. redisAsyncCommand (cbdata->redis,
  773. rspamd_redis_stat_key,
  774. cbdata,
  775. "HLEN %s",
  776. k);
  777. redisAsyncCommand (cbdata->redis,
  778. rspamd_redis_stat_learns,
  779. cbdata,
  780. "HGET %s %s",
  781. k, learned_key);
  782. cbdata->inflight += 2;
  783. }
  784. }
  785. }
  786. }
  787. }
  788. /* Set up the required keys */
  789. ucl_object_insert_key (cbdata->cur,
  790. ucl_object_typed_new (UCL_INT), "revision", 0, false);
  791. ucl_object_insert_key (cbdata->cur,
  792. ucl_object_typed_new (UCL_INT), "used", 0, false);
  793. ucl_object_insert_key (cbdata->cur,
  794. ucl_object_typed_new (UCL_INT), "total", 0, false);
  795. ucl_object_insert_key (cbdata->cur,
  796. ucl_object_typed_new (UCL_INT), "size", 0, false);
  797. ucl_object_insert_key (cbdata->cur,
  798. ucl_object_fromstring (cbdata->elt->ctx->stcf->symbol),
  799. "symbol", 0, false);
  800. ucl_object_insert_key (cbdata->cur, ucl_object_fromstring ("redis"),
  801. "type", 0, false);
  802. ucl_object_insert_key (cbdata->cur, ucl_object_fromint (0),
  803. "languages", 0, false);
  804. ucl_object_insert_key (cbdata->cur, ucl_object_fromint (processed),
  805. "users", 0, false);
  806. rspamd_upstream_ok (cbdata->selected);
  807. if (cbdata->inflight == 0) {
  808. rspamd_redis_async_cbdata_cleanup (cbdata);
  809. }
  810. }
  811. else {
  812. if (c->errstr) {
  813. msg_err ("cannot get keys to gather stat: %s", c->errstr);
  814. }
  815. else {
  816. msg_err ("cannot get keys to gather stat: unknown error");
  817. }
  818. rspamd_upstream_fail (cbdata->selected, FALSE);
  819. rspamd_redis_async_cbdata_cleanup (cbdata);
  820. }
  821. }
  822. static void
  823. rspamd_redis_async_stat_cb (struct rspamd_stat_async_elt *elt, gpointer d)
  824. {
  825. struct redis_stat_ctx *ctx;
  826. struct rspamd_redis_stat_elt *redis_elt = elt->ud;
  827. struct rspamd_redis_stat_cbdata *cbdata;
  828. rspamd_inet_addr_t *addr;
  829. struct upstream_list *ups;
  830. g_assert (redis_elt != NULL);
  831. ctx = redis_elt->ctx;
  832. if (redis_elt->cbdata) {
  833. /* We have some other process pending */
  834. rspamd_redis_async_cbdata_cleanup (redis_elt->cbdata);
  835. }
  836. /* Disable further events unless needed */
  837. elt->enabled = FALSE;
  838. ups = rspamd_redis_get_servers (ctx, "read_servers");
  839. if (!ups) {
  840. return;
  841. }
  842. cbdata = g_malloc0 (sizeof (*cbdata));
  843. cbdata->selected = rspamd_upstream_get (ups,
  844. RSPAMD_UPSTREAM_ROUND_ROBIN,
  845. NULL,
  846. 0);
  847. g_assert (cbdata->selected != NULL);
  848. addr = rspamd_upstream_addr_next (cbdata->selected);
  849. g_assert (addr != NULL);
  850. if (rspamd_inet_address_get_af (addr) == AF_UNIX) {
  851. cbdata->redis = redisAsyncConnectUnix (rspamd_inet_address_to_string (addr));
  852. }
  853. else {
  854. cbdata->redis = redisAsyncConnect (rspamd_inet_address_to_string (addr),
  855. rspamd_inet_address_get_port (addr));
  856. }
  857. g_assert (cbdata->redis != NULL);
  858. redisLibeventAttach (cbdata->redis, redis_elt->ev_base);
  859. cbdata->inflight = 1;
  860. cbdata->cur = ucl_object_typed_new (UCL_OBJECT);
  861. cbdata->elt = redis_elt;
  862. cbdata->cur_keys = g_ptr_array_new ();
  863. redis_elt->cbdata = cbdata;
  864. /* XXX: deal with timeouts maybe */
  865. /* Get keys in redis that match our symbol */
  866. rspamd_redis_maybe_auth (ctx, cbdata->redis);
  867. redisAsyncCommand (cbdata->redis, rspamd_redis_stat_keys, cbdata,
  868. "SMEMBERS %s_keys",
  869. ctx->stcf->symbol);
  870. }
  871. static void
  872. rspamd_redis_async_stat_fin (struct rspamd_stat_async_elt *elt, gpointer d)
  873. {
  874. struct rspamd_redis_stat_elt *redis_elt = elt->ud;
  875. rspamd_redis_async_cbdata_cleanup (redis_elt->cbdata);
  876. }
  877. /* Called on connection termination */
  878. static void
  879. rspamd_redis_fin (gpointer data)
  880. {
  881. struct redis_stat_runtime *rt = REDIS_RUNTIME (data);
  882. redisAsyncContext *redis;
  883. rt->has_event = FALSE;
  884. /* Stop timeout */
  885. if (rspamd_event_pending (&rt->timeout_event, EV_TIMEOUT)) {
  886. event_del (&rt->timeout_event);
  887. }
  888. if (rt->redis) {
  889. redis = rt->redis;
  890. rt->redis = NULL;
  891. /* This calls for all callbacks pending */
  892. redisAsyncFree (redis);
  893. }
  894. }
  895. static void
  896. rspamd_redis_fin_learn (gpointer data)
  897. {
  898. struct redis_stat_runtime *rt = REDIS_RUNTIME (data);
  899. redisAsyncContext *redis;
  900. rt->has_event = FALSE;
  901. /* Stop timeout */
  902. if (rspamd_event_pending (&rt->timeout_event, EV_TIMEOUT)) {
  903. event_del (&rt->timeout_event);
  904. }
  905. if (rt->redis) {
  906. redis = rt->redis;
  907. rt->redis = NULL;
  908. /* This calls for all callbacks pending */
  909. redisAsyncFree (redis);
  910. }
  911. }
  912. static void
  913. rspamd_redis_timeout (gint fd, short what, gpointer d)
  914. {
  915. struct redis_stat_runtime *rt = REDIS_RUNTIME (d);
  916. struct rspamd_task *task;
  917. redisAsyncContext *redis;
  918. task = rt->task;
  919. msg_err_task_check ("connection to redis server %s timed out",
  920. rspamd_upstream_name (rt->selected));
  921. rspamd_upstream_fail (rt->selected, FALSE);
  922. if (rt->redis) {
  923. redis = rt->redis;
  924. rt->redis = NULL;
  925. /* This calls for all callbacks pending */
  926. redisAsyncFree (redis);
  927. }
  928. if (!rt->err) {
  929. g_set_error (&rt->err, rspamd_redis_stat_quark (), ETIMEDOUT,
  930. "error getting reply from redis server %s: timeout",
  931. rspamd_upstream_name (rt->selected));
  932. }
  933. }
  934. /* Called when we have connected to the redis server and got stats */
  935. static void
  936. rspamd_redis_connected (redisAsyncContext *c, gpointer r, gpointer priv)
  937. {
  938. struct redis_stat_runtime *rt = REDIS_RUNTIME (priv);
  939. redisReply *reply = r;
  940. struct rspamd_task *task;
  941. glong val = 0;
  942. task = rt->task;
  943. if (c->err == 0) {
  944. if (r != NULL) {
  945. if (G_UNLIKELY (reply->type == REDIS_REPLY_INTEGER)) {
  946. val = reply->integer;
  947. }
  948. else if (reply->type == REDIS_REPLY_STRING) {
  949. rspamd_strtol (reply->str, reply->len, &val);
  950. }
  951. else {
  952. if (reply->type != REDIS_REPLY_NIL) {
  953. msg_err_task ("bad learned type for %s: %s, nil expected",
  954. rt->stcf->symbol,
  955. rspamd_redis_type_to_string (reply->type));
  956. }
  957. val = 0;
  958. }
  959. if (val < 0) {
  960. msg_warn_task ("invalid number of learns for %s: %L",
  961. rt->stcf->symbol, val);
  962. val = 0;
  963. }
  964. rt->learned = val;
  965. msg_debug_stat_redis ("connected to redis server, tokens learned for %s: %uL",
  966. rt->redis_object_expanded, rt->learned);
  967. rspamd_upstream_ok (rt->selected);
  968. }
  969. }
  970. else {
  971. msg_err_task ("error getting reply from redis server %s: %s",
  972. rspamd_upstream_name (rt->selected), c->errstr);
  973. rspamd_upstream_fail (rt->selected, FALSE);
  974. if (!rt->err) {
  975. g_set_error (&rt->err, rspamd_redis_stat_quark (), c->err,
  976. "error getting reply from redis server %s: %s",
  977. rspamd_upstream_name (rt->selected), c->errstr);
  978. }
  979. }
  980. }
  981. /* Called when we have received tokens values from redis */
  982. static void
  983. rspamd_redis_processed (redisAsyncContext *c, gpointer r, gpointer priv)
  984. {
  985. struct redis_stat_runtime *rt = REDIS_RUNTIME (priv);
  986. redisReply *reply = r, *elt;
  987. struct rspamd_task *task;
  988. rspamd_token_t *tok;
  989. guint i, processed = 0, found = 0;
  990. gulong val;
  991. gdouble float_val;
  992. task = rt->task;
  993. if (c->err == 0) {
  994. if (r != NULL) {
  995. if (reply->type == REDIS_REPLY_ARRAY) {
  996. if (reply->elements == task->tokens->len) {
  997. for (i = 0; i < reply->elements; i ++) {
  998. tok = g_ptr_array_index (task->tokens, i);
  999. elt = reply->element[i];
  1000. if (G_UNLIKELY (elt->type == REDIS_REPLY_INTEGER)) {
  1001. tok->values[rt->id] = elt->integer;
  1002. found ++;
  1003. }
  1004. else if (elt->type == REDIS_REPLY_STRING) {
  1005. if (rt->stcf->clcf->flags &
  1006. RSPAMD_FLAG_CLASSIFIER_INTEGER) {
  1007. rspamd_strtoul (elt->str, elt->len, &val);
  1008. tok->values[rt->id] = val;
  1009. }
  1010. else {
  1011. float_val = strtod (elt->str, NULL);
  1012. tok->values[rt->id] = float_val;
  1013. }
  1014. found ++;
  1015. }
  1016. else {
  1017. tok->values[rt->id] = 0;
  1018. }
  1019. processed ++;
  1020. }
  1021. if (rt->stcf->is_spam) {
  1022. task->flags |= RSPAMD_TASK_FLAG_HAS_SPAM_TOKENS;
  1023. }
  1024. else {
  1025. task->flags |= RSPAMD_TASK_FLAG_HAS_HAM_TOKENS;
  1026. }
  1027. }
  1028. else {
  1029. msg_err_task_check ("got invalid length of reply vector from redis: "
  1030. "%d, expected: %d",
  1031. (gint)reply->elements,
  1032. (gint)task->tokens->len);
  1033. }
  1034. }
  1035. else {
  1036. msg_err_task_check ("got invalid reply from redis: %s, array expected",
  1037. rspamd_redis_type_to_string (reply->type));
  1038. }
  1039. msg_debug_stat_redis ("received tokens for %s: %d processed, %d found",
  1040. rt->redis_object_expanded, processed, found);
  1041. rspamd_upstream_ok (rt->selected);
  1042. }
  1043. }
  1044. else {
  1045. msg_err_task ("error getting reply from redis server %s: %s",
  1046. rspamd_upstream_name (rt->selected), c->errstr);
  1047. if (rt->redis) {
  1048. rspamd_upstream_fail (rt->selected, FALSE);
  1049. }
  1050. if (!rt->err) {
  1051. g_set_error (&rt->err, rspamd_redis_stat_quark (), c->err,
  1052. "cannot get values: error getting reply from redis server %s: %s",
  1053. rspamd_upstream_name (rt->selected), c->errstr);
  1054. }
  1055. }
  1056. if (rt->has_event) {
  1057. rspamd_session_remove_event (task->s, rspamd_redis_fin, rt);
  1058. }
  1059. }
  1060. /* Called when we have set tokens during learning */
  1061. static void
  1062. rspamd_redis_learned (redisAsyncContext *c, gpointer r, gpointer priv)
  1063. {
  1064. struct redis_stat_runtime *rt = REDIS_RUNTIME (priv);
  1065. struct rspamd_task *task;
  1066. task = rt->task;
  1067. if (c->err == 0) {
  1068. rspamd_upstream_ok (rt->selected);
  1069. }
  1070. else {
  1071. msg_err_task_check ("error getting reply from redis server %s: %s",
  1072. rspamd_upstream_name (rt->selected), c->errstr);
  1073. if (rt->redis) {
  1074. rspamd_upstream_fail (rt->selected, FALSE);
  1075. }
  1076. if (!rt->err) {
  1077. g_set_error (&rt->err, rspamd_redis_stat_quark (), c->err,
  1078. "cannot get learned: error getting reply from redis server %s: %s",
  1079. rspamd_upstream_name (rt->selected), c->errstr);
  1080. }
  1081. }
  1082. if (rt->has_event) {
  1083. rspamd_session_remove_event (task->s, rspamd_redis_fin_learn, rt);
  1084. }
  1085. }
  1086. static void
  1087. rspamd_redis_parse_classifier_opts (struct redis_stat_ctx *backend,
  1088. const ucl_object_t *obj,
  1089. struct rspamd_config *cfg)
  1090. {
  1091. const gchar *lua_script;
  1092. const ucl_object_t *elt, *users_enabled;
  1093. users_enabled = ucl_object_lookup_any (obj, "per_user",
  1094. "users_enabled", NULL);
  1095. if (users_enabled != NULL) {
  1096. if (ucl_object_type (users_enabled) == UCL_BOOLEAN) {
  1097. backend->enable_users = ucl_object_toboolean (users_enabled);
  1098. backend->cbref_user = -1;
  1099. }
  1100. else if (ucl_object_type (users_enabled) == UCL_STRING) {
  1101. lua_script = ucl_object_tostring (users_enabled);
  1102. if (luaL_dostring (cfg->lua_state, lua_script) != 0) {
  1103. msg_err_config ("cannot execute lua script for users "
  1104. "extraction: %s", lua_tostring (cfg->lua_state, -1));
  1105. }
  1106. else {
  1107. if (lua_type (cfg->lua_state, -1) == LUA_TFUNCTION) {
  1108. backend->enable_users = TRUE;
  1109. backend->cbref_user = luaL_ref (cfg->lua_state,
  1110. LUA_REGISTRYINDEX);
  1111. }
  1112. else {
  1113. msg_err_config ("lua script must return "
  1114. "function(task) and not %s",
  1115. lua_typename (cfg->lua_state, lua_type (
  1116. cfg->lua_state, -1)));
  1117. }
  1118. }
  1119. }
  1120. }
  1121. else {
  1122. backend->enable_users = FALSE;
  1123. backend->cbref_user = -1;
  1124. }
  1125. elt = ucl_object_lookup (obj, "prefix");
  1126. if (elt == NULL || ucl_object_type (elt) != UCL_STRING) {
  1127. /* Default non-users statistics */
  1128. if (backend->enable_users || backend->cbref_user != -1) {
  1129. backend->redis_object = REDIS_DEFAULT_USERS_OBJECT;
  1130. }
  1131. else {
  1132. backend->redis_object = REDIS_DEFAULT_OBJECT;
  1133. }
  1134. }
  1135. else {
  1136. /* XXX: sanity check */
  1137. backend->redis_object = ucl_object_tostring (elt);
  1138. }
  1139. elt = ucl_object_lookup (obj, "store_tokens");
  1140. if (elt) {
  1141. backend->store_tokens = ucl_object_toboolean (elt);
  1142. }
  1143. else {
  1144. backend->store_tokens = FALSE;
  1145. }
  1146. elt = ucl_object_lookup (obj, "new_schema");
  1147. if (elt) {
  1148. backend->new_schema = ucl_object_toboolean (elt);
  1149. }
  1150. else {
  1151. backend->new_schema = FALSE;
  1152. msg_warn_config ("you are using old bayes schema for redis statistics, "
  1153. "please consider converting it to a new one "
  1154. "by using 'rspamadm configwizard statistics'");
  1155. }
  1156. elt = ucl_object_lookup (obj, "signatures");
  1157. if (elt) {
  1158. backend->enable_signatures = ucl_object_toboolean (elt);
  1159. }
  1160. else {
  1161. backend->enable_signatures = FALSE;
  1162. }
  1163. elt = ucl_object_lookup_any (obj, "expiry", "expire", NULL);
  1164. if (elt) {
  1165. backend->expiry = ucl_object_toint (elt);
  1166. }
  1167. else {
  1168. backend->expiry = 0;
  1169. }
  1170. }
  1171. gpointer
  1172. rspamd_redis_init (struct rspamd_stat_ctx *ctx,
  1173. struct rspamd_config *cfg, struct rspamd_statfile *st)
  1174. {
  1175. struct redis_stat_ctx *backend;
  1176. struct rspamd_statfile_config *stf = st->stcf;
  1177. struct rspamd_redis_stat_elt *st_elt;
  1178. const ucl_object_t *obj;
  1179. gboolean ret = FALSE;
  1180. gint conf_ref = -1;
  1181. lua_State *L = (lua_State *)cfg->lua_state;
  1182. backend = g_malloc0 (sizeof (*backend));
  1183. backend->L = L;
  1184. backend->timeout = REDIS_DEFAULT_TIMEOUT;
  1185. /* First search in backend configuration */
  1186. obj = ucl_object_lookup (st->classifier->cfg->opts, "backend");
  1187. if (obj != NULL && ucl_object_type (obj) == UCL_OBJECT) {
  1188. ret = rspamd_lua_try_load_redis (L, obj, cfg, &conf_ref);
  1189. }
  1190. /* Now try statfiles config */
  1191. if (!ret && stf->opts) {
  1192. ret = rspamd_lua_try_load_redis (L, stf->opts, cfg, &conf_ref);
  1193. }
  1194. /* Now try classifier config */
  1195. if (!ret && st->classifier->cfg->opts) {
  1196. ret = rspamd_lua_try_load_redis (L, st->classifier->cfg->opts, cfg, &conf_ref);
  1197. }
  1198. /* Now try global redis settings */
  1199. if (!ret) {
  1200. obj = ucl_object_lookup (cfg->rcl_obj, "redis");
  1201. if (obj) {
  1202. const ucl_object_t *specific_obj;
  1203. specific_obj = ucl_object_lookup (obj, "statistics");
  1204. if (specific_obj) {
  1205. ret = rspamd_lua_try_load_redis (L,
  1206. specific_obj, cfg, &conf_ref);
  1207. }
  1208. else {
  1209. ret = rspamd_lua_try_load_redis (L,
  1210. obj, cfg, &conf_ref);
  1211. }
  1212. }
  1213. }
  1214. if (!ret) {
  1215. msg_err_config ("cannot init redis backend for %s", stf->symbol);
  1216. g_free (backend);
  1217. return NULL;
  1218. }
  1219. backend->conf_ref = conf_ref;
  1220. /* Check some common table values */
  1221. lua_rawgeti (L, LUA_REGISTRYINDEX, conf_ref);
  1222. lua_pushstring (L, "timeout");
  1223. lua_gettable (L, -2);
  1224. if (lua_type (L, -1) == LUA_TNUMBER) {
  1225. backend->timeout = lua_tonumber (L, -1);
  1226. }
  1227. lua_pop (L, 1);
  1228. lua_pushstring (L, "db");
  1229. lua_gettable (L, -2);
  1230. if (lua_type (L, -1) == LUA_TSTRING) {
  1231. backend->dbname = rspamd_mempool_strdup (cfg->cfg_pool,
  1232. lua_tostring (L, -1));
  1233. }
  1234. lua_pop (L, 1);
  1235. lua_pushstring (L, "password");
  1236. lua_gettable (L, -2);
  1237. if (lua_type (L, -1) == LUA_TSTRING) {
  1238. backend->password = rspamd_mempool_strdup (cfg->cfg_pool,
  1239. lua_tostring (L, -1));
  1240. }
  1241. lua_pop (L, 1);
  1242. lua_settop (L, 0);
  1243. rspamd_redis_parse_classifier_opts (backend, st->classifier->cfg->opts, cfg);
  1244. stf->clcf->flags |= RSPAMD_FLAG_CLASSIFIER_INCREMENTING_BACKEND;
  1245. backend->stcf = stf;
  1246. st_elt = g_malloc0 (sizeof (*st_elt));
  1247. st_elt->ev_base = ctx->ev_base;
  1248. st_elt->ctx = backend;
  1249. backend->stat_elt = rspamd_stat_ctx_register_async (
  1250. rspamd_redis_async_stat_cb,
  1251. rspamd_redis_async_stat_fin,
  1252. st_elt,
  1253. REDIS_STAT_TIMEOUT);
  1254. st_elt->async = backend->stat_elt;
  1255. return (gpointer)backend;
  1256. }
  1257. gpointer
  1258. rspamd_redis_runtime (struct rspamd_task *task,
  1259. struct rspamd_statfile_config *stcf,
  1260. gboolean learn, gpointer c)
  1261. {
  1262. struct redis_stat_ctx *ctx = REDIS_CTX (c);
  1263. struct redis_stat_runtime *rt;
  1264. struct upstream *up;
  1265. struct upstream_list *ups;
  1266. char *object_expanded = NULL;
  1267. rspamd_inet_addr_t *addr;
  1268. g_assert (ctx != NULL);
  1269. g_assert (stcf != NULL);
  1270. if (learn) {
  1271. ups = rspamd_redis_get_servers (ctx, "write_servers");
  1272. if (!ups) {
  1273. msg_err_task ("no write servers defined for %s, cannot learn",
  1274. stcf->symbol);
  1275. return NULL;
  1276. }
  1277. up = rspamd_upstream_get (ups,
  1278. RSPAMD_UPSTREAM_MASTER_SLAVE,
  1279. NULL,
  1280. 0);
  1281. }
  1282. else {
  1283. ups = rspamd_redis_get_servers (ctx, "read_servers");
  1284. if (!ups) {
  1285. msg_err_task ("no read servers defined for %s, cannot stat",
  1286. stcf->symbol);
  1287. return NULL;
  1288. }
  1289. up = rspamd_upstream_get (ups,
  1290. RSPAMD_UPSTREAM_ROUND_ROBIN,
  1291. NULL,
  1292. 0);
  1293. }
  1294. if (up == NULL) {
  1295. msg_err_task ("no upstreams reachable");
  1296. return NULL;
  1297. }
  1298. if (rspamd_redis_expand_object (ctx->redis_object, ctx, task,
  1299. &object_expanded) == 0) {
  1300. msg_err_task ("expansion for learning failed for symbol %s "
  1301. "(maybe learning per user classifier with no user or recipient)",
  1302. stcf->symbol);
  1303. return NULL;
  1304. }
  1305. rt = rspamd_mempool_alloc0 (task->task_pool, sizeof (*rt));
  1306. rspamd_mempool_add_destructor (task->task_pool,
  1307. rspamd_gerror_free_maybe, &rt->err);
  1308. rt->selected = up;
  1309. rt->task = task;
  1310. rt->ctx = ctx;
  1311. rt->stcf = stcf;
  1312. rt->redis_object_expanded = object_expanded;
  1313. addr = rspamd_upstream_addr_next (up);
  1314. g_assert (addr != NULL);
  1315. if (rspamd_inet_address_get_af (addr) == AF_UNIX) {
  1316. rt->redis = redisAsyncConnectUnix (rspamd_inet_address_to_string (addr));
  1317. }
  1318. else {
  1319. rt->redis = redisAsyncConnect (rspamd_inet_address_to_string (addr),
  1320. rspamd_inet_address_get_port (addr));
  1321. }
  1322. if (rt->redis == NULL) {
  1323. msg_err_task ("cannot connect redis");
  1324. return NULL;
  1325. }
  1326. redisLibeventAttach (rt->redis, task->ev_base);
  1327. rspamd_redis_maybe_auth (ctx, rt->redis);
  1328. return rt;
  1329. }
  1330. void
  1331. rspamd_redis_close (gpointer p)
  1332. {
  1333. struct redis_stat_ctx *ctx = REDIS_CTX (p);
  1334. lua_State *L = ctx->L;
  1335. if (ctx->conf_ref) {
  1336. luaL_unref (L, LUA_REGISTRYINDEX, ctx->conf_ref);
  1337. }
  1338. g_free (ctx);
  1339. }
  1340. gboolean
  1341. rspamd_redis_process_tokens (struct rspamd_task *task,
  1342. GPtrArray *tokens,
  1343. gint id, gpointer p)
  1344. {
  1345. struct redis_stat_runtime *rt = REDIS_RUNTIME (p);
  1346. rspamd_fstring_t *query;
  1347. struct timeval tv;
  1348. gint ret;
  1349. const gchar *learned_key = "learns";
  1350. if (rspamd_session_blocked (task->s)) {
  1351. return FALSE;
  1352. }
  1353. if (tokens == NULL || tokens->len == 0 || rt->redis == NULL) {
  1354. return FALSE;
  1355. }
  1356. rt->id = id;
  1357. if (rt->ctx->new_schema) {
  1358. if (rt->ctx->stcf->is_spam) {
  1359. learned_key = "learns_spam";
  1360. }
  1361. else {
  1362. learned_key = "learns_ham";
  1363. }
  1364. }
  1365. if (redisAsyncCommand (rt->redis, rspamd_redis_connected, rt, "HGET %s %s",
  1366. rt->redis_object_expanded, learned_key) == REDIS_OK) {
  1367. rspamd_session_add_event (task->s, rspamd_redis_fin, rt, M);
  1368. rt->has_event = TRUE;
  1369. if (rspamd_event_pending (&rt->timeout_event, EV_TIMEOUT)) {
  1370. event_del (&rt->timeout_event);
  1371. }
  1372. event_set (&rt->timeout_event, -1, EV_TIMEOUT, rspamd_redis_timeout, rt);
  1373. event_base_set (task->ev_base, &rt->timeout_event);
  1374. double_to_tv (rt->ctx->timeout, &tv);
  1375. event_add (&rt->timeout_event, &tv);
  1376. query = rspamd_redis_tokens_to_query (task, rt, tokens,
  1377. rt->ctx->new_schema ? "HGET" : "HMGET",
  1378. rt->redis_object_expanded, FALSE, -1,
  1379. rt->stcf->clcf->flags & RSPAMD_FLAG_CLASSIFIER_INTEGER);
  1380. g_assert (query != NULL);
  1381. rspamd_mempool_add_destructor (task->task_pool,
  1382. (rspamd_mempool_destruct_t)rspamd_fstring_free, query);
  1383. ret = redisAsyncFormattedCommand (rt->redis, rspamd_redis_processed, rt,
  1384. query->str, query->len);
  1385. if (ret == REDIS_OK) {
  1386. return TRUE;
  1387. }
  1388. else {
  1389. msg_err_task ("call to redis failed: %s", rt->redis->errstr);
  1390. }
  1391. }
  1392. return FALSE;
  1393. }
  1394. gboolean
  1395. rspamd_redis_finalize_process (struct rspamd_task *task, gpointer runtime,
  1396. gpointer ctx)
  1397. {
  1398. struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
  1399. redisAsyncContext *redis;
  1400. if (rspamd_event_pending (&rt->timeout_event, EV_TIMEOUT)) {
  1401. event_del (&rt->timeout_event);
  1402. }
  1403. if (rt->redis) {
  1404. redis = rt->redis;
  1405. rt->redis = NULL;
  1406. redisAsyncFree (redis);
  1407. }
  1408. if (rt->err) {
  1409. return FALSE;
  1410. }
  1411. return TRUE;
  1412. }
  1413. gboolean
  1414. rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens,
  1415. gint id, gpointer p)
  1416. {
  1417. struct redis_stat_runtime *rt = REDIS_RUNTIME (p);
  1418. struct upstream *up;
  1419. struct upstream_list *ups;
  1420. rspamd_inet_addr_t *addr;
  1421. struct timeval tv;
  1422. rspamd_fstring_t *query;
  1423. const gchar *redis_cmd;
  1424. rspamd_token_t *tok;
  1425. gint ret;
  1426. goffset off;
  1427. const gchar *learned_key = "learns";
  1428. if (rspamd_session_blocked (task->s)) {
  1429. return FALSE;
  1430. }
  1431. ups = rspamd_redis_get_servers (rt->ctx, "write_servers");
  1432. if (!ups) {
  1433. return FALSE;
  1434. }
  1435. up = rspamd_upstream_get (ups,
  1436. RSPAMD_UPSTREAM_MASTER_SLAVE,
  1437. NULL,
  1438. 0);
  1439. if (up == NULL) {
  1440. msg_err_task ("no upstreams reachable");
  1441. return FALSE;
  1442. }
  1443. rt->selected = up;
  1444. if (rt->ctx->new_schema) {
  1445. if (rt->ctx->stcf->is_spam) {
  1446. learned_key = "learns_spam";
  1447. }
  1448. else {
  1449. learned_key = "learns_ham";
  1450. }
  1451. }
  1452. addr = rspamd_upstream_addr_next (up);
  1453. g_assert (addr != NULL);
  1454. if (rspamd_inet_address_get_af (addr) == AF_UNIX) {
  1455. rt->redis = redisAsyncConnectUnix (rspamd_inet_address_to_string (addr));
  1456. }
  1457. else {
  1458. rt->redis = redisAsyncConnect (rspamd_inet_address_to_string (addr),
  1459. rspamd_inet_address_get_port (addr));
  1460. }
  1461. g_assert (rt->redis != NULL);
  1462. redisLibeventAttach (rt->redis, task->ev_base);
  1463. rspamd_redis_maybe_auth (rt->ctx, rt->redis);
  1464. /*
  1465. * Add the current key to the set of learned keys
  1466. */
  1467. redisAsyncCommand (rt->redis, NULL, NULL, "SADD %s_keys %s",
  1468. rt->stcf->symbol, rt->redis_object_expanded);
  1469. if (rt->ctx->new_schema) {
  1470. redisAsyncCommand (rt->redis, NULL, NULL, "HSET %s version 2",
  1471. rt->redis_object_expanded);
  1472. }
  1473. if (rt->stcf->clcf->flags & RSPAMD_FLAG_CLASSIFIER_INTEGER) {
  1474. redis_cmd = "HINCRBY";
  1475. }
  1476. else {
  1477. redis_cmd = "HINCRBYFLOAT";
  1478. }
  1479. rt->id = id;
  1480. query = rspamd_redis_tokens_to_query (task, rt, tokens,
  1481. redis_cmd, rt->redis_object_expanded, TRUE, id,
  1482. rt->stcf->clcf->flags & RSPAMD_FLAG_CLASSIFIER_INTEGER);
  1483. g_assert (query != NULL);
  1484. query->len = 0;
  1485. /*
  1486. * XXX:
  1487. * Dirty hack: we get a token and check if it's value is -1 or 1, so
  1488. * we could understand that we are learning or unlearning
  1489. */
  1490. tok = g_ptr_array_index (task->tokens, 0);
  1491. if (tok->values[id] > 0) {
  1492. rspamd_printf_fstring (&query, ""
  1493. "*4\r\n"
  1494. "$7\r\n"
  1495. "HINCRBY\r\n"
  1496. "$%d\r\n"
  1497. "%s\r\n"
  1498. "$%d\r\n"
  1499. "%s\r\n" /* Learned key */
  1500. "$1\r\n"
  1501. "1\r\n",
  1502. (gint)strlen (rt->redis_object_expanded),
  1503. rt->redis_object_expanded,
  1504. (gint)strlen (learned_key),
  1505. learned_key);
  1506. }
  1507. else {
  1508. rspamd_printf_fstring (&query, ""
  1509. "*4\r\n"
  1510. "$7\r\n"
  1511. "HINCRBY\r\n"
  1512. "$%d\r\n"
  1513. "%s\r\n"
  1514. "$%d\r\n"
  1515. "%s\r\n" /* Learned key */
  1516. "$2\r\n"
  1517. "-1\r\n",
  1518. (gint)strlen (rt->redis_object_expanded),
  1519. rt->redis_object_expanded,
  1520. (gint)strlen (learned_key),
  1521. learned_key);
  1522. }
  1523. ret = redisAsyncFormattedCommand (rt->redis, NULL, NULL,
  1524. query->str, query->len);
  1525. if (ret != REDIS_OK) {
  1526. msg_err_task ("call to redis failed: %s", rt->redis->errstr);
  1527. rspamd_fstring_free (query);
  1528. return FALSE;
  1529. }
  1530. off = query->len;
  1531. ret = rspamd_printf_fstring (&query, "*1\r\n$4\r\nEXEC\r\n");
  1532. ret = redisAsyncFormattedCommand (rt->redis, rspamd_redis_learned, rt,
  1533. query->str + off, ret);
  1534. rspamd_mempool_add_destructor (task->task_pool,
  1535. (rspamd_mempool_destruct_t)rspamd_fstring_free, query);
  1536. if (ret == REDIS_OK) {
  1537. /* Add signature if needed */
  1538. if (rt->ctx->enable_signatures) {
  1539. rspamd_redis_store_stat_signature (task, rt, tokens,
  1540. "RSIG");
  1541. }
  1542. rspamd_session_add_event (task->s, rspamd_redis_fin_learn, rt, M);
  1543. rt->has_event = TRUE;
  1544. /* Set timeout */
  1545. if (rspamd_event_pending (&rt->timeout_event, EV_TIMEOUT)) {
  1546. event_del (&rt->timeout_event);
  1547. }
  1548. event_set (&rt->timeout_event, -1, EV_TIMEOUT, rspamd_redis_timeout, rt);
  1549. event_base_set (task->ev_base, &rt->timeout_event);
  1550. double_to_tv (rt->ctx->timeout, &tv);
  1551. event_add (&rt->timeout_event, &tv);
  1552. return TRUE;
  1553. }
  1554. else {
  1555. msg_err_task ("call to redis failed: %s", rt->redis->errstr);
  1556. }
  1557. return FALSE;
  1558. }
  1559. gboolean
  1560. rspamd_redis_finalize_learn (struct rspamd_task *task, gpointer runtime,
  1561. gpointer ctx, GError **err)
  1562. {
  1563. struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
  1564. redisAsyncContext *redis;
  1565. if (rspamd_event_pending (&rt->timeout_event, EV_TIMEOUT)) {
  1566. event_del (&rt->timeout_event);
  1567. }
  1568. if (rt->redis) {
  1569. redis = rt->redis;
  1570. rt->redis = NULL;
  1571. redisAsyncFree (redis);
  1572. }
  1573. if (rt->err) {
  1574. g_propagate_error (err, rt->err);
  1575. rt->err = NULL;
  1576. return FALSE;
  1577. }
  1578. return TRUE;
  1579. }
  1580. gulong
  1581. rspamd_redis_total_learns (struct rspamd_task *task, gpointer runtime,
  1582. gpointer ctx)
  1583. {
  1584. struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
  1585. return rt->learned;
  1586. }
  1587. gulong
  1588. rspamd_redis_inc_learns (struct rspamd_task *task, gpointer runtime,
  1589. gpointer ctx)
  1590. {
  1591. struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
  1592. /* XXX: may cause races */
  1593. return rt->learned + 1;
  1594. }
  1595. gulong
  1596. rspamd_redis_dec_learns (struct rspamd_task *task, gpointer runtime,
  1597. gpointer ctx)
  1598. {
  1599. struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
  1600. /* XXX: may cause races */
  1601. return rt->learned + 1;
  1602. }
  1603. gulong
  1604. rspamd_redis_learns (struct rspamd_task *task, gpointer runtime,
  1605. gpointer ctx)
  1606. {
  1607. struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
  1608. return rt->learned;
  1609. }
  1610. ucl_object_t *
  1611. rspamd_redis_get_stat (gpointer runtime,
  1612. gpointer ctx)
  1613. {
  1614. struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
  1615. struct rspamd_redis_stat_elt *st;
  1616. redisAsyncContext *redis;
  1617. if (rt->ctx->stat_elt) {
  1618. st = rt->ctx->stat_elt->ud;
  1619. if (rt->redis) {
  1620. redis = rt->redis;
  1621. rt->redis = NULL;
  1622. redisAsyncFree (redis);
  1623. }
  1624. if (st->stat) {
  1625. return ucl_object_ref (st->stat);
  1626. }
  1627. }
  1628. return NULL;
  1629. }
  1630. gpointer
  1631. rspamd_redis_load_tokenizer_config (gpointer runtime,
  1632. gsize *len)
  1633. {
  1634. return NULL;
  1635. }
  1636. #endif