typedef gboolean (*backend_replace)(struct rspamd_kv_backend *backend, gpointer key, struct rspamd_kv_element *elt);
typedef struct rspamd_kv_element* (*backend_lookup)(struct rspamd_kv_backend *backend, gpointer key);
typedef void (*backend_delete)(struct rspamd_kv_backend *backend, gpointer key);
+typedef gboolean (*backend_sync)(struct rspamd_kv_backend *backend);
typedef void (*backend_destroy)(struct rspamd_kv_backend *backend);
/* Callbacks for expire */
backend_replace replace_func; /*< this callback is called when element is replaced */
backend_lookup lookup_func; /*< this callback is used for lookup of element */
backend_delete delete_func; /*< this callback is called when an element is deleted */
+ backend_sync sync_func; /*< this callback is called when backend need to be synced */
backend_destroy destroy_func; /*< this callback is used for destroying all elements inside backend */
};
struct rspamd_kv_expire {
backend_replace replace_func; /*< this callback is called when element is replaced */
backend_lookup lookup_func; /*< this callback is used for lookup of element */
backend_delete delete_func; /*< this callback is called when an element is deleted */
+ backend_sync sync_func; /*< this callback is called when backend need to be synced */
backend_destroy destroy_func; /*< this callback is used for destroying all elements inside backend */
DB_ENV *envp; /*< db environment */
DB *dbp; /*< db pointer */
/* Process operations queue */
static gboolean
-bdb_process_queue (struct rspamd_bdb_backend *db)
+bdb_process_queue (struct rspamd_kv_backend *backend)
{
+ struct rspamd_bdb_backend *db = (struct rspamd_bdb_backend *)backend;
struct bdb_op *op;
GList *cur;
g_hash_table_insert (db->ops_hash, ELT_KEY (elt), op);
if (g_queue_get_length (db->ops_queue) >= db->sync_ops) {
- return bdb_process_queue (db);
+ return bdb_process_queue (backend);
}
return TRUE;
g_hash_table_insert (db->ops_hash, ELT_KEY (elt), op);
if (g_queue_get_length (db->ops_queue) >= db->sync_ops) {
- return bdb_process_queue (db);
+ return bdb_process_queue (backend);
}
return TRUE;
g_hash_table_insert (db->ops_hash, ELT_KEY(elt), op);
if (g_queue_get_length (db->ops_queue) >= db->sync_ops) {
- bdb_process_queue (db);
+ bdb_process_queue (backend);
}
return;
struct rspamd_bdb_backend *db = (struct rspamd_bdb_backend *)backend;
if (db->initialized) {
- bdb_process_queue (db);
+ bdb_process_queue (backend);
if (db->dbp != NULL) {
db->dbp->close (db->dbp, 0);
}
new->lookup_func = rspamd_bdb_lookup;
new->delete_func = rspamd_bdb_delete;
new->replace_func = rspamd_bdb_replace;
+ new->sync_func = bdb_process_queue;
new->destroy_func = rspamd_bdb_destroy;
return (struct rspamd_kv_backend *)new;
}
}
else if (p - c == 4) {
- if (memcmp (c, "quit", 4) == 0) {
+ if (g_ascii_strncasecmp (c, "quit", 4) == 0) {
session->command = KVSTORAGE_CMD_QUIT;
state = 100;
continue;
}
+ if (g_ascii_strncasecmp (c, "sync", 4) == 0) {
+ session->command = KVSTORAGE_CMD_SYNC;
+ state = 100;
+ continue;
+ }
}
else if (p - c == 6) {
- if (memcmp (c, "delete", 6) == 0) {
+ if (g_ascii_strncasecmp (c, "delete", 6) == 0) {
session->command = KVSTORAGE_CMD_DELETE;
}
+
else {
return FALSE;
}
}
}
}
+ else if (session->command == KVSTORAGE_CMD_SYNC) {
+ if (session->cf->storage->backend == NULL || session->cf->storage->backend->sync_func == NULL) {
+ if (!is_redis) {
+ return rspamd_dispatcher_write (session->dispather, ERROR_COMMON,
+ sizeof (ERROR_COMMON) - 1, FALSE, TRUE);
+ }
+ else {
+ return rspamd_dispatcher_write (session->dispather, "-ERR unsupported" CRLF,
+ sizeof ("-ERR unsupported" CRLF) - 1, FALSE, TRUE);
+ }
+ }
+ else {
+ if (session->cf->storage->backend->sync_func (session->cf->storage->backend)) {
+ if (!is_redis) {
+ return rspamd_dispatcher_write (session->dispather, "SYNCED" CRLF,
+ sizeof ("SYNCED" CRLF) - 1, FALSE, TRUE);
+ }
+ else {
+ return rspamd_dispatcher_write (session->dispather, "+OK" CRLF,
+ sizeof ("+OK" CRLF) - 1, FALSE, TRUE);
+ }
+ }
+ else {
+ if (!is_redis) {
+ return rspamd_dispatcher_write (session->dispather, "NOT_SYNCED" CRLF,
+ sizeof ("NOT_SYNCED" CRLF) - 1, FALSE, TRUE);
+ }
+ else {
+ return rspamd_dispatcher_write (session->dispather, "-ERR not synced" CRLF,
+ sizeof ("-ERR not synced" CRLF) - 1, FALSE, TRUE);
+ }
+ }
+ }
+ g_static_rw_lock_writer_lock (&session->cf->storage->rwlock);
+ }
else if (session->command == KVSTORAGE_CMD_QUIT) {
/* Quit session */
free_kvstorage_session (session);
KVSTORAGE_CMD_SET,
KVSTORAGE_CMD_GET,
KVSTORAGE_CMD_DELETE,
+ KVSTORAGE_CMD_SYNC,
KVSTORAGE_CMD_QUIT
} command;
guint id;
backend_replace replace_func; /*< this callback is called when element is replaced */
backend_lookup lookup_func; /*< this callback is used for lookup of element */
backend_delete delete_func; /*< this callback is called when an element is deleted */
+ backend_sync sync_func; /*< this callback is called when backend need to be synced */
backend_destroy destroy_func; /*< this callback is used for destroying all elements inside backend */
sqlite3 *dbp;
gchar *filename;
/* Process operations queue */
static gboolean
-sqlite_process_queue (struct rspamd_sqlite_backend *db)
+sqlite_process_queue (struct rspamd_kv_backend *backend)
{
+ struct rspamd_sqlite_backend *db = (struct rspamd_sqlite_backend *)backend;
struct sqlite_op *op;
GList *cur;
g_hash_table_insert (db->ops_hash, ELT_KEY (elt), op);
if (g_queue_get_length (db->ops_queue) >= db->sync_ops) {
- return sqlite_process_queue (db);
+ return sqlite_process_queue (backend);
}
return TRUE;
g_hash_table_insert (db->ops_hash, ELT_KEY (elt), op);
if (g_queue_get_length (db->ops_queue) >= db->sync_ops) {
- return sqlite_process_queue (db);
+ return sqlite_process_queue (backend);
}
return TRUE;
g_hash_table_insert (db->ops_hash, ELT_KEY(elt), op);
if (g_queue_get_length (db->ops_queue) >= db->sync_ops) {
- sqlite_process_queue (db);
+ sqlite_process_queue (backend);
}
return;
struct rspamd_sqlite_backend *db = (struct rspamd_sqlite_backend *)backend;
if (db->initialized) {
- sqlite_process_queue (db);
+ sqlite_process_queue (backend);
if (db->get_stmt != NULL) {
sqlite3_finalize (db->get_stmt);
}
new->lookup_func = rspamd_sqlite_lookup;
new->delete_func = rspamd_sqlite_delete;
new->replace_func = rspamd_sqlite_replace;
+ new->sync_func = sqlite_process_queue;
new->destroy_func = rspamd_sqlite_destroy;
return (struct rspamd_kv_backend *)new;