aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2008-06-09 14:20:34 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2008-06-09 14:20:34 +0400
commit2564777f536f6fde73ddaf24eaf4697775970e13 (patch)
tree86a4c3c87b3618f1e56d19f52948ff754f909704
parent8dbb9984edd945090c2c8ae27f4cfede944ba912 (diff)
downloadrspamd-2564777f536f6fde73ddaf24eaf4697775970e13.tar.gz
rspamd-2564777f536f6fde73ddaf24eaf4697775970e13.zip
* Add gnome mime parser to rspamd
* Add some protocol parser with evbuffer
* Add pkg-check to configure script for detecting gmime and glib
-rw-r--r--cfg_file.h1
-rwxr-xr-xconfigure62
-rw-r--r--fstring.h8
-rw-r--r--main.h17
-rw-r--r--mime.c116
-rw-r--r--mime.h95
-rw-r--r--worker.c255
7 files changed, 338 insertions, 216 deletions
diff --git a/cfg_file.h b/cfg_file.h
index 53e0a7677..37e0cd4d5 100644
--- a/cfg_file.h
+++ b/cfg_file.h
@@ -14,6 +14,7 @@
#endif
#include <netinet/in.h>
#include <sys/un.h>
+#include <event.h>
#include "upstream.h"
#include "memcached.h"
diff --git a/configure b/configure
index 672307866..74e00b7d2 100755
--- a/configure
+++ b/configure
@@ -6,6 +6,8 @@ LEX=""
YACC=""
OS=""
+PKG_CONFIG=`which pkg-config`
+
LOCALBASE=/usr/local
PREFIX=$LOCALBASE
@@ -17,15 +19,15 @@ LEX_SRC="cfg_file.l"
YACC_OUTPUT="cfg_yacc.c"
LEX_OUTPUT="cfg_lex.c"
-SOURCES="upstream.c cfg_utils.c memcached.c main.c util.c worker.c fstring.c mime.c ${LEX_OUTPUT} ${YACC_OUTPUT}"
+SOURCES="upstream.c cfg_utils.c memcached.c main.c util.c worker.c fstring.c ${LEX_OUTPUT} ${YACC_OUTPUT}"
-CFLAGS="$CFLAGS -W -Wall -Wpointer-arith -Wno-unused-parameter"
+CFLAGS="$CFLAGS -W -Wpointer-arith -Wno-unused-parameter"
CFLAGS="$CFLAGS -Wno-unused-function -Wunused-variable -Wno-sign-compare"
CFLAGS="$CFLAGS -Wunused-value -ggdb -I${LOCALBASE}/include"
-CFLAGS="$CFLAGS -DRVERSION=\\\"${VERSION}\\\" -DHASH_COMPAT"
+CFLAGS="$CFLAGS "
LDFLAGS="$LDFLAGS -L/usr/lib -L${LOCALBASE}/lib"
OPT_FLAGS="-O -pipe -fno-omit-frame-pointer"
-DEPS="cfg_file.h memcached.h util.h main.h upstream.h fstring.h mime.h ${LEX_OUTPUT} ${YACC_OUTPUT}"
+DEPS="cfg_file.h memcached.h util.h main.h upstream.h fstring.h ${LEX_OUTPUT} ${YACC_OUTPUT}"
EXEC=rspamd
USER=postfix
GROUP=postfix
@@ -251,6 +253,50 @@ check_lib()
fi
}
+# Check that a pkg-config package is installed and that a trivial program
+# including its headers compiles and links with the package's flags.
+# Usage: check_package <package> [header ...]
+# On success the package's flags are appended to LIBS and CFLAGS and 0 is
+# returned; on failure LIBS and CFLAGS are left untouched and 1 is returned.
+check_package()
+{
+	PACKAGE="$1"
+	# Collect the headers of this call only: start from an empty list so that
+	# headers left in INCLUDE by an earlier check are not tested again
+	INCLUDE=""
+	while [ $# -gt 1 ] ; do
+		shift
+		if [ "F$INCLUDE" = "F" ] ; then
+			INCLUDE="$1"
+		else
+			INCLUDE="$INCLUDE $1"
+		fi
+	done
+	echo -n "Testing for $PACKAGE: "
+	echo >> config.log
+	echo "Testing for $PACKAGE: " >> config.log
+	# Without pkg-config the flags cannot be queried at all
+	if [ "F$PKG_CONFIG" = "F" ] ; then
+		echo "not found"
+		echo "-> FAILED: pkg-config is not available" >> config.log
+		return 1
+	fi
+	PLIBS=`$PKG_CONFIG --libs "$PACKAGE" 2>>config.log`
+	PCFLAGS=`$PKG_CONFIG --cflags "$PACKAGE" 2>>config.log`
+	echo "#include <sys/types.h>" > autotest.c
+	if [ "F$INCLUDE" != "F" ] ; then
+		for inc in $INCLUDE ; do
+			echo "#include \"$inc\"" >> autotest.c
+		done
+	fi
+	echo "#include <stdlib.h>" >> autotest.c
+	echo "int main (int argc, char **argv) { return 0; }" >> autotest.c
+	echo "$GCC $CFLAGS $PCFLAGS $PTHREAD_CFLAGS -o autotest $LDFLAGS $LIBS $PLIBS $PTHREAD_LDFLAGS autotest.c" >>config.log
+	$GCC $CFLAGS $PCFLAGS $PTHREAD_CFLAGS -o autotest $LDFLAGS $LIBS $PLIBS $PTHREAD_LDFLAGS autotest.c >>config.log 2>&1
+	if [ $? -eq 0 ] ; then
+		echo "found"
+		LIBS="$LIBS $PLIBS"
+		CFLAGS="$CFLAGS $PCFLAGS"
+		cleanup
+		echo "-> OK" >> config.log
+		return 0
+	else
+		echo "not found"
+		echo "-> FAILED" >> config.log
+		echo "Failed program was:" >> config.log
+		cat autotest.c >> config.log
+		cleanup
+		return 1
+	fi
+
+}
+
+
check_os()
{
_OS=`uname -s`
@@ -304,6 +350,11 @@ write_result()
echo "Ldflags: $LDFLAGS" >> config.log
echo "Libs: $LIBS" >> config.log
OBJECTS=`echo $SOURCES | sed -e 's/\.c/\.o/g'`
+ # Make CFLAGS more readable: every flag list is folded to one flag per line
+ # (each followed by a backslash), reverse-sorted and de-duplicated
+ CFLAGS="$CFLAGS -DRVERSION=\\\"${VERSION}\\\" -DHASH_COMPAT"
+ CFLAGS=`echo $CFLAGS | sed -e 's/[ ]/ \\\\\\n/g' | sort -r -k2 | uniq`
+ LDFLAGS=`echo $LDFLAGS | sed -e 's/[ ]/ \\\\\\n/g' | sort -r -k2 | uniq`
+ LIBS=`echo $LIBS | sed -e 's/[ ]/ \\\\\\n/g' | sort -r -k2 | uniq`
cat > $MAKEFILE << END
# This is ${EXEC} Makefile
# For options edit Makefile.in, this file is autogenerated by configure
@@ -505,6 +556,9 @@ else
CFLAGS="$CFLAGS -DHAVE_PATH_MAX"
fi
+check_package "glib-2.0" "glib.h"
+check_package "gmime-2.0" "gmime/gmime.h"
+
check_group $GROUP
if [ $? -ne 0 ] ; then
TARGETS="$TARGETS creategroup"
diff --git a/fstring.h b/fstring.h
index 4c831f052..c13860d3e 100644
--- a/fstring.h
+++ b/fstring.h
@@ -7,12 +7,20 @@
#include <sys/types.h>
+#define update_buf_size(x) (x)->free = (x)->buf->size - ((x)->pos - (x)->buf->begin); (x)->buf->len = (x)->pos - (x)->buf->begin
+
typedef struct f_str_s {
char *begin;
size_t len;
size_t size;
} f_str_t;
+typedef struct f_str_buf_s { /* an f_str_t that is filled incrementally, plus its write cursor */
+ f_str_t *buf; /* string whose storage is written into */
+ char *pos; /* next write position inside the storage of buf */
+ size_t free; /* bytes left between pos and the end of the storage, as computed by update_buf_size */
+} f_str_buf_t;
+
typedef struct f_tok_s {
f_str_t word;
size_t pos;
diff --git a/main.h b/main.h
index fa5685526..4c4a8aaac 100644
--- a/main.h
+++ b/main.h
@@ -15,6 +15,9 @@
#include <arpa/inet.h>
#include <signal.h>
+#include <event.h>
+
+#include "fstring.h"
/* Default values */
#define FIXED_CONFIG_FILE "./rspamd.conf"
@@ -41,6 +44,8 @@ struct rspamd_worker {
TAILQ_ENTRY (rspamd_worker) next;
struct rspamd_main *srv;
enum process_type type;
+ struct event sig_ev;
+ struct event bind_ev;
};
struct pidfh;
@@ -59,7 +64,17 @@ struct rspamd_main {
};
struct worker_task {
- int id;
+ struct rspamd_worker *worker; /* worker that accepted this connection */
+ enum { /* protocol state of the connection, advanced by read_socket */
+ READ_COMMAND, /* initial state: the command line is expected */
+ READ_HEADER, /* header lines are read until an empty line */
+ READ_MESSAGE, /* the message body is read into msg */
+ WRITE_REPLY, /* the "Ok" reply is written */
+ WRITE_ERROR, /* the "Error" reply is written */
+ } state;
+ size_t content_length; /* body length taken from the Content-Length header, 0 until one is seen */
+ f_str_buf_t *msg; /* buffer the message body is read into, allocated by read_socket */
+ struct bufferevent *bev; /* buffered I/O event of the accepted socket */
};
void start_worker (struct rspamd_worker *worker, int listen_sock);
diff --git a/mime.c b/mime.c
deleted file mode 100644
index 14b23247f..000000000
--- a/mime.c
+++ /dev/null
@@ -1,116 +0,0 @@
-#include <stdlib.h>
-#include <ctype.h>
-
-#include "mime.h"
-#include "fstring.h"
-
-/*
- * Quoted printable and base64 decoders for mime parser
- */
-
-static f_str_t *
-base64decode (f_str_t *src)
-{
- int bits = 0, buf = 0, padding = 0, v;
- size_t pos;
- char c;
- f_str_t *res;
-
- res = fstralloc (src->len);
- if (res == NULL) {
- return NULL;
- }
-
- for (pos = 0; pos < src->len; pos ++) {
- c = *(src->begin + pos);
- if (c >= 'A' && c <= 'Z') {
- v = c - 'A';
- }
- else if (c >= 'a' && c <= 'z') {
- v = c - 'a' + 26;
- }
- else if (c >= '0' && c <= '9') {
- v = c - '0' + 52;
- }
- else if ('+' == c) {
- v = 62;
- }
- else if ('/' == c) {
- v = 63;
- }
- else if ('=' == c) {
- padding++;
- continue;
- }
- else {
- continue;
- }
- if (padding) {
- padding = 0;
- }
- buf = buf << 6 | v;
- bits += 6;
- if (bits >= 8) {
- c = 255 & (buf >> (bits - 8));
- fstrpush (res, c);
- }
- }
-
- return res;
-}
-
-static f_str_t *
-qpdecode (f_str_t *src, short header)
-{
- f_str_t *res;
- size_t pos;
- char c;
-
- res = fstralloc (src->len);
- if (res == NULL) {
- return NULL;
- }
-
- for (pos = 0; pos < src->len; pos++) {
- c = *(src->begin + pos);
- if (header && '_' == c) {
- c = 0x20;
- }
- else if ('=' == c && pos + 3 <= src->len && isxdigit (fstridx (src, pos + 1)) && isxdigit (fstridx (src, pos + 2))) {
- if (isdigit (fstridx (src, pos + 2))) {
- if (isdigit (fstridx (src, pos + 1))) {
- c = (toupper (fstridx (src, pos + 2)) - '0') | (16 * (fstridx (src, pos + 1) - '0'));
- }
- else {
- c = (toupper (fstridx (src, pos + 2)) - '0') | (16 * (toupper (fstridx (src, pos + 1)) - 'A' + 10));
- }
- }
- else if (isdigit (fstridx (src, pos + 1))) {
- c = (toupper (fstridx (src, pos + 2)) - 'A' + 10) | (16 * (fstridx (src, pos + 1) - '0'));
- }
- else {
- c = (toupper (fstridx (src, pos + 2)) - 'A' + 10) | (16 * (toupper (fstridx (src, pos + 1)) - 'A' + 10));
- }
- pos += 2;
- }
- else if ('=' == c && pos + 2 <= src->len && ('\r' == fstridx (src, pos + 1) || '\n' == fstridx (src, pos + 1))) {
- if ('\r' == fstridx (src, pos + 1)) {
- if (pos + 3 <= src->len && '\n' == fstridx (src, pos + 2)) {
- pos ++;
- }
- pos ++;
- }
- if ('\n' == fstridx (src, pos + 1)) {
- if (pos + 3 <= src->len && '\r' == fstridx (src, pos + 2)) {
- pos ++;
- }
- pos ++;
- }
- continue;
- }
- fstrpush (res, c);
- }
-
- return res;
-}
-
diff --git a/mime.h b/mime.h
deleted file mode 100644
index cf56b780d..000000000
--- a/mime.h
+++ /dev/null
@@ -1,95 +0,0 @@
-#ifndef MIME_H
-#define MIME_H
-
-#include "fstring.h"
-#ifndef OWN_QUEUE_H
-#include <sys/queue.h>
-#else
-#include "queue.h"
-#endif
-
-/*
- * Header types. If we reach 31, we must group the headers we need to
- * remember at the beginning, or we should use fd_set bit sets.
- */
-#define HDR_APPARENTLY_TO 1
-#define HDR_BCC 2
-#define HDR_CC 3
-#define HDR_CONTENT_LENGTH 4
-#define HDR_CONTENT_TRANSFER_ENCODING 5
-#define HDR_CONTENT_TYPE 6
-#define HDR_DATE 7
-#define HDR_DELIVERED_TO 8
-#define HDR_ERRORS_TO 9
-#define HDR_FROM 10
-#define HDR_MESSAGE_ID 11
-#define HDR_RECEIVED 12
-#define HDR_REPLY_TO 13
-#define HDR_RESENT_BCC 14
-#define HDR_RESENT_CC 15
-#define HDR_RESENT_DATE 16
-#define HDR_RESENT_FROM 17
-#define HDR_RESENT_MESSAGE_ID 18
-#define HDR_RESENT_REPLY_TO 19
-#define HDR_RESENT_SENDER 20
-#define HDR_RESENT_TO 21
-#define HDR_RETURN_PATH 22
-#define HDR_RETURN_RECEIPT_TO 23
-#define HDR_SENDER 24
-#define HDR_TO 25
-#define HDR_MAIL_FOLLOWUP_TO 26
-#define HDR_CONTENT_DESCRIPTION 27
-#define HDR_CONTENT_DISPOSITION 28
-#define HDR_CONTENT_ID 29
-#define HDR_MIME_VERSION 30
-#define HDR_DISP_NOTIFICATION 31
-
-#define URL_A 1
-#define URL_IMG 2
-
-/*
- * Headers:
- * name - header name
- * value - decoded, translated to utf8 and normalized version
- * type - type of header in case of known headers
- */
-typedef struct mime_header_s {
- f_str_t *name;
- f_str_t *value;
- int type;
- LIST_ENTRY (mime_header_s) next;
-} mime_header_t;
-
-/*
- * Body part:
- * data - content of this part, translated to utf, decoded, normalized and deHTMLed
- * type - content-type of this part
- * encoding - original encoding of body part
- */
-typedef struct mime_body_part_s {
- f_str_t *data;
- f_str_t *type;
- f_str_t *encoding;
- LIST_ENTRY (mime_body_part_s) next;
-} mime_body_part_t;
-
-/*
- * Image and A urls:
- * url - normalized and decoded url
- * caption - decoded caption for this url (if any)
- * type - image or a references
- */
-typedef struct mime_url_s {
- f_str_t *url;
- f_str_t *caption;
- int type;
-} mime_url_t;
-
-typedef struct mime_ctx_s {
- LIST_HEAD (headersl, mime_header_s) headers;
- LIST_HEAD (bodypartsl, mime_body_part_s) parts;
- f_str_t *cur_content_type;
- f_str_t *cur_encoding;
-} mime_ctx_t;
-
-#endif
diff --git a/worker.c b/worker.c
index e5e7c341d..1b0682fb0 100644
--- a/worker.c
+++ b/worker.c
@@ -15,11 +15,22 @@
#include <fcntl.h>
#include <netdb.h>
+#include <glib.h>
+#include <gmime/gmime.h>
+
#include "util.h"
#include "main.h"
#include "upstream.h"
#include "cfg_file.h"
+#define CONTENT_LENGTH_HEADER "Content-Length:" /* header name, matched case-insensitively by read_socket */
+
+const f_str_t CRLF = { /* "\r\n" as an f_str_t; begin points at a string literal */
+ /* begin */"\r\n",
+ /* len */2,
+ /* size */2
+};
+
static
void sig_handler (int signo)
{
@@ -31,6 +42,239 @@ void sig_handler (int signo)
}
}
+/*
+ * Signal event callback that starts a soft shutdown of the worker.
+ * The signal event and the accept event are both removed, so no new
+ * connections are taken, and the event loop is asked to exit after
+ * SOFT_SHUTDOWN_TIME seconds.
+ * arg is the struct rspamd_worker owning the events; fd and what are unused.
+ */
+static void
+sigusr_handler (int fd, short what, void *arg)
+{
+	struct rspamd_worker *self = arg;
+	struct timeval grace;
+
+	/* Do not accept new connections, preparing to end worker's process */
+	event_del (&self->sig_ev);
+	event_del (&self->bind_ev);
+	msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
+
+	grace.tv_usec = 0;
+	grace.tv_sec = SOFT_SHUTDOWN_TIME;
+	event_loopexit (&grace);
+}
+
+/*
+ * Callback for g_mime_message_foreach_part() that counts MIME parts.
+ * part is the node currently visited; user_data points to the int counter,
+ * which is incremented once for every visited part, including the parts of
+ * embedded messages.
+ */
+static void
+mime_foreach_callback (GMimeObject *part, gpointer user_data)
+{
+	int *counter = user_data;
+	GMimeMessage *embedded;
+
+	*counter += 1;
+
+	if (GMIME_IS_MESSAGE_PART (part)) {
+		/* message/rfc822 or message/news: g_mime_message_foreach_part()
+		   won't descend into child message parts, so the subparts of the
+		   embedded message are walked explicitly to be counted too */
+		embedded = g_mime_message_part_get_message ((GMimeMessagePart *) part);
+		g_mime_message_foreach_part (embedded, mime_foreach_callback, counter);
+		g_object_unref (embedded);
+		return;
+	}
+
+	if (GMIME_IS_MESSAGE_PARTIAL (part)) {
+		/* message/partial: one fragment of a message that the sender split
+		   into several pieces; nothing is kept for reassembly yet */
+		return;
+	}
+
+	if (GMIME_IS_MULTIPART (part)) {
+		/* multipart/mixed, multipart/alternative, multipart/related,
+		   multipart/signed, multipart/encrypted, etc... nothing to do yet */
+		return;
+	}
+
+	if (GMIME_IS_PART (part)) {
+		/* a normal leaf part, could be text/plain or image/jpeg etc */
+		return;
+	}
+
+	g_assert_not_reached ();
+}
+
+
+
+/*
+ * Parse a complete message with GMime and log how many MIME parts it has.
+ * msg holds the raw message: msg->len bytes starting at msg->begin.
+ * Every GMime object created here is released before returning.
+ */
+static void
+process_message (f_str_t *msg)
+{
+	int count = 0;
+	GMimeMessage *message;
+	GMimeParser *parser;
+	GMimeStream *stream;
+
+	stream = g_mime_stream_mem_new_with_buffer (msg->begin, msg->len);
+	/* create a new parser object to parse the stream; it is created exactly
+	   once, a second parser would overwrite the pointer and never be freed */
+	parser = g_mime_parser_new_with_stream (stream);
+
+	/* unref the stream (parser owns a ref, so this object does not actually get free'd until we destroy the parser) */
+	g_object_unref (stream);
+
+	/* parse the message from the stream */
+	message = g_mime_parser_construct_message (parser);
+
+	/* free the parser (and the stream) */
+	g_object_unref (parser);
+
+	if (message == NULL) {
+		msg_err ("process_message: cannot construct message from received data");
+		return;
+	}
+
+	g_mime_message_foreach_part (message, mime_foreach_callback, &count);
+
+	msg_info ("process_message: found %d parts in message", count);
+
+	/* the message is not used after counting, so drop our reference */
+	g_object_unref (message);
+}
+
+
+/*
+ * Release everything that belongs to a connection: the message buffer (when
+ * one was allocated), the bufferevent, the socket behind it and the task.
+ * The task must not be used after this call.
+ */
+static void
+free_task (struct worker_task *task)
+{
+	/* remember the descriptor before the bufferevent goes away; it is closed
+	   explicitly here so that the connection does not stay open */
+	int fd = EVENT_FD (&task->bev->ev_read);
+
+	if (task->msg != NULL) {
+		fstrfree (task->msg->buf);
+		free (task->msg);
+	}
+	bufferevent_disable (task->bev, EV_READ | EV_WRITE);
+	bufferevent_free (task->bev);
+	close (fd);
+	free (task);
+}
+
+/*
+ * Parse the numeric value of a Content-Length header line.
+ * line is a complete header line that starts with CONTENT_LENGTH_HEADER.
+ * Returns the length, or 0 when the value is missing, is not a number or does
+ * not fit into size_t; read_socket answers a zero length with an error.
+ */
+static size_t
+parse_content_length (const char *line)
+{
+	/* sizeof counts the terminating zero, so step exactly behind the colon */
+	const char *p = line + sizeof (CONTENT_LENGTH_HEADER) - 1;
+	size_t len = 0, digit;
+
+	/* optional blanks between the colon and the number */
+	while (*p == ' ' || *p == '\t') {
+		p++;
+	}
+	while (*p >= '0' && *p <= '9') {
+		digit = (size_t)(*p - '0');
+		if (len > ((size_t)-1 - digit) / 10) {
+			/* value does not fit into size_t */
+			return 0;
+		}
+		len = len * 10 + digit;
+		p++;
+	}
+
+	return len;
+}
+
+/*
+ * Allocate the buffer that receives task->content_length bytes of message.
+ * Returns 0 on success and -1 when memory cannot be allocated; task->msg is
+ * only set on success.
+ */
+static int
+alloc_message (struct worker_task *task)
+{
+	f_str_buf_t *mb;
+
+	mb = malloc (sizeof (f_str_buf_t));
+	if (mb == NULL) {
+		msg_err ("read_socket: cannot allocate memory");
+		return -1;
+	}
+	mb->buf = fstralloc (task->content_length);
+	if (mb->buf == NULL) {
+		msg_err ("read_socket: cannot allocate memory for message buffer");
+		free (mb);
+		return -1;
+	}
+	mb->pos = mb->buf->begin;
+	update_buf_size (mb);
+	task->msg = mb;
+
+	return 0;
+}
+
+/*
+ * Read callback of a connection: drives the protocol state machine.
+ * bev is the bufferevent of the connection, arg is its struct worker_task.
+ * The callback is invoked when new data arrives, so everything that is
+ * already buffered is consumed in one call: the loop runs until more input
+ * is needed, a reply was queued or the connection was torn down.
+ * When the task is freed here the function returns immediately.
+ */
+static void
+read_socket (struct bufferevent *bev, void *arg)
+{
+	struct worker_task *task = (struct worker_task *)arg;
+	size_t r;
+	char *s;
+
+	for (;;) {
+		switch (task->state) {
+		case READ_COMMAND:
+			s = evbuffer_readline (EVBUFFER_INPUT (bev));
+			if (s == NULL) {
+				/* no complete line yet */
+				return;
+			}
+			msg_info ("read_socket: got command %s", s);
+			free (s);
+			task->state = READ_HEADER;
+			break;
+		case READ_HEADER:
+			s = evbuffer_readline (EVBUFFER_INPUT (bev));
+			if (s == NULL) {
+				/* no complete line yet */
+				return;
+			}
+			msg_info ("read_socket: got header %s", s);
+			if (strncasecmp (s, CONTENT_LENGTH_HEADER, sizeof (CONTENT_LENGTH_HEADER) - 1) == 0) {
+				task->content_length = parse_content_length (s);
+				msg_info ("read_socket: parsed content-length: %ld", (long int)task->content_length);
+			}
+			else if (*s == '\0') {
+				/* empty line: end of headers. The buffer is allocated only
+				   here, so a repeated Content-Length header cannot leak one */
+				if (task->content_length == 0) {
+					task->state = WRITE_ERROR;
+				}
+				else if (alloc_message (task) == -1) {
+					free (s);
+					free_task (task);
+					return;
+				}
+				else {
+					task->state = READ_MESSAGE;
+				}
+			}
+			free (s);
+			break;
+		case READ_MESSAGE:
+			r = bufferevent_read (bev, task->msg->pos, task->msg->free);
+			if (r == 0) {
+				/* the rest of the message has not arrived yet */
+				return;
+			}
+			task->msg->pos += r;
+			update_buf_size (task->msg);
+			if (task->msg->free == 0) {
+				process_message (task->msg->buf);
+				task->state = WRITE_REPLY;
+			}
+			break;
+		case WRITE_REPLY:
+			if (bufferevent_write (bev, "Ok\r\n", sizeof ("Ok\r\n") - 1) == -1) {
+				msg_err ("read_socket: cannot write reply");
+				free_task (task);
+				return;
+			}
+			/* write_socket closes the connection once the reply is sent */
+			bufferevent_disable (bev, EV_READ);
+			bufferevent_enable (bev, EV_WRITE);
+			return;
+		case WRITE_ERROR:
+			if (bufferevent_write (bev, "Error\r\n", sizeof ("Error\r\n") - 1) == -1) {
+				msg_err ("read_socket: cannot write reply");
+				free_task (task);
+				return;
+			}
+			/* write_socket closes the connection once the reply is sent */
+			bufferevent_disable (bev, EV_READ);
+			bufferevent_enable (bev, EV_WRITE);
+			return;
+		default:
+			return;
+		}
+	}
+}
+
+/*
+ * Write callback of a connection.
+ * Once a reply has been queued (state is WRITE_REPLY or WRITE_ERROR) the
+ * connection is finished and all of its resources are released.
+ */
+static void
+write_socket (struct bufferevent *bev, void *arg)
+{
+	struct worker_task *task = (struct worker_task *)arg;
+
+	if (task->state > READ_MESSAGE) {
+		msg_info ("closing connection");
+		free_task (task);
+	}
+}
+
+/*
+ * Error callback of a connection: releases the connection in whatever state
+ * it is, including the message buffer when one was already allocated.
+ */
+static void
+err_socket (struct bufferevent *bev, short what, void *arg)
+{
+	struct worker_task *task = (struct worker_task *)arg;
+
+	msg_info ("closing connection");
+	free_task (task);
+}
+
+/*
+ * Accept callback of the listening socket.
+ * fd is the listening socket and arg the struct rspamd_worker serving it.
+ * Accepts one connection, makes the new socket non-blocking and creates the
+ * task and bufferevent that handle it. On any failure the accepted socket is
+ * closed again and nothing is left allocated.
+ */
+static void
+accept_socket (int fd, short what, void *arg)
+{
+	struct rspamd_worker *worker = (struct rspamd_worker *)arg;
+	struct sockaddr_storage ss;
+	struct worker_task *new_task;
+	socklen_t addrlen = sizeof(ss);
+	int nfd;
+
+	if ((nfd = accept (fd, (struct sockaddr *)&ss, &addrlen)) == -1) {
+		return;
+	}
+	/* the accepted socket, not the listening one, must be non-blocking */
+	if (event_make_socket_nonblocking(nfd) < 0) {
+		close (nfd);
+		return;
+	}
+
+	new_task = malloc (sizeof (struct worker_task));
+	if (new_task == NULL) {
+		msg_err ("accept_socket: cannot allocate memory for task");
+		close (nfd);
+		return;
+	}
+	new_task->worker = worker;
+	new_task->state = READ_COMMAND;
+	new_task->content_length = 0;
+	/* no message buffer yet; free_task relies on this to know what to free */
+	new_task->msg = NULL;
+
+	/* Read event */
+	new_task->bev = bufferevent_new (nfd, read_socket, write_socket, err_socket, (void *)new_task);
+	if (new_task->bev == NULL) {
+		msg_err ("accept_socket: cannot create buffer event");
+		free (new_task);
+		close (nfd);
+		return;
+	}
+	bufferevent_enable (new_task->bev, EV_READ);
+}
+
+
void
start_worker (struct rspamd_worker *worker, int listen_sock)
{
@@ -38,13 +282,24 @@ start_worker (struct rspamd_worker *worker, int listen_sock)
struct config_file *cfg = worker->srv->cfg;
worker->srv->pid = getpid ();
worker->srv->type = TYPE_WORKER;
+ event_init ();
+ g_mime_init (0);
init_signals (&signals, sig_handler);
sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
+ /* SIGUSR2 handler */
+ signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *) worker);
+ signal_add (&worker->sig_ev, NULL);
+
+ /* Accept event */
+ event_set(&worker->bind_ev, listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
+ event_add(&worker->bind_ev, NULL);
+
/* Send SIGUSR2 to parent */
kill (getppid (), SIGUSR2);
+ event_loop (0);
}
/*