]> source.dussan.org Git - rspamd.git/commitdiff
* Stylify and fix util functions that was gathered from libutil code (memory allocation)
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Fri, 24 Oct 2008 15:32:39 +0000 (19:32 +0400)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Fri, 24 Oct 2008 15:32:39 +0000 (19:32 +0400)
* Fix protocol and network functions
* Fix some memory allocation issues

cfg_utils.c
controller.c
main.c
main.h
plugins/surbl.c
protocol.c
protocol.h
util.c
worker.c

index 45442fe62e31d6fe281330a6eedd893d010d54cf..9fa7aac4752aa8f383a37fa4e38fe63d3ff07770 100644 (file)
@@ -358,6 +358,10 @@ parse_filters_str (struct config_file *cfg, const char *str, enum script_type ty
        gchar **strvec, **p;
        struct filter *cur;
        int i;
+       
+       if (str == NULL) {
+               return; 
+       }
 
        strvec = g_strsplit (str, ",", 0);
        if (strvec == NULL) {
index 0af9d1d2e45d635fff0a9a96686b9921641bf0d2..3f4f9b358d77a33da055b56fc7db103e17015dc4 100644 (file)
@@ -280,6 +280,7 @@ accept_socket (int fd, short what, void *arg)
        }
        bzero (new_session, sizeof (struct controller_session));
        new_session->worker = worker;
+       new_session->sock = nfd;
        new_session->cfg = worker->srv->cfg;
 #ifdef HAVE_GETPAGESIZE
        new_session->session_pool = memory_pool_new (getpagesize () - 1);
diff --git a/main.c b/main.c
index d7ecb418b0320819ed5d5758045f7a57d3f2e65f..a886620852c3b49e9b7573cc5e16863aeab5d9e3 100644 (file)
--- a/main.c
+++ b/main.c
@@ -220,6 +220,7 @@ main (int argc, char **argv)
                rspamd->cfg->temp_dir = memory_pool_strdup (rspamd->cfg->cfg_pool, getenv ("TMPDIR"));
 
                if (!rspamd->cfg->temp_dir) {
+                       msg_warn ("$TMPDIR is empty too, using /tmp as default");
                rspamd->cfg->temp_dir = memory_pool_strdup (rspamd->cfg->cfg_pool, "/tmp");
                }
     }
