]> source.dussan.org Git - rspamd.git/commitdiff
* Add support for extending controller protocol by modules
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Wed, 29 Jul 2009 16:17:29 +0000 (20:17 +0400)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Wed, 29 Jul 2009 16:17:29 +0000 (20:17 +0400)
* Add write support via controller to fuzzy storage

TODO: Add delete and check commands support to controller interface

src/controller.c
src/lmtp.c
src/main.h
src/plugins/fuzzy_check.c
src/worker.c

index 87e7f1b0f9fff478d28b2c6594e55013ba946290..a4c0a7bb482a4093f67c269a774a84f059c45e1a 100644 (file)
@@ -53,22 +53,31 @@ enum command_type {
 
 struct controller_command {
        char *command;
-       int privilleged;
+       gboolean privilleged;
        enum command_type type;
 };
 
+struct custom_controller_command {
+       const char *command;
+       gboolean privilleged;
+       gboolean require_message;
+       controller_func_t handler;      
+};
+
 static struct controller_command commands[] = {
-       {"password", 0, COMMAND_PASSWORD},
-       {"quit", 0, COMMAND_QUIT},
-       {"reload", 1, COMMAND_RELOAD},
-       {"stat", 0, COMMAND_STAT},
-       {"shutdown", 1, COMMAND_SHUTDOWN},
-       {"uptime", 0, COMMAND_UPTIME},
-       {"learn", 1, COMMAND_LEARN},
-       {"help", 0, COMMAND_HELP},
-       {"counters", 0, COMMAND_COUNTERS},
+       {"password", FALSE, COMMAND_PASSWORD},
+       {"quit", FALSE, COMMAND_QUIT},
+       {"reload", TRUE, COMMAND_RELOAD},
+       {"stat", FALSE, COMMAND_STAT},
+       {"shutdown", TRUE, COMMAND_SHUTDOWN},
+       {"uptime", FALSE, COMMAND_UPTIME},
+       {"learn", TRUE, COMMAND_LEARN},
+       {"help", FALSE, COMMAND_HELP},
+       {"counters", FALSE, COMMAND_COUNTERS},
 };
 
+static GList *custom_commands = NULL;
+
 static GCompletion *comp;
 static time_t start_time;
 
@@ -395,6 +404,26 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
        }
 }
 
+static gboolean
+process_custom_command (char *line, char **cmd_args, struct controller_session *session)
+{
+       GList *cur;
+       struct custom_controller_command *cmd;
+
+       cur = custom_commands;
+       while (cur) {
+               cmd = cur->data;
+               if (g_ascii_strcasecmp (cmd->command, line) == 0) {
+                       /* Call handler */
+                       cmd->handler (cmd_args, session);
+                       return TRUE;
+               }
+               cur = g_list_next (cur);
+       }
+
+       return FALSE;
+}
+
 static void
 controller_read_socket (f_str_t *in, void *arg)
 {
@@ -421,9 +450,11 @@ controller_read_socket (f_str_t *in, void *arg)
                                                process_command ((struct controller_command *)comp_list->data, &params[1], session);
                                                break;
                                        case 0:
-                                               msg_debug ("Unknown command: '%s'", cmd);
-                                               i = snprintf (out_buf, sizeof (out_buf), "Unknown command" CRLF);
-                                               rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE);
+                                               if (!process_custom_command (cmd, &params[1], session)) {
+                                                       msg_debug ("Unknown command: '%s'", cmd);
+                                                       i = snprintf (out_buf, sizeof (out_buf), "Unknown command" CRLF);
+                                                       rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE);
+                                               }
                                                break;
                                        default:
                                                msg_debug ("Ambigious command: '%s'", cmd);
@@ -472,6 +503,15 @@ controller_read_socket (f_str_t *in, void *arg)
 
                        session->state = STATE_REPLY;
                        break;
+               case STATE_OTHER:
+                       if (session->other_handler) {
+                               session->other_handler (session, in);
+                       }
+                       session->state = STATE_REPLY;
+                       break;
+               case STATE_WAIT:
+                       rspamd_dispatcher_pause (session->dispatcher);
+                       break;
                default:
                        msg_debug ("controller_read_socket: unknown state while reading %d", session->state);
                        break;
@@ -595,6 +635,18 @@ start_controller (struct rspamd_worker *worker)
        exit (EXIT_SUCCESS);
 }
 
