]> source.dussan.org Git - rspamd.git/commitdiff
[Project] Add message block support to the client
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 18 Jul 2019 15:10:04 +0000 (16:10 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 18 Jul 2019 15:10:04 +0000 (16:10 +0100)
src/client/rspamc.c
src/client/rspamdclient.c
src/client/rspamdclient.h
src/libserver/protocol.c
src/libserver/protocol_internal.h
src/rspamd_proxy.c

index 5b3f874ef98c8ba75ebc3afa3dbe1d6ba796792f..ea2fe0d4c5abf5b18a84e05c4cbc6eeb07ac0690 100644 (file)
@@ -1522,6 +1522,7 @@ rspamc_client_cb (struct rspamd_client_connection *conn,
                struct rspamd_http_message *msg,
                const gchar *name, ucl_object_t *result, GString *input,
                gpointer ud, gdouble start_time, gdouble send_time,
+               const gchar *body, gsize bodylen,
                GError *err)
 {
        gchar *ucl_out;
@@ -1529,8 +1530,6 @@ rspamc_client_cb (struct rspamd_client_connection *conn,
        struct rspamc_command *cmd;
        FILE *out = stdout;
        gdouble finish = rspamd_get_ticks (FALSE), diff;
-       const gchar *body;
-       gsize body_len;
 
        cmd = cbdata->cmd;
 
@@ -1603,11 +1602,15 @@ rspamc_client_cb (struct rspamd_client_connection *conn,
                                rspamd_fprintf (out, "%s\n", err->message);
 
                                if (json && msg != NULL) {
-                                       body = rspamd_http_message_get_body (msg, &body_len);
+                                       const gchar *raw;
+                                       gsize rawlen;
 
-                                       if (body) {
+                                       raw = rspamd_http_message_get_body (msg, &rawlen);
+
+                                       if (raw) {
                                                /* We can also output the resulting json */
-                                               rspamd_fprintf (out, "%*s\n", (gint)body_len, body);
+                                               rspamd_fprintf (out, "%*s\n", (gint)rawlen - bodylen,
+                                                               raw);
                                        }
                                }
                        }
index c27ff9f0bf4307404bbeba9ace750677407e33be..7b1bcb73e59d95256fa71299ccb79b4bccc01830 100644 (file)
@@ -17,6 +17,7 @@
 #include "libutil/util.h"
 #include "libutil/http_connection.h"
 #include "libutil/http_private.h"
+#include "libserver/protocol_internal.h"
 #include "unix-std.h"
 #include "contrib/zstd/zstd.h"
 #include "contrib/zstd/zdict.h"
@@ -96,7 +97,7 @@ rspamd_client_error_handler (struct rspamd_http_connection *conn, GError *err)
        c = req->conn;
        req->cb (c, NULL, c->server_name->str, NULL,
                        req->input, req->ud,
-                       c->start_time, c->send_time, err);
+                       c->start_time, c->send_time, NULL, 0, err);
 }
 
 static gint
@@ -109,6 +110,9 @@ rspamd_client_finish_handler (struct rspamd_http_connection *conn,
        struct ucl_parser *parser;
        GError *err;
        const rspamd_ftok_t *tok;
+       const gchar *start, *body = NULL;
+       guchar *out = NULL;
+       gsize len, bodylen = 0;
 
        c = req->conn;
 
@@ -119,6 +123,7 @@ rspamd_client_finish_handler (struct rspamd_http_connection *conn,
                rspamd_http_connection_read_message (c->http_conn,
                        c->req,
                        c->timeout);
+
                return 0;
        }
        else {
@@ -127,13 +132,13 @@ rspamd_client_finish_handler (struct rspamd_http_connection *conn,
                                        msg->code,
                                        (gint)msg->status->len, msg->status->str);
                        req->cb (c, msg, c->server_name->str, NULL, req->input, req->ud,
-                                       c->start_time, c->send_time, err);
+                                       c->start_time, c->send_time, body, bodylen, err);
                        g_error_free (err);
 
                        return 0;
                }
 
-               tok = rspamd_http_message_find_header (msg, "compression");
+               tok = rspamd_http_message_find_header (msg, COMPRESSION_HEADER);
 
                if (tok) {
                        /* Need to uncompress */
@@ -146,7 +151,6 @@ rspamd_client_finish_handler (struct rspamd_http_connection *conn,
                                ZSTD_DStream *zstream;
                                ZSTD_inBuffer zin;
                                ZSTD_outBuffer zout;
-                               guchar *out;
                                gsize outlen, r;
 
                                zstream = ZSTD_createDStream ();
@@ -174,12 +178,11 @@ rspamd_client_finish_handler (struct rspamd_http_connection *conn,
                                                                ZSTD_getErrorName (r));
                                                req->cb (c, msg, c->server_name->str, NULL,
                                                                req->input, req->ud, c->start_time,
-                                                               c->send_time, err);
+                                                               c->send_time, body, bodylen, err);
                                                g_error_free (err);
                                                ZSTD_freeDStream (zstream);
-                                               g_free (out);
 
-                                               return 0;
+                                               goto end;
                                        }
 
                                        if (zout.pos == zout.size) {
@@ -191,50 +194,64 @@ rspamd_client_finish_handler (struct rspamd_http_connection *conn,
 
                                ZSTD_freeDStream (zstream);
 
-                               parser = ucl_parser_new (0);
-                               if (!ucl_parser_add_chunk (parser, zout.dst, zout.pos)) {
-                                       err = g_error_new (RCLIENT_ERROR, msg->code, "Cannot parse UCL: %s",
-                                                       ucl_parser_get_error (parser));
-                                       ucl_parser_free (parser);
-                                       req->cb (c, msg, c->server_name->str, NULL, req->input,
-                                                       req->ud, c->start_time, c->send_time, err);
-                                       g_error_free (err);
-                                       g_free (zout.dst);
-
-                                       return 0;
-                               }
-
-                               g_free (zout.dst);
+                               start = zout.dst;
+                               len = zout.pos;
                        }
                        else {
                                err = g_error_new (RCLIENT_ERROR, 500,
                                                "Invalid compression method");
                                req->cb (c, msg, c->server_name->str, NULL,
-                                               req->input, req->ud, c->start_time, c->send_time, err);
+                                               req->input, req->ud, c->start_time, c->send_time,
+                                               body, bodylen, err);
                                g_error_free (err);
 
                                return 0;
                        }
                }
                else {
-                       parser = ucl_parser_new (0);
-                       if (!ucl_parser_add_chunk (parser, msg->body_buf.begin, msg->body_buf.len)) {
-                               err = g_error_new (RCLIENT_ERROR, msg->code, "Cannot parse UCL: %s",
-                                               ucl_parser_get_error (parser));
-                               ucl_parser_free (parser);
-                               req->cb (c, msg, c->server_name->str, NULL,
-                                               req->input, req->ud, c->start_time, c->send_time, err);
-                               g_error_free (err);
+                       start = msg->body_buf.begin;
+                       len = msg->body_buf.len;
+               }
 
-                               return 0;
+               /* Deal with body */
+               tok = rspamd_http_message_find_header (msg, MESSAGE_OFFSET_HEADER);
+
+               if (tok) {
+                       gulong value = 0;
+
+                       if (rspamd_strtoul (tok->begin, tok->len, &value) &&
+                                       value < len) {
+                               body = start + value;
+                               bodylen = len - value;
+                               len = value;
                        }
                }
 
-               req->cb (c, msg, c->server_name->str, ucl_parser_get_object (
-                               parser), req->input, req->ud, c->start_time, c->send_time, NULL);
+               parser = ucl_parser_new (0);
+               if (!ucl_parser_add_chunk (parser, start, len)) {
+                       err = g_error_new (RCLIENT_ERROR, msg->code, "Cannot parse UCL: %s",
+                                       ucl_parser_get_error (parser));
+                       ucl_parser_free (parser);
+                       req->cb (c, msg, c->server_name->str, NULL,
+                                       req->input, req->ud,
+                                       c->start_time, c->send_time, body, bodylen, err);
+                       g_error_free (err);
+
+                       goto end;
+               }
+
+               req->cb (c, msg, c->server_name->str,
+                               ucl_parser_get_object (parser),
+                               req->input, req->ud,
+                               c->start_time, c->send_time, body, bodylen, NULL);
                ucl_parser_free (parser);
        }
 
+end:
+       if (out) {
+               g_free (out);
+       }
+
        return 0;
 }
 
