aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorcebka@cebka-laptop <cebka@cebka-laptop>2008-10-10 19:21:23 +0400
committercebka@cebka-laptop <cebka@cebka-laptop>2008-10-10 19:21:23 +0400
commit6c55c3314c9599fa1d97d7afd6a31b1982805265 (patch)
treefc81201dee4091545addf6119bdd35d81ec1f177
parent9571811836cf8afe7733f0599976e90e3cbde560 (diff)
downloadrspamd-6c55c3314c9599fa1d97d7afd6a31b1982805265.tar.gz
rspamd-6c55c3314c9599fa1d97d7afd6a31b1982805265.zip
* Write protocol output functions
* Fix test suite build under linux
-rw-r--r--cfg_file.h2
-rw-r--r--main.h5
-rw-r--r--protocol.c218
-rw-r--r--protocol.h5
-rw-r--r--test/rspamd_test_suite.c4
-rw-r--r--url.h2
-rw-r--r--worker.c11
7 files changed, 240 insertions, 7 deletions
diff --git a/cfg_file.h b/cfg_file.h
index 6eb441c71..5b4e2d84b 100644
--- a/cfg_file.h
+++ b/cfg_file.h
@@ -8,7 +8,7 @@
#include "config.h"
#include <sys/types.h>
-#ifndef OWN_QUEUE_H
+#ifndef HAVE_OWN_QUEUE_H
#include <sys/queue.h>
#else
#include "queue.h"
diff --git a/main.h b/main.h
index 5877d12d0..adefa576f 100644
--- a/main.h
+++ b/main.h
@@ -128,8 +128,13 @@ struct worker_task {
TAILQ_HEAD (uriq, uri) urls;
/* Hash of metric result structures */
GHashTable *results;
+ /* Config file to write to */
struct config_file *cfg;
+ /* Save point for filters deferred processing */
struct save_point save;
+ /* Saved error message and code */
+ char *last_error;
+ int error_code;
/* Memory pool that is associated with this task */
memory_pool_t *task_pool;
};
diff --git a/protocol.c b/protocol.c
index 4a5ec5360..832a14281 100644
--- a/protocol.c
+++ b/protocol.c
@@ -4,6 +4,9 @@
#include <glib.h>
#include "main.h"
+#define CRLF "\r\n"
+/* Max line size as it is defined in rfc2822 */
+#define OUTBUFSIZ 1000
/*
* Just check if the passed message is spam or not and reply as
* described below
@@ -53,6 +56,15 @@
#define IP_ADDR_HEADER "IP"
#define NRCPT_HEADER "Recipient-Number"
#define RCPT_HEADER "Rcpt"
+#define ERROR_HEADER "Error"
+/*
+ * Reply messages
+ */
+#define RSPAMD_REPLY_BANNER "RSPAMD/1.0"
+#define SPAMD_REPLY_BANNER "SPAMD/1.1"
+#define SPAMD_OK "EX_OK"
+/* XXX: try to convert rspamd errors to spamd errors */
+#define SPAMD_ERROR "EX_ERROR"
static int
parse_command (struct worker_task *task, char *line)
@@ -252,3 +264,209 @@ read_rspamd_input_line (struct worker_task *task, char *line)
break;
}
}
+
+static void
+show_url_header (struct worker_task *task)
+{
+ int r = 0;
+ char outbuf[OUTBUFSIZ], c;
+ struct uri *url;
+ f_str_t host;
+
+ r = snprintf (outbuf, sizeof (outbuf), "Urls: ");
+ TAILQ_FOREACH (url, &task->urls, next) {
+ host.begin = url->host;
+ host.len = url->hostlen;
+ /* Skip long hosts to avoid protocol coollisions */
+ if (host.len > OUTBUFSIZ) {
+ continue;
+ }
+ /* Do header folding */
+ if (host.len + r >= OUTBUFSIZ - 3) {
+ outbuf[r ++] = '\r'; outbuf[r ++] = '\n'; outbuf[r] = ' ';
+ bufferevent_write (task->bev, outbuf, r);
+ r = 0;
+ }
+ /* Write url host to buf */
+ if (TAILQ_NEXT (url, next) != NULL) {
+ c = *(host.begin + host.len);
+ *(host.begin + host.len) = '\0';
+ r += snprintf (outbuf, sizeof (outbuf) - r, "%s, ", host.begin);
+ *(host.begin + host.len) = c;
+ }
+ else {
+ c = *(host.begin + host.len);
+ *(host.begin + host.len) = '\0';
+ r += snprintf (outbuf, sizeof (outbuf) - r, "%s" CRLF, host.begin);
+ *(host.begin + host.len) = c;
+ }
+ }
+ bufferevent_write (task->bev, outbuf, r);
+}
+
+static void
+show_metric_result (gpointer metric_name, gpointer metric_value, void *user_data)
+{
+ struct worker_task *task = (struct worker_task *)user_data;
+ int r;
+ char outbuf[OUTBUFSIZ];
+ struct metric_result *metric_res = (struct metric_result *)metric_value;
+ int is_spam = 0;
+
+ if (metric_res->score >= metric_res->metric->required_score) {
+ is_spam = 1;
+ }
+ if (task->proto == SPAMC_PROTO) {
+ r = snprintf (outbuf, sizeof (outbuf), "Spam: %s ; %.2f / %.2f" CRLF,
+ (is_spam) ? "True" : "False", metric_res->score, metric_res->metric->required_score);
+ }
+ else {
+ r = snprintf (outbuf, sizeof (outbuf), "%s: %s ; %.2f / %.2f" CRLF, (char *)metric_name,
+ (is_spam) ? "True" : "False", metric_res->score, metric_res->metric->required_score);
+ }
+ bufferevent_write (task->bev, outbuf, r);
+}
+
+static int
+write_check_reply (struct worker_task *task)
+{
+ int r;
+ char outbuf[OUTBUFSIZ];
+ struct metric_result *metric_res;
+
+ r = snprintf (outbuf, sizeof (outbuf), "%s 0 %s" CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER);
+ bufferevent_write (task->bev, outbuf, r);
+ if (task->proto == SPAMC_PROTO) {
+ /* Ignore metrics, just write report for 'default' metric */
+ metric_res = g_hash_table_lookup (task->results, "default");
+ if (metric_res == NULL) {
+ return -1;
+ }
+ else {
+ show_metric_result ((gpointer)"default", (gpointer)metric_res, (void *)task);
+ }
+ }
+ else {
+ /* Write result for each metric separately */
+ g_hash_table_foreach (task->results, show_metric_result, task);
+ /* URL stat */
+ show_url_header (task);
+ }
+ bufferevent_write (task->bev, CRLF, sizeof (CRLF) - 1);
+
+ return 0;
+}
+
+static void
+show_metric_symbols (gpointer metric_name, gpointer metric_value, void *user_data)
+{
+ struct worker_task *task = (struct worker_task *)user_data;
+ int r = 0;
+ char outbuf[OUTBUFSIZ];
+ struct filter_result *result;
+ struct metric_result *metric_res = (struct metric_result *)metric_value;
+
+ if (task->proto == RSPAMC_PROTO) {
+ r = snprintf (outbuf, sizeof (outbuf), "%s: ", (char *)metric_name);
+ }
+
+ LIST_FOREACH (result, &metric_res->results, next) {
+ if (result->flag) {
+ if (LIST_NEXT (result, next) != NULL) {
+ r += snprintf (outbuf + r, sizeof (outbuf) - r, "%s,", result->symbol);
+ }
+ else {
+ r += snprintf (outbuf + r, sizeof (outbuf) - r, "%s", result->symbol);
+ }
+ }
+ }
+ outbuf[r++] = '\r'; outbuf[r] = '\n';
+ bufferevent_write (task->bev, outbuf, r);
+}
+
+static int
+write_symbols_reply (struct worker_task *task)
+{
+ struct metric_result *metric_res;
+
+ /* First of all write normal results by calling write_check_reply */
+ if (write_check_reply (task) == -1) {
+ return -1;
+ }
+ /* Now write symbols */
+ if (task->proto == SPAMC_PROTO) {
+ /* Ignore metrics, just write report for 'default' metric */
+ metric_res = g_hash_table_lookup (task->results, "default");
+ if (metric_res == NULL) {
+ return -1;
+ }
+ else {
+ show_metric_symbols ((gpointer)"default", (gpointer)metric_res, (void *)task);
+ }
+ }
+ else {
+ /* Write result for each metric separately */
+ g_hash_table_foreach (task->results, show_metric_symbols, task);
+ }
+ return 0;
+}
+
+static int
+write_process_reply (struct worker_task *task)
+{
+ int r;
+ char outbuf[OUTBUFSIZ];
+
+ r = snprintf (outbuf, sizeof (outbuf), "%s 0 %s" CRLF "Content-Length: %zd" CRLF CRLF,
+ (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER, task->msg->buf->len);
+ bufferevent_write (task->bev, outbuf, r);
+ bufferevent_write (task->bev, task->msg->buf->begin, task->msg->buf->len);
+
+ return 0;
+}
+
+int
+write_reply (struct worker_task *task)
+{
+ int r;
+ char outbuf[OUTBUFSIZ];
+
+ if (task->error_code != 0) {
+ /* Write error message and error code to reply */
+ if (task->proto == SPAMC_PROTO) {
+ r = snprintf (outbuf, sizeof (outbuf), "%s %d %s" CRLF CRLF, SPAMD_REPLY_BANNER, task->error_code, SPAMD_ERROR);
+ }
+ else {
+ r = snprintf (outbuf, sizeof (outbuf), "%s %d %s" CRLF "%s: %s" CRLF CRLF, RSPAMD_REPLY_BANNER, task->error_code,
+ SPAMD_ERROR, ERROR_HEADER, task->last_error);
+ }
+ /* Write to bufferevent error message */
+ bufferevent_write (task->bev, outbuf, r);
+ }
+ else {
+ switch (task->cmd) {
+ case CMD_REPORT_IFSPAM:
+ case CMD_REPORT:
+ case CMD_CHECK:
+ return write_check_reply (task);
+ break;
+ case CMD_SYMBOLS:
+ return write_symbols_reply (task);
+ break;
+ case CMD_PROCESS:
+ return write_process_reply (task);
+ break;
+ case CMD_SKIP:
+ r = snprintf (outbuf, sizeof (outbuf), "%s 0 %s" CRLF CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER,
+ SPAMD_OK);
+ bufferevent_write (task->bev, outbuf, r);
+ break;
+ case CMD_PING:
+ r = snprintf (outbuf, sizeof (outbuf), "%s 0 PONG" CRLF CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER);
+ bufferevent_write (task->bev, outbuf, r);
+ break;
+ }
+ }
+
+ return 0;
+}
diff --git a/protocol.h b/protocol.h
index 17a7a9315..50b458de0 100644
--- a/protocol.h
+++ b/protocol.h
@@ -3,6 +3,10 @@
#include "config.h"
+#define RSPAMD_FILTER_ERROR 1
+#define RSPAMD_NETWORK_ERROR 2
+#define RSPAMD_PROTOCOL_ERROR 3
+
struct worker_task;
enum rspamd_protocol {
@@ -21,5 +25,6 @@ enum rspamd_command {
};
int read_rspamd_input_line (struct worker_task *task, char *line);
+int write_reply (struct worker_task *task);
#endif
diff --git a/test/rspamd_test_suite.c b/test/rspamd_test_suite.c
index c7c239f87..90e64b112 100644
--- a/test/rspamd_test_suite.c
+++ b/test/rspamd_test_suite.c
@@ -13,6 +13,10 @@
#include "../cfg_file.h"
#include "tests.h"
+#ifdef HAVE_STRLCPY_H
+#include "../strlcpy.c"
+#endif
+
int
main (int argc, char **argv)
{
diff --git a/url.h b/url.h
index 7a666f406..2d9acaaf5 100644
--- a/url.h
+++ b/url.h
@@ -4,7 +4,7 @@
#include <sys/types.h>
#include <sys/socket.h>
-#ifndef OWN_QUEUE_H
+#ifndef HAVE_OWN_QUEUE_H
#include <sys/queue.h>
#else
#include "queue.h"
diff --git a/worker.c b/worker.c
index 0c67fe7ce..0b1594ba7 100644
--- a/worker.c
+++ b/worker.c
@@ -201,6 +201,8 @@ read_socket (struct bufferevent *bev, void *arg)
case READ_HEADER:
s = evbuffer_readline (EVBUFFER_INPUT (bev));
if (read_rspamd_input_line (task, s) != 0) {
+ task->last_error = "Read error";
+ task->error_code = RSPAMD_NETWORK_ERROR;
task->state = WRITE_ERROR;
}
free (s);
@@ -213,6 +215,8 @@ read_socket (struct bufferevent *bev, void *arg)
if (task->msg->free == 0) {
r = process_message (task);
if (r == -1) {
+ task->last_error = "Filter processing error";
+ task->error_code = RSPAMD_FILTER_ERROR;
task->state = WRITE_ERROR;
}
else if (r == 1) {
@@ -229,15 +233,14 @@ read_socket (struct bufferevent *bev, void *arg)
break;
case WAIT_FILTER:
bufferevent_disable (bev, EV_READ);
- bufferevent_disable (bev, EV_READ);
break;
case WRITE_REPLY:
- r = bufferevent_write (bev, "Ok\r\n", sizeof ("Ok\r\n") - 1);
+ write_reply (task);
bufferevent_disable (bev, EV_READ);
bufferevent_enable (bev, EV_WRITE);
break;
case WRITE_ERROR:
- r = bufferevent_write (bev, "Error\r\n", sizeof ("Error\r\n") - 1);
+ write_reply (task);
bufferevent_disable (bev, EV_READ);
bufferevent_enable (bev, EV_WRITE);
break;
@@ -293,8 +296,6 @@ accept_socket (int fd, short what, void *arg)
bzero (new_task, sizeof (struct worker_task));
new_task->worker = worker;
new_task->state = READ_COMMAND;
- new_task->content_length = 0;
- new_task->parts_count = 0;
new_task->cfg = worker->srv->cfg;
TAILQ_INIT (&new_task->urls);
TAILQ_INIT (&new_task->parts);