aboutsummaryrefslogtreecommitdiffstats
path: root/src/buffer.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2009-10-02 17:09:38 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2009-10-02 17:09:38 +0400
commitf3ad9c6f1e91c9912dbe730fdec350b5fc908672 (patch)
tree005e5568431db09becaa9b67a33dfc11f80bba7f /src/buffer.c
parente6a1d22de250c10992b484635fd95a03f197f779 (diff)
downloadrspamd-f3ad9c6f1e91c9912dbe730fdec350b5fc908672.tar.gz
rspamd-f3ad9c6f1e91c9912dbe730fdec350b5fc908672.zip
* Retab, no functional changes
Diffstat (limited to 'src/buffer.c')
-rw-r--r--src/buffer.c209
1 files changed, 100 insertions, 109 deletions
diff --git a/src/buffer.c b/src/buffer.c
index 125c11686..6e862664a 100644
--- a/src/buffer.c
+++ b/src/buffer.c
@@ -28,23 +28,23 @@
#define G_DISPATCHER_ERROR dispatcher_error_quark()
-static void dispatcher_cb (int fd, short what, void *arg);
+static void dispatcher_cb (int fd, short what, void *arg);
-static inline GQuark
+static inline GQuark
dispatcher_error_quark (void)
{
- return g_quark_from_static_string ("g-dispatcher-error-quark");
+ return g_quark_from_static_string ("g-dispatcher-error-quark");
}
#define BUFREMAIN(x) (x)->data->size - ((x)->pos - (x)->data->begin)
-static gboolean
-write_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean is_delayed)
+static gboolean
+write_buffers (int fd, rspamd_io_dispatcher_t * d, gboolean is_delayed)
{
- GList *cur;
- GError *err;
- rspamd_buffer_t *buf;
- ssize_t r;
+ GList *cur;
+ GError *err;
+ rspamd_buffer_t *buf;
+ ssize_t r;
/* Fix order */
if (d->out_buffers) {
@@ -52,7 +52,7 @@ write_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean is_delayed)
}
cur = g_list_first (d->out_buffers);
while (cur) {
- buf = (rspamd_buffer_t *)cur->data;
+ buf = (rspamd_buffer_t *) cur->data;
if (BUFREMAIN (buf) == 0) {
/* Skip empty buffers */
cur = g_list_next (cur);
@@ -106,7 +106,7 @@ write_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean is_delayed)
return FALSE;
}
}
-
+
event_del (d->ev);
event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
event_add (d->ev, d->tv);
@@ -122,16 +122,16 @@ write_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean is_delayed)
}
static void
-read_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean skip_read)
+read_buffers (int fd, rspamd_io_dispatcher_t * d, gboolean skip_read)
{
- ssize_t r;
- GError *err;
- f_str_t res;
- char *c, *b;
- char **pos;
- size_t *len;
- enum io_policy saved_policy;
-
+ ssize_t r;
+ GError *err;
+ f_str_t res;
+ char *c, *b;
+ char **pos;
+ size_t *len;
+ enum io_policy saved_policy;
+
if (d->wanna_die) {
rspamd_remove_dispatcher (d);
return;
@@ -150,7 +150,7 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean skip_read)
pos = &d->in_buf->pos;
len = &d->in_buf->data->len;
-
+
if (BUFREMAIN (d->in_buf) == 0) {
/* Buffer is full, try to call callback with overflow error */
if (d->err_callback) {
@@ -185,63 +185,34 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean skip_read)
*pos += r;
*len += r;
}
- msg_debug ("read_buffers: read %ld characters, policy is %s, watermark is: %ld",
- (long int)r, d->policy == BUFFER_LINE ? "LINE" : "CHARACTER",
- (long int)d->nchars);
+ msg_debug ("read_buffers: read %ld characters, policy is %s, watermark is: %ld", (long int)r, d->policy == BUFFER_LINE ? "LINE" : "CHARACTER", (long int)d->nchars);
}
-
+
saved_policy = d->policy;
c = d->in_buf->data->begin;
b = c;
r = 0;
switch (d->policy) {
- case BUFFER_LINE:
- while (r < *len) {
- if (*c == '\n') {
- res.begin = b;
- res.len = r;
- if (r != 0 && *(c - 1) == '\r') {
- res.len --;
- }
- if (d->read_callback) {
- if (!d->read_callback (&res, d->user_data)) {
- return;
- }
- /* Move remaining string to begin of buffer (draining) */
- memmove (d->in_buf->data->begin, c + 1, *len - r - 1);
- b = d->in_buf->data->begin;
- c = b;
- *len -= r + 1;
- *pos = b + *len;
- r = 0;
- if (d->policy != saved_policy) {
- msg_debug ("read_buffers: policy changed during callback, restart buffer's processing");
- read_buffers (fd, d, TRUE);
- return;
- }
- }
- }
- r ++;
- c ++;
- }
- break;
- case BUFFER_CHARACTER:
- r = d->nchars;
- if (*len >= r) {
+ case BUFFER_LINE:
+ while (r < *len) {
+ if (*c == '\n') {
res.begin = b;
res.len = r;
- c = b + r;
+ if (r != 0 && *(c - 1) == '\r') {
+ res.len--;
+ }
if (d->read_callback) {
if (!d->read_callback (&res, d->user_data)) {
return;
}
/* Move remaining string to begin of buffer (draining) */
- memmove (d->in_buf->data->begin, c, *len - r);
+ memmove (d->in_buf->data->begin, c + 1, *len - r - 1);
b = d->in_buf->data->begin;
c = b;
- *len -= r;
+ *len -= r + 1;
*pos = b + *len;
+ r = 0;
if (d->policy != saved_policy) {
msg_debug ("read_buffers: policy changed during callback, restart buffer's processing");
read_buffers (fd, d, TRUE);
@@ -249,7 +220,34 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean skip_read)
}
}
}
- break;
+ r++;
+ c++;
+ }
+ break;
+ case BUFFER_CHARACTER:
+ r = d->nchars;
+ if (*len >= r) {
+ res.begin = b;
+ res.len = r;
+ c = b + r;
+ if (d->read_callback) {
+ if (!d->read_callback (&res, d->user_data)) {
+ return;
+ }
+ /* Move remaining string to begin of buffer (draining) */
+ memmove (d->in_buf->data->begin, c, *len - r);
+ b = d->in_buf->data->begin;
+ c = b;
+ *len -= r;
+ *pos = b + *len;
+ if (d->policy != saved_policy) {
+ msg_debug ("read_buffers: policy changed during callback, restart buffer's processing");
+ read_buffers (fd, d, TRUE);
+ return;
+ }
+ }
+ }
+ break;
}
}
@@ -258,50 +256,47 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean skip_read)
static void
dispatcher_cb (int fd, short what, void *arg)
{
- rspamd_io_dispatcher_t *d = (rspamd_io_dispatcher_t *)arg;
- GError *err;
+ rspamd_io_dispatcher_t *d = (rspamd_io_dispatcher_t *) arg;
+ GError *err;
msg_debug ("dispatcher_cb: in dispatcher callback, what: %d, fd: %d", (int)what, fd);
switch (what) {
- case EV_TIMEOUT:
- if (d->err_callback) {
- err = g_error_new (G_DISPATCHER_ERROR, ETIMEDOUT, "IO timeout");
- d->err_callback (err, d->user_data);
- }
- break;
- case EV_WRITE:
- /* No data to write, disable further EV_WRITE to this fd */
- if (d->out_buffers == NULL) {
- event_del (d->ev);
- event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
- event_add (d->ev, d->tv);
- }
- else {
- /* Delayed write */
- write_buffers (fd, d, TRUE);
- }
- break;
- case EV_READ:
- read_buffers (fd, d, FALSE);
- break;
+ case EV_TIMEOUT:
+ if (d->err_callback) {
+ err = g_error_new (G_DISPATCHER_ERROR, ETIMEDOUT, "IO timeout");
+ d->err_callback (err, d->user_data);
+ }
+ break;
+ case EV_WRITE:
+ /* No data to write, disable further EV_WRITE to this fd */
+ if (d->out_buffers == NULL) {
+ event_del (d->ev);
+ event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
+ event_add (d->ev, d->tv);
+ }
+ else {
+ /* Delayed write */
+ write_buffers (fd, d, TRUE);
+ }
+ break;
+ case EV_READ:
+ read_buffers (fd, d, FALSE);
+ break;
}
}
-rspamd_io_dispatcher_t*
+rspamd_io_dispatcher_t *
rspamd_create_dispatcher (int fd, enum io_policy policy,
- dispatcher_read_callback_t read_cb,
- dispatcher_write_callback_t write_cb,
- dispatcher_err_callback_t err_cb,
- struct timeval *tv, void *user_data)
+ dispatcher_read_callback_t read_cb, dispatcher_write_callback_t write_cb, dispatcher_err_callback_t err_cb, struct timeval *tv, void *user_data)
{
- rspamd_io_dispatcher_t *new;
+ rspamd_io_dispatcher_t *new;
if (fd == -1) {
return NULL;
}
-
+
new = g_malloc (sizeof (rspamd_io_dispatcher_t));
bzero (new, sizeof (rspamd_io_dispatcher_t));
@@ -329,8 +324,8 @@ rspamd_create_dispatcher (int fd, enum io_policy policy,
return new;
}
-void
-rspamd_remove_dispatcher (rspamd_io_dispatcher_t *dispatcher)
+void
+rspamd_remove_dispatcher (rspamd_io_dispatcher_t * dispatcher)
{
if (dispatcher != NULL) {
event_del (dispatcher->ev);
@@ -342,13 +337,11 @@ rspamd_remove_dispatcher (rspamd_io_dispatcher_t *dispatcher)
}
}
-void
-rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t *d,
- enum io_policy policy,
- size_t nchars)
+void
+rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t * d, enum io_policy policy, size_t nchars)
{
- f_str_t *tmp;
- int t;
+ f_str_t *tmp;
+ int t;
if (d->policy != policy) {
d->policy = policy;
@@ -379,17 +372,15 @@ rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t *d,
msg_debug ("rspamd_set_dispatcher_policy: new input length watermark is %ld", (long int)d->nchars);
}
-gboolean
-rspamd_dispatcher_write (rspamd_io_dispatcher_t *d,
- void *data,
- size_t len, gboolean delayed, gboolean allocated)
+gboolean
+rspamd_dispatcher_write (rspamd_io_dispatcher_t * d, void *data, size_t len, gboolean delayed, gboolean allocated)
{
- rspamd_buffer_t *newbuf;
+ rspamd_buffer_t *newbuf;
newbuf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t));
if (!allocated) {
newbuf->data = fstralloc (d->pool, len);
-
+
/* We need to copy data to temporary internal buffer to avoid using of stack variables */
memcpy (newbuf->data->begin, data, len);
}
@@ -401,7 +392,7 @@ rspamd_dispatcher_write (rspamd_io_dispatcher_t *d,
newbuf->pos = newbuf->data->begin;
newbuf->data->len = len;
-
+
d->out_buffers = g_list_prepend (d->out_buffers, newbuf);
if (!delayed) {
@@ -411,8 +402,8 @@ rspamd_dispatcher_write (rspamd_io_dispatcher_t *d,
return TRUE;
}
-void
-rspamd_dispatcher_pause (rspamd_io_dispatcher_t *d)
+void
+rspamd_dispatcher_pause (rspamd_io_dispatcher_t * d)
{
event_del (d->ev);
}