Nevar pievienot vairāk kā 25 tēmas. Tēmai ir jāsākas ar burtu vai ciparu, tā var saturēt domu zīmes ('-') un var būt līdz 35 simboliem gara.

redis_pool.c 7.9KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. /*-
  2. * Copyright 2016 Vsevolod Stakhov
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "config.h"
  17. #include <event.h>
  18. #include "redis_pool.h"
  19. #include "cfg_file.h"
  20. #include "contrib/hiredis/hiredis.h"
  21. #include "contrib/hiredis/async.h"
  22. #include "contrib/hiredis/adapters/libevent.h"
  23. #include "cryptobox.h"
  24. #include "ref.h"
  25. #include "logger.h"
  26. struct rspamd_redis_pool_elt;
  27. struct rspamd_redis_pool_connection {
  28. struct redisAsyncContext *ctx;
  29. struct rspamd_redis_pool_elt *elt;
  30. GList *entry;
  31. struct event timeout;
  32. gboolean active;
  33. ref_entry_t ref;
  34. };
  35. struct rspamd_redis_pool_elt {
  36. struct rspamd_redis_pool *pool;
  37. guint64 key;
  38. GQueue *active;
  39. GQueue *inactive;
  40. };
  41. struct rspamd_redis_pool {
  42. struct event_base *ev_base;
  43. struct rspamd_config *cfg;
  44. GHashTable *elts_by_key;
  45. GHashTable *elts_by_ctx;
  46. gdouble timeout;
  47. guint max_conns;
  48. };
  49. static const gdouble default_timeout = 60.0;
  50. static const guint default_max_conns = 100;
  51. static inline guint64
  52. rspamd_redis_pool_get_key (const gchar *db, const gchar *password,
  53. const char *ip, int port)
  54. {
  55. rspamd_cryptobox_fast_hash_state_t st;
  56. rspamd_cryptobox_fast_hash_init (&st, rspamd_hash_seed ());
  57. if (db) {
  58. rspamd_cryptobox_fast_hash_update (&st, db, strlen (db));
  59. }
  60. if (password) {
  61. rspamd_cryptobox_fast_hash_update (&st, password, strlen (password));
  62. }
  63. rspamd_cryptobox_fast_hash_update (&st, ip, strlen (ip));
  64. rspamd_cryptobox_fast_hash_update (&st, &port, sizeof (port));
  65. return rspamd_cryptobox_fast_hash_final (&st);
  66. }
  67. static void
  68. rspamd_redis_pool_conn_dtor (struct rspamd_redis_pool_connection *c)
  69. {
  70. if (c->active) {
  71. if (c->ctx) {
  72. g_hash_table_remove (c->elt->pool->elts_by_ctx, c->ctx);
  73. redisAsyncFree (c->ctx);
  74. }
  75. g_queue_unlink (c->elt->active, c->entry);
  76. }
  77. else {
  78. if (event_get_base (&c->timeout)) {
  79. event_del (&c->timeout);
  80. }
  81. g_queue_unlink (c->elt->inactive, c->entry);
  82. }
  83. g_list_free (c->entry);
  84. g_slice_free1 (sizeof (*c), c);
  85. }
  86. static void
  87. rspamd_redis_pool_elt_dtor (gpointer p)
  88. {
  89. GList *cur;
  90. struct rspamd_redis_pool_elt *elt = p;
  91. struct rspamd_redis_pool_connection *c;
  92. for (cur = elt->active->head; cur != NULL; cur = g_list_next (cur)) {
  93. c = cur->data;
  94. REF_RELEASE (c);
  95. }
  96. for (cur = elt->inactive->head; cur != NULL; cur = g_list_next (cur)) {
  97. c = cur->data;
  98. REF_RELEASE (c);
  99. }
  100. g_queue_free (elt->active);
  101. g_queue_free (elt->inactive);
  102. g_slice_free1 (sizeof (*elt), elt);
  103. }
  104. static void
  105. rspamd_redis_conn_timeout (gint fd, short what, gpointer p)
  106. {
  107. struct rspamd_redis_pool_connection *conn = p;
  108. REF_RELEASE (conn);
  109. }
  110. static void
  111. rspamd_redis_pool_schedule_timeout (struct rspamd_redis_pool_connection *conn)
  112. {
  113. struct timeval tv;
  114. gdouble real_timeout;
  115. guint active_elts;
  116. active_elts = g_queue_get_length (conn->elt->active);
  117. if (active_elts > conn->elt->pool->max_conns) {
  118. real_timeout = conn->elt->pool->timeout / 2.0;
  119. real_timeout = rspamd_time_jitter (real_timeout, real_timeout / 4.0);
  120. }
  121. else {
  122. real_timeout = conn->elt->pool->timeout;
  123. real_timeout = rspamd_time_jitter (real_timeout, real_timeout / 2.0);
  124. }
  125. double_to_tv (real_timeout, &tv);
  126. event_set (&conn->timeout, -1, EV_TIMEOUT, rspamd_redis_conn_timeout, conn);
  127. event_base_set (conn->elt->pool->ev_base, &conn->timeout);
  128. event_add (&conn->timeout, &tv);
  129. }
  130. static struct rspamd_redis_pool_connection *
  131. rspamd_redis_pool_new_connection (struct rspamd_redis_pool *pool,
  132. struct rspamd_redis_pool_elt *elt,
  133. const char *db,
  134. const char *password,
  135. const char *ip,
  136. gint port)
  137. {
  138. struct rspamd_redis_pool_connection *conn;
  139. struct redisAsyncContext *ctx;
  140. ctx = redisAsyncConnect (ip, port);
  141. if (ctx) {
  142. if (ctx->err != REDIS_OK) {
  143. redisAsyncFree (ctx);
  144. return NULL;
  145. }
  146. else {
  147. conn = g_slice_alloc0 (sizeof (*conn));
  148. conn->entry = g_list_prepend (NULL, conn);
  149. conn->elt = elt;
  150. conn->active = TRUE;
  151. g_hash_table_insert (elt->pool->elts_by_ctx, ctx, conn);
  152. g_queue_push_head_link (elt->active, conn->entry);
  153. conn->ctx = ctx;
  154. REF_INIT_RETAIN (conn, rspamd_redis_pool_conn_dtor);
  155. redisLibeventAttach (ctx, pool->ev_base);
  156. if (password) {
  157. redisAsyncCommand (ctx, NULL, NULL, "AUTH %s", password);
  158. }
  159. if (db) {
  160. redisAsyncCommand (ctx, NULL, NULL, "SELECT %s", db);
  161. }
  162. }
  163. return conn;
  164. }
  165. return NULL;
  166. }
  167. static struct rspamd_redis_pool_elt *
  168. rspamd_redis_pool_new_elt (struct rspamd_redis_pool *pool)
  169. {
  170. struct rspamd_redis_pool_elt *elt;
  171. elt = g_slice_alloc0 (sizeof (*elt));
  172. elt->active = g_queue_new ();
  173. elt->inactive = g_queue_new ();
  174. elt->pool = pool;
  175. return elt;
  176. }
  177. struct rspamd_redis_pool *
  178. rspamd_redis_pool_init (void)
  179. {
  180. struct rspamd_redis_pool *pool;
  181. pool = g_slice_alloc0 (sizeof (*pool));
  182. pool->elts_by_key = g_hash_table_new_full (g_int64_hash, g_int64_equal, NULL,
  183. rspamd_redis_pool_elt_dtor);
  184. pool->elts_by_ctx = g_hash_table_new (g_direct_hash, g_direct_equal);
  185. return pool;
  186. }
  187. void
  188. rspamd_redis_pool_config (struct rspamd_redis_pool *pool,
  189. struct rspamd_config *cfg,
  190. struct event_base *ev_base)
  191. {
  192. g_assert (pool != NULL);
  193. pool->ev_base = ev_base;
  194. pool->cfg = cfg;
  195. pool->timeout = default_timeout;
  196. pool->max_conns = default_max_conns;
  197. }
  198. struct redisAsyncContext*
  199. rspamd_redis_pool_connect (struct rspamd_redis_pool *pool,
  200. const gchar *db, const gchar *password,
  201. const char *ip, int port)
  202. {
  203. guint64 key;
  204. struct rspamd_redis_pool_elt *elt;
  205. GList *conn_entry;
  206. struct rspamd_redis_pool_connection *conn;
  207. g_assert (pool != NULL);
  208. g_assert (pool->ev_base != NULL);
  209. g_assert (ip != NULL);
  210. key = rspamd_redis_pool_get_key (db, password, ip, port);
  211. elt = g_hash_table_lookup (pool->elts_by_key, &key);
  212. if (elt) {
  213. if (g_queue_get_length (elt->inactive) > 0) {
  214. conn_entry = g_queue_pop_head_link (elt->inactive);
  215. conn = conn_entry->data;
  216. if (event_get_base (&conn->timeout)) {
  217. event_del (&conn->timeout);
  218. }
  219. conn->active = TRUE;
  220. g_queue_push_tail_link (elt->active, conn_entry);
  221. }
  222. else {
  223. /* Need to create connection */
  224. conn = rspamd_redis_pool_new_connection (pool, elt,
  225. db, password, ip, port);
  226. }
  227. }
  228. else {
  229. elt = rspamd_redis_pool_new_elt (pool);
  230. elt->key = key;
  231. g_hash_table_insert (pool->elts_by_key, &elt->key, elt);
  232. conn = rspamd_redis_pool_new_connection (pool, elt,
  233. db, password, ip, port);
  234. }
  235. REF_RETAIN (conn);
  236. return conn->ctx;
  237. }
  238. void
  239. rspamd_redis_pool_release_connection (struct rspamd_redis_pool *pool,
  240. struct redisAsyncContext *ctx, gboolean is_fatal)
  241. {
  242. struct rspamd_redis_pool_connection *conn;
  243. g_assert (pool != NULL);
  244. g_assert (ctx != NULL);
  245. conn = g_hash_table_lookup (pool->elts_by_ctx, ctx);
  246. if (conn != NULL) {
  247. REF_RELEASE (conn);
  248. if (is_fatal || ctx->err == REDIS_ERR_IO || ctx->err == REDIS_ERR_EOF) {
  249. /* We need to terminate connection forcefully */
  250. REF_RELEASE (conn);
  251. }
  252. else {
  253. /* Just move it to the inactive queue */
  254. g_queue_unlink (conn->elt->active, conn->entry);
  255. g_queue_push_head_link (conn->elt->inactive, conn->entry);
  256. conn->active = FALSE;
  257. rspamd_redis_pool_schedule_timeout (conn);
  258. }
  259. }
  260. else {
  261. g_assert_not_reached ();
  262. }
  263. }
  264. void
  265. rspamd_redis_pool_destroy (struct rspamd_redis_pool *pool)
  266. {
  267. struct rspamd_redis_pool_elt *elt;
  268. GHashTableIter it;
  269. gpointer k, v;
  270. g_assert (pool != NULL);
  271. g_hash_table_iter_init (&it, pool->elts_by_key);
  272. while (g_hash_table_iter_next (&it, &k, &v)) {
  273. elt = v;
  274. rspamd_redis_pool_elt_dtor (elt);
  275. g_hash_table_iter_steal (&it);
  276. }
  277. g_hash_table_unref (pool->elts_by_ctx);
  278. g_hash_table_unref (pool->elts_by_key);
  279. g_slice_free1 (sizeof (*pool), pool);
  280. }