diff options
Diffstat (limited to 'src/controller.c')
-rw-r--r-- | src/controller.c | 210 |
1 files changed, 90 insertions, 120 deletions
diff --git a/src/controller.c b/src/controller.c index 41f8a9649..7733bc924 100644 --- a/src/controller.c +++ b/src/controller.c @@ -36,6 +36,9 @@ #define CRLF "\r\n" #define END "END" CRLF +/* 120 seconds for controller's IO */ +#define CONTROLLER_IO_TIMEOUT 120 + enum command_type { COMMAND_PASSWORD, COMMAND_QUIT, @@ -68,6 +71,7 @@ static GCompletion *comp; static time_t start_time; static char greetingbuf[1024]; +static struct timeval io_tv; static void sig_handler (int signo) @@ -110,8 +114,7 @@ free_session (struct controller_session *session) struct mime_part *p; msg_debug ("free_session: freeing session %p", session); - bufferevent_disable (session->bev, EV_READ | EV_WRITE); - bufferevent_free (session->bev); + rspamd_remove_dispatcher (session->dispatcher); while ((part = g_list_first (session->parts))) { session->parts = g_list_remove_link (session->parts, part); @@ -132,7 +135,7 @@ check_auth (struct controller_command *cmd, struct controller_session *session) if (cmd->privilleged && !session->authorized) { r = snprintf (out_buf, sizeof (out_buf), "not authorized" CRLF); - bufferevent_write (session->bev, out_buf, r); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE); return 0; } @@ -156,18 +159,18 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control if (!arg || *arg == '\0') { msg_debug ("process_command: empty password passed"); r = snprintf (out_buf, sizeof (out_buf), "password command requires one argument" CRLF); - bufferevent_write (session->bev, out_buf, r); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE); return; } if (strncmp (arg, session->cfg->control_password, strlen (arg)) == 0) { session->authorized = 1; r = snprintf (out_buf, sizeof (out_buf), "password accepted" CRLF); - bufferevent_write (session->bev, out_buf, r); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE); } else { session->authorized = 0; r = snprintf (out_buf, sizeof (out_buf), "password NOT accepted" CRLF); - bufferevent_write (session->bev, out_buf, r); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE); } break; case COMMAND_QUIT: @@ -176,7 +179,7 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control case COMMAND_RELOAD: if (check_auth (cmd, session)) { r = snprintf (out_buf, sizeof (out_buf), "reload request sent" CRLF); - bufferevent_write (session->bev, out_buf, r); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE); kill (getppid (), SIGHUP); } break; @@ -199,13 +202,13 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control mem_st.shared_chunks_allocated); r += snprintf (out_buf + r, sizeof (out_buf) - r, "Chunks freed: %zd" CRLF, mem_st.chunks_freed); - bufferevent_write (session->bev, out_buf, r); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE); } break; case COMMAND_SHUTDOWN: if (check_auth (cmd, session)) { r = snprintf (out_buf, sizeof (out_buf), "shutdown request sent" CRLF); - bufferevent_write (session->bev, out_buf, r); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE); kill (getppid (), SIGTERM); } break; @@ -235,7 +238,7 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control minutes, minutes > 1 ? "s" : " ", (int)uptime, uptime > 1 ? "s" : " "); } - bufferevent_write (session->bev, out_buf, r); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE); } break; case COMMAND_LEARN: @@ -244,37 +247,28 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control if (!arg || *arg == '\0') { msg_debug ("process_command: no statfile specified in learn command"); r = snprintf (out_buf, sizeof (out_buf), "learn command requires at least two arguments: stat filename and its size" CRLF); - bufferevent_write (session->bev, out_buf, r); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE); return; } arg = *(cmd_args + 1); if (arg == NULL || *arg == '\0') { msg_debug ("process_command: no statfile size specified in learn command"); r = snprintf (out_buf, sizeof (out_buf), "learn command requires at least two arguments: stat filename and its size" CRLF); - bufferevent_write (session->bev, out_buf, r); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE); return; } size = strtoul (arg, &err_str, 10); if (err_str && *err_str != '\0') { msg_debug ("process_command: statfile size is invalid: %s", arg); r = snprintf (out_buf, sizeof (out_buf), "learn size is invalid" CRLF); - bufferevent_write (session->bev, out_buf, r); - return; - } - session->learn_buf = memory_pool_alloc0 (session->session_pool, sizeof (f_str_buf_t)); - session->learn_buf->buf = fstralloc (session->session_pool, size); - if (session->learn_buf->buf == NULL) { - r = snprintf (out_buf, sizeof (out_buf), "allocating buffer for learn failed" CRLF); - bufferevent_write (session->bev, out_buf, r); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE); return; } - session->learn_buf->pos = session->learn_buf->buf->begin; - update_buf_size (session->learn_buf); statfile = g_hash_table_lookup (session->cfg->statfiles, *cmd_args); if (statfile == NULL) { r = snprintf (out_buf, sizeof (out_buf), "statfile %s is not defined" CRLF, *cmd_args); - bufferevent_write (session->bev, out_buf, r); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE); return; } @@ -302,7 +296,7 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control arg = *(cmd_args + 1); if (!arg || *arg == '\0') { r = snprintf (out_buf, sizeof (out_buf), "recipient is not defined" CRLF, arg); - bufferevent_write (session->bev, out_buf, r); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE); return; } session->learn_rcpt = memory_pool_strdup (session->session_pool, arg); @@ -311,7 +305,7 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control arg = *(cmd_args + 1); if (!arg || *arg == '\0') { r = snprintf (out_buf, sizeof (out_buf), "from is not defined" CRLF, arg); - bufferevent_write (session->bev, out_buf, r); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE); return; } session->learn_from = memory_pool_strdup (session->session_pool, arg); @@ -321,7 +315,7 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control break; default: r = snprintf (out_buf, sizeof (out_buf), "tokenizer is not defined" CRLF, arg); - bufferevent_write (session->bev, out_buf, r); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE); return; } } @@ -333,15 +327,16 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control if (statfile_pool_create (session->worker->srv->statfile_pool, session->learn_filename, statfile->size / sizeof (struct stat_file_block)) == -1) { r = snprintf (out_buf, sizeof (out_buf), "cannot create statfile %s" CRLF, session->learn_filename); - bufferevent_write (session->bev, out_buf, r); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE); return; } if (statfile_pool_open (session->worker->srv->statfile_pool, session->learn_filename) == -1) { r = snprintf (out_buf, sizeof (out_buf), "cannot open statfile %s" CRLF, session->learn_filename); - bufferevent_write (session->bev, out_buf, r); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE); return; } } + rspamd_set_dispatcher_policy (session->dispatcher, BUFFER_CHARACTER, size); session->state = STATE_LEARN; } break; @@ -355,13 +350,13 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control "(*) shutdown - shutdown rspamd" CRLF " stat - show different rspamd stat" CRLF " uptime - rspamd uptime" CRLF); - bufferevent_write (session->bev, out_buf, r); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE); break; } } static void -read_socket (struct bufferevent *bev, void *arg) +read_socket (f_str_t *in, void *arg) { struct controller_session *session = (struct controller_session *)arg; struct classifier_ctx *cls_ctx; @@ -375,101 +370,73 @@ read_socket (struct bufferevent *bev, void *arg) switch (session->state) { case STATE_COMMAND: - s = buffer_readline (session->session_pool, EVBUFFER_INPUT (bev)); - msg_debug ("read_socket: got '%s' string from user", s); - if (s != NULL && *s != 0) { - len = strlen (s); - /* Remove end of line characters from string */ - if (s[len - 1] == '\n') { - if (s[len - 2] == '\r') { - s[len - 2] = 0; - } - s[len - 1] = 0; - } - params = g_strsplit (s, " ", -1); - len = g_strv_length (params); - if (len > 0) { - cmd = g_strstrip (params[0]); - comp_list = g_completion_complete (comp, cmd, NULL); - switch (g_list_length (comp_list)) { - case 1: - process_command ((struct controller_command *)comp_list->data, ¶ms[1], session); - break; - case 0: - msg_debug ("Unknown command: '%s'", cmd); - i = snprintf (out_buf, sizeof (out_buf), "Unknown command" CRLF); - bufferevent_write (bev, out_buf, i); - break; - default: - msg_debug ("Ambigious command: '%s'", cmd); - i = snprintf (out_buf, sizeof (out_buf), "Ambigious command" CRLF); - bufferevent_write (bev, out_buf, i); - break; - } + s = fstrcstr (in, session->session_pool); + params = g_strsplit (s, " ", -1); + len = g_strv_length (params); + if (len > 0) { + cmd = g_strstrip (params[0]); + comp_list = g_completion_complete (comp, cmd, NULL); + switch (g_list_length (comp_list)) { + case 1: + process_command ((struct controller_command *)comp_list->data, ¶ms[1], session); + break; + case 0: + msg_debug ("Unknown command: '%s'", cmd); + i = snprintf (out_buf, sizeof (out_buf), "Unknown command" CRLF); + rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE); + break; + default: + msg_debug ("Ambigious command: '%s'", cmd); + i = snprintf (out_buf, sizeof (out_buf), "Ambigious command" CRLF); + rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE); + break; } - if (session->state == STATE_COMMAND) { - session->state = STATE_REPLY; - } - if (session->state != STATE_LEARN) { - bufferevent_write (bev, END, sizeof (END) - 1); - bufferevent_enable (bev, EV_WRITE); - } - g_strfreev (params); } - else { - bufferevent_enable (bev, EV_WRITE); - } + if (session->state == STATE_COMMAND) { + session->state = STATE_REPLY; + } + if (session->state != STATE_LEARN) { + rspamd_dispatcher_write (session->dispatcher, END, sizeof (END) - 1, FALSE); + } + + g_strfreev (params); break; case STATE_LEARN: - i = bufferevent_read (bev, session->learn_buf->pos, session->learn_buf->free); - if (i > 0) { - session->learn_buf->pos += i; - update_buf_size (session->learn_buf); - if (session->learn_buf->free == 0) { - process_learn (session); - while ((content = get_next_text_part (session->session_pool, session->parts, &cur)) != NULL) { - c.begin = content->data; - c.len = content->len; - if (!session->learn_tokenizer->tokenize_func (session->learn_tokenizer, - session->session_pool, &c, &tokens)) { - i = snprintf (out_buf, sizeof (out_buf), "learn fail, tokenizer error" CRLF); - bufferevent_write (bev, out_buf, i); - session->state = STATE_REPLY; - return; - } - } - cls_ctx = session->learn_classifier->init_func (session->session_pool); - session->learn_classifier->learn_func (cls_ctx, session->worker->srv->statfile_pool, - session->learn_filename, tokens, session->in_class); - session->worker->srv->stat->messages_learned ++; - i = snprintf (out_buf, sizeof (out_buf), "learn ok" CRLF); - bufferevent_write (bev, out_buf, i); - bufferevent_enable (bev, EV_WRITE); - - /* Clean learned parts */ - while ((cur = g_list_first (session->parts))) { - session->parts = g_list_remove_link (session->parts, cur); - p = (struct mime_part *)cur->data; - g_byte_array_free (p->content, FALSE); - g_list_free_1 (cur); - } - + session->learn_buf = in; + process_learn (session); + while ((content = get_next_text_part (session->session_pool, session->parts, &cur)) != NULL) { + c.begin = content->data; + c.len = content->len; + if (!session->learn_tokenizer->tokenize_func (session->learn_tokenizer, + session->session_pool, &c, &tokens)) { + i = snprintf (out_buf, sizeof (out_buf), "learn fail, tokenizer error" CRLF); + rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE); session->state = STATE_REPLY; - break; + return; } } - else { - i = snprintf (out_buf, sizeof (out_buf), "read error: %d" CRLF, i); - bufferevent_write (bev, out_buf, i); - bufferevent_enable (bev, EV_WRITE); - session->state = STATE_REPLY; + cls_ctx = session->learn_classifier->init_func (session->session_pool); + session->learn_classifier->learn_func (cls_ctx, session->worker->srv->statfile_pool, + session->learn_filename, tokens, session->in_class); + session->worker->srv->stat->messages_learned ++; + i = snprintf (out_buf, sizeof (out_buf), "learn ok" CRLF); + rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE); + + /* Clean learned parts */ + while ((cur = g_list_first (session->parts))) { + session->parts = g_list_remove_link (session->parts, cur); + p = (struct mime_part *)cur->data; + g_byte_array_free (p->content, FALSE); + g_list_free_1 (cur); } + + session->state = STATE_REPLY; break; } } static void -write_socket (struct bufferevent *bev, void *arg) +write_socket (void *arg) { struct controller_session *session = (struct controller_session *)arg; @@ -482,16 +449,15 @@ write_socket (struct bufferevent *bev, void *arg) } else if (session->state == STATE_REPLY) { session->state = STATE_COMMAND; + rspamd_set_dispatcher_policy (session->dispatcher, BUFFER_LINE, BUFSIZ); } - bufferevent_disable (bev, EV_WRITE); - bufferevent_enable (bev, EV_READ); } static void -err_socket (struct bufferevent *bev, short what, void *arg) +err_socket (GError *err, void *arg) { struct controller_session *session = (struct controller_session *)arg; - msg_info ("closing control connection"); + msg_info ("err_socket: abnormally closing control connection, error: %s", err->message); /* Free buffers */ free_session (session); } @@ -525,10 +491,11 @@ accept_socket (int fd, short what, void *arg) new_session->session_pool = memory_pool_new (memory_pool_get_size () - 1); worker->srv->stat->control_connections_count ++; - /* Read event */ - new_session->bev = bufferevent_new (nfd, read_socket, write_socket, err_socket, (void *)new_session); - bufferevent_write (new_session->bev, greetingbuf, strlen (greetingbuf)); - bufferevent_enable (new_session->bev, EV_WRITE); + /* Set up dispatcher */ + new_session->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, read_socket, + write_socket, err_socket, &io_tv, + (void *)new_session); + rspamd_dispatcher_write (new_session->dispatcher, greetingbuf, strlen (greetingbuf), FALSE); } void @@ -592,6 +559,9 @@ start_controller (struct rspamd_worker *worker) /* Send SIGUSR2 to parent */ kill (getppid (), SIGUSR2); + + io_tv.tv_sec = CONTROLLER_IO_TIMEOUT; + io_tv.tv_usec = 0; event_loop (0); } |