#define FUZZY_TCP_BUFFER_LENGTH 8192
+struct rspamd_fuzzy_tcp_reply {
+ uint16_t size_hdr; /* We have to write this as well */
+ struct rspamd_fuzzy_encrypted_reply rep; /* Payload */
+};
+
struct fuzzy_tcp_reply {
- struct rspamd_fuzzy_encrypted_reply rep; /* Serialized reply */
- unsigned int written; /* How many have we already written */
- struct fuzzy_tcp_reply *prev, *next; /* Link */
+ struct rspamd_fuzzy_tcp_reply rep; /* Serialized reply */
+ unsigned int written; /* How many bytes have we already written */
+ struct fuzzy_tcp_reply *prev, *next; /* Link */
};
struct fuzzy_tcp_session {
struct fuzzy_tcp_session *tcp_session = (struct fuzzy_tcp_session *) w->data;
msg_debug_fuzzy_storage("got io for %s: %d", rspamd_inet_address_to_string(tcp_session->addr), revents);
+
+ if (revents & EV_READ) {
+ ssize_t r;
+
+ r = read(tcp_session->fd, tcp_session->input_buf + tcp_session->bytes_unprocessed,
+ sizeof(tcp_session->input_buf) - tcp_session->bytes_unprocessed);
+
+ if (r == -1) {
+ /* Cannot read anything */
+ }
+ else if (r == 0) {
+ /* Got EOF */
+ }
+ else {
+ }
+ }
+ else if (revents & EV_WRITE) {
+ if (tcp_session->replies_queue) {
+ /* Try to write as many replies, as possible */
+ struct fuzzy_tcp_reply *rep, *tmp;
+ int n = 0;
+
+ DL_FOREACH(tcp_session->replies_queue, rep)
+ {
+ n++;
+ }
+
+ /* Allocate iovec */
+ struct iovec *iov = g_malloc(sizeof(struct iovec) * n);
+ n = 0;
+ DL_FOREACH(tcp_session->replies_queue, rep)
+ {
+ iov[n].iov_base = ((unsigned char *) &rep->rep) + rep->written;
+ iov[n].iov_len = sizeof(rep->rep) - rep->written;
+ n++;
+ }
+
+ /* Try to write everything */
+ ssize_t r = writev(tcp_session->fd, iov, n);
+
+ if (r == -1) {
+ /* Cannot write anything */
+ msg_debug_fuzzy_storage("failed TCP connection from %s; cannot write: %s",
+ rspamd_inet_address_to_string(tcp_session->addr),
+ strerror(errno));
+
+ tcp_session_dtor(tcp_session);
+ }
+ else if (r == 0) {
+ /* Fake EV_WRITE? */
+ ev_io_start(loop, w);
+ }
+ else {
+ /* Now we have to process all replies and check if we need to free them */
+ DL_FOREACH_SAFE(tcp_session->replies_queue, rep, tmp)
+ {
+ if (r >= (ssize_t) (sizeof(rep->rep) - rep->written)) {
+ /* We have written it completely */
+ r -= sizeof(rep->rep) - rep->written;
+ DL_DELETE(tcp_session->replies_queue, rep);
+ g_free(rep);
+ }
+ else {
+ /* Found incomplete buffer */
+ rep->written += r;
+ break;
+ }
+ }
+
+ if (tcp_session->replies_queue != NULL) {
+ /* No more replies */
+ ev_io_set(w, tcp_session->fd, EV_READ);
+ }
+ else {
+ /* Wait for another write readiness */
+ ev_io_set(w, tcp_session->fd, EV_WRITE | EV_READ);
+ }
+
+ ev_io_start(loop, w);
+ /* Reset timer */
+ ev_timer_again(loop, &tcp_session->tm);
+ }
+ }
+ }
}
static void
tcp_session->ctx = ctx;
tcp_session->fd = nfd;
ev_io_init(&tcp_session->io, tcp_fuzzy_socket_io, nfd, EV_READ);
- ev_timer_init(&tcp_session->tm, tcp_fuzzy_socket_timeout, ctx->tcp_timeout, 0.0);
+ ev_timer_init(&tcp_session->tm, tcp_fuzzy_socket_timeout, ctx->tcp_timeout, ctx->tcp_timeout);
tcp_session->tm.data = tcp_session;
tcp_session->io.data = tcp_session;
ev_timer_start(ctx->event_loop, &tcp_session->tm);