diff options
-rw-r--r-- | src/controller.c | 2 | ||||
-rw-r--r-- | src/libserver/protocol.c | 11 | ||||
-rw-r--r-- | src/libserver/protocol.h | 7 | ||||
-rw-r--r-- | src/rspamd_proxy.c | 197 | ||||
-rw-r--r-- | src/worker.c | 4 | ||||
-rw-r--r-- | src/worker_private.h | 10 |
6 files changed, 210 insertions, 21 deletions
diff --git a/src/controller.c b/src/controller.c index ad24e5034..7214aa59f 100644 --- a/src/controller.c +++ b/src/controller.c @@ -1876,7 +1876,7 @@ rspamd_controller_scan_reply (struct rspamd_task *task) msg = rspamd_http_new_message (HTTP_RESPONSE); msg->date = time (NULL); msg->code = 200; - rspamd_protocol_http_reply (msg, task); + rspamd_protocol_http_reply (msg, task, NULL); rspamd_http_connection_reset (conn_ent->conn); rspamd_http_connection_write_message (conn_ent->conn, msg, NULL, "application/json", conn_ent, conn_ent->conn->fd, conn_ent->rt->ptv, diff --git a/src/libserver/protocol.c b/src/libserver/protocol.c index cbf91027c..52ba05188 100644 --- a/src/libserver/protocol.c +++ b/src/libserver/protocol.c @@ -1241,7 +1241,7 @@ rspamd_protocol_write_ucl (struct rspamd_task *task, void rspamd_protocol_http_reply (struct rspamd_http_message *msg, - struct rspamd_task *task) + struct rspamd_task *task, ucl_object_t **pobj) { struct rspamd_metric_result *metric_res; GHashTableIter hiter; @@ -1265,6 +1265,10 @@ rspamd_protocol_http_reply (struct rspamd_http_message *msg, top = rspamd_protocol_write_ucl (task, flags); + if (pobj) { + *pobj = top; + } + if (!(task->flags & RSPAMD_TASK_FLAG_NO_LOG)) { rspamd_roll_history_update (task->worker->srv->history, task); } @@ -1400,7 +1404,7 @@ end: } } -static void +void rspamd_protocol_write_log_pipe (struct rspamd_task *task) { struct rspamd_worker_log_pipe *lp; @@ -1645,7 +1649,6 @@ rspamd_protocol_write_reply (struct rspamd_task *task) msg->date = time (NULL); - debug_task ("writing reply to client"); if (task->err != NULL) { ucl_object_t *top = NULL; @@ -1675,7 +1678,7 @@ rspamd_protocol_write_reply (struct rspamd_task *task) case CMD_PROCESS: case CMD_SKIP: case CMD_CHECK_V2: - rspamd_protocol_http_reply (msg, task); + rspamd_protocol_http_reply (msg, task, NULL); rspamd_protocol_write_log_pipe (task); break; case CMD_PING: diff --git a/src/libserver/protocol.h b/src/libserver/protocol.h index c0095fedc..3b7dabc73 100644 --- a/src/libserver/protocol.h +++ b/src/libserver/protocol.h @@ -66,7 +66,12 @@ gboolean rspamd_protocol_handle_request (struct rspamd_task *task, * @param task */ void rspamd_protocol_http_reply (struct rspamd_http_message *msg, - struct rspamd_task *task); + struct rspamd_task *task, ucl_object_t **pobj); +/** + * Write data to log pipes + * @param task + */ +void rspamd_protocol_write_log_pipe (struct rspamd_task *task); enum rspamd_protocol_flags { RSPAMD_PROTOCOL_BASIC = 1 << 0, diff --git a/src/rspamd_proxy.c b/src/rspamd_proxy.c index 992a4f08d..e381a7c54 100644 --- a/src/rspamd_proxy.c +++ b/src/rspamd_proxy.c @@ -26,8 +26,10 @@ #include "libmime/message.h" #include "rspamd.h" #include "libserver/worker_util.h" +#include "worker_private.h" #include "lua/lua_common.h" #include "keypairs_cache.h" +#include "libstat/stat_api.h" #include "ottery.h" #include "unix-std.h" @@ -73,6 +75,7 @@ struct rspamd_http_upstream { gint parser_from_ref; gint parser_to_ref; gboolean local; + gboolean self_process; }; struct rspamd_http_mirror { @@ -118,6 +121,8 @@ struct rspamd_proxy_ctx { GArray *cmp_refs; /* Maximum count for retries */ guint max_retries; + /* If we have self_scanning backends, we need to work as a normal worker */ + gboolean has_self_scan; }; enum rspamd_backend_flags { @@ -142,9 +147,11 @@ struct rspamd_proxy_backend_connection { enum rspamd_backend_flags flags; gint parser_from_ref; gint parser_to_ref; + struct rspamd_task *task; }; struct rspamd_proxy_session { + struct rspamd_worker *worker; rspamd_mempool_t *pool; struct rspamd_proxy_ctx *ctx; rspamd_inet_addr_t *client_addr; @@ -318,25 +325,33 @@ rspamd_proxy_parse_upstream (rspamd_mempool_t *pool, (rspamd_mempool_destruct_t)rspamd_pubkey_unref, up->key); } + elt = ucl_object_lookup (obj, "self_process"); + if (elt && ucl_object_toboolean (elt)) { + up->self_process = TRUE; + ctx->has_self_scan = TRUE; + } + elt = ucl_object_lookup (obj, "hosts"); - if (elt == NULL) { + if (elt == NULL && !up->self_process) { g_set_error (err, rspamd_proxy_quark (), 100, "upstream option must have some hosts definition"); goto err; } - up->u = rspamd_upstreams_create (ctx->cfg->ups_ctx); - if (!rspamd_upstreams_from_ucl (up->u, elt, 11333, NULL)) { - g_set_error (err, rspamd_proxy_quark (), 100, - "upstream has bad hosts definition"); + if (elt) { + up->u = rspamd_upstreams_create (ctx->cfg->ups_ctx); + if (!rspamd_upstreams_from_ucl (up->u, elt, 11333, NULL)) { + g_set_error (err, rspamd_proxy_quark (), 100, + "upstream has bad hosts definition"); - goto err; - } + goto err; + } - rspamd_mempool_add_destructor (pool, - (rspamd_mempool_destruct_t)rspamd_upstreams_destroy, up->u); + rspamd_mempool_add_destructor (pool, + (rspamd_mempool_destruct_t)rspamd_upstreams_destroy, up->u); + } elt = ucl_object_lookup (obj, "default"); if (elt && ucl_object_toboolean (elt)) { @@ -841,8 +856,14 @@ proxy_session_dtor (struct rspamd_proxy_session *session) } } - if (session->master_conn && session->master_conn->results) { - ucl_object_unref (session->master_conn->results); + if (session->master_conn) { + if (session->master_conn->results) { + ucl_object_unref (session->master_conn->results); + } + + if (session->master_conn->task) { + rspamd_session_destroy (session->master_conn->task->s); + } } g_ptr_array_free (session->mirror_conns, TRUE); @@ -1194,6 +1215,144 @@ proxy_backend_master_finish_handler (struct rspamd_http_connection *conn, return 0; } +static void +rspamd_proxy_scan_self_reply (struct rspamd_task *task) +{ + struct rspamd_http_message *msg; + struct rspamd_proxy_session *session = task->fin_arg; + ucl_object_t *rep; + const char *ctype = "application/json"; + + msg = rspamd_http_new_message (HTTP_RESPONSE); + msg->date = time (NULL); + msg->code = 200; + + switch (task->cmd) { + case CMD_REPORT_IFSPAM: + case CMD_REPORT: + case CMD_CHECK: + case CMD_SYMBOLS: + case CMD_PROCESS: + case CMD_SKIP: + case CMD_CHECK_V2: + rspamd_protocol_http_reply (msg, task, &rep); + rspamd_protocol_write_log_pipe (task); + break; + case CMD_PING: + rspamd_http_message_set_body (msg, "pong" CRLF, 6); + ctype = "text/plain"; + break; + case CMD_OTHER: + msg_err_task ("BROKEN"); + break; + } + + rspamd_http_connection_reset (session->client_conn); + session->master_conn->flags |= RSPAMD_BACKEND_CLOSED; + session->master_conn->results = rep; + rspamd_http_connection_write_message (session->client_conn, msg, NULL, + ctype, + session, + session->client_sock, + NULL, + session->ctx->ev_base); +} + +static gboolean +rspamd_proxy_task_fin (void *ud) +{ + struct rspamd_task *task = ud; + + msg_debug_task ("finish task"); + + if (RSPAMD_TASK_IS_PROCESSED (task)) { + rspamd_proxy_scan_self_reply (task); + return TRUE; + } + + if (!rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL)) { + rspamd_proxy_scan_self_reply (task); + return TRUE; + } + + if (RSPAMD_TASK_IS_PROCESSED (task)) { + rspamd_proxy_scan_self_reply (task); + return TRUE; + } + + /* One more iteration */ + return FALSE; +} + +static gboolean +rspamd_proxy_self_scan (struct rspamd_proxy_session *session) +{ + struct rspamd_task *task; + struct rspamd_http_message *msg; + struct event *guard_ev; + const gchar *data; + gsize len; + + msg = session->client_message; + task = rspamd_task_new (session->worker, session->ctx->cfg); + task->flags |= RSPAMD_TASK_FLAG_MIME; + task->sock = session->client_sock; + task->client_addr = session->client_addr; + task->fin_arg = session; + task->resolver = session->ctx->resolver; + /* TODO: allow to disable autolearn in protocol */ + task->flags |= RSPAMD_TASK_FLAG_LEARN_AUTO; + task->ev_base = session->ctx->ev_base; + task->s = rspamd_session_create (task->task_pool, rspamd_proxy_task_fin, + NULL, (event_finalizer_t )rspamd_task_free, task); + data = rspamd_http_message_get_body (msg, &len); + + /* Process message */ + if (!rspamd_protocol_handle_request (task, msg)) { + msg_err_task ("cannot handle request: %e", task->err); + task->flags |= RSPAMD_TASK_FLAG_SKIP; + } + else { + if (task->cmd == CMD_PING) { + task->flags |= RSPAMD_TASK_FLAG_SKIP; + } + else { + if (!rspamd_task_load_message (task, msg, data, len)) { + msg_err_task ("cannot load message: %e", task->err); + task->flags |= RSPAMD_TASK_FLAG_SKIP; + } + } + } + + /* Set global timeout for the task */ + if (session->ctx->default_upstream->timeout > 0.0) { + struct timeval task_tv; + + event_set (&task->timeout_ev, -1, EV_TIMEOUT, rspamd_task_timeout, + task); + event_base_set (session->ctx->ev_base, &task->timeout_ev); + double_to_tv (session->ctx->default_upstream->timeout, &task_tv); + event_add (&task->timeout_ev, &task_tv); + } + + /* Set socket guard */ + guard_ev = rspamd_mempool_alloc (task->task_pool, sizeof (*guard_ev)); +#ifdef EV_CLOSED + event_set (guard_ev, task->sock, EV_READ|EV_PERSIST|EV_CLOSED, + rspamd_worker_guard_handler, task); +#else + event_set (guard_ev, task->sock, EV_READ|EV_PERSIST, + rspamd_worker_guard_handler, task); +#endif + event_base_set (task->ev_base, guard_ev); + event_add (guard_ev, NULL); + task->guard_ev = guard_ev; + + rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL); + + return TRUE; +} + static gboolean proxy_send_master_message (struct rspamd_proxy_session *session) { @@ -1222,6 +1381,9 @@ proxy_send_master_message (struct rspamd_proxy_session *session) goto err; } else { + if (backend->self_process) { + return rspamd_proxy_self_scan (session); + } retry: if (session->ctx->max_retries && session->retries > session->ctx->max_retries) { @@ -1415,6 +1577,7 @@ proxy_accept_socket (gint fd, short what, void *arg) ctx->keys_cache, NULL); session->ctx = ctx; + session->worker = worker; if (ctx->key) { rspamd_http_connection_set_key (session->client_conn, ctx->key); @@ -1450,8 +1613,7 @@ proxy_rotate_key (gint fd, short what, void *arg) } void -start_rspamd_proxy (struct rspamd_worker *worker) -{ +start_rspamd_proxy (struct rspamd_worker *worker) { struct rspamd_proxy_ctx *ctx = worker->ctx; struct timeval rot_tv; @@ -1480,11 +1642,20 @@ start_rspamd_proxy (struct rspamd_worker *worker) event_base_set (ctx->ev_base, &ctx->rotate_ev); event_add (&ctx->rotate_ev, &rot_tv); + if (ctx->has_self_scan) { + /* Additional initialisation needed */ + rspamd_worker_init_scanner (worker, ctx->ev_base, ctx->resolver); + } + event_base_loop (ctx->ev_base, 0); rspamd_worker_block_signals (); rspamd_log_close (worker->srv->logger); + if (ctx->has_self_scan) { + rspamd_stat_close (); + } + rspamd_keypair_cache_destroy (ctx->keys_cache); REF_RELEASE (ctx->cfg); diff --git a/src/worker.c b/src/worker.c index 724b9ccd2..5bb4b2d61 100644 --- a/src/worker.c +++ b/src/worker.c @@ -142,7 +142,7 @@ reduce_tasks_count (gpointer arg) } } -static void +void rspamd_task_timeout (gint fd, short what, gpointer ud) { struct rspamd_task *task = (struct rspamd_task *) ud; @@ -156,7 +156,7 @@ rspamd_task_timeout (gint fd, short what, gpointer ud) } } -static void +void rspamd_worker_guard_handler (gint fd, short what, void *data) { struct rspamd_task *task = data; diff --git a/src/worker_private.h b/src/worker_private.h index ac391fc8c..b9a9e57d6 100644 --- a/src/worker_private.h +++ b/src/worker_private.h @@ -60,4 +60,14 @@ void rspamd_worker_init_scanner (struct rspamd_worker *worker, struct event_base *ev_base, struct rspamd_dns_resolver *resolver); +/* + * Called on forced timeout + */ +void rspamd_task_timeout (gint fd, short what, gpointer ud); + +/* + * Called on unexpected IO error (e.g. ECONNRESET) + */ +void rspamd_worker_guard_handler (gint fd, short what, void *data); + #endif |