aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2011-11-24 16:59:58 +0300
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2011-11-24 16:59:58 +0300
commit9c775b45468e79bf9885dcb0950b0636e225b9ee (patch)
tree06dab8b1e1712356b7ce45315414c2fa700e8c41
parentb67a212bfab7329f8de3e55e456b8e5bad226e57 (diff)
downloadrspamd-9c775b45468e79bf9885dcb0950b0636e225b9ee.tar.gz
rspamd-9c775b45468e79bf9885dcb0950b0636e225b9ee.zip
Rework kvstorage locking system.
-rw-r--r--src/kvstorage.c56
-rw-r--r--src/kvstorage_server.c29
2 files changed, 57 insertions, 28 deletions
diff --git a/src/kvstorage.c b/src/kvstorage.c
index 4afc826d8..60d28a833 100644
--- a/src/kvstorage.c
+++ b/src/kvstorage.c
@@ -95,7 +95,9 @@ rspamd_kv_storage_insert_internal (struct rspamd_kv_storage *storage, gpointer k
/* Now check limits */
while (storage->memory + len > storage->max_memory) {
if (storage->expire) {
+ g_static_rw_lock_writer_lock (&storage->rwlock);
storage->expire->step_func (storage->expire, storage, time (NULL), steps);
+ g_static_rw_lock_writer_unlock (&storage->rwlock);
}
else {
msg_warn ("<%s>: storage is full and no expire function is defined", storage->name);
@@ -108,8 +110,11 @@ rspamd_kv_storage_insert_internal (struct rspamd_kv_storage *storage, gpointer k
}
/* Insert elt to the cache */
+ g_static_rw_lock_writer_lock (&storage->rwlock);
elt = storage->cache->insert_func (storage->cache, key, keylen, data, len);
+
if (elt == NULL) {
+ g_static_rw_lock_writer_unlock (&storage->rwlock);
return FALSE;
}
/* Copy data */
@@ -124,6 +129,7 @@ rspamd_kv_storage_insert_internal (struct rspamd_kv_storage *storage, gpointer k
storage->elts ++;
storage->memory += elt->size + sizeof (struct rspamd_kv_element);
+ g_static_rw_lock_writer_unlock (&storage->rwlock);
return TRUE;
}
@@ -149,7 +155,9 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, guint
/* Now check limits */
while (storage->memory + len + keylen > storage->max_memory) {
if (storage->expire) {
+ g_static_rw_lock_writer_lock (&storage->rwlock);
storage->expire->step_func (storage->expire, storage, time (NULL), steps);
+ g_static_rw_lock_writer_unlock (&storage->rwlock);
}
else {
msg_warn ("<%s>: storage is full and no expire function is defined", storage->name);
@@ -165,7 +173,9 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, guint
steps = 0;
while (storage->elts > storage->max_elts) {
if (storage->expire) {
+ g_static_rw_lock_writer_lock (&storage->rwlock);
storage->expire->step_func (storage->expire, storage, time (NULL), steps);
+ g_static_rw_lock_writer_unlock (&storage->rwlock);
}
else {
msg_warn ("<%s>: storage is full and no expire function is defined", storage->name);
@@ -178,6 +188,7 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, guint
}
/* First try to search it in cache */
+ g_static_rw_lock_writer_lock (&storage->rwlock);
elt = storage->cache->lookup_func (storage->cache, key, keylen);
if (elt) {
if (storage->expire) {
@@ -209,6 +220,7 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, guint
else {
elt = storage->cache->insert_func (storage->cache, key, keylen, data, len);
if (elt == NULL) {
+ g_static_rw_lock_writer_unlock (&storage->rwlock);
return FALSE;
}
}
@@ -231,6 +243,7 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, guint
storage->elts ++;
storage->memory += ELT_SIZE (elt);
+ g_static_rw_lock_writer_unlock (&storage->rwlock);
return res;
}
@@ -253,7 +266,9 @@ rspamd_kv_storage_replace (struct rspamd_kv_storage *storage, gpointer key, guin
/* Now check limits */
while (storage->memory + ELT_SIZE (elt) > storage->max_memory) {
if (storage->expire) {
+ g_static_rw_lock_writer_lock (&storage->rwlock);
storage->expire->step_func (storage->expire, storage, time (NULL), steps);
+ g_static_rw_lock_writer_unlock (&storage->rwlock);
}
else {
msg_warn ("<%s>: storage is full and no expire function is defined", storage->name);
@@ -265,6 +280,7 @@ rspamd_kv_storage_replace (struct rspamd_kv_storage *storage, gpointer key, guin
}
}
+ g_static_rw_lock_writer_lock (&storage->rwlock);
/* Insert elt to the cache */
res = storage->cache->replace_func (storage->cache, key, keylen, elt);
@@ -272,6 +288,7 @@ rspamd_kv_storage_replace (struct rspamd_kv_storage *storage, gpointer key, guin
if (res && storage->backend) {
res = storage->backend->replace_func (storage->backend, key, keylen, elt);
}
+ g_static_rw_lock_writer_unlock (&storage->rwlock);
return res;
}
@@ -284,6 +301,7 @@ rspamd_kv_storage_increment (struct rspamd_kv_storage *storage, gpointer key, gu
glong *lp;
/* First try to look at cache */
+ g_static_rw_lock_writer_lock (&storage->rwlock);
elt = storage->cache->lookup_func (storage->cache, key, keylen);
if (elt == NULL && storage->backend) {
@@ -305,13 +323,17 @@ rspamd_kv_storage_increment (struct rspamd_kv_storage *storage, gpointer key, gu
*lp += *value;
*value = *lp;
if (storage->backend) {
+ g_static_rw_lock_writer_unlock (&storage->rwlock);
return storage->backend->replace_func (storage->backend, key, keylen, elt);
}
else {
+ g_static_rw_lock_writer_unlock (&storage->rwlock);
return TRUE;
}
}
+ g_static_rw_lock_writer_unlock (&storage->rwlock);
+
return FALSE;
}
@@ -322,11 +344,16 @@ rspamd_kv_storage_lookup (struct rspamd_kv_storage *storage, gpointer key, guint
struct rspamd_kv_element *elt = NULL, *belt;
/* First try to look at cache */
+ g_static_rw_lock_reader_lock (&storage->rwlock);
elt = storage->cache->lookup_func (storage->cache, key, keylen);
+ g_static_rw_lock_reader_unlock (&storage->rwlock);
+
/* Next look at the backend */
if (elt == NULL && storage->backend) {
+ g_static_rw_lock_reader_lock (&storage->rwlock);
belt = storage->backend->lookup_func (storage->backend, key, keylen);
+ g_static_rw_lock_reader_unlock (&storage->rwlock);
if (belt) {
/* Put this element into cache */
rspamd_kv_storage_insert_internal (storage, ELT_KEY (belt), keylen, ELT_DATA (belt),
@@ -355,6 +382,7 @@ rspamd_kv_storage_delete (struct rspamd_kv_storage *storage, gpointer key, guint
struct rspamd_kv_element *elt;
/* First delete key from cache */
+ g_static_rw_lock_writer_lock (&storage->rwlock);
elt = storage->cache->delete_func (storage->cache, key, keylen);
/* Now delete from backend */
@@ -369,6 +397,7 @@ rspamd_kv_storage_delete (struct rspamd_kv_storage *storage, gpointer key, guint
storage->elts --;
storage->memory -= elt->size;
}
+ g_static_rw_lock_writer_unlock (&storage->rwlock);
return elt;
}
@@ -377,17 +406,20 @@ rspamd_kv_storage_delete (struct rspamd_kv_storage *storage, gpointer key, guint
void
rspamd_kv_storage_destroy (struct rspamd_kv_storage *storage)
{
- if (storage->cache && storage->cache->destroy_func) {
- storage->cache->destroy_func (storage->cache);
- }
+ g_static_rw_lock_writer_lock (&storage->rwlock);
if (storage->backend && storage->backend->destroy_func) {
storage->backend->destroy_func (storage->backend);
}
if (storage->expire && storage->expire->destroy_func) {
storage->expire->destroy_func (storage->expire);
}
+ if (storage->cache && storage->cache->destroy_func) {
+ storage->cache->destroy_func (storage->cache);
+ }
g_free (storage->name);
+
+ g_static_rw_lock_writer_unlock (&storage->rwlock);
g_slice_free1 (sizeof (struct rspamd_kv_storage), storage);
}
@@ -414,6 +446,7 @@ rspamd_kv_storage_insert_array (struct rspamd_kv_storage *storage, gpointer key,
elt->flags |= KV_ELT_ARRAY;
g_slice_free1 (len + sizeof (guint), arr_data);
/* Place to the backend */
+
if (storage->backend) {
return storage->backend->insert_func (storage->backend, key, keylen, elt);
}
@@ -563,7 +596,12 @@ rspamd_lru_expire_step (struct rspamd_kv_expire *e, struct rspamd_kv_storage *st
storage->cache->steal_func (storage->cache, elt);
TAILQ_REMOVE (&expire->head, elt, entry);
/* Free memory */
- g_slice_free1 (ELT_SIZE (elt), elt);
+ if ((elt->flags & KV_ELT_DIRTY) != 0) {
+ elt->flags |= KV_ELT_NEED_FREE;
+ }
+ else {
+ g_slice_free1 (ELT_SIZE (elt), elt);
+ }
}
}
@@ -757,11 +795,21 @@ rspamd_kv_hash_steal (struct rspamd_kv_cache *c, struct rspamd_kv_element *elt)
/**
* Destroy the whole cache
*/
+
+static void
+rspamd_kv_hash_destroy_cb (gpointer key, gpointer value, gpointer unused)
+{
+ struct rspamd_kv_element *elt = value;
+
+ g_slice_free1 (ELT_SIZE (elt), elt);
+}
+
static void
rspamd_kv_hash_destroy (struct rspamd_kv_cache *c)
{
struct rspamd_kv_hash_cache *cache = (struct rspamd_kv_hash_cache *)c;
+ g_hash_table_foreach (cache->hash, rspamd_kv_hash_destroy_cb, NULL);
g_hash_table_destroy (cache->hash);
g_slice_free1 (sizeof (struct rspamd_kv_hash_cache), cache);
}
diff --git a/src/kvstorage_server.c b/src/kvstorage_server.c
index 3f072654d..5ef34cbed 100644
--- a/src/kvstorage_server.c
+++ b/src/kvstorage_server.c
@@ -473,10 +473,8 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
rspamd_set_dispatcher_policy (session->dispather, BUFFER_CHARACTER, session->arg_data.length);
}
else if (session->command == KVSTORAGE_CMD_GET) {
- g_static_rw_lock_reader_lock (&session->cf->storage->rwlock);
elt = rspamd_kv_storage_lookup (session->cf->storage, session->key, session->keylen, session->now);
if (elt == NULL) {
- g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
if (!is_redis) {
return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND,
sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE);
@@ -506,18 +504,15 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
}
if (!rspamd_dispatcher_write (session->dispather, outbuf,
r, TRUE, FALSE)) {
- g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
return FALSE;
}
if (elt->flags & KV_ELT_INTEGER) {
if (!rspamd_dispatcher_write (session->dispather, intbuf, eltlen, TRUE, TRUE)) {
- g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
return FALSE;
}
}
else {
- if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), eltlen, TRUE, TRUE)) {
- g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
+ if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), eltlen, TRUE, FALSE)) {
return FALSE;
}
}
@@ -529,22 +524,17 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
res = rspamd_dispatcher_write (session->dispather, CRLF,
sizeof (CRLF) - 1, FALSE, TRUE);
}
- if (!res) {
- g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
- }
return res;
}
}
else if (session->command == KVSTORAGE_CMD_DELETE) {
- g_static_rw_lock_writer_lock (&session->cf->storage->rwlock);
elt = rspamd_kv_storage_delete (session->cf->storage, session->key, session->keylen);
if (elt != NULL) {
if ((elt->flags & KV_ELT_DIRTY) == 0) {
/* Free memory if backend has deleted this element */
g_slice_free1 (ELT_SIZE (elt), elt);
}
- g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
if (!is_redis) {
return rspamd_dispatcher_write (session->dispather, "DELETED" CRLF,
sizeof ("DELETED" CRLF) - 1, FALSE, TRUE);
@@ -555,7 +545,6 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
}
}
else {
- g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
if (!is_redis) {
return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND,
sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE);
@@ -567,10 +556,8 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
}
}
else if (session->command == KVSTORAGE_CMD_INCR || session->command == KVSTORAGE_CMD_DECR) {
- g_static_rw_lock_writer_lock (&session->cf->storage->rwlock);
longval = session->arg_data.value;
if (!rspamd_kv_storage_increment (session->cf->storage, session->key, session->keylen, &longval)) {
- g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
if (!is_redis) {
return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND,
sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE);
@@ -581,7 +568,6 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
}
}
else {
- g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
if (!is_redis) {
r = rspamd_snprintf (outbuf, sizeof (outbuf), "%l" CRLF,
longval);
@@ -629,7 +615,6 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
}
}
}
- g_static_rw_lock_writer_lock (&session->cf->storage->rwlock);
}
else if (session->command == KVSTORAGE_CMD_SELECT) {
if (!is_redis) {
@@ -842,16 +827,13 @@ kvstorage_read_socket (f_str_t * in, void *arg)
if (session->command == KVSTORAGE_CMD_SET) {
session->state = KVSTORAGE_STATE_READ_CMD;
rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
- g_static_rw_lock_writer_lock (&session->cf->storage->rwlock);
if (rspamd_kv_storage_insert (session->cf->storage, session->key, session->keylen,
in->begin, in->len,
session->flags, session->expire)) {
- g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
return rspamd_dispatcher_write (session->dispather, "+OK" CRLF,
sizeof ("+OK" CRLF) - 1, FALSE, TRUE);
}
else {
- g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
return rspamd_dispatcher_write (session->dispather, "-ERR not stored" CRLF,
sizeof ("-ERR not stored" CRLF) - 1, FALSE, TRUE);
}
@@ -870,11 +852,9 @@ kvstorage_read_socket (f_str_t * in, void *arg)
case KVSTORAGE_STATE_READ_DATA:
session->state = KVSTORAGE_STATE_READ_CMD;
rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
- g_static_rw_lock_writer_lock (&session->cf->storage->rwlock);
if (rspamd_kv_storage_insert (session->cf->storage, session->key, session->keylen,
in->begin, in->len,
session->flags, session->expire)) {
- g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
if (!is_redis) {
return rspamd_dispatcher_write (session->dispather, "STORED" CRLF,
sizeof ("STORED" CRLF) - 1, FALSE, TRUE);
@@ -885,7 +865,6 @@ kvstorage_read_socket (f_str_t * in, void *arg)
}
}
else {
- g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
if (!is_redis) {
return rspamd_dispatcher_write (session->dispather, ERROR_NOT_STORED,
sizeof (ERROR_NOT_STORED) - 1, FALSE, TRUE);
@@ -911,7 +890,6 @@ kvstorage_write_socket (void *arg)
struct kvstorage_session *session = (struct kvstorage_session *) arg;
if (session->elt) {
- g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
session->elt = NULL;
}
@@ -954,9 +932,10 @@ thr_accept_socket (gint fd, short what, void *arg)
g_static_mutex_unlock (thr->accept_mtx);
return;
}
- g_static_mutex_unlock (thr->accept_mtx);
+
/* Check for EAGAIN */
if (nfd == 0) {
+ g_static_mutex_unlock (thr->accept_mtx);
return;
}
@@ -968,6 +947,8 @@ thr_accept_socket (gint fd, short what, void *arg)
session->dispather = rspamd_create_dispatcher (thr->ev_base, nfd, BUFFER_LINE,
kvstorage_read_socket, kvstorage_write_socket,
kvstorage_err_socket, thr->tv, session);
+
+ g_static_mutex_unlock (thr->accept_mtx);
session->elt = NULL;
if (su.ss.ss_family == AF_UNIX) {