diff options
-rw-r--r-- | src/buffer.c | 32 | ||||
-rw-r--r-- | src/plugins/surbl.c | 4 | ||||
-rw-r--r-- | src/worker.c | 6 |
3 files changed, 36 insertions, 6 deletions
diff --git a/src/buffer.c b/src/buffer.c index 6cffbea9e..8883e314d 100644 --- a/src/buffer.c +++ b/src/buffer.c @@ -82,7 +82,8 @@ write_buffers (int fd, rspamd_io_dispatcher_t *d) return; } } - else if (errno == EAGAIN) { + else if (r == -1 && errno == EAGAIN) { + msg_debug ("write_buffers: partially write data, retry"); /* Wait for other event */ event_del (d->ev); event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d); @@ -128,6 +129,7 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d) f_str_t res; char *c; unsigned int len; + enum io_policy saved_policy; if (d->in_buf == NULL) { d->in_buf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t)); @@ -166,7 +168,8 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d) return; } } - else if (errno == EAGAIN) { + else if (r == -1 && errno == EAGAIN) { + msg_debug ("read_buffers: partially read data, retry"); return; } else { @@ -176,6 +179,10 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d) } + msg_debug ("read_buffers: read %ld characters, policy is %s, watermark is: %ld", + (long int)r, d->policy == BUFFER_LINE ? "LINE" : "CHARACTER", + (long int)d->nchars); + saved_policy = d->policy; c = d->in_buf->data->begin; r = 0; len = d->in_buf->data->len; @@ -204,6 +211,11 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d) d->in_buf->pos -= r + 1; r = 0; len = d->in_buf->data->len; + if (d->policy != saved_policy) { + msg_debug ("read_buffers: policy changed during callback, restart buffer's processing"); + read_buffers (fd, d); + return; + } continue; } } @@ -212,7 +224,7 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d) } break; case BUFFER_CHARACTER: - while (r < len) { + while (r <= len) { if (r == d->nchars) { res.begin = d->in_buf->data->begin; res.len = r; @@ -225,6 +237,11 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d) d->in_buf->pos -= r; r = 0; len = d->in_buf->data->len; + if (d->policy != saved_policy) { + msg_debug ("read_buffers: policy changed during callback, restart buffer's processing"); + read_buffers (fd, d); + return; + } continue; } @@ -244,6 +261,8 @@ dispatcher_cb (int fd, short what, void *arg) rspamd_io_dispatcher_t *d = (rspamd_io_dispatcher_t *)arg; GError *err; + msg_debug ("dispatcher_cb: in dispatcher callback, what: %d, fd: %d", (int)what, fd); + switch (what) { case EV_TIMEOUT: if (d->err_callback) { @@ -325,6 +344,7 @@ rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t *d, size_t nchars) { f_str_t *tmp; + int t; if (d->policy != policy) { d->policy = policy; @@ -334,19 +354,25 @@ rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t *d, if (d->in_buf && d->in_buf->data->size < nchars) { tmp = fstralloc (d->pool, d->nchars); memcpy (tmp->begin, d->in_buf->data->begin, d->in_buf->data->len); + t = d->in_buf->pos - d->in_buf->data->begin; tmp->len = d->in_buf->data->len; d->in_buf->data = tmp; + d->in_buf->pos = d->in_buf->data->begin + t; } } else if (policy == BUFFER_LINE) { if (d->in_buf && d->nchars < BUFSIZ) { tmp = fstralloc (d->pool, BUFSIZ); memcpy (tmp->begin, d->in_buf->data->begin, d->in_buf->data->len); + t = d->in_buf->pos - d->in_buf->data->begin; tmp->len = d->in_buf->data->len; d->in_buf->data = tmp; + d->in_buf->pos = d->in_buf->data->begin + t; } } } + + msg_debug ("rspamd_set_dispatcher_policy: new input length watermark is %ld", (long int)d->nchars); } void diff --git a/src/plugins/surbl.c b/src/plugins/surbl.c index e048ed132..330e9cc66 100644 --- a/src/plugins/surbl.c +++ b/src/plugins/surbl.c @@ -310,8 +310,11 @@ static void dns_callback (int result, char type, int count, int ttl, void *addresses, void *data) { struct memcached_param *param = (struct memcached_param *)data; + char c; msg_debug ("dns_callback: in surbl request callback"); + c = *(param->url->host + param->url->hostlen); + *(param->url->host + param->url->hostlen) = 0; /* If we have result from DNS server, this url exists in SURBL, so increase score */ if (result == DNS_ERR_NONE && type == DNS_IPv4_A) { msg_info ("surbl_check: url %s is in surbl %s", param->url->host, surbl_module_ctx->suffix); @@ -320,6 +323,7 @@ dns_callback (int result, char type, int count, int ttl, void *addresses, void * else { msg_debug ("surbl_check: url %s is not in surbl %s", param->url->host, surbl_module_ctx->suffix); } + *(param->url->host + param->url->hostlen) = c; param->task->save.saved --; if (param->task->save.saved == 0) { diff --git a/src/worker.c b/src/worker.c index 143cb54de..03912930f 100644 --- a/src/worker.c +++ b/src/worker.c @@ -144,6 +144,7 @@ read_socket (f_str_t *in, void *arg) break; case READ_MESSAGE: task->msg = in; + msg_debug ("read_socket: got string of length %ld", (long int)task->msg->len); r = process_message (task); r = process_filters (task); if (r == -1) { @@ -242,6 +243,8 @@ accept_socket (int fd, short what, void *arg) new_task->state = READ_COMMAND; new_task->sock = nfd; new_task->cfg = worker->srv->cfg; + io_tv.tv_sec = WORKER_IO_TIMEOUT; + io_tv.tv_usec = 0; TAILQ_INIT (&new_task->urls); new_task->task_pool = memory_pool_new (memory_pool_get_size ()); /* Add destructor for recipients list (it would be better to use anonymous function here */ @@ -289,9 +292,6 @@ start_worker (struct rspamd_worker *worker, int listen_sock) /* Send SIGUSR2 to parent */ kill (getppid (), SIGUSR2); - io_tv.tv_sec = WORKER_IO_TIMEOUT; - io_tv.tv_usec = 0; - event_loop (0); } |