diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2011-11-07 03:20:42 +0300 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2011-11-07 03:20:42 +0300 |
commit | d651a97fe2551d53544d48487e3bcbbfad750eff (patch) | |
tree | 0307000a17c537a3fd460d0c71f81d19552896d5 | |
parent | 9ec83e20dcb4dd417f65ccf573a6c5de1bc20978 (diff) | |
download | rspamd-d651a97fe2551d53544d48487e3bcbbfad750eff.tar.gz rspamd-d651a97fe2551d53544d48487e3bcbbfad750eff.zip |
* Implement sync command for manual synchronization with backend.
-rw-r--r-- | src/kvstorage.h | 2 | ||||
-rw-r--r-- | src/kvstorage_bdb.c | 13 | ||||
-rw-r--r-- | src/kvstorage_server.c | 45 | ||||
-rw-r--r-- | src/kvstorage_server.h | 1 | ||||
-rw-r--r-- | src/kvstorage_sqlite.c | 13 |
5 files changed, 62 insertions, 12 deletions
diff --git a/src/kvstorage.h b/src/kvstorage.h index fb5848274..e4ad31cc1 100644 --- a/src/kvstorage.h +++ b/src/kvstorage.h @@ -47,6 +47,7 @@ typedef gboolean (*backend_insert)(struct rspamd_kv_backend *backend, gpointer k typedef gboolean (*backend_replace)(struct rspamd_kv_backend *backend, gpointer key, struct rspamd_kv_element *elt); typedef struct rspamd_kv_element* (*backend_lookup)(struct rspamd_kv_backend *backend, gpointer key); typedef void (*backend_delete)(struct rspamd_kv_backend *backend, gpointer key); +typedef gboolean (*backend_sync)(struct rspamd_kv_backend *backend); typedef void (*backend_destroy)(struct rspamd_kv_backend *backend); /* Callbacks for expire */ @@ -99,6 +100,7 @@ struct rspamd_kv_backend { backend_replace replace_func; /*< this callback is called when element is replaced */ backend_lookup lookup_func; /*< this callback is used for lookup of element */ backend_delete delete_func; /*< this callback is called when an element is deleted */ + backend_sync sync_func; /*< this callback is called when backend need to be synced */ backend_destroy destroy_func; /*< this callback is used for destroying all elements inside backend */ }; struct rspamd_kv_expire { diff --git a/src/kvstorage_bdb.c b/src/kvstorage_bdb.c index 87c343b22..280cb5ce9 100644 --- a/src/kvstorage_bdb.c +++ b/src/kvstorage_bdb.c @@ -45,6 +45,7 @@ struct rspamd_bdb_backend { backend_replace replace_func; /*< this callback is called when element is replaced */ backend_lookup lookup_func; /*< this callback is used for lookup of element */ backend_delete delete_func; /*< this callback is called when an element is deleted */ + backend_sync sync_func; /*< this callback is called when backend need to be synced */ backend_destroy destroy_func; /*< this callback is used for destroying all elements inside backend */ DB_ENV *envp; /*< db environment */ DB *dbp; /*< db pointer */ @@ -93,8 +94,9 @@ bdb_process_single_op (struct rspamd_bdb_backend *db, DB_TXN *txn, struct bdb_op /* Process operations queue */ static gboolean -bdb_process_queue (struct rspamd_bdb_backend *db) +bdb_process_queue (struct rspamd_kv_backend *backend) { + struct rspamd_bdb_backend *db = (struct rspamd_bdb_backend *)backend; struct bdb_op *op; GList *cur; @@ -198,7 +200,7 @@ rspamd_bdb_insert (struct rspamd_kv_backend *backend, gpointer key, struct rspam g_hash_table_insert (db->ops_hash, ELT_KEY (elt), op); if (g_queue_get_length (db->ops_queue) >= db->sync_ops) { - return bdb_process_queue (db); + return bdb_process_queue (backend); } return TRUE; @@ -223,7 +225,7 @@ rspamd_bdb_replace (struct rspamd_kv_backend *backend, gpointer key, struct rspa g_hash_table_insert (db->ops_hash, ELT_KEY (elt), op); if (g_queue_get_length (db->ops_queue) >= db->sync_ops) { - return bdb_process_queue (db); + return bdb_process_queue (backend); } return TRUE; @@ -292,7 +294,7 @@ rspamd_bdb_delete (struct rspamd_kv_backend *backend, gpointer key) g_hash_table_insert (db->ops_hash, ELT_KEY(elt), op); if (g_queue_get_length (db->ops_queue) >= db->sync_ops) { - bdb_process_queue (db); + bdb_process_queue (backend); } return; @@ -304,7 +306,7 @@ rspamd_bdb_destroy (struct rspamd_kv_backend *backend) struct rspamd_bdb_backend *db = (struct rspamd_bdb_backend *)backend; if (db->initialized) { - bdb_process_queue (db); + bdb_process_queue (backend); if (db->dbp != NULL) { db->dbp->close (db->dbp, 0); } @@ -353,6 +355,7 @@ rspamd_kv_bdb_new (const gchar *filename, guint sync_ops) new->lookup_func = rspamd_bdb_lookup; new->delete_func = rspamd_bdb_delete; new->replace_func = rspamd_bdb_replace; + new->sync_func = bdb_process_queue; new->destroy_func = rspamd_bdb_destroy; return (struct rspamd_kv_backend *)new; diff --git a/src/kvstorage_server.c b/src/kvstorage_server.c index c6190454c..56f25ed97 100644 --- a/src/kvstorage_server.c +++ b/src/kvstorage_server.c @@ -199,16 +199,22 @@ parse_kvstorage_command (struct kvstorage_session *session, f_str_t *in) } } else if (p - c == 4) { - if (memcmp (c, "quit", 4) == 0) { + if (g_ascii_strncasecmp (c, "quit", 4) == 0) { session->command = KVSTORAGE_CMD_QUIT; state = 100; continue; } + if (g_ascii_strncasecmp (c, "sync", 4) == 0) { + session->command = KVSTORAGE_CMD_SYNC; + state = 100; + continue; + } } else if (p - c == 6) { - if (memcmp (c, "delete", 6) == 0) { + if (g_ascii_strncasecmp (c, "delete", 6) == 0) { session->command = KVSTORAGE_CMD_DELETE; } + else { return FALSE; } @@ -445,6 +451,41 @@ kvstorage_read_socket (f_str_t * in, void *arg) } } } + else if (session->command == KVSTORAGE_CMD_SYNC) { + if (session->cf->storage->backend == NULL || session->cf->storage->backend->sync_func == NULL) { + if (!is_redis) { + return rspamd_dispatcher_write (session->dispather, ERROR_COMMON, + sizeof (ERROR_COMMON) - 1, FALSE, TRUE); + } + else { + return rspamd_dispatcher_write (session->dispather, "-ERR unsupported" CRLF, + sizeof ("-ERR unsupported" CRLF) - 1, FALSE, TRUE); + } + } + else { + if (session->cf->storage->backend->sync_func (session->cf->storage->backend)) { + if (!is_redis) { + return rspamd_dispatcher_write (session->dispather, "SYNCED" CRLF, + sizeof ("SYNCED" CRLF) - 1, FALSE, TRUE); + } + else { + return rspamd_dispatcher_write (session->dispather, "+OK" CRLF, + sizeof ("+OK" CRLF) - 1, FALSE, TRUE); + } + } + else { + if (!is_redis) { + return rspamd_dispatcher_write (session->dispather, "NOT_SYNCED" CRLF, + sizeof ("NOT_SYNCED" CRLF) - 1, FALSE, TRUE); + } + else { + return rspamd_dispatcher_write (session->dispather, "-ERR not synced" CRLF, + sizeof ("-ERR not synced" CRLF) - 1, FALSE, TRUE); + } + } + } + g_static_rw_lock_writer_lock (&session->cf->storage->rwlock); + } else if (session->command == KVSTORAGE_CMD_QUIT) { /* Quit session */ free_kvstorage_session (session); diff --git a/src/kvstorage_server.h b/src/kvstorage_server.h index b513d33ab..4f9e8c951 100644 --- a/src/kvstorage_server.h +++ b/src/kvstorage_server.h @@ -65,6 +65,7 @@ struct kvstorage_session { KVSTORAGE_CMD_SET, KVSTORAGE_CMD_GET, KVSTORAGE_CMD_DELETE, + KVSTORAGE_CMD_SYNC, KVSTORAGE_CMD_QUIT } command; guint id; diff --git a/src/kvstorage_sqlite.c b/src/kvstorage_sqlite.c index bde3ea1c6..3fa4bab20 100644 --- a/src/kvstorage_sqlite.c +++ b/src/kvstorage_sqlite.c @@ -51,6 +51,7 @@ struct rspamd_sqlite_backend { backend_replace replace_func; /*< this callback is called when element is replaced */ backend_lookup lookup_func; /*< this callback is used for lookup of element */ backend_delete delete_func; /*< this callback is called when an element is deleted */ + backend_sync sync_func; /*< this callback is called when backend need to be synced */ backend_destroy destroy_func; /*< this callback is used for destroying all elements inside backend */ sqlite3 *dbp; gchar *filename; @@ -100,8 +101,9 @@ sqlite_process_single_op (struct rspamd_sqlite_backend *db, struct sqlite_op *op /* Process operations queue */ static gboolean -sqlite_process_queue (struct rspamd_sqlite_backend *db) +sqlite_process_queue (struct rspamd_kv_backend *backend) { + struct rspamd_sqlite_backend *db = (struct rspamd_sqlite_backend *)backend; struct sqlite_op *op; GList *cur; @@ -261,7 +263,7 @@ rspamd_sqlite_insert (struct rspamd_kv_backend *backend, gpointer key, struct rs g_hash_table_insert (db->ops_hash, ELT_KEY (elt), op); if (g_queue_get_length (db->ops_queue) >= db->sync_ops) { - return sqlite_process_queue (db); + return sqlite_process_queue (backend); } return TRUE; @@ -286,7 +288,7 @@ rspamd_sqlite_replace (struct rspamd_kv_backend *backend, gpointer key, struct r g_hash_table_insert (db->ops_hash, ELT_KEY (elt), op); if (g_queue_get_length (db->ops_queue) >= db->sync_ops) { - return sqlite_process_queue (db); + return sqlite_process_queue (backend); } return TRUE; @@ -356,7 +358,7 @@ rspamd_sqlite_delete (struct rspamd_kv_backend *backend, gpointer key) g_hash_table_insert (db->ops_hash, ELT_KEY(elt), op); if (g_queue_get_length (db->ops_queue) >= db->sync_ops) { - sqlite_process_queue (db); + sqlite_process_queue (backend); } return; @@ -368,7 +370,7 @@ rspamd_sqlite_destroy (struct rspamd_kv_backend *backend) struct rspamd_sqlite_backend *db = (struct rspamd_sqlite_backend *)backend; if (db->initialized) { - sqlite_process_queue (db); + sqlite_process_queue (backend); if (db->get_stmt != NULL) { sqlite3_finalize (db->get_stmt); } @@ -422,6 +424,7 @@ rspamd_kv_sqlite_new (const gchar *filename, guint sync_ops) new->lookup_func = rspamd_sqlite_lookup; new->delete_func = rspamd_sqlite_delete; new->replace_func = rspamd_sqlite_replace; + new->sync_func = sqlite_process_queue; new->destroy_func = rspamd_sqlite_destroy; return (struct rspamd_kv_backend *)new; |