Browse Source

* Migrate fuzzy storage to use UDP instead of TCP

tags/0.2.7
Vsevolod Stakhov 15 years ago
parent
commit
ab8f97db48
6 changed files with 53 additions and 82 deletions
  1. 26
    71
      src/fuzzy_storage.c
  2. 2
    2
      src/fuzzy_storage.h
  3. 6
    4
      src/main.c
  4. 2
    2
      src/plugins/fuzzy_check.c
  5. 15
    3
      src/util.c
  6. 2
    0
      src/util.h

+ 26
- 71
src/fuzzy_storage.c View File

@@ -215,16 +215,6 @@ read_hashes_file (struct rspamd_worker *wrk)
return TRUE;
}

static void
free_session (struct fuzzy_session *session)
{
/* Delete IO event */
event_del (&session->ev);
/* Close socket */
close (session->fd);
g_free (session);
}

static gboolean
process_check_command (struct fuzzy_cmd *cmd)
{
@@ -315,12 +305,12 @@ process_delete_command (struct fuzzy_cmd *cmd)
#define CMD_PROCESS(x) \
do { \
if (process_##x##_command (&session->cmd)) { \
if (write (session->fd, "OK" CRLF, sizeof ("OK" CRLF) - 1) == -1) { \
if (sendto (session->fd, "OK" CRLF, sizeof ("OK" CRLF) - 1, 0, &session->sa, session->salen) == -1) { \
msg_err ("process_fuzzy_command: error while writing reply: %s", strerror (errno)); \
} \
} \
else { \
if (write (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1) == -1) { \
if (sendto (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1, 0, &session->sa, session->salen) == -1) { \
msg_err ("process_fuzzy_command: error while writing reply: %s", strerror (errno)); \
} \
} \
@@ -340,7 +330,7 @@ process_fuzzy_command (struct fuzzy_session *session)
CMD_PROCESS(delete);
break;
default:
if (write (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1) == -1) {
if (sendto (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1, 0, &session->sa, session->salen) == -1) {
msg_err ("process_fuzzy_command: error while writing reply: %s", strerror (errno));
}
break;
@@ -349,77 +339,38 @@ process_fuzzy_command (struct fuzzy_session *session)

#undef CMD_PROCESS

/* Callback for network IO */

/*
* Accept new connection and construct task
*/
static void
fuzzy_io_callback (int fd, short what, void *arg)
accept_fuzzy_socket (int fd, short what, void *arg)
{
struct fuzzy_session *session = arg;
struct rspamd_worker *worker = (struct rspamd_worker *)arg;
struct fuzzy_session session;
ssize_t r;

session.worker = worker;
session.fd = fd;
session.pos = (u_char *)&session.cmd;

/* Got some data */
if (what == EV_READ) {
if ((r = read (fd, session->pos, (u_char *)&session->cmd + sizeof (struct fuzzy_cmd) - session->pos)) == -1) {
if ((r = recvfrom (fd, session.pos, sizeof (struct fuzzy_cmd), MSG_WAITALL, &session.sa, &session.salen)) == -1) {
msg_err ("fuzzy_io_callback: got error while reading from socket: %d, %s", errno, strerror (errno));
free_session (session);
return;
}
else if (session->pos + r == (u_char *)&session->cmd + sizeof (struct fuzzy_cmd)) {
else if (r == sizeof (struct fuzzy_cmd)) {
/* Assume that the whole command was read */
process_fuzzy_command (session);
free_session (session);
process_fuzzy_command (&session);
}
else {
session->pos += r;
msg_err ("fuzzy_io_callback: got incomplete data while reading from socket: %d, %s", errno, strerror (errno));
return;
}
}
else {
free_session (session);
}
}


/*
* Accept new connection and construct task
*/
static void
accept_fuzzy_socket (int fd, short what, void *arg)
{
struct rspamd_worker *worker = (struct rspamd_worker *)arg;
struct sockaddr_storage ss;
struct sockaddr_in *sin;
struct fuzzy_session *session;
socklen_t addrlen = sizeof(ss);
int nfd;
if ((nfd = accept_from_socket (fd, (struct sockaddr *)&ss, &addrlen)) == -1) {
msg_warn ("accept_fuzzy_socket: accept failed: %s", strerror (errno));
return;
}
/* Check for EAGAIN */
if (nfd == 0) {
msg_debug ("accept_fuzzy_socket: cannot accept socket as it was already accepted by other worker");
return;
}

if (ss.ss_family == AF_UNIX) {
msg_info ("accept_fuzzy_socket: accepted connection from unix socket");
}
else if (ss.ss_family == AF_INET) {
sin = (struct sockaddr_in *) &ss;
msg_info ("accept_fuzzy_socket: accepted connection from %s port %d", inet_ntoa (sin->sin_addr), ntohs (sin->sin_port));
}
session = g_malloc (sizeof (struct fuzzy_session));

session->worker = worker;
session->fd = nfd;
session->tv.tv_sec = WORKER_IO_TIMEOUT;
session->tv.tv_usec = 0;
session->pos = (u_char *)&session->cmd;

event_set (&session->ev, session->fd, EV_READ | EV_PERSIST, fuzzy_io_callback, session);
event_add (&session->ev, &session->tv);

}
}

static void
sync_callback (int fd, short what, void *arg)
@@ -475,6 +426,10 @@ start_fuzzy_storage (struct rspamd_worker *worker)
evtimer_add (&tev, &tmv);

/* Accept event */
if ((worker->cf->listen_sock = make_udp_socket (&worker->cf->bind_addr, worker->cf->bind_port, TRUE, TRUE)) == -1) {
msg_err ("start_fuzzy_storage: cannot bind to socket, exiting");
exit (0);
}
event_set(&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_fuzzy_socket, (void *)worker);
event_add(&worker->bind_ev, NULL);


+ 2
- 2
src/fuzzy_storage.h View File

@@ -18,11 +18,11 @@ struct fuzzy_cmd {

struct fuzzy_session {
struct rspamd_worker *worker;
struct event ev;
struct fuzzy_cmd cmd;
struct timeval tv;
int fd;
u_char *pos;
int salen;
struct sockaddr sa;
};

void start_fuzzy_storage (struct rspamd_worker *worker);

+ 6
- 4
src/main.c View File

@@ -425,12 +425,14 @@ spawn_workers (struct rspamd_main *rspamd)
cf = cur->data;

/* Create listen socket */
listen_sock = create_listen_socket (&cf->bind_addr, cf->bind_port,
if (cf->type != TYPE_FUZZY) {
listen_sock = create_listen_socket (&cf->bind_addr, cf->bind_port,
cf->bind_family, cf->bind_host);
if (listen_sock == -1) {
exit(-errno);
if (listen_sock == -1) {
exit(-errno);
}
cf->listen_sock = listen_sock;
}
cf->listen_sock = listen_sock;

for (i = 0; i < cf->count; i++) {
fork_worker (rspamd, cf);

+ 2
- 2
src/plugins/fuzzy_check.c View File

@@ -339,7 +339,7 @@ fuzzy_symbol_callback (struct worker_task *task, void *unused)
DEFAULT_UPSTREAM_MAXERRORS,
part->fuzzy->hash_pipe, sizeof (part->fuzzy->hash_pipe));
if (selected) {
if ((sock = make_tcp_socket (&selected->addr, selected->port, FALSE, TRUE)) == -1) {
if ((sock = make_udp_socket (&selected->addr, selected->port, FALSE, TRUE)) == -1) {
msg_warn ("fuzzy_symbol_callback: cannot connect to %s, %d, %s", selected->name, errno, strerror (errno));
}
else {
@@ -404,7 +404,7 @@ fuzzy_process_handler (struct controller_session *session, f_str_t *in)
DEFAULT_UPSTREAM_MAXERRORS,
part->fuzzy->hash_pipe, sizeof (part->fuzzy->hash_pipe));
if (selected) {
if ((sock = make_tcp_socket (&selected->addr, selected->port, FALSE, TRUE)) == -1) {
if ((sock = make_udp_socket (&selected->addr, selected->port, FALSE, TRUE)) == -1) {
msg_warn ("fuzzy_symbol_callback: cannot connect to %s, %d, %s", selected->name, errno, strerror (errno));
r = snprintf (out_buf, sizeof (out_buf), "no hashes written" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);

+ 15
- 3
src/util.c View File

@@ -50,15 +50,15 @@ make_socket_nonblocking (int fd)
return 0;
}

int
make_tcp_socket (struct in_addr *addr, u_short port, gboolean is_server, gboolean async)
static int
make_inet_socket (int family, struct in_addr *addr, u_short port, gboolean is_server, gboolean async)
{
int fd, r, optlen, on = 1, s_error;
int serrno;
struct sockaddr_in sin;
/* Create socket */
fd = socket (AF_INET, SOCK_STREAM, 0);
fd = socket (AF_INET, family, 0);
if (fd == -1) {
msg_warn ("make_tcp_socket: socket failed: %d, '%s'", errno, strerror (errno));
return -1;
@@ -113,6 +113,18 @@ make_tcp_socket (struct in_addr *addr, u_short port, gboolean is_server, gboolea
return (-1);
}

int
make_tcp_socket (struct in_addr *addr, u_short port, gboolean is_server, gboolean async)
{
return make_inet_socket (SOCK_STREAM, addr, port, is_server, async);
}

int
make_udp_socket (struct in_addr *addr, u_short port, gboolean is_server, gboolean async)
{
return make_inet_socket (SOCK_DGRAM, addr, port, is_server, async);
}

int
accept_from_socket (int listen_sock, struct sockaddr *addr, socklen_t *len)
{

+ 2
- 0
src/util.h View File

@@ -11,6 +11,8 @@ struct workq;

/* Create socket and bind or connect it to specified address and port */
int make_tcp_socket (struct in_addr *, u_short, gboolean is_server, gboolean async);
/* Create socket and bind or connect it to specified address and port */
int make_udp_socket (struct in_addr *, u_short, gboolean is_server, gboolean async);
/* Accept from socket */
int accept_from_socket (int listen_sock, struct sockaddr *addr, socklen_t *len);
/* Create and bind or connect unix socket */

Loading…
Cancel
Save