Browse Source

Implement HTTP session for normal worker.

tags/0.7.0
Vsevolod Stakhov 10 years ago
parent
commit
f16357c9ac
4 changed files with 132 additions and 54 deletions
  1. 4
    1
      src/events.c
  2. 0
    5
      src/http.c
  3. 97
    42
      src/protocol.c
  4. 31
    6
      src/worker.c

+ 4
- 1
src/events.c View File

@@ -200,8 +200,8 @@ check_session_pending (struct rspamd_async_session *session)
g_cond_wait (session->cond, session->mtx);
}
if (session->fin != NULL) {
g_mutex_unlock (session->mtx);
if (! session->fin (session->user_data)) {
g_mutex_unlock (session->mtx);
/* Session finished incompletely, perform restoration */
if (session->restore != NULL) {
session->restore (session->user_data);
@@ -210,6 +210,9 @@ check_session_pending (struct rspamd_async_session *session)
}
return TRUE;
}
else {
return FALSE;
}
}
g_mutex_unlock (session->mtx);
return FALSE;

+ 0
- 5
src/http.c View File

@@ -639,11 +639,6 @@ rspamd_http_connection_reset (struct rspamd_http_connection *conn)
g_slice_free1 (sizeof (struct iovec) * priv->outlen, priv->out);
priv->out = NULL;
}

/* Clear conn itself */
if (conn->fd != -1) {
close (conn->fd);
}
}

void

+ 97
- 42
src/protocol.c View File

