len = g_queue_get_length (d->out_buffers);
if (len > 1) {
/* IOV version */
+
+ /* Unset delayed as actually we HAVE buffers to write */
+ is_delayed = TRUE;
cur = d->out_buffers->tail;
iov = g_slice_alloc (len * sizeof (struct iovec));
i = 0;
}
}
else {
+ session->elt = elt;
+
if (!is_redis) {
r = rspamd_snprintf (outbuf, sizeof (outbuf), "VALUE %s %ud %ud" CRLF,
ELT_KEY (elt), elt->flags, elt->size);
res = rspamd_dispatcher_write (session->dispather, CRLF,
sizeof (CRLF) - 1, FALSE, TRUE);
}
- g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
+ if (!res) {
+ g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
+ }
+
return res;
}
}
return TRUE;
}
+/*
+ * Called if buffers were written
+ */
+static gboolean
+kvstorage_write_socket (void *arg)
+{
+ struct kvstorage_session *session = (struct kvstorage_session *) arg;
+
+ if (session->elt) {
+ g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
+ session->elt = NULL;
+ }
+
+ return TRUE;
+}
+
/*
* Called if something goes wrong
*/
session->thr = thr;
session->sock = nfd;
session->dispather = rspamd_create_dispatcher (thr->ev_base, nfd, BUFFER_LINE,
- kvstorage_read_socket, NULL,
+ kvstorage_read_socket, kvstorage_write_socket,
kvstorage_err_socket, thr->tv, session);
+ session->elt = NULL;
if (su.ss.ss_family == AF_UNIX) {
session->client_addr.s_addr = INADDR_NONE;