struct rspamd_io_ev ev;
int state;
int fd;
- unsigned int retransmits;
+ int retransmits;
};
struct fuzzy_learn_session {
struct ev_loop *event_loop;
struct rspamd_io_ev ev;
int fd;
- unsigned int retransmits;
+ int retransmits;
};
#define FUZZY_CMD_FLAG_REPLIED (1 << 0)
(unsigned int) -1,
};
-INIT_LOG_MODULE(N)
-#define msg_debug_fuzzy_check(...) rspamd_conditional_debug_fast(NULL, \
- task ? task->from_addr : NULL, \
- rspamd_task_log_id, M, task ? task->task_pool->tag.uid : NULL, \
- RSPAMD_LOG_FUNC, \
+INIT_LOG_MODULE(fuzzy_check)
+#define msg_debug_fuzzy_check(...) rspamd_conditional_debug_fast(NULL, \
+ task ? task->from_addr : NULL, \
+ rspamd_fuzzy_check_log_id, M, task ? task->task_pool->tag.uid : NULL, \
+ RSPAMD_LOG_FUNC, \
__VA_ARGS__)
static inline struct fuzzy_ctx *
}
}
- if (session->retransmits >= session->rule->retransmits) {
- msg_err_task("got IO timeout with server %s(%s), after %d/%d retransmits",
- rspamd_upstream_name(session->server),
- rspamd_inet_address_to_string_pretty(
- rspamd_upstream_addr_cur(session->server)),
- session->retransmits,
- session->rule->retransmits);
- rspamd_upstream_fail(session->server, TRUE, "timeout");
+ if (session->retransmits >= 0) {
+ if (session->retransmits >= session->rule->retransmits) {
+ msg_err_task("got IO timeout with server %s(%s), after %d/%d retransmits",
+ rspamd_upstream_name(session->server),
+ rspamd_inet_address_to_string_pretty(
+ rspamd_upstream_addr_cur(session->server)),
+ session->retransmits,
+ session->rule->retransmits);
+ rspamd_upstream_fail(session->server, TRUE, "timeout");
- if (session->item) {
- rspamd_symcache_item_async_dec_check(session->task, session->item, M);
+ if (session->item) {
+ rspamd_symcache_item_async_dec_check(session->task, session->item, M);
+ }
+ rspamd_session_remove_event(session->task->s, fuzzy_io_fin, session);
+ }
+ else {
+ /* Apply backpressure here: at least 100ms up to 500ms */
+ session->retransmits++;
+ double backpressure_time = MAX(session->rule->io_timeout * 0.1, 0.1) * session->retransmits;
+ /* Do not make delay more than 500ms for performance considerations */
+ backpressure_time = MIN(backpressure_time, 0.5);
+ /* Inverse to distinguish */
+ msg_debug_fuzzy_check("backpressure for %.2f milliseconds (server=%s), retransmits: %d;",
+ backpressure_time * 1000,
+ rspamd_upstream_name(session->server),
+ session->retransmits);
+ session->retransmits = -(session->retransmits);
+ /* Disable write event for the time of backpressure */
+ rspamd_ev_watcher_reschedule_at(session->event_loop,
+ &session->ev, EV_READ,
+ backpressure_time);
}
- rspamd_session_remove_event(session->task->s, fuzzy_io_fin, session);
}
else {
/* Plan write event */
- rspamd_ev_watcher_reschedule(session->event_loop,
- &session->ev, EV_READ | EV_WRITE);
- session->retransmits++;
+ rspamd_ev_watcher_reschedule_at(session->event_loop,
+ &session->ev, EV_READ | EV_WRITE,
+ session->rule->io_timeout);
+ session->retransmits = -(session->retransmits);
}
}
task = session->task;
- if (session->retransmits >= session->rule->retransmits) {
- rspamd_upstream_fail(session->server, TRUE, "timeout");
- msg_err_task_check("got IO timeout with server %s(%s), "
- "after %d/%d retransmits",
- rspamd_upstream_name(session->server),
- rspamd_inet_address_to_string_pretty(
- rspamd_upstream_addr_cur(session->server)),
- session->retransmits,
- session->rule->retransmits);
-
- if (session->session) {
- rspamd_session_remove_event(session->session, fuzzy_controller_lua_fin,
- session);
- }
- else {
- if (session->http_entry) {
- rspamd_controller_send_error(session->http_entry,
- 500, "IO timeout with fuzzy storage");
+ if (session->retransmits >= 0) {
+ if (session->retransmits >= session->rule->retransmits) {
+ rspamd_upstream_fail(session->server, TRUE, "timeout");
+ msg_err_task_check("got IO timeout with server %s(%s), "
+ "after %d/%d retransmits",
+ rspamd_upstream_name(session->server),
+ rspamd_inet_address_to_string_pretty(
+ rspamd_upstream_addr_cur(session->server)),
+ session->retransmits,
+ session->rule->retransmits);
+
+ if (session->session) {
+ rspamd_session_remove_event(session->session, fuzzy_controller_lua_fin,
+ session);
}
+ else {
+ if (session->http_entry) {
+ rspamd_controller_send_error(session->http_entry,
+ 500, "IO timeout with fuzzy storage");
+ }
- if (*session->saved > 0) {
- (*session->saved)--;
- if (*session->saved == 0) {
- if (session->http_entry) {
- rspamd_task_free(session->task);
+ if (*session->saved > 0) {
+ (*session->saved)--;
+ if (*session->saved == 0) {
+ if (session->http_entry) {
+ rspamd_task_free(session->task);
+ }
+
+ session->task = NULL;
}
+ }
- session->task = NULL;
+ if (session->http_entry) {
+ rspamd_http_connection_unref(session->http_entry->conn);
}
- }
- if (session->http_entry) {
- rspamd_http_connection_unref(session->http_entry->conn);
+ rspamd_ev_watcher_stop(session->event_loop,
+ &session->ev);
+ close(session->fd);
}
-
- rspamd_ev_watcher_stop(session->event_loop,
- &session->ev);
- close(session->fd);
+ }
+ else {
+ /* Apply backpressure here: at least 100ms up to 500ms */
+ session->retransmits++;
+ double backpressure_time = MAX(session->rule->io_timeout * 0.1, 0.1) * session->retransmits;
+ /* Do not make delay more than 500ms for performance considerations */
+ backpressure_time = MIN(backpressure_time, 0.5);
+ /* Inverse to distinguish */
+ msg_debug_fuzzy_check("backpressure for %.2f milliseconds (server=%s), retransmits: %d;",
+ backpressure_time * 1000,
+ rspamd_upstream_name(session->server),
+ session->retransmits);
+ session->retransmits = -(session->retransmits);
+ /* Disable write event for the time of backpressure */
+ rspamd_ev_watcher_reschedule_at(session->event_loop,
+ &session->ev, EV_READ,
+ backpressure_time);
}
}
else {
- /* Plan write event */
- rspamd_ev_watcher_reschedule(session->event_loop,
- &session->ev, EV_READ | EV_WRITE);
- session->retransmits++;
+ /* Backpressure done, plan write event */
+ rspamd_ev_watcher_reschedule_at(session->event_loop,
+ &session->ev, EV_READ | EV_WRITE,
+ session->rule->io_timeout);
+ session->retransmits = -(session->retransmits);
}
}