+void 
+register_custom_controller_command (const char *name, controller_func_t handler, gboolean privilleged, gboolean require_message)
+{
+       struct custom_controller_command *cmd;
+
+       cmd->command = name;
+       cmd->handler = handler;
+       cmd->privilleged = privilleged;
+       cmd->require_message = require_message;
+
+       custom_commands = g_list_prepend (custom_commands, cmd);
+}
 
 /* 
  * vi:ts=4 
index d1928cf5ce3b726768fe09a0872d4e9d08e7cc65..6554ba4f4d6b9ba7962d95f90c5207c0fa5897c7 100644 (file)
@@ -85,13 +85,13 @@ rcpt_destruct (void *pointer)
  * Free all structures of lmtp proto
  */
 static void
-free_task (struct rspamd_lmtp_proto *lmtp, gboolean is_soft)
+free_lmtp_task (struct rspamd_lmtp_proto *lmtp, gboolean is_soft)
 {
        GList *part;
        struct mime_part *p;
 
        if (lmtp) {
-               msg_debug ("free_task: free pointer %p", lmtp->task);
+               msg_debug ("free_lmtp_task: free pointer %p", lmtp->task);
                if (lmtp->task->memc_ctx) {
                        memc_close_ctx (lmtp->task->memc_ctx);
                }
@@ -188,7 +188,7 @@ lmtp_write_socket (void *arg)
                        break;
                case CLOSING_CONNECTION:
                        msg_debug ("lmtp_write_socket: normally closing connection");
-                       free_task (lmtp, TRUE);
+                       free_lmtp_task (lmtp, TRUE);
                        break;
                default:
                        msg_debug ("lmtp_write_socket: invalid state while writing to socket %d", lmtp->task->state);
@@ -205,7 +205,7 @@ lmtp_err_socket (GError *err, void *arg)
        struct rspamd_lmtp_proto *lmtp = (struct rspamd_lmtp_proto *)arg;
        msg_info ("lmtp_err_socket: abnormally closing connection, error: %s", err->message);
        /* Free buffers */
-       free_task (lmtp, FALSE);
+       free_lmtp_task (lmtp, FALSE);
 }
 
 /*
index e87b4efde730214b476f05551f947c6ec4c3d565..ee34a2caebc85dd86e9c6f31fa1e791ef231f69b 100644 (file)
@@ -117,6 +117,8 @@ struct save_point {
        unsigned int saved;                                                                                     /**< how much time we have delayed processing           */
 };
 
+
+
 /**
  * Control session object
  */
@@ -127,6 +129,8 @@ struct controller_session {
                STATE_LEARN,
                STATE_REPLY,
                STATE_QUIT,
+               STATE_OTHER,
+               STATE_WAIT,
        } state;                                                                                                        /**< current session state                                                      */
        int sock;                                                                                                       /**< socket descriptor                                                          */
        /* Access to authorized commands */
@@ -142,8 +146,13 @@ struct controller_session {
        f_str_t *learn_buf;                                                                                     /**< learn input                                                                        */
        GList *parts;                                                                                           /**< extracted mime parts                                                       */
        int in_class;                                                                                           /**< positive or negative learn                                         */
+       void (*other_handler)(struct controller_session *session, 
+                                                               f_str_t *in);                                   /**< other command handler to execute at the end of processing */
+       void *other_data;                                                                                       /**< and its data                                                                       */
 };
 
+typedef void (*controller_func_t)(char **args, struct controller_session *session);
+
 /**
  * Worker task structure
  */
