aboutsummaryrefslogtreecommitdiffstats
path: root/src/worker.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/worker.c')
-rw-r--r--src/worker.c107
1 files changed, 61 insertions, 46 deletions
diff --git a/src/worker.c b/src/worker.c
index 726f7fe00..4832c71fc 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -4,11 +4,11 @@
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in the
- * documentation and/or other materials provided with the distribution.
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY Rambler media ''AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
@@ -40,8 +40,8 @@
#include <evdns.h>
#ifndef WITHOUT_PERL
-#include <EXTERN.h> /* from the Perl distribution */
-#include <perl.h> /* from the Perl distribution */
+#include <EXTERN.h> /* from the Perl distribution */
+#include <perl.h> /* from the Perl distribution */
extern PerlInterpreter *perl_interpreter;
#endif
@@ -111,7 +111,7 @@ rcpt_destruct (void *pointer)
/*
* Free all structures of worker_task
*/
-static void
+void
free_task (struct worker_task *task, gboolean is_soft)
{
GList *part;
@@ -135,14 +135,18 @@ free_task (struct worker_task *task, gboolean is_soft)
g_list_free (task->urls);
}
memory_pool_delete (task->task_pool);
- if (is_soft) {
- /* Plan dispatcher shutdown */
- task->dispatcher->wanna_die = 1;
+ if (task->dispatcher) {
+ if (is_soft) {
+ /* Plan dispatcher shutdown */
+ task->dispatcher->wanna_die = 1;
+ }
+ else {
+ rspamd_remove_dispatcher (task->dispatcher);
+ }
}
- else {
- rspamd_remove_dispatcher (task->dispatcher);
+ if (task->sock != -1) {
+ close (task->sock);
}
- close (task->sock);
g_free (task);
}
}
@@ -174,13 +178,13 @@ read_socket (f_str_t *in, void *arg)
task->msg->len = in->len;
msg_debug ("read_socket: got string of length %ld", (long int)task->msg->len);
r = process_message (task);
- if (r == -1) {
- msg_warn ("read_socket: processing of message failed");
+ if (r == -1) {
+ msg_warn ("read_socket: processing of message failed");
task->last_error = "MIME processing error";
task->error_code = RSPAMD_FILTER_ERROR;
task->state = WRITE_ERROR;
write_socket (task);
- }
+ }
if (task->cmd == CMD_OTHER) {
/* Skip filters */
task->state = WRITE_REPLY;
@@ -249,36 +253,10 @@ err_socket (GError *err, void *arg)
free_task (task, FALSE);
}
-/*
- * Accept new connection and construct task
- */
-static void
-accept_socket (int fd, short what, void *arg)
+struct worker_task *
+construct_task (struct rspamd_worker *worker)
{
- struct rspamd_worker *worker = (struct rspamd_worker *)arg;
- struct sockaddr_storage ss;
- struct sockaddr_in *sin;
struct worker_task *new_task;
- socklen_t addrlen = sizeof(ss);
- int nfd;
-
- if ((nfd = accept_from_socket (fd, (struct sockaddr *)&ss, &addrlen)) == -1) {
- msg_warn ("accept_socket: accept failed: %s", strerror (errno));
- return;
- }
- /* Check for EAGAIN */
- if (nfd == 0) {
- msg_debug ("accept_socket: cannot accept socket as it was already accepted by other worker");
- return;
- }
-
- if (ss.ss_family == AF_UNIX) {
- msg_info ("accept_socket: accepted connection from unix socket");
- }
- else if (ss.ss_family == AF_INET) {
- sin = (struct sockaddr_in *) &ss;
- msg_info ("accept_socket: accepted connection from %s port %d", inet_ntoa (sin->sin_addr), ntohs (sin->sin_port));
- }
new_task = g_malloc (sizeof (struct worker_task));
@@ -286,7 +264,6 @@ accept_socket (int fd, short what, void *arg)
bzero (new_task, sizeof (struct worker_task));
new_task->worker = worker;
new_task->state = READ_COMMAND;
- new_task->sock = nfd;
new_task->cfg = worker->srv->cfg;
new_task->from_addr.s_addr = INADDR_NONE;
new_task->view_checked = FALSE;
@@ -307,12 +284,50 @@ accept_socket (int fd, short what, void *arg)
new_task->re_cache = g_hash_table_new (g_str_hash, g_str_equal);
memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)g_hash_table_destroy, new_task->re_cache);
+ return new_task;
+}
+
+/*
+ * Accept new connection and construct task
+ */
+static void
+accept_socket (int fd, short what, void *arg)
+{
+ struct rspamd_worker *worker = (struct rspamd_worker *)arg;
+ struct sockaddr_storage ss;
+ struct sockaddr_in *sin;
+ struct worker_task *new_task;
+ socklen_t addrlen = sizeof(ss);
+ int nfd;
+
+ if ((nfd = accept_from_socket (fd, (struct sockaddr *)&ss, &addrlen)) == -1) {
+ msg_warn ("accept_socket: accept failed: %s", strerror (errno));
+ return;
+ }
+ /* Check for EAGAIN */
+ if (nfd == 0) {
+ msg_debug ("accept_socket: cannot accept socket as it was already accepted by other worker");
+ return;
+ }
+
+ if (ss.ss_family == AF_UNIX) {
+ msg_info ("accept_socket: accepted connection from unix socket");
+ }
+ else if (ss.ss_family == AF_INET) {
+ sin = (struct sockaddr_in *) &ss;
+ msg_info ("accept_socket: accepted connection from %s port %d", inet_ntoa (sin->sin_addr), ntohs (sin->sin_port));
+ }
+
+ new_task = construct_task (worker);
+
+ new_task->sock = nfd;
worker->srv->stat->connections_count ++;
/* Set up dispatcher */
new_task->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, read_socket,
write_socket, err_socket, &io_tv,
(void *)new_task);
+
}
/*