Parcourir la source

[Project] Preliminary addition of the HTTP connections pool

tags/1.9.0
Vsevolod Stakhov il y a 5 ans
Parent
révision
102bbc2318

+ 14
- 195
src/libutil/http_connection.c Voir le fichier

@@ -690,13 +690,18 @@ rspamd_http_simple_client_helper (struct rspamd_http_connection *conn)
struct rspamd_http_connection_private *priv;
gpointer ssl;
gint request_method;
rspamd_fstring_t *prev_host;

priv = conn->priv;
ssl = priv->ssl;
priv->ssl = NULL;
request_method = priv->msg->method;
/* Preserve host for keepalive */
prev_host = priv->msg->host;
priv->msg->host = NULL;
rspamd_http_connection_reset (conn);
priv->ssl = ssl;

/* Plan read message */

if (conn->opts & RSPAMD_HTTP_CLIENT_SHARED) {
@@ -708,7 +713,15 @@ rspamd_http_simple_client_helper (struct rspamd_http_connection *conn)
conn->priv->ptv);
}

priv->msg->method = request_method;
if (priv->msg) {
priv->msg->method = request_method;
priv->msg->host = prev_host;
}
else {
if (prev_host) {
rspamd_fstring_free (prev_host);
}
}
}

static void
@@ -2145,200 +2158,6 @@ rspamd_http_connection_set_max_size (struct rspamd_http_connection *conn,
conn->max_size = sz;
}

void
rspamd_http_message_free (struct rspamd_http_message *msg)
{
struct rspamd_http_header *hdr, *htmp, *hcur, *hcurtmp;


HASH_ITER (hh, msg->headers, hdr, htmp) {
HASH_DEL (msg->headers, hdr);

DL_FOREACH_SAFE (hdr, hcur, hcurtmp) {
rspamd_fstring_free (hcur->combined);
g_free (hcur);
}
}

rspamd_http_message_storage_cleanup (msg);

if (msg->url != NULL) {
rspamd_fstring_free (msg->url);
}
if (msg->status != NULL) {
rspamd_fstring_free (msg->status);
}
if (msg->host != NULL) {
rspamd_fstring_free (msg->host);
}
if (msg->peer_key != NULL) {
rspamd_pubkey_unref (msg->peer_key);
}

g_free (msg);
}

void
rspamd_http_message_set_peer_key (struct rspamd_http_message *msg,
struct rspamd_cryptobox_pubkey *pk)
{
if (msg->peer_key != NULL) {
rspamd_pubkey_unref (msg->peer_key);
}

if (pk) {
msg->peer_key = rspamd_pubkey_ref (pk);
}
else {
msg->peer_key = NULL;
}
}

void
rspamd_http_message_add_header_len (struct rspamd_http_message *msg,
const gchar *name,
const gchar *value,
gsize len)
{
struct rspamd_http_header *hdr, *found = NULL;
guint nlen, vlen;

if (msg != NULL && name != NULL && value != NULL) {
hdr = g_malloc0 (sizeof (struct rspamd_http_header));
nlen = strlen (name);
vlen = len;
hdr->combined = rspamd_fstring_sized_new (nlen + vlen + 4);
rspamd_printf_fstring (&hdr->combined, "%s: %*s\r\n", name, (gint)vlen,
value);
hdr->name.begin = hdr->combined->str;
hdr->name.len = nlen;
hdr->value.begin = hdr->combined->str + nlen + 2;
hdr->value.len = vlen;

HASH_FIND (hh, msg->headers, hdr->name.begin,
hdr->name.len, found);

if (found == NULL) {
HASH_ADD_KEYPTR (hh, msg->headers, hdr->name.begin,
hdr->name.len, hdr);
}

DL_APPEND (found, hdr);
}
}

void
rspamd_http_message_add_header (struct rspamd_http_message *msg,
const gchar *name,
const gchar *value)
{
if (value) {
rspamd_http_message_add_header_len (msg, name, value, strlen (value));
}
}

