Browse Source

Rework fuzzy storage radix.

tags/0.7.2
Vsevolod Stakhov 9 years ago
parent
commit
a8c9abd046
5 changed files with 47 additions and 55 deletions
  1. 27
    37
      src/fuzzy_storage.c
  2. 2
    2
      src/libutil/map.c
  3. 2
    2
      src/libutil/radix.c
  4. 12
    10
      src/libutil/radix.h
  5. 4
    4
      test/rspamd_radix_test.c

+ 27
- 37
src/fuzzy_storage.c View File

@@ -93,7 +93,7 @@ struct rspamd_fuzzy_storage_ctx {
gdouble expire;
guint32 frequent_score;
guint32 max_mods;
radix_tree_t *update_ips;
radix_compressed_t *update_ips;
gchar *update_map;
struct event_base *ev_base;
rspamd_rwlock_t *tree_lock;
@@ -114,14 +114,8 @@ struct fuzzy_session {
struct fuzzy_cmd cmd;
gint fd;
u_char *pos;
socklen_t salen;
guint64 time;
union {
struct sockaddr ss;
struct sockaddr_storage sa;
struct sockaddr_in s4;
struct sockaddr_in6 v6;
} client_addr;
rspamd_inet_addr_t addr;
struct rspamd_fuzzy_storage_ctx *ctx;
};

@@ -775,13 +769,8 @@ static gboolean
check_fuzzy_client (struct fuzzy_session *session)
{
if (session->ctx->update_ips != NULL) {
/* XXX: cannot work with ipv6 addresses */
if (session->client_addr.ss.sa_family != AF_INET) {
return FALSE;
}
if (radix32tree_find (session->ctx->update_ips,
ntohl (session->client_addr.s4.sin_addr.s_addr)) ==
RADIX_NO_VALUE) {
if (radix_find_compressed_addr (session->ctx->update_ips,
&session->addr) == RADIX_NO_VALUE) {
return FALSE;
}
}
@@ -789,21 +778,21 @@ check_fuzzy_client (struct fuzzy_session *session)
return TRUE;
}

#define CMD_PROCESS(x) \
do { \
if (process_ ## x ## _command (&session->cmd, session->time, \
session->worker->ctx)) { \
if (sendto (session->fd, "OK" CRLF, sizeof ("OK" CRLF) - 1, 0, \
&session->client_addr.ss, session->salen) == -1) { \
msg_err ("error while writing reply: %s", strerror (errno)); \
} \
} \
else { \
if (sendto (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1, 0, \
&session->client_addr.ss, session->salen) == -1) { \
msg_err ("error while writing reply: %s", strerror (errno)); \
} \
} \
#define CMD_PROCESS(x) \
do { \
if (process_ ## x ## _command (&session->cmd, session->time, \
session->worker->ctx)) { \
if (sendto (session->fd, "OK" CRLF, sizeof ("OK" CRLF) - 1, 0, \
&session->addr.addr.sa, session->addr.slen) == -1) { \
msg_err ("error while writing reply: %s", strerror (errno)); \
} \
} \
else { \
if (sendto (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1, 0, \
&session->addr.addr.sa, session->addr.slen) == -1) { \
msg_err ("error while writing reply: %s", strerror (errno)); \
} \
} \
} while (0)

static void
@@ -821,13 +810,13 @@ process_fuzzy_command (struct fuzzy_session *session)
if (r != 0) {
r = rspamd_snprintf (buf, sizeof (buf), "OK %d %d" CRLF, r, flag);
if (sendto (session->fd, buf, r, 0,
&session->client_addr.ss, session->salen) == -1) {
&session->addr.addr.sa, session->addr.slen) == -1) {
msg_err ("error while writing reply: %s", strerror (errno));
}
}
else {
if (sendto (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1, 0,
&session->client_addr.ss, session->salen) == -1) {
&session->addr.addr.sa, session->addr.slen) == -1) {
msg_err ("error while writing reply: %s", strerror (errno));
}
}
@@ -837,7 +826,7 @@ process_fuzzy_command (struct fuzzy_session *session)
msg_info ("try to insert a hash from an untrusted address");
if (sendto (session->fd, "UNAUTH" CRLF, sizeof ("UNAUTH" CRLF) - 1,
0,
&session->client_addr.ss, session->salen) == -1) {
&session->addr.addr.sa, session->addr.slen) == -1) {
msg_err ("error while writing reply: %s", strerror (errno));
}
}
@@ -850,7 +839,7 @@ process_fuzzy_command (struct fuzzy_session *session)
msg_info ("try to delete a hash from an untrusted address");
if (sendto (session->fd, "UNAUTH" CRLF, sizeof ("UNAUTH" CRLF) - 1,
0,
&session->client_addr.ss, session->salen) == -1) {
&session->addr.addr.sa, session->addr.slen) == -1) {
msg_err ("error while writing reply: %s", strerror (errno));
}
}
@@ -860,7 +849,7 @@ process_fuzzy_command (struct fuzzy_session *session)
break;
default:
if (sendto (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1, 0,
&session->client_addr.ss, session->salen) == -1) {
&session->addr.addr.sa, session->addr.slen) == -1) {
msg_err ("error while writing reply: %s", strerror (errno));
}
break;
@@ -890,14 +879,14 @@ accept_fuzzy_socket (gint fd, short what, void *arg)
session.worker = worker;
session.fd = fd;
session.pos = (u_char *) &session.cmd;
session.salen = sizeof (session.client_addr);
session.addr.slen = sizeof (session.addr.addr);
session.ctx = worker->ctx;
session.time = (guint64)time (NULL);

