Fix lua redis library.
return NULL;
}
+static void
+fin_learn_task (void *arg)
+{
+ struct worker_task *task = (struct worker_task *)arg;
+
+ /* XXX: this is bad logic in fact */
+ /* Process all statfiles */
+ process_statfiles (task);
+ /* Call post filters */
+ lua_call_post_filters (task);
+ task->state = WRITE_REPLY;
+
+ if (task->fin_callback) {
+ task->fin_callback (task->fin_arg);
+ }
+ else {
+ rspamd_dispatcher_restore (task->dispatcher);
+ }
+}
+
static gboolean
controller_read_socket (f_str_t * in, void *arg)
{
r = process_message (task);
if (r == -1) {
msg_warn ("processing of message failed");
- destroy_session (task->s);
session->state = STATE_REPLY;
r = rspamd_snprintf (out_buf, sizeof (out_buf), "cannot process message" CRLF);
if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
}
return FALSE;
}
-
+ /* Set up async session */
+ task->s = new_async_session (task->task_pool, fin_learn_task, free_task_hard, task);
r = process_filters (task);
if (r == -1) {
session->state = STATE_REPLY;
return FALSE;
}
}
- else if (r == 0) {
+ else {
session->state = STATE_LEARN_SPAM;
task->dispatcher = session->dispatcher;
session->learn_task = task;
rspamd_dispatcher_pause (session->dispatcher);
}
- else {
- lua_call_post_filters (task);
- session->state = STATE_REPLY;
-
- if (! learn_task_spam (session->learn_classifier, task, session->in_class, &err)) {
- if (err) {
- i = rspamd_snprintf (out_buf, sizeof (out_buf), "learn failed, learn classifier error: %s" CRLF END, err->message);
- g_error_free (err);
- }
- else {
- i = rspamd_snprintf (out_buf, sizeof (out_buf), "learn failed, unknown learn classifier error" CRLF END);
- }
- }
- else {
- i = rspamd_snprintf (out_buf, sizeof (out_buf), "learn ok" CRLF END);
- }
-
- destroy_session (task->s);
- if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) {
- return FALSE;
- }
- }
break;
case STATE_WEIGHTS:
session->learn_buf = in;
io_tv->tv_sec = ctx->timeout / 1000;
io_tv->tv_usec = ctx->timeout - io_tv->tv_sec * 1000;
- new_session->s = new_async_session (new_session->session_pool, free_session, new_session);
+ new_session->s = new_async_session (new_session->session_pool, NULL, free_session, new_session);
new_session->dispatcher = rspamd_create_dispatcher (ctx->ev_base, nfd, BUFFER_LINE, controller_read_socket,
controller_write_socket, controller_err_socket, io_tv, (void *)new_session);
}
upstream_ok (&rep->request->server->up, rep->request->time);
rep->request->func (rep, rep->request->arg);
+ remove_normal_event (req->session, dns_fin_cb, req);
}
}
}
rep->request = req;
rep->code = DNS_RC_SERVFAIL;
upstream_fail (&rep->request->server->up, rep->request->time);
- remove_normal_event (req->session, dns_fin_cb, req);
dns_check_throttling (req->resolver);
req->resolver->errors ++;
req->func (rep, req->arg);
+ remove_normal_event (req->session, dns_fin_cb, req);
return;
}
rep = memory_pool_alloc0 (req->pool, sizeof (struct rspamd_dns_reply));
rep->request = req;
rep->code = DNS_RC_SERVFAIL;
- remove_normal_event (req->session, dns_fin_cb, req);
+
req->func (rep, req->arg);
+ remove_normal_event (req->session, dns_fin_cb, req);
return;
}
rep->request = req;
rep->code = DNS_RC_SERVFAIL;
upstream_fail (&rep->request->server->up, rep->request->time);
- remove_normal_event (req->session, dns_fin_cb, req);
+
req->func (rep, req->arg);
+ remove_normal_event (req->session, dns_fin_cb, req);
return;
}
rep = memory_pool_alloc0 (req->pool, sizeof (struct rspamd_dns_reply));
rep->request = req;
rep->code = DNS_RC_SERVFAIL;
- remove_normal_event (req->session, dns_fin_cb, req);
upstream_fail (&rep->request->server->up, rep->request->time);
req->func (rep, req->arg);
+ remove_normal_event (req->session, dns_fin_cb, req);
return;
}
evtimer_add (&req->timer_event, &req->tv);
struct rspamd_dns_reply *rep;
gint r;
+ remove_normal_event (req->session, (event_finalizer_t)event_del, &req->io_event);
+
if (what == EV_WRITE) {
/* Retransmit dns request */
req->retransmits ++;
#include "main.h"
#include "events.h"
+#undef RSPAMD_EVENTS_DEBUG
+
+static gboolean
+rspamd_event_equal (gconstpointer a, gconstpointer b)
+{
+ const struct rspamd_async_event *ev1 = a, *ev2 = b;
+
+ if (ev1->fin == ev2->fin) {
+ return ev1->user_data == ev2->user_data;
+ }
+
+ return FALSE;
+}
+
+static guint
+rspamd_event_hash (gconstpointer a)
+{
+ const struct rspamd_async_event *ev = a;
+ guint h = 0, i;
+ gchar *p;
+
+ p = (gchar *)ev->user_data;
+ for (i = 0; i < sizeof (gpointer); i ++) {
+ h ^= *p;
+ h += (h << 1) + (h << 4) + (h << 7) + (h << 8) + (h << 24);
+ p ++;
+ }
+
+ return h;
+}
struct rspamd_async_session *
-new_async_session (memory_pool_t * pool, event_finalizer_t fin, void *user_data)
+new_async_session (memory_pool_t * pool, event_finalizer_t fin, event_finalizer_t cleanup, void *user_data)
{
struct rspamd_async_session *new;
new = memory_pool_alloc (pool, sizeof (struct rspamd_async_session));
new->pool = pool;
new->fin = fin;
+ new->cleanup = cleanup;
new->user_data = user_data;
new->wanna_die = FALSE;
- new->events = g_queue_new ();
+ new->events = g_hash_table_new (rspamd_event_hash, rspamd_event_equal);
+ new->forced_events = g_queue_new ();
- memory_pool_add_destructor (pool, (pool_destruct_func) g_queue_free, new->events);
+ memory_pool_add_destructor (pool, (pool_destruct_func) g_hash_table_destroy, new->events);
+ memory_pool_add_destructor (pool, (pool_destruct_func) g_queue_free, new->forced_events);
return new;
}
if (forced) {
/* For forced events try first to increase its reference */
- cur = session->events->head;
+ cur = session->forced_events->head;
while (cur) {
ev = cur->data;
if (ev->forced && ev->fin == fin) {
new->user_data = user_data;
new->forced = forced;
new->ref = 1;
- g_queue_push_head (session->events, new);
+ g_hash_table_insert (session->events, new, new);
+#ifdef RSPAMD_EVENTS_DEBUG
+ msg_info ("added event: %p, pending %d events", user_data, g_hash_table_size (session->events));
+#endif
}
void
return;
}
- cur = session->events->head;
+ cur = session->forced_events->head;
while (cur) {
ev = cur->data;
if (ev->forced && ev->fin == fin) {
ev->ref--;
if (ev->ref == 0) {
- g_queue_delete_link (session->events, cur);
+ g_queue_delete_link (session->forced_events, cur);
}
break;
}
cur = g_list_next (cur);
}
- if (session->wanna_die && session->fin != NULL && g_queue_get_length (session->events) == 0) {
+ check_session_pending (session);
+
+ if (session->wanna_die && session->fin != NULL && g_queue_get_length (session->forced_events) == 0) {
/* Call session destroy after all forced events are ready */
- session->fin (session->user_data);
+ session->cleanup (session->user_data);
}
}
void
remove_normal_event (struct rspamd_async_session *session, event_finalizer_t fin, void *ud)
{
- struct rspamd_async_event *ev;
- GList *cur;
+ struct rspamd_async_event search_ev;
if (session == NULL) {
msg_info ("session is NULL");
return;
}
- cur = session->events->head;
- while (cur) {
- ev = cur->data;
- if (ev->fin == fin && ev->user_data == ud && !ev->forced) {
- g_queue_delete_link (session->events, cur);
- if (ev->fin) {
- ev->fin (ev->user_data);
- }
- break;
- }
- cur = g_list_next (cur);
+ search_ev.fin = fin;
+ search_ev.user_data = ud;
+ if (g_hash_table_remove (session->events, &search_ev)) {
+ fin (ud);
+ }
+
+#ifdef RSPAMD_EVENTS_DEBUG
+ msg_info ("removed event: %p, pending %d events", ud, g_hash_table_size (session->events));
+#endif
+
+ check_session_pending (session);
+}
+
+static void
+rspamd_session_destroy (gpointer k, gpointer v, gpointer unused)
+{
+ struct rspamd_async_event *ev = v;
+
+ /* Call event's finalizer */
+ if (ev->fin != NULL) {
+ ev->fin (ev->user_data);
}
}
gboolean
destroy_session (struct rspamd_async_session *session)
{
- struct rspamd_async_event *ev;
- GList *cur, *tmp;
-
if (session == NULL) {
msg_info ("session is NULL");
return FALSE;
session->wanna_die = TRUE;
- cur = session->events->head;
+ g_hash_table_foreach (session->events, rspamd_session_destroy, session);
- while (cur) {
- ev = cur->data;
- if (!ev->forced) {
- if (ev->fin != NULL) {
- ev->fin (ev->user_data);
- }
- tmp = cur;
- cur = g_list_next (cur);
- g_queue_delete_link (session->events, tmp);
- }
- else {
- /* Do nothing with forced callbacks */
- cur = g_list_next (cur);
+ if (g_queue_get_length (session->forced_events) == 0) {
+ if (session->cleanup != NULL) {
+ session->cleanup (session->user_data);
}
+ return TRUE;
}
- if (g_queue_get_length (session->events) == 0) {
+ return FALSE;
+}
+
+gboolean
+check_session_pending (struct rspamd_async_session *session)
+{
+ if (g_queue_get_length (session->forced_events) == 0 && g_hash_table_size (session->events) == 0) {
if (session->fin != NULL) {
session->fin (session->user_data);
}
- return TRUE;
+ /* No more events */
+ return FALSE;
}
- return FALSE;
+ return TRUE;
}
struct rspamd_async_session {
event_finalizer_t fin;
- GQueue *events;
+ event_finalizer_t cleanup;
+ GHashTable *events;
+ GQueue *forced_events;
void *user_data;
memory_pool_t *pool;
gboolean wanna_die;
};
-/* Makes new async session */
-struct rspamd_async_session *new_async_session (memory_pool_t *pool, event_finalizer_t fin, void *user_data);
-/* Insert event into session */
-void register_async_event (struct rspamd_async_session *session, event_finalizer_t fin, void *user_data, gboolean forced);
-/* Must be called by forced events to call session destructor properly */
+/**
+ * Make new async session
+ * @param pool pool to alloc memory from
+ * @param fin a callback called when no events are found in session
+ * @param cleanup a callback called when session is forcefully destroyed
+ * @param user_data abstract user data
+ * @return
+ */
+struct rspamd_async_session *new_async_session (memory_pool_t *pool,
+ event_finalizer_t fin, event_finalizer_t cleanup, void *user_data);
+
+/**
+ * Insert new event to the session
+ * @param session session object
+ * @param fin finalizer callback
+ * @param user_data abstract user_data
+ * @param forced session cannot be destroyed until forced event are still in it
+ */
+void register_async_event (struct rspamd_async_session *session,
+ event_finalizer_t fin, void *user_data, gboolean forced);
+
+
+/**
+ * Remove forced event
+ * @param session session object
+ * @param fin destructor function
+ */
void remove_forced_event (struct rspamd_async_session *session, event_finalizer_t fin);
+
+/**
+ * Remove normal event
+ * @param session session object
+ * @param fin final callback
+ * @param ud user data object
+ */
void remove_normal_event (struct rspamd_async_session *session, event_finalizer_t fin, void *ud);
/**
*/
gboolean destroy_session (struct rspamd_async_session *session);
+/**
+ * Check session for events pending and call fin callback if no events are pending
+ * @param session session object
+ * @return TRUE if session has pending events
+ */
+gboolean check_session_pending (struct rspamd_async_session *session);
+
#endif /* RSPAMD_EVENTS_H */
return FALSE;
}
-static gint
-continue_process_filters (struct worker_task *task)
-{
- GList *cur;
- gpointer item = task->save.item;
- struct metric *metric;
-
- while (call_symbol_callback (task, task->cfg->cache, &item)) {
- cur = task->cfg->metrics_list;
- while (cur) {
- metric = cur->data;
- /* call_filter_by_name (task, filt->func_name, filt->type, SCRIPT_HEADER); */
- if (task->save.saved) {
- task->save.entry = cur;
- task->save.item = item;
- return 0;
- }
- else if (!task->pass_all_filters &&
- metric->action == METRIC_ACTION_REJECT &&
- check_metric_is_spam (task, metric)) {
- goto end;
- }
- cur = g_list_next (cur);
- }
- }
-
-end:
- /* Process all statfiles */
- process_statfiles (task);
- /* Call post filters */
- lua_call_post_filters (task);
- task->state = WRITE_REPLY;
-
- if (task->fin_callback) {
- task->fin_callback (task->fin_arg);
- }
- else {
- rspamd_dispatcher_restore (task->dispatcher);
- }
- return 1;
-}
-
gint
process_filters (struct worker_task *task)
{
struct metric *metric;
gpointer item = NULL;
- if (task->save.saved) {
- task->save.saved = 0;
- return continue_process_filters (task);
- }
/* Check skip */
if (check_skip (task->cfg->views, task)) {
task->is_skipped = TRUE;
cur = task->cfg->metrics_list;
while (cur) {
metric = cur->data;
- if (task->save.saved) {
- task->save.entry = cur;
- task->save.item = item;
- return 0;
- }
- else if (!task->pass_all_filters &&
+ if (!task->pass_all_filters &&
metric->action == METRIC_ACTION_REJECT &&
check_metric_is_spam (task, metric)) {
- task->state = WRITE_REPLY;
+ check_session_pending (task->s);
return 1;
}
cur = g_list_next (cur);
ud->parser_state = 3;
remove_normal_event (ud->task->s, lua_http_fin, ud);
-
- ud->task->save.saved--;
- if (ud->task->save.saved == 0) {
- /* Call other filters */
- ud->task->save.saved = 1;
- process_filters (ud->task);
- }
}
static void
}
remove_normal_event (ud->task->s, lua_http_fin, ud);
- ud->task->save.saved--;
- if (ud->task->save.saved == 0) {
- /* Call other filters */
- ud->task->save.saved = 1;
- process_filters (ud->task);
- }
}
/*
if (make_dns_request (task->resolver, task->s, task->task_pool, lua_http_dns_callback, ud,
DNS_REQUEST_A, hostname)) {
task->dns_requests ++;
- task->save.saved++;
}
return 0;
struct lua_redis_userdata *ud = arg;
if (ud->ctx) {
- msg_info ("hui");
redisAsyncFree (ud->ctx);
luaL_unref (ud->L, LUA_REGISTRYINDEX, ud->cbref);
- /*
- ud->task->save.saved--;
- if (ud->task->save.saved == 0) {
- ud->task->save.saved = 1;
- process_filters (ud->task);
- }*/
}
}
*ptask = ud->task;
/* String of error */
lua_pushstring (ud->L, err);
- /* Data */
+ /* Data is nil */
lua_pushnil (ud->L);
if (lua_pcall (ud->L, 3, 0, 0) != 0) {
msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
lua_setclass (ud->L, "rspamd{task}", -1);
*ptask = ud->task;
- /* String of error */
- lua_pushstring (ud->L, ud->ctx->errstr);
+ /* Error is nil */
+ lua_pushnil (ud->L);
/* Data */
lua_pushlstring (ud->L, r->str, r->len);
redisReply *reply = r;
struct lua_redis_userdata *ud = priv;
- msg_info ("in callback: err: %d, r: %p", c->err, r);
-
if (c->err == 0) {
if (r != NULL) {
lua_redis_push_data (reply, ud);
}
}
else {
- lua_redis_push_error (c->errstr, ud, TRUE);
+ if (c->err == REDIS_ERR_IO) {
+ lua_redis_push_error (strerror (errno), ud, TRUE);
+ }
+ else {
+ lua_redis_push_error (c->errstr, ud, TRUE);
+ }
}
}
/**
}
else {
register_async_event (ud->task->s, lua_redis_fin, ud, FALSE);
- ud->task->save.saved ++;
}
redisLibeventAttach (ud->ctx, ud->task->ev_base);
/* Make a request now */
/* Now get remaining args */
ud->args_num = lua_gettop (L) - 5;
ud->args = memory_pool_alloc (task->task_pool, ud->args_num * sizeof (f_str_t));
- for (i = 0; i < ud->args_num - 1; i ++) {
+ for (i = 0; i < ud->args_num; i ++) {
tmp = lua_tolstring (L, i + 6, &ud->args[i].len);
/* Make a copy of argument */
ud->args[i].begin = memory_pool_alloc (task->task_pool, ud->args[i].len);
if (lua_pcall (cd->L, 5, 0, 0) != 0) {
msg_info ("call to %s failed: %s", cd->callback, lua_tostring (cd->L, -1));
}
-
- cd->task->save.saved--;
- if (cd->task->save.saved == 0) {
- /* Call other filters */
- cd->task->save.saved = 1;
- process_filters (cd->task);
- }
}
static gint
}
if (make_dns_request (task->resolver, task->s, task->task_pool, lua_dns_callback, (void *)cd, DNS_REQUEST_A, cd->to_resolve)) {
task->dns_requests ++;
- task->save.saved++;
}
}
return 0;
}
if (make_dns_request (task->resolver, task->s, task->task_pool, lua_dns_callback, (void *)cd, DNS_REQUEST_TXT, cd->to_resolve)) {
task->dns_requests ++;
- task->save.saved++;
}
}
return 0;
if (make_dns_request (task->resolver, task->s, task->task_pool,
lua_dns_callback, (void *)cd, DNS_REQUEST_PTR, ina)) {
task->dns_requests ++;
- task->save.saved++;
}
}
return 0;
gint number;
};
-/**
- * Save point object for delayed filters processing
- */
-struct save_point {
- GList *entry; /**< pointer to saved metric */
- void *item; /**< pointer to saved item */
- guint saved; /**< how much time we have delayed processing */
-};
-
/**
* Structure to point exception in text from processing
*/
GList *messages; /**< list of messages that would be reported */
GHashTable *re_cache; /**< cache for matched or not matched regexps */
struct config_file *cfg; /**< pointer to config object */
- struct save_point save; /**< save point for delayed processing */
gchar *last_error; /**< last error */
gint error_code; /**< code of last error */
memory_pool_t *task_pool; /**< memory pool for task */
msg_err ("got error on IO with server %s:%d, %d, %s", session->server->name, session->server->port, errno, strerror (errno));
ok:
remove_normal_event (session->task->s, fuzzy_io_fin, session);
-
- session->task->save.saved--;
- if (session->task->save.saved == 0) {
- /* Call other filters */
- session->task->save.saved = 1;
- process_filters (session->task);
- }
}
static void
}
ok:
remove_normal_event (session->session->s, fuzzy_learn_fin, session);
-
- (*session->saved)--;
- if (*session->saved == 0) {
- session->session->state = STATE_REPLY;
- session->session->dispatcher->write_callback (session->session);
- }
}
static inline void
session->server = selected;
event_add (&session->ev, &session->tv);
register_async_event (task->s, fuzzy_io_fin, session, FALSE);
- task->save.saved++;
}
}
}
}
spf_check_list (l, task);
}
-
- if (task->save.saved == 0) {
- /* Call other filters */
- task->save.saved = 1;
- /* Note that here task MAY be destroyed */
- process_filters (task);
- }
}
debug_task ("send surbl dns request %s", surbl_req);
if (make_dns_request (task->resolver, task->s, task->task_pool, dns_callback, (void *)param, DNS_REQUEST_A, surbl_req)) {
task->dns_requests ++;
- param->task->save.saved++;
}
}
else if (err != NULL && err->code != WHITELIST_ERROR) {
else {
debug_task ("<%s> domain [%s] is not in surbl %s", param->task->message_id, param->host_resolve, param->suffix->suffix);
}
-
- param->task->save.saved--;
- if (param->task->save.saved == 0) {
- /* Call other filters */
- param->task->save.saved = 1;
- process_filters (param->task);
- }
}
static void
if (error != OK) {
msg_info ("memcached returned error %s on CONNECT stage", memc_strerror (error));
memc_close_ctx (param->ctx);
- param->task->save.saved--;
- if (param->task->save.saved == 0) {
- /* Call other filters */
- param->task->save.saved = 1;
- process_filters (param->task);
- }
}
else {
memc_get (param->ctx, param->ctx->param);
if (error != OK) {
msg_info ("memcached returned error %s on READ stage", memc_strerror (error));
memc_close_ctx (param->ctx);
- param->task->save.saved--;
- if (param->task->save.saved == 0) {
- /* Call other filters */
- param->task->save.saved = 1;
- process_filters (param->task);
- }
}
else {
url_count = (gint *)param->ctx->param->buf;
msg_info ("memcached returned error %s on WRITE stage", memc_strerror (error));
}
memc_close_ctx (param->ctx);
- param->task->save.saved--;
- if (param->task->save.saved == 0) {
- /* Call other filters */
- param->task->save.saved = 1;
- process_filters (param->task);
- }
make_surbl_requests (param->url, param->task, param->suffix, FALSE);
break;
default:
msg_err ("write failed %s to %s", strerror (errno), param->redirector->name);
upstream_fail (¶m->redirector->up, param->task->tv.tv_sec);
remove_normal_event (param->task->s, free_redirector_session, param);
-
- param->task->save.saved--;
- make_surbl_requests (param->url, param->task, param->suffix, FALSE);
- if (param->task->save.saved == 0) {
- /* Call other filters */
- param->task->save.saved = 1;
- process_filters (param->task);
- }
return;
}
param->state = STATE_READ;
upstream_fail (¶m->redirector->up, param->task->tv.tv_sec);
remove_normal_event (param->task->s, free_redirector_session, param);
- param->task->save.saved--;
- make_surbl_requests (param->url, param->task, param->suffix, FALSE);
- if (param->task->save.saved == 0) {
- /* Call other filters */
- param->task->save.saved = 1;
- process_filters (param->task);
- }
return;
}
break;
if (r <= 0) {
msg_err ("read failed: %s from %s", strerror (errno), param->redirector->name);
upstream_fail (¶m->redirector->up, param->task->tv.tv_sec);
- remove_normal_event (param->task->s, free_redirector_session, param);
- param->task->save.saved--;
make_surbl_requests (param->url, param->task, param->suffix, FALSE);
- if (param->task->save.saved == 0) {
- /* Call other filters */
- param->task->save.saved = 1;
- process_filters (param->task);
- }
+ remove_normal_event (param->task->s, free_redirector_session, param);
return;
}
}
upstream_ok (¶m->redirector->up, param->task->tv.tv_sec);
remove_normal_event (param->task->s, free_redirector_session, param);
-
- param->task->save.saved--;
- make_surbl_requests (param->url, param->task, param->suffix, FALSE);
- if (param->task->save.saved == 0) {
- /* Call other filters */
- param->task->save.saved = 1;
- process_filters (param->task);
- }
-
}
else {
msg_info ("<%s> reading redirector %s timed out, while waiting for read",
param->redirector->name, param->task->message_id);
upstream_fail (¶m->redirector->up, param->task->tv.tv_sec);
remove_normal_event (param->task->s, free_redirector_session, param);
-
- param->task->save.saved--;
- make_surbl_requests (param->url, param->task, param->suffix, FALSE);
- if (param->task->save.saved == 0) {
- /* Call other filters */
- param->task->save.saved = 1;
- process_filters (param->task);
- }
}
break;
}
if (s == -1) {
msg_info ("<%s> cannot create tcp socket failed: %s", task->message_id, strerror (errno));
- task->save.saved--;
make_surbl_requests (url, task, suffix, FALSE);
return;
}
insert_result (param->task, surbl_module_ctx->redirector_symbol, 1, g_list_prepend (NULL, red_domain));
}
register_redirector_call (url, param->task, param->suffix, red_domain);
- param->task->save.saved++;
return FALSE;
}
}
else {
if (param->task->worker->srv->cfg->memcached_servers_num > 0) {
register_memcached_call (url, param->task, param->suffix);
- param->task->save.saved++;
}
else {
make_surbl_requests (url, param->task, param->suffix, FALSE);
/* Resolve client's addr */
/* Set up async session */
- session->s = new_async_session (session->pool, free_smtp_session, session);
+ session->s = new_async_session (session->pool, NULL, free_smtp_session, session);
session->state = SMTP_STATE_RESOLVE_REVERSE;
if (! make_dns_request (session->resolver, session->s, session->pool,
smtp_dns_cb, session, DNS_REQUEST_PTR, &session->client_addr)) {
/* Now resolve A record for this MX */
if (make_dns_request (task->resolver, task->s, task->task_pool, spf_record_dns_callback, (void *)cb, DNS_REQUEST_A, elt_data->mx.name)) {
task->dns_requests ++;
- task->save.saved++;
}
}
else if (reply->type == DNS_REQUEST_A) {
break;
}
}
-
- cb->rec->task->save.saved--;
- if (cb->rec->task->save.saved == 0 && cb->rec->callback) {
- cb->rec->callback (cb->rec, cb->rec->task);
- }
}
static gboolean
cb->in_include = rec->in_include;
if (make_dns_request (task->resolver, task->s, task->task_pool, spf_record_dns_callback, (void *)cb, DNS_REQUEST_A, host)) {
task->dns_requests ++;
- task->save.saved++;
return TRUE;
}
cb->in_include = rec->in_include;
if (make_dns_request (task->resolver, task->s, task->task_pool, spf_record_dns_callback, (void *)cb, DNS_REQUEST_MX, host)) {
task->dns_requests ++;
- task->save.saved++;
return TRUE;
}
domain = memory_pool_strdup (task->task_pool, begin);
if (make_dns_request (task->resolver, task->s, task->task_pool, spf_record_dns_callback, (void *)cb, DNS_REQUEST_TXT, domain)) {
task->dns_requests ++;
- task->save.saved++;
-
+
return TRUE;
}
domain = memory_pool_strdup (task->task_pool, begin);
if (make_dns_request (task->resolver, task->s, task->task_pool, spf_record_dns_callback, (void *)cb, DNS_REQUEST_TXT, domain)) {
task->dns_requests ++;
- task->save.saved++;
return TRUE;
}
if (make_dns_request (task->resolver, task->s, task->task_pool, spf_record_dns_callback, (void *)cb, DNS_REQUEST_A, host)) {
task->dns_requests ++;
- task->save.saved++;
return TRUE;
}
cur = g_list_next (cur);
}
}
-
- rec->task->save.saved--;
- if (rec->task->save.saved == 0 && rec->callback) {
- rec->callback (rec, rec->task);
- }
}
gchar *
if (make_dns_request (task->resolver, task->s, task->task_pool, spf_dns_callback, (void *)rec, DNS_REQUEST_TXT, rec->cur_domain)) {
task->dns_requests ++;
- task->save.saved++;
return TRUE;
}
}
rec->sender_domain = rec->cur_domain;
if (make_dns_request (task->resolver, task->s, task->task_pool, spf_dns_callback, (void *)rec, DNS_REQUEST_TXT, rec->cur_domain)) {
task->dns_requests ++;
- task->save.saved++;
return TRUE;
}
}
memory_pool_add_destructor (new_task->task_pool,
(pool_destruct_func) g_tree_destroy,
new_task->urls);
- new_task->s =
- new_async_session (new_task->task_pool, free_task_hard, new_task);
new_task->sock = -1;
new_task->is_mime = TRUE;
task->state = WRITE_ERROR;
return write_socket (task);
}
- else if (r == 0) {
- task->state = WAIT_FILTER;
- rspamd_dispatcher_pause (task->dispatcher);
- }
- else {
- process_statfiles (task);
- lua_call_post_filters (task);
- task->state = WRITE_REPLY;
- return write_socket (task);
- }
}
break;
case WRITE_REPLY:
destroy_session (task->s);
}
+/*
+ * Called if all filters are processed
+ */
+static void
+fin_task (void *arg)
+{
+ struct worker_task *task = (struct worker_task *) arg;
+
+ /* Process all statfiles */
+ process_statfiles (task);
+ /* Call post filters */
+ lua_call_post_filters (task);
+ task->state = WRITE_REPLY;
+
+ if (task->fin_callback) {
+ task->fin_callback (task->fin_arg);
+ }
+ else {
+ rspamd_dispatcher_restore (task->dispatcher);
+ }
+}
+
/*
* Reduce number of tasks proceeded
*/
ctx->tasks ++;
memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)reduce_tasks_count, &ctx->tasks);
+ /* Set up async session */
+ new_task->s =
+ new_async_session (new_task->task_pool, fin_task, free_task_hard, new_task);
+
/* Init custom filters */
#ifndef BUILD_STATIC
if (ctx->is_custom) {