From 9c775b45468e79bf9885dcb0950b0636e225b9ee Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Thu, 24 Nov 2011 16:59:58 +0300 Subject: [PATCH] Rework kvstorage locking system. --- src/kvstorage.c | 56 +++++++++++++++++++++++++++++++++++++++--- src/kvstorage_server.c | 29 ++++------------------ 2 files changed, 57 insertions(+), 28 deletions(-) diff --git a/src/kvstorage.c b/src/kvstorage.c index 4afc826d8..60d28a833 100644 --- a/src/kvstorage.c +++ b/src/kvstorage.c @@ -95,7 +95,9 @@ rspamd_kv_storage_insert_internal (struct rspamd_kv_storage *storage, gpointer k /* Now check limits */ while (storage->memory + len > storage->max_memory) { if (storage->expire) { + g_static_rw_lock_writer_lock (&storage->rwlock); storage->expire->step_func (storage->expire, storage, time (NULL), steps); + g_static_rw_lock_writer_unlock (&storage->rwlock); } else { msg_warn ("<%s>: storage is full and no expire function is defined", storage->name); @@ -108,8 +110,11 @@ rspamd_kv_storage_insert_internal (struct rspamd_kv_storage *storage, gpointer k } /* Insert elt to the cache */ + g_static_rw_lock_writer_lock (&storage->rwlock); elt = storage->cache->insert_func (storage->cache, key, keylen, data, len); + if (elt == NULL) { + g_static_rw_lock_writer_unlock (&storage->rwlock); return FALSE; } /* Copy data */ @@ -124,6 +129,7 @@ rspamd_kv_storage_insert_internal (struct rspamd_kv_storage *storage, gpointer k storage->elts ++; storage->memory += elt->size + sizeof (struct rspamd_kv_element); + g_static_rw_lock_writer_unlock (&storage->rwlock); return TRUE; } @@ -149,7 +155,9 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, guint /* Now check limits */ while (storage->memory + len + keylen > storage->max_memory) { if (storage->expire) { + g_static_rw_lock_writer_lock (&storage->rwlock); storage->expire->step_func (storage->expire, storage, time (NULL), steps); + g_static_rw_lock_writer_unlock (&storage->rwlock); } else { msg_warn ("<%s>: storage is full and no expire function is defined", storage->name); @@ -165,7 +173,9 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, guint steps = 0; while (storage->elts > storage->max_elts) { if (storage->expire) { + g_static_rw_lock_writer_lock (&storage->rwlock); storage->expire->step_func (storage->expire, storage, time (NULL), steps); + g_static_rw_lock_writer_unlock (&storage->rwlock); } else { msg_warn ("<%s>: storage is full and no expire function is defined", storage->name); @@ -178,6 +188,7 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, guint } /* First try to search it in cache */ + g_static_rw_lock_writer_lock (&storage->rwlock); elt = storage->cache->lookup_func (storage->cache, key, keylen); if (elt) { if (storage->expire) { @@ -209,6 +220,7 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, guint else { elt = storage->cache->insert_func (storage->cache, key, keylen, data, len); if (elt == NULL) { + g_static_rw_lock_writer_unlock (&storage->rwlock); return FALSE; } } @@ -231,6 +243,7 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, guint storage->elts ++; storage->memory += ELT_SIZE (elt); + g_static_rw_lock_writer_unlock (&storage->rwlock); return res; } @@ -253,7 +266,9 @@ rspamd_kv_storage_replace (struct rspamd_kv_storage *storage, gpointer key, guin /* Now check limits */ while (storage->memory + ELT_SIZE (elt) > storage->max_memory) { if (storage->expire) { + g_static_rw_lock_writer_lock (&storage->rwlock); storage->expire->step_func (storage->expire, storage, time (NULL), steps); + g_static_rw_lock_writer_unlock (&storage->rwlock); } else { msg_warn ("<%s>: storage is full and no expire function is defined", storage->name); @@ -265,6 +280,7 @@ rspamd_kv_storage_replace (struct rspamd_kv_storage *storage, gpointer key, guin } } + g_static_rw_lock_writer_lock (&storage->rwlock); /* Insert elt to the cache */ res = storage->cache->replace_func (storage->cache, key, keylen, elt); @@ -272,6 +288,7 @@ rspamd_kv_storage_replace (struct rspamd_kv_storage *storage, gpointer key, guin if (res && storage->backend) { res = storage->backend->replace_func (storage->backend, key, keylen, elt); } + g_static_rw_lock_writer_unlock (&storage->rwlock); return res; } @@ -284,6 +301,7 @@ rspamd_kv_storage_increment (struct rspamd_kv_storage *storage, gpointer key, gu glong *lp; /* First try to look at cache */ + g_static_rw_lock_writer_lock (&storage->rwlock); elt = storage->cache->lookup_func (storage->cache, key, keylen); if (elt == NULL && storage->backend) { @@ -305,13 +323,17 @@ rspamd_kv_storage_increment (struct rspamd_kv_storage *storage, gpointer key, gu *lp += *value; *value = *lp; if (storage->backend) { + g_static_rw_lock_writer_unlock (&storage->rwlock); return storage->backend->replace_func (storage->backend, key, keylen, elt); } else { + g_static_rw_lock_writer_unlock (&storage->rwlock); return TRUE; } } + g_static_rw_lock_writer_unlock (&storage->rwlock); + return FALSE; } @@ -322,11 +344,16 @@ rspamd_kv_storage_lookup (struct rspamd_kv_storage *storage, gpointer key, guint struct rspamd_kv_element *elt = NULL, *belt; /* First try to look at cache */ + g_static_rw_lock_reader_lock (&storage->rwlock); elt = storage->cache->lookup_func (storage->cache, key, keylen); + g_static_rw_lock_reader_unlock (&storage->rwlock); + /* Next look at the backend */ if (elt == NULL && storage->backend) { + g_static_rw_lock_reader_lock (&storage->rwlock); belt = storage->backend->lookup_func (storage->backend, key, keylen); + g_static_rw_lock_reader_unlock (&storage->rwlock); if (belt) { /* Put this element into cache */ rspamd_kv_storage_insert_internal (storage, ELT_KEY (belt), keylen, ELT_DATA (belt), @@ -355,6 +382,7 @@ rspamd_kv_storage_delete (struct rspamd_kv_storage *storage, gpointer key, guint struct rspamd_kv_element *elt; /* First delete key from cache */ + g_static_rw_lock_writer_lock (&storage->rwlock); elt = storage->cache->delete_func (storage->cache, key, keylen); /* Now delete from backend */ @@ -369,6 +397,7 @@ rspamd_kv_storage_delete (struct rspamd_kv_storage *storage, gpointer key, guint storage->elts --; storage->memory -= elt->size; } + g_static_rw_lock_writer_unlock (&storage->rwlock); return elt; } @@ -377,17 +406,20 @@ rspamd_kv_storage_delete (struct rspamd_kv_storage *storage, gpointer key, guint void rspamd_kv_storage_destroy (struct rspamd_kv_storage *storage) { - if (storage->cache && storage->cache->destroy_func) { - storage->cache->destroy_func (storage->cache); - } + g_static_rw_lock_writer_lock (&storage->rwlock); if (storage->backend && storage->backend->destroy_func) { storage->backend->destroy_func (storage->backend); } if (storage->expire && storage->expire->destroy_func) { storage->expire->destroy_func (storage->expire); } + if (storage->cache && storage->cache->destroy_func) { + storage->cache->destroy_func (storage->cache); + } g_free (storage->name); + + g_static_rw_lock_writer_unlock (&storage->rwlock); g_slice_free1 (sizeof (struct rspamd_kv_storage), storage); } @@ -414,6 +446,7 @@ rspamd_kv_storage_insert_array (struct rspamd_kv_storage *storage, gpointer key, elt->flags |= KV_ELT_ARRAY; g_slice_free1 (len + sizeof (guint), arr_data); /* Place to the backend */ + if (storage->backend) { return storage->backend->insert_func (storage->backend, key, keylen, elt); } @@ -563,7 +596,12 @@ rspamd_lru_expire_step (struct rspamd_kv_expire *e, struct rspamd_kv_storage *st storage->cache->steal_func (storage->cache, elt); TAILQ_REMOVE (&expire->head, elt, entry); /* Free memory */ - g_slice_free1 (ELT_SIZE (elt), elt); + if ((elt->flags & KV_ELT_DIRTY) != 0) { + elt->flags |= KV_ELT_NEED_FREE; + } + else { + g_slice_free1 (ELT_SIZE (elt), elt); + } } } @@ -757,11 +795,21 @@ rspamd_kv_hash_steal (struct rspamd_kv_cache *c, struct rspamd_kv_element *elt) /** * Destroy the whole cache */ + +static void +rspamd_kv_hash_destroy_cb (gpointer key, gpointer value, gpointer unused) +{ + struct rspamd_kv_element *elt = value; + + g_slice_free1 (ELT_SIZE (elt), elt); +} + static void rspamd_kv_hash_destroy (struct rspamd_kv_cache *c) { struct rspamd_kv_hash_cache *cache = (struct rspamd_kv_hash_cache *)c; + g_hash_table_foreach (cache->hash, rspamd_kv_hash_destroy_cb, NULL); g_hash_table_destroy (cache->hash); g_slice_free1 (sizeof (struct rspamd_kv_hash_cache), cache); } diff --git a/src/kvstorage_server.c b/src/kvstorage_server.c index 3f072654d..5ef34cbed 100644 --- a/src/kvstorage_server.c +++ b/src/kvstorage_server.c @@ -473,10 +473,8 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis) rspamd_set_dispatcher_policy (session->dispather, BUFFER_CHARACTER, session->arg_data.length); } else if (session->command == KVSTORAGE_CMD_GET) { - g_static_rw_lock_reader_lock (&session->cf->storage->rwlock); elt = rspamd_kv_storage_lookup (session->cf->storage, session->key, session->keylen, session->now); if (elt == NULL) { - g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); if (!is_redis) { return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND, sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE); @@ -506,18 +504,15 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis) } if (!rspamd_dispatcher_write (session->dispather, outbuf, r, TRUE, FALSE)) { - g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); return FALSE; } if (elt->flags & KV_ELT_INTEGER) { if (!rspamd_dispatcher_write (session->dispather, intbuf, eltlen, TRUE, TRUE)) { - g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); return FALSE; } } else { - if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), eltlen, TRUE, TRUE)) { - g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); + if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), eltlen, TRUE, FALSE)) { return FALSE; } } @@ -529,22 +524,17 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis) res = rspamd_dispatcher_write (session->dispather, CRLF, sizeof (CRLF) - 1, FALSE, TRUE); } - if (!res) { - g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); - } return res; } } else if (session->command == KVSTORAGE_CMD_DELETE) { - g_static_rw_lock_writer_lock (&session->cf->storage->rwlock); elt = rspamd_kv_storage_delete (session->cf->storage, session->key, session->keylen); if (elt != NULL) { if ((elt->flags & KV_ELT_DIRTY) == 0) { /* Free memory if backend has deleted this element */ g_slice_free1 (ELT_SIZE (elt), elt); } - g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock); if (!is_redis) { return rspamd_dispatcher_write (session->dispather, "DELETED" CRLF, sizeof ("DELETED" CRLF) - 1, FALSE, TRUE); @@ -555,7 +545,6 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis) } } else { - g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock); if (!is_redis) { return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND, sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE); @@ -567,10 +556,8 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis) } } else if (session->command == KVSTORAGE_CMD_INCR || session->command == KVSTORAGE_CMD_DECR) { - g_static_rw_lock_writer_lock (&session->cf->storage->rwlock); longval = session->arg_data.value; if (!rspamd_kv_storage_increment (session->cf->storage, session->key, session->keylen, &longval)) { - g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock); if (!is_redis) { return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND, sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE); @@ -581,7 +568,6 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis) } } else { - g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock); if (!is_redis) { r = rspamd_snprintf (outbuf, sizeof (outbuf), "%l" CRLF, longval); @@ -629,7 +615,6 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis) } } } - g_static_rw_lock_writer_lock (&session->cf->storage->rwlock); } else if (session->command == KVSTORAGE_CMD_SELECT) { if (!is_redis) { @@ -842,16 +827,13 @@ kvstorage_read_socket (f_str_t * in, void *arg) if (session->command == KVSTORAGE_CMD_SET) { session->state = KVSTORAGE_STATE_READ_CMD; rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1); - g_static_rw_lock_writer_lock (&session->cf->storage->rwlock); if (rspamd_kv_storage_insert (session->cf->storage, session->key, session->keylen, in->begin, in->len, session->flags, session->expire)) { - g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock); return rspamd_dispatcher_write (session->dispather, "+OK" CRLF, sizeof ("+OK" CRLF) - 1, FALSE, TRUE); } else { - g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock); return rspamd_dispatcher_write (session->dispather, "-ERR not stored" CRLF, sizeof ("-ERR not stored" CRLF) - 1, FALSE, TRUE); } @@ -870,11 +852,9 @@ kvstorage_read_socket (f_str_t * in, void *arg) case KVSTORAGE_STATE_READ_DATA: session->state = KVSTORAGE_STATE_READ_CMD; rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1); - g_static_rw_lock_writer_lock (&session->cf->storage->rwlock); if (rspamd_kv_storage_insert (session->cf->storage, session->key, session->keylen, in->begin, in->len, session->flags, session->expire)) { - g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock); if (!is_redis) { return rspamd_dispatcher_write (session->dispather, "STORED" CRLF, sizeof ("STORED" CRLF) - 1, FALSE, TRUE); @@ -885,7 +865,6 @@ kvstorage_read_socket (f_str_t * in, void *arg) } } else { - g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock); if (!is_redis) { return rspamd_dispatcher_write (session->dispather, ERROR_NOT_STORED, sizeof (ERROR_NOT_STORED) - 1, FALSE, TRUE); @@ -911,7 +890,6 @@ kvstorage_write_socket (void *arg) struct kvstorage_session *session = (struct kvstorage_session *) arg; if (session->elt) { - g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); session->elt = NULL; } @@ -954,9 +932,10 @@ thr_accept_socket (gint fd, short what, void *arg) g_static_mutex_unlock (thr->accept_mtx); return; } - g_static_mutex_unlock (thr->accept_mtx); + /* Check for EAGAIN */ if (nfd == 0) { + g_static_mutex_unlock (thr->accept_mtx); return; } @@ -968,6 +947,8 @@ thr_accept_socket (gint fd, short what, void *arg) session->dispather = rspamd_create_dispatcher (thr->ev_base, nfd, BUFFER_LINE, kvstorage_read_socket, kvstorage_write_socket, kvstorage_err_socket, thr->tv, session); + + g_static_mutex_unlock (thr->accept_mtx); session->elt = NULL; if (su.ss.ss_family == AF_UNIX) { -- 2.39.5