Browse Source

* Add support for dynamic configuration to the controller's interface.

* File maps are now being watched even if they don't exist on rspamd start.
Several fixes to dynamic configuration logic.
tags/0.5.3
Vsevolod Stakhov 11 years ago
parent
commit
5d1f19fc99
6 changed files with 342 additions and 44 deletions
  1. 2
    0
      src/cfg_utils.c
  2. 6
    0
      src/cfg_xml.c
  3. 125
    2
      src/controller.c
  4. 167
    34
      src/dynamic_cfg.c
  5. 21
    0
      src/dynamic_cfg.h
  6. 21
    8
      src/map.c

+ 2
- 0
src/cfg_utils.c View File

@@ -34,6 +34,7 @@
#include "lua/lua_common.h"
#include "kvstorage_config.h"
#include "map.h"
#include "dynamic_cfg.h"

#define DEFAULT_SCORE 10.0

@@ -733,6 +734,7 @@ post_load_config (struct config_file *cfg)

/* Lua options */
(void)lua_post_load_config (cfg);
init_dynamic_config (cfg);
}

#if 0

+ 6
- 0
src/cfg_xml.c View File

@@ -312,6 +312,12 @@ static struct xml_parser_rule grammar[] = {
G_STRUCT_OFFSET (struct config_file, map_timeout),
NULL
},
{
"dynamic_conf",
xml_handle_string,
G_STRUCT_OFFSET (struct config_file, dynamic_conf),
NULL
},
NULL_ATTR
},
NULL_DEF_ATTR

+ 125
- 2
src/controller.c View File

@@ -37,6 +37,7 @@
#include "binlog.h"
#include "statfile_sync.h"
#include "lua/lua_common.h"
#include "dynamic_cfg.h"

#define END "END" CRLF

@@ -72,7 +73,9 @@ enum command_type {
COMMAND_SYNC,
COMMAND_WEIGHTS,
COMMAND_GET,
COMMAND_POST
COMMAND_POST,
COMMAND_ADD_SYMBOL,
COMMAND_ADD_ACTION
};

struct controller_command {
@@ -110,7 +113,9 @@ static struct controller_command commands[] = {
{"learn_spam", TRUE, COMMAND_LEARN_SPAM},
{"learn_ham", TRUE, COMMAND_LEARN_HAM},
{"get", FALSE, COMMAND_GET},
{"post", FALSE, COMMAND_POST}
{"post", FALSE, COMMAND_POST},
{"add_symbol", TRUE, COMMAND_ADD_SYMBOL},
{"add_action", TRUE, COMMAND_ADD_ACTION}
};

static GList *custom_commands = NULL;
@@ -503,6 +508,114 @@ process_stat_command (struct controller_session *session)
}
}