diff --git a/main.h b/main.h
index de8969de8c50257479c2c9ad8a7118a1dcb1a0e6..2da935546f21298a4541de42046337b0386fbf74 100644 (file)
--- a/main.h
+++ b/main.h
@@ -109,6 +109,7 @@ struct save_point {
 /* Control session */
 struct controller_session {
        struct rspamd_worker *worker;
+       int sock;
        /* Access to authorized commands */
        int authorized;
        memory_pool_t *session_pool;
@@ -126,10 +127,12 @@ struct worker_task {
                WRITE_REPLY,
                WRITE_ERROR,
                WAIT_FILTER,
+               CLOSING_CONNECTION,
        } state;
        size_t content_length;
        enum rspamd_protocol proto;
        enum rspamd_command cmd;
+       int sock;
        char *helo;
        char *from;
        char *rcpt;
index 23c258326d4f306fcbfec385aa29d4f7cd181b66..c90a8419e01c820c31c17375a1ccc91eb2c0e025 100644 (file)
@@ -105,8 +105,6 @@ surbl_module_init (struct config_file *cfg, struct module_ctx **ctx)
 
        *ctx = (struct module_ctx *)surbl_module_ctx;
 
-       evdns_init ();
-
        return 0;
 }
 
@@ -117,6 +115,8 @@ surbl_module_config (struct config_file *cfg)
 
        char *value, *cur_tok, *str;
 
+       evdns_init ();
+
        if ((value = get_module_opt (cfg, "surbl", "redirector")) != NULL) {
                str = memory_pool_strdup (surbl_module_ctx->surbl_pool, value);
                cur_tok = strsep (&str, ":");
index 0881b9788712a4ca2aea9c62106afc5b125a27c3..94cb2f824d84ef1a11f1e1697f33a4c0404c71c7 100644 (file)
@@ -73,6 +73,7 @@ parse_command (struct worker_task *task, char *line)
 
        token = strsep (&line, " ");
        if (line == NULL || token == NULL) {
+               msg_debug ("parse_command: bad comand: %s", token);
                return -1;
        }
 
@@ -153,14 +154,27 @@ static int
 parse_header (struct worker_task *task, char *line)
 {
        char *headern, *err;
-       headern = strsep (&line, ":");
 
        /* Check end of headers */
-       if (*line == '\r' && *(line + 1) == '\n') {
-               task->state = READ_MESSAGE;
+       if (*line == '\0') {
+               if (task->cmd == CMD_PING || task->cmd == CMD_SKIP) {
+                       task->state = WRITE_REPLY;
+               }
+               else {
+                       if (task->content_length > 0) {
+                               task->state = READ_MESSAGE;
+                       }
+                       else {
+                               task->last_error = "Unknown content length";
+                               task->error_code = RSPAMD_LENGTH_ERROR;
+                               task->state = WRITE_ERROR;
+                       }
+               }
                return 0;
        }
 
+       headern = strsep (&line, ":");
+
        if (line == NULL || headern == NULL) {
                return -1;
        }
@@ -433,14 +447,17 @@ write_reply (struct worker_task *task)
        int r;
        char outbuf[OUTBUFSIZ];
 
+       msg_debug ("write_reply: writing reply to client");
        if (task->error_code != 0) {
                /* Write error message and error code to reply */
                if (task->proto == SPAMC_PROTO) {
                        r = snprintf (outbuf, sizeof (outbuf), "%s %d %s" CRLF CRLF, SPAMD_REPLY_BANNER, task->error_code, SPAMD_ERROR);
+                       msg_debug ("write_reply: writing error: %s", outbuf);
                }
                else {
                        r = snprintf (outbuf, sizeof (outbuf), "%s %d %s" CRLF "%s: %s" CRLF CRLF, RSPAMD_REPLY_BANNER, task->error_code, 
                                                                SPAMD_ERROR, ERROR_HEADER, task->last_error);
+                       msg_debug ("write_reply: writing error: %s", outbuf);
                }
                /* Write to bufferevent error message */
                bufferevent_write (task->bev, outbuf, r);
@@ -459,12 +476,12 @@ write_reply (struct worker_task *task)
                                return write_process_reply (task);
                                break;
                        case CMD_SKIP:
-                               r = snprintf (outbuf, sizeof (outbuf), "%s 0 %s" CRLF CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER,
+                               r = snprintf (outbuf, sizeof (outbuf), "%s 0 %s" CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER,
                                                                                                                                SPAMD_OK);
                                bufferevent_write (task->bev, outbuf, r);
                                break;
                        case CMD_PING:
-                               r = snprintf (outbuf, sizeof (outbuf), "%s 0 PONG" CRLF CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER);
+                               r = snprintf (outbuf, sizeof (outbuf), "%s 0 PONG" CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER);
                                bufferevent_write (task->bev, outbuf, r);
                                break;
                }
index 50b458de0ab076aa136197bd0fc5d7a3b384032a..6c750e91fd1a8262551a64534cf869b4ecb72b8b 100644 (file)
@@ -6,6 +6,7 @@
 #define RSPAMD_FILTER_ERROR 1
 #define RSPAMD_NETWORK_ERROR 2
 #define RSPAMD_PROTOCOL_ERROR 3
+#define RSPAMD_LENGTH_ERROR 4
 
 struct worker_task;
 
diff --git a/util.c b/util.c
index 0bd7f91c8edebd465af7b2fa09f30dc4b816283e..b0d40daf0a4201ac40c2ebf9539c80dd9ff89a1e 100644 (file)
--- a/util.c
+++ b/util.c
@@ -77,18 +77,18 @@ make_socket (const char *address, u_short port)
        char strport[NI_MAXSERV];
        int ai_result;
 
-       memset(&ai, 0, sizeof (ai));
+       memset (&ai, 0, sizeof (ai));
        ai.ai_family = AF_INET;
        ai.ai_socktype = SOCK_STREAM;
        ai.ai_flags = AI_PASSIVE;
-       snprintf(strport, sizeof (strport), "%d", port);
-       if ((ai_result = getaddrinfo(address, strport, &ai, &aitop)) != 0) {
+       snprintf (strport, sizeof (strport), "%d", port);
+       if ((ai_result = getaddrinfo (address, strport, &ai, &aitop)) != 0) {
                return (-1);
        }
 
-       fd = make_socket_ai(aitop);
+       fd = make_socket_ai (aitop);
 
-       freeaddrinfo(aitop);
+       freeaddrinfo (aitop);
 
        return (fd);
 }
@@ -129,8 +129,7 @@ read_cmd_line (int argc, char **argv, struct config_file *cfg)
                 break;
             case 'c':
                 if (optarg && cfg->cfg_name) {
-                    free (cfg->cfg_name);
-                    cfg->cfg_name = strdup (optarg);
+                    cfg->cfg_name = memory_pool_strdup (cfg->cfg_pool, optarg);
                 }
                 break;
             case 'h':
@@ -211,7 +210,7 @@ static size_t title_buffer_size = 0;
 static char *title_progname, *title_progname_full;
 
 int
-setproctitle(const char *fmt, ...)
+setproctitle (const char *fmt, ...)
 {
        if (!title_buffer || !title_buffer_size) {
                errno = ENOMEM;
@@ -258,7 +257,7 @@ setproctitle(const char *fmt, ...)
 */
 
 int
-init_title(int argc, char *argv[], char *envp[])
+init_title (int argc, char *argv[], char *envp[])
 {
        char   *begin_of_buffer = 0, *end_of_buffer = 0;
        int     i;
@@ -286,13 +285,13 @@ init_title(int argc, char *argv[], char *envp[])
                return 0;
 
        for (i = 0; envp[i]; ++i) {
-               if (!(new_environ[i] = strdup (envp[i])))
+               if (!(new_environ[i] = g_strdup (envp[i])))
                        goto cleanup_enomem;
        }
        new_environ[i] = 0;
 
        if (program_invocation_name) {
-               title_progname_full = strdup (program_invocation_name);
+               title_progname_full = g_strdup (program_invocation_name);
 
                if (!title_progname_full)
                        goto cleanup_enomem;
@@ -316,19 +315,19 @@ init_title(int argc, char *argv[], char *envp[])
 
     cleanup_enomem:
        for (--i; i >= 0; --i) {
-               free(new_environ[i]);
+               g_free (new_environ[i]);
        }
-       free(new_environ);
+       g_free (new_environ);
        return 0;
 }
 #endif
 
 #ifndef HAVE_PIDFILE
 extern char * __progname;
-static int _pidfile_remove(struct pidfh *pfh, int freeit);
+static int _pidfile_remove (struct pidfh *pfh, int freeit);
 
 static int
-pidfile_verify(struct pidfh *pfh)
+pidfile_verify (struct pidfh *pfh)
 {
        struct stat sb;
 
@@ -337,61 +336,61 @@ pidfile_verify(struct pidfh *pfh)
        /*
         * Check remembered descriptor.
         */
-       if (fstat(pfh->pf_fd, &sb) == -1)
+       if (fstat (pfh->pf_fd, &sb) == -1)
                return (errno);
        if (sb.st_dev != pfh->pf_dev || sb.st_ino != pfh->pf_ino)
-               return (-1);
-       return (0);
+               return -1;
+       return 0;
 }
 
 static int
-pidfile_read(const char *path, pid_t *pidptr)
+pidfile_read (const char *path, pid_t *pidptr)
 {
        char buf[16], *endptr;
        int error, fd, i;
 
-       fd = open(path, O_RDONLY);
+       fd = open (path, O_RDONLY);
        if (fd == -1)
                return (errno);
 
-       i = read(fd, buf, sizeof(buf) - 1);
+       i = read (fd, buf, sizeof(buf) - 1);
        error = errno;  /* Remember errno in case close() wants to change it. */
-       close(fd);
+       close (fd);
        if (i == -1)
-               return (error);
+               return error;
        else if (i == 0)
-               return (EAGAIN);
+               return EAGAIN;
        buf[i] = '\0';
 
-       *pidptr = strtol(buf, &endptr, 10);
+       *pidptr = strtol (buf, &endptr, 10);
        if (endptr != &buf[i])
-               return (EINVAL);
+               return EINVAL;
 
-       return (0);
+       return 0;
 }
 
 struct pidfh *
-pidfile_open(const char *path, mode_t mode, pid_t *pidptr)
+pidfile_open (const char *path, mode_t mode, pid_t *pidptr)
 {
        struct pidfh *pfh;
        struct stat sb;
        int error, fd, len, count;
        struct timespec rqtp;
 
-       pfh = g_malloc(sizeof(*pfh));
+       pfh = g_malloc (sizeof(*pfh));
        if (pfh == NULL)
-               return (NULL);
+               return NULL;
 
        if (path == NULL)
-               len = snprintf(pfh->pf_path, sizeof(pfh->pf_path),
+               len = snprintf (pfh->pf_path, sizeof(pfh->pf_path),
                    "/var/run/%s.pid", __progname);
        else
-               len = snprintf(pfh->pf_path, sizeof(pfh->pf_path),
+               len = snprintf (pfh->pf_path, sizeof(pfh->pf_path),
                    "%s", path);
-       if (len >= (int)sizeof(pfh->pf_path)) {
-               free(pfh);
+       if (len >= (int)sizeof (pfh->pf_path)) {
+               g_free (pfh);
                errno = ENAMETOOLONG;
-               return (NULL);
+               return NULL;
        }
 
        /*
@@ -400,7 +399,7 @@ pidfile_open(const char *path, mode_t mode, pid_t *pidptr)
         * PID file will be truncated again in pidfile_write(), so
         * pidfile_write() can be called multiple times.
         */
-       fd = open(pfh->pf_path,
+       fd = open (pfh->pf_path,
            O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK, mode);
        flock (fd, LOCK_EX | LOCK_NB);
        if (fd == -1) {
@@ -409,41 +408,41 @@ pidfile_open(const char *path, mode_t mode, pid_t *pidptr)
                rqtp.tv_nsec = 5000000;
                if (errno == EWOULDBLOCK && pidptr != NULL) {
                again:
-                       errno = pidfile_read(pfh->pf_path, pidptr);
+                       errno = pidfile_read (pfh->pf_path, pidptr);
                        if (errno == 0)
                                errno = EEXIST;
                        else if (errno == EAGAIN) {
                                if (++count <= 3) {
-                                       nanosleep(&rqtp, 0);
+                                       nanosleep (&rqtp, 0);
                                        goto again;
                                }
                        }
                }
-               free(pfh);
-               return (NULL);
+               g_free (pfh);
+               return NULL;
        }
        /*
         * Remember file information, so in pidfile_write() we are sure we write
         * to the proper descriptor.
         */
-       if (fstat(fd, &sb) == -1) {
+       if (fstat (fd, &sb) == -1) {
                error = errno;
-               unlink(pfh->pf_path);
-               close(fd);
-               free(pfh);
+               unlink (pfh->pf_path);
+               close (fd);
+               g_free (pfh);
                errno = error;
-               return (NULL);
+               return NULL;
        }
 
        pfh->pf_fd = fd;
        pfh->pf_dev = sb.st_dev;
        pfh->pf_ino = sb.st_ino;
 
-       return (pfh);
+       return pfh;
 }
 
 int
-pidfile_write(struct pidfh *pfh)
+pidfile_write (struct pidfh *pfh)
 {
        char pidstr[16];
        int error, fd;
@@ -452,94 +451,94 @@ pidfile_write(struct pidfh *pfh)
         * Check remembered descriptor, so we don't overwrite some other
         * file if pidfile was closed and descriptor reused.
         */
-       errno = pidfile_verify(pfh);
+       errno = pidfile_verify (pfh);
        if (errno != 0) {
                /*
                 * Don't close descriptor, because we are not sure if it's ours.
                 */
-               return (-1);
+               return -1;
        }
        fd = pfh->pf_fd;
 
        /*
         * Truncate PID file, so multiple calls of pidfile_write() are allowed.
         */
-       if (ftruncate(fd, 0) == -1) {
+       if (ftruncate (fd, 0) == -1) {
                error = errno;
-               _pidfile_remove(pfh, 0);
+               _pidfile_remove (pfh, 0);
                errno = error;
-               return (-1);
+               return -1;
        }
 
-       snprintf(pidstr, sizeof(pidstr), "%u", getpid());
-       if (pwrite(fd, pidstr, strlen(pidstr), 0) != (ssize_t)strlen(pidstr)) {
+       snprintf (pidstr, sizeof(pidstr), "%u", getpid ());
+       if (pwrite (fd, pidstr, strlen (pidstr), 0) != (ssize_t)strlen (pidstr)) {
                error = errno;
-               _pidfile_remove(pfh, 0);
+               _pidfile_remove (pfh, 0);
                errno = error;
-               return (-1);
+               return -1;
        }
 
-       return (0);
+       return 0;
 }
 
 int
-pidfile_close(struct pidfh *pfh)
+pidfile_close (struct pidfh *pfh)
 {
        int error;
 
-       error = pidfile_verify(pfh);
+       error = pidfile_verify (pfh);
        if (error != 0) {
                errno = error;
-               return (-1);
+               return -1;
        }
 
-       if (close(pfh->pf_fd) == -1)
+       if (close (pfh->pf_fd) == -1)
                error = errno;
-       free(pfh);
+       g_free (pfh);
        if (error != 0) {
                errno = error;
-               return (-1);
+               return -1;
        }
-       return (0);
+       return 0;
 }
 
 static int
-_pidfile_remove(struct pidfh *pfh, int freeit)
+_pidfile_remove (struct pidfh *pfh, int freeit)
 {
        int error;
 
-       error = pidfile_verify(pfh);
+       error = pidfile_verify (pfh);
        if (error != 0) {
                errno = error;
-               return (-1);
+               return -1;
        }
 
-       if (unlink(pfh->pf_path) == -1)
+       if (unlink (pfh->pf_path) == -1)
                error = errno;
-       if (flock(pfh->pf_fd, LOCK_UN) == -1) {
+       if (flock (pfh->pf_fd, LOCK_UN) == -1) {
                if (error == 0)
                        error = errno;
        }
-       if (close(pfh->pf_fd) == -1) {
+       if (close (pfh->pf_fd) == -1) {
                if (error == 0)
                        error = errno;
        }
        if (freeit)
-               free(pfh);
+               g_free (pfh);
        else
                pfh->pf_fd = -1;
        if (error != 0) {
                errno = error;
-               return (-1);
+               return -1;
        }
-       return (0);
+       return 0;
 }
 
 int
-pidfile_remove(struct pidfh *pfh)
+pidfile_remove (struct pidfh *pfh)
 {
 
-       return (_pidfile_remove(pfh, 1));
+       return (_pidfile_remove (pfh, 1));
 }
 #endif
 
index 089f1ecfab4da5c3f43c8e2f14134f70c87c3844..e72e3fd0e8d0fdc2c081ad6d550c2074df8656d1 100644 (file)
--- a/worker.c
+++ b/worker.c
@@ -80,6 +80,9 @@ free_task (struct worker_task *task)
                        TAILQ_REMOVE (&task->parts, part, next);
                }
                memory_pool_delete (task->task_pool);
+               bufferevent_disable (task->bev, EV_READ | EV_WRITE);
+               bufferevent_free (task->bev);
+               close (task->sock);
                g_free (task);
        }
 }
@@ -195,6 +198,10 @@ read_socket (struct bufferevent *bev, void *arg)
                                task->error_code = RSPAMD_NETWORK_ERROR;
                                task->state = WRITE_ERROR;
                        }
+                       if (task->state == WRITE_ERROR || task->state == WRITE_REPLY) {
+                               bufferevent_enable (bev, EV_WRITE);
+                               bufferevent_disable (bev, EV_READ);
+                       }
                        free (s);
                        break;
                case READ_MESSAGE:
@@ -213,6 +220,10 @@ read_socket (struct bufferevent *bev, void *arg)
                                                task->state = WAIT_FILTER;
                                        }
                                }
+                               if (task->state == WRITE_ERROR || task->state == WRITE_REPLY) {
+                                       bufferevent_enable (bev, EV_WRITE);
+                                       bufferevent_disable (bev, EV_READ);
+                               }
                        }
                        else {
                                msg_err ("read_socket: cannot read data to buffer: %ld", (long int)r);
@@ -224,30 +235,33 @@ read_socket (struct bufferevent *bev, void *arg)
                case WAIT_FILTER:
                        bufferevent_disable (bev, EV_READ);
                        break;
+       }
+}
+
+static void
+write_socket (struct bufferevent *bev, void *arg)
+{
+       struct worker_task *task = (struct worker_task *)arg;
+       
+       switch (task->state) {
                case WRITE_REPLY:
                        write_reply (task);
+                       task->state = CLOSING_CONNECTION;
                        bufferevent_disable (bev, EV_READ);
-                       bufferevent_enable (bev, EV_WRITE);
                        break;
                case WRITE_ERROR:
                        write_reply (task);
+                       task->state = CLOSING_CONNECTION;
                        bufferevent_disable (bev, EV_READ);
-                       bufferevent_enable (bev, EV_WRITE);
                        break;
-       }
-}
-
-static void
-write_socket (struct bufferevent *bev, void *arg)
-{
-       struct worker_task *task = (struct worker_task *)arg;
-
-       if (task->state > READ_MESSAGE) {
-               msg_info ("closing connection");
-               /* Free buffers */
-               free_task (task);
-               bufferevent_disable (bev, EV_WRITE);
-               bufferevent_free (bev);
+               case CLOSING_CONNECTION:
+                       msg_debug ("write_socket: normally closing connection");
+                       free_task (task);
+                       break;
+               default:
+                       msg_info ("write_socket: abnormally closing connection");
+                       free_task (task);
+                       break;
        }
 }
 
@@ -255,11 +269,9 @@ static void
 err_socket (struct bufferevent *bev, short what, void *arg)
 {
        struct worker_task *task = (struct worker_task *)arg;
-       msg_info ("closing connection");
+       msg_info ("err_socket: abnormally closing connection");
        /* Free buffers */
        free_task (task);
-       bufferevent_disable (bev, EV_READ);
-       bufferevent_free (bev);
 }
 
 static void
@@ -286,6 +298,7 @@ accept_socket (int fd, short what, void *arg)
        bzero (new_task, sizeof (struct worker_task));
        new_task->worker = worker;
        new_task->state = READ_COMMAND;
+       new_task->sock = nfd;
        new_task->cfg = worker->srv->cfg;
        TAILQ_INIT (&new_task->urls);
        TAILQ_INIT (&new_task->parts);
@@ -294,15 +307,6 @@ accept_socket (int fd, short what, void *arg)
 #else
        new_task->task_pool = memory_pool_new (TASK_POOL_SIZE);
 #endif
-       new_task->memc_ctx = memory_pool_alloc (new_task->task_pool, sizeof (memcached_ctx_t));
-       if (new_task->memc_ctx == NULL) {
-               msg_err ("accept_socket: cannot allocate memory for memcached ctx, %m");
-       }
-       else {
-               if (memc_init_ctx (new_task->memc_ctx) == -1) {
-                       msg_err ("accept_socket: cannot init memcached context for task");
-               }
-       }
 
        /* Read event */
        new_task->bev = bufferevent_new (nfd, read_socket, write_socket, err_socket, (void *)new_task);