]> source.dussan.org Git - rspamd.git/commitdiff
* Add learning interface to rspamd (still requires classifier to work)
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Tue, 2 Dec 2008 16:58:28 +0000 (19:58 +0300)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Tue, 2 Dec 2008 16:58:28 +0000 (19:58 +0300)
src/cfg_file.h
src/cfg_file.l
src/cfg_file.y
src/cfg_utils.c
src/controller.c
src/main.c
src/main.h
src/tokenizers/osb.c
src/tokenizers/tokenizers.c
src/tokenizers/tokenizers.h

index c1c95b6201a6e1ac09028445b9fc0af02b651160..f31efdc6cbf1b4923f544d84503e19e1f56975bb 100644 (file)
@@ -33,6 +33,8 @@
 #define DEFAULT_UPSTREAM_ERROR_TIME 10
 #define DEFAULT_UPSTREAM_DEAD_TIME 300
 #define DEFAULT_UPSTREAM_MAXERRORS 10
+/* Statfile pool size, 50Mb */
+#define DEFAULT_STATFILE_SIZE 52428800L
 
 /* 1 worker by default */
 #define DEFAULT_WORKERS_NUM 1
@@ -95,7 +97,8 @@ struct module_opt {
 struct statfile {
        char *alias;
        char *pattern;
-       double weight;  
+       double weight;
+       size_t size;
 };
 
 struct config_file {
@@ -124,6 +127,7 @@ struct config_file {
        int log_level;
        char *log_file;
        int log_fd;
+       size_t max_statfile_size;
 
        struct memcached_server memcached_servers[MAX_MEMCACHED_SERVERS];
        size_t memcached_servers_num;
index 5827849716f670f7cf94dfa55563b4c27eb90afb..f9a90bb7dbfe0ab2be117d5d177d8cc602773dd8 100644 (file)
@@ -54,6 +54,7 @@ statfile                                              return STATFILE;
 alias                                                  return ALIAS;
 pattern                                                        return PATTERN;
 weight                                                 return WEIGHT;
+size                                                   return SIZE;
 
 logging                                                        return LOGGING;
 
@@ -70,6 +71,8 @@ ERROR                                                 return LOG_LEVEL_ERROR;
 log_facility                                   return LOG_FACILITY;
 log_file                                               return LOG_FILENAME;
 
+statfile_pool_size                             return STATFILE_POOL_SIZE;
+
 \{                                                             return OBRACE;
 \}                                                             return EBRACE;
 ;                                                              return SEMICOLON;
index 62f24748555a5d0d59fa312f95763de24a8c11c8..dc57df2a4cce51f4010686456f66847a91d0341c 100644 (file)
@@ -53,7 +53,7 @@ struct statfile *cur_statfile = NULL;
 %token  REQUIRED_SCORE FUNCTION FRACT COMPOSITES CONTROL PASSWORD
 %token  LOGGING LOG_TYPE LOG_TYPE_CONSOLE LOG_TYPE_SYSLOG LOG_TYPE_FILE
 %token  LOG_LEVEL LOG_LEVEL_DEBUG LOG_LEVEL_INFO LOG_LEVEL_WARNING LOG_LEVEL_ERROR LOG_FACILITY LOG_FILENAME
-%token  STATFILE ALIAS PATTERN WEIGHT
+%token  STATFILE ALIAS PATTERN WEIGHT STATFILE_POOL_SIZE SIZE
 
 %type  <string>        STRING
 %type  <string>        VARIABLE
@@ -94,6 +94,7 @@ command       :
        | composites
        | logging
        | statfile
+       | statfile_pool_size
        ;
 
 tempdir :
@@ -544,7 +545,8 @@ loggingfile:
 
 statfile:
        STATFILE OBRACE statfilebody EBRACE {
-               if (cur_statfile == NULL || cur_statfile->alias == NULL || cur_statfile->pattern == NULL || cur_statfile->weight == 0) {
+               if (cur_statfile == NULL || cur_statfile->alias == NULL || cur_statfile->pattern == NULL 
+                       || cur_statfile->weight == 0 || cur_statfile->size == 0) {
                        yyerror ("yyparse: not enough arguments in statfile definition");
                        YYERROR;
                }
@@ -562,6 +564,7 @@ statfilecmd:
        | statfilealias
        | statfilepattern
        | statfileweight
+       | statfilesize
        ;
        
 statfilealias:
@@ -597,7 +600,30 @@ statfileweight:
        }
        ;
 
+statfilesize:
+       SIZE EQSIGN NUMBER {
+               if (cur_statfile == NULL) {
+                       cur_statfile = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct statfile));
+               }
+               cur_statfile->size = $3;
+       }
+       | WEIGHT EQSIGN SIZELIMIT {
+               if (cur_statfile == NULL) {
+                       cur_statfile = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct statfile));
+               }
+               cur_statfile->size = $3;
+       }
+       ;
+
 
