aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/controller.c78
-rw-r--r--src/lmtp.c8
-rw-r--r--src/main.h23
-rw-r--r--src/plugins/fuzzy_check.c136
-rw-r--r--src/worker.c107
5 files changed, 289 insertions, 63 deletions
diff --git a/src/controller.c b/src/controller.c
index 87e7f1b0f..a4c0a7bb4 100644
--- a/src/controller.c
+++ b/src/controller.c
@@ -53,22 +53,31 @@ enum command_type {
struct controller_command {
char *command;
- int privilleged;
+ gboolean privilleged;
enum command_type type;
};
+struct custom_controller_command {
+ const char *command;
+ gboolean privilleged;
+ gboolean require_message;
+ controller_func_t handler;
+};
+
static struct controller_command commands[] = {
- {"password", 0, COMMAND_PASSWORD},
- {"quit", 0, COMMAND_QUIT},
- {"reload", 1, COMMAND_RELOAD},
- {"stat", 0, COMMAND_STAT},
- {"shutdown", 1, COMMAND_SHUTDOWN},
- {"uptime", 0, COMMAND_UPTIME},
- {"learn", 1, COMMAND_LEARN},
- {"help", 0, COMMAND_HELP},
- {"counters", 0, COMMAND_COUNTERS},
+ {"password", FALSE, COMMAND_PASSWORD},
+ {"quit", FALSE, COMMAND_QUIT},
+ {"reload", TRUE, COMMAND_RELOAD},
+ {"stat", FALSE, COMMAND_STAT},
+ {"shutdown", TRUE, COMMAND_SHUTDOWN},
+ {"uptime", FALSE, COMMAND_UPTIME},
+ {"learn", TRUE, COMMAND_LEARN},
+ {"help", FALSE, COMMAND_HELP},
+ {"counters", FALSE, COMMAND_COUNTERS},
};
+static GList *custom_commands = NULL;
+
static GCompletion *comp;
static time_t start_time;
@@ -395,6 +404,26 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
}
}
+static gboolean
+process_custom_command (char *line, char **cmd_args, struct controller_session *session)
+{
+ GList *cur;
+ struct custom_controller_command *cmd;
+
+ cur = custom_commands;
+ while (cur) {
+ cmd = cur->data;
+ if (g_ascii_strcasecmp (cmd->command, line) == 0) {
+ /* Call handler */
+ cmd->handler (cmd_args, session);
+ return TRUE;
+ }
+ cur = g_list_next (cur);
+ }
+
+ return FALSE;
+}
+
static void
controller_read_socket (f_str_t *in, void *arg)
{
@@ -421,9 +450,11 @@ controller_read_socket (f_str_t *in, void *arg)
process_command ((struct controller_command *)comp_list->data, &params[1], session);
break;
case 0:
- msg_debug ("Unknown command: '%s'", cmd);
- i = snprintf (out_buf, sizeof (out_buf), "Unknown command" CRLF);
- rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE);
+ if (!process_custom_command (cmd, &params[1], session)) {
+ msg_debug ("Unknown command: '%s'", cmd);
+ i = snprintf (out_buf, sizeof (out_buf), "Unknown command" CRLF);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE);
+ }
break;
default:
msg_debug ("Ambigious command: '%s'", cmd);
@@ -472,6 +503,15 @@ controller_read_socket (f_str_t *in, void *arg)
session->state = STATE_REPLY;
break;
+ case STATE_OTHER:
+ if (session->other_handler) {
+ session->other_handler (session, in);
+ }
+ session->state = STATE_REPLY;
+ break;
+ case STATE_WAIT:
+ rspamd_dispatcher_pause (session->dispatcher);
+ break;
default:
msg_debug ("controller_read_socket: unknown state while reading %d", session->state);
break;
@@ -595,6 +635,18 @@ start_controller (struct rspamd_worker *worker)
exit (EXIT_SUCCESS);
}
+void
+register_custom_controller_command (const char *name, controller_func_t handler, gboolean privilleged, gboolean require_message)
+{
+ struct custom_controller_command *cmd;
+
+ cmd->command = name;
+ cmd->handler = handler;
+ cmd->privilleged = privilleged;
+ cmd->require_message = require_message;
+
+ custom_commands = g_list_prepend (custom_commands, cmd);
+}
/*
* vi:ts=4
diff --git a/src/lmtp.c b/src/lmtp.c
index d1928cf5c..6554ba4f4 100644
--- a/src/lmtp.c
+++ b/src/lmtp.c
@@ -85,13 +85,13 @@ rcpt_destruct (void *pointer)
* Free all structures of lmtp proto
*/
static void
-free_task (struct rspamd_lmtp_proto *lmtp, gboolean is_soft)
+free_lmtp_task (struct rspamd_lmtp_proto *lmtp, gboolean is_soft)
{
GList *part;
struct mime_part *p;
if (lmtp) {
- msg_debug ("free_task: free pointer %p", lmtp->task);
+ msg_debug ("free_lmtp_task: free pointer %p", lmtp->task);
if (lmtp->task->memc_ctx) {
memc_close_ctx (lmtp->task->memc_ctx);
}
@@ -188,7 +188,7 @@ lmtp_write_socket (void *arg)
break;
case CLOSING_CONNECTION:
msg_debug ("lmtp_write_socket: normally closing connection");
- free_task (lmtp, TRUE);
+ free_lmtp_task (lmtp, TRUE);
break;
default:
msg_debug ("lmtp_write_socket: invalid state while writing to socket %d", lmtp->task->state);
@@ -205,7 +205,7 @@ lmtp_err_socket (GError *err, void *arg)
struct rspamd_lmtp_proto *lmtp = (struct rspamd_lmtp_proto *)arg;
msg_info ("lmtp_err_socket: abnormally closing connection, error: %s", err->message);
/* Free buffers */
- free_task (lmtp, FALSE);
+ free_lmtp_task (lmtp, FALSE);
}
/*
diff --git a/src/main.h b/src/main.h
index e87b4efde..ee34a2cae 100644
--- a/src/main.h
+++ b/src/main.h
@@ -117,6 +117,8 @@ struct save_point {
unsigned int saved; /**< how much time we have delayed processing */
};
+
+
/**
* Control session object
*/
@@ -127,6 +129,8 @@ struct controller_session {
STATE_LEARN,
STATE_REPLY,
STATE_QUIT,
+ STATE_OTHER,
+ STATE_WAIT,
} state; /**< current session state */
int sock; /**< socket descriptor */
/* Access to authorized commands */
@@ -142,8 +146,13 @@ struct controller_session {
f_str_t *learn_buf; /**< learn input */
GList *parts; /**< extracted mime parts */
int in_class; /**< positive or negative learn */
+ void (*other_handler)(struct controller_session *session,
+ f_str_t *in); /**< other command handler to execute at the end of processing */
+ void *other_data; /**< and its data */
};
+typedef void (*controller_func_t)(char **args, struct controller_session *session);
+
/**
* Worker task structure
*/
@@ -214,6 +223,20 @@ void start_worker (struct rspamd_worker *worker);
void start_controller (struct rspamd_worker *worker);
/**
+ * Register custom controller function
+ */
+void register_custom_controller_command (const char *name, controller_func_t handler, gboolean privilleged, gboolean require_message);
+
+/**
+ * Construct new task for worker
+ */
+struct worker_task* construct_task (struct rspamd_worker *worker);
+/**
+ * Destroy task object and remove its IO dispatcher if it exists
+ */
+void free_task (struct worker_task *task, gboolean is_soft);
+
+/**
* If set, reopen log file on next write
*/
extern sig_atomic_t do_reopen_log;
diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c
index 085ee5164..e4fdaba93 100644
--- a/src/plugins/fuzzy_check.c
+++ b/src/plugins/fuzzy_check.c
@@ -68,6 +68,14 @@ struct fuzzy_client_session {
struct worker_task *task;
};
+struct fuzzy_learn_session {
+ struct event ev;
+ fuzzy_hash_t *h;
+ struct timeval tv;
+ struct controller_session *session;
+ struct worker_task *task;
+};
+
static struct fuzzy_ctx *fuzzy_module_ctx = NULL;
static int fuzzy_mime_filter (struct worker_task *task);
@@ -91,6 +99,7 @@ parse_servers_string (char *str)
if ((p = strchr (strvec[i], ':')) != NULL) {
j = 0;
+ p ++;
while (g_ascii_isdigit (*p) && j < sizeof (portbuf) - 1) {
portbuf[j ++] = *p ++;
}
@@ -244,6 +253,51 @@ fuzzy_io_callback (int fd, short what, void *arg)
}
+static void
+fuzzy_learn_callback (int fd, short what, void *arg)
+{
+ struct fuzzy_learn_session *session = arg;
+ struct fuzzy_cmd cmd;
+ char buf[sizeof ("ERR")];
+
+ if (what == EV_WRITE) {
+ /* Send command to storage */
+ cmd.blocksize = session->h->block_size;
+ memcpy (cmd.hash, session->h->hash_pipe, sizeof (cmd.hash));
+ cmd.cmd = FUZZY_WRITE;
+ if (write (fd, &cmd, sizeof (struct fuzzy_cmd)) == -1) {
+ goto err;
+ }
+ else {
+ event_set (&session->ev, fd, EV_READ, fuzzy_io_callback, session);
+ event_add (&session->ev, &session->tv);
+ }
+ }
+ else if (what == EV_READ) {
+ if (read (fd, buf, sizeof (buf)) == -1) {
+ goto err;
+ }
+ else if (buf[0] == 'O' && buf[1] == 'K') {
+ insert_result (session->task, fuzzy_module_ctx->metric, fuzzy_module_ctx->symbol, 1, NULL);
+ }
+ goto ok;
+ }
+
+ return;
+
+ err:
+ msg_err ("fuzzy_io_callback: got error on IO, %d, %s", errno, strerror (errno));
+ ok:
+ event_del (&session->ev);
+ close (fd);
+ session->task->save.saved --;
+ if (session->task->save.saved == 0) {
+ /* Call other filters */
+ session->task->save.saved = 1;
+ process_filters (session->task);
+ }
+}
+
static void
fuzzy_symbol_callback (struct worker_task *task, void *unused)
{
@@ -282,6 +336,88 @@ fuzzy_symbol_callback (struct worker_task *task, void *unused)
}
}
+static void
+fuzzy_process_handler (struct controller_session *session, f_str_t *in)
+{
+ struct worker_task *task;
+ struct fuzzy_learn_session *s;
+ struct mime_text_part *part;
+ struct storage_server *selected;
+ GList *cur;
+ int sock, r;
+
+ task = construct_task (session->worker);
+ session->other_data = task;
+ session->state = STATE_WAIT;
+
+ task->msg = in;
+ r = process_message (task);
+ if (r == -1) {
+ msg_warn ("read_socket: processing of message failed");
+ task->last_error = "MIME processing error";
+ task->error_code = RSPAMD_FILTER_ERROR;
+ task->state = WRITE_ERROR;
+ }
+ else {
+ /* Plan new event for writing */
+ cur = task->text_parts;
+
+ while (cur) {
+ part = cur->data;
+ selected = (struct storage_server *)get_upstream_by_hash (fuzzy_module_ctx->servers, fuzzy_module_ctx->servers_num,
+ sizeof (struct storage_server), task->ts.tv_sec,
+ DEFAULT_UPSTREAM_ERROR_TIME, DEFAULT_UPSTREAM_DEAD_TIME,
+ DEFAULT_UPSTREAM_MAXERRORS,
+ part->fuzzy->hash_pipe, sizeof (part->fuzzy->hash_pipe));
+ if (selected) {
+ if ((sock = make_tcp_socket (&selected->addr, selected->port, FALSE, TRUE)) == -1) {
+ msg_warn ("fuzzy_symbol_callback: cannot connect to %s, %d, %s", selected->name, errno, strerror (errno));
+ }
+ else {
+ s = memory_pool_alloc (session->session_pool, sizeof (struct fuzzy_learn_session));
+ event_set (&s->ev, sock, EV_WRITE, fuzzy_learn_callback, s);
+ s->tv.tv_sec = IO_TIMEOUT;
+ s->tv.tv_usec = 0;
+ s->task = task;
+ s->h = part->fuzzy;
+ s->session = session;
+ event_add (&s->ev, &s->tv);
+ }
+ }
+ cur = g_list_next (cur);
+ }
+ }
+
+}
+
+static void
+fuzzy_controller_handler (char **args, struct controller_session *session)
+{
+ char *arg, out_buf[BUFSIZ], *err_str;
+ uint32_t size;
+ int r;
+
+ arg = *args;
+ if (!arg || *arg == '\0') {
+ msg_info ("fuzzy_controller_handler: empty content length");
+ r = snprintf (out_buf, sizeof (out_buf), "fuzzy command requires length as argument" CRLF);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
+ return;
+ }
+
+ size = strtoul (arg, &err_str, 10);
+ if (err_str && *err_str != '\0') {
+ msg_debug ("process_command: statfile size is invalid: %s", arg);
+ r = snprintf (out_buf, sizeof (out_buf), "learn size is invalid" CRLF);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
+ return;
+ }
+
+ session->state = STATE_OTHER;
+ rspamd_set_dispatcher_policy (session->dispatcher, BUFFER_CHARACTER, size);
+ session->other_handler = fuzzy_process_handler;
+}
+
static int
fuzzy_mime_filter (struct worker_task *task)
{
diff --git a/src/worker.c b/src/worker.c
index 726f7fe00..4832c71fc 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -4,11 +4,11 @@
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in the
- * documentation and/or other materials provided with the distribution.
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY Rambler media ''AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
@@ -40,8 +40,8 @@
#include <evdns.h>
#ifndef WITHOUT_PERL
-#include <EXTERN.h> /* from the Perl distribution */
-#include <perl.h> /* from the Perl distribution */
+#include <EXTERN.h> /* from the Perl distribution */
+#include <perl.h> /* from the Perl distribution */
extern PerlInterpreter *perl_interpreter;
#endif
@@ -111,7 +111,7 @@ rcpt_destruct (void *pointer)
/*
* Free all structures of worker_task
*/
-static void
+void
free_task (struct worker_task *task, gboolean is_soft)
{
GList *part;
@@ -135,14 +135,18 @@ free_task (struct worker_task *task, gboolean is_soft)
g_list_free (task->urls);
}
memory_pool_delete (task->task_pool);
- if (is_soft) {
- /* Plan dispatcher shutdown */
- task->dispatcher->wanna_die = 1;
+ if (task->dispatcher) {
+ if (is_soft) {
+ /* Plan dispatcher shutdown */
+ task->dispatcher->wanna_die = 1;
+ }
+ else {
+ rspamd_remove_dispatcher (task->dispatcher);
+ }
}
- else {
- rspamd_remove_dispatcher (task->dispatcher);
+ if (task->sock != -1) {
+ close (task->sock);
}
- close (task->sock);
g_free (task);
}
}
@@ -174,13 +178,13 @@ read_socket (f_str_t *in, void *arg)
task->msg->len = in->len;
msg_debug ("read_socket: got string of length %ld", (long int)task->msg->len);
r = process_message (task);
- if (r == -1) {
- msg_warn ("read_socket: processing of message failed");
+ if (r == -1) {
+ msg_warn ("read_socket: processing of message failed");
task->last_error = "MIME processing error";
task->error_code = RSPAMD_FILTER_ERROR;
task->state = WRITE_ERROR;
write_socket (task);
- }
+ }
if (task->cmd == CMD_OTHER) {
/* Skip filters */
task->state = WRITE_REPLY;
@@ -249,36 +253,10 @@ err_socket (GError *err, void *arg)
free_task (task, FALSE);
}
-/*
- * Accept new connection and construct task
- */
-static void
-accept_socket (int fd, short what, void *arg)
+struct worker_task *
+construct_task (struct rspamd_worker *worker)
{
- struct rspamd_worker *worker = (struct rspamd_worker *)arg;
- struct sockaddr_storage ss;
- struct sockaddr_in *sin;
struct worker_task *new_task;
- socklen_t addrlen = sizeof(ss);
- int nfd;
-
- if ((nfd = accept_from_socket (fd, (struct sockaddr *)&ss, &addrlen)) == -1) {
- msg_warn ("accept_socket: accept failed: %s", strerror (errno));
- return;
- }
- /* Check for EAGAIN */
- if (nfd == 0) {
- msg_debug ("accept_socket: cannot accept socket as it was already accepted by other worker");
- return;
- }
-
- if (ss.ss_family == AF_UNIX) {
- msg_info ("accept_socket: accepted connection from unix socket");
- }
- else if (ss.ss_family == AF_INET) {
- sin = (struct sockaddr_in *) &ss;
- msg_info ("accept_socket: accepted connection from %s port %d", inet_ntoa (sin->sin_addr), ntohs (sin->sin_port));
- }
new_task = g_malloc (sizeof (struct worker_task));
@@ -286,7 +264,6 @@ accept_socket (int fd, short what, void *arg)
bzero (new_task, sizeof (struct worker_task));
new_task->worker = worker;
new_task->state = READ_COMMAND;
- new_task->sock = nfd;
new_task->cfg = worker->srv->cfg;
new_task->from_addr.s_addr = INADDR_NONE;
new_task->view_checked = FALSE;
@@ -307,12 +284,50 @@ accept_socket (int fd, short what, void *arg)
new_task->re_cache = g_hash_table_new (g_str_hash, g_str_equal);
memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)g_hash_table_destroy, new_task->re_cache);
+ return new_task;
+}
+
+/*
+ * Accept new connection and construct task
+ */
+static void
+accept_socket (int fd, short what, void *arg)
+{
+ struct rspamd_worker *worker = (struct rspamd_worker *)arg;
+ struct sockaddr_storage ss;
+ struct sockaddr_in *sin;
+ struct worker_task *new_task;
+ socklen_t addrlen = sizeof(ss);
+ int nfd;
+
+ if ((nfd = accept_from_socket (fd, (struct sockaddr *)&ss, &addrlen)) == -1) {
+ msg_warn ("accept_socket: accept failed: %s", strerror (errno));
+ return;
+ }
+ /* Check for EAGAIN */
+ if (nfd == 0) {
+ msg_debug ("accept_socket: cannot accept socket as it was already accepted by other worker");
+ return;
+ }
+
+ if (ss.ss_family == AF_UNIX) {
+ msg_info ("accept_socket: accepted connection from unix socket");
+ }
+ else if (ss.ss_family == AF_INET) {
+ sin = (struct sockaddr_in *) &ss;
+ msg_info ("accept_socket: accepted connection from %s port %d", inet_ntoa (sin->sin_addr), ntohs (sin->sin_port));
+ }
+
+ new_task = construct_task (worker);
+
+ new_task->sock = nfd;
worker->srv->stat->connections_count ++;
/* Set up dispatcher */
new_task->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, read_socket,
write_socket, err_socket, &io_tv,
(void *)new_task);
+
}
/*