Browse Source

* Fix dispatcher timeouts handling

* Add wanna_die flag that can be used in dispatcher's callbacks
tags/0.2.7
Vsevolod Stakhov 15 years ago
parent
commit
606128de4c
5 changed files with 57 additions and 15 deletions
  1. 34
    10
      src/buffer.c
  2. 1
    0
      src/buffer.h
  3. 7
    1
      src/controller.c
  4. 2
    0
      src/protocol.c
  5. 13
    4
      src/worker.c

+ 34
- 10
src/buffer.c View File

@@ -24,6 +24,7 @@

#include "config.h"
#include "buffer.h"
#include "main.h"

#define G_DISPATCHER_ERROR dispatcher_error_quark()

@@ -57,7 +58,6 @@ write_buffers (int fd, rspamd_io_dispatcher_t *d)
cur = g_list_next (cur);
continue;
}

r = write (fd, buf->pos, BUFREMAIN (buf));
if (r == -1 && errno != EAGAIN) {
if (d->err_callback) {
@@ -70,6 +70,7 @@ write_buffers (int fd, rspamd_io_dispatcher_t *d)
buf->pos += r;
if (BUFREMAIN (buf) != 0) {
/* Continue with this buffer */
msg_debug ("write_buffers: wrote %ld bytes of %ld", (long int)r, (long int)buf->data->len);
continue;
}
}
@@ -83,6 +84,9 @@ write_buffers (int fd, rspamd_io_dispatcher_t *d)
}
else if (errno == EAGAIN) {
/* Wait for other event */
event_del (d->ev);
event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
event_add (d->ev, d->tv);
return;
}
cur = g_list_next (cur);
@@ -93,23 +97,37 @@ write_buffers (int fd, rspamd_io_dispatcher_t *d)
g_list_free (d->out_buffers);
d->out_buffers = NULL;

msg_debug ("write_buffers: all buffers were written successfully");

if (d->write_callback) {
d->write_callback (d->user_data);
if (d->wanna_die) {
msg_debug ("write_buffers: callback set wanna_die flag, terminating");
rspamd_remove_dispatcher (d);
return;
}
}

event_del (d->ev);
event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
event_add (d->ev, d->tv);
}
else {
/* Plan other write event */
event_del (d->ev);
event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
event_add (d->ev, d->tv);
}
}

static void
read_buffers (int fd, rspamd_io_dispatcher_t *d)
{
ssize_t r, len;
ssize_t r;
GError *err;
f_str_t res;
char *c;
unsigned int len;

if (d->in_buf == NULL) {
d->in_buf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t));
@@ -170,12 +188,17 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d)
res.len = r;
if (d->read_callback) {
d->read_callback (&res, d->user_data);
if (d->wanna_die) {
msg_debug ("read_buffers: callback set wanna_die flag, terminating");
rspamd_remove_dispatcher (d);
return;
}
if (r < len - 1 && *(c + 1) == '\n') {
r ++;
c ++;
}
/* Move remaining string to begin of buffer (draining) */
memmove (d->in_buf->data->begin, c, len - r);
memmove (d->in_buf->data->begin, c + 1, len - r - 1);
c = d->in_buf->data->begin;
d->in_buf->data->len -= r + 1;
d->in_buf->pos -= r + 1;
@@ -274,7 +297,7 @@ rspamd_create_dispatcher (int fd, enum io_policy policy,
new->ev = memory_pool_alloc0 (new->pool, sizeof (struct event));
new->fd = fd;

event_set (new->ev, fd, EV_READ | EV_WRITE | EV_PERSIST, dispatcher_cb, (void *)new);
event_set (new->ev, fd, EV_WRITE, dispatcher_cb, (void *)new);
event_add (new->ev, new->tv);

return new;
@@ -328,18 +351,19 @@ rspamd_dispatcher_write (rspamd_io_dispatcher_t *d,
rspamd_buffer_t *newbuf;

newbuf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t));
newbuf->data = memory_pool_alloc (d->pool, sizeof (f_str_t));
newbuf->data->begin = memory_pool_alloc (d->pool, len);
newbuf->data = fstralloc (d->pool, len);
/* We need to copy data to temporary internal buffer to avoid using of stack variables */
memcpy (newbuf->data->begin, data, len);
newbuf->data->size = len;
newbuf->pos = newbuf->data->begin;
newbuf->data->len = len;
d->out_buffers = g_list_prepend (d->out_buffers, newbuf);

if (!delayed) {
msg_debug ("rspamd_dispatcher_write: plan write event");
event_del (d->ev);
event_set (d->ev, d->fd, EV_READ | EV_WRITE | EV_PERSIST, dispatcher_cb, (void *)d);
event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
event_add (d->ev, d->tv);
}
}