void
rspamd_http_message_add_header_fstr (struct rspamd_http_message *msg,
const gchar *name,
rspamd_fstring_t *value)
{
struct rspamd_http_header *hdr, *found = NULL;
guint nlen, vlen;

if (msg != NULL && name != NULL && value != NULL) {
hdr = g_malloc0 (sizeof (struct rspamd_http_header));
nlen = strlen (name);
vlen = value->len;
hdr->combined = rspamd_fstring_sized_new (nlen + vlen + 4);
rspamd_printf_fstring (&hdr->combined, "%s: %V\r\n", name, value);
hdr->name.begin = hdr->combined->str;
hdr->name.len = nlen;
hdr->value.begin = hdr->combined->str + nlen + 2;
hdr->value.len = vlen;

HASH_FIND (hh, msg->headers, hdr->name.begin,
hdr->name.len, found);

if (found == NULL) {
HASH_ADD_KEYPTR (hh, msg->headers, hdr->name.begin,
hdr->name.len, hdr);
}

DL_APPEND (found, hdr);
}
}

const rspamd_ftok_t *
rspamd_http_message_find_header (struct rspamd_http_message *msg,
const gchar *name)
{
struct rspamd_http_header *hdr;
const rspamd_ftok_t *res = NULL;
guint slen = strlen (name);

if (msg != NULL) {
HASH_FIND (hh, msg->headers, name, slen, hdr);

if (hdr) {
res = &hdr->value;
}
}

return res;
}

GPtrArray*
rspamd_http_message_find_header_multiple (
struct rspamd_http_message *msg,
const gchar *name)
{
GPtrArray *res = NULL;
struct rspamd_http_header *hdr, *cur;

guint slen = strlen (name);

if (msg != NULL) {
HASH_FIND (hh, msg->headers, name, slen, hdr);

if (hdr) {
res = g_ptr_array_sized_new (4);

LL_FOREACH (hdr, cur) {
g_ptr_array_add (res, &cur->value);
}
}
}


return res;
}


gboolean
rspamd_http_message_remove_header (struct rspamd_http_message *msg,
const gchar *name)
{
struct rspamd_http_header *hdr, *hcur, *hcurtmp;
gboolean res = FALSE;
guint slen = strlen (name);

if (msg != NULL) {
HASH_FIND (hh, msg->headers, name, slen, hdr);

if (hdr) {
HASH_DEL (msg->headers, hdr);
res = TRUE;

DL_FOREACH_SAFE (hdr, hcur, hcurtmp) {
rspamd_fstring_free (hcur->combined);
g_free (hcur);
}
}
}

return res;
}

void
rspamd_http_connection_set_key (struct rspamd_http_connection *conn,
struct rspamd_cryptobox_keypair *key)

+ 3
- 0
src/libutil/http_connection.h Voir le fichier

@@ -44,6 +44,7 @@ struct rspamd_http_connection_private;
struct rspamd_http_connection;
struct rspamd_http_connection_router;
struct rspamd_http_connection_entry;
struct rspamd_keepalive_hash_key;

