From: Vsevolod Stakhov Date: Tue, 2 Dec 2008 16:58:28 +0000 (+0300) Subject: * Add learning interface to rspamd (still requires classifier to work) X-Git-Tag: 0.2.7~340 X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=d62fb36650acfd0863c32a78b0941a4c0d0e58b1;p=rspamd.git * Add learning interface to rspamd (still requires classifier to work) --- diff --git a/src/cfg_file.h b/src/cfg_file.h index c1c95b620..f31efdc6c 100644 --- a/src/cfg_file.h +++ b/src/cfg_file.h @@ -33,6 +33,8 @@ #define DEFAULT_UPSTREAM_ERROR_TIME 10 #define DEFAULT_UPSTREAM_DEAD_TIME 300 #define DEFAULT_UPSTREAM_MAXERRORS 10 +/* Statfile pool size, 50Mb */ +#define DEFAULT_STATFILE_SIZE 52428800L /* 1 worker by default */ #define DEFAULT_WORKERS_NUM 1 @@ -95,7 +97,8 @@ struct module_opt { struct statfile { char *alias; char *pattern; - double weight; + double weight; + size_t size; }; struct config_file { @@ -124,6 +127,7 @@ struct config_file { int log_level; char *log_file; int log_fd; + size_t max_statfile_size; struct memcached_server memcached_servers[MAX_MEMCACHED_SERVERS]; size_t memcached_servers_num; diff --git a/src/cfg_file.l b/src/cfg_file.l index 582784971..f9a90bb7d 100644 --- a/src/cfg_file.l +++ b/src/cfg_file.l @@ -54,6 +54,7 @@ statfile return STATFILE; alias return ALIAS; pattern return PATTERN; weight return WEIGHT; +size return SIZE; logging return LOGGING; @@ -70,6 +71,8 @@ ERROR return LOG_LEVEL_ERROR; log_facility return LOG_FACILITY; log_file return LOG_FILENAME; +statfile_pool_size return STATFILE_POOL_SIZE; + \{ return OBRACE; \} return EBRACE; ; return SEMICOLON; diff --git a/src/cfg_file.y b/src/cfg_file.y index 62f247485..dc57df2a4 100644 --- a/src/cfg_file.y +++ b/src/cfg_file.y @@ -53,7 +53,7 @@ struct statfile *cur_statfile = NULL; %token REQUIRED_SCORE FUNCTION FRACT COMPOSITES CONTROL PASSWORD %token LOGGING LOG_TYPE LOG_TYPE_CONSOLE LOG_TYPE_SYSLOG LOG_TYPE_FILE %token LOG_LEVEL LOG_LEVEL_DEBUG LOG_LEVEL_INFO LOG_LEVEL_WARNING LOG_LEVEL_ERROR LOG_FACILITY LOG_FILENAME -%token STATFILE ALIAS PATTERN WEIGHT +%token STATFILE ALIAS PATTERN WEIGHT STATFILE_POOL_SIZE SIZE %type STRING %type VARIABLE @@ -94,6 +94,7 @@ command : | composites | logging | statfile + | statfile_pool_size ; tempdir : @@ -544,7 +545,8 @@ loggingfile: statfile: STATFILE OBRACE statfilebody EBRACE { - if (cur_statfile == NULL || cur_statfile->alias == NULL || cur_statfile->pattern == NULL || cur_statfile->weight == 0) { + if (cur_statfile == NULL || cur_statfile->alias == NULL || cur_statfile->pattern == NULL + || cur_statfile->weight == 0 || cur_statfile->size == 0) { yyerror ("yyparse: not enough arguments in statfile definition"); YYERROR; } @@ -562,6 +564,7 @@ statfilecmd: | statfilealias | statfilepattern | statfileweight + | statfilesize ; statfilealias: @@ -597,7 +600,30 @@ statfileweight: } ; +statfilesize: + SIZE EQSIGN NUMBER { + if (cur_statfile == NULL) { + cur_statfile = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct statfile)); + } + cur_statfile->size = $3; + } + | WEIGHT EQSIGN SIZELIMIT { + if (cur_statfile == NULL) { + cur_statfile = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct statfile)); + } + cur_statfile->size = $3; + } + ; + +statfile_pool_size: + STATFILE_POOL_SIZE EQSIGN SIZELIMIT { + cfg->max_statfile_size = $3; + } + | STATFILE_POOL_SIZE EQSIGN NUMBER { + cfg->max_statfile_size = $3; + } + ; %% /* * vi:ts=4 diff --git a/src/cfg_utils.c b/src/cfg_utils.c index 25e03c2a0..72ae97517 100644 --- a/src/cfg_utils.c +++ b/src/cfg_utils.c @@ -163,6 +163,7 @@ init_defaults (struct config_file *cfg) cfg->memcached_protocol = TCP_TEXT; cfg->workers_number = DEFAULT_WORKERS_NUM; + cfg->max_statfile_size = DEFAULT_STATFILE_SIZE; cfg->modules_opts = g_hash_table_new (g_str_hash, g_str_equal); cfg->variables = g_hash_table_new (g_str_hash, g_str_equal); cfg->metrics = g_hash_table_new (g_str_hash, g_str_equal); diff --git a/src/controller.c b/src/controller.c index 1efb8c35c..393bc2347 100644 --- a/src/controller.c +++ b/src/controller.c @@ -22,6 +22,7 @@ #include "upstream.h" #include "cfg_file.h" #include "modules.h" +#include "tokenizers/tokenizers.h" #define CRLF "\r\n" @@ -32,6 +33,7 @@ enum command_type { COMMAND_STAT, COMMAND_SHUTDOWN, COMMAND_UPTIME, + COMMAND_LEARN, }; struct controller_command { @@ -47,6 +49,7 @@ static struct controller_command commands[] = { {"stat", 0, COMMAND_STAT}, {"shutdown", 1, COMMAND_SHUTDOWN}, {"uptime", 0, COMMAND_UPTIME}, + {"learn", 1, COMMAND_LEARN}, }; static GCompletion *comp; @@ -112,14 +115,16 @@ check_auth (struct controller_command *cmd, struct controller_session *session) static void process_command (struct controller_command *cmd, char **cmd_args, struct controller_session *session) { - char out_buf[512], *arg; + char out_buf[512], *arg, *err_str; int r = 0, days, hours, minutes; time_t uptime; + unsigned long size = 0; + struct statfile *statfile; switch (cmd->type) { case COMMAND_PASSWORD: arg = *cmd_args; - if (*arg == '\0') { + if (!arg || *arg == '\0') { msg_debug ("process_command: empty password passed"); r = snprintf (out_buf, sizeof (out_buf), "password command requires one argument" CRLF); bufferevent_write (session->bev, out_buf, r); @@ -185,10 +190,104 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control minutes, minutes > 1 ? "s" : " ", (int)uptime, uptime > 1 ? "s" : " "); } - + bufferevent_write (session->bev, out_buf, r); } - bufferevent_write (session->bev, out_buf, r); break; + case COMMAND_LEARN: + if (check_auth (cmd, session)) { + arg = *cmd_args++; + if (!arg || *arg == '\0') { + msg_debug ("process_command: no statfile specified in learn command"); + r = snprintf (out_buf, sizeof (out_buf), "learn command requires at least two arguments: stat filename and its size" CRLF); + bufferevent_write (session->bev, out_buf, r); + return; + } + arg = *cmd_args; + if (arg == NULL || *arg == '\0') { + msg_debug ("process_command: no statfile size specified in learn command"); + r = snprintf (out_buf, sizeof (out_buf), "learn command requires at least two arguments: stat filename and its size" CRLF); + bufferevent_write (session->bev, out_buf, r); + return; + } + size = strtoul (arg, &err_str, 10); + if (err_str && *err_str != '\0') { + msg_debug ("process_command: statfile size is invalid: %s", arg); + r = snprintf (out_buf, sizeof (out_buf), "learn size is invalid" CRLF); + bufferevent_write (session->bev, out_buf, r); + return; + } + session->learn_buf = memory_pool_alloc (session->session_pool, sizeof (f_str_buf_t)); + session->learn_buf->buf = fstralloc (session->session_pool, size); + if (session->learn_buf->buf == NULL) { + r = snprintf (out_buf, sizeof (out_buf), "allocating buffer for learn failed" CRLF); + bufferevent_write (session->bev, out_buf, r); + return; + } + + statfile = g_hash_table_lookup (session->cfg->statfiles, arg); + if (statfile == NULL) { + r = snprintf (out_buf, sizeof (out_buf), "statfile %s is not defined" CRLF, arg); + bufferevent_write (session->bev, out_buf, r); + return; + + } + session->learn_rcpt = NULL; + session->learn_from = NULL; + session->learn_filename = NULL; + session->learn_tokenizer = get_tokenizer ("osb-text"); + /* Get all arguments */ + while (*cmd_args++) { + arg = *cmd_args; + if (*arg == '-') { + switch (*(arg + 1)) { + case 'r': + arg = *(cmd_args + 1); + if (!arg || *arg == '\0') { + r = snprintf (out_buf, sizeof (out_buf), "recipient is not defined" CRLF, arg); + bufferevent_write (session->bev, out_buf, r); + return; + } + session->learn_rcpt = memory_pool_strdup (session->session_pool, arg); + break; + case 'f': + arg = *(cmd_args + 1); + if (!arg || *arg == '\0') { + r = snprintf (out_buf, sizeof (out_buf), "from is not defined" CRLF, arg); + bufferevent_write (session->bev, out_buf, r); + return; + } + session->learn_from = memory_pool_strdup (session->session_pool, arg); + break; + case 't': + arg = *(cmd_args + 1); + if (!arg || *arg == '\0' || (session->learn_tokenizer = get_tokenizer (arg)) == NULL) { + r = snprintf (out_buf, sizeof (out_buf), "tokenizer is not defined" CRLF, arg); + bufferevent_write (session->bev, out_buf, r); + return; + } + break; + } + } + } + session->learn_filename = resolve_stat_filename (session->session_pool, statfile->pattern, session->learn_rcpt, session->learn_from); + if (statfile_pool_open (session->worker->srv->statfile_pool, session->learn_filename) == -1) { + /* Try to create statfile */ + if (statfile_pool_create (session->worker->srv->statfile_pool, session->learn_filename, statfile->size) == -1) { + r = snprintf (out_buf, sizeof (out_buf), "cannot create statfile %s" CRLF, session->learn_filename); + bufferevent_write (session->bev, out_buf, r); + return; + } + if (statfile_pool_open (session->worker->srv->statfile_pool, session->learn_filename) == -1) { + r = snprintf (out_buf, sizeof (out_buf), "cannot open statfile %s" CRLF, session->learn_filename); + bufferevent_write (session->bev, out_buf, r); + return; + } + } + session->state = STATE_LEARN; + r = snprintf (out_buf, sizeof (out_buf), "ok" CRLF); + bufferevent_write (session->bev, out_buf, r); + break; + } } } @@ -200,39 +299,64 @@ read_socket (struct bufferevent *bev, void *arg) char *s, **params, *cmd, out_buf[128]; GList *comp_list; - s = evbuffer_readline (EVBUFFER_INPUT (bev)); - if (s != NULL && *s != 0) { - len = strlen (s); - /* Remove end of line characters from string */ - if (s[len - 1] == '\n') { - if (s[len - 2] == '\r') { - s[len - 2] = 0; + switch (session->state) { + case STATE_COMMAND: + s = evbuffer_readline (EVBUFFER_INPUT (bev)); + if (s != NULL && *s != 0) { + len = strlen (s); + /* Remove end of line characters from string */ + if (s[len - 1] == '\n') { + if (s[len - 2] == '\r') { + s[len - 2] = 0; + } + s[len - 1] = 0; + } + params = g_strsplit (s, " ", -1); + len = g_strv_length (params); + if (len > 0) { + cmd = g_strstrip (params[0]); + comp_list = g_completion_complete (comp, cmd, NULL); + switch (g_list_length (comp_list)) { + case 1: + process_command ((struct controller_command *)comp_list->data, ¶ms[1], session); + break; + case 0: + i = snprintf (out_buf, sizeof (out_buf), "Unknown command" CRLF); + bufferevent_write (bev, out_buf, i); + break; + default: + i = snprintf (out_buf, sizeof (out_buf), "Ambigious command" CRLF); + bufferevent_write (bev, out_buf, i); + break; + } + } + g_strfreev (params); } - s[len - 1] = 0; - } - params = g_strsplit (s, " ", -1); - len = g_strv_length (params); - if (len > 0) { - cmd = g_strstrip (params[0]); - comp_list = g_completion_complete (comp, cmd, NULL); - switch (g_list_length (comp_list)) { - case 1: - process_command ((struct controller_command *)comp_list->data, ¶ms[1], session); - break; - case 0: - i = snprintf (out_buf, sizeof (out_buf), "Unknown command" CRLF); - bufferevent_write (bev, out_buf, i); - break; - default: - i = snprintf (out_buf, sizeof (out_buf), "Ambigious command" CRLF); + if (s != NULL) { + free (s); + } + break; + case STATE_LEARN: + i = bufferevent_read (bev, session->learn_buf->pos, session->learn_buf->free); + if (i > 0) { + session->learn_buf->pos += i; + update_buf_size (session->learn_buf); + if (session->learn_buf->free == 0) { + /* XXX: require to insert real learning code here */ + session->worker->srv->stat->messages_learned ++; + i = snprintf (out_buf, sizeof (out_buf), "learn ok" CRLF); bufferevent_write (bev, out_buf, i); + session->state = STATE_COMMAND; break; + } } - } - g_strfreev (params); - } - if (s != NULL) { - free (s); + else { + i = snprintf (out_buf, sizeof (out_buf), "read error: %d" CRLF, i); + bufferevent_write (bev, out_buf, i); + bufferevent_disable (bev, EV_READ); + free_session (session); + } + break; } } @@ -252,7 +376,7 @@ static void err_socket (struct bufferevent *bev, short what, void *arg) { struct controller_session *session = (struct controller_session *)arg; - msg_info ("closing connection"); + msg_info ("closing control connection"); /* Free buffers */ free_session (session); } @@ -282,6 +406,7 @@ accept_socket (int fd, short what, void *arg) new_session->worker = worker; new_session->sock = nfd; new_session->cfg = worker->srv->cfg; + new_session->state = STATE_COMMAND; new_session->session_pool = memory_pool_new (memory_pool_get_size () - 1); memory_pool_add_destructor (new_session->session_pool, (pool_destruct_func)bufferevent_free, new_session->bev); worker->srv->stat->control_connections_count ++; diff --git a/src/main.c b/src/main.c index d2dec7e47..445b0e42b 100644 --- a/src/main.c +++ b/src/main.c @@ -318,6 +318,9 @@ main (int argc, char **argv) TAILQ_INIT (&rspamd->workers); setproctitle ("main process"); + + /* Init statfile pool */ + rspamd->statfile_pool = statfile_pool_new (cfg->max_statfile_size); for (i = 0; i < cfg->workers_number; i++) { fork_worker (rspamd, listen_sock, 0, TYPE_WORKER); diff --git a/src/main.h b/src/main.h index efb716ab0..7f2a60c4c 100644 --- a/src/main.h +++ b/src/main.h @@ -21,6 +21,7 @@ #include "fstring.h" #include "mem_pool.h" +#include "statfile.h" #include "url.h" #include "memcached.h" #include "protocol.h" @@ -81,6 +82,7 @@ struct rspamd_worker { struct pidfh; struct config_file; +struct tokenizer; /* Server statistics */ struct rspamd_stat { @@ -103,6 +105,7 @@ struct rspamd_main { struct rspamd_stat *stat; memory_pool_t *server_pool; + statfile_pool_t *statfile_pool; TAILQ_HEAD (workq, rspamd_worker) workers; }; @@ -122,12 +125,21 @@ struct save_point { /* Control session */ struct controller_session { struct rspamd_worker *worker; + enum { + STATE_COMMAND, + STATE_LEARN, + } state; int sock; /* Access to authorized commands */ int authorized; memory_pool_t *session_pool; struct bufferevent *bev; struct config_file *cfg; + char *learn_rcpt; + char *learn_from; + struct tokenizer *learn_tokenizer; + char *learn_filename; + f_str_buf_t *learn_buf; }; /* Worker task structure */ diff --git a/src/tokenizers/osb.c b/src/tokenizers/osb.c index f78e20992..6799a121b 100644 --- a/src/tokenizers/osb.c +++ b/src/tokenizers/osb.c @@ -21,7 +21,7 @@ static const int primes[] = { }; token_list_t * -osb_tokenize_text (memory_pool_t *pool, f_str_t *input) +osb_tokenize_text (struct tokenizer *tokenizer, memory_pool_t *pool, f_str_t *input) { token_list_t *new = NULL, *head = NULL, *last = NULL; f_str_t token = { NULL, 0, 0 }; @@ -33,7 +33,7 @@ osb_tokenize_text (memory_pool_t *pool, f_str_t *input) hashpipe[i] = 0xABCDEF; } - while (get_next_word (input, &token)) { + while (tokenizer->get_next_word (input, &token)) { /* Shift hashpipe */ for (i = FEATURE_WINDOW_SIZE - 1; i > 0; i --) { hashpipe[i] = hashpipe[i - 1]; diff --git a/src/tokenizers/tokenizers.c b/src/tokenizers/tokenizers.c index 132a57ce0..25b13a289 100644 --- a/src/tokenizers/tokenizers.c +++ b/src/tokenizers/tokenizers.c @@ -5,6 +5,24 @@ #include #include "tokenizers.h" +struct tokenizer tokenizers[] = { + {"osb-text", osb_tokenize_text, get_next_word }, +}; + +struct tokenizer* +get_tokenizer (char *name) +{ + int i; + + for (i = 0; i < sizeof (tokenizers) / sizeof (tokenizers[0]); i ++) { + if (strcmp (tokenizers[i].name, name) == 0) { + return &tokenizers[i]; + } + } + + return NULL; +} + /* Get next word from specified f_str_t buf */ f_str_t * get_next_word (f_str_t *buf, f_str_t *token) diff --git a/src/tokenizers/tokenizers.h b/src/tokenizers/tokenizers.h index 6b4bff5e0..96a2027a5 100644 --- a/src/tokenizers/tokenizers.h +++ b/src/tokenizers/tokenizers.h @@ -20,8 +20,23 @@ typedef struct token_list_s { struct token_list_s *next; } token_list_t; + +/* Common tokenizer structure */ +struct tokenizer { + char *name; + token_list_t* (*tokenize_func)(struct tokenizer *tokenizer, memory_pool_t *pool, f_str_t *input); + f_str_t* (*get_next_word)(f_str_t *buf, f_str_t *token); +}; + +/* Get tokenizer structure by name or return NULL if this name is not found */ +struct tokenizer* get_tokenizer (char *name); /* Get next word from specified f_str_t buf */ f_str_t *get_next_word (f_str_t *buf, f_str_t *token); +/* OSB tokenize function */ +token_list_t* osb_tokenize_text (struct tokenizer *tokenizer, memory_pool_t *pool, f_str_t *input); + +/* Array of all defined tokenizers */ +extern struct tokenizer tokenizers[]; #endif /*