You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

redis_pool.c 11KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491
  1. /*-
  2. * Copyright 2016 Vsevolod Stakhov
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "config.h"
  17. #include <event.h>
  18. #include "redis_pool.h"
  19. #include "cfg_file.h"
  20. #include "contrib/hiredis/hiredis.h"
  21. #include "contrib/hiredis/async.h"
  22. #include "contrib/hiredis/adapters/libevent.h"
  23. #include "cryptobox.h"
  24. #include "logger.h"
  25. struct rspamd_redis_pool_elt;
  26. struct rspamd_redis_pool_connection {
  27. struct redisAsyncContext *ctx;
  28. struct rspamd_redis_pool_elt *elt;
  29. GList *entry;
  30. struct event timeout;
  31. gboolean active;
  32. gchar tag[MEMPOOL_UID_LEN];
  33. ref_entry_t ref;
  34. };
  35. struct rspamd_redis_pool_elt {
  36. struct rspamd_redis_pool *pool;
  37. guint64 key;
  38. GQueue *active;
  39. GQueue *inactive;
  40. };
  41. struct rspamd_redis_pool {
  42. struct event_base *ev_base;
  43. struct rspamd_config *cfg;
  44. GHashTable *elts_by_key;
  45. GHashTable *elts_by_ctx;
  46. gdouble timeout;
  47. guint max_conns;
  48. };
  49. static const gdouble default_timeout = 10.0;
  50. static const guint default_max_conns = 100;
  51. #define msg_err_rpool(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
  52. "redis_pool", conn->tag, \
  53. G_STRFUNC, \
  54. __VA_ARGS__)
  55. #define msg_warn_rpool(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \
  56. "redis_pool", conn->tag, \
  57. G_STRFUNC, \
  58. __VA_ARGS__)
  59. #define msg_info_rpool(...) rspamd_default_log_function (G_LOG_LEVEL_INFO, \
  60. "redis_pool", conn->tag, \
  61. G_STRFUNC, \
  62. __VA_ARGS__)
  63. #define msg_debug_rpool(...) rspamd_conditional_debug_fast (NULL, NULL, \
  64. rspamd_redis_pool_log_id, "redis_pool", conn->tag, \
  65. G_STRFUNC, \
  66. __VA_ARGS__)
  67. INIT_LOG_MODULE(redis_pool)
  68. static inline guint64
  69. rspamd_redis_pool_get_key (const gchar *db, const gchar *password,
  70. const char *ip, int port)
  71. {
  72. rspamd_cryptobox_fast_hash_state_t st;
  73. rspamd_cryptobox_fast_hash_init (&st, rspamd_hash_seed ());
  74. if (db) {
  75. rspamd_cryptobox_fast_hash_update (&st, db, strlen (db));
  76. }
  77. if (password) {
  78. rspamd_cryptobox_fast_hash_update (&st, password, strlen (password));
  79. }
  80. rspamd_cryptobox_fast_hash_update (&st, ip, strlen (ip));
  81. rspamd_cryptobox_fast_hash_update (&st, &port, sizeof (port));
  82. return rspamd_cryptobox_fast_hash_final (&st);
  83. }
  84. static void
  85. rspamd_redis_pool_conn_dtor (struct rspamd_redis_pool_connection *conn)
  86. {
  87. if (conn->active) {
  88. msg_debug_rpool ("active connection removed");
  89. if (conn->ctx) {
  90. if (!(conn->ctx->c.flags & REDIS_FREEING)) {
  91. redisAsyncContext *ac = conn->ctx;
  92. conn->ctx = NULL;
  93. g_hash_table_remove (conn->elt->pool->elts_by_ctx, ac);
  94. ac->onDisconnect = NULL;
  95. redisAsyncFree (ac);
  96. }
  97. }
  98. if (conn->entry) {
  99. g_queue_unlink (conn->elt->active, conn->entry);
  100. }
  101. }
  102. else {
  103. msg_debug_rpool ("inactive connection removed");
  104. if (rspamd_event_pending (&conn->timeout, EV_TIMEOUT)) {
  105. event_del (&conn->timeout);
  106. }
  107. if (conn->ctx && !(conn->ctx->c.flags & REDIS_FREEING)) {
  108. redisAsyncContext *ac = conn->ctx;
  109. /* To prevent on_disconnect here */
  110. conn->active = TRUE;
  111. g_hash_table_remove (conn->elt->pool->elts_by_ctx, ac);
  112. conn->ctx = NULL;
  113. ac->onDisconnect = NULL;
  114. redisAsyncFree (ac);
  115. }
  116. if (conn->entry) {
  117. g_queue_unlink (conn->elt->inactive, conn->entry);
  118. }
  119. }
  120. if (conn->entry) {
  121. g_list_free (conn->entry);
  122. }
  123. g_free (conn);
  124. }
  125. static void
  126. rspamd_redis_pool_elt_dtor (gpointer p)
  127. {
  128. GList *cur;
  129. struct rspamd_redis_pool_elt *elt = p;
  130. struct rspamd_redis_pool_connection *c;
  131. for (cur = elt->active->head; cur != NULL; cur = g_list_next (cur)) {
  132. c = cur->data;
  133. c->entry = NULL;
  134. REF_RELEASE (c);
  135. }
  136. for (cur = elt->inactive->head; cur != NULL; cur = g_list_next (cur)) {
  137. c = cur->data;
  138. c->entry = NULL;
  139. REF_RELEASE (c);
  140. }
  141. g_queue_free (elt->active);
  142. g_queue_free (elt->inactive);
  143. g_free (elt);
  144. }
  145. static void
  146. rspamd_redis_conn_timeout (gint fd, short what, gpointer p)
  147. {
  148. struct rspamd_redis_pool_connection *conn = p;
  149. g_assert (!conn->active);
  150. msg_debug_rpool ("scheduled removal of connection %p, refcount: %d",
  151. conn->ctx, conn->ref.refcount);
  152. REF_RELEASE (conn);
  153. }
  154. static void
  155. rspamd_redis_pool_schedule_timeout (struct rspamd_redis_pool_connection *conn)
  156. {
  157. struct timeval tv;
  158. gdouble real_timeout;
  159. guint active_elts;
  160. active_elts = g_queue_get_length (conn->elt->active);
  161. if (active_elts > conn->elt->pool->max_conns) {
  162. real_timeout = conn->elt->pool->timeout / 2.0;
  163. real_timeout = rspamd_time_jitter (real_timeout, real_timeout / 4.0);
  164. }
  165. else {
  166. real_timeout = conn->elt->pool->timeout;
  167. real_timeout = rspamd_time_jitter (real_timeout, real_timeout / 2.0);
  168. }
  169. msg_debug_rpool ("scheduled connection %p cleanup in %.1f seconds",
  170. conn->ctx, real_timeout);
  171. double_to_tv (real_timeout, &tv);
  172. event_set (&conn->timeout, -1, EV_TIMEOUT, rspamd_redis_conn_timeout, conn);
  173. event_base_set (conn->elt->pool->ev_base, &conn->timeout);
  174. event_add (&conn->timeout, &tv);
  175. }
  176. static void
  177. rspamd_redis_pool_on_disconnect (const struct redisAsyncContext *ac, int status,
  178. void *ud)
  179. {
  180. struct rspamd_redis_pool_connection *conn = ud;
  181. /*
  182. * Here, we know that redis itself will free this connection
  183. * so, we need to do something very clever about it
  184. */
  185. if (!conn->active) {
  186. /* Do nothing for active connections as it is already handled somewhere */
  187. if (conn->ctx) {
  188. msg_debug_rpool ("inactive connection terminated: %s, refs: %d",
  189. conn->ctx->errstr, conn->ref.refcount);
  190. }
  191. REF_RELEASE (conn);
  192. }
  193. }
  194. static struct rspamd_redis_pool_connection *
  195. rspamd_redis_pool_new_connection (struct rspamd_redis_pool *pool,
  196. struct rspamd_redis_pool_elt *elt,
  197. const char *db,
  198. const char *password,
  199. const char *ip,
  200. gint port)
  201. {
  202. struct rspamd_redis_pool_connection *conn;
  203. struct redisAsyncContext *ctx;
  204. if (*ip == '/' || *ip == '.') {
  205. ctx = redisAsyncConnectUnix (ip);
  206. }
  207. else {
  208. ctx = redisAsyncConnect (ip, port);
  209. }
  210. if (ctx) {
  211. if (ctx->err != REDIS_OK) {
  212. msg_err ("cannot connect to redis: %s", ctx->errstr);
  213. redisAsyncFree (ctx);
  214. return NULL;
  215. }
  216. else {
  217. conn = g_malloc0 (sizeof (*conn));
  218. conn->entry = g_list_prepend (NULL, conn);
  219. conn->elt = elt;
  220. conn->active = TRUE;
  221. g_hash_table_insert (elt->pool->elts_by_ctx, ctx, conn);
  222. g_queue_push_head_link (elt->active, conn->entry);
  223. conn->ctx = ctx;
  224. rspamd_random_hex (conn->tag, sizeof (conn->tag));
  225. REF_INIT_RETAIN (conn, rspamd_redis_pool_conn_dtor);
  226. msg_debug_rpool ("created new connection to %s:%d: %p", ip, port, ctx);
  227. redisLibeventAttach (ctx, pool->ev_base);
  228. redisAsyncSetDisconnectCallback (ctx, rspamd_redis_pool_on_disconnect,
  229. conn);
  230. if (password) {
  231. redisAsyncCommand (ctx, NULL, NULL, "AUTH %s", password);
  232. }
  233. if (db) {
  234. redisAsyncCommand (ctx, NULL, NULL, "SELECT %s", db);
  235. }
  236. }
  237. return conn;
  238. }
  239. return NULL;
  240. }
  241. static struct rspamd_redis_pool_elt *
  242. rspamd_redis_pool_new_elt (struct rspamd_redis_pool *pool)
  243. {
  244. struct rspamd_redis_pool_elt *elt;
  245. elt = g_malloc0 (sizeof (*elt));
  246. elt->active = g_queue_new ();
  247. elt->inactive = g_queue_new ();
  248. elt->pool = pool;
  249. return elt;
  250. }
  251. struct rspamd_redis_pool *
  252. rspamd_redis_pool_init (void)
  253. {
  254. struct rspamd_redis_pool *pool;
  255. pool = g_malloc0 (sizeof (*pool));
  256. pool->elts_by_key = g_hash_table_new_full (g_int64_hash, g_int64_equal, NULL,
  257. rspamd_redis_pool_elt_dtor);
  258. pool->elts_by_ctx = g_hash_table_new (g_direct_hash, g_direct_equal);
  259. return pool;
  260. }
  261. void
  262. rspamd_redis_pool_config (struct rspamd_redis_pool *pool,
  263. struct rspamd_config *cfg,
  264. struct event_base *ev_base)
  265. {
  266. g_assert (pool != NULL);
  267. pool->ev_base = ev_base;
  268. pool->cfg = cfg;
  269. pool->timeout = default_timeout;
  270. pool->max_conns = default_max_conns;
  271. }
  272. struct redisAsyncContext*
  273. rspamd_redis_pool_connect (struct rspamd_redis_pool *pool,
  274. const gchar *db, const gchar *password,
  275. const char *ip, int port)
  276. {
  277. guint64 key;
  278. struct rspamd_redis_pool_elt *elt;
  279. GList *conn_entry;
  280. struct rspamd_redis_pool_connection *conn;
  281. g_assert (pool != NULL);
  282. g_assert (pool->ev_base != NULL);
  283. g_assert (ip != NULL);
  284. key = rspamd_redis_pool_get_key (db, password, ip, port);
  285. elt = g_hash_table_lookup (pool->elts_by_key, &key);
  286. if (elt) {
  287. if (g_queue_get_length (elt->inactive) > 0) {
  288. conn_entry = g_queue_pop_head_link (elt->inactive);
  289. conn = conn_entry->data;
  290. g_assert (!conn->active);
  291. if (conn->ctx->err == REDIS_OK) {
  292. event_del (&conn->timeout);
  293. conn->active = TRUE;
  294. g_queue_push_tail_link (elt->active, conn_entry);
  295. msg_debug_rpool ("reused existing connection to %s:%d: %p",
  296. ip, port, conn->ctx);
  297. }
  298. else {
  299. g_list_free (conn->entry);
  300. conn->entry = NULL;
  301. REF_RELEASE (conn);
  302. conn = rspamd_redis_pool_new_connection (pool, elt,
  303. db, password, ip, port);
  304. }
  305. }
  306. else {
  307. /* Need to create connection */
  308. conn = rspamd_redis_pool_new_connection (pool, elt,
  309. db, password, ip, port);
  310. }
  311. }
  312. else {
  313. /* Need to create a pool */
  314. elt = rspamd_redis_pool_new_elt (pool);
  315. elt->key = key;
  316. g_hash_table_insert (pool->elts_by_key, &elt->key, elt);
  317. conn = rspamd_redis_pool_new_connection (pool, elt,
  318. db, password, ip, port);
  319. }
  320. if (!conn) {
  321. return NULL;
  322. }
  323. REF_RETAIN (conn);
  324. return conn->ctx;
  325. }
  326. void
  327. rspamd_redis_pool_release_connection (struct rspamd_redis_pool *pool,
  328. struct redisAsyncContext *ctx, gboolean is_fatal)
  329. {
  330. struct rspamd_redis_pool_connection *conn;
  331. g_assert (pool != NULL);
  332. g_assert (ctx != NULL);
  333. conn = g_hash_table_lookup (pool->elts_by_ctx, ctx);
  334. if (conn != NULL) {
  335. g_assert (conn->active);
  336. if (is_fatal || ctx->err != REDIS_OK) {
  337. /* We need to terminate connection forcefully */
  338. msg_debug_rpool ("closed connection %p forcefully", conn->ctx);
  339. REF_RELEASE (conn);
  340. }
  341. else {
  342. /* Ensure that there are no callbacks attached to this conn */
  343. if (ctx->replies.head == NULL) {
  344. /* Just move it to the inactive queue */
  345. g_queue_unlink (conn->elt->active, conn->entry);
  346. g_queue_push_head_link (conn->elt->inactive, conn->entry);
  347. conn->active = FALSE;
  348. rspamd_redis_pool_schedule_timeout (conn);
  349. msg_debug_rpool ("mark connection %p inactive", conn->ctx);
  350. }
  351. else {
  352. msg_debug_rpool ("closed connection %p due to callbacks left",
  353. conn->ctx);
  354. REF_RELEASE (conn);
  355. }
  356. }
  357. REF_RELEASE (conn);
  358. }
  359. else {
  360. g_assert_not_reached ();
  361. }
  362. }
  363. void
  364. rspamd_redis_pool_destroy (struct rspamd_redis_pool *pool)
  365. {
  366. struct rspamd_redis_pool_elt *elt;
  367. GHashTableIter it;
  368. gpointer k, v;
  369. g_assert (pool != NULL);
  370. g_hash_table_iter_init (&it, pool->elts_by_key);
  371. while (g_hash_table_iter_next (&it, &k, &v)) {
  372. elt = v;
  373. rspamd_redis_pool_elt_dtor (elt);
  374. g_hash_table_iter_steal (&it);
  375. }
  376. g_hash_table_unref (pool->elts_by_ctx);
  377. g_hash_table_unref (pool->elts_by_key);
  378. g_free (pool);
  379. }
  380. const gchar*
  381. rspamd_redis_type_to_string (int type)
  382. {
  383. const gchar *ret = "unknown";
  384. switch (type) {
  385. case REDIS_REPLY_STRING:
  386. ret = "string";
  387. break;
  388. case REDIS_REPLY_ARRAY:
  389. ret = "array";
  390. break;
  391. case REDIS_REPLY_INTEGER:
  392. ret = "int";
  393. break;
  394. case REDIS_REPLY_STATUS:
  395. ret = "status";
  396. break;
  397. case REDIS_REPLY_NIL:
  398. ret = "nil";
  399. break;
  400. case REDIS_REPLY_ERROR:
  401. ret = "error";
  402. break;
  403. default:
  404. break;
  405. }
  406. return ret;
  407. }