aboutsummaryrefslogtreecommitdiffstats
path: root/src/controller.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/controller.c')
-rw-r--r--src/controller.c210
1 files changed, 90 insertions, 120 deletions
diff --git a/src/controller.c b/src/controller.c
index 41f8a9649..7733bc924 100644
--- a/src/controller.c
+++ b/src/controller.c
@@ -36,6 +36,9 @@
#define CRLF "\r\n"
#define END "END" CRLF
+/* 120 seconds for controller's IO */
+#define CONTROLLER_IO_TIMEOUT 120
+
enum command_type {
COMMAND_PASSWORD,
COMMAND_QUIT,
@@ -68,6 +71,7 @@ static GCompletion *comp;
static time_t start_time;
static char greetingbuf[1024];
+static struct timeval io_tv;
static
void sig_handler (int signo)
@@ -110,8 +114,7 @@ free_session (struct controller_session *session)
struct mime_part *p;
msg_debug ("free_session: freeing session %p", session);
- bufferevent_disable (session->bev, EV_READ | EV_WRITE);
- bufferevent_free (session->bev);
+ rspamd_remove_dispatcher (session->dispatcher);
while ((part = g_list_first (session->parts))) {
session->parts = g_list_remove_link (session->parts, part);
@@ -132,7 +135,7 @@ check_auth (struct controller_command *cmd, struct controller_session *session)
if (cmd->privilleged && !session->authorized) {
r = snprintf (out_buf, sizeof (out_buf), "not authorized" CRLF);
- bufferevent_write (session->bev, out_buf, r);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
return 0;
}
@@ -156,18 +159,18 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
if (!arg || *arg == '\0') {
msg_debug ("process_command: empty password passed");
r = snprintf (out_buf, sizeof (out_buf), "password command requires one argument" CRLF);
- bufferevent_write (session->bev, out_buf, r);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
return;
}
if (strncmp (arg, session->cfg->control_password, strlen (arg)) == 0) {
session->authorized = 1;
r = snprintf (out_buf, sizeof (out_buf), "password accepted" CRLF);
- bufferevent_write (session->bev, out_buf, r);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
}
else {
session->authorized = 0;
r = snprintf (out_buf, sizeof (out_buf), "password NOT accepted" CRLF);
- bufferevent_write (session->bev, out_buf, r);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
}
break;
case COMMAND_QUIT:
@@ -176,7 +179,7 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
case COMMAND_RELOAD:
if (check_auth (cmd, session)) {
r = snprintf (out_buf, sizeof (out_buf), "reload request sent" CRLF);
- bufferevent_write (session->bev, out_buf, r);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
kill (getppid (), SIGHUP);
}
break;
@@ -199,13 +202,13 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
mem_st.shared_chunks_allocated);
r += snprintf (out_buf + r, sizeof (out_buf) - r, "Chunks freed: %zd" CRLF,
mem_st.chunks_freed);
- bufferevent_write (session->bev, out_buf, r);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
}
break;
case COMMAND_SHUTDOWN:
if (check_auth (cmd, session)) {
r = snprintf (out_buf, sizeof (out_buf), "shutdown request sent" CRLF);
- bufferevent_write (session->bev, out_buf, r);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
kill (getppid (), SIGTERM);
}
break;
@@ -235,7 +238,7 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
minutes, minutes > 1 ? "s" : " ",
(int)uptime, uptime > 1 ? "s" : " ");
}
- bufferevent_write (session->bev, out_buf, r);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
}
break;
case COMMAND_LEARN:
@@ -244,37 +247,28 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
if (!arg || *arg == '\0') {
msg_debug ("process_command: no statfile specified in learn command");
r = snprintf (out_buf, sizeof (out_buf), "learn command requires at least two arguments: stat filename and its size" CRLF);
- bufferevent_write (session->bev, out_buf, r);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
return;
}
arg = *(cmd_args + 1);
if (arg == NULL || *arg == '\0') {
msg_debug ("process_command: no statfile size specified in learn command");
r = snprintf (out_buf, sizeof (out_buf), "learn command requires at least two arguments: stat filename and its size" CRLF);
- bufferevent_write (session->bev, out_buf, r);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
return;
}
size = strtoul (arg, &err_str, 10);
if (err_str && *err_str != '\0') {
msg_debug ("process_command: statfile size is invalid: %s", arg);
r = snprintf (out_buf, sizeof (out_buf), "learn size is invalid" CRLF);
- bufferevent_write (session->bev, out_buf, r);
- return;
- }
- session->learn_buf = memory_pool_alloc0 (session->session_pool, sizeof (f_str_buf_t));
- session->learn_buf->buf = fstralloc (session->session_pool, size);
- if (session->learn_buf->buf == NULL) {
- r = snprintf (out_buf, sizeof (out_buf), "allocating buffer for learn failed" CRLF);
- bufferevent_write (session->bev, out_buf, r);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
return;
}
- session->learn_buf->pos = session->learn_buf->buf->begin;
- update_buf_size (session->learn_buf);
statfile = g_hash_table_lookup (session->cfg->statfiles, *cmd_args);
if (statfile == NULL) {
r = snprintf (out_buf, sizeof (out_buf), "statfile %s is not defined" CRLF, *cmd_args);
- bufferevent_write (session->bev, out_buf, r);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
return;
}
@@ -302,7 +296,7 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
arg = *(cmd_args + 1);
if (!arg || *arg == '\0') {
r = snprintf (out_buf, sizeof (out_buf), "recipient is not defined" CRLF, arg);
- bufferevent_write (session->bev, out_buf, r);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
return;
}
session->learn_rcpt = memory_pool_strdup (session->session_pool, arg);
@@ -311,7 +305,7 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
arg = *(cmd_args + 1);
if (!arg || *arg == '\0') {
r = snprintf (out_buf, sizeof (out_buf), "from is not defined" CRLF, arg);
- bufferevent_write (session->bev, out_buf, r);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
return;
}
session->learn_from = memory_pool_strdup (session->session_pool, arg);
@@ -321,7 +315,7 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
break;
default:
r = snprintf (out_buf, sizeof (out_buf), "tokenizer is not defined" CRLF, arg);
- bufferevent_write (session->bev, out_buf, r);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
return;
}
}
@@ -333,15 +327,16 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
if (statfile_pool_create (session->worker->srv->statfile_pool,
session->learn_filename, statfile->size / sizeof (struct stat_file_block)) == -1) {
r = snprintf (out_buf, sizeof (out_buf), "cannot create statfile %s" CRLF, session->learn_filename);
- bufferevent_write (session->bev, out_buf, r);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
return;
}
if (statfile_pool_open (session->worker->srv->statfile_pool, session->learn_filename) == -1) {
r = snprintf (out_buf, sizeof (out_buf), "cannot open statfile %s" CRLF, session->learn_filename);
- bufferevent_write (session->bev, out_buf, r);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
return;
}
}
+ rspamd_set_dispatcher_policy (session->dispatcher, BUFFER_CHARACTER, size);
session->state = STATE_LEARN;
}
break;
@@ -355,13 +350,13 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
"(*) shutdown - shutdown rspamd" CRLF
" stat - show different rspamd stat" CRLF
" uptime - rspamd uptime" CRLF);
- bufferevent_write (session->bev, out_buf, r);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
break;
}
}
static void
-read_socket (struct bufferevent *bev, void *arg)
+read_socket (f_str_t *in, void *arg)
{
struct controller_session *session = (struct controller_session *)arg;
struct classifier_ctx *cls_ctx;
@@ -375,101 +370,73 @@ read_socket (struct bufferevent *bev, void *arg)
switch (session->state) {
case STATE_COMMAND:
- s = buffer_readline (session->session_pool, EVBUFFER_INPUT (bev));
- msg_debug ("read_socket: got '%s' string from user", s);
- if (s != NULL && *s != 0) {
- len = strlen (s);
- /* Remove end of line characters from string */
- if (s[len - 1] == '\n') {
- if (s[len - 2] == '\r') {
- s[len - 2] = 0;
- }
- s[len - 1] = 0;
- }
- params = g_strsplit (s, " ", -1);
- len = g_strv_length (params);
- if (len > 0) {
- cmd = g_strstrip (params[0]);
- comp_list = g_completion_complete (comp, cmd, NULL);
- switch (g_list_length (comp_list)) {
- case 1:
- process_command ((struct controller_command *)comp_list->data, &params[1], session);
- break;
- case 0:
- msg_debug ("Unknown command: '%s'", cmd);
- i = snprintf (out_buf, sizeof (out_buf), "Unknown command" CRLF);
- bufferevent_write (bev, out_buf, i);
- break;
- default:
- msg_debug ("Ambigious command: '%s'", cmd);
- i = snprintf (out_buf, sizeof (out_buf), "Ambigious command" CRLF);
- bufferevent_write (bev, out_buf, i);
- break;
- }
+ s = fstrcstr (in, session->session_pool);
+ params = g_strsplit (s, " ", -1);
+ len = g_strv_length (params);
+ if (len > 0) {
+ cmd = g_strstrip (params[0]);
+ comp_list = g_completion_complete (comp, cmd, NULL);
+ switch (g_list_length (comp_list)) {
+ case 1:
+ process_command ((struct controller_command *)comp_list->data, &params[1], session);
+ break;
+ case 0:
+ msg_debug ("Unknown command: '%s'", cmd);
+ i = snprintf (out_buf, sizeof (out_buf), "Unknown command" CRLF);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE);
+ break;
+ default:
+ msg_debug ("Ambigious command: '%s'", cmd);
+ i = snprintf (out_buf, sizeof (out_buf), "Ambigious command" CRLF);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE);
+ break;
}
- if (session->state == STATE_COMMAND) {
- session->state = STATE_REPLY;
- }
- if (session->state != STATE_LEARN) {
- bufferevent_write (bev, END, sizeof (END) - 1);
- bufferevent_enable (bev, EV_WRITE);
- }
- g_strfreev (params);
}
- else {
- bufferevent_enable (bev, EV_WRITE);
- }
+ if (session->state == STATE_COMMAND) {
+ session->state = STATE_REPLY;
+ }
+ if (session->state != STATE_LEARN) {
+ rspamd_dispatcher_write (session->dispatcher, END, sizeof (END) - 1, FALSE);
+ }
+
+ g_strfreev (params);
break;
case STATE_LEARN:
- i = bufferevent_read (bev, session->learn_buf->pos, session->learn_buf->free);
- if (i > 0) {
- session->learn_buf->pos += i;
- update_buf_size (session->learn_buf);
- if (session->learn_buf->free == 0) {
- process_learn (session);
- while ((content = get_next_text_part (session->session_pool, session->parts, &cur)) != NULL) {
- c.begin = content->data;
- c.len = content->len;
- if (!session->learn_tokenizer->tokenize_func (session->learn_tokenizer,
- session->session_pool, &c, &tokens)) {
- i = snprintf (out_buf, sizeof (out_buf), "learn fail, tokenizer error" CRLF);
- bufferevent_write (bev, out_buf, i);
- session->state = STATE_REPLY;
- return;
- }
- }
- cls_ctx = session->learn_classifier->init_func (session->session_pool);
- session->learn_classifier->learn_func (cls_ctx, session->worker->srv->statfile_pool,
- session->learn_filename, tokens, session->in_class);
- session->worker->srv->stat->messages_learned ++;
- i = snprintf (out_buf, sizeof (out_buf), "learn ok" CRLF);
- bufferevent_write (bev, out_buf, i);
- bufferevent_enable (bev, EV_WRITE);
-
- /* Clean learned parts */
- while ((cur = g_list_first (session->parts))) {
- session->parts = g_list_remove_link (session->parts, cur);
- p = (struct mime_part *)cur->data;
- g_byte_array_free (p->content, FALSE);
- g_list_free_1 (cur);
- }
-
+ session->learn_buf = in;
+ process_learn (session);
+ while ((content = get_next_text_part (session->session_pool, session->parts, &cur)) != NULL) {
+ c.begin = content->data;
+ c.len = content->len;
+ if (!session->learn_tokenizer->tokenize_func (session->learn_tokenizer,
+ session->session_pool, &c, &tokens)) {
+ i = snprintf (out_buf, sizeof (out_buf), "learn fail, tokenizer error" CRLF);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE);
session->state = STATE_REPLY;
- break;
+ return;
}
}
- else {
- i = snprintf (out_buf, sizeof (out_buf), "read error: %d" CRLF, i);
- bufferevent_write (bev, out_buf, i);
- bufferevent_enable (bev, EV_WRITE);
- session->state = STATE_REPLY;
+ cls_ctx = session->learn_classifier->init_func (session->session_pool);
+ session->learn_classifier->learn_func (cls_ctx, session->worker->srv->statfile_pool,
+ session->learn_filename, tokens, session->in_class);
+ session->worker->srv->stat->messages_learned ++;
+ i = snprintf (out_buf, sizeof (out_buf), "learn ok" CRLF);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE);
+
+ /* Clean learned parts */
+ while ((cur = g_list_first (session->parts))) {
+ session->parts = g_list_remove_link (session->parts, cur);
+ p = (struct mime_part *)cur->data;
+ g_byte_array_free (p->content, FALSE);
+ g_list_free_1 (cur);
}
+
+ session->state = STATE_REPLY;
break;
}
}
static void
-write_socket (struct bufferevent *bev, void *arg)
+write_socket (void *arg)
{
struct controller_session *session = (struct controller_session *)arg;
@@ -482,16 +449,15 @@ write_socket (struct bufferevent *bev, void *arg)
}
else if (session->state == STATE_REPLY) {
session->state = STATE_COMMAND;
+ rspamd_set_dispatcher_policy (session->dispatcher, BUFFER_LINE, BUFSIZ);
}
- bufferevent_disable (bev, EV_WRITE);
- bufferevent_enable (bev, EV_READ);
}
static void
-err_socket (struct bufferevent *bev, short what, void *arg)
+err_socket (GError *err, void *arg)
{
struct controller_session *session = (struct controller_session *)arg;
- msg_info ("closing control connection");
+ msg_info ("err_socket: abnormally closing control connection, error: %s", err->message);
/* Free buffers */
free_session (session);
}
@@ -525,10 +491,11 @@ accept_socket (int fd, short what, void *arg)
new_session->session_pool = memory_pool_new (memory_pool_get_size () - 1);
worker->srv->stat->control_connections_count ++;
- /* Read event */
- new_session->bev = bufferevent_new (nfd, read_socket, write_socket, err_socket, (void *)new_session);
- bufferevent_write (new_session->bev, greetingbuf, strlen (greetingbuf));
- bufferevent_enable (new_session->bev, EV_WRITE);
+ /* Set up dispatcher */
+ new_session->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, read_socket,
+ write_socket, err_socket, &io_tv,
+ (void *)new_session);
+ rspamd_dispatcher_write (new_session->dispatcher, greetingbuf, strlen (greetingbuf), FALSE);
}
void
@@ -592,6 +559,9 @@ start_controller (struct rspamd_worker *worker)
/* Send SIGUSR2 to parent */
kill (getppid (), SIGUSR2);
+
+ io_tv.tv_sec = CONTROLLER_IO_TIMEOUT;
+ io_tv.tv_usec = 0;
event_loop (0);
}