]> source.dussan.org Git - rspamd.git/commitdiff
* Improve performance of IO reading by reworking IO dispatcher algorithm
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Fri, 8 May 2009 14:46:39 +0000 (18:46 +0400)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Fri, 8 May 2009 14:46:39 +0000 (18:46 +0400)
src/buffer.c
src/mem_pool.c
src/mem_pool.h
src/protocol.c

index fe5cf7a72ec77acdd0b2df982c1ceb1bcc346338..b8e2c4dcc6ce18c621e569402a2c83636bd13f64 100644 (file)
@@ -127,10 +127,12 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean skip_read)
        ssize_t r;
        GError *err;
        f_str_t res;
-       char *c;
-       unsigned int len;
+       char *c, *b;
+       char **pos;
+       unsigned int *len;
        enum io_policy saved_policy;
 
+
        if (d->in_buf == NULL) {
                d->in_buf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t));
                if (d->policy == BUFFER_LINE) {
@@ -141,6 +143,9 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean skip_read)
                }
                d->in_buf->pos = d->in_buf->data->begin;
        }
+
+       pos = &d->in_buf->pos;
+       len = &d->in_buf->data->len;
        
        if (BUFREMAIN (d->in_buf) == 0) {
                /* Buffer is full, try to call callback with overflow error */
@@ -152,7 +157,7 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean skip_read)
        }
        else if (!skip_read) {
                /* Try to read the whole buffer */
-               r = read (fd, d->in_buf->pos, BUFREMAIN (d->in_buf));
+               r = read (fd, *pos, BUFREMAIN (d->in_buf));
                if (r == -1 && errno != EAGAIN) {
                        if (d->err_callback) {
                                err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
@@ -173,8 +178,8 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean skip_read)
                        return;
                }
                else {
-                       d->in_buf->pos += r;
-                       d->in_buf->data->len += r;
+                       *pos += r;
+                       *len += r;
                }
                msg_debug ("read_buffers: read %ld characters, policy is %s, watermark is: %ld", 
                                (long int)r, d->policy == BUFFER_LINE ? "LINE" : "CHARACTER",
@@ -183,15 +188,18 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean skip_read)
        
        saved_policy = d->policy;
        c = d->in_buf->data->begin;
+       b = c;
        r = 0;
-       len = d->in_buf->data->len;
 
        switch (d->policy) {
                case BUFFER_LINE:
-                       while (r < len) {
-                               if (*c == '\r' || *c == '\n') {
-                                       res.begin = d->in_buf->data->begin;
+                       while (r < *len) {
+                               if (*c == '\n') {
+                                       res.begin = b;
                                        res.len = r;
+                                       if (*(c - 1) == '\r') {
+                                               res.len --;
+                                       }
                                        if (d->read_callback) {
                                                d->read_callback (&res, d->user_data);
                                                if (d->wanna_die) {
@@ -199,23 +207,18 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean skip_read)
                                                        rspamd_remove_dispatcher (d);
                                                        return;
                                                }
-                                               if (r < len - 1 && *(c + 1) == '\n') {
-                                                       r ++;
-                                                       c ++;
-                                               }
                                                /* Move remaining string to begin of buffer (draining) */
-                                               memmove (d->in_buf->data->begin, c + 1, len - r - 1);
-                                               c = d->in_buf->data->begin;
-                                               d->in_buf->data->len -= r + 1;
-                                               d->in_buf->pos -= r + 1;
+                                               memmove (d->in_buf->data->begin, c + 1, *len - r - 1);
+                                               b = d->in_buf->data->begin;
+                                               c = b;
+                                               *len -= r + 1;
+                                               *pos = b + *len;
                                                r = 0;
-                                               len = d->in_buf->data->len;
                                                if (d->policy != saved_policy) {
                                                        msg_debug ("read_buffers: policy changed during callback, restart buffer's processing");
                                                        read_buffers (fd, d, TRUE);
                                                        return;
                                                }
-                                               continue;
                                        }
                                }
                                r ++;
@@ -223,30 +226,25 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean skip_read)
                        }
                        break;
                case BUFFER_CHARACTER:
-                       while (r <= len) {
-                               if (r == d->nchars) {
-                                       res.begin = d->in_buf->data->begin;
-                                       res.len = r;
-                                       if (d->read_callback) {
-                                               d->read_callback (&res, d->user_data);
-                                               /* Move remaining string to begin of buffer (draining) */
-                                               memmove (d->in_buf->data->begin, c, len - r);
-                                               c = d->in_buf->data->begin;
-                                               d->in_buf->data->len -= r;
-                                               d->in_buf->pos -= r;
-                                               r = 0;
-                                               len = d->in_buf->data->len;
-                                               if (d->policy != saved_policy) {
-                                                       msg_debug ("read_buffers: policy changed during callback, restart buffer's processing");
-                                                       read_buffers (fd, d, TRUE);
-                                                       return;
-                                               }
-                                               continue;
+                       r = d->nchars;
+                       if (*len >= r) {
+                               res.begin = b;
+                               res.len = r;
+                               c = b + r;
+                               if (d->read_callback) {
+                                       d->read_callback (&res, d->user_data);
+                                       /* Move remaining string to begin of buffer (draining) */
+                                       memmove (d->in_buf->data->begin, c, *len - r);
+                                       b = d->in_buf->data->begin;
+                                       c = b;
+                                       *len -= r;
+                                       *pos = b + *len;
+                                       if (d->policy != saved_policy) {
+                                               msg_debug ("read_buffers: policy changed during callback, restart buffer's processing");
+                                               read_buffers (fd, d, TRUE);
+                                               return;
                                        }
-                               
                                }
-                               r ++;
-                               c ++;
                        }
                        break;
        }
index df4c544f93d53231cfa7ab424018e9b236d148ed..76ae1141bdf0e50d843046c3f8531ed483592f97 100644 (file)
@@ -24,6 +24,7 @@
 
 #include "config.h"
 #include "mem_pool.h"
+#include "fstring.h"
 
 /* Sleep time for spin lock in nanoseconds */
 #define MUTEX_SLEEP_TIME 10000000L
@@ -227,6 +228,22 @@ memory_pool_strdup (memory_pool_t *pool, const char *src)
        return newstr;
 }
 
+char *
+memory_pool_fstrdup (memory_pool_t *pool, const struct f_str_s *src)
+{
+       char *newstr;
+
+       if (src == NULL) {
+               return NULL;
+       }
+
+       newstr = memory_pool_alloc (pool, src->len + 1);
+       memcpy (newstr, src->begin, src->len);
+       newstr[src->len] = '\0';
+       return newstr;
+}
+
+
 char *
 memory_pool_strdup_shared (memory_pool_t *pool, const char *src)
 {
index 275da17d4b942e24903adb56c230a2c942c28b27..ab477a78e206a2a8ccf83c54ac40398e915d8570 100644 (file)
@@ -14,6 +14,9 @@
 
 #include "config.h"
 
+
+struct f_str_s;
+
 /** 
  * Destructor type definition 
  */
@@ -116,6 +119,14 @@ void* memory_pool_alloc0 (memory_pool_t* pool, memory_pool_ssize_t size);
  */
 char* memory_pool_strdup (memory_pool_t* pool, const char *src);
 
+/**
+ * Make a copy of fixed string in pool as null terminated string
+ * @param pool memory pool object
+ * @param src source string
+ * @return pointer to newly created string that is copy of src
+ */
+char* memory_pool_fstrdup (memory_pool_t* pool, const struct f_str_s *src);
+
 /**
  * Allocate piece of shared memory
  * @param pool memory pool object
index 60b3b1bf17cbff5587051fd706b36463df0cc9bb..a330334f83f44a4b87bac7ee70a691857128d1c6 100644 (file)
 /* XXX: try to convert rspamd errors to spamd errors */
 #define SPAMD_ERROR "EX_ERROR"
 
+static char *
+separate_command (f_str_t *in, char c)
+{
+       int r = 0;
+       char *p = in->begin, *b;
+       b = p;
+
+       while (r < in->len) {
+               if (*p == c) {
+                       *p = '\0';
+                       in->begin = p + 1;
+                       return b;
+               }
+               p ++;
+               r ++;
+       }
+
+       return NULL;
+}
+
 static int
-parse_command (struct worker_task *task, char *line)
+parse_command (struct worker_task *task, f_str_t *line)
 {
        char *token;
 
-       msg_debug ("parse_command: got line from worker: %s", line);
-       token = strsep (&line, " ");
+       token = separate_command (line, ' ');
        if (line == NULL || token == NULL) {
                msg_debug ("parse_command: bad command: %s", token);
                return -1;
@@ -160,14 +179,13 @@ parse_command (struct worker_task *task, char *line)
                        return -1;
        }
 
-       if (strncasecmp (line, RSPAMC_GREETING, sizeof (RSPAMC_GREETING) - 1) == 0) {
+       if (strncasecmp (line->begin, RSPAMC_GREETING, sizeof (RSPAMC_GREETING) - 1) == 0) {
                task->proto = RSPAMC_PROTO;
        }
-       else if (strncasecmp (line, SPAMC_GREETING, sizeof (SPAMC_GREETING) -1) == 0) {
+       else if (strncasecmp (line->begin, SPAMC_GREETING, sizeof (SPAMC_GREETING) -1) == 0) {
                task->proto = SPAMC_PROTO;
        }
        else {
-               msg_debug ("parse_command: bad protocol version: %s", line);
                return -1;
        }
        task->state = READ_HEADER;
@@ -175,13 +193,12 @@ parse_command (struct worker_task *task, char *line)
 }
 
 static int
-parse_header (struct worker_task *task, char *line)
+parse_header (struct worker_task *task, f_str_t *line)
 {
        char *headern, *err, *tmp;
        
-       msg_debug ("parse_header: got line from worker: %s", line);
        /* Check end of headers */
-       if (*line == '\0') {
+       if (line->len == 0) {
                msg_debug ("parse_header: got empty line, assume it as end of headers");
                if (task->cmd == CMD_PING || task->cmd == CMD_SKIP) {
                        task->state = WRITE_REPLY;
@@ -201,13 +218,12 @@ parse_header (struct worker_task *task, char *line)
                return 0;
        }
 
-       headern = strsep (&line, ":");
+       headern = separate_command (line, ':');
 
        if (line == NULL || headern == NULL) {
                return -1;
        }
        /* Eat whitespaces */
-       g_strstrip (line);
        g_strstrip (headern);
 
        switch (headern[0]) {
@@ -216,7 +232,8 @@ parse_header (struct worker_task *task, char *line)
                        /* content-length */
                        if (strncasecmp (headern, CONTENT_LENGTH_HEADER, sizeof (CONTENT_LENGTH_HEADER) - 1) == 0) {
                 if (task->content_length == 0) {
-                                   task->content_length = strtoul (line, &err, 10);
+                                       tmp = memory_pool_fstrdup (task->task_pool, line);
+                                   task->content_length = strtoul (tmp, &err, 10);
                                        msg_debug ("parse_header: read Content-Length header, value: %lu", (unsigned long int)task->content_length);
                                }
                        }
@@ -229,7 +246,7 @@ parse_header (struct worker_task *task, char *line)
                case 'H':
                        /* helo */
                        if (strncasecmp (headern, HELO_HEADER, sizeof (HELO_HEADER) - 1) == 0) {
-                               task->helo = memory_pool_strdup (task->task_pool, line);
+                               task->helo = memory_pool_fstrdup (task->task_pool, line);
                                msg_debug ("parse_header: read helo header, value: %s", task->helo);
                        }
                        else {
@@ -241,7 +258,7 @@ parse_header (struct worker_task *task, char *line)
                case 'F':
                        /* from */
                        if (strncasecmp (headern, FROM_HEADER, sizeof (FROM_HEADER) - 1) == 0) {
-                               task->from = memory_pool_strdup (task->task_pool, line);
+                               task->from = memory_pool_fstrdup (task->task_pool, line);
                                msg_debug ("parse_header: read from header, value: %s", task->from);
                        }
                        else {
@@ -253,7 +270,7 @@ parse_header (struct worker_task *task, char *line)
                case 'Q':
                        /* Queue id */
                        if (strncasecmp (headern, QUEUE_ID_HEADER, sizeof (QUEUE_ID_HEADER) - 1) == 0) {
-                               task->queue_id = memory_pool_strdup (task->task_pool, line);
+                               task->queue_id = memory_pool_fstrdup (task->task_pool, line);
                                msg_debug ("parse_header: read queue_id header, value: %s", task->queue_id);
                        }
                        else {
@@ -265,7 +282,7 @@ parse_header (struct worker_task *task, char *line)
                case 'R':
                        /* rcpt */
                        if (strncasecmp (headern, RCPT_HEADER, sizeof (RCPT_HEADER) - 1) == 0) {
-                               tmp = memory_pool_strdup (task->task_pool, line);
+                               tmp = memory_pool_fstrdup (task->task_pool, line);
                                task->rcpt = g_list_prepend (task->rcpt, tmp);
                                msg_debug ("parse_header: read rcpt header, value: %s", tmp);
                        }
@@ -278,7 +295,8 @@ parse_header (struct worker_task *task, char *line)
                case 'N':
                        /* nrcpt */
                        if (strncasecmp (headern, NRCPT_HEADER, sizeof (NRCPT_HEADER) - 1) == 0) {
-                               task->nrcpt = strtoul (line, &err, 10);
+                               tmp = memory_pool_fstrdup (task->task_pool, line);
+                               task->nrcpt = strtoul (tmp, &err, 10);
                                msg_debug ("parse_header: read rcpt header, value: %d", (int)task->nrcpt);
                        }
                        else {
@@ -290,11 +308,12 @@ parse_header (struct worker_task *task, char *line)
                case 'I':
                        /* ip_addr */
                        if (strncasecmp (headern, IP_ADDR_HEADER, sizeof (IP_ADDR_HEADER) - 1) == 0) {
-                               if (!inet_aton (line, &task->from_addr)) {
-                                       msg_info ("parse_header: bad ip header: '%s'", line);
+                               tmp = memory_pool_fstrdup (task->task_pool, line);
+                               if (!inet_aton (tmp, &task->from_addr)) {
+                                       msg_info ("parse_header: bad ip header: '%s'", tmp);
                                        return -1;
                                }
-                               msg_debug ("parse_header: read IP header, value: %s", line);
+                               msg_debug ("parse_header: read IP header, value: %s", tmp);
                        }
                        else {
                                msg_info ("parse_header: wrong header: %s", headern);
@@ -314,10 +333,10 @@ read_rspamd_input_line (struct worker_task *task, f_str_t *line)
 {
        switch (task->state) {
                case READ_COMMAND:
-                       return parse_command (task, fstrcstr (line, task->task_pool));
+                       return parse_command (task, line);
                        break;
                case READ_HEADER:
-                       return parse_header (task, fstrcstr (line, task->task_pool));
+                       return parse_header (task, line);
                        break;
                default:
                        return -1;