diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/controller.c | 58 | ||||
-rw-r--r-- | src/fuzzy_storage.c | 1486 |
2 files changed, 153 insertions, 1391 deletions
diff --git a/src/controller.c b/src/controller.c index bf74f03a3..851087945 100644 --- a/src/controller.c +++ b/src/controller.c @@ -105,12 +105,8 @@ INIT_LOG_MODULE(controller) #define COLOR_REJECT "#CB4B4B" #define COLOR_TOTAL "#9440ED" -const struct timeval rrd_update_time = { - .tv_sec = 1, - .tv_usec = 0 -}; - -const guint64 rspamd_controller_ctx_magic = 0xf72697805e6941faULL; +const static ev_tstamp rrd_update_time = 1.0; +const static guint64 rspamd_controller_ctx_magic = 0xf72697805e6941faULL; extern void fuzzy_stat_command (struct rspamd_task *task); @@ -132,7 +128,7 @@ worker_t controller_worker = { struct rspamd_controller_worker_ctx { guint64 magic; /* Events base */ - struct ev_loop *ev_base; + struct ev_loop *event_loop; /* DNS resolver */ struct rspamd_dns_resolver *resolver; /* Config */ @@ -153,7 +149,7 @@ struct rspamd_controller_worker_ctx { struct rspamd_http_context *http_ctx; struct rspamd_http_connection_router *http; /* Server's start time */ - time_t start_time; + ev_tstamp start_time; /* Main server */ struct rspamd_main *srv; /* SSL cert */ @@ -182,9 +178,9 @@ struct rspamd_controller_worker_ctx { /* Local keypair */ gpointer key; - struct event *rrd_event; + ev_timer rrd_event; struct rspamd_rrd_file *rrd; - struct event save_stats_event; + ev_timer save_stats_event; struct rspamd_lang_detector *lang_det; gdouble task_timeout; }; @@ -1525,7 +1521,7 @@ rspamd_controller_handle_lua_history (lua_State *L, if (lua_isfunction (L, -1)) { task = rspamd_task_new (session->ctx->worker, session->cfg, - session->pool, ctx->lang_det, ctx->ev_base); + session->pool, ctx->lang_det, ctx->event_loop); task->resolver = ctx->resolver; task->s = rspamd_session_create (session->pool, @@ -1822,7 +1818,7 @@ rspamd_controller_handle_lua (struct rspamd_http_connection_entry *conn_ent, } task = rspamd_task_new (session->ctx->worker, session->cfg, session->pool, - ctx->lang_det, ctx->ev_base); + ctx->lang_det, ctx->event_loop); task->resolver = ctx->resolver; task->s = rspamd_session_create (session->pool, @@ -2004,7 +2000,7 @@ rspamd_controller_handle_learn_common ( } task = rspamd_task_new (session->ctx->worker, session->cfg, session->pool, - session->ctx->lang_det, ctx->ev_base); + session->ctx->lang_det, ctx->event_loop); task->resolver = ctx->resolver; task->s = rspamd_session_create (session->pool, @@ -2102,7 +2098,7 @@ rspamd_controller_handle_scan (struct rspamd_http_connection_entry *conn_ent, } task = rspamd_task_new (session->ctx->worker, session->cfg, session->pool, - ctx->lang_det, ctx->ev_base); + ctx->lang_det, ctx->event_loop); task->resolver = ctx->resolver; task->s = rspamd_session_create (session->pool, @@ -2133,7 +2129,7 @@ rspamd_controller_handle_scan (struct rspamd_http_connection_entry *conn_ent, event_set (&task->timeout_ev, -1, EV_TIMEOUT, rspamd_task_timeout, task); - event_base_set (ctx->ev_base, &task->timeout_ev); + event_base_set (ctx->event_loop, &task->timeout_ev); double_to_tv (ctx->task_timeout, &task_tv); event_add (&task->timeout_ev, &task_tv); } @@ -2600,7 +2596,7 @@ rspamd_controller_handle_stat_common ( ctx = session->ctx; task = rspamd_task_new (session->ctx->worker, session->cfg, session->pool, - ctx->lang_det, ctx->ev_base); + ctx->lang_det, ctx->event_loop); task->resolver = ctx->resolver; cbdata = rspamd_mempool_alloc0 (session->pool, sizeof (*cbdata)); cbdata->conn_ent = conn_ent; @@ -3002,7 +2998,7 @@ rspamd_controller_handle_lua_plugin (struct rspamd_http_connection_entry *conn_e } task = rspamd_task_new (session->ctx->worker, session->cfg, session->pool, - ctx->lang_det, ctx->ev_base); + ctx->lang_det, ctx->event_loop); task->resolver = ctx->resolver; task->s = rspamd_session_create (session->pool, @@ -3487,7 +3483,7 @@ lua_csession_get_ev_base (lua_State *L) s = c->ud; pbase = lua_newuserdata (L, sizeof (struct ev_loop *)); rspamd_lua_setclass (L, "rspamd{ev_base}", -1); - *pbase = s->ctx->ev_base; + *pbase = s->ctx->event_loop; } else { return luaL_error (L, "invalid arguments"); @@ -3702,7 +3698,7 @@ start_controller_worker (struct rspamd_worker *worker) const guint save_stats_interval = 60 * 1000; /* 1 minute */ gpointer m; - ctx->ev_base = rspamd_prepare_worker (worker, + ctx->event_loop = rspamd_prepare_worker (worker, "controller", rspamd_controller_accept_socket); msec_to_tv (ctx->timeout, &ctx->io_tv); @@ -3752,7 +3748,7 @@ start_controller_worker (struct rspamd_worker *worker) if (ctx->rrd) { ctx->rrd_event = g_malloc0 (sizeof (*ctx->rrd_event)); evtimer_set (ctx->rrd_event, rspamd_controller_rrd_update, ctx); - event_base_set (ctx->ev_base, ctx->rrd_event); + event_base_set (ctx->event_loop, ctx->rrd_event); event_add (ctx->rrd_event, &rrd_update_time); } else if (rrd_err) { @@ -3773,7 +3769,7 @@ start_controller_worker (struct rspamd_worker *worker) "password"); /* Accept event */ - ctx->http_ctx = rspamd_http_context_create (ctx->cfg, ctx->ev_base, + ctx->http_ctx = rspamd_http_context_create (ctx->cfg, ctx->event_loop, ctx->cfg->ups_ctx); ctx->http = rspamd_http_router_new (rspamd_controller_error_handler, rspamd_controller_finish_handler, &ctx->io_tv, @@ -3889,40 +3885,40 @@ start_controller_worker (struct rspamd_worker *worker) rspamd_controller_handle_unknown); ctx->resolver = rspamd_dns_resolver_init (worker->srv->logger, - ctx->ev_base, + ctx->event_loop, worker->srv->cfg); rspamd_upstreams_library_config (worker->srv->cfg, worker->srv->cfg->ups_ctx, - ctx->ev_base, ctx->resolver->r); - rspamd_symcache_start_refresh (worker->srv->cfg->cache, ctx->ev_base, + ctx->event_loop, ctx->resolver->r); + rspamd_symcache_start_refresh (worker->srv->cfg->cache, ctx->event_loop, worker); - rspamd_stat_init (worker->srv->cfg, ctx->ev_base); + rspamd_stat_init (worker->srv->cfg, ctx->event_loop); if (worker->index == 0) { if (!ctx->cfg->disable_monitored) { - rspamd_worker_init_monitored (worker, ctx->ev_base, ctx->resolver); + rspamd_worker_init_monitored (worker, ctx->event_loop, ctx->resolver); } - rspamd_map_watch (worker->srv->cfg, ctx->ev_base, + rspamd_map_watch (worker->srv->cfg, ctx->event_loop, ctx->resolver, worker, TRUE); /* Schedule periodic stats saving, see #1823 */ event_set (&ctx->save_stats_event, -1, EV_PERSIST, rspamd_controller_stats_save_periodic, ctx); - event_base_set (ctx->ev_base, &ctx->save_stats_event); + event_base_set (ctx->event_loop, &ctx->save_stats_event); msec_to_tv (save_stats_interval, &stv); evtimer_add (&ctx->save_stats_event, &stv); } else { - rspamd_map_watch (worker->srv->cfg, ctx->ev_base, + rspamd_map_watch (worker->srv->cfg, ctx->event_loop, ctx->resolver, worker, FALSE); } - rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->ev_base, worker); + rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->event_loop, worker); /* Start event loop */ - event_base_loop (ctx->ev_base, 0); + event_base_loop (ctx->event_loop, 0); rspamd_worker_block_signals (); rspamd_stat_close (); diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c index 86a4230de..4565be874 100644 --- a/src/fuzzy_storage.c +++ b/src/fuzzy_storage.c @@ -36,8 +36,7 @@ #include "libserver/rspamd_control.h" #include "libutil/hash.h" #include "libutil/map_private.h" -#include "libutil/http_private.h" -#include "libutil/http_router.h" +#include "contrib/uthash/utlist.h" #include "unix-std.h" #include <math.h> @@ -132,7 +131,7 @@ static const guint64 rspamd_fuzzy_storage_magic = 0x291a3253eb1b3ea5ULL; struct rspamd_fuzzy_storage_ctx { guint64 magic; /* Events base */ - struct ev_loop *ev_base; + struct ev_loop *event_loop; /* DNS resolver */ struct rspamd_dns_resolver *resolver; /* Config */ @@ -146,34 +145,21 @@ struct rspamd_fuzzy_storage_ctx { struct rspamd_radix_map_helper *blocked_ips; struct rspamd_radix_map_helper *ratelimit_whitelist; - struct rspamd_cryptobox_keypair *sync_keypair; - struct rspamd_cryptobox_pubkey *master_key; - struct timeval master_io_tv; - gdouble master_timeout; - GPtrArray *mirrors; const ucl_object_t *update_map; - const ucl_object_t *masters_map; const ucl_object_t *blocked_map; const ucl_object_t *ratelimit_whitelist_map; - GHashTable *master_flags; guint keypair_cache_size; - gint peer_fd; - struct event peer_ev; - struct event stat_ev; - struct timeval stat_tv; + ev_timer stat_ev; + ev_io peer_ev; + ev_tstamp stat_timeout; /* Local keypair */ struct rspamd_cryptobox_keypair *default_keypair; /* Bad clash, need for parse keypair */ struct fuzzy_key *default_key; GHashTable *keys; gboolean encrypted_only; - gboolean collection_mode; gboolean read_only; - struct rspamd_cryptobox_keypair *collection_keypair; - struct rspamd_cryptobox_pubkey *collection_sign_key; - gchar *collection_id_file; - struct rspamd_http_context *http_ctx; struct rspamd_keypair_cache *keypair_cache; rspamd_lru_hash_t *errors_ips; rspamd_lru_hash_t *ratelimit_buckets; @@ -181,7 +167,8 @@ struct rspamd_fuzzy_storage_ctx { GArray *updates_pending; guint updates_failed; guint updates_maxfail; - guint32 collection_id; + /* Used to send data between workers */ + gint peer_fd; /* Ratelimits */ guint leaky_bucket_ttl; @@ -192,7 +179,6 @@ struct rspamd_fuzzy_storage_ctx { gdouble leaky_bucket_rate; struct rspamd_worker *worker; - struct rspamd_http_connection_router *collection_rt; const ucl_object_t *skip_map; struct rspamd_hash_map_helper *skip_hashes; guchar cookie[COOKIE_SIZE]; @@ -224,14 +210,14 @@ struct fuzzy_session { enum fuzzy_cmd_type cmd_type; gint fd; guint64 time; - struct event io; + struct ev_io io; ref_entry_t ref; struct fuzzy_key_stat *key_stat; guchar nm[rspamd_cryptobox_MAX_NMBYTES]; }; struct fuzzy_peer_request { - struct event io_ev; + ev_io io_ev; struct fuzzy_peer_cmd cmd; }; @@ -241,19 +227,13 @@ struct fuzzy_key { struct fuzzy_key_stat *stat; }; -struct fuzzy_master_update_session { - const gchar *name; - gchar uid[16]; - struct rspamd_http_connection *conn; - struct rspamd_http_message *msg; +struct rspamd_updates_cbdata { + GArray *updates_pending; struct rspamd_fuzzy_storage_ctx *ctx; - const gchar *src; - gchar *psrc; - rspamd_inet_addr_t *addr; - gboolean replied; - gint sock; + gchar *source; }; + static void rspamd_fuzzy_write_reply (struct fuzzy_session *session); static gboolean @@ -261,8 +241,7 @@ rspamd_fuzzy_check_ratelimit (struct fuzzy_session *session) { rspamd_inet_addr_t *masked; struct rspamd_leaky_bucket_elt *elt; - struct timeval tv; - gdouble now; + ev_tstamp now; if (session->ctx->ratelimit_whitelist != NULL) { if (rspamd_match_radix_map_addr (session->ctx->ratelimit_whitelist, @@ -289,15 +268,9 @@ rspamd_fuzzy_check_ratelimit (struct fuzzy_session *session) MIN (MAX (session->ctx->leaky_bucket_mask * 4, 64), 128)); } -#ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC - event_base_gettimeofday_cached (session->ctx->ev_base, &tv); -#else - gettimeofday (&tv, NULL); -#endif - - now = tv_to_double (&tv); + now = ev_now (session->ctx->event_loop); elt = rspamd_lru_hash_lookup (session->ctx->ratelimit_buckets, masked, - tv.tv_sec); + now); if (elt) { gboolean ratelimited = FALSE; @@ -348,7 +321,7 @@ rspamd_fuzzy_check_ratelimit (struct fuzzy_session *session) rspamd_lru_hash_insert (session->ctx->ratelimit_buckets, masked, elt, - tv.tv_sec, + now, session->ctx->leaky_bucket_ttl); } @@ -424,15 +397,6 @@ fuzzy_count_callback (guint64 count, void *ud) ctx->stat.fuzzy_hashes = count; } -struct fuzzy_slave_connection { - struct rspamd_cryptobox_keypair *local_key; - struct rspamd_cryptobox_pubkey *remote_key; - struct upstream *up; - struct rspamd_http_connection *http_conn; - struct rspamd_fuzzy_mirror *mirror; - gint sock; -}; - static void fuzzy_rl_bucket_free (gpointer p) { @@ -443,228 +407,32 @@ fuzzy_rl_bucket_free (gpointer p) } static void -fuzzy_mirror_close_connection (struct fuzzy_slave_connection *conn) -{ - if (conn) { - if (conn->http_conn) { - rspamd_http_connection_reset (conn->http_conn); - rspamd_http_connection_unref (conn->http_conn); - } - - close (conn->sock); - - g_free (conn); - } -} - -struct rspamd_fuzzy_updates_cbdata { - struct rspamd_fuzzy_storage_ctx *ctx; - struct rspamd_http_message *msg; - struct fuzzy_slave_connection *conn; - struct rspamd_fuzzy_mirror *m; - GArray *updates_pending; -}; - -static void -fuzzy_mirror_updates_version_cb (guint64 rev64, void *ud) -{ - struct rspamd_fuzzy_updates_cbdata *cbdata = ud; - struct fuzzy_peer_cmd *io_cmd; - guint32 rev32 = rev64, len; - const gchar *p; - rspamd_fstring_t *reply; - struct fuzzy_slave_connection *conn; - struct rspamd_fuzzy_storage_ctx *ctx; - struct rspamd_http_message *msg; - struct rspamd_fuzzy_mirror *m; - struct timeval tv; - guint i; - - conn = cbdata->conn; - ctx = cbdata->ctx; - msg = cbdata->msg; - m = cbdata->m; - - rev32 = GUINT32_TO_LE (rev32); - len = sizeof (guint32) * 2; /* revision + last chunk */ - - for (i = 0; i < cbdata->updates_pending->len; i ++) { - io_cmd = &g_array_index (cbdata->updates_pending, - struct fuzzy_peer_cmd, i); - - if (io_cmd->is_shingle) { - len += sizeof (guint32) + sizeof (guint32) + - sizeof (struct rspamd_fuzzy_shingle_cmd); - } - else { - len += sizeof (guint32) + sizeof (guint32) + - sizeof (struct rspamd_fuzzy_cmd); - } - } - - reply = rspamd_fstring_sized_new (len); - reply = rspamd_fstring_append (reply, (const char *)&rev32, - sizeof (rev32)); - - for (i = 0; i < cbdata->updates_pending->len; i ++) { - io_cmd = &g_array_index (cbdata->updates_pending, struct fuzzy_peer_cmd, i); - - if (io_cmd->is_shingle) { - len = sizeof (guint32) + - sizeof (struct rspamd_fuzzy_shingle_cmd); - } - else { - len = sizeof (guint32) + - sizeof (struct rspamd_fuzzy_cmd); - } - - p = (const char *)io_cmd; - len = GUINT32_TO_LE (len); - reply = rspamd_fstring_append (reply, (const char *)&len, sizeof (len)); - reply = rspamd_fstring_append (reply, p, len); - } - - /* Last chunk */ - len = 0; - reply = rspamd_fstring_append (reply, (const char *)&len, sizeof (len)); - rspamd_http_message_set_body_from_fstring_steal (msg, reply); - double_to_tv (ctx->sync_timeout, &tv); - rspamd_http_connection_write_message (conn->http_conn, - msg, NULL, NULL, conn, - &tv); - msg_info ("send update request to %s", m->name); - - g_array_free (cbdata->updates_pending, TRUE); - g_free (cbdata); -} - -static void -fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_mirror *m, - struct fuzzy_slave_connection *conn, - struct rspamd_fuzzy_storage_ctx *ctx, - struct rspamd_http_message *msg, - GArray *updates) -{ - - struct rspamd_fuzzy_updates_cbdata *cbdata; - - cbdata = g_malloc (sizeof (*cbdata)); - cbdata->ctx = ctx; - cbdata->msg = msg; - cbdata->conn = conn; - cbdata->m = m; - /* Copy queue */ - cbdata->updates_pending = g_array_sized_new (FALSE, FALSE, - sizeof (struct fuzzy_peer_cmd), updates->len); - g_array_append_vals (cbdata->updates_pending, updates->data, updates->len); - rspamd_fuzzy_backend_version (ctx->backend, local_db_name, - fuzzy_mirror_updates_version_cb, cbdata); -} - -static void -fuzzy_mirror_error_handler (struct rspamd_http_connection *conn, GError *err) -{ - struct fuzzy_slave_connection *bk_conn = conn->ud; - msg_info ("abnormally closing connection from backend: %s:%s, " - "error: %e", - bk_conn->mirror->name, - rspamd_inet_address_to_string (rspamd_upstream_addr_cur (bk_conn->up)), - err); - - fuzzy_mirror_close_connection (bk_conn); -} - -static gint -fuzzy_mirror_finish_handler (struct rspamd_http_connection *conn, - struct rspamd_http_message *msg) +fuzzy_stat_count_callback (guint64 count, void *ud) { - struct fuzzy_slave_connection *bk_conn = conn->ud; - - msg_info ("finished mirror connection to %s", bk_conn->mirror->name); - fuzzy_mirror_close_connection (bk_conn); + struct rspamd_fuzzy_storage_ctx *ctx = ud; - return 0; + ev_timer_again (ctx->event_loop, &ctx->stat_ev); + ctx->stat.fuzzy_hashes = count; } static void -rspamd_fuzzy_send_update_mirror (struct rspamd_fuzzy_storage_ctx *ctx, - struct rspamd_fuzzy_mirror *m, GArray *updates) +rspamd_fuzzy_stat_callback (EV_P_ ev_timer *w, int revents) { - struct fuzzy_slave_connection *conn; - struct rspamd_http_message *msg; - - conn = g_malloc0 (sizeof (*conn)); - conn->up = rspamd_upstream_get (m->u, - RSPAMD_UPSTREAM_MASTER_SLAVE, NULL, 0); - conn->mirror = m; - - if (conn->up == NULL) { - g_free (conn); - msg_err ("cannot select upstream for %s", m->name); - return; - } - - conn->sock = rspamd_inet_address_connect ( - rspamd_upstream_addr_next (conn->up), - SOCK_STREAM, TRUE); - - if (conn->sock == -1) { - g_free (conn); - msg_err ("cannot connect upstream for %s", m->name); - rspamd_upstream_fail (conn->up, TRUE); - return; - } - - msg = rspamd_http_new_message (HTTP_REQUEST); - rspamd_printf_fstring (&msg->url, "/update_v1/%s", m->name); - - conn->http_conn = rspamd_http_connection_new_client_socket ( - ctx->http_ctx, - NULL, - fuzzy_mirror_error_handler, - fuzzy_mirror_finish_handler, - RSPAMD_HTTP_CLIENT_SIMPLE, - conn->sock); - - rspamd_http_connection_set_key (conn->http_conn, - ctx->sync_keypair); - msg->peer_key = rspamd_pubkey_ref (m->key); - fuzzy_mirror_updates_to_http (m, conn, ctx, msg, updates); + struct rspamd_fuzzy_storage_ctx *ctx = + (struct rspamd_fuzzy_storage_ctx *)w->data; + rspamd_fuzzy_backend_count (ctx->backend, fuzzy_stat_count_callback, ctx); } -struct rspamd_updates_cbdata { - GArray *updates_pending; - struct rspamd_fuzzy_storage_ctx *ctx; - gchar *source; -}; static void fuzzy_update_version_callback (guint64 ver, void *ud) { msg_info ("updated fuzzy storage from %s: version: %d", - (const char *)ud, (gint)ver); + (const char *)ud, (gint)ver); g_free (ud); } static void -fuzzy_stat_count_callback (guint64 count, void *ud) -{ - struct rspamd_fuzzy_storage_ctx *ctx = ud; - - event_add (&ctx->stat_ev, &ctx->stat_tv); - ctx->stat.fuzzy_hashes = count; -} - -static void -rspamd_fuzzy_stat_callback (gint fd, gshort what, gpointer ud) -{ - struct rspamd_fuzzy_storage_ctx *ctx = ud; - - event_del (&ctx->stat_ev); - rspamd_fuzzy_backend_count (ctx->backend, fuzzy_stat_count_callback, ctx); -} - -static void rspamd_fuzzy_updates_cb (gboolean success, guint nadded, guint ndeleted, @@ -673,8 +441,6 @@ rspamd_fuzzy_updates_cb (gboolean success, void *ud) { struct rspamd_updates_cbdata *cbdata = ud; - struct rspamd_fuzzy_mirror *m; - guint i; struct rspamd_fuzzy_storage_ctx *ctx; const gchar *source; @@ -684,15 +450,6 @@ rspamd_fuzzy_updates_cb (gboolean success, if (success) { rspamd_fuzzy_backend_count (ctx->backend, fuzzy_count_callback, ctx); - if (ctx->updates_pending->len > 0) { - for (i = 0; i < ctx->mirrors->len; i ++) { - m = g_ptr_array_index (ctx->mirrors, i); - - rspamd_fuzzy_send_update_mirror (ctx, m, - cbdata->updates_pending); - } - } - msg_info ("successfully updated fuzzy storage: %d updates in queue; " "%d pending currently; " "%d added, %d deleted, %d extended, %d duplicates", @@ -727,12 +484,7 @@ rspamd_fuzzy_updates_cb (gboolean success, if (ctx->worker->wanna_die) { /* Plan exit */ - struct timeval tv; - - tv.tv_sec = 0; - tv.tv_usec = 0; - - event_base_loopexit (ctx->ev_base, &tv); + ev_break (ctx->event_loop, EVBREAK_ALL); } g_array_free (cbdata->updates_pending, TRUE); @@ -762,9 +514,9 @@ rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx, } static void -rspamd_fuzzy_reply_io (gint fd, gshort what, gpointer d) +rspamd_fuzzy_reply_io (EV_P_ ev_io *w, int revents) { - struct fuzzy_session *session = d; + struct fuzzy_session *session = (struct fuzzy_session *)w->data; rspamd_fuzzy_write_reply (session); REF_RELEASE (session); @@ -807,10 +559,9 @@ rspamd_fuzzy_write_reply (struct fuzzy_session *session) if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) { /* Grab reference to avoid early destruction */ REF_RETAIN (session); - event_set (&session->io, session->fd, EV_WRITE, - rspamd_fuzzy_reply_io, session); - event_base_set (session->ctx->ev_base, &session->io); - event_add (&session->io, NULL); + session->io.data = session; + ev_io_init (&session->io, rspamd_fuzzy_reply_io, session->fd, EV_WRITE); + ev_io_start (session->ctx->event_loop, &session->io); } else { msg_err ("error while writing reply: %s", strerror (errno)); @@ -819,22 +570,6 @@ rspamd_fuzzy_write_reply (struct fuzzy_session *session) } static void -fuzzy_peer_send_io (gint fd, gshort what, gpointer d) -{ - struct fuzzy_peer_request *up_req = d; - gssize r; - - r = write (fd, &up_req->cmd, sizeof (up_req->cmd)); - - if (r != sizeof (up_req->cmd)) { - msg_err ("cannot send update request to the peer: %s", strerror (errno)); - } - - event_del (&up_req->io_ev); - g_free (up_req); -} - -static void rspamd_fuzzy_update_stats (struct rspamd_fuzzy_storage_ctx *ctx, enum rspamd_fuzzy_epoch epoch, gboolean matched, @@ -946,6 +681,22 @@ rspamd_fuzzy_make_reply (struct rspamd_fuzzy_cmd *cmd, } static void +fuzzy_peer_send_io (EV_P_ ev_io *w, int revents) +{ + struct fuzzy_peer_request *up_req = (struct fuzzy_peer_request *)w->data; + gssize r; + + r = write (w->fd, &up_req->cmd, sizeof (up_req->cmd)); + + if (r != sizeof (up_req->cmd)) { + msg_err ("cannot send update request to the peer: %s", strerror (errno)); + } + + ev_io_stop (EV_A_ w); + g_free (up_req); +} + +static void rspamd_fuzzy_check_callback (struct rspamd_fuzzy_reply *result, void *ud) { struct fuzzy_session *session = ud; @@ -984,7 +735,7 @@ rspamd_fuzzy_check_callback (struct rspamd_fuzzy_reply *result, void *ud) struct fuzzy_peer_cmd up_cmd; struct fuzzy_peer_request *up_req; - if (session->worker->index == 0 || session->ctx->peer_fd == -1) { + if (session->worker->index == 0) { /* Just add to the queue */ memset (&up_cmd, 0, sizeof (up_cmd)); up_cmd.is_shingle = is_shingle; @@ -1017,10 +768,10 @@ rspamd_fuzzy_check_callback (struct rspamd_fuzzy_reply *result, void *ud) sizeof (up_req->cmd.cmd.shingle.sgl)); } - event_set (&up_req->io_ev, session->ctx->peer_fd, EV_WRITE, - fuzzy_peer_send_io, up_req); - event_base_set (session->ctx->ev_base, &up_req->io_ev); - event_add (&up_req->io_ev, NULL); + up_req->io_ev.data = up_req; + ev_io_init (&up_req->io_ev, fuzzy_peer_send_io, + session->ctx->peer_fd, EV_WRITE); + ev_io_start (session->ctx->event_loop, &up_req->io_ev); } } @@ -1103,17 +854,9 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session) if (cmd->cmd == FUZZY_CHECK) { if (rspamd_fuzzy_check_client (session, FALSE)) { - if (G_UNLIKELY (session->ctx->collection_mode)) { - result.v1.prob = 0; - result.v1.value = 500; - result.v1.flag = 0; - rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, - is_shingle); - } else { - REF_RETAIN (session); - rspamd_fuzzy_backend_check (session->ctx->backend, cmd, - rspamd_fuzzy_check_callback, session); - } + REF_RETAIN (session); + rspamd_fuzzy_backend_check (session->ctx->backend, cmd, + rspamd_fuzzy_check_callback, session); } else { result.v1.value = 403; @@ -1123,18 +866,10 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session) } } else if (cmd->cmd == FUZZY_STAT) { - if (G_UNLIKELY (session->ctx->collection_mode)) { - result.v1.prob = 0; - result.v1.value = 500; - result.v1.flag = 0; - rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle); - } - else { - result.v1.prob = 1.0; - result.v1.value = 0; - result.v1.flag = session->ctx->stat.fuzzy_hashes; - rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle); - } + result.v1.prob = 1.0; + result.v1.value = 0; + result.v1.flag = session->ctx->stat.fuzzy_hashes; + rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle); } else { if (rspamd_fuzzy_check_client (session, TRUE)) { @@ -1169,10 +904,10 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session) (gpointer)&up_req->cmd.cmd.shingle : (gpointer)&up_req->cmd.cmd.normal; memcpy (ptr, cmd, up_len); - event_set (&up_req->io_ev, session->ctx->peer_fd, EV_WRITE, - fuzzy_peer_send_io, up_req); - event_base_set (session->ctx->ev_base, &up_req->io_ev); - event_add (&up_req->io_ev, NULL); + up_req->io_ev.data = up_req; + ev_io_init (&up_req->io_ev, fuzzy_peer_send_io, + session->ctx->peer_fd, EV_WRITE); + ev_io_start (session->ctx->event_loop, &up_req->io_ev); } result.v1.value = 0; @@ -1378,147 +1113,6 @@ rspamd_fuzzy_cmd_from_wire (guchar *buf, guint buflen, struct fuzzy_session *s) return TRUE; } -static void -rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session, - struct rspamd_http_message *msg, guint our_rev) -{ - const guchar *p; - gsize remain; - gint32 revision; - guint32 len = 0, cnt = 0; - struct fuzzy_peer_cmd cmd; - enum { - read_len = 0, - read_data, - finish_processing - } state = read_len; - - gpointer flag_ptr; - - /* - * Message format: - * <uint32_le> - revision - * <uint32_le> - size of the next element - * <data> - command data - * ... - * <0> - end of data - * ... - ignored - */ - p = rspamd_http_message_get_body (msg, &remain); - - if (p && remain >= sizeof (gint32) * 2) { - memcpy (&revision, p, sizeof (gint32)); - revision = GINT32_TO_LE (revision); - - if (revision <= our_rev) { - msg_err_fuzzy_update ("remote revision: %d is older than ours: %d, " - "refusing update", - revision, our_rev); - - return; - } - else if (revision - our_rev > 1) { - msg_warn_fuzzy_update ("remote revision: %d is newer more than one revision " - "than ours: %d, cold sync is recommended", - revision, our_rev); - } - - remain -= sizeof (gint32); - p += sizeof (gint32); - } - else { - msg_err_fuzzy_update ("short update message, not processing"); - goto err; - } - - while (remain > 0) { - switch (state) { - case read_len: - if (remain < sizeof (guint32)) { - msg_err_fuzzy_update ("short update message while reading " - "length, not processing"); - goto err; - } - - memcpy (&len, p, sizeof (guint32)); - len = GUINT32_TO_LE (len); - remain -= sizeof (guint32); - p += sizeof (guint32); - - if (len == 0) { - remain = 0; - state = finish_processing; - } - else { - state = read_data; - } - break; - case read_data: - if (remain < len) { - msg_err_fuzzy_update ("short update message while reading data, " - "not processing" - " (%zd is available, %d is required)", remain, len); - return; - } - - if (len < sizeof (struct rspamd_fuzzy_cmd) + sizeof (guint32) || - len > sizeof (cmd)) { - /* Bad size command */ - msg_err_fuzzy_update ("incorrect element size: %d, at least " - "%d expected", len, - (gint)(sizeof (struct rspamd_fuzzy_cmd) + sizeof (guint32))); - goto err; - } - - memcpy (&cmd, p, len); - if (cmd.is_shingle && len != sizeof (cmd)) { - /* Short command */ - msg_err_fuzzy_update ("incorrect element size: %d, at least " - "%d expected", len, - (gint)(sizeof (cmd))); - goto err; - } - - if (cmd.is_shingle) { - if ((flag_ptr = g_hash_table_lookup (session->ctx->master_flags, - GUINT_TO_POINTER (cmd.cmd.shingle.basic.flag))) != NULL) { - cmd.cmd.shingle.basic.flag = GPOINTER_TO_UINT (flag_ptr); - } - } - else { - if ((flag_ptr = g_hash_table_lookup (session->ctx->master_flags, - GUINT_TO_POINTER (cmd.cmd.normal.flag))) != NULL) { - cmd.cmd.normal.flag = GPOINTER_TO_UINT (flag_ptr); - } - } - - g_array_append_val (session->ctx->updates_pending, cmd); - - p += len; - remain -= len; - len = 0; - state = read_len; - cnt ++; - break; - case finish_processing: - /* Do nothing */ - remain = 0; - break; - } - } - - - rspamd_fuzzy_process_updates_queue (session->ctx, session->src, TRUE); - msg_info_fuzzy_update ("processed updates from the master %s, " - "%ud operations processed," - " revision: %d (local revision: %d)", - rspamd_inet_address_to_string (session->addr), - cnt, revision, our_rev); - -err: - return; -} - static void fuzzy_session_destroy (gpointer d) @@ -1531,492 +1125,13 @@ fuzzy_session_destroy (gpointer d) g_free (session); } -static void -rspamd_fuzzy_mirror_session_destroy (struct fuzzy_master_update_session *session) -{ - if (session) { - rspamd_http_connection_reset (session->conn); - rspamd_http_connection_unref (session->conn); - rspamd_inet_address_free (session->addr); - close (session->sock); - - if (session->psrc) { - g_free (session->psrc); - } - g_free (session); - } -} - -static void -rspamd_fuzzy_mirror_error_handler (struct rspamd_http_connection *conn, GError *err) -{ - struct fuzzy_master_update_session *session = conn->ud; - - msg_err_fuzzy_update ("abnormally closing connection from: %s, error: %e", - rspamd_inet_address_to_string (session->addr), err); - /* Terminate session immediately */ - rspamd_fuzzy_mirror_session_destroy (session); -} - -static void -rspamd_fuzzy_mirror_send_reply (struct fuzzy_master_update_session *session, - guint code, const gchar *str) -{ - struct rspamd_http_message *msg; - - msg = rspamd_http_new_message (HTTP_RESPONSE); - msg->url = rspamd_fstring_new_init (str, strlen (str)); - msg->code = code; - session->replied = TRUE; - - rspamd_http_connection_reset (session->conn); - rspamd_http_connection_write_message (session->conn, msg, NULL, "text/plain", - session, &session->ctx->master_io_tv); -} - -static void -rspamd_fuzzy_update_version_callback (guint64 version, void *ud) -{ - struct fuzzy_master_update_session *session = ud; - - rspamd_fuzzy_mirror_process_update (session, session->msg, version); - rspamd_fuzzy_mirror_send_reply (session, 200, "OK"); -} - -static gint -rspamd_fuzzy_mirror_finish_handler (struct rspamd_http_connection *conn, - struct rspamd_http_message *msg) -{ - struct fuzzy_master_update_session *session = conn->ud; - const struct rspamd_cryptobox_pubkey *rk; - const gchar *err_str = NULL; - gchar *psrc; - const gchar *src = NULL; - gsize remain; - - if (session->replied) { - rspamd_fuzzy_mirror_session_destroy (session); - - return 0; - } - - /* Check key */ - if (!rspamd_http_connection_is_encrypted (conn)) { - msg_err_fuzzy_update ("refuse unencrypted update from: %s", - rspamd_inet_address_to_string (session->addr)); - err_str = "Unencrypted update is not allowed"; - goto end; - } - else { - - if (session->ctx->master_key) { - rk = rspamd_http_connection_get_peer_key (conn); - g_assert (rk != NULL); - - if (!rspamd_pubkey_equal (rk, session->ctx->master_key)) { - msg_err_fuzzy_update ("refuse unknown pubkey update from: %s", - rspamd_inet_address_to_string (session->addr)); - err_str = "Unknown pubkey"; - goto end; - } - } - else { - msg_warn_fuzzy_update ("no trusted key specified, accept any update from %s", - rspamd_inet_address_to_string (session->addr)); - } - if (!rspamd_http_message_get_body (msg, NULL) || !msg->url - || msg->url->len == 0) { - msg_err_fuzzy_update ("empty update message, not processing"); - err_str = "Empty update"; - - goto end; - } - - /* Detect source from url: /update_v1/<source>, so we look for the last '/' */ - remain = msg->url->len; - psrc = rspamd_fstringdup (msg->url); - src = psrc; - - while (remain--) { - if (src[remain] == '/') { - src = &src[remain + 1]; - break; - } - } - - session->src = src; - session->psrc = psrc; - session->msg = msg; - rspamd_fuzzy_backend_version (session->ctx->backend, src, - rspamd_fuzzy_update_version_callback, session); - - return 0; - } - -end: - rspamd_fuzzy_mirror_send_reply (session, 403, err_str); - - return 0; -} - -struct rspamd_fuzzy_collection_session { - struct rspamd_fuzzy_storage_ctx *ctx; - struct rspamd_worker *worker; - rspamd_inet_addr_t *from_addr; - guchar uid[16]; -}; - -static void -rspamd_fuzzy_collection_error_handler (struct rspamd_http_connection_entry *conn_ent, - GError *err) -{ - struct rspamd_fuzzy_collection_session *session = conn_ent->ud; - - msg_err_fuzzy_collection ("http error occurred: %s", err->message); -} - -static void -rspamd_fuzzy_collection_finish_handler (struct rspamd_http_connection_entry *conn_ent) -{ - struct rspamd_fuzzy_collection_session *session = conn_ent->ud; - - - rspamd_inet_address_free (session->from_addr); - g_free (session); -} - -void -rspamd_fuzzy_collection_send_error (struct rspamd_http_connection_entry *entry, - gint code, const gchar *error_msg, ...) -{ - struct rspamd_http_message *msg; - va_list args; - rspamd_fstring_t *reply; - - msg = rspamd_http_new_message (HTTP_RESPONSE); - - va_start (args, error_msg); - msg->status = rspamd_fstring_new (); - rspamd_vprintf_fstring (&msg->status, error_msg, args); - va_end (args); - - msg->date = time (NULL); - msg->code = code; - reply = rspamd_fstring_sized_new (msg->status->len + 16); - rspamd_printf_fstring (&reply, "%V", msg->status); - rspamd_http_message_set_body_from_fstring_steal (msg, reply); - rspamd_http_connection_reset (entry->conn); - rspamd_http_router_insert_headers (entry->rt, msg); - rspamd_http_connection_write_message (entry->conn, - msg, - NULL, - "text/plain", - entry, - entry->rt->ptv); - entry->is_reply = TRUE; -} - -/* - * Note: this function steals fstring - */ -void -rspamd_fuzzy_collection_send_fstring (struct rspamd_http_connection_entry *entry, - rspamd_fstring_t *fstr) -{ - struct rspamd_http_message *msg; - - msg = rspamd_http_new_message (HTTP_RESPONSE); - msg->status = rspamd_fstring_new_init ("OK", 2); - msg->date = time (NULL); - msg->code = 200; - rspamd_http_message_set_body_from_fstring_steal (msg, fstr); - rspamd_http_connection_reset (entry->conn); - rspamd_http_router_insert_headers (entry->rt, msg); - rspamd_http_connection_write_message (entry->conn, - msg, - NULL, - "application/octet-stream", - entry, - entry->rt->ptv); - entry->is_reply = TRUE; -} - -static int -rspamd_fuzzy_collection_cookie (struct rspamd_http_connection_entry *conn_ent, - struct rspamd_http_message *msg) -{ - struct rspamd_fuzzy_collection_session *session = conn_ent->ud; - rspamd_fstring_t *cookie; - - cookie = rspamd_fstring_new_init (session->ctx->cookie, - sizeof (session->ctx->cookie)); - rspamd_fuzzy_collection_send_fstring (conn_ent, cookie); - - return 0; -} - -static int -rspamd_fuzzy_collection_data (struct rspamd_http_connection_entry *conn_ent, - struct rspamd_http_message *msg) -{ - struct rspamd_fuzzy_collection_session *session = conn_ent->ud; - const rspamd_ftok_t *sign_header; - struct rspamd_fuzzy_storage_ctx *ctx; - guint i; - struct fuzzy_peer_cmd *io_cmd; - rspamd_fstring_t *reply; - GError *err = NULL; - guchar *decoded_signature; - gsize dec_len; - guint32 cmdlen, nupdates = 0; - - sign_header = rspamd_http_message_find_header (msg, "Signature"); - - if (sign_header == NULL) { - rspamd_fuzzy_collection_send_error (conn_ent, 403, "Missing signature"); - - return 0; - } - - ctx = session->ctx; - - if (ctx->collection_sign_key == NULL) { - rspamd_fuzzy_collection_send_error (conn_ent, 500, "Misconfigured signature key"); - - return 0; - } - - decoded_signature = g_malloc (sign_header->len * 2 + 1); - dec_len = rspamd_decode_hex_buf (sign_header->begin, sign_header->len, - decoded_signature, sign_header->len * 2 + 1); - - if (dec_len == -1 || !rspamd_keypair_verify (ctx->collection_sign_key, - ctx->cookie, sizeof (ctx->cookie), - decoded_signature, dec_len, &err)) { - if (err) { - rspamd_fuzzy_collection_send_error (conn_ent, 403, "Signature verification error: %e", - err); - g_error_free (err); - } - else { - rspamd_fuzzy_collection_send_error (conn_ent, 403, "Signature verification error"); - } - - g_free (decoded_signature); - - return 0; - } - - g_free (decoded_signature); - - /* Generate new cookie */ - ottery_rand_bytes (ctx->cookie, sizeof (ctx->cookie)); - - /* Send&Clear updates */ - reply = rspamd_fstring_sized_new (8192); - /* - * Message format: - * <uint32_le> - revision - * <uint32_le> - size of the next element - * <data> - command data - * ... - * <0> - end of data - * ... - ignored - */ - reply = rspamd_fstring_append (reply, (const gchar *)&ctx->collection_id, - sizeof (ctx->collection_id)); - - for (i = 0; i < ctx->updates_pending->len; i ++) { - io_cmd = &g_array_index (ctx->updates_pending, struct fuzzy_peer_cmd, i); - - if (io_cmd->is_shingle) { - cmdlen = sizeof (io_cmd->cmd.shingle) + sizeof (guint32); - - } - else { - cmdlen = sizeof (io_cmd->cmd.normal) + sizeof (guint32); - } - - cmdlen = GUINT32_TO_LE (cmdlen); - reply = rspamd_fstring_append (reply, (const gchar *)&cmdlen, - sizeof (cmdlen)); - reply = rspamd_fstring_append (reply, (const gchar *)io_cmd, - cmdlen); - nupdates ++; - } - - msg_info_fuzzy_collection ("collection %d done, send %d updates", - ctx->collection_id, nupdates); - /* Last command */ - cmdlen = 0; - reply = rspamd_fstring_append (reply, (const gchar *)&cmdlen, - sizeof (cmdlen)); - - ctx->updates_pending->len = 0; - /* Clear failed attempts counter */ - ctx->updates_failed = 0; - ctx->collection_id ++; - rspamd_fuzzy_collection_send_fstring (conn_ent, reply); - - return 0; -} - - -static void -accept_fuzzy_collection_socket (gint fd, short what, void *arg) -{ - struct rspamd_worker *worker = (struct rspamd_worker *)arg; - rspamd_inet_addr_t *addr; - gint nfd; - struct rspamd_fuzzy_storage_ctx *ctx; - struct rspamd_fuzzy_collection_session *session; - - if ((nfd = - rspamd_accept_from_socket (fd, &addr, worker->accept_events)) == -1) { - msg_warn ("accept failed: %s", strerror (errno)); - return; - } - /* Check for EAGAIN */ - if (nfd == 0) { - return; - } - - ctx = worker->ctx; - - if (!ctx->collection_keypair) { - msg_err ("deny request from %s, as no local keypair is specified", - rspamd_inet_address_to_string (addr)); - rspamd_inet_address_free (addr); - close (nfd); - - return; - } - - session = g_malloc0 (sizeof (*session)); - session->ctx = ctx; - session->worker = worker; - rspamd_random_hex (session->uid, sizeof (session->uid) - 1); - session->uid[sizeof (session->uid) - 1] = '\0'; - session->from_addr = addr; - rspamd_http_router_handle_socket (ctx->collection_rt, nfd, session); - msg_info_fuzzy_collection ("accepted connection from %s port %d, session ptr: %p", - rspamd_inet_address_to_string (addr), - rspamd_inet_address_get_port (addr), - session); -} - -static void -rspamd_fuzzy_collection_periodic (gint fd, gshort what, gpointer ud) -{ - struct rspamd_fuzzy_storage_ctx *ctx = ud; - - if (++ctx->updates_failed > ctx->updates_maxfail) { - msg_err ("cannot store more data in workqueue, discard " - "%ud updates after %d missed collection points", - ctx->updates_pending->len, - ctx->updates_maxfail); - ctx->updates_failed = 0; - ctx->updates_pending->len = 0; - /* Regenerate cookie */ - ottery_rand_bytes (ctx->cookie, sizeof (ctx->cookie)); - } - else { - msg_err ("fuzzy data has not been collected in time, " - "%ud updates are still pending, %d updates left", - ctx->updates_pending->len, - ctx->updates_maxfail - ctx->updates_failed); - } - - if (ctx->worker->wanna_die) { - /* Plan exit */ - struct timeval tv; - - tv.tv_sec = 0; - tv.tv_usec = 0; - - event_base_loopexit (ctx->ev_base, &tv); - } -} - - -static void -accept_fuzzy_mirror_socket (gint fd, short what, void *arg) -{ - struct rspamd_worker *worker = (struct rspamd_worker *)arg; - rspamd_inet_addr_t *addr; - gint nfd; - struct rspamd_http_connection *http_conn; - struct rspamd_fuzzy_storage_ctx *ctx; - struct fuzzy_master_update_session *session; - - if ((nfd = - rspamd_accept_from_socket (fd, &addr, worker->accept_events)) == -1) { - msg_warn ("accept failed: %s", strerror (errno)); - return; - } - /* Check for EAGAIN */ - if (nfd == 0) { - return; - } - - ctx = worker->ctx; - - if (!ctx->master_ips) { - msg_err ("deny update request from %s as no masters defined", - rspamd_inet_address_to_string (addr)); - rspamd_inet_address_free (addr); - close (nfd); - - return; - } - else if (rspamd_match_radix_map_addr (ctx->master_ips, addr) == NULL) { - msg_err ("deny update request from %s", - rspamd_inet_address_to_string (addr)); - rspamd_inet_address_free (addr); - close (nfd); - - return; - } - - if (!ctx->sync_keypair) { - msg_err ("deny update request from %s, as no local keypair is specified", - rspamd_inet_address_to_string (addr)); - rspamd_inet_address_free (addr); - close (nfd); - - return; - } - - session = g_malloc0 (sizeof (*session)); - session->name = rspamd_inet_address_to_string (addr); - rspamd_random_hex (session->uid, sizeof (session->uid) - 1); - session->uid[sizeof (session->uid) - 1] = '\0'; - http_conn = rspamd_http_connection_new_server ( - ctx->http_ctx, - nfd, - NULL, - rspamd_fuzzy_mirror_error_handler, - rspamd_fuzzy_mirror_finish_handler, - 0); - - rspamd_http_connection_set_key (http_conn, ctx->sync_keypair); - session->ctx = ctx; - session->conn = http_conn; - session->addr = addr; - session->sock = nfd; - - rspamd_http_connection_read_message (http_conn, - session, - &ctx->master_io_tv); -} - /* * Accept new connection and construct task */ static void -accept_fuzzy_socket (gint fd, short what, void *arg) +accept_fuzzy_socket (EV_P_ ev_io *w, int revents) { - struct rspamd_worker *worker = (struct rspamd_worker *)arg; + struct rspamd_worker *worker = (struct rspamd_worker *)w->data; struct fuzzy_session *session; rspamd_inet_addr_t *addr; gssize r; @@ -2024,12 +1139,12 @@ accept_fuzzy_socket (gint fd, short what, void *arg) guint64 *nerrors; /* Got some data */ - if (what == EV_READ) { + if (revents == EV_READ) { for (;;) { worker->nconns++; - r = rspamd_inet_address_recvfrom (fd, + r = rspamd_inet_address_recvfrom (w->fd, buf, sizeof (buf), 0, @@ -2053,7 +1168,7 @@ accept_fuzzy_socket (gint fd, short what, void *arg) session = g_malloc0 (sizeof (*session)); REF_INIT_RETAIN (session, fuzzy_session_destroy); session->worker = worker; - session->fd = fd; + session->fd = w->fd; session->ctx = worker->ctx; session->time = (guint64) time (NULL); session->addr = addr; @@ -2148,7 +1263,7 @@ rspamd_fuzzy_storage_reload (struct rspamd_main *rspamd_main, memset (&rep, 0, sizeof (rep)); rep.type = RSPAMD_CONTROL_RELOAD; - if ((ctx->backend = rspamd_fuzzy_backend_create (ctx->ev_base, + if ((ctx->backend = rspamd_fuzzy_backend_create (ctx->event_loop, worker->cf->options, rspamd_main->cfg, &err)) == NULL) { msg_err ("cannot open backend after reload: %e", err); @@ -2391,120 +1506,6 @@ rspamd_fuzzy_storage_stat (struct rspamd_main *rspamd_main, } static gboolean -fuzzy_storage_parse_mirror (rspamd_mempool_t *pool, - const ucl_object_t *obj, - gpointer ud, - struct rspamd_rcl_section *section, - GError **err) -{ - const ucl_object_t *elt; - struct rspamd_fuzzy_mirror *up = NULL; - struct rspamd_rcl_struct_parser *pd = ud; - struct rspamd_fuzzy_storage_ctx *ctx; - - ctx = pd->user_struct; - - if (ucl_object_type (obj) != UCL_OBJECT) { - g_set_error (err, g_quark_try_string ("fuzzy"), 100, - "mirror/slave option must be an object"); - - return FALSE; - } - - elt = ucl_object_lookup (obj, "name"); - if (elt == NULL) { - g_set_error (err, g_quark_try_string ("fuzzy"), 100, - "mirror option must have some name definition"); - - return FALSE; - } - - up = g_malloc0 (sizeof (*up)); - up->name = g_strdup (ucl_object_tostring (elt)); - - elt = ucl_object_lookup (obj, "key"); - if (elt != NULL) { - up->key = rspamd_pubkey_from_base32 (ucl_object_tostring (elt), 0, - RSPAMD_KEYPAIR_KEX, RSPAMD_CRYPTOBOX_MODE_25519); - } - - if (up->key == NULL) { - g_set_error (err, g_quark_try_string ("fuzzy"), 100, - "cannot read mirror key"); - - goto err; - } - - elt = ucl_object_lookup (obj, "hosts"); - - if (elt == NULL) { - g_set_error (err, g_quark_try_string ("fuzzy"), 100, - "mirror option must have some hosts definition"); - - goto err; - } - - up->u = rspamd_upstreams_create (ctx->cfg->ups_ctx); - if (!rspamd_upstreams_from_ucl (up->u, elt, 11335, NULL)) { - g_set_error (err, g_quark_try_string ("fuzzy"), 100, - "mirror has bad hosts definition"); - - goto err; - } - - g_ptr_array_add (ctx->mirrors, up); - - return TRUE; - -err: - g_free (up->name); - rspamd_upstreams_destroy (up->u); - - if (up->key) { - rspamd_pubkey_unref (up->key); - } - - g_free (up); - - return FALSE; -} - -static gboolean -fuzzy_storage_parse_master_flags (rspamd_mempool_t *pool, - const ucl_object_t *obj, - gpointer ud, - struct rspamd_rcl_section *section, - GError **err) -{ - const ucl_object_t *cur; - struct rspamd_rcl_struct_parser *pd = ud; - struct rspamd_fuzzy_storage_ctx *ctx; - ucl_object_iter_t it = NULL; - gulong remote_flag; - gint64 local_flag; - - ctx = pd->user_struct; - - if (ucl_object_type (obj) != UCL_OBJECT) { - g_set_error (err, g_quark_try_string ("fuzzy"), 100, - "master_flags option must be an object"); - - return FALSE; - } - - while ((cur = ucl_iterate_object (obj, &it, true)) != NULL) { - if (rspamd_strtoul (cur->key, cur->keylen, &remote_flag) && - ucl_object_toint_safe (cur, (int64_t *)&local_flag)) { - g_hash_table_insert (ctx->master_flags, GUINT_TO_POINTER (remote_flag), - GUINT_TO_POINTER (local_flag)); - } - } - - return TRUE; -} - - -static gboolean fuzzy_parse_keypair (rspamd_mempool_t *pool, const ucl_object_t *obj, gpointer ud, @@ -2599,26 +1600,18 @@ init_fuzzy (struct rspamd_config *cfg) ctx->magic = rspamd_fuzzy_storage_magic; ctx->sync_timeout = DEFAULT_SYNC_TIMEOUT; - ctx->master_timeout = DEFAULT_MASTER_TIMEOUT; ctx->keypair_cache_size = DEFAULT_KEYPAIR_CACHE_SIZE; ctx->keys = g_hash_table_new_full (fuzzy_kp_hash, fuzzy_kp_equal, NULL, fuzzy_key_dtor); rspamd_mempool_add_destructor (cfg->cfg_pool, (rspamd_mempool_destruct_t)g_hash_table_unref, ctx->keys); - ctx->master_flags = g_hash_table_new (g_direct_hash, g_direct_equal); - rspamd_mempool_add_destructor (cfg->cfg_pool, - (rspamd_mempool_destruct_t)g_hash_table_unref, ctx->master_flags); ctx->errors_ips = rspamd_lru_hash_new_full (1024, (GDestroyNotify) rspamd_inet_address_free, g_free, rspamd_inet_address_hash, rspamd_inet_address_equal); rspamd_mempool_add_destructor (cfg->cfg_pool, (rspamd_mempool_destruct_t)rspamd_lru_hash_destroy, ctx->errors_ips); ctx->cfg = cfg; - ctx->mirrors = g_ptr_array_new (); - rspamd_mempool_add_destructor (cfg->cfg_pool, - (rspamd_mempool_destruct_t)rspamd_ptr_array_free_hard, ctx->mirrors); ctx->updates_maxfail = DEFAULT_UPDATES_MAXFAIL; - ctx->collection_id_file = RSPAMD_DBDIR "/fuzzy_collection.id"; ctx->leaky_bucket_mask = DEFAULT_BUCKET_MASK; ctx->leaky_bucket_ttl = DEFAULT_BUCKET_TTL; ctx->max_buckets = DEFAULT_MAX_BUCKETS; @@ -2694,32 +1687,6 @@ init_fuzzy (struct rspamd_config *cfg) 0, "Work in read only mode"); - rspamd_rcl_register_worker_option (cfg, - type, - "master_timeout", - rspamd_rcl_parse_struct_time, - ctx, - G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, master_timeout), - RSPAMD_CL_FLAG_TIME_FLOAT, - "Master protocol IO timeout"); - - rspamd_rcl_register_worker_option (cfg, - type, - "sync_keypair", - rspamd_rcl_parse_struct_keypair, - ctx, - G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, sync_keypair), - 0, - "Encryption key for master/slave updates"); - - rspamd_rcl_register_worker_option (cfg, - type, - "masters", - rspamd_rcl_parse_struct_ucl, - ctx, - G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, masters_map), - 0, - "Allow master/slave updates from the following IP addresses"); rspamd_rcl_register_worker_option (cfg, type, @@ -2730,43 +1697,9 @@ init_fuzzy (struct rspamd_config *cfg) 0, "Block requests from specific networks"); - rspamd_rcl_register_worker_option (cfg, - type, - "master_key", - rspamd_rcl_parse_struct_pubkey, - ctx, - G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, master_key), - 0, - "Allow master/slave updates merely using the specified key"); rspamd_rcl_register_worker_option (cfg, type, - "mirror", - fuzzy_storage_parse_mirror, - ctx, - 0, - RSPAMD_CL_FLAG_MULTIPLE, - "List of slave hosts"); - - rspamd_rcl_register_worker_option (cfg, - type, - "slave", - fuzzy_storage_parse_mirror, - ctx, - 0, - RSPAMD_CL_FLAG_MULTIPLE, - "List of slave hosts"); - - rspamd_rcl_register_worker_option (cfg, - type, - "master_flags", - fuzzy_storage_parse_master_flags, - ctx, - 0, - 0, - "Map of flags in form master_flags = { master_flag = local_flag; ... }; "); - rspamd_rcl_register_worker_option (cfg, - type, "updates_maxfail", rspamd_rcl_parse_struct_integer, ctx, @@ -2775,38 +1708,6 @@ init_fuzzy (struct rspamd_config *cfg) "Maximum number of updates to be failed before discarding"); rspamd_rcl_register_worker_option (cfg, type, - "collection_only", - rspamd_rcl_parse_struct_boolean, - ctx, - G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, collection_mode), - 0, - "Start fuzzy in collection only mode"); - rspamd_rcl_register_worker_option (cfg, - type, - "collection_signkey", - rspamd_rcl_parse_struct_pubkey, - ctx, - G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, collection_sign_key), - RSPAMD_CL_FLAG_SIGNKEY, - "Accept only signed requests with the specified key"); - rspamd_rcl_register_worker_option (cfg, - type, - "collection_keypair", - rspamd_rcl_parse_struct_keypair, - ctx, - G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, collection_keypair), - 0, - "Use the specified keypair to encrypt collection protocol"); - rspamd_rcl_register_worker_option (cfg, - type, - "collection_id_file", - rspamd_rcl_parse_struct_string, - ctx, - G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, collection_id_file), - RSPAMD_CL_FLAG_STRING_PATH, - "Store collection epoch in the desired file"); - rspamd_rcl_register_worker_option (cfg, - type, "skip_hashes", rspamd_rcl_parse_struct_ucl, ctx, @@ -2877,17 +1778,18 @@ init_fuzzy (struct rspamd_config *cfg) } static void -rspamd_fuzzy_peer_io (gint fd, gshort what, gpointer d) +rspamd_fuzzy_peer_io (EV_P_ ev_io *w, int revents) { struct fuzzy_peer_cmd cmd; - struct rspamd_fuzzy_storage_ctx *ctx = d; + struct rspamd_fuzzy_storage_ctx *ctx = + (struct rspamd_fuzzy_storage_ctx *)w->data; gssize r; - r = read (fd, &cmd, sizeof (cmd)); + r = read (w->fd, &cmd, sizeof (cmd)); if (r != sizeof (cmd)) { if (errno == EINTR) { - rspamd_fuzzy_peer_io (fd, what, d); + rspamd_fuzzy_peer_io (EV_A_ w, revents); return; } if (errno != EAGAIN) { @@ -2907,7 +1809,7 @@ fuzzy_peer_rep (struct rspamd_worker *worker, struct rspamd_fuzzy_storage_ctx *ctx = ud; GList *cur; struct rspamd_worker_listen_socket *ls; - struct event *accept_events; + struct rspamd_worker_accept_event *ac_ev; ctx->peer_fd = rep_fd; @@ -2931,30 +1833,17 @@ fuzzy_peer_rep (struct rspamd_worker *worker, rspamd_inet_address_to_string_pretty (ls->addr)); if (ls->type == RSPAMD_WORKER_SOCKET_UDP) { - accept_events = g_malloc0 (sizeof (struct event) * 2); - event_set (&accept_events[0], ls->fd, EV_READ | EV_PERSIST, - accept_fuzzy_socket, worker); - event_base_set (ctx->ev_base, &accept_events[0]); - event_add (&accept_events[0], NULL); - worker->accept_events = g_list_prepend (worker->accept_events, - accept_events); + ac_ev = g_malloc0 (sizeof (*ac_ev)); + ac_ev->accept_ev.data = worker; + ac_ev->event_loop = ctx->event_loop; + ev_io_init (&ac_ev->accept_ev, accept_fuzzy_socket, ls->fd, + EV_READ); + ev_io_start (ctx->event_loop, &ac_ev->accept_ev); + DL_APPEND (worker->accept_events, ac_ev); } - else if (worker->index == 0) { + else { /* We allow TCP listeners only for a update worker */ - accept_events = g_malloc0 (sizeof (struct event) * 2); - - if (ctx->collection_mode) { - event_set (&accept_events[0], ls->fd, EV_READ | EV_PERSIST, - accept_fuzzy_collection_socket, worker); - } - else { - event_set (&accept_events[0], ls->fd, EV_READ | EV_PERSIST, - accept_fuzzy_mirror_socket, worker); - } - event_base_set (ctx->ev_base, &accept_events[0]); - event_add (&accept_events[0], NULL); - worker->accept_events = g_list_prepend (worker->accept_events, - accept_events); + g_assert_not_reached (); } } @@ -2963,10 +1852,9 @@ fuzzy_peer_rep (struct rspamd_worker *worker, if (worker->index == 0 && ctx->peer_fd != -1) { /* Listen for peer requests */ - event_set (&ctx->peer_ev, ctx->peer_fd, EV_READ | EV_PERSIST, - rspamd_fuzzy_peer_io, ctx); - event_base_set (ctx->ev_base, &ctx->peer_ev); - event_add (&ctx->peer_ev, NULL); + ctx->peer_ev.data = ctx; + ev_io_init (&ctx->peer_ev, rspamd_fuzzy_peer_io, ctx->peer_fd, EV_READ); + ev_io_start (ctx->event_loop, &ctx->peer_ev); } } @@ -2981,140 +1869,53 @@ start_fuzzy (struct rspamd_worker *worker) struct rspamd_srv_command srv_cmd; struct rspamd_config *cfg = worker->srv->cfg; - ctx->ev_base = rspamd_prepare_worker (worker, + ctx->event_loop = rspamd_prepare_worker (worker, "fuzzy", NULL); ctx->peer_fd = -1; ctx->worker = worker; ctx->cfg = worker->srv->cfg; - double_to_tv (ctx->master_timeout, &ctx->master_io_tv); - ctx->resolver = rspamd_dns_resolver_init (worker->srv->logger, - ctx->ev_base, + ctx->event_loop, worker->srv->cfg); rspamd_upstreams_library_config (worker->srv->cfg, ctx->cfg->ups_ctx, - ctx->ev_base, ctx->resolver->r); + ctx->event_loop, ctx->resolver->r); if (ctx->keypair_cache_size > 0) { /* Create keypairs cache */ ctx->keypair_cache = rspamd_keypair_cache_new (ctx->keypair_cache_size); } - ctx->http_ctx = rspamd_http_context_create (cfg, ctx->ev_base, ctx->cfg->ups_ctx); - - if (!ctx->collection_mode) { - /* - * Open DB and perform VACUUM - */ - if ((ctx->backend = rspamd_fuzzy_backend_create (ctx->ev_base, - worker->cf->options, cfg, &err)) == NULL) { - msg_err ("cannot open backend: %e", err); - if (err) { - g_error_free (err); - } - exit (EXIT_SUCCESS); - } - - rspamd_fuzzy_backend_count (ctx->backend, fuzzy_count_callback, ctx); - - if (worker->index == 0) { - ctx->updates_pending = g_array_sized_new (FALSE, FALSE, - sizeof (struct fuzzy_peer_cmd), 1024); - rspamd_fuzzy_backend_start_update (ctx->backend, ctx->sync_timeout, - rspamd_fuzzy_storage_periodic_callback, ctx); + if ((ctx->backend = rspamd_fuzzy_backend_create (ctx->event_loop, + worker->cf->options, cfg, &err)) == NULL) { + msg_err ("cannot open backend: %e", err); + if (err) { + g_error_free (err); } - - double_to_tv (ctx->sync_timeout, &ctx->stat_tv); - event_set (&ctx->stat_ev, -1, EV_TIMEOUT, rspamd_fuzzy_stat_callback, ctx); - event_base_set (ctx->ev_base, &ctx->stat_ev); - event_add (&ctx->stat_ev, &ctx->stat_tv); - - /* Register custom reload and stat commands for the control socket */ - rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_RELOAD, - rspamd_fuzzy_storage_reload, ctx); - rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_FUZZY_STAT, - rspamd_fuzzy_storage_stat, ctx); - rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_FUZZY_SYNC, - rspamd_fuzzy_storage_sync, ctx); + exit (EXIT_SUCCESS); } - else { - /* - * In collection mode we do a different thing: - * we collect fuzzy hashes in the updates queue and ignore all read commands - */ - if (worker->index == 0) { - ctx->updates_pending = g_array_sized_new (FALSE, FALSE, - sizeof (struct fuzzy_peer_cmd), 1024); - double_to_tv (ctx->sync_timeout, &ctx->stat_tv); - event_set (&ctx->stat_ev, -1, EV_TIMEOUT|EV_PERSIST, - rspamd_fuzzy_collection_periodic, ctx); - event_base_set (ctx->ev_base, &ctx->stat_ev); - event_add (&ctx->stat_ev, &ctx->stat_tv); - - ctx->collection_rt = rspamd_http_router_new ( - rspamd_fuzzy_collection_error_handler, - rspamd_fuzzy_collection_finish_handler, - &ctx->stat_tv, - NULL, - ctx->http_ctx); - - if (ctx->collection_keypair) { - rspamd_http_router_set_key (ctx->collection_rt, - ctx->collection_keypair); - } - - /* Try to load collection id */ - if (ctx->collection_id_file) { - gint fd; - fd = rspamd_file_xopen (ctx->collection_id_file, O_RDONLY, 0, - FALSE); + rspamd_fuzzy_backend_count (ctx->backend, fuzzy_count_callback, ctx); - if (fd == -1) { - if (errno != ENOENT) { - msg_err ("cannot open collection id from %s: %s", - ctx->collection_id_file, strerror (errno)); - } - - ctx->collection_id = 0; - } - else { - if (read (fd, &ctx->collection_id, - sizeof (ctx->collection_id)) == -1) { - msg_err ("cannot read collection id from %s: %s", - ctx->collection_id_file, strerror (errno)); - ctx->collection_id = 0; - } - - close (fd); - } - } - /* Generate new cookie */ - ottery_rand_bytes (ctx->cookie, sizeof (ctx->cookie)); - /* Register paths */ - rspamd_http_router_add_path (ctx->collection_rt, - "/cookie", - rspamd_fuzzy_collection_cookie); - rspamd_http_router_add_path (ctx->collection_rt, - "/data", - rspamd_fuzzy_collection_data); - } + if (worker->index == 0) { + ctx->updates_pending = g_array_sized_new (FALSE, FALSE, + sizeof (struct fuzzy_peer_cmd), 1024); + rspamd_fuzzy_backend_start_update (ctx->backend, ctx->sync_timeout, + rspamd_fuzzy_storage_periodic_callback, ctx); } - if (ctx->mirrors && ctx->mirrors->len != 0) { - if (ctx->sync_keypair == NULL) { - GString *pk_str = NULL; - - ctx->sync_keypair = rspamd_keypair_new (RSPAMD_KEYPAIR_KEX, - RSPAMD_CRYPTOBOX_MODE_25519); - pk_str = rspamd_keypair_print (ctx->sync_keypair, - RSPAMD_KEYPAIR_COMPONENT_PK|RSPAMD_KEYPAIR_BASE32); - msg_warn_config ("generating new temporary keypair for communicating" - " with slave hosts, pk is %s", pk_str->str); - g_string_free (pk_str, TRUE); - } - } + ctx->stat_ev.data = ctx; + ev_timer_init (&ctx->stat_ev, rspamd_fuzzy_stat_callback, ctx->sync_timeout, + ctx->sync_timeout); + ev_timer_start (ctx->event_loop, &ctx->stat_ev); + /* Register custom reload and stat commands for the control socket */ + rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_RELOAD, + rspamd_fuzzy_storage_reload, ctx); + rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_FUZZY_STAT, + rspamd_fuzzy_storage_stat, ctx); + rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_FUZZY_SYNC, + rspamd_fuzzy_storage_sync, ctx); /* Create radix trees */ if (ctx->update_map != NULL) { @@ -3123,12 +1924,6 @@ start_fuzzy (struct rspamd_worker *worker) &ctx->update_ips, NULL); } - if (ctx->masters_map != NULL) { - rspamd_config_radix_from_ucl (worker->srv->cfg, ctx->masters_map, - "Allow fuzzy master/slave updates from specified addresses", - &ctx->master_ips, NULL); - } - if (ctx->skip_map != NULL) { struct rspamd_map *m; @@ -3168,9 +1963,9 @@ start_fuzzy (struct rspamd_worker *worker) /* Maps events */ ctx->resolver = rspamd_dns_resolver_init (worker->srv->logger, - ctx->ev_base, + ctx->event_loop, worker->srv->cfg); - rspamd_map_watch (worker->srv->cfg, ctx->ev_base, ctx->resolver, worker, 0); + rspamd_map_watch (worker->srv->cfg, ctx->event_loop, ctx->resolver, worker, 0); /* Get peer pipe */ memset (&srv_cmd, 0, sizeof (srv_cmd)); @@ -3180,64 +1975,35 @@ start_fuzzy (struct rspamd_worker *worker) memset (srv_cmd.cmd.spair.pair_id, 0, sizeof (srv_cmd.cmd.spair.pair_id)); memcpy (srv_cmd.cmd.spair.pair_id, "fuzzy", sizeof ("fuzzy")); - rspamd_srv_send_command (worker, ctx->ev_base, &srv_cmd, -1, + rspamd_srv_send_command (worker, ctx->event_loop, &srv_cmd, -1, fuzzy_peer_rep, ctx); - event_base_loop (ctx->ev_base, 0); + ev_loop (ctx->event_loop, 0); rspamd_worker_block_signals (); - if (worker->index == 0 && ctx->updates_pending->len > 0) { - if (!ctx->collection_mode) { - rspamd_fuzzy_process_updates_queue (ctx, local_db_name, FALSE); - event_base_loop (ctx->ev_base, 0); + if (ctx->peer_fd != -1) { + if (worker->index == 0) { + ev_io_stop (ctx->event_loop, &ctx->peer_ev); } + close (ctx->peer_fd); } - if (!ctx->collection_mode) { - rspamd_fuzzy_backend_close (ctx->backend); + if (worker->index == 0 && ctx->updates_pending->len > 0) { + rspamd_fuzzy_process_updates_queue (ctx, local_db_name, FALSE); + ev_loop (ctx->event_loop, 0); } - else if (worker->index == 0) { - gint fd; - - rspamd_http_router_free (ctx->collection_rt); - - /* Try to save collection id */ - fd = rspamd_file_xopen (ctx->collection_id_file, - O_WRONLY | O_CREAT | O_TRUNC, 00644, 0); - - if (fd == -1) { - msg_err ("cannot open collection id to store in %s: %s", - ctx->collection_id_file, strerror (errno)); - } - else { - if (write (fd, &ctx->collection_id, - sizeof (ctx->collection_id)) == -1) { - msg_err ("cannot store collection id in %s: %s", - ctx->collection_id_file, strerror (errno)); - } - close (fd); - } - } + rspamd_fuzzy_backend_close (ctx->backend); if (worker->index == 0) { g_array_free (ctx->updates_pending, TRUE); } - if (ctx->peer_fd != -1) { - if (worker->index == 0) { - event_del (&ctx->peer_ev); - } - close (ctx->peer_fd); - } - if (ctx->keypair_cache) { rspamd_keypair_cache_destroy (ctx->keypair_cache); } - struct rspamd_http_context *http_ctx = ctx->http_ctx; REF_RELEASE (ctx->cfg); - rspamd_http_context_free (http_ctx); rspamd_log_close (worker->srv->logger, TRUE); exit (EXIT_SUCCESS); |