Browse Source

[Project] Another iteration

tags/2.0
Vsevolod Stakhov 5 years ago
parent
commit
d5a36f4f75

+ 15
- 18
src/libserver/fuzzy_backend.c View File

@@ -105,12 +105,12 @@ struct rspamd_fuzzy_backend {
enum rspamd_fuzzy_backend_type type;
gdouble expire;
gdouble sync;
struct ev_loop *ev_base;
struct ev_loop *event_loop;
rspamd_fuzzy_periodic_cb periodic_cb;
void *periodic_ud;
const struct rspamd_fuzzy_backend_subr *subr;
void *subr_ud;
struct event periodic_event;
ev_timer periodic_event;
};

static GQuark
@@ -307,7 +307,7 @@ rspamd_fuzzy_backend_create (struct ev_loop *ev_base,
}

bk = g_malloc0 (sizeof (*bk));
bk->ev_base = ev_base;
bk->event_loop = ev_base;
bk->expire = expire;
bk->type = type;
bk->subr = &fuzzy_subrs[type];
@@ -499,17 +499,15 @@ rspamd_fuzzy_backend_periodic_sync (struct rspamd_fuzzy_backend *bk)
}

static void
rspamd_fuzzy_backend_periodic_cb (gint fd, short what, void *ud)
rspamd_fuzzy_backend_periodic_cb (EV_P_ ev_timer *w, int revents)
{
struct rspamd_fuzzy_backend *bk = ud;
struct rspamd_fuzzy_backend *bk = (struct rspamd_fuzzy_backend *)w->data;
gdouble jittered;
struct timeval tv;

jittered = rspamd_time_jitter (bk->sync, bk->sync / 2.0);
double_to_tv (jittered, &tv);
event_del (&bk->periodic_event);
w->repeat = jittered;
rspamd_fuzzy_backend_periodic_sync (bk);
event_add (&bk->periodic_event, &tv);
ev_timer_again (EV_A_ w);
}

