Bläddra i källkod

* Add learning interface to rspamd (still requires classifier to work)

tags/0.2.7
Vsevolod Stakhov 15 år sedan
förälder
incheckning
d62fb36650
10 ändrade filer med 246 tillägg och 39 borttagningar
  1. 5
    1
      src/cfg_file.h
  2. 3
    0
      src/cfg_file.l
  3. 28
    2
      src/cfg_file.y
  4. 1
    0
      src/cfg_utils.c
  5. 159
    34
      src/controller.c
  6. 3
    0
      src/main.c
  7. 12
    0
      src/main.h
  8. 2
    2
      src/tokenizers/osb.c
  9. 18
    0
      src/tokenizers/tokenizers.c
  10. 15
    0
      src/tokenizers/tokenizers.h

+ 5
- 1
src/cfg_file.h Visa fil

@@ -33,6 +33,8 @@
#define DEFAULT_UPSTREAM_ERROR_TIME 10
#define DEFAULT_UPSTREAM_DEAD_TIME 300
#define DEFAULT_UPSTREAM_MAXERRORS 10
/* Statfile pool size, 50Mb */
#define DEFAULT_STATFILE_SIZE 52428800L

/* 1 worker by default */
#define DEFAULT_WORKERS_NUM 1
@@ -95,7 +97,8 @@ struct module_opt {
struct statfile {
char *alias;
char *pattern;
double weight;
double weight;
size_t size;
};

struct config_file {
@@ -124,6 +127,7 @@ struct config_file {
int log_level;
char *log_file;
int log_fd;
size_t max_statfile_size;

struct memcached_server memcached_servers[MAX_MEMCACHED_SERVERS];
size_t memcached_servers_num;

+ 3
- 0
src/cfg_file.l Visa fil

@@ -54,6 +54,7 @@ statfile return STATFILE;
alias return ALIAS;
pattern return PATTERN;
weight return WEIGHT;
size return SIZE;

logging return LOGGING;

@@ -70,6 +71,8 @@ ERROR return LOG_LEVEL_ERROR;
log_facility return LOG_FACILITY;
log_file return LOG_FILENAME;

statfile_pool_size return STATFILE_POOL_SIZE;

\{ return OBRACE;
\} return EBRACE;
; return SEMICOLON;

+ 28
- 2
src/cfg_file.y Visa fil

@@ -53,7 +53,7 @@ struct statfile *cur_statfile = NULL;
%token REQUIRED_SCORE FUNCTION FRACT COMPOSITES CONTROL PASSWORD
%token LOGGING LOG_TYPE LOG_TYPE_CONSOLE LOG_TYPE_SYSLOG LOG_TYPE_FILE
%token LOG_LEVEL LOG_LEVEL_DEBUG LOG_LEVEL_INFO LOG_LEVEL_WARNING LOG_LEVEL_ERROR LOG_FACILITY LOG_FILENAME
%token STATFILE ALIAS PATTERN WEIGHT
%token STATFILE ALIAS PATTERN WEIGHT STATFILE_POOL_SIZE SIZE

%type <string> STRING
%type <string> VARIABLE
@@ -94,6 +94,7 @@ command :
| composites
| logging
| statfile
| statfile_pool_size
;

tempdir :
@@ -544,7 +545,8 @@ loggingfile:

statfile:
STATFILE OBRACE statfilebody EBRACE {
if (cur_statfile == NULL || cur_statfile->alias == NULL || cur_statfile->pattern == NULL || cur_statfile->weight == 0) {
if (cur_statfile == NULL || cur_statfile->alias == NULL || cur_statfile->pattern == NULL
|| cur_statfile->weight == 0 || cur_statfile->size == 0) {
yyerror ("yyparse: not enough arguments in statfile definition");
YYERROR;
}
@@ -562,6 +564,7 @@ statfilecmd:
| statfilealias
| statfilepattern
| statfileweight
| statfilesize
;
statfilealias:
@@ -597,7 +600,30 @@ statfileweight:
}
;

/*
 * statfilesize: "size = <value>" inside a statfile { } section.
 *
 * Stores the value in cur_statfile->size, allocating cur_statfile from the
 * config pool first when no earlier statfile setting has created it.
 * The value may be given as a plain NUMBER or as a SIZELIMIT token; both
 * alternatives must start with SIZE, otherwise "size = <sizelimit>" does not
 * parse and "weight = <sizelimit>" would silently set the size instead.
 */
statfilesize:
	SIZE EQSIGN NUMBER {
		if (cur_statfile == NULL) {
			cur_statfile = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct statfile));
		}
		cur_statfile->size = $3;
	}
	| SIZE EQSIGN SIZELIMIT {
		if (cur_statfile == NULL) {
			cur_statfile = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct statfile));
		}
		cur_statfile->size = $3;
	}
	;