@@ -809,6 +809,7 @@ rspamd_metric_symbol_ucl (struct worker_task *task, struct metric *m,
description = g_hash_table_lookup (m->descriptions, sym->name);

obj = ucl_object_insert_key (obj, ucl_object_fromstring (sym->name), "name", 0, false);
obj = ucl_object_insert_key (obj, ucl_object_fromdouble (sym->score), "score", 0, false);
if (description) {
obj = ucl_object_insert_key (obj, ucl_object_fromstring (description), "description", 0, false);
}
@@ -882,17 +883,84 @@ rspamd_metric_result_ucl (struct worker_task *task, struct metric_result *mres,
return obj;
}

/*
* GString ucl emitting functions
*/
static int
rspamd_gstring_append_character (unsigned char c, size_t len, void *ud)
{
GString *buf = ud;

if (len == 1) {
g_string_append_c (buf, c);
}
else {
if (buf->allocated_len - buf->len <= len) {
g_string_set_size (buf, buf->len + len + 1);
}
memset (&buf->str[buf->len], c, len);
buf->len += len;
buf->str[buf->len] = '\0';
}

return 0;
}

static int
rspamd_gstring_append_len (const unsigned char *str, size_t len, void *ud)
{
GString *buf = ud;

g_string_append_len (buf, str, len);

return 0;
}

static int
rspamd_gstring_append_int (int64_t val, void *ud)
{
GString *buf = ud;

rspamd_printf_gstring (buf, "%L", (intmax_t)val);
return 0;
}

static int
rspamd_gstring_append_double (double val, void *ud)
{
GString *buf = ud;
const double delta = 0.0000001;

if (val == (double)(int)val) {
rspamd_printf_gstring (buf, "%.1f", val);
}
else if (fabs (val - (double)(int)val) < delta) {
/* Write at maximum precision */
rspamd_printf_gstring (buf, "%.*g", DBL_DIG, val);
}
else {
rspamd_printf_gstring (buf, "%f", val);
}

return 0;
}

static gboolean
write_check_reply (struct worker_task *task)
write_check_reply (struct rspamd_http_message *msg, struct worker_task *task)
{
GString *logbuf;
struct metric_result *metric_res;
GHashTableIter hiter;
gpointer h, v;
ucl_object_t *top = NULL, *obj;
struct ucl_emitter_functions func = {
.ucl_emitter_append_character = rspamd_gstring_append_character,
.ucl_emitter_append_len = rspamd_gstring_append_len,
.ucl_emitter_append_int = rspamd_gstring_append_int,
.ucl_emitter_append_double = rspamd_gstring_append_double
};

/* Output the first line - check status */

logbuf = g_string_sized_new (BUFSIZ);
rspamd_printf_gstring (logbuf, "id: <%s>, qid: <%s>, ", task->message_id, task->queue_id);

@@ -925,39 +993,34 @@ write_check_reply (struct worker_task *task)
write_hashes_to_log (task, logbuf);
msg_info ("%v", logbuf);

msg->body = g_string_sized_new (BUFSIZ);
func.ud = msg->body;
ucl_object_emit_full (top, UCL_EMIT_JSON_COMPACT, &func);

/* Increase counters */
task->worker->srv->stat->messages_scanned++;

rspamd_http_connection_write_message (task->http_conn, msg, NULL,
"application/json", task, task->sock, &task->tv, task->ev_base);
return TRUE;
}

gboolean
write_reply (struct worker_task *task)
{
gint r;
gchar outbuf[OUTBUFSIZ];
struct rspamd_http_message *msg;

rspamd_http_connection_reset (task->http_conn);
msg = rspamd_http_new_message (HTTP_RESPONSE);
msg->date = time (NULL);

debug_task ("writing reply to client");
if (task->error_code != 0) {
/* Write error message and error code to reply */
if (task->is_http) {
r = rspamd_snprintf (outbuf, sizeof (outbuf), "HTTP/1.0 400 Bad request" CRLF
"Connection: close" CRLF CRLF "Error: %d - %s" CRLF, task->error_code, task->last_error);
}
else {
if (task->proto == SPAMC_PROTO) {
r = rspamd_snprintf (outbuf, sizeof (outbuf), "%s/%s %d %s" CRLF CRLF,
SPAMD_REPLY_BANNER, rspamc_proto_str (task->proto_ver), task->error_code, SPAMD_ERROR);
debug_task ("writing error: %s", outbuf);
}
else {
r = rspamd_snprintf (outbuf, sizeof (outbuf), "%s/%s %d %s" CRLF "%s: %s" CRLF CRLF,
RSPAMD_REPLY_BANNER, rspamc_proto_str (task->proto_ver), task->error_code, SPAMD_ERROR, ERROR_HEADER, task->last_error);
debug_task ("writing error: %s", outbuf);
}
}
/* Write to bufferevent error message */
return rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE);
msg->code = task->error_code;
rspamd_http_connection_write_message (task->http_conn, msg, NULL,
"text/plain", task, task->sock, &task->tv, task->ev_base);
task->state = CLOSING_CONNECTION;
return TRUE;
}
else {
switch (task->cmd) {
@@ -966,32 +1029,24 @@ write_reply (struct worker_task *task)
case CMD_CHECK:
case CMD_SYMBOLS:
case CMD_PROCESS:
return write_check_reply (task);
break;
case CMD_SKIP:
r = rspamd_snprintf (outbuf, sizeof (outbuf), "%s/%s 0 %s" CRLF,
(task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER, rspamc_proto_str (task->proto_ver), SPAMD_OK);
return rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE);
task->state = CLOSING_CONNECTION;
return write_check_reply (msg, task);
break;
case CMD_PING:
r = rspamd_snprintf (outbuf, sizeof (outbuf), "%s/%s 0 PONG" CRLF,
(task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER, rspamc_proto_str (task->proto_ver));
return rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE);
rspamd_http_connection_write_message (task->http_conn, msg, NULL,
"text/plain", task, task->sock, &task->tv, task->ev_base);
task->state = CLOSING_CONNECTION;
break;
case CMD_LEARN:
if (task->is_http) {
r = rspamd_snprintf (outbuf, sizeof (outbuf), "HTTP/1.0 200 Ok" CRLF
"Connection: close" CRLF CRLF "%s" CRLF, task->last_error);
}
else {
r = rspamd_snprintf (outbuf, sizeof (outbuf), "%s/%s 0 LEARN" CRLF CRLF "%s" CRLF,
(task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER,
rspamc_proto_str (task->proto_ver),
task->last_error);
}
return rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE);
msg->code = task->error_code;
rspamd_http_connection_write_message (task->http_conn, msg, NULL,
"text/plain", task, task->sock, &task->tv, task->ev_base);
task->state = CLOSING_CONNECTION;
return TRUE;
break;
case CMD_OTHER:
task->state = CLOSING_CONNECTION;
return task->custom_cmd->func (task);
}
}

+ 31
- 6
src/worker.c View File

@@ -399,7 +399,8 @@ fin_task (void *arg)
{
struct worker_task *task = (struct worker_task *) arg;
struct rspamd_worker_ctx *ctx;

gint r;
GError *err = NULL;

ctx = task->worker->ctx;

@@ -409,7 +410,7 @@ fin_task (void *arg)
task->fin_callback (task->fin_arg);
}
else {
rspamd_dispatcher_restore (task->dispatcher);
write_reply (task);
}
return TRUE;
}
@@ -442,7 +443,7 @@ fin_task (void *arg)
task->fin_callback (task->fin_arg);
}
else {
rspamd_dispatcher_restore (task->dispatcher);
write_reply (task);
}
}
else {
@@ -454,12 +455,31 @@ fin_task (void *arg)
task->fin_callback (task->fin_arg);
}
else {
rspamd_dispatcher_restore (task->dispatcher);
write_reply (task);
}
}
else {
/* Check normal filters in write callback */
rspamd_dispatcher_restore (task->dispatcher);
task->state = WAIT_FILTER;
r = process_filters (task);
if (r == -1) {
task->last_error = "Filter processing error";
task->error_code = RSPAMD_FILTER_ERROR;
task->state = WRITE_ERROR;
write_reply (task);
}
/* Add task to classify to classify pool */
if (!task->is_skipped && ctx->classify_pool) {
register_async_thread (task->s);
g_thread_pool_push (ctx->classify_pool, task, &err);
if (err != NULL) {
msg_err ("cannot pull task to the pool: %s", err->message);
remove_async_thread (task->s);
g_error_free (err);
}
}
if (task->is_skipped) {
write_reply (task);
}
}
}

@@ -585,7 +605,12 @@ static void
rspamd_worker_finish_handler (struct rspamd_http_connection *conn,
struct rspamd_http_message *msg)
{
struct worker_task *task = (struct worker_task *) conn->ud;

if (task->state == CLOSING_CONNECTION) {
msg_debug ("normally closing connection from: %s", inet_ntoa (task->client_addr));
destroy_session (task->s);
}
}

/*

Loading…
Cancel
Save