]> source.dussan.org Git - rspamd.git/commitdiff
* Migrate fuzzy storage to use UDP instead of TCP
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Tue, 25 Aug 2009 11:13:24 +0000 (15:13 +0400)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Tue, 25 Aug 2009 11:13:24 +0000 (15:13 +0400)
src/fuzzy_storage.c
src/fuzzy_storage.h
src/main.c
src/plugins/fuzzy_check.c
src/util.c
src/util.h

index 53f3d9b16c3b5a746e1f7b2dbbb3e4184e4e335e..e9cbfa64d9e40f251bf0034e133fdfe8aa9140dc 100644 (file)
@@ -215,16 +215,6 @@ read_hashes_file (struct rspamd_worker *wrk)
        return TRUE;
 }
 
-static void
-free_session (struct fuzzy_session *session)
-{
-       /* Delete IO event */
-       event_del (&session->ev);
-       /* Close socket */
-       close (session->fd);
-       g_free (session);
-}
-
 static gboolean
 process_check_command (struct fuzzy_cmd *cmd)
 {
@@ -315,12 +305,12 @@ process_delete_command (struct fuzzy_cmd *cmd)
 #define CMD_PROCESS(x)                                                                                                                                                 \
 do {                                                                                                                                                                                   \
 if (process_##x##_command (&session->cmd)) {                                                                                                   \
-       if (write (session->fd, "OK" CRLF, sizeof ("OK" CRLF) - 1) == -1) {                                                     \
+       if (sendto (session->fd, "OK" CRLF, sizeof ("OK" CRLF) - 1, 0, &session->sa, session->salen) == -1) {                                                   \
                msg_err ("process_fuzzy_command: error while writing reply: %s", strerror (errno));             \
        }                                                                                                                                                                                       \
 }                                                                                                                                                                                              \
 else {                                                                                                                                                                                 \
-       if (write (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1) == -1) {                                           \
+       if (sendto (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1, 0, &session->sa, session->salen) == -1) {                                         \
                msg_err ("process_fuzzy_command: error while writing reply: %s", strerror (errno));             \
        }                                                                                                                                                                                       \
 }                                                                                                                                                                                              \
@@ -340,7 +330,7 @@ process_fuzzy_command (struct fuzzy_session *session)
                        CMD_PROCESS(delete);
                        break;  
                default:
-                       if (write (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1) == -1) {
+                       if (sendto (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1, 0, &session->sa, session->salen) == -1) {
                                msg_err ("process_fuzzy_command: error while writing reply: %s", strerror (errno));
                        }
                        break;
@@ -349,77 +339,38 @@ process_fuzzy_command (struct fuzzy_session *session)
 
 #undef CMD_PROCESS
 
-/* Callback for network IO */
+
+/*
+ * Accept new connection and construct task
+ */
 static void
-fuzzy_io_callback (int fd, short what, void *arg)
+accept_fuzzy_socket (int fd, short what, void *arg)
 {
-       struct fuzzy_session *session = arg;
+       struct rspamd_worker *worker = (struct rspamd_worker *)arg;
+       struct fuzzy_session session;
        ssize_t r;
        
+
+       session.worker = worker;
+       session.fd = fd;
+       session.pos = (u_char *)&session.cmd;
+
        /* Got some data */
        if (what == EV_READ) {
-               if ((r = read (fd, session->pos, (u_char *)&session->cmd + sizeof (struct fuzzy_cmd) - session->pos)) == -1) {
+               if ((r = recvfrom (fd, session.pos, sizeof (struct fuzzy_cmd), MSG_WAITALL, &session.sa, &session.salen)) == -1) {
                        msg_err ("fuzzy_io_callback: got error while reading from socket: %d, %s", errno, strerror (errno));
-                       free_session (session);
+                       return;
                }
-               else if (session->pos + r == (u_char *)&session->cmd + sizeof (struct fuzzy_cmd)) {
+               else if (r == sizeof (struct fuzzy_cmd)) {
                        /* Assume that the whole command was read */
-                       process_fuzzy_command (session);
-                       free_session (session);
+                       process_fuzzy_command (&session);
                }
                else {
-                       session->pos += r;
+                       msg_err ("fuzzy_io_callback: got incomplete data while reading from socket: %d, %s", errno, strerror (errno));
+                       return;
                }
        }
-       else {
-               free_session (session);
-       }
-}
-
-
-/*
- * Accept new connection and construct task
- */
-static void
-accept_fuzzy_socket (int fd, short what, void *arg)
-{
-       struct rspamd_worker *worker = (struct rspamd_worker *)arg;
-       struct sockaddr_storage ss;
-    struct sockaddr_in *sin;
-       struct fuzzy_session *session;
-       socklen_t addrlen = sizeof(ss);
-       int nfd;
-       
-       if ((nfd = accept_from_socket (fd, (struct sockaddr *)&ss, &addrlen)) == -1) {
-               msg_warn ("accept_fuzzy_socket: accept failed: %s", strerror (errno));
-               return;
-       }
-       /* Check for EAGAIN */
-       if (nfd == 0) {
-               msg_debug ("accept_fuzzy_socket: cannot accept socket as it was already accepted by other worker");
-               return;
-       }
-
-    if (ss.ss_family == AF_UNIX) {
-        msg_info ("accept_fuzzy_socket: accepted connection from unix socket");
-    }
-    else if (ss.ss_family == AF_INET) {
-        sin = (struct sockaddr_in *) &ss;
-        msg_info ("accept_fuzzy_socket: accepted connection from %s port %d", inet_ntoa (sin->sin_addr), ntohs (sin->sin_port));
-    }
-       
-       session = g_malloc (sizeof (struct fuzzy_session));
-
-       session->worker = worker;
-       session->fd = nfd;
-       session->tv.tv_sec = WORKER_IO_TIMEOUT;
-       session->tv.tv_usec = 0;
-       session->pos = (u_char *)&session->cmd;
-
-       event_set (&session->ev, session->fd, EV_READ | EV_PERSIST, fuzzy_io_callback, session);
-       event_add (&session->ev, &session->tv);
-
-}
+}      
 
 static void
 sync_callback (int fd, short what, void *arg)
@@ -475,6 +426,10 @@ start_fuzzy_storage (struct rspamd_worker *worker)
        evtimer_add (&tev, &tmv);
 
        /* Accept event */
+       if ((worker->cf->listen_sock = make_udp_socket (&worker->cf->bind_addr, worker->cf->bind_port, TRUE, TRUE)) == -1) {
+               msg_err ("start_fuzzy_storage: cannot bind to socket, exiting");
+               exit (0);
+       }
        event_set(&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_fuzzy_socket, (void *)worker);
        event_add(&worker->bind_ev, NULL);
 
index 533ecaf1391821f8afec02010b311e725c5b4fd1..b03acfacbd7c2f49c74af570f8938066049a5b65 100644 (file)
@@ -18,11 +18,11 @@ struct fuzzy_cmd {
 
 struct fuzzy_session {
        struct rspamd_worker *worker;
-       struct event ev;
        struct fuzzy_cmd cmd;
-       struct timeval tv;
        int fd;
        u_char *pos;
+       int salen;
+       struct sockaddr sa;
 };
 
 void start_fuzzy_storage (struct rspamd_worker *worker);
index 4353174f86a7ab835d9de42d04300b20209ee244..233b93eeb163b483c209b0360d25f7ec1fc25a41 100644 (file)
@@ -425,12 +425,14 @@ spawn_workers (struct rspamd_main *rspamd)
                cf = cur->data;
 
                /* Create listen socket */
-               listen_sock = create_listen_socket (&cf->bind_addr, cf->bind_port, 
+               if (cf->type != TYPE_FUZZY) {
+                       listen_sock = create_listen_socket (&cf->bind_addr, cf->bind_port, 
                                                                                        cf->bind_family, cf->bind_host);
-               if (listen_sock == -1) {
-                       exit(-errno);
+                       if (listen_sock == -1) {
+                               exit(-errno);
+                       }
+                       cf->listen_sock = listen_sock;
                }
-               cf->listen_sock = listen_sock;
 
                for (i = 0; i < cf->count; i++) {
                        fork_worker (rspamd, cf);
index da3e651a8b94716baed88accdbba2a303828bdc2..cd3641654a922220e9a608752a0bb2e8f9c2b653 100644 (file)
@@ -339,7 +339,7 @@ fuzzy_symbol_callback (struct worker_task *task, void *unused)
                                                                                 DEFAULT_UPSTREAM_MAXERRORS,
                                                                                 part->fuzzy->hash_pipe, sizeof (part->fuzzy->hash_pipe));
                if (selected) {
-                       if ((sock = make_tcp_socket (&selected->addr, selected->port, FALSE, TRUE)) == -1) {
+                       if ((sock = make_udp_socket (&selected->addr, selected->port, FALSE, TRUE)) == -1) {
                                msg_warn ("fuzzy_symbol_callback: cannot connect to %s, %d, %s", selected->name, errno, strerror (errno));
                        }       
                        else {
@@ -404,7 +404,7 @@ fuzzy_process_handler (struct controller_session *session, f_str_t *in)
                                                                                         DEFAULT_UPSTREAM_MAXERRORS,
                                                                                         part->fuzzy->hash_pipe, sizeof (part->fuzzy->hash_pipe));
                        if (selected) {
-                               if ((sock = make_tcp_socket (&selected->addr, selected->port, FALSE, TRUE)) == -1) {
+                               if ((sock = make_udp_socket (&selected->addr, selected->port, FALSE, TRUE)) == -1) {
                                        msg_warn ("fuzzy_symbol_callback: cannot connect to %s, %d, %s", selected->name, errno, strerror (errno));
                                        r = snprintf (out_buf, sizeof (out_buf), "no hashes written" CRLF);
                                        rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
index 5fe99d051ba0f3e6063e18dfe37c8f9fcd0bb463..bbe7ff666c05b0db85c0ca265a6be3e639f3bdc9 100644 (file)
@@ -50,15 +50,15 @@ make_socket_nonblocking (int fd)
        return 0;
 }
 
-int
-make_tcp_socket (struct in_addr *addr, u_short port, gboolean is_server, gboolean async)
+static int
+make_inet_socket (int family, struct in_addr *addr, u_short port, gboolean is_server, gboolean async)
 {
        int fd, r, optlen, on = 1, s_error;
        int serrno;
        struct sockaddr_in sin;
        
        /* Create socket */
-       fd = socket (AF_INET, SOCK_STREAM, 0);
+       fd = socket (AF_INET, family, 0);
        if (fd == -1) {
                msg_warn ("make_tcp_socket: socket failed: %d, '%s'", errno, strerror (errno));
                return -1;
@@ -113,6 +113,18 @@ make_tcp_socket (struct in_addr *addr, u_short port, gboolean is_server, gboolea
        return (-1);
 }
 
+int
+make_tcp_socket (struct in_addr *addr, u_short port, gboolean is_server, gboolean async)
+{
+       return make_inet_socket (SOCK_STREAM, addr, port, is_server, async);    
+}
+
+int
+make_udp_socket (struct in_addr *addr, u_short port, gboolean is_server, gboolean async)
+{
+       return make_inet_socket (SOCK_DGRAM, addr, port, is_server, async);     
+}
+
 int
 accept_from_socket (int listen_sock, struct sockaddr *addr, socklen_t *len)
 {
index 139a44262205ebf9204d7527b402801b21a7044e..d871288cab712cd32ae056776dcc941f30b653b7 100644 (file)
@@ -11,6 +11,8 @@ struct workq;
 
 /* Create socket and bind or connect it to specified address and port */
 int make_tcp_socket (struct in_addr *, u_short, gboolean is_server, gboolean async);
+/* Create socket and bind or connect it to specified address and port */
+int make_udp_socket (struct in_addr *, u_short, gboolean is_server, gboolean async);
 /* Accept from socket */
 int accept_from_socket (int listen_sock, struct sockaddr *addr, socklen_t *len);
 /* Create and bind or connect unix socket */