struct rspamd_storage_shmem {
gchar *shm_name;
@@ -106,6 +107,8 @@ struct rspamd_http_connection {
rspamd_http_error_handler_t error_handler;
rspamd_http_finish_handler_t finish_handler;
gpointer ud;
/* Used for keepalive */
struct rspamd_keepalive_hash_key *keepalive_hash_key;
gsize max_size;
unsigned opts;
enum rspamd_http_connection_type type;

+ 202
- 11
src/libutil/http_context.c Voir le fichier

@@ -24,6 +24,34 @@

static struct rspamd_http_context *default_ctx = NULL;

struct rspamd_http_keepalive_cbdata {
struct rspamd_http_connection *conn;
GQueue *queue;
GList *link;
struct event ev;
};

static void
rspamd_http_keepalive_queue_cleanup (GQueue *conns)
{
GList *cur;

cur = conns->head;

while (cur) {
struct rspamd_http_keepalive_cbdata *cbd;

cbd = (struct rspamd_http_keepalive_cbdata *)cur->data;
rspamd_http_connection_unref (cbd->conn);
event_del (&cbd->ev);
g_free (cbd);

cur = cur->next;
}

g_queue_clear (conns);
}

static void
rspamd_http_context_client_rotate_ev (gint fd, short what, void *arg)
{
@@ -69,6 +97,8 @@ rspamd_http_context_new_default (struct rspamd_config *cfg,

ctx->ev_base = ev_base;

ctx->keep_alive_hash = kh_init (rspamd_keep_alive_hash);

return ctx;
}

@@ -161,6 +191,7 @@ rspamd_http_context_create (struct rspamd_config *cfg,
return ctx;
}


void
rspamd_http_context_free (struct rspamd_http_context *ctx)
{
@@ -185,6 +216,20 @@ rspamd_http_context_free (struct rspamd_http_context *ctx)
}
}

struct rspamd_keepalive_hash_key *hk;

kh_foreach_key (ctx->keep_alive_hash, hk, {
if (hk->host) {
g_free (hk->host);
}

rspamd_inet_address_free (hk->addr);
rspamd_http_keepalive_queue_cleanup (&hk->conns);
g_free (hk);
});

kh_destroy (rspamd_keep_alive_hash, ctx->keep_alive_hash);

g_free (ctx);
}

@@ -210,32 +255,178 @@ rspamd_http_context_default (void)
}

