Nevar pievienot vairāk kā 25 tēmas. Tēmai ir jāsākas ar burtu vai ciparu, tā var saturēt domu zīmes ('-') un var būt līdz 35 simboliem gara.

redis_backend.c 43KB


  1. /*-
  2. * Copyright 2016 Vsevolod Stakhov
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "config.h"
  17. #include "rspamd.h"
  18. #include "stat_internal.h"
  19. #include "upstream.h"
  20. #include "lua/lua_common.h"
  21. #include "libserver/mempool_vars_internal.h"
  22. #ifdef WITH_HIREDIS
  23. #include "hiredis.h"
  24. #include "adapters/libevent.h"
  25. #include "ref.h"
/*
 * Module-local debug logging helper. It expands to
 * rspamd_conditional_debug_fast() tagged with the "stat_redis" module name
 * and reads `task` from the calling scope, so a `task` variable with a valid
 * task_pool must be visible wherever the macro is used.
 */
#define msg_debug_stat_redis(...)  rspamd_conditional_debug_fast (NULL, NULL, \
        rspamd_stat_redis_log_id, "stat_redis", task->task_pool->tag.uid, \
        G_STRFUNC, \
        __VA_ARGS__)

/*
 * Registers the "stat_redis" log module.
 * NOTE(review): presumably this is what defines rspamd_stat_redis_log_id used
 * above - confirm against the INIT_LOG_MODULE definition.
 */
INIT_LOG_MODULE(stat_redis)

/*
 * Cast helpers for the opaque pointers handed to callbacks. Note that the
 * expansions are not wrapped in outer parentheses.
 */
#define REDIS_CTX(p) (struct redis_stat_ctx *)(p)
#define REDIS_RUNTIME(p) (struct redis_stat_runtime *)(p)

/* Backend name and defaults. NOTE(review): timeouts are presumably seconds */
#define REDIS_BACKEND_TYPE "redis"
#define REDIS_DEFAULT_PORT 6379
/*
 * Key patterns understood by rspamd_redis_expand_object():
 * %s - symbol (or "RS" for the new schema), %l - label, %r - recipient
 */
#define REDIS_DEFAULT_OBJECT "%s%l"
#define REDIS_DEFAULT_USERS_OBJECT "%s%l%r"
#define REDIS_DEFAULT_TIMEOUT 0.5
#define REDIS_STAT_TIMEOUT 30
/* Per-statfile configuration of the redis backend */
struct redis_stat_ctx {
	/* Lua state used to resolve server lists through conf_ref */
	lua_State *L;
	/* Statfile config; its label, symbol and is_spam fields are read here */
	struct rspamd_statfile_config *stcf;
	/* Lua registry reference of the table indexed by server list name */
	gint conf_ref;
	/* NOTE(review): not used in the visible part of the file */
	struct rspamd_stat_async_elt *stat_elt;
	/* NOTE(review): presumably the key pattern to expand - confirm */
	const gchar *redis_object;
	/* When set, an AUTH command is queued on new connections */
	const gchar *password;
	/* When set, a SELECT command is queued on new connections */
	const gchar *dbname;
	/* NOTE(review): presumably seconds - confirm against the config parser */
	gdouble timeout;
	/* Resolve a per-user recipient when expanding the key pattern */
	gboolean enable_users;
	/* Also store token text (HSET/ZINCRBY) while learning */
	gboolean store_tokens;
	/* Use one "<prefix>_<token>" hash per token with "S"/"H" fields */
	gboolean new_schema;
	/* NOTE(review): not used in the visible part of the file */
	gboolean enable_signatures;
	/* TTL passed to EXPIRE; no EXPIRE is issued when it is 0 */
	guint expiry;
	/* Lua registry ref of the user extraction function, -1 when unset */
	gint cbref_user;
};
/*
 * Connection states of a redis runtime.
 * NOTE(review): none of these values is referenced in the visible part of
 * the file; the meaning of each state is inferred from its name only.
 */
enum rspamd_redis_connection_state {
	RSPAMD_REDIS_DISCONNECTED = 0,
	RSPAMD_REDIS_CONNECTED,
	RSPAMD_REDIS_REQUEST_SENT,
	RSPAMD_REDIS_TIMEDOUT,
	RSPAMD_REDIS_TERMINATED
};
/* Per-task state of a single redis statfile request */
struct redis_stat_runtime {
	/* Backend configuration this runtime was created from */
	struct redis_stat_ctx *ctx;
	/* Task being processed; used for logging and session events */
	struct rspamd_task *task;
	/* Upstream in use; reported ok/failed from the reply callbacks */
	struct upstream *selected;
	/* Timeout timer; removed in the fin handlers if still pending */
	struct event timeout_event;
	/* NOTE(review): not used in the visible part of the file */
	GArray *results;
	/* Statfile config (symbol, is_spam, classifier flags) */
	struct rspamd_statfile_config *stcf;
	/* Key name used in debug logs; presumably the expanded key pattern */
	gchar *redis_object_expanded;
	/* Async connection; reset to NULL before it is freed */
	redisAsyncContext *redis;
	/* Number of learns parsed by rspamd_redis_connected() */
	guint64 learned;
	/* Index into tok->values where this statfile stores its result */
	gint id;
	/* TRUE while a session event is registered; cleared by the fin handlers */
	gboolean has_event;
	/* First error recorded for this runtime, NULL when none */
	GError *err;
};
/* Used to get statistics from redis */
struct rspamd_redis_stat_cbdata;

/* Long-living state of the periodic statistics collector */
struct rspamd_redis_stat_elt {
	/* Backend configuration (servers, symbol, schema) */
	struct redis_stat_ctx *ctx;
	/* Parent async element; re-enabled when a collection run is cleaned up */
	struct rspamd_stat_async_elt *async;
	/* Event base the redis connection is attached to */
	struct event_base *ev_base;
	/* Result of the last finished collection run, replaced on cleanup */
	ucl_object_t *stat;
	/* Collection run in progress, NULL when idle */
	struct rspamd_redis_stat_cbdata *cbdata;
};
/* State of one statistics collection run */
struct rspamd_redis_stat_cbdata {
	/* Owning collector element */
	struct rspamd_redis_stat_elt *elt;
	/* Connection used for this run; freed on cleanup */
	redisAsyncContext *redis;
	/* Statistics object being filled; handed over to elt->stat on cleanup */
	ucl_object_t *cur;
	/* g_malloc'ed key names returned by SMEMBERS (entries may be NULL) */
	GPtrArray *cur_keys;
	/* Upstream selected for this run */
	struct upstream *selected;
	/* Number of replies still expected; cleanup runs when it drops to 0 */
	guint inflight;
	/* Set while cleanup is running so that late callbacks return early */
	gboolean wanna_die;
};
/* NULL-safe field access: yields NULL when `task` itself is NULL */
#define GET_TASK_ELT(task, elt) (task == NULL ? NULL : (task)->elt)

/* Name of the GError domain used by this backend */
static const gchar *M = "redis statistics";