void
@@ -519,13 +517,12 @@ rspamd_fuzzy_backend_start_update (struct rspamd_fuzzy_backend *bk,
void *ud)
{
gdouble jittered;
struct timeval tv;

g_assert (bk != NULL);

if (bk->subr->periodic) {
if (bk->sync > 0.0) {
event_del (&bk->periodic_event);
ev_timer_stop (bk->event_loop, &bk->periodic_event);
}

if (cb) {
@@ -536,11 +533,11 @@ rspamd_fuzzy_backend_start_update (struct rspamd_fuzzy_backend *bk,
rspamd_fuzzy_backend_periodic_sync (bk);
bk->sync = timeout;
jittered = rspamd_time_jitter (timeout, timeout / 2.0);
double_to_tv (jittered, &tv);
event_set (&bk->periodic_event, -1, EV_TIMEOUT,
rspamd_fuzzy_backend_periodic_cb, bk);
event_base_set (bk->ev_base, &bk->periodic_event);
event_add (&bk->periodic_event, &tv);
bk->periodic_event.data = bk;
ev_timer_init (&bk->periodic_event, rspamd_fuzzy_backend_periodic_cb,
jittered, 0.0);
ev_timer_start (bk->event_loop, &bk->periodic_event);
}
}

@@ -551,7 +548,7 @@ rspamd_fuzzy_backend_close (struct rspamd_fuzzy_backend *bk)

if (bk->sync > 0.0) {
rspamd_fuzzy_backend_periodic_sync (bk);
event_del (&bk->periodic_event);
ev_timer_stop (bk->event_loop, &bk->periodic_event);
}

bk->subr->close (bk, bk->subr_ud);
@@ -562,7 +559,7 @@ rspamd_fuzzy_backend_close (struct rspamd_fuzzy_backend *bk)
struct ev_loop*
rspamd_fuzzy_backend_event_base (struct rspamd_fuzzy_backend *backend)
{
return backend->ev_base;
return backend->event_loop;
}

gdouble

+ 2
- 2
src/libserver/milter.c View File

@@ -273,7 +273,7 @@ rspamd_milter_plan_io (struct rspamd_milter_session *session,

event_set (&priv->ev, priv->fd, what, rspamd_milter_io_handler,
session);
event_base_set (priv->ev_base, &priv->ev);
event_base_set (priv->event_loop, &priv->ev);
event_add (&priv->ev, priv->ptv);
}

@@ -1103,7 +1103,7 @@ rspamd_milter_handle_socket (gint fd, const struct timeval *tv,
priv->err_cb = error_cb;
priv->parser.state = st_len_1;
priv->parser.buf = rspamd_fstring_sized_new (RSPAMD_MILTER_MESSAGE_CHUNK + 5);
priv->ev_base = ev_base;
priv->event_loop = ev_base;
priv->state = RSPAMD_MILTER_READ_MORE;
priv->pool = rspamd_mempool_new (rspamd_mempool_suggest_size (), "milter");
priv->discard_on_reject = milter_ctx->discard_on_reject;

+ 3
- 3
src/libserver/milter_internal.h View File

@@ -22,6 +22,7 @@
#include "contrib/libev/ev.h"
#include "khash.h"
#include "libutil/str_util.h"
#include "libutil/libev_helper.h"

enum rspamd_milter_state {
st_len_1 = 0,
@@ -59,11 +60,10 @@ KHASH_INIT (milter_headers_hash_t, char *, GArray *, true,

struct rspamd_milter_private {
struct rspamd_milter_parser parser;
struct event ev;
struct timeval tv;
struct rspamd_io_ev ev;
struct rspamd_milter_outbuf *out_chain;
struct timeval *ptv;
struct ev_loop *ev_base;
struct ev_loop *event_loop;
rspamd_mempool_t *pool;
khash_t(milter_headers_hash_t) *headers;
gint cur_hdr;

+ 15
- 18
src/libserver/monitored.c View File

@@ -39,7 +39,7 @@ struct rspamd_monitored_methods {
struct rspamd_monitored_ctx {
struct rspamd_config *cfg;
struct rdns_resolver *resolver;
struct ev_loop *ev_base;
struct ev_loop *event_loop;
GPtrArray *elts;
GHashTable *helts;
mon_change_cb change_cb;
@@ -63,7 +63,7 @@ struct rspamd_monitored {
enum rspamd_monitored_flags flags;
struct rspamd_monitored_ctx *ctx;
struct rspamd_monitored_methods proc;
struct event periodic;
ev_timer periodic;
gchar tag[RSPAMD_MONITORED_TAG_LEN];
};

@@ -169,9 +169,9 @@ rspamd_monitored_propagate_success (struct rspamd_monitored *m, gdouble lat)
}

static void
rspamd_monitored_periodic (gint fd, short what, gpointer ud)
rspamd_monitored_periodic (EV_P_ ev_timer *w, int revents)
{
struct rspamd_monitored *m = ud;
struct rspamd_monitored *m = (struct rspamd_monitored *)w->data;
struct timeval tv;
gdouble jittered;
gboolean ret = FALSE;
@@ -185,7 +185,8 @@ rspamd_monitored_periodic (gint fd, short what, gpointer ud)
}

if (ret) {
event_add (&m->periodic, &tv);
m->periodic.repeat = jittered;
ev_timer_again (EV_A_ &m->periodic);
}
}

@@ -436,7 +437,7 @@ rspamd_monitored_ctx_config (struct rspamd_monitored_ctx *ctx,
guint i;

g_assert (ctx != NULL);
ctx->ev_base = ev_base;
ctx->event_loop = ev_base;
ctx->resolver = resolver;
ctx->cfg = cfg;
ctx->initialized = TRUE;
@@ -460,7 +461,7 @@ rspamd_monitored_ctx_config (struct rspamd_monitored_ctx *ctx,
struct ev_loop *
rspamd_monitored_ctx_get_ev_base (struct rspamd_monitored_ctx *ctx)
{
return ctx->ev_base;
return ctx->event_loop;
}


@@ -527,7 +528,7 @@ rspamd_monitored_create_ (struct rspamd_monitored_ctx *ctx,

g_ptr_array_add (ctx->elts, m);

if (ctx->ev_base) {
if (ctx->event_loop) {
rspamd_monitored_start (m);
}

@@ -592,30 +593,26 @@ rspamd_monitored_stop (struct rspamd_monitored *m)
{
g_assert (m != NULL);

if (rspamd_event_pending (&m->periodic, EV_TIMEOUT)) {
event_del (&m->periodic);
}
ev_timer_stop (m->ctx->event_loop, &m->periodic);
}

void
rspamd_monitored_start (struct rspamd_monitored *m)
{
struct timeval tv;
gdouble jittered;

g_assert (m != NULL);
msg_debug_mon ("started monitored object %s", m->url);
jittered = rspamd_time_jitter (m->ctx->monitoring_interval * m->monitoring_mult,
0.0);
double_to_tv (jittered, &tv);

if (rspamd_event_pending (&m->periodic, EV_TIMEOUT)) {
event_del (&m->periodic);
if (ev_is_active (&m->periodic)) {
ev_timer_stop (m->ctx->event_loop, &m->periodic);
}

event_set (&m->periodic, -1, EV_TIMEOUT, rspamd_monitored_periodic, m);
event_base_set (m->ctx->ev_base, &m->periodic);
event_add (&m->periodic, &tv);
m->periodic.data = m;
ev_timer_init (&m->periodic, rspamd_monitored_periodic, jittered, 0.0);
ev_timer_start (m->ctx->event_loop, &m->periodic);
}

void

+ 3
- 3
src/libserver/protocol.c View File

@@ -1766,7 +1766,7 @@ rspamd_protocol_write_log_pipe (struct rspamd_task *task)
}

void
rspamd_protocol_write_reply (struct rspamd_task *task)
rspamd_protocol_write_reply (struct rspamd_task *task, ev_tstamp timeout)
{
struct rspamd_http_message *msg;
const gchar *ctype = "application/json";
@@ -1786,7 +1786,7 @@ rspamd_protocol_write_reply (struct rspamd_task *task)
msg->flags |= RSPAMD_HTTP_FLAG_SPAMC;
}

msg->date = time (NULL);
msg->date = ev_time ();

msg_debug_protocol ("writing reply to client");
if (task->err != NULL) {
@@ -1832,7 +1832,7 @@ rspamd_protocol_write_reply (struct rspamd_task *task)

rspamd_http_connection_reset (task->http_conn);
rspamd_http_connection_write_message (task->http_conn, msg, NULL,
ctype, task, &task->tv);
ctype, task, timeout);

task->processed_stages |= RSPAMD_TASK_STAGE_REPLIED;
}

+ 1
- 1
src/libserver/protocol.h View File

@@ -103,7 +103,7 @@ ucl_object_t * rspamd_protocol_write_ucl (struct rspamd_task *task,
* @param task task object
* @return 0 if we wrote reply and -1 if there was some error
*/
void rspamd_protocol_write_reply (struct rspamd_task *task);
void rspamd_protocol_write_reply (struct rspamd_task *task, ev_tstamp timeout);

/**
* Convert rspamd output to legacy protocol reply

+ 5
- 5
src/libserver/roll_history.c View File

@@ -136,7 +136,7 @@ rspamd_roll_history_update (struct roll_history *history,
rspamd_strlcpy (row->from_addr, "unknown", sizeof (row->from_addr));
}

memcpy (&row->tv, &task->tv, sizeof (row->tv));
row->timestamp = task->task_timestamp;

/* Strings */
rspamd_strlcpy (row->message_id, task->message_id,
@@ -173,7 +173,7 @@ rspamd_roll_history_update (struct roll_history *history,
}
}

row->scan_time = task->time_real_finish - task->time_real;
row->scan_time = task->time_real_finish - task->task_timestamp;
row->len = task->msg.len;
g_atomic_int_set (&row->completed, TRUE);
}
@@ -282,7 +282,7 @@ rspamd_roll_history_load (struct roll_history *history, const gchar *filename)
elt = ucl_object_lookup (cur, "time");

if (elt && ucl_object_type (elt) == UCL_FLOAT) {
double_to_tv (ucl_object_todouble (elt), &row->tv);
row->timestamp = ucl_object_todouble (elt);
}

elt = ucl_object_lookup (cur, "id");
@@ -391,8 +391,8 @@ rspamd_roll_history_save (struct roll_history *history, const gchar *filename)

elt = ucl_object_typed_new (UCL_OBJECT);

ucl_object_insert_key (elt, ucl_object_fromdouble (
tv_to_double (&row->tv)), "time", 0, false);
ucl_object_insert_key (elt, ucl_object_fromdouble (row->timestamp),
"time", 0, false);
ucl_object_insert_key (elt, ucl_object_fromstring (row->message_id),
"id", 0, false);
ucl_object_insert_key (elt, ucl_object_fromstring (row->symbols),

+ 1
- 1
src/libserver/roll_history.h View File

@@ -33,7 +33,7 @@ struct rspamd_task;
struct rspamd_config;

struct roll_history_row {
struct timeval tv;
ev_tstamp timestamp;
gchar message_id[HISTORY_MAX_ID];
gchar symbols[HISTORY_MAX_SYMBOLS];
gchar user[HISTORY_MAX_USER];

+ 8
- 37
src/libserver/task.c View File

@@ -102,23 +102,8 @@ rspamd_task_new (struct rspamd_worker *worker, struct rspamd_config *cfg,
}

new_task->event_loop = ev_base;

#ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC
if (ev_base) {
event_base_update_cache_time (ev_base);
event_base_gettimeofday_cached (ev_base, &new_task->tv);
new_task->time_real = tv_to_double (&new_task->tv);
}
else {
gettimeofday (&new_task->tv, NULL);
new_task->time_real = tv_to_double (&new_task->tv);
}
#else
gettimeofday (&new_task->tv, NULL);
new_task->time_real = tv_to_double (&new_task->tv);
#endif

new_task->time_virtual = rspamd_get_virtual_ticks ();
new_task->task_timestamp = ev_time ();
new_task->time_virtual = ev_now (ev_base);
new_task->time_real_finish = NAN;
new_task->time_virtual_finish = NAN;

@@ -185,11 +170,13 @@ rspamd_task_new (struct rspamd_worker *worker, struct rspamd_config *cfg,
static void
rspamd_task_reply (struct rspamd_task *task)
{
const ev_tstamp write_timeout = 2.0;

if (task->fin_callback) {
task->fin_callback (task, task->fin_arg);
}
else {
rspamd_protocol_write_reply (task);
rspamd_protocol_write_reply (task, write_timeout);
}
}

@@ -1450,7 +1437,7 @@ rspamd_task_log_variable (struct rspamd_task *task,
var.begin = numbuf;
break;
case RSPAMD_LOG_TIME_REAL:
var.begin = rspamd_log_check_time (task->time_real,
var.begin = rspamd_log_check_time (task->task_timestamp,
task->time_real_finish,
task->cfg->clock_res);
var.len = strlen (var.begin);
@@ -1748,25 +1735,9 @@ rspamd_task_profile_get (struct rspamd_task *task, const gchar *key)
gboolean
rspamd_task_set_finish_time (struct rspamd_task *task)
{
struct timeval tv;

if (isnan (task->time_real_finish)) {

#ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC
if (task->ev_base) {
event_base_update_cache_time (task->ev_base);
event_base_gettimeofday_cached (task->ev_base, &tv);
task->time_real_finish = tv_to_double (&tv);
}
else {
gettimeofday (&tv, NULL);
task->time_real_finish = tv_to_double (&tv);
}
#else
gettimeofday (&tv, NULL);
task->time_real_finish = tv_to_double (&tv);
#endif
task->time_virtual_finish = rspamd_get_virtual_ticks ();
task->time_real_finish = ev_time ();
task->time_virtual_finish = ev_now (task->event_loop);

return TRUE;
}

+ 1
- 2
src/libserver/task.h View File

@@ -189,11 +189,10 @@ struct rspamd_task {
struct rspamd_config *cfg; /**< pointer to config object */
GError *err;
rspamd_mempool_t *task_pool; /**< memory pool for task */
double time_real;
double time_virtual;
double time_real_finish;
double time_virtual_finish;
struct timeval tv;
ev_tstamp task_timestamp;
gboolean (*fin_callback)(struct rspamd_task *task, void *arg);
/**< callback for filters finalizing */
void *fin_arg; /**< argument for fin callback */

+ 6
- 4
src/lua/lua_task.c View File

@@ -4309,7 +4309,7 @@ lua_task_get_date (lua_State *L)
}
/* Get GMT date and store it to time_t */
if (type == DATE_CONNECT || type == DATE_CONNECT_STRING) {
tim = (tv_to_msec (&task->tv)) / 1000.;
tim = task->task_timestamp;

if (!gmt) {
struct tm t;
@@ -4399,14 +4399,16 @@ lua_task_get_timeval (lua_State *L)
{
LUA_TRACE_POINT;
struct rspamd_task *task = lua_check_task (L, 1);
struct timeval tv;

if (task != NULL) {
double_to_tv (task->task_timestamp, &tv);
lua_createtable (L, 0, 2);
lua_pushstring (L, "tv_sec");
lua_pushinteger (L, (lua_Integer)task->tv.tv_sec);
lua_pushinteger (L, (lua_Integer)tv.tv_sec);
lua_settable (L, -3);
lua_pushstring (L, "tv_usec");
lua_pushinteger (L, (lua_Integer)task->tv.tv_usec);
lua_pushinteger (L, (lua_Integer)tv.tv_usec);
lua_settable (L, -3);
}
else {
@@ -4429,7 +4431,7 @@ lua_task_get_scan_time (lua_State *L)
}

rspamd_task_set_finish_time (task);
lua_pushnumber (L, task->time_real_finish - task->time_real);
lua_pushnumber (L, task->time_real_finish - task->task_timestamp);
lua_pushnumber (L, task->time_virtual_finish - task->time_virtual);

if (!set) {

+ 5
- 5
src/plugins/dkim_check.c View File

@@ -1056,8 +1056,8 @@ dkim_module_key_handler (rspamd_dkim_key_t *key,
* lru hash owns this object now
*/
rspamd_lru_hash_insert (dkim_module_ctx->dkim_hash,
g_strdup (rspamd_dkim_get_dns_key (ctx)),
key, res->task->tv.tv_sec, rspamd_dkim_key_get_ttl (key));
g_strdup (rspamd_dkim_get_dns_key (ctx)),
key, res->task->task_timestamp, rspamd_dkim_key_get_ttl (key));
/* Release key when task is processed */
rspamd_mempool_add_destructor (res->task->task_pool,
dkim_module_key_dtor, res->key);
@@ -1210,7 +1210,7 @@ dkim_symbol_callback (struct rspamd_task *task,

key = rspamd_lru_hash_lookup (dkim_module_ctx->dkim_hash,
rspamd_dkim_get_dns_key (ctx),
task->tv.tv_sec);
task->task_timestamp);

if (key != NULL) {
cur->key = rspamd_dkim_key_ref (key);
@@ -1400,7 +1400,7 @@ dkim_module_lua_on_key (rspamd_dkim_key_t *key,
*/
rspamd_lru_hash_insert (dkim_module_ctx->dkim_hash,
g_strdup (rspamd_dkim_get_dns_key (ctx)),
key, cbd->task->tv.tv_sec, rspamd_dkim_key_get_ttl (key));
key, cbd->task->task_timestamp, rspamd_dkim_key_get_ttl (key));
/* Release key when task is processed */
rspamd_mempool_add_destructor (cbd->task->task_pool,
dkim_module_key_dtor, cbd->key);
@@ -1507,7 +1507,7 @@ lua_dkim_verify_handler (lua_State *L)

key = rspamd_lru_hash_lookup (dkim_module_ctx->dkim_hash,
rspamd_dkim_get_dns_key (ctx),
task->tv.tv_sec);
task->task_timestamp);

if (key != NULL) {
cbd->key = rspamd_dkim_key_ref (key);

+ 3
- 3
src/plugins/spf.c View File

@@ -561,7 +561,7 @@ spf_plugin_callback (struct spf_resolved *record, struct rspamd_task *task,
spf_record_ref (record);

if ((l = rspamd_lru_hash_lookup (spf_module_ctx->spf_hash,
record->domain, task->tv.tv_sec)) == NULL) {
record->domain, task->task_timestamp)) == NULL) {
l = record;

if (record->ttl > 0 &&
@@ -571,7 +571,7 @@ spf_plugin_callback (struct spf_resolved *record, struct rspamd_task *task,

rspamd_lru_hash_insert (spf_module_ctx->spf_hash,
record->domain, spf_record_ref (l),
task->tv.tv_sec, record->ttl);
task->task_timestamp, record->ttl);

msg_info_task ("stored record for %s (0x%xuL) in LRU cache for %d seconds, "
"%d/%d elements in the cache",
@@ -642,7 +642,7 @@ spf_symbol_callback (struct rspamd_task *task,
if (domain) {
if ((l =
rspamd_lru_hash_lookup (spf_module_ctx->spf_hash, domain,
task->tv.tv_sec)) != NULL) {
task->task_timestamp)) != NULL) {
spf_record_ref (l);
spf_check_list (l, task, TRUE);
spf_record_unref (l);

+ 1
- 1
src/worker.c View File

@@ -332,7 +332,7 @@ rspamd_worker_error_handler (struct rspamd_http_connection *conn, GError *err)
NULL,
"application/json",
task,
&task->tv);
1.0);
}
}


Loading…
Cancel
Save