diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2009-07-29 20:17:29 +0400 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2009-07-29 20:17:29 +0400 |
commit | c4621d35aebaf9c6d466bf7a15a9de340b20b0ce (patch) | |
tree | 75553d38928ccbb2b029bb7646b67ab8b0afac8f /src/plugins | |
parent | eb9facb84cad4017db7ac877b5a0446472756308 (diff) | |
download | rspamd-c4621d35aebaf9c6d466bf7a15a9de340b20b0ce.tar.gz rspamd-c4621d35aebaf9c6d466bf7a15a9de340b20b0ce.zip |
* Add support for extending controller protocol by modules
* Add write support via controller to fuzzy storage
TODO: Add delete and check commands support to controller interface
Diffstat (limited to 'src/plugins')
-rw-r--r-- | src/plugins/fuzzy_check.c | 136 |
1 files changed, 136 insertions, 0 deletions
diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c index 085ee5164..e4fdaba93 100644 --- a/src/plugins/fuzzy_check.c +++ b/src/plugins/fuzzy_check.c @@ -68,6 +68,14 @@ struct fuzzy_client_session { struct worker_task *task; }; +struct fuzzy_learn_session { + struct event ev; + fuzzy_hash_t *h; + struct timeval tv; + struct controller_session *session; + struct worker_task *task; +}; + static struct fuzzy_ctx *fuzzy_module_ctx = NULL; static int fuzzy_mime_filter (struct worker_task *task); @@ -91,6 +99,7 @@ parse_servers_string (char *str) if ((p = strchr (strvec[i], ':')) != NULL) { j = 0; + p ++; while (g_ascii_isdigit (*p) && j < sizeof (portbuf) - 1) { portbuf[j ++] = *p ++; } @@ -244,6 +253,51 @@ fuzzy_io_callback (int fd, short what, void *arg) } +static void +fuzzy_learn_callback (int fd, short what, void *arg) +{ + struct fuzzy_learn_session *session = arg; + struct fuzzy_cmd cmd; + char buf[sizeof ("ERR")]; + + if (what == EV_WRITE) { + /* Send command to storage */ + cmd.blocksize = session->h->block_size; + memcpy (cmd.hash, session->h->hash_pipe, sizeof (cmd.hash)); + cmd.cmd = FUZZY_WRITE; + if (write (fd, &cmd, sizeof (struct fuzzy_cmd)) == -1) { + goto err; + } + else { + event_set (&session->ev, fd, EV_READ, fuzzy_io_callback, session); + event_add (&session->ev, &session->tv); + } + } + else if (what == EV_READ) { + if (read (fd, buf, sizeof (buf)) == -1) { + goto err; + } + else if (buf[0] == 'O' && buf[1] == 'K') { + insert_result (session->task, fuzzy_module_ctx->metric, fuzzy_module_ctx->symbol, 1, NULL); + } + goto ok; + } + + return; + + err: + msg_err ("fuzzy_io_callback: got error on IO, %d, %s", errno, strerror (errno)); + ok: + event_del (&session->ev); + close (fd); + session->task->save.saved --; + if (session->task->save.saved == 0) { + /* Call other filters */ + session->task->save.saved = 1; + process_filters (session->task); + } +} + static void fuzzy_symbol_callback (struct worker_task *task, void *unused) { @@ -282,6 +336,88 @@ fuzzy_symbol_callback (struct worker_task *task, void *unused) } } +static void +fuzzy_process_handler (struct controller_session *session, f_str_t *in) +{ + struct worker_task *task; + struct fuzzy_learn_session *s; + struct mime_text_part *part; + struct storage_server *selected; + GList *cur; + int sock, r; + + task = construct_task (session->worker); + session->other_data = task; + session->state = STATE_WAIT; + + task->msg = in; + r = process_message (task); + if (r == -1) { + msg_warn ("read_socket: processing of message failed"); + task->last_error = "MIME processing error"; + task->error_code = RSPAMD_FILTER_ERROR; + task->state = WRITE_ERROR; + } + else { + /* Plan new event for writing */ + cur = task->text_parts; + + while (cur) { + part = cur->data; + selected = (struct storage_server *)get_upstream_by_hash (fuzzy_module_ctx->servers, fuzzy_module_ctx->servers_num, + sizeof (struct storage_server), task->ts.tv_sec, + DEFAULT_UPSTREAM_ERROR_TIME, DEFAULT_UPSTREAM_DEAD_TIME, + DEFAULT_UPSTREAM_MAXERRORS, + part->fuzzy->hash_pipe, sizeof (part->fuzzy->hash_pipe)); + if (selected) { + if ((sock = make_tcp_socket (&selected->addr, selected->port, FALSE, TRUE)) == -1) { + msg_warn ("fuzzy_symbol_callback: cannot connect to %s, %d, %s", selected->name, errno, strerror (errno)); + } + else { + s = memory_pool_alloc (session->session_pool, sizeof (struct fuzzy_learn_session)); + event_set (&s->ev, sock, EV_WRITE, fuzzy_learn_callback, s); + s->tv.tv_sec = IO_TIMEOUT; + s->tv.tv_usec = 0; + s->task = task; + s->h = part->fuzzy; + s->session = session; + event_add (&s->ev, &s->tv); + } + } + cur = g_list_next (cur); + } + } + +} + +static void +fuzzy_controller_handler (char **args, struct controller_session *session) +{ + char *arg, out_buf[BUFSIZ], *err_str; + uint32_t size; + int r; + + arg = *args; + if (!arg || *arg == '\0') { + msg_info ("fuzzy_controller_handler: empty content length"); + r = snprintf (out_buf, sizeof (out_buf), "fuzzy command requires length as argument" CRLF); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE); + return; + } + + size = strtoul (arg, &err_str, 10); + if (err_str && *err_str != '\0') { + msg_debug ("process_command: statfile size is invalid: %s", arg); + r = snprintf (out_buf, sizeof (out_buf), "learn size is invalid" CRLF); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE); + return; + } + + session->state = STATE_OTHER; + rspamd_set_dispatcher_policy (session->dispatcher, BUFFER_CHARACTER, size); + session->other_handler = fuzzy_process_handler; +} + static int fuzzy_mime_filter (struct worker_task *task) { |