g_free(tcp_session);
}
+static bool
+fuzzy_tcp_process_input(struct fuzzy_tcp_session *tcp_session, ssize_t bytes_read)
+{
+ if (bytes_read <= 0) {
+ /* Apparent garbage */
+ return false;
+ }
+
+ static const unsigned state_mask = 0xC000;
+
+ size_t buf_avail = tcp_session->bytes_unprocessed + bytes_read;
+ uint8_t *p = (uint8_t *) tcp_session->input_buf;
+
+ while (buf_avail > 0) {
+ /* Run our state machine */
+ unsigned int state = (tcp_session->cur_frame_state & state_mask) >> 14;
+
+ if (state == 0) {
+ /* We have nothing to be read, process the next frame */
+ tcp_session->cur_frame_state = (uint16_t) (*p) | 0x4000u;
+ p++;
+ buf_avail--;
+ }
+ else if (state == 1) {
+ /* We have read one byte of size and need to advance another one */
+ tcp_session->cur_frame_state |= ((uint16_t) (*p << 8u)) | 0x8000u;
+ p++;
+ buf_avail--;
+ }
+ else {
+ size_t bytes_required = tcp_session->cur_frame_state & ~state_mask;
+
+ if (buf_avail >= bytes_required) {
+ /* We can proces the next command */
+ msg_debug_fuzzy_storage("can process next command: %d bytes available, %d bytes required",
+ (int) buf_avail, bytes_required);
+
+ /* TODO: add this */
+ buf_avail -= bytes_required;
+ p += bytes_required;
+ tcp_session->cur_frame_state = 0;
+ }
+ else {
+ /* Partial read, need more */
+ msg_debug_fuzzy_storage("can not process next command: %d bytes available, %d bytes required (partial read)",
+ (int) buf_avail, bytes_required);
+ tcp_session->bytes_unprocessed = buf_avail;
+ memmove(tcp_session->input_buf, p, buf_avail);
+
+ break;
+ }
+ }
+ }
+
+ return true;
+}
+
static void
tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents)
{
if (r == -1) {
/* Cannot read anything */
+ msg_debug_fuzzy_storage("failed TCP connection from %s; cannot read: %s",
+ rspamd_inet_address_to_string(tcp_session->addr),
+ strerror(errno));
+
+ tcp_session_dtor(tcp_session);
}
else if (r == 0) {
/* Got EOF */
+ msg_debug_fuzzy_storage("failed TCP connection from %s; cannot read: EOF",
+ rspamd_inet_address_to_string(tcp_session->addr));
+
+ tcp_session_dtor(tcp_session);
}
else {
+ if (!fuzzy_tcp_process_input(tcp_session, r)) {
+ tcp_session_dtor(tcp_session);
+ }
+ else {
+ if (tcp_session->replies_queue != NULL) {
+ /* No more replies */
+ ev_io_set(w, tcp_session->fd, EV_READ);
+ }
+ else {
+ /* Wait for another write readiness */
+ ev_io_set(w, tcp_session->fd, EV_WRITE | EV_READ);
+ }
+
+ ev_io_start(loop, w);
+ /* Reset timer */
+ ev_timer_again(loop, &tcp_session->tm);
+ }
}
}
else if (revents & EV_WRITE) {
n++;
}
- /* Allocate iovec */
- struct iovec *iov = g_malloc(sizeof(struct iovec) * n);
+ /* Allocate iovec (use stack for small number to avoid malloc calls for the trivial cases) */
+ struct iovec *iov = n > 32 ? g_malloc(sizeof(struct iovec) * n) : g_alloca(sizeof(struct iovec) * n);
n = 0;
DL_FOREACH(tcp_session->replies_queue, rep)
{
/* Try to write everything */
ssize_t r = writev(tcp_session->fd, iov, n);
+ if (n > 32) {
+ g_free(iov);
+ }
+
if (r == -1) {
/* Cannot write anything */
msg_debug_fuzzy_storage("failed TCP connection from %s; cannot write: %s",