You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

redis_cache.c 13KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535
  1. /*
  2. * Copyright 2023 Vsevolod Stakhov
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "config.h"
  17. #include "learn_cache.h"
  18. #include "rspamd.h"
  19. #include "stat_api.h"
  20. #include "stat_internal.h"
  21. #include "cryptobox.h"
  22. #include "ucl.h"
  23. #include "hiredis.h"
  24. #include "adapters/libev.h"
  25. #include "lua/lua_common.h"
  26. #include "libmime/message.h"
  27. #define REDIS_DEFAULT_TIMEOUT 0.5
  28. #define REDIS_STAT_TIMEOUT 30
  29. #define REDIS_DEFAULT_PORT 6379
  30. #define DEFAULT_REDIS_KEY "learned_ids"
  31. static const gchar *M = "redis learn cache";
  32. struct rspamd_redis_cache_ctx {
  33. lua_State *L;
  34. struct rspamd_statfile_config *stcf;
  35. const gchar *username;
  36. const gchar *password;
  37. const gchar *dbname;
  38. const gchar *redis_object;
  39. gdouble timeout;
  40. gint conf_ref;
  41. };
  42. struct rspamd_redis_cache_runtime {
  43. struct rspamd_redis_cache_ctx *ctx;
  44. struct rspamd_task *task;
  45. struct upstream *selected;
  46. ev_timer timer_ev;
  47. redisAsyncContext *redis;
  48. gboolean has_event;
  49. };
  50. static GQuark
  51. rspamd_stat_cache_redis_quark(void)
  52. {
  53. return g_quark_from_static_string(M);
  54. }
  55. static inline struct upstream_list *
  56. rspamd_redis_get_servers(struct rspamd_redis_cache_ctx *ctx,
  57. const gchar *what)
  58. {
  59. lua_State *L = ctx->L;
  60. struct upstream_list *res;
  61. lua_rawgeti(L, LUA_REGISTRYINDEX, ctx->conf_ref);
  62. lua_pushstring(L, what);
  63. lua_gettable(L, -2);
  64. res = *((struct upstream_list **) lua_touserdata(L, -1));
  65. lua_settop(L, 0);
  66. return res;
  67. }
  68. static void
  69. rspamd_redis_cache_maybe_auth(struct rspamd_redis_cache_ctx *ctx,
  70. redisAsyncContext *redis)
  71. {
  72. if (ctx->username) {
  73. if (ctx->password) {
  74. redisAsyncCommand(redis, NULL, NULL, "AUTH %s %s", ctx->username, ctx->password);
  75. }
  76. else {
  77. msg_warn("Redis requires a password when username is supplied");
  78. }
  79. }
  80. else if (ctx->password) {
  81. redisAsyncCommand(redis, NULL, NULL, "AUTH %s", ctx->password);
  82. }
  83. if (ctx->dbname) {
  84. redisAsyncCommand(redis, NULL, NULL, "SELECT %s", ctx->dbname);
  85. }
  86. }
  87. /* Called on connection termination */
  88. static void
  89. rspamd_redis_cache_fin(gpointer data)
  90. {
  91. struct rspamd_redis_cache_runtime *rt = data;
  92. redisAsyncContext *redis;
  93. rt->has_event = FALSE;
  94. ev_timer_stop(rt->task->event_loop, &rt->timer_ev);
  95. if (rt->redis) {
  96. redis = rt->redis;
  97. rt->redis = NULL;
  98. /* This calls for all callbacks pending */
  99. redisAsyncFree(redis);
  100. }
  101. }
  102. static void
  103. rspamd_redis_cache_timeout(EV_P_ ev_timer *w, int revents)
  104. {
  105. struct rspamd_redis_cache_runtime *rt =
  106. (struct rspamd_redis_cache_runtime *) w->data;
  107. struct rspamd_task *task;
  108. task = rt->task;
  109. msg_err_task("connection to redis server %s timed out",
  110. rspamd_upstream_name(rt->selected));
  111. rspamd_upstream_fail(rt->selected, FALSE, "timeout");
  112. if (rt->has_event) {
  113. rspamd_session_remove_event(task->s, rspamd_redis_cache_fin, rt);
  114. }
  115. }
  116. /* Called when we have checked the specified message id */
  117. static void
  118. rspamd_stat_cache_redis_get(redisAsyncContext *c, gpointer r, gpointer priv)
  119. {
  120. struct rspamd_redis_cache_runtime *rt = priv;
  121. redisReply *reply = r;
  122. struct rspamd_task *task;
  123. glong val = 0;
  124. task = rt->task;
  125. if (c->err == 0) {
  126. if (reply) {
  127. if (G_LIKELY(reply->type == REDIS_REPLY_INTEGER)) {
  128. val = reply->integer;
  129. }
  130. else if (reply->type == REDIS_REPLY_STRING) {
  131. rspamd_strtol(reply->str, reply->len, &val);
  132. }
  133. else {
  134. if (reply->type == REDIS_REPLY_ERROR) {
  135. msg_err_task("cannot learn %s: redis error: \"%s\"",
  136. rt->ctx->stcf->symbol, reply->str);
  137. }
  138. else if (reply->type != REDIS_REPLY_NIL) {
  139. msg_err_task("bad learned type for %s: %d",
  140. rt->ctx->stcf->symbol, reply->type);
  141. }
  142. val = 0;
  143. }
  144. }
  145. if ((val > 0 && (task->flags & RSPAMD_TASK_FLAG_LEARN_SPAM)) ||
  146. (val < 0 && (task->flags & RSPAMD_TASK_FLAG_LEARN_HAM))) {
  147. /* Already learned */
  148. msg_info_task("<%s> has been already "
  149. "learned as %s, ignore it",
  150. MESSAGE_FIELD(task, message_id),
  151. (task->flags & RSPAMD_TASK_FLAG_LEARN_SPAM) ? "spam" : "ham");
  152. task->flags |= RSPAMD_TASK_FLAG_ALREADY_LEARNED;
  153. }
  154. else if (val != 0) {
  155. /* Unlearn flag */
  156. task->flags |= RSPAMD_TASK_FLAG_UNLEARN;
  157. }
  158. rspamd_upstream_ok(rt->selected);
  159. }
  160. else {
  161. rspamd_upstream_fail(rt->selected, FALSE, c->errstr);
  162. }
  163. if (rt->has_event) {
  164. rspamd_session_remove_event(task->s, rspamd_redis_cache_fin, rt);
  165. }
  166. }
  167. /* Called when we have learned the specified message id */
  168. static void
  169. rspamd_stat_cache_redis_set(redisAsyncContext *c, gpointer r, gpointer priv)
  170. {
  171. struct rspamd_redis_cache_runtime *rt = priv;
  172. struct rspamd_task *task;
  173. task = rt->task;
  174. if (c->err == 0) {
  175. /* XXX: we ignore results here */
  176. rspamd_upstream_ok(rt->selected);
  177. }
  178. else {
  179. rspamd_upstream_fail(rt->selected, FALSE, c->errstr);
  180. }
  181. if (rt->has_event) {
  182. rspamd_session_remove_event(task->s, rspamd_redis_cache_fin, rt);
  183. }
  184. }
  185. static void
  186. rspamd_stat_cache_redis_generate_id(struct rspamd_task *task)
  187. {
  188. rspamd_cryptobox_hash_state_t st;
  189. rspamd_token_t *tok;
  190. guint i;
  191. guchar out[rspamd_cryptobox_HASHBYTES];
  192. gchar *b32out;
  193. gchar *user = NULL;
  194. rspamd_cryptobox_hash_init(&st, NULL, 0);
  195. user = rspamd_mempool_get_variable(task->task_pool, "stat_user");
  196. /* Use dedicated hash space for per users cache */
  197. if (user != NULL) {
  198. rspamd_cryptobox_hash_update(&st, user, strlen(user));
  199. }
  200. for (i = 0; i < task->tokens->len; i++) {
  201. tok = g_ptr_array_index(task->tokens, i);
  202. rspamd_cryptobox_hash_update(&st, (guchar *) &tok->data,
  203. sizeof(tok->data));
  204. }
  205. rspamd_cryptobox_hash_final(&st, out);
  206. b32out = rspamd_mempool_alloc(task->task_pool,
  207. sizeof(out) * 8 / 5 + 3);
  208. i = rspamd_encode_base32_buf(out, sizeof(out), b32out,
  209. sizeof(out) * 8 / 5 + 2, RSPAMD_BASE32_DEFAULT);
  210. if (i > 0) {
  211. /* Zero terminate */
  212. b32out[i] = '\0';
  213. }
  214. rspamd_mempool_set_variable(task->task_pool, "words_hash", b32out, NULL);
  215. }
  216. gpointer
  217. rspamd_stat_cache_redis_init(struct rspamd_stat_ctx *ctx,
  218. struct rspamd_config *cfg,
  219. struct rspamd_statfile *st,
  220. const ucl_object_t *cf)
  221. {
  222. struct rspamd_redis_cache_ctx *cache_ctx;
  223. struct rspamd_statfile_config *stf = st->stcf;
  224. const ucl_object_t *obj;
  225. gboolean ret = FALSE;
  226. lua_State *L = (lua_State *) cfg->lua_state;
  227. gint conf_ref = -1;
  228. cache_ctx = g_malloc0(sizeof(*cache_ctx));
  229. cache_ctx->timeout = REDIS_DEFAULT_TIMEOUT;
  230. cache_ctx->L = L;
  231. /* First search in backend configuration */
  232. obj = ucl_object_lookup(st->classifier->cfg->opts, "backend");
  233. if (obj != NULL && ucl_object_type(obj) == UCL_OBJECT) {
  234. ret = rspamd_lua_try_load_redis(L, obj, cfg, &conf_ref);
  235. }
  236. /* Now try statfiles config */
  237. if (!ret && stf->opts) {
  238. ret = rspamd_lua_try_load_redis(L, stf->opts, cfg, &conf_ref);
  239. }
  240. /* Now try classifier config */
  241. if (!ret && st->classifier->cfg->opts) {
  242. ret = rspamd_lua_try_load_redis(L, st->classifier->cfg->opts, cfg, &conf_ref);
  243. }
  244. /* Now try global redis settings */
  245. if (!ret) {
  246. obj = ucl_object_lookup(cfg->cfg_ucl_obj, "redis");
  247. if (obj) {
  248. const ucl_object_t *specific_obj;
  249. specific_obj = ucl_object_lookup(obj, "statistics");
  250. if (specific_obj) {
  251. ret = rspamd_lua_try_load_redis(L,
  252. specific_obj, cfg, &conf_ref);
  253. }
  254. else {
  255. ret = rspamd_lua_try_load_redis(L,
  256. obj, cfg, &conf_ref);
  257. }
  258. }
  259. }
  260. if (!ret) {
  261. msg_err_config("cannot init redis cache for %s", stf->symbol);
  262. g_free(cache_ctx);
  263. return NULL;
  264. }
  265. obj = ucl_object_lookup(st->classifier->cfg->opts, "cache_key");
  266. if (obj) {
  267. cache_ctx->redis_object = ucl_object_tostring(obj);
  268. }
  269. else {
  270. cache_ctx->redis_object = DEFAULT_REDIS_KEY;
  271. }
  272. cache_ctx->conf_ref = conf_ref;
  273. /* Check some common table values */
  274. lua_rawgeti(L, LUA_REGISTRYINDEX, conf_ref);
  275. lua_pushstring(L, "timeout");
  276. lua_gettable(L, -2);
  277. if (lua_type(L, -1) == LUA_TNUMBER) {
  278. cache_ctx->timeout = lua_tonumber(L, -1);
  279. }
  280. lua_pop(L, 1);
  281. lua_pushstring(L, "db");
  282. lua_gettable(L, -2);
  283. if (lua_type(L, -1) == LUA_TSTRING) {
  284. cache_ctx->dbname = rspamd_mempool_strdup(cfg->cfg_pool,
  285. lua_tostring(L, -1));
  286. }
  287. lua_pop(L, 1);
  288. lua_pushstring(L, "username");
  289. lua_gettable(L, -2);
  290. if (lua_type(L, -1) == LUA_TSTRING) {
  291. cache_ctx->username = rspamd_mempool_strdup(cfg->cfg_pool,
  292. lua_tostring(L, -1));
  293. }
  294. lua_pop(L, 1);
  295. lua_pushstring(L, "password");
  296. lua_gettable(L, -2);
  297. if (lua_type(L, -1) == LUA_TSTRING) {
  298. cache_ctx->password = rspamd_mempool_strdup(cfg->cfg_pool,
  299. lua_tostring(L, -1));
  300. }
  301. lua_pop(L, 1);
  302. lua_settop(L, 0);
  303. cache_ctx->stcf = stf;
  304. return (gpointer) cache_ctx;
  305. }
  306. gpointer
  307. rspamd_stat_cache_redis_runtime(struct rspamd_task *task,
  308. gpointer c, gboolean learn)
  309. {
  310. struct rspamd_redis_cache_ctx *ctx = c;
  311. struct rspamd_redis_cache_runtime *rt;
  312. struct upstream *up;
  313. struct upstream_list *ups;
  314. rspamd_inet_addr_t *addr;
  315. g_assert(ctx != NULL);
  316. if (task->tokens == NULL || task->tokens->len == 0) {
  317. return NULL;
  318. }
  319. if (learn) {
  320. ups = rspamd_redis_get_servers(ctx, "write_servers");
  321. if (!ups) {
  322. msg_err_task("no write servers defined for %s, cannot learn",
  323. ctx->stcf->symbol);
  324. return NULL;
  325. }
  326. up = rspamd_upstream_get(ups,
  327. RSPAMD_UPSTREAM_MASTER_SLAVE,
  328. NULL,
  329. 0);
  330. }
  331. else {
  332. ups = rspamd_redis_get_servers(ctx, "read_servers");
  333. if (!ups) {
  334. msg_err_task("no read servers defined for %s, cannot check",
  335. ctx->stcf->symbol);
  336. return NULL;
  337. }
  338. up = rspamd_upstream_get(ups,
  339. RSPAMD_UPSTREAM_ROUND_ROBIN,
  340. NULL,
  341. 0);
  342. }
  343. if (up == NULL) {
  344. msg_err_task("no upstreams reachable");
  345. return NULL;
  346. }
  347. rt = rspamd_mempool_alloc0(task->task_pool, sizeof(*rt));
  348. rt->selected = up;
  349. rt->task = task;
  350. rt->ctx = ctx;
  351. addr = rspamd_upstream_addr_next(up);
  352. g_assert(addr != NULL);
  353. if (rspamd_inet_address_get_af(addr) == AF_UNIX) {
  354. rt->redis = redisAsyncConnectUnix(rspamd_inet_address_to_string(addr));
  355. }
  356. else {
  357. rt->redis = redisAsyncConnect(rspamd_inet_address_to_string(addr),
  358. rspamd_inet_address_get_port(addr));
  359. }
  360. if (rt->redis == NULL) {
  361. msg_warn_task("cannot connect to redis server %s: %s",
  362. rspamd_inet_address_to_string_pretty(addr),
  363. strerror(errno));
  364. return NULL;
  365. }
  366. else if (rt->redis->err != REDIS_OK) {
  367. msg_warn_task("cannot connect to redis server %s: %s",
  368. rspamd_inet_address_to_string_pretty(addr),
  369. rt->redis->errstr);
  370. redisAsyncFree(rt->redis);
  371. rt->redis = NULL;
  372. return NULL;
  373. }
  374. redisLibevAttach(task->event_loop, rt->redis);
  375. /* Now check stats */
  376. rt->timer_ev.data = rt;
  377. ev_timer_init(&rt->timer_ev, rspamd_redis_cache_timeout,
  378. rt->ctx->timeout, 0.0);
  379. rspamd_redis_cache_maybe_auth(ctx, rt->redis);
  380. if (!learn) {
  381. rspamd_stat_cache_redis_generate_id(task);
  382. }
  383. return rt;
  384. }
  385. gint rspamd_stat_cache_redis_check(struct rspamd_task *task,
  386. gboolean is_spam,
  387. gpointer runtime)
  388. {
  389. struct rspamd_redis_cache_runtime *rt = runtime;
  390. gchar *h;
  391. if (rspamd_session_blocked(task->s)) {
  392. return RSPAMD_LEARN_IGNORE;
  393. }
  394. h = rspamd_mempool_get_variable(task->task_pool, "words_hash");
  395. if (h == NULL) {
  396. return RSPAMD_LEARN_IGNORE;
  397. }
  398. if (redisAsyncCommand(rt->redis, rspamd_stat_cache_redis_get, rt,
  399. "HGET %s %s",
  400. rt->ctx->redis_object, h) == REDIS_OK) {
  401. rspamd_session_add_event(task->s,
  402. rspamd_redis_cache_fin,
  403. rt,
  404. M);
  405. ev_timer_start(rt->task->event_loop, &rt->timer_ev);
  406. rt->has_event = TRUE;
  407. }
  408. /* We need to return OK every time */
  409. return RSPAMD_LEARN_OK;
  410. }
  411. gint rspamd_stat_cache_redis_learn(struct rspamd_task *task,
  412. gboolean is_spam,
  413. gpointer runtime)
  414. {
  415. struct rspamd_redis_cache_runtime *rt = runtime;
  416. gchar *h;
  417. gint flag;
  418. if (rt == NULL || rt->ctx == NULL || rspamd_session_blocked(task->s)) {
  419. return RSPAMD_LEARN_IGNORE;
  420. }
  421. h = rspamd_mempool_get_variable(task->task_pool, "words_hash");
  422. g_assert(h != NULL);
  423. flag = (task->flags & RSPAMD_TASK_FLAG_LEARN_SPAM) ? 1 : -1;
  424. if (redisAsyncCommand(rt->redis, rspamd_stat_cache_redis_set, rt,
  425. "HSET %s %s %d",
  426. rt->ctx->redis_object, h, flag) == REDIS_OK) {
  427. rspamd_session_add_event(task->s,
  428. rspamd_redis_cache_fin, rt, M);
  429. ev_timer_start(rt->task->event_loop, &rt->timer_ev);
  430. rt->has_event = TRUE;
  431. }
  432. /* We need to return OK every time */
  433. return RSPAMD_LEARN_OK;
  434. }
  435. void rspamd_stat_cache_redis_close(gpointer c)
  436. {
  437. struct rspamd_redis_cache_ctx *ctx = (struct rspamd_redis_cache_ctx *) c;
  438. lua_State *L;
  439. L = ctx->L;
  440. if (ctx->conf_ref) {
  441. luaL_unref(L, LUA_REGISTRYINDEX, ctx->conf_ref);
  442. }
  443. g_free(ctx);
  444. }