From af204fee67890fd8e1043042f94ce2e9b9205bc6 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Thu, 18 Jul 2019 16:10:04 +0100 Subject: [PATCH] [Project] Add message block support to the client --- src/client/rspamc.c | 13 +++-- src/client/rspamdclient.c | 85 ++++++++++++++++++------------- src/client/rspamdclient.h | 2 + src/libserver/protocol.c | 2 +- src/libserver/protocol_internal.h | 1 + src/rspamd_proxy.c | 8 +-- 6 files changed, 67 insertions(+), 44 deletions(-) diff --git a/src/client/rspamc.c b/src/client/rspamc.c index 5b3f874ef..ea2fe0d4c 100644 --- a/src/client/rspamc.c +++ b/src/client/rspamc.c @@ -1522,6 +1522,7 @@ rspamc_client_cb (struct rspamd_client_connection *conn, struct rspamd_http_message *msg, const gchar *name, ucl_object_t *result, GString *input, gpointer ud, gdouble start_time, gdouble send_time, + const gchar *body, gsize bodylen, GError *err) { gchar *ucl_out; @@ -1529,8 +1530,6 @@ rspamc_client_cb (struct rspamd_client_connection *conn, struct rspamc_command *cmd; FILE *out = stdout; gdouble finish = rspamd_get_ticks (FALSE), diff; - const gchar *body; - gsize body_len; cmd = cbdata->cmd; @@ -1603,11 +1602,15 @@ rspamc_client_cb (struct rspamd_client_connection *conn, rspamd_fprintf (out, "%s\n", err->message); if (json && msg != NULL) { - body = rspamd_http_message_get_body (msg, &body_len); + const gchar *raw; + gsize rawlen; - if (body) { + raw = rspamd_http_message_get_body (msg, &rawlen); + + if (raw) { /* We can also output the resulting json */ - rspamd_fprintf (out, "%*s\n", (gint)body_len, body); + rspamd_fprintf (out, "%*s\n", (gint)rawlen - bodylen, + raw); } } } diff --git a/src/client/rspamdclient.c b/src/client/rspamdclient.c index c27ff9f0b..7b1bcb73e 100644 --- a/src/client/rspamdclient.c +++ b/src/client/rspamdclient.c @@ -17,6 +17,7 @@ #include "libutil/util.h" #include "libutil/http_connection.h" #include "libutil/http_private.h" +#include "libserver/protocol_internal.h" #include "unix-std.h" #include "contrib/zstd/zstd.h" #include "contrib/zstd/zdict.h" @@ -96,7 +97,7 @@ rspamd_client_error_handler (struct rspamd_http_connection *conn, GError *err) c = req->conn; req->cb (c, NULL, c->server_name->str, NULL, req->input, req->ud, - c->start_time, c->send_time, err); + c->start_time, c->send_time, NULL, 0, err); } static gint @@ -109,6 +110,9 @@ rspamd_client_finish_handler (struct rspamd_http_connection *conn, struct ucl_parser *parser; GError *err; const rspamd_ftok_t *tok; + const gchar *start, *body = NULL; + guchar *out = NULL; + gsize len, bodylen = 0; c = req->conn; @@ -119,6 +123,7 @@ rspamd_client_finish_handler (struct rspamd_http_connection *conn, rspamd_http_connection_read_message (c->http_conn, c->req, c->timeout); + return 0; } else { @@ -127,13 +132,13 @@ rspamd_client_finish_handler (struct rspamd_http_connection *conn, msg->code, (gint)msg->status->len, msg->status->str); req->cb (c, msg, c->server_name->str, NULL, req->input, req->ud, - c->start_time, c->send_time, err); + c->start_time, c->send_time, body, bodylen, err); g_error_free (err); return 0; } - tok = rspamd_http_message_find_header (msg, "compression"); + tok = rspamd_http_message_find_header (msg, COMPRESSION_HEADER); if (tok) { /* Need to uncompress */ @@ -146,7 +151,6 @@ rspamd_client_finish_handler (struct rspamd_http_connection *conn, ZSTD_DStream *zstream; ZSTD_inBuffer zin; ZSTD_outBuffer zout; - guchar *out; gsize outlen, r; zstream = ZSTD_createDStream (); @@ -174,12 +178,11 @@ rspamd_client_finish_handler (struct rspamd_http_connection *conn, ZSTD_getErrorName (r)); req->cb (c, msg, c->server_name->str, NULL, req->input, req->ud, c->start_time, - c->send_time, err); + c->send_time, body, bodylen, err); g_error_free (err); ZSTD_freeDStream (zstream); - g_free (out); - return 0; + goto end; } if (zout.pos == zout.size) { @@ -191,50 +194,64 @@ rspamd_client_finish_handler (struct rspamd_http_connection *conn, ZSTD_freeDStream (zstream); - parser = ucl_parser_new (0); - if (!ucl_parser_add_chunk (parser, zout.dst, zout.pos)) { - err = g_error_new (RCLIENT_ERROR, msg->code, "Cannot parse UCL: %s", - ucl_parser_get_error (parser)); - ucl_parser_free (parser); - req->cb (c, msg, c->server_name->str, NULL, req->input, - req->ud, c->start_time, c->send_time, err); - g_error_free (err); - g_free (zout.dst); - - return 0; - } - - g_free (zout.dst); + start = zout.dst; + len = zout.pos; } else { err = g_error_new (RCLIENT_ERROR, 500, "Invalid compression method"); req->cb (c, msg, c->server_name->str, NULL, - req->input, req->ud, c->start_time, c->send_time, err); + req->input, req->ud, c->start_time, c->send_time, + body, bodylen, err); g_error_free (err); return 0; } } else { - parser = ucl_parser_new (0); - if (!ucl_parser_add_chunk (parser, msg->body_buf.begin, msg->body_buf.len)) { - err = g_error_new (RCLIENT_ERROR, msg->code, "Cannot parse UCL: %s", - ucl_parser_get_error (parser)); - ucl_parser_free (parser); - req->cb (c, msg, c->server_name->str, NULL, - req->input, req->ud, c->start_time, c->send_time, err); - g_error_free (err); + start = msg->body_buf.begin; + len = msg->body_buf.len; + } - return 0; + /* Deal with body */ + tok = rspamd_http_message_find_header (msg, MESSAGE_OFFSET_HEADER); + + if (tok) { + gulong value = 0; + + if (rspamd_strtoul (tok->begin, tok->len, &value) && + value < len) { + body = start + value; + bodylen = len - value; + len = value; } } - req->cb (c, msg, c->server_name->str, ucl_parser_get_object ( - parser), req->input, req->ud, c->start_time, c->send_time, NULL); + parser = ucl_parser_new (0); + if (!ucl_parser_add_chunk (parser, start, len)) { + err = g_error_new (RCLIENT_ERROR, msg->code, "Cannot parse UCL: %s", + ucl_parser_get_error (parser)); + ucl_parser_free (parser); + req->cb (c, msg, c->server_name->str, NULL, + req->input, req->ud, + c->start_time, c->send_time, body, bodylen, err); + g_error_free (err); + + goto end; + } + + req->cb (c, msg, c->server_name->str, + ucl_parser_get_object (parser), + req->input, req->ud, + c->start_time, c->send_time, body, bodylen, NULL); ucl_parser_free (parser); } +end: + if (out) { + g_free (out); + } + return 0; } @@ -419,7 +436,7 @@ rspamd_client_command (struct rspamd_client_connection *conn, } if (compressed) { - rspamd_http_message_add_header (req->msg, "Compression", "zstd"); + rspamd_http_message_add_header (req->msg, COMPRESSION_HEADER, "zstd"); if (dict_id != 0) { gchar dict_str[32]; diff --git a/src/client/rspamdclient.h b/src/client/rspamdclient.h index dc274b7a8..b42abebda 100644 --- a/src/client/rspamdclient.h +++ b/src/client/rspamdclient.h @@ -49,6 +49,8 @@ typedef void (*rspamd_client_callback) ( gpointer ud, gdouble start_time, gdouble send_time, + const gchar *body, + gsize body_len, GError *err); struct rspamd_http_context; diff --git a/src/libserver/protocol.c b/src/libserver/protocol.c index e8bbd979b..dd840284a 100644 --- a/src/libserver/protocol.c +++ b/src/libserver/protocol.c @@ -1661,7 +1661,7 @@ rspamd_protocol_http_reply (struct rspamd_http_message *msg, compressed_reply->len = zout.pos; rspamd_fstring_free (reply); rspamd_http_message_set_body_from_fstring_steal (msg, compressed_reply); - rspamd_http_message_add_header (msg, "Compression", "zstd"); + rspamd_http_message_add_header (msg, COMPRESSION_HEADER, "zstd"); if (task->cfg->libs_ctx->out_dict && task->cfg->libs_ctx->out_dict->id != 0) { diff --git a/src/libserver/protocol_internal.h b/src/libserver/protocol_internal.h index d9616e03d..8cec5d80e 100644 --- a/src/libserver/protocol_internal.h +++ b/src/libserver/protocol_internal.h @@ -88,6 +88,7 @@ extern "C" { #define CERT_ISSUER_HEADER "TLS-Cert-Issuer" #define MAILER_HEADER "Mailer" #define RAW_DATA_HEADER "Raw" +#define COMPRESSION_HEADER "Compression" #define MESSAGE_OFFSET_HEADER "Message-Offset" #ifdef __cplusplus diff --git a/src/rspamd_proxy.c b/src/rspamd_proxy.c index cca9f792f..70e393ddd 100644 --- a/src/rspamd_proxy.c +++ b/src/rspamd_proxy.c @@ -1066,7 +1066,7 @@ proxy_request_compress (struct rspamd_http_message *msg) flags = rspamd_http_message_get_flags (msg); - if (!rspamd_http_message_find_header (msg, "Compression")) { + if (!rspamd_http_message_find_header (msg, COMPRESSION_HEADER)) { if ((flags & RSPAMD_HTTP_FLAG_SHMEM) || !(flags & RSPAMD_HTTP_FLAG_HAS_BODY)) { /* Cannot compress shared or empty message */ @@ -1094,7 +1094,7 @@ proxy_request_compress (struct rspamd_http_message *msg) ZSTD_freeCCtx (zctx); rspamd_http_message_set_body_from_fstring_steal (msg, body); - rspamd_http_message_add_header (msg, "Compression", "zstd"); + rspamd_http_message_add_header (msg, COMPRESSION_HEADER, "zstd"); } } @@ -1108,7 +1108,7 @@ proxy_request_decompress (struct rspamd_http_message *msg) ZSTD_inBuffer zin; ZSTD_outBuffer zout; - if (rspamd_http_message_find_header (msg, "Compression")) { + if (rspamd_http_message_find_header (msg, COMPRESSION_HEADER)) { in = rspamd_http_message_get_body (msg, &inlen); if (in == NULL || inlen == 0) { @@ -1154,7 +1154,7 @@ proxy_request_decompress (struct rspamd_http_message *msg) body->len = zout.pos; ZSTD_freeDStream (zstream); rspamd_http_message_set_body_from_fstring_steal (msg, body); - rspamd_http_message_remove_header (msg, "Compression"); + rspamd_http_message_remove_header (msg, COMPRESSION_HEADER); } } -- 2.39.5