aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2008-10-23 19:10:40 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2008-10-23 19:10:40 +0400
commite5f01249a3498ab47c7c7852f83564d466629a8b (patch)
tree03f4a6f4a1dc52ca29e8a9e3aff6f9048c3cf9a3
parent9d6f80f8a3fbdc7d8079f6a60d936532098e27a4 (diff)
downloadrspamd-e5f01249a3498ab47c7c7852f83564d466629a8b.tar.gz
rspamd-e5f01249a3498ab47c7c7852f83564d466629a8b.zip
* Add initial implementation of control interface
-rw-r--r--cfg_file.h12
-rw-r--r--cfg_file.l2
-rw-r--r--cfg_file.y35
-rw-r--r--cfg_utils.c68
-rwxr-xr-xconfigure4
-rw-r--r--controller.c351
-rw-r--r--main.c17
-rw-r--r--main.h14
-rw-r--r--worker.c1
9 files changed, 478 insertions, 26 deletions
diff --git a/cfg_file.h b/cfg_file.h
index e67479318..d664a15f0 100644
--- a/cfg_file.h
+++ b/cfg_file.h
@@ -23,6 +23,7 @@
#include "filter.h"
#define DEFAULT_BIND_PORT 768
+#define DEFAULT_CONTROL_PORT 7608
#define MAX_MEMCACHED_SERVERS 48
#define DEFAULT_MEMCACHED_PORT 11211
/* Memcached timeouts */
@@ -96,7 +97,14 @@ struct config_file {
uint16_t bind_port;
uint16_t bind_family;
- char no_fork;
+ char *control_host;
+ struct in_addr control_addr;
+ uint16_t control_port;
+ uint16_t control_family;
+ int controller_enabled;
+ char *control_password;
+
+ int no_fork;
unsigned int workers_number;
struct memcached_server memcached_servers[MAX_MEMCACHED_SERVERS];
@@ -125,7 +133,7 @@ struct config_file {
};
int add_memcached_server (struct config_file *cf, char *str);
-int parse_bind_line (struct config_file *cf, char *str);
+int parse_bind_line (struct config_file *cf, char *str, char is_control);
void init_defaults (struct config_file *cfg);
void free_config (struct config_file *cfg);
char* get_module_opt (struct config_file *cfg, char *module_name, char *opt_name);
diff --git a/cfg_file.l b/cfg_file.l
index 198f65792..e52f2585b 100644
--- a/cfg_file.l
+++ b/cfg_file.l
@@ -47,6 +47,8 @@ metric return METRIC;
name return NAME;
required_score return REQUIRED_SCORE;
function return FUNCTION;
+control return CONTROL;
+password return PASSWORD;
\{ return OBRACE;
\} return EBRACE;
diff --git a/cfg_file.y b/cfg_file.y
index c5badb437..1ccb928b7 100644
--- a/cfg_file.y
+++ b/cfg_file.y
@@ -49,7 +49,7 @@ struct metric *cur_metric = NULL;
%token MEMCACHED WORKERS REQUIRE MODULE
%token MODULE_OPT PARAM VARIABLE
%token HEADER_FILTERS MIME_FILTERS MESSAGE_FILTERS URL_FILTERS FACTORS METRIC NAME
-%token REQUIRED_SCORE FUNCTION FRACT COMPOSITES
+%token REQUIRED_SCORE FUNCTION FRACT COMPOSITES CONTROL PASSWORD
%type <string> STRING
%type <string> VARIABLE
@@ -73,6 +73,7 @@ file : /* empty */
command :
bindsock
+ | control
| tempdir
| pidfile
| memcached
@@ -112,9 +113,39 @@ pidfile :
}
;
+control:
+ CONTROL OBRACE controlbody EBRACE
+ ;
+
+controlbody:
+ controlcmd SEMICOLON
+ | controlbody controlcmd SEMICOLON
+ ;
+
+controlcmd:
+ controlsock
+ | controlpassword
+ ;
+
+controlsock:
+ BINDSOCK EQSIGN bind_cred {
+ if (!parse_bind_line (cfg, $3, 1)) {
+ yyerror ("yyparse: parse_bind_line");
+ YYERROR;
+ }
+ cfg->controller_enabled = 1;
+ free ($3);
+ }
+ ;
+controlpassword:
+ PASSWORD EQSIGN QUOTEDSTRING {
+ cfg->control_password = memory_pool_strdup (cfg->cfg_pool, $3);
+ }
+ ;
+
bindsock:
BINDSOCK EQSIGN bind_cred {
- if (!parse_bind_line (cfg, $3)) {
+ if (!parse_bind_line (cfg, $3, 0)) {
yyerror ("yyparse: parse_bind_line");
YYERROR;
}
diff --git a/cfg_utils.c b/cfg_utils.c
index 430c572e7..45442fe62 100644
--- a/cfg_utils.c
+++ b/cfg_utils.c
@@ -74,7 +74,7 @@ add_memcached_server (struct config_file *cf, char *str)
}
int
-parse_bind_line (struct config_file *cf, char *str)
+parse_bind_line (struct config_file *cf, char *str, char is_control)
{
char *cur_tok, *err_str;
struct hostent *hent;
@@ -84,35 +84,69 @@ parse_bind_line (struct config_file *cf, char *str)
cur_tok = strsep (&str, ":");
if (cur_tok[0] == '/' || cur_tok[0] == '.') {
- cf->bind_host = strdup (cur_tok);
- cf->bind_family = AF_UNIX;
+ if (is_control) {
+ cf->control_host = memory_pool_strdup (cf->cfg_pool, cur_tok);
+ cf->control_family = AF_UNIX;
+ }
+ else {
+ cf->bind_host = memory_pool_strdup (cf->cfg_pool, cur_tok);
+ cf->bind_family = AF_UNIX;
+ }
return 1;
} else {
if (str == '\0') {
- cf->bind_port = DEFAULT_BIND_PORT;
+ if (is_control) {
+ cf->control_port = DEFAULT_CONTROL_PORT;
+ }
+ else {
+ cf->bind_port = DEFAULT_BIND_PORT;
+ }
}
else {
- cf->bind_port = (uint16_t)strtoul (str, &err_str, 10);
+ if (is_control) {
+ cf->control_port = (uint16_t)strtoul (str, &err_str, 10);
+ }
+ else {
+ cf->bind_port = (uint16_t)strtoul (str, &err_str, 10);
+ }
if (*err_str != '\0') {
return 0;
}
}
-
- if (!inet_aton (cur_tok, &cf->bind_addr)) {
- /* Try to call gethostbyname */
- hent = gethostbyname (cur_tok);
- if (hent == NULL) {
- return 0;
- }
- else {
- cf->bind_host = strdup (cur_tok);
- memcpy((char *)&cf->bind_addr, hent->h_addr, sizeof(struct in_addr));
- s = strlen (cur_tok) + 1;
+
+ if (is_control) {
+ if (!inet_aton (cur_tok, &cf->control_addr)) {
+ /* Try to call gethostbyname */
+ hent = gethostbyname (cur_tok);
+ if (hent == NULL) {
+ return 0;
+ }
+ else {
+ cf->bind_host = memory_pool_strdup (cf->cfg_pool, cur_tok);
+ memcpy((char *)&cf->control_addr, hent->h_addr, sizeof(struct in_addr));
+ s = strlen (cur_tok) + 1;
+ }
}
+
+ cf->control_family = AF_INET;
}
+ else {
+ if (!inet_aton (cur_tok, &cf->bind_addr)) {
+ /* Try to call gethostbyname */
+ hent = gethostbyname (cur_tok);
+ if (hent == NULL) {
+ return 0;
+ }
+ else {
+ cf->bind_host = memory_pool_strdup (cf->cfg_pool, cur_tok);
+ memcpy((char *)&cf->bind_addr, hent->h_addr, sizeof(struct in_addr));
+ s = strlen (cur_tok) + 1;
+ }
+ }
- cf->bind_family = AF_INET;
+ cf->bind_family = AF_INET;
+ }
return 1;
}
diff --git a/configure b/configure
index f255de36d..bdaffc1c5 100755
--- a/configure
+++ b/configure
@@ -21,7 +21,7 @@ YACC_OUTPUT="cfg_yacc.c"
LEX_OUTPUT="cfg_lex.c"
CONFIG="config.h"
-SOURCES="upstream.c cfg_utils.c memcached.c main.c util.c worker.c fstring.c url.c perl.c protocol.c mem_pool.c filter.c plugins/regexp.c plugins/surbl.c ${LEX_OUTPUT} ${YACC_OUTPUT}"
+SOURCES="upstream.c cfg_utils.c memcached.c main.c util.c controller.c worker.c fstring.c url.c perl.c protocol.c mem_pool.c filter.c plugins/regexp.c plugins/surbl.c ${LEX_OUTPUT} ${YACC_OUTPUT}"
MODULES="surbl regexp"
CFLAGS="$CFLAGS -W -Wpointer-arith -Wno-unused-parameter"
@@ -576,7 +576,7 @@ SOURCES=$SOURCES
# ${EXEC} objects
OBJECTS=$OBJECTS
# Version of product
-VERION=$VERSION
+VERSION=$VERSION
# Detected operation system
OS=$OS
# Lex and yacc executables
diff --git a/controller.c b/controller.c
new file mode 100644
index 000000000..0af9d1d2e
--- /dev/null
+++ b/controller.c
@@ -0,0 +1,351 @@
+#include <sys/stat.h>
+#include <sys/param.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#include <errno.h>
+#include <signal.h>
+
+#include <netinet/in.h>
+#include <syslog.h>
+#include <fcntl.h>
+#include <netdb.h>
+
+#include <glib.h>
+
+#include "util.h"
+#include "main.h"
+#include "protocol.h"
+#include "upstream.h"
+#include "cfg_file.h"
+#include "modules.h"
+
+#define CRLF "\r\n"
+
+enum command_type {
+ COMMAND_PASSWORD,
+ COMMAND_QUIT,
+ COMMAND_RELOAD,
+ COMMAND_STAT,
+ COMMAND_SHUTDOWN,
+ COMMAND_UPTIME,
+};
+
+struct controller_command {
+ char *command;
+ int privilleged;
+ enum command_type type;
+};
+
+static struct controller_command commands[] = {
+ {"password", 0, COMMAND_PASSWORD},
+ {"quit", 0, COMMAND_QUIT},
+ {"reload", 1, COMMAND_RELOAD},
+ {"stat", 0, COMMAND_STAT},
+ {"shutdown", 1, COMMAND_SHUTDOWN},
+ {"uptime", 0, COMMAND_UPTIME},
+};
+
+static GCompletion *comp;
+static time_t start_time;
+
+static
+void sig_handler (int signo)
+{
+ switch (signo) {
+ case SIGINT:
+ case SIGTERM:
+ _exit (1);
+ break;
+ }
+}
+
+static void
+sigusr_handler (int fd, short what, void *arg)
+{
+ struct rspamd_worker *worker = (struct rspamd_worker *)arg;
+ /* Do not accept new connections, preparing to end worker's process */
+ struct timeval tv;
+ tv.tv_sec = SOFT_SHUTDOWN_TIME;
+ tv.tv_usec = 0;
+ event_del (&worker->sig_ev);
+ event_del (&worker->bind_ev);
+ msg_info ("controller's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
+ event_loopexit (&tv);
+ return;
+}
+
+static gchar*
+completion_func (gpointer elem)
+{
+ struct controller_command *cmd = (struct controller_command *)elem;
+
+ return cmd->command;
+}
+
+static void
+free_session (struct controller_session *session)
+{
+ bufferevent_disable (session->bev, EV_READ | EV_WRITE);
+ memory_pool_delete (session->session_pool);
+ g_free (session);
+}
+
+static int
+check_auth (struct controller_command *cmd, struct controller_session *session)
+{
+ char out_buf[128];
+ int r;
+
+ if (cmd->privilleged && !session->authorized) {
+ r = snprintf (out_buf, sizeof (out_buf), "not authorized" CRLF);
+ bufferevent_write (session->bev, out_buf, r);
+ return 0;
+ }
+
+ return 1;
+}
+
+static void
+process_command (struct controller_command *cmd, char **cmd_args, struct controller_session *session)
+{
+ char out_buf[512], *arg;
+ int r = 0, days, hours, minutes;
+ time_t uptime;
+
+ switch (cmd->type) {
+ case COMMAND_PASSWORD:
+ arg = *cmd_args;
+ if (*arg == '\0') {
+ msg_debug ("process_command: empty password passed");
+ r = snprintf (out_buf, sizeof (out_buf), "password command requires one argument" CRLF);
+ bufferevent_write (session->bev, out_buf, r);
+ return;
+ }
+ if (strncmp (arg, session->cfg->control_password, strlen (arg)) == 0) {
+ session->authorized = 1;
+ r = snprintf (out_buf, sizeof (out_buf), "password accepted" CRLF);
+ bufferevent_write (session->bev, out_buf, r);
+ }
+ else {
+ session->authorized = 0;
+ r = snprintf (out_buf, sizeof (out_buf), "password NOT accepted" CRLF);
+ bufferevent_write (session->bev, out_buf, r);
+ }
+ break;
+ case COMMAND_QUIT:
+ free_session (session);
+ break;
+ case COMMAND_RELOAD:
+ if (check_auth (cmd, session)) {
+ r = snprintf (out_buf, sizeof (out_buf), "reload request sent" CRLF);
+ bufferevent_write (session->bev, out_buf, r);
+ kill (getppid (), SIGHUP);
+ }
+ break;
+ case COMMAND_STAT:
+ /* XXX need to implement stat */
+ if (check_auth (cmd, session)) {
+ r = snprintf (out_buf, sizeof (out_buf), "-- end of stats report" CRLF);
+ bufferevent_write (session->bev, out_buf, r);
+ }
+ case COMMAND_SHUTDOWN:
+ if (check_auth (cmd, session)) {
+ r = snprintf (out_buf, sizeof (out_buf), "shutdown request sent" CRLF);
+ bufferevent_write (session->bev, out_buf, r);
+ kill (getppid (), SIGTERM);
+ }
+ break;
+ case COMMAND_UPTIME:
+ if (check_auth (cmd, session)) {
+ uptime = time (NULL) - start_time;
+ /* If uptime more than 2 hours, print as a number of days. */
+ if (uptime >= 2 * 3600) {
+ days = uptime / 86400;
+ hours = (uptime % 3600) / 60;
+ minutes = (uptime % 60) / 60;
+ r = snprintf (out_buf, sizeof (out_buf), "%dday%s %dhour%s %dminute%s" CRLF,
+ days, days > 1 ? "s" : " ",
+ hours, hours > 1 ? "s" : " ",
+ minutes, minutes > 1 ? "s" : " ");
+ }
+ /* If uptime is less than 1 minute print only seconds */
+ else if (uptime / 60 == 0) {
+ r = snprintf (out_buf, sizeof (out_buf), "%dsecond%s", uptime, uptime > 1 ? "s" : " ");
+ }
+ /* Else print the minutes and seconds. */
+ else {
+ hours = uptime / 3600;
+ minutes = (uptime % 60) / 60;
+ r = snprintf (out_buf, sizeof (out_buf), "%dhour%s %dminite%s %dsecond%s",
+ hours, hours > 1 ? "s" : " ",
+ minutes, minutes > 1 ? "s" : " ",
+ (int)uptime, uptime > 1 ? "s" : " ");
+ }
+
+ }
+ bufferevent_write (session->bev, out_buf, r);
+ break;
+ }
+}
+
+static void
+read_socket (struct bufferevent *bev, void *arg)
+{
+ struct controller_session *session = (struct controller_session *)arg;
+ int len, i;
+ char *s, **params, *cmd, out_buf[128];
+ GList *comp_list;
+
+ s = evbuffer_readline (EVBUFFER_INPUT (bev));
+ if (s != NULL && *s != 0) {
+ len = strlen (s);
+ /* Remove end of line characters from string */
+ if (s[len - 1] == '\n') {
+ if (s[len - 2] == '\r') {
+ s[len - 2] = 0;
+ }
+ s[len - 1] = 0;
+ }
+ params = g_strsplit (s, " ", -1);
+ len = g_strv_length (params);
+ if (len > 0) {
+ cmd = g_strstrip (params[0]);
+ comp_list = g_completion_complete (comp, cmd, NULL);
+ switch (g_list_length (comp_list)) {
+ case 1:
+ process_command ((struct controller_command *)comp_list->data, &params[1], session);
+ break;
+ case 0:
+ i = snprintf (out_buf, sizeof (out_buf), "Unknown command" CRLF);
+ bufferevent_write (bev, out_buf, i);
+ break;
+ default:
+ i = snprintf (out_buf, sizeof (out_buf), "Ambigious command" CRLF);
+ bufferevent_write (bev, out_buf, i);
+ break;
+ }
+ }
+ g_strfreev (params);
+ }
+ if (s != NULL) {
+ free (s);
+ }
+}
+
+static void
+write_socket (struct bufferevent *bev, void *arg)
+{
+ char buf[1024], hostbuf[256];
+
+ gethostname (hostbuf, sizeof (hostbuf - 1));
+ hostbuf[sizeof (hostbuf) - 1] = '\0';
+ snprintf (buf, sizeof (buf), "Rspamd version %s is running on %s" CRLF, RVERSION, hostbuf);
+ bufferevent_disable (bev, EV_WRITE);
+ bufferevent_enable (bev, EV_READ);
+}
+
+static void
+err_socket (struct bufferevent *bev, short what, void *arg)
+{
+ struct controller_session *session = (struct controller_session *)arg;
+ msg_info ("closing connection");
+ /* Free buffers */
+ free_session (session);
+}
+
+static void
+accept_socket (int fd, short what, void *arg)
+{
+ struct rspamd_worker *worker = (struct rspamd_worker *)arg;
+ struct sockaddr_storage ss;
+ struct controller_session *new_session;
+ socklen_t addrlen = sizeof(ss);
+ int nfd;
+
+ if ((nfd = accept (fd, (struct sockaddr *)&ss, &addrlen)) == -1) {
+ return;
+ }
+ if (event_make_socket_nonblocking(fd) < 0) {
+ return;
+ }
+
+ new_session = g_malloc (sizeof (struct controller_session));
+ if (new_session == NULL) {
+ msg_err ("accept_socket: cannot allocate memory for task, %m");
+ return;
+ }
+ bzero (new_session, sizeof (struct controller_session));
+ new_session->worker = worker;
+ new_session->cfg = worker->srv->cfg;
+#ifdef HAVE_GETPAGESIZE
+ new_session->session_pool = memory_pool_new (getpagesize () - 1);
+#else
+ new_session->session_pool = memory_pool_new (4095);
+#endif
+ memory_pool_add_destructor (new_session->session_pool, (pool_destruct_func)bufferevent_free, new_session->bev);
+
+ /* Read event */
+ new_session->bev = bufferevent_new (nfd, read_socket, write_socket, err_socket, (void *)new_session);
+ bufferevent_enable (new_session->bev, EV_WRITE);
+}
+
+void
+start_controller (struct rspamd_worker *worker)
+{
+ struct sigaction signals;
+ int listen_sock, i;
+ struct sockaddr_un *un_addr;
+ GList *comp_list = NULL;
+
+ worker->srv->pid = getpid ();
+ worker->srv->type = TYPE_CONTROLLER;
+ event_init ();
+ g_mime_init (0);
+
+ init_signals (&signals, sig_handler);
+ sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
+
+ /* SIGUSR2 handler */
+ signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *) worker);
+ signal_add (&worker->sig_ev, NULL);
+
+ if (worker->srv->cfg->control_family == AF_INET) {
+ if ((listen_sock = make_socket (worker->srv->cfg->control_host, worker->srv->cfg->control_port)) == -1) {
+ msg_err ("start_controller: cannot create tcp listen socket. %m");
+ exit(-errno);
+ }
+ }
+ else {
+ un_addr = (struct sockaddr_un *) alloca (sizeof (struct sockaddr_un));
+ if (!un_addr || (listen_sock = make_unix_socket (worker->srv->cfg->bind_host, un_addr)) == -1) {
+ msg_err ("start_controller: cannot create unix listen socket. %m");
+ exit(-errno);
+ }
+ }
+
+ start_time = time (NULL);
+
+ /* Init command completion */
+ for (i = 0; i < sizeof (commands) / sizeof (commands[0]) - 1; i ++) {
+ comp_list = g_list_prepend (comp_list, &commands[i]);
+ }
+ comp = g_completion_new (completion_func);
+ g_completion_add_items (comp, comp_list);
+ /* Accept event */
+ event_set(&worker->bind_ev, listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
+ event_add(&worker->bind_ev, NULL);
+
+ /* Send SIGUSR2 to parent */
+ kill (getppid (), SIGUSR2);
+
+ event_loop (0);
+}
+
+
+/*
+ * vi:ts=4
+ */
diff --git a/main.c b/main.c
index 5c9fe7909..d7ecb418b 100644
--- a/main.c
+++ b/main.c
@@ -128,6 +128,13 @@ fork_worker (struct rspamd_main *rspamd, int listen_sock, int reconfig, enum pro
case 0:
/* TODO: add worker code */
switch (type) {
+ case TYPE_CONTROLLER:
+ setproctitle ("controller process");
+ pidfile_close (rspamd->pfh);
+ msg_info ("fork_worker: starting controller process %d", getpid ());
+ cur->type = TYPE_CONTROLLER;
+ start_controller (cur);
+ break;
case TYPE_WORKER:
default:
setproctitle ("worker process");
@@ -281,7 +288,10 @@ main (int argc, char **argv)
for (i = 0; i < cfg->workers_number; i++) {
fork_worker (rspamd, listen_sock, 0, TYPE_WORKER);
}
-
+ /* Start controller if enabled */
+ if (cfg->controller_enabled) {
+ fork_worker (rspamd, listen_sock, 0, TYPE_CONTROLLER);
+ }
/* Signal processing cycle */
for (;;) {
@@ -305,6 +315,11 @@ main (int argc, char **argv)
active_worker = NULL;
}
TAILQ_REMOVE(&rspamd->workers, cur, next);
+ if (cur->type == TYPE_CONTROLLER) {
+ msg_info ("main: do not restart dead controller");
+ g_free (cur);
+ break;
+ }
if (WIFEXITED (res) && WEXITSTATUS (res) == 0) {
/* Normal worker termination, do not fork one more */
msg_info ("main: worker process %d terminated normally", cur->pid);
diff --git a/main.h b/main.h
index 1902f2d84..de8969de8 100644
--- a/main.h
+++ b/main.h
@@ -46,6 +46,7 @@
enum process_type {
TYPE_MAIN,
TYPE_WORKER,
+ TYPE_CONTROLLER,
};
/* Filter type */
@@ -105,6 +106,17 @@ struct save_point {
unsigned int saved;
};
+/* Control session */
+struct controller_session {
+ struct rspamd_worker *worker;
+ /* Access to authorized commands */
+ int authorized;
+ memory_pool_t *session_pool;
+ struct bufferevent *bev;
+ struct config_file *cfg;
+};
+
+/* Worker task structure */
struct worker_task {
struct rspamd_worker *worker;
enum {
@@ -138,7 +150,6 @@ struct worker_task {
TAILQ_HEAD (uriq, uri) urls;
/* Hash of metric result structures */
GHashTable *results;
- /* Config file to write to */
struct config_file *cfg;
/* Save point for filters deferred processing */
struct save_point save;
@@ -163,6 +174,7 @@ struct c_module {
};
void start_worker (struct rspamd_worker *worker, int listen_sock);
+void start_controller (struct rspamd_worker *worker);
#endif
diff --git a/worker.c b/worker.c
index 8e09a84d3..089f1ecfa 100644
--- a/worker.c
+++ b/worker.c
@@ -1,4 +1,3 @@
-
#include <sys/stat.h>
#include <sys/param.h>
#include <sys/types.h>