+statfile_pool_size:
+       STATFILE_POOL_SIZE EQSIGN SIZELIMIT {
+               cfg->max_statfile_size = $3;
+       }
+       | STATFILE_POOL_SIZE EQSIGN NUMBER {
+               cfg->max_statfile_size = $3;
+       }
+       ;
 %%
 /* 
  * vi:ts=4 
index 25e03c2a0ae3f06f909929bd05c5144ebecfe42f..72ae975172cf838dac58a3c8cf7b5bd2883f3145 100644 (file)
@@ -163,6 +163,7 @@ init_defaults (struct config_file *cfg)
        cfg->memcached_protocol = TCP_TEXT;
 
        cfg->workers_number = DEFAULT_WORKERS_NUM;
+       cfg->max_statfile_size = DEFAULT_STATFILE_SIZE;
        cfg->modules_opts = g_hash_table_new (g_str_hash, g_str_equal);
        cfg->variables = g_hash_table_new (g_str_hash, g_str_equal);
        cfg->metrics = g_hash_table_new (g_str_hash, g_str_equal);
index 1efb8c35c053b5042c86332b0d3594d8b17bee52..393bc234790378e04420b571611ae86e2bbee3ab 100644 (file)
@@ -22,6 +22,7 @@
 #include "upstream.h"
 #include "cfg_file.h"
 #include "modules.h"
+#include "tokenizers/tokenizers.h"
 
 #define CRLF "\r\n"
 
@@ -32,6 +33,7 @@ enum command_type {
        COMMAND_STAT,
        COMMAND_SHUTDOWN,
        COMMAND_UPTIME,
+       COMMAND_LEARN,
 };
 
 struct controller_command {
@@ -47,6 +49,7 @@ static struct controller_command commands[] = {
        {"stat", 0, COMMAND_STAT},
        {"shutdown", 1, COMMAND_SHUTDOWN},
        {"uptime", 0, COMMAND_UPTIME},
+       {"learn", 1, COMMAND_LEARN},
 };
 
 static GCompletion *comp;
@@ -112,14 +115,16 @@ check_auth (struct controller_command *cmd, struct controller_session *session)
 static void
 process_command (struct controller_command *cmd, char **cmd_args, struct controller_session *session)
 {
-       char out_buf[512], *arg;
+       char out_buf[512], *arg, *err_str;
        int r = 0, days, hours, minutes;
        time_t uptime;
+       unsigned long size = 0;
+       struct statfile *statfile;
 
        switch (cmd->type) {
                case COMMAND_PASSWORD:
                        arg = *cmd_args;
-                       if (*arg == '\0') {
+                       if (!arg || *arg == '\0') {
                                msg_debug ("process_command: empty password passed");
                                r = snprintf (out_buf, sizeof (out_buf), "password command requires one argument" CRLF);
                                bufferevent_write (session->bev, out_buf, r);
@@ -185,10 +190,104 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
                                                                minutes, minutes > 1 ? "s" : " ",
                                                                (int)uptime, uptime > 1 ? "s" : " ");
                                }
-                               
+                               bufferevent_write (session->bev, out_buf, r);
                        }
-                       bufferevent_write (session->bev, out_buf, r);
                        break;
+               case COMMAND_LEARN:
+                       if (check_auth (cmd, session)) {
+                               arg = *cmd_args++;
+                               if (!arg || *arg == '\0') {
+                                       msg_debug ("process_command: no statfile specified in learn command");
+                                       r = snprintf (out_buf, sizeof (out_buf), "learn command requires at least two arguments: stat filename and its size" CRLF);
+                                       bufferevent_write (session->bev, out_buf, r);
+                                       return;
+                               }
+                               arg = *cmd_args;
+                               if (arg == NULL || *arg == '\0') {
+                                       msg_debug ("process_command: no statfile size specified in learn command");
+                                       r = snprintf (out_buf, sizeof (out_buf), "learn command requires at least two arguments: stat filename and its size" CRLF);
+                                       bufferevent_write (session->bev, out_buf, r);
+                                       return;
+                               }
+                               size = strtoul (arg, &err_str, 10);
+                               if (err_str && *err_str != '\0') {
+                                       msg_debug ("process_command: statfile size is invalid: %s", arg);
+                                       r = snprintf (out_buf, sizeof (out_buf), "learn size is invalid" CRLF);
+                                       bufferevent_write (session->bev, out_buf, r);
+                                       return;
+                               }
+                               session->learn_buf = memory_pool_alloc (session->session_pool, sizeof (f_str_buf_t));
+                               session->learn_buf->buf = fstralloc (session->session_pool, size);
+                               if (session->learn_buf->buf == NULL) {
+                                       r = snprintf (out_buf, sizeof (out_buf), "allocating buffer for learn failed" CRLF);
+                                       bufferevent_write (session->bev, out_buf, r);
+                                       return;
+                               }
+
+                               statfile = g_hash_table_lookup (session->cfg->statfiles, arg);
+                               if (statfile == NULL) {
+                                       r = snprintf (out_buf, sizeof (out_buf), "statfile %s is not defined" CRLF, arg);
+                                       bufferevent_write (session->bev, out_buf, r);
+                                       return;
+
+                               }
+                               session->learn_rcpt = NULL;
+                               session->learn_from = NULL;
+                               session->learn_filename = NULL;
+                               session->learn_tokenizer = get_tokenizer ("osb-text");
+                               /* Get all arguments */
+                               while (*cmd_args++) {
+                                       arg = *cmd_args;
+                                       if (*arg == '-') {
+                                               switch (*(arg + 1)) {
+                                                       case 'r':
+                                                               arg = *(cmd_args + 1);
+                                                               if (!arg || *arg == '\0') {
+                                                                       r = snprintf (out_buf, sizeof (out_buf), "recipient is not defined" CRLF, arg);
+                                                                       bufferevent_write (session->bev, out_buf, r);
+                                                                       return;
+                                                               }
+                                                               session->learn_rcpt = memory_pool_strdup (session->session_pool, arg);
+                                                               break;
+                                                       case 'f':
+                                                               arg = *(cmd_args + 1);
+                                                               if (!arg || *arg == '\0') {
+                                                                       r = snprintf (out_buf, sizeof (out_buf), "from is not defined" CRLF, arg);
+                                                                       bufferevent_write (session->bev, out_buf, r);
+                                                                       return;
+                                                               }
+                                                               session->learn_from = memory_pool_strdup (session->session_pool, arg);
+                                                               break;
+                                                       case 't':
+                                                               arg = *(cmd_args + 1);
+                                                               if (!arg || *arg == '\0' || (session->learn_tokenizer = get_tokenizer (arg)) == NULL) {
+                                                                       r = snprintf (out_buf, sizeof (out_buf), "tokenizer is not defined" CRLF, arg);
+                                                                       bufferevent_write (session->bev, out_buf, r);
+                                                                       return;
+                                                               }
+                                                               break;
+                                               }
+                                       }
+                               }
+                               session->learn_filename = resolve_stat_filename (session->session_pool, statfile->pattern, session->learn_rcpt, session->learn_from);
+                               if (statfile_pool_open (session->worker->srv->statfile_pool, session->learn_filename) == -1) {
+                                       /* Try to create statfile */
+                                       if (statfile_pool_create (session->worker->srv->statfile_pool, session->learn_filename, statfile->size) == -1) {
+                                               r = snprintf (out_buf, sizeof (out_buf), "cannot create statfile %s" CRLF, session->learn_filename);
+                                               bufferevent_write (session->bev, out_buf, r);
+                                               return;
+                                       }
+                                       if (statfile_pool_open (session->worker->srv->statfile_pool, session->learn_filename) == -1) {
+                                               r = snprintf (out_buf, sizeof (out_buf), "cannot open statfile %s" CRLF, session->learn_filename);
+                                               bufferevent_write (session->bev, out_buf, r);
+                                               return;
+                                       }
+                               }
+                               session->state = STATE_LEARN;
+                               r = snprintf (out_buf, sizeof (out_buf), "ok" CRLF);
+                               bufferevent_write (session->bev, out_buf, r);
+                               break;
+                       }
        }
 }
 