static gboolean
process_dynamic_conf_command (gchar **cmd_args, struct controller_session *session, gboolean is_action)
{
struct config_file *cfg = session->cfg;
gchar *arg, *metric, *name, *err_str;
gdouble value;
gboolean res;

if (cfg->dynamic_conf == NULL) {
if (!session->restful) {
return rspamd_dispatcher_write (session->dispatcher, "dynamic config is not specified" CRLF, 0, FALSE, TRUE);
}
else {
return restful_write_reply (500, "dynamic config is not specified", NULL, 0, session->dispatcher);
}
}

if (session->restful) {
if ((arg = g_hash_table_lookup (session->kwargs, "metric")) == NULL) {
metric = DEFAULT_METRIC;
}
else {
metric = arg;
}
if ((arg = g_hash_table_lookup (session->kwargs, "name")) == NULL) {
goto invalid_arguments;
}
name = arg;
if ((arg = g_hash_table_lookup (session->kwargs, "value")) == NULL) {
goto invalid_arguments;
}
value = strtod (arg, &err_str);
if (err_str && *err_str != '\0') {
msg_info ("double value is invalid: %s", arg);
goto invalid_arguments;
}
}
else {
if (cmd_args[0] == NULL || cmd_args[1] == NULL) {
goto invalid_arguments;
}
if (cmd_args[2] == NULL) {
metric = DEFAULT_METRIC;
name = cmd_args[0];
arg = cmd_args[1];
value = strtod (arg, &err_str);
if (err_str && *err_str != '\0') {
msg_info ("double value is invalid: %s", arg);
goto invalid_arguments;
}
}
else {
metric = cmd_args[0];
name = cmd_args[1];
arg = cmd_args[2];
value = strtod (arg, &err_str);
if (err_str && *err_str != '\0') {
msg_info ("double value is invalid: %s", arg);
goto invalid_arguments;
}
}
}

if (is_action) {
res = add_dynamic_action (cfg, metric, name, value);
}
else {
res = add_dynamic_symbol (cfg, metric, name, value);
}

if (res) {

res = dump_dynamic_config (cfg);
if (res) {
if (!session->restful) {
return rspamd_dispatcher_write (session->dispatcher, "OK" CRLF, 0, FALSE, TRUE);
}
else {
return restful_write_reply (200, "OK", NULL, 0, session->dispatcher);
}
}
else {
if (!session->restful) {
return rspamd_dispatcher_write (session->dispatcher, "Error dumping dynamic config" CRLF, 0, FALSE, TRUE);
}
else {
return restful_write_reply (500, "Error dumping dynamic config", NULL, 0, session->dispatcher);
}
}
}
else {
if (!session->restful) {
return rspamd_dispatcher_write (session->dispatcher, "Cannot add dynamic rule" CRLF, 0, FALSE, TRUE);
}
else {
return restful_write_reply (500, "Cannot add dynamic rule", NULL, 0, session->dispatcher);
}
}

invalid_arguments:
if (!session->restful) {
return rspamd_dispatcher_write (session->dispatcher, "Invalid arguments" CRLF, 0, FALSE, TRUE);
}
else {
return restful_write_reply (500, "Invalid arguments", NULL, 0, session->dispatcher);
}
}

static gboolean
process_command (struct controller_command *cmd, gchar **cmd_args, struct controller_session *session)
{
@@ -943,6 +1056,16 @@ process_command (struct controller_command *cmd, gchar **cmd_args, struct contro
case COMMAND_COUNTERS:
rspamd_hash_foreach (rspamd_main->counters, counter_write_callback, session);
break;
case COMMAND_ADD_ACTION:
if (check_auth (cmd, session)) {
return process_dynamic_conf_command (cmd_args, session, TRUE);
}
break;
case COMMAND_ADD_SYMBOL:
if (check_auth (cmd, session)) {
return process_dynamic_conf_command (cmd_args, session, FALSE);
}
break;
}
return TRUE;
}

+ 167
- 34
src/dynamic_cfg.c View File

@@ -156,6 +156,7 @@ json_config_read_cb (memory_pool_t * pool, gchar * chunk, gint len, struct map_c
jb->cfg = ((struct config_json_buf *)data->prev_data)->cfg;
jb->buf = NULL;
jb->pos = NULL;
jb->config_metrics = NULL;
data->cur_data = jb;
}
else {
@@ -246,9 +247,9 @@ json_config_fin_cb (memory_pool_t * pool, struct map_cb_data *data)
continue;
}