/* Returns the GQuark identifying errors raised by the redis backend */
static GQuark
rspamd_redis_stat_quark (void)
{
	return g_quark_from_static_string (M);
}
  101. static inline struct upstream_list *
  102. rspamd_redis_get_servers (struct redis_stat_ctx *ctx,
  103. const gchar *what)
  104. {
  105. lua_State *L = ctx->L;
  106. struct upstream_list *res;
  107. lua_rawgeti (L, LUA_REGISTRYINDEX, ctx->conf_ref);
  108. lua_pushstring (L, what);
  109. lua_gettable (L, -2);
  110. res = *((struct upstream_list**)lua_touserdata (L, -1));
  111. lua_settop (L, 0);
  112. return res;
  113. }
  114. /*
  115. * Non-static for lua unit testing
  116. */
  117. gsize
  118. rspamd_redis_expand_object (const gchar *pattern,
  119. struct redis_stat_ctx *ctx,
  120. struct rspamd_task *task,
  121. gchar **target)
  122. {
  123. gsize tlen = 0;
  124. const gchar *p = pattern, *elt;
  125. gchar *d, *end;
  126. enum {
  127. just_char,
  128. percent_char,
  129. mod_char
  130. } state = just_char;
  131. struct rspamd_statfile_config *stcf;
  132. lua_State *L = NULL;
  133. struct rspamd_task **ptask;
  134. GString *tb;
  135. const gchar *rcpt = NULL;
  136. gint err_idx;
  137. g_assert (ctx != NULL);
  138. stcf = ctx->stcf;
  139. L = task->cfg->lua_state;
  140. if (ctx->enable_users) {
  141. if (ctx->cbref_user == -1) {
  142. rcpt = rspamd_task_get_principal_recipient (task);
  143. }
  144. else if (L) {
  145. /* Execute lua function to get userdata */
  146. lua_pushcfunction (L, &rspamd_lua_traceback);
  147. err_idx = lua_gettop (L);
  148. lua_rawgeti (L, LUA_REGISTRYINDEX, ctx->cbref_user);
  149. ptask = lua_newuserdata (L, sizeof (struct rspamd_task *));
  150. *ptask = task;
  151. rspamd_lua_setclass (L, "rspamd{task}", -1);
  152. if (lua_pcall (L, 1, 1, err_idx) != 0) {
  153. tb = lua_touserdata (L, -1);
  154. msg_err_task ("call to user extraction script failed: %v", tb);
  155. g_string_free (tb, TRUE);
  156. }
  157. else {
  158. rcpt = rspamd_mempool_strdup (task->task_pool, lua_tostring (L, -1));
  159. }
  160. /* Result + error function */
  161. lua_pop (L, 2);
  162. }
  163. if (rcpt) {
  164. rspamd_mempool_set_variable (task->task_pool, "stat_user",
  165. (gpointer)rcpt, NULL);
  166. }
  167. }
  168. /* Length calculation */
  169. while (*p) {
  170. switch (state) {
  171. case just_char:
  172. if (*p == '%') {
  173. state = percent_char;
  174. }
  175. else {
  176. tlen ++;
  177. }
  178. p ++;
  179. break;
  180. case percent_char:
  181. switch (*p) {
  182. case '%':
  183. tlen ++;
  184. state = just_char;
  185. break;
  186. case 'u':
  187. elt = GET_TASK_ELT (task, user);
  188. if (elt) {
  189. tlen += strlen (elt);
  190. }
  191. break;
  192. case 'r':
  193. if (rcpt == NULL) {
  194. elt = rspamd_task_get_principal_recipient (task);
  195. }
  196. else {
  197. elt = rcpt;
  198. }
  199. if (elt) {
  200. tlen += strlen (elt);
  201. }
  202. break;
  203. case 'l':
  204. if (stcf->label) {
  205. tlen += strlen (stcf->label);
  206. }
  207. /* Label miss is OK */
  208. break;
  209. case 's':
  210. if (ctx->new_schema) {
  211. tlen += sizeof ("RS") - 1;
  212. }
  213. else {
  214. if (stcf->symbol) {
  215. tlen += strlen (stcf->symbol);
  216. }
  217. }
  218. break;
  219. default:
  220. state = just_char;
  221. tlen ++;
  222. break;
  223. }
  224. if (state == percent_char) {
  225. state = mod_char;
  226. }
  227. p ++;
  228. break;
  229. case mod_char:
  230. switch (*p) {
  231. case 'd':
  232. p ++;
  233. state = just_char;
  234. break;
  235. default:
  236. state = just_char;
  237. break;
  238. }
  239. break;
  240. }
  241. }
  242. if (target == NULL || task == NULL) {
  243. return -1;
  244. }
  245. *target = rspamd_mempool_alloc (task->task_pool, tlen + 1);
  246. d = *target;
  247. end = d + tlen + 1;
  248. d[tlen] = '\0';
  249. p = pattern;
  250. state = just_char;
  251. /* Expand string */
  252. while (*p && d < end) {
  253. switch (state) {
  254. case just_char:
  255. if (*p == '%') {
  256. state = percent_char;
  257. }
  258. else {
  259. *d++ = *p;
  260. }
  261. p ++;
  262. break;
  263. case percent_char:
  264. switch (*p) {
  265. case '%':
  266. *d++ = *p;
  267. state = just_char;
  268. break;
  269. case 'u':
  270. elt = GET_TASK_ELT (task, user);
  271. if (elt) {
  272. d += rspamd_strlcpy (d, elt, end - d);
  273. }
  274. break;
  275. case 'r':
  276. if (rcpt == NULL) {
  277. elt = rspamd_task_get_principal_recipient (task);
  278. }
  279. else {
  280. elt = rcpt;
  281. }
  282. if (elt) {
  283. d += rspamd_strlcpy (d, elt, end - d);
  284. }
  285. break;
  286. case 'l':
  287. if (stcf->label) {
  288. d += rspamd_strlcpy (d, stcf->label, end - d);
  289. }
  290. break;
  291. case 's':
  292. if (ctx->new_schema) {
  293. d += rspamd_strlcpy (d, "RS", end - d);
  294. }
  295. else {
  296. if (stcf->symbol) {
  297. d += rspamd_strlcpy (d, stcf->symbol, end - d);
  298. }
  299. }
  300. break;
  301. default:
  302. state = just_char;
  303. *d++ = *p;
  304. break;
  305. }
  306. if (state == percent_char) {
  307. state = mod_char;
  308. }
  309. p ++;
  310. break;
  311. case mod_char:
  312. switch (*p) {
  313. case 'd':
  314. /* TODO: not supported yet */
  315. p ++;
  316. state = just_char;
  317. break;
  318. default:
  319. state = just_char;
  320. break;
  321. }
  322. break;
  323. }
  324. }
  325. return tlen;
  326. }
  327. static void
  328. rspamd_redis_maybe_auth (struct redis_stat_ctx *ctx, redisAsyncContext *redis)
  329. {
  330. if (ctx->password) {
  331. redisAsyncCommand (redis, NULL, NULL, "AUTH %s", ctx->password);
  332. }
  333. if (ctx->dbname) {
  334. redisAsyncCommand (redis, NULL, NULL, "SELECT %s", ctx->dbname);
  335. }
  336. }
/*
 * Serialises tokens into redis protocol commands and sends part of them.
 *
 * Learning (`learn` is TRUE): MULTI is sent first, then one `command`
 * (e.g. HINCRBY) per token is sent immediately, optionally followed by the
 * token text (HSET + ZINCRBY, when store_tokens is set) and EXPIRE (new
 * schema with a non-zero expiry). The returned buffer is empty.
 *
 * Classifying, new schema: MULTI and one `command` per token are sent
 * immediately; the returned buffer holds only the EXEC command.
 *
 * Classifying, old schema: nothing is sent; the returned buffer holds one
 * `command` with the prefix and all token ids as arguments.
 *
 * @param task task used for logging
 * @param rt runtime holding the connection, context and statfile config
 * @param tokens array of rspamd_token_t pointers (must not be NULL)
 * @param command redis command name
 * @param prefix key prefix
 * @param learn TRUE to build learn commands, FALSE to build a query
 * @param idx index into tok->values used when learning
 * @param intvals when learning, format values as integers instead of floats
 * @return buffer described above, or NULL if sending MULTI or a per-token
 * command failed (the buffer is freed in that case).
 * NOTE(review): the caller presumably sends and frees the returned buffer
 * and issues EXEC for the learn case - confirm against the callers.
 */
