summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2011-11-07 03:20:42 +0300
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2011-11-07 03:20:42 +0300
commitd651a97fe2551d53544d48487e3bcbbfad750eff (patch)
tree0307000a17c537a3fd460d0c71f81d19552896d5
parent9ec83e20dcb4dd417f65ccf573a6c5de1bc20978 (diff)
downloadrspamd-d651a97fe2551d53544d48487e3bcbbfad750eff.tar.gz
rspamd-d651a97fe2551d53544d48487e3bcbbfad750eff.zip
* Implement sync command for manual synchronization with backend.
-rw-r--r--src/kvstorage.h2
-rw-r--r--src/kvstorage_bdb.c13
-rw-r--r--src/kvstorage_server.c45
-rw-r--r--src/kvstorage_server.h1
-rw-r--r--src/kvstorage_sqlite.c13
5 files changed, 62 insertions, 12 deletions
diff --git a/src/kvstorage.h b/src/kvstorage.h
index fb5848274..e4ad31cc1 100644
--- a/src/kvstorage.h
+++ b/src/kvstorage.h
@@ -47,6 +47,7 @@ typedef gboolean (*backend_insert)(struct rspamd_kv_backend *backend, gpointer k
typedef gboolean (*backend_replace)(struct rspamd_kv_backend *backend, gpointer key, struct rspamd_kv_element *elt);
typedef struct rspamd_kv_element* (*backend_lookup)(struct rspamd_kv_backend *backend, gpointer key);
typedef void (*backend_delete)(struct rspamd_kv_backend *backend, gpointer key);
+typedef gboolean (*backend_sync)(struct rspamd_kv_backend *backend);
typedef void (*backend_destroy)(struct rspamd_kv_backend *backend);
/* Callbacks for expire */
@@ -99,6 +100,7 @@ struct rspamd_kv_backend {
backend_replace replace_func; /*< this callback is called when element is replaced */
backend_lookup lookup_func; /*< this callback is used for lookup of element */
backend_delete delete_func; /*< this callback is called when an element is deleted */
+ backend_sync sync_func; /*< this callback is called when backend need to be synced */
backend_destroy destroy_func; /*< this callback is used for destroying all elements inside backend */
};
struct rspamd_kv_expire {
diff --git a/src/kvstorage_bdb.c b/src/kvstorage_bdb.c
index 87c343b22..280cb5ce9 100644
--- a/src/kvstorage_bdb.c
+++ b/src/kvstorage_bdb.c
@@ -45,6 +45,7 @@ struct rspamd_bdb_backend {
backend_replace replace_func; /*< this callback is called when element is replaced */
backend_lookup lookup_func; /*< this callback is used for lookup of element */
backend_delete delete_func; /*< this callback is called when an element is deleted */
+ backend_sync sync_func; /*< this callback is called when backend need to be synced */
backend_destroy destroy_func; /*< this callback is used for destroying all elements inside backend */
DB_ENV *envp; /*< db environment */
DB *dbp; /*< db pointer */
@@ -93,8 +94,9 @@ bdb_process_single_op (struct rspamd_bdb_backend *db, DB_TXN *txn, struct bdb_op
/* Process operations queue */
static gboolean
-bdb_process_queue (struct rspamd_bdb_backend *db)
+bdb_process_queue (struct rspamd_kv_backend *backend)
{
+ struct rspamd_bdb_backend *db = (struct rspamd_bdb_backend *)backend;
struct bdb_op *op;
GList *cur;
@@ -198,7 +200,7 @@ rspamd_bdb_insert (struct rspamd_kv_backend *backend, gpointer key, struct rspam
g_hash_table_insert (db->ops_hash, ELT_KEY (elt), op);
if (g_queue_get_length (db->ops_queue) >= db->sync_ops) {
- return bdb_process_queue (db);
+ return bdb_process_queue (backend);
}
return TRUE;
@@ -223,7 +225,7 @@ rspamd_bdb_replace (struct rspamd_kv_backend *backend, gpointer key, struct rspa
g_hash_table_insert (db->ops_hash, ELT_KEY (elt), op);
if (g_queue_get_length (db->ops_queue) >= db->sync_ops) {
- return bdb_process_queue (db);
+ return bdb_process_queue (backend);
}
return TRUE;
@@ -292,7 +294,7 @@ rspamd_bdb_delete (struct rspamd_kv_backend *backend, gpointer key)
g_hash_table_insert (db->ops_hash, ELT_KEY(elt), op);
if (g_queue_get_length (db->ops_queue) >= db->sync_ops) {
- bdb_process_queue (db);
+ bdb_process_queue (backend);
}
return;
@@ -304,7 +306,7 @@ rspamd_bdb_destroy (struct rspamd_kv_backend *backend)
struct rspamd_bdb_backend *db = (struct rspamd_bdb_backend *)backend;
if (db->initialized) {
- bdb_process_queue (db);
+ bdb_process_queue (backend);
if (db->dbp != NULL) {
db->dbp->close (db->dbp, 0);
}
@@ -353,6 +355,7 @@ rspamd_kv_bdb_new (const gchar *filename, guint sync_ops)
new->lookup_func = rspamd_bdb_lookup;
new->delete_func = rspamd_bdb_delete;
new->replace_func = rspamd_bdb_replace;
+ new->sync_func = bdb_process_queue;
new->destroy_func = rspamd_bdb_destroy;
return (struct rspamd_kv_backend *)new;
diff --git a/src/kvstorage_server.c b/src/kvstorage_server.c
index c6190454c..56f25ed97 100644
--- a/src/kvstorage_server.c
+++ b/src/kvstorage_server.c
@@ -199,16 +199,22 @@ parse_kvstorage_command (struct kvstorage_session *session, f_str_t *in)
}
}
else if (p - c == 4) {
- if (memcmp (c, "quit", 4) == 0) {
+ if (g_ascii_strncasecmp (c, "quit", 4) == 0) {
session->command = KVSTORAGE_CMD_QUIT;
state = 100;
continue;
}
+ if (g_ascii_strncasecmp (c, "sync", 4) == 0) {
+ session->command = KVSTORAGE_CMD_SYNC;
+ state = 100;
+ continue;
+ }
}
else if (p - c == 6) {
- if (memcmp (c, "delete", 6) == 0) {
+ if (g_ascii_strncasecmp (c, "delete", 6) == 0) {
session->command = KVSTORAGE_CMD_DELETE;
}
+
else {
return FALSE;
}
@@ -445,6 +451,41 @@ kvstorage_read_socket (f_str_t * in, void *arg)
}
}
}
+ else if (session->command == KVSTORAGE_CMD_SYNC) {
+ if (session->cf->storage->backend == NULL || session->cf->storage->backend->sync_func == NULL) {
+ if (!is_redis) {
+ return rspamd_dispatcher_write (session->dispather, ERROR_COMMON,
+ sizeof (ERROR_COMMON) - 1, FALSE, TRUE);
+ }
+ else {
+ return rspamd_dispatcher_write (session->dispather, "-ERR unsupported" CRLF,
+ sizeof ("-ERR unsupported" CRLF) - 1, FALSE, TRUE);
+ }
+ }
+ else {
+ if (session->cf->storage->backend->sync_func (session->cf->storage->backend)) {
+ if (!is_redis) {
+ return rspamd_dispatcher_write (session->dispather, "SYNCED" CRLF,
+ sizeof ("SYNCED" CRLF) - 1, FALSE, TRUE);
+ }
+ else {
+ return rspamd_dispatcher_write (session->dispather, "+OK" CRLF,
+ sizeof ("+OK" CRLF) - 1, FALSE, TRUE);
+ }
+ }
+ else {
+ if (!is_redis) {
+ return rspamd_dispatcher_write (session->dispather, "NOT_SYNCED" CRLF,
+ sizeof ("NOT_SYNCED" CRLF) - 1, FALSE, TRUE);
+ }
+ else {
+ return rspamd_dispatcher_write (session->dispather, "-ERR not synced" CRLF,
+ sizeof ("-ERR not synced" CRLF) - 1, FALSE, TRUE);
+ }
+ }
+ }
+ g_static_rw_lock_writer_lock (&session->cf->storage->rwlock);
+ }
else if (session->command == KVSTORAGE_CMD_QUIT) {
/* Quit session */
free_kvstorage_session (session);
diff --git a/src/kvstorage_server.h b/src/kvstorage_server.h
index b513d33ab..4f9e8c951 100644
--- a/src/kvstorage_server.h
+++ b/src/kvstorage_server.h
@@ -65,6 +65,7 @@ struct kvstorage_session {
KVSTORAGE_CMD_SET,
KVSTORAGE_CMD_GET,
KVSTORAGE_CMD_DELETE,
+ KVSTORAGE_CMD_SYNC,
KVSTORAGE_CMD_QUIT
} command;
guint id;
diff --git a/src/kvstorage_sqlite.c b/src/kvstorage_sqlite.c
index bde3ea1c6..3fa4bab20 100644
--- a/src/kvstorage_sqlite.c
+++ b/src/kvstorage_sqlite.c
@@ -51,6 +51,7 @@ struct rspamd_sqlite_backend {
backend_replace replace_func; /*< this callback is called when element is replaced */
backend_lookup lookup_func; /*< this callback is used for lookup of element */
backend_delete delete_func; /*< this callback is called when an element is deleted */
+ backend_sync sync_func; /*< this callback is called when backend need to be synced */
backend_destroy destroy_func; /*< this callback is used for destroying all elements inside backend */
sqlite3 *dbp;
gchar *filename;
@@ -100,8 +101,9 @@ sqlite_process_single_op (struct rspamd_sqlite_backend *db, struct sqlite_op *op
/* Process operations queue */
static gboolean
-sqlite_process_queue (struct rspamd_sqlite_backend *db)
+sqlite_process_queue (struct rspamd_kv_backend *backend)
{
+ struct rspamd_sqlite_backend *db = (struct rspamd_sqlite_backend *)backend;
struct sqlite_op *op;
GList *cur;
@@ -261,7 +263,7 @@ rspamd_sqlite_insert (struct rspamd_kv_backend *backend, gpointer key, struct rs
g_hash_table_insert (db->ops_hash, ELT_KEY (elt), op);
if (g_queue_get_length (db->ops_queue) >= db->sync_ops) {
- return sqlite_process_queue (db);
+ return sqlite_process_queue (backend);
}
return TRUE;
@@ -286,7 +288,7 @@ rspamd_sqlite_replace (struct rspamd_kv_backend *backend, gpointer key, struct r
g_hash_table_insert (db->ops_hash, ELT_KEY (elt), op);
if (g_queue_get_length (db->ops_queue) >= db->sync_ops) {
- return sqlite_process_queue (db);
+ return sqlite_process_queue (backend);
}
return TRUE;
@@ -356,7 +358,7 @@ rspamd_sqlite_delete (struct rspamd_kv_backend *backend, gpointer key)
g_hash_table_insert (db->ops_hash, ELT_KEY(elt), op);
if (g_queue_get_length (db->ops_queue) >= db->sync_ops) {
- sqlite_process_queue (db);
+ sqlite_process_queue (backend);
}
return;
@@ -368,7 +370,7 @@ rspamd_sqlite_destroy (struct rspamd_kv_backend *backend)
struct rspamd_sqlite_backend *db = (struct rspamd_sqlite_backend *)backend;
if (db->initialized) {
- sqlite_process_queue (db);
+ sqlite_process_queue (backend);
if (db->get_stmt != NULL) {
sqlite3_finalize (db->get_stmt);
}
@@ -422,6 +424,7 @@ rspamd_kv_sqlite_new (const gchar *filename, guint sync_ops)
new->lookup_func = rspamd_sqlite_lookup;
new->delete_func = rspamd_sqlite_delete;
new->replace_func = rspamd_sqlite_replace;
+ new->sync_func = sqlite_process_queue;
new->destroy_func = rspamd_sqlite_destroy;
return (struct rspamd_kv_backend *)new;