@@ -73,7 +73,7 @@ rspamd_client_error_handler (struct rspamd_http_connection *conn, GError *err) | |||
req->cb (c, c->server_name->str, NULL, req->ud, err); | |||
} | |||
static void | |||
static gint | |||
rspamd_client_finish_handler (struct rspamd_http_connection *conn, | |||
struct rspamd_http_message *msg) | |||
{ | |||
@@ -88,13 +88,14 @@ rspamd_client_finish_handler (struct rspamd_http_connection *conn, | |||
c->req_sent = TRUE; | |||
rspamd_http_connection_reset (c->http_conn); | |||
rspamd_http_connection_read_message (c->http_conn, c->req, c->fd, &c->timeout, c->ev_base); | |||
return 0; | |||
} | |||
else { | |||
if (msg->body == NULL || msg->body->len == 0 || msg->code != 200) { | |||
err = g_error_new (RCLIENT_ERROR, msg->code, "HTTP error occurred: %d", msg->code); | |||
req->cb (c, c->server_name->str, NULL, req->ud, err); | |||
g_error_free (err); | |||
return; | |||
return -1; | |||
} | |||
parser = ucl_parser_new (0); | |||
@@ -104,12 +105,14 @@ rspamd_client_finish_handler (struct rspamd_http_connection *conn, | |||
ucl_parser_free (parser); | |||
req->cb (c, c->server_name->str, NULL, req->ud, err); | |||
g_error_free (err); | |||
return; | |||
return -1; | |||
} | |||
req->cb (c, c->server_name->str, ucl_parser_get_object (parser), req->ud, NULL); | |||
ucl_parser_free (parser); | |||
} | |||
return -1; | |||
} | |||
struct rspamd_client_connection * | |||
@@ -203,7 +206,7 @@ void | |||
rspamd_client_destroy (struct rspamd_client_connection *conn) | |||
{ | |||
if (conn != NULL) { | |||
rspamd_http_connection_free (conn->http_conn); | |||
rspamd_http_connection_unref (conn->http_conn); | |||
if (conn->req != NULL) { | |||
g_slice_free1 (sizeof (struct rspamd_client_request), conn->req); | |||
} |
@@ -465,14 +465,21 @@ rspamd_http_on_message_complete (http_parser* parser) | |||
priv = conn->priv; | |||
if (conn->body_handler != NULL) { | |||
rspamd_http_connection_ref (conn); | |||
if (conn->opts & RSPAMD_HTTP_BODY_PARTIAL) { | |||
ret = conn->body_handler (conn, priv->msg, NULL, 0); | |||
} | |||
else { | |||
ret = conn->body_handler (conn, priv->msg, priv->msg->body->str, priv->msg->body->len); | |||
} | |||
rspamd_http_connection_unref (conn); | |||
} | |||
if (ret == 0) { | |||
rspamd_http_connection_ref (conn); | |||
ret = conn->finish_handler (conn, priv->msg); | |||
rspamd_http_connection_unref (conn); | |||
} | |||
conn->finish_handler (conn, priv->msg); | |||
return ret; | |||
} | |||
@@ -490,7 +497,9 @@ rspamd_http_write_helper (struct rspamd_http_connection *conn) | |||
priv = conn->priv; | |||
if (priv->wr_pos == priv->wr_total) { | |||
rspamd_http_connection_ref (conn); | |||
conn->finish_handler (conn, priv->msg); | |||
rspamd_http_connection_unref (conn); | |||
return; | |||
} | |||
@@ -516,7 +525,9 @@ rspamd_http_write_helper (struct rspamd_http_connection *conn) | |||
if (r == -1) { | |||
err = g_error_new (HTTP_ERROR, errno, "IO write error: %s", strerror (errno)); | |||
rspamd_http_connection_ref (conn); | |||
conn->error_handler (conn, err); | |||
rspamd_http_connection_unref (conn); | |||
g_error_free (err); | |||
return; | |||
} | |||
@@ -525,7 +536,9 @@ rspamd_http_write_helper (struct rspamd_http_connection *conn) | |||
} | |||
if (priv->wr_pos >= priv->wr_total) { | |||
rspamd_http_connection_ref (conn); | |||
conn->finish_handler (conn, priv->msg); | |||
rspamd_http_connection_unref (conn); | |||
} | |||
else { | |||
/* Want to write more */ | |||
@@ -555,19 +568,24 @@ rspamd_http_event_handler (int fd, short what, gpointer ud) | |||
} | |||
else { | |||
buf->len = r; | |||
rspamd_http_connection_ref (conn); | |||
if (http_parser_execute (&priv->parser, &priv->parser_cb, buf->str, r) != (size_t)r) { | |||
err = g_error_new (HTTP_ERROR, priv->parser.http_errno, | |||
"HTTP parser error: %s", http_errno_description (priv->parser.http_errno)); | |||
conn->error_handler (conn, err); | |||
g_error_free (err); | |||
rspamd_http_connection_unref (conn); | |||
return; | |||
} | |||
rspamd_http_connection_unref (conn); | |||
} | |||
} | |||
else if (what == EV_TIMEOUT) { | |||
err = g_error_new (HTTP_ERROR, ETIMEDOUT, | |||
"IO timeout"); | |||
rspamd_http_connection_ref (conn); | |||
conn->error_handler (conn, err); | |||
rspamd_http_connection_unref (conn); | |||
g_error_free (err); | |||
return; | |||
} | |||
@@ -597,6 +615,7 @@ rspamd_http_connection_new (rspamd_http_body_handler body_handler, | |||
new->error_handler = error_handler; | |||
new->finish_handler = finish_handler; | |||
new->fd = -1; | |||
new->ref = 1; | |||
/* Init priv */ | |||
priv = g_slice_alloc0 (sizeof (struct rspamd_http_connection_private)); |
@@ -72,14 +72,14 @@ enum rspamd_http_options { | |||
struct rspamd_http_connection_private; | |||
struct rspamd_http_connection; | |||
typedef gboolean (*rspamd_http_body_handler) (struct rspamd_http_connection *conn, | |||
typedef int (*rspamd_http_body_handler) (struct rspamd_http_connection *conn, | |||
struct rspamd_http_message *msg, | |||
const gchar *chunk, | |||
gsize len); | |||
typedef void (*rspamd_http_error_handler) (struct rspamd_http_connection *conn, GError *err); | |||
typedef void (*rspamd_http_finish_handler) (struct rspamd_http_connection *conn, | |||
typedef int (*rspamd_http_finish_handler) (struct rspamd_http_connection *conn, | |||
struct rspamd_http_message *msg); | |||
/** | |||
@@ -94,6 +94,7 @@ struct rspamd_http_connection { | |||
enum rspamd_http_options opts; | |||
enum rspamd_http_connection_type type; | |||
gint fd; | |||
gint ref; | |||
}; | |||
/** | |||
@@ -145,6 +146,30 @@ void rspamd_http_connection_write_message ( | |||
*/ | |||
void rspamd_http_connection_free (struct rspamd_http_connection *conn); | |||
/** | |||
* Increase refcount for a connection | |||
* @param conn | |||
* @return | |||
*/ | |||
static inline struct rspamd_http_connection * | |||
rspamd_http_connection_ref (struct rspamd_http_connection *conn) | |||
{ | |||
conn->ref ++; | |||
return conn; | |||
} | |||
/** | |||
* Decrease a refcount for a connection and free it if refcount is equal to zero | |||
* @param conn | |||
*/ | |||
static void | |||
rspamd_http_connection_unref (struct rspamd_http_connection *conn) | |||
{ | |||
if (--conn->ref <= 0) { | |||
rspamd_http_connection_free (conn); | |||
} | |||
} | |||
/** | |||
* Reset connection for a new request | |||
* @param conn |
@@ -382,7 +382,7 @@ rspamd_worker_error_handler (struct rspamd_http_connection *conn, GError *err) | |||
} | |||
} | |||
static void | |||
static gint | |||
rspamd_worker_finish_handler (struct rspamd_http_connection *conn, | |||
struct rspamd_http_message *msg) | |||
{ | |||
@@ -400,6 +400,8 @@ rspamd_worker_finish_handler (struct rspamd_http_connection *conn, | |||
task->s->wanna_die = TRUE; | |||
check_session_pending (task->s); | |||
} | |||
return 0; | |||
} | |||
/* |
@@ -162,7 +162,7 @@ free_task (struct worker_task *task, gboolean is_soft) | |||
} | |||
} | |||
if (task->http_conn != NULL) { | |||
rspamd_http_connection_free (task->http_conn); | |||
rspamd_http_connection_unref (task->http_conn); | |||
} | |||
if (task->sock != -1) { | |||
close (task->sock); |