@@ -200,39 +299,64 @@ read_socket (struct bufferevent *bev, void *arg)
        char *s, **params, *cmd, out_buf[128];
        GList *comp_list;
 
-       s = evbuffer_readline (EVBUFFER_INPUT (bev));
-       if (s != NULL && *s != 0) {
-               len = strlen (s);
-               /* Remove end of line characters from string */
-               if (s[len - 1] == '\n') {
-                       if (s[len - 2] == '\r') {
-                               s[len - 2] = 0;
+       switch (session->state) {
+               case STATE_COMMAND:
+                       s = evbuffer_readline (EVBUFFER_INPUT (bev));
+                       if (s != NULL && *s != 0) {
+                               len = strlen (s);
+                               /* Remove end of line characters from string */
+                               if (s[len - 1] == '\n') {
+                                       if (s[len - 2] == '\r') {
+                                               s[len - 2] = 0;
+                                       }
+                                       s[len - 1] = 0;
+                               }
+                               params = g_strsplit (s, " ", -1);
+                               len = g_strv_length (params);
+                               if (len > 0) {
+                                       cmd = g_strstrip (params[0]);
+                                       comp_list = g_completion_complete (comp, cmd, NULL);
+                                       switch (g_list_length (comp_list)) {
+                                               case 1:
+                                                       process_command ((struct controller_command *)comp_list->data, &params[1], session);
+                                                       break;
+                                               case 0:
+                                                       i = snprintf (out_buf, sizeof (out_buf), "Unknown command" CRLF);
+                                                       bufferevent_write (bev, out_buf, i);
+                                                       break;
+                                               default:
+                                                       i = snprintf (out_buf, sizeof (out_buf), "Ambigious command" CRLF);
+                                                       bufferevent_write (bev, out_buf, i);
+                                                       break;
+                                       }
+                               }
+                               g_strfreev (params);
                        }
-                       s[len - 1] = 0;
-               }
-               params = g_strsplit (s, " ", -1);
-               len = g_strv_length (params);
-               if (len > 0) {
-                       cmd = g_strstrip (params[0]);
-                       comp_list = g_completion_complete (comp, cmd, NULL);
-                       switch (g_list_length (comp_list)) {
-                               case 1:
-                                       process_command ((struct controller_command *)comp_list->data, &params[1], session);
-                                       break;
-                               case 0:
-                                       i = snprintf (out_buf, sizeof (out_buf), "Unknown command" CRLF);
-                                       bufferevent_write (bev, out_buf, i);
-                                       break;
-                               default:
-                                       i = snprintf (out_buf, sizeof (out_buf), "Ambigious command" CRLF);
+                       if (s != NULL) {
+                               free (s);
+                       }
+                       break;
+               case STATE_LEARN:
+                       i = bufferevent_read (bev, session->learn_buf->pos, session->learn_buf->free);
+                       if (i > 0) {
+                               session->learn_buf->pos += i;
+                               update_buf_size (session->learn_buf);
+                               if (session->learn_buf->free == 0) {
+                                       /* XXX: require to insert real learning code here */
+                                       session->worker->srv->stat->messages_learned ++;
+                                       i = snprintf (out_buf, sizeof (out_buf), "learn ok" CRLF);
                                        bufferevent_write (bev, out_buf, i);
+                                       session->state = STATE_COMMAND;
                                        break;
+                               }
                        }
-               }
-               g_strfreev (params);
-       }
-       if (s != NULL) {
-               free (s);
+                       else {
+                               i = snprintf (out_buf, sizeof (out_buf), "read error: %d" CRLF, i);
+                               bufferevent_write (bev, out_buf, i);
+                               bufferevent_disable (bev, EV_READ);
+                               free_session (session);
+                       }
+                       break;
        }
 }
 
@@ -252,7 +376,7 @@ static void
 err_socket (struct bufferevent *bev, short what, void *arg)
 {
        struct controller_session *session = (struct controller_session *)arg;
-       msg_info ("closing connection");
+       msg_info ("closing control connection");
        /* Free buffers */
        free_session (session);
 }
@@ -282,6 +406,7 @@ accept_socket (int fd, short what, void *arg)
        new_session->worker = worker;
        new_session->sock = nfd;
        new_session->cfg = worker->srv->cfg;
+       new_session->state = STATE_COMMAND;
        new_session->session_pool = memory_pool_new (memory_pool_get_size () - 1);
        memory_pool_add_destructor (new_session->session_pool, (pool_destruct_func)bufferevent_free, new_session->bev);
        worker->srv->stat->control_connections_count ++;
index d2dec7e47613f77535ce1e65233d79944f24f5fa..445b0e42bb8cc1ca023d3a0af68f6cf5a28157fd 100644 (file)
@@ -318,6 +318,9 @@ main (int argc, char **argv)
        TAILQ_INIT (&rspamd->workers);
 
        setproctitle ("main process");
+
+       /* Init statfile pool */
+       rspamd->statfile_pool = statfile_pool_new (cfg->max_statfile_size);
        
        for (i = 0; i < cfg->workers_number; i++) {
                fork_worker (rspamd, listen_sock, 0, TYPE_WORKER);
index efb716ab0054c9a4f7e1962ece069ca40d1fdf68..7f2a60c4cf9d984720dcf73ad1f9f3888e90c578 100644 (file)
@@ -21,6 +21,7 @@
 
 #include "fstring.h"
 #include "mem_pool.h"
+#include "statfile.h"
 #include "url.h"
 #include "memcached.h"
 #include "protocol.h"
@@ -81,6 +82,7 @@ struct rspamd_worker {
 
 struct pidfh;
 struct config_file;
+struct tokenizer;
 
 /* Server statistics */
 struct rspamd_stat {
@@ -103,6 +105,7 @@ struct rspamd_main {
        struct rspamd_stat *stat;
 
        memory_pool_t *server_pool;
+       statfile_pool_t *statfile_pool;
 
        TAILQ_HEAD (workq, rspamd_worker) workers;
 };
@@ -122,12 +125,21 @@ struct save_point {
 /* Control session */
 struct controller_session {
        struct rspamd_worker *worker;
+       enum {
+               STATE_COMMAND,
+               STATE_LEARN,
+       } state;
        int sock;
        /* Access to authorized commands */
        int authorized;
        memory_pool_t *session_pool;
        struct bufferevent *bev;
        struct config_file *cfg;
+       char *learn_rcpt;
+       char *learn_from;
+       struct tokenizer *learn_tokenizer;
+       char *learn_filename;
+       f_str_buf_t *learn_buf;
 };
 
 /* Worker task structure */
index f78e20992911641cc9f8b8eb8ac138a9bdc96481..6799a121b76f99deba50d020aec9cd0be008f131 100644 (file)
@@ -21,7 +21,7 @@ static const int primes[] = {
 };
 
 token_list_t *
-osb_tokenize_text (memory_pool_t *pool, f_str_t *input)
+osb_tokenize_text (struct tokenizer *tokenizer, memory_pool_t *pool, f_str_t *input)
 {
        token_list_t *new = NULL, *head = NULL, *last = NULL;
        f_str_t token = { NULL, 0, 0 };
@@ -33,7 +33,7 @@ osb_tokenize_text (memory_pool_t *pool, f_str_t *input)
                hashpipe[i] = 0xABCDEF;
        }
 
-       while (get_next_word (input, &token)) {
+       while (tokenizer->get_next_word (input, &token)) {
                /* Shift hashpipe */
                for (i = FEATURE_WINDOW_SIZE - 1; i > 0; i --) {
                        hashpipe[i] = hashpipe[i - 1];
index 132a57ce0ee32e8d28e7cef3fb6779584b9882bd..25b13a28942c52c93b2452331efb10437aebc651 100644 (file)
@@ -5,6 +5,24 @@
 #include <sys/types.h>
 #include "tokenizers.h"
 
+struct tokenizer tokenizers[] = {
+       {"osb-text", osb_tokenize_text, get_next_word },
+};
+
+struct tokenizer*
+get_tokenizer (char *name)
+{
+       int i;
+
+       for (i = 0; i < sizeof (tokenizers) / sizeof (tokenizers[0]); i ++) {
+               if (strcmp (tokenizers[i].name, name) == 0) {
+                       return &tokenizers[i];
+               }
+       }
+
+       return NULL;
+}
+
 /* Get next word from specified f_str_t buf */
 f_str_t *
 get_next_word (f_str_t *buf, f_str_t *token)
index 6b4bff5e034eee4e1b36ed746494dbfbad52f793..96a2027a59a47b68a10c83fe9dd525de718e3c24 100644 (file)
@@ -20,8 +20,23 @@ typedef struct token_list_s {
        struct token_list_s *next;
 } token_list_t;
 
+
+/* Common tokenizer structure */
+struct tokenizer {
+       char *name;
+       token_list_t* (*tokenize_func)(struct tokenizer *tokenizer, memory_pool_t *pool, f_str_t *input);
+       f_str_t* (*get_next_word)(f_str_t *buf, f_str_t *token);
+};
+
+/* Get tokenizer structure by name or return NULL if this name is not found */
+struct tokenizer* get_tokenizer (char *name);
 /* Get next word from specified f_str_t buf */
 f_str_t *get_next_word (f_str_t *buf, f_str_t *token);
+/* OSB tokenize function */
+token_list_t* osb_tokenize_text (struct tokenizer *tokenizer, memory_pool_t *pool, f_str_t *input);
+
+/* Array of all defined tokenizers */
+extern struct tokenizer tokenizers[];
 
 #endif
 /*