]> source.dussan.org Git - rspamd.git/commitdiff
Rework kvstorage locking system.
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Thu, 24 Nov 2011 13:59:58 +0000 (16:59 +0300)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Thu, 24 Nov 2011 13:59:58 +0000 (16:59 +0300)
src/kvstorage.c
src/kvstorage_server.c

index 4afc826d8bc1af72bd59dc5e78fe282f7469960d..60d28a833d9f6dd858e468879a0f2665b04fb640 100644 (file)
@@ -95,7 +95,9 @@ rspamd_kv_storage_insert_internal (struct rspamd_kv_storage *storage, gpointer k
                /* Now check limits */
                while (storage->memory + len > storage->max_memory) {
                        if (storage->expire) {
+                               g_static_rw_lock_writer_lock (&storage->rwlock);
                                storage->expire->step_func (storage->expire, storage, time (NULL), steps);
+                               g_static_rw_lock_writer_unlock (&storage->rwlock);
                        }
                        else {
                                msg_warn ("<%s>: storage is full and no expire function is defined", storage->name);
@@ -108,8 +110,11 @@ rspamd_kv_storage_insert_internal (struct rspamd_kv_storage *storage, gpointer k
        }
 
        /* Insert elt to the cache */
+       g_static_rw_lock_writer_lock (&storage->rwlock);
        elt = storage->cache->insert_func (storage->cache, key, keylen, data, len);
+
        if (elt == NULL) {
+               g_static_rw_lock_writer_unlock (&storage->rwlock);
                return FALSE;
        }
        /* Copy data */
@@ -124,6 +129,7 @@ rspamd_kv_storage_insert_internal (struct rspamd_kv_storage *storage, gpointer k
 
        storage->elts ++;
        storage->memory += elt->size + sizeof (struct rspamd_kv_element);
+       g_static_rw_lock_writer_unlock (&storage->rwlock);
 
        return TRUE;
 }
@@ -149,7 +155,9 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, guint
                /* Now check limits */
                while (storage->memory + len + keylen > storage->max_memory) {
                        if (storage->expire) {
+                               g_static_rw_lock_writer_lock (&storage->rwlock);
                                storage->expire->step_func (storage->expire, storage, time (NULL), steps);
+                               g_static_rw_lock_writer_unlock (&storage->rwlock);
                        }
                        else {
                                msg_warn ("<%s>: storage is full and no expire function is defined", storage->name);
@@ -165,7 +173,9 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, guint
                steps = 0;
                while (storage->elts > storage->max_elts) {
                        if (storage->expire) {
+                               g_static_rw_lock_writer_lock (&storage->rwlock);
                                storage->expire->step_func (storage->expire, storage, time (NULL), steps);
+                               g_static_rw_lock_writer_unlock (&storage->rwlock);
                        }
                        else {
                                msg_warn ("<%s>: storage is full and no expire function is defined", storage->name);
@@ -178,6 +188,7 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, guint
        }
 
        /* First try to search it in cache */
+       g_static_rw_lock_writer_lock (&storage->rwlock);
        elt = storage->cache->lookup_func (storage->cache, key, keylen);
        if (elt) {
                if (storage->expire) {
@@ -209,6 +220,7 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, guint
        else {
                elt = storage->cache->insert_func (storage->cache, key, keylen, data, len);
                if (elt == NULL) {
+                       g_static_rw_lock_writer_unlock (&storage->rwlock);
                        return FALSE;
                }
        }
@@ -231,6 +243,7 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, guint
 
        storage->elts ++;
        storage->memory += ELT_SIZE (elt);
+       g_static_rw_lock_writer_unlock (&storage->rwlock);
 
        return res;
 }
@@ -253,7 +266,9 @@ rspamd_kv_storage_replace (struct rspamd_kv_storage *storage, gpointer key, guin
                /* Now check limits */
                while (storage->memory + ELT_SIZE (elt) > storage->max_memory) {
                        if (storage->expire) {
+                               g_static_rw_lock_writer_lock (&storage->rwlock);
                                storage->expire->step_func (storage->expire, storage, time (NULL), steps);
+                               g_static_rw_lock_writer_unlock (&storage->rwlock);
                        }
                        else {
                                msg_warn ("<%s>: storage is full and no expire function is defined", storage->name);
@@ -265,6 +280,7 @@ rspamd_kv_storage_replace (struct rspamd_kv_storage *storage, gpointer key, guin
                }
        }
 
+       g_static_rw_lock_writer_lock (&storage->rwlock);
        /* Insert elt to the cache */
        res = storage->cache->replace_func (storage->cache, key, keylen, elt);
 
@@ -272,6 +288,7 @@ rspamd_kv_storage_replace (struct rspamd_kv_storage *storage, gpointer key, guin
        if (res && storage->backend) {
                res = storage->backend->replace_func (storage->backend, key, keylen, elt);
        }
+       g_static_rw_lock_writer_unlock (&storage->rwlock);
 
        return res;
 }
@@ -284,6 +301,7 @@ rspamd_kv_storage_increment (struct rspamd_kv_storage *storage, gpointer key, gu
        glong                                                           *lp;
 
        /* First try to look at cache */
+       g_static_rw_lock_writer_lock (&storage->rwlock);
        elt = storage->cache->lookup_func (storage->cache, key, keylen);
 
        if (elt == NULL && storage->backend) {
@@ -305,13 +323,17 @@ rspamd_kv_storage_increment (struct rspamd_kv_storage *storage, gpointer key, gu
                *lp += *value;
                *value = *lp;
                if (storage->backend) {
+                       g_static_rw_lock_writer_unlock (&storage->rwlock);
                        return storage->backend->replace_func (storage->backend, key, keylen, elt);
                }
                else {
+                       g_static_rw_lock_writer_unlock (&storage->rwlock);
                        return TRUE;
                }
        }
 
+       g_static_rw_lock_writer_unlock (&storage->rwlock);
+
        return FALSE;
 }
 
@@ -322,11 +344,16 @@ rspamd_kv_storage_lookup (struct rspamd_kv_storage *storage, gpointer key, guint
        struct rspamd_kv_element                        *elt = NULL, *belt;
 
        /* First try to look at cache */
+       g_static_rw_lock_reader_lock (&storage->rwlock);
        elt = storage->cache->lookup_func (storage->cache, key, keylen);
+       g_static_rw_lock_reader_unlock (&storage->rwlock);
+
 
        /* Next look at the backend */
        if (elt == NULL && storage->backend) {
+               g_static_rw_lock_reader_lock (&storage->rwlock);
                belt = storage->backend->lookup_func (storage->backend, key, keylen);
+               g_static_rw_lock_reader_unlock (&storage->rwlock);
                if (belt) {
                        /* Put this element into cache */
                        rspamd_kv_storage_insert_internal (storage, ELT_KEY (belt), keylen, ELT_DATA (belt),
@@ -355,6 +382,7 @@ rspamd_kv_storage_delete (struct rspamd_kv_storage *storage, gpointer key, guint
        struct rspamd_kv_element           *elt;
 
        /* First delete key from cache */
+       g_static_rw_lock_writer_lock (&storage->rwlock);
        elt = storage->cache->delete_func (storage->cache, key, keylen);
 
        /* Now delete from backend */
@@ -369,6 +397,7 @@ rspamd_kv_storage_delete (struct rspamd_kv_storage *storage, gpointer key, guint
                storage->elts --;
                storage->memory -= elt->size;
        }
+       g_static_rw_lock_writer_unlock (&storage->rwlock);
 
        return elt;
 }
@@ -377,17 +406,20 @@ rspamd_kv_storage_delete (struct rspamd_kv_storage *storage, gpointer key, guint
 void
 rspamd_kv_storage_destroy (struct rspamd_kv_storage *storage)
 {
-       if (storage->cache && storage->cache->destroy_func) {
-               storage->cache->destroy_func (storage->cache);
-       }
+       g_static_rw_lock_writer_lock (&storage->rwlock);
        if (storage->backend && storage->backend->destroy_func) {
                storage->backend->destroy_func (storage->backend);
        }
        if (storage->expire && storage->expire->destroy_func) {
                storage->expire->destroy_func (storage->expire);
        }
+       if (storage->cache && storage->cache->destroy_func) {
+               storage->cache->destroy_func (storage->cache);
+       }
 
        g_free (storage->name);
+
+       g_static_rw_lock_writer_unlock (&storage->rwlock);
        g_slice_free1 (sizeof (struct rspamd_kv_storage), storage);
 }
 
@@ -414,6 +446,7 @@ rspamd_kv_storage_insert_array (struct rspamd_kv_storage *storage, gpointer key,
        elt->flags |= KV_ELT_ARRAY;
        g_slice_free1 (len + sizeof (guint), arr_data);
        /* Place to the backend */
+
        if (storage->backend) {
                return storage->backend->insert_func (storage->backend, key, keylen, elt);
        }
@@ -563,7 +596,12 @@ rspamd_lru_expire_step (struct rspamd_kv_expire *e, struct rspamd_kv_storage *st
                                storage->cache->steal_func (storage->cache, elt);
                                TAILQ_REMOVE (&expire->head, elt, entry);
                                /* Free memory */
-                               g_slice_free1 (ELT_SIZE (elt), elt);
+                               if ((elt->flags & KV_ELT_DIRTY) != 0) {
+                                       elt->flags |= KV_ELT_NEED_FREE;
+                               }
+                               else {
+                                       g_slice_free1 (ELT_SIZE (elt), elt);
+                               }
 
                        }
                }
@@ -757,11 +795,21 @@ rspamd_kv_hash_steal (struct rspamd_kv_cache *c, struct rspamd_kv_element *elt)
 /**
  * Destroy the whole cache
  */
+
+static void
+rspamd_kv_hash_destroy_cb (gpointer key, gpointer value, gpointer unused)
+{
+       struct rspamd_kv_element                        *elt = value;
+
+       g_slice_free1 (ELT_SIZE (elt), elt);
+}
+
 static void
 rspamd_kv_hash_destroy (struct rspamd_kv_cache *c)
 {
        struct rspamd_kv_hash_cache                     *cache = (struct rspamd_kv_hash_cache *)c;
 
+       g_hash_table_foreach (cache->hash, rspamd_kv_hash_destroy_cb, NULL);
        g_hash_table_destroy (cache->hash);
        g_slice_free1 (sizeof (struct rspamd_kv_hash_cache), cache);
 }
index 3f072654d0aedbe9f6e644d09be5ee56d420ef6b..5ef34cbedba80313bc2e46d52ba873e025c798c3 100644 (file)
@@ -473,10 +473,8 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
                rspamd_set_dispatcher_policy (session->dispather, BUFFER_CHARACTER, session->arg_data.length);
        }
        else if (session->command == KVSTORAGE_CMD_GET) {
-               g_static_rw_lock_reader_lock (&session->cf->storage->rwlock);
                elt = rspamd_kv_storage_lookup (session->cf->storage, session->key, session->keylen, session->now);
                if (elt == NULL) {
-                       g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
                        if (!is_redis) {
                                return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND,
                                                sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE);
@@ -506,18 +504,15 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
                        }
                        if (!rspamd_dispatcher_write (session->dispather, outbuf,
                                        r, TRUE, FALSE)) {
-                               g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
                                return FALSE;
                        }
                        if (elt->flags & KV_ELT_INTEGER) {
                                if (!rspamd_dispatcher_write (session->dispather, intbuf, eltlen, TRUE, TRUE)) {
-                                       g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
                                        return FALSE;
                                }
                        }
                        else {
-                               if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), eltlen, TRUE, TRUE)) {
-                                       g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
+                               if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), eltlen, TRUE, FALSE)) {
                                        return FALSE;
                                }
                        }
@@ -529,22 +524,17 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
                                res = rspamd_dispatcher_write (session->dispather, CRLF,
                                                sizeof (CRLF) - 1, FALSE, TRUE);
                        }
-                       if (!res) {
-                               g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
-                       }
 
                        return res;
                }
        }
        else if (session->command == KVSTORAGE_CMD_DELETE) {
-               g_static_rw_lock_writer_lock (&session->cf->storage->rwlock);
                elt = rspamd_kv_storage_delete (session->cf->storage, session->key, session->keylen);
                if (elt != NULL) {
                        if ((elt->flags & KV_ELT_DIRTY) == 0) {
                                /* Free memory if backend has deleted this element */
                                g_slice_free1 (ELT_SIZE (elt), elt);
                        }
-                       g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
                        if (!is_redis) {
                                return rspamd_dispatcher_write (session->dispather, "DELETED" CRLF,
                                                sizeof ("DELETED" CRLF) - 1, FALSE, TRUE);
@@ -555,7 +545,6 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
                        }
                }
                else {
-                       g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
                        if (!is_redis) {
                                return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND,
                                                sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE);
@@ -567,10 +556,8 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
                }
        }
        else if (session->command == KVSTORAGE_CMD_INCR || session->command == KVSTORAGE_CMD_DECR) {
-               g_static_rw_lock_writer_lock (&session->cf->storage->rwlock);
                longval = session->arg_data.value;
                if (!rspamd_kv_storage_increment (session->cf->storage, session->key, session->keylen, &longval)) {
-                       g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
                        if (!is_redis) {
                                return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND,
                                                sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE);
@@ -581,7 +568,6 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
                        }
                }
                else {
-                       g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
                        if (!is_redis) {
                                r = rspamd_snprintf (outbuf, sizeof (outbuf), "%l" CRLF,
                                                longval);
@@ -629,7 +615,6 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
                                }
                        }
                }
-               g_static_rw_lock_writer_lock (&session->cf->storage->rwlock);
        }
        else if (session->command == KVSTORAGE_CMD_SELECT) {
                if (!is_redis) {
@@ -842,16 +827,13 @@ kvstorage_read_socket (f_str_t * in, void *arg)
                        if (session->command == KVSTORAGE_CMD_SET) {
                                session->state = KVSTORAGE_STATE_READ_CMD;
                                rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
-                               g_static_rw_lock_writer_lock (&session->cf->storage->rwlock);
                                if (rspamd_kv_storage_insert (session->cf->storage, session->key, session->keylen,
                                                in->begin, in->len,
                                                session->flags, session->expire)) {
-                                       g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
                                        return rspamd_dispatcher_write (session->dispather, "+OK" CRLF,
                                                        sizeof ("+OK" CRLF) - 1, FALSE, TRUE);
                                }
                                else {
-                                       g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
                                        return rspamd_dispatcher_write (session->dispather, "-ERR not stored" CRLF,
                                                        sizeof ("-ERR not stored" CRLF) - 1, FALSE, TRUE);
                                }
@@ -870,11 +852,9 @@ kvstorage_read_socket (f_str_t * in, void *arg)
        case KVSTORAGE_STATE_READ_DATA:
                session->state = KVSTORAGE_STATE_READ_CMD;
                rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
-               g_static_rw_lock_writer_lock (&session->cf->storage->rwlock);
                if (rspamd_kv_storage_insert (session->cf->storage, session->key, session->keylen,
                                in->begin, in->len,
                                session->flags, session->expire)) {
-                       g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
                        if (!is_redis) {
                                return rspamd_dispatcher_write (session->dispather, "STORED" CRLF,
                                                sizeof ("STORED" CRLF) - 1, FALSE, TRUE);
@@ -885,7 +865,6 @@ kvstorage_read_socket (f_str_t * in, void *arg)
                        }
                }
                else {
-                       g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
                        if (!is_redis) {
                                return rspamd_dispatcher_write (session->dispather, ERROR_NOT_STORED,
                                                sizeof (ERROR_NOT_STORED) - 1, FALSE, TRUE);
@@ -911,7 +890,6 @@ kvstorage_write_socket (void *arg)
        struct kvstorage_session                        *session = (struct kvstorage_session *) arg;
 
        if (session->elt) {
-               g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
                session->elt = NULL;
        }
 
@@ -954,9 +932,10 @@ thr_accept_socket (gint fd, short what, void *arg)
                g_static_mutex_unlock (thr->accept_mtx);
                return;
        }
-       g_static_mutex_unlock (thr->accept_mtx);
+
        /* Check for EAGAIN */
        if (nfd == 0) {
+               g_static_mutex_unlock (thr->accept_mtx);
                return;
        }
 
@@ -968,6 +947,8 @@ thr_accept_socket (gint fd, short what, void *arg)
        session->dispather = rspamd_create_dispatcher (thr->ev_base, nfd, BUFFER_LINE,
                        kvstorage_read_socket, kvstorage_write_socket,
                        kvstorage_err_socket, thr->tv, session);
+
+       g_static_mutex_unlock (thr->accept_mtx);
        session->elt = NULL;
 
        if (su.ss.ss_family == AF_UNIX) {