cur_nm = json_object_get (cur_elt, "name");
cur_nm = json_object_get (cur_elt, "metric");
if (!cur_nm || !json_is_string (cur_nm)) {
msg_err ("loaded json array element has no 'name' attribute");
msg_err ("loaded json metric object element has no 'metric' attribute");
continue;
}
cur_metric = g_slice_alloc0 (sizeof (struct dynamic_cfg_metric));
@@ -267,6 +268,9 @@ json_config_fin_cb (memory_pool_t * pool, struct map_cb_data *data)
/* Insert symbol */
cur_metric->symbols = g_list_prepend (cur_metric->symbols, cur_symbol);
}
else {
msg_info ("json symbol object has no mandatory 'name' and 'value' attributes");
}
}
}
}
@@ -290,6 +294,9 @@ json_config_fin_cb (memory_pool_t * pool, struct map_cb_data *data)
/* Insert action */
cur_metric->actions = g_list_prepend (cur_metric->actions, cur_action);
}
else {
msg_info ("json symbol object has no mandatory 'name' and 'value' attributes");
}
}
}
}
@@ -314,7 +321,6 @@ json_config_fin_cb (memory_pool_t * pool, struct map_cb_data *data)
void
init_dynamic_config (struct config_file *cfg)
{
struct stat st;
struct config_json_buf *jb, **pjb;

if (cfg->dynamic_conf == NULL) {
@@ -322,17 +328,8 @@ init_dynamic_config (struct config_file *cfg)
return;
}

if (stat (cfg->dynamic_conf, &st) == -1) {
msg_warn ("%s is unavailable: %s", cfg->dynamic_conf, strerror (errno));
return;
}
if (access (cfg->dynamic_conf, W_OK | R_OK) == -1) {
msg_warn ("%s is inaccessible: %s", cfg->dynamic_conf, strerror (errno));
return;
}

/* Now try to add map with json data */
jb = g_malloc (sizeof (struct config_json_buf));
jb = g_malloc0 (sizeof (struct config_json_buf));
pjb = g_malloc (sizeof (struct config_json_buf *));
jb->buf = NULL;
jb->cfg = cfg;
@@ -363,7 +360,7 @@ dump_dynamic_list (gint fd, GList *rules)
cur = rules;
while (cur) {
metric = cur->data;
fprintf (f, "{\n\"metric\": \"%s\"\n", metric->name);
fprintf (f, "{\n \"metric\": \"%s\",\n", metric->name);
if (metric->symbols) {
fprintf (f, " \"symbols\": [\n");
cur_elt = metric->symbols;
@@ -371,10 +368,10 @@ dump_dynamic_list (gint fd, GList *rules)
sym = cur_elt->data;
cur_elt = g_list_next (cur_elt);
if (cur_elt) {
fprintf (f, " {\"name\": \"%s\",\n\"value\": %.2f},\n", sym->name, sym->value);
fprintf (f, " {\"name\": \"%s\",\"value\": %.2f},\n", sym->name, sym->value);
}
else {
fprintf (f, " {\"name\": \"%s\",\n\"value\": %.2f}\n", sym->name, sym->value);
fprintf (f, " {\"name\": \"%s\",\"value\": %.2f}\n", sym->name, sym->value);
}
}
if (metric->actions) {
@@ -392,10 +389,10 @@ dump_dynamic_list (gint fd, GList *rules)
act = cur_elt->data;
cur_elt = g_list_next (cur_elt);
if (cur_elt) {
fprintf (f, " {\"name\": \"%s\",\n\"value\": %.2f},\n", str_action_metric (act->action), act->value);
fprintf (f, " {\"name\": \"%s\",\"value\": %.2f},\n", str_action_metric (act->action), act->value);
}
else {
fprintf (f, " {\"name\": \"%s\",\n\"value\": %.2f}\n", str_action_metric (act->action), act->value);
fprintf (f, " {\"name\": \"%s\",\"value\": %.2f}\n", str_action_metric (act->action), act->value);
}
}
fprintf (f, " ]\n");
@@ -431,15 +428,6 @@ dump_dynamic_config (struct config_file *cfg)
return FALSE;
}

if (stat (cfg->dynamic_conf, &st) == -1) {
msg_warn ("%s is unavailable: %s", cfg->dynamic_conf, strerror (errno));
return FALSE;
}
if (access (cfg->dynamic_conf, W_OK | R_OK) == -1) {
msg_warn ("%s is inaccessible: %s", cfg->dynamic_conf, strerror (errno));
return FALSE;
}

dir = g_path_get_dirname (cfg->dynamic_conf);
if (dir == NULL) {
/* Inaccessible path */
@@ -450,7 +438,17 @@ dump_dynamic_config (struct config_file *cfg)
return FALSE;
}

if (stat (cfg->dynamic_conf, &st) == -1) {
msg_debug ("%s is unavailable: %s", cfg->dynamic_conf, strerror (errno));
st.st_mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH;
}
if (access (dir, W_OK | R_OK) == -1) {
msg_warn ("%s is inaccessible: %s", dir, strerror (errno));
g_free (dir);
return FALSE;
}
rspamd_snprintf (pathbuf, sizeof (pathbuf), "%s%crconf-XXXXXX", dir, G_DIR_SEPARATOR);
g_free (dir);
#ifdef HAVE_MKSTEMP
/* Umask is set before */
fd = mkstemp (pathbuf);
@@ -469,21 +467,17 @@ dump_dynamic_config (struct config_file *cfg)
return FALSE;
}