static rspamd_fstring_t *
rspamd_redis_tokens_to_query (struct rspamd_task *task,
		struct redis_stat_runtime *rt,
		GPtrArray *tokens,
		const gchar *command,
		const gchar *prefix,
		gboolean learn,
		gint idx,
		gboolean intvals)
{
	rspamd_fstring_t *out;
	rspamd_token_t *tok;
	/* n0: key or token id, n1: value or expiry, both rendered as text */
	gchar n0[512], n1[64];
	guint i, l0, l1, cmd_len, prefix_len;
	gint ret;

	g_assert (tokens != NULL);

	cmd_len = strlen (command);
	prefix_len = strlen (prefix);
	out = rspamd_fstring_sized_new (1024);

	if (learn) {
		/* Learning is wrapped into a transaction */
		rspamd_printf_fstring (&out, "*1\r\n$5\r\nMULTI\r\n");

		ret = redisAsyncFormattedCommand (rt->redis, NULL, NULL,
				out->str, out->len);

		if (ret != REDIS_OK) {
			msg_err_task ("call to redis failed: %s", rt->redis->errstr);
			rspamd_fstring_free (out);

			return NULL;
		}

		/* Reuse the buffer for the next command */
		out->len = 0;
	}
	else {
		if (rt->ctx->new_schema) {
			/* Multi + HGET */
			rspamd_printf_fstring (&out, "*1\r\n$5\r\nMULTI\r\n");

			ret = redisAsyncFormattedCommand (rt->redis, NULL, NULL,
					out->str, out->len);

			if (ret != REDIS_OK) {
				msg_err_task ("call to redis failed: %s", rt->redis->errstr);
				rspamd_fstring_free (out);

				return NULL;
			}

			out->len = 0;
		}
		else {
			/* Header of a single command: <command> <prefix> <token>... */
			rspamd_printf_fstring (&out, ""
					"*%d\r\n"
					"$%d\r\n"
					"%s\r\n"
					"$%d\r\n"
					"%s\r\n",
					(tokens->len + 2),
					cmd_len, command,
					prefix_len, prefix);
		}
	}

	for (i = 0; i < tokens->len; i ++) {
		tok = g_ptr_array_index (tokens, i);

		if (learn) {
			/* Render the increment value */
			if (intvals) {
				l1 = rspamd_snprintf (n1, sizeof (n1), "%L",
						(gint64) tok->values[idx]);
			} else {
				l1 = rspamd_snprintf (n1, sizeof (n1), "%f",
						tok->values[idx]);
			}

			if (rt->ctx->new_schema) {
				/*
				 * HINCRBY <prefix_token> <0|1> <value>
				 */
				l0 = rspamd_snprintf (n0, sizeof (n0), "%*s_%uL",
						prefix_len, prefix,
						tok->data);

				rspamd_printf_fstring (&out, ""
						"*4\r\n"
						"$%d\r\n"
						"%s\r\n"
						"$%d\r\n"
						"%s\r\n"
						"$%d\r\n"
						"%s\r\n"
						"$%d\r\n"
						"%s\r\n",
						cmd_len, command,
						l0, n0,
						1, rt->stcf->is_spam ? "S" : "H",
						l1, n1);
			}
			else {
				l0 = rspamd_snprintf (n0, sizeof (n0), "%uL", tok->data);

				/*
				 * HINCRBY <prefix> <token> <value>
				 */
				rspamd_printf_fstring (&out, ""
						"*4\r\n"
						"$%d\r\n"
						"%s\r\n"
						"$%d\r\n"
						"%s\r\n"
						"$%d\r\n"
						"%s\r\n"
						"$%d\r\n"
						"%s\r\n",
						cmd_len, command,
						prefix_len, prefix,
						l0, n0,
						l1, n1);
			}

			ret = redisAsyncFormattedCommand (rt->redis, NULL, NULL,
					out->str, out->len);

			if (ret != REDIS_OK) {
				msg_err_task ("call to redis failed: %s", rt->redis->errstr);
				rspamd_fstring_free (out);

				return NULL;
			}

			if (rt->ctx->store_tokens) {

				if (!rt->ctx->new_schema) {
					/*
					 * We store tokens in form
					 * HSET prefix_tokens <token_id> "token_string"
					 * ZINCRBY prefix_z 1.0 <token_id>
					 */
					if (tok->t1 && tok->t2) {
						redisAsyncCommand (rt->redis, NULL, NULL,
								"HSET %b_tokens %b %b:%b",
								prefix, (size_t) prefix_len,
								n0, (size_t) l0,
								tok->t1->stemmed.begin, tok->t1->stemmed.len,
								tok->t2->stemmed.begin, tok->t2->stemmed.len);
					} else if (tok->t1) {
						redisAsyncCommand (rt->redis, NULL, NULL,
								"HSET %b_tokens %b %b",
								prefix, (size_t) prefix_len,
								n0, (size_t) l0,
								tok->t1->stemmed.begin, tok->t1->stemmed.len);
					}
				}
				else {
					/*
					 * We store tokens in form
					 * HSET <token_id> "tokens" "token_string"
					 * ZINCRBY prefix_z 1.0 <token_id>
					 */
					if (tok->t1 && tok->t2) {
						redisAsyncCommand (rt->redis, NULL, NULL,
								"HSET %b %s %b:%b",
								n0, (size_t) l0,
								"tokens",
								tok->t1->stemmed.begin, tok->t1->stemmed.len,
								tok->t2->stemmed.begin, tok->t2->stemmed.len);
					} else if (tok->t1) {
						redisAsyncCommand (rt->redis, NULL, NULL,
								"HSET %b %s %b",
								n0, (size_t) l0,
								"tokens",
								tok->t1->stemmed.begin, tok->t1->stemmed.len);
					}
				}

				/* The increment is the same value that was used for the token */
				redisAsyncCommand (rt->redis, NULL, NULL,
						"ZINCRBY %b_z %b %b",
						prefix, (size_t)prefix_len,
						n1, (size_t)l1,
						n0, (size_t)l0);
			}

			if (rt->ctx->new_schema && rt->ctx->expiry > 0) {
				/* Refresh the TTL of the per-token key; n1 now holds the expiry */
				out->len = 0;
				l1 = rspamd_snprintf (n1, sizeof (n1), "%d",
						rt->ctx->expiry);

				rspamd_printf_fstring (&out, ""
						"*3\r\n"
						"$6\r\n"
						"EXPIRE\r\n"
						"$%d\r\n"
						"%s\r\n"
						"$%d\r\n"
						"%s\r\n",
						l0, n0,
						l1, n1);
				redisAsyncFormattedCommand (rt->redis, NULL, NULL,
						out->str, out->len);
			}

			/* Everything for this token has been sent already */
			out->len = 0;
		}
		else {
			if (rt->ctx->new_schema) {
				/* <command> <prefix_token> <S|H>, sent immediately */
				l0 = rspamd_snprintf (n0, sizeof (n0), "%*s_%uL",
						prefix_len, prefix,
						tok->data);

				rspamd_printf_fstring (&out, ""
						"*3\r\n"
						"$%d\r\n"
						"%s\r\n"
						"$%d\r\n"
						"%s\r\n"
						"$%d\r\n"
						"%s\r\n",
						cmd_len, command,
						l0, n0,
						1, rt->stcf->is_spam ? "S" : "H");

				ret = redisAsyncFormattedCommand (rt->redis, NULL, NULL,
						out->str, out->len);

				if (ret != REDIS_OK) {
					msg_err_task ("call to redis failed: %s", rt->redis->errstr);
					rspamd_fstring_free (out);

					return NULL;
				}

				out->len = 0;
			}
			else {
				/* Append the token id as one more argument of the command */
				l0 = rspamd_snprintf (n0, sizeof (n0), "%uL", tok->data);
				rspamd_printf_fstring (&out, ""
						"$%d\r\n"
						"%s\r\n", l0, n0);
			}
		}
	}

	if (!learn && rt->ctx->new_schema) {
		/* Left in the buffer, not sent here */
		rspamd_printf_fstring (&out, "*1\r\n$4\r\nEXEC\r\n");
	}

	return out;
}
  557. static void
  558. rspamd_redis_store_stat_signature (struct rspamd_task *task,
  559. struct redis_stat_runtime *rt,
  560. GPtrArray *tokens,
  561. const gchar *prefix)
  562. {
  563. gchar *sig, keybuf[512], nbuf[64];
  564. rspamd_token_t *tok;
  565. guint i, blen, klen;
  566. rspamd_fstring_t *out;
  567. out = rspamd_fstring_sized_new (1024);
  568. sig = rspamd_mempool_get_variable (task->task_pool,
  569. RSPAMD_MEMPOOL_STAT_SIGNATURE);
  570. if (sig == NULL) {
  571. msg_err_task ("cannot get bayes signature");
  572. return;
  573. }
  574. klen = rspamd_snprintf (keybuf, sizeof (keybuf), "%s_%s_%s",
  575. prefix, sig, rt->stcf->is_spam ? "S" : "H");
  576. out->len = 0;
  577. /* Cleanup key */
  578. rspamd_printf_fstring (&out, ""
  579. "*2\r\n"
  580. "$3\r\n"
  581. "DEL\r\n"
  582. "$%d\r\n"
  583. "%s\r\n",
  584. klen, keybuf);
  585. redisAsyncFormattedCommand (rt->redis, NULL, NULL,
  586. out->str, out->len);
  587. out->len = 0;
  588. rspamd_printf_fstring (&out, ""
  589. "*%d\r\n"
  590. "$5\r\n"
  591. "LPUSH\r\n"
  592. "$%d\r\n"
  593. "%s\r\n",
  594. tokens->len + 2,
  595. klen, keybuf);
  596. PTR_ARRAY_FOREACH (tokens, i, tok) {
  597. blen = rspamd_snprintf (nbuf, sizeof (nbuf), "%uL", tok->data);
  598. rspamd_printf_fstring (&out, ""
  599. "$%d\r\n"
  600. "%s\r\n", blen, nbuf);
  601. }
  602. redisAsyncFormattedCommand (rt->redis, NULL, NULL,
  603. out->str, out->len);
  604. out->len = 0;
  605. if (rt->ctx->expiry > 0) {
  606. out->len = 0;
  607. blen = rspamd_snprintf (nbuf, sizeof (nbuf), "%d",
  608. rt->ctx->expiry);
  609. rspamd_printf_fstring (&out, ""
  610. "*3\r\n"
  611. "$6\r\n"
  612. "EXPIRE\r\n"
  613. "$%d\r\n"
  614. "%s\r\n"
  615. "$%d\r\n"
  616. "%s\r\n",
  617. klen, keybuf,
  618. blen, nbuf);
  619. redisAsyncFormattedCommand (rt->redis, NULL, NULL,
  620. out->str, out->len);
  621. }
  622. rspamd_fstring_free (out);
  623. }
  624. static void
  625. rspamd_redis_async_cbdata_cleanup (struct rspamd_redis_stat_cbdata *cbdata)
  626. {
  627. guint i;
  628. gchar *k;
  629. if (cbdata && !cbdata->wanna_die) {
  630. /* Avoid double frees */
  631. cbdata->wanna_die = TRUE;
  632. redisAsyncFree (cbdata->redis);
  633. for (i = 0; i < cbdata->cur_keys->len; i ++) {
  634. k = g_ptr_array_index (cbdata->cur_keys, i);
  635. g_free (k);
  636. }
  637. g_ptr_array_free (cbdata->cur_keys, TRUE);
  638. if (cbdata->elt) {
  639. cbdata->elt->cbdata = NULL;
  640. /* Re-enable parent event */
  641. cbdata->elt->async->enabled = TRUE;
  642. /* Replace ucl object */
  643. if (cbdata->cur) {
  644. if (cbdata->elt->stat) {
  645. ucl_object_unref (cbdata->elt->stat);
  646. }
  647. cbdata->elt->stat = cbdata->cur;
  648. cbdata->cur = NULL;
  649. }
  650. }
  651. if (cbdata->cur) {
  652. ucl_object_unref (cbdata->cur);
  653. }
  654. g_free (cbdata);
  655. }
  656. }
  657. /* Called when we get number of learns for a specific key */
  658. static void
  659. rspamd_redis_stat_learns (redisAsyncContext *c, gpointer r, gpointer priv)
  660. {
  661. struct rspamd_redis_stat_cbdata *cbdata = priv;
  662. redisReply *reply = r;
  663. ucl_object_t *obj;
  664. gulong num = 0;
  665. if (cbdata->wanna_die) {
  666. return;
  667. }
  668. cbdata->inflight --;
  669. if (c->err == 0 && r != NULL) {
  670. if (G_LIKELY (reply->type == REDIS_REPLY_INTEGER)) {
  671. num = reply->integer;
  672. }
  673. else if (reply->type == REDIS_REPLY_STRING) {
  674. rspamd_strtoul (reply->str, reply->len, &num);
  675. }
  676. obj = (ucl_object_t *) ucl_object_lookup (cbdata->cur, "revision");
  677. if (obj) {
  678. obj->value.iv += num;
  679. }
  680. }
  681. if (cbdata->inflight == 0) {
  682. rspamd_redis_async_cbdata_cleanup (cbdata);
  683. }
  684. }
  685. /* Called when we get number of elements for a specific key */
  686. static void
  687. rspamd_redis_stat_key (redisAsyncContext *c, gpointer r, gpointer priv)
  688. {
  689. struct rspamd_redis_stat_cbdata *cbdata = priv;
  690. redisReply *reply = r;
  691. ucl_object_t *obj;
  692. glong num = 0;
  693. if (cbdata->wanna_die) {
  694. return;
  695. }
  696. cbdata->inflight --;
  697. if (c->err == 0 && r != NULL) {
  698. if (G_LIKELY (reply->type == REDIS_REPLY_INTEGER)) {
  699. num = reply->integer;
  700. }
  701. else if (reply->type == REDIS_REPLY_STRING) {
  702. rspamd_strtol (reply->str, reply->len, &num);
  703. }
  704. if (num < 0) {
  705. msg_err ("bad learns count: %L", (gint64)num);
  706. num = 0;
  707. }
  708. obj = (ucl_object_t *)ucl_object_lookup (cbdata->cur, "used");
  709. if (obj) {
  710. obj->value.iv += num;
  711. }
  712. obj = (ucl_object_t *)ucl_object_lookup (cbdata->cur, "total");
  713. if (obj) {
  714. obj->value.iv += num;
  715. }
  716. obj = (ucl_object_t *)ucl_object_lookup (cbdata->cur, "size");
  717. if (obj) {
  718. /* Size of key + size of int64_t */
  719. obj->value.iv += num * (sizeof (G_STRINGIFY (G_MAXINT64)) +
  720. sizeof (guint64) + sizeof (gpointer));
  721. }
  722. }
  723. if (cbdata->inflight == 0) {
  724. rspamd_redis_async_cbdata_cleanup (cbdata);
  725. }
  726. }
  727. /* Called when we have connected to the redis server and got keys to check */
  728. static void
  729. rspamd_redis_stat_keys (redisAsyncContext *c, gpointer r, gpointer priv)
  730. {
  731. struct rspamd_redis_stat_cbdata *cbdata = priv;
  732. redisReply *reply = r, *elt;
  733. gchar **pk, *k;
  734. guint i, processed = 0;
  735. if (cbdata->wanna_die) {
  736. return;
  737. }
  738. cbdata->inflight --;
  739. if (c->err == 0 && r != NULL) {
  740. if (reply->type == REDIS_REPLY_ARRAY) {
  741. g_ptr_array_set_size (cbdata->cur_keys, reply->elements);
  742. for (i = 0; i < reply->elements; i ++) {
  743. elt = reply->element[i];
  744. if (elt->type == REDIS_REPLY_STRING) {
  745. pk = (gchar **)&g_ptr_array_index (cbdata->cur_keys, i);
  746. *pk = g_malloc (elt->len + 1);
  747. rspamd_strlcpy (*pk, elt->str, elt->len + 1);
  748. processed ++;
  749. }
  750. }
  751. if (processed) {
  752. for (i = 0; i < cbdata->cur_keys->len; i ++) {
  753. k = (gchar *)g_ptr_array_index (cbdata->cur_keys, i);
  754. if (k) {
  755. const gchar *learned_key = "learns";
  756. if (cbdata->elt->ctx->new_schema) {
  757. if (cbdata->elt->ctx->stcf->is_spam) {
  758. learned_key = "learns_spam";
  759. }
  760. else {
  761. learned_key = "learns_ham";
  762. }
  763. redisAsyncCommand (cbdata->redis,
  764. rspamd_redis_stat_learns,
  765. cbdata,
  766. "HGET %s %s",
  767. k, learned_key);
  768. cbdata->inflight += 1;
  769. }
  770. else {
  771. redisAsyncCommand (cbdata->redis,
  772. rspamd_redis_stat_key,
  773. cbdata,
  774. "HLEN %s",
  775. k);
  776. redisAsyncCommand (cbdata->redis,
  777. rspamd_redis_stat_learns,
  778. cbdata,
  779. "HGET %s %s",
  780. k, learned_key);
  781. cbdata->inflight += 2;
  782. }
  783. }
  784. }
  785. }
  786. }
  787. /* Set up the required keys */
  788. ucl_object_insert_key (cbdata->cur,
  789. ucl_object_typed_new (UCL_INT), "revision", 0, false);
  790. ucl_object_insert_key (cbdata->cur,
  791. ucl_object_typed_new (UCL_INT), "used", 0, false);
  792. ucl_object_insert_key (cbdata->cur,
  793. ucl_object_typed_new (UCL_INT), "total", 0, false);
  794. ucl_object_insert_key (cbdata->cur,
  795. ucl_object_typed_new (UCL_INT), "size", 0, false);
  796. ucl_object_insert_key (cbdata->cur,
  797. ucl_object_fromstring (cbdata->elt->ctx->stcf->symbol),
  798. "symbol", 0, false);
  799. ucl_object_insert_key (cbdata->cur, ucl_object_fromstring ("redis"),
  800. "type", 0, false);
  801. ucl_object_insert_key (cbdata->cur, ucl_object_fromint (0),
  802. "languages", 0, false);
  803. ucl_object_insert_key (cbdata->cur, ucl_object_fromint (processed),
  804. "users", 0, false);
  805. rspamd_upstream_ok (cbdata->selected);
  806. if (cbdata->inflight == 0) {
  807. rspamd_redis_async_cbdata_cleanup (cbdata);
  808. }
  809. }
  810. else {
  811. if (c->errstr) {
  812. msg_err ("cannot get keys to gather stat: %s", c->errstr);
  813. }
  814. else {
  815. msg_err ("cannot get keys to gather stat: unknown error");
  816. }
  817. rspamd_upstream_fail (cbdata->selected, FALSE);
  818. rspamd_redis_async_cbdata_cleanup (cbdata);
  819. }
  820. }
  821. static void
  822. rspamd_redis_async_stat_cb (struct rspamd_stat_async_elt *elt, gpointer d)
  823. {
  824. struct redis_stat_ctx *ctx;
  825. struct rspamd_redis_stat_elt *redis_elt = elt->ud;
  826. struct rspamd_redis_stat_cbdata *cbdata;
  827. rspamd_inet_addr_t *addr;
  828. struct upstream_list *ups;
  829. g_assert (redis_elt != NULL);
  830. ctx = redis_elt->ctx;
  831. if (redis_elt->cbdata) {
  832. /* We have some other process pending */
  833. rspamd_redis_async_cbdata_cleanup (redis_elt->cbdata);
  834. }
  835. /* Disable further events unless needed */
  836. elt->enabled = FALSE;
  837. ups = rspamd_redis_get_servers (ctx, "read_servers");
  838. if (!ups) {
  839. return;
  840. }
  841. cbdata = g_malloc0 (sizeof (*cbdata));
  842. cbdata->selected = rspamd_upstream_get (ups,
  843. RSPAMD_UPSTREAM_ROUND_ROBIN,
  844. NULL,
  845. 0);
  846. g_assert (cbdata->selected != NULL);
  847. addr = rspamd_upstream_addr (cbdata->selected);
  848. g_assert (addr != NULL);
  849. if (rspamd_inet_address_get_af (addr) == AF_UNIX) {
  850. cbdata->redis = redisAsyncConnectUnix (rspamd_inet_address_to_string (addr));
  851. }
  852. else {
  853. cbdata->redis = redisAsyncConnect (rspamd_inet_address_to_string (addr),
  854. rspamd_inet_address_get_port (addr));
  855. }
  856. g_assert (cbdata->redis != NULL);
  857. redisLibeventAttach (cbdata->redis, redis_elt->ev_base);
  858. cbdata->inflight = 1;
  859. cbdata->cur = ucl_object_typed_new (UCL_OBJECT);
  860. cbdata->elt = redis_elt;
  861. cbdata->cur_keys = g_ptr_array_new ();
  862. redis_elt->cbdata = cbdata;
  863. /* XXX: deal with timeouts maybe */
  864. /* Get keys in redis that match our symbol */
  865. rspamd_redis_maybe_auth (ctx, cbdata->redis);
  866. redisAsyncCommand (cbdata->redis, rspamd_redis_stat_keys, cbdata,
  867. "SMEMBERS %s_keys",
  868. ctx->stcf->symbol);
  869. }
  870. static void
  871. rspamd_redis_async_stat_fin (struct rspamd_stat_async_elt *elt, gpointer d)
  872. {
  873. struct rspamd_redis_stat_elt *redis_elt = elt->ud;
  874. rspamd_redis_async_cbdata_cleanup (redis_elt->cbdata);
  875. }
  876. /* Called on connection termination */
  877. static void
  878. rspamd_redis_fin (gpointer data)
  879. {
  880. struct redis_stat_runtime *rt = REDIS_RUNTIME (data);
  881. redisAsyncContext *redis;
  882. rt->has_event = FALSE;
  883. /* Stop timeout */
  884. if (rspamd_event_pending (&rt->timeout_event, EV_TIMEOUT)) {
  885. event_del (&rt->timeout_event);
  886. }
  887. if (rt->redis) {
  888. redis = rt->redis;
  889. rt->redis = NULL;
  890. /* This calls for all callbacks pending */
  891. redisAsyncFree (redis);
  892. }
  893. }
  894. static void
  895. rspamd_redis_fin_learn (gpointer data)
  896. {
  897. struct redis_stat_runtime *rt = REDIS_RUNTIME (data);
  898. redisAsyncContext *redis;
  899. rt->has_event = FALSE;
  900. /* Stop timeout */
  901. if (rspamd_event_pending (&rt->timeout_event, EV_TIMEOUT)) {
  902. event_del (&rt->timeout_event);
  903. }
  904. if (rt->redis) {
  905. redis = rt->redis;
  906. rt->redis = NULL;
  907. /* This calls for all callbacks pending */
  908. redisAsyncFree (redis);
  909. }
  910. }
