aboutsummaryrefslogtreecommitdiffstats
path: root/src/fuzzy_storage.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2009-08-25 15:13:24 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2009-08-25 15:13:24 +0400
commitab8f97db48ceefef5fdf47ea8a5954da635ffa31 (patch)
tree41fcd689903bbc215f168d0db7ff3ca697197da4 /src/fuzzy_storage.c
parent034d3a91d5c60328bd99fc279f2a0ffa89d258d1 (diff)
downloadrspamd-ab8f97db48ceefef5fdf47ea8a5954da635ffa31.tar.gz
rspamd-ab8f97db48ceefef5fdf47ea8a5954da635ffa31.zip
* Migrate fuzzy storage to use UDP instead of TCP
Diffstat (limited to 'src/fuzzy_storage.c')
-rw-r--r--src/fuzzy_storage.c97
1 files changed, 26 insertions, 71 deletions
diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c
index 53f3d9b16..e9cbfa64d 100644
--- a/src/fuzzy_storage.c
+++ b/src/fuzzy_storage.c
@@ -215,16 +215,6 @@ read_hashes_file (struct rspamd_worker *wrk)
return TRUE;
}
-static void
-free_session (struct fuzzy_session *session)
-{
- /* Delete IO event */
- event_del (&session->ev);
- /* Close socket */
- close (session->fd);
- g_free (session);
-}
-
static gboolean
process_check_command (struct fuzzy_cmd *cmd)
{
@@ -315,12 +305,12 @@ process_delete_command (struct fuzzy_cmd *cmd)
#define CMD_PROCESS(x) \
do { \
if (process_##x##_command (&session->cmd)) { \
- if (write (session->fd, "OK" CRLF, sizeof ("OK" CRLF) - 1) == -1) { \
+ if (sendto (session->fd, "OK" CRLF, sizeof ("OK" CRLF) - 1, 0, &session->sa, session->salen) == -1) { \
msg_err ("process_fuzzy_command: error while writing reply: %s", strerror (errno)); \
} \
} \
else { \
- if (write (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1) == -1) { \
+ if (sendto (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1, 0, &session->sa, session->salen) == -1) { \
msg_err ("process_fuzzy_command: error while writing reply: %s", strerror (errno)); \
} \
} \
@@ -340,7 +330,7 @@ process_fuzzy_command (struct fuzzy_session *session)
CMD_PROCESS(delete);
break;
default:
- if (write (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1) == -1) {
+ if (sendto (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1, 0, &session->sa, session->salen) == -1) {
msg_err ("process_fuzzy_command: error while writing reply: %s", strerror (errno));
}
break;
@@ -349,77 +339,38 @@ process_fuzzy_command (struct fuzzy_session *session)
#undef CMD_PROCESS
-/* Callback for network IO */
+
+/*
+ * Accept new connection and construct task
+ */
static void
-fuzzy_io_callback (int fd, short what, void *arg)
+accept_fuzzy_socket (int fd, short what, void *arg)
{
- struct fuzzy_session *session = arg;
+ struct rspamd_worker *worker = (struct rspamd_worker *)arg;
+ struct fuzzy_session session;
ssize_t r;
+
+ session.worker = worker;
+ session.fd = fd;
+ session.pos = (u_char *)&session.cmd;
+
/* Got some data */
if (what == EV_READ) {
- if ((r = read (fd, session->pos, (u_char *)&session->cmd + sizeof (struct fuzzy_cmd) - session->pos)) == -1) {
+ if ((r = recvfrom (fd, session.pos, sizeof (struct fuzzy_cmd), MSG_WAITALL, &session.sa, &session.salen)) == -1) {
msg_err ("fuzzy_io_callback: got error while reading from socket: %d, %s", errno, strerror (errno));
- free_session (session);
+ return;
}
- else if (session->pos + r == (u_char *)&session->cmd + sizeof (struct fuzzy_cmd)) {
+ else if (r == sizeof (struct fuzzy_cmd)) {
/* Assume that the whole command was read */
- process_fuzzy_command (session);
- free_session (session);
+ process_fuzzy_command (&session);
}
else {
- session->pos += r;
+ msg_err ("fuzzy_io_callback: got incomplete data while reading from socket: %d, %s", errno, strerror (errno));
+ return;
}
}
- else {
- free_session (session);
- }
-}
-
-
-/*
- * Accept new connection and construct task
- */
-static void
-accept_fuzzy_socket (int fd, short what, void *arg)
-{
- struct rspamd_worker *worker = (struct rspamd_worker *)arg;
- struct sockaddr_storage ss;
- struct sockaddr_in *sin;
- struct fuzzy_session *session;
- socklen_t addrlen = sizeof(ss);
- int nfd;
-
- if ((nfd = accept_from_socket (fd, (struct sockaddr *)&ss, &addrlen)) == -1) {
- msg_warn ("accept_fuzzy_socket: accept failed: %s", strerror (errno));
- return;
- }
- /* Check for EAGAIN */
- if (nfd == 0) {
- msg_debug ("accept_fuzzy_socket: cannot accept socket as it was already accepted by other worker");
- return;
- }
-
- if (ss.ss_family == AF_UNIX) {
- msg_info ("accept_fuzzy_socket: accepted connection from unix socket");
- }
- else if (ss.ss_family == AF_INET) {
- sin = (struct sockaddr_in *) &ss;
- msg_info ("accept_fuzzy_socket: accepted connection from %s port %d", inet_ntoa (sin->sin_addr), ntohs (sin->sin_port));
- }
-
- session = g_malloc (sizeof (struct fuzzy_session));
-
- session->worker = worker;
- session->fd = nfd;
- session->tv.tv_sec = WORKER_IO_TIMEOUT;
- session->tv.tv_usec = 0;
- session->pos = (u_char *)&session->cmd;
-
- event_set (&session->ev, session->fd, EV_READ | EV_PERSIST, fuzzy_io_callback, session);
- event_add (&session->ev, &session->tv);
-
-}
+}
static void
sync_callback (int fd, short what, void *arg)
@@ -475,6 +426,10 @@ start_fuzzy_storage (struct rspamd_worker *worker)
evtimer_add (&tev, &tmv);
/* Accept event */
+ if ((worker->cf->listen_sock = make_udp_socket (&worker->cf->bind_addr, worker->cf->bind_port, TRUE, TRUE)) == -1) {
+ msg_err ("start_fuzzy_storage: cannot bind to socket, exiting");
+ exit (0);
+ }
event_set(&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_fuzzy_socket, (void *)worker);
event_add(&worker->bind_ev, NULL);