Browse Source

Add some comments about maps and logging internals.

tags/0.3.7
Vsevolod Stakhov 13 years ago
parent
commit
0382dce3ed
3 changed files with 159 additions and 5 deletions
  1. 51
    3
      src/logger.c
  2. 62
    1
      src/map.c
  3. 46
    1
      src/map.h

+ 51
- 3
src/logger.c View File

@@ -37,6 +37,10 @@
sig_atomic_t do_reopen_log = 0;
#endif

/**
* Static structure that store logging parameters
* It is NOT shared between processes and is created by main process
*/
typedef struct rspamd_logger_s {
rspamd_log_func_t log_func;
struct config_file *cfg;
@@ -65,6 +69,7 @@ rspamd_logger_t *rspamd_log = NULL;

static const gchar lf_chr = '\n';


static void
syslog_log_function (const gchar * log_domain, const gchar *function,
GLogLevelFlags log_level, const gchar * message,
@@ -74,6 +79,9 @@ file_log_function (const gchar * log_domain, const gchar *function,
GLogLevelFlags log_level, const gchar * message,
gboolean forced, gpointer arg);

/**
* Calculate checksum for log line (used for repeating logic)
*/
static inline guint32
rspamd_log_calculate_cksum (const gchar *message, size_t mlen)
{
@@ -91,6 +99,9 @@ rspamd_log_calculate_cksum (const gchar *message, size_t mlen)
}

/*
* Write a line to log file (unbuffered)
*/
static void
direct_write_log_line (void *data, gint count, gboolean is_iov)
{
@@ -201,6 +212,9 @@ close_log (void)
rspamd_log->enabled = FALSE;
}

/*
* Setup logger
*/
void
rspamd_set_logger (enum rspamd_log_type type, enum process_type ptype, struct config_file *cfg)
{
@@ -298,6 +312,9 @@ reopen_log (void)
return -1;
}

/**
* Used after fork() for updating structure params
*/
void
update_log_pid (enum process_type ptype)
{
@@ -305,6 +322,9 @@ update_log_pid (enum process_type ptype)
rspamd_log->process_type = ptype;
}

/**
* Flush logging buffer
*/
void
flush_log_buf (void)
{
@@ -314,7 +334,9 @@ flush_log_buf (void)
}
}


/**
* This log functions select real logger and write message if level is less or equal to configured log level
*/
void
rspamd_common_log_function (GLogLevelFlags log_level, const gchar *function, const gchar *fmt, ...)
{
@@ -333,7 +355,9 @@ rspamd_common_log_function (GLogLevelFlags log_level, const gchar *function, con
}


/* Fill buffer with message (limits must be checked BEFORE this call) */
/**
* Fill buffer with message (limits must be checked BEFORE this call)
*/
static void
fill_buffer (const struct iovec *iov, gint iovcnt)
{
@@ -346,7 +370,9 @@ fill_buffer (const struct iovec *iov, gint iovcnt)

}

/* Write message to buffer or to file */
/*
* Write message to buffer or to file (using direct_write_log_line function)
*/
static void
file_log_helper (const struct iovec *iov, gint iovcnt)
{
@@ -380,6 +406,9 @@ file_log_helper (const struct iovec *iov, gint iovcnt)
}
}

/**
* Syslog interface for logging
*/
static void
syslog_log_function (const gchar * log_domain, const gchar *function, GLogLevelFlags log_level, const gchar * message, gboolean forced, gpointer arg)
{
@@ -422,6 +451,9 @@ syslog_log_function (const gchar * log_domain, const gchar *function, GLogLevelF
}
}

/**
* Main file interface for logging
*/
static void
file_log_function (const gchar * log_domain, const gchar *function, GLogLevelFlags log_level, const gchar * message, gboolean forced, gpointer arg)
{
@@ -480,6 +512,7 @@ file_log_function (const gchar * log_domain, const gchar *function, GLogLevelFla
}
}
else {
/* Reset counter if new message differs from saved message */
rspamd_log->last_line_cksum = cksum;
if (rspamd_log->repeats > REPEATS_MIN) {
rspamd_snprintf (tmpbuf, sizeof (tmpbuf), "Last message repeated %ud times", rspamd_log->repeats);
@@ -505,6 +538,7 @@ file_log_function (const gchar * log_domain, const gchar *function, GLogLevelFla
now = time (NULL);
}

/* Format time */
tms = localtime (&now);

strftime (timebuf, sizeof (timebuf), "%F %H:%M:%S", tms);
@@ -537,6 +571,7 @@ file_log_function (const gchar * log_domain, const gchar *function, GLogLevelFla
else {
r = rspamd_snprintf (tmpbuf, sizeof (tmpbuf), "%s #%P(%s) %s: ", timebuf, rspamd_log->pid, cptype, function);
}
/* Construct IOV for log line */
iov[0].iov_base = tmpbuf;
iov[0].iov_len = r;
iov[1].iov_base = (void *)message;
@@ -544,10 +579,14 @@ file_log_function (const gchar * log_domain, const gchar *function, GLogLevelFla
iov[2].iov_base = (void *)&lf_chr;
iov[2].iov_len = 1;
/* Call helper (for buffering) */
file_log_helper (iov, 3);
}
}

/**
* Write log line depending on ip
*/
void
rspamd_conditional_debug (guint32 addr, const gchar *function, const gchar *fmt, ...)
{
@@ -567,6 +606,9 @@ rspamd_conditional_debug (guint32 addr, const gchar *function, const gchar *fmt,
}
}

/**
* Wrapper for glib logger
*/
void
rspamd_glib_log_function (const gchar *log_domain, GLogLevelFlags log_level, const gchar *message, gpointer arg)
{
@@ -578,12 +620,18 @@ rspamd_glib_log_function (const gchar *log_domain, GLogLevelFlags log_level, con
}
}

/**
* Temporary turn on debugging
*/
void
rspamd_log_debug ()
{
rspamd_log->is_debug = TRUE;
}

/**
* Turn off temporary debugging
*/
void
rspamd_log_nodebug ()
{

+ 62
- 1
src/map.c View File

@@ -63,6 +63,9 @@ struct http_callback_data {
#define HTTP_CONNECT_TIMEOUT 2
#define HTTP_READ_TIMEOUT 10

/**
* Helper for HTTP connection establishment
*/
static gint
connect_http (struct rspamd_map *map, struct http_map_data *data, gboolean is_async)
{
@@ -76,6 +79,9 @@ connect_http (struct rspamd_map *map, struct http_map_data *data, gboolean is_as
return sock;
}

/**
* Write HTTP request
*/
static void
write_http_request (struct rspamd_map *map, struct http_map_data *data, gint sock)
{
@@ -94,6 +100,9 @@ write_http_request (struct rspamd_map *map, struct http_map_data *data, gint soc
}
}

/**
* FSM for parsing HTTP reply
*/
static u_char *
parse_http_reply (u_char * chunk, size_t len, struct http_reply *reply)
{
@@ -182,6 +191,9 @@ parse_http_reply (u_char * chunk, size_t len, struct http_reply *reply)
return s;
}

/**
* Read and parse chunked header
*/
static gint
read_chunk_header (u_char * buf, size_t len, struct http_map_data *data)
{
@@ -190,6 +202,7 @@ read_chunk_header (u_char * buf, size_t len, struct http_map_data *data)

p = chunkbuf;
c = buf;
/* Find hex digits */
while (g_ascii_isxdigit (*c) && p - chunkbuf < sizeof (chunkbuf) - 1) {
*p++ = *c++;
skip++;
@@ -210,6 +223,9 @@ read_chunk_header (u_char * buf, size_t len, struct http_map_data *data)
return skip;
}

/**
* Helper callback for reading chunked reply
*/
static gboolean
read_http_chunked (u_char * buf, size_t len, struct rspamd_map *map, struct http_map_data *data, struct map_cb_data *cbdata)
{
@@ -269,6 +285,9 @@ read_http_chunked (u_char * buf, size_t len, struct rspamd_map *map, struct http
return TRUE;
}

/**
* Callback for reading HTTP reply
*/
static gboolean
read_http_common (struct rspamd_map *map, struct http_map_data *data, struct http_reply *reply, struct map_cb_data *cbdata, gint fd)
{
@@ -291,6 +310,7 @@ read_http_common (struct rspamd_map *map, struct http_map_data *data, struct htt
return TRUE;
}
if (reply->parser_state == 6) {
/* If reply header is parsed successfully, try to read further data */
if (reply->code != 200 && reply->code != 304) {
msg_err ("got error reply from server %s, %d", data->host, reply->code);
return FALSE;
@@ -300,6 +320,7 @@ read_http_common (struct rspamd_map *map, struct http_map_data *data, struct htt
return FALSE;
}
pos = data->read_buf;
/* Check for chunked */
if (!data->chunked && (te = g_hash_table_lookup (reply->headers, "Transfer-Encoding")) != NULL) {
if (g_ascii_strcasecmp (te, "chunked") == 0) {
data->chunked = TRUE;
@@ -308,6 +329,7 @@ read_http_common (struct rspamd_map *map, struct http_map_data *data, struct htt
if (data->chunked) {
return read_http_chunked (data->read_buf, r, map, data, cbdata);
}
/* Read more data */
remain = map->read_callback (map->pool, pos, r, cbdata);
if (remain != NULL && remain != pos + r) {
/* copy remaining data->read_buffer to start of data->read_buffer */
@@ -323,6 +345,9 @@ read_http_common (struct rspamd_map *map, struct http_map_data *data, struct htt
return TRUE;
}

/**
* Sync read of HTTP reply
*/
static void
read_http_sync (struct rspamd_map *map, struct http_map_data *data)
{
@@ -362,6 +387,9 @@ read_http_sync (struct rspamd_map *map, struct http_map_data *data)
g_free (repl);
}

/**
* Callback for reading data from file
*/
static void
read_map_file (struct rspamd_map *map, struct file_map_data *data)
{
@@ -507,6 +535,9 @@ add_map (const gchar *map_line, map_cb_t read_callback, map_fin_cb_t fin_callbac
return TRUE;
}

/**
* FSM for parsing lists
*/
u_char *
abstract_parse_list (memory_pool_t * pool, u_char * chunk, size_t len, struct map_cb_data *data, insert_func func)
{
@@ -523,7 +554,9 @@ abstract_parse_list (memory_pool_t * pool, u_char * chunk, size_t len, struct ma
/* READ_SYMBOL */
case 0:
if (*p == '#') {
/* Got comment */
if (s != str) {
/* Save previous string in lines like: "127.0.0.1 #localhost" */
*s = '\0';
s = memory_pool_strdup (pool, g_strstrip (str));
if (strlen (s) > 0) {
@@ -535,6 +568,7 @@ abstract_parse_list (memory_pool_t * pool, u_char * chunk, size_t len, struct ma
data->state = 1;
}
else if (*p == '\r' || *p == '\n') {
/* Got EOL marker, save stored string */
if (s != str) {
*s = '\0';
s = memory_pool_strdup (pool, g_strstrip (str));
@@ -543,12 +577,14 @@ abstract_parse_list (memory_pool_t * pool, u_char * chunk, size_t len, struct ma
}
s = str;
}
/* Skip EOL symbols */
while ((*p == '\r' || *p == '\n') && p - chunk < len) {
p++;
}
start = p;
}
else {
/* Store new string in s */
*s = *p;
s++;
p++;
@@ -556,6 +592,7 @@ abstract_parse_list (memory_pool_t * pool, u_char * chunk, size_t len, struct ma
break;
/* SKIP_COMMENT */
case 1:
/* Skip comment till end of line */
if (*p == '\r' || *p == '\n') {
while ((*p == '\r' || *p == '\n') && p - chunk < len) {
p++;
@@ -576,6 +613,9 @@ abstract_parse_list (memory_pool_t * pool, u_char * chunk, size_t len, struct ma
return start;
}

/**
* Radix tree helper function
*/
static void
radix_tree_insert_helper (gpointer st, gconstpointer key, gpointer value)
{
@@ -587,6 +627,7 @@ radix_tree_insert_helper (gpointer st, gconstpointer key, gpointer value)
struct in_addr ina;
gint k;

/* Split string if there are multiple items inside a single string */
strv = g_strsplit_set ((gchar *)key, " ,;", 0);
cur = strv;
while (*cur) {
@@ -594,11 +635,13 @@ radix_tree_insert_helper (gpointer st, gconstpointer key, gpointer value)
cur++;
continue;
}
/* Extract ipnet */
ipnet = *cur;
token = strsep (&ipnet, "/");

if (ipnet != NULL) {
errno = 0;
/* Get mask */
k = strtoul (ipnet, &err_str, 10);
if (errno != 0) {
msg_warn ("invalid netmask, error detected on symbol: %s, erorr: %s", err_str, strerror (errno));
@@ -608,14 +651,17 @@ radix_tree_insert_helper (gpointer st, gconstpointer key, gpointer value)
msg_warn ("invalid netmask value: %d", k);
k = 32;
}
/* Calculate mask based on CIDR presentation */
mask = mask << (32 - k);
}

/* Check IP */
if (inet_aton (token, &ina) == 0) {
msg_err ("invalid ip address: %s", token);
return;
}

/* Insert ip in a tree */
ip = ntohl ((guint32) ina.s_addr);
k = radix32tree_insert (tree, ip, mask, 1);
if (k == -1) {
@@ -630,6 +676,7 @@ radix_tree_insert_helper (gpointer st, gconstpointer key, gpointer value)
g_strfreev (strv);
}

/* Helpers */
u_char *
read_host_list (memory_pool_t * pool, u_char * chunk, size_t len, struct map_cb_data *data)
{
@@ -664,6 +711,9 @@ fin_radix_list (memory_pool_t * pool, struct map_cb_data *data)
}
}

/**
* Common file callback
*/
static void
file_callback (gint fd, short what, void *ud)
{
@@ -678,6 +728,7 @@ file_callback (gint fd, short what, void *ud)
evtimer_add (&map->ev, &map->tv);

if (stat (data->filename, &st) != -1 && st.st_mtime > data->st.st_mtime) {
/* File was modified since last check */
memcpy (&data->st, &st, sizeof (struct stat));
}
else {
@@ -688,6 +739,9 @@ file_callback (gint fd, short what, void *ud)
read_map_file (map, data);
}

/**
* Callback for destroying HTTP callback data
*/
static void
free_http_cbdata (struct http_callback_data *cbd)
{
@@ -700,6 +754,9 @@ free_http_cbdata (struct http_callback_data *cbd)
g_free (cbd);
}

/**
* Async HTTP request parser
*/
static void
http_async_callback (gint fd, short what, void *ud)
{
@@ -732,6 +789,7 @@ http_async_callback (gint fd, short what, void *ud)
return;
}
}
/* Got reply, parse it */
else if (what == EV_READ) {
if (cbd->state >= 1) {
if (!read_http_common (cbd->map, cbd->data, cbd->reply, &cbd->cbdata, cbd->fd)) {
@@ -741,6 +799,7 @@ http_async_callback (gint fd, short what, void *ud)
msg_info ("data is not modified for server %s", cbd->data->host);
}
else if (cbd->cbdata.cur_data != NULL) {
/* Destroy old data and start reading request data */
cbd->map->fin_callback (cbd->map->pool, &cbd->cbdata);
*cbd->map->user_data = cbd->cbdata.cur_data;
cbd->data->last_checked = time (NULL);
@@ -766,7 +825,9 @@ http_async_callback (gint fd, short what, void *ud)
}
}


/**
* Async HTTP callback
*/
static void
http_callback (gint fd, short what, void *ud)
{

+ 46
- 1
src/map.h View File

@@ -5,22 +5,37 @@
#include "mem_pool.h"
#include "radix.h"

/**
* Maps API is designed to load lists data from different dynamic sources.
* It monitor files and HTTP locations for modifications and reload them if they are
* modified.
*/

enum fetch_proto {
PROTO_FILE,
PROTO_HTTP,
};

/**
* Callback data for async load
*/
struct map_cb_data {
gint state;
void *prev_data;
void *cur_data;
};

/**
* Data specific to file maps
*/
struct file_map_data {
const gchar *filename;
struct stat st;
};

/**
* Data specific to HTTP maps
*/
struct http_map_data {
struct in_addr addr;
guint16 port;
@@ -34,9 +49,15 @@ struct http_map_data {
guint32 chunk_read;
};

/**
* Callback types
*/
typedef u_char* (*map_cb_t)(memory_pool_t *pool, u_char *chunk, size_t len, struct map_cb_data *data);
typedef void (*map_fin_cb_t)(memory_pool_t *pool, struct map_cb_data *data);

/**
* Common map object
*/
struct rspamd_map {
memory_pool_t *pool;
enum fetch_proto protocol;
@@ -48,18 +69,42 @@ struct rspamd_map {
void *map_data;
};

/**
* Add map from line
*/
gboolean add_map (const gchar *map_line, map_cb_t read_callback, map_fin_cb_t fin_callback, void **user_data);

/**
* Start watching of maps by adding events to libevent event loop
*/
void start_map_watch (void);

/**
* Remove all maps watched (remove events)
*/
void remove_all_maps (void);

typedef void (*insert_func) (gpointer st, gconstpointer key, gpointer value);

/* Common callbacks */
/**
* Common callbacks for frequent types of lists
*/

/**
* Radix list is a list like ip/mask
*/
u_char* read_radix_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_cb_data *data);
void fin_radix_list (memory_pool_t *pool, struct map_cb_data *data);

/**
* Host list is an ordinal list of hosts or domains
*/
u_char* read_host_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_cb_data *data);
void fin_host_list (memory_pool_t *pool, struct map_cb_data *data);

/**
* FSM for lists parsing (support comments, blank lines and partial replies)
*/
u_char * abstract_parse_list (memory_pool_t * pool, u_char * chunk, size_t len, struct map_cb_data *data, insert_func func);

#endif

Loading…
Cancel
Save