summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2009-05-08 18:46:39 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2009-05-08 18:46:39 +0400
commitfe2efaafebe67860ec3f5b3c259208ce7db05eeb (patch)
tree7f02bbed389e4bd6e19a45e91c712887945155c5 /src
parentafe65479c0052b38e080d7d9adaeba7326823dea (diff)
downloadrspamd-fe2efaafebe67860ec3f5b3c259208ce7db05eeb.tar.gz
rspamd-fe2efaafebe67860ec3f5b3c259208ce7db05eeb.zip
* Improve performance of IO reading by reworking IO dispatcher algorithm
Diffstat (limited to 'src')
-rw-r--r--src/buffer.c80
-rw-r--r--src/mem_pool.c17
-rw-r--r--src/mem_pool.h11
-rw-r--r--src/protocol.c63
4 files changed, 108 insertions, 63 deletions
diff --git a/src/buffer.c b/src/buffer.c
index fe5cf7a72..b8e2c4dcc 100644
--- a/src/buffer.c
+++ b/src/buffer.c
@@ -127,10 +127,12 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean skip_read)
ssize_t r;
GError *err;
f_str_t res;
- char *c;
- unsigned int len;
+ char *c, *b;
+ char **pos;
+ unsigned int *len;
enum io_policy saved_policy;
+
if (d->in_buf == NULL) {
d->in_buf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t));
if (d->policy == BUFFER_LINE) {
@@ -141,6 +143,9 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean skip_read)
}
d->in_buf->pos = d->in_buf->data->begin;
}
+
+ pos = &d->in_buf->pos;
+ len = &d->in_buf->data->len;
if (BUFREMAIN (d->in_buf) == 0) {
/* Buffer is full, try to call callback with overflow error */
@@ -152,7 +157,7 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean skip_read)
}
else if (!skip_read) {
/* Try to read the whole buffer */
- r = read (fd, d->in_buf->pos, BUFREMAIN (d->in_buf));
+ r = read (fd, *pos, BUFREMAIN (d->in_buf));
if (r == -1 && errno != EAGAIN) {
if (d->err_callback) {
err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
@@ -173,8 +178,8 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean skip_read)
return;
}
else {
- d->in_buf->pos += r;
- d->in_buf->data->len += r;
+ *pos += r;
+ *len += r;
}
msg_debug ("read_buffers: read %ld characters, policy is %s, watermark is: %ld",
(long int)r, d->policy == BUFFER_LINE ? "LINE" : "CHARACTER",
@@ -183,15 +188,18 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean skip_read)
saved_policy = d->policy;
c = d->in_buf->data->begin;
+ b = c;
r = 0;
- len = d->in_buf->data->len;
switch (d->policy) {
case BUFFER_LINE:
- while (r < len) {
- if (*c == '\r' || *c == '\n') {
- res.begin = d->in_buf->data->begin;
+ while (r < *len) {
+ if (*c == '\n') {
+ res.begin = b;
res.len = r;
+ if (*(c - 1) == '\r') {
+ res.len --;
+ }
if (d->read_callback) {
d->read_callback (&res, d->user_data);
if (d->wanna_die) {
@@ -199,23 +207,18 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean skip_read)
rspamd_remove_dispatcher (d);
return;
}
- if (r < len - 1 && *(c + 1) == '\n') {
- r ++;
- c ++;
- }
/* Move remaining string to begin of buffer (draining) */
- memmove (d->in_buf->data->begin, c + 1, len - r - 1);
- c = d->in_buf->data->begin;
- d->in_buf->data->len -= r + 1;
- d->in_buf->pos -= r + 1;
+ memmove (d->in_buf->data->begin, c + 1, *len - r - 1);
+ b = d->in_buf->data->begin;
+ c = b;
+ *len -= r + 1;
+ *pos = b + *len;
r = 0;
- len = d->in_buf->data->len;
if (d->policy != saved_policy) {
msg_debug ("read_buffers: policy changed during callback, restart buffer's processing");
read_buffers (fd, d, TRUE);
return;
}
- continue;
}
}
r ++;
@@ -223,30 +226,25 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean skip_read)
}
break;
case BUFFER_CHARACTER:
- while (r <= len) {
- if (r == d->nchars) {
- res.begin = d->in_buf->data->begin;
- res.len = r;
- if (d->read_callback) {
- d->read_callback (&res, d->user_data);
- /* Move remaining string to begin of buffer (draining) */
- memmove (d->in_buf->data->begin, c, len - r);
- c = d->in_buf->data->begin;
- d->in_buf->data->len -= r;
- d->in_buf->pos -= r;
- r = 0;
- len = d->in_buf->data->len;
- if (d->policy != saved_policy) {
- msg_debug ("read_buffers: policy changed during callback, restart buffer's processing");
- read_buffers (fd, d, TRUE);
- return;
- }
- continue;
+ r = d->nchars;
+ if (*len >= r) {
+ res.begin = b;
+ res.len = r;
+ c = b + r;
+ if (d->read_callback) {
+ d->read_callback (&res, d->user_data);
+ /* Move remaining string to begin of buffer (draining) */
+ memmove (d->in_buf->data->begin, c, *len - r);
+ b = d->in_buf->data->begin;
+ c = b;
+ *len -= r;
+ *pos = b + *len;
+ if (d->policy != saved_policy) {
+ msg_debug ("read_buffers: policy changed during callback, restart buffer's processing");
+ read_buffers (fd, d, TRUE);
+ return;
}
-
}
- r ++;
- c ++;
}
break;
}
diff --git a/src/mem_pool.c b/src/mem_pool.c
index df4c544f9..76ae1141b 100644
--- a/src/mem_pool.c
+++ b/src/mem_pool.c
@@ -24,6 +24,7 @@
#include "config.h"
#include "mem_pool.h"
+#include "fstring.h"
/* Sleep time for spin lock in nanoseconds */
#define MUTEX_SLEEP_TIME 10000000L
@@ -228,6 +229,22 @@ memory_pool_strdup (memory_pool_t *pool, const char *src)
}
char *
+memory_pool_fstrdup (memory_pool_t *pool, const struct f_str_s *src)
+{
+ char *newstr;
+
+ if (src == NULL) {
+ return NULL;
+ }
+
+ newstr = memory_pool_alloc (pool, src->len + 1);
+ memcpy (newstr, src->begin, src->len);
+ newstr[src->len] = '\0';
+ return newstr;
+}
+
+
+char *
memory_pool_strdup_shared (memory_pool_t *pool, const char *src)
{
memory_pool_ssize_t len;
diff --git a/src/mem_pool.h b/src/mem_pool.h
index 275da17d4..ab477a78e 100644
--- a/src/mem_pool.h
+++ b/src/mem_pool.h
@@ -14,6 +14,9 @@
#include "config.h"
+
+struct f_str_s;
+
/**
* Destructor type definition
*/
@@ -117,6 +120,14 @@ void* memory_pool_alloc0 (memory_pool_t* pool, memory_pool_ssize_t size);
char* memory_pool_strdup (memory_pool_t* pool, const char *src);
/**
+ * Make a copy of fixed string in pool as null terminated string
+ * @param pool memory pool object
+ * @param src source string
+ * @return pointer to newly created string that is copy of src
+ */
+char* memory_pool_fstrdup (memory_pool_t* pool, const struct f_str_s *src);
+
+/**
* Allocate piece of shared memory
* @param pool memory pool object
* @param size bytes to allocate
diff --git a/src/protocol.c b/src/protocol.c
index 60b3b1bf1..a330334f8 100644
--- a/src/protocol.c
+++ b/src/protocol.c
@@ -89,13 +89,32 @@
/* XXX: try to convert rspamd errors to spamd errors */
#define SPAMD_ERROR "EX_ERROR"
+static char *
+separate_command (f_str_t *in, char c)
+{
+ int r = 0;
+ char *p = in->begin, *b;
+ b = p;
+
+ while (r < in->len) {
+ if (*p == c) {
+ *p = '\0';
+ in->begin = p + 1;
+ return b;
+ }
+ p ++;
+ r ++;
+ }
+
+ return NULL;
+}
+
static int
-parse_command (struct worker_task *task, char *line)
+parse_command (struct worker_task *task, f_str_t *line)
{
char *token;
- msg_debug ("parse_command: got line from worker: %s", line);
- token = strsep (&line, " ");
+ token = separate_command (line, ' ');
if (line == NULL || token == NULL) {
msg_debug ("parse_command: bad command: %s", token);
return -1;
@@ -160,14 +179,13 @@ parse_command (struct worker_task *task, char *line)
return -1;
}
- if (strncasecmp (line, RSPAMC_GREETING, sizeof (RSPAMC_GREETING) - 1) == 0) {
+ if (strncasecmp (line->begin, RSPAMC_GREETING, sizeof (RSPAMC_GREETING) - 1) == 0) {
task->proto = RSPAMC_PROTO;
}
- else if (strncasecmp (line, SPAMC_GREETING, sizeof (SPAMC_GREETING) -1) == 0) {
+ else if (strncasecmp (line->begin, SPAMC_GREETING, sizeof (SPAMC_GREETING) -1) == 0) {
task->proto = SPAMC_PROTO;
}
else {
- msg_debug ("parse_command: bad protocol version: %s", line);
return -1;
}
task->state = READ_HEADER;
@@ -175,13 +193,12 @@ parse_command (struct worker_task *task, char *line)
}
static int
-parse_header (struct worker_task *task, char *line)
+parse_header (struct worker_task *task, f_str_t *line)
{
char *headern, *err, *tmp;
- msg_debug ("parse_header: got line from worker: %s", line);
/* Check end of headers */
- if (*line == '\0') {
+ if (line->len == 0) {
msg_debug ("parse_header: got empty line, assume it as end of headers");
if (task->cmd == CMD_PING || task->cmd == CMD_SKIP) {
task->state = WRITE_REPLY;
@@ -201,13 +218,12 @@ parse_header (struct worker_task *task, char *line)
return 0;
}
- headern = strsep (&line, ":");
+ headern = separate_command (line, ':');
if (line == NULL || headern == NULL) {
return -1;
}
/* Eat whitespaces */
- g_strstrip (line);
g_strstrip (headern);
switch (headern[0]) {
@@ -216,7 +232,8 @@ parse_header (struct worker_task *task, char *line)
/* content-length */
if (strncasecmp (headern, CONTENT_LENGTH_HEADER, sizeof (CONTENT_LENGTH_HEADER) - 1) == 0) {
if (task->content_length == 0) {
- task->content_length = strtoul (line, &err, 10);
+ tmp = memory_pool_fstrdup (task->task_pool, line);
+ task->content_length = strtoul (tmp, &err, 10);
msg_debug ("parse_header: read Content-Length header, value: %lu", (unsigned long int)task->content_length);
}
}
@@ -229,7 +246,7 @@ parse_header (struct worker_task *task, char *line)
case 'H':
/* helo */
if (strncasecmp (headern, HELO_HEADER, sizeof (HELO_HEADER) - 1) == 0) {
- task->helo = memory_pool_strdup (task->task_pool, line);
+ task->helo = memory_pool_fstrdup (task->task_pool, line);
msg_debug ("parse_header: read helo header, value: %s", task->helo);
}
else {
@@ -241,7 +258,7 @@ parse_header (struct worker_task *task, char *line)
case 'F':
/* from */
if (strncasecmp (headern, FROM_HEADER, sizeof (FROM_HEADER) - 1) == 0) {
- task->from = memory_pool_strdup (task->task_pool, line);
+ task->from = memory_pool_fstrdup (task->task_pool, line);
msg_debug ("parse_header: read from header, value: %s", task->from);
}
else {
@@ -253,7 +270,7 @@ parse_header (struct worker_task *task, char *line)
case 'Q':
/* Queue id */
if (strncasecmp (headern, QUEUE_ID_HEADER, sizeof (QUEUE_ID_HEADER) - 1) == 0) {
- task->queue_id = memory_pool_strdup (task->task_pool, line);
+ task->queue_id = memory_pool_fstrdup (task->task_pool, line);
msg_debug ("parse_header: read queue_id header, value: %s", task->queue_id);
}
else {
@@ -265,7 +282,7 @@ parse_header (struct worker_task *task, char *line)
case 'R':
/* rcpt */
if (strncasecmp (headern, RCPT_HEADER, sizeof (RCPT_HEADER) - 1) == 0) {
- tmp = memory_pool_strdup (task->task_pool, line);
+ tmp = memory_pool_fstrdup (task->task_pool, line);
task->rcpt = g_list_prepend (task->rcpt, tmp);
msg_debug ("parse_header: read rcpt header, value: %s", tmp);
}
@@ -278,7 +295,8 @@ parse_header (struct worker_task *task, char *line)
case 'N':
/* nrcpt */
if (strncasecmp (headern, NRCPT_HEADER, sizeof (NRCPT_HEADER) - 1) == 0) {
- task->nrcpt = strtoul (line, &err, 10);
+ tmp = memory_pool_fstrdup (task->task_pool, line);
+ task->nrcpt = strtoul (tmp, &err, 10);
msg_debug ("parse_header: read rcpt header, value: %d", (int)task->nrcpt);
}
else {
@@ -290,11 +308,12 @@ parse_header (struct worker_task *task, char *line)
case 'I':
/* ip_addr */
if (strncasecmp (headern, IP_ADDR_HEADER, sizeof (IP_ADDR_HEADER) - 1) == 0) {
- if (!inet_aton (line, &task->from_addr)) {
- msg_info ("parse_header: bad ip header: '%s'", line);
+ tmp = memory_pool_fstrdup (task->task_pool, line);
+ if (!inet_aton (tmp, &task->from_addr)) {
+ msg_info ("parse_header: bad ip header: '%s'", tmp);
return -1;
}
- msg_debug ("parse_header: read IP header, value: %s", line);
+ msg_debug ("parse_header: read IP header, value: %s", tmp);
}
else {
msg_info ("parse_header: wrong header: %s", headern);
@@ -314,10 +333,10 @@ read_rspamd_input_line (struct worker_task *task, f_str_t *line)
{
switch (task->state) {
case READ_COMMAND:
- return parse_command (task, fstrcstr (line, task->task_pool));
+ return parse_command (task, line);
break;
case READ_HEADER:
- return parse_header (task, fstrcstr (line, task->task_pool));
+ return parse_header (task, line);
break;
default:
return -1;