diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2011-11-24 16:59:58 +0300 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2011-11-24 16:59:58 +0300 |
commit | 9c775b45468e79bf9885dcb0950b0636e225b9ee (patch) | |
tree | 06dab8b1e1712356b7ce45315414c2fa700e8c41 /src/kvstorage_server.c | |
parent | b67a212bfab7329f8de3e55e456b8e5bad226e57 (diff) | |
download | rspamd-9c775b45468e79bf9885dcb0950b0636e225b9ee.tar.gz rspamd-9c775b45468e79bf9885dcb0950b0636e225b9ee.zip |
Rework kvstorage locking system.
Diffstat (limited to 'src/kvstorage_server.c')
-rw-r--r-- | src/kvstorage_server.c | 29 |
1 files changed, 5 insertions, 24 deletions
diff --git a/src/kvstorage_server.c b/src/kvstorage_server.c index 3f072654d..5ef34cbed 100644 --- a/src/kvstorage_server.c +++ b/src/kvstorage_server.c @@ -473,10 +473,8 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis) rspamd_set_dispatcher_policy (session->dispather, BUFFER_CHARACTER, session->arg_data.length); } else if (session->command == KVSTORAGE_CMD_GET) { - g_static_rw_lock_reader_lock (&session->cf->storage->rwlock); elt = rspamd_kv_storage_lookup (session->cf->storage, session->key, session->keylen, session->now); if (elt == NULL) { - g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); if (!is_redis) { return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND, sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE); @@ -506,18 +504,15 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis) } if (!rspamd_dispatcher_write (session->dispather, outbuf, r, TRUE, FALSE)) { - g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); return FALSE; } if (elt->flags & KV_ELT_INTEGER) { if (!rspamd_dispatcher_write (session->dispather, intbuf, eltlen, TRUE, TRUE)) { - g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); return FALSE; } } else { - if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), eltlen, TRUE, TRUE)) { - g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); + if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), eltlen, TRUE, FALSE)) { return FALSE; } } @@ -529,22 +524,17 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis) res = rspamd_dispatcher_write (session->dispather, CRLF, sizeof (CRLF) - 1, FALSE, TRUE); } - if (!res) { - g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); - } return res; } } else if (session->command == KVSTORAGE_CMD_DELETE) { - g_static_rw_lock_writer_lock (&session->cf->storage->rwlock); elt = rspamd_kv_storage_delete (session->cf->storage, session->key, session->keylen); if (elt != NULL) { if ((elt->flags & KV_ELT_DIRTY) == 0) { /* Free memory if backend has deleted this element */ g_slice_free1 (ELT_SIZE (elt), elt); } - g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock); if (!is_redis) { return rspamd_dispatcher_write (session->dispather, "DELETED" CRLF, sizeof ("DELETED" CRLF) - 1, FALSE, TRUE); @@ -555,7 +545,6 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis) } } else { - g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock); if (!is_redis) { return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND, sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE); @@ -567,10 +556,8 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis) } } else if (session->command == KVSTORAGE_CMD_INCR || session->command == KVSTORAGE_CMD_DECR) { - g_static_rw_lock_writer_lock (&session->cf->storage->rwlock); longval = session->arg_data.value; if (!rspamd_kv_storage_increment (session->cf->storage, session->key, session->keylen, &longval)) { - g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock); if (!is_redis) { return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND, sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE); @@ -581,7 +568,6 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis) } } else { - g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock); if (!is_redis) { r = rspamd_snprintf (outbuf, sizeof (outbuf), "%l" CRLF, longval); @@ -629,7 +615,6 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis) } } } - g_static_rw_lock_writer_lock (&session->cf->storage->rwlock); } else if (session->command == KVSTORAGE_CMD_SELECT) { if (!is_redis) { @@ -842,16 +827,13 @@ kvstorage_read_socket (f_str_t * in, void *arg) if (session->command == KVSTORAGE_CMD_SET) { session->state = KVSTORAGE_STATE_READ_CMD; rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1); - g_static_rw_lock_writer_lock (&session->cf->storage->rwlock); if (rspamd_kv_storage_insert (session->cf->storage, session->key, session->keylen, in->begin, in->len, session->flags, session->expire)) { - g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock); return rspamd_dispatcher_write (session->dispather, "+OK" CRLF, sizeof ("+OK" CRLF) - 1, FALSE, TRUE); } else { - g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock); return rspamd_dispatcher_write (session->dispather, "-ERR not stored" CRLF, sizeof ("-ERR not stored" CRLF) - 1, FALSE, TRUE); } @@ -870,11 +852,9 @@ kvstorage_read_socket (f_str_t * in, void *arg) case KVSTORAGE_STATE_READ_DATA: session->state = KVSTORAGE_STATE_READ_CMD; rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1); - g_static_rw_lock_writer_lock (&session->cf->storage->rwlock); if (rspamd_kv_storage_insert (session->cf->storage, session->key, session->keylen, in->begin, in->len, session->flags, session->expire)) { - g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock); if (!is_redis) { return rspamd_dispatcher_write (session->dispather, "STORED" CRLF, sizeof ("STORED" CRLF) - 1, FALSE, TRUE); @@ -885,7 +865,6 @@ kvstorage_read_socket (f_str_t * in, void *arg) } } else { - g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock); if (!is_redis) { return rspamd_dispatcher_write (session->dispather, ERROR_NOT_STORED, sizeof (ERROR_NOT_STORED) - 1, FALSE, TRUE); @@ -911,7 +890,6 @@ kvstorage_write_socket (void *arg) struct kvstorage_session *session = (struct kvstorage_session *) arg; if (session->elt) { - g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); session->elt = NULL; } @@ -954,9 +932,10 @@ thr_accept_socket (gint fd, short what, void *arg) g_static_mutex_unlock (thr->accept_mtx); return; } - g_static_mutex_unlock (thr->accept_mtx); + /* Check for EAGAIN */ if (nfd == 0) { + g_static_mutex_unlock (thr->accept_mtx); return; } @@ -968,6 +947,8 @@ thr_accept_socket (gint fd, short what, void *arg) session->dispather = rspamd_create_dispatcher (thr->ev_base, nfd, BUFFER_LINE, kvstorage_read_socket, kvstorage_write_socket, kvstorage_err_socket, thr->tv, session); + + g_static_mutex_unlock (thr->accept_mtx); session->elt = NULL; if (su.ss.ss_family == AF_UNIX) { |