123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537 |
- /*
- * Copyright 2024 Vsevolod Stakhov
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- /*
- * Rspamd worker implementation
- */
-
- #include "config.h"
- #include "libutil/util.h"
- #include "libserver/maps/map.h"
- #include "libutil/upstream.h"
- #include "libserver/protocol.h"
- #include "libserver/cfg_file.h"
- #include "libserver/url.h"
- #include "libserver/dns.h"
- #include "libmime/message.h"
- #include "rspamd.h"
- #include "libstat/stat_api.h"
- #include "libserver/worker_util.h"
- #include "libserver/rspamd_control.h"
- #include "worker_private.h"
- #include "libserver/http/http_private.h"
- #include "libserver/cfg_file_private.h"
- #include <math.h>
- #include "unix-std.h"
-
- #include "lua/lua_common.h"
-
- /* 60 seconds for worker's IO */
- #define DEFAULT_WORKER_IO_TIMEOUT 60.0
-
- gpointer init_worker(struct rspamd_config *cfg);
- void start_worker(struct rspamd_worker *worker);
-
- worker_t normal_worker = {
- "normal", /* Name */
- init_worker, /* Init function */
- start_worker, /* Start function */
- RSPAMD_WORKER_HAS_SOCKET | RSPAMD_WORKER_KILLABLE | RSPAMD_WORKER_SCANNER,
- RSPAMD_WORKER_SOCKET_TCP, /* TCP socket */
- RSPAMD_WORKER_VER /* Version info */
- };
-
- #define msg_err_ctx(...) rspamd_default_log_function(G_LOG_LEVEL_CRITICAL, \
- "worker", ctx->cfg->cfg_pool->tag.uid, \
- RSPAMD_LOG_FUNC, \
- __VA_ARGS__)
- #define msg_warn_ctx(...) rspamd_default_log_function(G_LOG_LEVEL_WARNING, \
- "worker", ctx->cfg->cfg_pool->tag.uid, \
- RSPAMD_LOG_FUNC, \
- __VA_ARGS__)
- #define msg_info_ctx(...) rspamd_default_log_function(G_LOG_LEVEL_INFO, \
- "worker", ctx->cfg->cfg_pool->tag.uid, \
- RSPAMD_LOG_FUNC, \
- __VA_ARGS__)
-
- struct rspamd_worker_session {
- int64_t magic;
- struct rspamd_task *task;
- int fd;
- rspamd_inet_addr_t *addr;
- struct rspamd_worker_ctx *ctx;
- struct rspamd_http_connection *http_conn;
- struct rspamd_worker *worker;
- };
- /*
- * Reduce number of tasks proceeded
- */
- static void
- reduce_tasks_count(gpointer arg)
- {
- struct rspamd_worker *worker = arg;
-
- worker->nconns--;
-
- if (worker->state == rspamd_worker_wait_connections && worker->nconns == 0) {
-
- worker->state = rspamd_worker_wait_final_scripts;
- msg_info("performing finishing actions");
-
- if (rspamd_worker_call_finish_handlers(worker)) {
- worker->state = rspamd_worker_wait_final_scripts;
- }
- else {
- worker->state = rspamd_worker_wanna_die;
- }
- }
- else if (worker->state != rspamd_worker_state_running) {
- worker->state = rspamd_worker_wait_connections;
- }
- }
-
- static int
- rspamd_worker_body_handler(struct rspamd_http_connection *conn,
- struct rspamd_http_message *msg,
- const char *chunk, gsize len)
- {
- struct rspamd_worker_session *session = (struct rspamd_worker_session *) conn->ud;
- struct rspamd_task *task;
- struct rspamd_worker_ctx *ctx;
- const rspamd_ftok_t *hv_tok;
- gboolean debug_mempool = FALSE;
-
- ctx = session->ctx;
-
- /* Check debug */
- if ((hv_tok = rspamd_http_message_find_header(msg, "Memory")) != NULL) {
- rspamd_ftok_t cmp;
-
- RSPAMD_FTOK_ASSIGN(&cmp, "debug");
-
- if (rspamd_ftok_cmp(hv_tok, &cmp) == 0) {
- debug_mempool = TRUE;
- }
- }
-
- task = rspamd_task_new(session->worker,
- session->ctx->cfg, NULL, session->ctx->lang_det,
- session->ctx->event_loop,
- debug_mempool);
- session->task = task;
-
- msg_info_task("accepted connection from %s port %d, task ptr: %p",
- rspamd_inet_address_to_string(session->addr),
- rspamd_inet_address_get_port(session->addr),
- task);
-
- /* Copy some variables */
- if (ctx->is_mime) {
- task->flags |= RSPAMD_TASK_FLAG_MIME;
- }
- else {
- task->flags &= ~RSPAMD_TASK_FLAG_MIME;
- }
-
- /* We actually transfer ownership from session to task here */
- task->sock = session->fd;
- task->client_addr = session->addr;
- task->worker = session->worker;
- task->http_conn = session->http_conn;
-
- task->resolver = ctx->resolver;
- /* TODO: allow to disable autolearn in protocol */
- task->flags |= RSPAMD_TASK_FLAG_LEARN_AUTO;
-
- session->worker->nconns++;
- rspamd_mempool_add_destructor(task->task_pool,
- (rspamd_mempool_destruct_t) reduce_tasks_count,
- session->worker);
-
- /* Session memory is also now handled by task */
- rspamd_mempool_add_destructor(task->task_pool,
- (rspamd_mempool_destruct_t) g_free,
- session);
-
- /* Set up async session */
- task->s = rspamd_session_create(task->task_pool, rspamd_task_fin,
- NULL, (event_finalizer_t) rspamd_task_free, task);
-
- if (!rspamd_protocol_handle_request(task, msg)) {
- msg_err_task("cannot handle request: %e", task->err);
- task->flags |= RSPAMD_TASK_FLAG_SKIP;
- }
- else {
- if (task->cmd == CMD_PING) {
- task->flags |= RSPAMD_TASK_FLAG_SKIP;
- }
- else {
- if (!rspamd_task_load_message(task, msg, chunk, len)) {
- msg_err_task("cannot load message: %e", task->err);
- task->flags |= RSPAMD_TASK_FLAG_SKIP;
- }
- }
- }
-
- /* Set global timeout for the task */
- if (!isnan(ctx->task_timeout) && ctx->task_timeout > 0.0) {
- task->timeout_ev.data = task;
- ev_timer_init(&task->timeout_ev, rspamd_task_timeout,
- ctx->task_timeout,
- ctx->task_timeout);
- ev_set_priority(&task->timeout_ev, EV_MAXPRI);
- ev_timer_start(task->event_loop, &task->timeout_ev);
- }
-
- /* Set socket guard */
- task->guard_ev.data = task;
- ev_io_init(&task->guard_ev,
- rspamd_worker_guard_handler,
- task->sock, EV_READ);
- ev_io_start(task->event_loop, &task->guard_ev);
-
- rspamd_task_process(task, RSPAMD_TASK_PROCESS_ALL);
-
- return 0;
- }
-
- static void
- rspamd_worker_error_handler(struct rspamd_http_connection *conn, GError *err)
- {
- struct rspamd_worker_session *session = (struct rspamd_worker_session *) conn->ud;
- struct rspamd_task *task;
- struct rspamd_http_message *msg;
- rspamd_fstring_t *reply;
-
- /*
- * This function can be called with both struct rspamd_worker_session *
- * and struct rspamd_task *
- *
- * The first case is when we read message and it is controlled by this code;
- * the second case is when a reply is written and we do not control it normally,
- * as it is managed by `rspamd_protocol_reply` in protocol.c
- *
- * Hence, we need to distinguish our arguments...
- *
- * The approach here is simple:
- * - struct rspamd_worker_session starts with int64_t `magic` and we set it to
- * MAX_INT64
- * - struct rspamd_task starts with a pointer (or pointer + command on 32 bit system)
- *
- * The idea is simple: no sane pointer would reach MAX_INT64, so if this field
- * is MAX_INT64 then it is our session, and if it is not then it is a task.
- */
-
- if (session->magic == G_MAXINT64) {
- task = session->task;
- }
- else {
- task = (struct rspamd_task *) conn->ud;
- }
-
-
- if (task) {
- msg_info_task("abnormally closing connection from: %s, error: %e",
- rspamd_inet_address_to_string_pretty(task->client_addr), err);
-
- if (task->processed_stages & RSPAMD_TASK_STAGE_REPLIED) {
- /* Terminate session immediately */
- rspamd_session_destroy(task->s);
- }
- else {
- task->processed_stages |= RSPAMD_TASK_STAGE_REPLIED;
- msg = rspamd_http_new_message(HTTP_RESPONSE);
-
- if (err) {
- msg->status = rspamd_fstring_new_init(err->message,
- strlen(err->message));
- msg->code = err->code;
- }
- else {
- msg->status = rspamd_fstring_new_init("Internal error",
- strlen("Internal error"));
- msg->code = 500;
- }
-
- msg->date = time(NULL);
-
- reply = rspamd_fstring_sized_new(msg->status->len + 16);
- rspamd_printf_fstring(&reply, "{\"error\":\"%V\"}", msg->status);
- rspamd_http_message_set_body_from_fstring_steal(msg, reply);
- rspamd_http_connection_reset(task->http_conn);
- /* Use a shorter timeout for writing reply */
- rspamd_http_connection_write_message(task->http_conn,
- msg,
- NULL,
- "application/json",
- task,
- session->ctx->timeout / 10.0);
- }
- }
- else {
- /* If there was no task, then session is unmanaged */
- msg_info("no data received from: %s, error: %e",
- rspamd_inet_address_to_string_pretty(session->addr), err);
- rspamd_http_connection_reset(session->http_conn);
- rspamd_http_connection_unref(session->http_conn);
- rspamd_inet_address_free(session->addr);
- close(session->fd);
- g_free(session);
- }
- }
-
- static int
- rspamd_worker_finish_handler(struct rspamd_http_connection *conn,
- struct rspamd_http_message *msg)
- {
- struct rspamd_worker_session *session = (struct rspamd_worker_session *) conn->ud;
- struct rspamd_task *task;
-
- /* Read the comment to rspamd_worker_error_handler */
-
- if (session->magic == G_MAXINT64) {
- task = session->task;
- }
- else {
- task = (struct rspamd_task *) conn->ud;
- }
-
- if (task) {
- if (task->processed_stages & RSPAMD_TASK_STAGE_REPLIED) {
- /* We are done here */
- msg_debug_task("normally closing connection from: %s",
- rspamd_inet_address_to_string(task->client_addr));
- rspamd_session_destroy(task->s);
- }
- else if (task->processed_stages & RSPAMD_TASK_STAGE_DONE) {
- rspamd_session_pending(task->s);
- }
- }
- else {
- /* If there was no task, then session is unmanaged */
- msg_info("no data received from: %s, closing connection",
- rspamd_inet_address_to_string_pretty(session->addr));
- rspamd_inet_address_free(session->addr);
- rspamd_http_connection_reset(session->http_conn);
- rspamd_http_connection_unref(session->http_conn);
- close(session->fd);
- g_free(session);
- }
-
- return 0;
- }
-
- /*
- * Accept new connection and construct task
- */
- static void
- accept_socket(EV_P_ ev_io *w, int revents)
- {
- struct rspamd_worker *worker = (struct rspamd_worker *) w->data;
- struct rspamd_worker_ctx *ctx;
- struct rspamd_worker_session *session;
- rspamd_inet_addr_t *addr = NULL;
- int nfd, http_opts = 0;
-
- ctx = worker->ctx;
-
- if (ctx->max_tasks != 0 && worker->nconns > ctx->max_tasks) {
- msg_info_ctx("current tasks is now: %uD while maximum is: %uD",
- worker->nconns,
- ctx->max_tasks);
- return;
- }
-
- if ((nfd =
- rspamd_accept_from_socket(w->fd, &addr,
- rspamd_worker_throttle_accept_events, worker->accept_events)) == -1) {
- msg_warn_ctx("accept failed: %s", strerror(errno));
- return;
- }
- /* Check for EAGAIN */
- if (nfd == 0) {
- rspamd_inet_address_free(addr);
-
- return;
- }
-
- session = g_malloc0(sizeof(*session));
- session->magic = G_MAXINT64;
- session->addr = addr;
- session->fd = nfd;
- session->ctx = ctx;
- session->worker = worker;
-
- if (ctx->encrypted_only && !rspamd_inet_address_is_local(addr)) {
- http_opts = RSPAMD_HTTP_REQUIRE_ENCRYPTION;
- }
-
- session->http_conn = rspamd_http_connection_new_server(
- ctx->http_ctx,
- nfd,
- rspamd_worker_body_handler,
- rspamd_worker_error_handler,
- rspamd_worker_finish_handler,
- http_opts);
-
- worker->srv->stat->connections_count++;
- rspamd_http_connection_set_max_size(session->http_conn,
- ctx->cfg->max_message);
-
- if (ctx->key) {
- rspamd_http_connection_set_key(session->http_conn, ctx->key);
- }
-
- rspamd_http_connection_read_message(session->http_conn,
- session,
- ctx->timeout);
- }
-
- gpointer
- init_worker(struct rspamd_config *cfg)
- {
- struct rspamd_worker_ctx *ctx;
- GQuark type;
-
- type = g_quark_try_string("normal");
- ctx = rspamd_mempool_alloc0(cfg->cfg_pool,
- sizeof(struct rspamd_worker_ctx));
-
- ctx->magic = rspamd_worker_magic;
- ctx->is_mime = TRUE;
- ctx->timeout = DEFAULT_WORKER_IO_TIMEOUT;
- ctx->cfg = cfg;
- ctx->task_timeout = NAN;
-
- rspamd_rcl_register_worker_option(cfg,
- type,
- "mime",
- rspamd_rcl_parse_struct_boolean,
- ctx,
- G_STRUCT_OFFSET(struct rspamd_worker_ctx, is_mime),
- 0,
- "Set to `false` if this worker is intended to work with non-MIME messages");
-
- rspamd_rcl_register_worker_option(cfg,
- type,
- "encrypted_only",
- rspamd_rcl_parse_struct_boolean,
- ctx,
- G_STRUCT_OFFSET(struct rspamd_worker_ctx, encrypted_only),
- 0,
- "Allow only encrypted connections");
-
-
- rspamd_rcl_register_worker_option(cfg,
- type,
- "timeout",
- rspamd_rcl_parse_struct_time,
- ctx,
- G_STRUCT_OFFSET(struct rspamd_worker_ctx,
- timeout),
- RSPAMD_CL_FLAG_TIME_FLOAT,
- "Protocol IO timeout");
-
- rspamd_rcl_register_worker_option(cfg,
- type,
- "task_timeout",
- rspamd_rcl_parse_struct_time,
- ctx,
- G_STRUCT_OFFSET(struct rspamd_worker_ctx,
- task_timeout),
- RSPAMD_CL_FLAG_TIME_FLOAT,
- "Maximum task processing time, default: 8.0 seconds");
-
- rspamd_rcl_register_worker_option(cfg,
- type,
- "max_tasks",
- rspamd_rcl_parse_struct_integer,
- ctx,
- G_STRUCT_OFFSET(struct rspamd_worker_ctx,
- max_tasks),
- RSPAMD_CL_FLAG_INT_32,
- "Maximum count of parallel tasks processed by a single worker process");
-
- rspamd_rcl_register_worker_option(cfg,
- type,
- "keypair",
- rspamd_rcl_parse_struct_keypair,
- ctx,
- G_STRUCT_OFFSET(struct rspamd_worker_ctx,
- key),
- 0,
- "Encryption keypair");
-
- return ctx;
- }
-
- /*
- * Start worker process
- */
- __attribute__((noreturn)) void
- start_worker(struct rspamd_worker *worker)
- {
- struct rspamd_worker_ctx *ctx = worker->ctx;
- gboolean is_controller = FALSE;
-
- g_assert(rspamd_worker_check_context(worker->ctx, rspamd_worker_magic));
- ctx->cfg = worker->srv->cfg;
- ctx->event_loop = rspamd_prepare_worker(worker, "normal", accept_socket);
- rspamd_symcache_start_refresh(worker->srv->cfg->cache, ctx->event_loop,
- worker);
-
- ctx->task_timeout = rspamd_worker_check_and_adjust_timeout(ctx->cfg, ctx->task_timeout);
-
- ctx->resolver = rspamd_dns_resolver_init(worker->srv->logger,
- ctx->event_loop,
- worker->srv->cfg);
- rspamd_upstreams_library_config(worker->srv->cfg, ctx->cfg->ups_ctx,
- ctx->event_loop, ctx->resolver->r);
-
- ctx->http_ctx = rspamd_http_context_create(ctx->cfg, ctx->event_loop,
- ctx->cfg->ups_ctx);
- rspamd_mempool_add_destructor(ctx->cfg->cfg_pool,
- (rspamd_mempool_destruct_t) rspamd_http_context_free,
- ctx->http_ctx);
- rspamd_worker_init_scanner(worker, ctx->event_loop, ctx->resolver,
- &ctx->lang_det);
-
- is_controller = rspamd_worker_check_controller_presence(worker);
-
- if (is_controller) {
- rspamd_worker_init_controller(worker, NULL);
- }
- else {
- rspamd_map_watch(worker->srv->cfg, ctx->event_loop, ctx->resolver,
- worker, RSPAMD_MAP_WATCH_SCANNER);
- }
-
- rspamd_lua_run_postloads(ctx->cfg->lua_state, ctx->cfg, ctx->event_loop,
- worker);
-
- ev_loop(ctx->event_loop, 0);
- rspamd_worker_block_signals();
-
- if (is_controller) {
- rspamd_controller_on_terminate(worker, NULL);
- }
-
- rspamd_stat_close();
- REF_RELEASE(ctx->cfg);
- rspamd_log_close(worker->srv->logger);
- rspamd_unset_crash_handler(worker->srv);
-
- exit(EXIT_SUCCESS);
- }
|