aboutsummaryrefslogtreecommitdiffstats
path: root/src/kvstorage_server.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/kvstorage_server.c')
-rw-r--r--src/kvstorage_server.c29
1 files changed, 5 insertions, 24 deletions
diff --git a/src/kvstorage_server.c b/src/kvstorage_server.c
index 3f072654d..5ef34cbed 100644
--- a/src/kvstorage_server.c
+++ b/src/kvstorage_server.c
@@ -473,10 +473,8 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
rspamd_set_dispatcher_policy (session->dispather, BUFFER_CHARACTER, session->arg_data.length);
}
else if (session->command == KVSTORAGE_CMD_GET) {
- g_static_rw_lock_reader_lock (&session->cf->storage->rwlock);
elt = rspamd_kv_storage_lookup (session->cf->storage, session->key, session->keylen, session->now);
if (elt == NULL) {
- g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
if (!is_redis) {
return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND,
sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE);
@@ -506,18 +504,15 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
}
if (!rspamd_dispatcher_write (session->dispather, outbuf,
r, TRUE, FALSE)) {
- g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
return FALSE;
}
if (elt->flags & KV_ELT_INTEGER) {
if (!rspamd_dispatcher_write (session->dispather, intbuf, eltlen, TRUE, TRUE)) {
- g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
return FALSE;
}
}
else {
- if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), eltlen, TRUE, TRUE)) {
- g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
+ if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), eltlen, TRUE, FALSE)) {
return FALSE;
}
}
@@ -529,22 +524,17 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
res = rspamd_dispatcher_write (session->dispather, CRLF,
sizeof (CRLF) - 1, FALSE, TRUE);
}
- if (!res) {
- g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
- }
return res;
}
}
else if (session->command == KVSTORAGE_CMD_DELETE) {
- g_static_rw_lock_writer_lock (&session->cf->storage->rwlock);
elt = rspamd_kv_storage_delete (session->cf->storage, session->key, session->keylen);
if (elt != NULL) {
if ((elt->flags & KV_ELT_DIRTY) == 0) {
/* Free memory if backend has deleted this element */
g_slice_free1 (ELT_SIZE (elt), elt);
}
- g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
if (!is_redis) {
return rspamd_dispatcher_write (session->dispather, "DELETED" CRLF,
sizeof ("DELETED" CRLF) - 1, FALSE, TRUE);
@@ -555,7 +545,6 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
}
}
else {
- g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
if (!is_redis) {
return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND,
sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE);
@@ -567,10 +556,8 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
}
}
else if (session->command == KVSTORAGE_CMD_INCR || session->command == KVSTORAGE_CMD_DECR) {
- g_static_rw_lock_writer_lock (&session->cf->storage->rwlock);
longval = session->arg_data.value;
if (!rspamd_kv_storage_increment (session->cf->storage, session->key, session->keylen, &longval)) {
- g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
if (!is_redis) {
return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND,
sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE);
@@ -581,7 +568,6 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
}
}
else {
- g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
if (!is_redis) {
r = rspamd_snprintf (outbuf, sizeof (outbuf), "%l" CRLF,
longval);
@@ -629,7 +615,6 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
}
}
}
- g_static_rw_lock_writer_lock (&session->cf->storage->rwlock);
}
else if (session->command == KVSTORAGE_CMD_SELECT) {
if (!is_redis) {
@@ -842,16 +827,13 @@ kvstorage_read_socket (f_str_t * in, void *arg)
if (session->command == KVSTORAGE_CMD_SET) {
session->state = KVSTORAGE_STATE_READ_CMD;
rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
- g_static_rw_lock_writer_lock (&session->cf->storage->rwlock);
if (rspamd_kv_storage_insert (session->cf->storage, session->key, session->keylen,
in->begin, in->len,
session->flags, session->expire)) {
- g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
return rspamd_dispatcher_write (session->dispather, "+OK" CRLF,
sizeof ("+OK" CRLF) - 1, FALSE, TRUE);
}
else {
- g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
return rspamd_dispatcher_write (session->dispather, "-ERR not stored" CRLF,
sizeof ("-ERR not stored" CRLF) - 1, FALSE, TRUE);
}
@@ -870,11 +852,9 @@ kvstorage_read_socket (f_str_t * in, void *arg)
case KVSTORAGE_STATE_READ_DATA:
session->state = KVSTORAGE_STATE_READ_CMD;
rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
- g_static_rw_lock_writer_lock (&session->cf->storage->rwlock);
if (rspamd_kv_storage_insert (session->cf->storage, session->key, session->keylen,
in->begin, in->len,
session->flags, session->expire)) {
- g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
if (!is_redis) {
return rspamd_dispatcher_write (session->dispather, "STORED" CRLF,
sizeof ("STORED" CRLF) - 1, FALSE, TRUE);
@@ -885,7 +865,6 @@ kvstorage_read_socket (f_str_t * in, void *arg)
}
}
else {
- g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
if (!is_redis) {
return rspamd_dispatcher_write (session->dispather, ERROR_NOT_STORED,
sizeof (ERROR_NOT_STORED) - 1, FALSE, TRUE);
@@ -911,7 +890,6 @@ kvstorage_write_socket (void *arg)
struct kvstorage_session *session = (struct kvstorage_session *) arg;
if (session->elt) {
- g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
session->elt = NULL;
}
@@ -954,9 +932,10 @@ thr_accept_socket (gint fd, short what, void *arg)
g_static_mutex_unlock (thr->accept_mtx);
return;
}
- g_static_mutex_unlock (thr->accept_mtx);
+
/* Check for EAGAIN */
if (nfd == 0) {
+ g_static_mutex_unlock (thr->accept_mtx);
return;
}
@@ -968,6 +947,8 @@ thr_accept_socket (gint fd, short what, void *arg)
session->dispather = rspamd_create_dispatcher (thr->ev_base, nfd, BUFFER_LINE,
kvstorage_read_socket, kvstorage_write_socket,
kvstorage_err_socket, thr->tv, session);
+
+ g_static_mutex_unlock (thr->accept_mtx);
session->elt = NULL;
if (su.ss.ss_family == AF_UNIX) {