From: cebka@mailsupport.rambler.ru Date: Tue, 23 Sep 2008 07:47:56 +0000 (+0400) Subject: * Small updates to memory pool library X-Git-Tag: 0.2.7~371 X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=193acf73e51b24ccca8048ebb6aaec2971594268;p=rspamd.git * Small updates to memory pool library - fix cases when new chunk is allocated - add memory pool allocator statistics - let it work in multi-threaded environment - add strdup function for convinience * Use memory pool allocator more widely to avoid memory leaks in future and optimize performance * Task pool chunk size is now pre-defined constant (16 Kb currently) --- diff --git a/fstring.c b/fstring.c index cecf531de..ad0be7d78 100644 --- a/fstring.c +++ b/fstring.c @@ -174,14 +174,14 @@ fstrpush (f_str_t *dest, char c) * Allocate memory for f_str_t */ f_str_t* -fstralloc (size_t len) +fstralloc (memory_pool_t *pool, size_t len) { - f_str_t *res = malloc (sizeof (f_str_t)); + f_str_t *res = memory_pool_alloc (pool, sizeof (f_str_t)); if (res == NULL) { return NULL; } - res->begin = malloc (len); + res->begin = memory_pool_alloc (pool, len); if (res->begin == NULL) { free (res); return NULL; @@ -195,7 +195,7 @@ fstralloc (size_t len) * Truncate string to its len */ f_str_t* -fstrtruncate (f_str_t *orig) +fstrtruncate (memory_pool_t *pool, f_str_t *orig) { f_str_t *res; @@ -203,12 +203,11 @@ fstrtruncate (f_str_t *orig) return orig; } - res = fstralloc (orig->len); + res = fstralloc (pool, orig->len); if (res == NULL) { return NULL; } fstrcpy (res, orig); - fstrfree (orig); return res; } @@ -217,7 +216,7 @@ fstrtruncate (f_str_t *orig) * Enlarge string to new size */ f_str_t* -fstrgrow (f_str_t *orig, size_t newlen) +fstrgrow (memory_pool_t *pool, f_str_t *orig, size_t newlen) { f_str_t *res; @@ -225,12 +224,11 @@ fstrgrow (f_str_t *orig, size_t newlen) return orig; } - res = fstralloc (newlen); + res = fstralloc (pool, newlen); if (res == NULL) { return NULL; } fstrcpy (res, orig); - fstrfree (orig); return res; } diff --git a/fstring.h b/fstring.h index c13860d3e..c3087b2c5 100644 --- a/fstring.h +++ b/fstring.h @@ -6,6 +6,7 @@ #define FSTRING_H #include +#include "mem_pool.h" #define update_buf_size(x) (x)->free = (x)->buf->size - ((x)->pos - (x)->buf->begin); (x)->buf->len = (x)->pos - (x)->buf->begin @@ -65,22 +66,17 @@ int fstrpush (f_str_t *dest, char c); /* * Allocate memory for f_str_t */ -f_str_t* fstralloc (size_t len); +f_str_t* fstralloc (memory_pool_t *pool, size_t len); /* * Truncate string to its len */ -f_str_t* fstrtruncate (f_str_t *orig); +f_str_t* fstrtruncate (memory_pool_t *pool, f_str_t *orig); /* * Enlarge string to new size */ -f_str_t* fstrgrow (f_str_t *orig, size_t newlen); - -/* - * Free memory for f_str_t - */ -#define fstrfree(x) free((x)->begin); free((x)) +f_str_t* fstrgrow (memory_pool_t *pool, f_str_t *orig, size_t newlen); /* * Return specified character diff --git a/mem_pool.c b/mem_pool.c index c6afbfa9f..e32d58f31 100644 --- a/mem_pool.c +++ b/mem_pool.c @@ -1,7 +1,23 @@ #include #include +#include +#include #include "mem_pool.h" +#ifdef _THREAD_SAFE +pthread_mutex_t stat_mtx = PTHREAD_MUTEX_INITIALIZER; +#define STAT_LOCK() do { pthread_mutex_lock (&stat_mtx); } while (0) +#define STAT_UNLOCK() do { pthread_mutex_unlock (&stat_mtx); } while (0) +#else +#define STAT_LOCK() do {} while (0) +#define STAT_UNLOCK() do {} while (0) +#endif + +/* Internal statistic */ +static size_t bytes_allocated = 0; +static size_t chunks_allocated = 0; +static size_t chunks_freed = 0; + static struct _pool_chain * pool_chain_new (size_t size) { @@ -11,7 +27,10 @@ pool_chain_new (size_t size) chain->len = size; chain->pos = chain->begin; chain->next = NULL; - + STAT_LOCK (); + chunks_allocated ++; + STAT_UNLOCK (); + return chain; } @@ -39,9 +58,9 @@ memory_pool_alloc (memory_pool_t *pool, size_t size) while (memory_pool_free (cur) < size && cur->next) { cur = cur->next; } - if (cur->next == NULL) { + if (cur->next == NULL && memory_pool_free (cur) < size) { /* Allocate new pool */ - if (cur->len > size) { + if (cur->len >= size) { new = pool_chain_new (cur->len); } else { @@ -51,15 +70,37 @@ memory_pool_alloc (memory_pool_t *pool, size_t size) cur->next = new; pool->cur_pool = new; new->pos += size; + STAT_LOCK (); + bytes_allocated += size; + STAT_UNLOCK (); return new->begin; } tmp = cur->pos; cur->pos += size; + STAT_LOCK (); + bytes_allocated += size; + STAT_UNLOCK (); return tmp; } return NULL; } +char * +memory_pool_strdup (memory_pool_t *pool, const char *src) +{ + size_t len; + char *newstr; + + if (src == NULL) { + return NULL; + } + + len = strlen (src); + newstr = memory_pool_alloc (pool, len + 1); + memcpy (newstr, src, len + 1); + return newstr; +} + void memory_pool_delete (memory_pool_t *pool) { @@ -69,10 +110,21 @@ memory_pool_delete (memory_pool_t *pool) cur = cur->next; g_free (tmp->begin); g_free (tmp); + STAT_LOCK (); + chunks_freed ++; + STAT_UNLOCK (); } g_free (pool); } +void +memory_pool_stat (memory_pool_stat_t *st) +{ + st->bytes_allocated = bytes_allocated; + st->chunks_allocated = chunks_allocated; + st->chunks_freed = chunks_freed; +} + /* * vi:ts=4 */ diff --git a/mem_pool.h b/mem_pool.h index 0927d9197..1e2c020ca 100644 --- a/mem_pool.h +++ b/mem_pool.h @@ -14,10 +14,19 @@ typedef struct memory_pool_s { struct _pool_chain *first_pool; } memory_pool_t; +typedef struct memory_pool_stat_s { + size_t bytes_allocated; + size_t chunks_allocated; + size_t chunks_freed; +} memory_pool_stat_t; + memory_pool_t* memory_pool_new (size_t size); void* memory_pool_alloc (memory_pool_t* pool, size_t size); +char* memory_pool_strdup (memory_pool_t* pool, const char *src); void memory_pool_delete (memory_pool_t* pool); -#define memory_pool_free(x) ((x)->pos - (x)->begin) +void memory_pool_stat (memory_pool_stat_t *st); + +#define memory_pool_free(x) ((x)->len - ((x)->pos - (x)->begin)) #endif diff --git a/plugins/surbl.c b/plugins/surbl.c index b163a4f2f..c8a545ed1 100644 --- a/plugins/surbl.c +++ b/plugins/surbl.c @@ -462,7 +462,7 @@ redirector_callback (int fd, short what, void *arg) } if (*p == '\0') { msg_info ("redirector_callback: got reply from redirector: '%s' -> '%s'", struri (param->url), c); - parse_uri (param->url, c); + parse_uri (param->url, c, param->task->task_pool); register_memcached_call (param->url, param->task); param->task->save.saved ++; } diff --git a/protocol.c b/protocol.c index 12fdfcf25..4a5ec5360 100644 --- a/protocol.c +++ b/protocol.c @@ -162,8 +162,8 @@ parse_header (struct worker_task *task, char *line) /* content-length */ if (strncasecmp (headern, CONTENT_LENGTH_HEADER, sizeof (CONTENT_LENGTH_HEADER) - 1) == 0) { task->content_length = strtoul (line, &err, 10); - task->msg = g_malloc (sizeof (f_str_buf_t)); - task->msg->buf = fstralloc (task->content_length); + task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_buf_t)); + task->msg->buf = fstralloc (task->task_pool, task->content_length); if (task->msg->buf == NULL) { msg_err ("read_socket: cannot allocate memory for message buffer"); return -1; @@ -178,7 +178,7 @@ parse_header (struct worker_task *task, char *line) case 'H': /* helo */ if (strncasecmp (headern, HELO_HEADER, sizeof (HELO_HEADER) - 1) == 0) { - task->helo = g_strdup (line); + task->helo = memory_pool_strdup (task->task_pool, line); } else { msg_info ("parse_header: wrong header: %s", headern); @@ -189,7 +189,7 @@ parse_header (struct worker_task *task, char *line) case 'F': /* from */ if (strncasecmp (headern, FROM_HEADER, sizeof (FROM_HEADER) - 1) == 0) { - task->from = g_strdup (line); + task->from = memory_pool_strdup (task->task_pool, line); } else { msg_info ("parse_header: wrong header: %s", headern); @@ -200,7 +200,7 @@ parse_header (struct worker_task *task, char *line) case 'R': /* rcpt */ if (strncasecmp (headern, RCPT_HEADER, sizeof (RCPT_HEADER) - 1) == 0) { - task->rcpt = g_strdup (line); + task->rcpt = memory_pool_strdup (task->task_pool, line); } else { msg_info ("parse_header: wrong header: %s", headern); diff --git a/test/rspamd_mem_pool_test.c b/test/rspamd_mem_pool_test.c index 5b2bf3721..50e539cf7 100644 --- a/test/rspamd_mem_pool_test.c +++ b/test/rspamd_mem_pool_test.c @@ -11,6 +11,7 @@ void rspamd_mem_pool_test_func () { memory_pool_t *pool; + memory_pool_stat_t st; char *tmp, *tmp2; pool = memory_pool_new (sizeof (TEST_BUF)); @@ -24,4 +25,10 @@ rspamd_mem_pool_test_func () g_assert (strncmp (tmp2, TEST2_BUF, sizeof (TEST2_BUF)) == 0); memory_pool_delete (pool); + memory_pool_stat (&st); + + /* Check allocator stat */ + g_assert (st.bytes_allocated == sizeof (TEST_BUF) * 3); + g_assert (st.chunks_allocated == 2); + g_assert (st.chunks_freed == 2); } diff --git a/test/rspamd_url_test.c b/test/rspamd_url_test.c index 43545252d..02aaa4a35 100644 --- a/test/rspamd_url_test.c +++ b/test/rspamd_url_test.c @@ -83,6 +83,7 @@ rspamd_url_test_func () html->len = strlen (test_html); bzero (&task, sizeof (task)); TAILQ_INIT (&task.urls); + task.task_pool = memory_pool_new (8192); g_test_timer_start (); g_test_message ("Testing text URL regexp parser"); @@ -97,8 +98,6 @@ rspamd_url_test_func () while (!TAILQ_EMPTY (&task.urls)) { url = TAILQ_FIRST (&task.urls); TAILQ_REMOVE (&task.urls, url, next); - g_free (url->string); - g_free (url); } g_assert (i == 39); @@ -117,8 +116,6 @@ rspamd_url_test_func () while (!TAILQ_EMPTY (&task.urls)) { url = TAILQ_FIRST (&task.urls); TAILQ_REMOVE (&task.urls, url, next); - g_free (url->string); - g_free (url); } g_assert (i == 1); msg_debug ("Time elapsed: %.2f", g_test_timer_elapsed ()); diff --git a/url.c b/url.c index 367aa7d6a..61b1b0ad5 100644 --- a/url.c +++ b/url.c @@ -339,7 +339,7 @@ url_unescape (char *s) freshly allocated string will be returned in all cases. */ static char * -url_escape_1 (const char *s, unsigned char mask, int allow_passthrough) +url_escape_1 (const char *s, unsigned char mask, int allow_passthrough, memory_pool_t *pool) { const char *p1; char *p2, *newstr; @@ -350,11 +350,17 @@ url_escape_1 (const char *s, unsigned char mask, int allow_passthrough) if (urlchr_test (*p1, mask)) addition += 2; /* Two more characters (hex digits) */ - if (!addition) - return allow_passthrough ? (char *)s : strdup (s); + if (!addition) { + if (allow_passthrough) { + return (char *)s; + } + else { + return memory_pool_strdup (pool, s); + } + } newlen = (p1 - s) + addition; - newstr = (char *) g_malloc (newlen + 1); + newstr = (char *) memory_pool_alloc (pool, newlen + 1); p1 = s; p2 = newstr; @@ -378,18 +384,18 @@ url_escape_1 (const char *s, unsigned char mask, int allow_passthrough) string, returning a freshly allocated string. */ char * -url_escape (const char *s) +url_escape (const char *s, memory_pool_t *pool) { - return url_escape_1 (s, urlchr_unsafe, 0); + return url_escape_1 (s, urlchr_unsafe, 0, pool); } /* URL-escape the unsafe characters (see urlchr_table) in a given string. If no characters are unsafe, S is returned. */ static char * -url_escape_allow_passthrough (const char *s) +url_escape_allow_passthrough (const char *s, memory_pool_t *pool) { - return url_escape_1 (s, urlchr_unsafe, 1); + return url_escape_1 (s, urlchr_unsafe, 1, pool); } /* Decide whether the char at position P needs to be encoded. (It is @@ -427,7 +433,7 @@ char_needs_escaping (const char *p) */ static char * -reencode_escapes (const char *s) +reencode_escapes (const char *s, memory_pool_t *pool) { const char *p1; char *newstr, *p2; @@ -441,14 +447,15 @@ reencode_escapes (const char *s) if (char_needs_escaping (p1)) ++encode_count; - if (!encode_count) + if (!encode_count) { /* The string is good as it is. */ - return g_strdup (s); /* C const model sucks. */ + return memory_pool_strdup (pool, s); + } oldlen = p1 - s; /* Each encoding adds two characters (hex digits). */ newlen = oldlen + 2 * encode_count; - newstr = g_malloc (newlen + 1); + newstr = memory_pool_alloc (pool, newlen + 1); /* Second pass: copy the string to the destination address, encoding chars when needed. */ @@ -497,9 +504,9 @@ unescape_single_char (char *str, char chr) characters. */ static char * -url_escape_dir (const char *dir) +url_escape_dir (const char *dir, memory_pool_t *pool) { - char *newdir = url_escape_1 (dir, urlchr_unsafe | urlchr_reserved, 1); + char *newdir = url_escape_1 (dir, urlchr_unsafe | urlchr_reserved, 1, pool); if (newdir == dir) return (char *)dir; @@ -581,7 +588,7 @@ path_simplify (char *path) } enum uri_errno -parse_uri(struct uri *uri, unsigned char *uristring) +parse_uri(struct uri *uri, unsigned char *uristring, memory_pool_t *pool) { unsigned char *prefix_end, *host_end, *p; unsigned char *lbracket, *rbracket; @@ -593,7 +600,7 @@ parse_uri(struct uri *uri, unsigned char *uristring) /* Nothing to do for an empty url. */ if (!*uristring) return URI_ERRNO_EMPTY; - uri->string = reencode_escapes (uristring); + uri->string = reencode_escapes (uristring, pool); msg_debug ("parse_uri: reencoding escapes in original url: '%s'", struri (uri)); uri->protocollen = get_protocol_length (struri (uri)); @@ -818,9 +825,9 @@ url_parse_text (struct worker_task *task, GByteArray *content) url_str = g_match_info_fetch (info, 0); msg_debug ("url_parse_text: extracted string with regexp: '%s'", url_str); if (url_str != NULL) { - new = g_malloc (sizeof (struct uri)); + new = memory_pool_alloc (task->task_pool, sizeof (struct uri)); if (new != NULL) { - parse_uri (new, url_str); + parse_uri (new, url_str, task->task_pool); TAILQ_INSERT_TAIL (&task->urls, new, next); } } @@ -858,9 +865,9 @@ url_parse_html (struct worker_task *task, GByteArray *content) url_str = g_match_info_fetch (info, 1); msg_debug ("url_parse_html: extracted string with regexp: '%s'", url_str); if (url_str != NULL) { - new = g_malloc (sizeof (struct uri)); + new = memory_pool_alloc (task->task_pool, sizeof (struct uri)); if (new != NULL) { - parse_uri (new, url_str); + parse_uri (new, url_str, task->task_pool); TAILQ_INSERT_TAIL (&task->urls, new, next); } } diff --git a/url.h b/url.h index 045bfa90e..7a666f406 100644 --- a/url.h +++ b/url.h @@ -11,6 +11,7 @@ #endif #include +#include "mem_pool.h" struct worker_task; @@ -82,6 +83,6 @@ enum protocol { void url_parse_html (struct worker_task *task, GByteArray *part); void url_parse_text (struct worker_task *task, GByteArray *part); -enum uri_errno parse_uri(struct uri *uri, unsigned char *uristring); +enum uri_errno parse_uri(struct uri *uri, unsigned char *uristring, memory_pool_t *pool); #endif diff --git a/utils/Makefile.in b/utils/Makefile.in index 86f3eab60..4ceb18ae1 100644 --- a/utils/Makefile.in +++ b/utils/Makefile.in @@ -3,7 +3,7 @@ all: url_extracter url_extracter: $(OBJECTS) ../url.o ../util.o - $(CC) $(PTHREAD_LDFLAGS) $(LDFLAGS) $(OBJECTS) ../url.o ../util.o $(LIBS) -o url_extracter + $(CC) $(PTHREAD_LDFLAGS) $(LDFLAGS) $(OBJECTS) ../url.o ../util.o ../mem_pool.o $(LIBS) -o url_extracter clean: rm -f *.o url_extracter *.core diff --git a/worker.c b/worker.c index 7c3ce6b95..9846a5736 100644 --- a/worker.c +++ b/worker.c @@ -35,6 +35,8 @@ #define NRCPT_HEADER "Recipient-Number: " #define RCPT_HEADER "Rcpt: " +#define TASK_POOL_SIZE 16384 + const f_str_t CRLF = { /* begin */"\r\n", /* len */2, @@ -78,47 +80,23 @@ free_task (struct worker_task *task) struct mime_part *part; if (task) { - if (task->msg) { - fstrfree (task->msg->buf); - free (task->msg); - } if (task->message) { g_object_unref (task->message); } - if (task->helo) { - free (task->helo); - } - if (task->from) { - free (task->from); - } - if (task->rcpt) { - free (task->rcpt); - } if (task->memc_ctx) { memc_close_ctx (task->memc_ctx); - free (task->memc_ctx); - } - if (task->task_pool) { - memory_pool_delete (task->task_pool); } while (!TAILQ_EMPTY (&task->urls)) { cur = TAILQ_FIRST (&task->urls); TAILQ_REMOVE (&task->urls, cur, next); - free (cur->string); - free (cur); } while (!TAILQ_EMPTY (&task->results)) { res = TAILQ_FIRST (&task->results); TAILQ_REMOVE (&task->results, res, next); - free (res); } while (!TAILQ_EMPTY (&task->chain_results)) { chain_res = TAILQ_FIRST (&task->chain_results); - if (chain_res->marks != NULL) { - free (chain_res->marks); - } TAILQ_REMOVE (&task->chain_results, chain_res, next); - free (chain_res); } while (!TAILQ_EMPTY (&task->parts)) { @@ -126,9 +104,9 @@ free_task (struct worker_task *task) g_object_unref (part->type); g_object_unref (part->content); TAILQ_REMOVE (&task->parts, part, next); - free (part); } - free (task); + memory_pool_delete (task->task_pool); + g_free (task); } } @@ -180,7 +158,7 @@ mime_foreach_callback (GMimeObject *part, gpointer user_data) if (g_mime_data_wrapper_write_to_stream (wrapper, part_stream) != -1) { part_content = g_mime_stream_mem_get_byte_array (GMIME_STREAM_MEM (part_stream)); type = (GMimeContentType *)g_mime_part_get_content_type (GMIME_PART (part)); - mime_part = g_malloc (sizeof (struct mime_part)); + mime_part = memory_pool_alloc (task->task_pool, sizeof (struct mime_part)); mime_part->type = type; mime_part->content = part_content; TAILQ_INSERT_TAIL (&task->parts, mime_part, next); @@ -227,7 +205,7 @@ process_filters (struct worker_task *task) } } while (c_filter != NULL) { - res = malloc (sizeof (struct filter_result)); + res = memory_pool_alloc (task->task_pool, sizeof (struct filter_result)); if (res == NULL) { msg_err ("process_filters: malloc failed, %m"); return -1; @@ -273,14 +251,14 @@ process_filters (struct worker_task *task) /* Process perl chains */ while (chain != NULL) { - chain_res = malloc (sizeof (struct chain_result)); + chain_res = memory_pool_alloc (task->task_pool, sizeof (struct chain_result)); if (chain_res == NULL) { msg_err ("process_filters: malloc failed, %m"); return -1; } i = 0; chain_res->chain = chain; - chain_res->marks = malloc (sizeof (int) * chain->scripts_number); + chain_res->marks = memory_pool_alloc (task->task_pool, sizeof (int) * chain->scripts_number); chain_res->result_mark = 0; if (chain_res->marks == NULL) { free (chain_res); @@ -292,7 +270,7 @@ process_filters (struct worker_task *task) /* Skip chain filters first */ continue; } - res = malloc (sizeof (struct filter_result)); + res = memory_pool_alloc (task->task_pool, sizeof (struct filter_result)); if (res == NULL) { msg_err ("process_filters: malloc failed, %m"); return -1; @@ -412,7 +390,7 @@ read_socket (struct bufferevent *bev, void *arg) { struct worker_task *task = (struct worker_task *)arg; ssize_t r; - char *s, *c; + char *s; switch (task->state) { case READ_COMMAND: @@ -503,7 +481,7 @@ accept_socket (int fd, short what, void *arg) return; } - new_task = malloc (sizeof (struct worker_task)); + new_task = g_malloc (sizeof (struct worker_task)); if (new_task == NULL) { msg_err ("accept_socket: cannot allocate memory for task, %m"); return; @@ -517,8 +495,8 @@ accept_socket (int fd, short what, void *arg) TAILQ_INIT (&new_task->urls); TAILQ_INIT (&new_task->results); TAILQ_INIT (&new_task->parts); - new_task->memc_ctx = malloc (sizeof (memcached_ctx_t)); - new_task->task_pool = memory_pool_new (1024); + new_task->task_pool = memory_pool_new (TASK_POOL_SIZE); + new_task->memc_ctx = memory_pool_alloc (new_task->task_pool, sizeof (memcached_ctx_t)); if (new_task->memc_ctx == NULL) { msg_err ("accept_socket: cannot allocate memory for memcached ctx, %m"); }