/* * Copyright 2024 Vsevolod Stakhov * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ /* * Rspamd worker implementation */ #include "config.h" #include "libutil/util.h" #include "libserver/maps/map.h" #include "libutil/upstream.h" #include "libserver/protocol.h" #include "libserver/cfg_file.h" #include "libserver/url.h" #include "libserver/dns.h" #include "libmime/message.h" #include "rspamd.h" #include "libstat/stat_api.h" #include "libserver/worker_util.h" #include "libserver/rspamd_control.h" #include "worker_private.h" #include "libserver/http/http_private.h" #include "libserver/cfg_file_private.h" #include #include "unix-std.h" #include "lua/lua_common.h" /* 60 seconds for worker's IO */ #define DEFAULT_WORKER_IO_TIMEOUT 60.0 gpointer init_worker(struct rspamd_config *cfg); void start_worker(struct rspamd_worker *worker); worker_t normal_worker = { "normal", /* Name */ init_worker, /* Init function */ start_worker, /* Start function */ RSPAMD_WORKER_HAS_SOCKET | RSPAMD_WORKER_KILLABLE | RSPAMD_WORKER_SCANNER, RSPAMD_WORKER_SOCKET_TCP, /* TCP socket */ RSPAMD_WORKER_VER /* Version info */ }; #define msg_err_ctx(...) rspamd_default_log_function(G_LOG_LEVEL_CRITICAL, \ "worker", ctx->cfg->cfg_pool->tag.uid, \ RSPAMD_LOG_FUNC, \ __VA_ARGS__) #define msg_warn_ctx(...) rspamd_default_log_function(G_LOG_LEVEL_WARNING, \ "worker", ctx->cfg->cfg_pool->tag.uid, \ RSPAMD_LOG_FUNC, \ __VA_ARGS__) #define msg_info_ctx(...) rspamd_default_log_function(G_LOG_LEVEL_INFO, \ "worker", ctx->cfg->cfg_pool->tag.uid, \ RSPAMD_LOG_FUNC, \ __VA_ARGS__) struct rspamd_worker_session { int64_t magic; struct rspamd_task *task; int fd; rspamd_inet_addr_t *addr; struct rspamd_worker_ctx *ctx; struct rspamd_http_connection *http_conn; struct rspamd_worker *worker; }; /* * Reduce number of tasks proceeded */ static void reduce_tasks_count(gpointer arg) { struct rspamd_worker *worker = arg; worker->nconns--; if (worker->state == rspamd_worker_wait_connections && worker->nconns == 0) { worker->state = rspamd_worker_wait_final_scripts; msg_info("performing finishing actions"); if (rspamd_worker_call_finish_handlers(worker)) { worker->state = rspamd_worker_wait_final_scripts; } else { worker->state = rspamd_worker_wanna_die; } } else if (worker->state != rspamd_worker_state_running) { worker->state = rspamd_worker_wait_connections; } } static int rspamd_worker_body_handler(struct rspamd_http_connection *conn, struct rspamd_http_message *msg, const char *chunk, gsize len) { struct rspamd_worker_session *session = (struct rspamd_worker_session *) conn->ud; struct rspamd_task *task; struct rspamd_worker_ctx *ctx; const rspamd_ftok_t *hv_tok; gboolean debug_mempool = FALSE; ctx = session->ctx; /* Check debug */ if ((hv_tok = rspamd_http_message_find_header(msg, "Memory")) !
# Bricx Command Center IDE
# http://bricxcc.sourceforge.net
*.bak
*.sym
/* We are done here */ msg_debug_task("normally closing connection from: %s", rspamd_inet_address_to_string(task->client_addr)); rspamd_session_destroy(task->s); } else if (task->processed_stages & RSPAMD_TASK_STAGE_DONE) { rspamd_session_pending(task->s); } } else { /* If there was no task, then session is unmanaged */ msg_info("no data received from: %s, closing connection", rspamd_inet_address_to_string_pretty(session->addr)); rspamd_inet_address_free(session->addr); rspamd_http_connection_reset(session->http_conn); rspamd_http_connection_unref(session->http_conn); close(session->fd); g_free(session); } return 0; } /* * Accept new connection and construct task */ static void accept_socket(EV_P_ ev_io *w, int revents) { struct rspamd_worker *worker = (struct rspamd_worker *) w->data; struct rspamd_worker_ctx *ctx; struct rspamd_worker_session *session; rspamd_inet_addr_t *addr = NULL; int nfd, http_opts = 0; ctx = worker->ctx; if (ctx->max_tasks != 0 && worker->nconns > ctx->max_tasks) { msg_info_ctx("current tasks is now: %uD while maximum is: %uD", worker->nconns, ctx->max_tasks); return; } if ((nfd = rspamd_accept_from_socket(w->fd, &addr, rspamd_worker_throttle_accept_events, worker->accept_events)) == -1) { msg_warn_ctx("accept failed: %s", strerror(errno)); return; } /* Check for EAGAIN */ if (nfd == 0) { rspamd_inet_address_free(addr); return; } session = g_malloc0(sizeof(*session)); session->magic = G_MAXINT64; session->addr = addr; session->fd = nfd; session->ctx = ctx; session->worker = worker; if (ctx->encrypted_only && !rspamd_inet_address_is_local(addr)) { http_opts = RSPAMD_HTTP_REQUIRE_ENCRYPTION; } session->http_conn = rspamd_http_connection_new_server( ctx->http_ctx, nfd, rspamd_worker_body_handler, rspamd_worker_error_handler, rspamd_worker_finish_handler, http_opts); worker->srv->stat->connections_count++; rspamd_http_connection_set_max_size(session->http_conn, ctx->cfg->max_message); if (ctx->key) { rspamd_http_connection_set_key(session->http_conn, ctx->key); } rspamd_http_connection_read_message(session->http_conn, session, ctx->timeout); } gpointer init_worker(struct rspamd_config *cfg) { struct rspamd_worker_ctx *ctx; GQuark type; type = g_quark_try_string("normal"); ctx = rspamd_mempool_alloc0(cfg->cfg_pool, sizeof(struct rspamd_worker_ctx)); ctx->magic = rspamd_worker_magic; ctx->is_mime = TRUE; ctx->timeout = DEFAULT_WORKER_IO_TIMEOUT; ctx->cfg = cfg; ctx->task_timeout = NAN; rspamd_rcl_register_worker_option(cfg, type, "mime", rspamd_rcl_parse_struct_boolean, ctx, G_STRUCT_OFFSET(struct rspamd_worker_ctx, is_mime), 0, "Set to `false` if this worker is intended to work with non-MIME messages"); rspamd_rcl_register_worker_option(cfg, type, "encrypted_only", rspamd_rcl_parse_struct_boolean, ctx, G_STRUCT_OFFSET(struct rspamd_worker_ctx, encrypted_only), 0, "Allow only encrypted connections"); rspamd_rcl_register_worker_option(cfg, type, "timeout", rspamd_rcl_parse_struct_time, ctx, G_STRUCT_OFFSET(struct rspamd_worker_ctx, timeout), RSPAMD_CL_FLAG_TIME_FLOAT, "Protocol IO timeout"); rspamd_rcl_register_worker_option(cfg, type, "task_timeout", rspamd_rcl_parse_struct_time, ctx, G_STRUCT_OFFSET(struct rspamd_worker_ctx, task_timeout), RSPAMD_CL_FLAG_TIME_FLOAT, "Maximum task processing time, default: 8.0 seconds"); rspamd_rcl_register_worker_option(cfg, type, "max_tasks", rspamd_rcl_parse_struct_integer, ctx, G_STRUCT_OFFSET(struct rspamd_worker_ctx, max_tasks), RSPAMD_CL_FLAG_INT_32, "Maximum count of parallel tasks processed by a single worker process"); rspamd_rcl_register_worker_option(cfg, type, "keypair", rspamd_rcl_parse_struct_keypair, ctx, G_STRUCT_OFFSET(struct rspamd_worker_ctx, key), 0, "Encryption keypair"); return ctx; } /* * Start worker process */ __attribute__((noreturn)) void start_worker(struct rspamd_worker *worker) { struct rspamd_worker_ctx *ctx = worker->ctx; gboolean is_controller = FALSE; g_assert(rspamd_worker_check_context(worker->ctx, rspamd_worker_magic)); ctx->cfg = worker->srv->cfg; ctx->event_loop = rspamd_prepare_worker(worker, "normal", accept_socket); rspamd_symcache_start_refresh(worker->srv->cfg->cache, ctx->event_loop, worker); ctx->task_timeout = rspamd_worker_check_and_adjust_timeout(ctx->cfg, ctx->task_timeout); ctx->resolver = rspamd_dns_resolver_init(worker->srv->logger, ctx->event_loop, worker->srv->cfg); rspamd_upstreams_library_config(worker->srv->cfg, ctx->cfg->ups_ctx, ctx->event_loop, ctx->resolver->r); ctx->http_ctx = rspamd_http_context_create(ctx->cfg, ctx->event_loop, ctx->cfg->ups_ctx); rspamd_mempool_add_destructor(ctx->cfg->cfg_pool, (rspamd_mempool_destruct_t) rspamd_http_context_free, ctx->http_ctx); rspamd_worker_init_scanner(worker, ctx->event_loop, ctx->resolver, &ctx->lang_det); is_controller = rspamd_worker_check_controller_presence(worker); if (is_controller) { rspamd_worker_init_controller(worker, NULL); } else { rspamd_map_watch(worker->srv->cfg, ctx->event_loop, ctx->resolver, worker, RSPAMD_MAP_WATCH_SCANNER); } rspamd_lua_run_postloads(ctx->cfg->lua_state, ctx->cfg, ctx->event_loop, worker); ev_loop(ctx->event_loop, 0); rspamd_worker_block_signals(); if (is_controller) { rspamd_controller_on_terminate(worker, NULL); } rspamd_stat_close(); REF_RELEASE(ctx->cfg); rspamd_log_close(worker->srv->logger); rspamd_unset_crash_handler(worker->srv); exit(EXIT_SUCCESS); }