aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2011-11-24 20:11:27 +0300
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2011-11-24 20:11:27 +0300
commit993872bac9e7d3231824f90035c50a1a9b5aff4e (patch)
tree6681b4338589dbcbb63307b33a3e4ef2291e6c32
parent5caf2897f55f821386f4b9196a7ba73df209321d (diff)
downloadrspamd-993872bac9e7d3231824f90035c50a1a9b5aff4e.tar.gz
rspamd-993872bac9e7d3231824f90035c50a1a9b5aff4e.zip
Another fix to locking logic.
-rw-r--r--src/kvstorage.c65
-rw-r--r--src/kvstorage.h7
-rw-r--r--src/kvstorage_file.c37
-rw-r--r--src/kvstorage_server.c23
4 files changed, 85 insertions, 47 deletions
diff --git a/src/kvstorage.c b/src/kvstorage.c
index 60d28a833..5ad076b3e 100644
--- a/src/kvstorage.c
+++ b/src/kvstorage.c
@@ -78,31 +78,32 @@ rspamd_kv_storage_new (gint id, const gchar *name, struct rspamd_kv_cache *cache
/** Internal insertion to the kv storage from backend */
gboolean
-rspamd_kv_storage_insert_internal (struct rspamd_kv_storage *storage, gpointer key, guint keylen,
+rspamd_kv_storage_insert_cache (struct rspamd_kv_storage *storage, gpointer key, guint keylen,
gpointer data, gsize len, gint flags, guint expire, struct rspamd_kv_element **pelt)
{
gint steps = 0;
- struct rspamd_kv_element *elt = *pelt;
+ struct rspamd_kv_element *elt;
+ g_static_rw_lock_writer_lock (&storage->rwlock);
/* Hard limit */
if (storage->max_memory > 0) {
if (len > storage->max_memory) {
msg_info ("<%s>: trying to insert value of length %z while limit is %z", storage->name,
len, storage->max_memory);
+ g_static_rw_lock_writer_unlock (&storage->rwlock);
return FALSE;
}
/* Now check limits */
while (storage->memory + len > storage->max_memory) {
if (storage->expire) {
- g_static_rw_lock_writer_lock (&storage->rwlock);
storage->expire->step_func (storage->expire, storage, time (NULL), steps);
- g_static_rw_lock_writer_unlock (&storage->rwlock);
}
else {
msg_warn ("<%s>: storage is full and no expire function is defined", storage->name);
}
if (++steps > MAX_EXPIRE_STEPS) {
+ g_static_rw_lock_writer_unlock (&storage->rwlock);
msg_warn ("<%s>: cannot expire enough keys in storage", storage->name);
return FALSE;
}
@@ -110,7 +111,7 @@ rspamd_kv_storage_insert_internal (struct rspamd_kv_storage *storage, gpointer k
}
/* Insert elt to the cache */
- g_static_rw_lock_writer_lock (&storage->rwlock);
+
elt = storage->cache->insert_func (storage->cache, key, keylen, data, len);
if (elt == NULL) {
@@ -120,7 +121,10 @@ rspamd_kv_storage_insert_internal (struct rspamd_kv_storage *storage, gpointer k
/* Copy data */
elt->flags = flags;
elt->expire = expire;
- *pelt = elt;
+
+ if (pelt != NULL) {
+ *pelt = elt;
+ }
/* Insert to the expire */
if (storage->expire) {
@@ -145,24 +149,25 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, guint
glong longval;
/* Hard limit */
+ g_static_rw_lock_writer_lock (&storage->rwlock);
if (storage->max_memory > 0) {
if (len + sizeof (struct rspamd_kv_element) + keylen >= storage->max_memory) {
msg_warn ("<%s>: trying to insert value of length %z while limit is %z", storage->name,
len, storage->max_memory);
+ g_static_rw_lock_writer_unlock (&storage->rwlock);
return FALSE;
}
/* Now check limits */
while (storage->memory + len + keylen > storage->max_memory) {
if (storage->expire) {
- g_static_rw_lock_writer_lock (&storage->rwlock);
storage->expire->step_func (storage->expire, storage, time (NULL), steps);
- g_static_rw_lock_writer_unlock (&storage->rwlock);
}
else {
msg_warn ("<%s>: storage is full and no expire function is defined", storage->name);
}
if (++steps > MAX_EXPIRE_STEPS) {
+ g_static_rw_lock_writer_unlock (&storage->rwlock);
msg_warn ("<%s>: cannot expire enough keys in storage", storage->name);
return FALSE;
}
@@ -173,14 +178,13 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, guint
steps = 0;
while (storage->elts > storage->max_elts) {
if (storage->expire) {
- g_static_rw_lock_writer_lock (&storage->rwlock);
storage->expire->step_func (storage->expire, storage, time (NULL), steps);
- g_static_rw_lock_writer_unlock (&storage->rwlock);
}
else {
msg_warn ("<%s>: storage is full and no expire function is defined", storage->name);
}
if (++steps > MAX_EXPIRE_STEPS) {
+ g_static_rw_lock_writer_unlock (&storage->rwlock);
msg_warn ("<%s>: cannot expire enough keys in storage", storage->name);
return FALSE;
}
@@ -188,7 +192,7 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, guint
}
/* First try to search it in cache */
- g_static_rw_lock_writer_lock (&storage->rwlock);
+
elt = storage->cache->lookup_func (storage->cache, key, keylen);
if (elt) {
if (storage->expire) {
@@ -309,9 +313,11 @@ rspamd_kv_storage_increment (struct rspamd_kv_storage *storage, gpointer key, gu
if (belt) {
/* Put this element into cache */
if ((belt->flags & KV_ELT_INTEGER) != 0) {
- rspamd_kv_storage_insert_internal (storage, ELT_KEY (belt), keylen, ELT_DATA (belt),
+ g_static_rw_lock_writer_unlock (&storage->rwlock);
+ rspamd_kv_storage_insert_cache (storage, ELT_KEY (belt), keylen, ELT_DATA (belt),
belt->size, belt->flags,
belt->expire, &elt);
+ g_static_rw_lock_writer_lock (&storage->rwlock);
}
if ((belt->flags & KV_ELT_DIRTY) == 0) {
g_free (belt);
@@ -346,21 +352,19 @@ rspamd_kv_storage_lookup (struct rspamd_kv_storage *storage, gpointer key, guint
/* First try to look at cache */
g_static_rw_lock_reader_lock (&storage->rwlock);
elt = storage->cache->lookup_func (storage->cache, key, keylen);
- g_static_rw_lock_reader_unlock (&storage->rwlock);
-
/* Next look at the backend */
if (elt == NULL && storage->backend) {
- g_static_rw_lock_reader_lock (&storage->rwlock);
belt = storage->backend->lookup_func (storage->backend, key, keylen);
- g_static_rw_lock_reader_unlock (&storage->rwlock);
+
if (belt) {
/* Put this element into cache */
- rspamd_kv_storage_insert_internal (storage, ELT_KEY (belt), keylen, ELT_DATA (belt),
- belt->size, belt->flags,
- belt->expire, &elt);
if ((belt->flags & KV_ELT_DIRTY) == 0) {
- g_free (belt);
+ belt->flags |= KV_ELT_NEED_INSERT;
+ return belt;
+ }
+ else {
+ elt = belt;
}
}
}
@@ -372,6 +376,7 @@ rspamd_kv_storage_lookup (struct rspamd_kv_storage *storage, gpointer key, guint
}
}
+ /* RWlock is still locked */
return elt;
}
@@ -396,7 +401,14 @@ rspamd_kv_storage_delete (struct rspamd_kv_storage *storage, gpointer key, guint
}
storage->elts --;
storage->memory -= elt->size;
+ if ((elt->flags & KV_ELT_DIRTY) != 0) {
+ elt->flags |= KV_ELT_NEED_FREE;
+ }
+ else {
+ g_slice_free1 (ELT_SIZE (elt), elt);
+ }
}
+
g_static_rw_lock_writer_unlock (&storage->rwlock);
return elt;
@@ -437,7 +449,7 @@ rspamd_kv_storage_insert_array (struct rspamd_kv_storage *storage, gpointer key,
es = arr_data;
*es = elt_size;
memcpy (arr_data, (gchar *)data + sizeof (guint), len);
- if (!rspamd_kv_storage_insert_internal (storage, key, keylen, arr_data, len + sizeof (guint),
+ if (!rspamd_kv_storage_insert_cache (storage, key, keylen, arr_data, len + sizeof (guint),
flags, expire, &elt)) {
g_slice_free1 (len + sizeof (guint), arr_data);
return FALSE;
@@ -583,7 +595,12 @@ rspamd_lru_expire_step (struct rspamd_kv_expire *e, struct rspamd_kv_storage *st
storage->elts --;
TAILQ_REMOVE (&expire->head, elt, entry);
/* Free memory */
- g_slice_free1 (ELT_SIZE (elt), elt);
+ if ((elt->flags & (KV_ELT_DIRTY|KV_ELT_NEED_INSERT)) != 0) {
+ elt->flags |= KV_ELT_NEED_FREE;
+ }
+ else {
+ g_slice_free1 (ELT_SIZE (elt), elt);
+ }
res = TRUE;
/* Check other elements in this queue */
TAILQ_FOREACH_SAFE (elt, &expire->head, entry, temp) {
@@ -596,7 +613,7 @@ rspamd_lru_expire_step (struct rspamd_kv_expire *e, struct rspamd_kv_storage *st
storage->cache->steal_func (storage->cache, elt);
TAILQ_REMOVE (&expire->head, elt, entry);
/* Free memory */
- if ((elt->flags & KV_ELT_DIRTY) != 0) {
+ if ((elt->flags & (KV_ELT_DIRTY|KV_ELT_NEED_INSERT)) != 0) {
elt->flags |= KV_ELT_NEED_FREE;
}
else {
@@ -613,7 +630,7 @@ rspamd_lru_expire_step (struct rspamd_kv_expire *e, struct rspamd_kv_storage *st
storage->cache->steal_func (storage->cache, oldest_elt);
TAILQ_REMOVE (&expire->head, oldest_elt, entry);
/* Free memory */
- if ((oldest_elt->flags & KV_ELT_DIRTY) != 0) {
+ if ((oldest_elt->flags & (KV_ELT_DIRTY|KV_ELT_NEED_INSERT)) != 0) {
oldest_elt->flags |= KV_ELT_NEED_FREE;
}
else {
diff --git a/src/kvstorage.h b/src/kvstorage.h
index f970a7086..b99050583 100644
--- a/src/kvstorage.h
+++ b/src/kvstorage.h
@@ -65,7 +65,8 @@ enum rspamd_kv_flags {
KV_ELT_DIRTY = 1 << 2,
KV_ELT_OUSTED = 1 << 3,
KV_ELT_NEED_FREE = 1 << 4,
- KV_ELT_INTEGER = 1 << 5
+ KV_ELT_INTEGER = 1 << 5,
+ KV_ELT_NEED_INSERT = 1 << 6
};
#define ELT_DATA(elt) (gchar *)(elt)->data + (elt)->keylen + 1
@@ -140,6 +141,10 @@ struct rspamd_kv_storage *rspamd_kv_storage_new (gint id, const gchar *name,
/** Insert new element to the kv storage */
gboolean rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, guint keylen, gpointer data, gsize len, gint flags, guint expire);
+/** Insert element only in cache */
+gboolean rspamd_kv_storage_insert_cache (struct rspamd_kv_storage *storage, gpointer key, guint keylen,
+ gpointer data, gsize len, gint flags, guint expire, struct rspamd_kv_element **pelt);
+
/** Replace an element in the kv storage */
gboolean rspamd_kv_storage_replace (struct rspamd_kv_storage *storage, gpointer key, guint keylen, struct rspamd_kv_element *elt);
diff --git a/src/kvstorage_file.c b/src/kvstorage_file.c
index c342b1fbf..81521720c 100644
--- a/src/kvstorage_file.c
+++ b/src/kvstorage_file.c
@@ -201,10 +201,15 @@ file_process_queue (struct rspamd_kv_backend *backend)
cur = db->ops_queue->head;
while (cur) {
op = cur->data;
- if (op->op == FILE_OP_DELETE || (op->elt->flags & KV_ELT_NEED_FREE) != 0) {
+ if (op->op == FILE_OP_DELETE || ((op->elt->flags & KV_ELT_NEED_FREE) != 0 &&
+ (op->elt->flags & KV_ELT_NEED_INSERT) == 0)) {
/* Also clean memory */
g_slice_free1 (ELT_SIZE (op->elt), op->elt);
}
+ else {
+ /* Unset dirty flag */
+ op->elt->flags &= ~KV_ELT_DIRTY;
+ }
g_slice_free1 (sizeof (struct file_op), op);
cur = g_list_next (cur);
}
@@ -419,6 +424,8 @@ rspamd_file_lookup (struct rspamd_kv_backend *backend, gpointer key, guint keyle
close (fd);
+ elt->flags &= ~(KV_ELT_DIRTY|KV_ELT_NEED_FREE);
+
return elt;
}
@@ -426,39 +433,27 @@ static void
rspamd_file_delete (struct rspamd_kv_backend *backend, gpointer key, guint keylen)
{
struct rspamd_file_backend *db = (struct rspamd_file_backend *)backend;
- struct file_op *op;
- struct rspamd_kv_element *elt;
+ gchar filebuf[PATH_MAX];
struct rspamd_kv_element search_elt;
-
- search_elt.keylen = keylen;
- search_elt.p = key;
+ struct file_op *op;
if (!db->initialized) {
return;
}
+ search_elt.keylen = keylen;
+ search_elt.p = key;
+ /* First search in ops queue */
if ((op = g_hash_table_lookup (db->ops_hash, &search_elt)) != NULL) {
op->op = FILE_OP_DELETE;
return;
}
-
- elt = rspamd_file_lookup (backend, key, keylen);
- if (elt == NULL) {
+ /* Get filename */
+ if (!get_file_name (db, key, keylen, filebuf, sizeof (filebuf))) {
return;
}
- op = g_slice_alloc (sizeof (struct file_op));
- op->op = FILE_OP_DELETE;
- op->elt = elt;
- elt->flags |= KV_ELT_DIRTY;
-
- g_queue_push_head (db->ops_queue, op);
- g_hash_table_insert (db->ops_hash, elt, op);
-
- if (db->sync_ops > 0 && g_queue_get_length (db->ops_queue) >= db->sync_ops) {
- file_process_queue (backend);
- }
- return;
+ unlink (filebuf);
}
static void
diff --git a/src/kvstorage_server.c b/src/kvstorage_server.c
index 5ef34cbed..3e4cedcf9 100644
--- a/src/kvstorage_server.c
+++ b/src/kvstorage_server.c
@@ -508,11 +508,13 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
}
if (elt->flags & KV_ELT_INTEGER) {
if (!rspamd_dispatcher_write (session->dispather, intbuf, eltlen, TRUE, TRUE)) {
+ g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
return FALSE;
}
}
else {
- if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), eltlen, TRUE, FALSE)) {
+ if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), eltlen, TRUE, TRUE)) {
+ g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
return FALSE;
}
}
@@ -524,6 +526,9 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
res = rspamd_dispatcher_write (session->dispather, CRLF,
sizeof (CRLF) - 1, FALSE, TRUE);
}
+ if (!res) {
+ g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
+ }
return res;
}
@@ -890,7 +895,22 @@ kvstorage_write_socket (void *arg)
struct kvstorage_session *session = (struct kvstorage_session *) arg;
if (session->elt) {
+
+ if ((session->elt->flags & KV_ELT_NEED_INSERT) != 0) {
+ /* Insert to cache and free element */
+ session->elt->flags &= ~KV_ELT_NEED_INSERT;
+ g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
+ rspamd_kv_storage_insert_cache (session->cf->storage, ELT_KEY (session->elt),
+ session->elt->keylen, ELT_DATA (session->elt),
+ session->elt->size, session->elt->flags,
+ session->elt->expire, NULL);
+ g_free (session->elt);
+ session->elt = NULL;
+ return TRUE;
+ }
+ g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
session->elt = NULL;
+
}
return TRUE;
@@ -972,6 +992,7 @@ kvstorage_thread (gpointer ud)
sigprocmask (SIG_BLOCK, thr->signals, NULL);
/* Init thread specific events */
thr->ev_base = event_init ();
+
event_set (&thr->bind_ev, thr->worker->cf->listen_sock, EV_READ | EV_PERSIST, thr_accept_socket, (void *)thr);
event_base_set (thr->ev_base, &thr->bind_ev);
event_add (&thr->bind_ev, NULL);