diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2009-08-25 15:13:24 +0400 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2009-08-25 15:13:24 +0400 |
commit | ab8f97db48ceefef5fdf47ea8a5954da635ffa31 (patch) | |
tree | 41fcd689903bbc215f168d0db7ff3ca697197da4 /src | |
parent | 034d3a91d5c60328bd99fc279f2a0ffa89d258d1 (diff) | |
download | rspamd-ab8f97db48ceefef5fdf47ea8a5954da635ffa31.tar.gz rspamd-ab8f97db48ceefef5fdf47ea8a5954da635ffa31.zip |
* Migrate fuzzy storage to use UDP instead of TCP
Diffstat (limited to 'src')
-rw-r--r-- | src/fuzzy_storage.c | 97 | ||||
-rw-r--r-- | src/fuzzy_storage.h | 4 | ||||
-rw-r--r-- | src/main.c | 10 | ||||
-rw-r--r-- | src/plugins/fuzzy_check.c | 4 | ||||
-rw-r--r-- | src/util.c | 18 | ||||
-rw-r--r-- | src/util.h | 2 |
6 files changed, 53 insertions, 82 deletions
diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c index 53f3d9b16..e9cbfa64d 100644 --- a/src/fuzzy_storage.c +++ b/src/fuzzy_storage.c @@ -215,16 +215,6 @@ read_hashes_file (struct rspamd_worker *wrk) return TRUE; } -static void -free_session (struct fuzzy_session *session) -{ - /* Delete IO event */ - event_del (&session->ev); - /* Close socket */ - close (session->fd); - g_free (session); -} - static gboolean process_check_command (struct fuzzy_cmd *cmd) { @@ -315,12 +305,12 @@ process_delete_command (struct fuzzy_cmd *cmd) #define CMD_PROCESS(x) \ do { \ if (process_##x##_command (&session->cmd)) { \ - if (write (session->fd, "OK" CRLF, sizeof ("OK" CRLF) - 1) == -1) { \ + if (sendto (session->fd, "OK" CRLF, sizeof ("OK" CRLF) - 1, 0, &session->sa, session->salen) == -1) { \ msg_err ("process_fuzzy_command: error while writing reply: %s", strerror (errno)); \ } \ } \ else { \ - if (write (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1) == -1) { \ + if (sendto (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1, 0, &session->sa, session->salen) == -1) { \ msg_err ("process_fuzzy_command: error while writing reply: %s", strerror (errno)); \ } \ } \ @@ -340,7 +330,7 @@ process_fuzzy_command (struct fuzzy_session *session) CMD_PROCESS(delete); break; default: - if (write (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1) == -1) { + if (sendto (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1, 0, &session->sa, session->salen) == -1) { msg_err ("process_fuzzy_command: error while writing reply: %s", strerror (errno)); } break; @@ -349,77 +339,38 @@ process_fuzzy_command (struct fuzzy_session *session) #undef CMD_PROCESS -/* Callback for network IO */ + +/* + * Accept new connection and construct task + */ static void -fuzzy_io_callback (int fd, short what, void *arg) +accept_fuzzy_socket (int fd, short what, void *arg) { - struct fuzzy_session *session = arg; + struct rspamd_worker *worker = (struct rspamd_worker *)arg; + struct fuzzy_session session; ssize_t r; + + session.worker = worker; + session.fd = fd; + session.pos = (u_char *)&session.cmd; + /* Got some data */ if (what == EV_READ) { - if ((r = read (fd, session->pos, (u_char *)&session->cmd + sizeof (struct fuzzy_cmd) - session->pos)) == -1) { + if ((r = recvfrom (fd, session.pos, sizeof (struct fuzzy_cmd), MSG_WAITALL, &session.sa, &session.salen)) == -1) { msg_err ("fuzzy_io_callback: got error while reading from socket: %d, %s", errno, strerror (errno)); - free_session (session); + return; } - else if (session->pos + r == (u_char *)&session->cmd + sizeof (struct fuzzy_cmd)) { + else if (r == sizeof (struct fuzzy_cmd)) { /* Assume that the whole command was read */ - process_fuzzy_command (session); - free_session (session); + process_fuzzy_command (&session); } else { - session->pos += r; + msg_err ("fuzzy_io_callback: got incomplete data while reading from socket: %d, %s", errno, strerror (errno)); + return; } } - else { - free_session (session); - } -} - - -/* - * Accept new connection and construct task - */ -static void -accept_fuzzy_socket (int fd, short what, void *arg) -{ - struct rspamd_worker *worker = (struct rspamd_worker *)arg; - struct sockaddr_storage ss; - struct sockaddr_in *sin; - struct fuzzy_session *session; - socklen_t addrlen = sizeof(ss); - int nfd; - - if ((nfd = accept_from_socket (fd, (struct sockaddr *)&ss, &addrlen)) == -1) { - msg_warn ("accept_fuzzy_socket: accept failed: %s", strerror (errno)); - return; - } - /* Check for EAGAIN */ - if (nfd == 0) { - msg_debug ("accept_fuzzy_socket: cannot accept socket as it was already accepted by other worker"); - return; - } - - if (ss.ss_family == AF_UNIX) { - msg_info ("accept_fuzzy_socket: accepted connection from unix socket"); - } - else if (ss.ss_family == AF_INET) { - sin = (struct sockaddr_in *) &ss; - msg_info ("accept_fuzzy_socket: accepted connection from %s port %d", inet_ntoa (sin->sin_addr), ntohs (sin->sin_port)); - } - - session = g_malloc (sizeof (struct fuzzy_session)); - - session->worker = worker; - session->fd = nfd; - session->tv.tv_sec = WORKER_IO_TIMEOUT; - session->tv.tv_usec = 0; - session->pos = (u_char *)&session->cmd; - - event_set (&session->ev, session->fd, EV_READ | EV_PERSIST, fuzzy_io_callback, session); - event_add (&session->ev, &session->tv); - -} +} static void sync_callback (int fd, short what, void *arg) @@ -475,6 +426,10 @@ start_fuzzy_storage (struct rspamd_worker *worker) evtimer_add (&tev, &tmv); /* Accept event */ + if ((worker->cf->listen_sock = make_udp_socket (&worker->cf->bind_addr, worker->cf->bind_port, TRUE, TRUE)) == -1) { + msg_err ("start_fuzzy_storage: cannot bind to socket, exiting"); + exit (0); + } event_set(&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_fuzzy_socket, (void *)worker); event_add(&worker->bind_ev, NULL); diff --git a/src/fuzzy_storage.h b/src/fuzzy_storage.h index 533ecaf13..b03acfacb 100644 --- a/src/fuzzy_storage.h +++ b/src/fuzzy_storage.h @@ -18,11 +18,11 @@ struct fuzzy_cmd { struct fuzzy_session { struct rspamd_worker *worker; - struct event ev; struct fuzzy_cmd cmd; - struct timeval tv; int fd; u_char *pos; + int salen; + struct sockaddr sa; }; void start_fuzzy_storage (struct rspamd_worker *worker); diff --git a/src/main.c b/src/main.c index 4353174f8..233b93eeb 100644 --- a/src/main.c +++ b/src/main.c @@ -425,12 +425,14 @@ spawn_workers (struct rspamd_main *rspamd) cf = cur->data; /* Create listen socket */ - listen_sock = create_listen_socket (&cf->bind_addr, cf->bind_port, + if (cf->type != TYPE_FUZZY) { + listen_sock = create_listen_socket (&cf->bind_addr, cf->bind_port, cf->bind_family, cf->bind_host); - if (listen_sock == -1) { - exit(-errno); + if (listen_sock == -1) { + exit(-errno); + } + cf->listen_sock = listen_sock; } - cf->listen_sock = listen_sock; for (i = 0; i < cf->count; i++) { fork_worker (rspamd, cf); diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c index da3e651a8..cd3641654 100644 --- a/src/plugins/fuzzy_check.c +++ b/src/plugins/fuzzy_check.c @@ -339,7 +339,7 @@ fuzzy_symbol_callback (struct worker_task *task, void *unused) DEFAULT_UPSTREAM_MAXERRORS, part->fuzzy->hash_pipe, sizeof (part->fuzzy->hash_pipe)); if (selected) { - if ((sock = make_tcp_socket (&selected->addr, selected->port, FALSE, TRUE)) == -1) { + if ((sock = make_udp_socket (&selected->addr, selected->port, FALSE, TRUE)) == -1) { msg_warn ("fuzzy_symbol_callback: cannot connect to %s, %d, %s", selected->name, errno, strerror (errno)); } else { @@ -404,7 +404,7 @@ fuzzy_process_handler (struct controller_session *session, f_str_t *in) DEFAULT_UPSTREAM_MAXERRORS, part->fuzzy->hash_pipe, sizeof (part->fuzzy->hash_pipe)); if (selected) { - if ((sock = make_tcp_socket (&selected->addr, selected->port, FALSE, TRUE)) == -1) { + if ((sock = make_udp_socket (&selected->addr, selected->port, FALSE, TRUE)) == -1) { msg_warn ("fuzzy_symbol_callback: cannot connect to %s, %d, %s", selected->name, errno, strerror (errno)); r = snprintf (out_buf, sizeof (out_buf), "no hashes written" CRLF); rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE); diff --git a/src/util.c b/src/util.c index 5fe99d051..bbe7ff666 100644 --- a/src/util.c +++ b/src/util.c @@ -50,15 +50,15 @@ make_socket_nonblocking (int fd) return 0; } -int -make_tcp_socket (struct in_addr *addr, u_short port, gboolean is_server, gboolean async) +static int +make_inet_socket (int family, struct in_addr *addr, u_short port, gboolean is_server, gboolean async) { int fd, r, optlen, on = 1, s_error; int serrno; struct sockaddr_in sin; /* Create socket */ - fd = socket (AF_INET, SOCK_STREAM, 0); + fd = socket (AF_INET, family, 0); if (fd == -1) { msg_warn ("make_tcp_socket: socket failed: %d, '%s'", errno, strerror (errno)); return -1; @@ -114,6 +114,18 @@ make_tcp_socket (struct in_addr *addr, u_short port, gboolean is_server, gboolea } int +make_tcp_socket (struct in_addr *addr, u_short port, gboolean is_server, gboolean async) +{ + return make_inet_socket (SOCK_STREAM, addr, port, is_server, async); +} + +int +make_udp_socket (struct in_addr *addr, u_short port, gboolean is_server, gboolean async) +{ + return make_inet_socket (SOCK_DGRAM, addr, port, is_server, async); +} + +int accept_from_socket (int listen_sock, struct sockaddr *addr, socklen_t *len) { int nfd; diff --git a/src/util.h b/src/util.h index 139a44262..d871288ca 100644 --- a/src/util.h +++ b/src/util.h @@ -11,6 +11,8 @@ struct workq; /* Create socket and bind or connect it to specified address and port */ int make_tcp_socket (struct in_addr *, u_short, gboolean is_server, gboolean async); +/* Create socket and bind or connect it to specified address and port */ +int make_udp_socket (struct in_addr *, u_short, gboolean is_server, gboolean async); /* Accept from socket */ int accept_from_socket (int listen_sock, struct sockaddr *addr, socklen_t *len); /* Create and bind or connect unix socket */ |