From d5a36f4f750e63294cefed10f55b29fe7409bba6 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Tue, 18 Jun 2019 16:27:25 +0100 Subject: [PATCH] [Project] Another iteration --- src/libserver/fuzzy_backend.c | 33 +++++++++++------------- src/libserver/milter.c | 4 +-- src/libserver/milter_internal.h | 6 ++--- src/libserver/monitored.c | 33 +++++++++++------------- src/libserver/protocol.c | 6 ++--- src/libserver/protocol.h | 2 +- src/libserver/roll_history.c | 10 ++++---- src/libserver/roll_history.h | 2 +- src/libserver/task.c | 45 ++++++--------------------------- src/libserver/task.h | 3 +-- src/lua/lua_task.c | 10 +++++--- src/plugins/dkim_check.c | 10 ++++---- src/plugins/spf.c | 6 ++--- src/worker.c | 2 +- 14 files changed, 69 insertions(+), 103 deletions(-) diff --git a/src/libserver/fuzzy_backend.c b/src/libserver/fuzzy_backend.c index f52fcca99..f6dec1d6e 100644 --- a/src/libserver/fuzzy_backend.c +++ b/src/libserver/fuzzy_backend.c @@ -105,12 +105,12 @@ struct rspamd_fuzzy_backend { enum rspamd_fuzzy_backend_type type; gdouble expire; gdouble sync; - struct ev_loop *ev_base; + struct ev_loop *event_loop; rspamd_fuzzy_periodic_cb periodic_cb; void *periodic_ud; const struct rspamd_fuzzy_backend_subr *subr; void *subr_ud; - struct event periodic_event; + ev_timer periodic_event; }; static GQuark @@ -307,7 +307,7 @@ rspamd_fuzzy_backend_create (struct ev_loop *ev_base, } bk = g_malloc0 (sizeof (*bk)); - bk->ev_base = ev_base; + bk->event_loop = ev_base; bk->expire = expire; bk->type = type; bk->subr = &fuzzy_subrs[type]; @@ -499,17 +499,15 @@ rspamd_fuzzy_backend_periodic_sync (struct rspamd_fuzzy_backend *bk) } static void -rspamd_fuzzy_backend_periodic_cb (gint fd, short what, void *ud) +rspamd_fuzzy_backend_periodic_cb (EV_P_ ev_timer *w, int revents) { - struct rspamd_fuzzy_backend *bk = ud; + struct rspamd_fuzzy_backend *bk = (struct rspamd_fuzzy_backend *)w->data; gdouble jittered; - struct timeval tv; jittered = rspamd_time_jitter (bk->sync, bk->sync / 2.0); - double_to_tv (jittered, &tv); - event_del (&bk->periodic_event); + w->repeat = jittered; rspamd_fuzzy_backend_periodic_sync (bk); - event_add (&bk->periodic_event, &tv); + ev_timer_again (EV_A_ w); } void @@ -519,13 +517,12 @@ rspamd_fuzzy_backend_start_update (struct rspamd_fuzzy_backend *bk, void *ud) { gdouble jittered; - struct timeval tv; g_assert (bk != NULL); if (bk->subr->periodic) { if (bk->sync > 0.0) { - event_del (&bk->periodic_event); + ev_timer_stop (bk->event_loop, &bk->periodic_event); } if (cb) { @@ -536,11 +533,11 @@ rspamd_fuzzy_backend_start_update (struct rspamd_fuzzy_backend *bk, rspamd_fuzzy_backend_periodic_sync (bk); bk->sync = timeout; jittered = rspamd_time_jitter (timeout, timeout / 2.0); - double_to_tv (jittered, &tv); - event_set (&bk->periodic_event, -1, EV_TIMEOUT, - rspamd_fuzzy_backend_periodic_cb, bk); - event_base_set (bk->ev_base, &bk->periodic_event); - event_add (&bk->periodic_event, &tv); + + bk->periodic_event.data = bk; + ev_timer_init (&bk->periodic_event, rspamd_fuzzy_backend_periodic_cb, + jittered, 0.0); + ev_timer_start (bk->event_loop, &bk->periodic_event); } } @@ -551,7 +548,7 @@ rspamd_fuzzy_backend_close (struct rspamd_fuzzy_backend *bk) if (bk->sync > 0.0) { rspamd_fuzzy_backend_periodic_sync (bk); - event_del (&bk->periodic_event); + ev_timer_stop (bk->event_loop, &bk->periodic_event); } bk->subr->close (bk, bk->subr_ud); @@ -562,7 +559,7 @@ rspamd_fuzzy_backend_close (struct rspamd_fuzzy_backend *bk) struct ev_loop* rspamd_fuzzy_backend_event_base (struct rspamd_fuzzy_backend *backend) { - return backend->ev_base; + return backend->event_loop; } gdouble diff --git a/src/libserver/milter.c b/src/libserver/milter.c index ecd66cfe6..f9b4eee8c 100644 --- a/src/libserver/milter.c +++ b/src/libserver/milter.c @@ -273,7 +273,7 @@ rspamd_milter_plan_io (struct rspamd_milter_session *session, event_set (&priv->ev, priv->fd, what, rspamd_milter_io_handler, session); - event_base_set (priv->ev_base, &priv->ev); + event_base_set (priv->event_loop, &priv->ev); event_add (&priv->ev, priv->ptv); } @@ -1103,7 +1103,7 @@ rspamd_milter_handle_socket (gint fd, const struct timeval *tv, priv->err_cb = error_cb; priv->parser.state = st_len_1; priv->parser.buf = rspamd_fstring_sized_new (RSPAMD_MILTER_MESSAGE_CHUNK + 5); - priv->ev_base = ev_base; + priv->event_loop = ev_base; priv->state = RSPAMD_MILTER_READ_MORE; priv->pool = rspamd_mempool_new (rspamd_mempool_suggest_size (), "milter"); priv->discard_on_reject = milter_ctx->discard_on_reject; diff --git a/src/libserver/milter_internal.h b/src/libserver/milter_internal.h index 7da696b36..7c0e4f26f 100644 --- a/src/libserver/milter_internal.h +++ b/src/libserver/milter_internal.h @@ -22,6 +22,7 @@ #include "contrib/libev/ev.h" #include "khash.h" #include "libutil/str_util.h" +#include "libutil/libev_helper.h" enum rspamd_milter_state { st_len_1 = 0, @@ -59,11 +60,10 @@ KHASH_INIT (milter_headers_hash_t, char *, GArray *, true, struct rspamd_milter_private { struct rspamd_milter_parser parser; - struct event ev; - struct timeval tv; + struct rspamd_io_ev ev; struct rspamd_milter_outbuf *out_chain; struct timeval *ptv; - struct ev_loop *ev_base; + struct ev_loop *event_loop; rspamd_mempool_t *pool; khash_t(milter_headers_hash_t) *headers; gint cur_hdr; diff --git a/src/libserver/monitored.c b/src/libserver/monitored.c index 2e876242b..d64ec92fe 100644 --- a/src/libserver/monitored.c +++ b/src/libserver/monitored.c @@ -39,7 +39,7 @@ struct rspamd_monitored_methods { struct rspamd_monitored_ctx { struct rspamd_config *cfg; struct rdns_resolver *resolver; - struct ev_loop *ev_base; + struct ev_loop *event_loop; GPtrArray *elts; GHashTable *helts; mon_change_cb change_cb; @@ -63,7 +63,7 @@ struct rspamd_monitored { enum rspamd_monitored_flags flags; struct rspamd_monitored_ctx *ctx; struct rspamd_monitored_methods proc; - struct event periodic; + ev_timer periodic; gchar tag[RSPAMD_MONITORED_TAG_LEN]; }; @@ -169,9 +169,9 @@ rspamd_monitored_propagate_success (struct rspamd_monitored *m, gdouble lat) } static void -rspamd_monitored_periodic (gint fd, short what, gpointer ud) +rspamd_monitored_periodic (EV_P_ ev_timer *w, int revents) { - struct rspamd_monitored *m = ud; + struct rspamd_monitored *m = (struct rspamd_monitored *)w->data; struct timeval tv; gdouble jittered; gboolean ret = FALSE; @@ -185,7 +185,8 @@ rspamd_monitored_periodic (gint fd, short what, gpointer ud) } if (ret) { - event_add (&m->periodic, &tv); + m->periodic.repeat = jittered; + ev_timer_again (EV_A_ &m->periodic); } } @@ -436,7 +437,7 @@ rspamd_monitored_ctx_config (struct rspamd_monitored_ctx *ctx, guint i; g_assert (ctx != NULL); - ctx->ev_base = ev_base; + ctx->event_loop = ev_base; ctx->resolver = resolver; ctx->cfg = cfg; ctx->initialized = TRUE; @@ -460,7 +461,7 @@ rspamd_monitored_ctx_config (struct rspamd_monitored_ctx *ctx, struct ev_loop * rspamd_monitored_ctx_get_ev_base (struct rspamd_monitored_ctx *ctx) { - return ctx->ev_base; + return ctx->event_loop; } @@ -527,7 +528,7 @@ rspamd_monitored_create_ (struct rspamd_monitored_ctx *ctx, g_ptr_array_add (ctx->elts, m); - if (ctx->ev_base) { + if (ctx->event_loop) { rspamd_monitored_start (m); } @@ -592,30 +593,26 @@ rspamd_monitored_stop (struct rspamd_monitored *m) { g_assert (m != NULL); - if (rspamd_event_pending (&m->periodic, EV_TIMEOUT)) { - event_del (&m->periodic); - } + ev_timer_stop (m->ctx->event_loop, &m->periodic); } void rspamd_monitored_start (struct rspamd_monitored *m) { - struct timeval tv; gdouble jittered; g_assert (m != NULL); msg_debug_mon ("started monitored object %s", m->url); jittered = rspamd_time_jitter (m->ctx->monitoring_interval * m->monitoring_mult, 0.0); - double_to_tv (jittered, &tv); - if (rspamd_event_pending (&m->periodic, EV_TIMEOUT)) { - event_del (&m->periodic); + if (ev_is_active (&m->periodic)) { + ev_timer_stop (m->ctx->event_loop, &m->periodic); } - event_set (&m->periodic, -1, EV_TIMEOUT, rspamd_monitored_periodic, m); - event_base_set (m->ctx->ev_base, &m->periodic); - event_add (&m->periodic, &tv); + m->periodic.data = m; + ev_timer_init (&m->periodic, rspamd_monitored_periodic, jittered, 0.0); + ev_timer_start (m->ctx->event_loop, &m->periodic); } void diff --git a/src/libserver/protocol.c b/src/libserver/protocol.c index 7df5b27c5..f2827828b 100644 --- a/src/libserver/protocol.c +++ b/src/libserver/protocol.c @@ -1766,7 +1766,7 @@ rspamd_protocol_write_log_pipe (struct rspamd_task *task) } void -rspamd_protocol_write_reply (struct rspamd_task *task) +rspamd_protocol_write_reply (struct rspamd_task *task, ev_tstamp timeout) { struct rspamd_http_message *msg; const gchar *ctype = "application/json"; @@ -1786,7 +1786,7 @@ rspamd_protocol_write_reply (struct rspamd_task *task) msg->flags |= RSPAMD_HTTP_FLAG_SPAMC; } - msg->date = time (NULL); + msg->date = ev_time (); msg_debug_protocol ("writing reply to client"); if (task->err != NULL) { @@ -1832,7 +1832,7 @@ rspamd_protocol_write_reply (struct rspamd_task *task) rspamd_http_connection_reset (task->http_conn); rspamd_http_connection_write_message (task->http_conn, msg, NULL, - ctype, task, &task->tv); + ctype, task, timeout); task->processed_stages |= RSPAMD_TASK_STAGE_REPLIED; } diff --git a/src/libserver/protocol.h b/src/libserver/protocol.h index 08372d765..2059110fb 100644 --- a/src/libserver/protocol.h +++ b/src/libserver/protocol.h @@ -103,7 +103,7 @@ ucl_object_t * rspamd_protocol_write_ucl (struct rspamd_task *task, * @param task task object * @return 0 if we wrote reply and -1 if there was some error */ -void rspamd_protocol_write_reply (struct rspamd_task *task); +void rspamd_protocol_write_reply (struct rspamd_task *task, ev_tstamp timeout); /** * Convert rspamd output to legacy protocol reply diff --git a/src/libserver/roll_history.c b/src/libserver/roll_history.c index c9367409d..c70246383 100644 --- a/src/libserver/roll_history.c +++ b/src/libserver/roll_history.c @@ -136,7 +136,7 @@ rspamd_roll_history_update (struct roll_history *history, rspamd_strlcpy (row->from_addr, "unknown", sizeof (row->from_addr)); } - memcpy (&row->tv, &task->tv, sizeof (row->tv)); + row->timestamp = task->task_timestamp; /* Strings */ rspamd_strlcpy (row->message_id, task->message_id, @@ -173,7 +173,7 @@ rspamd_roll_history_update (struct roll_history *history, } } - row->scan_time = task->time_real_finish - task->time_real; + row->scan_time = task->time_real_finish - task->task_timestamp; row->len = task->msg.len; g_atomic_int_set (&row->completed, TRUE); } @@ -282,7 +282,7 @@ rspamd_roll_history_load (struct roll_history *history, const gchar *filename) elt = ucl_object_lookup (cur, "time"); if (elt && ucl_object_type (elt) == UCL_FLOAT) { - double_to_tv (ucl_object_todouble (elt), &row->tv); + row->timestamp = ucl_object_todouble (elt); } elt = ucl_object_lookup (cur, "id"); @@ -391,8 +391,8 @@ rspamd_roll_history_save (struct roll_history *history, const gchar *filename) elt = ucl_object_typed_new (UCL_OBJECT); - ucl_object_insert_key (elt, ucl_object_fromdouble ( - tv_to_double (&row->tv)), "time", 0, false); + ucl_object_insert_key (elt, ucl_object_fromdouble (row->timestamp), + "time", 0, false); ucl_object_insert_key (elt, ucl_object_fromstring (row->message_id), "id", 0, false); ucl_object_insert_key (elt, ucl_object_fromstring (row->symbols), diff --git a/src/libserver/roll_history.h b/src/libserver/roll_history.h index d8a77bfd7..d0f140098 100644 --- a/src/libserver/roll_history.h +++ b/src/libserver/roll_history.h @@ -33,7 +33,7 @@ struct rspamd_task; struct rspamd_config; struct roll_history_row { - struct timeval tv; + ev_tstamp timestamp; gchar message_id[HISTORY_MAX_ID]; gchar symbols[HISTORY_MAX_SYMBOLS]; gchar user[HISTORY_MAX_USER]; diff --git a/src/libserver/task.c b/src/libserver/task.c index 62b8725c3..84ea1417a 100644 --- a/src/libserver/task.c +++ b/src/libserver/task.c @@ -102,23 +102,8 @@ rspamd_task_new (struct rspamd_worker *worker, struct rspamd_config *cfg, } new_task->event_loop = ev_base; - -#ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC - if (ev_base) { - event_base_update_cache_time (ev_base); - event_base_gettimeofday_cached (ev_base, &new_task->tv); - new_task->time_real = tv_to_double (&new_task->tv); - } - else { - gettimeofday (&new_task->tv, NULL); - new_task->time_real = tv_to_double (&new_task->tv); - } -#else - gettimeofday (&new_task->tv, NULL); - new_task->time_real = tv_to_double (&new_task->tv); -#endif - - new_task->time_virtual = rspamd_get_virtual_ticks (); + new_task->task_timestamp = ev_time (); + new_task->time_virtual = ev_now (ev_base); new_task->time_real_finish = NAN; new_task->time_virtual_finish = NAN; @@ -185,11 +170,13 @@ rspamd_task_new (struct rspamd_worker *worker, struct rspamd_config *cfg, static void rspamd_task_reply (struct rspamd_task *task) { + const ev_tstamp write_timeout = 2.0; + if (task->fin_callback) { task->fin_callback (task, task->fin_arg); } else { - rspamd_protocol_write_reply (task); + rspamd_protocol_write_reply (task, write_timeout); } } @@ -1450,7 +1437,7 @@ rspamd_task_log_variable (struct rspamd_task *task, var.begin = numbuf; break; case RSPAMD_LOG_TIME_REAL: - var.begin = rspamd_log_check_time (task->time_real, + var.begin = rspamd_log_check_time (task->task_timestamp, task->time_real_finish, task->cfg->clock_res); var.len = strlen (var.begin); @@ -1748,25 +1735,9 @@ rspamd_task_profile_get (struct rspamd_task *task, const gchar *key) gboolean rspamd_task_set_finish_time (struct rspamd_task *task) { - struct timeval tv; - if (isnan (task->time_real_finish)) { - -#ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC - if (task->ev_base) { - event_base_update_cache_time (task->ev_base); - event_base_gettimeofday_cached (task->ev_base, &tv); - task->time_real_finish = tv_to_double (&tv); - } - else { - gettimeofday (&tv, NULL); - task->time_real_finish = tv_to_double (&tv); - } -#else - gettimeofday (&tv, NULL); - task->time_real_finish = tv_to_double (&tv); -#endif - task->time_virtual_finish = rspamd_get_virtual_ticks (); + task->time_real_finish = ev_time (); + task->time_virtual_finish = ev_now (task->event_loop); return TRUE; } diff --git a/src/libserver/task.h b/src/libserver/task.h index ca42da6b3..a73102424 100644 --- a/src/libserver/task.h +++ b/src/libserver/task.h @@ -189,11 +189,10 @@ struct rspamd_task { struct rspamd_config *cfg; /**< pointer to config object */ GError *err; rspamd_mempool_t *task_pool; /**< memory pool for task */ - double time_real; double time_virtual; double time_real_finish; double time_virtual_finish; - struct timeval tv; + ev_tstamp task_timestamp; gboolean (*fin_callback)(struct rspamd_task *task, void *arg); /**< callback for filters finalizing */ void *fin_arg; /**< argument for fin callback */ diff --git a/src/lua/lua_task.c b/src/lua/lua_task.c index 11ee6b32a..0ffe4b8c5 100644 --- a/src/lua/lua_task.c +++ b/src/lua/lua_task.c @@ -4309,7 +4309,7 @@ lua_task_get_date (lua_State *L) } /* Get GMT date and store it to time_t */ if (type == DATE_CONNECT || type == DATE_CONNECT_STRING) { - tim = (tv_to_msec (&task->tv)) / 1000.; + tim = task->task_timestamp; if (!gmt) { struct tm t; @@ -4399,14 +4399,16 @@ lua_task_get_timeval (lua_State *L) { LUA_TRACE_POINT; struct rspamd_task *task = lua_check_task (L, 1); + struct timeval tv; if (task != NULL) { + double_to_tv (task->task_timestamp, &tv); lua_createtable (L, 0, 2); lua_pushstring (L, "tv_sec"); - lua_pushinteger (L, (lua_Integer)task->tv.tv_sec); + lua_pushinteger (L, (lua_Integer)tv.tv_sec); lua_settable (L, -3); lua_pushstring (L, "tv_usec"); - lua_pushinteger (L, (lua_Integer)task->tv.tv_usec); + lua_pushinteger (L, (lua_Integer)tv.tv_usec); lua_settable (L, -3); } else { @@ -4429,7 +4431,7 @@ lua_task_get_scan_time (lua_State *L) } rspamd_task_set_finish_time (task); - lua_pushnumber (L, task->time_real_finish - task->time_real); + lua_pushnumber (L, task->time_real_finish - task->task_timestamp); lua_pushnumber (L, task->time_virtual_finish - task->time_virtual); if (!set) { diff --git a/src/plugins/dkim_check.c b/src/plugins/dkim_check.c index e1cdc5e98..233ecf1d1 100644 --- a/src/plugins/dkim_check.c +++ b/src/plugins/dkim_check.c @@ -1056,8 +1056,8 @@ dkim_module_key_handler (rspamd_dkim_key_t *key, * lru hash owns this object now */ rspamd_lru_hash_insert (dkim_module_ctx->dkim_hash, - g_strdup (rspamd_dkim_get_dns_key (ctx)), - key, res->task->tv.tv_sec, rspamd_dkim_key_get_ttl (key)); + g_strdup (rspamd_dkim_get_dns_key (ctx)), + key, res->task->task_timestamp, rspamd_dkim_key_get_ttl (key)); /* Release key when task is processed */ rspamd_mempool_add_destructor (res->task->task_pool, dkim_module_key_dtor, res->key); @@ -1210,7 +1210,7 @@ dkim_symbol_callback (struct rspamd_task *task, key = rspamd_lru_hash_lookup (dkim_module_ctx->dkim_hash, rspamd_dkim_get_dns_key (ctx), - task->tv.tv_sec); + task->task_timestamp); if (key != NULL) { cur->key = rspamd_dkim_key_ref (key); @@ -1400,7 +1400,7 @@ dkim_module_lua_on_key (rspamd_dkim_key_t *key, */ rspamd_lru_hash_insert (dkim_module_ctx->dkim_hash, g_strdup (rspamd_dkim_get_dns_key (ctx)), - key, cbd->task->tv.tv_sec, rspamd_dkim_key_get_ttl (key)); + key, cbd->task->task_timestamp, rspamd_dkim_key_get_ttl (key)); /* Release key when task is processed */ rspamd_mempool_add_destructor (cbd->task->task_pool, dkim_module_key_dtor, cbd->key); @@ -1507,7 +1507,7 @@ lua_dkim_verify_handler (lua_State *L) key = rspamd_lru_hash_lookup (dkim_module_ctx->dkim_hash, rspamd_dkim_get_dns_key (ctx), - task->tv.tv_sec); + task->task_timestamp); if (key != NULL) { cbd->key = rspamd_dkim_key_ref (key); diff --git a/src/plugins/spf.c b/src/plugins/spf.c index 6a4004e57..f10807f47 100644 --- a/src/plugins/spf.c +++ b/src/plugins/spf.c @@ -561,7 +561,7 @@ spf_plugin_callback (struct spf_resolved *record, struct rspamd_task *task, spf_record_ref (record); if ((l = rspamd_lru_hash_lookup (spf_module_ctx->spf_hash, - record->domain, task->tv.tv_sec)) == NULL) { + record->domain, task->task_timestamp)) == NULL) { l = record; if (record->ttl > 0 && @@ -571,7 +571,7 @@ spf_plugin_callback (struct spf_resolved *record, struct rspamd_task *task, rspamd_lru_hash_insert (spf_module_ctx->spf_hash, record->domain, spf_record_ref (l), - task->tv.tv_sec, record->ttl); + task->task_timestamp, record->ttl); msg_info_task ("stored record for %s (0x%xuL) in LRU cache for %d seconds, " "%d/%d elements in the cache", @@ -642,7 +642,7 @@ spf_symbol_callback (struct rspamd_task *task, if (domain) { if ((l = rspamd_lru_hash_lookup (spf_module_ctx->spf_hash, domain, - task->tv.tv_sec)) != NULL) { + task->task_timestamp)) != NULL) { spf_record_ref (l); spf_check_list (l, task, TRUE); spf_record_unref (l); diff --git a/src/worker.c b/src/worker.c index 62c781c05..5a4adb325 100644 --- a/src/worker.c +++ b/src/worker.c @@ -332,7 +332,7 @@ rspamd_worker_error_handler (struct rspamd_http_connection *conn, GError *err) NULL, "application/json", task, - &task->tv); + 1.0); } } -- 2.39.5