diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/plugins/fuzzy_check.c | 357 |
1 files changed, 357 insertions, 0 deletions
diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c index 96c283c5d..2576586c4 100644 --- a/src/plugins/fuzzy_check.c +++ b/src/plugins/fuzzy_check.c @@ -196,6 +196,7 @@ static gint fuzzy_lua_unlearn_handler(lua_State *L); static gint fuzzy_lua_gen_hashes_handler(lua_State *L); static gint fuzzy_lua_hex_hashes_handler(lua_State *L); static gint fuzzy_lua_list_storages(lua_State *L); +static gint fuzzy_lua_ping_storage(lua_State *L); module_t fuzzy_check_module = { "fuzzy_check", @@ -1227,6 +1228,9 @@ gint fuzzy_check_module_config(struct rspamd_config *cfg, bool validate) lua_pushstring(L, "list_storages"); lua_pushcfunction(L, fuzzy_lua_list_storages); lua_settable(L, -3); + lua_pushstring(L, "ping_storage"); + lua_pushcfunction(L, fuzzy_lua_ping_storage); + lua_settable(L, -3); /* Finish fuzzy_check key */ lua_settable(L, -3); } @@ -1369,6 +1373,59 @@ fuzzy_cmd_stat(struct fuzzy_rule *rule, return io; } +static inline double +fuzzy_milliseconds_since_midnight(void) +{ + double now = rspamd_get_calendar_ticks(); + double ms = now - (int64_t) now; + now = (((int64_t) now % 86400) + ms) * 1000; + + return now; +} + +static struct fuzzy_cmd_io * +fuzzy_cmd_ping(struct fuzzy_rule *rule, + rspamd_mempool_t *pool) +{ + struct rspamd_fuzzy_cmd *cmd; + struct rspamd_fuzzy_encrypted_cmd *enccmd = NULL; + struct fuzzy_cmd_io *io; + + if (rule->peer_key) { + enccmd = rspamd_mempool_alloc0(pool, sizeof(*enccmd)); + cmd = &enccmd->cmd; + } + else { + cmd = rspamd_mempool_alloc0(pool, sizeof(*cmd)); + } + + /* Get milliseconds since midnight */ + + + cmd->cmd = FUZZY_PING; + cmd->version = RSPAMD_FUZZY_PLUGIN_VERSION; + cmd->shingles_count = 0; + cmd->value = fuzzy_milliseconds_since_midnight(); /* Record timestamp */ + cmd->tag = ottery_rand_uint32(); + + io = rspamd_mempool_alloc(pool, sizeof(*io)); + io->flags = 0; + io->tag = cmd->tag; + memcpy(&io->cmd, cmd, sizeof(io->cmd)); + + if (rule->peer_key && enccmd) { + fuzzy_encrypt_cmd(rule, &enccmd->hdr, (guchar *) cmd, sizeof(*cmd)); + io->io.iov_base = enccmd; + io->io.iov_len = sizeof(*enccmd); + } + else { + io->io.iov_base = cmd; + io->io.iov_len = sizeof(*cmd); + } + + return io; +} + static struct fuzzy_cmd_io * fuzzy_cmd_hash(struct fuzzy_rule *rule, int c, @@ -3040,6 +3097,16 @@ fuzzy_generate_commands(struct rspamd_task *task, struct fuzzy_rule *rule, goto end; } + else if (c == FUZZY_PING) { + res = g_ptr_array_sized_new(1); + + io = fuzzy_cmd_ping(rule, task->task_pool); + if (io) { + g_ptr_array_add(res, io); + } + + goto end; + } if (task->message == NULL) { goto end; @@ -4246,6 +4313,9 @@ fuzzy_attach_controller(struct module_ctx *ctx, GHashTable *commands) return 0; } +/* Lua handlers */ +/* TODO: move to a separate unit, as this file is now a bit too hard to read */ + static void lua_upstream_str_inserter(struct upstream *up, guint idx, void *ud) { @@ -4302,4 +4372,291 @@ fuzzy_lua_list_storages(lua_State *L) } return 1; +} + +struct fuzzy_lua_session { + struct rspamd_task *task; + lua_State *L; + rspamd_inet_addr_t *addr; + GPtrArray *commands; + struct fuzzy_rule *rule; + struct rspamd_io_ev ev; + gint cbref; + gint fd; +}; + +static void +fuzzy_lua_session_fin(void *ud) +{ + struct fuzzy_lua_session *session = ud; + + if (session->commands) { + g_ptr_array_free(session->commands, TRUE); + } + + rspamd_ev_watcher_stop(session->task->event_loop, &session->ev); + luaL_unref(session->L, LUA_REGISTRYINDEX, session->cbref); +} + +static gboolean +fuzzy_lua_session_is_completed(struct fuzzy_lua_session *session) +{ + struct fuzzy_cmd_io *io; + guint nreplied = 0, i; + + + for (i = 0; i < session->commands->len; i++) { + io = g_ptr_array_index(session->commands, i); + + if (io->flags & FUZZY_CMD_FLAG_REPLIED) { + nreplied++; + } + } + + if (nreplied == session->commands->len) { + + rspamd_session_remove_event(session->task->s, fuzzy_io_fin, session); + + return TRUE; + } + + return FALSE; +} + +static gint +fuzzy_lua_try_read(struct fuzzy_lua_session *session) +{ + struct rspamd_task *task; + const struct rspamd_fuzzy_reply *rep; + struct rspamd_fuzzy_cmd *cmd = NULL; + struct fuzzy_cmd_io *io = NULL; + gint r, ret; + guchar buf[2048], *p; + + task = session->task; + + if ((r = read(session->fd, buf, sizeof(buf) - 1)) == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { + return 0; + } + else { + return -1; + } + } + else { + p = buf; + + ret = 0; + + while ((rep = fuzzy_process_reply(&p, &r, + session->commands, session->rule, &cmd, &io)) != NULL) { + + lua_rawgeti(session->L, LUA_REGISTRYINDEX, session->cbref); + + if (rep->v1.prob > 0.5) { + if (cmd->cmd == FUZZY_PING) { + /* ret, addr, latency */ + lua_pushboolean(session->L, TRUE); + rspamd_lua_ip_push(session->L, session->addr); + lua_pushnumber(session->L, fuzzy_milliseconds_since_midnight() - rep->v1.value); + } + else { + /* TODO: unsupported */ + lua_pushboolean(session->L, FALSE); + rspamd_lua_ip_push(session->L, session->addr); + lua_pushstring(session->L, "unsupported"); + } + } + else { + lua_pushboolean(session->L, FALSE); + rspamd_lua_ip_push(session->L, session->addr); + lua_pushfstring(session->L, "invalid reply from server: %d", rep->v1.value); + } + + /* TODO: check results maybe? */ + lua_pcall(session->L, 3, 0, 0); + + ret = 1; + } + } + + return ret; +} + +/* Fuzzy check callback */ +static void +fuzzy_lua_io_callback(gint fd, short what, void *arg) +{ + struct fuzzy_lua_session *session = arg; + gint r; + + enum { + return_error = 0, + return_want_more, + return_finished + } ret = return_error; + + if (what & EV_READ) { + /* Try to read reply */ + r = fuzzy_lua_try_read(session); + + switch (r) { + case 0: + if (what & EV_READ) { + ret = return_want_more; + } + else { + if (what & EV_WRITE) { + /* Retransmit attempt */ + if (!fuzzy_cmd_vector_to_wire(fd, session->commands)) { + ret = return_error; + } + else { + ret = return_want_more; + } + } + } + break; + case 1: + ret = return_finished; + break; + default: + ret = return_error; + break; + } + } + else if (what & EV_WRITE) { + if (!fuzzy_cmd_vector_to_wire(fd, session->commands)) { + ret = return_error; + } + else { + ret = return_want_more; + } + } + else { + /* Timeout */ + ret = return_error; + } + + if (ret == return_want_more) { + /* Processed write, switch to reading */ + rspamd_ev_watcher_reschedule(session->task->event_loop, + &session->ev, EV_READ); + } + else if (ret == return_error) { + rspamd_session_remove_event(session->task->s, fuzzy_io_fin, session); + } + else { + /* Read something from network */ + if (!fuzzy_lua_session_is_completed(session)) { + /* Need to read more */ + rspamd_ev_watcher_reschedule(session->task->event_loop, + &session->ev, EV_READ); + } + } +} + +/*** + * @function fuzzy_check.ping_storage(task, callback, rule, timeout[, server_override]) + * @return + */ +static gint +fuzzy_lua_ping_storage(lua_State *L) +{ + struct rspamd_task *task = lua_check_task(L, 1); + + if (task == NULL) { + return luaL_error(L, "invalid arguments: task"); + } + + /* Other arguments sanity */ + if (lua_type(L, 2) != LUA_TFUNCTION || lua_type(L, 3) != LUA_TSTRING || lua_type(L, 4) != LUA_TNUMBER) { + return luaL_error(L, "invalid arguments: callback/rule/timeout argument"); + } + + struct fuzzy_ctx *fuzzy_module_ctx = fuzzy_get_context(task->cfg); + struct fuzzy_rule *rule, *rule_found = NULL; + int i; + const char *rule_name = lua_tostring(L, 3); + + PTR_ARRAY_FOREACH(fuzzy_module_ctx->fuzzy_rules, i, rule) + { + if (strcmp(rule->name, rule_name) == 0) { + rule_found = rule; + break; + } + } + + if (rule_found == NULL) { + return luaL_error(L, "invalid arguments: no such rule defined"); + } + + rspamd_inet_addr_t *addr = NULL; + + if (lua_type(L, 5) == LUA_TSTRING) { + const gchar *server_name = lua_tostring(L, 5); + enum rspamd_parse_host_port_result res; + GPtrArray *addrs; + + /* We resolve address synchronously here! Why? Because it is an override... */ + res = rspamd_parse_host_port_priority(server_name, &addrs, 0, NULL, + 11335, FALSE, task->task_pool); + + if (res == RSPAMD_PARSE_ADDR_FAIL) { + lua_pushboolean(L, FALSE); + lua_pushfstring(L, "invalid arguments: cannot resolve %s", server_name); + return 2; + } + + /* Get random address */ + addr = rspamd_inet_address_copy(g_ptr_array_index(addrs, rspamd_random_uint64_fast() % addrs->len), + task->task_pool); + rspamd_mempool_add_destructor(task->task_pool, + rspamd_ptr_array_free_hard, addrs); + } + else { + struct upstream *selected = rspamd_upstream_get(rule_found->servers, + RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0); + addr = rspamd_upstream_addr_next(selected); + } + + if (addr != NULL) { + int sock; + GPtrArray *commands = fuzzy_generate_commands(task, rule, FUZZY_PING, 0, 0, 0); + + if ((sock = rspamd_inet_address_connect(addr, SOCK_DGRAM, TRUE)) == -1) { + lua_pushboolean(L, FALSE); + lua_pushfstring(L, "cannot connect to %s, %s", + rspamd_inet_address_to_string_pretty(addr), + strerror(errno)); + return 2; + } + else { + /* Create a dedicated ping session for a socket */ + struct fuzzy_lua_session *session = + rspamd_mempool_alloc0(task->task_pool, + sizeof(struct fuzzy_lua_session)); + session->task = task; + session->fd = sock; + session->addr = addr; + session->commands = commands; + session->L = L; + session->rule = rule_found; + /* Store callback */ + lua_pushvalue(L, 2); + session->cbref = luaL_ref(L, LUA_REGISTRYINDEX); + + rspamd_session_add_event(task->s, fuzzy_lua_session_fin, session, M); + rspamd_ev_watcher_init(&session->ev, + sock, + EV_WRITE, + fuzzy_lua_io_callback, + session); + rspamd_ev_watcher_start(session->task->event_loop, &session->ev, + lua_tonumber(L, 4)); + } + } + + lua_pushboolean(L, TRUE); + return 1; }
\ No newline at end of file |