aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/fuzzy_storage.c97
-rw-r--r--src/fuzzy_storage.h4
-rw-r--r--src/main.c10
-rw-r--r--src/plugins/fuzzy_check.c4
-rw-r--r--src/util.c18
-rw-r--r--src/util.h2
6 files changed, 53 insertions, 82 deletions
diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c
index 53f3d9b16..e9cbfa64d 100644
--- a/src/fuzzy_storage.c
+++ b/src/fuzzy_storage.c
@@ -215,16 +215,6 @@ read_hashes_file (struct rspamd_worker *wrk)
return TRUE;
}
-static void
-free_session (struct fuzzy_session *session)
-{
- /* Delete IO event */
- event_del (&session->ev);
- /* Close socket */
- close (session->fd);
- g_free (session);
-}
-
static gboolean
process_check_command (struct fuzzy_cmd *cmd)
{
@@ -315,12 +305,12 @@ process_delete_command (struct fuzzy_cmd *cmd)
#define CMD_PROCESS(x) \
do { \
if (process_##x##_command (&session->cmd)) { \
- if (write (session->fd, "OK" CRLF, sizeof ("OK" CRLF) - 1) == -1) { \
+ if (sendto (session->fd, "OK" CRLF, sizeof ("OK" CRLF) - 1, 0, &session->sa, session->salen) == -1) { \
msg_err ("process_fuzzy_command: error while writing reply: %s", strerror (errno)); \
} \
} \
else { \
- if (write (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1) == -1) { \
+ if (sendto (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1, 0, &session->sa, session->salen) == -1) { \
msg_err ("process_fuzzy_command: error while writing reply: %s", strerror (errno)); \
} \
} \
@@ -340,7 +330,7 @@ process_fuzzy_command (struct fuzzy_session *session)
CMD_PROCESS(delete);
break;
default:
- if (write (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1) == -1) {
+ if (sendto (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1, 0, &session->sa, session->salen) == -1) {
msg_err ("process_fuzzy_command: error while writing reply: %s", strerror (errno));
}
break;
@@ -349,77 +339,38 @@ process_fuzzy_command (struct fuzzy_session *session)
#undef CMD_PROCESS
-/* Callback for network IO */
+
+/*
+ * Accept new connection and construct task
+ */
static void
-fuzzy_io_callback (int fd, short what, void *arg)
+accept_fuzzy_socket (int fd, short what, void *arg)
{
- struct fuzzy_session *session = arg;
+ struct rspamd_worker *worker = (struct rspamd_worker *)arg;
+ struct fuzzy_session session;
ssize_t r;
+
+ session.worker = worker;
+ session.fd = fd;
+ session.pos = (u_char *)&session.cmd;
+
/* Got some data */
if (what == EV_READ) {
- if ((r = read (fd, session->pos, (u_char *)&session->cmd + sizeof (struct fuzzy_cmd) - session->pos)) == -1) {
+ if ((r = recvfrom (fd, session.pos, sizeof (struct fuzzy_cmd), MSG_WAITALL, &session.sa, &session.salen)) == -1) {
msg_err ("fuzzy_io_callback: got error while reading from socket: %d, %s", errno, strerror (errno));
- free_session (session);
+ return;
}
- else if (session->pos + r == (u_char *)&session->cmd + sizeof (struct fuzzy_cmd)) {
+ else if (r == sizeof (struct fuzzy_cmd)) {
/* Assume that the whole command was read */
- process_fuzzy_command (session);
- free_session (session);
+ process_fuzzy_command (&session);
}
else {
- session->pos += r;
+ msg_err ("fuzzy_io_callback: got incomplete data while reading from socket: %d, %s", errno, strerror (errno));
+ return;
}
}
- else {
- free_session (session);
- }
-}
-
-
-/*
- * Accept new connection and construct task
- */
-static void
-accept_fuzzy_socket (int fd, short what, void *arg)
-{
- struct rspamd_worker *worker = (struct rspamd_worker *)arg;
- struct sockaddr_storage ss;
- struct sockaddr_in *sin;
- struct fuzzy_session *session;
- socklen_t addrlen = sizeof(ss);
- int nfd;
-
- if ((nfd = accept_from_socket (fd, (struct sockaddr *)&ss, &addrlen)) == -1) {
- msg_warn ("accept_fuzzy_socket: accept failed: %s", strerror (errno));
- return;
- }
- /* Check for EAGAIN */
- if (nfd == 0) {
- msg_debug ("accept_fuzzy_socket: cannot accept socket as it was already accepted by other worker");
- return;
- }
-
- if (ss.ss_family == AF_UNIX) {
- msg_info ("accept_fuzzy_socket: accepted connection from unix socket");
- }
- else if (ss.ss_family == AF_INET) {
- sin = (struct sockaddr_in *) &ss;
- msg_info ("accept_fuzzy_socket: accepted connection from %s port %d", inet_ntoa (sin->sin_addr), ntohs (sin->sin_port));
- }
-
- session = g_malloc (sizeof (struct fuzzy_session));
-
- session->worker = worker;
- session->fd = nfd;
- session->tv.tv_sec = WORKER_IO_TIMEOUT;
- session->tv.tv_usec = 0;
- session->pos = (u_char *)&session->cmd;
-
- event_set (&session->ev, session->fd, EV_READ | EV_PERSIST, fuzzy_io_callback, session);
- event_add (&session->ev, &session->tv);
-
-}
+}
static void
sync_callback (int fd, short what, void *arg)
@@ -475,6 +426,10 @@ start_fuzzy_storage (struct rspamd_worker *worker)
evtimer_add (&tev, &tmv);
/* Accept event */
+ if ((worker->cf->listen_sock = make_udp_socket (&worker->cf->bind_addr, worker->cf->bind_port, TRUE, TRUE)) == -1) {
+ msg_err ("start_fuzzy_storage: cannot bind to socket, exiting");
+ exit (0);
+ }
event_set(&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_fuzzy_socket, (void *)worker);
event_add(&worker->bind_ev, NULL);
diff --git a/src/fuzzy_storage.h b/src/fuzzy_storage.h
index 533ecaf13..b03acfacb 100644
--- a/src/fuzzy_storage.h
+++ b/src/fuzzy_storage.h
@@ -18,11 +18,11 @@ struct fuzzy_cmd {
struct fuzzy_session {
struct rspamd_worker *worker;
- struct event ev;
struct fuzzy_cmd cmd;
- struct timeval tv;
int fd;
u_char *pos;
+ int salen;
+ struct sockaddr sa;
};
void start_fuzzy_storage (struct rspamd_worker *worker);
diff --git a/src/main.c b/src/main.c
index 4353174f8..233b93eeb 100644
--- a/src/main.c
+++ b/src/main.c
@@ -425,12 +425,14 @@ spawn_workers (struct rspamd_main *rspamd)
cf = cur->data;
/* Create listen socket */
- listen_sock = create_listen_socket (&cf->bind_addr, cf->bind_port,
+ if (cf->type != TYPE_FUZZY) {
+ listen_sock = create_listen_socket (&cf->bind_addr, cf->bind_port,
cf->bind_family, cf->bind_host);
- if (listen_sock == -1) {
- exit(-errno);
+ if (listen_sock == -1) {
+ exit(-errno);
+ }
+ cf->listen_sock = listen_sock;
}
- cf->listen_sock = listen_sock;
for (i = 0; i < cf->count; i++) {
fork_worker (rspamd, cf);
diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c
index da3e651a8..cd3641654 100644
--- a/src/plugins/fuzzy_check.c
+++ b/src/plugins/fuzzy_check.c
@@ -339,7 +339,7 @@ fuzzy_symbol_callback (struct worker_task *task, void *unused)
DEFAULT_UPSTREAM_MAXERRORS,
part->fuzzy->hash_pipe, sizeof (part->fuzzy->hash_pipe));
if (selected) {
- if ((sock = make_tcp_socket (&selected->addr, selected->port, FALSE, TRUE)) == -1) {
+ if ((sock = make_udp_socket (&selected->addr, selected->port, FALSE, TRUE)) == -1) {
msg_warn ("fuzzy_symbol_callback: cannot connect to %s, %d, %s", selected->name, errno, strerror (errno));
}
else {
@@ -404,7 +404,7 @@ fuzzy_process_handler (struct controller_session *session, f_str_t *in)
DEFAULT_UPSTREAM_MAXERRORS,
part->fuzzy->hash_pipe, sizeof (part->fuzzy->hash_pipe));
if (selected) {
- if ((sock = make_tcp_socket (&selected->addr, selected->port, FALSE, TRUE)) == -1) {
+ if ((sock = make_udp_socket (&selected->addr, selected->port, FALSE, TRUE)) == -1) {
msg_warn ("fuzzy_symbol_callback: cannot connect to %s, %d, %s", selected->name, errno, strerror (errno));
r = snprintf (out_buf, sizeof (out_buf), "no hashes written" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
diff --git a/src/util.c b/src/util.c
index 5fe99d051..bbe7ff666 100644
--- a/src/util.c
+++ b/src/util.c
@@ -50,15 +50,15 @@ make_socket_nonblocking (int fd)
return 0;
}
-int
-make_tcp_socket (struct in_addr *addr, u_short port, gboolean is_server, gboolean async)
+static int
+make_inet_socket (int family, struct in_addr *addr, u_short port, gboolean is_server, gboolean async)
{
int fd, r, optlen, on = 1, s_error;
int serrno;
struct sockaddr_in sin;
/* Create socket */
- fd = socket (AF_INET, SOCK_STREAM, 0);
+ fd = socket (AF_INET, family, 0);
if (fd == -1) {
msg_warn ("make_tcp_socket: socket failed: %d, '%s'", errno, strerror (errno));
return -1;
@@ -114,6 +114,18 @@ make_tcp_socket (struct in_addr *addr, u_short port, gboolean is_server, gboolea
}
int
+make_tcp_socket (struct in_addr *addr, u_short port, gboolean is_server, gboolean async)
+{
+ return make_inet_socket (SOCK_STREAM, addr, port, is_server, async);
+}
+
+int
+make_udp_socket (struct in_addr *addr, u_short port, gboolean is_server, gboolean async)
+{
+ return make_inet_socket (SOCK_DGRAM, addr, port, is_server, async);
+}
+
+int
accept_from_socket (int listen_sock, struct sockaddr *addr, socklen_t *len)
{
int nfd;
diff --git a/src/util.h b/src/util.h
index 139a44262..d871288ca 100644
--- a/src/util.h
+++ b/src/util.h
@@ -11,6 +11,8 @@ struct workq;
/* Create socket and bind or connect it to specified address and port */
int make_tcp_socket (struct in_addr *, u_short, gboolean is_server, gboolean async);
+/* Create socket and bind or connect it to specified address and port */
+int make_udp_socket (struct in_addr *, u_short, gboolean is_server, gboolean async);
/* Accept from socket */
int accept_from_socket (int listen_sock, struct sockaddr *addr, socklen_t *len);
/* Create and bind or connect unix socket */