req->cb (c, c->server_name->str, NULL, req->ud, err);
}
-static void
+static gint
rspamd_client_finish_handler (struct rspamd_http_connection *conn,
struct rspamd_http_message *msg)
{
c->req_sent = TRUE;
rspamd_http_connection_reset (c->http_conn);
rspamd_http_connection_read_message (c->http_conn, c->req, c->fd, &c->timeout, c->ev_base);
+ return 0;
}
else {
if (msg->body == NULL || msg->body->len == 0 || msg->code != 200) {
err = g_error_new (RCLIENT_ERROR, msg->code, "HTTP error occurred: %d", msg->code);
req->cb (c, c->server_name->str, NULL, req->ud, err);
g_error_free (err);
- return;
+ return -1;
}
parser = ucl_parser_new (0);
ucl_parser_free (parser);
req->cb (c, c->server_name->str, NULL, req->ud, err);
g_error_free (err);
- return;
+ return -1;
}
req->cb (c, c->server_name->str, ucl_parser_get_object (parser), req->ud, NULL);
ucl_parser_free (parser);
}
+
+ return -1;
}
struct rspamd_client_connection *
rspamd_client_destroy (struct rspamd_client_connection *conn)
{
if (conn != NULL) {
- rspamd_http_connection_free (conn->http_conn);
+ rspamd_http_connection_unref (conn->http_conn);
if (conn->req != NULL) {
g_slice_free1 (sizeof (struct rspamd_client_request), conn->req);
}
priv = conn->priv;
if (conn->body_handler != NULL) {
+ rspamd_http_connection_ref (conn);
if (conn->opts & RSPAMD_HTTP_BODY_PARTIAL) {
ret = conn->body_handler (conn, priv->msg, NULL, 0);
}
else {
ret = conn->body_handler (conn, priv->msg, priv->msg->body->str, priv->msg->body->len);
}
+ rspamd_http_connection_unref (conn);
+ }
+
+ if (ret == 0) {
+ rspamd_http_connection_ref (conn);
+ ret = conn->finish_handler (conn, priv->msg);
+ rspamd_http_connection_unref (conn);
}
- conn->finish_handler (conn, priv->msg);
return ret;
}
priv = conn->priv;
if (priv->wr_pos == priv->wr_total) {
+ rspamd_http_connection_ref (conn);
conn->finish_handler (conn, priv->msg);
+ rspamd_http_connection_unref (conn);
return;
}
if (r == -1) {
err = g_error_new (HTTP_ERROR, errno, "IO write error: %s", strerror (errno));
+ rspamd_http_connection_ref (conn);
conn->error_handler (conn, err);
+ rspamd_http_connection_unref (conn);
g_error_free (err);
return;
}
}
if (priv->wr_pos >= priv->wr_total) {
+ rspamd_http_connection_ref (conn);
conn->finish_handler (conn, priv->msg);
+ rspamd_http_connection_unref (conn);
}
else {
/* Want to write more */
}
else {
buf->len = r;
+ rspamd_http_connection_ref (conn);
if (http_parser_execute (&priv->parser, &priv->parser_cb, buf->str, r) != (size_t)r) {
err = g_error_new (HTTP_ERROR, priv->parser.http_errno,
"HTTP parser error: %s", http_errno_description (priv->parser.http_errno));
conn->error_handler (conn, err);
g_error_free (err);
+ rspamd_http_connection_unref (conn);
return;
}
+ rspamd_http_connection_unref (conn);
}
}
else if (what == EV_TIMEOUT) {
err = g_error_new (HTTP_ERROR, ETIMEDOUT,
"IO timeout");
+ rspamd_http_connection_ref (conn);
conn->error_handler (conn, err);
+ rspamd_http_connection_unref (conn);
g_error_free (err);
return;
}
new->error_handler = error_handler;
new->finish_handler = finish_handler;
new->fd = -1;
+ new->ref = 1;
/* Init priv */
priv = g_slice_alloc0 (sizeof (struct rspamd_http_connection_private));
struct rspamd_http_connection_private;
struct rspamd_http_connection;
-typedef gboolean (*rspamd_http_body_handler) (struct rspamd_http_connection *conn,
+typedef int (*rspamd_http_body_handler) (struct rspamd_http_connection *conn,
struct rspamd_http_message *msg,
const gchar *chunk,
gsize len);
typedef void (*rspamd_http_error_handler) (struct rspamd_http_connection *conn, GError *err);
-typedef void (*rspamd_http_finish_handler) (struct rspamd_http_connection *conn,
+typedef int (*rspamd_http_finish_handler) (struct rspamd_http_connection *conn,
struct rspamd_http_message *msg);
/**
enum rspamd_http_options opts;
enum rspamd_http_connection_type type;
gint fd;
+ gint ref;
};
/**
*/
void rspamd_http_connection_free (struct rspamd_http_connection *conn);
+/**
+ * Increase refcount for a connection
+ * @param conn
+ * @return
+ */
+static inline struct rspamd_http_connection *
+rspamd_http_connection_ref (struct rspamd_http_connection *conn)
+{
+ conn->ref ++;
+ return conn;
+}
+
+/**
+ * Decrease a refcount for a connection and free it if refcount is equal to zero
+ * @param conn
+ */
+static void
+rspamd_http_connection_unref (struct rspamd_http_connection *conn)
+{
+ if (--conn->ref <= 0) {
+ rspamd_http_connection_free (conn);
+ }
+}
+
/**
* Reset connection for a new request
* @param conn
}
}
-static void
+static gint
rspamd_worker_finish_handler (struct rspamd_http_connection *conn,
struct rspamd_http_message *msg)
{
task->s->wanna_die = TRUE;
check_session_pending (task->s);
}
+
+ return 0;
}
/*
}
}
if (task->http_conn != NULL) {
- rspamd_http_connection_free (task->http_conn);
+ rspamd_http_connection_unref (task->http_conn);
}
if (task->sock != -1) {
close (task->sock);