]> source.dussan.org Git - rspamd.git/commitdiff
Fix controller stages and debug info.
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Mon, 1 Aug 2011 11:32:45 +0000 (15:32 +0400)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Mon, 1 Aug 2011 11:32:45 +0000 (15:32 +0400)
Handle timeouts in librspamdclient correctly.

lib/librspamdclient.c
src/buffer.c
src/controller.c
src/worker.c

index c52a71806d5692af9009b2bda54d57155d2f069f..e871c29f6db2bd097bb6e619e2248b6b167770bd 100644 (file)
@@ -899,9 +899,12 @@ read_rspamd_reply_line (struct rspamd_connection *c, GError **err)
                }
        }
        /* Poll socket */
-       if (poll_sync_socket (c->socket, client->read_timeout, POLL_IN) == -1) {
+       if ((r = poll_sync_socket (c->socket, client->read_timeout, POLL_IN)) <= 0) {
                if (*err == NULL) {
-                       *err = g_error_new (G_RSPAMD_ERROR, errno, "Could not connect to server %s: %s",
+                       if (r == 0) {
+                               errno = ETIMEDOUT;
+                       }
+                       *err = g_error_new (G_RSPAMD_ERROR, errno, "Cannot read reply from controller %s: %s",
                                        c->server->name, strerror (errno));
                }
                upstream_fail (&c->server->up, c->connection_time);
@@ -1100,8 +1103,11 @@ rspamd_send_controller_command (struct rspamd_connection *c, const gchar *line,
        make_socket_nonblocking (c->socket);
        /* Poll socket */
        do {
-               if (poll_sync_socket (c->socket, client->read_timeout, POLL_IN) == -1) {
+               if ((r = poll_sync_socket (c->socket, client->read_timeout, POLL_IN)) <= 0) {
                        if (*err == NULL) {
+                               if (r == 0) {
+                                       errno = ETIMEDOUT;
+                               }
                                *err = g_error_new (G_RSPAMD_ERROR, errno, "Cannot read reply from controller %s: %s",
                                                c->server->name, strerror (errno));
                        }
@@ -1186,8 +1192,11 @@ rspamd_read_controller_greeting (struct rspamd_connection *c, GError **err)
        gint                            r;
        static const gchar              greeting_str[] = "Rspamd";
 
-       if (poll_sync_socket (c->socket, client->read_timeout, POLL_IN) == -1) {
+       if ((r = poll_sync_socket (c->socket, client->read_timeout, POLL_IN)) <= 0) {
                if (*err == NULL) {
+                       if (r == 0) {
+                               errno = ETIMEDOUT;
+                       }
                        *err = g_error_new (G_RSPAMD_ERROR, errno, "Cannot read reply from controller %s: %s",
                                        c->server->name, strerror (errno));
                }
index 4f9ef304ebb7bac2cea7d53a61dd7d06ea3c6865..bf357d9493d82ee80a925e8d713874a9bee7f9f7 100644 (file)
@@ -314,8 +314,8 @@ read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read)
                        d->in_buf->pos += r;
                        d->in_buf->data->len += r;
                }
-               debug_ip("read %z characters, policy is %s, watermark is: %z", r,
-                               d->policy == BUFFER_LINE ? "LINE" : "CHARACTER", d->nchars);
+               debug_ip("read %z characters, policy is %s, watermark is: %z, buffer has %z bytes", r,
+                               d->policy == BUFFER_LINE ? "LINE" : "CHARACTER", d->nchars, d->in_buf->data->len);
        }
 
        saved_policy = d->policy;
index 981eee490e825d01b75c495b880fd865b2de030c..ca9cdce7adbce87e24c9d551cdfd0c9f47d30df9 100644 (file)
@@ -911,10 +911,10 @@ controller_read_socket (f_str_t * in, void *arg)
 
                free_task (task, FALSE);
                i = rspamd_snprintf (out_buf, sizeof (out_buf), "learn ok" CRLF END);
+               session->state = STATE_REPLY;
                if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) {
                        return FALSE;
                }
-               session->state = STATE_REPLY;
                break;
        case STATE_LEARN_SPAM_PRE:
                session->learn_buf = in;
@@ -948,7 +948,7 @@ controller_read_socket (f_str_t * in, void *arg)
                        }
                }
                else if (r == 0) {
-                       session->state = STATE_LEARN;
+                       session->state = STATE_LEARN_SPAM;
                        task->dispatcher = session->dispatcher;
                        session->learn_task = task;
                        rspamd_dispatcher_pause (session->dispatcher);
@@ -1075,7 +1075,7 @@ controller_read_socket (f_str_t * in, void *arg)
        }
 
        if (session->state == STATE_REPLY || session->state == STATE_QUIT) {
-               return controller_write_socket (session);
+               rspamd_dispatcher_restore (session->dispatcher);
        }
 
        return TRUE;
@@ -1094,7 +1094,7 @@ controller_write_socket (void *arg)
                destroy_session (session->s);
                return FALSE;
        }
-       else if (session->state == STATE_LEARN) {
+       else if (session->state == STATE_LEARN_SPAM) {
                /* Perform actual learn here */
                if (! learn_task_spam (session->learn_classifier, session->learn_task, session->in_class, &err)) {
                        if (err) {
@@ -1114,6 +1114,7 @@ controller_write_socket (void *arg)
                if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) {
                        return FALSE;
                }
+               return TRUE;
        }
        else if (session->state == STATE_REPLY) {
                session->state = STATE_COMMAND;
@@ -1140,16 +1141,16 @@ static void
 accept_socket (gint fd, short what, void *arg)
 {
        struct rspamd_worker           *worker = (struct rspamd_worker *)arg;
-       struct sockaddr_storage         ss;
+       union sa_union                  su;
        struct controller_session      *new_session;
        struct timeval                 *io_tv;
-       socklen_t                       addrlen = sizeof (ss);
+       socklen_t                       addrlen = sizeof (su.ss);
        gint                            nfd;
        struct rspamd_controller_ctx   *ctx;
 
        ctx = worker->ctx;
 
-       if ((nfd = accept_from_socket (fd, (struct sockaddr *)&ss, &addrlen)) == -1) {
+       if ((nfd = accept_from_socket (fd, (struct sockaddr *)&su.ss, &addrlen)) == -1) {
                return;
        }
 
@@ -1175,6 +1176,16 @@ accept_socket (gint fd, short what, void *arg)
        new_session->s = new_async_session (new_session->session_pool, free_session, new_session);
 
        new_session->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, controller_read_socket, controller_write_socket, controller_err_socket, io_tv, (void *)new_session);
+       if (su.ss.ss_family == AF_UNIX) {
+               msg_info ("accepted connection from unix socket");
+               new_session->dispatcher->peer_addr = INADDR_LOOPBACK;
+       }
+       else if (su.ss.ss_family == AF_INET) {
+               msg_info ("accepted connection from %s port %d",
+                               inet_ntoa (su.s4.sin_addr), ntohs (su.s4.sin_port));
+               memcpy (&new_session->dispatcher->peer_addr, &su.s4.sin_addr,
+                               sizeof (guint32));
+       }
        if (! rspamd_dispatcher_write (new_session->dispatcher, greetingbuf, strlen (greetingbuf), FALSE, FALSE)) {
                msg_warn ("cannot write greeting");
        }
index b919ad407ee9aa530169115896948be433a36611..16e33ed9e92b948c6ee1f759bbc789097545df45 100644 (file)
@@ -272,6 +272,7 @@ read_socket (f_str_t * in, void *arg)
                task->msg->begin = in->begin;
                task->msg->len = in->len;
                debug_task ("got string of length %z", task->msg->len);
+               task->state = WAIT_FILTER;
                r = process_message (task);
                if (r == -1) {
                        msg_warn ("processing of message failed");
@@ -323,6 +324,9 @@ read_socket (f_str_t * in, void *arg)
        case WRITE_ERROR:
                return write_socket (task);
                break;
+       case WAIT_FILTER:
+               msg_info ("ignoring trailing garbadge of size %z", in->len);
+               break;
        default:
                debug_task ("invalid state on reading stage");
                break;