/*
 * statfile_pool_size: "statfile_pool_size = <value>" command.
 * Stores the value in cfg->max_statfile_size; the value is accepted either
 * as a SIZELIMIT token or as a plain NUMBER.
 */
statfile_pool_size:
STATFILE_POOL_SIZE EQSIGN SIZELIMIT {
cfg->max_statfile_size = $3;
}
| STATFILE_POOL_SIZE EQSIGN NUMBER {
cfg->max_statfile_size = $3;
}
;
%%
/*
* vi:ts=4

+ 1
- 0
src/cfg_utils.c Visa fil

@@ -163,6 +163,7 @@ init_defaults (struct config_file *cfg)
cfg->memcached_protocol = TCP_TEXT;

cfg->workers_number = DEFAULT_WORKERS_NUM;
cfg->max_statfile_size = DEFAULT_STATFILE_SIZE;
cfg->modules_opts = g_hash_table_new (g_str_hash, g_str_equal);
cfg->variables = g_hash_table_new (g_str_hash, g_str_equal);
cfg->metrics = g_hash_table_new (g_str_hash, g_str_equal);

+ 159
- 34
src/controller.c Visa fil

@@ -22,6 +22,7 @@
#include "upstream.h"
#include "cfg_file.h"
#include "modules.h"
#include "tokenizers/tokenizers.h"

#define CRLF "\r\n"

@@ -32,6 +33,7 @@ enum command_type {
COMMAND_STAT,
COMMAND_SHUTDOWN,
COMMAND_UPTIME,
COMMAND_LEARN,
};

struct controller_command {
@@ -47,6 +49,7 @@ static struct controller_command commands[] = {
{"stat", 0, COMMAND_STAT},
{"shutdown", 1, COMMAND_SHUTDOWN},
{"uptime", 0, COMMAND_UPTIME},
{"learn", 1, COMMAND_LEARN},
};

static GCompletion *comp;
@@ -112,14 +115,16 @@ check_auth (struct controller_command *cmd, struct controller_session *session)
static void
process_command (struct controller_command *cmd, char **cmd_args, struct controller_session *session)
{
char out_buf[512], *arg;
char out_buf[512], *arg, *err_str;
int r = 0, days, hours, minutes;
time_t uptime;
unsigned long size = 0;
struct statfile *statfile;

switch (cmd->type) {
case COMMAND_PASSWORD:
arg = *cmd_args;
if (*arg == '\0') {
if (!arg || *arg == '\0') {
msg_debug ("process_command: empty password passed");
r = snprintf (out_buf, sizeof (out_buf), "password command requires one argument" CRLF);
bufferevent_write (session->bev, out_buf, r);
@@ -185,10 +190,104 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
minutes, minutes > 1 ? "s" : " ",
(int)uptime, uptime > 1 ? "s" : " ");
}
bufferevent_write (session->bev, out_buf, r);
}
bufferevent_write (session->bev, out_buf, r);
break;
case COMMAND_LEARN:
	/*
	 * learn <statfile> <size> [-r rcpt] [-f from] [-t tokenizer]
	 *
	 * Runs only when check_auth() accepts the session. Validates the
	 * arguments, allocates a buffer of <size> bytes for the message, opens
	 * (creating it if necessary) the statfile resolved from the configured
	 * pattern and switches the session to STATE_LEARN. Every failure is
	 * reported to the client and returns without changing the session state.
	 */
	if (check_auth (cmd, session)) {
		/* Keep the statfile name separately: arg is reused for the size below */
		char *statfile_name;

		statfile_name = *cmd_args;
		if (!statfile_name || *statfile_name == '\0') {
			msg_debug ("process_command: no statfile specified in learn command");
			r = snprintf (out_buf, sizeof (out_buf), "learn command requires at least two arguments: stat filename and its size" CRLF);
			bufferevent_write (session->bev, out_buf, r);
			return;
		}
		cmd_args++;

		/* Second argument: number of bytes of message data to collect */
		arg = *cmd_args;
		if (arg == NULL || *arg == '\0') {
			msg_debug ("process_command: no statfile size specified in learn command");
			r = snprintf (out_buf, sizeof (out_buf), "learn command requires at least two arguments: stat filename and its size" CRLF);
			bufferevent_write (session->bev, out_buf, r);
			return;
		}
		size = strtoul (arg, &err_str, 10);
		if (err_str && *err_str != '\0') {
			msg_debug ("process_command: statfile size is invalid: %s", arg);
			r = snprintf (out_buf, sizeof (out_buf), "learn size is invalid" CRLF);
			bufferevent_write (session->bev, out_buf, r);
			return;
		}
		session->learn_buf = memory_pool_alloc (session->session_pool, sizeof (f_str_buf_t));
		session->learn_buf->buf = fstralloc (session->session_pool, size);
		if (session->learn_buf->buf == NULL) {
			r = snprintf (out_buf, sizeof (out_buf), "allocating buffer for learn failed" CRLF);
			bufferevent_write (session->bev, out_buf, r);
			return;
		}

		/* Look the statfile up by its name, not by the size argument */
		statfile = g_hash_table_lookup (session->cfg->statfiles, statfile_name);
		if (statfile == NULL) {
			r = snprintf (out_buf, sizeof (out_buf), "statfile %s is not defined" CRLF, statfile_name);
			bufferevent_write (session->bev, out_buf, r);
			return;
		}

		session->learn_rcpt = NULL;
		session->learn_from = NULL;
		session->learn_filename = NULL;
		session->learn_tokenizer = get_tokenizer ("osb-text");

		/*
		 * Optional "-x value" pairs follow the size argument. The element is
		 * checked against the NULL terminator before it is dereferenced, and
		 * a consumed value is stepped over so that it is never parsed as
		 * another option.
		 */
		for (cmd_args++; *cmd_args != NULL; cmd_args++) {
			arg = *cmd_args;
			if (*arg == '-') {
				switch (*(arg + 1)) {
				case 'r':
					arg = *(cmd_args + 1);
					if (!arg || *arg == '\0') {
						r = snprintf (out_buf, sizeof (out_buf), "recipient is not defined" CRLF);
						bufferevent_write (session->bev, out_buf, r);
						return;
					}
					session->learn_rcpt = memory_pool_strdup (session->session_pool, arg);
					cmd_args++;
					break;
				case 'f':
					arg = *(cmd_args + 1);
					if (!arg || *arg == '\0') {
						r = snprintf (out_buf, sizeof (out_buf), "from is not defined" CRLF);
						bufferevent_write (session->bev, out_buf, r);
						return;
					}
					session->learn_from = memory_pool_strdup (session->session_pool, arg);
					cmd_args++;
					break;
				case 't':
					arg = *(cmd_args + 1);
					if (!arg || *arg == '\0' || (session->learn_tokenizer = get_tokenizer (arg)) == NULL) {
						r = snprintf (out_buf, sizeof (out_buf), "tokenizer is not defined" CRLF);
						bufferevent_write (session->bev, out_buf, r);
						return;
					}
					cmd_args++;
					break;
				default:
					/* Unknown options are ignored */
					break;
				}
			}
		}

		session->learn_filename = resolve_stat_filename (session->session_pool, statfile->pattern, session->learn_rcpt, session->learn_from);
		if (statfile_pool_open (session->worker->srv->statfile_pool, session->learn_filename) == -1) {
			/* Try to create statfile */
			if (statfile_pool_create (session->worker->srv->statfile_pool, session->learn_filename, statfile->size) == -1) {
				r = snprintf (out_buf, sizeof (out_buf), "cannot create statfile %s" CRLF, session->learn_filename);
				bufferevent_write (session->bev, out_buf, r);
				return;
			}
			if (statfile_pool_open (session->worker->srv->statfile_pool, session->learn_filename) == -1) {
				r = snprintf (out_buf, sizeof (out_buf), "cannot open statfile %s" CRLF, session->learn_filename);
				bufferevent_write (session->bev, out_buf, r);
				return;
			}
		}
		/* From here on read_socket() collects input into learn_buf */
		session->state = STATE_LEARN;
		r = snprintf (out_buf, sizeof (out_buf), "ok" CRLF);
		bufferevent_write (session->bev, out_buf, r);
		break;
	}
}
}

