From 993872bac9e7d3231824f90035c50a1a9b5aff4e Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Thu, 24 Nov 2011 20:11:27 +0300 Subject: [PATCH] Another fix to locking logic. --- src/kvstorage.c | 65 ++++++++++++++++++++++++++---------------- src/kvstorage.h | 7 ++++- src/kvstorage_file.c | 37 +++++++++++------------- src/kvstorage_server.c | 23 ++++++++++++++- 4 files changed, 85 insertions(+), 47 deletions(-) diff --git a/src/kvstorage.c b/src/kvstorage.c index 60d28a833..5ad076b3e 100644 --- a/src/kvstorage.c +++ b/src/kvstorage.c @@ -78,31 +78,32 @@ rspamd_kv_storage_new (gint id, const gchar *name, struct rspamd_kv_cache *cache /** Internal insertion to the kv storage from backend */ gboolean -rspamd_kv_storage_insert_internal (struct rspamd_kv_storage *storage, gpointer key, guint keylen, +rspamd_kv_storage_insert_cache (struct rspamd_kv_storage *storage, gpointer key, guint keylen, gpointer data, gsize len, gint flags, guint expire, struct rspamd_kv_element **pelt) { gint steps = 0; - struct rspamd_kv_element *elt = *pelt; + struct rspamd_kv_element *elt; + g_static_rw_lock_writer_lock (&storage->rwlock); /* Hard limit */ if (storage->max_memory > 0) { if (len > storage->max_memory) { msg_info ("<%s>: trying to insert value of length %z while limit is %z", storage->name, len, storage->max_memory); + g_static_rw_lock_writer_unlock (&storage->rwlock); return FALSE; } /* Now check limits */ while (storage->memory + len > storage->max_memory) { if (storage->expire) { - g_static_rw_lock_writer_lock (&storage->rwlock); storage->expire->step_func (storage->expire, storage, time (NULL), steps); - g_static_rw_lock_writer_unlock (&storage->rwlock); } else { msg_warn ("<%s>: storage is full and no expire function is defined", storage->name); } if (++steps > MAX_EXPIRE_STEPS) { + g_static_rw_lock_writer_unlock (&storage->rwlock); msg_warn ("<%s>: cannot expire enough keys in storage", storage->name); return FALSE; } @@ -110,7 +111,7 @@ rspamd_kv_storage_insert_internal (struct rspamd_kv_storage *storage, gpointer k } /* Insert elt to the cache */ - g_static_rw_lock_writer_lock (&storage->rwlock); + elt = storage->cache->insert_func (storage->cache, key, keylen, data, len); if (elt == NULL) { @@ -120,7 +121,10 @@ rspamd_kv_storage_insert_internal (struct rspamd_kv_storage *storage, gpointer k /* Copy data */ elt->flags = flags; elt->expire = expire; - *pelt = elt; + + if (pelt != NULL) { + *pelt = elt; + } /* Insert to the expire */ if (storage->expire) { @@ -145,24 +149,25 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, guint glong longval; /* Hard limit */ + g_static_rw_lock_writer_lock (&storage->rwlock); if (storage->max_memory > 0) { if (len + sizeof (struct rspamd_kv_element) + keylen >= storage->max_memory) { msg_warn ("<%s>: trying to insert value of length %z while limit is %z", storage->name, len, storage->max_memory); + g_static_rw_lock_writer_unlock (&storage->rwlock); return FALSE; } /* Now check limits */ while (storage->memory + len + keylen > storage->max_memory) { if (storage->expire) { - g_static_rw_lock_writer_lock (&storage->rwlock); storage->expire->step_func (storage->expire, storage, time (NULL), steps); - g_static_rw_lock_writer_unlock (&storage->rwlock); } else { msg_warn ("<%s>: storage is full and no expire function is defined", storage->name); } if (++steps > MAX_EXPIRE_STEPS) { + g_static_rw_lock_writer_unlock (&storage->rwlock); msg_warn ("<%s>: cannot expire enough keys in storage", storage->name); return FALSE; } @@ -173,14 +178,13 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, guint steps = 0; while (storage->elts > storage->max_elts) { if (storage->expire) { - g_static_rw_lock_writer_lock (&storage->rwlock); storage->expire->step_func (storage->expire, storage, time (NULL), steps); - g_static_rw_lock_writer_unlock (&storage->rwlock); } else { msg_warn ("<%s>: storage is full and no expire function is defined", storage->name); } if (++steps > MAX_EXPIRE_STEPS) { + g_static_rw_lock_writer_unlock (&storage->rwlock); msg_warn ("<%s>: cannot expire enough keys in storage", storage->name); return FALSE; } @@ -188,7 +192,7 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, guint } /* First try to search it in cache */ - g_static_rw_lock_writer_lock (&storage->rwlock); + elt = storage->cache->lookup_func (storage->cache, key, keylen); if (elt) { if (storage->expire) { @@ -309,9 +313,11 @@ rspamd_kv_storage_increment (struct rspamd_kv_storage *storage, gpointer key, gu if (belt) { /* Put this element into cache */ if ((belt->flags & KV_ELT_INTEGER) != 0) { - rspamd_kv_storage_insert_internal (storage, ELT_KEY (belt), keylen, ELT_DATA (belt), + g_static_rw_lock_writer_unlock (&storage->rwlock); + rspamd_kv_storage_insert_cache (storage, ELT_KEY (belt), keylen, ELT_DATA (belt), belt->size, belt->flags, belt->expire, &elt); + g_static_rw_lock_writer_lock (&storage->rwlock); } if ((belt->flags & KV_ELT_DIRTY) == 0) { g_free (belt); @@ -346,21 +352,19 @@ rspamd_kv_storage_lookup (struct rspamd_kv_storage *storage, gpointer key, guint /* First try to look at cache */ g_static_rw_lock_reader_lock (&storage->rwlock); elt = storage->cache->lookup_func (storage->cache, key, keylen); - g_static_rw_lock_reader_unlock (&storage->rwlock); - /* Next look at the backend */ if (elt == NULL && storage->backend) { - g_static_rw_lock_reader_lock (&storage->rwlock); belt = storage->backend->lookup_func (storage->backend, key, keylen); - g_static_rw_lock_reader_unlock (&storage->rwlock); + if (belt) { /* Put this element into cache */ - rspamd_kv_storage_insert_internal (storage, ELT_KEY (belt), keylen, ELT_DATA (belt), - belt->size, belt->flags, - belt->expire, &elt); if ((belt->flags & KV_ELT_DIRTY) == 0) { - g_free (belt); + belt->flags |= KV_ELT_NEED_INSERT; + return belt; + } + else { + elt = belt; } } } @@ -372,6 +376,7 @@ rspamd_kv_storage_lookup (struct rspamd_kv_storage *storage, gpointer key, guint } } + /* RWlock is still locked */ return elt; } @@ -396,7 +401,14 @@ rspamd_kv_storage_delete (struct rspamd_kv_storage *storage, gpointer key, guint } storage->elts --; storage->memory -= elt->size; + if ((elt->flags & KV_ELT_DIRTY) != 0) { + elt->flags |= KV_ELT_NEED_FREE; + } + else { + g_slice_free1 (ELT_SIZE (elt), elt); + } } + g_static_rw_lock_writer_unlock (&storage->rwlock); return elt; @@ -437,7 +449,7 @@ rspamd_kv_storage_insert_array (struct rspamd_kv_storage *storage, gpointer key, es = arr_data; *es = elt_size; memcpy (arr_data, (gchar *)data + sizeof (guint), len); - if (!rspamd_kv_storage_insert_internal (storage, key, keylen, arr_data, len + sizeof (guint), + if (!rspamd_kv_storage_insert_cache (storage, key, keylen, arr_data, len + sizeof (guint), flags, expire, &elt)) { g_slice_free1 (len + sizeof (guint), arr_data); return FALSE; @@ -583,7 +595,12 @@ rspamd_lru_expire_step (struct rspamd_kv_expire *e, struct rspamd_kv_storage *st storage->elts --; TAILQ_REMOVE (&expire->head, elt, entry); /* Free memory */ - g_slice_free1 (ELT_SIZE (elt), elt); + if ((elt->flags & (KV_ELT_DIRTY|KV_ELT_NEED_INSERT)) != 0) { + elt->flags |= KV_ELT_NEED_FREE; + } + else { + g_slice_free1 (ELT_SIZE (elt), elt); + } res = TRUE; /* Check other elements in this queue */ TAILQ_FOREACH_SAFE (elt, &expire->head, entry, temp) { @@ -596,7 +613,7 @@ rspamd_lru_expire_step (struct rspamd_kv_expire *e, struct rspamd_kv_storage *st storage->cache->steal_func (storage->cache, elt); TAILQ_REMOVE (&expire->head, elt, entry); /* Free memory */ - if ((elt->flags & KV_ELT_DIRTY) != 0) { + if ((elt->flags & (KV_ELT_DIRTY|KV_ELT_NEED_INSERT)) != 0) { elt->flags |= KV_ELT_NEED_FREE; } else { @@ -613,7 +630,7 @@ rspamd_lru_expire_step (struct rspamd_kv_expire *e, struct rspamd_kv_storage *st storage->cache->steal_func (storage->cache, oldest_elt); TAILQ_REMOVE (&expire->head, oldest_elt, entry); /* Free memory */ - if ((oldest_elt->flags & KV_ELT_DIRTY) != 0) { + if ((oldest_elt->flags & (KV_ELT_DIRTY|KV_ELT_NEED_INSERT)) != 0) { oldest_elt->flags |= KV_ELT_NEED_FREE; } else { diff --git a/src/kvstorage.h b/src/kvstorage.h index f970a7086..b99050583 100644 --- a/src/kvstorage.h +++ b/src/kvstorage.h @@ -65,7 +65,8 @@ enum rspamd_kv_flags { KV_ELT_DIRTY = 1 << 2, KV_ELT_OUSTED = 1 << 3, KV_ELT_NEED_FREE = 1 << 4, - KV_ELT_INTEGER = 1 << 5 + KV_ELT_INTEGER = 1 << 5, + KV_ELT_NEED_INSERT = 1 << 6 }; #define ELT_DATA(elt) (gchar *)(elt)->data + (elt)->keylen + 1 @@ -140,6 +141,10 @@ struct rspamd_kv_storage *rspamd_kv_storage_new (gint id, const gchar *name, /** Insert new element to the kv storage */ gboolean rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, guint keylen, gpointer data, gsize len, gint flags, guint expire); +/** Insert element only in cache */ +gboolean rspamd_kv_storage_insert_cache (struct rspamd_kv_storage *storage, gpointer key, guint keylen, + gpointer data, gsize len, gint flags, guint expire, struct rspamd_kv_element **pelt); + /** Replace an element in the kv storage */ gboolean rspamd_kv_storage_replace (struct rspamd_kv_storage *storage, gpointer key, guint keylen, struct rspamd_kv_element *elt); diff --git a/src/kvstorage_file.c b/src/kvstorage_file.c index c342b1fbf..81521720c 100644 --- a/src/kvstorage_file.c +++ b/src/kvstorage_file.c @@ -201,10 +201,15 @@ file_process_queue (struct rspamd_kv_backend *backend) cur = db->ops_queue->head; while (cur) { op = cur->data; - if (op->op == FILE_OP_DELETE || (op->elt->flags & KV_ELT_NEED_FREE) != 0) { + if (op->op == FILE_OP_DELETE || ((op->elt->flags & KV_ELT_NEED_FREE) != 0 && + (op->elt->flags & KV_ELT_NEED_INSERT) == 0)) { /* Also clean memory */ g_slice_free1 (ELT_SIZE (op->elt), op->elt); } + else { + /* Unset dirty flag */ + op->elt->flags &= ~KV_ELT_DIRTY; + } g_slice_free1 (sizeof (struct file_op), op); cur = g_list_next (cur); } @@ -419,6 +424,8 @@ rspamd_file_lookup (struct rspamd_kv_backend *backend, gpointer key, guint keyle close (fd); + elt->flags &= ~(KV_ELT_DIRTY|KV_ELT_NEED_FREE); + return elt; } @@ -426,39 +433,27 @@ static void rspamd_file_delete (struct rspamd_kv_backend *backend, gpointer key, guint keylen) { struct rspamd_file_backend *db = (struct rspamd_file_backend *)backend; - struct file_op *op; - struct rspamd_kv_element *elt; + gchar filebuf[PATH_MAX]; struct rspamd_kv_element search_elt; - - search_elt.keylen = keylen; - search_elt.p = key; + struct file_op *op; if (!db->initialized) { return; } + search_elt.keylen = keylen; + search_elt.p = key; + /* First search in ops queue */ if ((op = g_hash_table_lookup (db->ops_hash, &search_elt)) != NULL) { op->op = FILE_OP_DELETE; return; } - - elt = rspamd_file_lookup (backend, key, keylen); - if (elt == NULL) { + /* Get filename */ + if (!get_file_name (db, key, keylen, filebuf, sizeof (filebuf))) { return; } - op = g_slice_alloc (sizeof (struct file_op)); - op->op = FILE_OP_DELETE; - op->elt = elt; - elt->flags |= KV_ELT_DIRTY; - - g_queue_push_head (db->ops_queue, op); - g_hash_table_insert (db->ops_hash, elt, op); - - if (db->sync_ops > 0 && g_queue_get_length (db->ops_queue) >= db->sync_ops) { - file_process_queue (backend); - } - return; + unlink (filebuf); } static void diff --git a/src/kvstorage_server.c b/src/kvstorage_server.c index 5ef34cbed..3e4cedcf9 100644 --- a/src/kvstorage_server.c +++ b/src/kvstorage_server.c @@ -508,11 +508,13 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis) } if (elt->flags & KV_ELT_INTEGER) { if (!rspamd_dispatcher_write (session->dispather, intbuf, eltlen, TRUE, TRUE)) { + g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); return FALSE; } } else { - if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), eltlen, TRUE, FALSE)) { + if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), eltlen, TRUE, TRUE)) { + g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); return FALSE; } } @@ -524,6 +526,9 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis) res = rspamd_dispatcher_write (session->dispather, CRLF, sizeof (CRLF) - 1, FALSE, TRUE); } + if (!res) { + g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); + } return res; } @@ -890,7 +895,22 @@ kvstorage_write_socket (void *arg) struct kvstorage_session *session = (struct kvstorage_session *) arg; if (session->elt) { + + if ((session->elt->flags & KV_ELT_NEED_INSERT) != 0) { + /* Insert to cache and free element */ + session->elt->flags &= ~KV_ELT_NEED_INSERT; + g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); + rspamd_kv_storage_insert_cache (session->cf->storage, ELT_KEY (session->elt), + session->elt->keylen, ELT_DATA (session->elt), + session->elt->size, session->elt->flags, + session->elt->expire, NULL); + g_free (session->elt); + session->elt = NULL; + return TRUE; + } + g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); session->elt = NULL; + } return TRUE; @@ -972,6 +992,7 @@ kvstorage_thread (gpointer ud) sigprocmask (SIG_BLOCK, thr->signals, NULL); /* Init thread specific events */ thr->ev_base = event_init (); + event_set (&thr->bind_ev, thr->worker->cf->listen_sock, EV_READ | EV_PERSIST, thr_accept_socket, (void *)thr); event_base_set (thr->ev_base, &thr->bind_ev); event_add (&thr->bind_ev, NULL); -- 2.39.5