static GList *custom_commands = NULL;
-/* XXX: remove this legacy sometimes */
-static const gchar *
-str_action_metric_spamc (enum rspamd_metric_action action)
-{
- switch (action) {
- case METRIC_ACTION_REJECT:
- return "reject";
- case METRIC_ACTION_SOFT_REJECT:
- return "soft reject";
- case METRIC_ACTION_REWRITE_SUBJECT:
- return "rewrite subject";
- case METRIC_ACTION_ADD_HEADER:
- return "add header";
- case METRIC_ACTION_GREYLIST:
- return "greylist";
- case METRIC_ACTION_NOACTION:
- return "no action";
- case METRIC_ACTION_MAX:
- return "invalid max action";
- }
- return "unknown action";
-}
-
-static inline const gchar *
-rspamc_proto_str (guint ver)
+/*
+ * Remove <> from the fixed string and copy it to the pool
+ */
+static gchar *
+rspamd_protocol_escape_braces (GString *in)
{
+ gint len = 0;
+ gchar *orig, *p;
- if (G_LIKELY (ver == 12)) {
- return "1.2";
+ orig = in->str;
+ while ((g_ascii_isspace (*orig) || *orig == '<') && orig - in->str < (gint)in->len) {
+ orig ++;
}
- else if (G_UNLIKELY (ver == 11)) {
- return "1.1";
- }
- else if (G_UNLIKELY (ver == 13)) {
- return "1.3";
- }
- else if (G_UNLIKELY (ver == 14)) {
- return "1.4";
- }
- else if (G_UNLIKELY (ver == 15)) {
- return "1.5";
- }
- else {
- return "1.0";
- }
-}
-gchar *
-separate_command (f_str_t * in, gchar c)
-{
- guint r = 0;
- gchar *p = in->begin, *b;
- b = p;
+ g_string_erase (in, 0, orig - in->str);
- while (r < in->len) {
- if (*p == c) {
- *p = '\0';
- in->begin = p + 1;
- in->len -= r + 1;
- return b;
- }
- else if (*p == '\0') {
- /* Actually we cannot allow several \0 characters in string, so write to the log about it */
- msg_warn ("cannot separate command with \0 character, this can be an attack attempt");
- return NULL;
- }
- p++;
- r++;
+ p = orig;
+ while ((!g_ascii_isspace (*p) && *p != '>') && p - in->str < (gint)in->len) {
+ p ++;
+ len ++;
}
- return NULL;
+ g_string_truncate (in, len);
+
+ return in->str;
}
static gboolean
-parse_check_command (struct worker_task *task, gchar *token)
+rspamd_protocol_handle_url (struct worker_task *task, struct rspamd_http_message *msg)
{
GList *cur;
struct custom_command *cmd;
+ const gchar *p;
+
+ if (msg->url == NULL || msg->url->len == 0) {
+ task->last_error = "command is absent";
+ task->error_code = 400;
+ return FALSE;
+ }
+
+ if (msg->url->str[0] == '/') {
+ p = &msg->url->str[1];
+ }
+ else {
+ p = msg->url->str;
+ }
- switch (token[0]) {
+ switch (*p) {
case 'c':
case 'C':
/* check */
- if (g_ascii_strcasecmp (token + 1, MSG_CMD_CHECK + 1) == 0) {
+ if (g_ascii_strcasecmp (p + 1, MSG_CMD_CHECK + 1) == 0) {
task->cmd = CMD_CHECK;
}
else {
- debug_task ("bad command: %s", token);
- return FALSE;
+ goto err;
}
break;
case 's':
case 'S':
/* symbols, skip */
- if (g_ascii_strcasecmp (token + 1, MSG_CMD_SYMBOLS + 1) == 0) {
+ if (g_ascii_strcasecmp (p + 1, MSG_CMD_SYMBOLS + 1) == 0) {
task->cmd = CMD_SYMBOLS;
}
- else if (g_ascii_strcasecmp (token + 1, MSG_CMD_SKIP + 1) == 0) {
+ else if (g_ascii_strcasecmp (p + 1, MSG_CMD_SKIP + 1) == 0) {
task->cmd = CMD_SKIP;
}
else {
- debug_task ("bad command: %s", token);
- return FALSE;
+ goto err;
}
break;
case 'p':
case 'P':
/* ping, process */
- if (g_ascii_strcasecmp (token + 1, MSG_CMD_PING + 1) == 0) {
+ if (g_ascii_strcasecmp (p + 1, MSG_CMD_PING + 1) == 0) {
task->cmd = CMD_PING;
}
- else if (g_ascii_strcasecmp (token + 1, MSG_CMD_PROCESS + 1) == 0) {
+ else if (g_ascii_strcasecmp (p + 1, MSG_CMD_PROCESS + 1) == 0) {
task->cmd = CMD_PROCESS;
}
else {
- debug_task ("bad command: %s", token);
- return FALSE;
+ goto err;
}
break;
case 'r':
case 'R':
/* report, report_ifspam */
- if (g_ascii_strcasecmp (token + 1, MSG_CMD_REPORT + 1) == 0) {
+ if (g_ascii_strcasecmp (p + 1, MSG_CMD_REPORT + 1) == 0) {
task->cmd = CMD_REPORT;
}
- else if (g_ascii_strcasecmp (token + 1, MSG_CMD_REPORT_IFSPAM + 1) == 0) {
+ else if (g_ascii_strcasecmp (p + 1, MSG_CMD_REPORT_IFSPAM + 1) == 0) {
task->cmd = CMD_REPORT_IFSPAM;
}
else {
- debug_task ("bad command: %s", token);
- return FALSE;
- }
- break;
- case 'l':
- case 'L':
- if (g_ascii_strcasecmp (token + 1, MSG_CMD_LEARN + 1) == 0) {
- if (task->allow_learn) {
- task->cmd = CMD_LEARN;
- }
- else {
- msg_info ("learning is disabled");
- return FALSE;
- }
- }
- else {
- debug_task ("bad command: %s", token);
- return FALSE;
+ goto err;
}
break;
default:
cur = custom_commands;
while (cur) {
cmd = cur->data;
- if (g_ascii_strcasecmp (token, cmd->name) == 0) {
+ if (g_ascii_strcasecmp (p, cmd->name) == 0) {
task->cmd = CMD_OTHER;
task->custom_cmd = cmd;
break;
}
if (cur == NULL) {
- debug_task ("bad command: %s", token);
- return FALSE;
+ goto err;
}
break;
}
return TRUE;
+
+err:
+ debug_task ("bad command: %s", p);
+ task->last_error = "invalid command";
+ task->error_code = 400;
+ return FALSE;
}
static gboolean
-parse_rspamc_command (struct worker_task *task, f_str_t * line)
+rspamd_protocol_handle_headers (struct worker_task *task, struct rspamd_http_message *msg)
{
- gchar *token;
+ gchar *headern, *err, *tmp;
+ gboolean res = TRUE;
+ struct rspamd_http_header *h;
- /* Separate line */
- token = separate_command (line, ' ');
- if (line == NULL || token == NULL) {
- debug_task ("bad command");
- return FALSE;
- }
+ LL_FOREACH (msg->headers, h) {
+ headern = h->name->str;
- if (!parse_check_command (task, token)) {
- return FALSE;
- }
-
- if (g_ascii_strncasecmp (line->begin, RSPAMC_GREETING, sizeof (RSPAMC_GREETING) - 1) == 0) {
- task->proto = RSPAMC_PROTO;
- task->proto_ver = 10;
- if (*(line->begin + sizeof (RSPAMC_GREETING) - 1) == '/') {
- /* Extract protocol version */
- token = line->begin + sizeof (RSPAMC_GREETING);
- if (strncmp (token, RSPAMC_PROTO_1_1, sizeof (RSPAMC_PROTO_1_1) - 1) == 0) {
- task->proto_ver = 11;
+ switch (headern[0]) {
+ case 'd':
+ case 'D':
+ if (g_ascii_strcasecmp (headern, DELIVER_TO_HEADER) == 0) {
+ task->deliver_to = rspamd_protocol_escape_braces (h->value);
+ debug_task ("read deliver-to header, value: %s", task->deliver_to);
}
- else if (strncmp (token, RSPAMC_PROTO_1_2, sizeof (RSPAMC_PROTO_1_2) - 1) == 0) {
- task->proto_ver = 12;
+ else {
+ debug_task ("wrong header: %s", headern);
+ res = FALSE;
}
- else if (strncmp (token, RSPAMC_PROTO_1_3, sizeof (RSPAMC_PROTO_1_3) - 1) == 0) {
- task->proto_ver = 13;
+ break;
+ case 'h':
+ case 'H':
+ if (g_ascii_strcasecmp (headern, HELO_HEADER) == 0) {
+ task->helo = h->value->str;
+ debug_task ("read helo header, value: %s", task->helo);
}
- }
- }
- else if (g_ascii_strncasecmp (line->begin, SPAMC_GREETING, sizeof (SPAMC_GREETING) - 1) == 0) {
- task->proto = SPAMC_PROTO;
- task->proto_ver = 12;
- }
- else {
- return FALSE;
- }
-
- task->state = READ_HEADER;
-
- return TRUE;
-}
-
-static gboolean
-parse_http_command (struct worker_task *task, f_str_t * line)
-{
- guint8 *p, *end, *c;
- gint state = 0, next_state = 0;
- gchar *cmd;
-
- p = line->begin;
- c = p;
- end = p + line->len;
- task->proto = RSPAMC_PROTO;
-
- while (p < end) {
- switch (state) {
- case 0:
- /* Expect GET or POST here */
- if ((end - p > 3 &&
- (*p == 'G' || *p == 'g') &&
- (p[1] == 'E' || p[1] == 'e') &&
- (p[2] == 'T' || p[2] == 't')) ||
- (end - p > 4 &&
- (*p == 'P' || *p == 'p') &&
- (p[1] == 'O' || p[1] == 'o') &&
- (p[2] == 'S' || p[2] == 's') &&
- (p[3] == 'T' || p[3] == 't'))) {
- state = 99;
- next_state = 1;
- p += (*p == 'g' || *p == 'G') ? 3 : 4;
+ else if (g_ascii_strcasecmp (headern, HOSTNAME_HEADER) == 0) {
+ task->hostname = h->value->str;
+ debug_task ("read hostname header, value: %s", task->hostname);
}
else {
- msg_info ("invalid HTTP request: %V", line);
- return FALSE;
+ debug_task ("wrong header: %s", headern);
+ res = FALSE;
}
break;
- case 1:
- /* Get command or path */
- if (!g_ascii_isspace (*p)) {
- p ++;
+ case 'f':
+ case 'F':
+ if (g_ascii_strcasecmp (headern, FROM_HEADER) == 0) {
+ task->from = rspamd_protocol_escape_braces (h->value);
+ debug_task ("read from header, value: %s", task->from);
}
else {
- /* Copy command */
- cmd = memory_pool_alloc (task->task_pool, p - c + 1);
- rspamd_strlcpy (cmd, c, p - c + 1);
- /* Skip the first '/' */
- if (*cmd == '/') {
- cmd ++;
- }
- if (!parse_check_command (task, cmd)) {
- /* Assume that command is symbols */
- task->cmd = CMD_SYMBOLS;
- }
- state = 99;
- next_state = 2;
+ debug_task ("wrong header: %s", headern);
+ res = FALSE;
}
break;
- case 2:
- /* Get HTTP/1.0 or HTTP/1.1 */
- if (p == end - 1) {
- /* We are at the end */
- if (g_ascii_strncasecmp (c, "HTTP/1.0", sizeof ("HTTP/1.0") - 1) == 0 ||
- g_ascii_strncasecmp (c, "HTTP/1.1", sizeof ("HTTP/1.1") - 1) == 0) {
- task->state = READ_HEADER;
- return TRUE;
- }
+ case 'j':
+ case 'J':
+ if (g_ascii_strcasecmp (headern, JSON_HEADER) == 0) {
+ task->is_json = parse_flag (h->value->str);
}
else {
- p ++;
+ debug_task ("wrong header: %s", headern);
+ res = FALSE;
}
break;
- case 99:
- /* Skip spaces */
- if (g_ascii_isspace (*p)) {
- p ++;
+ case 'q':
+ case 'Q':
+ if (g_ascii_strcasecmp (headern, QUEUE_ID_HEADER) == 0) {
+ task->queue_id = h->value->str;
+ debug_task ("read queue_id header, value: %s", task->queue_id);
}
else {
- state = next_state;
- c = p;
+ debug_task ("wrong header: %s", headern);
+ res = FALSE;
}
break;
- }
- }
-
- return FALSE;
-}
-
-static gboolean
-parse_command (struct worker_task *task, f_str_t * line)
-{
- task->proto_ver = 11;
-
- if (! task->is_http) {
- return parse_rspamc_command (task, line);
- }
- else {
- return parse_http_command (task, line);
- }
-
- /* Unreached */
- return FALSE;
-}
-
-static gboolean
-parse_header (struct worker_task *task, f_str_t * line)
-{
- gchar *headern, *err, *tmp;
- gboolean res = TRUE;
-
- /* Check end of headers */
- if (line->len == 0) {
- debug_task ("got empty line, assume it as end of headers");
- if (task->cmd == CMD_PING || task->cmd == CMD_SKIP) {
- task->state = WRITE_REPLY;
- }
- else {
- if (task->content_length > 0) {
- if (task->cmd == CMD_LEARN) {
- if (task->statfile != NULL) {
- rspamd_set_dispatcher_policy (task->dispatcher, BUFFER_CHARACTER, task->content_length);
- task->state = READ_MESSAGE;
- }
- else {
- task->last_error = "Unknown statfile";
- task->error_code = RSPAMD_STATFILE_ERROR;
- task->state = WRITE_ERROR;
- return FALSE;
- }
- }
- else {
- rspamd_set_dispatcher_policy (task->dispatcher, BUFFER_CHARACTER, task->content_length);
- task->state = READ_MESSAGE;
- task->msg = memory_pool_alloc0 (task->task_pool, sizeof (f_str_t));
- }
+ case 'r':
+ case 'R':
+ if (g_ascii_strcasecmp (headern, RCPT_HEADER) == 0) {
+ tmp = rspamd_protocol_escape_braces (h->value);
+ task->rcpt = g_list_prepend (task->rcpt, tmp);
+ debug_task ("read rcpt header, value: %s", tmp);
}
- else if (task->cmd != CMD_LEARN && task->cmd != CMD_OTHER) {
- rspamd_set_dispatcher_policy (task->dispatcher, BUFFER_ANY, 0);
- task->state = READ_MESSAGE;
- task->msg = memory_pool_alloc0 (task->task_pool, sizeof (f_str_t));
+ else if (g_ascii_strcasecmp (headern, NRCPT_HEADER) == 0) {
+ task->nrcpt = strtoul (h->value->str, &err, 10);
+ debug_task ("read rcpt header, value: %d", (gint)task->nrcpt);
}
else {
- task->last_error = "Unknown content length";
- task->error_code = RSPAMD_LENGTH_ERROR;
- task->state = WRITE_ERROR;
- return FALSE;
+ msg_info ("wrong header: %s", headern);
+ res = FALSE;
}
- }
- return TRUE;
- }
-
- headern = separate_command (line, ':');
-
- if (line == NULL || headern == NULL) {
- return FALSE;
- }
- /* Eat whitespaces */
- g_strstrip (headern);
- fstrstrip (line);
-
- switch (headern[0]) {
- case 'c':
- case 'C':
- /* content-length */
- if (g_ascii_strncasecmp (headern, CONTENT_LENGTH_HEADER, sizeof (CONTENT_LENGTH_HEADER) - 1) == 0) {
- if (task->content_length == 0) {
- tmp = memory_pool_fstrdup (task->task_pool, line);
- task->content_length = strtoul (tmp, &err, 10);
- debug_task ("read Content-Length header, value: %ul", (guint32)task->content_length);
- }
- }
- else {
- msg_info ("wrong header: %s", headern);
- res = FALSE;
- }
- break;
- case 'd':
- case 'D':
- /* Deliver-To */
- if (g_ascii_strncasecmp (headern, DELIVER_TO_HEADER, sizeof (DELIVER_TO_HEADER) - 1) == 0) {
- task->deliver_to = escape_braces_addr_fstr (task->task_pool, line);
- debug_task ("read deliver-to header, value: %s", task->deliver_to);
- }
- else {
- msg_info ("wrong header: %s", headern);
- res = FALSE;
- }
- break;
- case 'h':
- case 'H':
- /* helo */
- if (g_ascii_strncasecmp (headern, HELO_HEADER, sizeof (HELO_HEADER) - 1) == 0) {
- task->helo = memory_pool_fstrdup (task->task_pool, line);
- debug_task ("read helo header, value: %s", task->helo);
- }
- else if (g_ascii_strncasecmp (headern, HOSTNAME_HEADER, sizeof (HOSTNAME_HEADER) - 1) == 0) {
- task->hostname = memory_pool_fstrdup (task->task_pool, line);
- debug_task ("read hostname header, value: %s", task->hostname);
- }
- else {
- msg_info ("wrong header: %s", headern);
- res = FALSE;
- }
- break;
- case 'f':
- case 'F':
- /* from */
- if (g_ascii_strncasecmp (headern, FROM_HEADER, sizeof (FROM_HEADER) - 1) == 0) {
- task->from = escape_braces_addr_fstr (task->task_pool, line);
- debug_task ("read from header, value: %s", task->from);
- }
- else {
- msg_info ("wrong header: %s", headern);
- res = FALSE;
- }
- break;
- case 'j':
- case 'J':
- /* json */
- if (g_ascii_strncasecmp (headern, JSON_HEADER, sizeof (JSON_HEADER) - 1) == 0) {
- task->is_json = parse_flag (memory_pool_fstrdup (task->task_pool, line));
- }
- else {
- msg_info ("wrong header: %s", headern);
- res = FALSE;
- }
- break;
- case 'q':
- case 'Q':
- /* Queue id */
- if (g_ascii_strncasecmp (headern, QUEUE_ID_HEADER, sizeof (QUEUE_ID_HEADER) - 1) == 0) {
- task->queue_id = memory_pool_fstrdup (task->task_pool, line);
- debug_task ("read queue_id header, value: %s", task->queue_id);
- }
- else {
- msg_info ("wrong header: %s", headern);
- res = FALSE;
- }
- break;
- case 'r':
- case 'R':
- /* rcpt */
- if (g_ascii_strncasecmp (headern, RCPT_HEADER, sizeof (RCPT_HEADER) - 1) == 0) {
- tmp = escape_braces_addr_fstr (task->task_pool, line);
- task->rcpt = g_list_prepend (task->rcpt, tmp);
- debug_task ("read rcpt header, value: %s", tmp);
- }
- else if (g_ascii_strncasecmp (headern, NRCPT_HEADER, sizeof (NRCPT_HEADER) - 1) == 0) {
- tmp = memory_pool_fstrdup (task->task_pool, line);
- task->nrcpt = strtoul (tmp, &err, 10);
- debug_task ("read rcpt header, value: %d", (gint)task->nrcpt);
- }
- else {
- msg_info ("wrong header: %s", headern);
- res = FALSE;
- }
- break;
- case 'i':
- case 'I':
- /* ip_addr */
- if (g_ascii_strncasecmp (headern, IP_ADDR_HEADER, sizeof (IP_ADDR_HEADER) - 1) == 0) {
- tmp = memory_pool_fstrdup (task->task_pool, line);
+ break;
+ case 'i':
+ case 'I':
+ if (g_ascii_strcasecmp (headern, IP_ADDR_HEADER) == 0) {
+ tmp = h->value->str;
#ifdef HAVE_INET_PTON
- if (g_ascii_strncasecmp (tmp, "IPv6:", 5) == 0) {
- if (inet_pton (AF_INET6, tmp + 6, &task->from_addr.d.in6) == 1) {
- task->from_addr.ipv6 = TRUE;
- }
- else {
- msg_err ("bad ip header: '%s'", tmp);
- return FALSE;
- }
- task->from_addr.has_addr = TRUE;
- }
- else {
- if (inet_pton (AF_INET, tmp, &task->from_addr.d.in4) != 1) {
- /* Try ipv6 */
- if (inet_pton (AF_INET6, tmp, &task->from_addr.d.in6) == 1) {
+ if (g_ascii_strncasecmp (tmp, "IPv6:", 5) == 0) {
+ if (inet_pton (AF_INET6, tmp + 6, &task->from_addr.d.in6) == 1) {
task->from_addr.ipv6 = TRUE;
}
else {
msg_err ("bad ip header: '%s'", tmp);
return FALSE;
}
+ task->from_addr.has_addr = TRUE;
}
else {
- task->from_addr.ipv6 = FALSE;
+ if (inet_pton (AF_INET, tmp, &task->from_addr.d.in4) != 1) {
+ /* Try ipv6 */
+ if (inet_pton (AF_INET6, tmp, &task->from_addr.d.in6) == 1) {
+ task->from_addr.ipv6 = TRUE;
+ }
+ else {
+ msg_err ("bad ip header: '%s'", tmp);
+ return FALSE;
+ }
+ }
+ else {
+ task->from_addr.ipv6 = FALSE;
+ }
+ task->from_addr.has_addr = TRUE;
}
- task->from_addr.has_addr = TRUE;
- }
#else
- if (!inet_aton (tmp, &task->from_addr)) {
- msg_err ("bad ip header: '%s'", tmp);
- return FALSE;
- }
+ if (!inet_aton (tmp, &task->from_addr)) {
+ msg_err ("bad ip header: '%s'", tmp);
+ return FALSE;
+ }
#endif
- debug_task ("read IP header, value: %s", tmp);
- }
- else {
- msg_info ("wrong header: %s", headern);
- res = FALSE;
- }
- break;
- case 'p':
- case 'P':
- /* Pass header */
- if (g_ascii_strncasecmp (headern, PASS_HEADER, sizeof (PASS_HEADER) - 1) == 0) {
- if (line->len == sizeof ("all") - 1 && g_ascii_strncasecmp (line->begin, "all", sizeof ("all") - 1) == 0) {
- task->pass_all_filters = TRUE;
- msg_info ("pass all filters");
- }
- }
- else {
- res = FALSE;
- }
- break;
- case 's':
- case 'S':
- if (g_ascii_strncasecmp (headern, SUBJECT_HEADER, sizeof (SUBJECT_HEADER) - 1) == 0) {
- task->subject = memory_pool_fstrdup (task->task_pool, line);
- }
- else if (g_ascii_strncasecmp (headern, STATFILE_HEADER, sizeof (STATFILE_HEADER) - 1) == 0) {
- task->statfile = memory_pool_fstrdup (task->task_pool, line);
- }
- else {
- res = FALSE;
- }
- break;
- case 'u':
- case 'U':
- if (g_ascii_strncasecmp (headern, USER_HEADER, sizeof (USER_HEADER) - 1) == 0) {
- task->user = memory_pool_fstrdup (task->task_pool, line);
- }
- else {
+ debug_task ("read IP header, value: %s", tmp);
+ }
+ else {
+ debug_task ("wrong header: %s", headern);
+ res = FALSE;
+ }
+ break;
+ case 'p':
+ case 'P':
+ if (g_ascii_strcasecmp (headern, PASS_HEADER) == 0) {
+ if (h->value->len == sizeof ("all") - 1 &&
+ g_ascii_strcasecmp (h->value->str, "all") == 0) {
+ task->pass_all_filters = TRUE;
+ debug_task ("pass all filters");
+ }
+ }
+ else {
+ res = FALSE;
+ }
+ break;
+ case 's':
+ case 'S':
+ if (g_ascii_strcasecmp (headern, SUBJECT_HEADER) == 0) {
+ task->subject = h->value->str;
+ }
+ else {
+ res = FALSE;
+ }
+ break;
+ case 'u':
+ case 'U':
+ if (g_ascii_strcasecmp (headern, USER_HEADER) == 0) {
+ task->user = h->value->str;
+ }
+ else {
+ res = FALSE;
+ }
+ break;
+ default:
+ debug_task ("wrong header: %s", headern);
res = FALSE;
+ break;
}
- break;
- default:
- msg_info ("wrong header: %s", headern);
- res = FALSE;
- break;
}
if (!res && task->cfg->strict_protocol_headers) {
msg_err ("deny processing of a request with incorrect or unknown headers");
+ task->last_error = "invalid header";
+ task->error_code = 400;
return FALSE;
}
}
gboolean
-read_rspamd_input_line (struct worker_task *task, f_str_t * line)
+rspamd_protocol_handle_request (struct worker_task *task,
+ struct rspamd_http_message *msg)
{
- switch (task->state) {
- case READ_COMMAND:
- return parse_command (task, line);
- break;
- case READ_HEADER:
- return parse_header (task, line);
- break;
- default:
- return FALSE;
+ if (rspamd_protocol_handle_url (task, msg)) {
+ return rspamd_protocol_handle_headers (task, msg);
}
+
return FALSE;
}
if (logbuf->str[logbuf->len - 1] == ',') {
logbuf->len --;
}
- rspamd_printf_gstring (logbuf, "]), ");
+
+#ifdef HAVE_CLOCK_GETTIME
+ rspamd_printf_gstring (logbuf, "]), len: %z, time: %s, dns req: %d,",
+ task->msg->len, calculate_check_time (&task->tv, &task->ts,
+ task->cfg->clock_res, &task->scan_milliseconds), task->dns_requests);
+#else
+ rspamd_printf_gstring (logbuf, "]), len: %z, time: %s, dns req: %d,",
+ task->msg->len,
+ calculate_check_time (&task->tv, task->cfg->clock_res, &task->scan_milliseconds),
+ task->dns_requests);
+#endif
return obj;
}
write_hashes_to_log (task, logbuf);
msg_info ("%v", logbuf);
+ g_string_free (logbuf, TRUE);
msg->body = g_string_sized_new (BUFSIZ);
func.ud = msg->body;
ucl_object_emit_full (top, UCL_EMIT_JSON_COMPACT, &func);
+ ucl_object_unref (top);
/* Increase counters */
task->worker->srv->stat->messages_scanned++;
}
gboolean
-write_reply (struct worker_task *task)
+rspamd_protocol_write_reply (struct worker_task *task)
{
struct rspamd_http_message *msg;
"text/plain", task, task->sock, &task->tv, task->ev_base);
task->state = CLOSING_CONNECTION;
break;
- case CMD_LEARN:
- msg->code = task->error_code;
- rspamd_http_connection_write_message (task->http_conn, msg, NULL,
- "text/plain", task, task->sock, &task->tv, task->ev_base);
- task->state = CLOSING_CONNECTION;
- return TRUE;
- break;
case CMD_OTHER:
task->state = CLOSING_CONNECTION;
return task->custom_cmd->func (task);
struct event_base *ev_base;
};
-static gboolean write_socket (void *arg);
-
static sig_atomic_t wanna_die = 0;
#ifndef HAVE_SA_SIGINFO
return;
}
-# if 0
-/*
- * Callback that is called when there is data to read in buffer
- */
-static gboolean
-read_socket (f_str_t * in, void *arg)
-{
- struct worker_task *task = (struct worker_task *) arg;
- struct rspamd_worker_ctx *ctx;
- ssize_t r;
- GError *err = NULL;
-
- ctx = task->worker->ctx;
- switch (task->state) {
- case READ_COMMAND:
- case READ_HEADER:
- if (!read_rspamd_input_line (task, in)) {
- if (!task->last_error) {
- task->last_error = "Read error";
- task->error_code = RSPAMD_NETWORK_ERROR;
- }
- task->state = WRITE_ERROR;
- }
- if (task->state == WRITE_REPLY || task->state == WRITE_ERROR) {
- return write_socket (task);
- }
- break;
- case READ_MESSAGE:
- /* Allow half-closed connections to be proceed */
-
- debug_task ("got string of length %z", task->msg->len);
- if (task->content_length > 0) {
- task->msg->begin = in->begin;
- task->msg->len = in->len;
- task->state = WAIT_FILTER;
- task->dispatcher->want_read = FALSE;
- }
- else {
- task->dispatcher->want_read = FALSE;
- if (in->len > 0) {
- if (task->msg->begin == NULL) {
- /* Allocate buf */
- task->msg->size = MAX (BUFSIZ, in->len);
- task->msg->begin = g_malloc (task->msg->size);
- memcpy (task->msg->begin, in->begin, in->len);
- task->msg->len = in->len;
- }
- else if (task->msg->size >= task->msg->len + in->len) {
- memcpy (task->msg->begin + task->msg->len, in->begin, in->len);
- task->msg->len += in->len;
- }
- else {
- /* Need to realloc */
- task->msg->size = MAX (task->msg->size * 2, task->msg->size + in->len);
- task->msg->begin = g_realloc (task->msg->begin, task->msg->size);
- memcpy (task->msg->begin + task->msg->len, in->begin, in->len);
- task->msg->len += in->len;
- }
- /* Want more */
- return TRUE;
- }
- else if (task->msg->len > 0) {
- memory_pool_add_destructor (task->task_pool, (pool_destruct_func)g_free, task->msg->begin);
- }
- else {
- msg_warn ("empty message passed");
- task->last_error = "MIME processing error";
- task->error_code = RSPAMD_FILTER_ERROR;
- task->state = WRITE_ERROR;
- return write_socket (task);
- }
- }
-
- r = process_message (task);
- if (r == -1) {
- msg_warn ("processing of message failed");
- task->last_error = "MIME processing error";
- task->error_code = RSPAMD_FILTER_ERROR;
- task->state = WRITE_ERROR;
- return write_socket (task);
- }
- if (task->cmd == CMD_OTHER) {
- /* Skip filters */
- task->state = WRITE_REPLY;
- return write_socket (task);
- }
- else if (task->cmd == CMD_LEARN) {
- if (!learn_task (task->statfile, task, &err)) {
- task->last_error = memory_pool_strdup (task->task_pool, err->message);
- task->error_code = err->code;
- g_error_free (err);
- task->state = WRITE_ERROR;
- }
- else {
- task->last_error = "learn ok";
- task->error_code = 0;
- task->state = WRITE_REPLY;
- }
- return write_socket (task);
- }
- else {
- if (task->cfg->pre_filters == NULL) {
- r = process_filters (task);
- if (r == -1) {
- task->last_error = "Filter processing error";
- task->error_code = RSPAMD_FILTER_ERROR;
- task->state = WRITE_ERROR;
- return write_socket (task);
- }
- /* Add task to classify to classify pool */
- if (!task->is_skipped && ctx->classify_pool) {
- register_async_thread (task->s);
- g_thread_pool_push (ctx->classify_pool, task, &err);
- if (err != NULL) {
- msg_err ("cannot pull task to the pool: %s", err->message);
- remove_async_thread (task->s);
- }
- }
- if (task->is_skipped) {
- /* Call write_socket to write reply and exit */
- return write_socket (task);
- }
- }
- else {
- lua_call_pre_filters (task);
- /* We want fin_task after pre filters are processed */
- task->s->wanna_die = TRUE;
- task->state = WAIT_PRE_FILTER;
- check_session_pending (task->s);
- }
- }
- break;
- case WRITE_REPLY:
- case WRITE_ERROR:
- return write_socket (task);
- break;
- case WAIT_FILTER:
- case WAIT_POST_FILTER:
- case WAIT_PRE_FILTER:
- msg_info ("ignoring trailing garbadge of size %z", in->len);
- break;
- default:
- debug_task ("invalid state on reading stage");
- break;
- }
-
- return TRUE;
-}
-
-/*
- * Callback for socket writing
- */
-static gboolean
-write_socket (void *arg)
-{
- struct worker_task *task = (struct worker_task *) arg;
- struct rspamd_worker_ctx *ctx;
- GError *err = NULL;
- gint r;
-
- ctx = task->worker->ctx;
-
- switch (task->state) {
- case WRITE_REPLY:
- task->state = WRITING_REPLY;
- if (!write_reply (task)) {
- return FALSE;
- }
- destroy_session (task->s);
- return FALSE;
- break;
- case WRITE_ERROR:
- task->state = WRITING_REPLY;
- if (!write_reply (task)) {
- return FALSE;
- }
- destroy_session (task->s);
- return FALSE;
- break;
- case CLOSING_CONNECTION:
- debug_task ("normally closing connection");
- destroy_session (task->s);
- return FALSE;
- break;
- case WRITING_REPLY:
- case WAIT_FILTER:
- case WAIT_POST_FILTER:
- /* Do nothing here */
- break;
- case WAIT_PRE_FILTER:
- task->state = WAIT_FILTER;
- r = process_filters (task);
- if (r == -1) {
- task->last_error = "Filter processing error";
- task->error_code = RSPAMD_FILTER_ERROR;
- task->state = WRITE_ERROR;
- return write_socket (task);
- }
- /* Add task to classify to classify pool */
- if (!task->is_skipped && ctx->classify_pool) {
- register_async_thread (task->s);
- g_thread_pool_push (ctx->classify_pool, task, &err);
- if (err != NULL) {
- msg_err ("cannot pull task to the pool: %s", err->message);
- remove_async_thread (task->s);
- }
- }
- if (task->is_skipped) {
- /* Call write_socket again to write reply and exit */
- return write_socket (task);
- }
- break;
- default:
- msg_info ("abnormally closing connection at state: %d", task->state);
- destroy_session (task->s);
- return FALSE;
- break;
- }
- return TRUE;
-}
-
-/*
- * Called if something goes wrong
- */
-static void
-err_socket (GError * err, void *arg)
-{
- struct worker_task *task = (struct worker_task *) arg;
-
- msg_info ("abnormally closing connection from: %s, error: %s", inet_ntoa (task->client_addr), err->message);
- /* Free buffers */
- g_error_free (err);
- destroy_session (task->s);
-}
-#endif
-
/*
* Called if all filters are processed
*/
task->fin_callback (task->fin_arg);
}
else {
- write_reply (task);
+ rspamd_protocol_write_reply (task);
}
return TRUE;
}
task->fin_callback (task->fin_arg);
}
else {
- write_reply (task);
+ rspamd_protocol_write_reply (task);
}
}
else {
task->fin_callback (task->fin_arg);
}
else {
- write_reply (task);
+ rspamd_protocol_write_reply (task);
}
}
else {
task->last_error = "Filter processing error";
task->error_code = RSPAMD_FILTER_ERROR;
task->state = WRITE_ERROR;
- write_reply (task);
+ rspamd_protocol_write_reply (task);
}
/* Add task to classify to classify pool */
if (!task->is_skipped && ctx->classify_pool) {
}
}
if (task->is_skipped) {
- write_reply (task);
+ rspamd_protocol_write_reply (task);
}
}
}
(*tasks) --;
}
-static gboolean
+static gint
rspamd_worker_body_handler (struct rspamd_http_connection *conn,
struct rspamd_http_message *msg,
const gchar *chunk, gsize len)
if (msg->body->len == 0) {
msg_err ("got zero length body, cannot continue");
- return FALSE;
+ return 0;
+ }
+
+ if (!rspamd_protocol_handle_request (task, msg)) {
+ task->state = WRITE_ERROR;
+ return 0;
}
task->msg = msg->body;
task->last_error = "MIME processing error";
task->error_code = RSPAMD_FILTER_ERROR;
task->state = WRITE_ERROR;
- return FALSE;
+ return 0;
}
if (task->cmd == CMD_OTHER) {
/* Skip filters */
task->state = WRITE_REPLY;
- return FALSE;
- }
- else if (task->cmd == CMD_LEARN) {
- if (!learn_task (task->statfile, task, &err)) {
- task->last_error = memory_pool_strdup (task->task_pool, err->message);
- task->error_code = err->code;
- g_error_free (err);
- task->state = WRITE_ERROR;
- }
- else {
- task->last_error = "learn ok";
- task->error_code = 0;
- task->state = WRITE_REPLY;
- }
- return FALSE;
+ return 0;
}
else {
if (task->cfg->pre_filters == NULL) {
task->last_error = "Filter processing error";
task->error_code = RSPAMD_FILTER_ERROR;
task->state = WRITE_ERROR;
- return FALSE;
+ return 0;
}
/* Add task to classify to classify pool */
if (!task->is_skipped && ctx->classify_pool) {
}
if (task->is_skipped) {
/* Call write_socket to write reply and exit */
- return TRUE;
+ task->state = WRITE_REPLY;
+ return 0;
}
}
else {
check_session_pending (task->s);
}
}
- return TRUE;
+ return 0;
}
static void
msg_debug ("normally closing connection from: %s", inet_ntoa (task->client_addr));
destroy_session (task->s);
}
+ else {
+ check_session_pending (task->s);
+ }
}
/*
new_task->sock = nfd;
new_task->is_mime = ctx->is_mime;
new_task->is_json = ctx->is_json;
- new_task->is_http = ctx->is_http;
new_task->allow_learn = ctx->allow_learn;
worker->srv->stat->connections_count++;
event_base_loop (ctx->ev_base, 0);
-
+ g_mime_shutdown ();
close_log (rspamd_main->logger);
exit (EXIT_SUCCESS);
}