]> source.dussan.org Git - rspamd.git/commitdiff
Write the new client library.
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 20 Jan 2014 14:29:02 +0000 (14:29 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 20 Jan 2014 14:29:02 +0000 (14:29 +0000)
src/client/rspamdclient.c
src/client/rspamdclient.h
src/http.c

index d7f1c77ddbf855862af1cd9ce0eeb79423cac57f..c612901e133ce0faec0e6ec52e36815d0645d348 100644 (file)
 
 #include "rspamdclient.h"
 #include "util.h"
+#include "http.h"
+
+struct rspamd_client_request;
+
+/*
+ * Since rspamd uses untagged HTTP we can pass a single message per socket
+ */
+struct rspamd_client_connection {
+       gint fd;
+       GString *server_name;
+       struct event_base *ev_base;
+       struct timeval timeout;
+       struct rspamd_http_connection *http_conn;
+       gboolean connected;
+       struct rspamd_client_request *req;
+};
+
+struct rspamd_client_request {
+       struct rspamd_client_connection *conn;
+       struct rspamd_http_message *msg;
+       rspamd_client_callback cb;
+       gpointer ud;
+};
+
+#define RCLIENT_ERROR rspamd_client_error_quark ()
+GQuark
+rspamd_client_error_quark (void)
+{
+       return g_quark_from_static_string ("rspamd-client-error");
+}
+
+static gint
+rspamd_client_body_handler (struct rspamd_http_connection *conn,
+               struct rspamd_http_message *msg,
+               const gchar *chunk, gsize len)
+{
+       /* Do nothing here */
+       return 0;
+}
+
+static void
+rspamd_client_error_handler (struct rspamd_http_connection *conn, GError *err)
+{
+       struct rspamd_client_request *req = (struct rspamd_client_request *)conn->ud;
+       struct rspamd_client_connection *c;
+
+       c = req->conn;
+       req->cb (c->server_name->str, NULL, req->ud, err);
+}
+
+static void
+rspamd_client_finish_handler (struct rspamd_http_connection *conn,
+               struct rspamd_http_message *msg)
+{
+       struct rspamd_client_request *req = (struct rspamd_client_request *)conn->ud;
+       struct rspamd_client_connection *c;
+       struct ucl_parser *parser;
+       GError *err;
+
+       c = req->conn;
+       if (msg->body == NULL || msg->body->len == 0 || msg->code != 200) {
+               err = g_error_new (RCLIENT_ERROR, msg->code, "HTTP error occurred: %d", msg->code);
+               req->cb (c->server_name->str, NULL, req->ud, err);
+               g_error_free (err);
+               return;
+       }
+
+       parser = ucl_parser_new (0);
+       if (!ucl_parser_add_chunk (parser, msg->body->str, msg->body->len)) {
+               err = g_error_new (RCLIENT_ERROR, msg->code, "Cannot parse UCL: %s",
+                               ucl_parser_get_error (parser));
+               ucl_parser_free (parser);
+               req->cb (c->server_name->str, NULL, req->ud, err);
+               g_error_free (err);
+               return;
+       }
+
+       req->cb (c->server_name->str, ucl_parser_get_object (parser), req->ud, NULL);
+       ucl_parser_free (parser);
+}
+
+struct rspamd_client_connection *
+rspamd_client_init (struct event_base *ev_base, const gchar *name,
+               guint16 port, gdouble timeout)
+{
+       struct rspamd_client_connection *conn;
+       gint fd;
+
+       fd = make_universal_socket (name, port, SOCK_STREAM, TRUE, FALSE, TRUE);
+       if (fd == -1) {
+               return NULL;
+       }
+
+       conn = g_slice_alloc (sizeof (struct rspamd_client_connection));
+       conn->ev_base = ev_base;
+       conn->fd = fd;
+       conn->connected = FALSE;
+       conn->http_conn = rspamd_http_connection_new (rspamd_client_body_handler,
+                       rspamd_client_error_handler, rspamd_client_finish_handler, 0, RSPAMD_HTTP_CLIENT);
+       conn->server_name = g_string_new (name);
+       if (port != 0) {
+               rspamd_printf_gstring (conn->server_name, ":%d", (int)port);
+       }
+
+       double_to_tv (timeout, &conn->timeout);
+
+       return conn;
+}
 
 gboolean
-rspamd_client_command (struct event_base *ev_base, const gchar *name,
-               guint16 port, const gchar *command, GHashTable *attrs,
-               gdouble timeout, FILE *in, gpointer ud)
+rspamd_client_command (struct rspamd_client_connection *conn,
+               const gchar *command, GHashTable *attrs,
+               FILE *in, rspamd_client_callback cb,
+               gpointer ud, GError **err)
 {
+       struct rspamd_client_request *req;
+       gchar *p, *hn, *hv;
+       gsize remain;
+       GHashTableIter it;
 
+       req = g_slice_alloc (sizeof (struct rspamd_client_request));
+       req->conn = conn;
+       req->cb = cb;
+       req->ud = ud;
+
+       req->msg = rspamd_http_new_message (HTTP_REQUEST);
+       if (in != NULL) {
+               /* Read input stream */
+               req->msg->body = g_string_sized_new (BUFSIZ);
+               while (!feof (in)) {
+                       p = req->msg->body->str + req->msg->body->len;
+                       remain = req->msg->body->allocated_len - req->msg->body->len - 1;
+                       if (remain == 0) {
+                               g_string_set_size (req->msg->body, req->msg->body->len * 2);
+                       }
+                       remain = fread (p, 1, remain, in);
+                       if (remain > 0) {
+                               req->msg->body->len += remain;
+                               req->msg->body->str[req->msg->body->len] = '\0';
+                       }
+               }
+               if (ferror (in) != 0) {
+                       g_set_error (err, RCLIENT_ERROR, ferror (in), "input IO error: %s", strerror (ferror (in)));
+                       g_slice_free1 (sizeof (struct rspamd_client_request), req);
+                       return FALSE;
+               }
+       }
+       else {
+               req->msg->body = NULL;
+       }
+
+       /* Convert headers */
+       g_hash_table_iter_init (&it, attrs);
+       while (g_hash_table_iter_next (&it, (gpointer *)&hn, (gpointer *)&hv)) {
+               rspamd_http_message_add_header (req->msg, hn, hv);
+       }
+
+       req->msg->url = g_string_new ("/");
+       g_string_append (req->msg->url, command);
+
+       conn->req = req;
+
+       rspamd_http_connection_write_message (conn->http_conn, req->msg, NULL,
+                       "text/plain", req, conn->fd, &conn->timeout, conn->ev_base);
+
+       return TRUE;
+}
+
+void
+rspamd_client_destroy (struct rspamd_client_connection *conn)
+{
+       if (conn != NULL) {
+               rspamd_http_connection_free (conn->http_conn);
+               if (conn->req != NULL) {
+                       rspamd_http_message_free (conn->req->msg);
+                       g_slice_free1 (sizeof (struct rspamd_client_request), conn->req);
+               }
+               close (conn->fd);
+               g_string_free (conn->server_name, TRUE);
+               g_slice_free1 (sizeof (struct rspamd_client_connection), conn);
+       }
 }
index a274426fabbd7893ba93ac84822bfd7eb921c215..6d6a8d46975d232b9188fa9c508a68ae3267f35b 100644 (file)
 #include "config.h"
 #include "ucl.h"
 
+struct rspamd_client_connection;
+
 /**
- * Callback is called on client request completed
+ * Callback is called on client connection completed
  * @param name name of server
  * @param port port for server
  * @param result result object
  * @param ud opaque user data
- * @param err error pointer (should be freed if not NULL)
+ * @param err error pointer
  */
 typedef void (*rspamd_client_callback) (
                const gchar *name,
-               guint16 port,
                ucl_object_t *result,
                gpointer ud,
                GError *err);
@@ -46,22 +47,39 @@ typedef void (*rspamd_client_callback) (
  * Start rspamd worker or controller command
  * @param ev_base event base
  * @param name server name (hostname or unix socket)
- * @param command command name
- * @param attrs additional attributes
  * @param port port number (in host order)
  * @param timeout timeout in seconds
- * @param in input file or NULL if no input required
- * @param ud opaque user data
  * @return
  */
-gboolean rspamd_client_command (
+struct rspamd_client_connection* rspamd_client_init (
                struct event_base *ev_base,
                const gchar *name,
                guint16 port,
+               gdouble timeout);
+
+/**
+ *
+ * @param conn connection object
+ * @param command command name
+ * @param attrs additional attributes
+ * @param in input file or NULL if no input required
+ * @param cb callback to be called on command completion
+ * @param ud opaque user data
+ * @return
+ */
+gboolean rspamd_client_command (
+               struct rspamd_client_connection *conn,
                const gchar *command,
                GHashTable *attrs,
-               gdouble timeout,
                FILE *in,
-               gpointer ud);
+               rspamd_client_callback cb,
+               gpointer ud,
+               GError **err);
+
+/**
+ * Destroy a connection to rspamd
+ * @param conn
+ */
+void rspamd_client_destroy (struct rspamd_client_connection *conn);
 
 #endif /* RSPAMDCLIENT_H_ */
index 22c202b88a518d7a237112d908b98f642ad9c3f6..be1a389e0f59e1eca767fbf2cbb5af87f81db523 100644 (file)
@@ -688,8 +688,9 @@ rspamd_http_connection_write_message (struct rspamd_http_connection *conn,
        struct rspamd_http_connection_private *priv = conn->priv;
        struct rspamd_http_header *hdr;
        struct tm t, *ptm;
-       gchar datebuf[64];
+       gchar datebuf[64], *pbody;
        gint i;
+       gsize bodylen;
 
        conn->fd = fd;
        conn->ud = ud;
@@ -748,10 +749,18 @@ rspamd_http_connection_write_message (struct rspamd_http_connection *conn,
                                http_method_str (msg->method), msg->url, msg->body->len);
                }
        }
-
+       if (msg->body == NULL) {
+               pbody = NULL;
+               bodylen = 0;
+               priv->outlen = 2;
+       }
+       else {
+               pbody = msg->body->str;
+               bodylen = msg->body->len;
+               priv->outlen = 3;
+       }
        /* Allocate iov */
-       priv->outlen = 3;
-       priv->wr_total = msg->body->len + priv->buf->len + 2;
+       priv->wr_total = bodylen + priv->buf->len + 2;
        DL_FOREACH (msg->headers, hdr) {
                /* <name><: ><value><\r\n> */
                priv->wr_total += hdr->name->len + hdr->value->len + 4;
@@ -776,8 +785,10 @@ rspamd_http_connection_write_message (struct rspamd_http_connection *conn,
        }
        priv->out[i].iov_base = "\r\n";
        priv->out[i++].iov_len = 2;
-       priv->out[i].iov_base = msg->body->str;
-       priv->out[i++].iov_len = msg->body->len;
+       if (msg->body != NULL) {
+               priv->out[i].iov_base = pbody;
+               priv->out[i++].iov_len = bodylen;
+       }
 
        event_set (&priv->ev, fd, EV_WRITE, rspamd_http_event_handler, conn);
        event_base_set (base, &priv->ev);