#include "rspamdclient.h"
#include "util.h"
+#include "http.h"
+
+struct rspamd_client_request;
+
+/*
+ * Since rspamd uses untagged HTTP we can pass a single message per socket
+ */
+struct rspamd_client_connection {
+ gint fd;
+ GString *server_name;
+ struct event_base *ev_base;
+ struct timeval timeout;
+ struct rspamd_http_connection *http_conn;
+ gboolean connected;
+ struct rspamd_client_request *req;
+};
+
+struct rspamd_client_request {
+ struct rspamd_client_connection *conn;
+ struct rspamd_http_message *msg;
+ rspamd_client_callback cb;
+ gpointer ud;
+};
+
+#define RCLIENT_ERROR rspamd_client_error_quark ()
+GQuark
+rspamd_client_error_quark (void)
+{
+ return g_quark_from_static_string ("rspamd-client-error");
+}
+
+static gint
+rspamd_client_body_handler (struct rspamd_http_connection *conn,
+ struct rspamd_http_message *msg,
+ const gchar *chunk, gsize len)
+{
+ /* Do nothing here */
+ return 0;
+}
+
+static void
+rspamd_client_error_handler (struct rspamd_http_connection *conn, GError *err)
+{
+ struct rspamd_client_request *req = (struct rspamd_client_request *)conn->ud;
+ struct rspamd_client_connection *c;
+
+ c = req->conn;
+ req->cb (c->server_name->str, NULL, req->ud, err);
+}
+
+static void
+rspamd_client_finish_handler (struct rspamd_http_connection *conn,
+ struct rspamd_http_message *msg)
+{
+ struct rspamd_client_request *req = (struct rspamd_client_request *)conn->ud;
+ struct rspamd_client_connection *c;
+ struct ucl_parser *parser;
+ GError *err;
+
+ c = req->conn;
+ if (msg->body == NULL || msg->body->len == 0 || msg->code != 200) {
+ err = g_error_new (RCLIENT_ERROR, msg->code, "HTTP error occurred: %d", msg->code);
+ req->cb (c->server_name->str, NULL, req->ud, err);
+ g_error_free (err);
+ return;
+ }
+
+ parser = ucl_parser_new (0);
+ if (!ucl_parser_add_chunk (parser, msg->body->str, msg->body->len)) {
+ err = g_error_new (RCLIENT_ERROR, msg->code, "Cannot parse UCL: %s",
+ ucl_parser_get_error (parser));
+ ucl_parser_free (parser);
+ req->cb (c->server_name->str, NULL, req->ud, err);
+ g_error_free (err);
+ return;
+ }
+
+ req->cb (c->server_name->str, ucl_parser_get_object (parser), req->ud, NULL);
+ ucl_parser_free (parser);
+}
+
+struct rspamd_client_connection *
+rspamd_client_init (struct event_base *ev_base, const gchar *name,
+ guint16 port, gdouble timeout)
+{
+ struct rspamd_client_connection *conn;
+ gint fd;
+
+ fd = make_universal_socket (name, port, SOCK_STREAM, TRUE, FALSE, TRUE);
+ if (fd == -1) {
+ return NULL;
+ }
+
+ conn = g_slice_alloc (sizeof (struct rspamd_client_connection));
+ conn->ev_base = ev_base;
+ conn->fd = fd;
+ conn->connected = FALSE;
+ conn->http_conn = rspamd_http_connection_new (rspamd_client_body_handler,
+ rspamd_client_error_handler, rspamd_client_finish_handler, 0, RSPAMD_HTTP_CLIENT);
+ conn->server_name = g_string_new (name);
+ if (port != 0) {
+ rspamd_printf_gstring (conn->server_name, ":%d", (int)port);
+ }
+
+ double_to_tv (timeout, &conn->timeout);
+
+ return conn;
+}
gboolean
-rspamd_client_command (struct event_base *ev_base, const gchar *name,
- guint16 port, const gchar *command, GHashTable *attrs,
- gdouble timeout, FILE *in, gpointer ud)
+rspamd_client_command (struct rspamd_client_connection *conn,
+ const gchar *command, GHashTable *attrs,
+ FILE *in, rspamd_client_callback cb,
+ gpointer ud, GError **err)
{
+ struct rspamd_client_request *req;
+ gchar *p, *hn, *hv;
+ gsize remain;
+ GHashTableIter it;
+ req = g_slice_alloc (sizeof (struct rspamd_client_request));
+ req->conn = conn;
+ req->cb = cb;
+ req->ud = ud;
+
+ req->msg = rspamd_http_new_message (HTTP_REQUEST);
+ if (in != NULL) {
+ /* Read input stream */
+ req->msg->body = g_string_sized_new (BUFSIZ);
+ while (!feof (in)) {
+ p = req->msg->body->str + req->msg->body->len;
+ remain = req->msg->body->allocated_len - req->msg->body->len - 1;
+ if (remain == 0) {
+ g_string_set_size (req->msg->body, req->msg->body->len * 2);
+ }
+ remain = fread (p, 1, remain, in);
+ if (remain > 0) {
+ req->msg->body->len += remain;
+ req->msg->body->str[req->msg->body->len] = '\0';
+ }
+ }
+ if (ferror (in) != 0) {
+ g_set_error (err, RCLIENT_ERROR, ferror (in), "input IO error: %s", strerror (ferror (in)));
+ g_slice_free1 (sizeof (struct rspamd_client_request), req);
+ return FALSE;
+ }
+ }
+ else {
+ req->msg->body = NULL;
+ }
+
+ /* Convert headers */
+ g_hash_table_iter_init (&it, attrs);
+ while (g_hash_table_iter_next (&it, (gpointer *)&hn, (gpointer *)&hv)) {
+ rspamd_http_message_add_header (req->msg, hn, hv);
+ }
+
+ req->msg->url = g_string_new ("/");
+ g_string_append (req->msg->url, command);
+
+ conn->req = req;
+
+ rspamd_http_connection_write_message (conn->http_conn, req->msg, NULL,
+ "text/plain", req, conn->fd, &conn->timeout, conn->ev_base);
+
+ return TRUE;
+}
+
+void
+rspamd_client_destroy (struct rspamd_client_connection *conn)
+{
+ if (conn != NULL) {
+ rspamd_http_connection_free (conn->http_conn);
+ if (conn->req != NULL) {
+ rspamd_http_message_free (conn->req->msg);
+ g_slice_free1 (sizeof (struct rspamd_client_request), conn->req);
+ }
+ close (conn->fd);
+ g_string_free (conn->server_name, TRUE);
+ g_slice_free1 (sizeof (struct rspamd_client_connection), conn);
+ }
}
#include "config.h"
#include "ucl.h"
+struct rspamd_client_connection;
+
/**
- * Callback is called on client request completed
+ * Callback is called on client connection completed
* @param name name of server
* @param port port for server
* @param result result object
* @param ud opaque user data
- * @param err error pointer (should be freed if not NULL)
+ * @param err error pointer
*/
typedef void (*rspamd_client_callback) (
const gchar *name,
- guint16 port,
ucl_object_t *result,
gpointer ud,
GError *err);
* Start rspamd worker or controller command
* @param ev_base event base
* @param name server name (hostname or unix socket)
- * @param command command name
- * @param attrs additional attributes
* @param port port number (in host order)
* @param timeout timeout in seconds
- * @param in input file or NULL if no input required
- * @param ud opaque user data
* @return
*/
-gboolean rspamd_client_command (
+struct rspamd_client_connection* rspamd_client_init (
struct event_base *ev_base,
const gchar *name,
guint16 port,
+ gdouble timeout);
+
+/**
+ *
+ * @param conn connection object
+ * @param command command name
+ * @param attrs additional attributes
+ * @param in input file or NULL if no input required
+ * @param cb callback to be called on command completion
+ * @param ud opaque user data
+ * @return
+ */
+gboolean rspamd_client_command (
+ struct rspamd_client_connection *conn,
const gchar *command,
GHashTable *attrs,
- gdouble timeout,
FILE *in,
- gpointer ud);
+ rspamd_client_callback cb,
+ gpointer ud,
+ GError **err);
+
+/**
+ * Destroy a connection to rspamd
+ * @param conn
+ */
+void rspamd_client_destroy (struct rspamd_client_connection *conn);
#endif /* RSPAMDCLIENT_H_ */
struct rspamd_http_connection_private *priv = conn->priv;
struct rspamd_http_header *hdr;
struct tm t, *ptm;
- gchar datebuf[64];
+ gchar datebuf[64], *pbody;
gint i;
+ gsize bodylen;
conn->fd = fd;
conn->ud = ud;
http_method_str (msg->method), msg->url, msg->body->len);
}
}
-
+ if (msg->body == NULL) {
+ pbody = NULL;
+ bodylen = 0;
+ priv->outlen = 2;
+ }
+ else {
+ pbody = msg->body->str;
+ bodylen = msg->body->len;
+ priv->outlen = 3;
+ }
/* Allocate iov */
- priv->outlen = 3;
- priv->wr_total = msg->body->len + priv->buf->len + 2;
+ priv->wr_total = bodylen + priv->buf->len + 2;
DL_FOREACH (msg->headers, hdr) {
/* <name><: ><value><\r\n> */
priv->wr_total += hdr->name->len + hdr->value->len + 4;
}
priv->out[i].iov_base = "\r\n";
priv->out[i++].iov_len = 2;
- priv->out[i].iov_base = msg->body->str;
- priv->out[i++].iov_len = msg->body->len;
+ if (msg->body != NULL) {
+ priv->out[i].iov_base = pbody;
+ priv->out[i++].iov_len = bodylen;
+ }
event_set (&priv->ev, fd, EV_WRITE, rspamd_http_event_handler, conn);
event_base_set (base, &priv->ev);