@@ -419,7 +436,7 @@ rspamd_client_command (struct rspamd_client_connection *conn,
        }
 
        if (compressed) {
-               rspamd_http_message_add_header (req->msg, "Compression", "zstd");
+               rspamd_http_message_add_header (req->msg, COMPRESSION_HEADER, "zstd");
 
                if (dict_id != 0) {
                        gchar dict_str[32];
index dc274b7a8757b4fe094f022cf36313e3ec3bf1a0..b42abebda6d2b9c4df937c2a9812fec5058259fd 100644 (file)
@@ -49,6 +49,8 @@ typedef void (*rspamd_client_callback) (
                gpointer ud,
                gdouble start_time,
                gdouble send_time,
+               const gchar *body,
+               gsize body_len,
                GError *err);
 
 struct rspamd_http_context;
index e8bbd979ba7f4dda53ea5aa7a922d36e8d58b215..dd840284aa57a13171a350949323622bc042d70a 100644 (file)
@@ -1661,7 +1661,7 @@ rspamd_protocol_http_reply (struct rspamd_http_message *msg,
                compressed_reply->len = zout.pos;
                rspamd_fstring_free (reply);
                rspamd_http_message_set_body_from_fstring_steal (msg, compressed_reply);
-               rspamd_http_message_add_header (msg, "Compression", "zstd");
+               rspamd_http_message_add_header (msg, COMPRESSION_HEADER, "zstd");
 
                if (task->cfg->libs_ctx->out_dict &&
                                task->cfg->libs_ctx->out_dict->id != 0) {
index d9616e03dcced7622ab3862747272526ace9ba6c..8cec5d80e6f3eeae3bd935366ebda0878673b28d 100644 (file)
@@ -88,6 +88,7 @@ extern "C" {
 #define CERT_ISSUER_HEADER "TLS-Cert-Issuer"
 #define MAILER_HEADER "Mailer"
 #define RAW_DATA_HEADER "Raw"
+#define COMPRESSION_HEADER "Compression"
 #define MESSAGE_OFFSET_HEADER "Message-Offset"
 
 #ifdef  __cplusplus
index cca9f792fbd7d56dc2998be12f2d47fc34e2c140..70e393dddc91666e8a5de4265d6955c7506d57ce 100644 (file)
@@ -1066,7 +1066,7 @@ proxy_request_compress (struct rspamd_http_message *msg)
 
        flags = rspamd_http_message_get_flags (msg);
 
-       if (!rspamd_http_message_find_header (msg, "Compression")) {
+       if (!rspamd_http_message_find_header (msg, COMPRESSION_HEADER)) {
                if ((flags & RSPAMD_HTTP_FLAG_SHMEM) ||
                                !(flags & RSPAMD_HTTP_FLAG_HAS_BODY)) {
                        /* Cannot compress shared or empty message */
@@ -1094,7 +1094,7 @@ proxy_request_compress (struct rspamd_http_message *msg)
 
                ZSTD_freeCCtx (zctx);
                rspamd_http_message_set_body_from_fstring_steal (msg, body);
-               rspamd_http_message_add_header (msg, "Compression", "zstd");
+               rspamd_http_message_add_header (msg, COMPRESSION_HEADER, "zstd");
        }
 }
 
@@ -1108,7 +1108,7 @@ proxy_request_decompress (struct rspamd_http_message *msg)
        ZSTD_inBuffer zin;
        ZSTD_outBuffer zout;
 
-       if (rspamd_http_message_find_header (msg, "Compression")) {
+       if (rspamd_http_message_find_header (msg, COMPRESSION_HEADER)) {
                in = rspamd_http_message_get_body (msg, &inlen);
 
                if (in == NULL || inlen == 0) {
@@ -1154,7 +1154,7 @@ proxy_request_decompress (struct rspamd_http_message *msg)
                body->len = zout.pos;
                ZSTD_freeDStream (zstream);
                rspamd_http_message_set_body_from_fstring_steal (msg, body);
-               rspamd_http_message_remove_header (msg, "Compression");
+               rspamd_http_message_remove_header (msg, COMPRESSION_HEADER);
        }
 }