From fb659154cb491bed9b158adfbb34337cadad7a7e Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Fri, 24 Oct 2008 19:32:39 +0400 Subject: [PATCH] * Stylify and fix util functions that was gathered from libutil code (memory allocation) * Fix protocol and network functions * Fix some memory allocation issues --- cfg_utils.c | 4 ++ controller.c | 1 + main.c | 1 + main.h | 3 + plugins/surbl.c | 4 +- protocol.c | 27 +++++++-- protocol.h | 1 + util.c | 149 ++++++++++++++++++++++++------------------------ worker.c | 60 ++++++++++--------- 9 files changed, 140 insertions(+), 110 deletions(-) diff --git a/cfg_utils.c b/cfg_utils.c index 45442fe62..9fa7aac47 100644 --- a/cfg_utils.c +++ b/cfg_utils.c @@ -358,6 +358,10 @@ parse_filters_str (struct config_file *cfg, const char *str, enum script_type ty gchar **strvec, **p; struct filter *cur; int i; + + if (str == NULL) { + return; + } strvec = g_strsplit (str, ",", 0); if (strvec == NULL) { diff --git a/controller.c b/controller.c index 0af9d1d2e..3f4f9b358 100644 --- a/controller.c +++ b/controller.c @@ -280,6 +280,7 @@ accept_socket (int fd, short what, void *arg) } bzero (new_session, sizeof (struct controller_session)); new_session->worker = worker; + new_session->sock = nfd; new_session->cfg = worker->srv->cfg; #ifdef HAVE_GETPAGESIZE new_session->session_pool = memory_pool_new (getpagesize () - 1); diff --git a/main.c b/main.c index d7ecb418b..a88662085 100644 --- a/main.c +++ b/main.c @@ -220,6 +220,7 @@ main (int argc, char **argv) rspamd->cfg->temp_dir = memory_pool_strdup (rspamd->cfg->cfg_pool, getenv ("TMPDIR")); if (!rspamd->cfg->temp_dir) { + msg_warn ("$TMPDIR is empty too, using /tmp as default"); rspamd->cfg->temp_dir = memory_pool_strdup (rspamd->cfg->cfg_pool, "/tmp"); } } diff --git a/main.h b/main.h index de8969de8..2da935546 100644 --- a/main.h +++ b/main.h @@ -109,6 +109,7 @@ struct save_point { /* Control session */ struct controller_session { struct rspamd_worker *worker; + int sock; /* Access to authorized commands */ int authorized; memory_pool_t *session_pool; @@ -126,10 +127,12 @@ struct worker_task { WRITE_REPLY, WRITE_ERROR, WAIT_FILTER, + CLOSING_CONNECTION, } state; size_t content_length; enum rspamd_protocol proto; enum rspamd_command cmd; + int sock; char *helo; char *from; char *rcpt; diff --git a/plugins/surbl.c b/plugins/surbl.c index 23c258326..c90a8419e 100644 --- a/plugins/surbl.c +++ b/plugins/surbl.c @@ -105,8 +105,6 @@ surbl_module_init (struct config_file *cfg, struct module_ctx **ctx) *ctx = (struct module_ctx *)surbl_module_ctx; - evdns_init (); - return 0; } @@ -117,6 +115,8 @@ surbl_module_config (struct config_file *cfg) char *value, *cur_tok, *str; + evdns_init (); + if ((value = get_module_opt (cfg, "surbl", "redirector")) != NULL) { str = memory_pool_strdup (surbl_module_ctx->surbl_pool, value); cur_tok = strsep (&str, ":"); diff --git a/protocol.c b/protocol.c index 0881b9788..94cb2f824 100644 --- a/protocol.c +++ b/protocol.c @@ -73,6 +73,7 @@ parse_command (struct worker_task *task, char *line) token = strsep (&line, " "); if (line == NULL || token == NULL) { + msg_debug ("parse_command: bad comand: %s", token); return -1; } @@ -153,14 +154,27 @@ static int parse_header (struct worker_task *task, char *line) { char *headern, *err; - headern = strsep (&line, ":"); /* Check end of headers */ - if (*line == '\r' && *(line + 1) == '\n') { - task->state = READ_MESSAGE; + if (*line == '\0') { + if (task->cmd == CMD_PING || task->cmd == CMD_SKIP) { + task->state = WRITE_REPLY; + } + else { + if (task->content_length > 0) { + task->state = READ_MESSAGE; + } + else { + task->last_error = "Unknown content length"; + task->error_code = RSPAMD_LENGTH_ERROR; + task->state = WRITE_ERROR; + } + } return 0; } + headern = strsep (&line, ":"); + if (line == NULL || headern == NULL) { return -1; } @@ -433,14 +447,17 @@ write_reply (struct worker_task *task) int r; char outbuf[OUTBUFSIZ]; + msg_debug ("write_reply: writing reply to client"); if (task->error_code != 0) { /* Write error message and error code to reply */ if (task->proto == SPAMC_PROTO) { r = snprintf (outbuf, sizeof (outbuf), "%s %d %s" CRLF CRLF, SPAMD_REPLY_BANNER, task->error_code, SPAMD_ERROR); + msg_debug ("write_reply: writing error: %s", outbuf); } else { r = snprintf (outbuf, sizeof (outbuf), "%s %d %s" CRLF "%s: %s" CRLF CRLF, RSPAMD_REPLY_BANNER, task->error_code, SPAMD_ERROR, ERROR_HEADER, task->last_error); + msg_debug ("write_reply: writing error: %s", outbuf); } /* Write to bufferevent error message */ bufferevent_write (task->bev, outbuf, r); @@ -459,12 +476,12 @@ write_reply (struct worker_task *task) return write_process_reply (task); break; case CMD_SKIP: - r = snprintf (outbuf, sizeof (outbuf), "%s 0 %s" CRLF CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER, + r = snprintf (outbuf, sizeof (outbuf), "%s 0 %s" CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER, SPAMD_OK); bufferevent_write (task->bev, outbuf, r); break; case CMD_PING: - r = snprintf (outbuf, sizeof (outbuf), "%s 0 PONG" CRLF CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER); + r = snprintf (outbuf, sizeof (outbuf), "%s 0 PONG" CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER); bufferevent_write (task->bev, outbuf, r); break; } diff --git a/protocol.h b/protocol.h index 50b458de0..6c750e91f 100644 --- a/protocol.h +++ b/protocol.h @@ -6,6 +6,7 @@ #define RSPAMD_FILTER_ERROR 1 #define RSPAMD_NETWORK_ERROR 2 #define RSPAMD_PROTOCOL_ERROR 3 +#define RSPAMD_LENGTH_ERROR 4 struct worker_task; diff --git a/util.c b/util.c index 0bd7f91c8..b0d40daf0 100644 --- a/util.c +++ b/util.c @@ -77,18 +77,18 @@ make_socket (const char *address, u_short port) char strport[NI_MAXSERV]; int ai_result; - memset(&ai, 0, sizeof (ai)); + memset (&ai, 0, sizeof (ai)); ai.ai_family = AF_INET; ai.ai_socktype = SOCK_STREAM; ai.ai_flags = AI_PASSIVE; - snprintf(strport, sizeof (strport), "%d", port); - if ((ai_result = getaddrinfo(address, strport, &ai, &aitop)) != 0) { + snprintf (strport, sizeof (strport), "%d", port); + if ((ai_result = getaddrinfo (address, strport, &ai, &aitop)) != 0) { return (-1); } - fd = make_socket_ai(aitop); + fd = make_socket_ai (aitop); - freeaddrinfo(aitop); + freeaddrinfo (aitop); return (fd); } @@ -129,8 +129,7 @@ read_cmd_line (int argc, char **argv, struct config_file *cfg) break; case 'c': if (optarg && cfg->cfg_name) { - free (cfg->cfg_name); - cfg->cfg_name = strdup (optarg); + cfg->cfg_name = memory_pool_strdup (cfg->cfg_pool, optarg); } break; case 'h': @@ -211,7 +210,7 @@ static size_t title_buffer_size = 0; static char *title_progname, *title_progname_full; int -setproctitle(const char *fmt, ...) +setproctitle (const char *fmt, ...) { if (!title_buffer || !title_buffer_size) { errno = ENOMEM; @@ -258,7 +257,7 @@ setproctitle(const char *fmt, ...) */ int -init_title(int argc, char *argv[], char *envp[]) +init_title (int argc, char *argv[], char *envp[]) { char *begin_of_buffer = 0, *end_of_buffer = 0; int i; @@ -286,13 +285,13 @@ init_title(int argc, char *argv[], char *envp[]) return 0; for (i = 0; envp[i]; ++i) { - if (!(new_environ[i] = strdup (envp[i]))) + if (!(new_environ[i] = g_strdup (envp[i]))) goto cleanup_enomem; } new_environ[i] = 0; if (program_invocation_name) { - title_progname_full = strdup (program_invocation_name); + title_progname_full = g_strdup (program_invocation_name); if (!title_progname_full) goto cleanup_enomem; @@ -316,19 +315,19 @@ init_title(int argc, char *argv[], char *envp[]) cleanup_enomem: for (--i; i >= 0; --i) { - free(new_environ[i]); + g_free (new_environ[i]); } - free(new_environ); + g_free (new_environ); return 0; } #endif #ifndef HAVE_PIDFILE extern char * __progname; -static int _pidfile_remove(struct pidfh *pfh, int freeit); +static int _pidfile_remove (struct pidfh *pfh, int freeit); static int -pidfile_verify(struct pidfh *pfh) +pidfile_verify (struct pidfh *pfh) { struct stat sb; @@ -337,61 +336,61 @@ pidfile_verify(struct pidfh *pfh) /* * Check remembered descriptor. */ - if (fstat(pfh->pf_fd, &sb) == -1) + if (fstat (pfh->pf_fd, &sb) == -1) return (errno); if (sb.st_dev != pfh->pf_dev || sb.st_ino != pfh->pf_ino) - return (-1); - return (0); + return -1; + return 0; } static int -pidfile_read(const char *path, pid_t *pidptr) +pidfile_read (const char *path, pid_t *pidptr) { char buf[16], *endptr; int error, fd, i; - fd = open(path, O_RDONLY); + fd = open (path, O_RDONLY); if (fd == -1) return (errno); - i = read(fd, buf, sizeof(buf) - 1); + i = read (fd, buf, sizeof(buf) - 1); error = errno; /* Remember errno in case close() wants to change it. */ - close(fd); + close (fd); if (i == -1) - return (error); + return error; else if (i == 0) - return (EAGAIN); + return EAGAIN; buf[i] = '\0'; - *pidptr = strtol(buf, &endptr, 10); + *pidptr = strtol (buf, &endptr, 10); if (endptr != &buf[i]) - return (EINVAL); + return EINVAL; - return (0); + return 0; } struct pidfh * -pidfile_open(const char *path, mode_t mode, pid_t *pidptr) +pidfile_open (const char *path, mode_t mode, pid_t *pidptr) { struct pidfh *pfh; struct stat sb; int error, fd, len, count; struct timespec rqtp; - pfh = g_malloc(sizeof(*pfh)); + pfh = g_malloc (sizeof(*pfh)); if (pfh == NULL) - return (NULL); + return NULL; if (path == NULL) - len = snprintf(pfh->pf_path, sizeof(pfh->pf_path), + len = snprintf (pfh->pf_path, sizeof(pfh->pf_path), "/var/run/%s.pid", __progname); else - len = snprintf(pfh->pf_path, sizeof(pfh->pf_path), + len = snprintf (pfh->pf_path, sizeof(pfh->pf_path), "%s", path); - if (len >= (int)sizeof(pfh->pf_path)) { - free(pfh); + if (len >= (int)sizeof (pfh->pf_path)) { + g_free (pfh); errno = ENAMETOOLONG; - return (NULL); + return NULL; } /* @@ -400,7 +399,7 @@ pidfile_open(const char *path, mode_t mode, pid_t *pidptr) * PID file will be truncated again in pidfile_write(), so * pidfile_write() can be called multiple times. */ - fd = open(pfh->pf_path, + fd = open (pfh->pf_path, O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK, mode); flock (fd, LOCK_EX | LOCK_NB); if (fd == -1) { @@ -409,41 +408,41 @@ pidfile_open(const char *path, mode_t mode, pid_t *pidptr) rqtp.tv_nsec = 5000000; if (errno == EWOULDBLOCK && pidptr != NULL) { again: - errno = pidfile_read(pfh->pf_path, pidptr); + errno = pidfile_read (pfh->pf_path, pidptr); if (errno == 0) errno = EEXIST; else if (errno == EAGAIN) { if (++count <= 3) { - nanosleep(&rqtp, 0); + nanosleep (&rqtp, 0); goto again; } } } - free(pfh); - return (NULL); + g_free (pfh); + return NULL; } /* * Remember file information, so in pidfile_write() we are sure we write * to the proper descriptor. */ - if (fstat(fd, &sb) == -1) { + if (fstat (fd, &sb) == -1) { error = errno; - unlink(pfh->pf_path); - close(fd); - free(pfh); + unlink (pfh->pf_path); + close (fd); + g_free (pfh); errno = error; - return (NULL); + return NULL; } pfh->pf_fd = fd; pfh->pf_dev = sb.st_dev; pfh->pf_ino = sb.st_ino; - return (pfh); + return pfh; } int -pidfile_write(struct pidfh *pfh) +pidfile_write (struct pidfh *pfh) { char pidstr[16]; int error, fd; @@ -452,94 +451,94 @@ pidfile_write(struct pidfh *pfh) * Check remembered descriptor, so we don't overwrite some other * file if pidfile was closed and descriptor reused. */ - errno = pidfile_verify(pfh); + errno = pidfile_verify (pfh); if (errno != 0) { /* * Don't close descriptor, because we are not sure if it's ours. */ - return (-1); + return -1; } fd = pfh->pf_fd; /* * Truncate PID file, so multiple calls of pidfile_write() are allowed. */ - if (ftruncate(fd, 0) == -1) { + if (ftruncate (fd, 0) == -1) { error = errno; - _pidfile_remove(pfh, 0); + _pidfile_remove (pfh, 0); errno = error; - return (-1); + return -1; } - snprintf(pidstr, sizeof(pidstr), "%u", getpid()); - if (pwrite(fd, pidstr, strlen(pidstr), 0) != (ssize_t)strlen(pidstr)) { + snprintf (pidstr, sizeof(pidstr), "%u", getpid ()); + if (pwrite (fd, pidstr, strlen (pidstr), 0) != (ssize_t)strlen (pidstr)) { error = errno; - _pidfile_remove(pfh, 0); + _pidfile_remove (pfh, 0); errno = error; - return (-1); + return -1; } - return (0); + return 0; } int -pidfile_close(struct pidfh *pfh) +pidfile_close (struct pidfh *pfh) { int error; - error = pidfile_verify(pfh); + error = pidfile_verify (pfh); if (error != 0) { errno = error; - return (-1); + return -1; } - if (close(pfh->pf_fd) == -1) + if (close (pfh->pf_fd) == -1) error = errno; - free(pfh); + g_free (pfh); if (error != 0) { errno = error; - return (-1); + return -1; } - return (0); + return 0; } static int -_pidfile_remove(struct pidfh *pfh, int freeit) +_pidfile_remove (struct pidfh *pfh, int freeit) { int error; - error = pidfile_verify(pfh); + error = pidfile_verify (pfh); if (error != 0) { errno = error; - return (-1); + return -1; } - if (unlink(pfh->pf_path) == -1) + if (unlink (pfh->pf_path) == -1) error = errno; - if (flock(pfh->pf_fd, LOCK_UN) == -1) { + if (flock (pfh->pf_fd, LOCK_UN) == -1) { if (error == 0) error = errno; } - if (close(pfh->pf_fd) == -1) { + if (close (pfh->pf_fd) == -1) { if (error == 0) error = errno; } if (freeit) - free(pfh); + g_free (pfh); else pfh->pf_fd = -1; if (error != 0) { errno = error; - return (-1); + return -1; } - return (0); + return 0; } int -pidfile_remove(struct pidfh *pfh) +pidfile_remove (struct pidfh *pfh) { - return (_pidfile_remove(pfh, 1)); + return (_pidfile_remove (pfh, 1)); } #endif diff --git a/worker.c b/worker.c index 089f1ecfa..e72e3fd0e 100644 --- a/worker.c +++ b/worker.c @@ -80,6 +80,9 @@ free_task (struct worker_task *task) TAILQ_REMOVE (&task->parts, part, next); } memory_pool_delete (task->task_pool); + bufferevent_disable (task->bev, EV_READ | EV_WRITE); + bufferevent_free (task->bev); + close (task->sock); g_free (task); } } @@ -195,6 +198,10 @@ read_socket (struct bufferevent *bev, void *arg) task->error_code = RSPAMD_NETWORK_ERROR; task->state = WRITE_ERROR; } + if (task->state == WRITE_ERROR || task->state == WRITE_REPLY) { + bufferevent_enable (bev, EV_WRITE); + bufferevent_disable (bev, EV_READ); + } free (s); break; case READ_MESSAGE: @@ -213,6 +220,10 @@ read_socket (struct bufferevent *bev, void *arg) task->state = WAIT_FILTER; } } + if (task->state == WRITE_ERROR || task->state == WRITE_REPLY) { + bufferevent_enable (bev, EV_WRITE); + bufferevent_disable (bev, EV_READ); + } } else { msg_err ("read_socket: cannot read data to buffer: %ld", (long int)r); @@ -224,30 +235,33 @@ read_socket (struct bufferevent *bev, void *arg) case WAIT_FILTER: bufferevent_disable (bev, EV_READ); break; + } +} + +static void +write_socket (struct bufferevent *bev, void *arg) +{ + struct worker_task *task = (struct worker_task *)arg; + + switch (task->state) { case WRITE_REPLY: write_reply (task); + task->state = CLOSING_CONNECTION; bufferevent_disable (bev, EV_READ); - bufferevent_enable (bev, EV_WRITE); break; case WRITE_ERROR: write_reply (task); + task->state = CLOSING_CONNECTION; bufferevent_disable (bev, EV_READ); - bufferevent_enable (bev, EV_WRITE); break; - } -} - -static void -write_socket (struct bufferevent *bev, void *arg) -{ - struct worker_task *task = (struct worker_task *)arg; - - if (task->state > READ_MESSAGE) { - msg_info ("closing connection"); - /* Free buffers */ - free_task (task); - bufferevent_disable (bev, EV_WRITE); - bufferevent_free (bev); + case CLOSING_CONNECTION: + msg_debug ("write_socket: normally closing connection"); + free_task (task); + break; + default: + msg_info ("write_socket: abnormally closing connection"); + free_task (task); + break; } } @@ -255,11 +269,9 @@ static void err_socket (struct bufferevent *bev, short what, void *arg) { struct worker_task *task = (struct worker_task *)arg; - msg_info ("closing connection"); + msg_info ("err_socket: abnormally closing connection"); /* Free buffers */ free_task (task); - bufferevent_disable (bev, EV_READ); - bufferevent_free (bev); } static void @@ -286,6 +298,7 @@ accept_socket (int fd, short what, void *arg) bzero (new_task, sizeof (struct worker_task)); new_task->worker = worker; new_task->state = READ_COMMAND; + new_task->sock = nfd; new_task->cfg = worker->srv->cfg; TAILQ_INIT (&new_task->urls); TAILQ_INIT (&new_task->parts); @@ -294,15 +307,6 @@ accept_socket (int fd, short what, void *arg) #else new_task->task_pool = memory_pool_new (TASK_POOL_SIZE); #endif - new_task->memc_ctx = memory_pool_alloc (new_task->task_pool, sizeof (memcached_ctx_t)); - if (new_task->memc_ctx == NULL) { - msg_err ("accept_socket: cannot allocate memory for memcached ctx, %m"); - } - else { - if (memc_init_ctx (new_task->memc_ctx) == -1) { - msg_err ("accept_socket: cannot init memcached context for task"); - } - } /* Read event */ new_task->bev = bufferevent_new (nfd, read_socket, write_socket, err_socket, (void *)new_task); -- 2.39.5