@@ -213,6 +222,20 @@ struct c_module {
 void start_worker (struct rspamd_worker *worker);
 void start_controller (struct rspamd_worker *worker);
 
+/**
+ * Register custom controller function
+ */
+void register_custom_controller_command (const char *name, controller_func_t handler, gboolean privilleged, gboolean require_message);
+
+/**
+ * Construct new task for worker
+ */
+struct worker_task* construct_task (struct rspamd_worker *worker);
+/**
+ * Destroy task object and remove its IO dispatcher if it exists
+ */
+void free_task (struct worker_task *task, gboolean is_soft);
+
 /**
  * If set, reopen log file on next write
  */
index 085ee5164ac20ee4c426cb52e1f40c6fab9a8106..e4fdaba9363fb401c1db467c80f2a21f08ae4f34 100644 (file)
@@ -68,6 +68,14 @@ struct fuzzy_client_session {
        struct worker_task *task;
 };
 
+struct fuzzy_learn_session {
+       struct event ev;
+       fuzzy_hash_t *h;
+       struct timeval tv;
+       struct controller_session *session;
+       struct worker_task *task;
+};
+
 static struct fuzzy_ctx *fuzzy_module_ctx = NULL;
 
 static int fuzzy_mime_filter (struct worker_task *task);
@@ -91,6 +99,7 @@ parse_servers_string (char *str)
 
                if ((p = strchr (strvec[i], ':')) != NULL) {
                        j = 0;
+                       p ++;
                        while (g_ascii_isdigit (*p) && j < sizeof (portbuf) - 1) {
                                portbuf[j ++] = *p ++;
                        }
@@ -244,6 +253,51 @@ fuzzy_io_callback (int fd, short what, void *arg)
 
 }
 
+static void
+fuzzy_learn_callback (int fd, short what, void *arg)
+{
+       struct fuzzy_learn_session *session = arg;
+       struct fuzzy_cmd cmd;
+       char buf[sizeof ("ERR")];
+
+       if (what == EV_WRITE) {
+               /* Send command to storage */
+               cmd.blocksize = session->h->block_size;
+               memcpy (cmd.hash, session->h->hash_pipe, sizeof (cmd.hash));
+               cmd.cmd = FUZZY_WRITE;
+               if (write (fd, &cmd, sizeof (struct fuzzy_cmd)) == -1) {
+                       goto err;
+               }
+               else {
+                       event_set (&session->ev, fd, EV_READ, fuzzy_io_callback, session);
+                       event_add (&session->ev, &session->tv);
+               }
+       }
+       else if (what == EV_READ) {
+               if (read (fd, buf, sizeof (buf)) == -1) {
+                       goto err;
+               }
+               else if (buf[0] == 'O' && buf[1] == 'K') {
+                       insert_result (session->task, fuzzy_module_ctx->metric, fuzzy_module_ctx->symbol, 1, NULL);
+               }
+               goto ok;
+       }
+       
+       return;
+
+       err:
+               msg_err ("fuzzy_io_callback: got error on IO, %d, %s", errno, strerror (errno));
+       ok:
+               event_del (&session->ev);
+               close (fd);
+               session->task->save.saved --;
+               if (session->task->save.saved == 0) {
+                       /* Call other filters */
+                       session->task->save.saved = 1;
+                       process_filters (session->task);
+               }
+}
+
 static void 
 fuzzy_symbol_callback (struct worker_task *task, void *unused)
 {
@@ -282,6 +336,88 @@ fuzzy_symbol_callback (struct worker_task *task, void *unused)
        }
 }
 