+ 1
- 0
src/buffer.h View File

@@ -39,6 +39,7 @@ typedef struct rspamd_io_dispatcher_s {
enum io_policy policy; /**< IO policy */
size_t nchars; /**< how many chars to read */
int fd; /**< descriptor */
gboolean wanna_die; /**< if dispatcher should be stopped */
dispatcher_read_callback_t read_callback; /**< read callback */
dispatcher_write_callback_t write_callback; /**< write callback */
dispatcher_err_callback_t err_callback; /**< error callback */

+ 7
- 1
src/controller.c View File

@@ -457,7 +457,13 @@ static void
err_socket (GError *err, void *arg)
{
struct controller_session *session = (struct controller_session *)arg;
msg_info ("err_socket: abnormally closing control connection, error: %s", err->message);

if (err->code == EOF) {
msg_info ("err_socket: client closed control connection");
}
else {
msg_info ("err_socket: abnormally closing control connection, error: %s", err->message);
}
/* Free buffers */
free_session (session);
}

+ 2
- 0
src/protocol.c View File

@@ -91,6 +91,7 @@ parse_command (struct worker_task *task, char *line)
{
char *token;

msg_debug ("parse_command: got line from worker: %s", line);
token = strsep (&line, " ");
if (line == NULL || token == NULL) {
msg_debug ("parse_command: bad command: %s", token);
@@ -191,6 +192,7 @@ parse_header (struct worker_task *task, char *line)
task->last_error = "Unknown content length";
task->error_code = RSPAMD_LENGTH_ERROR;
task->state = WRITE_ERROR;
return -1;
}
}
return 0;

+ 13
- 4
src/worker.c View File

@@ -40,8 +40,8 @@
#include <perl.h> /* from the Perl distribution */

#define TASK_POOL_SIZE 4095
/* 2 seconds for worker's IO */
#define WORKER_IO_TIMEOUT 2
/* 60 seconds for worker's IO */
#define WORKER_IO_TIMEOUT 60

const f_str_t CRLF = {
/* begin */"\r\n",
@@ -119,7 +119,8 @@ free_task (struct worker_task *task)
g_list_free_1 (part);
}
memory_pool_delete (task->task_pool);
rspamd_remove_dispatcher (task->dispatcher);
/* Plan dispatcher shutdown */
task->dispatcher->wanna_die = 1;
close (task->sock);
g_free (task);
}
@@ -216,7 +217,8 @@ accept_socket (int fd, short what, void *arg)
struct sockaddr_storage ss;
struct worker_task *new_task;
socklen_t addrlen = sizeof(ss);
int nfd;
int nfd, on = 1;
struct linger linger;

if ((nfd = accept (fd, (struct sockaddr *)&ss, &addrlen)) == -1) {
return;
@@ -224,6 +226,13 @@ accept_socket (int fd, short what, void *arg)
if (event_make_socket_nonblocking(fd) < 0) {
return;
}

/* Socket options */
setsockopt (nfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&on, sizeof(on));
setsockopt (nfd, SOL_SOCKET, SO_REUSEADDR, (void *)&on, sizeof(on));
linger.l_onoff = 1;
linger.l_linger = 2;
setsockopt (nfd, SOL_SOCKET, SO_LINGER, (void *)&linger, sizeof(linger));
new_task = g_malloc (sizeof (struct worker_task));
if (new_task == NULL) {

Loading…
Cancel
Save