]> source.dussan.org Git - rspamd.git/commitdiff
[Project] Add some major stuff to implement client side of the fuzzy ping
authorVsevolod Stakhov <vsevolod@rspamd.com>
Thu, 9 Nov 2023 17:57:21 +0000 (17:57 +0000)
committerVsevolod Stakhov <vsevolod@rspamd.com>
Thu, 9 Nov 2023 17:57:21 +0000 (17:57 +0000)
lualib/rspamadm/fuzzy_ping.lua
src/plugins/fuzzy_check.c

index e132208650f887dfc7139ada1416178193cf0b1a..eed509282132b7d73ecf00a768b717957443bdd5 100644 (file)
@@ -33,16 +33,14 @@ parser:option "-r --rule"
       :description "Storage to ping (must be configured in Rspamd configuration)"
       :argname("<name>")
       :default("rspamd.com")
-parser:option "-f --flood"
-      :description "Flood mode (send requests as fast as possible)"
-      :argname("<count>")
-      :convert(tonumber)
-      :default(10)
 parser:option "-t --timeout"
       :description "Timeout for requests"
       :argname("<timeout>")
       :convert(tonumber)
       :default(5)
+parser:option "-s --server"
+      :description "Override server to ping"
+      :argname("<name>")
 parser:option "-n --number"
       :description "Timeout for requests"
       :argname("<number>")
@@ -90,18 +88,73 @@ local function print_storages(rules)
   end
 end
 
+local function print_results(results)
+  for _, res in ipairs(results) do
+    if res.success then
+      print(highlight('Server %s: %s ms', res.server, res.latency))
+    else
+      print(highlight('Server %s: %s', res.server, res.error))
+    end
+  end
+end
+
 local function handler(args)
   local opts = parser:parse(args)
 
   load_config(opts)
 
   if opts.list then
-    local storages = rspamd_plugins.fuzzy_check.list_storages(rspamd_config)
-    print_storages(storages)
+    print_storages(rspamd_plugins.fuzzy_check.list_storages(rspamd_config))
     os.exit(0)
   end
 
+  -- Perform ping using a fake task from async stuff provided by rspamadm
+  local rspamd_task = require "rspamd_task"
+
+  local task = rspamd_task.create(rspamd_config, rspamadm_ev_base)
+  task:set_session(rspamadm_session)
+  task:set_resolver(rspamadm_dns_resolver)
+
+  local replied = 0
+  local results = {}
+
+  local function gen_ping_fuzzy_cb(num)
+    return function(success, server, latency_or_err)
+      if not success then
+        results[num] = {
+          success = false,
+          error = latency_or_err,
+          server = server,
+        }
+      else
+        results[num] = {
+          success = true,
+          latency = latency_or_err,
+          server = server,
+        }
+      end
+
+      if replied == opts.number - 1 then
+        print_results(results)
+      else
+        replied = replied + 1
+      end
+    end
+  end
+
+  local function ping_fuzzy(num)
+    local ret, err = rspamd_plugins.fuzzy_check.ping_storage(task, gen_ping_fuzzy_cb(num),
+        opts.rule, opts.timeout, opts.server)
+
+    if not ret then
+      rspamd_logger.errx('cannot ping fuzzy storage: %s', err)
+      os.exit(1)
+    end
+  end
 
