summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/buffer.c32
-rw-r--r--src/plugins/surbl.c4
-rw-r--r--src/worker.c6
3 files changed, 36 insertions, 6 deletions
diff --git a/src/buffer.c b/src/buffer.c
index 6cffbea9e..8883e314d 100644
--- a/src/buffer.c
+++ b/src/buffer.c
@@ -82,7 +82,8 @@ write_buffers (int fd, rspamd_io_dispatcher_t *d)
return;
}
}
- else if (errno == EAGAIN) {
+ else if (r == -1 && errno == EAGAIN) {
+ msg_debug ("write_buffers: partially write data, retry");
/* Wait for other event */
event_del (d->ev);
event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
@@ -128,6 +129,7 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d)
f_str_t res;
char *c;
unsigned int len;
+ enum io_policy saved_policy;
if (d->in_buf == NULL) {
d->in_buf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t));
@@ -166,7 +168,8 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d)
return;
}
}
- else if (errno == EAGAIN) {
+ else if (r == -1 && errno == EAGAIN) {
+ msg_debug ("read_buffers: partially read data, retry");
return;
}
else {
@@ -176,6 +179,10 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d)
}
+ msg_debug ("read_buffers: read %ld characters, policy is %s, watermark is: %ld",
+ (long int)r, d->policy == BUFFER_LINE ? "LINE" : "CHARACTER",
+ (long int)d->nchars);
+ saved_policy = d->policy;
c = d->in_buf->data->begin;
r = 0;
len = d->in_buf->data->len;
@@ -204,6 +211,11 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d)
d->in_buf->pos -= r + 1;
r = 0;
len = d->in_buf->data->len;
+ if (d->policy != saved_policy) {
+ msg_debug ("read_buffers: policy changed during callback, restart buffer's processing");
+ read_buffers (fd, d);
+ return;
+ }
continue;
}
}
@@ -212,7 +224,7 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d)
}
break;
case BUFFER_CHARACTER:
- while (r < len) {
+ while (r <= len) {
if (r == d->nchars) {
res.begin = d->in_buf->data->begin;
res.len = r;
@@ -225,6 +237,11 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d)
d->in_buf->pos -= r;
r = 0;
len = d->in_buf->data->len;
+ if (d->policy != saved_policy) {
+ msg_debug ("read_buffers: policy changed during callback, restart buffer's processing");
+ read_buffers (fd, d);
+ return;
+ }
continue;
}
@@ -244,6 +261,8 @@ dispatcher_cb (int fd, short what, void *arg)
rspamd_io_dispatcher_t *d = (rspamd_io_dispatcher_t *)arg;
GError *err;
+ msg_debug ("dispatcher_cb: in dispatcher callback, what: %d, fd: %d", (int)what, fd);
+
switch (what) {
case EV_TIMEOUT:
if (d->err_callback) {
@@ -325,6 +344,7 @@ rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t *d,
size_t nchars)
{
f_str_t *tmp;
+ int t;
if (d->policy != policy) {
d->policy = policy;
@@ -334,19 +354,25 @@ rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t *d,
if (d->in_buf && d->in_buf->data->size < nchars) {
tmp = fstralloc (d->pool, d->nchars);
memcpy (tmp->begin, d->in_buf->data->begin, d->in_buf->data->len);
+ t = d->in_buf->pos - d->in_buf->data->begin;
tmp->len = d->in_buf->data->len;
d->in_buf->data = tmp;
+ d->in_buf->pos = d->in_buf->data->begin + t;
}
}
else if (policy == BUFFER_LINE) {
if (d->in_buf && d->nchars < BUFSIZ) {
tmp = fstralloc (d->pool, BUFSIZ);
memcpy (tmp->begin, d->in_buf->data->begin, d->in_buf->data->len);
+ t = d->in_buf->pos - d->in_buf->data->begin;
tmp->len = d->in_buf->data->len;
d->in_buf->data = tmp;
+ d->in_buf->pos = d->in_buf->data->begin + t;
}
}
}
+
+ msg_debug ("rspamd_set_dispatcher_policy: new input length watermark is %ld", (long int)d->nchars);
}
void
diff --git a/src/plugins/surbl.c b/src/plugins/surbl.c
index e048ed132..330e9cc66 100644
--- a/src/plugins/surbl.c
+++ b/src/plugins/surbl.c
@@ -310,8 +310,11 @@ static void
dns_callback (int result, char type, int count, int ttl, void *addresses, void *data)
{
struct memcached_param *param = (struct memcached_param *)data;
+ char c;
msg_debug ("dns_callback: in surbl request callback");
+ c = *(param->url->host + param->url->hostlen);
+ *(param->url->host + param->url->hostlen) = 0;
/* If we have result from DNS server, this url exists in SURBL, so increase score */
if (result == DNS_ERR_NONE && type == DNS_IPv4_A) {
msg_info ("surbl_check: url %s is in surbl %s", param->url->host, surbl_module_ctx->suffix);
@@ -320,6 +323,7 @@ dns_callback (int result, char type, int count, int ttl, void *addresses, void *
else {
msg_debug ("surbl_check: url %s is not in surbl %s", param->url->host, surbl_module_ctx->suffix);
}
+ *(param->url->host + param->url->hostlen) = c;
param->task->save.saved --;
if (param->task->save.saved == 0) {
diff --git a/src/worker.c b/src/worker.c
index 143cb54de..03912930f 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -144,6 +144,7 @@ read_socket (f_str_t *in, void *arg)
break;
case READ_MESSAGE:
task->msg = in;
+ msg_debug ("read_socket: got string of length %ld", (long int)task->msg->len);
r = process_message (task);
r = process_filters (task);
if (r == -1) {
@@ -242,6 +243,8 @@ accept_socket (int fd, short what, void *arg)
new_task->state = READ_COMMAND;
new_task->sock = nfd;
new_task->cfg = worker->srv->cfg;
+ io_tv.tv_sec = WORKER_IO_TIMEOUT;
+ io_tv.tv_usec = 0;
TAILQ_INIT (&new_task->urls);
new_task->task_pool = memory_pool_new (memory_pool_get_size ());
/* Add destructor for recipients list (it would be better to use anonymous function here */
@@ -289,9 +292,6 @@ start_worker (struct rspamd_worker *worker, int listen_sock)
/* Send SIGUSR2 to parent */
kill (getppid (), SIGUSR2);
- io_tv.tv_sec = WORKER_IO_TIMEOUT;
- io_tv.tv_usec = 0;
-
event_loop (0);
}