if (unlink (cfg->dynamic_conf) == -1) {
msg_err ("unlink error: %s", strerror (errno));
close (fd);
unlink (pathbuf);
return FALSE;
}
(void)unlink (cfg->dynamic_conf);

/* Rename old config */
if (rename (pathbuf, cfg->dynamic_conf) == -1) {
msg_err ("unlink error: %s", strerror (errno));
msg_err ("rename error: %s", strerror (errno));
close (fd);
unlink (pathbuf);
return FALSE;
}
/* Set permissions */

if (chmod (cfg->dynamic_conf, st.st_mode) == -1) {
msg_warn ("chmod failed: %s", strerror (errno));
}
@@ -491,3 +485,142 @@ dump_dynamic_config (struct config_file *cfg)
close (fd);
return TRUE;
}

/**
* Add symbol for specified metric
* @param cfg config file object
* @param metric metric's name
* @param symbol symbol's name
* @param value value of symbol
* @return
*/
gboolean
add_dynamic_symbol (struct config_file *cfg, const gchar *metric_name, const gchar *symbol, gdouble value)
{
GList *cur;
struct dynamic_cfg_metric *metric = NULL;
struct dynamic_cfg_symbol *sym;

if (cfg->dynamic_conf == NULL) {
msg_info ("dynamic conf is disabled");
return FALSE;
}

cur = cfg->current_dynamic_conf;
while (cur) {
metric = cur->data;
if (g_ascii_strcasecmp (metric->name, metric_name) == 0) {
break;
}
metric = NULL;
cur = g_list_next (cur);
}

if (metric != NULL) {
/* Search for a symbol */
cur = metric->symbols;
while (cur) {
sym = cur->data;
if (g_ascii_strcasecmp (sym->name, symbol) == 0) {
sym->value = value;
break;
}
sym = NULL;
cur = g_list_next (cur);
}
if (sym == NULL) {
/* Symbol not found, insert it */
sym = g_slice_alloc (sizeof (struct dynamic_cfg_symbol));
sym->name = g_strdup (symbol);
sym->value = value;
metric->symbols = g_list_prepend (metric->symbols, sym);
}
}
else {
/* Metric not found, create it */
metric = g_slice_alloc0 (sizeof (struct dynamic_cfg_metric));
sym = g_slice_alloc (sizeof (struct dynamic_cfg_symbol));
sym->name = g_strdup (symbol);
sym->value = value;
metric->symbols = g_list_prepend (metric->symbols, sym);
metric->name = g_strdup (metric_name);
cfg->current_dynamic_conf = g_list_prepend (cfg->current_dynamic_conf, metric);
}

apply_dynamic_conf (cfg->current_dynamic_conf, cfg);

return TRUE;
}


/**
* Add action for specified metric
* @param cfg config file object
* @param metric metric's name
* @param action action's name
* @param value value of symbol
* @return
*/
gboolean
add_dynamic_action (struct config_file *cfg, const gchar *metric_name, const gchar *action, gdouble value)
{
GList *cur;
struct dynamic_cfg_metric *metric = NULL;
struct dynamic_cfg_action *act;
gint real_act;

if (cfg->dynamic_conf == NULL) {
msg_info ("dynamic conf is disabled");
return FALSE;
}

if (!check_action_str (action, &real_act)) {
msg_info ("invalid action string: %s", action);
return FALSE;
}

cur = cfg->current_dynamic_conf;
while (cur) {
metric = cur->data;
if (g_ascii_strcasecmp (metric->name, metric_name) == 0) {
break;
}
metric = NULL;
cur = g_list_next (cur);
}

if (metric != NULL) {
/* Search for a symbol */
cur = metric->symbols;
while (cur) {
act = cur->data;
if ((gint)act->action == real_act) {
act->value = value;
break;
}
act = NULL;
cur = g_list_next (cur);
}
if (act == NULL) {
/* Action not found, insert it */
act = g_slice_alloc (sizeof (struct dynamic_cfg_symbol));
act->action = real_act;
act->value = value;
metric->actions = g_list_prepend (metric->symbols, act);
}
}
else {
/* Metric not found, create it */
metric = g_slice_alloc0 (sizeof (struct dynamic_cfg_metric));
act = g_slice_alloc (sizeof (struct dynamic_cfg_symbol));
act->action = real_act;
act->value = value;
metric->actions = g_list_prepend (metric->symbols, act);
metric->name = g_strdup (metric_name);
cfg->current_dynamic_conf = g_list_prepend (cfg->current_dynamic_conf, metric);
}

apply_dynamic_conf (cfg->current_dynamic_conf, cfg);

return TRUE;
}

