Some fixes to lua redis library.
return NULL;
}
+/*
+ * Called if all filters are processed
+ */
static void
fin_learn_task (void *arg)
{
- struct worker_task *task = (struct worker_task *)arg;
-
- /* XXX: this is bad logic in fact */
- /* Process all statfiles */
- process_statfiles (task);
- /* Call post filters */
- lua_call_post_filters (task);
- task->state = WRITE_REPLY;
-
- if (task->fin_callback) {
- task->fin_callback (task->fin_arg);
+ struct worker_task *task = (struct worker_task *) arg;
+
+ if (task->state != WRITING_REPLY) {
+ task->state = WRITE_REPLY;
+ /* Process all statfiles */
+ process_statfiles (task);
+ /* Call post filters */
+ lua_call_post_filters (task);
}
- else {
- rspamd_dispatcher_restore (task->dispatcher);
+
+ /* Check if we have all events finished */
+ if (task->state != WRITING_REPLY) {
+ if (task->fin_callback) {
+ task->fin_callback (task->fin_arg);
+ }
+ else {
+ rspamd_dispatcher_restore (task->dispatcher);
+ }
}
}
+/*
+ * Called if session was restored inside fin callback
+ */
+static void
+restore_learn_task (void *arg)
+{
+ struct worker_task *task = (struct worker_task *) arg;
+
+ /* Special state */
+ task->state = WRITING_REPLY;
+
+ rspamd_dispatcher_pause (task->dispatcher);
+}
+
static gboolean
controller_read_socket (f_str_t * in, void *arg)
{
return FALSE;
}
/* Set up async session */
- task->s = new_async_session (task->task_pool, fin_learn_task, free_task_hard, task);
+ task->s = new_async_session (task->task_pool, fin_learn_task, restore_learn_task, free_task_hard, task);
r = process_filters (task);
if (r == -1) {
session->state = STATE_REPLY;
io_tv->tv_sec = ctx->timeout / 1000;
io_tv->tv_usec = ctx->timeout - io_tv->tv_sec * 1000;
- new_session->s = new_async_session (new_session->session_pool, NULL, free_session, new_session);
+ new_session->s = new_async_session (new_session->session_pool, NULL, NULL, free_session, new_session);
new_session->dispatcher = rspamd_create_dispatcher (ctx->ev_base, nfd, BUFFER_LINE, controller_read_socket,
controller_write_socket, controller_err_socket, io_tv, (void *)new_session);
}
struct rspamd_async_session *
-new_async_session (memory_pool_t * pool, event_finalizer_t fin, event_finalizer_t cleanup, void *user_data)
+new_async_session (memory_pool_t * pool, event_finalizer_t fin,
+ event_finalizer_t restore, event_finalizer_t cleanup, void *user_data)
{
struct rspamd_async_session *new;
new = memory_pool_alloc (pool, sizeof (struct rspamd_async_session));
new->pool = pool;
new->fin = fin;
+ new->restore = restore;
new->cleanup = cleanup;
new->user_data = user_data;
new->wanna_die = FALSE;
if (session->fin != NULL) {
session->fin (session->user_data);
}
- /* No more events */
+ /* Check events count again */
+ if (g_hash_table_size (session->events) != 0) {
+ if (session->restore != NULL) {
+ session->restore (session->user_data);
+ }
+ return TRUE;
+ }
return FALSE;
}
struct rspamd_async_session {
event_finalizer_t fin;
+ event_finalizer_t restore;
event_finalizer_t cleanup;
GHashTable *events;
GQueue *forced_events;
* Make new async session
* @param pool pool to alloc memory from
* @param fin a callback called when no events are found in session
+ * @param restore a callback is called to restore processing of session
* @param cleanup a callback called when session is forcefully destroyed
* @param user_data abstract user data
* @return
*/
struct rspamd_async_session *new_async_session (memory_pool_t *pool,
- event_finalizer_t fin, event_finalizer_t cleanup, void *user_data);
+ event_finalizer_t fin, event_finalizer_t restore,
+ event_finalizer_t cleanup, void *user_data);
/**
* Insert new event to the session
longval);
}
else {
- r = rspamd_snprintf (outbuf, sizeof (outbuf), "$%l" CRLF,
+ r = rspamd_snprintf (outbuf, sizeof (outbuf), ":%l" CRLF,
longval);
}
if (!rspamd_dispatcher_write (session->dispather, outbuf,
case KVSTORAGE_CMD_SYNC:
return session->argc == 1;
case KVSTORAGE_CMD_SET:
- return session->argc == 3;
+ return session->argc == 3 || session->argc == 4;
case KVSTORAGE_CMD_INCR:
case KVSTORAGE_CMD_DECR:
- return session->argc == 1 || session->argc == 2;
+ return session->argc == 2 || session->argc == 3;
default:
return session->argc == 2;
}
}
else if (session->argnum == 2) {
/* We get datablock for set command */
- if (session->command == KVSTORAGE_CMD_SET) {
+ if (session->command == KVSTORAGE_CMD_SET && session->argc == 3) {
session->state = KVSTORAGE_STATE_READ_CMD;
rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
if (rspamd_kv_storage_insert (session->cf->storage, session->key, session->keylen,
sizeof ("-ERR not stored" CRLF) - 1, FALSE, TRUE);
}
}
+ else if (session->command == KVSTORAGE_CMD_SET && session->argc == 4) {
+ /* It is expire argument */
+ session->state = KVSTORAGE_STATE_READ_CMD;
+ rspamd_strtol (in->begin, in->len, (glong *)&session->expire);
+ session->argnum ++;
+ session->state = KVSTORAGE_STATE_READ_ARGLEN;
+ rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
+ }
else {
session->state = KVSTORAGE_STATE_READ_CMD;
rspamd_strtol (in->begin, in->len, &session->arg_data.value);
return kvstorage_process_command (session, TRUE);
}
}
+ else if (session->argnum == 3) {
+ /* We get datablock for set command */
+ if (session->command == KVSTORAGE_CMD_SET && session->argc == 4) {
+ session->state = KVSTORAGE_STATE_READ_CMD;
+ rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
+ if (rspamd_kv_storage_insert (session->cf->storage, session->key, session->keylen,
+ in->begin, in->len,
+ session->flags, session->expire)) {
+ return rspamd_dispatcher_write (session->dispather, "+OK" CRLF,
+ sizeof ("+OK" CRLF) - 1, FALSE, TRUE);
+ }
+ else {
+ return rspamd_dispatcher_write (session->dispather, "-ERR not stored" CRLF,
+ sizeof ("-ERR not stored" CRLF) - 1, FALSE, TRUE);
+ }
+ }
+ }
break;
case KVSTORAGE_STATE_READ_DATA:
session->state = KVSTORAGE_STATE_READ_CMD;
/* Error is nil */
lua_pushnil (ud->L);
/* Data */
- lua_pushlstring (ud->L, r->str, r->len);
+ if (r->type == REDIS_REPLY_STRING) {
+ lua_pushlstring (ud->L, r->str, r->len);
+ }
+ else if (r->type == REDIS_REPLY_INTEGER) {
+ lua_pushnumber (ud->L, r->integer);
+ }
+ else if (r->type == REDIS_REPLY_STATUS) {
+ lua_pushlstring (ud->L, r->str, r->len);
+ }
+ else {
+ msg_info ("bad type is passed: %d", r->type);
+ lua_pushnil (ud->L);
+ }
if (lua_pcall (ud->L, 3, 0, 0) != 0) {
msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
if (c->err == 0) {
if (r != NULL) {
- lua_redis_push_data (reply, ud);
+ if (reply->type != REDIS_REPLY_ERROR) {
+ lua_redis_push_data (reply, ud);
+ }
+ else {
+ lua_redis_push_error (reply->str, ud, TRUE);
+ }
}
else {
- lua_redis_push_error ("received no data from server", ud, TRUE);
+ lua_redis_push_error ("received no data from server", ud, FALSE);
}
}
else {
WRITE_REPLY,
WRITE_ERROR,
WAIT_FILTER,
+ WAIT_POST_FILTER,
CLOSING_CONNECTION,
WRITING_REPLY
} state; /**< current session state */
/* Resolve client's addr */
/* Set up async session */
- session->s = new_async_session (session->pool, NULL, free_smtp_session, session);
+ session->s = new_async_session (session->pool, NULL, NULL, free_smtp_session, session);
session->state = SMTP_STATE_RESOLVE_REVERSE;
if (! make_dns_request (session->resolver, session->s, session->pool,
smtp_dns_cb, session, DNS_REQUEST_PTR, &session->client_addr)) {
return write_socket (task);
break;
case WAIT_FILTER:
+ case WAIT_POST_FILTER:
msg_info ("ignoring trailing garbadge of size %z", in->len);
break;
default:
return FALSE;
break;
case WRITING_REPLY:
+ case WAIT_FILTER:
+ case WAIT_POST_FILTER:
/* Do nothing here */
break;
default:
- msg_info ("abnormally closing connection");
+ msg_info ("abnormally closing connection at state: %d", task->state);
if (ctx->is_custom) {
fin_custom_filters (task);
}
{
struct worker_task *task = (struct worker_task *) arg;
- /* Process all statfiles */
- process_statfiles (task);
- /* Call post filters */
- lua_call_post_filters (task);
- task->state = WRITE_REPLY;
+ if (task->state != WAIT_POST_FILTER) {
+ /* Process all statfiles */
+ process_statfiles (task);
+ /* Call post filters */
+ lua_call_post_filters (task);
+ }
+ /* Check if we have all events finished */
+ task->state = WRITE_REPLY;
if (task->fin_callback) {
task->fin_callback (task->fin_arg);
}
}
}
+/*
+ * Called if session was restored inside fin callback
+ */
+static void
+restore_task (void *arg)
+{
+ struct worker_task *task = (struct worker_task *) arg;
+
+ /* Special state */
+ task->state = WAIT_POST_FILTER;
+}
+
/*
* Reduce number of tasks proceeded
*/
/* Set up async session */
new_task->s =
- new_async_session (new_task->task_pool, fin_task, free_task_hard, new_task);
+ new_async_session (new_task->task_pool, fin_task, restore_task, free_task_hard, new_task);
/* Init custom filters */
#ifndef BUILD_STATIC