+static void
+fuzzy_process_handler (struct controller_session *session, f_str_t *in)
+{
+       struct worker_task *task;
+       struct fuzzy_learn_session *s;
+       struct mime_text_part *part;
+       struct storage_server *selected;
+       GList *cur;
+       int sock, r;
+
+       task = construct_task (session->worker);
+       session->other_data = task;
+       session->state = STATE_WAIT;
+
+       task->msg = in;
+       r = process_message (task);
+       if (r == -1) {
+               msg_warn ("read_socket: processing of message failed");
+               task->last_error = "MIME processing error";
+               task->error_code = RSPAMD_FILTER_ERROR;
+               task->state = WRITE_ERROR;
+       }
+       else {
+               /* Plan new event for writing */
+               cur = task->text_parts;
+
+               while (cur) {
+                       part = cur->data;
+                       selected = (struct storage_server *)get_upstream_by_hash (fuzzy_module_ctx->servers, fuzzy_module_ctx->servers_num,
+                                                                                        sizeof (struct storage_server), task->ts.tv_sec, 
+                                                                                        DEFAULT_UPSTREAM_ERROR_TIME, DEFAULT_UPSTREAM_DEAD_TIME, 
+                                                                                        DEFAULT_UPSTREAM_MAXERRORS,
+                                                                                        part->fuzzy->hash_pipe, sizeof (part->fuzzy->hash_pipe));
+                       if (selected) {
+                               if ((sock = make_tcp_socket (&selected->addr, selected->port, FALSE, TRUE)) == -1) {
+                                       msg_warn ("fuzzy_symbol_callback: cannot connect to %s, %d, %s", selected->name, errno, strerror (errno));
+                               }       
+                               else {
+                                       s = memory_pool_alloc (session->session_pool, sizeof (struct fuzzy_learn_session));
+                                       event_set (&s->ev, sock, EV_WRITE, fuzzy_learn_callback, s);
+                                       s->tv.tv_sec = IO_TIMEOUT;
+                                       s->tv.tv_usec = 0;
+                                       s->task = task;
+                                       s->h = part->fuzzy;
+                                       s->session = session;
+                                       event_add (&s->ev, &s->tv);
+                               }
+                       }
+                       cur = g_list_next (cur);
+               }
+       }
+
+}
+
+static void
+fuzzy_controller_handler (char **args, struct controller_session *session)
+{
+       char *arg, out_buf[BUFSIZ], *err_str;
+       uint32_t size;
+       int r;
+
+       arg = *args;
+       if (!arg || *arg == '\0') {
+               msg_info ("fuzzy_controller_handler: empty content length");
+               r = snprintf (out_buf, sizeof (out_buf), "fuzzy command requires length as argument" CRLF);
+               rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
+               return;
+       }
+
+       size = strtoul (arg, &err_str, 10);
+       if (err_str && *err_str != '\0') {
+               msg_debug ("process_command: statfile size is invalid: %s", arg);
+               r = snprintf (out_buf, sizeof (out_buf), "learn size is invalid" CRLF);
+               rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
+               return;
+       }
+
+       session->state = STATE_OTHER;
+       rspamd_set_dispatcher_policy (session->dispatcher, BUFFER_CHARACTER, size);
+       session->other_handler = fuzzy_process_handler;
+}
+
 static int 
 fuzzy_mime_filter (struct worker_task *task)
 {
index 726f7fe001f3fa8d300d4154bc84589654eb7651..4832c71fcc794910b0348e144c0ce01922462757 100644 (file)
@@ -4,11 +4,11 @@
  *
  * Redistribution and use in source and binary forms, with or without
  * modification, are permitted provided that the following conditions are met:
- *     * Redistributions of source code must retain the above copyright
- *       notice, this list of conditions and the following disclaimer.
- *     * Redistributions in binary form must reproduce the above copyright
- *       notice, this list of conditions and the following disclaimer in the
- *       documentation and/or other materials provided with the distribution.
+ *      * Redistributions of source code must retain the above copyright
+ *        notice, this list of conditions and the following disclaimer.
+ *      * Redistributions in binary form must reproduce the above copyright
+ *        notice, this list of conditions and the following disclaimer in the
+ *        documentation and/or other materials provided with the distribution.
  *
  * THIS SOFTWARE IS PROVIDED BY Rambler media ''AS IS'' AND ANY
  * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
@@ -40,8 +40,8 @@
 #include <evdns.h>
 
 #ifndef WITHOUT_PERL
-#include <EXTERN.h>               /* from the Perl distribution     */
-#include <perl.h>                 /* from the Perl distribution     */
+#include <EXTERN.h>                       /* from the Perl distribution         */
+#include <perl.h>                               /* from the Perl distribution   */
 
 extern PerlInterpreter *perl_interpreter;
 #endif
@@ -111,7 +111,7 @@ rcpt_destruct (void *pointer)
 /*
  * Free all structures of worker_task
  */
-static void
+void
 free_task (struct worker_task *task, gboolean is_soft)
 {
        GList *part;
@@ -135,14 +135,18 @@ free_task (struct worker_task *task, gboolean is_soft)
                        g_list_free (task->urls);
                }
                memory_pool_delete (task->task_pool);
-               if (is_soft) {
-                       /* Plan dispatcher shutdown */
-                       task->dispatcher->wanna_die = 1;
+               if (task->dispatcher) {
+                       if (is_soft) {
+                               /* Plan dispatcher shutdown */
+                               task->dispatcher->wanna_die = 1;
+                       }
+                       else {
+                               rspamd_remove_dispatcher (task->dispatcher);
+                       }
                }
-               else {
-                       rspamd_remove_dispatcher (task->dispatcher);
+               if (task->sock != -1) {
+                       close (task->sock);
                }
-               close (task->sock);
                g_free (task);
        }
 }
@@ -174,13 +178,13 @@ read_socket (f_str_t *in, void *arg)
                        task->msg->len = in->len;
                        msg_debug ("read_socket: got string of length %ld", (long int)task->msg->len);
                        r = process_message (task);
-            if (r == -1) {
-                msg_warn ("read_socket: processing of message failed");
+                       if (r == -1) {
+                               msg_warn ("read_socket: processing of message failed");
                                task->last_error = "MIME processing error";
                                task->error_code = RSPAMD_FILTER_ERROR;
                                task->state = WRITE_ERROR;
                                write_socket (task);
-            }
+                       }
                        if (task->cmd == CMD_OTHER) {
                                /* Skip filters */
                                task->state = WRITE_REPLY;
@@ -249,36 +253,10 @@ err_socket (GError *err, void *arg)
        free_task (task, FALSE);
 }
 
-/*
- * Accept new connection and construct task
- */
-static void
-accept_socket (int fd, short what, void *arg)
+struct worker_task *
+construct_task (struct rspamd_worker *worker)
 {
-       struct rspamd_worker *worker = (struct rspamd_worker *)arg;
-       struct sockaddr_storage ss;
-    struct sockaddr_in *sin;
        struct worker_task *new_task;
-       socklen_t addrlen = sizeof(ss);
-       int nfd;
-       
-       if ((nfd = accept_from_socket (fd, (struct sockaddr *)&ss, &addrlen)) == -1) {
-               msg_warn ("accept_socket: accept failed: %s", strerror (errno));
-               return;
-       }
-       /* Check for EAGAIN */
-       if (nfd == 0) {
-               msg_debug ("accept_socket: cannot accept socket as it was already accepted by other worker");
-               return;
-       }
-
-    if (ss.ss_family == AF_UNIX) {
-        msg_info ("accept_socket: accepted connection from unix socket");
-    }
-    else if (ss.ss_family == AF_INET) {
-        sin = (struct sockaddr_in *) &ss;
-        msg_info ("accept_socket: accepted connection from %s port %d", inet_ntoa (sin->sin_addr), ntohs (sin->sin_port));
-    }
        
        new_task = g_malloc (sizeof (struct worker_task));
 
@@ -286,7 +264,6 @@ accept_socket (int fd, short what, void *arg)
        bzero (new_task, sizeof (struct worker_task));
        new_task->worker = worker;
        new_task->state = READ_COMMAND;
-       new_task->sock = nfd;
        new_task->cfg = worker->srv->cfg;
        new_task->from_addr.s_addr = INADDR_NONE;
        new_task->view_checked = FALSE;
@@ -307,12 +284,50 @@ accept_socket (int fd, short what, void *arg)
        new_task->re_cache = g_hash_table_new (g_str_hash, g_str_equal);
        memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)g_hash_table_destroy, new_task->re_cache);
 
