summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2011-08-01 15:32:45 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2011-08-01 15:32:45 +0400
commit643576fcff380140d714e6f9711c402246f16a28 (patch)
treeafff9f26fa71cc67c5163009c6fbbf3abf822493
parent0c6e9d1f0bc8e010d30c27af1635513adfa9a6dd (diff)
downloadrspamd-643576fcff380140d714e6f9711c402246f16a28.tar.gz
rspamd-643576fcff380140d714e6f9711c402246f16a28.zip
Fix controller stages and debug info.
Handle timeouts in librspamdclient correctly.
-rw-r--r--lib/librspamdclient.c17
-rw-r--r--src/buffer.c4
-rw-r--r--src/controller.c25
-rw-r--r--src/worker.c4
4 files changed, 37 insertions, 13 deletions
diff --git a/lib/librspamdclient.c b/lib/librspamdclient.c
index c52a71806..e871c29f6 100644
--- a/lib/librspamdclient.c
+++ b/lib/librspamdclient.c
@@ -899,9 +899,12 @@ read_rspamd_reply_line (struct rspamd_connection *c, GError **err)
}
}
/* Poll socket */
- if (poll_sync_socket (c->socket, client->read_timeout, POLL_IN) == -1) {
+ if ((r = poll_sync_socket (c->socket, client->read_timeout, POLL_IN)) <= 0) {
if (*err == NULL) {
- *err = g_error_new (G_RSPAMD_ERROR, errno, "Could not connect to server %s: %s",
+ if (r == 0) {
+ errno = ETIMEDOUT;
+ }
+ *err = g_error_new (G_RSPAMD_ERROR, errno, "Cannot read reply from controller %s: %s",
c->server->name, strerror (errno));
}
upstream_fail (&c->server->up, c->connection_time);
@@ -1100,8 +1103,11 @@ rspamd_send_controller_command (struct rspamd_connection *c, const gchar *line,
make_socket_nonblocking (c->socket);
/* Poll socket */
do {
- if (poll_sync_socket (c->socket, client->read_timeout, POLL_IN) == -1) {
+ if ((r = poll_sync_socket (c->socket, client->read_timeout, POLL_IN)) <= 0) {
if (*err == NULL) {
+ if (r == 0) {
+ errno = ETIMEDOUT;
+ }
*err = g_error_new (G_RSPAMD_ERROR, errno, "Cannot read reply from controller %s: %s",
c->server->name, strerror (errno));
}
@@ -1186,8 +1192,11 @@ rspamd_read_controller_greeting (struct rspamd_connection *c, GError **err)
gint r;
static const gchar greeting_str[] = "Rspamd";
- if (poll_sync_socket (c->socket, client->read_timeout, POLL_IN) == -1) {
+ if ((r = poll_sync_socket (c->socket, client->read_timeout, POLL_IN)) <= 0) {
if (*err == NULL) {
+ if (r == 0) {
+ errno = ETIMEDOUT;
+ }
*err = g_error_new (G_RSPAMD_ERROR, errno, "Cannot read reply from controller %s: %s",
c->server->name, strerror (errno));
}
diff --git a/src/buffer.c b/src/buffer.c
index 4f9ef304e..bf357d949 100644
--- a/src/buffer.c
+++ b/src/buffer.c
@@ -314,8 +314,8 @@ read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read)
d->in_buf->pos += r;
d->in_buf->data->len += r;
}
- debug_ip("read %z characters, policy is %s, watermark is: %z", r,
- d->policy == BUFFER_LINE ? "LINE" : "CHARACTER", d->nchars);
+ debug_ip("read %z characters, policy is %s, watermark is: %z, buffer has %z bytes", r,
+ d->policy == BUFFER_LINE ? "LINE" : "CHARACTER", d->nchars, d->in_buf->data->len);
}
saved_policy = d->policy;
diff --git a/src/controller.c b/src/controller.c
index 981eee490..ca9cdce7a 100644
--- a/src/controller.c
+++ b/src/controller.c
@@ -911,10 +911,10 @@ controller_read_socket (f_str_t * in, void *arg)
free_task (task, FALSE);
i = rspamd_snprintf (out_buf, sizeof (out_buf), "learn ok" CRLF END);
+ session->state = STATE_REPLY;
if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) {
return FALSE;
}
- session->state = STATE_REPLY;
break;
case STATE_LEARN_SPAM_PRE:
session->learn_buf = in;
@@ -948,7 +948,7 @@ controller_read_socket (f_str_t * in, void *arg)
}
}
else if (r == 0) {
- session->state = STATE_LEARN;
+ session->state = STATE_LEARN_SPAM;
task->dispatcher = session->dispatcher;
session->learn_task = task;
rspamd_dispatcher_pause (session->dispatcher);
@@ -1075,7 +1075,7 @@ controller_read_socket (f_str_t * in, void *arg)
}
if (session->state == STATE_REPLY || session->state == STATE_QUIT) {
- return controller_write_socket (session);
+ rspamd_dispatcher_restore (session->dispatcher);
}
return TRUE;
@@ -1094,7 +1094,7 @@ controller_write_socket (void *arg)
destroy_session (session->s);
return FALSE;
}
- else if (session->state == STATE_LEARN) {
+ else if (session->state == STATE_LEARN_SPAM) {
/* Perform actual learn here */
if (! learn_task_spam (session->learn_classifier, session->learn_task, session->in_class, &err)) {
if (err) {
@@ -1114,6 +1114,7 @@ controller_write_socket (void *arg)
if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) {
return FALSE;
}
+ return TRUE;
}
else if (session->state == STATE_REPLY) {
session->state = STATE_COMMAND;
@@ -1140,16 +1141,16 @@ static void
accept_socket (gint fd, short what, void *arg)
{
struct rspamd_worker *worker = (struct rspamd_worker *)arg;
- struct sockaddr_storage ss;
+ union sa_union su;
struct controller_session *new_session;
struct timeval *io_tv;
- socklen_t addrlen = sizeof (ss);
+ socklen_t addrlen = sizeof (su.ss);
gint nfd;
struct rspamd_controller_ctx *ctx;
ctx = worker->ctx;
- if ((nfd = accept_from_socket (fd, (struct sockaddr *)&ss, &addrlen)) == -1) {
+ if ((nfd = accept_from_socket (fd, (struct sockaddr *)&su.ss, &addrlen)) == -1) {
return;
}
@@ -1175,6 +1176,16 @@ accept_socket (gint fd, short what, void *arg)
new_session->s = new_async_session (new_session->session_pool, free_session, new_session);
new_session->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, controller_read_socket, controller_write_socket, controller_err_socket, io_tv, (void *)new_session);
+ if (su.ss.ss_family == AF_UNIX) {
+ msg_info ("accepted connection from unix socket");
+ new_session->dispatcher->peer_addr = INADDR_LOOPBACK;
+ }
+ else if (su.ss.ss_family == AF_INET) {
+ msg_info ("accepted connection from %s port %d",
+ inet_ntoa (su.s4.sin_addr), ntohs (su.s4.sin_port));
+ memcpy (&new_session->dispatcher->peer_addr, &su.s4.sin_addr,
+ sizeof (guint32));
+ }
if (! rspamd_dispatcher_write (new_session->dispatcher, greetingbuf, strlen (greetingbuf), FALSE, FALSE)) {
msg_warn ("cannot write greeting");
}
diff --git a/src/worker.c b/src/worker.c
index b919ad407..16e33ed9e 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -272,6 +272,7 @@ read_socket (f_str_t * in, void *arg)
task->msg->begin = in->begin;
task->msg->len = in->len;
debug_task ("got string of length %z", task->msg->len);
+ task->state = WAIT_FILTER;
r = process_message (task);
if (r == -1) {
msg_warn ("processing of message failed");
@@ -323,6 +324,9 @@ read_socket (f_str_t * in, void *arg)
case WRITE_ERROR:
return write_socket (task);
break;
+ case WAIT_FILTER:
+ msg_info ("ignoring trailing garbadge of size %z", in->len);
+ break;
default:
debug_task ("invalid state on reading stage");
break;