description = g_hash_table_lookup (m->descriptions, sym->name);
obj = ucl_object_insert_key (obj, ucl_object_fromstring (sym->name), "name", 0, false);
+ obj = ucl_object_insert_key (obj, ucl_object_fromdouble (sym->score), "score", 0, false);
if (description) {
obj = ucl_object_insert_key (obj, ucl_object_fromstring (description), "description", 0, false);
}
return obj;
}
+/*
+ * GString ucl emitting functions
+ */
+static int
+rspamd_gstring_append_character (unsigned char c, size_t len, void *ud)
+{
+ GString *buf = ud;
+
+ if (len == 1) {
+ g_string_append_c (buf, c);
+ }
+ else {
+ if (buf->allocated_len - buf->len <= len) {
+ g_string_set_size (buf, buf->len + len + 1);
+ }
+ memset (&buf->str[buf->len], c, len);
+ buf->len += len;
+ buf->str[buf->len] = '\0';
+ }
+
+ return 0;
+}
+
+static int
+rspamd_gstring_append_len (const unsigned char *str, size_t len, void *ud)
+{
+ GString *buf = ud;
+
+ g_string_append_len (buf, str, len);
+
+ return 0;
+}
+
+static int
+rspamd_gstring_append_int (int64_t val, void *ud)
+{
+ GString *buf = ud;
+
+ rspamd_printf_gstring (buf, "%L", (intmax_t)val);
+ return 0;
+}
+
+static int
+rspamd_gstring_append_double (double val, void *ud)
+{
+ GString *buf = ud;
+ const double delta = 0.0000001;
+
+ if (val == (double)(int)val) {
+ rspamd_printf_gstring (buf, "%.1f", val);
+ }
+ else if (fabs (val - (double)(int)val) < delta) {
+ /* Write at maximum precision */
+ rspamd_printf_gstring (buf, "%.*g", DBL_DIG, val);
+ }
+ else {
+ rspamd_printf_gstring (buf, "%f", val);
+ }
+
+ return 0;
+}
+
static gboolean
-write_check_reply (struct worker_task *task)
+write_check_reply (struct rspamd_http_message *msg, struct worker_task *task)
{
GString *logbuf;
struct metric_result *metric_res;
GHashTableIter hiter;
gpointer h, v;
ucl_object_t *top = NULL, *obj;
+ struct ucl_emitter_functions func = {
+ .ucl_emitter_append_character = rspamd_gstring_append_character,
+ .ucl_emitter_append_len = rspamd_gstring_append_len,
+ .ucl_emitter_append_int = rspamd_gstring_append_int,
+ .ucl_emitter_append_double = rspamd_gstring_append_double
+ };
/* Output the first line - check status */
-
logbuf = g_string_sized_new (BUFSIZ);
rspamd_printf_gstring (logbuf, "id: <%s>, qid: <%s>, ", task->message_id, task->queue_id);
write_hashes_to_log (task, logbuf);
msg_info ("%v", logbuf);
+ msg->body = g_string_sized_new (BUFSIZ);
+ func.ud = msg->body;
+ ucl_object_emit_full (top, UCL_EMIT_JSON_COMPACT, &func);
+
/* Increase counters */
task->worker->srv->stat->messages_scanned++;
+ rspamd_http_connection_write_message (task->http_conn, msg, NULL,
+ "application/json", task, task->sock, &task->tv, task->ev_base);
return TRUE;
}
gboolean
write_reply (struct worker_task *task)
{
- gint r;
- gchar outbuf[OUTBUFSIZ];
+ struct rspamd_http_message *msg;
+
+ rspamd_http_connection_reset (task->http_conn);
+ msg = rspamd_http_new_message (HTTP_RESPONSE);
+ msg->date = time (NULL);
debug_task ("writing reply to client");
if (task->error_code != 0) {
- /* Write error message and error code to reply */
- if (task->is_http) {
- r = rspamd_snprintf (outbuf, sizeof (outbuf), "HTTP/1.0 400 Bad request" CRLF
- "Connection: close" CRLF CRLF "Error: %d - %s" CRLF, task->error_code, task->last_error);
- }
- else {
- if (task->proto == SPAMC_PROTO) {
- r = rspamd_snprintf (outbuf, sizeof (outbuf), "%s/%s %d %s" CRLF CRLF,
- SPAMD_REPLY_BANNER, rspamc_proto_str (task->proto_ver), task->error_code, SPAMD_ERROR);
- debug_task ("writing error: %s", outbuf);
- }
- else {
- r = rspamd_snprintf (outbuf, sizeof (outbuf), "%s/%s %d %s" CRLF "%s: %s" CRLF CRLF,
- RSPAMD_REPLY_BANNER, rspamc_proto_str (task->proto_ver), task->error_code, SPAMD_ERROR, ERROR_HEADER, task->last_error);
- debug_task ("writing error: %s", outbuf);
- }
- }
- /* Write to bufferevent error message */
- return rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE);
+ msg->code = task->error_code;
+ rspamd_http_connection_write_message (task->http_conn, msg, NULL,
+ "text/plain", task, task->sock, &task->tv, task->ev_base);
+ task->state = CLOSING_CONNECTION;
+ return TRUE;
}
else {
switch (task->cmd) {
case CMD_CHECK:
case CMD_SYMBOLS:
case CMD_PROCESS:
- return write_check_reply (task);
- break;
case CMD_SKIP:
- r = rspamd_snprintf (outbuf, sizeof (outbuf), "%s/%s 0 %s" CRLF,
- (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER, rspamc_proto_str (task->proto_ver), SPAMD_OK);
- return rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE);
+ task->state = CLOSING_CONNECTION;
+ return write_check_reply (msg, task);
break;
case CMD_PING:
- r = rspamd_snprintf (outbuf, sizeof (outbuf), "%s/%s 0 PONG" CRLF,
- (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER, rspamc_proto_str (task->proto_ver));
- return rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE);
+ rspamd_http_connection_write_message (task->http_conn, msg, NULL,
+ "text/plain", task, task->sock, &task->tv, task->ev_base);
+ task->state = CLOSING_CONNECTION;
break;
case CMD_LEARN:
- if (task->is_http) {
- r = rspamd_snprintf (outbuf, sizeof (outbuf), "HTTP/1.0 200 Ok" CRLF
- "Connection: close" CRLF CRLF "%s" CRLF, task->last_error);
- }
- else {
- r = rspamd_snprintf (outbuf, sizeof (outbuf), "%s/%s 0 LEARN" CRLF CRLF "%s" CRLF,
- (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER,
- rspamc_proto_str (task->proto_ver),
- task->last_error);
- }
- return rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE);
+ msg->code = task->error_code;
+ rspamd_http_connection_write_message (task->http_conn, msg, NULL,
+ "text/plain", task, task->sock, &task->tv, task->ev_base);
+ task->state = CLOSING_CONNECTION;
+ return TRUE;
break;
case CMD_OTHER:
+ task->state = CLOSING_CONNECTION;
return task->custom_cmd->func (task);
}
}
{
struct worker_task *task = (struct worker_task *) arg;
struct rspamd_worker_ctx *ctx;
-
+ gint r;
+ GError *err = NULL;
ctx = task->worker->ctx;
task->fin_callback (task->fin_arg);
}
else {
- rspamd_dispatcher_restore (task->dispatcher);
+ write_reply (task);
}
return TRUE;
}
task->fin_callback (task->fin_arg);
}
else {
- rspamd_dispatcher_restore (task->dispatcher);
+ write_reply (task);
}
}
else {
task->fin_callback (task->fin_arg);
}
else {
- rspamd_dispatcher_restore (task->dispatcher);
+ write_reply (task);
}
}
else {
- /* Check normal filters in write callback */
- rspamd_dispatcher_restore (task->dispatcher);
+ task->state = WAIT_FILTER;
+ r = process_filters (task);
+ if (r == -1) {
+ task->last_error = "Filter processing error";
+ task->error_code = RSPAMD_FILTER_ERROR;
+ task->state = WRITE_ERROR;
+ write_reply (task);
+ }
+ /* Add task to classify to classify pool */
+ if (!task->is_skipped && ctx->classify_pool) {
+ register_async_thread (task->s);
+ g_thread_pool_push (ctx->classify_pool, task, &err);
+ if (err != NULL) {
+ msg_err ("cannot pull task to the pool: %s", err->message);
+ remove_async_thread (task->s);
+ g_error_free (err);
+ }
+ }
+ if (task->is_skipped) {
+ write_reply (task);
+ }
}
}
rspamd_worker_finish_handler (struct rspamd_http_connection *conn,
struct rspamd_http_message *msg)
{
+ struct worker_task *task = (struct worker_task *) conn->ud;
+ if (task->state == CLOSING_CONNECTION) {
+ msg_debug ("normally closing connection from: %s", inet_ntoa (task->client_addr));
+ destroy_session (task->s);
+ }
}
/*