struct rspamd_fuzzy_redis_session {
struct rspamd_fuzzy_backend_redis *backend;
redisAsyncContext *ctx;
- struct event timeout;
+ ev_timer timeout;
const struct rspamd_fuzzy_cmd *cmd;
- struct ev_loop *ev_base;
+ struct ev_loop *event_loop;
float prob;
gboolean shingles_checked;
ac, is_fatal);
}
- if (rspamd_event_pending (&session->timeout, EV_TIMEOUT)) {
- event_del (&session->timeout);
- }
-
+ ev_timer_stop (session->event_loop, &session->timeout);
rspamd_fuzzy_redis_session_free_args (session);
REF_RELEASE (session->backend);
}
static void
-rspamd_fuzzy_redis_timeout (gint fd, short what, gpointer priv)
+rspamd_fuzzy_redis_timeout (EV_P_ ev_timer *w, int revents)
{
- struct rspamd_fuzzy_redis_session *session = priv;
+ struct rspamd_fuzzy_redis_session *session =
+ (struct rspamd_fuzzy_redis_session *)w->data;
redisAsyncContext *ac;
static char errstr[128];
struct rspamd_fuzzy_redis_session *session = priv;
redisReply *reply = r, *cur;
struct rspamd_fuzzy_reply rep;
- struct timeval tv;
GString *key;
struct _rspamd_fuzzy_shingles_helper *shingles, *prev = NULL, *sel = NULL;
guint i, found = 0, max_found = 0, cur_found = 0;
- event_del (&session->timeout);
+ ev_timer_stop (session->event_loop, &session->timeout);
memset (&rep, 0, sizeof (rep));
if (c->err == 0) {
}
else {
/* Add timeout */
- event_set (&session->timeout, -1, EV_TIMEOUT,
+ session->timeout.data = session;
+ ev_timer_init (&session->timeout,
rspamd_fuzzy_redis_timeout,
- session);
- event_base_set (session->ev_base, &session->timeout);
- double_to_tv (session->backend->timeout, &tv);
- event_add (&session->timeout, &tv);
+ session->backend->timeout, 0.0);
+ ev_timer_start (session->event_loop, &session->timeout);
}
return;
static void
rspamd_fuzzy_backend_check_shingles (struct rspamd_fuzzy_redis_session *session)
{
- struct timeval tv;
struct rspamd_fuzzy_reply rep;
const struct rspamd_fuzzy_shingle_cmd *shcmd;
GString *key;
}
else {
/* Add timeout */
- event_set (&session->timeout, -1, EV_TIMEOUT, rspamd_fuzzy_redis_timeout,
- session);
- event_base_set (session->ev_base, &session->timeout);
- double_to_tv (session->backend->timeout, &tv);
- event_add (&session->timeout, &tv);
+ session->timeout.data = session;
+ ev_timer_init (&session->timeout,
+ rspamd_fuzzy_redis_timeout,
+ session->backend->timeout, 0.0);
+ ev_timer_start (session->event_loop, &session->timeout);
}
}
gulong value;
guint found_elts = 0;
- event_del (&session->timeout);
+ ev_timer_stop (session->event_loop, &session->timeout);
memset (&rep, 0, sizeof (rep));
if (c->err == 0) {
struct rspamd_fuzzy_redis_session *session;
struct upstream *up;
struct upstream_list *ups;
- struct timeval tv;
rspamd_inet_addr_t *addr;
struct rspamd_fuzzy_reply rep;
GString *key;
session->prob = 1.0;
memcpy (rep.digest, session->cmd->digest, sizeof (rep.digest));
memcpy (session->found_digest, session->cmd->digest, sizeof (rep.digest));
- session->ev_base = rspamd_fuzzy_backend_event_base (bk);
+ session->event_loop = rspamd_fuzzy_backend_event_base (bk);
/* First of all check digest */
session->nargs = 5;
}
else {
/* Add timeout */
- event_set (&session->timeout, -1, EV_TIMEOUT, rspamd_fuzzy_redis_timeout,
- session);
- event_base_set (session->ev_base, &session->timeout);
- double_to_tv (backend->timeout, &tv);
- event_add (&session->timeout, &tv);
+ session->timeout.data = session;
+ ev_timer_init (&session->timeout,
+ rspamd_fuzzy_redis_timeout,
+ session->backend->timeout, 0.0);
+ ev_timer_start (session->event_loop, &session->timeout);
}
}
}
redisReply *reply = r;
gulong nelts;
- event_del (&session->timeout);
+ ev_timer_stop (session->event_loop, &session->timeout);
if (c->err == 0) {
rspamd_upstream_ok (session->up);
struct rspamd_fuzzy_redis_session *session;
struct upstream *up;
struct upstream_list *ups;
- struct timeval tv;
rspamd_inet_addr_t *addr;
GString *key;
session->callback.cb_count = cb;
session->cbdata = ud;
session->command = RSPAMD_FUZZY_REDIS_COMMAND_COUNT;
- session->ev_base = rspamd_fuzzy_backend_event_base (bk);
+ session->event_loop = rspamd_fuzzy_backend_event_base (bk);
session->nargs = 2;
session->argv = g_malloc (sizeof (gchar *) * 2);
}
else {
/* Add timeout */
- event_set (&session->timeout, -1, EV_TIMEOUT, rspamd_fuzzy_redis_timeout,
- session);
- event_base_set (session->ev_base, &session->timeout);
- double_to_tv (backend->timeout, &tv);
- event_add (&session->timeout, &tv);
+ session->timeout.data = session;
+ ev_timer_init (&session->timeout,
+ rspamd_fuzzy_redis_timeout,
+ session->backend->timeout, 0.0);
+ ev_timer_start (session->event_loop, &session->timeout);
}
}
}
redisReply *reply = r;
gulong nelts;
- event_del (&session->timeout);
+ ev_timer_stop (session->event_loop, &session->timeout);
if (c->err == 0) {
rspamd_upstream_ok (session->up);
struct rspamd_fuzzy_redis_session *session;
struct upstream *up;
struct upstream_list *ups;
- struct timeval tv;
rspamd_inet_addr_t *addr;
GString *key;
session->callback.cb_version = cb;
session->cbdata = ud;
session->command = RSPAMD_FUZZY_REDIS_COMMAND_VERSION;
- session->ev_base = rspamd_fuzzy_backend_event_base (bk);
+ session->event_loop = rspamd_fuzzy_backend_event_base (bk);
session->nargs = 2;
session->argv = g_malloc (sizeof (gchar *) * 2);
}
else {
/* Add timeout */
- event_set (&session->timeout, -1, EV_TIMEOUT, rspamd_fuzzy_redis_timeout,
- session);
- event_base_set (session->ev_base, &session->timeout);
- double_to_tv (backend->timeout, &tv);
- event_add (&session->timeout, &tv);
+ session->timeout.data = session;
+ ev_timer_init (&session->timeout,
+ rspamd_fuzzy_redis_timeout,
+ session->backend->timeout, 0.0);
+ ev_timer_start (session->event_loop, &session->timeout);
}
}
}
{
struct rspamd_fuzzy_redis_session *session = priv;
redisReply *reply = r;
- event_del (&session->timeout);
+
+ ev_timer_stop (session->event_loop, &session->timeout);
if (c->err == 0) {
rspamd_upstream_ok (session->up);
struct rspamd_fuzzy_redis_session *session;
struct upstream *up;
struct upstream_list *ups;
- struct timeval tv;
rspamd_inet_addr_t *addr;
guint i;
GString *key;
struct fuzzy_peer_cmd *io_cmd;
- struct rspamd_fuzzy_cmd *cmd;
+ struct rspamd_fuzzy_cmd *cmd = NULL;
guint nargs, ncommands, cur_shift;
g_assert (backend != NULL);
session->command = RSPAMD_FUZZY_REDIS_COMMAND_UPDATES;
session->cmd = cmd;
session->prob = 1.0;
- session->ev_base = rspamd_fuzzy_backend_event_base (bk);
+ session->event_loop = rspamd_fuzzy_backend_event_base (bk);
/* First of all check digest */
session->nargs = nargs;
}
else {
/* Add timeout */
- event_set (&session->timeout, -1, EV_TIMEOUT, rspamd_fuzzy_redis_timeout,
- session);
- event_base_set (session->ev_base, &session->timeout);
- double_to_tv (backend->timeout, &tv);
- event_add (&session->timeout, &tv);
+ session->timeout.data = session;
+ ev_timer_init (&session->timeout,
+ rspamd_fuzzy_redis_timeout,
+ session->backend->timeout, 0.0);
+ ev_timer_start (session->event_loop, &session->timeout);
}
}
}
gboolean
-rspamd_milter_handle_socket (gint fd, const struct timeval *tv,
+rspamd_milter_handle_socket (gint fd, ev_tstamp timeout,
rspamd_mempool_t *pool,
struct ev_loop *ev_base, rspamd_milter_finish finish_cb,
rspamd_milter_error error_cb, void *ud)
priv->pool = rspamd_mempool_new (rspamd_mempool_suggest_size (), "milter");
priv->discard_on_reject = milter_ctx->discard_on_reject;
priv->quarantine_on_reject = milter_ctx->quarantine_on_reject;
+ priv->ev.timeout = timeout;
if (pool) {
/* Copy tag */
priv->headers = kh_init (milter_headers_hash_t);
kh_resize (milter_headers_hash_t, priv->headers, 32);
- if (tv) {
- memcpy (&priv->tv, tv, sizeof (*tv));
- priv->ptv = &priv->tv;
- }
- else {
- priv->ptv = NULL;
- }
-
session->priv = priv;
REF_INIT_RETAIN (session, rspamd_milter_session_dtor);
#include "fstring.h"
#include "addr.h"
#include "contrib/libucl/ucl.h"
+#include "contrib/libev/ev.h"
#include "ref.h"
enum rspamd_milter_reply {
* @param ud
* @return
*/
-gboolean rspamd_milter_handle_socket (gint fd, const struct timeval *tv,
+gboolean rspamd_milter_handle_socket (gint fd, ev_tstamp timeout,
rspamd_mempool_t *pool,
struct ev_loop *ev_base, rspamd_milter_finish finish_cb,
rspamd_milter_error error_cb, void *ud);
struct rspamd_milter_parser parser;
struct rspamd_io_ev ev;
struct rspamd_milter_outbuf *out_chain;
- struct timeval *ptv;
struct ev_loop *event_loop;
rspamd_mempool_t *pool;
khash_t(milter_headers_hash_t) *headers;
ucl_object_insert_key (top, ucl_object_fromstring (task->message_id),
"message-id", 0, false);
ucl_object_insert_key (top,
- ucl_object_fromdouble (task->time_real_finish - task->time_real),
+ ucl_object_fromdouble (task->time_real_finish - task->task_timestamp),
"time_real", 0, false);
ucl_object_insert_key (top,
ucl_object_fromdouble (task->time_virtual_finish - task->time_virtual),
struct redisAsyncContext *ctx;
struct rspamd_redis_pool_elt *elt;
GList *entry;
- struct event timeout;
+ ev_timer timeout;
gboolean active;
gchar tag[MEMPOOL_UID_LEN];
ref_entry_t ref;
else {
msg_debug_rpool ("inactive connection removed");
- if (rspamd_event_pending (&conn->timeout, EV_TIMEOUT)) {
- event_del (&conn->timeout);
- }
+ ev_timer_stop (conn->elt->pool->event_loop, &conn->timeout);
if (conn->ctx && !(conn->ctx->c.flags & REDIS_FREEING)) {
redisAsyncContext *ac = conn->ctx;
}
static void
-rspamd_redis_conn_timeout (gint fd, short what, gpointer p)
+rspamd_redis_conn_timeout (EV_P_ ev_timer *w, int revents)
{
- struct rspamd_redis_pool_connection *conn = p;
+ struct rspamd_redis_pool_connection *conn =
+ (struct rspamd_redis_pool_connection *)w->data;
g_assert (!conn->active);
msg_debug_rpool ("scheduled removal of connection %p, refcount: %d",
static void
rspamd_redis_pool_schedule_timeout (struct rspamd_redis_pool_connection *conn)
{
- struct timeval tv;
gdouble real_timeout;
guint active_elts;
msg_debug_rpool ("scheduled connection %p cleanup in %.1f seconds",
conn->ctx, real_timeout);
- double_to_tv (real_timeout, &tv);
- event_set (&conn->timeout, -1, EV_TIMEOUT, rspamd_redis_conn_timeout, conn);
- event_base_set (conn->elt->pool->event_loop, &conn->timeout);
- event_add (&conn->timeout, &tv);
+
+ conn->timeout.data = conn;
+ ev_timer_init (&conn->timeout,
+ rspamd_redis_conn_timeout,
+ real_timeout, 0.0);
+ ev_timer_start (conn->elt->pool->event_loop, &conn->timeout);
}
static void
g_assert (!conn->active);
if (conn->ctx->err == REDIS_OK) {
- event_del (&conn->timeout);
+ ev_timer_stop (elt->pool->event_loop, &conn->timeout);
conn->active = TRUE;
g_queue_push_tail_link (elt->active, conn_entry);
msg_debug_rpool ("reused existing connection to %s:%d: %p",
struct rspamd_cache_refresh_cbdata {
gdouble last_resort;
- struct event resort_ev;
+ ev_timer resort_ev;
struct rspamd_symcache *cache;
struct rspamd_worker *w;
- struct ev_loop *ev_base;
+ struct ev_loop *event_loop;
};
/* weight, frequency, time */
if (check) {
msg_debug_cache_task ("execute %s, %d", item->symbol, item->id);
-#ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC
- struct timeval tv;
-
- event_base_update_cache_time (task->ev_base);
- event_base_gettimeofday_cached (task->ev_base, &tv);
- t1 = tv_to_double (&tv);
-#else
- t1 = rspamd_get_ticks (FALSE);
-#endif
- dyn_item->start_msec = (t1 - task->time_real) * 1e3;
+ t1 = ev_now (task->event_loop);
+ dyn_item->start_msec = (t1 - task->time_virtual) * 1e3;
dyn_item->async_events = 0;
checkpoint->cur_item = item;
checkpoint->items_inflight ++;
}
static void
-rspamd_symcache_resort_cb (gint fd, short what, gpointer ud)
+rspamd_symcache_resort_cb (EV_P_ ev_timer *w, int revents)
{
- struct timeval tv;
gdouble tm;
- struct rspamd_cache_refresh_cbdata *cbdata = ud;
+ struct rspamd_cache_refresh_cbdata *cbdata =
+ (struct rspamd_cache_refresh_cbdata *)w->data;
struct rspamd_symcache *cache;
struct rspamd_symcache_item *item;
guint i;
cur_ticks = rspamd_get_ticks (FALSE);
msg_debug_cache ("resort symbols cache, next reload in %.2f seconds", tm);
g_assert (cache != NULL);
- evtimer_set (&cbdata->resort_ev, rspamd_symcache_resort_cb, cbdata);
- event_base_set (cbdata->ev_base, &cbdata->resort_ev);
- double_to_tv (tm, &tv);
- event_add (&cbdata->resort_ev, &tv);
+ cbdata->resort_ev.repeat = tm;
+ ev_timer_again (EV_A_ w);
if (rspamd_worker_is_primary_controller (cbdata->w)) {
/* Gather stats from shared execution times */
item->frequency_peaks);
if (cache->peak_cb != -1) {
- rspamd_symcache_call_peak_cb (cbdata->ev_base,
+ rspamd_symcache_call_peak_cb (cbdata->event_loop,
cache, item,
cur_value, cur_err);
}
}
}
-
cbdata->last_resort = cur_ticks;
/* We don't do actual sorting due to topological guarantees */
}
}
+static void
+rspamd_symcache_refresh_dtor (void *d)
+{
+ struct rspamd_cache_refresh_cbdata *cbdata =
+ (struct rspamd_cache_refresh_cbdata *)d;
+
+ ev_timer_stop (cbdata->event_loop, &cbdata->resort_ev);
+}
+
void
rspamd_symcache_start_refresh (struct rspamd_symcache *cache,
struct ev_loop *ev_base, struct rspamd_worker *w)
{
- struct timeval tv;
gdouble tm;
struct rspamd_cache_refresh_cbdata *cbdata;
cbdata = rspamd_mempool_alloc0 (cache->static_pool, sizeof (*cbdata));
cbdata->last_resort = rspamd_get_ticks (TRUE);
- cbdata->ev_base = ev_base;
+ cbdata->event_loop = ev_base;
cbdata->w = w;
cbdata->cache = cache;
tm = rspamd_time_jitter (cache->reload_time, 0);
msg_debug_cache ("next reload in %.2f seconds", tm);
g_assert (cache != NULL);
- evtimer_set (&cbdata->resort_ev, rspamd_symcache_resort_cb,
- cbdata);
- event_base_set (ev_base, &cbdata->resort_ev);
- double_to_tv (tm, &tv);
- event_add (&cbdata->resort_ev, &tv);
+ cbdata->resort_ev.data = cbdata;
+ ev_timer_init (&cbdata->resort_ev, rspamd_symcache_resort_cb,
+ tm, tm);
+ ev_timer_start (cbdata->event_loop, &cbdata->resort_ev);
rspamd_mempool_add_destructor (cache->static_pool,
- (rspamd_mempool_destruct_t) event_del,
- &cbdata->resort_ev);
+ rspamd_symcache_refresh_dtor, cbdata);
}
void
checkpoint->items_inflight --;
checkpoint->cur_item = NULL;
-#ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC
- struct timeval tv;
- event_base_update_cache_time (task->ev_base);
- event_base_gettimeofday_cached (task->ev_base, &tv);
- t2 = tv_to_double (&tv);
-#else
- t2 = rspamd_get_ticks (FALSE);
-#endif
-
- diff = ((t2 - task->time_real) * 1e3 - dyn_item->start_msec);
+ t2 = ev_now (task->event_loop);
+ diff = ((t2 - task->time_virtual) * 1e3 - dyn_item->start_msec);
if (G_UNLIKELY (RSPAMD_TASK_IS_PROFILING (task))) {
rspamd_task_profile_set (task, item->symbol, diff);
backend->stcf = stf;
st_elt = g_malloc0 (sizeof (*st_elt));
- st_elt->event_loop = ctx->ev_base;
+ st_elt->event_loop = ctx->event_loop;
st_elt->ctx = backend;
backend->stat_elt = rspamd_stat_ctx_register_async (
rspamd_redis_async_stat_cb,
stat_ctx->statfiles = g_ptr_array_new ();
stat_ctx->classifiers = g_ptr_array_new ();
stat_ctx->async_elts = g_queue_new ();
- stat_ctx->ev_base = ev_base;
+ stat_ctx->event_loop = ev_base;
stat_ctx->lua_stat_tokens_ref = -1;
/* Interact with lua_stat */
elt->cleanup (elt, elt->ud);
}
- event_del (&elt->timer_ev);
+ ev_timer_stop (elt->event_loop, &elt->timer_ev);
g_free (elt);
}
static void
-rspamd_async_elt_on_timer (gint fd, short what, gpointer d)
+rspamd_async_elt_on_timer (EV_P_ ev_timer *w, int revents)
{
- struct rspamd_stat_async_elt *elt = d;
+ struct rspamd_stat_async_elt *elt = (struct rspamd_stat_async_elt *)w->data;
gdouble jittered_time;
- event_del (&elt->timer_ev);
if (elt->enabled) {
elt->handler (elt, elt->ud);
}
jittered_time = rspamd_time_jitter (elt->timeout, 0);
- double_to_tv (jittered_time, &elt->tv);
- event_add (&elt->timer_ev, &elt->tv);
+ elt->timer_ev.repeat = jittered_time;
+ ev_timer_again (EV_A_ w);
}
struct rspamd_stat_async_elt*
elt->cleanup = cleanup;
elt->ud = d;
elt->timeout = timeout;
+ elt->event_loop = st_ctx->event_loop;
REF_INIT_RETAIN (elt, rspamd_async_elt_dtor);
/* Enabled by default */
- if (st_ctx->ev_base) {
+ if (st_ctx->event_loop) {
elt->enabled = TRUE;
- event_set (&elt->timer_ev, -1, EV_TIMEOUT, rspamd_async_elt_on_timer, elt);
- event_base_set (st_ctx->ev_base, &elt->timer_ev);
/*
* First we set timeval to zero as we want cb to be executed as
* fast as possible
*/
- elt->tv.tv_sec = 0;
- elt->tv.tv_usec = 0;
- event_add (&elt->timer_ev, &elt->tv);
+ ev_timer_init (&elt->timer_ev, rspamd_async_elt_on_timer, 0.0, 0.0);
+ ev_timer_start (st_ctx->event_loop, &elt->timer_ev);
}
else {
elt->enabled = FALSE;
struct rspamd_stat_async_elt {
rspamd_stat_async_handler handler;
rspamd_stat_async_cleanup cleanup;
- struct event timer_ev;
- struct timeval tv;
+ struct ev_loop *event_loop;
+ ev_timer timer_ev;
gdouble timeout;
gboolean enabled;
gpointer ud;
struct rspamd_stat_tokenizer *tokenizer;
gpointer tkcf;
- struct ev_loop *ev_base;
+ struct ev_loop *event_loop;
};
typedef enum rspamd_learn_cache_result {