@@ -200,39 +299,64 @@ read_socket (struct bufferevent *bev, void *arg)
char *s, **params, *cmd, out_buf[128];
GList *comp_list;

s = evbuffer_readline (EVBUFFER_INPUT (bev));
if (s != NULL && *s != 0) {
len = strlen (s);
/* Remove end of line characters from string */
if (s[len - 1] == '\n') {
if (s[len - 2] == '\r') {
s[len - 2] = 0;
switch (session->state) {
case STATE_COMMAND:
s = evbuffer_readline (EVBUFFER_INPUT (bev));
if (s != NULL && *s != 0) {
len = strlen (s);
/* Remove end of line characters from string */
if (s[len - 1] == '\n') {
if (s[len - 2] == '\r') {
s[len - 2] = 0;
}
s[len - 1] = 0;
}
params = g_strsplit (s, " ", -1);
len = g_strv_length (params);
if (len > 0) {
cmd = g_strstrip (params[0]);
comp_list = g_completion_complete (comp, cmd, NULL);
switch (g_list_length (comp_list)) {
case 1:
process_command ((struct controller_command *)comp_list->data, &params[1], session);
break;
case 0:
i = snprintf (out_buf, sizeof (out_buf), "Unknown command" CRLF);
bufferevent_write (bev, out_buf, i);
break;
default:
i = snprintf (out_buf, sizeof (out_buf), "Ambigious command" CRLF);
bufferevent_write (bev, out_buf, i);
break;
}
}
g_strfreev (params);
}
s[len - 1] = 0;
}
params = g_strsplit (s, " ", -1);
len = g_strv_length (params);
if (len > 0) {
cmd = g_strstrip (params[0]);
comp_list = g_completion_complete (comp, cmd, NULL);
switch (g_list_length (comp_list)) {
case 1:
process_command ((struct controller_command *)comp_list->data, &params[1], session);
break;
case 0:
i = snprintf (out_buf, sizeof (out_buf), "Unknown command" CRLF);
bufferevent_write (bev, out_buf, i);
break;
default:
i = snprintf (out_buf, sizeof (out_buf), "Ambigious command" CRLF);
if (s != NULL) {
free (s);
}
break;
case STATE_LEARN:
i = bufferevent_read (bev, session->learn_buf->pos, session->learn_buf->free);
if (i > 0) {
session->learn_buf->pos += i;
update_buf_size (session->learn_buf);
if (session->learn_buf->free == 0) {
/* XXX: require to insert real learning code here */
session->worker->srv->stat->messages_learned ++;
i = snprintf (out_buf, sizeof (out_buf), "learn ok" CRLF);
bufferevent_write (bev, out_buf, i);
session->state = STATE_COMMAND;
break;
}
}
}
g_strfreev (params);
}
if (s != NULL) {
free (s);
else {
i = snprintf (out_buf, sizeof (out_buf), "read error: %d" CRLF, i);
bufferevent_write (bev, out_buf, i);
bufferevent_disable (bev, EV_READ);
free_session (session);
}
break;
}
}

@@ -252,7 +376,7 @@ static void
err_socket (struct bufferevent *bev, short what, void *arg)
{
struct controller_session *session = (struct controller_session *)arg;
msg_info ("closing connection");
msg_info ("closing control connection");
/* Free buffers */
free_session (session);
}
@@ -282,6 +406,7 @@ accept_socket (int fd, short what, void *arg)
new_session->worker = worker;
new_session->sock = nfd;
new_session->cfg = worker->srv->cfg;
new_session->state = STATE_COMMAND;
new_session->session_pool = memory_pool_new (memory_pool_get_size () - 1);
memory_pool_add_destructor (new_session->session_pool, (pool_destruct_func)bufferevent_free, new_session->bev);
worker->srv->stat->control_connections_count ++;

+ 3
- 0
src/main.c Visa fil

@@ -318,6 +318,9 @@ main (int argc, char **argv)
TAILQ_INIT (&rspamd->workers);

setproctitle ("main process");

/* Init statfile pool */
rspamd->statfile_pool = statfile_pool_new (cfg->max_statfile_size);
for (i = 0; i < cfg->workers_number; i++) {
fork_worker (rspamd, listen_sock, 0, TYPE_WORKER);

+ 12
- 0
src/main.h Visa fil

@@ -21,6 +21,7 @@

#include "fstring.h"
#include "mem_pool.h"
#include "statfile.h"
#include "url.h"
#include "memcached.h"
#include "protocol.h"
@@ -81,6 +82,7 @@ struct rspamd_worker {

struct pidfh;
struct config_file;
struct tokenizer;

/* Server statistics */
struct rspamd_stat {
@@ -103,6 +105,7 @@ struct rspamd_main {
struct rspamd_stat *stat;

memory_pool_t *server_pool;
statfile_pool_t *statfile_pool;

TAILQ_HEAD (workq, rspamd_worker) workers;
};
@@ -122,12 +125,21 @@ struct save_point {
/*
 * Control session: per-connection state of the controller.
 * The learn_* fields are filled in by the "learn" command and are only
 * meaningful once that command has been processed.
 */
struct controller_session {
struct rspamd_worker *worker;
/* Input mode: STATE_COMMAND parses command lines, STATE_LEARN collects message data into learn_buf */
enum {
STATE_COMMAND,
STATE_LEARN,
} state;
int sock;
/* Access to authorized commands */
int authorized;
/* Pool created for this session; learn buffers and strings are allocated from it */
memory_pool_t *session_pool;
struct bufferevent *bev;
struct config_file *cfg;
/* Value of the learn command's -r option, or NULL when not given */
char *learn_rcpt;
/* Value of the learn command's -f option, or NULL when not given */
char *learn_from;
/* Tokenizer chosen for learning: "osb-text" unless overridden with -t */
struct tokenizer *learn_tokenizer;
/* Statfile name resolved from the statfile pattern, recipient and sender */
char *learn_filename;
/* Buffer sized by the learn command's size argument; filled while in STATE_LEARN */
f_str_buf_t *learn_buf;
};

/* Worker task structure */

+ 2
- 2
src/tokenizers/osb.c Visa fil

@@ -21,7 +21,7 @@ static const int primes[] = {
};

token_list_t *
osb_tokenize_text (memory_pool_t *pool, f_str_t *input)
osb_tokenize_text (struct tokenizer *tokenizer, memory_pool_t *pool, f_str_t *input)
{
token_list_t *new = NULL, *head = NULL, *last = NULL;
f_str_t token = { NULL, 0, 0 };
@@ -33,7 +33,7 @@ osb_tokenize_text (memory_pool_t *pool, f_str_t *input)
hashpipe[i] = 0xABCDEF;
}

while (get_next_word (input, &token)) {
while (tokenizer->get_next_word (input, &token)) {
/* Shift hashpipe */
for (i = FEATURE_WINDOW_SIZE - 1; i > 0; i --) {
hashpipe[i] = hashpipe[i - 1];

+ 18
- 0
src/tokenizers/tokenizers.c Visa fil

@@ -5,6 +5,24 @@
#include <sys/types.h>
#include "tokenizers.h"

/*
 * Table of all available tokenizers, searched by name in get_tokenizer().
 * Each entry is { name, tokenize function, word-splitting function }.
 */
struct tokenizer tokenizers[] = {
{"osb-text", osb_tokenize_text, get_next_word },
};

/*
 * Look up a tokenizer by its registered name.
 *
 * name: NUL-terminated tokenizer name (for example "osb-text"); NULL is
 *       accepted and treated as "not found".
 * Returns a pointer to the matching entry of the tokenizers[] table, or NULL
 * when name is NULL or no entry carries that name.
 */
struct tokenizer*
get_tokenizer (char *name)
{
	/* size_t matches the type of the sizeof-based element count */
	size_t i;

	/* strcmp() on a NULL pointer is undefined behaviour */
	if (name == NULL) {
		return NULL;
	}

	for (i = 0; i < sizeof (tokenizers) / sizeof (tokenizers[0]); i ++) {
		if (strcmp (tokenizers[i].name, name) == 0) {
			return &tokenizers[i];
		}
	}

	return NULL;
}

/* Get next word from specified f_str_t buf */
f_str_t *
get_next_word (f_str_t *buf, f_str_t *token)

+ 15
- 0
src/tokenizers/tokenizers.h Visa fil

@@ -20,8 +20,23 @@ typedef struct token_list_s {
struct token_list_s *next;
} token_list_t;


/*
 * Common tokenizer structure: one entry of the tokenizers[] table,
 * bundling a name with the callbacks that implement the tokenizer.
 */
struct tokenizer {
/* Name the tokenizer is looked up by in get_tokenizer() */
char *name;
/* Builds a token list from input; receives the tokenizer itself so it can call get_next_word */
token_list_t* (*tokenize_func)(struct tokenizer *tokenizer, memory_pool_t *pool, f_str_t *input);
/* Fetches the next word of buf into token; the tokenize loop runs while the result is non-NULL */
f_str_t* (*get_next_word)(f_str_t *buf, f_str_t *token);
};

/* Get tokenizer structure by name or return NULL if this name is not found */
struct tokenizer* get_tokenizer (char *name);
/* Get next word from specified f_str_t buf */
f_str_t *get_next_word (f_str_t *buf, f_str_t *token);
/* OSB tokenize function */
token_list_t* osb_tokenize_text (struct tokenizer *tokenizer, memory_pool_t *pool, f_str_t *input);

/* Array of all defined tokenizers */
extern struct tokenizer tokenizers[];

#endif
/*

Laddar…
Avbryt
Spara