Sfoglia il codice sorgente

Allow for post filters to register async events.

Some fixes to lua redis library.
tags/0.4.6
Vsevolod Stakhov 12 anni fa
parent
commit
e5c2094331
8 ha cambiato i file con 126 aggiunte e 33 eliminazioni
  1. 36
    15
      src/controller.c
  2. 10
    2
      src/events.c
  3. 4
    1
      src/events.h
  4. 29
    4
      src/kvstorage_server.c
  5. 20
    3
      src/lua/lua_redis.c
  6. 1
    0
      src/main.h
  7. 1
    1
      src/smtp.c
  8. 25
    7
      src/worker.c

+ 36
- 15
src/controller.c Vedi File

@@ -832,26 +832,47 @@ process_normal_command (const gchar *line)
return NULL;
}

/*
* Called if all filters are processed
*/
static void
fin_learn_task (void *arg)
{
struct worker_task *task = (struct worker_task *)arg;

/* XXX: this is bad logic in fact */
/* Process all statfiles */
process_statfiles (task);
/* Call post filters */
lua_call_post_filters (task);
task->state = WRITE_REPLY;

if (task->fin_callback) {
task->fin_callback (task->fin_arg);
struct worker_task *task = (struct worker_task *) arg;

if (task->state != WRITING_REPLY) {
task->state = WRITE_REPLY;
/* Process all statfiles */
process_statfiles (task);
/* Call post filters */
lua_call_post_filters (task);
}
else {
rspamd_dispatcher_restore (task->dispatcher);

/* Check if we have all events finished */
if (task->state != WRITING_REPLY) {
if (task->fin_callback) {
task->fin_callback (task->fin_arg);
}
else {
rspamd_dispatcher_restore (task->dispatcher);
}
}
}

/*
* Called if session was restored inside fin callback
*/
static void
restore_learn_task (void *arg)
{
struct worker_task *task = (struct worker_task *) arg;

/* Special state */
task->state = WRITING_REPLY;

rspamd_dispatcher_pause (task->dispatcher);
}

static gboolean
controller_read_socket (f_str_t * in, void *arg)
{
@@ -971,7 +992,7 @@ controller_read_socket (f_str_t * in, void *arg)
return FALSE;
}
/* Set up async session */
task->s = new_async_session (task->task_pool, fin_learn_task, free_task_hard, task);
task->s = new_async_session (task->task_pool, fin_learn_task, restore_learn_task, free_task_hard, task);
r = process_filters (task);
if (r == -1) {
session->state = STATE_REPLY;
@@ -1187,7 +1208,7 @@ accept_socket (gint fd, short what, void *arg)
io_tv->tv_sec = ctx->timeout / 1000;
io_tv->tv_usec = ctx->timeout - io_tv->tv_sec * 1000;

new_session->s = new_async_session (new_session->session_pool, NULL, free_session, new_session);
new_session->s = new_async_session (new_session->session_pool, NULL, NULL, free_session, new_session);

new_session->dispatcher = rspamd_create_dispatcher (ctx->ev_base, nfd, BUFFER_LINE, controller_read_socket,
controller_write_socket, controller_err_socket, io_tv, (void *)new_session);

+ 10
- 2
src/events.c Vedi File

@@ -58,13 +58,15 @@ rspamd_event_hash (gconstpointer a)
}

struct rspamd_async_session *
new_async_session (memory_pool_t * pool, event_finalizer_t fin, event_finalizer_t cleanup, void *user_data)
new_async_session (memory_pool_t * pool, event_finalizer_t fin,
event_finalizer_t restore, event_finalizer_t cleanup, void *user_data)
{
struct rspamd_async_session *new;

new = memory_pool_alloc (pool, sizeof (struct rspamd_async_session));
new->pool = pool;
new->fin = fin;
new->restore = restore;
new->cleanup = cleanup;
new->user_data = user_data;
new->wanna_die = FALSE;
@@ -207,7 +209,13 @@ check_session_pending (struct rspamd_async_session *session)
if (session->fin != NULL) {
session->fin (session->user_data);
}
/* No more events */
/* Check events count again */
if (g_hash_table_size (session->events) != 0) {
if (session->restore != NULL) {
session->restore (session->user_data);
}
return TRUE;
}
return FALSE;
}


+ 4
- 1
src/events.h Vedi File

@@ -16,6 +16,7 @@ struct rspamd_async_event {

struct rspamd_async_session {
event_finalizer_t fin;
event_finalizer_t restore;
event_finalizer_t cleanup;
GHashTable *events;
GQueue *forced_events;
@@ -28,12 +29,14 @@ struct rspamd_async_session {
* Make new async session
* @param pool pool to alloc memory from
* @param fin a callback called when no events are found in session
* @param restore a callback is called to restore processing of session
* @param cleanup a callback called when session is forcefully destroyed
* @param user_data abstract user data
* @return
*/
struct rspamd_async_session *new_async_session (memory_pool_t *pool,
event_finalizer_t fin, event_finalizer_t cleanup, void *user_data);
event_finalizer_t fin, event_finalizer_t restore,
event_finalizer_t cleanup, void *user_data);

/**
* Insert new event to the session

+ 29
- 4
src/kvstorage_server.c Vedi File

@@ -580,7 +580,7 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
longval);
}
else {
r = rspamd_snprintf (outbuf, sizeof (outbuf), "$%l" CRLF,
r = rspamd_snprintf (outbuf, sizeof (outbuf), ":%l" CRLF,
longval);
}
if (!rspamd_dispatcher_write (session->dispather, outbuf,
@@ -691,10 +691,10 @@ kvstorage_check_argnum (struct kvstorage_session *session)
case KVSTORAGE_CMD_SYNC:
return session->argc == 1;
case KVSTORAGE_CMD_SET:
return session->argc == 3;
return session->argc == 3 || session->argc == 4;
case KVSTORAGE_CMD_INCR:
case KVSTORAGE_CMD_DECR:
return session->argc == 1 || session->argc == 2;
return session->argc == 2 || session->argc == 3;
default:
return session->argc == 2;
}
@@ -831,7 +831,7 @@ kvstorage_read_socket (f_str_t * in, void *arg)
}
else if (session->argnum == 2) {
/* We get datablock for set command */
if (session->command == KVSTORAGE_CMD_SET) {
if (session->command == KVSTORAGE_CMD_SET && session->argc == 3) {
session->state = KVSTORAGE_STATE_READ_CMD;
rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
if (rspamd_kv_storage_insert (session->cf->storage, session->key, session->keylen,
@@ -845,6 +845,14 @@ kvstorage_read_socket (f_str_t * in, void *arg)
sizeof ("-ERR not stored" CRLF) - 1, FALSE, TRUE);
}
}
else if (session->command == KVSTORAGE_CMD_SET && session->argc == 4) {
/* It is expire argument */
session->state = KVSTORAGE_STATE_READ_CMD;
rspamd_strtol (in->begin, in->len, (glong *)&session->expire);
session->argnum ++;
session->state = KVSTORAGE_STATE_READ_ARGLEN;
rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
}
else {
session->state = KVSTORAGE_STATE_READ_CMD;
rspamd_strtol (in->begin, in->len, &session->arg_data.value);
@@ -855,6 +863,23 @@ kvstorage_read_socket (f_str_t * in, void *arg)
return kvstorage_process_command (session, TRUE);
}
}
else if (session->argnum == 3) {
/* We get datablock for set command */
if (session->command == KVSTORAGE_CMD_SET && session->argc == 4) {
session->state = KVSTORAGE_STATE_READ_CMD;
rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
if (rspamd_kv_storage_insert (session->cf->storage, session->key, session->keylen,
in->begin, in->len,
session->flags, session->expire)) {
return rspamd_dispatcher_write (session->dispather, "+OK" CRLF,
sizeof ("+OK" CRLF) - 1, FALSE, TRUE);
}
else {
return rspamd_dispatcher_write (session->dispather, "-ERR not stored" CRLF,
sizeof ("-ERR not stored" CRLF) - 1, FALSE, TRUE);
}
}
}
break;
case KVSTORAGE_STATE_READ_DATA:
session->state = KVSTORAGE_STATE_READ_CMD;

+ 20
- 3
src/lua/lua_redis.c Vedi File

@@ -134,7 +134,19 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_userdata *ud)
/* Error is nil */
lua_pushnil (ud->L);
/* Data */
lua_pushlstring (ud->L, r->str, r->len);
if (r->type == REDIS_REPLY_STRING) {
lua_pushlstring (ud->L, r->str, r->len);
}
else if (r->type == REDIS_REPLY_INTEGER) {
lua_pushnumber (ud->L, r->integer);
}
else if (r->type == REDIS_REPLY_STATUS) {
lua_pushlstring (ud->L, r->str, r->len);
}
else {
msg_info ("bad type is passed: %d", r->type);
lua_pushnil (ud->L);
}

if (lua_pcall (ud->L, 3, 0, 0) != 0) {
msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
@@ -157,10 +169,15 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv)

if (c->err == 0) {
if (r != NULL) {
lua_redis_push_data (reply, ud);
if (reply->type != REDIS_REPLY_ERROR) {
lua_redis_push_data (reply, ud);
}
else {
lua_redis_push_error (reply->str, ud, TRUE);
}
}
else {
lua_redis_push_error ("received no data from server", ud, TRUE);
lua_redis_push_error ("received no data from server", ud, FALSE);
}
}
else {

+ 1
- 0
src/main.h Vedi File

@@ -177,6 +177,7 @@ struct worker_task {
WRITE_REPLY,
WRITE_ERROR,
WAIT_FILTER,
WAIT_POST_FILTER,
CLOSING_CONNECTION,
WRITING_REPLY
} state; /**< current session state */

+ 1
- 1
src/smtp.c Vedi File

@@ -700,7 +700,7 @@ accept_socket (gint fd, short what, void *arg)

/* Resolve client's addr */
/* Set up async session */
session->s = new_async_session (session->pool, NULL, free_smtp_session, session);
session->s = new_async_session (session->pool, NULL, NULL, free_smtp_session, session);
session->state = SMTP_STATE_RESOLVE_REVERSE;
if (! make_dns_request (session->resolver, session->s, session->pool,
smtp_dns_cb, session, DNS_REQUEST_PTR, &session->client_addr)) {

+ 25
- 7
src/worker.c Vedi File

@@ -494,6 +494,7 @@ read_socket (f_str_t * in, void *arg)
return write_socket (task);
break;
case WAIT_FILTER:
case WAIT_POST_FILTER:
msg_info ("ignoring trailing garbadge of size %z", in->len);
break;
default:
@@ -547,10 +548,12 @@ write_socket (void *arg)
return FALSE;
break;
case WRITING_REPLY:
case WAIT_FILTER:
case WAIT_POST_FILTER:
/* Do nothing here */
break;
default:
msg_info ("abnormally closing connection");
msg_info ("abnormally closing connection at state: %d", task->state);
if (ctx->is_custom) {
fin_custom_filters (task);
}
@@ -589,12 +592,15 @@ fin_task (void *arg)
{
struct worker_task *task = (struct worker_task *) arg;

/* Process all statfiles */
process_statfiles (task);
/* Call post filters */
lua_call_post_filters (task);
task->state = WRITE_REPLY;
if (task->state != WAIT_POST_FILTER) {
/* Process all statfiles */
process_statfiles (task);
/* Call post filters */
lua_call_post_filters (task);
}

/* Check if we have all events finished */
task->state = WRITE_REPLY;
if (task->fin_callback) {
task->fin_callback (task->fin_arg);
}
@@ -603,6 +609,18 @@ fin_task (void *arg)
}
}

/*
* Called if session was restored inside fin callback
*/
static void
restore_task (void *arg)
{
struct worker_task *task = (struct worker_task *) arg;

/* Special state */
task->state = WAIT_POST_FILTER;
}

/*
* Reduce number of tasks proceeded
*/
@@ -682,7 +700,7 @@ accept_socket (gint fd, short what, void *arg)

/* Set up async session */
new_task->s =
new_async_session (new_task->task_pool, fin_task, free_task_hard, new_task);
new_async_session (new_task->task_pool, fin_task, restore_task, free_task_hard, new_task);

/* Init custom filters */
#ifndef BUILD_STATIC

Loading…
Annulla
Salva