From fe2efaafebe67860ec3f5b3c259208ce7db05eeb Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Fri, 8 May 2009 18:46:39 +0400 Subject: [PATCH] * Improve performance of IO reading by reworking IO dispatcher algorithm --- src/buffer.c | 80 ++++++++++++++++++++++++-------------------------- src/mem_pool.c | 17 +++++++++++ src/mem_pool.h | 11 +++++++ src/protocol.c | 63 +++++++++++++++++++++++++-------------- 4 files changed, 108 insertions(+), 63 deletions(-) diff --git a/src/buffer.c b/src/buffer.c index fe5cf7a72..b8e2c4dcc 100644 --- a/src/buffer.c +++ b/src/buffer.c @@ -127,10 +127,12 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean skip_read) ssize_t r; GError *err; f_str_t res; - char *c; - unsigned int len; + char *c, *b; + char **pos; + unsigned int *len; enum io_policy saved_policy; + if (d->in_buf == NULL) { d->in_buf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t)); if (d->policy == BUFFER_LINE) { @@ -141,6 +143,9 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean skip_read) } d->in_buf->pos = d->in_buf->data->begin; } + + pos = &d->in_buf->pos; + len = &d->in_buf->data->len; if (BUFREMAIN (d->in_buf) == 0) { /* Buffer is full, try to call callback with overflow error */ @@ -152,7 +157,7 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean skip_read) } else if (!skip_read) { /* Try to read the whole buffer */ - r = read (fd, d->in_buf->pos, BUFREMAIN (d->in_buf)); + r = read (fd, *pos, BUFREMAIN (d->in_buf)); if (r == -1 && errno != EAGAIN) { if (d->err_callback) { err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno)); @@ -173,8 +178,8 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean skip_read) return; } else { - d->in_buf->pos += r; - d->in_buf->data->len += r; + *pos += r; + *len += r; } msg_debug ("read_buffers: read %ld characters, policy is %s, watermark is: %ld", (long int)r, d->policy == BUFFER_LINE ? "LINE" : "CHARACTER", @@ -183,15 +188,18 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean skip_read) saved_policy = d->policy; c = d->in_buf->data->begin; + b = c; r = 0; - len = d->in_buf->data->len; switch (d->policy) { case BUFFER_LINE: - while (r < len) { - if (*c == '\r' || *c == '\n') { - res.begin = d->in_buf->data->begin; + while (r < *len) { + if (*c == '\n') { + res.begin = b; res.len = r; + if (*(c - 1) == '\r') { + res.len --; + } if (d->read_callback) { d->read_callback (&res, d->user_data); if (d->wanna_die) { @@ -199,23 +207,18 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean skip_read) rspamd_remove_dispatcher (d); return; } - if (r < len - 1 && *(c + 1) == '\n') { - r ++; - c ++; - } /* Move remaining string to begin of buffer (draining) */ - memmove (d->in_buf->data->begin, c + 1, len - r - 1); - c = d->in_buf->data->begin; - d->in_buf->data->len -= r + 1; - d->in_buf->pos -= r + 1; + memmove (d->in_buf->data->begin, c + 1, *len - r - 1); + b = d->in_buf->data->begin; + c = b; + *len -= r + 1; + *pos = b + *len; r = 0; - len = d->in_buf->data->len; if (d->policy != saved_policy) { msg_debug ("read_buffers: policy changed during callback, restart buffer's processing"); read_buffers (fd, d, TRUE); return; } - continue; } } r ++; @@ -223,30 +226,25 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean skip_read) } break; case BUFFER_CHARACTER: - while (r <= len) { - if (r == d->nchars) { - res.begin = d->in_buf->data->begin; - res.len = r; - if (d->read_callback) { - d->read_callback (&res, d->user_data); - /* Move remaining string to begin of buffer (draining) */ - memmove (d->in_buf->data->begin, c, len - r); - c = d->in_buf->data->begin; - d->in_buf->data->len -= r; - d->in_buf->pos -= r; - r = 0; - len = d->in_buf->data->len; - if (d->policy != saved_policy) { - msg_debug ("read_buffers: policy changed during callback, restart buffer's processing"); - read_buffers (fd, d, TRUE); - return; - } - continue; + r = d->nchars; + if (*len >= r) { + res.begin = b; + res.len = r; + c = b + r; + if (d->read_callback) { + d->read_callback (&res, d->user_data); + /* Move remaining string to begin of buffer (draining) */ + memmove (d->in_buf->data->begin, c, *len - r); + b = d->in_buf->data->begin; + c = b; + *len -= r; + *pos = b + *len; + if (d->policy != saved_policy) { + msg_debug ("read_buffers: policy changed during callback, restart buffer's processing"); + read_buffers (fd, d, TRUE); + return; } - } - r ++; - c ++; } break; } diff --git a/src/mem_pool.c b/src/mem_pool.c index df4c544f9..76ae1141b 100644 --- a/src/mem_pool.c +++ b/src/mem_pool.c @@ -24,6 +24,7 @@ #include "config.h" #include "mem_pool.h" +#include "fstring.h" /* Sleep time for spin lock in nanoseconds */ #define MUTEX_SLEEP_TIME 10000000L @@ -227,6 +228,22 @@ memory_pool_strdup (memory_pool_t *pool, const char *src) return newstr; } +char * +memory_pool_fstrdup (memory_pool_t *pool, const struct f_str_s *src) +{ + char *newstr; + + if (src == NULL) { + return NULL; + } + + newstr = memory_pool_alloc (pool, src->len + 1); + memcpy (newstr, src->begin, src->len); + newstr[src->len] = '\0'; + return newstr; +} + + char * memory_pool_strdup_shared (memory_pool_t *pool, const char *src) { diff --git a/src/mem_pool.h b/src/mem_pool.h index 275da17d4..ab477a78e 100644 --- a/src/mem_pool.h +++ b/src/mem_pool.h @@ -14,6 +14,9 @@ #include "config.h" + +struct f_str_s; + /** * Destructor type definition */ @@ -116,6 +119,14 @@ void* memory_pool_alloc0 (memory_pool_t* pool, memory_pool_ssize_t size); */ char* memory_pool_strdup (memory_pool_t* pool, const char *src); +/** + * Make a copy of fixed string in pool as null terminated string + * @param pool memory pool object + * @param src source string + * @return pointer to newly created string that is copy of src + */ +char* memory_pool_fstrdup (memory_pool_t* pool, const struct f_str_s *src); + /** * Allocate piece of shared memory * @param pool memory pool object diff --git a/src/protocol.c b/src/protocol.c index 60b3b1bf1..a330334f8 100644 --- a/src/protocol.c +++ b/src/protocol.c @@ -89,13 +89,32 @@ /* XXX: try to convert rspamd errors to spamd errors */ #define SPAMD_ERROR "EX_ERROR" +static char * +separate_command (f_str_t *in, char c) +{ + int r = 0; + char *p = in->begin, *b; + b = p; + + while (r < in->len) { + if (*p == c) { + *p = '\0'; + in->begin = p + 1; + return b; + } + p ++; + r ++; + } + + return NULL; +} + static int -parse_command (struct worker_task *task, char *line) +parse_command (struct worker_task *task, f_str_t *line) { char *token; - msg_debug ("parse_command: got line from worker: %s", line); - token = strsep (&line, " "); + token = separate_command (line, ' '); if (line == NULL || token == NULL) { msg_debug ("parse_command: bad command: %s", token); return -1; @@ -160,14 +179,13 @@ parse_command (struct worker_task *task, char *line) return -1; } - if (strncasecmp (line, RSPAMC_GREETING, sizeof (RSPAMC_GREETING) - 1) == 0) { + if (strncasecmp (line->begin, RSPAMC_GREETING, sizeof (RSPAMC_GREETING) - 1) == 0) { task->proto = RSPAMC_PROTO; } - else if (strncasecmp (line, SPAMC_GREETING, sizeof (SPAMC_GREETING) -1) == 0) { + else if (strncasecmp (line->begin, SPAMC_GREETING, sizeof (SPAMC_GREETING) -1) == 0) { task->proto = SPAMC_PROTO; } else { - msg_debug ("parse_command: bad protocol version: %s", line); return -1; } task->state = READ_HEADER; @@ -175,13 +193,12 @@ parse_command (struct worker_task *task, char *line) } static int -parse_header (struct worker_task *task, char *line) +parse_header (struct worker_task *task, f_str_t *line) { char *headern, *err, *tmp; - msg_debug ("parse_header: got line from worker: %s", line); /* Check end of headers */ - if (*line == '\0') { + if (line->len == 0) { msg_debug ("parse_header: got empty line, assume it as end of headers"); if (task->cmd == CMD_PING || task->cmd == CMD_SKIP) { task->state = WRITE_REPLY; @@ -201,13 +218,12 @@ parse_header (struct worker_task *task, char *line) return 0; } - headern = strsep (&line, ":"); + headern = separate_command (line, ':'); if (line == NULL || headern == NULL) { return -1; } /* Eat whitespaces */ - g_strstrip (line); g_strstrip (headern); switch (headern[0]) { @@ -216,7 +232,8 @@ parse_header (struct worker_task *task, char *line) /* content-length */ if (strncasecmp (headern, CONTENT_LENGTH_HEADER, sizeof (CONTENT_LENGTH_HEADER) - 1) == 0) { if (task->content_length == 0) { - task->content_length = strtoul (line, &err, 10); + tmp = memory_pool_fstrdup (task->task_pool, line); + task->content_length = strtoul (tmp, &err, 10); msg_debug ("parse_header: read Content-Length header, value: %lu", (unsigned long int)task->content_length); } } @@ -229,7 +246,7 @@ parse_header (struct worker_task *task, char *line) case 'H': /* helo */ if (strncasecmp (headern, HELO_HEADER, sizeof (HELO_HEADER) - 1) == 0) { - task->helo = memory_pool_strdup (task->task_pool, line); + task->helo = memory_pool_fstrdup (task->task_pool, line); msg_debug ("parse_header: read helo header, value: %s", task->helo); } else { @@ -241,7 +258,7 @@ parse_header (struct worker_task *task, char *line) case 'F': /* from */ if (strncasecmp (headern, FROM_HEADER, sizeof (FROM_HEADER) - 1) == 0) { - task->from = memory_pool_strdup (task->task_pool, line); + task->from = memory_pool_fstrdup (task->task_pool, line); msg_debug ("parse_header: read from header, value: %s", task->from); } else { @@ -253,7 +270,7 @@ parse_header (struct worker_task *task, char *line) case 'Q': /* Queue id */ if (strncasecmp (headern, QUEUE_ID_HEADER, sizeof (QUEUE_ID_HEADER) - 1) == 0) { - task->queue_id = memory_pool_strdup (task->task_pool, line); + task->queue_id = memory_pool_fstrdup (task->task_pool, line); msg_debug ("parse_header: read queue_id header, value: %s", task->queue_id); } else { @@ -265,7 +282,7 @@ parse_header (struct worker_task *task, char *line) case 'R': /* rcpt */ if (strncasecmp (headern, RCPT_HEADER, sizeof (RCPT_HEADER) - 1) == 0) { - tmp = memory_pool_strdup (task->task_pool, line); + tmp = memory_pool_fstrdup (task->task_pool, line); task->rcpt = g_list_prepend (task->rcpt, tmp); msg_debug ("parse_header: read rcpt header, value: %s", tmp); } @@ -278,7 +295,8 @@ parse_header (struct worker_task *task, char *line) case 'N': /* nrcpt */ if (strncasecmp (headern, NRCPT_HEADER, sizeof (NRCPT_HEADER) - 1) == 0) { - task->nrcpt = strtoul (line, &err, 10); + tmp = memory_pool_fstrdup (task->task_pool, line); + task->nrcpt = strtoul (tmp, &err, 10); msg_debug ("parse_header: read rcpt header, value: %d", (int)task->nrcpt); } else { @@ -290,11 +308,12 @@ parse_header (struct worker_task *task, char *line) case 'I': /* ip_addr */ if (strncasecmp (headern, IP_ADDR_HEADER, sizeof (IP_ADDR_HEADER) - 1) == 0) { - if (!inet_aton (line, &task->from_addr)) { - msg_info ("parse_header: bad ip header: '%s'", line); + tmp = memory_pool_fstrdup (task->task_pool, line); + if (!inet_aton (tmp, &task->from_addr)) { + msg_info ("parse_header: bad ip header: '%s'", tmp); return -1; } - msg_debug ("parse_header: read IP header, value: %s", line); + msg_debug ("parse_header: read IP header, value: %s", tmp); } else { msg_info ("parse_header: wrong header: %s", headern); @@ -314,10 +333,10 @@ read_rspamd_input_line (struct worker_task *task, f_str_t *line) { switch (task->state) { case READ_COMMAND: - return parse_command (task, fstrcstr (line, task->task_pool)); + return parse_command (task, line); break; case READ_HEADER: - return parse_header (task, fstrcstr (line, task->task_pool)); + return parse_header (task, line); break; default: return -1; -- 2.39.5