From: Vsevolod Stakhov Date: Wed, 29 Jul 2009 16:17:29 +0000 (+0400) Subject: * Add support for extending controller protocol by modules X-Git-Tag: 0.2.7~62 X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=c4621d35aebaf9c6d466bf7a15a9de340b20b0ce;p=rspamd.git * Add support for extending controller protocol by modules * Add write support via controller to fuzzy storage TODO: Add delete and check commands support to controller interface --- diff --git a/src/controller.c b/src/controller.c index 87e7f1b0f..a4c0a7bb4 100644 --- a/src/controller.c +++ b/src/controller.c @@ -53,22 +53,31 @@ enum command_type { struct controller_command { char *command; - int privilleged; + gboolean privilleged; enum command_type type; }; +struct custom_controller_command { + const char *command; + gboolean privilleged; + gboolean require_message; + controller_func_t handler; +}; + static struct controller_command commands[] = { - {"password", 0, COMMAND_PASSWORD}, - {"quit", 0, COMMAND_QUIT}, - {"reload", 1, COMMAND_RELOAD}, - {"stat", 0, COMMAND_STAT}, - {"shutdown", 1, COMMAND_SHUTDOWN}, - {"uptime", 0, COMMAND_UPTIME}, - {"learn", 1, COMMAND_LEARN}, - {"help", 0, COMMAND_HELP}, - {"counters", 0, COMMAND_COUNTERS}, + {"password", FALSE, COMMAND_PASSWORD}, + {"quit", FALSE, COMMAND_QUIT}, + {"reload", TRUE, COMMAND_RELOAD}, + {"stat", FALSE, COMMAND_STAT}, + {"shutdown", TRUE, COMMAND_SHUTDOWN}, + {"uptime", FALSE, COMMAND_UPTIME}, + {"learn", TRUE, COMMAND_LEARN}, + {"help", FALSE, COMMAND_HELP}, + {"counters", FALSE, COMMAND_COUNTERS}, }; +static GList *custom_commands = NULL; + static GCompletion *comp; static time_t start_time; @@ -395,6 +404,26 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control } } +static gboolean +process_custom_command (char *line, char **cmd_args, struct controller_session *session) +{ + GList *cur; + struct custom_controller_command *cmd; + + cur = custom_commands; + while (cur) { + cmd = cur->data; + if (g_ascii_strcasecmp (cmd->command, line) == 0) { + /* Call handler */ + cmd->handler (cmd_args, session); + return TRUE; + } + cur = g_list_next (cur); + } + + return FALSE; +} + static void controller_read_socket (f_str_t *in, void *arg) { @@ -421,9 +450,11 @@ controller_read_socket (f_str_t *in, void *arg) process_command ((struct controller_command *)comp_list->data, ¶ms[1], session); break; case 0: - msg_debug ("Unknown command: '%s'", cmd); - i = snprintf (out_buf, sizeof (out_buf), "Unknown command" CRLF); - rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE); + if (!process_custom_command (cmd, ¶ms[1], session)) { + msg_debug ("Unknown command: '%s'", cmd); + i = snprintf (out_buf, sizeof (out_buf), "Unknown command" CRLF); + rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE); + } break; default: msg_debug ("Ambigious command: '%s'", cmd); @@ -472,6 +503,15 @@ controller_read_socket (f_str_t *in, void *arg) session->state = STATE_REPLY; break; + case STATE_OTHER: + if (session->other_handler) { + session->other_handler (session, in); + } + session->state = STATE_REPLY; + break; + case STATE_WAIT: + rspamd_dispatcher_pause (session->dispatcher); + break; default: msg_debug ("controller_read_socket: unknown state while reading %d", session->state); break; @@ -595,6 +635,18 @@ start_controller (struct rspamd_worker *worker) exit (EXIT_SUCCESS); } +void +register_custom_controller_command (const char *name, controller_func_t handler, gboolean privilleged, gboolean require_message) +{ + struct custom_controller_command *cmd; + + cmd->command = name; + cmd->handler = handler; + cmd->privilleged = privilleged; + cmd->require_message = require_message; + + custom_commands = g_list_prepend (custom_commands, cmd); +} /* * vi:ts=4 diff --git a/src/lmtp.c b/src/lmtp.c index d1928cf5c..6554ba4f4 100644 --- a/src/lmtp.c +++ b/src/lmtp.c @@ -85,13 +85,13 @@ rcpt_destruct (void *pointer) * Free all structures of lmtp proto */ static void -free_task (struct rspamd_lmtp_proto *lmtp, gboolean is_soft) +free_lmtp_task (struct rspamd_lmtp_proto *lmtp, gboolean is_soft) { GList *part; struct mime_part *p; if (lmtp) { - msg_debug ("free_task: free pointer %p", lmtp->task); + msg_debug ("free_lmtp_task: free pointer %p", lmtp->task); if (lmtp->task->memc_ctx) { memc_close_ctx (lmtp->task->memc_ctx); } @@ -188,7 +188,7 @@ lmtp_write_socket (void *arg) break; case CLOSING_CONNECTION: msg_debug ("lmtp_write_socket: normally closing connection"); - free_task (lmtp, TRUE); + free_lmtp_task (lmtp, TRUE); break; default: msg_debug ("lmtp_write_socket: invalid state while writing to socket %d", lmtp->task->state); @@ -205,7 +205,7 @@ lmtp_err_socket (GError *err, void *arg) struct rspamd_lmtp_proto *lmtp = (struct rspamd_lmtp_proto *)arg; msg_info ("lmtp_err_socket: abnormally closing connection, error: %s", err->message); /* Free buffers */ - free_task (lmtp, FALSE); + free_lmtp_task (lmtp, FALSE); } /* diff --git a/src/main.h b/src/main.h index e87b4efde..ee34a2cae 100644 --- a/src/main.h +++ b/src/main.h @@ -117,6 +117,8 @@ struct save_point { unsigned int saved; /**< how much time we have delayed processing */ }; + + /** * Control session object */ @@ -127,6 +129,8 @@ struct controller_session { STATE_LEARN, STATE_REPLY, STATE_QUIT, + STATE_OTHER, + STATE_WAIT, } state; /**< current session state */ int sock; /**< socket descriptor */ /* Access to authorized commands */ @@ -142,8 +146,13 @@ struct controller_session { f_str_t *learn_buf; /**< learn input */ GList *parts; /**< extracted mime parts */ int in_class; /**< positive or negative learn */ + void (*other_handler)(struct controller_session *session, + f_str_t *in); /**< other command handler to execute at the end of processing */ + void *other_data; /**< and its data */ }; +typedef void (*controller_func_t)(char **args, struct controller_session *session); + /** * Worker task structure */ @@ -213,6 +222,20 @@ struct c_module { void start_worker (struct rspamd_worker *worker); void start_controller (struct rspamd_worker *worker); +/** + * Register custom controller function + */ +void register_custom_controller_command (const char *name, controller_func_t handler, gboolean privilleged, gboolean require_message); + +/** + * Construct new task for worker + */ +struct worker_task* construct_task (struct rspamd_worker *worker); +/** + * Destroy task object and remove its IO dispatcher if it exists + */ +void free_task (struct worker_task *task, gboolean is_soft); + /** * If set, reopen log file on next write */ diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c index 085ee5164..e4fdaba93 100644 --- a/src/plugins/fuzzy_check.c +++ b/src/plugins/fuzzy_check.c @@ -68,6 +68,14 @@ struct fuzzy_client_session { struct worker_task *task; }; +struct fuzzy_learn_session { + struct event ev; + fuzzy_hash_t *h; + struct timeval tv; + struct controller_session *session; + struct worker_task *task; +}; + static struct fuzzy_ctx *fuzzy_module_ctx = NULL; static int fuzzy_mime_filter (struct worker_task *task); @@ -91,6 +99,7 @@ parse_servers_string (char *str) if ((p = strchr (strvec[i], ':')) != NULL) { j = 0; + p ++; while (g_ascii_isdigit (*p) && j < sizeof (portbuf) - 1) { portbuf[j ++] = *p ++; } @@ -244,6 +253,51 @@ fuzzy_io_callback (int fd, short what, void *arg) } +static void +fuzzy_learn_callback (int fd, short what, void *arg) +{ + struct fuzzy_learn_session *session = arg; + struct fuzzy_cmd cmd; + char buf[sizeof ("ERR")]; + + if (what == EV_WRITE) { + /* Send command to storage */ + cmd.blocksize = session->h->block_size; + memcpy (cmd.hash, session->h->hash_pipe, sizeof (cmd.hash)); + cmd.cmd = FUZZY_WRITE; + if (write (fd, &cmd, sizeof (struct fuzzy_cmd)) == -1) { + goto err; + } + else { + event_set (&session->ev, fd, EV_READ, fuzzy_io_callback, session); + event_add (&session->ev, &session->tv); + } + } + else if (what == EV_READ) { + if (read (fd, buf, sizeof (buf)) == -1) { + goto err; + } + else if (buf[0] == 'O' && buf[1] == 'K') { + insert_result (session->task, fuzzy_module_ctx->metric, fuzzy_module_ctx->symbol, 1, NULL); + } + goto ok; + } + + return; + + err: + msg_err ("fuzzy_io_callback: got error on IO, %d, %s", errno, strerror (errno)); + ok: + event_del (&session->ev); + close (fd); + session->task->save.saved --; + if (session->task->save.saved == 0) { + /* Call other filters */ + session->task->save.saved = 1; + process_filters (session->task); + } +} + static void fuzzy_symbol_callback (struct worker_task *task, void *unused) { @@ -282,6 +336,88 @@ fuzzy_symbol_callback (struct worker_task *task, void *unused) } } +static void +fuzzy_process_handler (struct controller_session *session, f_str_t *in) +{ + struct worker_task *task; + struct fuzzy_learn_session *s; + struct mime_text_part *part; + struct storage_server *selected; + GList *cur; + int sock, r; + + task = construct_task (session->worker); + session->other_data = task; + session->state = STATE_WAIT; + + task->msg = in; + r = process_message (task); + if (r == -1) { + msg_warn ("read_socket: processing of message failed"); + task->last_error = "MIME processing error"; + task->error_code = RSPAMD_FILTER_ERROR; + task->state = WRITE_ERROR; + } + else { + /* Plan new event for writing */ + cur = task->text_parts; + + while (cur) { + part = cur->data; + selected = (struct storage_server *)get_upstream_by_hash (fuzzy_module_ctx->servers, fuzzy_module_ctx->servers_num, + sizeof (struct storage_server), task->ts.tv_sec, + DEFAULT_UPSTREAM_ERROR_TIME, DEFAULT_UPSTREAM_DEAD_TIME, + DEFAULT_UPSTREAM_MAXERRORS, + part->fuzzy->hash_pipe, sizeof (part->fuzzy->hash_pipe)); + if (selected) { + if ((sock = make_tcp_socket (&selected->addr, selected->port, FALSE, TRUE)) == -1) { + msg_warn ("fuzzy_symbol_callback: cannot connect to %s, %d, %s", selected->name, errno, strerror (errno)); + } + else { + s = memory_pool_alloc (session->session_pool, sizeof (struct fuzzy_learn_session)); + event_set (&s->ev, sock, EV_WRITE, fuzzy_learn_callback, s); + s->tv.tv_sec = IO_TIMEOUT; + s->tv.tv_usec = 0; + s->task = task; + s->h = part->fuzzy; + s->session = session; + event_add (&s->ev, &s->tv); + } + } + cur = g_list_next (cur); + } + } + +} + +static void +fuzzy_controller_handler (char **args, struct controller_session *session) +{ + char *arg, out_buf[BUFSIZ], *err_str; + uint32_t size; + int r; + + arg = *args; + if (!arg || *arg == '\0') { + msg_info ("fuzzy_controller_handler: empty content length"); + r = snprintf (out_buf, sizeof (out_buf), "fuzzy command requires length as argument" CRLF); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE); + return; + } + + size = strtoul (arg, &err_str, 10); + if (err_str && *err_str != '\0') { + msg_debug ("process_command: statfile size is invalid: %s", arg); + r = snprintf (out_buf, sizeof (out_buf), "learn size is invalid" CRLF); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE); + return; + } + + session->state = STATE_OTHER; + rspamd_set_dispatcher_policy (session->dispatcher, BUFFER_CHARACTER, size); + session->other_handler = fuzzy_process_handler; +} + static int fuzzy_mime_filter (struct worker_task *task) { diff --git a/src/worker.c b/src/worker.c index 726f7fe00..4832c71fc 100644 --- a/src/worker.c +++ b/src/worker.c @@ -4,11 +4,11 @@ * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY Rambler media ''AS IS'' AND ANY * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED @@ -40,8 +40,8 @@ #include #ifndef WITHOUT_PERL -#include /* from the Perl distribution */ -#include /* from the Perl distribution */ +#include /* from the Perl distribution */ +#include /* from the Perl distribution */ extern PerlInterpreter *perl_interpreter; #endif @@ -111,7 +111,7 @@ rcpt_destruct (void *pointer) /* * Free all structures of worker_task */ -static void +void free_task (struct worker_task *task, gboolean is_soft) { GList *part; @@ -135,14 +135,18 @@ free_task (struct worker_task *task, gboolean is_soft) g_list_free (task->urls); } memory_pool_delete (task->task_pool); - if (is_soft) { - /* Plan dispatcher shutdown */ - task->dispatcher->wanna_die = 1; + if (task->dispatcher) { + if (is_soft) { + /* Plan dispatcher shutdown */ + task->dispatcher->wanna_die = 1; + } + else { + rspamd_remove_dispatcher (task->dispatcher); + } } - else { - rspamd_remove_dispatcher (task->dispatcher); + if (task->sock != -1) { + close (task->sock); } - close (task->sock); g_free (task); } } @@ -174,13 +178,13 @@ read_socket (f_str_t *in, void *arg) task->msg->len = in->len; msg_debug ("read_socket: got string of length %ld", (long int)task->msg->len); r = process_message (task); - if (r == -1) { - msg_warn ("read_socket: processing of message failed"); + if (r == -1) { + msg_warn ("read_socket: processing of message failed"); task->last_error = "MIME processing error"; task->error_code = RSPAMD_FILTER_ERROR; task->state = WRITE_ERROR; write_socket (task); - } + } if (task->cmd == CMD_OTHER) { /* Skip filters */ task->state = WRITE_REPLY; @@ -249,36 +253,10 @@ err_socket (GError *err, void *arg) free_task (task, FALSE); } -/* - * Accept new connection and construct task - */ -static void -accept_socket (int fd, short what, void *arg) +struct worker_task * +construct_task (struct rspamd_worker *worker) { - struct rspamd_worker *worker = (struct rspamd_worker *)arg; - struct sockaddr_storage ss; - struct sockaddr_in *sin; struct worker_task *new_task; - socklen_t addrlen = sizeof(ss); - int nfd; - - if ((nfd = accept_from_socket (fd, (struct sockaddr *)&ss, &addrlen)) == -1) { - msg_warn ("accept_socket: accept failed: %s", strerror (errno)); - return; - } - /* Check for EAGAIN */ - if (nfd == 0) { - msg_debug ("accept_socket: cannot accept socket as it was already accepted by other worker"); - return; - } - - if (ss.ss_family == AF_UNIX) { - msg_info ("accept_socket: accepted connection from unix socket"); - } - else if (ss.ss_family == AF_INET) { - sin = (struct sockaddr_in *) &ss; - msg_info ("accept_socket: accepted connection from %s port %d", inet_ntoa (sin->sin_addr), ntohs (sin->sin_port)); - } new_task = g_malloc (sizeof (struct worker_task)); @@ -286,7 +264,6 @@ accept_socket (int fd, short what, void *arg) bzero (new_task, sizeof (struct worker_task)); new_task->worker = worker; new_task->state = READ_COMMAND; - new_task->sock = nfd; new_task->cfg = worker->srv->cfg; new_task->from_addr.s_addr = INADDR_NONE; new_task->view_checked = FALSE; @@ -307,12 +284,50 @@ accept_socket (int fd, short what, void *arg) new_task->re_cache = g_hash_table_new (g_str_hash, g_str_equal); memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)g_hash_table_destroy, new_task->re_cache); + return new_task; +} + +/* + * Accept new connection and construct task + */ +static void +accept_socket (int fd, short what, void *arg) +{ + struct rspamd_worker *worker = (struct rspamd_worker *)arg; + struct sockaddr_storage ss; + struct sockaddr_in *sin; + struct worker_task *new_task; + socklen_t addrlen = sizeof(ss); + int nfd; + + if ((nfd = accept_from_socket (fd, (struct sockaddr *)&ss, &addrlen)) == -1) { + msg_warn ("accept_socket: accept failed: %s", strerror (errno)); + return; + } + /* Check for EAGAIN */ + if (nfd == 0) { + msg_debug ("accept_socket: cannot accept socket as it was already accepted by other worker"); + return; + } + + if (ss.ss_family == AF_UNIX) { + msg_info ("accept_socket: accepted connection from unix socket"); + } + else if (ss.ss_family == AF_INET) { + sin = (struct sockaddr_in *) &ss; + msg_info ("accept_socket: accepted connection from %s port %d", inet_ntoa (sin->sin_addr), ntohs (sin->sin_port)); + } + + new_task = construct_task (worker); + + new_task->sock = nfd; worker->srv->stat->connections_count ++; /* Set up dispatcher */ new_task->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, read_socket, write_socket, err_socket, &io_tv, (void *)new_task); + } /*