+ 21
- 0
src/dynamic_cfg.h View File

@@ -41,5 +41,26 @@ void init_dynamic_config (struct config_file *cfg);
*/
gboolean dump_dynamic_config (struct config_file *cfg);

/**
* Add symbol for specified metric
* @param cfg config file object
* @param metric metric's name
* @param symbol symbol's name
* @param value value of symbol
* @return
*/
gboolean add_dynamic_symbol (struct config_file *cfg, const gchar *metric, const gchar *symbol, gdouble value);


/**
* Add action for specified metric
* @param cfg config file object
* @param metric metric's name
* @param action action's name
* @param value value of symbol
* @return
*/
gboolean add_dynamic_action (struct config_file *cfg, const gchar *metric, const gchar *action, gdouble value);


#endif /* DYNAMIC_CFG_H_ */

+ 21
- 8
src/map.c View File

@@ -761,7 +761,7 @@ file_callback (gint fd, short what, void *ud)
map->tv.tv_usec = 0;
evtimer_add (&map->ev, &map->tv);

if (stat (data->filename, &st) != -1 && st.st_mtime > data->st.st_mtime) {
if (stat (data->filename, &st) != -1 && (st.st_mtime > data->st.st_mtime || data->st.st_mtime == -1)) {
/* File was modified since last check */
memcpy (&data->st, &st, sizeof (struct stat));
}
@@ -909,6 +909,7 @@ start_map_watch (struct config_file *cfg, struct event_base *ev_base)
{
GList *cur = cfg->maps;
struct rspamd_map *map;
struct file_map_data *fdata;

/* First of all do synced read of data */
while (cur) {
@@ -918,7 +919,11 @@ start_map_watch (struct config_file *cfg, struct event_base *ev_base)
evtimer_set (&map->ev, file_callback, map);
event_base_set (map->ev_base, &map->ev);
/* Read initial data */
read_map_file (map, map->map_data);
fdata = map->map_data;
if (fdata->st.st_mtime != -1) {
/* Do not try to read non-existent file */
read_map_file (map, map->map_data);
}
/* Plan event with jitter */
map->tv.tv_sec = (map->cfg->map_timeout + map->cfg->map_timeout * g_random_double ()) / 2.;
map->tv.tv_usec = 0;
@@ -986,7 +991,7 @@ add_map (struct config_file *cfg, const gchar *map_line, map_cb_t read_callback,
struct file_map_data *fdata;
struct http_map_data *hdata;
gchar portbuf[6];
gint i, s, fd;
gint i, s;
struct hostent *hent;

/* First of all detect protocol line */
@@ -1006,13 +1011,21 @@ add_map (struct config_file *cfg, const gchar *map_line, map_cb_t read_callback,

/* Now check for each proto separately */
if (proto == PROTO_FILE) {
if ((fd = open (def, O_RDONLY)) == -1) {
msg_warn ("cannot open file '%s': %s", def, strerror (errno));
return FALSE;
}
fdata = memory_pool_alloc0 (cfg->map_pool, sizeof (struct file_map_data));
if (access (def, R_OK) == -1) {
if (errno != ENOENT) {
msg_err ("cannot open file '%s': %s", def, strerror (errno));
return FALSE;

}
msg_info ("map '%s' is not found, but it can be loaded automatically later", def);
/* We still can add this file */
fdata->st.st_mtime = -1;
}
else {
stat (def, &fdata->st);
}
fdata->filename = memory_pool_strdup (cfg->map_pool, def);
fstat (fd, &fdata->st);
new_map->map_data = fdata;
}
else if (proto == PROTO_HTTP) {

Loading…
Cancel
Save