You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

redis_cache.c 12KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520
  1. /*-
  2. * Copyright 2016 Vsevolod Stakhov
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "config.h"
  17. #include "learn_cache.h"
  18. #include "rspamd.h"
  19. #include "stat_api.h"
  20. #include "stat_internal.h"
  21. #include "cryptobox.h"
  22. #include "ucl.h"
  23. #include "hiredis.h"
  24. #include "adapters/libev.h"
  25. #include "lua/lua_common.h"
  26. #include "libmime/message.h"
  27. #define REDIS_DEFAULT_TIMEOUT 0.5
  28. #define REDIS_STAT_TIMEOUT 30
  29. #define REDIS_DEFAULT_PORT 6379
  30. #define DEFAULT_REDIS_KEY "learned_ids"
  31. static const gchar *M = "redis learn cache";
  32. struct rspamd_redis_cache_ctx {
  33. lua_State *L;
  34. struct rspamd_statfile_config *stcf;
  35. const gchar *password;
  36. const gchar *dbname;
  37. const gchar *redis_object;
  38. gdouble timeout;
  39. gint conf_ref;
  40. };
  41. struct rspamd_redis_cache_runtime {
  42. struct rspamd_redis_cache_ctx *ctx;
  43. struct rspamd_task *task;
  44. struct upstream *selected;
  45. ev_timer timer_ev;
  46. redisAsyncContext *redis;
  47. gboolean has_event;
  48. };
  49. static GQuark
  50. rspamd_stat_cache_redis_quark (void)
  51. {
  52. return g_quark_from_static_string (M);
  53. }
  54. static inline struct upstream_list *
  55. rspamd_redis_get_servers (struct rspamd_redis_cache_ctx *ctx,
  56. const gchar *what)
  57. {
  58. lua_State *L = ctx->L;
  59. struct upstream_list *res;
  60. lua_rawgeti (L, LUA_REGISTRYINDEX, ctx->conf_ref);
  61. lua_pushstring (L, what);
  62. lua_gettable (L, -2);
  63. res = *((struct upstream_list**)lua_touserdata (L, -1));
  64. lua_settop (L, 0);
  65. return res;
  66. }
  67. static void
  68. rspamd_redis_cache_maybe_auth (struct rspamd_redis_cache_ctx *ctx,
  69. redisAsyncContext *redis)
  70. {
  71. if (ctx->password) {
  72. redisAsyncCommand (redis, NULL, NULL, "AUTH %s", ctx->password);
  73. }
  74. if (ctx->dbname) {
  75. redisAsyncCommand (redis, NULL, NULL, "SELECT %s", ctx->dbname);
  76. }
  77. }
  78. /* Called on connection termination */
  79. static void
  80. rspamd_redis_cache_fin (gpointer data)
  81. {
  82. struct rspamd_redis_cache_runtime *rt = data;
  83. redisAsyncContext *redis;
  84. rt->has_event = FALSE;
  85. ev_timer_stop (rt->task->event_loop, &rt->timer_ev);
  86. if (rt->redis) {
  87. redis = rt->redis;
  88. rt->redis = NULL;
  89. /* This calls for all callbacks pending */
  90. redisAsyncFree (redis);
  91. }
  92. }
  93. static void
  94. rspamd_redis_cache_timeout (EV_P_ ev_timer *w, int revents)
  95. {
  96. struct rspamd_redis_cache_runtime *rt =
  97. (struct rspamd_redis_cache_runtime *)w->data;
  98. struct rspamd_task *task;
  99. task = rt->task;
  100. msg_err_task ("connection to redis server %s timed out",
  101. rspamd_upstream_name (rt->selected));
  102. rspamd_upstream_fail (rt->selected, FALSE, "timeout");
  103. if (rt->has_event) {
  104. rspamd_session_remove_event (task->s, rspamd_redis_cache_fin, rt);
  105. }
  106. }
  107. /* Called when we have checked the specified message id */
  108. static void
  109. rspamd_stat_cache_redis_get (redisAsyncContext *c, gpointer r, gpointer priv)
  110. {
  111. struct rspamd_redis_cache_runtime *rt = priv;
  112. redisReply *reply = r;
  113. struct rspamd_task *task;
  114. glong val = 0;
  115. task = rt->task;
  116. if (c->err == 0) {
  117. if (reply) {
  118. if (G_LIKELY (reply->type == REDIS_REPLY_INTEGER)) {
  119. val = reply->integer;
  120. }
  121. else if (reply->type == REDIS_REPLY_STRING) {
  122. rspamd_strtol (reply->str, reply->len, &val);
  123. }
  124. else {
  125. if (reply->type == REDIS_REPLY_ERROR) {
  126. msg_err_task ("cannot learn %s: redis error: \"%s\"",
  127. rt->ctx->stcf->symbol, reply->str);
  128. }
  129. else if (reply->type != REDIS_REPLY_NIL) {
  130. msg_err_task ("bad learned type for %s: %d",
  131. rt->ctx->stcf->symbol, reply->type);
  132. }
  133. val = 0;
  134. }
  135. }
  136. if ((val > 0 && (task->flags & RSPAMD_TASK_FLAG_LEARN_SPAM)) ||
  137. (val < 0 && (task->flags & RSPAMD_TASK_FLAG_LEARN_HAM))) {
  138. /* Already learned */
  139. msg_info_task ("<%s> has been already "
  140. "learned as %s, ignore it", MESSAGE_FIELD (task, message_id),
  141. (task->flags & RSPAMD_TASK_FLAG_LEARN_SPAM) ? "spam" : "ham");
  142. task->flags |= RSPAMD_TASK_FLAG_ALREADY_LEARNED;
  143. }
  144. else if (val != 0) {
  145. /* Unlearn flag */
  146. task->flags |= RSPAMD_TASK_FLAG_UNLEARN;
  147. }
  148. rspamd_upstream_ok (rt->selected);
  149. }
  150. else {
  151. rspamd_upstream_fail (rt->selected, FALSE, c->errstr);
  152. }
  153. if (rt->has_event) {
  154. rspamd_session_remove_event (task->s, rspamd_redis_cache_fin, rt);
  155. }
  156. }
  157. /* Called when we have learned the specified message id */
  158. static void
  159. rspamd_stat_cache_redis_set (redisAsyncContext *c, gpointer r, gpointer priv)
  160. {
  161. struct rspamd_redis_cache_runtime *rt = priv;
  162. struct rspamd_task *task;
  163. task = rt->task;
  164. if (c->err == 0) {
  165. /* XXX: we ignore results here */
  166. rspamd_upstream_ok (rt->selected);
  167. }
  168. else {
  169. rspamd_upstream_fail (rt->selected, FALSE, c->errstr);
  170. }
  171. if (rt->has_event) {
  172. rspamd_session_remove_event (task->s, rspamd_redis_cache_fin, rt);
  173. }
  174. }
  175. static void
  176. rspamd_stat_cache_redis_generate_id (struct rspamd_task *task)
  177. {
  178. rspamd_cryptobox_hash_state_t st;
  179. rspamd_token_t *tok;
  180. guint i;
  181. guchar out[rspamd_cryptobox_HASHBYTES];
  182. gchar *b32out;
  183. gchar *user = NULL;
  184. rspamd_cryptobox_hash_init (&st, NULL, 0);
  185. user = rspamd_mempool_get_variable (task->task_pool, "stat_user");
  186. /* Use dedicated hash space for per users cache */
  187. if (user != NULL) {
  188. rspamd_cryptobox_hash_update (&st, user, strlen (user));
  189. }
  190. for (i = 0; i < task->tokens->len; i ++) {
  191. tok = g_ptr_array_index (task->tokens, i);
  192. rspamd_cryptobox_hash_update (&st, (guchar *)&tok->data,
  193. sizeof (tok->data));
  194. }
  195. rspamd_cryptobox_hash_final (&st, out);
  196. b32out = rspamd_mempool_alloc (task->task_pool,
  197. sizeof (out) * 8 / 5 + 3);
  198. i = rspamd_encode_base32_buf (out, sizeof (out), b32out,
  199. sizeof (out) * 8 / 5 + 2, RSPAMD_BASE32_DEFAULT);
  200. if (i > 0) {
  201. /* Zero terminate */
  202. b32out[i] = '\0';
  203. }
  204. rspamd_mempool_set_variable (task->task_pool, "words_hash", b32out, NULL);
  205. }
  206. gpointer
  207. rspamd_stat_cache_redis_init (struct rspamd_stat_ctx *ctx,
  208. struct rspamd_config *cfg,
  209. struct rspamd_statfile *st,
  210. const ucl_object_t *cf)
  211. {
  212. struct rspamd_redis_cache_ctx *cache_ctx;
  213. struct rspamd_statfile_config *stf = st->stcf;
  214. const ucl_object_t *obj;
  215. gboolean ret = FALSE;
  216. lua_State *L = (lua_State *)cfg->lua_state;
  217. gint conf_ref = -1;
  218. cache_ctx = g_malloc0 (sizeof (*cache_ctx));
  219. cache_ctx->timeout = REDIS_DEFAULT_TIMEOUT;
  220. cache_ctx->L = L;
  221. /* First search in backend configuration */
  222. obj = ucl_object_lookup (st->classifier->cfg->opts, "backend");
  223. if (obj != NULL && ucl_object_type (obj) == UCL_OBJECT) {
  224. ret = rspamd_lua_try_load_redis (L, obj, cfg, &conf_ref);
  225. }
  226. /* Now try statfiles config */
  227. if (!ret && stf->opts) {
  228. ret = rspamd_lua_try_load_redis (L, stf->opts, cfg, &conf_ref);
  229. }
  230. /* Now try classifier config */
  231. if (!ret && st->classifier->cfg->opts) {
  232. ret = rspamd_lua_try_load_redis (L, st->classifier->cfg->opts, cfg, &conf_ref);
  233. }
  234. /* Now try global redis settings */
  235. if (!ret) {
  236. obj = ucl_object_lookup (cfg->rcl_obj, "redis");
  237. if (obj) {
  238. const ucl_object_t *specific_obj;
  239. specific_obj = ucl_object_lookup (obj, "statistics");
  240. if (specific_obj) {
  241. ret = rspamd_lua_try_load_redis (L,
  242. specific_obj, cfg, &conf_ref);
  243. }
  244. else {
  245. ret = rspamd_lua_try_load_redis (L,
  246. obj, cfg, &conf_ref);
  247. }
  248. }
  249. }
  250. if (!ret) {
  251. msg_err_config ("cannot init redis cache for %s", stf->symbol);
  252. g_free (cache_ctx);
  253. return NULL;
  254. }
  255. obj = ucl_object_lookup (st->classifier->cfg->opts, "cache_key");
  256. if (obj) {
  257. cache_ctx->redis_object = ucl_object_tostring (obj);
  258. }
  259. else {
  260. cache_ctx->redis_object = DEFAULT_REDIS_KEY;
  261. }
  262. cache_ctx->conf_ref = conf_ref;
  263. /* Check some common table values */
  264. lua_rawgeti (L, LUA_REGISTRYINDEX, conf_ref);
  265. lua_pushstring (L, "timeout");
  266. lua_gettable (L, -2);
  267. if (lua_type (L, -1) == LUA_TNUMBER) {
  268. cache_ctx->timeout = lua_tonumber (L, -1);
  269. }
  270. lua_pop (L, 1);
  271. lua_pushstring (L, "db");
  272. lua_gettable (L, -2);
  273. if (lua_type (L, -1) == LUA_TSTRING) {
  274. cache_ctx->dbname = rspamd_mempool_strdup (cfg->cfg_pool,
  275. lua_tostring (L, -1));
  276. }
  277. lua_pop (L, 1);
  278. lua_pushstring (L, "password");
  279. lua_gettable (L, -2);
  280. if (lua_type (L, -1) == LUA_TSTRING) {
  281. cache_ctx->password = rspamd_mempool_strdup (cfg->cfg_pool,
  282. lua_tostring (L, -1));
  283. }
  284. lua_pop (L, 1);
  285. lua_settop (L, 0);
  286. cache_ctx->stcf = stf;
  287. return (gpointer)cache_ctx;
  288. }
  289. gpointer
  290. rspamd_stat_cache_redis_runtime (struct rspamd_task *task,
  291. gpointer c, gboolean learn)
  292. {
  293. struct rspamd_redis_cache_ctx *ctx = c;
  294. struct rspamd_redis_cache_runtime *rt;
  295. struct upstream *up;
  296. struct upstream_list *ups;
  297. rspamd_inet_addr_t *addr;
  298. g_assert (ctx != NULL);
  299. if (task->tokens == NULL || task->tokens->len == 0) {
  300. return NULL;
  301. }
  302. if (learn) {
  303. ups = rspamd_redis_get_servers (ctx, "write_servers");
  304. if (!ups) {
  305. msg_err_task ("no write servers defined for %s, cannot learn",
  306. ctx->stcf->symbol);
  307. return NULL;
  308. }
  309. up = rspamd_upstream_get (ups,
  310. RSPAMD_UPSTREAM_MASTER_SLAVE,
  311. NULL,
  312. 0);
  313. }
  314. else {
  315. ups = rspamd_redis_get_servers (ctx, "read_servers");
  316. if (!ups) {
  317. msg_err_task ("no read servers defined for %s, cannot check",
  318. ctx->stcf->symbol);
  319. return NULL;
  320. }
  321. up = rspamd_upstream_get (ups,
  322. RSPAMD_UPSTREAM_ROUND_ROBIN,
  323. NULL,
  324. 0);
  325. }
  326. if (up == NULL) {
  327. msg_err_task ("no upstreams reachable");
  328. return NULL;
  329. }
  330. rt = rspamd_mempool_alloc0 (task->task_pool, sizeof (*rt));
  331. rt->selected = up;
  332. rt->task = task;
  333. rt->ctx = ctx;
  334. addr = rspamd_upstream_addr_next (up);
  335. g_assert (addr != NULL);
  336. if (rspamd_inet_address_get_af (addr) == AF_UNIX) {
  337. rt->redis = redisAsyncConnectUnix (rspamd_inet_address_to_string (addr));
  338. }
  339. else {
  340. rt->redis = redisAsyncConnect (rspamd_inet_address_to_string (addr),
  341. rspamd_inet_address_get_port (addr));
  342. }
  343. if (rt->redis == NULL) {
  344. msg_warn_task ("cannot connect to redis server %s: %s",
  345. rspamd_inet_address_to_string_pretty (addr),
  346. strerror (errno));
  347. return NULL;
  348. }
  349. else if (rt->redis->err != REDIS_OK) {
  350. msg_warn_task ("cannot connect to redis server %s: %s",
  351. rspamd_inet_address_to_string_pretty (addr),
  352. rt->redis->errstr);
  353. redisAsyncFree (rt->redis);
  354. rt->redis = NULL;
  355. return NULL;
  356. }
  357. redisLibevAttach (task->event_loop, rt->redis);
  358. /* Now check stats */
  359. rt->timer_ev.data = rt;
  360. ev_timer_init (&rt->timer_ev, rspamd_redis_cache_timeout,
  361. rt->ctx->timeout, 0.0);
  362. rspamd_redis_cache_maybe_auth (ctx, rt->redis);
  363. if (!learn) {
  364. rspamd_stat_cache_redis_generate_id (task);
  365. }
  366. return rt;
  367. }
  368. gint
  369. rspamd_stat_cache_redis_check (struct rspamd_task *task,
  370. gboolean is_spam,
  371. gpointer runtime)
  372. {
  373. struct rspamd_redis_cache_runtime *rt = runtime;
  374. gchar *h;
  375. if (rspamd_session_blocked (task->s)) {
  376. return RSPAMD_LEARN_IGNORE;
  377. }
  378. h = rspamd_mempool_get_variable (task->task_pool, "words_hash");
  379. if (h == NULL) {
  380. return RSPAMD_LEARN_IGNORE;
  381. }
  382. if (redisAsyncCommand (rt->redis, rspamd_stat_cache_redis_get, rt,
  383. "HGET %s %s",
  384. rt->ctx->redis_object, h) == REDIS_OK) {
  385. rspamd_session_add_event (task->s,
  386. rspamd_redis_cache_fin,
  387. rt,
  388. M);
  389. ev_timer_start (rt->task->event_loop, &rt->timer_ev);
  390. rt->has_event = TRUE;
  391. }
  392. /* We need to return OK every time */
  393. return RSPAMD_LEARN_OK;
  394. }
  395. gint
  396. rspamd_stat_cache_redis_learn (struct rspamd_task *task,
  397. gboolean is_spam,
  398. gpointer runtime)
  399. {
  400. struct rspamd_redis_cache_runtime *rt = runtime;
  401. gchar *h;
  402. gint flag;
  403. if (rt == NULL || rt->ctx == NULL || rspamd_session_blocked (task->s)) {
  404. return RSPAMD_LEARN_IGNORE;
  405. }
  406. h = rspamd_mempool_get_variable (task->task_pool, "words_hash");
  407. g_assert (h != NULL);
  408. flag = (task->flags & RSPAMD_TASK_FLAG_LEARN_SPAM) ? 1 : -1;
  409. if (redisAsyncCommand (rt->redis, rspamd_stat_cache_redis_set, rt,
  410. "HSET %s %s %d",
  411. rt->ctx->redis_object, h, flag) == REDIS_OK) {
  412. rspamd_session_add_event (task->s,
  413. rspamd_redis_cache_fin, rt, M);
  414. ev_timer_start (rt->task->event_loop, &rt->timer_ev);
  415. rt->has_event = TRUE;
  416. }
  417. /* We need to return OK every time */
  418. return RSPAMD_LEARN_OK;
  419. }
  420. void
  421. rspamd_stat_cache_redis_close (gpointer c)
  422. {
  423. struct rspamd_redis_cache_ctx *ctx = (struct rspamd_redis_cache_ctx *)c;
  424. lua_State *L;
  425. L = ctx->L;
  426. if (ctx->conf_ref) {
  427. luaL_unref (L, LUA_REGISTRYINDEX, ctx->conf_ref);
  428. }
  429. g_free (ctx);
  430. }