From: Vsevolod Stakhov Date: Mon, 4 Mar 2019 18:13:17 +0000 (+0000) Subject: [Project] Preliminary addition of the HTTP connections pool X-Git-Tag: 1.9.0~53 X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=102bbc23181ac715f24095594d9ab3cb96272297;p=rspamd.git [Project] Preliminary addition of the HTTP connections pool --- diff --git a/src/libutil/http_connection.c b/src/libutil/http_connection.c index 7bc92cb1f..aea13522d 100644 --- a/src/libutil/http_connection.c +++ b/src/libutil/http_connection.c @@ -690,13 +690,18 @@ rspamd_http_simple_client_helper (struct rspamd_http_connection *conn) struct rspamd_http_connection_private *priv; gpointer ssl; gint request_method; + rspamd_fstring_t *prev_host; priv = conn->priv; ssl = priv->ssl; priv->ssl = NULL; request_method = priv->msg->method; + /* Preserve host for keepalive */ + prev_host = priv->msg->host; + priv->msg->host = NULL; rspamd_http_connection_reset (conn); priv->ssl = ssl; + /* Plan read message */ if (conn->opts & RSPAMD_HTTP_CLIENT_SHARED) { @@ -708,7 +713,15 @@ rspamd_http_simple_client_helper (struct rspamd_http_connection *conn) conn->priv->ptv); } - priv->msg->method = request_method; + if (priv->msg) { + priv->msg->method = request_method; + priv->msg->host = prev_host; + } + else { + if (prev_host) { + rspamd_fstring_free (prev_host); + } + } } static void @@ -2145,200 +2158,6 @@ rspamd_http_connection_set_max_size (struct rspamd_http_connection *conn, conn->max_size = sz; } -void -rspamd_http_message_free (struct rspamd_http_message *msg) -{ - struct rspamd_http_header *hdr, *htmp, *hcur, *hcurtmp; - - - HASH_ITER (hh, msg->headers, hdr, htmp) { - HASH_DEL (msg->headers, hdr); - - DL_FOREACH_SAFE (hdr, hcur, hcurtmp) { - rspamd_fstring_free (hcur->combined); - g_free (hcur); - } - } - - rspamd_http_message_storage_cleanup (msg); - - if (msg->url != NULL) { - rspamd_fstring_free (msg->url); - } - if (msg->status != NULL) { - rspamd_fstring_free (msg->status); - } - if (msg->host != NULL) { - rspamd_fstring_free (msg->host); - } - if (msg->peer_key != NULL) { - rspamd_pubkey_unref (msg->peer_key); - } - - g_free (msg); -} - -void -rspamd_http_message_set_peer_key (struct rspamd_http_message *msg, - struct rspamd_cryptobox_pubkey *pk) -{ - if (msg->peer_key != NULL) { - rspamd_pubkey_unref (msg->peer_key); - } - - if (pk) { - msg->peer_key = rspamd_pubkey_ref (pk); - } - else { - msg->peer_key = NULL; - } -} - -void -rspamd_http_message_add_header_len (struct rspamd_http_message *msg, - const gchar *name, - const gchar *value, - gsize len) -{ - struct rspamd_http_header *hdr, *found = NULL; - guint nlen, vlen; - - if (msg != NULL && name != NULL && value != NULL) { - hdr = g_malloc0 (sizeof (struct rspamd_http_header)); - nlen = strlen (name); - vlen = len; - hdr->combined = rspamd_fstring_sized_new (nlen + vlen + 4); - rspamd_printf_fstring (&hdr->combined, "%s: %*s\r\n", name, (gint)vlen, - value); - hdr->name.begin = hdr->combined->str; - hdr->name.len = nlen; - hdr->value.begin = hdr->combined->str + nlen + 2; - hdr->value.len = vlen; - - HASH_FIND (hh, msg->headers, hdr->name.begin, - hdr->name.len, found); - - if (found == NULL) { - HASH_ADD_KEYPTR (hh, msg->headers, hdr->name.begin, - hdr->name.len, hdr); - } - - DL_APPEND (found, hdr); - } -} - -void -rspamd_http_message_add_header (struct rspamd_http_message *msg, - const gchar *name, - const gchar *value) -{ - if (value) { - rspamd_http_message_add_header_len (msg, name, value, strlen (value)); - } -} - -void -rspamd_http_message_add_header_fstr (struct rspamd_http_message *msg, - const gchar *name, - rspamd_fstring_t *value) -{ - struct rspamd_http_header *hdr, *found = NULL; - guint nlen, vlen; - - if (msg != NULL && name != NULL && value != NULL) { - hdr = g_malloc0 (sizeof (struct rspamd_http_header)); - nlen = strlen (name); - vlen = value->len; - hdr->combined = rspamd_fstring_sized_new (nlen + vlen + 4); - rspamd_printf_fstring (&hdr->combined, "%s: %V\r\n", name, value); - hdr->name.begin = hdr->combined->str; - hdr->name.len = nlen; - hdr->value.begin = hdr->combined->str + nlen + 2; - hdr->value.len = vlen; - - HASH_FIND (hh, msg->headers, hdr->name.begin, - hdr->name.len, found); - - if (found == NULL) { - HASH_ADD_KEYPTR (hh, msg->headers, hdr->name.begin, - hdr->name.len, hdr); - } - - DL_APPEND (found, hdr); - } -} - -const rspamd_ftok_t * -rspamd_http_message_find_header (struct rspamd_http_message *msg, - const gchar *name) -{ - struct rspamd_http_header *hdr; - const rspamd_ftok_t *res = NULL; - guint slen = strlen (name); - - if (msg != NULL) { - HASH_FIND (hh, msg->headers, name, slen, hdr); - - if (hdr) { - res = &hdr->value; - } - } - - return res; -} - -GPtrArray* -rspamd_http_message_find_header_multiple ( - struct rspamd_http_message *msg, - const gchar *name) -{ - GPtrArray *res = NULL; - struct rspamd_http_header *hdr, *cur; - - guint slen = strlen (name); - - if (msg != NULL) { - HASH_FIND (hh, msg->headers, name, slen, hdr); - - if (hdr) { - res = g_ptr_array_sized_new (4); - - LL_FOREACH (hdr, cur) { - g_ptr_array_add (res, &cur->value); - } - } - } - - - return res; -} - - -gboolean -rspamd_http_message_remove_header (struct rspamd_http_message *msg, - const gchar *name) -{ - struct rspamd_http_header *hdr, *hcur, *hcurtmp; - gboolean res = FALSE; - guint slen = strlen (name); - - if (msg != NULL) { - HASH_FIND (hh, msg->headers, name, slen, hdr); - - if (hdr) { - HASH_DEL (msg->headers, hdr); - res = TRUE; - - DL_FOREACH_SAFE (hdr, hcur, hcurtmp) { - rspamd_fstring_free (hcur->combined); - g_free (hcur); - } - } - } - - return res; -} - void rspamd_http_connection_set_key (struct rspamd_http_connection *conn, struct rspamd_cryptobox_keypair *key) diff --git a/src/libutil/http_connection.h b/src/libutil/http_connection.h index 87159bdd0..a327eec0d 100644 --- a/src/libutil/http_connection.h +++ b/src/libutil/http_connection.h @@ -44,6 +44,7 @@ struct rspamd_http_connection_private; struct rspamd_http_connection; struct rspamd_http_connection_router; struct rspamd_http_connection_entry; +struct rspamd_keepalive_hash_key; struct rspamd_storage_shmem { gchar *shm_name; @@ -106,6 +107,8 @@ struct rspamd_http_connection { rspamd_http_error_handler_t error_handler; rspamd_http_finish_handler_t finish_handler; gpointer ud; + /* Used for keepalive */ + struct rspamd_keepalive_hash_key *keepalive_hash_key; gsize max_size; unsigned opts; enum rspamd_http_connection_type type; diff --git a/src/libutil/http_context.c b/src/libutil/http_context.c index 6695e8032..e326a74a1 100644 --- a/src/libutil/http_context.c +++ b/src/libutil/http_context.c @@ -24,6 +24,34 @@ static struct rspamd_http_context *default_ctx = NULL; +struct rspamd_http_keepalive_cbdata { + struct rspamd_http_connection *conn; + GQueue *queue; + GList *link; + struct event ev; +}; + +static void +rspamd_http_keepalive_queue_cleanup (GQueue *conns) +{ + GList *cur; + + cur = conns->head; + + while (cur) { + struct rspamd_http_keepalive_cbdata *cbd; + + cbd = (struct rspamd_http_keepalive_cbdata *)cur->data; + rspamd_http_connection_unref (cbd->conn); + event_del (&cbd->ev); + g_free (cbd); + + cur = cur->next; + } + + g_queue_clear (conns); +} + static void rspamd_http_context_client_rotate_ev (gint fd, short what, void *arg) { @@ -69,6 +97,8 @@ rspamd_http_context_new_default (struct rspamd_config *cfg, ctx->ev_base = ev_base; + ctx->keep_alive_hash = kh_init (rspamd_keep_alive_hash); + return ctx; } @@ -161,6 +191,7 @@ rspamd_http_context_create (struct rspamd_config *cfg, return ctx; } + void rspamd_http_context_free (struct rspamd_http_context *ctx) { @@ -185,6 +216,20 @@ rspamd_http_context_free (struct rspamd_http_context *ctx) } } + struct rspamd_keepalive_hash_key *hk; + + kh_foreach_key (ctx->keep_alive_hash, hk, { + if (hk->host) { + g_free (hk->host); + } + + rspamd_inet_address_free (hk->addr); + rspamd_http_keepalive_queue_cleanup (&hk->conns); + g_free (hk); + }); + + kh_destroy (rspamd_keep_alive_hash, ctx->keep_alive_hash); + g_free (ctx); } @@ -210,32 +255,178 @@ rspamd_http_context_default (void) } gint32 -rspamd_keep_alive_key_hash (struct rspamd_keepalive_hash_key k) +rspamd_keep_alive_key_hash (struct rspamd_keepalive_hash_key *k) { gint32 h; - h = rspamd_inet_address_port_hash (k.addr); + h = rspamd_inet_address_port_hash (k->addr); - if (k.host) { - h = rspamd_cryptobox_fast_hash (k.host, strlen (k.host), h); + if (k->host) { + h = rspamd_cryptobox_fast_hash (k->host, strlen (k->host), h); } return h; } bool -rspamd_keep_alive_key_equal (struct rspamd_keepalive_hash_key k1, - struct rspamd_keepalive_hash_key k2) +rspamd_keep_alive_key_equal (struct rspamd_keepalive_hash_key *k1, + struct rspamd_keepalive_hash_key *k2) { - if (k1.host && k2.host) { - if (rspamd_inet_address_port_equal (k1.addr, k2.addr)) { - return strcmp (k1.host, k2.host); + if (k1->host && k2->host) { + if (rspamd_inet_address_port_equal (k1->addr, k2->addr)) { + return strcmp (k1->host, k2->host); } } - else if (!k1.host && !k2.host) { - return rspamd_inet_address_port_equal (k1.addr, k2.addr); + else if (!k1->host && !k2->host) { + return rspamd_inet_address_port_equal (k1->addr, k2->addr); } /* One has host and another has no host */ return false; +} + +struct rspamd_http_connection* +rspamd_http_context_check_keepalive (struct rspamd_http_context *ctx, + const rspamd_inet_addr_t *addr, + const gchar *host) +{ + struct rspamd_keepalive_hash_key hk, *phk; + khiter_t k; + + hk.addr = (rspamd_inet_addr_t *)addr; + hk.host = (gchar *)host; + + k = kh_get (rspamd_keep_alive_hash, ctx->keep_alive_hash, &hk); + + if (k != kh_end (ctx->keep_alive_hash)) { + phk = kh_key (ctx->keep_alive_hash, k); + GQueue *conns = &phk->conns; + + /* Use stack based approach */ + + if (g_queue_get_length (conns) > 0) { + struct rspamd_http_keepalive_cbdata *cbd; + struct rspamd_http_connection *conn; + + cbd = g_queue_pop_head (conns); + event_del (&cbd->ev); + conn = cbd->conn; + g_free (cbd); + + /* We transfer refcount here! */ + return conn; + } + } + + return NULL; +} + +void +rspamd_http_context_prepare_keepalive (struct rspamd_http_context *ctx, + struct rspamd_http_connection *conn, + const rspamd_inet_addr_t *addr, + const gchar *host) +{ + struct rspamd_keepalive_hash_key hk, *phk; + khiter_t k; + + hk.addr = (rspamd_inet_addr_t *)addr; + hk.host = (gchar *)host; + + k = kh_get (rspamd_keep_alive_hash, ctx->keep_alive_hash, &hk); + + if (k != kh_end (ctx->keep_alive_hash)) { + /* Reuse existing */ + conn->keepalive_hash_key = kh_key (ctx->keep_alive_hash, k); + } + else { + /* Create new one */ + GQueue empty_init = G_QUEUE_INIT; + gint r; + + phk = g_malloc (sizeof (*phk)); + phk->conns = empty_init; + phk->host = g_strdup (host); + phk->addr = rspamd_inet_address_copy (addr); + + kh_put (rspamd_keep_alive_hash, ctx->keep_alive_hash, phk, &r); + conn->keepalive_hash_key = phk; + } +} + +static void +rspamd_http_keepalive_handler (gint fd, short what, gpointer ud) +{ + struct rspamd_http_keepalive_cbdata *cbdata = + (struct rspamd_http_keepalive_cbdata *)ud; + /* + * We can get here if a remote side reported something or it has + * timed out. In both cases we just terminate keepalive connection. + */ + + g_queue_delete_link (cbdata->queue, cbdata->link); + rspamd_http_connection_unref (cbdata->conn); + g_free (cbdata); +} + +void +rspamd_http_context_push_keepalive (struct rspamd_http_context *ctx, + struct rspamd_http_connection *conn, + struct rspamd_http_message *msg, + struct event_base *ev_base) +{ + struct rspamd_http_keepalive_cbdata *cbdata; + struct timeval tv; + gdouble timeout = ctx->config.keepalive_interval; + + g_assert (conn->keepalive_hash_key != NULL); + + /* Move connection to the keepalive pool */ + cbdata = g_malloc0 (sizeof (*cbdata)); + + cbdata->conn = rspamd_http_connection_ref (conn); + g_queue_push_tail (&conn->keepalive_hash_key->conns, cbdata); + cbdata->link = conn->keepalive_hash_key->conns.tail; + cbdata->queue = &conn->keepalive_hash_key->conns; + + event_set (&cbdata->ev, conn->fd, EV_READ|EV_TIMEOUT, + rspamd_http_keepalive_handler, + &cbdata); + + if (msg) { + const rspamd_ftok_t *tok; + + tok = rspamd_http_message_find_header (msg, "Keep-Alive"); + + if (tok) { + goffset pos = rspamd_substring_search_caseless (tok->begin, + tok->len, "timeout=", sizeof ("timeout=") - 1); + + if (pos != -1) { + pos += sizeof ("timeout="); + + gchar *end_pos = memchr (tok->begin + pos, ',', tok->len - pos); + glong real_timeout; + + if (end_pos) { + if (rspamd_strtol (tok->begin + pos + 1, + (end_pos - tok->begin) - pos - 1, &real_timeout) && + real_timeout > 0) { + timeout = real_timeout; + } + } + else { + if (rspamd_strtol (tok->begin + pos + 1, + tok->len - pos - 1, &real_timeout) && + real_timeout > 0) { + timeout = real_timeout; + } + } + } + } + } + + double_to_tv (timeout, &tv); + event_base_set (ev_base, &cbdata->ev); + event_add (&cbdata->ev, &tv); } \ No newline at end of file diff --git a/src/libutil/http_context.h b/src/libutil/http_context.h index 7d0820687..74e5c69a6 100644 --- a/src/libutil/http_context.h +++ b/src/libutil/http_context.h @@ -19,16 +19,19 @@ #include "config.h" #include "ucl.h" +#include "addr.h" #include struct rspamd_http_context; struct rspamd_config; +struct rspamd_http_message; struct rspamd_http_context_cfg { guint kp_cache_size_client; guint kp_cache_size_server; guint ssl_cache_size; + gdouble keepalive_interval; gdouble client_key_rotate_time; const gchar *user_agent; }; @@ -53,4 +56,40 @@ void rspamd_http_context_free (struct rspamd_http_context *ctx); struct rspamd_http_context* rspamd_http_context_default (void); +/** + * Returns preserved keepalive connection if it's available. + * Refcount is transferred to caller! + * @param ctx + * @param addr + * @param host + * @return + */ +struct rspamd_http_connection* rspamd_http_context_check_keepalive ( + struct rspamd_http_context *ctx, const rspamd_inet_addr_t *addr, + const gchar *host); + +/** + * Prepares keepalive key for a connection by creating a new entry or by reusing existent + * Bear in mind, that keepalive pool has currently no cleanup methods! + * @param ctx + * @param conn + * @param addr + * @param host + */ +void rspamd_http_context_prepare_keepalive (struct rspamd_http_context *ctx, + struct rspamd_http_connection *conn, + const rspamd_inet_addr_t *addr, + const gchar *host); +/** + * Pushes a connection to keepalive pool after client request is finished, + * keepalive key *must* be prepared before using of this function + * @param ctx + * @param conn + * @param msg + */ +void rspamd_http_context_push_keepalive (struct rspamd_http_context *ctx, + struct rspamd_http_connection *conn, + struct rspamd_http_message *msg, + struct event_base *ev_base); + #endif diff --git a/src/libutil/http_message.c b/src/libutil/http_message.c index b090d2f71..0720dc416 100644 --- a/src/libutil/http_message.c +++ b/src/libutil/http_message.c @@ -18,6 +18,7 @@ #include "libutil/http_private.h" #include "libutil/printf.h" #include "libutil/logger.h" +#include "utlist.h" #include "unix-std.h" struct rspamd_http_message * @@ -463,3 +464,197 @@ rspamd_http_message_storage_cleanup (struct rspamd_http_message *msg) msg->body_buf.len = 0; } + +void +rspamd_http_message_free (struct rspamd_http_message *msg) +{ + struct rspamd_http_header *hdr, *htmp, *hcur, *hcurtmp; + + + HASH_ITER (hh, msg->headers, hdr, htmp) { + HASH_DEL (msg->headers, hdr); + + DL_FOREACH_SAFE (hdr, hcur, hcurtmp) { + rspamd_fstring_free (hcur->combined); + g_free (hcur); + } + } + + rspamd_http_message_storage_cleanup (msg); + + if (msg->url != NULL) { + rspamd_fstring_free (msg->url); + } + if (msg->status != NULL) { + rspamd_fstring_free (msg->status); + } + if (msg->host != NULL) { + rspamd_fstring_free (msg->host); + } + if (msg->peer_key != NULL) { + rspamd_pubkey_unref (msg->peer_key); + } + + g_free (msg); +} + +void +rspamd_http_message_set_peer_key (struct rspamd_http_message *msg, + struct rspamd_cryptobox_pubkey *pk) +{ + if (msg->peer_key != NULL) { + rspamd_pubkey_unref (msg->peer_key); + } + + if (pk) { + msg->peer_key = rspamd_pubkey_ref (pk); + } + else { + msg->peer_key = NULL; + } +} + +void +rspamd_http_message_add_header_len (struct rspamd_http_message *msg, + const gchar *name, + const gchar *value, + gsize len) +{ + struct rspamd_http_header *hdr, *found = NULL; + guint nlen, vlen; + + if (msg != NULL && name != NULL && value != NULL) { + hdr = g_malloc0 (sizeof (struct rspamd_http_header)); + nlen = strlen (name); + vlen = len; + hdr->combined = rspamd_fstring_sized_new (nlen + vlen + 4); + rspamd_printf_fstring (&hdr->combined, "%s: %*s\r\n", name, (gint)vlen, + value); + hdr->name.begin = hdr->combined->str; + hdr->name.len = nlen; + hdr->value.begin = hdr->combined->str + nlen + 2; + hdr->value.len = vlen; + + HASH_FIND (hh, msg->headers, hdr->name.begin, + hdr->name.len, found); + + if (found == NULL) { + HASH_ADD_KEYPTR (hh, msg->headers, hdr->name.begin, + hdr->name.len, hdr); + } + + DL_APPEND (found, hdr); + } +} + +void +rspamd_http_message_add_header (struct rspamd_http_message *msg, + const gchar *name, + const gchar *value) +{ + if (value) { + rspamd_http_message_add_header_len (msg, name, value, strlen (value)); + } +} + +void +rspamd_http_message_add_header_fstr (struct rspamd_http_message *msg, + const gchar *name, + rspamd_fstring_t *value) +{ + struct rspamd_http_header *hdr, *found = NULL; + guint nlen, vlen; + + if (msg != NULL && name != NULL && value != NULL) { + hdr = g_malloc0 (sizeof (struct rspamd_http_header)); + nlen = strlen (name); + vlen = value->len; + hdr->combined = rspamd_fstring_sized_new (nlen + vlen + 4); + rspamd_printf_fstring (&hdr->combined, "%s: %V\r\n", name, value); + hdr->name.begin = hdr->combined->str; + hdr->name.len = nlen; + hdr->value.begin = hdr->combined->str + nlen + 2; + hdr->value.len = vlen; + + HASH_FIND (hh, msg->headers, hdr->name.begin, + hdr->name.len, found); + + if (found == NULL) { + HASH_ADD_KEYPTR (hh, msg->headers, hdr->name.begin, + hdr->name.len, hdr); + } + + DL_APPEND (found, hdr); + } +} + +const rspamd_ftok_t * +rspamd_http_message_find_header (struct rspamd_http_message *msg, + const gchar *name) +{ + struct rspamd_http_header *hdr; + const rspamd_ftok_t *res = NULL; + guint slen = strlen (name); + + if (msg != NULL) { + HASH_FIND (hh, msg->headers, name, slen, hdr); + + if (hdr) { + res = &hdr->value; + } + } + + return res; +} + +GPtrArray* +rspamd_http_message_find_header_multiple ( + struct rspamd_http_message *msg, + const gchar *name) +{ + GPtrArray *res = NULL; + struct rspamd_http_header *hdr, *cur; + + guint slen = strlen (name); + + if (msg != NULL) { + HASH_FIND (hh, msg->headers, name, slen, hdr); + + if (hdr) { + res = g_ptr_array_sized_new (4); + + LL_FOREACH (hdr, cur) { + g_ptr_array_add (res, &cur->value); + } + } + } + + + return res; +} + + +gboolean +rspamd_http_message_remove_header (struct rspamd_http_message *msg, + const gchar *name) +{ + struct rspamd_http_header *hdr, *hcur, *hcurtmp; + gboolean res = FALSE; + guint slen = strlen (name); + + if (msg != NULL) { + HASH_FIND (hh, msg->headers, name, slen, hdr); + + if (hdr) { + HASH_DEL (msg->headers, hdr); + res = TRUE; + + DL_FOREACH_SAFE (hdr, hcur, hcurtmp) { + rspamd_fstring_free (hcur->combined); + g_free (hcur); + } + } + } + + return res; +} \ No newline at end of file diff --git a/src/libutil/http_private.h b/src/libutil/http_private.h index 29c6ea45f..dd3d0c6a9 100644 --- a/src/libutil/http_private.h +++ b/src/libutil/http_private.h @@ -80,14 +80,15 @@ struct rspamd_http_message { struct rspamd_keepalive_hash_key { rspamd_inet_addr_t *addr; gchar *host; + GQueue conns; }; -gint32 rspamd_keep_alive_key_hash (struct rspamd_keepalive_hash_key k); -bool rspamd_keep_alive_key_equal (struct rspamd_keepalive_hash_key k1, - struct rspamd_keepalive_hash_key k2); +gint32 rspamd_keep_alive_key_hash (struct rspamd_keepalive_hash_key* k); +bool rspamd_keep_alive_key_equal (struct rspamd_keepalive_hash_key* k1, + struct rspamd_keepalive_hash_key* k2); -KHASH_INIT (rspamd_keep_alive_hash, struct rspamd_keepalive_hash_key, - GQueue, true, rspamd_keep_alive_key_hash, rspamd_keep_alive_key_equal); +KHASH_INIT (rspamd_keep_alive_hash, struct rspamd_keepalive_hash_key *, + char, 0, rspamd_keep_alive_key_hash, rspamd_keep_alive_key_equal); struct rspamd_http_context { struct rspamd_http_context_cfg config;