src/controller.c
src/cfg_utils.c
src/buffer.c
+ src/events.c
src/html.c
src/lmtp.c
src/lmtp_proto.c
#define BUFREMAIN(x) (x)->data->size - ((x)->pos - (x)->data->begin)
-static void
-write_buffers (int fd, rspamd_io_dispatcher_t *d)
+static gboolean
+write_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean is_delayed)
{
GList *cur;
GError *err;
rspamd_buffer_t *buf;
- struct timeval *ntv;
ssize_t r;
/* Fix order */
if (d->err_callback) {
err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
d->err_callback (err, d->user_data);
- return;
+ return FALSE;
}
}
else if (r > 0) {
if (d->err_callback) {
err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF");
d->err_callback (err, d->user_data);
- return;
+ return FALSE;
}
}
else if (r == -1 && errno == EAGAIN) {
/* Wait for other event */
event_del (d->ev);
event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
- ntv = memory_pool_alloc (d->pool, sizeof (struct timeval));
- memcpy (ntv, d->tv, sizeof (struct timeval));
- event_add (d->ev, ntv);
- return;
+ event_add (d->ev, d->tv);
+ return TRUE;
}
cur = g_list_next (cur);
}
msg_debug ("write_buffers: all buffers were written successfully");
- if (d->write_callback) {
- d->write_callback (d->user_data);
- if (d->wanna_die) {
+ if (is_delayed && d->write_callback) {
+ if (!d->write_callback (d->user_data)) {
msg_debug ("write_buffers: callback set wanna_die flag, terminating");
- rspamd_remove_dispatcher (d);
- return;
+ return FALSE;
}
}
event_del (d->ev);
event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
- ntv = memory_pool_alloc (d->pool, sizeof (struct timeval));
- memcpy (ntv, d->tv, sizeof (struct timeval));
- event_add (d->ev, ntv);
+ event_add (d->ev, d->tv);
}
else {
/* Plan other write event */
event_del (d->ev);
event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
- ntv = memory_pool_alloc (d->pool, sizeof (struct timeval));
- memcpy (ntv, d->tv, sizeof (struct timeval));
- event_add (d->ev, ntv);
+ event_add (d->ev, d->tv);
}
+
+ return TRUE;
}
static void
char **pos;
size_t *len;
enum io_policy saved_policy;
-
+
+ if (d->wanna_die) {
+ rspamd_remove_dispatcher (d);
+ return;
+ }
if (d->in_buf == NULL) {
d->in_buf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t));
res.len --;
}
if (d->read_callback) {
- d->read_callback (&res, d->user_data);
- if (d->wanna_die) {
- msg_debug ("read_buffers: callback set wanna_die flag, terminating");
- rspamd_remove_dispatcher (d);
+ if (!d->read_callback (&res, d->user_data)) {
return;
}
/* Move remaining string to begin of buffer (draining) */
res.len = r;
c = b + r;
if (d->read_callback) {
- d->read_callback (&res, d->user_data);
+ if (!d->read_callback (&res, d->user_data)) {
+ return;
+ }
/* Move remaining string to begin of buffer (draining) */
memmove (d->in_buf->data->begin, c, *len - r);
b = d->in_buf->data->begin;
{
rspamd_io_dispatcher_t *d = (rspamd_io_dispatcher_t *)arg;
GError *err;
- struct timeval *ntv;
msg_debug ("dispatcher_cb: in dispatcher callback, what: %d, fd: %d", (int)what, fd);
if (d->out_buffers == NULL) {
event_del (d->ev);
event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
- ntv = memory_pool_alloc (d->pool, sizeof (struct timeval));
- memcpy (ntv, d->tv, sizeof (struct timeval));
- event_add (d->ev, ntv);
+ event_add (d->ev, d->tv);
}
else {
- write_buffers (fd, d);
+ /* Delayed write */
+ write_buffers (fd, d, TRUE);
}
break;
case EV_READ:
struct timeval *tv, void *user_data)
{
rspamd_io_dispatcher_t *new;
- struct timeval *ntv;
if (fd == -1) {
return NULL;
new->fd = fd;
event_set (new->ev, fd, EV_WRITE, dispatcher_cb, (void *)new);
- ntv = memory_pool_alloc (new->pool, sizeof (struct timeval));
- memcpy (ntv, new->tv, sizeof (struct timeval));
- event_add (new->ev, ntv);
+ event_add (new->ev, new->tv);
return new;
}
msg_debug ("rspamd_set_dispatcher_policy: new input length watermark is %ld", (long int)d->nchars);
}
-void
+gboolean
rspamd_dispatcher_write (rspamd_io_dispatcher_t *d,
void *data,
size_t len, gboolean delayed, gboolean allocated)
{
rspamd_buffer_t *newbuf;
- struct timeval *ntv;
newbuf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t));
if (!allocated) {
if (!delayed) {
msg_debug ("rspamd_dispatcher_write: plan write event");
- event_del (d->ev);
- event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
- ntv = memory_pool_alloc (d->pool, sizeof (struct timeval));
- memcpy (ntv, d->tv, sizeof (struct timeval));
- event_add (d->ev, ntv);
+ return write_buffers (d->fd, d, FALSE);
}
+ return TRUE;
}
void
#include "mem_pool.h"
#include "fstring.h"
-typedef void (*dispatcher_read_callback_t)(f_str_t *in, void *user_data);
-typedef void (*dispatcher_write_callback_t)(void *user_data);
+typedef gboolean (*dispatcher_read_callback_t)(f_str_t *in, void *user_data);
+typedef gboolean (*dispatcher_write_callback_t)(void *user_data);
typedef void (*dispatcher_err_callback_t)(GError *err, void *user_data);
/**
* @param data data to write
* @param len length of data
*/
-void rspamd_dispatcher_write (rspamd_io_dispatcher_t *d,
+gboolean rspamd_dispatcher_write (rspamd_io_dispatcher_t *d,
void *data,
size_t len, gboolean delayed, gboolean allocated);
static char greetingbuf[1024];
extern rspamd_hash_t *counters;
+static gboolean controller_write_socket (void *arg);
+
static
void sig_handler (int signo)
{
}
static void
-free_session (struct controller_session *session, gboolean is_soft)
+free_session (void *ud)
{
GList *part;
struct mime_part *p;
+ struct controller_session *session = ud;
- msg_debug ("free_session: freeing session %p", session);
+ msg_info ("free_session: freeing session %p", session);
while ((part = g_list_first (session->parts))) {
session->parts = g_list_remove_link (session->parts, part);
g_byte_array_free (p->content, FALSE);
g_list_free_1 (part);
}
- if (is_soft) {
- /* Plan dispatcher shutdown */
- session->dispatcher->wanna_die = 1;
- }
- else {
- rspamd_remove_dispatcher (session->dispatcher);
- }
+ rspamd_remove_dispatcher (session->dispatcher);
close (session->sock);
return FALSE;
}
-static void
+static gboolean
controller_read_socket (f_str_t *in, void *arg)
{
struct controller_session *session = (struct controller_session *)arg;
if (!process_custom_command (cmd, ¶ms[1], session)) {
msg_debug ("Unknown command: '%s'", cmd);
i = snprintf (out_buf, sizeof (out_buf), "Unknown command" CRLF);
- rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE);
+ if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) {
+ return FALSE;
+ }
}
break;
default:
msg_debug ("Ambigious command: '%s'", cmd);
i = snprintf (out_buf, sizeof (out_buf), "Ambigious command" CRLF);
- rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE);
+ if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) {
+ return FALSE;
+ }
break;
}
}
session->state = STATE_REPLY;
}
if (session->state != STATE_LEARN && session->state != STATE_OTHER) {
- rspamd_dispatcher_write (session->dispatcher, END, sizeof (END) - 1, FALSE, TRUE);
+ if (!rspamd_dispatcher_write (session->dispatcher, END, sizeof (END) - 1, FALSE, TRUE)) {
+ return FALSE;
+ }
}
g_strfreev (params);
if (!session->learn_classifier->tokenizer->tokenize_func (session->learn_classifier->tokenizer,
session->session_pool, &c, &tokens)) {
i = snprintf (out_buf, sizeof (out_buf), "learn fail, tokenizer error" CRLF);
- rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE);
+ if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) {
+ return FALSE;
+ }
session->state = STATE_REPLY;
- return;
+ return TRUE;
}
}
cls_ctx = session->learn_classifier->classifier->init_func (session->session_pool, session->learn_classifier);
session->learn_classifier->classifier->learn_func (cls_ctx, session->worker->srv->statfile_pool,
session->learn_symbol, tokens, session->in_class);
session->worker->srv->stat->messages_learned ++;
- i = snprintf (out_buf, sizeof (out_buf), "learn ok" CRLF);
- rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE);
/* Clean learned parts */
while ((cur = g_list_first (session->parts))) {
g_list_free_1 (cur);
}
+ i = snprintf (out_buf, sizeof (out_buf), "learn ok" CRLF);
+ if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) {
+ return FALSE;
+ }
+
session->state = STATE_REPLY;
break;
case STATE_OTHER:
msg_debug ("controller_read_socket: unknown state while reading %d", session->state);
break;
}
+
+ if (session->state == STATE_REPLY || session->state == STATE_QUIT) {
+ (void)controller_write_socket (session);
+ }
+
+ return TRUE;
}
-static void
+static gboolean
controller_write_socket (void *arg)
{
struct controller_session *session = (struct controller_session *)arg;
if (session->state == STATE_QUIT) {
/* Free buffers */
- free_session (session, TRUE);
- return;
+ destroy_session (session->s);
+ return FALSE;
}
else if (session->state == STATE_REPLY) {
session->state = STATE_COMMAND;
rspamd_set_dispatcher_policy (session->dispatcher, BUFFER_LINE, BUFSIZ);
}
+ return TRUE;
}
static void
if (err->code != EOF) {
msg_info ("controller_err_socket: abnormally closing control connection, error: %s", err->message);
}
+
/* Free buffers */
- free_session (session, FALSE);
+ destroy_session (session->s);
}
static void
io_tv->tv_sec = CONTROLLER_IO_TIMEOUT;
io_tv->tv_usec = 0;
+ new_session->s = new_async_session (new_session->session_pool, free_session, new_session);
+
new_session->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, controller_read_socket,
controller_write_socket, controller_err_socket, io_tv,
(void *)new_session);
--- /dev/null
+/*
+ * Copyright (c) 2009, Rambler media
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY Rambler media ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL Rambler BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "config.h"
+#include "main.h"
+#include "events.h"
+
+
+struct rspamd_async_session*
+new_async_session (memory_pool_t *pool, event_finalizer_t fin, void *user_data)
+{
+ struct rspamd_async_session *new;
+
+ new = memory_pool_alloc (pool, sizeof (struct rspamd_async_session));
+ new->pool = pool;
+ new->fin = fin;
+ new->user_data = user_data;
+ new->wanna_die = FALSE;
+ new->events = g_queue_new ();
+
+ memory_pool_add_destructor (pool, (pool_destruct_func)g_queue_free, new->events);
+
+ return new;
+}
+
+void
+register_async_event (struct rspamd_async_session *session, event_finalizer_t fin, void *user_data, gboolean forced)
+{
+ struct rspamd_async_event *new, *ev;
+ GList *cur;
+
+ if (session == NULL) {
+ msg_info ("register_async_event: session is NULL");
+ return;
+ }
+
+ if (forced) {
+ /* For forced events try first to increase its reference */
+ cur = session->events->head;
+ while (cur) {
+ ev = cur->data;
+ if (ev->forced && ev->fin == fin) {
+ ev->ref ++;
+ return;
+ }
+ cur = g_list_next (cur);
+ }
+ }
+
+ new = memory_pool_alloc (session->pool, sizeof (struct rspamd_async_event));
+ new->fin = fin;
+ new->user_data = user_data;
+ new->forced = forced;
+ new->ref = 1;
+ g_queue_push_head (session->events, new);
+}
+
+void
+remove_forced_event (struct rspamd_async_session *session, event_finalizer_t fin)
+{
+ struct rspamd_async_event *ev;
+ GList *cur;
+
+ if (session == NULL) {
+ msg_info ("remove_forced_event: session is NULL");
+ return;
+ }
+
+ cur = session->events->head;
+ while (cur) {
+ ev = cur->data;
+ if (ev->forced && ev->fin == fin) {
+ ev->ref --;
+ if (ev->ref == 0) {
+ g_queue_delete_link (session->events, cur);
+ }
+ break;
+ }
+ cur = g_list_next (cur);
+ }
+
+ if (session->wanna_die && session->fin != NULL && g_queue_get_length (session->events) == 0) {
+ /* Call session destroy after all forced events are ready */
+ session->fin (session->user_data);
+ }
+}
+
+void
+remove_normal_event (struct rspamd_async_session *session, event_finalizer_t fin, void *ud)
+{
+ struct rspamd_async_event *ev;
+ GList *cur;
+
+ if (session == NULL) {
+ msg_info ("remove_forced_event: session is NULL");
+ return;
+ }
+
+ cur = session->events->head;
+ while (cur) {
+ ev = cur->data;
+ if (ev->fin == fin && ev->user_data == ud && !ev->forced) {
+ g_queue_delete_link (session->events, cur);
+ if (ev->fin) {
+ ev->fin (ev->user_data);
+ }
+ break;
+ }
+ cur = g_list_next (cur);
+ }
+}
+
+gboolean
+destroy_session (struct rspamd_async_session *session)
+{
+ struct rspamd_async_event *ev;
+ GList *cur, *tmp;
+
+ if (session == NULL) {
+ msg_info ("destroy_session: session is NULL");
+ return FALSE;
+ }
+
+ session->wanna_die = TRUE;
+
+ cur = session->events->head;
+
+ while (cur) {
+ ev = cur->data;
+ if (!ev->forced) {
+ if (ev->fin != NULL) {
+ ev->fin (ev->user_data);
+ }
+ tmp = cur;
+ cur = g_list_next (cur);
+ g_queue_delete_link (session->events, tmp);
+ }
+ else {
+ /* Do nothing with forced callbacks */
+ cur = g_list_next (cur);
+ }
+ }
+
+ if (g_queue_get_length (session->events) == 0) {
+ if (session->fin != NULL) {
+ session->fin (session->user_data);
+ }
+ return TRUE;
+ }
+
+ return FALSE;
+}
--- /dev/null
+#ifndef RSPAMD_EVENTS_H
+#define RSPAMD_EVENTS_H
+
+#include "config.h"
+
+struct rspamd_async_event;
+
+typedef void (*event_finalizer_t)(void *user_data);
+
+struct rspamd_async_event {
+ event_finalizer_t fin;
+ void *user_data;
+ gboolean forced;
+ guint ref;
+};
+
+struct rspamd_async_session {
+ event_finalizer_t fin;
+ GQueue *events;
+ void *user_data;
+ memory_pool_t *pool;
+ gboolean wanna_die;
+};
+
+/* Makes new async session */
+struct rspamd_async_session *new_async_session (memory_pool_t *pool, event_finalizer_t fin, void *user_data);
+/* Insert event into session */
+void register_async_event (struct rspamd_async_session *session, event_finalizer_t fin, void *user_data, gboolean forced);
+/* Must be called by forced events to call session destructor properly */
+void remove_forced_event (struct rspamd_async_session *session, event_finalizer_t fin);
+void remove_normal_event (struct rspamd_async_session *session, event_finalizer_t fin, void *ud);
+
+/**
+ * Must be called at the end of session, it calls fin functions for all non-forced callbacks
+ * @return true if the whole session was destroyed and false if there are forced events
+ */
+gboolean destroy_session (struct rspamd_async_session *session);
+
+#endif /* RSPAMD_EVENTS_H */
static char greetingbuf[1024];
static struct timeval io_tv;
-static void lmtp_write_socket (void *arg);
+static gboolean lmtp_write_socket (void *arg);
static
void sig_handler (int signo)
/*
* Callback that is called when there is data to read in buffer
*/
-static void
+static gboolean
lmtp_read_socket (f_str_t *in, void *arg)
{
struct rspamd_lmtp_proto *lmtp = (struct rspamd_lmtp_proto *)arg;
msg_debug ("lmtp_read_socket: invalid state while reading from socket %d", lmtp->task->state);
break;
}
+
+ return TRUE;
}
/*
* Callback for socket writing
*/
-static void
+static gboolean
lmtp_write_socket (void *arg)
{
struct rspamd_lmtp_proto *lmtp = (struct rspamd_lmtp_proto *)arg;
case CLOSING_CONNECTION:
msg_debug ("lmtp_write_socket: normally closing connection");
free_lmtp_task (lmtp, TRUE);
+ return FALSE;
break;
default:
msg_debug ("lmtp_write_socket: invalid state while writing to socket %d", lmtp->task->state);
break;
}
+
+ return TRUE;
}
/*
else {
out_lmtp_reply (cd->task, LMTP_FAILURE, "", "Delivery failure");
}
- cd->dispatcher->wanna_die = TRUE;
+ rspamd_remove_dispatcher (cd->dispatcher);
}
/*
* Callback that is called when there is data to read in buffer
*/
-static void
+static gboolean
mta_read_socket (f_str_t *in, void *arg)
{
struct mta_callback_data *cd = (struct mta_callback_data *)arg;
if (fstrstr (in, &contres1) != -1 || fstrstr (in, &contres2) != -1) {
/* Skip such lines */
- return;
+ return TRUE;
}
switch (cd->state) {
if (!parse_mta_str (in, cd)) {
msg_warn ("mta_read_socket: got bad greeting");
close_mta_connection (cd, FALSE);
- return;
+ return FALSE;
}
hostmax = sysconf (_SC_HOST_NAME_MAX) + 1;
hostbuf = alloca (hostmax);
if (!parse_mta_str (in, cd)) {
msg_warn ("mta_read_socket: got bad helo");
close_mta_connection (cd, FALSE);
- return;
+ return FALSE;
}
r = snprintf (outbuf, sizeof (outbuf), "MAIL FROM: <%s>" CRLF, cd->task->from);
rspamd_dispatcher_write (cd->task->dispatcher, outbuf, r, FALSE, FALSE);
if (!parse_mta_str (in, cd)) {
msg_warn ("mta_read_socket: got bad mail from");
close_mta_connection (cd, FALSE);
- return;
+ return FALSE;
}
cur = g_list_first (cd->task->rcpt);
r = 0;
if (!parse_mta_str (in, cd)) {
msg_warn ("mta_read_socket: got bad rcpt");
close_mta_connection (cd, FALSE);
- return;
+ return FALSE;
}
r = snprintf (outbuf, sizeof (outbuf), "DATA" CRLF);
rspamd_dispatcher_write (cd->task->dispatcher, outbuf, r, FALSE, FALSE);
if (!parse_mta_str (in, cd)) {
msg_warn ("mta_read_socket: got bad data");
close_mta_connection (cd, FALSE);
- return;
+ return FALSE;
}
c = g_mime_object_to_string ((GMimeObject *)cd->task->message);
r = strlen (c);
if (!parse_mta_str (in, cd)) {
msg_warn ("mta_read_socket: message not delivered");
close_mta_connection (cd, FALSE);
- return;
+ return FALSE;
}
close_mta_connection (cd, TRUE);
break;
}
+
+ return TRUE;
}
/*
#include "filter.h"
#include "buffer.h"
#include "hash.h"
+#include "events.h"
#include "util.h"
/* Default values */
void (*other_handler)(struct controller_session *session,
f_str_t *in); /**< other command handler to execute at the end of processing */
void *other_data; /**< and its data */
+ struct rspamd_async_session* s; /**< async session object */
};
typedef void (*controller_func_t)(char **args, struct controller_session *session);
char *user; /**< user to deliver */
f_str_t *msg; /**< message buffer */
rspamd_io_dispatcher_t *dispatcher; /**< IO dispatcher object */
+ struct rspamd_async_session* s; /**< async session object */
memcached_ctx_t *memc_ctx; /**< memcached context associated with task */
int parts_count; /**< mime parts count */
GMimeMessage *message; /**< message, parsed with GMime */
return fuzzy_check_module_config (cfg);
}
+static void
+fuzzy_io_fin (void *ud)
+{
+ struct fuzzy_client_session *session = ud;
+
+ event_del (&session->ev);
+ session->task->save.saved --;
+ if (session->task->save.saved == 0) {
+ /* Call other filters */
+ session->task->save.saved = 1;
+ process_filters (session->task);
+ }
+}
+
static void
fuzzy_io_callback (int fd, short what, void *arg)
{
msg_err ("fuzzy_io_callback: got error on IO with server %s:%d, %d, %s", session->server->name, session->server->port,
errno, strerror (errno));
ok:
- event_del (&session->ev);
close (fd);
- session->task->save.saved --;
- if (session->task->save.saved == 0) {
- /* Call other filters */
- session->task->save.saved = 1;
- process_filters (session->task);
- }
+ remove_normal_event (session->task->s, fuzzy_io_fin, session);
}
static void
-fuzzy_free_session (void *arg)
+fuzzy_learn_fin (void *arg)
{
struct fuzzy_learn_session *session = arg;
event_del (&session->ev);
+ (*session->saved) --;
+ if (*session->saved == 0) {
+ session->session->state = STATE_REPLY;
+ rspamd_dispatcher_write (session->session->dispatcher, "OK" CRLF, sizeof ("OK" CRLF) - 1, FALSE, FALSE);
+ }
}
static void
struct fuzzy_learn_session *session = arg;
struct fuzzy_cmd cmd;
char buf[sizeof ("ERR" CRLF)];
- int r;
if (what == EV_WRITE) {
/* Send command to storage */
session->server->port, errno, strerror (errno));
ok:
close (fd);
- (*session->saved) --;
- if (*session->saved == 0) {
- session->session->state = WRITE_REPLY;
- r = snprintf (buf, sizeof (buf), "OK" CRLF);
- rspamd_dispatcher_write (session->session->dispatcher, buf, r, FALSE, FALSE);
- }
+ remove_normal_event (session->session->s, fuzzy_learn_fin, session);
}
static void
session->task = task;
session->server = selected;
event_add (&session->ev, &session->tv);
+ register_async_event (task->s, fuzzy_io_fin, session, FALSE);
task->save.saved ++;
}
}
session->state = STATE_WAIT;
task->msg = in;
- r = process_message (task);
saved = memory_pool_alloc0 (session->session_pool, sizeof (int));
+ r = process_message (task);
if (r == -1) {
msg_warn ("read_socket: processing of message failed");
- task->last_error = "MIME processing error";
- task->error_code = RSPAMD_FILTER_ERROR;
free_task (task, FALSE);
- session->state = WRITE_REPLY;
+ session->state = STATE_REPLY;
+ r = snprintf (out_buf, sizeof (out_buf), "cannot process message" CRLF);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
return;
}
else {
if (selected) {
if ((sock = make_udp_socket (&selected->addr, selected->port, FALSE, TRUE)) == -1) {
msg_warn ("fuzzy_symbol_callback: cannot connect to %s, %d, %s", selected->name, errno, strerror (errno));
+ session->state = STATE_REPLY;
r = snprintf (out_buf, sizeof (out_buf), "no hashes written" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
- session->state = WRITE_REPLY;
free_task (task, FALSE);
return;
}
s->cmd = cmd;
s->saved = saved;
event_add (&s->ev, &s->tv);
- memory_pool_add_destructor (session->session_pool, fuzzy_free_session, s);
(*saved) ++;
+ register_async_event (session->s, fuzzy_learn_fin, s, FALSE);
}
}
else {
+ session->state = STATE_REPLY;
r = snprintf (out_buf, sizeof (out_buf), "cannot write fuzzy hash" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
free_task (task, FALSE);
- session->state = WRITE_REPLY;
return;
}
cur = g_list_next (cur);
free_task (task, FALSE);
if (*saved == 0) {
+ session->state = STATE_REPLY;
r = snprintf (out_buf, sizeof (out_buf), "no hashes written" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
- session->state = WRITE_REPLY;
}
}
msg_info ("fuzzy_controller_handler: empty content length");
r = snprintf (out_buf, sizeof (out_buf), "fuzzy command requires length as argument" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
- session->state = WRITE_REPLY;
+ session->state = STATE_REPLY;
return;
}
if (err_str && *err_str != '\0') {
r = snprintf (out_buf, sizeof (out_buf), "learn size is invalid" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
- session->state = WRITE_REPLY;
+ session->state = STATE_REPLY;
return;
}
msg_debug ("surbl_test_url: send surbl dns request %s", surbl_req);
if (evdns_resolve_ipv4 (surbl_req, DNS_QUERY_NO_SEARCH, dns_callback, (void *)param) == 0) {
param->task->save.saved ++;
+ register_async_event (task->s, (event_finalizer_t)dns_callback, NULL, TRUE);
}
}
else {
param->task->save.saved = 1;
process_filters (param->task);
}
+ remove_forced_event (param->task->s, (event_finalizer_t)dns_callback);
}
memc_init_ctx (param->ctx);
}
+static void
+free_redirector_session (void *ud)
+{
+ struct redirector_param *param = (struct redirector_param *)ud;
+
+ event_del (¶m->ev);
+ close (param->sock);
+ param->task->save.saved --;
+ make_surbl_requests (param->url, param->task, param->tree, param->suffix);
+ if (param->task->save.saved == 0) {
+ /* Call other filters */
+ param->task->save.saved = 1;
+ process_filters (param->task);
+ }
+
+}
+
static void
redirector_callback (int fd, short what, void *arg)
{
r = snprintf (url_buf, sizeof (url_buf), "GET %s HTTP/1.0\r\n\r\n", struri (param->url));
if (write (param->sock, url_buf, r) == -1) {
msg_err ("redirector_callback: write failed %s", strerror (errno));
- event_del (¶m->ev);
- close (fd);
- param->task->save.saved --;
- make_surbl_requests (param->url, param->task, param->tree, param->suffix);
- if (param->task->save.saved == 0) {
- /* Call other filters */
- param->task->save.saved = 1;
- process_filters (param->task);
- }
+ remove_normal_event (param->task->s, free_redirector_session, param);
return;
}
param->state = STATE_READ;
}
else {
- event_del (¶m->ev);
- close (fd);
msg_info ("redirector_callback: <%s> connection to redirector timed out while waiting for write",
param->task->message_id);
- param->task->save.saved --;
- make_surbl_requests (param->url, param->task, param->tree, param->suffix);
-
- if (param->task->save.saved == 0) {
- /* Call other filters */
- param->task->save.saved = 1;
- process_filters (param->task);
- }
+ remove_normal_event (param->task->s, free_redirector_session, param);
+ return;
}
break;
case STATE_READ:
parse_uri (param->url, memory_pool_strdup (param->task->task_pool, c), param->task->task_pool);
}
}
- event_del (¶m->ev);
- close (fd);
- param->task->save.saved --;
- make_surbl_requests (param->url, param->task, param->tree, param->suffix);
- if (param->task->save.saved == 0) {
- /* Call other filters */
- param->task->save.saved = 1;
- process_filters (param->task);
- }
+ remove_normal_event (param->task->s, free_redirector_session, param);
}
else {
- event_del (¶m->ev);
- close (fd);
msg_info ("redirector_callback: <%s> reading redirector timed out, while waiting for read",
param->task->message_id);
- param->task->save.saved --;
- make_surbl_requests (param->url, param->task, param->tree, param->suffix);
- if (param->task->save.saved == 0) {
- /* Call other filters */
- param->task->save.saved = 1;
- process_filters (param->task);
- }
+ remove_normal_event (param->task->s, free_redirector_session, param);
}
break;
}
timeout->tv_usec = surbl_module_ctx->connect_timeout - timeout->tv_sec * 1000;
event_set (¶m->ev, s, EV_WRITE, redirector_callback, (void *)param);
event_add (¶m->ev, timeout);
+ register_async_event (task->s, free_redirector_session, param, FALSE);
}
static gboolean
static struct timeval io_tv;
-static void write_socket (void *arg);
+static gboolean write_socket (void *arg);
static
void sig_handler (int signo)
}
}
+static void
+free_task_hard (void *ud)
+{
+ struct worker_task *task = ud;
+
+ free_task (task, FALSE);
+}
+
/*
* Callback that is called when there is data to read in buffer
*/
-static void
+static gboolean
read_socket (f_str_t *in, void *arg)
{
struct worker_task *task = (struct worker_task *)arg;
/* Skip filters */
task->state = WRITE_REPLY;
write_socket (task);
- return;
+ return TRUE;
}
r = process_filters (task);
if (r == -1) {
msg_debug ("read_socket: invalid state on reading stage");
break;
}
+
+ return TRUE;
}
/*
* Callback for socket writing
*/
-static void
+static gboolean
write_socket (void *arg)
{
struct worker_task *task = (struct worker_task *)arg;
switch (task->state) {
case WRITE_REPLY:
write_reply (task);
- task->state = CLOSING_CONNECTION;
+ destroy_session (task->s);
+ return FALSE;
break;
case WRITE_ERROR:
write_reply (task);
- task->state = CLOSING_CONNECTION;
+ destroy_session (task->s);
+ return FALSE;
break;
case CLOSING_CONNECTION:
msg_debug ("write_socket: normally closing connection");
- free_task (task, TRUE);
+ destroy_session (task->s);
+ return FALSE;
break;
default:
msg_info ("write_socket: abnormally closing connection");
- free_task (task, TRUE);
+ destroy_session (task->s);
+ return FALSE;
break;
}
+ return TRUE;
}
/*
struct worker_task *task = (struct worker_task *)arg;
msg_info ("err_socket: abnormally closing connection, error: %s", err->message);
/* Free buffers */
- free_task (task, FALSE);
+ destroy_session (task->s);
}
struct worker_task *
memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)g_hash_table_destroy, new_task->results);
new_task->re_cache = g_hash_table_new (g_str_hash, g_str_equal);
memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)g_hash_table_destroy, new_task->re_cache);
+ new_task->s = new_async_session (new_task->task_pool, free_task_hard, new_task);
return new_task;
}