diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2009-08-25 15:13:24 +0400 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2009-08-25 15:13:24 +0400 |
commit | ab8f97db48ceefef5fdf47ea8a5954da635ffa31 (patch) | |
tree | 41fcd689903bbc215f168d0db7ff3ca697197da4 /src/fuzzy_storage.c | |
parent | 034d3a91d5c60328bd99fc279f2a0ffa89d258d1 (diff) | |
download | rspamd-ab8f97db48ceefef5fdf47ea8a5954da635ffa31.tar.gz rspamd-ab8f97db48ceefef5fdf47ea8a5954da635ffa31.zip |
* Migrate fuzzy storage to use UDP instead of TCP
Diffstat (limited to 'src/fuzzy_storage.c')
-rw-r--r-- | src/fuzzy_storage.c | 97 |
1 files changed, 26 insertions, 71 deletions
diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c index 53f3d9b16..e9cbfa64d 100644 --- a/src/fuzzy_storage.c +++ b/src/fuzzy_storage.c @@ -215,16 +215,6 @@ read_hashes_file (struct rspamd_worker *wrk) return TRUE; } -static void -free_session (struct fuzzy_session *session) -{ - /* Delete IO event */ - event_del (&session->ev); - /* Close socket */ - close (session->fd); - g_free (session); -} - static gboolean process_check_command (struct fuzzy_cmd *cmd) { @@ -315,12 +305,12 @@ process_delete_command (struct fuzzy_cmd *cmd) #define CMD_PROCESS(x) \ do { \ if (process_##x##_command (&session->cmd)) { \ - if (write (session->fd, "OK" CRLF, sizeof ("OK" CRLF) - 1) == -1) { \ + if (sendto (session->fd, "OK" CRLF, sizeof ("OK" CRLF) - 1, 0, &session->sa, session->salen) == -1) { \ msg_err ("process_fuzzy_command: error while writing reply: %s", strerror (errno)); \ } \ } \ else { \ - if (write (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1) == -1) { \ + if (sendto (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1, 0, &session->sa, session->salen) == -1) { \ msg_err ("process_fuzzy_command: error while writing reply: %s", strerror (errno)); \ } \ } \ @@ -340,7 +330,7 @@ process_fuzzy_command (struct fuzzy_session *session) CMD_PROCESS(delete); break; default: - if (write (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1) == -1) { + if (sendto (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1, 0, &session->sa, session->salen) == -1) { msg_err ("process_fuzzy_command: error while writing reply: %s", strerror (errno)); } break; @@ -349,77 +339,38 @@ process_fuzzy_command (struct fuzzy_session *session) #undef CMD_PROCESS -/* Callback for network IO */ + +/* + * Accept new connection and construct task + */ static void -fuzzy_io_callback (int fd, short what, void *arg) +accept_fuzzy_socket (int fd, short what, void *arg) { - struct fuzzy_session *session = arg; + struct rspamd_worker *worker = (struct rspamd_worker *)arg; + struct fuzzy_session session; ssize_t r; + + session.worker = worker; + session.fd = fd; + session.pos = (u_char *)&session.cmd; + /* Got some data */ if (what == EV_READ) { - if ((r = read (fd, session->pos, (u_char *)&session->cmd + sizeof (struct fuzzy_cmd) - session->pos)) == -1) { + if ((r = recvfrom (fd, session.pos, sizeof (struct fuzzy_cmd), MSG_WAITALL, &session.sa, &session.salen)) == -1) { msg_err ("fuzzy_io_callback: got error while reading from socket: %d, %s", errno, strerror (errno)); - free_session (session); + return; } - else if (session->pos + r == (u_char *)&session->cmd + sizeof (struct fuzzy_cmd)) { + else if (r == sizeof (struct fuzzy_cmd)) { /* Assume that the whole command was read */ - process_fuzzy_command (session); - free_session (session); + process_fuzzy_command (&session); } else { - session->pos += r; + msg_err ("fuzzy_io_callback: got incomplete data while reading from socket: %d, %s", errno, strerror (errno)); + return; } } - else { - free_session (session); - } -} - - -/* - * Accept new connection and construct task - */ -static void -accept_fuzzy_socket (int fd, short what, void *arg) -{ - struct rspamd_worker *worker = (struct rspamd_worker *)arg; - struct sockaddr_storage ss; - struct sockaddr_in *sin; - struct fuzzy_session *session; - socklen_t addrlen = sizeof(ss); - int nfd; - - if ((nfd = accept_from_socket (fd, (struct sockaddr *)&ss, &addrlen)) == -1) { - msg_warn ("accept_fuzzy_socket: accept failed: %s", strerror (errno)); - return; - } - /* Check for EAGAIN */ - if (nfd == 0) { - msg_debug ("accept_fuzzy_socket: cannot accept socket as it was already accepted by other worker"); - return; - } - - if (ss.ss_family == AF_UNIX) { - msg_info ("accept_fuzzy_socket: accepted connection from unix socket"); - } - else if (ss.ss_family == AF_INET) { - sin = (struct sockaddr_in *) &ss; - msg_info ("accept_fuzzy_socket: accepted connection from %s port %d", inet_ntoa (sin->sin_addr), ntohs (sin->sin_port)); - } - - session = g_malloc (sizeof (struct fuzzy_session)); - - session->worker = worker; - session->fd = nfd; - session->tv.tv_sec = WORKER_IO_TIMEOUT; - session->tv.tv_usec = 0; - session->pos = (u_char *)&session->cmd; - - event_set (&session->ev, session->fd, EV_READ | EV_PERSIST, fuzzy_io_callback, session); - event_add (&session->ev, &session->tv); - -} +} static void sync_callback (int fd, short what, void *arg) @@ -475,6 +426,10 @@ start_fuzzy_storage (struct rspamd_worker *worker) evtimer_add (&tev, &tmv); /* Accept event */ + if ((worker->cf->listen_sock = make_udp_socket (&worker->cf->bind_addr, worker->cf->bind_port, TRUE, TRUE)) == -1) { + msg_err ("start_fuzzy_storage: cannot bind to socket, exiting"); + exit (0); + } event_set(&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_fuzzy_socket, (void *)worker); event_add(&worker->bind_ev, NULL); |