summaryrefslogtreecommitdiffstats
path: root/src/plugins
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2009-07-29 20:17:29 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2009-07-29 20:17:29 +0400
commitc4621d35aebaf9c6d466bf7a15a9de340b20b0ce (patch)
tree75553d38928ccbb2b029bb7646b67ab8b0afac8f /src/plugins
parenteb9facb84cad4017db7ac877b5a0446472756308 (diff)
downloadrspamd-c4621d35aebaf9c6d466bf7a15a9de340b20b0ce.tar.gz
rspamd-c4621d35aebaf9c6d466bf7a15a9de340b20b0ce.zip
* Add support for extending controller protocol by modules
* Add write support via controller to fuzzy storage TODO: Add delete and check commands support to controller interface
Diffstat (limited to 'src/plugins')
-rw-r--r--src/plugins/fuzzy_check.c136
1 files changed, 136 insertions, 0 deletions
diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c
index 085ee5164..e4fdaba93 100644
--- a/src/plugins/fuzzy_check.c
+++ b/src/plugins/fuzzy_check.c
@@ -68,6 +68,14 @@ struct fuzzy_client_session {
struct worker_task *task;
};
+struct fuzzy_learn_session {
+ struct event ev;
+ fuzzy_hash_t *h;
+ struct timeval tv;
+ struct controller_session *session;
+ struct worker_task *task;
+};
+
static struct fuzzy_ctx *fuzzy_module_ctx = NULL;
static int fuzzy_mime_filter (struct worker_task *task);
@@ -91,6 +99,7 @@ parse_servers_string (char *str)
if ((p = strchr (strvec[i], ':')) != NULL) {
j = 0;
+ p ++;
while (g_ascii_isdigit (*p) && j < sizeof (portbuf) - 1) {
portbuf[j ++] = *p ++;
}
@@ -244,6 +253,51 @@ fuzzy_io_callback (int fd, short what, void *arg)
}
+static void
+fuzzy_learn_callback (int fd, short what, void *arg)
+{
+ struct fuzzy_learn_session *session = arg;
+ struct fuzzy_cmd cmd;
+ char buf[sizeof ("ERR")];
+
+ if (what == EV_WRITE) {
+ /* Send command to storage */
+ cmd.blocksize = session->h->block_size;
+ memcpy (cmd.hash, session->h->hash_pipe, sizeof (cmd.hash));
+ cmd.cmd = FUZZY_WRITE;
+ if (write (fd, &cmd, sizeof (struct fuzzy_cmd)) == -1) {
+ goto err;
+ }
+ else {
+ event_set (&session->ev, fd, EV_READ, fuzzy_io_callback, session);
+ event_add (&session->ev, &session->tv);
+ }
+ }
+ else if (what == EV_READ) {
+ if (read (fd, buf, sizeof (buf)) == -1) {
+ goto err;
+ }
+ else if (buf[0] == 'O' && buf[1] == 'K') {
+ insert_result (session->task, fuzzy_module_ctx->metric, fuzzy_module_ctx->symbol, 1, NULL);
+ }
+ goto ok;
+ }
+
+ return;
+
+ err:
+ msg_err ("fuzzy_io_callback: got error on IO, %d, %s", errno, strerror (errno));
+ ok:
+ event_del (&session->ev);
+ close (fd);
+ session->task->save.saved --;
+ if (session->task->save.saved == 0) {
+ /* Call other filters */
+ session->task->save.saved = 1;
+ process_filters (session->task);
+ }
+}
+
static void
fuzzy_symbol_callback (struct worker_task *task, void *unused)
{
@@ -282,6 +336,88 @@ fuzzy_symbol_callback (struct worker_task *task, void *unused)
}
}
+static void
+fuzzy_process_handler (struct controller_session *session, f_str_t *in)
+{
+ struct worker_task *task;
+ struct fuzzy_learn_session *s;
+ struct mime_text_part *part;
+ struct storage_server *selected;
+ GList *cur;
+ int sock, r;
+
+ task = construct_task (session->worker);
+ session->other_data = task;
+ session->state = STATE_WAIT;
+
+ task->msg = in;
+ r = process_message (task);
+ if (r == -1) {
+ msg_warn ("read_socket: processing of message failed");
+ task->last_error = "MIME processing error";
+ task->error_code = RSPAMD_FILTER_ERROR;
+ task->state = WRITE_ERROR;
+ }
+ else {
+ /* Plan new event for writing */
+ cur = task->text_parts;
+
+ while (cur) {
+ part = cur->data;
+ selected = (struct storage_server *)get_upstream_by_hash (fuzzy_module_ctx->servers, fuzzy_module_ctx->servers_num,
+ sizeof (struct storage_server), task->ts.tv_sec,
+ DEFAULT_UPSTREAM_ERROR_TIME, DEFAULT_UPSTREAM_DEAD_TIME,
+ DEFAULT_UPSTREAM_MAXERRORS,
+ part->fuzzy->hash_pipe, sizeof (part->fuzzy->hash_pipe));
+ if (selected) {
+ if ((sock = make_tcp_socket (&selected->addr, selected->port, FALSE, TRUE)) == -1) {
+ msg_warn ("fuzzy_symbol_callback: cannot connect to %s, %d, %s", selected->name, errno, strerror (errno));
+ }
+ else {
+ s = memory_pool_alloc (session->session_pool, sizeof (struct fuzzy_learn_session));
+ event_set (&s->ev, sock, EV_WRITE, fuzzy_learn_callback, s);
+ s->tv.tv_sec = IO_TIMEOUT;
+ s->tv.tv_usec = 0;
+ s->task = task;
+ s->h = part->fuzzy;
+ s->session = session;
+ event_add (&s->ev, &s->tv);
+ }
+ }
+ cur = g_list_next (cur);
+ }
+ }
+
+}
+
+static void
+fuzzy_controller_handler (char **args, struct controller_session *session)
+{
+ char *arg, out_buf[BUFSIZ], *err_str;
+ uint32_t size;
+ int r;
+
+ arg = *args;
+ if (!arg || *arg == '\0') {
+ msg_info ("fuzzy_controller_handler: empty content length");
+ r = snprintf (out_buf, sizeof (out_buf), "fuzzy command requires length as argument" CRLF);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
+ return;
+ }
+
+ size = strtoul (arg, &err_str, 10);
+ if (err_str && *err_str != '\0') {
+ msg_debug ("process_command: statfile size is invalid: %s", arg);
+ r = snprintf (out_buf, sizeof (out_buf), "learn size is invalid" CRLF);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
+ return;
+ }
+
+ session->state = STATE_OTHER;
+ rspamd_set_dispatcher_policy (session->dispatcher, BUFFER_CHARACTER, size);
+ session->other_handler = fuzzy_process_handler;
+}
+
static int
fuzzy_mime_filter (struct worker_task *task)
{