diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2009-09-22 20:22:31 +0400 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2009-09-22 20:22:31 +0400 |
commit | 626a11ad9819593eadaca1e321192c75a32b51f3 (patch) | |
tree | 7f062ddf5d6ec04d7e2f4009541aa417df1cfe59 /src/controller.c | |
parent | fe815ce580d3c455292e1acda406ddb4d371120a (diff) | |
download | rspamd-626a11ad9819593eadaca1e321192c75a32b51f3.tar.gz rspamd-626a11ad9819593eadaca1e321192c75a32b51f3.zip |
* Implement new system of async events handling (experimental)
Diffstat (limited to 'src/controller.c')
-rw-r--r-- | src/controller.c | 60 |
1 files changed, 39 insertions, 21 deletions
diff --git a/src/controller.c b/src/controller.c index 0aaa8bd99..dd1f4a91b 100644 --- a/src/controller.c +++ b/src/controller.c @@ -84,6 +84,8 @@ static time_t start_time; static char greetingbuf[1024]; extern rspamd_hash_t *counters; +static gboolean controller_write_socket (void *arg); + static void sig_handler (int signo) { @@ -119,12 +121,13 @@ completion_func (gpointer elem) } static void -free_session (struct controller_session *session, gboolean is_soft) +free_session (void *ud) { GList *part; struct mime_part *p; + struct controller_session *session = ud; - msg_debug ("free_session: freeing session %p", session); + msg_info ("free_session: freeing session %p", session); while ((part = g_list_first (session->parts))) { session->parts = g_list_remove_link (session->parts, part); @@ -132,13 +135,7 @@ free_session (struct controller_session *session, gboolean is_soft) g_byte_array_free (p->content, FALSE); g_list_free_1 (part); } - if (is_soft) { - /* Plan dispatcher shutdown */ - session->dispatcher->wanna_die = 1; - } - else { - rspamd_remove_dispatcher (session->dispatcher); - } + rspamd_remove_dispatcher (session->dispatcher); close (session->sock); @@ -397,7 +394,7 @@ process_custom_command (char *line, char **cmd_args, struct controller_session * return FALSE; } -static void +static gboolean controller_read_socket (f_str_t *in, void *arg) { struct controller_session *session = (struct controller_session *)arg; @@ -426,13 +423,17 @@ controller_read_socket (f_str_t *in, void *arg) if (!process_custom_command (cmd, ¶ms[1], session)) { msg_debug ("Unknown command: '%s'", cmd); i = snprintf (out_buf, sizeof (out_buf), "Unknown command" CRLF); - rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE); + if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) { + return FALSE; + } } break; default: msg_debug ("Ambigious command: '%s'", cmd); i = snprintf (out_buf, sizeof (out_buf), "Ambigious command" CRLF); - rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE); + if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) { + return FALSE; + } break; } } @@ -440,7 +441,9 @@ controller_read_socket (f_str_t *in, void *arg) session->state = STATE_REPLY; } if (session->state != STATE_LEARN && session->state != STATE_OTHER) { - rspamd_dispatcher_write (session->dispatcher, END, sizeof (END) - 1, FALSE, TRUE); + if (!rspamd_dispatcher_write (session->dispatcher, END, sizeof (END) - 1, FALSE, TRUE)) { + return FALSE; + } } g_strfreev (params); @@ -454,17 +457,17 @@ controller_read_socket (f_str_t *in, void *arg) if (!session->learn_classifier->tokenizer->tokenize_func (session->learn_classifier->tokenizer, session->session_pool, &c, &tokens)) { i = snprintf (out_buf, sizeof (out_buf), "learn fail, tokenizer error" CRLF); - rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE); + if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) { + return FALSE; + } session->state = STATE_REPLY; - return; + return TRUE; } } cls_ctx = session->learn_classifier->classifier->init_func (session->session_pool, session->learn_classifier); session->learn_classifier->classifier->learn_func (cls_ctx, session->worker->srv->statfile_pool, session->learn_symbol, tokens, session->in_class); session->worker->srv->stat->messages_learned ++; - i = snprintf (out_buf, sizeof (out_buf), "learn ok" CRLF); - rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE); /* Clean learned parts */ while ((cur = g_list_first (session->parts))) { @@ -474,6 +477,11 @@ controller_read_socket (f_str_t *in, void *arg) g_list_free_1 (cur); } + i = snprintf (out_buf, sizeof (out_buf), "learn ok" CRLF); + if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) { + return FALSE; + } + session->state = STATE_REPLY; break; case STATE_OTHER: @@ -489,22 +497,29 @@ controller_read_socket (f_str_t *in, void *arg) msg_debug ("controller_read_socket: unknown state while reading %d", session->state); break; } + + if (session->state == STATE_REPLY || session->state == STATE_QUIT) { + (void)controller_write_socket (session); + } + + return TRUE; } -static void +static gboolean controller_write_socket (void *arg) { struct controller_session *session = (struct controller_session *)arg; if (session->state == STATE_QUIT) { /* Free buffers */ - free_session (session, TRUE); - return; + destroy_session (session->s); + return FALSE; } else if (session->state == STATE_REPLY) { session->state = STATE_COMMAND; rspamd_set_dispatcher_policy (session->dispatcher, BUFFER_LINE, BUFSIZ); } + return TRUE; } static void @@ -515,8 +530,9 @@ controller_err_socket (GError *err, void *arg) if (err->code != EOF) { msg_info ("controller_err_socket: abnormally closing control connection, error: %s", err->message); } + /* Free buffers */ - free_session (session, FALSE); + destroy_session (session->s); } static void @@ -552,6 +568,8 @@ accept_socket (int fd, short what, void *arg) io_tv->tv_sec = CONTROLLER_IO_TIMEOUT; io_tv->tv_usec = 0; + new_session->s = new_async_session (new_session->session_pool, free_session, new_session); + new_session->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, controller_read_socket, controller_write_socket, controller_err_socket, io_tv, (void *)new_session); |