struct cache_savepoint {
guchar *processed_bits;
- guint processed_num;
guint pass;
- gint offset;
struct metric_result *rs;
gdouble lim;
GPtrArray *waitq;
return FALSE;
}
+static gboolean
+rspamd_symbols_cache_check_deps (struct symbols_cache *cache,
+ struct cache_item *item,
+ struct cache_savepoint *checkpoint)
+{
+ struct cache_dependency *dep;
+ guint i;
+
+ if (item->deps != NULL && item->deps->len > 0) {
+ for (i = 0; i < item->deps->len; i ++) {
+ dep = g_ptr_array_index (item->deps, i);
+
+ g_assert (dep->item != NULL);
+
+ if (!isset (checkpoint->processed_bits, dep->item->id)) {
+ return FALSE;
+ }
+ }
+ }
+
+ return TRUE;
+}
+
gboolean
rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
- struct symbols_cache * cache)
+ struct symbols_cache *cache)
{
double t1, t2;
guint64 diff;
struct cache_item *item = NULL;
struct cache_savepoint *checkpoint;
gint i;
+ guint pending_before, pending_after;
g_assert (cache != NULL);
checkpoint = rspamd_mempool_alloc0 (task->task_pool, sizeof (*checkpoint));
checkpoint->processed_bits = rspamd_mempool_alloc (task->task_pool,
NBYTES (cache->used_items));
- /* Inverse to use ffs */
memset (checkpoint->processed_bits, 0x0, NBYTES (cache->used_items));
checkpoint->waitq = g_ptr_array_new ();
rspamd_mempool_add_destructor (task->task_pool,
checkpoint = task->checkpoint;
}
- if (checkpoint->processed_num >= cache->used_items) {
- /* All symbols are processed */
- return TRUE;
- }
+ if (checkpoint->pass == 0) {
- for (i = checkpoint->offset * NBBY; i < (gint)cache->used_items; i ++) {
- if (rspamd_symbols_cache_metric_limit (task, checkpoint)) {
- msg_info ("<%s> has already scored more than %.2f, so do not "
- "plan any more checks", task->message_id,
- checkpoint->rs->score);
- return TRUE;
- }
+ /*
+ * On the first pass we check symbols that do not have dependencies
+ * If we figure out symbol that has no dependencies satisfied, then
+ * we just save it for another pass
+ */
+ for (i = 0; i < (gint)cache->used_items; i ++) {
+ if (rspamd_symbols_cache_metric_limit (task, checkpoint)) {
+ msg_info ("<%s> has already scored more than %.2f, so do not "
+ "plan any more checks", task->message_id,
+ checkpoint->rs->score);
+ return TRUE;
+ }
- item = g_ptr_array_index (cache->items_by_order, i);
- if (!isset (checkpoint->processed_bits, i)) {
- if (item->type == SYMBOL_TYPE_NORMAL || item->type == SYMBOL_TYPE_CALLBACK) {
- g_assert (item->func != NULL);
- t1 = rspamd_get_ticks ();
-
- if (item->symbol != NULL &&
- G_UNLIKELY (check_debug_symbol (task->cfg, item->symbol))) {
- rspamd_log_debug (rspamd_main->logger);
- item->func (task, item->user_data);
- rspamd_log_nodebug (rspamd_main->logger);
- }
- else {
- item->func (task, item->user_data);
+ item = g_ptr_array_index (cache->items_by_order, i);
+ if (!isset (checkpoint->processed_bits, i)) {
+ if (item->type == SYMBOL_TYPE_NORMAL || item->type == SYMBOL_TYPE_CALLBACK) {
+
+ if (!rspamd_symbols_cache_check_deps (cache, item,
+ checkpoint)) {
+ g_ptr_array_add (checkpoint->waitq, item);
+ continue;
+ }
+
+ g_assert (item->func != NULL);
+ t1 = rspamd_get_ticks ();
+
+ pending_before = rspamd_session_events_pending (task->s);
+ if (item->symbol != NULL &&
+ G_UNLIKELY (check_debug_symbol (task->cfg, item->symbol))) {
+ rspamd_log_debug (rspamd_main->logger);
+ item->func (task, item->user_data);
+ rspamd_log_nodebug (rspamd_main->logger);
+ }
+ else {
+ item->func (task, item->user_data);
+ }
+
+ t2 = rspamd_get_ticks ();
+ pending_after = rspamd_session_events_pending (task->s);
+
+ diff = (t2 - t1) * 1000000;
+ rspamd_set_counter (item, diff);
+
+ if (pending_before == pending_after) {
+ /* No new events registered */
+ setbit (checkpoint->processed_bits, i);
+ }
}
-
- t2 = rspamd_get_ticks ();
-
- diff = (t2 - t1) * 1000000;
- rspamd_set_counter (item, diff);
}
-
- setbit (checkpoint->processed_bits, i);
- checkpoint->processed_num ++;
}
}