From 43a44c7faf94133305a7afd627b3d009a60aa371 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Mon, 20 Jan 2014 14:29:02 +0000 Subject: [PATCH] Write the new client library. --- src/client/rspamdclient.c | 180 +++++++++++++++++++++++++++++++++++++- src/client/rspamdclient.h | 38 +++++--- src/http.c | 23 +++-- 3 files changed, 222 insertions(+), 19 deletions(-) diff --git a/src/client/rspamdclient.c b/src/client/rspamdclient.c index d7f1c77dd..c612901e1 100644 --- a/src/client/rspamdclient.c +++ b/src/client/rspamdclient.c @@ -23,11 +23,185 @@ #include "rspamdclient.h" #include "util.h" +#include "http.h" + +struct rspamd_client_request; + +/* + * Since rspamd uses untagged HTTP we can pass a single message per socket + */ +struct rspamd_client_connection { + gint fd; + GString *server_name; + struct event_base *ev_base; + struct timeval timeout; + struct rspamd_http_connection *http_conn; + gboolean connected; + struct rspamd_client_request *req; +}; + +struct rspamd_client_request { + struct rspamd_client_connection *conn; + struct rspamd_http_message *msg; + rspamd_client_callback cb; + gpointer ud; +}; + +#define RCLIENT_ERROR rspamd_client_error_quark () +GQuark +rspamd_client_error_quark (void) +{ + return g_quark_from_static_string ("rspamd-client-error"); +} + +static gint +rspamd_client_body_handler (struct rspamd_http_connection *conn, + struct rspamd_http_message *msg, + const gchar *chunk, gsize len) +{ + /* Do nothing here */ + return 0; +} + +static void +rspamd_client_error_handler (struct rspamd_http_connection *conn, GError *err) +{ + struct rspamd_client_request *req = (struct rspamd_client_request *)conn->ud; + struct rspamd_client_connection *c; + + c = req->conn; + req->cb (c->server_name->str, NULL, req->ud, err); +} + +static void +rspamd_client_finish_handler (struct rspamd_http_connection *conn, + struct rspamd_http_message *msg) +{ + struct rspamd_client_request *req = (struct rspamd_client_request *)conn->ud; + struct rspamd_client_connection *c; + struct ucl_parser *parser; + GError *err; + + c = req->conn; + if (msg->body == NULL || msg->body->len == 0 || msg->code != 200) { + err = g_error_new (RCLIENT_ERROR, msg->code, "HTTP error occurred: %d", msg->code); + req->cb (c->server_name->str, NULL, req->ud, err); + g_error_free (err); + return; + } + + parser = ucl_parser_new (0); + if (!ucl_parser_add_chunk (parser, msg->body->str, msg->body->len)) { + err = g_error_new (RCLIENT_ERROR, msg->code, "Cannot parse UCL: %s", + ucl_parser_get_error (parser)); + ucl_parser_free (parser); + req->cb (c->server_name->str, NULL, req->ud, err); + g_error_free (err); + return; + } + + req->cb (c->server_name->str, ucl_parser_get_object (parser), req->ud, NULL); + ucl_parser_free (parser); +} + +struct rspamd_client_connection * +rspamd_client_init (struct event_base *ev_base, const gchar *name, + guint16 port, gdouble timeout) +{ + struct rspamd_client_connection *conn; + gint fd; + + fd = make_universal_socket (name, port, SOCK_STREAM, TRUE, FALSE, TRUE); + if (fd == -1) { + return NULL; + } + + conn = g_slice_alloc (sizeof (struct rspamd_client_connection)); + conn->ev_base = ev_base; + conn->fd = fd; + conn->connected = FALSE; + conn->http_conn = rspamd_http_connection_new (rspamd_client_body_handler, + rspamd_client_error_handler, rspamd_client_finish_handler, 0, RSPAMD_HTTP_CLIENT); + conn->server_name = g_string_new (name); + if (port != 0) { + rspamd_printf_gstring (conn->server_name, ":%d", (int)port); + } + + double_to_tv (timeout, &conn->timeout); + + return conn; +} gboolean -rspamd_client_command (struct event_base *ev_base, const gchar *name, - guint16 port, const gchar *command, GHashTable *attrs, - gdouble timeout, FILE *in, gpointer ud) +rspamd_client_command (struct rspamd_client_connection *conn, + const gchar *command, GHashTable *attrs, + FILE *in, rspamd_client_callback cb, + gpointer ud, GError **err) { + struct rspamd_client_request *req; + gchar *p, *hn, *hv; + gsize remain; + GHashTableIter it; + req = g_slice_alloc (sizeof (struct rspamd_client_request)); + req->conn = conn; + req->cb = cb; + req->ud = ud; + + req->msg = rspamd_http_new_message (HTTP_REQUEST); + if (in != NULL) { + /* Read input stream */ + req->msg->body = g_string_sized_new (BUFSIZ); + while (!feof (in)) { + p = req->msg->body->str + req->msg->body->len; + remain = req->msg->body->allocated_len - req->msg->body->len - 1; + if (remain == 0) { + g_string_set_size (req->msg->body, req->msg->body->len * 2); + } + remain = fread (p, 1, remain, in); + if (remain > 0) { + req->msg->body->len += remain; + req->msg->body->str[req->msg->body->len] = '\0'; + } + } + if (ferror (in) != 0) { + g_set_error (err, RCLIENT_ERROR, ferror (in), "input IO error: %s", strerror (ferror (in))); + g_slice_free1 (sizeof (struct rspamd_client_request), req); + return FALSE; + } + } + else { + req->msg->body = NULL; + } + + /* Convert headers */ + g_hash_table_iter_init (&it, attrs); + while (g_hash_table_iter_next (&it, (gpointer *)&hn, (gpointer *)&hv)) { + rspamd_http_message_add_header (req->msg, hn, hv); + } + + req->msg->url = g_string_new ("/"); + g_string_append (req->msg->url, command); + + conn->req = req; + + rspamd_http_connection_write_message (conn->http_conn, req->msg, NULL, + "text/plain", req, conn->fd, &conn->timeout, conn->ev_base); + + return TRUE; +} + +void +rspamd_client_destroy (struct rspamd_client_connection *conn) +{ + if (conn != NULL) { + rspamd_http_connection_free (conn->http_conn); + if (conn->req != NULL) { + rspamd_http_message_free (conn->req->msg); + g_slice_free1 (sizeof (struct rspamd_client_request), conn->req); + } + close (conn->fd); + g_string_free (conn->server_name, TRUE); + g_slice_free1 (sizeof (struct rspamd_client_connection), conn); + } } diff --git a/src/client/rspamdclient.h b/src/client/rspamdclient.h index a274426fa..6d6a8d469 100644 --- a/src/client/rspamdclient.h +++ b/src/client/rspamdclient.h @@ -27,17 +27,18 @@ #include "config.h" #include "ucl.h" +struct rspamd_client_connection; + /** - * Callback is called on client request completed + * Callback is called on client connection completed * @param name name of server * @param port port for server * @param result result object * @param ud opaque user data - * @param err error pointer (should be freed if not NULL) + * @param err error pointer */ typedef void (*rspamd_client_callback) ( const gchar *name, - guint16 port, ucl_object_t *result, gpointer ud, GError *err); @@ -46,22 +47,39 @@ typedef void (*rspamd_client_callback) ( * Start rspamd worker or controller command * @param ev_base event base * @param name server name (hostname or unix socket) - * @param command command name - * @param attrs additional attributes * @param port port number (in host order) * @param timeout timeout in seconds - * @param in input file or NULL if no input required - * @param ud opaque user data * @return */ -gboolean rspamd_client_command ( +struct rspamd_client_connection* rspamd_client_init ( struct event_base *ev_base, const gchar *name, guint16 port, + gdouble timeout); + +/** + * + * @param conn connection object + * @param command command name + * @param attrs additional attributes + * @param in input file or NULL if no input required + * @param cb callback to be called on command completion + * @param ud opaque user data + * @return + */ +gboolean rspamd_client_command ( + struct rspamd_client_connection *conn, const gchar *command, GHashTable *attrs, - gdouble timeout, FILE *in, - gpointer ud); + rspamd_client_callback cb, + gpointer ud, + GError **err); + +/** + * Destroy a connection to rspamd + * @param conn + */ +void rspamd_client_destroy (struct rspamd_client_connection *conn); #endif /* RSPAMDCLIENT_H_ */ diff --git a/src/http.c b/src/http.c index 22c202b88..be1a389e0 100644 --- a/src/http.c +++ b/src/http.c @@ -688,8 +688,9 @@ rspamd_http_connection_write_message (struct rspamd_http_connection *conn, struct rspamd_http_connection_private *priv = conn->priv; struct rspamd_http_header *hdr; struct tm t, *ptm; - gchar datebuf[64]; + gchar datebuf[64], *pbody; gint i; + gsize bodylen; conn->fd = fd; conn->ud = ud; @@ -748,10 +749,18 @@ rspamd_http_connection_write_message (struct rspamd_http_connection *conn, http_method_str (msg->method), msg->url, msg->body->len); } } - + if (msg->body == NULL) { + pbody = NULL; + bodylen = 0; + priv->outlen = 2; + } + else { + pbody = msg->body->str; + bodylen = msg->body->len; + priv->outlen = 3; + } /* Allocate iov */ - priv->outlen = 3; - priv->wr_total = msg->body->len + priv->buf->len + 2; + priv->wr_total = bodylen + priv->buf->len + 2; DL_FOREACH (msg->headers, hdr) { /* <: ><\r\n> */ priv->wr_total += hdr->name->len + hdr->value->len + 4; @@ -776,8 +785,10 @@ rspamd_http_connection_write_message (struct rspamd_http_connection *conn, } priv->out[i].iov_base = "\r\n"; priv->out[i++].iov_len = 2; - priv->out[i].iov_base = msg->body->str; - priv->out[i++].iov_len = msg->body->len; + if (msg->body != NULL) { + priv->out[i].iov_base = pbody; + priv->out[i++].iov_len = bodylen; + } event_set (&priv->ev, fd, EV_WRITE, rspamd_http_event_handler, conn); event_base_set (base, &priv->ev); -- 2.39.5