aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2008-10-24 19:32:39 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2008-10-24 19:32:39 +0400
commitfb659154cb491bed9b158adfbb34337cadad7a7e (patch)
treede41f735d21fa2c72f10093584989e95df714b28
parente5f01249a3498ab47c7c7852f83564d466629a8b (diff)
downloadrspamd-fb659154cb491bed9b158adfbb34337cadad7a7e.tar.gz
rspamd-fb659154cb491bed9b158adfbb34337cadad7a7e.zip
* Stylify and fix util functions that was gathered from libutil code (memory allocation)
* Fix protocol and network functions * Fix some memory allocation issues
-rw-r--r--cfg_utils.c4
-rw-r--r--controller.c1
-rw-r--r--main.c1
-rw-r--r--main.h3
-rw-r--r--plugins/surbl.c4
-rw-r--r--protocol.c27
-rw-r--r--protocol.h1
-rw-r--r--util.c149
-rw-r--r--worker.c60
9 files changed, 140 insertions, 110 deletions
diff --git a/cfg_utils.c b/cfg_utils.c
index 45442fe62..9fa7aac47 100644
--- a/cfg_utils.c
+++ b/cfg_utils.c
@@ -358,6 +358,10 @@ parse_filters_str (struct config_file *cfg, const char *str, enum script_type ty
gchar **strvec, **p;
struct filter *cur;
int i;
+
+ if (str == NULL) {
+ return;
+ }
strvec = g_strsplit (str, ",", 0);
if (strvec == NULL) {
diff --git a/controller.c b/controller.c
index 0af9d1d2e..3f4f9b358 100644
--- a/controller.c
+++ b/controller.c
@@ -280,6 +280,7 @@ accept_socket (int fd, short what, void *arg)
}
bzero (new_session, sizeof (struct controller_session));
new_session->worker = worker;
+ new_session->sock = nfd;
new_session->cfg = worker->srv->cfg;
#ifdef HAVE_GETPAGESIZE
new_session->session_pool = memory_pool_new (getpagesize () - 1);
diff --git a/main.c b/main.c
index d7ecb418b..a88662085 100644
--- a/main.c
+++ b/main.c
@@ -220,6 +220,7 @@ main (int argc, char **argv)
rspamd->cfg->temp_dir = memory_pool_strdup (rspamd->cfg->cfg_pool, getenv ("TMPDIR"));
if (!rspamd->cfg->temp_dir) {
+ msg_warn ("$TMPDIR is empty too, using /tmp as default");
rspamd->cfg->temp_dir = memory_pool_strdup (rspamd->cfg->cfg_pool, "/tmp");
}
}
diff --git a/main.h b/main.h
index de8969de8..2da935546 100644
--- a/main.h
+++ b/main.h
@@ -109,6 +109,7 @@ struct save_point {
/* Control session */
struct controller_session {
struct rspamd_worker *worker;
+ int sock;
/* Access to authorized commands */
int authorized;
memory_pool_t *session_pool;
@@ -126,10 +127,12 @@ struct worker_task {
WRITE_REPLY,
WRITE_ERROR,
WAIT_FILTER,
+ CLOSING_CONNECTION,
} state;
size_t content_length;
enum rspamd_protocol proto;
enum rspamd_command cmd;
+ int sock;
char *helo;
char *from;
char *rcpt;
diff --git a/plugins/surbl.c b/plugins/surbl.c
index 23c258326..c90a8419e 100644
--- a/plugins/surbl.c
+++ b/plugins/surbl.c
@@ -105,8 +105,6 @@ surbl_module_init (struct config_file *cfg, struct module_ctx **ctx)
*ctx = (struct module_ctx *)surbl_module_ctx;
- evdns_init ();
-
return 0;
}
@@ -117,6 +115,8 @@ surbl_module_config (struct config_file *cfg)
char *value, *cur_tok, *str;
+ evdns_init ();
+
if ((value = get_module_opt (cfg, "surbl", "redirector")) != NULL) {
str = memory_pool_strdup (surbl_module_ctx->surbl_pool, value);
cur_tok = strsep (&str, ":");
diff --git a/protocol.c b/protocol.c
index 0881b9788..94cb2f824 100644
--- a/protocol.c
+++ b/protocol.c
@@ -73,6 +73,7 @@ parse_command (struct worker_task *task, char *line)
token = strsep (&line, " ");
if (line == NULL || token == NULL) {
+ msg_debug ("parse_command: bad comand: %s", token);
return -1;
}
@@ -153,14 +154,27 @@ static int
parse_header (struct worker_task *task, char *line)
{
char *headern, *err;
- headern = strsep (&line, ":");
/* Check end of headers */
- if (*line == '\r' && *(line + 1) == '\n') {
- task->state = READ_MESSAGE;
+ if (*line == '\0') {
+ if (task->cmd == CMD_PING || task->cmd == CMD_SKIP) {
+ task->state = WRITE_REPLY;
+ }
+ else {
+ if (task->content_length > 0) {
+ task->state = READ_MESSAGE;
+ }
+ else {
+ task->last_error = "Unknown content length";
+ task->error_code = RSPAMD_LENGTH_ERROR;
+ task->state = WRITE_ERROR;
+ }
+ }
return 0;
}
+ headern = strsep (&line, ":");
+
if (line == NULL || headern == NULL) {
return -1;
}
@@ -433,14 +447,17 @@ write_reply (struct worker_task *task)
int r;
char outbuf[OUTBUFSIZ];
+ msg_debug ("write_reply: writing reply to client");
if (task->error_code != 0) {
/* Write error message and error code to reply */
if (task->proto == SPAMC_PROTO) {
r = snprintf (outbuf, sizeof (outbuf), "%s %d %s" CRLF CRLF, SPAMD_REPLY_BANNER, task->error_code, SPAMD_ERROR);
+ msg_debug ("write_reply: writing error: %s", outbuf);
}
else {
r = snprintf (outbuf, sizeof (outbuf), "%s %d %s" CRLF "%s: %s" CRLF CRLF, RSPAMD_REPLY_BANNER, task->error_code,
SPAMD_ERROR, ERROR_HEADER, task->last_error);
+ msg_debug ("write_reply: writing error: %s", outbuf);
}
/* Write to bufferevent error message */
bufferevent_write (task->bev, outbuf, r);
@@ -459,12 +476,12 @@ write_reply (struct worker_task *task)
return write_process_reply (task);
break;
case CMD_SKIP:
- r = snprintf (outbuf, sizeof (outbuf), "%s 0 %s" CRLF CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER,
+ r = snprintf (outbuf, sizeof (outbuf), "%s 0 %s" CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER,
SPAMD_OK);
bufferevent_write (task->bev, outbuf, r);
break;
case CMD_PING:
- r = snprintf (outbuf, sizeof (outbuf), "%s 0 PONG" CRLF CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER);
+ r = snprintf (outbuf, sizeof (outbuf), "%s 0 PONG" CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER);
bufferevent_write (task->bev, outbuf, r);
break;
}
diff --git a/protocol.h b/protocol.h
index 50b458de0..6c750e91f 100644
--- a/protocol.h
+++ b/protocol.h
@@ -6,6 +6,7 @@
#define RSPAMD_FILTER_ERROR 1
#define RSPAMD_NETWORK_ERROR 2
#define RSPAMD_PROTOCOL_ERROR 3
+#define RSPAMD_LENGTH_ERROR 4
struct worker_task;
diff --git a/util.c b/util.c
index 0bd7f91c8..b0d40daf0 100644
--- a/util.c
+++ b/util.c
@@ -77,18 +77,18 @@ make_socket (const char *address, u_short port)
char strport[NI_MAXSERV];
int ai_result;
- memset(&ai, 0, sizeof (ai));
+ memset (&ai, 0, sizeof (ai));
ai.ai_family = AF_INET;
ai.ai_socktype = SOCK_STREAM;
ai.ai_flags = AI_PASSIVE;
- snprintf(strport, sizeof (strport), "%d", port);
- if ((ai_result = getaddrinfo(address, strport, &ai, &aitop)) != 0) {
+ snprintf (strport, sizeof (strport), "%d", port);
+ if ((ai_result = getaddrinfo (address, strport, &ai, &aitop)) != 0) {
return (-1);
}
- fd = make_socket_ai(aitop);
+ fd = make_socket_ai (aitop);
- freeaddrinfo(aitop);
+ freeaddrinfo (aitop);
return (fd);
}
@@ -129,8 +129,7 @@ read_cmd_line (int argc, char **argv, struct config_file *cfg)
break;
case 'c':
if (optarg && cfg->cfg_name) {
- free (cfg->cfg_name);
- cfg->cfg_name = strdup (optarg);
+ cfg->cfg_name = memory_pool_strdup (cfg->cfg_pool, optarg);
}
break;
case 'h':
@@ -211,7 +210,7 @@ static size_t title_buffer_size = 0;
static char *title_progname, *title_progname_full;
int
-setproctitle(const char *fmt, ...)
+setproctitle (const char *fmt, ...)
{
if (!title_buffer || !title_buffer_size) {
errno = ENOMEM;
@@ -258,7 +257,7 @@ setproctitle(const char *fmt, ...)
*/
int
-init_title(int argc, char *argv[], char *envp[])
+init_title (int argc, char *argv[], char *envp[])
{
char *begin_of_buffer = 0, *end_of_buffer = 0;
int i;
@@ -286,13 +285,13 @@ init_title(int argc, char *argv[], char *envp[])
return 0;
for (i = 0; envp[i]; ++i) {
- if (!(new_environ[i] = strdup (envp[i])))
+ if (!(new_environ[i] = g_strdup (envp[i])))
goto cleanup_enomem;
}
new_environ[i] = 0;
if (program_invocation_name) {
- title_progname_full = strdup (program_invocation_name);
+ title_progname_full = g_strdup (program_invocation_name);
if (!title_progname_full)
goto cleanup_enomem;
@@ -316,19 +315,19 @@ init_title(int argc, char *argv[], char *envp[])
cleanup_enomem:
for (--i; i >= 0; --i) {
- free(new_environ[i]);
+ g_free (new_environ[i]);
}
- free(new_environ);
+ g_free (new_environ);
return 0;
}
#endif
#ifndef HAVE_PIDFILE
extern char * __progname;
-static int _pidfile_remove(struct pidfh *pfh, int freeit);
+static int _pidfile_remove (struct pidfh *pfh, int freeit);
static int
-pidfile_verify(struct pidfh *pfh)
+pidfile_verify (struct pidfh *pfh)
{
struct stat sb;
@@ -337,61 +336,61 @@ pidfile_verify(struct pidfh *pfh)
/*
* Check remembered descriptor.
*/
- if (fstat(pfh->pf_fd, &sb) == -1)
+ if (fstat (pfh->pf_fd, &sb) == -1)
return (errno);
if (sb.st_dev != pfh->pf_dev || sb.st_ino != pfh->pf_ino)
- return (-1);
- return (0);
+ return -1;
+ return 0;
}
static int
-pidfile_read(const char *path, pid_t *pidptr)
+pidfile_read (const char *path, pid_t *pidptr)
{
char buf[16], *endptr;
int error, fd, i;
- fd = open(path, O_RDONLY);
+ fd = open (path, O_RDONLY);
if (fd == -1)
return (errno);
- i = read(fd, buf, sizeof(buf) - 1);
+ i = read (fd, buf, sizeof(buf) - 1);
error = errno; /* Remember errno in case close() wants to change it. */
- close(fd);
+ close (fd);
if (i == -1)
- return (error);
+ return error;
else if (i == 0)
- return (EAGAIN);
+ return EAGAIN;
buf[i] = '\0';
- *pidptr = strtol(buf, &endptr, 10);
+ *pidptr = strtol (buf, &endptr, 10);
if (endptr != &buf[i])
- return (EINVAL);
+ return EINVAL;
- return (0);
+ return 0;
}
struct pidfh *
-pidfile_open(const char *path, mode_t mode, pid_t *pidptr)
+pidfile_open (const char *path, mode_t mode, pid_t *pidptr)
{
struct pidfh *pfh;
struct stat sb;
int error, fd, len, count;
struct timespec rqtp;
- pfh = g_malloc(sizeof(*pfh));
+ pfh = g_malloc (sizeof(*pfh));
if (pfh == NULL)
- return (NULL);
+ return NULL;
if (path == NULL)
- len = snprintf(pfh->pf_path, sizeof(pfh->pf_path),
+ len = snprintf (pfh->pf_path, sizeof(pfh->pf_path),
"/var/run/%s.pid", __progname);
else
- len = snprintf(pfh->pf_path, sizeof(pfh->pf_path),
+ len = snprintf (pfh->pf_path, sizeof(pfh->pf_path),
"%s", path);
- if (len >= (int)sizeof(pfh->pf_path)) {
- free(pfh);
+ if (len >= (int)sizeof (pfh->pf_path)) {
+ g_free (pfh);
errno = ENAMETOOLONG;
- return (NULL);
+ return NULL;
}
/*
@@ -400,7 +399,7 @@ pidfile_open(const char *path, mode_t mode, pid_t *pidptr)
* PID file will be truncated again in pidfile_write(), so
* pidfile_write() can be called multiple times.
*/
- fd = open(pfh->pf_path,
+ fd = open (pfh->pf_path,
O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK, mode);
flock (fd, LOCK_EX | LOCK_NB);
if (fd == -1) {
@@ -409,41 +408,41 @@ pidfile_open(const char *path, mode_t mode, pid_t *pidptr)
rqtp.tv_nsec = 5000000;
if (errno == EWOULDBLOCK && pidptr != NULL) {
again:
- errno = pidfile_read(pfh->pf_path, pidptr);
+ errno = pidfile_read (pfh->pf_path, pidptr);
if (errno == 0)
errno = EEXIST;
else if (errno == EAGAIN) {
if (++count <= 3) {
- nanosleep(&rqtp, 0);
+ nanosleep (&rqtp, 0);
goto again;
}
}
}
- free(pfh);
- return (NULL);
+ g_free (pfh);
+ return NULL;
}
/*
* Remember file information, so in pidfile_write() we are sure we write
* to the proper descriptor.
*/
- if (fstat(fd, &sb) == -1) {
+ if (fstat (fd, &sb) == -1) {
error = errno;
- unlink(pfh->pf_path);
- close(fd);
- free(pfh);
+ unlink (pfh->pf_path);
+ close (fd);
+ g_free (pfh);
errno = error;
- return (NULL);
+ return NULL;
}
pfh->pf_fd = fd;
pfh->pf_dev = sb.st_dev;
pfh->pf_ino = sb.st_ino;
- return (pfh);
+ return pfh;
}
int
-pidfile_write(struct pidfh *pfh)
+pidfile_write (struct pidfh *pfh)
{
char pidstr[16];
int error, fd;
@@ -452,94 +451,94 @@ pidfile_write(struct pidfh *pfh)
* Check remembered descriptor, so we don't overwrite some other
* file if pidfile was closed and descriptor reused.
*/
- errno = pidfile_verify(pfh);
+ errno = pidfile_verify (pfh);
if (errno != 0) {
/*
* Don't close descriptor, because we are not sure if it's ours.
*/
- return (-1);
+ return -1;
}
fd = pfh->pf_fd;
/*
* Truncate PID file, so multiple calls of pidfile_write() are allowed.
*/
- if (ftruncate(fd, 0) == -1) {
+ if (ftruncate (fd, 0) == -1) {
error = errno;
- _pidfile_remove(pfh, 0);
+ _pidfile_remove (pfh, 0);
errno = error;
- return (-1);
+ return -1;
}
- snprintf(pidstr, sizeof(pidstr), "%u", getpid());
- if (pwrite(fd, pidstr, strlen(pidstr), 0) != (ssize_t)strlen(pidstr)) {
+ snprintf (pidstr, sizeof(pidstr), "%u", getpid ());
+ if (pwrite (fd, pidstr, strlen (pidstr), 0) != (ssize_t)strlen (pidstr)) {
error = errno;
- _pidfile_remove(pfh, 0);
+ _pidfile_remove (pfh, 0);
errno = error;
- return (-1);
+ return -1;
}
- return (0);
+ return 0;
}
int
-pidfile_close(struct pidfh *pfh)
+pidfile_close (struct pidfh *pfh)
{
int error;
- error = pidfile_verify(pfh);
+ error = pidfile_verify (pfh);
if (error != 0) {
errno = error;
- return (-1);
+ return -1;
}
- if (close(pfh->pf_fd) == -1)
+ if (close (pfh->pf_fd) == -1)
error = errno;
- free(pfh);
+ g_free (pfh);
if (error != 0) {
errno = error;
- return (-1);
+ return -1;
}
- return (0);
+ return 0;
}
static int
-_pidfile_remove(struct pidfh *pfh, int freeit)
+_pidfile_remove (struct pidfh *pfh, int freeit)
{
int error;
- error = pidfile_verify(pfh);
+ error = pidfile_verify (pfh);
if (error != 0) {
errno = error;
- return (-1);
+ return -1;
}
- if (unlink(pfh->pf_path) == -1)
+ if (unlink (pfh->pf_path) == -1)
error = errno;
- if (flock(pfh->pf_fd, LOCK_UN) == -1) {
+ if (flock (pfh->pf_fd, LOCK_UN) == -1) {
if (error == 0)
error = errno;
}
- if (close(pfh->pf_fd) == -1) {
+ if (close (pfh->pf_fd) == -1) {
if (error == 0)
error = errno;
}
if (freeit)
- free(pfh);
+ g_free (pfh);
else
pfh->pf_fd = -1;
if (error != 0) {
errno = error;
- return (-1);
+ return -1;
}
- return (0);
+ return 0;
}
int
-pidfile_remove(struct pidfh *pfh)
+pidfile_remove (struct pidfh *pfh)
{
- return (_pidfile_remove(pfh, 1));
+ return (_pidfile_remove (pfh, 1));
}
#endif
diff --git a/worker.c b/worker.c
index 089f1ecfa..e72e3fd0e 100644
--- a/worker.c
+++ b/worker.c
@@ -80,6 +80,9 @@ free_task (struct worker_task *task)
TAILQ_REMOVE (&task->parts, part, next);
}
memory_pool_delete (task->task_pool);
+ bufferevent_disable (task->bev, EV_READ | EV_WRITE);
+ bufferevent_free (task->bev);
+ close (task->sock);
g_free (task);
}
}
@@ -195,6 +198,10 @@ read_socket (struct bufferevent *bev, void *arg)
task->error_code = RSPAMD_NETWORK_ERROR;
task->state = WRITE_ERROR;
}
+ if (task->state == WRITE_ERROR || task->state == WRITE_REPLY) {
+ bufferevent_enable (bev, EV_WRITE);
+ bufferevent_disable (bev, EV_READ);
+ }
free (s);
break;
case READ_MESSAGE:
@@ -213,6 +220,10 @@ read_socket (struct bufferevent *bev, void *arg)
task->state = WAIT_FILTER;
}
}
+ if (task->state == WRITE_ERROR || task->state == WRITE_REPLY) {
+ bufferevent_enable (bev, EV_WRITE);
+ bufferevent_disable (bev, EV_READ);
+ }
}
else {
msg_err ("read_socket: cannot read data to buffer: %ld", (long int)r);
@@ -224,30 +235,33 @@ read_socket (struct bufferevent *bev, void *arg)
case WAIT_FILTER:
bufferevent_disable (bev, EV_READ);
break;
+ }
+}
+
+static void
+write_socket (struct bufferevent *bev, void *arg)
+{
+ struct worker_task *task = (struct worker_task *)arg;
+
+ switch (task->state) {
case WRITE_REPLY:
write_reply (task);
+ task->state = CLOSING_CONNECTION;
bufferevent_disable (bev, EV_READ);
- bufferevent_enable (bev, EV_WRITE);
break;
case WRITE_ERROR:
write_reply (task);
+ task->state = CLOSING_CONNECTION;
bufferevent_disable (bev, EV_READ);
- bufferevent_enable (bev, EV_WRITE);
break;
- }
-}
-
-static void
-write_socket (struct bufferevent *bev, void *arg)
-{
- struct worker_task *task = (struct worker_task *)arg;
-
- if (task->state > READ_MESSAGE) {
- msg_info ("closing connection");
- /* Free buffers */
- free_task (task);
- bufferevent_disable (bev, EV_WRITE);
- bufferevent_free (bev);
+ case CLOSING_CONNECTION:
+ msg_debug ("write_socket: normally closing connection");
+ free_task (task);
+ break;
+ default:
+ msg_info ("write_socket: abnormally closing connection");
+ free_task (task);
+ break;
}
}
@@ -255,11 +269,9 @@ static void
err_socket (struct bufferevent *bev, short what, void *arg)
{
struct worker_task *task = (struct worker_task *)arg;
- msg_info ("closing connection");
+ msg_info ("err_socket: abnormally closing connection");
/* Free buffers */
free_task (task);
- bufferevent_disable (bev, EV_READ);
- bufferevent_free (bev);
}
static void
@@ -286,6 +298,7 @@ accept_socket (int fd, short what, void *arg)
bzero (new_task, sizeof (struct worker_task));
new_task->worker = worker;
new_task->state = READ_COMMAND;
+ new_task->sock = nfd;
new_task->cfg = worker->srv->cfg;
TAILQ_INIT (&new_task->urls);
TAILQ_INIT (&new_task->parts);
@@ -294,15 +307,6 @@ accept_socket (int fd, short what, void *arg)
#else
new_task->task_pool = memory_pool_new (TASK_POOL_SIZE);
#endif
- new_task->memc_ctx = memory_pool_alloc (new_task->task_pool, sizeof (memcached_ctx_t));
- if (new_task->memc_ctx == NULL) {
- msg_err ("accept_socket: cannot allocate memory for memcached ctx, %m");
- }
- else {
- if (memc_init_ctx (new_task->memc_ctx) == -1) {
- msg_err ("accept_socket: cannot init memcached context for task");
- }
- }
/* Read event */
new_task->bev = bufferevent_new (nfd, read_socket, write_socket, err_socket, (void *)new_task);