summaryrefslogtreecommitdiffstats
path: root/src/controller.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2009-09-22 20:22:31 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2009-09-22 20:22:31 +0400
commit626a11ad9819593eadaca1e321192c75a32b51f3 (patch)
tree7f062ddf5d6ec04d7e2f4009541aa417df1cfe59 /src/controller.c
parentfe815ce580d3c455292e1acda406ddb4d371120a (diff)
downloadrspamd-626a11ad9819593eadaca1e321192c75a32b51f3.tar.gz
rspamd-626a11ad9819593eadaca1e321192c75a32b51f3.zip
* Implement new system of async events handling (experimental)
Diffstat (limited to 'src/controller.c')
-rw-r--r--src/controller.c60
1 files changed, 39 insertions, 21 deletions
diff --git a/src/controller.c b/src/controller.c
index 0aaa8bd99..dd1f4a91b 100644
--- a/src/controller.c
+++ b/src/controller.c
@@ -84,6 +84,8 @@ static time_t start_time;
static char greetingbuf[1024];
extern rspamd_hash_t *counters;
+static gboolean controller_write_socket (void *arg);
+
static
void sig_handler (int signo)
{
@@ -119,12 +121,13 @@ completion_func (gpointer elem)
}
static void
-free_session (struct controller_session *session, gboolean is_soft)
+free_session (void *ud)
{
GList *part;
struct mime_part *p;
+ struct controller_session *session = ud;
- msg_debug ("free_session: freeing session %p", session);
+ msg_info ("free_session: freeing session %p", session);
while ((part = g_list_first (session->parts))) {
session->parts = g_list_remove_link (session->parts, part);
@@ -132,13 +135,7 @@ free_session (struct controller_session *session, gboolean is_soft)
g_byte_array_free (p->content, FALSE);
g_list_free_1 (part);
}
- if (is_soft) {
- /* Plan dispatcher shutdown */
- session->dispatcher->wanna_die = 1;
- }
- else {
- rspamd_remove_dispatcher (session->dispatcher);
- }
+ rspamd_remove_dispatcher (session->dispatcher);
close (session->sock);
@@ -397,7 +394,7 @@ process_custom_command (char *line, char **cmd_args, struct controller_session *
return FALSE;
}
-static void
+static gboolean
controller_read_socket (f_str_t *in, void *arg)
{
struct controller_session *session = (struct controller_session *)arg;
@@ -426,13 +423,17 @@ controller_read_socket (f_str_t *in, void *arg)
if (!process_custom_command (cmd, &params[1], session)) {
msg_debug ("Unknown command: '%s'", cmd);
i = snprintf (out_buf, sizeof (out_buf), "Unknown command" CRLF);
- rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE);
+ if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) {
+ return FALSE;
+ }
}
break;
default:
msg_debug ("Ambigious command: '%s'", cmd);
i = snprintf (out_buf, sizeof (out_buf), "Ambigious command" CRLF);
- rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE);
+ if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) {
+ return FALSE;
+ }
break;
}
}
@@ -440,7 +441,9 @@ controller_read_socket (f_str_t *in, void *arg)
session->state = STATE_REPLY;
}
if (session->state != STATE_LEARN && session->state != STATE_OTHER) {
- rspamd_dispatcher_write (session->dispatcher, END, sizeof (END) - 1, FALSE, TRUE);
+ if (!rspamd_dispatcher_write (session->dispatcher, END, sizeof (END) - 1, FALSE, TRUE)) {
+ return FALSE;
+ }
}
g_strfreev (params);
@@ -454,17 +457,17 @@ controller_read_socket (f_str_t *in, void *arg)
if (!session->learn_classifier->tokenizer->tokenize_func (session->learn_classifier->tokenizer,
session->session_pool, &c, &tokens)) {
i = snprintf (out_buf, sizeof (out_buf), "learn fail, tokenizer error" CRLF);
- rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE);
+ if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) {
+ return FALSE;
+ }
session->state = STATE_REPLY;
- return;
+ return TRUE;
}
}
cls_ctx = session->learn_classifier->classifier->init_func (session->session_pool, session->learn_classifier);
session->learn_classifier->classifier->learn_func (cls_ctx, session->worker->srv->statfile_pool,
session->learn_symbol, tokens, session->in_class);
session->worker->srv->stat->messages_learned ++;
- i = snprintf (out_buf, sizeof (out_buf), "learn ok" CRLF);
- rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE);
/* Clean learned parts */
while ((cur = g_list_first (session->parts))) {
@@ -474,6 +477,11 @@ controller_read_socket (f_str_t *in, void *arg)
g_list_free_1 (cur);
}
+ i = snprintf (out_buf, sizeof (out_buf), "learn ok" CRLF);
+ if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) {
+ return FALSE;
+ }
+
session->state = STATE_REPLY;
break;
case STATE_OTHER:
@@ -489,22 +497,29 @@ controller_read_socket (f_str_t *in, void *arg)
msg_debug ("controller_read_socket: unknown state while reading %d", session->state);
break;
}
+
+ if (session->state == STATE_REPLY || session->state == STATE_QUIT) {
+ (void)controller_write_socket (session);
+ }
+
+ return TRUE;
}
-static void
+static gboolean
controller_write_socket (void *arg)
{
struct controller_session *session = (struct controller_session *)arg;
if (session->state == STATE_QUIT) {
/* Free buffers */
- free_session (session, TRUE);
- return;
+ destroy_session (session->s);
+ return FALSE;
}
else if (session->state == STATE_REPLY) {
session->state = STATE_COMMAND;
rspamd_set_dispatcher_policy (session->dispatcher, BUFFER_LINE, BUFSIZ);
}
+ return TRUE;
}
static void
@@ -515,8 +530,9 @@ controller_err_socket (GError *err, void *arg)
if (err->code != EOF) {
msg_info ("controller_err_socket: abnormally closing control connection, error: %s", err->message);
}
+
/* Free buffers */
- free_session (session, FALSE);
+ destroy_session (session->s);
}
static void
@@ -552,6 +568,8 @@ accept_socket (int fd, short what, void *arg)
io_tv->tv_sec = CONTROLLER_IO_TIMEOUT;
io_tv->tv_usec = 0;
+ new_session->s = new_async_session (new_session->session_pool, free_session, new_session);
+
new_session->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, controller_read_socket,
controller_write_socket, controller_err_socket, io_tv,
(void *)new_session);