|
|
@@ -82,7 +82,8 @@ write_buffers (int fd, rspamd_io_dispatcher_t *d) |
|
|
|
return; |
|
|
|
} |
|
|
|
} |
|
|
|
else if (errno == EAGAIN) { |
|
|
|
else if (r == -1 && errno == EAGAIN) { |
|
|
|
msg_debug ("write_buffers: partially write data, retry"); |
|
|
|
/* Wait for other event */ |
|
|
|
event_del (d->ev); |
|
|
|
event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d); |
|
|
@@ -128,6 +129,7 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d) |
|
|
|
f_str_t res; |
|
|
|
char *c; |
|
|
|
unsigned int len; |
|
|
|
enum io_policy saved_policy; |
|
|
|
|
|
|
|
if (d->in_buf == NULL) { |
|
|
|
d->in_buf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t)); |
|
|
@@ -166,7 +168,8 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d) |
|
|
|
return; |
|
|
|
} |
|
|
|
} |
|
|
|
else if (errno == EAGAIN) { |
|
|
|
else if (r == -1 && errno == EAGAIN) { |
|
|
|
msg_debug ("read_buffers: partially read data, retry"); |
|
|
|
return; |
|
|
|
} |
|
|
|
else { |
|
|
@@ -176,6 +179,10 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
msg_debug ("read_buffers: read %ld characters, policy is %s, watermark is: %ld", |
|
|
|
(long int)r, d->policy == BUFFER_LINE ? "LINE" : "CHARACTER", |
|
|
|
(long int)d->nchars); |
|
|
|
saved_policy = d->policy; |
|
|
|
c = d->in_buf->data->begin; |
|
|
|
r = 0; |
|
|
|
len = d->in_buf->data->len; |
|
|
@@ -204,6 +211,11 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d) |
|
|
|
d->in_buf->pos -= r + 1; |
|
|
|
r = 0; |
|
|
|
len = d->in_buf->data->len; |
|
|
|
if (d->policy != saved_policy) { |
|
|
|
msg_debug ("read_buffers: policy changed during callback, restart buffer's processing"); |
|
|
|
read_buffers (fd, d); |
|
|
|
return; |
|
|
|
} |
|
|
|
continue; |
|
|
|
} |
|
|
|
} |
|
|
@@ -212,7 +224,7 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d) |
|
|
|
} |
|
|
|
break; |
|
|
|
case BUFFER_CHARACTER: |
|
|
|
while (r < len) { |
|
|
|
while (r <= len) { |
|
|
|
if (r == d->nchars) { |
|
|
|
res.begin = d->in_buf->data->begin; |
|
|
|
res.len = r; |
|
|
@@ -225,6 +237,11 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d) |
|
|
|
d->in_buf->pos -= r; |
|
|
|
r = 0; |
|
|
|
len = d->in_buf->data->len; |
|
|
|
if (d->policy != saved_policy) { |
|
|
|
msg_debug ("read_buffers: policy changed during callback, restart buffer's processing"); |
|
|
|
read_buffers (fd, d); |
|
|
|
return; |
|
|
|
} |
|
|
|
continue; |
|
|
|
} |
|
|
|
|
|
|
@@ -244,6 +261,8 @@ dispatcher_cb (int fd, short what, void *arg) |
|
|
|
rspamd_io_dispatcher_t *d = (rspamd_io_dispatcher_t *)arg; |
|
|
|
GError *err; |
|
|
|
|
|
|
|
msg_debug ("dispatcher_cb: in dispatcher callback, what: %d, fd: %d", (int)what, fd); |
|
|
|
|
|
|
|
switch (what) { |
|
|
|
case EV_TIMEOUT: |
|
|
|
if (d->err_callback) { |
|
|
@@ -325,6 +344,7 @@ rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t *d, |
|
|
|
size_t nchars) |
|
|
|
{ |
|
|
|
f_str_t *tmp; |
|
|
|
int t; |
|
|
|
|
|
|
|
if (d->policy != policy) { |
|
|
|
d->policy = policy; |
|
|
@@ -334,19 +354,25 @@ rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t *d, |
|
|
|
if (d->in_buf && d->in_buf->data->size < nchars) { |
|
|
|
tmp = fstralloc (d->pool, d->nchars); |
|
|
|
memcpy (tmp->begin, d->in_buf->data->begin, d->in_buf->data->len); |
|
|
|
t = d->in_buf->pos - d->in_buf->data->begin; |
|
|
|
tmp->len = d->in_buf->data->len; |
|
|
|
d->in_buf->data = tmp; |
|
|
|
d->in_buf->pos = d->in_buf->data->begin + t; |
|
|
|
} |
|
|
|
} |
|
|
|
else if (policy == BUFFER_LINE) { |
|
|
|
if (d->in_buf && d->nchars < BUFSIZ) { |
|
|
|
tmp = fstralloc (d->pool, BUFSIZ); |
|
|
|
memcpy (tmp->begin, d->in_buf->data->begin, d->in_buf->data->len); |
|
|
|
t = d->in_buf->pos - d->in_buf->data->begin; |
|
|
|
tmp->len = d->in_buf->data->len; |
|
|
|
d->in_buf->data = tmp; |
|
|
|
d->in_buf->pos = d->in_buf->data->begin + t; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
msg_debug ("rspamd_set_dispatcher_policy: new input length watermark is %ld", (long int)d->nchars); |
|
|
|
} |
|
|
|
|
|
|
|
void |