/* Now check limits */
while (storage->memory + len > storage->max_memory) {
if (storage->expire) {
+ g_static_rw_lock_writer_lock (&storage->rwlock);
storage->expire->step_func (storage->expire, storage, time (NULL), steps);
+ g_static_rw_lock_writer_unlock (&storage->rwlock);
}
else {
msg_warn ("<%s>: storage is full and no expire function is defined", storage->name);
}
/* Insert elt to the cache */
+ g_static_rw_lock_writer_lock (&storage->rwlock);
elt = storage->cache->insert_func (storage->cache, key, keylen, data, len);
+
if (elt == NULL) {
+ g_static_rw_lock_writer_unlock (&storage->rwlock);
return FALSE;
}
/* Copy data */
storage->elts ++;
storage->memory += elt->size + sizeof (struct rspamd_kv_element);
+ g_static_rw_lock_writer_unlock (&storage->rwlock);
return TRUE;
}
/* Now check limits */
while (storage->memory + len + keylen > storage->max_memory) {
if (storage->expire) {
+ g_static_rw_lock_writer_lock (&storage->rwlock);
storage->expire->step_func (storage->expire, storage, time (NULL), steps);
+ g_static_rw_lock_writer_unlock (&storage->rwlock);
}
else {
msg_warn ("<%s>: storage is full and no expire function is defined", storage->name);
steps = 0;
while (storage->elts > storage->max_elts) {
if (storage->expire) {
+ g_static_rw_lock_writer_lock (&storage->rwlock);
storage->expire->step_func (storage->expire, storage, time (NULL), steps);
+ g_static_rw_lock_writer_unlock (&storage->rwlock);
}
else {
msg_warn ("<%s>: storage is full and no expire function is defined", storage->name);
}
/* First try to search it in cache */
+ g_static_rw_lock_writer_lock (&storage->rwlock);
elt = storage->cache->lookup_func (storage->cache, key, keylen);
if (elt) {
if (storage->expire) {
else {
elt = storage->cache->insert_func (storage->cache, key, keylen, data, len);
if (elt == NULL) {
+ g_static_rw_lock_writer_unlock (&storage->rwlock);
return FALSE;
}
}
storage->elts ++;
storage->memory += ELT_SIZE (elt);
+ g_static_rw_lock_writer_unlock (&storage->rwlock);
return res;
}
/* Now check limits */
while (storage->memory + ELT_SIZE (elt) > storage->max_memory) {
if (storage->expire) {
+ g_static_rw_lock_writer_lock (&storage->rwlock);
storage->expire->step_func (storage->expire, storage, time (NULL), steps);
+ g_static_rw_lock_writer_unlock (&storage->rwlock);
}
else {
msg_warn ("<%s>: storage is full and no expire function is defined", storage->name);
}
}
+ g_static_rw_lock_writer_lock (&storage->rwlock);
/* Insert elt to the cache */
res = storage->cache->replace_func (storage->cache, key, keylen, elt);
if (res && storage->backend) {
res = storage->backend->replace_func (storage->backend, key, keylen, elt);
}
+ g_static_rw_lock_writer_unlock (&storage->rwlock);
return res;
}
glong *lp;
/* First try to look at cache */
+ g_static_rw_lock_writer_lock (&storage->rwlock);
elt = storage->cache->lookup_func (storage->cache, key, keylen);
if (elt == NULL && storage->backend) {
*lp += *value;
*value = *lp;
if (storage->backend) {
+ g_static_rw_lock_writer_unlock (&storage->rwlock);
return storage->backend->replace_func (storage->backend, key, keylen, elt);
}
else {
+ g_static_rw_lock_writer_unlock (&storage->rwlock);
return TRUE;
}
}
+ g_static_rw_lock_writer_unlock (&storage->rwlock);
+
return FALSE;
}
struct rspamd_kv_element *elt = NULL, *belt;
/* First try to look at cache */
+ g_static_rw_lock_reader_lock (&storage->rwlock);
elt = storage->cache->lookup_func (storage->cache, key, keylen);
+ g_static_rw_lock_reader_unlock (&storage->rwlock);
+
/* Next look at the backend */
if (elt == NULL && storage->backend) {
+ g_static_rw_lock_reader_lock (&storage->rwlock);
belt = storage->backend->lookup_func (storage->backend, key, keylen);
+ g_static_rw_lock_reader_unlock (&storage->rwlock);
if (belt) {
/* Put this element into cache */
rspamd_kv_storage_insert_internal (storage, ELT_KEY (belt), keylen, ELT_DATA (belt),
struct rspamd_kv_element *elt;
/* First delete key from cache */
+ g_static_rw_lock_writer_lock (&storage->rwlock);
elt = storage->cache->delete_func (storage->cache, key, keylen);
/* Now delete from backend */
storage->elts --;
storage->memory -= elt->size;
}
+ g_static_rw_lock_writer_unlock (&storage->rwlock);
return elt;
}
void
rspamd_kv_storage_destroy (struct rspamd_kv_storage *storage)
{
- if (storage->cache && storage->cache->destroy_func) {
- storage->cache->destroy_func (storage->cache);
- }
+ g_static_rw_lock_writer_lock (&storage->rwlock);
if (storage->backend && storage->backend->destroy_func) {
storage->backend->destroy_func (storage->backend);
}
if (storage->expire && storage->expire->destroy_func) {
storage->expire->destroy_func (storage->expire);
}
+ if (storage->cache && storage->cache->destroy_func) {
+ storage->cache->destroy_func (storage->cache);
+ }
g_free (storage->name);
+
+ g_static_rw_lock_writer_unlock (&storage->rwlock);
g_slice_free1 (sizeof (struct rspamd_kv_storage), storage);
}
elt->flags |= KV_ELT_ARRAY;
g_slice_free1 (len + sizeof (guint), arr_data);
/* Place to the backend */
+
if (storage->backend) {
return storage->backend->insert_func (storage->backend, key, keylen, elt);
}
storage->cache->steal_func (storage->cache, elt);
TAILQ_REMOVE (&expire->head, elt, entry);
/* Free memory */
- g_slice_free1 (ELT_SIZE (elt), elt);
+ if ((elt->flags & KV_ELT_DIRTY) != 0) {
+ elt->flags |= KV_ELT_NEED_FREE;
+ }
+ else {
+ g_slice_free1 (ELT_SIZE (elt), elt);
+ }
}
}
/**
* Destroy the whole cache
*/
+
+static void
+rspamd_kv_hash_destroy_cb (gpointer key, gpointer value, gpointer unused)
+{
+ struct rspamd_kv_element *elt = value;
+
+ g_slice_free1 (ELT_SIZE (elt), elt);
+}
+
static void
rspamd_kv_hash_destroy (struct rspamd_kv_cache *c)
{
struct rspamd_kv_hash_cache *cache = (struct rspamd_kv_hash_cache *)c;
+ g_hash_table_foreach (cache->hash, rspamd_kv_hash_destroy_cb, NULL);
g_hash_table_destroy (cache->hash);
g_slice_free1 (sizeof (struct rspamd_kv_hash_cache), cache);
}
rspamd_set_dispatcher_policy (session->dispather, BUFFER_CHARACTER, session->arg_data.length);
}
else if (session->command == KVSTORAGE_CMD_GET) {
- g_static_rw_lock_reader_lock (&session->cf->storage->rwlock);
elt = rspamd_kv_storage_lookup (session->cf->storage, session->key, session->keylen, session->now);
if (elt == NULL) {
- g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
if (!is_redis) {
return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND,
sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE);
}
if (!rspamd_dispatcher_write (session->dispather, outbuf,
r, TRUE, FALSE)) {
- g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
return FALSE;
}
if (elt->flags & KV_ELT_INTEGER) {
if (!rspamd_dispatcher_write (session->dispather, intbuf, eltlen, TRUE, TRUE)) {
- g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
return FALSE;
}
}
else {
- if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), eltlen, TRUE, TRUE)) {
- g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
+ if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), eltlen, TRUE, FALSE)) {
return FALSE;
}
}
res = rspamd_dispatcher_write (session->dispather, CRLF,
sizeof (CRLF) - 1, FALSE, TRUE);
}
- if (!res) {
- g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
- }
return res;
}
}
else if (session->command == KVSTORAGE_CMD_DELETE) {
- g_static_rw_lock_writer_lock (&session->cf->storage->rwlock);
elt = rspamd_kv_storage_delete (session->cf->storage, session->key, session->keylen);
if (elt != NULL) {
if ((elt->flags & KV_ELT_DIRTY) == 0) {
/* Free memory if backend has deleted this element */
g_slice_free1 (ELT_SIZE (elt), elt);
}
- g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
if (!is_redis) {
return rspamd_dispatcher_write (session->dispather, "DELETED" CRLF,
sizeof ("DELETED" CRLF) - 1, FALSE, TRUE);
}
}
else {
- g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
if (!is_redis) {
return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND,
sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE);
}
}
else if (session->command == KVSTORAGE_CMD_INCR || session->command == KVSTORAGE_CMD_DECR) {
- g_static_rw_lock_writer_lock (&session->cf->storage->rwlock);
longval = session->arg_data.value;
if (!rspamd_kv_storage_increment (session->cf->storage, session->key, session->keylen, &longval)) {
- g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
if (!is_redis) {
return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND,
sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE);
}
}
else {
- g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
if (!is_redis) {
r = rspamd_snprintf (outbuf, sizeof (outbuf), "%l" CRLF,
longval);
}
}
}
- g_static_rw_lock_writer_lock (&session->cf->storage->rwlock);
}
else if (session->command == KVSTORAGE_CMD_SELECT) {
if (!is_redis) {
if (session->command == KVSTORAGE_CMD_SET) {
session->state = KVSTORAGE_STATE_READ_CMD;
rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
- g_static_rw_lock_writer_lock (&session->cf->storage->rwlock);
if (rspamd_kv_storage_insert (session->cf->storage, session->key, session->keylen,
in->begin, in->len,
session->flags, session->expire)) {
- g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
return rspamd_dispatcher_write (session->dispather, "+OK" CRLF,
sizeof ("+OK" CRLF) - 1, FALSE, TRUE);
}
else {
- g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
return rspamd_dispatcher_write (session->dispather, "-ERR not stored" CRLF,
sizeof ("-ERR not stored" CRLF) - 1, FALSE, TRUE);
}
case KVSTORAGE_STATE_READ_DATA:
session->state = KVSTORAGE_STATE_READ_CMD;
rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
- g_static_rw_lock_writer_lock (&session->cf->storage->rwlock);
if (rspamd_kv_storage_insert (session->cf->storage, session->key, session->keylen,
in->begin, in->len,
session->flags, session->expire)) {
- g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
if (!is_redis) {
return rspamd_dispatcher_write (session->dispather, "STORED" CRLF,
sizeof ("STORED" CRLF) - 1, FALSE, TRUE);
}
}
else {
- g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
if (!is_redis) {
return rspamd_dispatcher_write (session->dispather, ERROR_NOT_STORED,
sizeof (ERROR_NOT_STORED) - 1, FALSE, TRUE);
struct kvstorage_session *session = (struct kvstorage_session *) arg;
if (session->elt) {
- g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
session->elt = NULL;
}
g_static_mutex_unlock (thr->accept_mtx);
return;
}
- g_static_mutex_unlock (thr->accept_mtx);
+
/* Check for EAGAIN */
if (nfd == 0) {
+ g_static_mutex_unlock (thr->accept_mtx);
return;
}
session->dispather = rspamd_create_dispatcher (thr->ev_base, nfd, BUFFER_LINE,
kvstorage_read_socket, kvstorage_write_socket,
kvstorage_err_socket, thr->tv, session);
+
+ g_static_mutex_unlock (thr->accept_mtx);
session->elt = NULL;
if (su.ss.ss_family == AF_UNIX) {