From: Vsevolod Stakhov Date: Thu, 23 Oct 2008 15:10:40 +0000 (+0400) Subject: * Add initial implementation of control interface X-Git-Tag: 0.2.7~355 X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=e5f01249a3498ab47c7c7852f83564d466629a8b;p=rspamd.git * Add initial implementation of control interface --- diff --git a/cfg_file.h b/cfg_file.h index e67479318..d664a15f0 100644 --- a/cfg_file.h +++ b/cfg_file.h @@ -23,6 +23,7 @@ #include "filter.h" #define DEFAULT_BIND_PORT 768 +#define DEFAULT_CONTROL_PORT 7608 #define MAX_MEMCACHED_SERVERS 48 #define DEFAULT_MEMCACHED_PORT 11211 /* Memcached timeouts */ @@ -96,7 +97,14 @@ struct config_file { uint16_t bind_port; uint16_t bind_family; - char no_fork; + char *control_host; + struct in_addr control_addr; + uint16_t control_port; + uint16_t control_family; + int controller_enabled; + char *control_password; + + int no_fork; unsigned int workers_number; struct memcached_server memcached_servers[MAX_MEMCACHED_SERVERS]; @@ -125,7 +133,7 @@ struct config_file { }; int add_memcached_server (struct config_file *cf, char *str); -int parse_bind_line (struct config_file *cf, char *str); +int parse_bind_line (struct config_file *cf, char *str, char is_control); void init_defaults (struct config_file *cfg); void free_config (struct config_file *cfg); char* get_module_opt (struct config_file *cfg, char *module_name, char *opt_name); diff --git a/cfg_file.l b/cfg_file.l index 198f65792..e52f2585b 100644 --- a/cfg_file.l +++ b/cfg_file.l @@ -47,6 +47,8 @@ metric return METRIC; name return NAME; required_score return REQUIRED_SCORE; function return FUNCTION; +control return CONTROL; +password return PASSWORD; \{ return OBRACE; \} return EBRACE; diff --git a/cfg_file.y b/cfg_file.y index c5badb437..1ccb928b7 100644 --- a/cfg_file.y +++ b/cfg_file.y @@ -49,7 +49,7 @@ struct metric *cur_metric = NULL; %token MEMCACHED WORKERS REQUIRE MODULE %token MODULE_OPT PARAM VARIABLE %token HEADER_FILTERS MIME_FILTERS MESSAGE_FILTERS URL_FILTERS FACTORS METRIC NAME -%token REQUIRED_SCORE FUNCTION FRACT COMPOSITES +%token REQUIRED_SCORE FUNCTION FRACT COMPOSITES CONTROL PASSWORD %type STRING %type VARIABLE @@ -73,6 +73,7 @@ file : /* empty */ command : bindsock + | control | tempdir | pidfile | memcached @@ -112,9 +113,39 @@ pidfile : } ; +control: + CONTROL OBRACE controlbody EBRACE + ; + +controlbody: + controlcmd SEMICOLON + | controlbody controlcmd SEMICOLON + ; + +controlcmd: + controlsock + | controlpassword + ; + +controlsock: + BINDSOCK EQSIGN bind_cred { + if (!parse_bind_line (cfg, $3, 1)) { + yyerror ("yyparse: parse_bind_line"); + YYERROR; + } + cfg->controller_enabled = 1; + free ($3); + } + ; +controlpassword: + PASSWORD EQSIGN QUOTEDSTRING { + cfg->control_password = memory_pool_strdup (cfg->cfg_pool, $3); + } + ; + bindsock: BINDSOCK EQSIGN bind_cred { - if (!parse_bind_line (cfg, $3)) { + if (!parse_bind_line (cfg, $3, 0)) { yyerror ("yyparse: parse_bind_line"); YYERROR; } diff --git a/cfg_utils.c b/cfg_utils.c index 430c572e7..45442fe62 100644 --- a/cfg_utils.c +++ b/cfg_utils.c @@ -74,7 +74,7 @@ add_memcached_server (struct config_file *cf, char *str) } int -parse_bind_line (struct config_file *cf, char *str) +parse_bind_line (struct config_file *cf, char *str, char is_control) { char *cur_tok, *err_str; struct hostent *hent; @@ -84,35 +84,69 @@ parse_bind_line (struct config_file *cf, char *str) cur_tok = strsep (&str, ":"); if (cur_tok[0] == '/' || cur_tok[0] == '.') { - cf->bind_host = strdup (cur_tok); - cf->bind_family = AF_UNIX; + if (is_control) { + cf->control_host = memory_pool_strdup (cf->cfg_pool, cur_tok); + cf->control_family = AF_UNIX; + } + else { + cf->bind_host = memory_pool_strdup (cf->cfg_pool, cur_tok); + cf->bind_family = AF_UNIX; + } return 1; } else { if (str == '\0') { - cf->bind_port = DEFAULT_BIND_PORT; + if (is_control) { + cf->control_port = DEFAULT_CONTROL_PORT; + } + else { + cf->bind_port = DEFAULT_BIND_PORT; + } } else { - cf->bind_port = (uint16_t)strtoul (str, &err_str, 10); + if (is_control) { + cf->control_port = (uint16_t)strtoul (str, &err_str, 10); + } + else { + cf->bind_port = (uint16_t)strtoul (str, &err_str, 10); + } if (*err_str != '\0') { return 0; } } - - if (!inet_aton (cur_tok, &cf->bind_addr)) { - /* Try to call gethostbyname */ - hent = gethostbyname (cur_tok); - if (hent == NULL) { - return 0; - } - else { - cf->bind_host = strdup (cur_tok); - memcpy((char *)&cf->bind_addr, hent->h_addr, sizeof(struct in_addr)); - s = strlen (cur_tok) + 1; + + if (is_control) { + if (!inet_aton (cur_tok, &cf->control_addr)) { + /* Try to call gethostbyname */ + hent = gethostbyname (cur_tok); + if (hent == NULL) { + return 0; + } + else { + cf->bind_host = memory_pool_strdup (cf->cfg_pool, cur_tok); + memcpy((char *)&cf->control_addr, hent->h_addr, sizeof(struct in_addr)); + s = strlen (cur_tok) + 1; + } } + + cf->control_family = AF_INET; } + else { + if (!inet_aton (cur_tok, &cf->bind_addr)) { + /* Try to call gethostbyname */ + hent = gethostbyname (cur_tok); + if (hent == NULL) { + return 0; + } + else { + cf->bind_host = memory_pool_strdup (cf->cfg_pool, cur_tok); + memcpy((char *)&cf->bind_addr, hent->h_addr, sizeof(struct in_addr)); + s = strlen (cur_tok) + 1; + } + } - cf->bind_family = AF_INET; + cf->bind_family = AF_INET; + } return 1; } diff --git a/configure b/configure index f255de36d..bdaffc1c5 100755 --- a/configure +++ b/configure @@ -21,7 +21,7 @@ YACC_OUTPUT="cfg_yacc.c" LEX_OUTPUT="cfg_lex.c" CONFIG="config.h" -SOURCES="upstream.c cfg_utils.c memcached.c main.c util.c worker.c fstring.c url.c perl.c protocol.c mem_pool.c filter.c plugins/regexp.c plugins/surbl.c ${LEX_OUTPUT} ${YACC_OUTPUT}" +SOURCES="upstream.c cfg_utils.c memcached.c main.c util.c controller.c worker.c fstring.c url.c perl.c protocol.c mem_pool.c filter.c plugins/regexp.c plugins/surbl.c ${LEX_OUTPUT} ${YACC_OUTPUT}" MODULES="surbl regexp" CFLAGS="$CFLAGS -W -Wpointer-arith -Wno-unused-parameter" @@ -576,7 +576,7 @@ SOURCES=$SOURCES # ${EXEC} objects OBJECTS=$OBJECTS # Version of product -VERION=$VERSION +VERSION=$VERSION # Detected operation system OS=$OS # Lex and yacc executables diff --git a/controller.c b/controller.c new file mode 100644 index 000000000..0af9d1d2e --- /dev/null +++ b/controller.c @@ -0,0 +1,351 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include + +#include "util.h" +#include "main.h" +#include "protocol.h" +#include "upstream.h" +#include "cfg_file.h" +#include "modules.h" + +#define CRLF "\r\n" + +enum command_type { + COMMAND_PASSWORD, + COMMAND_QUIT, + COMMAND_RELOAD, + COMMAND_STAT, + COMMAND_SHUTDOWN, + COMMAND_UPTIME, +}; + +struct controller_command { + char *command; + int privilleged; + enum command_type type; +}; + +static struct controller_command commands[] = { + {"password", 0, COMMAND_PASSWORD}, + {"quit", 0, COMMAND_QUIT}, + {"reload", 1, COMMAND_RELOAD}, + {"stat", 0, COMMAND_STAT}, + {"shutdown", 1, COMMAND_SHUTDOWN}, + {"uptime", 0, COMMAND_UPTIME}, +}; + +static GCompletion *comp; +static time_t start_time; + +static +void sig_handler (int signo) +{ + switch (signo) { + case SIGINT: + case SIGTERM: + _exit (1); + break; + } +} + +static void +sigusr_handler (int fd, short what, void *arg) +{ + struct rspamd_worker *worker = (struct rspamd_worker *)arg; + /* Do not accept new connections, preparing to end worker's process */ + struct timeval tv; + tv.tv_sec = SOFT_SHUTDOWN_TIME; + tv.tv_usec = 0; + event_del (&worker->sig_ev); + event_del (&worker->bind_ev); + msg_info ("controller's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME); + event_loopexit (&tv); + return; +} + +static gchar* +completion_func (gpointer elem) +{ + struct controller_command *cmd = (struct controller_command *)elem; + + return cmd->command; +} + +static void +free_session (struct controller_session *session) +{ + bufferevent_disable (session->bev, EV_READ | EV_WRITE); + memory_pool_delete (session->session_pool); + g_free (session); +} + +static int +check_auth (struct controller_command *cmd, struct controller_session *session) +{ + char out_buf[128]; + int r; + + if (cmd->privilleged && !session->authorized) { + r = snprintf (out_buf, sizeof (out_buf), "not authorized" CRLF); + bufferevent_write (session->bev, out_buf, r); + return 0; + } + + return 1; +} + +static void +process_command (struct controller_command *cmd, char **cmd_args, struct controller_session *session) +{ + char out_buf[512], *arg; + int r = 0, days, hours, minutes; + time_t uptime; + + switch (cmd->type) { + case COMMAND_PASSWORD: + arg = *cmd_args; + if (*arg == '\0') { + msg_debug ("process_command: empty password passed"); + r = snprintf (out_buf, sizeof (out_buf), "password command requires one argument" CRLF); + bufferevent_write (session->bev, out_buf, r); + return; + } + if (strncmp (arg, session->cfg->control_password, strlen (arg)) == 0) { + session->authorized = 1; + r = snprintf (out_buf, sizeof (out_buf), "password accepted" CRLF); + bufferevent_write (session->bev, out_buf, r); + } + else { + session->authorized = 0; + r = snprintf (out_buf, sizeof (out_buf), "password NOT accepted" CRLF); + bufferevent_write (session->bev, out_buf, r); + } + break; + case COMMAND_QUIT: + free_session (session); + break; + case COMMAND_RELOAD: + if (check_auth (cmd, session)) { + r = snprintf (out_buf, sizeof (out_buf), "reload request sent" CRLF); + bufferevent_write (session->bev, out_buf, r); + kill (getppid (), SIGHUP); + } + break; + case COMMAND_STAT: + /* XXX need to implement stat */ + if (check_auth (cmd, session)) { + r = snprintf (out_buf, sizeof (out_buf), "-- end of stats report" CRLF); + bufferevent_write (session->bev, out_buf, r); + } + case COMMAND_SHUTDOWN: + if (check_auth (cmd, session)) { + r = snprintf (out_buf, sizeof (out_buf), "shutdown request sent" CRLF); + bufferevent_write (session->bev, out_buf, r); + kill (getppid (), SIGTERM); + } + break; + case COMMAND_UPTIME: + if (check_auth (cmd, session)) { + uptime = time (NULL) - start_time; + /* If uptime more than 2 hours, print as a number of days. */ + if (uptime >= 2 * 3600) { + days = uptime / 86400; + hours = (uptime % 3600) / 60; + minutes = (uptime % 60) / 60; + r = snprintf (out_buf, sizeof (out_buf), "%dday%s %dhour%s %dminute%s" CRLF, + days, days > 1 ? "s" : " ", + hours, hours > 1 ? "s" : " ", + minutes, minutes > 1 ? "s" : " "); + } + /* If uptime is less than 1 minute print only seconds */ + else if (uptime / 60 == 0) { + r = snprintf (out_buf, sizeof (out_buf), "%dsecond%s", uptime, uptime > 1 ? "s" : " "); + } + /* Else print the minutes and seconds. */ + else { + hours = uptime / 3600; + minutes = (uptime % 60) / 60; + r = snprintf (out_buf, sizeof (out_buf), "%dhour%s %dminite%s %dsecond%s", + hours, hours > 1 ? "s" : " ", + minutes, minutes > 1 ? "s" : " ", + (int)uptime, uptime > 1 ? "s" : " "); + } + + } + bufferevent_write (session->bev, out_buf, r); + break; + } +} + +static void +read_socket (struct bufferevent *bev, void *arg) +{ + struct controller_session *session = (struct controller_session *)arg; + int len, i; + char *s, **params, *cmd, out_buf[128]; + GList *comp_list; + + s = evbuffer_readline (EVBUFFER_INPUT (bev)); + if (s != NULL && *s != 0) { + len = strlen (s); + /* Remove end of line characters from string */ + if (s[len - 1] == '\n') { + if (s[len - 2] == '\r') { + s[len - 2] = 0; + } + s[len - 1] = 0; + } + params = g_strsplit (s, " ", -1); + len = g_strv_length (params); + if (len > 0) { + cmd = g_strstrip (params[0]); + comp_list = g_completion_complete (comp, cmd, NULL); + switch (g_list_length (comp_list)) { + case 1: + process_command ((struct controller_command *)comp_list->data, ¶ms[1], session); + break; + case 0: + i = snprintf (out_buf, sizeof (out_buf), "Unknown command" CRLF); + bufferevent_write (bev, out_buf, i); + break; + default: + i = snprintf (out_buf, sizeof (out_buf), "Ambigious command" CRLF); + bufferevent_write (bev, out_buf, i); + break; + } + } + g_strfreev (params); + } + if (s != NULL) { + free (s); + } +} + +static void +write_socket (struct bufferevent *bev, void *arg) +{ + char buf[1024], hostbuf[256]; + + gethostname (hostbuf, sizeof (hostbuf - 1)); + hostbuf[sizeof (hostbuf) - 1] = '\0'; + snprintf (buf, sizeof (buf), "Rspamd version %s is running on %s" CRLF, RVERSION, hostbuf); + bufferevent_disable (bev, EV_WRITE); + bufferevent_enable (bev, EV_READ); +} + +static void +err_socket (struct bufferevent *bev, short what, void *arg) +{ + struct controller_session *session = (struct controller_session *)arg; + msg_info ("closing connection"); + /* Free buffers */ + free_session (session); +} + +static void +accept_socket (int fd, short what, void *arg) +{ + struct rspamd_worker *worker = (struct rspamd_worker *)arg; + struct sockaddr_storage ss; + struct controller_session *new_session; + socklen_t addrlen = sizeof(ss); + int nfd; + + if ((nfd = accept (fd, (struct sockaddr *)&ss, &addrlen)) == -1) { + return; + } + if (event_make_socket_nonblocking(fd) < 0) { + return; + } + + new_session = g_malloc (sizeof (struct controller_session)); + if (new_session == NULL) { + msg_err ("accept_socket: cannot allocate memory for task, %m"); + return; + } + bzero (new_session, sizeof (struct controller_session)); + new_session->worker = worker; + new_session->cfg = worker->srv->cfg; +#ifdef HAVE_GETPAGESIZE + new_session->session_pool = memory_pool_new (getpagesize () - 1); +#else + new_session->session_pool = memory_pool_new (4095); +#endif + memory_pool_add_destructor (new_session->session_pool, (pool_destruct_func)bufferevent_free, new_session->bev); + + /* Read event */ + new_session->bev = bufferevent_new (nfd, read_socket, write_socket, err_socket, (void *)new_session); + bufferevent_enable (new_session->bev, EV_WRITE); +} + +void +start_controller (struct rspamd_worker *worker) +{ + struct sigaction signals; + int listen_sock, i; + struct sockaddr_un *un_addr; + GList *comp_list = NULL; + + worker->srv->pid = getpid (); + worker->srv->type = TYPE_CONTROLLER; + event_init (); + g_mime_init (0); + + init_signals (&signals, sig_handler); + sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL); + + /* SIGUSR2 handler */ + signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *) worker); + signal_add (&worker->sig_ev, NULL); + + if (worker->srv->cfg->control_family == AF_INET) { + if ((listen_sock = make_socket (worker->srv->cfg->control_host, worker->srv->cfg->control_port)) == -1) { + msg_err ("start_controller: cannot create tcp listen socket. %m"); + exit(-errno); + } + } + else { + un_addr = (struct sockaddr_un *) alloca (sizeof (struct sockaddr_un)); + if (!un_addr || (listen_sock = make_unix_socket (worker->srv->cfg->bind_host, un_addr)) == -1) { + msg_err ("start_controller: cannot create unix listen socket. %m"); + exit(-errno); + } + } + + start_time = time (NULL); + + /* Init command completion */ + for (i = 0; i < sizeof (commands) / sizeof (commands[0]) - 1; i ++) { + comp_list = g_list_prepend (comp_list, &commands[i]); + } + comp = g_completion_new (completion_func); + g_completion_add_items (comp, comp_list); + /* Accept event */ + event_set(&worker->bind_ev, listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker); + event_add(&worker->bind_ev, NULL); + + /* Send SIGUSR2 to parent */ + kill (getppid (), SIGUSR2); + + event_loop (0); +} + + +/* + * vi:ts=4 + */ diff --git a/main.c b/main.c index 5c9fe7909..d7ecb418b 100644 --- a/main.c +++ b/main.c @@ -128,6 +128,13 @@ fork_worker (struct rspamd_main *rspamd, int listen_sock, int reconfig, enum pro case 0: /* TODO: add worker code */ switch (type) { + case TYPE_CONTROLLER: + setproctitle ("controller process"); + pidfile_close (rspamd->pfh); + msg_info ("fork_worker: starting controller process %d", getpid ()); + cur->type = TYPE_CONTROLLER; + start_controller (cur); + break; case TYPE_WORKER: default: setproctitle ("worker process"); @@ -281,7 +288,10 @@ main (int argc, char **argv) for (i = 0; i < cfg->workers_number; i++) { fork_worker (rspamd, listen_sock, 0, TYPE_WORKER); } - + /* Start controller if enabled */ + if (cfg->controller_enabled) { + fork_worker (rspamd, listen_sock, 0, TYPE_CONTROLLER); + } /* Signal processing cycle */ for (;;) { @@ -305,6 +315,11 @@ main (int argc, char **argv) active_worker = NULL; } TAILQ_REMOVE(&rspamd->workers, cur, next); + if (cur->type == TYPE_CONTROLLER) { + msg_info ("main: do not restart dead controller"); + g_free (cur); + break; + } if (WIFEXITED (res) && WEXITSTATUS (res) == 0) { /* Normal worker termination, do not fork one more */ msg_info ("main: worker process %d terminated normally", cur->pid); diff --git a/main.h b/main.h index 1902f2d84..de8969de8 100644 --- a/main.h +++ b/main.h @@ -46,6 +46,7 @@ enum process_type { TYPE_MAIN, TYPE_WORKER, + TYPE_CONTROLLER, }; /* Filter type */ @@ -105,6 +106,17 @@ struct save_point { unsigned int saved; }; +/* Control session */ +struct controller_session { + struct rspamd_worker *worker; + /* Access to authorized commands */ + int authorized; + memory_pool_t *session_pool; + struct bufferevent *bev; + struct config_file *cfg; +}; + +/* Worker task structure */ struct worker_task { struct rspamd_worker *worker; enum { @@ -138,7 +150,6 @@ struct worker_task { TAILQ_HEAD (uriq, uri) urls; /* Hash of metric result structures */ GHashTable *results; - /* Config file to write to */ struct config_file *cfg; /* Save point for filters deferred processing */ struct save_point save; @@ -163,6 +174,7 @@ struct c_module { }; void start_worker (struct rspamd_worker *worker, int listen_sock); +void start_controller (struct rspamd_worker *worker); #endif diff --git a/worker.c b/worker.c index 8e09a84d3..089f1ecfa 100644 --- a/worker.c +++ b/worker.c @@ -1,4 +1,3 @@ - #include #include #include