You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

async_session.c 8.1KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352
  1. /*-
  2. * Copyright 2016 Vsevolod Stakhov
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "config.h"
  17. #include "rspamd.h"
  18. #include "contrib/uthash/utlist.h"
  19. #include "contrib/libucl/khash.h"
  20. #include "async_session.h"
  21. #include "cryptobox.h"
  22. #define RSPAMD_SESSION_FLAG_DESTROYING (1 << 1)
  23. #define RSPAMD_SESSION_FLAG_CLEANUP (1 << 2)
  24. #define RSPAMD_SESSION_CAN_ADD_EVENT(s) (!((s)->flags & (RSPAMD_SESSION_FLAG_DESTROYING|RSPAMD_SESSION_FLAG_CLEANUP)))
  25. #define msg_err_session(...) rspamd_default_log_function(G_LOG_LEVEL_CRITICAL, \
  26. "events", session->pool->tag.uid, \
  27. G_STRFUNC, \
  28. __VA_ARGS__)
  29. #define msg_warn_session(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \
  30. "events", session->pool->tag.uid, \
  31. G_STRFUNC, \
  32. __VA_ARGS__)
  33. #define msg_info_session(...) rspamd_default_log_function (G_LOG_LEVEL_INFO, \
  34. "events", session->pool->tag.uid, \
  35. G_STRFUNC, \
  36. __VA_ARGS__)
  37. #define msg_debug_session(...) rspamd_conditional_debug_fast (NULL, NULL, \
  38. rspamd_events_log_id, "events", session->pool->tag.uid, \
  39. G_STRFUNC, \
  40. __VA_ARGS__)
  41. INIT_LOG_MODULE(events)
  42. /* Average symbols count to optimize hash allocation */
  43. static struct rspamd_counter_data events_count;
  44. struct rspamd_async_event {
  45. const gchar *subsystem;
  46. const gchar *loc;
  47. event_finalizer_t fin;
  48. void *user_data;
  49. };
  50. static guint rspamd_event_hash (gconstpointer a);
  51. static gboolean rspamd_event_equal (gconstpointer a, gconstpointer b);
  52. /* Define **SET** of events */
  53. KHASH_INIT (rspamd_events_hash,
  54. struct rspamd_async_event *,
  55. char,
  56. false,
  57. rspamd_event_hash,
  58. rspamd_event_equal);
  59. struct rspamd_async_session {
  60. session_finalizer_t fin;
  61. event_finalizer_t restore;
  62. event_finalizer_t cleanup;
  63. khash_t(rspamd_events_hash) *events;
  64. void *user_data;
  65. rspamd_mempool_t *pool;
  66. guint flags;
  67. };
  68. static gboolean
  69. rspamd_event_equal (gconstpointer a, gconstpointer b)
  70. {
  71. const struct rspamd_async_event *ev1 = a, *ev2 = b;
  72. if (ev1->fin == ev2->fin) {
  73. return ev1->user_data == ev2->user_data;
  74. }
  75. return FALSE;
  76. }
  77. static guint
  78. rspamd_event_hash (gconstpointer a)
  79. {
  80. const struct rspamd_async_event *ev = a;
  81. union _pointer_fp_thunk {
  82. event_finalizer_t f;
  83. gpointer p;
  84. };
  85. struct ev_storage {
  86. union _pointer_fp_thunk p;
  87. gpointer ud;
  88. } st;
  89. st.p.f = ev->fin;
  90. st.ud = ev->user_data;
  91. return rspamd_cryptobox_fast_hash (&st, sizeof (st), rspamd_hash_seed ());
  92. }
  93. static void
  94. rspamd_session_dtor (gpointer d)
  95. {
  96. struct rspamd_async_session *s = (struct rspamd_async_session *)d;
  97. /* Events are usually empty at this point */
  98. rspamd_set_counter_ema (&events_count, s->events->n_buckets, 0.5);
  99. kh_destroy (rspamd_events_hash, s->events);
  100. }
  101. struct rspamd_async_session *
  102. rspamd_session_create (rspamd_mempool_t * pool,
  103. session_finalizer_t fin,
  104. event_finalizer_t restore,
  105. event_finalizer_t cleanup,
  106. void *user_data)
  107. {
  108. struct rspamd_async_session *s;
  109. s = rspamd_mempool_alloc0 (pool, sizeof (struct rspamd_async_session));
  110. s->pool = pool;
  111. s->fin = fin;
  112. s->restore = restore;
  113. s->cleanup = cleanup;
  114. s->user_data = user_data;
  115. s->events = kh_init (rspamd_events_hash);
  116. if (events_count.mean > 4) {
  117. kh_resize (rspamd_events_hash, s->events, events_count.mean);
  118. }
  119. else {
  120. kh_resize (rspamd_events_hash, s->events, 4);
  121. }
  122. rspamd_mempool_add_destructor (pool, rspamd_session_dtor, s);
  123. return s;
  124. }
  125. struct rspamd_async_event *
  126. rspamd_session_add_event_full (struct rspamd_async_session *session,
  127. event_finalizer_t fin,
  128. gpointer user_data,
  129. const gchar *subsystem,
  130. const gchar *loc)
  131. {
  132. struct rspamd_async_event *new_event;
  133. gint ret;
  134. if (session == NULL) {
  135. msg_err ("session is NULL");
  136. g_assert_not_reached ();
  137. }
  138. if (!RSPAMD_SESSION_CAN_ADD_EVENT (session)) {
  139. msg_debug_session ("skip adding event subsystem: %s: "
  140. "session is destroying/cleaning",
  141. subsystem);
  142. return NULL;
  143. }
  144. new_event = rspamd_mempool_alloc (session->pool,
  145. sizeof (struct rspamd_async_event));
  146. new_event->fin = fin;
  147. new_event->user_data = user_data;
  148. new_event->subsystem = subsystem;
  149. new_event->loc = loc;
  150. msg_debug_session ("added event: %p, pending %d (+1) events, "
  151. "subsystem: %s (%s)",
  152. user_data,
  153. kh_size (session->events),
  154. subsystem,
  155. loc);
  156. kh_put (rspamd_events_hash, session->events, new_event, &ret);
  157. g_assert (ret > 0);
  158. return new_event;
  159. }
  160. void
  161. rspamd_session_remove_event_full (struct rspamd_async_session *session,
  162. event_finalizer_t fin,
  163. void *ud,
  164. const gchar *loc)
  165. {
  166. struct rspamd_async_event search_ev, *found_ev;
  167. khiter_t k;
  168. if (session == NULL) {
  169. msg_err ("session is NULL");
  170. return;
  171. }
  172. if (!RSPAMD_SESSION_CAN_ADD_EVENT (session)) {
  173. /* Session is already cleaned up, ignore this */
  174. return;
  175. }
  176. /* Search for event */
  177. search_ev.fin = fin;
  178. search_ev.user_data = ud;
  179. k = kh_get (rspamd_events_hash, session->events, &search_ev);
  180. if (k == kh_end (session->events)) {
  181. gchar t;
  182. msg_err_session ("cannot find event: %p(%p) from %s", fin, ud, loc);
  183. kh_foreach (session->events, found_ev, t, {
  184. msg_err_session ("existing event %s (%s): %p(%p)",
  185. found_ev->subsystem,
  186. found_ev->loc,
  187. found_ev->fin,
  188. found_ev->user_data);
  189. });
  190. (void)t;
  191. g_assert_not_reached ();
  192. }
  193. found_ev = kh_key (session->events, k);
  194. msg_debug_session ("removed event: %p, pending %d (-1) events, "
  195. "subsystem: %s (%s), added at %s",
  196. ud,
  197. kh_size (session->events),
  198. found_ev->subsystem,
  199. loc,
  200. found_ev->loc);
  201. kh_del (rspamd_events_hash, session->events, k);
  202. /* Remove event */
  203. if (fin) {
  204. fin (ud);
  205. }
  206. rspamd_session_pending (session);
  207. }
  208. gboolean
  209. rspamd_session_destroy (struct rspamd_async_session *session)
  210. {
  211. if (session == NULL) {
  212. msg_err ("session is NULL");
  213. return FALSE;
  214. }
  215. if (!rspamd_session_blocked (session)) {
  216. session->flags |= RSPAMD_SESSION_FLAG_DESTROYING;
  217. rspamd_session_cleanup (session);
  218. if (session->cleanup != NULL) {
  219. session->cleanup (session->user_data);
  220. }
  221. }
  222. return TRUE;
  223. }
  224. void
  225. rspamd_session_cleanup (struct rspamd_async_session *session)
  226. {
  227. struct rspamd_async_event *ev;
  228. if (session == NULL) {
  229. msg_err ("session is NULL");
  230. return;
  231. }
  232. session->flags |= RSPAMD_SESSION_FLAG_CLEANUP;
  233. kh_foreach_key (session->events, ev, {
  234. /* Call event's finalizer */
  235. msg_debug_session ("removed event on destroy: %p, subsystem: %s",
  236. ev->user_data,
  237. ev->subsystem);
  238. if (ev->fin != NULL) {
  239. ev->fin (ev->user_data);
  240. }
  241. });
  242. kh_clear (rspamd_events_hash, session->events);
  243. session->flags &= ~RSPAMD_SESSION_FLAG_CLEANUP;
  244. }
  245. gboolean
  246. rspamd_session_pending (struct rspamd_async_session *session)
  247. {
  248. gboolean ret = TRUE;
  249. if (kh_size (session->events) == 0) {
  250. if (session->fin != NULL) {
  251. msg_debug_session ("call fin handler, as no events are pending");
  252. if (!session->fin (session->user_data)) {
  253. /* Session finished incompletely, perform restoration */
  254. msg_debug_session ("restore incomplete session");
  255. if (session->restore != NULL) {
  256. session->restore (session->user_data);
  257. }
  258. }
  259. else {
  260. ret = FALSE;
  261. }
  262. }
  263. ret = FALSE;
  264. }
  265. return ret;
  266. }
  267. guint
  268. rspamd_session_events_pending (struct rspamd_async_session *session)
  269. {
  270. guint npending;
  271. g_assert (session != NULL);
  272. npending = kh_size (session->events);
  273. msg_debug_session ("pending %d events", npending);
  274. return npending;
  275. }
  276. rspamd_mempool_t *
  277. rspamd_session_mempool (struct rspamd_async_session *session)
  278. {
  279. g_assert (session != NULL);
  280. return session->pool;
  281. }
  282. gboolean
  283. rspamd_session_blocked (struct rspamd_async_session *session)
  284. {
  285. g_assert (session != NULL);
  286. return !RSPAMD_SESSION_CAN_ADD_EVENT (session);
  287. }