+  for i = 1, opts.number do
+    ping_fuzzy(i)
+  end
 end
 
 return {
index 96c283c5d386cb15edd2b6d1878af611b19b33a3..2576586c49b6d13ebee92b7ef17fbee6c8d7326b 100644 (file)
@@ -196,6 +196,7 @@ static gint fuzzy_lua_unlearn_handler(lua_State *L);
 static gint fuzzy_lua_gen_hashes_handler(lua_State *L);
 static gint fuzzy_lua_hex_hashes_handler(lua_State *L);
 static gint fuzzy_lua_list_storages(lua_State *L);
+static gint fuzzy_lua_ping_storage(lua_State *L);
 
 module_t fuzzy_check_module = {
        "fuzzy_check",
@@ -1227,6 +1228,9 @@ gint fuzzy_check_module_config(struct rspamd_config *cfg, bool validate)
                lua_pushstring(L, "list_storages");
                lua_pushcfunction(L, fuzzy_lua_list_storages);
                lua_settable(L, -3);
+               lua_pushstring(L, "ping_storage");
+               lua_pushcfunction(L, fuzzy_lua_ping_storage);
+               lua_settable(L, -3);
                /* Finish fuzzy_check key */
                lua_settable(L, -3);
        }
@@ -1369,6 +1373,59 @@ fuzzy_cmd_stat(struct fuzzy_rule *rule,
        return io;
 }
 
+static inline double
+fuzzy_milliseconds_since_midnight(void)
+{
+       double now = rspamd_get_calendar_ticks();
+       double ms = now - (int64_t) now;
+       now = (((int64_t) now % 86400) + ms) * 1000;
+
+       return now;
+}
+
+static struct fuzzy_cmd_io *
+fuzzy_cmd_ping(struct fuzzy_rule *rule,
+                          rspamd_mempool_t *pool)
+{
+       struct rspamd_fuzzy_cmd *cmd;
+       struct rspamd_fuzzy_encrypted_cmd *enccmd = NULL;
+       struct fuzzy_cmd_io *io;
+
+       if (rule->peer_key) {
+               enccmd = rspamd_mempool_alloc0(pool, sizeof(*enccmd));
+               cmd = &enccmd->cmd;
+       }
+       else {
+               cmd = rspamd_mempool_alloc0(pool, sizeof(*cmd));
+       }
+
+       /* Get milliseconds since midnight */
+
+
+       cmd->cmd = FUZZY_PING;
+       cmd->version = RSPAMD_FUZZY_PLUGIN_VERSION;
+       cmd->shingles_count = 0;
+       cmd->value = fuzzy_milliseconds_since_midnight(); /* Record timestamp */
+       cmd->tag = ottery_rand_uint32();
+
+       io = rspamd_mempool_alloc(pool, sizeof(*io));
+       io->flags = 0;
+       io->tag = cmd->tag;
+       memcpy(&io->cmd, cmd, sizeof(io->cmd));
+
+       if (rule->peer_key && enccmd) {
+               fuzzy_encrypt_cmd(rule, &enccmd->hdr, (guchar *) cmd, sizeof(*cmd));
+               io->io.iov_base = enccmd;
+               io->io.iov_len = sizeof(*enccmd);
+       }
+       else {
+               io->io.iov_base = cmd;
+               io->io.iov_len = sizeof(*cmd);
+       }
+
+       return io;
+}
+
 static struct fuzzy_cmd_io *
 fuzzy_cmd_hash(struct fuzzy_rule *rule,
                           int c,
@@ -3040,6 +3097,16 @@ fuzzy_generate_commands(struct rspamd_task *task, struct fuzzy_rule *rule,
 
                goto end;
        }
+       else if (c == FUZZY_PING) {
+               res = g_ptr_array_sized_new(1);
+
+               io = fuzzy_cmd_ping(rule, task->task_pool);
+               if (io) {
+                       g_ptr_array_add(res, io);
+               }
+
+               goto end;
+       }
 
        if (task->message == NULL) {
                goto end;
@@ -4246,6 +4313,9 @@ fuzzy_attach_controller(struct module_ctx *ctx, GHashTable *commands)
        return 0;
 }
 
+/* Lua handlers */
+/* TODO: move to a separate unit, as this file is now a bit too hard to read */
+
 static void
 lua_upstream_str_inserter(struct upstream *up, guint idx, void *ud)
 {
@@ -4301,5 +4371,292 @@ fuzzy_lua_list_storages(lua_State *L)
                lua_setfield(L, -2, rule->name);
        }
 
+       return 1;
+}
+
+struct fuzzy_lua_session {
+       struct rspamd_task *task;
+       lua_State *L;
+       rspamd_inet_addr_t *addr;
+       GPtrArray *commands;
+       struct fuzzy_rule *rule;
+       struct rspamd_io_ev ev;
+       gint cbref;
+       gint fd;
+};
+
+static void
+fuzzy_lua_session_fin(void *ud)
+{
+       struct fuzzy_lua_session *session = ud;
+
+       if (session->commands) {
+               g_ptr_array_free(session->commands, TRUE);
+       }
+
+       rspamd_ev_watcher_stop(session->task->event_loop, &session->ev);
+       luaL_unref(session->L, LUA_REGISTRYINDEX, session->cbref);
+}
+
+static gboolean
+fuzzy_lua_session_is_completed(struct fuzzy_lua_session *session)
+{
+       struct fuzzy_cmd_io *io;
+       guint nreplied = 0, i;
+
+
+       for (i = 0; i < session->commands->len; i++) {
+               io = g_ptr_array_index(session->commands, i);
+
+               if (io->flags & FUZZY_CMD_FLAG_REPLIED) {
+                       nreplied++;
+               }
+       }
+
+       if (nreplied == session->commands->len) {
+
+               rspamd_session_remove_event(session->task->s, fuzzy_io_fin, session);
+
+               return TRUE;
+       }
+
+       return FALSE;
+}
+
+static gint
+fuzzy_lua_try_read(struct fuzzy_lua_session *session)
+{
+       struct rspamd_task *task;
+       const struct rspamd_fuzzy_reply *rep;
+       struct rspamd_fuzzy_cmd *cmd = NULL;
+       struct fuzzy_cmd_io *io = NULL;
+       gint r, ret;
+       guchar buf[2048], *p;
+
+       task = session->task;
+
+       if ((r = read(session->fd, buf, sizeof(buf) - 1)) == -1) {
+               if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
+                       return 0;
+               }
+               else {
+                       return -1;
+               }
+       }
+       else {
+               p = buf;
+
+               ret = 0;
+
+               while ((rep = fuzzy_process_reply(&p, &r,
+                                                                                 session->commands, session->rule, &cmd, &io)) != NULL) {
+
+                       lua_rawgeti(session->L, LUA_REGISTRYINDEX, session->cbref);
+
+                       if (rep->v1.prob > 0.5) {
+                               if (cmd->cmd == FUZZY_PING) {
+                                       /* ret, addr, latency */
+                                       lua_pushboolean(session->L, TRUE);
+                                       rspamd_lua_ip_push(session->L, session->addr);
+                                       lua_pushnumber(session->L, fuzzy_milliseconds_since_midnight() - rep->v1.value);
+                               }
+                               else {
+                                       /* TODO: unsupported */
+                                       lua_pushboolean(session->L, FALSE);
+                                       rspamd_lua_ip_push(session->L, session->addr);
+                                       lua_pushstring(session->L, "unsupported");
+                               }
+                       }
+                       else {
+                               lua_pushboolean(session->L, FALSE);
+                               rspamd_lua_ip_push(session->L, session->addr);
+                               lua_pushfstring(session->L, "invalid reply from server: %d", rep->v1.value);
+                       }
+
+                       /* TODO: check results maybe? */
+                       lua_pcall(session->L, 3, 0, 0);
+
+                       ret = 1;
+               }
+       }
+
+       return ret;
+}
+
+/* Fuzzy check callback */
+static void
+fuzzy_lua_io_callback(gint fd, short what, void *arg)
+{
+       struct fuzzy_lua_session *session = arg;
+       gint r;
+
+       enum {
+               return_error = 0,
+               return_want_more,
+               return_finished
+       } ret = return_error;
+
+       if (what & EV_READ) {
+               /* Try to read reply */
+               r = fuzzy_lua_try_read(session);
+
+               switch (r) {
+               case 0:
+                       if (what & EV_READ) {
+                               ret = return_want_more;
+                       }
+                       else {
+                               if (what & EV_WRITE) {
+                                       /* Retransmit attempt */
+                                       if (!fuzzy_cmd_vector_to_wire(fd, session->commands)) {
+                                               ret = return_error;
+                                       }
+                                       else {
+                                               ret = return_want_more;
+                                       }
+                               }
+                       }
+                       break;
+               case 1:
+                       ret = return_finished;
+                       break;
+               default:
+                       ret = return_error;
+                       break;
+               }
+       }
+       else if (what & EV_WRITE) {
+               if (!fuzzy_cmd_vector_to_wire(fd, session->commands)) {
+                       ret = return_error;
+               }
+               else {
+                       ret = return_want_more;
+               }
+       }
+       else {
+               /* Timeout */
+               ret = return_error;
+       }
+
+       if (ret == return_want_more) {
+               /* Processed write, switch to reading */
+               rspamd_ev_watcher_reschedule(session->task->event_loop,
+                                                                        &session->ev, EV_READ);
+       }
+       else if (ret == return_error) {
+               rspamd_session_remove_event(session->task->s, fuzzy_io_fin, session);
+       }
+       else {
+               /* Read something from network */
+               if (!fuzzy_lua_session_is_completed(session)) {
+                       /* Need to read more */
+                       rspamd_ev_watcher_reschedule(session->task->event_loop,
+                                                                                &session->ev, EV_READ);
+               }
+       }
+}
+
+/***
+ * @function fuzzy_check.ping_storage(task, callback, rule, timeout[, server_override])
+ * @return
+ */
+static gint
+fuzzy_lua_ping_storage(lua_State *L)
+{
+       struct rspamd_task *task = lua_check_task(L, 1);
+
+       if (task == NULL) {
+               return luaL_error(L, "invalid arguments: task");
+       }
+
+       /* Other arguments sanity */
+       if (lua_type(L, 2) != LUA_TFUNCTION || lua_type(L, 3) != LUA_TSTRING || lua_type(L, 4) != LUA_TNUMBER) {
+               return luaL_error(L, "invalid arguments: callback/rule/timeout argument");
+       }
+
+       struct fuzzy_ctx *fuzzy_module_ctx = fuzzy_get_context(task->cfg);
+       struct fuzzy_rule *rule, *rule_found = NULL;
+       int i;
+       const char *rule_name = lua_tostring(L, 3);
+
+       PTR_ARRAY_FOREACH(fuzzy_module_ctx->fuzzy_rules, i, rule)
+       {
+               if (strcmp(rule->name, rule_name) == 0) {
+                       rule_found = rule;
+                       break;
+               }
+       }
+
+       if (rule_found == NULL) {
+               return luaL_error(L, "invalid arguments: no such rule defined");
+       }
+
+       rspamd_inet_addr_t *addr = NULL;
+
+       if (lua_type(L, 5) == LUA_TSTRING) {
+               const gchar *server_name = lua_tostring(L, 5);
+               enum rspamd_parse_host_port_result res;
+               GPtrArray *addrs;
+
+               /* We resolve address synchronously here! Why? Because it is an override... */
+               res = rspamd_parse_host_port_priority(server_name, &addrs, 0, NULL,
+                                                                                         11335, FALSE, task->task_pool);
+
+               if (res == RSPAMD_PARSE_ADDR_FAIL) {
+                       lua_pushboolean(L, FALSE);
+                       lua_pushfstring(L, "invalid arguments: cannot resolve %s", server_name);
+                       return 2;
+               }
+
+               /* Get random address */
+               addr = rspamd_inet_address_copy(g_ptr_array_index(addrs, rspamd_random_uint64_fast() % addrs->len),
+                                                                               task->task_pool);
+               rspamd_mempool_add_destructor(task->task_pool,
+                                                                         rspamd_ptr_array_free_hard, addrs);
+       }
+       else {
+               struct upstream *selected = rspamd_upstream_get(rule_found->servers,
+                                                                                                               RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0);
+               addr = rspamd_upstream_addr_next(selected);
+       }
+
+       if (addr != NULL) {
+               int sock;
+               GPtrArray *commands = fuzzy_generate_commands(task, rule, FUZZY_PING, 0, 0, 0);
+
+               if ((sock = rspamd_inet_address_connect(addr, SOCK_DGRAM, TRUE)) == -1) {
+                       lua_pushboolean(L, FALSE);
+                       lua_pushfstring(L, "cannot connect to %s, %s",
+                                                       rspamd_inet_address_to_string_pretty(addr),
+                                                       strerror(errno));
+                       return 2;
+               }
+               else {
+                       /* Create a dedicated ping session for a socket */
+                       struct fuzzy_lua_session *session =
+                               rspamd_mempool_alloc0(task->task_pool,
+                                                                         sizeof(struct fuzzy_lua_session));
+                       session->task = task;
+                       session->fd = sock;
+                       session->addr = addr;
+                       session->commands = commands;
+                       session->L = L;
+                       session->rule = rule_found;
+                       /* Store callback */
+                       lua_pushvalue(L, 2);
+                       session->cbref = luaL_ref(L, LUA_REGISTRYINDEX);
+
+                       rspamd_session_add_event(task->s, fuzzy_lua_session_fin, session, M);
+                       rspamd_ev_watcher_init(&session->ev,
+                                                                  sock,
+                                                                  EV_WRITE,
+                                                                  fuzzy_lua_io_callback,
+                                                                  session);
+                       rspamd_ev_watcher_start(session->task->event_loop, &session->ev,
+                                                                       lua_tonumber(L, 4));
+               }
+       }
+
+       lua_pushboolean(L, TRUE);
        return 1;
 }
\ No newline at end of file