/* Got some data */
if (what == EV_READ) {
while ((r = recvfrom (fd, session.pos, sizeof (struct fuzzy_cmd),
MSG_WAITALL, &session.client_addr.ss, &session.salen)) == -1) {
MSG_WAITALL, &session.addr.addr.sa, &session.addr.slen)) == -1) {
if (errno == EINTR) {
continue;
}
@@ -906,6 +895,7 @@ accept_fuzzy_socket (gint fd, short what, void *arg)
strerror (errno));
return;
}
session.addr.af = session.addr.addr.sa.sa_family;
if (r == sizeof (struct fuzzy_cmd)) {
/* Assume that the whole command was read */
process_fuzzy_command (&session);

+ 2
- 2
src/libutil/map.c View File

@@ -917,7 +917,7 @@ read_radix_list (rspamd_mempool_t * pool,
struct map_cb_data *data)
{
if (data->cur_data == NULL) {
data->cur_data = radix_tree_create_compressed ();
data->cur_data = radix_create_compressed ();
}
return abstract_parse_list (pool,
chunk,
@@ -930,6 +930,6 @@ void
fin_radix_list (rspamd_mempool_t * pool, struct map_cb_data *data)
{
if (data->prev_data) {
radix_tree_destroy_compressed (data->prev_data);
radix_destroy_compressed (data->prev_data);
}
}

+ 2
- 2
src/libutil/radix.c View File

@@ -845,7 +845,7 @@ radix_insert_compressed (radix_compressed_t * tree,


radix_compressed_t *
radix_tree_create_compressed (void)
radix_create_compressed (void)
{
radix_compressed_t *tree;

@@ -862,7 +862,7 @@ radix_tree_create_compressed (void)
}

void
radix_tree_destroy_compressed (radix_compressed_t *tree)
radix_destroy_compressed (radix_compressed_t *tree)
{
rspamd_mempool_delete (tree->pool);
g_slice_free1 (sizeof (*tree), tree);

+ 12
- 10
src/libutil/radix.h View File

@@ -81,14 +81,7 @@ uintptr_t radix32tree_find (radix_tree_t *tree, guint32 key);
*/
uintptr_t radix32_tree_find_addr (radix_tree_t *tree, rspamd_inet_addr_t *addr);

/**
* Find specified address in tree (works for any address)
* @param tree
* @param addr
* @return
*/
uintptr_t radix_find_compressed_addr (radix_compressed_t *tree,
rspamd_inet_addr_t *addr);


/**
* Traverse via the whole tree calling specified callback
@@ -111,8 +104,17 @@ radix_insert_compressed (radix_compressed_t * tree,
uintptr_t radix_find_compressed (radix_compressed_t * tree, guint8 *key,
gsize keylen);

void radix_tree_destroy_compressed (radix_compressed_t *tree);
/**
* Find specified address in tree (works for any address)
* @param tree
* @param addr
* @return
*/
uintptr_t radix_find_compressed_addr (radix_compressed_t *tree,
rspamd_inet_addr_t *addr);

void radix_destroy_compressed (radix_compressed_t *tree);

radix_compressed_t *radix_tree_create_compressed (void);
radix_compressed_t *radix_create_compressed (void);

#endif

+ 4
- 4
test/rspamd_radix_test.c View File

@@ -78,7 +78,7 @@ struct _tv {
static void
rspamd_radix_text_vec (void)
{
radix_compressed_t *tree = radix_tree_create_compressed ();
radix_compressed_t *tree = radix_create_compressed ();
struct _tv *t = &test_vec[0];
struct in_addr ina;
struct in6_addr in6a;
@@ -134,14 +134,14 @@ rspamd_radix_text_vec (void)
t ++;
}

radix_tree_destroy_compressed (tree);
radix_destroy_compressed (tree);
}

void
rspamd_radix_test_func (void)
{
radix_tree_t *tree = radix_tree_create ();
radix_compressed_t *comp_tree = radix_tree_create_compressed ();
radix_compressed_t *comp_tree = radix_create_compressed ();
struct {
guint32 addr;
guint32 mask;
@@ -245,7 +245,7 @@ rspamd_radix_test_func (void)
(ts2.tv_nsec - ts1.tv_nsec) / 1000000.; /* Nanoseconds */

msg_info ("Checked %z elements in %.6f ms", nelts, diff);
radix_tree_destroy_compressed (comp_tree);
radix_destroy_compressed (comp_tree);

g_free (addrs);
}

Loading…
Cancel
Save