+       return new_task;
+}
+
+/*
+ * Accept new connection and construct task
+ */
+static void
+accept_socket (int fd, short what, void *arg)
+{
+       struct rspamd_worker *worker = (struct rspamd_worker *)arg;
+       struct sockaddr_storage ss;
+       struct sockaddr_in *sin;
+       struct worker_task *new_task;
+       socklen_t addrlen = sizeof(ss);
+       int nfd;
+       
+       if ((nfd = accept_from_socket (fd, (struct sockaddr *)&ss, &addrlen)) == -1) {
+               msg_warn ("accept_socket: accept failed: %s", strerror (errno));
+               return;
+       }
+       /* Check for EAGAIN */
+       if (nfd == 0) {
+               msg_debug ("accept_socket: cannot accept socket as it was already accepted by other worker");
+               return;
+       }
+
+       if (ss.ss_family == AF_UNIX) {
+               msg_info ("accept_socket: accepted connection from unix socket");
+       }
+       else if (ss.ss_family == AF_INET) {
+               sin = (struct sockaddr_in *) &ss;
+               msg_info ("accept_socket: accepted connection from %s port %d", inet_ntoa (sin->sin_addr), ntohs (sin->sin_port));
+       }
+
+       new_task = construct_task (worker);
+
+       new_task->sock = nfd;   
        worker->srv->stat->connections_count ++;
 
        /* Set up dispatcher */
        new_task->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, read_socket,
                                                                                                                write_socket, err_socket, &io_tv,
                                                                                                                (void *)new_task);
+
 }
 
 /*