summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/dns.c306
-rw-r--r--src/dns.h1
-rw-r--r--src/logger.h2
-rw-r--r--src/main.h3
-rw-r--r--src/mem_pool.c12
-rw-r--r--src/statfile.c8
-rw-r--r--src/worker.c93
7 files changed, 317 insertions, 108 deletions
diff --git a/src/dns.c b/src/dns.c
index c0282f66f..12cc63b16 100644
--- a/src/dns.c
+++ b/src/dns.c
@@ -49,6 +49,8 @@
#define UDP_PACKET_SIZE 512
+#define DNS_COMPRESSION_BITS 0xC0
+
/*
* P E R M U T A T I O N G E N E R A T O R
*/
@@ -292,13 +294,15 @@ try_compress_label (memory_pool_t *pool, guint8 *target, guint8 *start, guint8 l
{
GList *cur;
struct dns_name_table *tbl;
+ guint16 pointer;
cur = table;
while (cur) {
tbl = cur->data;
if (tbl->len == len) {
if (memcmp (label, tbl->label, len) == 0) {
- *target = tbl->off | 0xC0;
+ pointer = htons ((guint16)tbl->off | 0xC0);
+ memcpy (target, &pointer, sizeof (pointer));
return TRUE;
}
}
@@ -337,7 +341,7 @@ make_dns_header (struct rspamd_dns_request *req)
memset (header, 0 , sizeof (struct dns_header));
header->qid = dns_k_permutor_step (req->resolver->permutor);
header->rd = 1;
- header->qdcount = 1;
+ header->qdcount = htons (1);
req->pos += sizeof (struct dns_header);
req->id = header->qid;
}
@@ -345,7 +349,7 @@ make_dns_header (struct rspamd_dns_request *req)
static void
format_dns_name (struct rspamd_dns_request *req, const char *name, guint namelen)
{
- guint8 *pos = req->packet + req->pos, *begin, *end;
+ guint8 *pos = req->packet + req->pos, *end, *dot, *begin;
guint remain = req->packet_len - req->pos - 5, label_len;
GList *table = NULL;
@@ -354,9 +358,11 @@ format_dns_name (struct rspamd_dns_request *req, const char *name, guint namelen
}
begin = (guint8 *)name;
+ end = (guint8 *)name + namelen;
for (;;) {
- end = strchr (begin, '.');
- if (end) {
+ dot = strchr (begin, '.');
+ if (dot) {
+ label_len = dot - begin;
if (label_len > DNS_D_MAXLABEL) {
msg_err ("dns name component is longer than 63 bytes, should be stripped");
label_len = DNS_D_MAXLABEL;
@@ -367,20 +373,17 @@ format_dns_name (struct rspamd_dns_request *req, const char *name, guint namelen
}
/* First try to compress name */
if (! try_compress_label (req->pool, pos, req->packet, end - begin, begin, table)) {
- label_len = end - begin;
-
*pos++ = (guint8)label_len;
memcpy (pos, begin, label_len);
pos += label_len;
- remain -= label_len + 1;
- begin = end + 1;
}
else {
- pos ++;
+ pos += 2;
}
+ remain -= label_len + 1;
+ begin = dot + 1;
}
else {
- end = (guint8 *)name + namelen;
label_len = end - begin;
if (label_len == 0) {
/* If name is ended with dot */
@@ -407,7 +410,7 @@ format_dns_name (struct rspamd_dns_request *req, const char *name, guint namelen
}
/* Termination label */
*(++pos) = '\0';
- req->pos += pos - (req->packet + req->pos) + 1;
+ req->pos += pos - (req->packet + req->pos);
if (table != NULL) {
g_list_free (table);
}
@@ -429,9 +432,9 @@ make_ptr_req (struct rspamd_dns_request *req, struct in_addr addr)
allocate_packet (req, r);
make_dns_header (req);
format_dns_name (req, ipbuf, r);
- p = (guint16 *)req->packet + req->pos;
- *p++ = htons (DNS_C_IN);
- *p = htons (DNS_T_PTR);
+ p = (guint16 *)(req->packet + req->pos);
+ *p++ = htons (DNS_T_PTR);
+ *p = htons (DNS_C_IN);
req->pos += sizeof (guint16) * 2;
req->type = DNS_REQUEST_PTR;
}
@@ -444,9 +447,9 @@ make_a_req (struct rspamd_dns_request *req, const char *name)
allocate_packet (req, strlen (name));
make_dns_header (req);
format_dns_name (req, name, 0);
- p = (guint16 *)req->packet + req->pos;
- *p++ = htons (DNS_C_IN);
- *p = htons (DNS_T_A);
+ p = (guint16 *)(req->packet + req->pos);
+ *p++ = htons (DNS_T_A);
+ *p = htons (DNS_C_IN);
req->pos += sizeof (guint16) * 2;
req->type = DNS_REQUEST_A;
}
@@ -459,9 +462,9 @@ make_txt_req (struct rspamd_dns_request *req, const char *name)
allocate_packet (req, strlen (name));
make_dns_header (req);
format_dns_name (req, name, 0);
- p = (guint16 *)req->packet + req->pos;
- *p++ = htons (DNS_C_IN);
- *p = htons (DNS_T_A);
+ p = (guint16 *)(req->packet + req->pos);
+ *p++ = htons (DNS_T_TXT);
+ *p = htons (DNS_C_IN);
req->pos += sizeof (guint16) * 2;
req->type = DNS_REQUEST_TXT;
}
@@ -474,9 +477,9 @@ make_mx_req (struct rspamd_dns_request *req, const char *name)
allocate_packet (req, strlen (name));
make_dns_header (req);
format_dns_name (req, name, 0);
- p = (guint16 *)req->packet + req->pos;
- *p++ = htons (DNS_C_IN);
- *p = htons (DNS_T_A);
+ p = (guint16 *)(req->packet + req->pos);
+ *p++ = htons (DNS_T_MX);
+ *p = htons (DNS_C_IN);
req->pos += sizeof (guint16) * 2;
req->type = DNS_REQUEST_MX;
}
@@ -515,24 +518,25 @@ dns_fin_cb (gpointer arg)
{
struct rspamd_dns_request *req = arg;
- /* XXX: call callback if possible */
+ g_hash_table_remove (req->resolver->requests, GUINT_TO_POINTER (req->id));
}
static guint8 *
-decompress_label (guint8 *begin, guint8 *len)
+decompress_label (guint8 *begin, guint16 *len)
{
- guint8 offset;
- offset = (*len) ^ 0xC0;
+ guint16 offset;
+ offset = ntohs ((*len) ^ DNS_COMPRESSION_BITS);
*len = *(begin + offset);
- return begin + offset + 1;
+ return begin + offset;
}
static guint8 *
dns_request_reply_cmp (struct rspamd_dns_request *req, guint8 *in, int len)
{
guint8 *p, *c, *l1, *l2;
- guint8 len1, len2;
+ guint16 len1, len2;
+ gint decompressed = 0;
/* QR format:
* labels - len:octets
@@ -554,17 +558,21 @@ dns_request_reply_cmp (struct rspamd_dns_request *req, guint8 *in, int len)
return NULL;
}
/* This may be compressed, so we need to decompress it */
- if (len1 & 0xC0) {
+ if (len1 & DNS_COMPRESSION_BITS) {
l1 = decompress_label (in, &len1);
- p ++;
+ decompressed ++;
+ l1 ++;
+ p += 2;
}
else {
l1 = ++p;
p += len1;
}
- if (len2 & 0xC0) {
+ if (len2 & DNS_COMPRESSION_BITS) {
l2 = decompress_label (req->packet, &len2);
- c ++;
+ decompressed ++;
+ l2 ++;
+ c += 2;
}
else {
l2 = ++c;
@@ -580,6 +588,9 @@ dns_request_reply_cmp (struct rspamd_dns_request *req, guint8 *in, int len)
if (memcmp (l1, l2, len1) != 0) {
return NULL;
}
+ if (decompressed == 2) {
+ break;
+ }
}
/* p now points to the end of QR section */
@@ -590,50 +601,175 @@ dns_request_reply_cmp (struct rspamd_dns_request *req, guint8 *in, int len)
return NULL;
}
+#define MAX_RECURSION_LEVEL 10
+
static gboolean
-dns_parse_rr (union rspamd_reply_element *elt, guint8 **pos, struct rspamd_dns_reply *rep, int *remain)
+dns_parse_labels (guint8 *in, char **target, guint8 **pos, struct rspamd_dns_reply *rep, int *remain, gboolean make_name)
{
- guint8 *p = *pos;
- guint16 type, datalen;
-
- /* Skip the whole name */
- while (p - *pos < *remain) {
- if (*p & 0xC0) {
- p ++;
+ guint16 namelen = 0;
+ guint8 *p = *pos, *begin = *pos, *l, *t;
+ guint16 llen;
+ gint offset = -1;
+ gint length = *remain;
+ gint ptrs = 0, labels = 0;
+
+ /* First go through labels and calculate name length */
+ while (p - begin < length) {
+ if (ptrs > MAX_RECURSION_LEVEL) {
+ msg_warn ("dns pointers are nested too much");
+ return FALSE;
}
- else if (*p == 0) {
- p ++;
+ llen = *p;
+ if (llen == 0) {
break;
}
+ else if (llen & DNS_COMPRESSION_BITS) {
+ ptrs ++;
+ memcpy (&llen, p, sizeof (guint16));
+ l = decompress_label (in, &llen);
+ if (offset < 0) {
+ offset = p - begin + 2;
+ }
+ if (l < in || l > begin + length) {
+ msg_warn ("invalid pointer in DNS packet");
+ return FALSE;
+ }
+ begin = l;
+ p = l + *l + 1;
+ namelen += *p;
+ labels ++;
+ }
else {
+ namelen += *p;
p += *p + 1;
+ labels ++;
}
}
+
+ if (!make_name) {
+ goto end;
+ }
+ *target = memory_pool_alloc (rep->request->pool, namelen + labels + 1);
+ t = (guint8 *)*target;
+ p = *pos;
+ /* Now copy labels to name */
+ while (p - begin < length) {
+ llen = *p;
+ if (llen == 0) {
+ break;
+ }
+ else if (llen & DNS_COMPRESSION_BITS) {
+ memcpy (&llen, p, sizeof (guint16));
+ l = decompress_label (in, &llen);
+ begin = p;
+ p = l + *l + 1;
+ namelen += *p;
+ }
+ else {
+ memcpy (t, p + 1, *p);
+ t += *p;
+ *t ++ = '.';
+ p += *p + 1;
+ }
+ }
+ *t = '\0';
+end:
+ if (offset < 0) {
+ offset = p - begin;
+ }
+ *remain -= offset;
+ *pos += offset;
+
+ return TRUE;
+}
+
+#define GET16(x) do {if (*remain < sizeof (guint16)) {goto err;} memcpy (&(x), p, sizeof (guint16)); (x) = ntohs ((x)); p += sizeof (guint16); *remain -= sizeof (guint16); } while(0)
+#define GET32(x) do {if (*remain < sizeof (guint32)) {goto err;} memcpy (&(x), p, sizeof (guint32)); (x) = ntohl ((x)); p += sizeof (guint32); *remain -= sizeof (guint32); } while(0)
+
+static gboolean
+dns_parse_rr (guint8 *in, union rspamd_reply_element *elt, guint8 **pos, struct rspamd_dns_reply *rep, int *remain)
+{
+ guint8 *p = *pos;
+ guint16 type, datalen;
+ guint16 addrcount;
+
+ /* Skip the whole name */
+ if (! dns_parse_labels (in, NULL, &p, rep, remain, FALSE)) {
+ msg_info ("bad RR name");
+ return FALSE;
+ }
if (p - *pos >= *remain - sizeof (guint16) * 5) {
msg_info ("stripped dns reply");
return FALSE;
}
- type = *((guint16 *)p);
+ GET16 (type);
/* Skip ttl and class */
- p += sizeof (guint16) * 2 + sizeof (guint32);
- datalen = *((guint16 *)p);
- p += sizeof (guint16);
- *remain -= p - *pos;
+ p += sizeof (guint16) + sizeof (guint32);
+ *remain -= sizeof (guint16) + sizeof (guint32);
+ GET16 (datalen);
/* Now p points to RR data */
switch (type) {
case DNS_T_A:
- if ((datalen & 0x3) && *remain >= datalen) {
- elt->a.addr[0].s_addr = *((guint32 *)p);
- p += sizeof (guint32);
+ if (rep->request->type != DNS_REQUEST_A) {
+ p += datalen;
}
else {
- msg_info ("corrupted A record");
- return FALSE;
+ if (!(datalen & 0x3) && datalen <= *remain) {
+ addrcount = MIN (elt->a.addrcount + (datalen >> 2), MAX_ADDRS);
+ memcpy (&elt->a.addr[elt->a.addrcount], p, addrcount * sizeof (struct in_addr));
+ p += datalen;
+ elt->a.addrcount += addrcount;
+ }
+ else {
+ msg_info ("corrupted A record");
+ return FALSE;
+ }
+ }
+ break;
+ case DNS_T_PTR:
+ if (rep->request->type != DNS_REQUEST_PTR) {
+ p += datalen;
+ }
+ else {
+ if (! dns_parse_labels (in, &elt->ptr.name, &p, rep, remain, TRUE)) {
+ msg_info ("invalid labels in PTR record");
+ return FALSE;
+ }
}
break;
+ case DNS_T_MX:
+ if (rep->request->type != DNS_REQUEST_MX) {
+ p += datalen;
+ }
+ else {
+ GET16 (elt->mx.priority);
+ if (! dns_parse_labels (in, &elt->mx.name, &p, rep, remain, TRUE)) {
+ msg_info ("invalid labels in MX record");
+ return FALSE;
+ }
+ }
+ break;
+ case DNS_T_TXT:
+ if (rep->request->type != DNS_REQUEST_TXT) {
+ p += datalen;
+ }
+ else {
+ elt->txt.data = memory_pool_alloc (rep->request->pool, datalen + 1);
+ memcpy (elt->txt.data, p, datalen);
+ *(elt->txt.data + datalen) = '\0';
+ }
+ break;
+ default:
+ msg_info ("unexpected RR type: %d", type);
}
*remain -= datalen;
*pos = p;
+
+ return TRUE;
+
+err:
+ msg_info ("incomplete RR, only %d bytes remain, packet length %d", (int)*remain, (int)(*pos - in));
+ return FALSE;
}
static struct rspamd_dns_reply *
@@ -665,21 +801,27 @@ dns_parse_reply (guint8 *in, int r, struct rspamd_dns_resolver *resolver)
return NULL;
}
/*
+ * Remove delayed retransmits for this packet
+ */
+ event_del (&req->timer_event);
+ /*
* Now pos is in answer section, so we should extract data and form reply
*/
rep = memory_pool_alloc (req->pool, sizeof (struct rspamd_dns_reply));
rep->request = req;
rep->type = req->type;
rep->elements = NULL;
+ rep->code = ntohs (header->rcode);
r -= pos - in;
/* Extract RR records */
- for (i = 0; i < header->ancount; i ++) {
+ for (i = 0; i < ntohs (header->ancount); i ++) {
elt = memory_pool_alloc (req->pool, sizeof (union rspamd_reply_element));
- if (! dns_parse_rr (elt, &pos, rep, &r)) {
+ if (! dns_parse_rr (in, elt, &pos, rep, &r)) {
msg_info ("incomplete reply");
break;
}
+ rep->elements = g_list_prepend (rep->elements, elt);
}
return rep;
@@ -689,8 +831,7 @@ static void
dns_read_cb (int fd, short what, void *arg)
{
struct rspamd_dns_resolver *resolver = arg;
- int i, r;
- struct rspamd_dns_server *serv;
+ int r;
struct rspamd_dns_reply *rep;
guint8 in[UDP_PACKET_SIZE];
@@ -700,23 +841,30 @@ dns_read_cb (int fd, short what, void *arg)
r = read (fd, in, sizeof (in));
if (r > 96) {
if ((rep = dns_parse_reply (in, r, resolver)) != NULL) {
-
+ rep->request->func (rep, rep->request->arg);
+ upstream_ok (&rep->request->server->up, time (NULL));
+ return;
}
}
+
}
static void
dns_timer_cb (int fd, short what, void *arg)
{
struct rspamd_dns_request *req = arg;
-
+ struct rspamd_dns_reply *rep;
+ int r;
/* Retransmit dns request */
req->retransmits ++;
if (req->retransmits >= req->resolver->max_retransmits) {
msg_err ("maximum number of retransmits expired");
event_del (&req->timer_event);
- /* XXX: call user's callback here */
+ rep = memory_pool_alloc0 (req->pool, sizeof (struct rspamd_dns_reply));
+ rep->request = req;
+ rep->code = DNS_RC_SERVFAIL;
+ req->func (rep, req->arg);
return;
}
/* Select other server */
@@ -725,7 +873,10 @@ dns_timer_cb (int fd, short what, void *arg)
time (NULL), DEFAULT_UPSTREAM_ERROR_TIME, DEFAULT_UPSTREAM_DEAD_TIME, DEFAULT_UPSTREAM_MAXERRORS);
if (req->server == NULL) {
event_del (&req->timer_event);
- /* XXX: call user's callback here */
+ rep = memory_pool_alloc0 (req->pool, sizeof (struct rspamd_dns_reply));
+ rep->request = req;
+ rep->code = DNS_RC_SERVFAIL;
+ req->func (rep, req->arg);
return;
}
@@ -736,18 +887,31 @@ dns_timer_cb (int fd, short what, void *arg)
if (req->sock == -1) {
event_del (&req->timer_event);
- /* XXX: call user's callback here */
+ rep = memory_pool_alloc0 (req->pool, sizeof (struct rspamd_dns_reply));
+ rep->request = req;
+ rep->code = DNS_RC_SERVFAIL;
+ req->func (rep, req->arg);
return;
}
/* Add other retransmit event */
evtimer_add (&req->timer_event, &req->tv);
+ r = send_dns_request (req);
+ if (r == -1) {
+ event_del (&req->io_event);
+ rep = memory_pool_alloc0 (req->pool, sizeof (struct rspamd_dns_reply));
+ rep->request = req;
+ rep->code = DNS_RC_SERVFAIL;
+ req->func (rep, req->arg);
+ upstream_fail (&req->server->up, time (NULL));
+ }
}
static void
dns_retransmit_handler (int fd, short what, void *arg)
{
struct rspamd_dns_request *req = arg;
+ struct rspamd_dns_reply *rep;
gint r;
if (what == EV_WRITE) {
@@ -756,13 +920,19 @@ dns_retransmit_handler (int fd, short what, void *arg)
if (req->retransmits >= req->resolver->max_retransmits) {
msg_err ("maximum number of retransmits expired");
event_del (&req->io_event);
- /* XXX: call user's callback here */
+ rep = memory_pool_alloc0 (req->pool, sizeof (struct rspamd_dns_reply));
+ rep->request = req;
+ rep->code = DNS_RC_SERVFAIL;
+ req->func (rep, req->arg);
return;
}
r = send_dns_request (req);
if (r == -1) {
event_del (&req->io_event);
- /* XXX: call user's callback here */
+ rep = memory_pool_alloc0 (req->pool, sizeof (struct rspamd_dns_reply));
+ rep->request = req;
+ rep->code = DNS_RC_SERVFAIL;
+ req->func (rep, req->arg);
upstream_fail (&req->server->up, time (NULL));
}
else if (r == 1) {
@@ -794,6 +964,7 @@ make_dns_request (struct rspamd_dns_resolver *resolver,
req->resolver = resolver;
req->func = cb;
req->arg = ud;
+ req->type = type;
va_start (args, type);
switch (type) {
@@ -969,12 +1140,13 @@ dns_resolver_init (struct config_file *cfg)
/* Now init all servers */
for (i = 0; i < new->servers_num; i ++) {
serv = &new->servers[i];
- serv->sock = make_udp_socket (&serv->addr, htons (53), FALSE, TRUE);
+ serv->sock = make_udp_socket (&serv->addr, 53, FALSE, TRUE);
if (serv->sock == -1) {
msg_warn ("cannot create socket to server %s", serv->name);
}
else {
- event_set (&serv->ev, serv->sock, EV_READ, dns_read_cb, new);
+ event_set (&serv->ev, serv->sock, EV_READ | EV_PERSIST, dns_read_cb, new);
+ event_add (&serv->ev, NULL);
}
}
diff --git a/src/dns.h b/src/dns.h
index 6573377cb..7bac41037 100644
--- a/src/dns.h
+++ b/src/dns.h
@@ -118,6 +118,7 @@ enum dns_rcode {
struct rspamd_dns_reply {
enum rspamd_request_type type;
struct rspamd_dns_request *request;
+ enum dns_rcode code;
GList *elements;
};
diff --git a/src/logger.h b/src/logger.h
index 8e517541e..fe6e0bda5 100644
--- a/src/logger.h
+++ b/src/logger.h
@@ -66,7 +66,7 @@ void rspamd_conditional_debug (uint32_t addr, const char *function, const char *
/* Typical functions */
/* Logging in postfix style */
-#if (defined(RSPAMD_MAIN) || defined(RSPAMD_LIB))
+#if (defined(RSPAMD_MAIN) || defined(RSPAMD_LIB) || defined(RSPAMD_TEST))
#define msg_err(args...) rspamd_common_log_function(G_LOG_LEVEL_CRITICAL, __FUNCTION__, ##args)
#define msg_warn(args...) rspamd_common_log_function(G_LOG_LEVEL_WARNING, __FUNCTION__, ##args)
#define msg_info(args...) rspamd_common_log_function(G_LOG_LEVEL_INFO, __FUNCTION__, ##args)
diff --git a/src/main.h b/src/main.h
index defa25c0d..b729e87de 100644
--- a/src/main.h
+++ b/src/main.h
@@ -64,6 +64,7 @@ struct classifier;
struct classifier_config;
struct mime_part;
struct rspamd_view;
+struct rspamd_dns_resolver;
/**
* Server statistics
@@ -219,6 +220,8 @@ struct worker_task {
uint32_t parser_recursion; /**< for avoiding recursion stack overflow */
gboolean (*fin_callback)(void *arg); /**< calback for filters finalizing */
void *fin_arg; /**< argument for fin callback */
+
+ struct rspamd_dns_resolver *resolver; /**< DNS resolver */
};
/**
diff --git a/src/mem_pool.c b/src/mem_pool.c
index 8d5c22ad1..3398cbf41 100644
--- a/src/mem_pool.c
+++ b/src/mem_pool.c
@@ -86,7 +86,6 @@ pool_chain_new_shared (memory_pool_ssize_t size)
chain = (struct _pool_chain_shared *)mmap (NULL, size + sizeof (struct _pool_chain_shared), PROT_READ | PROT_WRITE, MAP_ANON | MAP_SHARED, -1, 0);
g_assert (chain != MAP_FAILED);
chain->begin = ((u_char *) chain) + sizeof (struct _pool_chain_shared);
- g_assert (chain->begin != MAP_FAILED);
#elif defined(HAVE_MMAP_ZERO)
int fd;
@@ -97,13 +96,12 @@ pool_chain_new_shared (memory_pool_ssize_t size)
chain = (struct _pool_chain_shared *)mmap (NULL, size + sizeof (struct _pool_chain_shared), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
g_assert (chain != MAP_FAILED);
chain->begin = ((u_char *) chain) + sizeof (struct _pool_chain_shared);
- g_assert (chain->begin != MAP_FAILED);
#else
# error No mmap methods are defined
#endif
chain->len = size;
chain->pos = chain->begin;
- chain->lock = 0;
+ chain->lock = NULL;
chain->next = NULL;
STAT_LOCK ();
mem_pool_stat->shared_chunks_allocated++;
@@ -387,7 +385,9 @@ memory_pool_lock_shared (memory_pool_t * pool, void *pointer)
if (chain == NULL) {
return;
}
-
+ if (chain->lock == NULL) {
+ chain->lock = memory_pool_get_mutex (pool);
+ }
memory_pool_lock_mutex (chain->lock);
}
@@ -400,6 +400,10 @@ memory_pool_unlock_shared (memory_pool_t * pool, void *pointer)
if (chain == NULL) {
return;
}
+ if (chain->lock == NULL) {
+ chain->lock = memory_pool_get_mutex (pool);
+ return;
+ }
memory_pool_unlock_mutex (chain->lock);
}
diff --git a/src/statfile.c b/src/statfile.c
index 08b271065..662a70e74 100644
--- a/src/statfile.c
+++ b/src/statfile.c
@@ -130,7 +130,7 @@ statfile_pool_check (stat_file_t * file)
}
if (file->len < sizeof (struct stat_file)) {
- msg_info ("file %s is too short to be stat file: %zd", file->filename, file->len);
+ msg_info ("file %s is too short to be stat file: %z", file->filename, file->len);
return -1;
}
@@ -304,14 +304,14 @@ statfile_pool_open (statfile_pool_t * pool, char *filename, size_t size, gboolea
}
if (!forced && st.st_size > pool->max) {
- msg_info ("cannot attach file to pool, too large: %zd", (size_t) st.st_size);
+ msg_info ("cannot attach file to pool, too large: %z", (size_t) st.st_size);
return NULL;
}
memory_pool_lock_mutex (pool->lock);
if (!forced && abs (st.st_size - size) > sizeof (struct stat_file)) {
memory_pool_unlock_mutex (pool->lock);
- msg_warn ("need to reindex statfile old size: %zd, new size: %zd", st.st_size, size);
+ msg_warn ("need to reindex statfile old size: %z, new size: %z", st.st_size, size);
return statfile_pool_reindex (pool, filename, st.st_size, size);
}
memory_pool_unlock_mutex (pool->lock);
@@ -454,7 +454,7 @@ statfile_pool_create (statfile_pool_t * pool, char *filename, size_t size)
/* Buffer for write 256 blocks at once */
if (nblocks > 256) {
- buflen = MIN (nblocks / 256 * sizeof (block), sizeof (block) * 256);
+ buflen = sizeof (block) * 256;
buf = g_malloc0 (buflen);
}
diff --git a/src/worker.c b/src/worker.c
index 02077d5c9..11bb24867 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -36,6 +36,7 @@
#include "modules.h"
#include "message.h"
#include "map.h"
+#include "dns.h"
#include "evdns/evdns.h"
@@ -70,13 +71,19 @@ struct custom_filter {
#endif
-static struct timeval io_tv;
-/* Detect whether this worker is mime worker */
-static gboolean is_mime;
-
-/* Detect whether this worker bypass normal filters and is using custom filters */
-static gboolean is_custom;
-static GList *custom_filters;
+/*
+ * Worker's context
+ */
+struct rspamd_worker_ctx {
+ struct timeval io_tv;
+ /* Detect whether this worker is mime worker */
+ gboolean is_mime;
+ /* Detect whether this worker is mime worker */
+ gboolean is_custom;
+ GList *custom_filters;
+ /* DNS resolver */
+ struct rspamd_dns_resolver *resolver;
+};
static gboolean write_socket (void *arg);
@@ -150,8 +157,9 @@ fin_custom_filters (struct worker_task *task)
GList *cur, *curd;
struct custom_filter *filt;
char *output = NULL, *log = NULL;
+ struct rspamd_worker_ctx *ctx = task->worker->ctx;
- cur = custom_filters;
+ cur = ctx->custom_filters;
curd = task->rcpt;
while (cur) {
filt = cur->data;
@@ -183,8 +191,9 @@ parse_line_custom (struct worker_task *task, f_str_t *in)
struct custom_filter *filt;
char *output = NULL;
gboolean res = TRUE;
+ struct rspamd_worker_ctx *ctx = task->worker->ctx;
- cur = custom_filters;
+ cur = ctx->custom_filters;
curd = task->rcpt;
while (cur) {
filt = cur->data;
@@ -280,12 +289,14 @@ static gboolean
read_socket (f_str_t * in, void *arg)
{
struct worker_task *task = (struct worker_task *)arg;
+ struct rspamd_worker_ctx *ctx;
ssize_t r;
+ ctx = task->worker->ctx;
switch (task->state) {
case READ_COMMAND:
case READ_HEADER:
- if (is_custom) {
+ if (ctx->is_custom) {
if (! parse_line_custom (task, in)) {
task->last_error = "Read error";
task->error_code = RSPAMD_NETWORK_ERROR;
@@ -352,13 +363,16 @@ static gboolean
write_socket (void *arg)
{
struct worker_task *task = (struct worker_task *)arg;
+ struct rspamd_worker_ctx *ctx;
+
+ ctx = task->worker->ctx;
switch (task->state) {
case WRITE_REPLY:
if (! write_reply (task)) {
return FALSE;
}
- if (is_custom) {
+ if (ctx->is_custom) {
fin_custom_filters (task);
}
destroy_session (task->s);
@@ -368,7 +382,7 @@ write_socket (void *arg)
if (! write_reply (task)) {
return FALSE;
}
- if (is_custom) {
+ if (ctx->is_custom) {
fin_custom_filters (task);
}
destroy_session (task->s);
@@ -376,7 +390,7 @@ write_socket (void *arg)
break;
case CLOSING_CONNECTION:
debug_task ("normally closing connection");
- if (is_custom) {
+ if (ctx->is_custom) {
fin_custom_filters (task);
}
destroy_session (task->s);
@@ -384,7 +398,7 @@ write_socket (void *arg)
break;
default:
msg_info ("abnormally closing connection");
- if (is_custom) {
+ if (ctx->is_custom) {
fin_custom_filters (task);
}
destroy_session (task->s);
@@ -401,9 +415,12 @@ static void
err_socket (GError * err, void *arg)
{
struct worker_task *task = (struct worker_task *)arg;
+ struct rspamd_worker_ctx *ctx;
+
+ ctx = task->worker->ctx;
msg_info ("abnormally closing connection, error: %s", err->message);
/* Free buffers */
- if (is_custom) {
+ if (ctx->is_custom) {
fin_custom_filters (task);
}
destroy_session (task->s);
@@ -434,8 +451,7 @@ construct_task (struct rspamd_worker *worker)
if (gettimeofday (&new_task->tv, NULL) == -1) {
msg_warn ("gettimeofday failed: %s", strerror (errno));
}
- io_tv.tv_sec = WORKER_IO_TIMEOUT;
- io_tv.tv_usec = 0;
+
new_task->task_pool = memory_pool_new (memory_pool_get_size ());
/* Add destructor for recipients list (it would be better to use anonymous function here */
@@ -458,6 +474,7 @@ static void
accept_socket (int fd, short what, void *arg)
{
struct rspamd_worker *worker = (struct rspamd_worker *)arg;
+ struct rspamd_worker_ctx *ctx;
union sa_union su;
struct worker_task *new_task;
GList *cur;
@@ -466,6 +483,7 @@ accept_socket (int fd, short what, void *arg)
socklen_t addrlen = sizeof (su.ss);
int nfd;
+ ctx = worker->ctx;
if ((nfd = accept_from_socket (fd, (struct sockaddr *)&su.ss, &addrlen)) == -1) {
msg_warn ("accept failed: %s", strerror (errno));
return;
@@ -487,17 +505,20 @@ accept_socket (int fd, short what, void *arg)
}
new_task->sock = nfd;
- new_task->is_mime = is_mime;
+ new_task->is_mime = ctx->is_mime;
worker->srv->stat->connections_count++;
+ new_task->resolver = ctx->resolver;
+ ctx->io_tv.tv_sec = WORKER_IO_TIMEOUT;
+ ctx->io_tv.tv_usec = 0;
/* Set up dispatcher */
- new_task->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, read_socket, write_socket, err_socket, &io_tv, (void *)new_task);
+ new_task->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, read_socket, write_socket, err_socket, &ctx->io_tv, (void *)new_task);
new_task->dispatcher->peer_addr = new_task->client_addr.s_addr;
/* Init custom filters */
#ifndef BUILD_STATIC
- if (is_custom) {
- cur = custom_filters;
+ if (ctx->is_custom) {
+ cur = ctx->custom_filters;
while (cur) {
filt = cur->data;
if (filt->before_connect) {
@@ -515,7 +536,7 @@ accept_socket (int fd, short what, void *arg)
#ifndef BUILD_STATIC
static gboolean
-load_custom_filter (struct config_file *cfg, const char *file)
+load_custom_filter (struct config_file *cfg, const char *file, struct rspamd_worker_ctx *ctx)
{
struct custom_filter *filt;
struct stat st;
@@ -548,7 +569,7 @@ load_custom_filter (struct config_file *cfg, const char *file)
filt->init_func (cfg);
filt->filename = g_strdup (file);
- custom_filters = g_list_prepend (custom_filters, filt);
+ ctx->custom_filters = g_list_prepend (ctx->custom_filters, filt);
return TRUE;
}
@@ -561,6 +582,7 @@ load_custom_filters (struct rspamd_worker *worker, const char *path)
{
glob_t gp;
int r, i;
+ struct rspamd_worker_ctx *ctx = worker->ctx;
gp.gl_offs = 0;
if ((r = glob (path, GLOB_NOSORT, NULL, &gp)) != 0) {
@@ -569,7 +591,7 @@ load_custom_filters (struct rspamd_worker *worker, const char *path)
}
for (i = 0; i < gp.gl_pathc; i ++) {
- if (! load_custom_filter (worker->srv->cfg, gp.gl_pathv[i])) {
+ if (! load_custom_filter (worker->srv->cfg, gp.gl_pathv[i], ctx)) {
globfree (&gp);
return FALSE;
}
@@ -581,12 +603,12 @@ load_custom_filters (struct rspamd_worker *worker, const char *path)
}
static void
-unload_custom_filters (void)
+unload_custom_filters (struct rspamd_worker_ctx *ctx)
{
GList *cur;
struct custom_filter *filt;
- cur = custom_filters;
+ cur = ctx->custom_filters;
while (cur) {
filt = cur->data;
if (filt->fin_func) {
@@ -597,7 +619,7 @@ unload_custom_filters (void)
cur = g_list_next (cur);
}
- g_list_free (custom_filters);
+ g_list_free (ctx->custom_filters);
}
#endif
@@ -611,6 +633,7 @@ start_worker (struct rspamd_worker *worker)
struct sigaction signals;
char *is_mime_str;
char *is_custom_str;
+ struct rspamd_worker_ctx *ctx;
#ifdef WITH_PROFILER
extern void _start (void), etext (void);
@@ -635,12 +658,16 @@ start_worker (struct rspamd_worker *worker)
event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
event_add (&worker->bind_ev, NULL);
+ /* Fill ctx */
+ ctx = g_malloc0 (sizeof (struct rspamd_worker_ctx));
+ worker->ctx = ctx;
+
#ifndef BUILD_STATIC
/* Check if this worker is not usual rspamd worker, but uses custom filters from specified path */
is_custom_str = g_hash_table_lookup (worker->cf->params, "custom_filters");
if (is_custom_str && g_module_supported () && load_custom_filters (worker, is_custom_str)) {
msg_info ("starting custom process, loaded modules from %s", is_custom_str);
- is_custom = TRUE;
+ ctx->is_custom = TRUE;
}
else {
#endif
@@ -649,20 +676,22 @@ start_worker (struct rspamd_worker *worker)
/* Check whether we are mime worker */
is_mime_str = g_hash_table_lookup (worker->cf->params, "mime");
if (is_mime_str != NULL && (g_ascii_strcasecmp (is_mime_str, "no") == 0 || g_ascii_strcasecmp (is_mime_str, "false") == 0)) {
- is_mime = FALSE;
+ ctx->is_mime = FALSE;
}
else {
- is_mime = TRUE;
+ ctx->is_mime = TRUE;
}
#ifndef BUILD_STATIC
}
#endif
+ ctx->resolver = dns_resolver_init (worker->srv->cfg);
+
event_loop (0);
#ifndef BUILD_STATIC
- if (is_custom) {
- unload_custom_filters ();
+ if (ctx->is_custom) {
+ unload_custom_filters (ctx);
}
#endif