/*
 * Timer callback fired when a redis request did not complete in time.
 *
 * Logs the timeout, marks the selected upstream as failed, frees the
 * connection (which invokes the pending reply callbacks) and records an
 * ETIMEDOUT error on the runtime unless an error was already recorded.
 *
 * @param fd unused
 * @param what unused
 * @param d the struct redis_stat_runtime of the timed out request
 */
static void
rspamd_redis_timeout (gint fd, short what, gpointer d)
{
	struct redis_stat_runtime *rt = REDIS_RUNTIME (d);
	struct rspamd_task *task;
	redisAsyncContext *redis;

	/* `task` is read by the logging macro below */
	task = rt->task;

	msg_err_task_check ("connection to redis server %s timed out",
			rspamd_upstream_name (rt->selected));

	rspamd_upstream_fail (rt->selected, FALSE);

	if (rt->redis) {
		/* Detach the connection from the runtime before freeing it */
		redis = rt->redis;
		rt->redis = NULL;
		/* This calls for all callbacks pending */
		redisAsyncFree (redis);
	}

	/* Callbacks run by the free above may have recorded an error already */
	if (!rt->err) {
		g_set_error (&rt->err, rspamd_redis_stat_quark (), ETIMEDOUT,
				"error getting reply from redis server %s: timeout",
				rspamd_upstream_name (rt->selected));
	}
}
  933. /* Called when we have connected to the redis server and got stats */
  934. static void
  935. rspamd_redis_connected (redisAsyncContext *c, gpointer r, gpointer priv)
  936. {
  937. struct redis_stat_runtime *rt = REDIS_RUNTIME (priv);
  938. redisReply *reply = r;
  939. struct rspamd_task *task;
  940. glong val = 0;
  941. task = rt->task;
  942. if (c->err == 0) {
  943. if (r != NULL) {
  944. if (G_UNLIKELY (reply->type == REDIS_REPLY_INTEGER)) {
  945. val = reply->integer;
  946. }
  947. else if (reply->type == REDIS_REPLY_STRING) {
  948. rspamd_strtol (reply->str, reply->len, &val);
  949. }
  950. else {
  951. if (reply->type != REDIS_REPLY_NIL) {
  952. msg_err_task ("bad learned type for %s: %s, nil expected",
  953. rt->stcf->symbol,
  954. rspamd_redis_type_to_string (reply->type));
  955. }
  956. val = 0;
  957. }
  958. if (val < 0) {
  959. msg_warn_task ("invalid number of learns for %s: %L",
  960. rt->stcf->symbol, val);
  961. val = 0;
  962. }
  963. rt->learned = val;
  964. msg_debug_stat_redis ("connected to redis server, tokens learned for %s: %uL",
  965. rt->redis_object_expanded, rt->learned);
  966. rspamd_upstream_ok (rt->selected);
  967. }
  968. }
  969. else {
  970. msg_err_task ("error getting reply from redis server %s: %s",
  971. rspamd_upstream_name (rt->selected), c->errstr);
  972. rspamd_upstream_fail (rt->selected, FALSE);
  973. if (!rt->err) {
  974. g_set_error (&rt->err, rspamd_redis_stat_quark (), c->err,
  975. "error getting reply from redis server %s: %s",
  976. rspamd_upstream_name (rt->selected), c->errstr);
  977. }
  978. }
  979. }
  980. /* Called when we have received tokens values from redis */
  981. static void
  982. rspamd_redis_processed (redisAsyncContext *c, gpointer r, gpointer priv)
  983. {
  984. struct redis_stat_runtime *rt = REDIS_RUNTIME (priv);
  985. redisReply *reply = r, *elt;
  986. struct rspamd_task *task;
  987. rspamd_token_t *tok;
  988. guint i, processed = 0, found = 0;
  989. gulong val;
  990. gdouble float_val;
  991. task = rt->task;
  992. if (c->err == 0) {
  993. if (r != NULL) {
  994. if (reply->type == REDIS_REPLY_ARRAY) {
  995. if (reply->elements == task->tokens->len) {
  996. for (i = 0; i < reply->elements; i ++) {
  997. tok = g_ptr_array_index (task->tokens, i);
  998. elt = reply->element[i];
  999. if (G_UNLIKELY (elt->type == REDIS_REPLY_INTEGER)) {
  1000. tok->values[rt->id] = elt->integer;
  1001. found ++;
  1002. }
  1003. else if (elt->type == REDIS_REPLY_STRING) {
  1004. if (rt->stcf->clcf->flags &
  1005. RSPAMD_FLAG_CLASSIFIER_INTEGER) {
  1006. rspamd_strtoul (elt->str, elt->len, &val);
  1007. tok->values[rt->id] = val;
  1008. }
  1009. else {
  1010. float_val = strtod (elt->str, NULL);
  1011. tok->values[rt->id] = float_val;
  1012. }
  1013. found ++;
  1014. }
  1015. else {
  1016. tok->values[rt->id] = 0;
  1017. }
  1018. processed ++;
  1019. }
  1020. if (rt->stcf->is_spam) {
  1021. task->flags |= RSPAMD_TASK_FLAG_HAS_SPAM_TOKENS;
  1022. }
  1023. else {
  1024. task->flags |= RSPAMD_TASK_FLAG_HAS_HAM_TOKENS;
  1025. }
  1026. }
  1027. else {
  1028. msg_err_task_check ("got invalid length of reply vector from redis: "
  1029. "%d, expected: %d",
  1030. (gint)reply->elements,
  1031. (gint)task->tokens->len);
  1032. }
  1033. }
  1034. else {
  1035. msg_err_task_check ("got invalid reply from redis: %s, array expected",
  1036. rspamd_redis_type_to_string (reply->type));
  1037. }
  1038. msg_debug_stat_redis ("received tokens for %s: %d processed, %d found",
  1039. rt->redis_object_expanded, processed, found);
  1040. rspamd_upstream_ok (rt->selected);
  1041. }
  1042. }
  1043. else {
  1044. msg_err_task ("error getting reply from redis server %s: %s",
  1045. rspamd_upstream_name (rt->selected), c->errstr);
  1046. if (rt->redis) {
  1047. rspamd_upstream_fail (rt->selected, FALSE);
  1048. }
  1049. if (!rt->err) {
  1050. g_set_error (&rt->err, rspamd_redis_stat_quark (), c->err,
  1051. "cannot get values: error getting reply from redis server %s: %s",
  1052. rspamd_upstream_name (rt->selected), c->errstr);
  1053. }
  1054. }
  1055. if (rt->has_event) {
  1056. rspamd_session_remove_event (task->s, rspamd_redis_fin, rt);
  1057. }
  1058. }
  1059. /* Called when we have set tokens during learning */
  1060. static void
  1061. rspamd_redis_learned (redisAsyncContext *c, gpointer r, gpointer priv)
  1062. {
  1063. struct redis_stat_runtime *rt = REDIS_RUNTIME (priv);
  1064. struct rspamd_task *task;
  1065. task = rt->task;
  1066. if (c->err == 0) {
  1067. rspamd_upstream_ok (rt->selected);
  1068. }
  1069. else {
  1070. msg_err_task_check ("error getting reply from redis server %s: %s",
  1071. rspamd_upstream_name (rt->selected), c->errstr);
  1072. if (rt->redis) {
  1073. rspamd_upstream_fail (rt->selected, FALSE);
  1074. }
  1075. if (!rt->err) {
  1076. g_set_error (&rt->err, rspamd_redis_stat_quark (), c->err,
  1077. "cannot get learned: error getting reply from redis server %s: %s",
  1078. rspamd_upstream_name (rt->selected), c->errstr);
  1079. }
  1080. }
  1081. if (rt->has_event) {
  1082. rspamd_session_remove_event (task->s, rspamd_redis_fin_learn, rt);
  1083. }
  1084. }
  1085. static void
  1086. rspamd_redis_parse_classifier_opts (struct redis_stat_ctx *backend,
  1087. const ucl_object_t *obj,
  1088. struct rspamd_config *cfg)
  1089. {
  1090. const gchar *lua_script;
  1091. const ucl_object_t *elt, *users_enabled;
  1092. users_enabled = ucl_object_lookup_any (obj, "per_user",
  1093. "users_enabled", NULL);
  1094. if (users_enabled != NULL) {
  1095. if (ucl_object_type (users_enabled) == UCL_BOOLEAN) {
  1096. backend->enable_users = ucl_object_toboolean (users_enabled);
  1097. backend->cbref_user = -1;
  1098. }
  1099. else if (ucl_object_type (users_enabled) == UCL_STRING) {
  1100. lua_script = ucl_object_tostring (users_enabled);
  1101. if (luaL_dostring (cfg->lua_state, lua_script) != 0) {
  1102. msg_err_config ("cannot execute lua script for users "
  1103. "extraction: %s", lua_tostring (cfg->lua_state, -1));
  1104. }
  1105. else {
  1106. if (lua_type (cfg->lua_state, -1) == LUA_TFUNCTION) {
  1107. backend->enable_users = TRUE;
  1108. backend->cbref_user = luaL_ref (cfg->lua_state,
  1109. LUA_REGISTRYINDEX);
  1110. }
  1111. else {
  1112. msg_err_config ("lua script must return "
  1113. "function(task) and not %s",
  1114. lua_typename (cfg->lua_state, lua_type (
  1115. cfg->lua_state, -1)));
  1116. }
  1117. }
  1118. }
  1119. }
  1120. else {
  1121. backend->enable_users = FALSE;
  1122. backend->cbref_user = -1;
  1123. }
  1124. elt = ucl_object_lookup (obj, "prefix");
  1125. if (elt == NULL || ucl_object_type (elt) != UCL_STRING) {
  1126. /* Default non-users statistics */
  1127. if (backend->enable_users || backend->cbref_user != -1) {
  1128. backend->redis_object = REDIS_DEFAULT_USERS_OBJECT;
  1129. }
  1130. else {
  1131. backend->redis_object = REDIS_DEFAULT_OBJECT;
  1132. }
  1133. }
  1134. else {
  1135. /* XXX: sanity check */
  1136. backend->redis_object = ucl_object_tostring (elt);
  1137. }
  1138. elt = ucl_object_lookup (obj, "store_tokens");
  1139. if (elt) {
  1140. backend->store_tokens = ucl_object_toboolean (elt);
  1141. }
  1142. else {
  1143. backend->store_tokens = FALSE;
  1144. }
  1145. elt = ucl_object_lookup (obj, "new_schema");
  1146. if (elt) {
  1147. backend->new_schema = ucl_object_toboolean (elt);
  1148. }
  1149. else {
  1150. backend->new_schema = FALSE;
  1151. msg_warn_config ("you are using old bayes schema for redis statistics, "
  1152. "please consider converting it to a new one "
  1153. "by using 'rspamadm configwizard statistics'");
  1154. }
  1155. elt = ucl_object_lookup (obj, "signatures");
  1156. if (elt) {
  1157. backend->enable_signatures = ucl_object_toboolean (elt);
  1158. }
  1159. else {
  1160. backend->enable_signatures = FALSE;
  1161. }
  1162. elt = ucl_object_lookup_any (obj, "expiry", "expire", NULL);
  1163. if (elt) {
  1164. backend->expiry = ucl_object_toint (elt);
  1165. }
  1166. else {
  1167. backend->expiry = 0;
  1168. }
  1169. }
  1170. gpointer
  1171. rspamd_redis_init (struct rspamd_stat_ctx *ctx,
  1172. struct rspamd_config *cfg, struct rspamd_statfile *st)
  1173. {
  1174. struct redis_stat_ctx *backend;
  1175. struct rspamd_statfile_config *stf = st->stcf;
  1176. struct rspamd_redis_stat_elt *st_elt;
  1177. const ucl_object_t *obj;
  1178. gboolean ret = FALSE;
  1179. gint conf_ref = -1;
  1180. lua_State *L = (lua_State *)cfg->lua_state;
  1181. backend = g_malloc0 (sizeof (*backend));
  1182. backend->L = L;
  1183. backend->timeout = REDIS_DEFAULT_TIMEOUT;
  1184. /* First search in backend configuration */
  1185. obj = ucl_object_lookup (st->classifier->cfg->opts, "backend");
  1186. if (obj != NULL && ucl_object_type (obj) == UCL_OBJECT) {
  1187. ret = rspamd_lua_try_load_redis (L, obj, cfg, &conf_ref);
  1188. }
  1189. /* Now try statfiles config */
  1190. if (!ret && stf->opts) {
  1191. ret = rspamd_lua_try_load_redis (L, stf->opts, cfg, &conf_ref);
  1192. }
  1193. /* Now try classifier config */
  1194. if (!ret && st->classifier->cfg->opts) {
  1195. ret = rspamd_lua_try_load_redis (L, st->classifier->cfg->opts, cfg, &conf_ref);
  1196. }
  1197. /* Now try global redis settings */
  1198. if (!ret) {
  1199. obj = ucl_object_lookup (cfg->rcl_obj, "redis");
  1200. if (obj) {
  1201. const ucl_object_t *specific_obj;
  1202. specific_obj = ucl_object_lookup (obj, "statistics");
  1203. if (specific_obj) {
  1204. ret = rspamd_lua_try_load_redis (L,
  1205. specific_obj, cfg, &conf_ref);
  1206. }
  1207. else {
  1208. ret = rspamd_lua_try_load_redis (L,
  1209. obj, cfg, &conf_ref);
  1210. }
  1211. }
  1212. }
  1213. if (!ret) {
  1214. msg_err_config ("cannot init redis backend for %s", stf->symbol);
  1215. g_free (backend);
  1216. return NULL;
  1217. }
  1218. backend->conf_ref = conf_ref;
  1219. /* Check some common table values */
  1220. lua_rawgeti (L, LUA_REGISTRYINDEX, conf_ref);
  1221. lua_pushstring (L, "timeout");
  1222. lua_gettable (L, -2);
  1223. if (lua_type (L, -1) == LUA_TNUMBER) {
  1224. backend->timeout = lua_tonumber (L, -1);
  1225. }
  1226. lua_pop (L, 1);
  1227. lua_pushstring (L, "db");
  1228. lua_gettable (L, -2);
  1229. if (lua_type (L, -1) == LUA_TSTRING) {
  1230. backend->dbname = rspamd_mempool_strdup (cfg->cfg_pool,
  1231. lua_tostring (L, -1));
  1232. }
  1233. lua_pop (L, 1);
  1234. lua_pushstring (L, "password");
  1235. lua_gettable (L, -2);
  1236. if (lua_type (L, -1) == LUA_TSTRING) {
  1237. backend->password = rspamd_mempool_strdup (cfg->cfg_pool,
  1238. lua_tostring (L, -1));
  1239. }
  1240. lua_pop (L, 1);
  1241. lua_settop (L, 0);
  1242. rspamd_redis_parse_classifier_opts (backend, st->classifier->cfg->opts, cfg);
  1243. stf->clcf->flags |= RSPAMD_FLAG_CLASSIFIER_INCREMENTING_BACKEND;
  1244. backend->stcf = stf;
  1245. st_elt = g_malloc0 (sizeof (*st_elt));
  1246. st_elt->ev_base = ctx->ev_base;
  1247. st_elt->ctx = backend;
  1248. backend->stat_elt = rspamd_stat_ctx_register_async (
  1249. rspamd_redis_async_stat_cb,
  1250. rspamd_redis_async_stat_fin,
  1251. st_elt,
  1252. REDIS_STAT_TIMEOUT);
  1253. st_elt->async = backend->stat_elt;
  1254. return (gpointer)backend;
  1255. }
  1256. gpointer
  1257. rspamd_redis_runtime (struct rspamd_task *task,
  1258. struct rspamd_statfile_config *stcf,
  1259. gboolean learn, gpointer c)
  1260. {
  1261. struct redis_stat_ctx *ctx = REDIS_CTX (c);
  1262. struct redis_stat_runtime *rt;
  1263. struct upstream *up;
  1264. struct upstream_list *ups;
  1265. char *object_expanded = NULL;
  1266. rspamd_inet_addr_t *addr;
  1267. g_assert (ctx != NULL);
  1268. g_assert (stcf != NULL);
  1269. if (learn) {
  1270. ups = rspamd_redis_get_servers (ctx, "write_servers");
  1271. if (!ups) {
  1272. msg_err_task ("no write servers defined for %s, cannot learn",
  1273. stcf->symbol);
  1274. return NULL;
  1275. }
  1276. up = rspamd_upstream_get (ups,
  1277. RSPAMD_UPSTREAM_MASTER_SLAVE,
  1278. NULL,
  1279. 0);
  1280. }
  1281. else {
  1282. ups = rspamd_redis_get_servers (ctx, "read_servers");
  1283. if (!ups) {
  1284. msg_err_task ("no read servers defined for %s, cannot stat",
  1285. stcf->symbol);
  1286. return NULL;
  1287. }
  1288. up = rspamd_upstream_get (ups,
  1289. RSPAMD_UPSTREAM_ROUND_ROBIN,
  1290. NULL,
  1291. 0);
  1292. }
  1293. if (up == NULL) {
  1294. msg_err_task ("no upstreams reachable");
  1295. return NULL;
  1296. }
  1297. if (rspamd_redis_expand_object (ctx->redis_object, ctx, task,
  1298. &object_expanded) == 0) {
  1299. msg_err_task ("expansion for learning failed for symbol %s "
  1300. "(maybe learning per user classifier with no user or recipient)",
  1301. stcf->symbol);
  1302. return NULL;
  1303. }
  1304. rt = rspamd_mempool_alloc0 (task->task_pool, sizeof (*rt));
  1305. rspamd_mempool_add_destructor (task->task_pool,
  1306. rspamd_gerror_free_maybe, &rt->err);
  1307. rt->selected = up;
  1308. rt->task = task;
  1309. rt->ctx = ctx;
  1310. rt->stcf = stcf;
  1311. rt->redis_object_expanded = object_expanded;
  1312. addr = rspamd_upstream_addr (up);
  1313. g_assert (addr != NULL);
  1314. if (rspamd_inet_address_get_af (addr) == AF_UNIX) {
  1315. rt->redis = redisAsyncConnectUnix (rspamd_inet_address_to_string (addr));
  1316. }
  1317. else {
  1318. rt->redis = redisAsyncConnect (rspamd_inet_address_to_string (addr),
  1319. rspamd_inet_address_get_port (addr));
  1320. }
  1321. if (rt->redis == NULL) {
  1322. msg_err_task ("cannot connect redis");
  1323. return NULL;
  1324. }
  1325. redisLibeventAttach (rt->redis, task->ev_base);
  1326. rspamd_redis_maybe_auth (ctx, rt->redis);
  1327. return rt;
  1328. }
  1329. void
  1330. rspamd_redis_close (gpointer p)
  1331. {
  1332. struct redis_stat_ctx *ctx = REDIS_CTX (p);
  1333. lua_State *L = ctx->L;
  1334. if (ctx->conf_ref) {
  1335. luaL_unref (L, LUA_REGISTRYINDEX, ctx->conf_ref);
  1336. }
  1337. g_free (ctx);
  1338. }
  1339. gboolean
  1340. rspamd_redis_process_tokens (struct rspamd_task *task,
  1341. GPtrArray *tokens,
  1342. gint id, gpointer p)
  1343. {
  1344. struct redis_stat_runtime *rt = REDIS_RUNTIME (p);
  1345. rspamd_fstring_t *query;
  1346. struct timeval tv;
  1347. gint ret;
  1348. const gchar *learned_key = "learns";
  1349. if (rspamd_session_blocked (task->s)) {
  1350. return FALSE;
  1351. }
  1352. if (tokens == NULL || tokens->len == 0 || rt->redis == NULL) {
  1353. return FALSE;
  1354. }
  1355. rt->id = id;
  1356. if (rt->ctx->new_schema) {
  1357. if (rt->ctx->stcf->is_spam) {
  1358. learned_key = "learns_spam";
  1359. }
  1360. else {
  1361. learned_key = "learns_ham";
  1362. }
  1363. }
  1364. if (redisAsyncCommand (rt->redis, rspamd_redis_connected, rt, "HGET %s %s",
  1365. rt->redis_object_expanded, learned_key) == REDIS_OK) {
  1366. rspamd_session_add_event (task->s, rspamd_redis_fin, rt, M);
  1367. rt->has_event = TRUE;
  1368. if (rspamd_event_pending (&rt->timeout_event, EV_TIMEOUT)) {
  1369. event_del (&rt->timeout_event);
  1370. }
  1371. event_set (&rt->timeout_event, -1, EV_TIMEOUT, rspamd_redis_timeout, rt);
  1372. event_base_set (task->ev_base, &rt->timeout_event);
  1373. double_to_tv (rt->ctx->timeout, &tv);
  1374. event_add (&rt->timeout_event, &tv);
  1375. query = rspamd_redis_tokens_to_query (task, rt, tokens,
  1376. rt->ctx->new_schema ? "HGET" : "HMGET",
  1377. rt->redis_object_expanded, FALSE, -1,
  1378. rt->stcf->clcf->flags & RSPAMD_FLAG_CLASSIFIER_INTEGER);
  1379. g_assert (query != NULL);
  1380. rspamd_mempool_add_destructor (task->task_pool,
  1381. (rspamd_mempool_destruct_t)rspamd_fstring_free, query);
  1382. ret = redisAsyncFormattedCommand (rt->redis, rspamd_redis_processed, rt,
  1383. query->str, query->len);
  1384. if (ret == REDIS_OK) {
  1385. return TRUE;
  1386. }
  1387. else {
  1388. msg_err_task ("call to redis failed: %s", rt->redis->errstr);
  1389. }
  1390. }
  1391. return FALSE;
  1392. }
  1393. gboolean
  1394. rspamd_redis_finalize_process (struct rspamd_task *task, gpointer runtime,
  1395. gpointer ctx)
  1396. {
  1397. struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
  1398. redisAsyncContext *redis;
  1399. if (rspamd_event_pending (&rt->timeout_event, EV_TIMEOUT)) {
  1400. event_del (&rt->timeout_event);
  1401. }
  1402. if (rt->redis) {
  1403. redis = rt->redis;
  1404. rt->redis = NULL;
  1405. redisAsyncFree (redis);
  1406. }
  1407. if (rt->err) {
  1408. return FALSE;
  1409. }
  1410. return TRUE;
  1411. }
  1412. gboolean
  1413. rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens,
  1414. gint id, gpointer p)
  1415. {
  1416. struct redis_stat_runtime *rt = REDIS_RUNTIME (p);
  1417. struct upstream *up;
  1418. struct upstream_list *ups;
  1419. rspamd_inet_addr_t *addr;
  1420. struct timeval tv;
  1421. rspamd_fstring_t *query;
  1422. const gchar *redis_cmd;
  1423. rspamd_token_t *tok;
  1424. gint ret;
  1425. goffset off;
  1426. const gchar *learned_key = "learns";
  1427. if (rspamd_session_blocked (task->s)) {
  1428. return FALSE;
  1429. }
  1430. ups = rspamd_redis_get_servers (rt->ctx, "write_servers");
  1431. if (!ups) {
  1432. return FALSE;
  1433. }
  1434. up = rspamd_upstream_get (ups,
  1435. RSPAMD_UPSTREAM_MASTER_SLAVE,
  1436. NULL,
  1437. 0);
  1438. if (up == NULL) {
  1439. msg_err_task ("no upstreams reachable");
  1440. return FALSE;
  1441. }
  1442. rt->selected = up;
  1443. if (rt->ctx->new_schema) {
  1444. if (rt->ctx->stcf->is_spam) {
  1445. learned_key = "learns_spam";
  1446. }
  1447. else {
  1448. learned_key = "learns_ham";
  1449. }
  1450. }
  1451. addr = rspamd_upstream_addr (up);
  1452. g_assert (addr != NULL);
  1453. if (rspamd_inet_address_get_af (addr) == AF_UNIX) {
  1454. rt->redis = redisAsyncConnectUnix (rspamd_inet_address_to_string (addr));
  1455. }
  1456. else {
  1457. rt->redis = redisAsyncConnect (rspamd_inet_address_to_string (addr),
  1458. rspamd_inet_address_get_port (addr));
  1459. }
  1460. g_assert (rt->redis != NULL);
  1461. redisLibeventAttach (rt->redis, task->ev_base);
  1462. rspamd_redis_maybe_auth (rt->ctx, rt->redis);
  1463. /*
  1464. * Add the current key to the set of learned keys
  1465. */
  1466. redisAsyncCommand (rt->redis, NULL, NULL, "SADD %s_keys %s",
  1467. rt->stcf->symbol, rt->redis_object_expanded);
  1468. if (rt->ctx->new_schema) {
  1469. redisAsyncCommand (rt->redis, NULL, NULL, "HSET %s version 2",
  1470. rt->redis_object_expanded);
  1471. }
  1472. if (rt->stcf->clcf->flags & RSPAMD_FLAG_CLASSIFIER_INTEGER) {
  1473. redis_cmd = "HINCRBY";
  1474. }
  1475. else {
  1476. redis_cmd = "HINCRBYFLOAT";
  1477. }
  1478. rt->id = id;
  1479. query = rspamd_redis_tokens_to_query (task, rt, tokens,
  1480. redis_cmd, rt->redis_object_expanded, TRUE, id,
  1481. rt->stcf->clcf->flags & RSPAMD_FLAG_CLASSIFIER_INTEGER);
  1482. g_assert (query != NULL);
  1483. query->len = 0;
  1484. /*
  1485. * XXX:
  1486. * Dirty hack: we get a token and check if it's value is -1 or 1, so
  1487. * we could understand that we are learning or unlearning
  1488. */
  1489. tok = g_ptr_array_index (task->tokens, 0);
  1490. if (tok->values[id] > 0) {
  1491. rspamd_printf_fstring (&query, ""
  1492. "*4\r\n"
  1493. "$7\r\n"
  1494. "HINCRBY\r\n"
  1495. "$%d\r\n"
  1496. "%s\r\n"
  1497. "$%d\r\n"
  1498. "%s\r\n" /* Learned key */
  1499. "$1\r\n"
  1500. "1\r\n",
  1501. (gint)strlen (rt->redis_object_expanded),
  1502. rt->redis_object_expanded,
  1503. (gint)strlen (learned_key),
  1504. learned_key);
  1505. }
  1506. else {
  1507. rspamd_printf_fstring (&query, ""
  1508. "*4\r\n"
  1509. "$7\r\n"
  1510. "HINCRBY\r\n"
  1511. "$%d\r\n"
  1512. "%s\r\n"
  1513. "$%d\r\n"
  1514. "%s\r\n" /* Learned key */
  1515. "$2\r\n"
  1516. "-1\r\n",
  1517. (gint)strlen (rt->redis_object_expanded),
  1518. rt->redis_object_expanded,
  1519. (gint)strlen (learned_key),
  1520. learned_key);
  1521. }
  1522. ret = redisAsyncFormattedCommand (rt->redis, NULL, NULL,
  1523. query->str, query->len);
  1524. if (ret != REDIS_OK) {
  1525. msg_err_task ("call to redis failed: %s", rt->redis->errstr);
  1526. rspamd_fstring_free (query);
  1527. return FALSE;
  1528. }
  1529. off = query->len;
  1530. ret = rspamd_printf_fstring (&query, "*1\r\n$4\r\nEXEC\r\n");
  1531. ret = redisAsyncFormattedCommand (rt->redis, rspamd_redis_learned, rt,
  1532. query->str + off, ret);
  1533. rspamd_mempool_add_destructor (task->task_pool,
  1534. (rspamd_mempool_destruct_t)rspamd_fstring_free, query);
  1535. if (ret == REDIS_OK) {
  1536. /* Add signature if needed */
  1537. if (rt->ctx->enable_signatures) {
  1538. rspamd_redis_store_stat_signature (task, rt, tokens,
  1539. "RSIG");
  1540. }
  1541. rspamd_session_add_event (task->s, rspamd_redis_fin_learn, rt, M);
  1542. rt->has_event = TRUE;
  1543. /* Set timeout */
  1544. if (rspamd_event_pending (&rt->timeout_event, EV_TIMEOUT)) {
  1545. event_del (&rt->timeout_event);
  1546. }
  1547. event_set (&rt->timeout_event, -1, EV_TIMEOUT, rspamd_redis_timeout, rt);
  1548. event_base_set (task->ev_base, &rt->timeout_event);
  1549. double_to_tv (rt->ctx->timeout, &tv);
  1550. event_add (&rt->timeout_event, &tv);
  1551. return TRUE;
  1552. }
  1553. else {
  1554. msg_err_task ("call to redis failed: %s", rt->redis->errstr);
  1555. }
  1556. return FALSE;
  1557. }
  1558. gboolean
  1559. rspamd_redis_finalize_learn (struct rspamd_task *task, gpointer runtime,
  1560. gpointer ctx, GError **err)
  1561. {
  1562. struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
  1563. redisAsyncContext *redis;
  1564. if (rspamd_event_pending (&rt->timeout_event, EV_TIMEOUT)) {
  1565. event_del (&rt->timeout_event);
  1566. }
  1567. if (rt->redis) {
  1568. redis = rt->redis;
  1569. rt->redis = NULL;
  1570. redisAsyncFree (redis);
  1571. }
  1572. if (rt->err) {
  1573. g_propagate_error (err, rt->err);
  1574. rt->err = NULL;
  1575. return FALSE;
  1576. }
  1577. return TRUE;
  1578. }
  1579. gulong
  1580. rspamd_redis_total_learns (struct rspamd_task *task, gpointer runtime,
  1581. gpointer ctx)
  1582. {
  1583. struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
  1584. return rt->learned;
  1585. }
  1586. gulong
  1587. rspamd_redis_inc_learns (struct rspamd_task *task, gpointer runtime,
  1588. gpointer ctx)
  1589. {
  1590. struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
  1591. /* XXX: may cause races */
  1592. return rt->learned + 1;
  1593. }
  1594. gulong
  1595. rspamd_redis_dec_learns (struct rspamd_task *task, gpointer runtime,
  1596. gpointer ctx)
  1597. {
  1598. struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
  1599. /* XXX: may cause races */
  1600. return rt->learned + 1;
  1601. }
  1602. gulong
  1603. rspamd_redis_learns (struct rspamd_task *task, gpointer runtime,
  1604. gpointer ctx)
  1605. {
  1606. struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
  1607. return rt->learned;
  1608. }
  1609. ucl_object_t *
  1610. rspamd_redis_get_stat (gpointer runtime,
  1611. gpointer ctx)
  1612. {
  1613. struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
  1614. struct rspamd_redis_stat_elt *st;
  1615. redisAsyncContext *redis;
  1616. if (rt->ctx->stat_elt) {
  1617. st = rt->ctx->stat_elt->ud;
  1618. if (rt->redis) {
  1619. redis = rt->redis;
  1620. rt->redis = NULL;
  1621. redisAsyncFree (redis);
  1622. }
  1623. if (st->stat) {
  1624. return ucl_object_ref (st->stat);
  1625. }
  1626. }
  1627. return NULL;
  1628. }
  1629. gpointer
  1630. rspamd_redis_load_tokenizer_config (gpointer runtime,
  1631. gsize *len)
  1632. {
  1633. return NULL;
  1634. }
  1635. #endif