gpointer init_fuzzy(struct rspamd_config *cfg);
void start_fuzzy(struct rspamd_worker *worker);
+#define msg_debug_fuzzy_storage(...) rspamd_conditional_debug_fast(NULL, NULL, \
+ rspamd_fuzzy_storage_log_id, "fuzzy_storage", NULL, \
+ RSPAMD_LOG_FUNC, \
+ __VA_ARGS__)
+INIT_LOG_MODULE(fuzzy_storage);
+
worker_t fuzzy_worker = {
"fuzzy", /* Name */
init_fuzzy, /* Init function */
double expire;
double sync_timeout;
double delay;
+ double tcp_timeout;
struct rspamd_radix_map_helper *update_ips;
struct rspamd_hash_map_helper *update_keys;
struct rspamd_radix_map_helper *blocked_ips;
CMD_ENCRYPTED_SHINGLE
};
+#define FUZZY_TCP_BUFFER_LENGTH 8192
+
+struct fuzzy_tcp_reply {
+ struct rspamd_fuzzy_encrypted_reply rep; /* Serialized reply */
+ unsigned int written; /* How many have we already written */
+ struct fuzzy_tcp_reply *prev, *next; /* Link */
+};
+
+struct fuzzy_tcp_session {
+ struct rspamd_worker *worker;
+ rspamd_inet_addr_t *addr;
+ struct rspamd_fuzzy_storage_ctx *ctx;
+ int fd;
+ struct ev_io io;
+ struct ev_timer tm;
+
+ /*
+ * We store the state in the current frame
+ * 0 0 x x x x x x x x x x x x x x x - initial
+ * 1 0 x x x x x x x x x x x x x x x - read 1 byte of lenght
+ * 1 1 x x x x x x x x x x x x x x x - read 2 bytes of length
+ * So the length is always cur_frame & 0x3fff
+ */
+ uint16_t cur_frame_state;
+ unsigned int bytes_unprocessed;
+
+ struct fuzzy_tcp_reply *replies_queue;
+ unsigned char input_buf[FUZZY_TCP_BUFFER_LENGTH];
+};
+
struct fuzzy_session {
struct rspamd_worker *worker;
rspamd_inet_addr_t *addr;
struct sockaddr_storage ss;
};
+static void
+tcp_session_dtor(struct fuzzy_tcp_session *tcp_session)
+{
+ struct fuzzy_tcp_reply *rep;
+ if (tcp_session->addr) {
+ rspamd_inet_address_free(tcp_session->addr);
+ }
+
+ if (tcp_session->ctx->event_loop) {
+ ev_timer_stop(tcp_session->ctx->event_loop, &tcp_session->tm);
+ ev_io_stop(tcp_session->ctx->event_loop, &tcp_session->io);
+ }
+
+ DL_FOREACH(tcp_session->replies_queue, rep)
+ {
+ g_free(rep);
+ }
+
+ close(tcp_session->fd);
+ g_free(tcp_session);
+}
+
+static void
+tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents)
+{
+ struct fuzzy_tcp_session *tcp_session = (struct fuzzy_tcp_session *) w->data;
+
+ msg_debug_fuzzy_storage("got io for %s: %d", rspamd_inet_address_to_string(tcp_session->addr), revents);
+}
+
+static void
+tcp_fuzzy_socket_timeout(EV_P_ ev_timer *w, int revents)
+{
+ struct fuzzy_tcp_session *tcp_session = (struct fuzzy_tcp_session *) w->data;
+
+ msg_debug_fuzzy_storage("timed out TCP connection from %s", rspamd_inet_address_to_string(tcp_session->addr));
+
+ tcp_session_dtor(tcp_session);
+}
+
static void
accept_tcp_fuzzy_socket(EV_P_ ev_io *w, int revents)
{
struct rspamd_worker *worker = (struct rspamd_worker *) w->data;
struct rspamd_fuzzy_storage_ctx *ctx;
- struct fuzzy_session *session;
+ struct fuzzy_tcp_session *tcp_session;
ctx = (struct rspamd_fuzzy_storage_ctx *) worker->ctx;
return;
}
+
+ tcp_session = g_malloc0(sizeof(*tcp_session));
+ tcp_session->addr = addr;
+ tcp_session->ctx = ctx;
+ tcp_session->fd = nfd;
+ ev_io_init(&tcp_session->io, tcp_fuzzy_socket_io, nfd, EV_READ);
+ ev_timer_init(&tcp_session->tm, tcp_fuzzy_socket_timeout, ctx->tcp_timeout, 0.0);
+ tcp_session->tm.data = tcp_session;
+ tcp_session->io.data = tcp_session;
+ ev_timer_start(ctx->event_loop, &tcp_session->tm);
+ ev_io_start(ctx->event_loop, &tcp_session->io);
+
+ msg_debug_fuzzy_storage("accepted TCP connection from %s", rspamd_inet_address_to_string(addr));
}
ctx->leaky_bucket_burst = NAN;
ctx->leaky_bucket_rate = NAN;
ctx->delay = NAN;
+ ctx->tcp_timeout = 5.0;
ctx->default_forbidden_ids = kh_init(fuzzy_key_ids_set);
ctx->weak_ids = kh_init(fuzzy_key_ids_set);
RSPAMD_CL_FLAG_TIME_FLOAT,
"Default expire time for hashes, default: " G_STRINGIFY(DEFAULT_EXPIRE) " seconds");
+ rspamd_rcl_register_worker_option(cfg,
+ type,
+ "tcp_timeout",
+ rspamd_rcl_parse_struct_time,
+ ctx,
+ G_STRUCT_OFFSET(struct rspamd_fuzzy_storage_ctx,
+ tcp_timeout),
+ RSPAMD_CL_FLAG_TIME_FLOAT,
+ "Default tcp timeout: " G_STRINGIFY(5.0) " seconds");
+
rspamd_rcl_register_worker_option(cfg,
type,
"delay",