diff options
-rw-r--r-- | lib/librspamdclient.c | 17 | ||||
-rw-r--r-- | src/buffer.c | 4 | ||||
-rw-r--r-- | src/controller.c | 25 | ||||
-rw-r--r-- | src/worker.c | 4 |
4 files changed, 37 insertions, 13 deletions
diff --git a/lib/librspamdclient.c b/lib/librspamdclient.c index c52a71806..e871c29f6 100644 --- a/lib/librspamdclient.c +++ b/lib/librspamdclient.c @@ -899,9 +899,12 @@ read_rspamd_reply_line (struct rspamd_connection *c, GError **err) } } /* Poll socket */ - if (poll_sync_socket (c->socket, client->read_timeout, POLL_IN) == -1) { + if ((r = poll_sync_socket (c->socket, client->read_timeout, POLL_IN)) <= 0) { if (*err == NULL) { - *err = g_error_new (G_RSPAMD_ERROR, errno, "Could not connect to server %s: %s", + if (r == 0) { + errno = ETIMEDOUT; + } + *err = g_error_new (G_RSPAMD_ERROR, errno, "Cannot read reply from controller %s: %s", c->server->name, strerror (errno)); } upstream_fail (&c->server->up, c->connection_time); @@ -1100,8 +1103,11 @@ rspamd_send_controller_command (struct rspamd_connection *c, const gchar *line, make_socket_nonblocking (c->socket); /* Poll socket */ do { - if (poll_sync_socket (c->socket, client->read_timeout, POLL_IN) == -1) { + if ((r = poll_sync_socket (c->socket, client->read_timeout, POLL_IN)) <= 0) { if (*err == NULL) { + if (r == 0) { + errno = ETIMEDOUT; + } *err = g_error_new (G_RSPAMD_ERROR, errno, "Cannot read reply from controller %s: %s", c->server->name, strerror (errno)); } @@ -1186,8 +1192,11 @@ rspamd_read_controller_greeting (struct rspamd_connection *c, GError **err) gint r; static const gchar greeting_str[] = "Rspamd"; - if (poll_sync_socket (c->socket, client->read_timeout, POLL_IN) == -1) { + if ((r = poll_sync_socket (c->socket, client->read_timeout, POLL_IN)) <= 0) { if (*err == NULL) { + if (r == 0) { + errno = ETIMEDOUT; + } *err = g_error_new (G_RSPAMD_ERROR, errno, "Cannot read reply from controller %s: %s", c->server->name, strerror (errno)); } diff --git a/src/buffer.c b/src/buffer.c index 4f9ef304e..bf357d949 100644 --- a/src/buffer.c +++ b/src/buffer.c @@ -314,8 +314,8 @@ read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read) d->in_buf->pos += r; d->in_buf->data->len += r; } - debug_ip("read %z characters, policy is %s, watermark is: %z", r, - d->policy == BUFFER_LINE ? "LINE" : "CHARACTER", d->nchars); + debug_ip("read %z characters, policy is %s, watermark is: %z, buffer has %z bytes", r, + d->policy == BUFFER_LINE ? "LINE" : "CHARACTER", d->nchars, d->in_buf->data->len); } saved_policy = d->policy; diff --git a/src/controller.c b/src/controller.c index 981eee490..ca9cdce7a 100644 --- a/src/controller.c +++ b/src/controller.c @@ -911,10 +911,10 @@ controller_read_socket (f_str_t * in, void *arg) free_task (task, FALSE); i = rspamd_snprintf (out_buf, sizeof (out_buf), "learn ok" CRLF END); + session->state = STATE_REPLY; if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) { return FALSE; } - session->state = STATE_REPLY; break; case STATE_LEARN_SPAM_PRE: session->learn_buf = in; @@ -948,7 +948,7 @@ controller_read_socket (f_str_t * in, void *arg) } } else if (r == 0) { - session->state = STATE_LEARN; + session->state = STATE_LEARN_SPAM; task->dispatcher = session->dispatcher; session->learn_task = task; rspamd_dispatcher_pause (session->dispatcher); @@ -1075,7 +1075,7 @@ controller_read_socket (f_str_t * in, void *arg) } if (session->state == STATE_REPLY || session->state == STATE_QUIT) { - return controller_write_socket (session); + rspamd_dispatcher_restore (session->dispatcher); } return TRUE; @@ -1094,7 +1094,7 @@ controller_write_socket (void *arg) destroy_session (session->s); return FALSE; } - else if (session->state == STATE_LEARN) { + else if (session->state == STATE_LEARN_SPAM) { /* Perform actual learn here */ if (! learn_task_spam (session->learn_classifier, session->learn_task, session->in_class, &err)) { if (err) { @@ -1114,6 +1114,7 @@ controller_write_socket (void *arg) if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) { return FALSE; } + return TRUE; } else if (session->state == STATE_REPLY) { session->state = STATE_COMMAND; @@ -1140,16 +1141,16 @@ static void accept_socket (gint fd, short what, void *arg) { struct rspamd_worker *worker = (struct rspamd_worker *)arg; - struct sockaddr_storage ss; + union sa_union su; struct controller_session *new_session; struct timeval *io_tv; - socklen_t addrlen = sizeof (ss); + socklen_t addrlen = sizeof (su.ss); gint nfd; struct rspamd_controller_ctx *ctx; ctx = worker->ctx; - if ((nfd = accept_from_socket (fd, (struct sockaddr *)&ss, &addrlen)) == -1) { + if ((nfd = accept_from_socket (fd, (struct sockaddr *)&su.ss, &addrlen)) == -1) { return; } @@ -1175,6 +1176,16 @@ accept_socket (gint fd, short what, void *arg) new_session->s = new_async_session (new_session->session_pool, free_session, new_session); new_session->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, controller_read_socket, controller_write_socket, controller_err_socket, io_tv, (void *)new_session); + if (su.ss.ss_family == AF_UNIX) { + msg_info ("accepted connection from unix socket"); + new_session->dispatcher->peer_addr = INADDR_LOOPBACK; + } + else if (su.ss.ss_family == AF_INET) { + msg_info ("accepted connection from %s port %d", + inet_ntoa (su.s4.sin_addr), ntohs (su.s4.sin_port)); + memcpy (&new_session->dispatcher->peer_addr, &su.s4.sin_addr, + sizeof (guint32)); + } if (! rspamd_dispatcher_write (new_session->dispatcher, greetingbuf, strlen (greetingbuf), FALSE, FALSE)) { msg_warn ("cannot write greeting"); } diff --git a/src/worker.c b/src/worker.c index b919ad407..16e33ed9e 100644 --- a/src/worker.c +++ b/src/worker.c @@ -272,6 +272,7 @@ read_socket (f_str_t * in, void *arg) task->msg->begin = in->begin; task->msg->len = in->len; debug_task ("got string of length %z", task->msg->len); + task->state = WAIT_FILTER; r = process_message (task); if (r == -1) { msg_warn ("processing of message failed"); @@ -323,6 +324,9 @@ read_socket (f_str_t * in, void *arg) case WRITE_ERROR: return write_socket (task); break; + case WAIT_FILTER: + msg_info ("ignoring trailing garbadge of size %z", in->len); + break; default: debug_task ("invalid state on reading stage"); break; |