token = strsep (&line, " ");
if (line == NULL || token == NULL) {
+ msg_debug ("parse_command: bad comand: %s", token);
return -1;
}
parse_header (struct worker_task *task, char *line)
{
char *headern, *err;
- headern = strsep (&line, ":");
/* Check end of headers */
- if (*line == '\r' && *(line + 1) == '\n') {
- task->state = READ_MESSAGE;
+ if (*line == '\0') {
+ if (task->cmd == CMD_PING || task->cmd == CMD_SKIP) {
+ task->state = WRITE_REPLY;
+ }
+ else {
+ if (task->content_length > 0) {
+ task->state = READ_MESSAGE;
+ }
+ else {
+ task->last_error = "Unknown content length";
+ task->error_code = RSPAMD_LENGTH_ERROR;
+ task->state = WRITE_ERROR;
+ }
+ }
return 0;
}
+ headern = strsep (&line, ":");
+
if (line == NULL || headern == NULL) {
return -1;
}
int r;
char outbuf[OUTBUFSIZ];
+ msg_debug ("write_reply: writing reply to client");
if (task->error_code != 0) {
/* Write error message and error code to reply */
if (task->proto == SPAMC_PROTO) {
r = snprintf (outbuf, sizeof (outbuf), "%s %d %s" CRLF CRLF, SPAMD_REPLY_BANNER, task->error_code, SPAMD_ERROR);
+ msg_debug ("write_reply: writing error: %s", outbuf);
}
else {
r = snprintf (outbuf, sizeof (outbuf), "%s %d %s" CRLF "%s: %s" CRLF CRLF, RSPAMD_REPLY_BANNER, task->error_code,
SPAMD_ERROR, ERROR_HEADER, task->last_error);
+ msg_debug ("write_reply: writing error: %s", outbuf);
}
/* Write to bufferevent error message */
bufferevent_write (task->bev, outbuf, r);
return write_process_reply (task);
break;
case CMD_SKIP:
- r = snprintf (outbuf, sizeof (outbuf), "%s 0 %s" CRLF CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER,
+ r = snprintf (outbuf, sizeof (outbuf), "%s 0 %s" CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER,
SPAMD_OK);
bufferevent_write (task->bev, outbuf, r);
break;
case CMD_PING:
- r = snprintf (outbuf, sizeof (outbuf), "%s 0 PONG" CRLF CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER);
+ r = snprintf (outbuf, sizeof (outbuf), "%s 0 PONG" CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER);
bufferevent_write (task->bev, outbuf, r);
break;
}
char strport[NI_MAXSERV];
int ai_result;
- memset(&ai, 0, sizeof (ai));
+ memset (&ai, 0, sizeof (ai));
ai.ai_family = AF_INET;
ai.ai_socktype = SOCK_STREAM;
ai.ai_flags = AI_PASSIVE;
- snprintf(strport, sizeof (strport), "%d", port);
- if ((ai_result = getaddrinfo(address, strport, &ai, &aitop)) != 0) {
+ snprintf (strport, sizeof (strport), "%d", port);
+ if ((ai_result = getaddrinfo (address, strport, &ai, &aitop)) != 0) {
return (-1);
}
- fd = make_socket_ai(aitop);
+ fd = make_socket_ai (aitop);
- freeaddrinfo(aitop);
+ freeaddrinfo (aitop);
return (fd);
}
break;
case 'c':
if (optarg && cfg->cfg_name) {
- free (cfg->cfg_name);
- cfg->cfg_name = strdup (optarg);
+ cfg->cfg_name = memory_pool_strdup (cfg->cfg_pool, optarg);
}
break;
case 'h':
static char *title_progname, *title_progname_full;
int
-setproctitle(const char *fmt, ...)
+setproctitle (const char *fmt, ...)
{
if (!title_buffer || !title_buffer_size) {
errno = ENOMEM;
*/
int
-init_title(int argc, char *argv[], char *envp[])
+init_title (int argc, char *argv[], char *envp[])
{
char *begin_of_buffer = 0, *end_of_buffer = 0;
int i;
return 0;
for (i = 0; envp[i]; ++i) {
- if (!(new_environ[i] = strdup (envp[i])))
+ if (!(new_environ[i] = g_strdup (envp[i])))
goto cleanup_enomem;
}
new_environ[i] = 0;
if (program_invocation_name) {
- title_progname_full = strdup (program_invocation_name);
+ title_progname_full = g_strdup (program_invocation_name);
if (!title_progname_full)
goto cleanup_enomem;
cleanup_enomem:
for (--i; i >= 0; --i) {
- free(new_environ[i]);
+ g_free (new_environ[i]);
}
- free(new_environ);
+ g_free (new_environ);
return 0;
}
#endif
#ifndef HAVE_PIDFILE
extern char * __progname;
-static int _pidfile_remove(struct pidfh *pfh, int freeit);
+static int _pidfile_remove (struct pidfh *pfh, int freeit);
static int
-pidfile_verify(struct pidfh *pfh)
+pidfile_verify (struct pidfh *pfh)
{
struct stat sb;
/*
* Check remembered descriptor.
*/
- if (fstat(pfh->pf_fd, &sb) == -1)
+ if (fstat (pfh->pf_fd, &sb) == -1)
return (errno);
if (sb.st_dev != pfh->pf_dev || sb.st_ino != pfh->pf_ino)
- return (-1);
- return (0);
+ return -1;
+ return 0;
}
static int
-pidfile_read(const char *path, pid_t *pidptr)
+pidfile_read (const char *path, pid_t *pidptr)
{
char buf[16], *endptr;
int error, fd, i;
- fd = open(path, O_RDONLY);
+ fd = open (path, O_RDONLY);
if (fd == -1)
return (errno);
- i = read(fd, buf, sizeof(buf) - 1);
+ i = read (fd, buf, sizeof(buf) - 1);
error = errno; /* Remember errno in case close() wants to change it. */
- close(fd);
+ close (fd);
if (i == -1)
- return (error);
+ return error;
else if (i == 0)
- return (EAGAIN);
+ return EAGAIN;
buf[i] = '\0';
- *pidptr = strtol(buf, &endptr, 10);
+ *pidptr = strtol (buf, &endptr, 10);
if (endptr != &buf[i])
- return (EINVAL);
+ return EINVAL;
- return (0);
+ return 0;
}
struct pidfh *
-pidfile_open(const char *path, mode_t mode, pid_t *pidptr)
+pidfile_open (const char *path, mode_t mode, pid_t *pidptr)
{
struct pidfh *pfh;
struct stat sb;
int error, fd, len, count;
struct timespec rqtp;
- pfh = g_malloc(sizeof(*pfh));
+ pfh = g_malloc (sizeof(*pfh));
if (pfh == NULL)
- return (NULL);
+ return NULL;
if (path == NULL)
- len = snprintf(pfh->pf_path, sizeof(pfh->pf_path),
+ len = snprintf (pfh->pf_path, sizeof(pfh->pf_path),
"/var/run/%s.pid", __progname);
else
- len = snprintf(pfh->pf_path, sizeof(pfh->pf_path),
+ len = snprintf (pfh->pf_path, sizeof(pfh->pf_path),
"%s", path);
- if (len >= (int)sizeof(pfh->pf_path)) {
- free(pfh);
+ if (len >= (int)sizeof (pfh->pf_path)) {
+ g_free (pfh);
errno = ENAMETOOLONG;
- return (NULL);
+ return NULL;
}
/*
* PID file will be truncated again in pidfile_write(), so
* pidfile_write() can be called multiple times.
*/
- fd = open(pfh->pf_path,
+ fd = open (pfh->pf_path,
O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK, mode);
flock (fd, LOCK_EX | LOCK_NB);
if (fd == -1) {
rqtp.tv_nsec = 5000000;
if (errno == EWOULDBLOCK && pidptr != NULL) {
again:
- errno = pidfile_read(pfh->pf_path, pidptr);
+ errno = pidfile_read (pfh->pf_path, pidptr);
if (errno == 0)
errno = EEXIST;
else if (errno == EAGAIN) {
if (++count <= 3) {
- nanosleep(&rqtp, 0);
+ nanosleep (&rqtp, 0);
goto again;
}
}
}
- free(pfh);
- return (NULL);
+ g_free (pfh);
+ return NULL;
}
/*
* Remember file information, so in pidfile_write() we are sure we write
* to the proper descriptor.
*/
- if (fstat(fd, &sb) == -1) {
+ if (fstat (fd, &sb) == -1) {
error = errno;
- unlink(pfh->pf_path);
- close(fd);
- free(pfh);
+ unlink (pfh->pf_path);
+ close (fd);
+ g_free (pfh);
errno = error;
- return (NULL);
+ return NULL;
}
pfh->pf_fd = fd;
pfh->pf_dev = sb.st_dev;
pfh->pf_ino = sb.st_ino;
- return (pfh);
+ return pfh;
}
int
-pidfile_write(struct pidfh *pfh)
+pidfile_write (struct pidfh *pfh)
{
char pidstr[16];
int error, fd;
* Check remembered descriptor, so we don't overwrite some other
* file if pidfile was closed and descriptor reused.
*/
- errno = pidfile_verify(pfh);
+ errno = pidfile_verify (pfh);
if (errno != 0) {
/*
* Don't close descriptor, because we are not sure if it's ours.
*/
- return (-1);
+ return -1;
}
fd = pfh->pf_fd;
/*
* Truncate PID file, so multiple calls of pidfile_write() are allowed.
*/
- if (ftruncate(fd, 0) == -1) {
+ if (ftruncate (fd, 0) == -1) {
error = errno;
- _pidfile_remove(pfh, 0);
+ _pidfile_remove (pfh, 0);
errno = error;
- return (-1);
+ return -1;
}
- snprintf(pidstr, sizeof(pidstr), "%u", getpid());
- if (pwrite(fd, pidstr, strlen(pidstr), 0) != (ssize_t)strlen(pidstr)) {
+ snprintf (pidstr, sizeof(pidstr), "%u", getpid ());
+ if (pwrite (fd, pidstr, strlen (pidstr), 0) != (ssize_t)strlen (pidstr)) {
error = errno;
- _pidfile_remove(pfh, 0);
+ _pidfile_remove (pfh, 0);
errno = error;
- return (-1);
+ return -1;
}
- return (0);
+ return 0;
}
int
-pidfile_close(struct pidfh *pfh)
+pidfile_close (struct pidfh *pfh)
{
int error;
- error = pidfile_verify(pfh);
+ error = pidfile_verify (pfh);
if (error != 0) {
errno = error;
- return (-1);
+ return -1;
}
- if (close(pfh->pf_fd) == -1)
+ if (close (pfh->pf_fd) == -1)
error = errno;
- free(pfh);
+ g_free (pfh);
if (error != 0) {
errno = error;
- return (-1);
+ return -1;
}
- return (0);
+ return 0;
}
static int
-_pidfile_remove(struct pidfh *pfh, int freeit)
+_pidfile_remove (struct pidfh *pfh, int freeit)
{
int error;
- error = pidfile_verify(pfh);
+ error = pidfile_verify (pfh);
if (error != 0) {
errno = error;
- return (-1);
+ return -1;
}
- if (unlink(pfh->pf_path) == -1)
+ if (unlink (pfh->pf_path) == -1)
error = errno;
- if (flock(pfh->pf_fd, LOCK_UN) == -1) {
+ if (flock (pfh->pf_fd, LOCK_UN) == -1) {
if (error == 0)
error = errno;
}
- if (close(pfh->pf_fd) == -1) {
+ if (close (pfh->pf_fd) == -1) {
if (error == 0)
error = errno;
}
if (freeit)
- free(pfh);
+ g_free (pfh);
else
pfh->pf_fd = -1;
if (error != 0) {
errno = error;
- return (-1);
+ return -1;
}
- return (0);
+ return 0;
}
int
-pidfile_remove(struct pidfh *pfh)
+pidfile_remove (struct pidfh *pfh)
{
- return (_pidfile_remove(pfh, 1));
+ return (_pidfile_remove (pfh, 1));
}
#endif
TAILQ_REMOVE (&task->parts, part, next);
}
memory_pool_delete (task->task_pool);
+ bufferevent_disable (task->bev, EV_READ | EV_WRITE);
+ bufferevent_free (task->bev);
+ close (task->sock);
g_free (task);
}
}
task->error_code = RSPAMD_NETWORK_ERROR;
task->state = WRITE_ERROR;
}
+ if (task->state == WRITE_ERROR || task->state == WRITE_REPLY) {
+ bufferevent_enable (bev, EV_WRITE);
+ bufferevent_disable (bev, EV_READ);
+ }
free (s);
break;
case READ_MESSAGE:
task->state = WAIT_FILTER;
}
}
+ if (task->state == WRITE_ERROR || task->state == WRITE_REPLY) {
+ bufferevent_enable (bev, EV_WRITE);
+ bufferevent_disable (bev, EV_READ);
+ }
}
else {
msg_err ("read_socket: cannot read data to buffer: %ld", (long int)r);
case WAIT_FILTER:
bufferevent_disable (bev, EV_READ);
break;
+ }
+}
+
+static void
+write_socket (struct bufferevent *bev, void *arg)
+{
+ struct worker_task *task = (struct worker_task *)arg;
+
+ switch (task->state) {
case WRITE_REPLY:
write_reply (task);
+ task->state = CLOSING_CONNECTION;
bufferevent_disable (bev, EV_READ);
- bufferevent_enable (bev, EV_WRITE);
break;
case WRITE_ERROR:
write_reply (task);
+ task->state = CLOSING_CONNECTION;
bufferevent_disable (bev, EV_READ);
- bufferevent_enable (bev, EV_WRITE);
break;
- }
-}
-
-static void
-write_socket (struct bufferevent *bev, void *arg)
-{
- struct worker_task *task = (struct worker_task *)arg;
-
- if (task->state > READ_MESSAGE) {
- msg_info ("closing connection");
- /* Free buffers */
- free_task (task);
- bufferevent_disable (bev, EV_WRITE);
- bufferevent_free (bev);
+ case CLOSING_CONNECTION:
+ msg_debug ("write_socket: normally closing connection");
+ free_task (task);
+ break;
+ default:
+ msg_info ("write_socket: abnormally closing connection");
+ free_task (task);
+ break;
}
}
err_socket (struct bufferevent *bev, short what, void *arg)
{
struct worker_task *task = (struct worker_task *)arg;
- msg_info ("closing connection");
+ msg_info ("err_socket: abnormally closing connection");
/* Free buffers */
free_task (task);
- bufferevent_disable (bev, EV_READ);
- bufferevent_free (bev);
}
static void
bzero (new_task, sizeof (struct worker_task));
new_task->worker = worker;
new_task->state = READ_COMMAND;
+ new_task->sock = nfd;
new_task->cfg = worker->srv->cfg;
TAILQ_INIT (&new_task->urls);
TAILQ_INIT (&new_task->parts);
#else
new_task->task_pool = memory_pool_new (TASK_POOL_SIZE);
#endif
- new_task->memc_ctx = memory_pool_alloc (new_task->task_pool, sizeof (memcached_ctx_t));
- if (new_task->memc_ctx == NULL) {
- msg_err ("accept_socket: cannot allocate memory for memcached ctx, %m");
- }
- else {
- if (memc_init_ctx (new_task->memc_ctx) == -1) {
- msg_err ("accept_socket: cannot init memcached context for task");
- }
- }
/* Read event */
new_task->bev = bufferevent_new (nfd, read_socket, write_socket, err_socket, (void *)new_task);