aboutsummaryrefslogtreecommitdiffstats
path: root/src/main.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/main.c')
-rw-r--r--src/main.c116
1 files changed, 34 insertions, 82 deletions
diff --git a/src/main.c b/src/main.c
index e16d9d5e0..bb6209974 100644
--- a/src/main.c
+++ b/src/main.c
@@ -262,9 +262,9 @@ drop_priv (struct rspamd_main *rspamd)
}
static void
-config_logger (struct rspamd_main *rspamd, gboolean is_fatal)
+config_logger (struct rspamd_main *rspamd, GQuark type, gboolean is_fatal)
{
- rspamd_set_logger (rspamd->cfg->log_type, TYPE_MAIN, rspamd);
+ rspamd_set_logger (rspamd->cfg->log_type, type, rspamd);
if (open_log_priv (rspamd->logger, rspamd->workers_uid, rspamd->workers_gid) == -1) {
if (is_fatal) {
fprintf (stderr, "Fatal error, cannot open logfile, exiting\n");
@@ -283,12 +283,12 @@ reread_config (struct rspamd_main *rspamd)
gchar *cfg_file;
GList *l;
struct filter *filt;
+ GQuark type;
tmp_cfg = (struct config_file *)g_malloc (sizeof (struct config_file));
if (tmp_cfg) {
bzero (tmp_cfg, sizeof (struct config_file));
tmp_cfg->cfg_pool = memory_pool_new (memory_pool_get_size ());
- tmp_cfg->modules_num = MODULES_NUM;
init_defaults (tmp_cfg);
cfg_file = memory_pool_strdup (tmp_cfg->cfg_pool, rspamd->cfg->cfg_name);
/* Save some variables */
@@ -309,7 +309,8 @@ reread_config (struct rspamd_main *rspamd)
if (is_debug) {
rspamd->cfg->log_level = G_LOG_LEVEL_DEBUG;
}
- config_logger (rspamd, FALSE);
+ type = g_quark_try_string ("main");
+ config_logger (rspamd, type, FALSE);
/* Pre-init of cache */
rspamd->cfg->cache = g_new0 (struct symbols_cache, 1);
rspamd->cfg->cache->static_pool = memory_pool_new (memory_pool_get_size ());
@@ -368,15 +369,9 @@ fork_worker (struct rspamd_main *rspamd, struct worker_conf *cf)
cur->type = cf->type;
cur->pid = fork ();
cur->cf = g_malloc (sizeof (struct worker_conf));
- /* Copy or init context */
- if (cf->ctx) {
- cur->ctx = cf->ctx;
- }
- else {
- cur->ctx = init_workers_ctx (cf->type);
- }
memcpy (cur->cf, cf, sizeof (struct worker_conf));
cur->pending = FALSE;
+ cur->ctx = cf->ctx;
switch (cur->pid) {
case 0:
/* Update pid for logging */
@@ -385,45 +380,10 @@ fork_worker (struct rspamd_main *rspamd, struct worker_conf *cf)
drop_priv (rspamd);
/* Set limits */
set_worker_limits (cf);
- switch (cf->type) {
- case TYPE_CONTROLLER:
- setproctitle ("controller process");
- rspamd_pidfile_close (rspamd->pfh);
- msg_info ("starting controller process %P", getpid ());
- start_controller (cur);
- break;
- case TYPE_LMTP:
- setproctitle ("lmtp process");
- rspamd_pidfile_close (rspamd->pfh);
- msg_info ("starting lmtp process %P", getpid ());
- start_lmtp_worker (cur);
- break;
- case TYPE_SMTP:
- setproctitle ("smtp process");
- rspamd_pidfile_close (rspamd->pfh);
- msg_info ("starting smtp process %P", getpid ());
- start_smtp_worker (cur);
- break;
- case TYPE_FUZZY:
- setproctitle ("fuzzy storage");
- rspamd_pidfile_close (rspamd->pfh);
- msg_info ("starting fuzzy storage process %P", getpid ());
- start_fuzzy_storage (cur);
- break;
- case TYPE_KVSTORAGE:
- setproctitle ("kv storage");
- rspamd_pidfile_close (rspamd->pfh);
- msg_info ("starting key-value storage process %P", getpid ());
- start_kvstorage_worker (cur);
- break;
- case TYPE_WORKER:
- default:
- setproctitle ("worker process");
- rspamd_pidfile_close (rspamd->pfh);
- msg_info ("starting worker process %P", getpid ());
- start_worker (cur);
- break;
- }
+ setproctitle ("%s process", cf->worker->name);
+ rspamd_pidfile_close (rspamd->pfh);
+ msg_info ("starting %s process %P", cf->worker->name, getpid ());
+ cf->worker->worker_start_func (cur);
break;
case -1:
msg_err ("cannot fork main process. %s", strerror (errno));
@@ -584,7 +544,7 @@ spawn_workers (struct rspamd_main *rspamd)
while (cur) {
cf = cur->data;
- if (cf->has_socket) {
+ if (cf->worker->has_socket) {
if ((p = g_hash_table_lookup (listen_sockets, GINT_TO_POINTER (
make_listen_key (&cf->bind_addr, cf->bind_port, cf->bind_family, cf->bind_host)))) == NULL) {
/* Create listen socket */
@@ -603,13 +563,13 @@ spawn_workers (struct rspamd_main *rspamd)
cf->listen_sock = listen_sock;
}
- if (cf->type == TYPE_FUZZY) {
+ if (cf->worker->unique) {
if (cf->count > 1) {
- msg_err ("cannot spawn more than 1 fuzzy storage worker, so spawn one");
+ msg_err ("cannot spawn more than 1 %s worker, so spawn one", cf->worker->name);
}
fork_worker (rspamd, cf);
}
- else if (cf->type == TYPE_KVSTORAGE) {
+ else if (cf->worker->threaded) {
fork_worker (rspamd, cf);
}
else {
@@ -646,19 +606,19 @@ wait_for_workers (gpointer key, gpointer value, gpointer unused)
if (waitpid (w->pid, &res, 0) == -1) {
if (errno == EINTR) {
got_alarm = 1;
- if (w->type != TYPE_KVSTORAGE) {
+ if (w->cf->worker->killable) {
msg_info ("terminate worker %P with SIGKILL", w->pid);
kill (w->pid, SIGKILL);
}
else {
- msg_info ("waiting for storages to sync");
+ msg_info ("waiting for workers to sync");
wait_for_workers (key, value, unused);
return TRUE;
}
}
}
- msg_info ("%s process %P terminated %s", process_to_str (w->type), w->pid,
+ msg_info ("%s process %P terminated %s", g_quark_to_string (w->type), w->pid,
got_alarm ? "hardly" : "softly");
g_free (w->cf);
g_free (w);
@@ -841,24 +801,6 @@ print_symbols_cache (struct config_file *cfg)
}
}
-gpointer
-init_workers_ctx (enum process_type type)
-{
- switch (type) {
- case TYPE_WORKER:
- return init_worker ();
- case TYPE_CONTROLLER:
- return init_controller ();
- case TYPE_FUZZY:
- return init_fuzzy_storage ();
- case TYPE_SMTP:
- return init_smtp_worker ();
- case TYPE_KVSTORAGE:
- return init_kvstorage_worker ();
- default:
- return NULL;
- }
-}
gint
main (gint argc, gchar **argv, gchar **env)
@@ -870,6 +812,8 @@ main (gint argc, gchar **argv, gchar **env)
struct filter *filt;
pid_t wrk;
GList *l;
+ worker_t **pworker;
+ GQuark type;
#ifdef HAVE_SA_SIGINFO
signals_info = g_queue_new ();
@@ -893,7 +837,6 @@ main (gint argc, gchar **argv, gchar **env)
memset (rspamd_main->cfg, 0, sizeof (struct config_file));
rspamd_main->cfg->cfg_pool = memory_pool_new (memory_pool_get_size ());
- rspamd_main->cfg->modules_num = MODULES_NUM;
init_defaults (rspamd_main->cfg);
memset (&signals, 0, sizeof (struct sigaction));
@@ -910,6 +853,8 @@ main (gint argc, gchar **argv, gchar **env)
rspamd_main->cfg->log_level = G_LOG_LEVEL_CRITICAL;
}
+ type = g_quark_from_static_string ("main");
+
#ifdef HAVE_SETLOCALE
/* Set locale setting to C locale to avoid problems in future */
setlocale (LC_ALL, "C");
@@ -919,13 +864,20 @@ main (gint argc, gchar **argv, gchar **env)
#endif
/* First set logger to console logger */
- rspamd_set_logger (RSPAMD_LOG_CONSOLE, TYPE_MAIN, rspamd_main);
+ rspamd_set_logger (RSPAMD_LOG_CONSOLE, type, rspamd_main);
(void)open_log (rspamd_main->logger);
g_log_set_default_handler (rspamd_glib_log_function, rspamd_main->logger);
detect_priv (rspamd_main);
init_lua (rspamd_main->cfg);
+ pworker = &workers[0];
+ while (*pworker) {
+ /* Init string quarks */
+ (void)g_quark_from_static_string ((*pworker)->name);
+ pworker ++;
+ }
+
/* Init counters */
counters = rspamd_hash_new_shared (rspamd_main->server_pool, g_str_hash, g_str_equal, 64);
/* Init listen sockets hash */
@@ -998,7 +950,7 @@ main (gint argc, gchar **argv, gchar **env)
rlim.rlim_cur = 100 * 1024 * 1024;
setrlimit (RLIMIT_STACK, &rlim);
- config_logger (rspamd_main, TRUE);
+ config_logger (rspamd_main, type, TRUE);
msg_info ("rspamd " RVERSION " is starting, build id: " RID);
rspamd_main->cfg->cfg_name = memory_pool_strdup (rspamd_main->cfg->cfg_pool, rspamd_main->cfg->cfg_name);
@@ -1011,7 +963,7 @@ main (gint argc, gchar **argv, gchar **env)
/* Write info */
rspamd_main->pid = getpid ();
- rspamd_main->type = TYPE_MAIN;
+ rspamd_main->type = type;
init_signals (&signals, sig_handler);
@@ -1098,14 +1050,14 @@ main (gint argc, gchar **argv, gchar **env)
if (WIFEXITED (res) && WEXITSTATUS (res) == 0) {
/* Normal worker termination, do not fork one more */
- msg_info ("%s process %P terminated normally", process_to_str (cur->type), cur->pid);
+ msg_info ("%s process %P terminated normally", g_quark_to_string (cur->type), cur->pid);
}
else {
if (WIFSIGNALED (res)) {
- msg_warn ("%s process %P terminated abnormally by signal: %d", process_to_str (cur->type), cur->pid, WTERMSIG (res));
+ msg_warn ("%s process %P terminated abnormally by signal: %d", g_quark_to_string (cur->type), cur->pid, WTERMSIG (res));
}
else {
- msg_warn ("%s process %P terminated abnormally", process_to_str (cur->type), cur->pid);
+ msg_warn ("%s process %P terminated abnormally", g_quark_to_string (cur->type), cur->pid);
}
/* Fork another worker in replace of dead one */
delay_fork (cur->cf);