gint32
rspamd_keep_alive_key_hash (struct rspamd_keepalive_hash_key k)
rspamd_keep_alive_key_hash (struct rspamd_keepalive_hash_key *k)
{
gint32 h;

h = rspamd_inet_address_port_hash (k.addr);
h = rspamd_inet_address_port_hash (k->addr);

if (k.host) {
h = rspamd_cryptobox_fast_hash (k.host, strlen (k.host), h);
if (k->host) {
h = rspamd_cryptobox_fast_hash (k->host, strlen (k->host), h);
}

return h;
}

bool
rspamd_keep_alive_key_equal (struct rspamd_keepalive_hash_key k1,
struct rspamd_keepalive_hash_key k2)
rspamd_keep_alive_key_equal (struct rspamd_keepalive_hash_key *k1,
struct rspamd_keepalive_hash_key *k2)
{
if (k1.host && k2.host) {
if (rspamd_inet_address_port_equal (k1.addr, k2.addr)) {
return strcmp (k1.host, k2.host);
if (k1->host && k2->host) {
if (rspamd_inet_address_port_equal (k1->addr, k2->addr)) {
return strcmp (k1->host, k2->host);
}
}
else if (!k1.host && !k2.host) {
return rspamd_inet_address_port_equal (k1.addr, k2.addr);
else if (!k1->host && !k2->host) {
return rspamd_inet_address_port_equal (k1->addr, k2->addr);
}

/* One has host and another has no host */
return false;
}

struct rspamd_http_connection*
rspamd_http_context_check_keepalive (struct rspamd_http_context *ctx,
const rspamd_inet_addr_t *addr,
const gchar *host)
{
struct rspamd_keepalive_hash_key hk, *phk;
khiter_t k;

hk.addr = (rspamd_inet_addr_t *)addr;
hk.host = (gchar *)host;

k = kh_get (rspamd_keep_alive_hash, ctx->keep_alive_hash, &hk);

if (k != kh_end (ctx->keep_alive_hash)) {
phk = kh_key (ctx->keep_alive_hash, k);
GQueue *conns = &phk->conns;

/* Use stack based approach */

if (g_queue_get_length (conns) > 0) {
struct rspamd_http_keepalive_cbdata *cbd;
struct rspamd_http_connection *conn;

cbd = g_queue_pop_head (conns);
event_del (&cbd->ev);
conn = cbd->conn;
g_free (cbd);

/* We transfer refcount here! */
return conn;
}
}

return NULL;
}

void
rspamd_http_context_prepare_keepalive (struct rspamd_http_context *ctx,
struct rspamd_http_connection *conn,
const rspamd_inet_addr_t *addr,
const gchar *host)
{
struct rspamd_keepalive_hash_key hk, *phk;
khiter_t k;

hk.addr = (rspamd_inet_addr_t *)addr;
hk.host = (gchar *)host;

k = kh_get (rspamd_keep_alive_hash, ctx->keep_alive_hash, &hk);

if (k != kh_end (ctx->keep_alive_hash)) {
/* Reuse existing */
conn->keepalive_hash_key = kh_key (ctx->keep_alive_hash, k);
}
else {
/* Create new one */
GQueue empty_init = G_QUEUE_INIT;
gint r;

phk = g_malloc (sizeof (*phk));
phk->conns = empty_init;
phk->host = g_strdup (host);
phk->addr = rspamd_inet_address_copy (addr);

kh_put (rspamd_keep_alive_hash, ctx->keep_alive_hash, phk, &r);
conn->keepalive_hash_key = phk;
}
}

static void
rspamd_http_keepalive_handler (gint fd, short what, gpointer ud)
{
struct rspamd_http_keepalive_cbdata *cbdata =
(struct rspamd_http_keepalive_cbdata *)ud;
/*
* We can get here if a remote side reported something or it has
* timed out. In both cases we just terminate keepalive connection.
*/

g_queue_delete_link (cbdata->queue, cbdata->link);
rspamd_http_connection_unref (cbdata->conn);
g_free (cbdata);
}

void
rspamd_http_context_push_keepalive (struct rspamd_http_context *ctx,
struct rspamd_http_connection *conn,
struct rspamd_http_message *msg,
struct event_base *ev_base)
{
struct rspamd_http_keepalive_cbdata *cbdata;
struct timeval tv;
gdouble timeout = ctx->config.keepalive_interval;

g_assert (conn->keepalive_hash_key != NULL);

/* Move connection to the keepalive pool */
cbdata = g_malloc0 (sizeof (*cbdata));

cbdata->conn = rspamd_http_connection_ref (conn);
g_queue_push_tail (&conn->keepalive_hash_key->conns, cbdata);
cbdata->link = conn->keepalive_hash_key->conns.tail;
cbdata->queue = &conn->keepalive_hash_key->conns;

event_set (&cbdata->ev, conn->fd, EV_READ|EV_TIMEOUT,
rspamd_http_keepalive_handler,
&cbdata);

if (msg) {
const rspamd_ftok_t *tok;

tok = rspamd_http_message_find_header (msg, "Keep-Alive");

if (tok) {
goffset pos = rspamd_substring_search_caseless (tok->begin,
tok->len, "timeout=", sizeof ("timeout=") - 1);

if (pos != -1) {
pos += sizeof ("timeout=");

gchar *end_pos = memchr (tok->begin + pos, ',', tok->len - pos);
glong real_timeout;

if (end_pos) {
if (rspamd_strtol (tok->begin + pos + 1,
(end_pos - tok->begin) - pos - 1, &real_timeout) &&
real_timeout > 0) {
timeout = real_timeout;
}
}
else {
if (rspamd_strtol (tok->begin + pos + 1,
tok->len - pos - 1, &real_timeout) &&
real_timeout > 0) {
timeout = real_timeout;
}
}
}
}
}

double_to_tv (timeout, &tv);
event_base_set (ev_base, &cbdata->ev);
event_add (&cbdata->ev, &tv);
}

+ 39
- 0
src/libutil/http_context.h Voir le fichier

@@ -19,16 +19,19 @@

#include "config.h"
#include "ucl.h"
#include "addr.h"

#include <event.h>

struct rspamd_http_context;
struct rspamd_config;
struct rspamd_http_message;

struct rspamd_http_context_cfg {
guint kp_cache_size_client;
guint kp_cache_size_server;
guint ssl_cache_size;
gdouble keepalive_interval;
gdouble client_key_rotate_time;
const gchar *user_agent;
};
@@ -53,4 +56,40 @@ void rspamd_http_context_free (struct rspamd_http_context *ctx);

struct rspamd_http_context* rspamd_http_context_default (void);

/**
* Returns preserved keepalive connection if it's available.
* Refcount is transferred to caller!
* @param ctx
* @param addr
* @param host
* @return
*/
struct rspamd_http_connection* rspamd_http_context_check_keepalive (
struct rspamd_http_context *ctx, const rspamd_inet_addr_t *addr,
const gchar *host);

/**
* Prepares keepalive key for a connection by creating a new entry or by reusing existent
* Bear in mind, that keepalive pool has currently no cleanup methods!
* @param ctx
* @param conn
* @param addr
* @param host
*/
void rspamd_http_context_prepare_keepalive (struct rspamd_http_context *ctx,
struct rspamd_http_connection *conn,
const rspamd_inet_addr_t *addr,
const gchar *host);
/**
* Pushes a connection to keepalive pool after client request is finished,
* keepalive key *must* be prepared before using of this function
* @param ctx
* @param conn
* @param msg
*/
void rspamd_http_context_push_keepalive (struct rspamd_http_context *ctx,
struct rspamd_http_connection *conn,
struct rspamd_http_message *msg,
struct event_base *ev_base);

#endif

+ 195
- 0
src/libutil/http_message.c Voir le fichier

@@ -18,6 +18,7 @@
#include "libutil/http_private.h"
#include "libutil/printf.h"
#include "libutil/logger.h"
#include "utlist.h"
#include "unix-std.h"

struct rspamd_http_message *
@@ -463,3 +464,197 @@ rspamd_http_message_storage_cleanup (struct rspamd_http_message *msg)

msg->body_buf.len = 0;
}

void
rspamd_http_message_free (struct rspamd_http_message *msg)
{
struct rspamd_http_header *hdr, *htmp, *hcur, *hcurtmp;


HASH_ITER (hh, msg->headers, hdr, htmp) {
HASH_DEL (msg->headers, hdr);

DL_FOREACH_SAFE (hdr, hcur, hcurtmp) {
rspamd_fstring_free (hcur->combined);
g_free (hcur);
}
}

rspamd_http_message_storage_cleanup (msg);

if (msg->url != NULL) {
rspamd_fstring_free (msg->url);
}
if (msg->status != NULL) {
rspamd_fstring_free (msg->status);
}
if (msg->host != NULL) {
rspamd_fstring_free (msg->host);
}
if (msg->peer_key != NULL) {
rspamd_pubkey_unref (msg->peer_key);
}

g_free (msg);
}

void
rspamd_http_message_set_peer_key (struct rspamd_http_message *msg,
struct rspamd_cryptobox_pubkey *pk)
{
if (msg->peer_key != NULL) {
rspamd_pubkey_unref (msg->peer_key);
}

if (pk) {
msg->peer_key = rspamd_pubkey_ref (pk);
}
else {
msg->peer_key = NULL;
}
}

void
rspamd_http_message_add_header_len (struct rspamd_http_message *msg,
const gchar *name,
const gchar *value,
gsize len)
{
struct rspamd_http_header *hdr, *found = NULL;
guint nlen, vlen;

if (msg != NULL && name != NULL && value != NULL) {
hdr = g_malloc0 (sizeof (struct rspamd_http_header));
nlen = strlen (name);
vlen = len;
hdr->combined = rspamd_fstring_sized_new (nlen + vlen + 4);
rspamd_printf_fstring (&hdr->combined, "%s: %*s\r\n", name, (gint)vlen,
value);
hdr->name.begin = hdr->combined->str;
hdr->name.len = nlen;
hdr->value.begin = hdr->combined->str + nlen + 2;
hdr->value.len = vlen;

HASH_FIND (hh, msg->headers, hdr->name.begin,
hdr->name.len, found);

if (found == NULL) {
HASH_ADD_KEYPTR (hh, msg->headers, hdr->name.begin,
hdr->name.len, hdr);
}

DL_APPEND (found, hdr);
}
}

void
rspamd_http_message_add_header (struct rspamd_http_message *msg,
const gchar *name,
const gchar *value)
{
if (value) {
rspamd_http_message_add_header_len (msg, name, value, strlen (value));
}
}

void
rspamd_http_message_add_header_fstr (struct rspamd_http_message *msg,
const gchar *name,
rspamd_fstring_t *value)
{
struct rspamd_http_header *hdr, *found = NULL;
guint nlen, vlen;

if (msg != NULL && name != NULL && value != NULL) {
hdr = g_malloc0 (sizeof (struct rspamd_http_header));
nlen = strlen (name);
vlen = value->len;
hdr->combined = rspamd_fstring_sized_new (nlen + vlen + 4);
rspamd_printf_fstring (&hdr->combined, "%s: %V\r\n", name, value);
hdr->name.begin = hdr->combined->str;
hdr->name.len = nlen;
hdr->value.begin = hdr->combined->str + nlen + 2;
hdr->value.len = vlen;

HASH_FIND (hh, msg->headers, hdr->name.begin,
hdr->name.len, found);

if (found == NULL) {
HASH_ADD_KEYPTR (hh, msg->headers, hdr->name.begin,
hdr->name.len, hdr);
}

DL_APPEND (found, hdr);
}
}

const rspamd_ftok_t *
rspamd_http_message_find_header (struct rspamd_http_message *msg,
const gchar *name)
{
struct rspamd_http_header *hdr;
const rspamd_ftok_t *res = NULL;
guint slen = strlen (name);

if (msg != NULL) {
HASH_FIND (hh, msg->headers, name, slen, hdr);

if (hdr) {
res = &hdr->value;
}
}

return res;
}

GPtrArray*
rspamd_http_message_find_header_multiple (
struct rspamd_http_message *msg,
const gchar *name)
{
GPtrArray *res = NULL;
struct rspamd_http_header *hdr, *cur;

guint slen = strlen (name);

if (msg != NULL) {
HASH_FIND (hh, msg->headers, name, slen, hdr);

if (hdr) {
res = g_ptr_array_sized_new (4);

LL_FOREACH (hdr, cur) {
g_ptr_array_add (res, &cur->value);
}
}
}


return res;
}


gboolean
rspamd_http_message_remove_header (struct rspamd_http_message *msg,
const gchar *name)
{
struct rspamd_http_header *hdr, *hcur, *hcurtmp;
gboolean res = FALSE;
guint slen = strlen (name);

if (msg != NULL) {
HASH_FIND (hh, msg->headers, name, slen, hdr);

if (hdr) {
HASH_DEL (msg->headers, hdr);
res = TRUE;

DL_FOREACH_SAFE (hdr, hcur, hcurtmp) {
rspamd_fstring_free (hcur->combined);
g_free (hcur);
}
}
}

return res;
}

+ 6
- 5
src/libutil/http_private.h Voir le fichier

@@ -80,14 +80,15 @@ struct rspamd_http_message {
struct rspamd_keepalive_hash_key {
rspamd_inet_addr_t *addr;
gchar *host;
GQueue conns;
};

gint32 rspamd_keep_alive_key_hash (struct rspamd_keepalive_hash_key k);
bool rspamd_keep_alive_key_equal (struct rspamd_keepalive_hash_key k1,
struct rspamd_keepalive_hash_key k2);
gint32 rspamd_keep_alive_key_hash (struct rspamd_keepalive_hash_key* k);
bool rspamd_keep_alive_key_equal (struct rspamd_keepalive_hash_key* k1,
struct rspamd_keepalive_hash_key* k2);

KHASH_INIT (rspamd_keep_alive_hash, struct rspamd_keepalive_hash_key,
GQueue, true, rspamd_keep_alive_key_hash, rspamd_keep_alive_key_equal);
KHASH_INIT (rspamd_keep_alive_hash, struct rspamd_keepalive_hash_key *,
char, 0, rspamd_keep_alive_key_hash, rspamd_keep_alive_key_equal);

struct rspamd_http_context {
struct rspamd_http_context_cfg config;

Chargement…
Annuler
Enregistrer