diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2014-07-23 12:53:08 +0100 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2014-07-23 12:53:08 +0100 |
commit | fe79d8c5a39f2b717f78cc3f3ef21b3cfc46500b (patch) | |
tree | c84e6a5d4c5cd78a7a2cc3c7adbc7af5d0541682 /src/kvstorage_server.c | |
parent | e0483657ff6cf1adc828ccce457814d61fe90a0d (diff) | |
download | rspamd-fe79d8c5a39f2b717f78cc3f3ef21b3cfc46500b.tar.gz rspamd-fe79d8c5a39f2b717f78cc3f3ef21b3cfc46500b.zip |
Revert "Unify code style."
This reverts commit e0483657ff6cf1adc828ccce457814d61fe90a0d.
Diffstat (limited to 'src/kvstorage_server.c')
-rw-r--r-- | src/kvstorage_server.c | 630 |
1 files changed, 232 insertions, 398 deletions
diff --git a/src/kvstorage_server.c b/src/kvstorage_server.c index 34a8ff8ce..75ada2c77 100644 --- a/src/kvstorage_server.c +++ b/src/kvstorage_server.c @@ -22,12 +22,12 @@ */ -#include "cfg_file.h" -#include "cfg_xml.h" #include "config.h" #include "kvstorage.h" #include "kvstorage_config.h" #include "kvstorage_server.h" +#include "cfg_file.h" +#include "cfg_xml.h" #include "main.h" #define ERROR_COMMON "ERROR" CRLF @@ -45,31 +45,22 @@ static sig_atomic_t do_reopen_log = 0; static sig_atomic_t soft_wanna_die = 0; /* Logging functions */ -#define thr_err(...) do { \ - g_mutex_lock (thr->log_mtx); \ - rspamd_common_log_function (rspamd_main->logger, \ - G_LOG_LEVEL_CRITICAL, \ - __FUNCTION__, \ - __VA_ARGS__); \ - g_mutex_unlock (thr->log_mtx); \ +#define thr_err(...) do { \ + g_mutex_lock (thr->log_mtx); \ + rspamd_common_log_function(rspamd_main->logger, G_LOG_LEVEL_CRITICAL, __FUNCTION__, __VA_ARGS__); \ + g_mutex_unlock (thr->log_mtx); \ } while (0) -#define thr_warn(...) do { \ - g_mutex_lock (thr->log_mtx); \ - rspamd_common_log_function (rspamd_main->logger, \ - G_LOG_LEVEL_WARNING, \ - __FUNCTION__, \ - __VA_ARGS__); \ - g_mutex_unlock (thr->log_mtx); \ +#define thr_warn(...) do { \ + g_mutex_lock (thr->log_mtx); \ + rspamd_common_log_function(rspamd_main->logger, G_LOG_LEVEL_WARNING, __FUNCTION__, __VA_ARGS__); \ + g_mutex_unlock (thr->log_mtx); \ } while (0) -#define thr_info(...) do { \ - g_mutex_lock (thr->log_mtx); \ - rspamd_common_log_function (rspamd_main->logger, \ - G_LOG_LEVEL_INFO, \ - __FUNCTION__, \ - __VA_ARGS__); \ - g_mutex_unlock (thr->log_mtx); \ +#define thr_info(...) do { \ + g_mutex_lock (thr->log_mtx); \ + rspamd_common_log_function(rspamd_main->logger, G_LOG_LEVEL_INFO, __FUNCTION__, __VA_ARGS__); \ + g_mutex_unlock (thr->log_mtx); \ } while (0) /* Init functions */ @@ -77,14 +68,14 @@ gpointer init_keystorage (void); void start_keystorage (struct rspamd_worker *worker); worker_t keystorage_worker = { - "keystorage", /* Name */ - init_keystorage, /* Init function */ - start_keystorage, /* Start function */ - TRUE, /* Has socket */ - FALSE, /* Non unique */ - TRUE, /* Non threaded */ - FALSE, /* Non killable */ - SOCK_STREAM /* TCP socket */ + "keystorage", /* Name */ + init_keystorage, /* Init function */ + start_keystorage, /* Start function */ + TRUE, /* Has socket */ + FALSE, /* Non unique */ + TRUE, /* Non threaded */ + FALSE, /* Non killable */ + SOCK_STREAM /* TCP socket */ }; #ifndef HAVE_SA_SIGINFO @@ -112,8 +103,8 @@ sig_handler (gint signo, siginfo_t *info, void *unused) gpointer init_keystorage (void) { - struct kvstorage_worker_ctx *ctx; - GQuark type; + struct kvstorage_worker_ctx *ctx; + GQuark type; type = g_quark_try_string ("keystorage"); ctx = g_malloc0 (sizeof (struct kvstorage_worker_ctx)); @@ -123,9 +114,9 @@ init_keystorage (void) ctx->timeout_raw = 300000; register_worker_opt (type, "timeout", xml_handle_seconds, ctx, - G_STRUCT_OFFSET (struct kvstorage_worker_ctx, timeout_raw)); + G_STRUCT_OFFSET (struct kvstorage_worker_ctx, timeout_raw)); register_worker_opt (type, "redis", xml_handle_boolean, ctx, - G_STRUCT_OFFSET (struct kvstorage_worker_ctx, is_redis)); + G_STRUCT_OFFSET (struct kvstorage_worker_ctx, is_redis)); return ctx; } @@ -133,7 +124,7 @@ init_keystorage (void) static gboolean config_kvstorage_worker (struct rspamd_worker *worker) { - struct kvstorage_worker_ctx *ctx = worker->ctx; + struct kvstorage_worker_ctx *ctx = worker->ctx; /* Init timeval */ msec_to_tv (ctx->timeout_raw, &ctx->io_timeout); @@ -162,18 +153,18 @@ parse_kvstorage_command (struct kvstorage_session *session, gchar *c, guint len) if (len == 3) { /* Set or get command */ if ((c[0] == 'g' || c[0] == 'G') && - (c[1] == 'e' || c[1] == 'E') && - (c[2] == 't' || c[2] == 'T')) { + (c[1] == 'e' || c[1] == 'E') && + (c[2] == 't' || c[2] == 'T')) { session->command = KVSTORAGE_CMD_GET; } else if ((c[0] == 's' || c[0] == 'S') && - (c[1] == 'e' || c[1] == 'E') && - (c[2] == 't' || c[2] == 'T')) { + (c[1] == 'e' || c[1] == 'E') && + (c[2] == 't' || c[2] == 'T')) { session->command = KVSTORAGE_CMD_SET; } else if ((c[0] == 'd' || c[0] == 'D') && - (c[1] == 'e' || c[1] == 'E') && - (c[2] == 'l' || c[2] == 'L')) { + (c[1] == 'e' || c[1] == 'E') && + (c[2] == 'l' || c[2] == 'L')) { session->command = KVSTORAGE_CMD_DELETE; } else { @@ -183,52 +174,51 @@ parse_kvstorage_command (struct kvstorage_session *session, gchar *c, guint len) } else if (len == 4) { if ((c[0] == 'i' || c[0] == 'I') && - (c[1] == 'n' || c[1] == 'N') && - (c[2] == 'c' || c[2] == 'C') && - (c[3] == 'r' || c[3] == 'R')) { + (c[1] == 'n' || c[1] == 'N') && + (c[2] == 'c' || c[2] == 'C') && + (c[3] == 'r' || c[3] == 'R')) { session->command = KVSTORAGE_CMD_INCR; session->arg_data.value = 1; } else if ((c[0] == 'd' || c[0] == 'D') && - (c[1] == 'e' || c[1] == 'E') && - (c[2] == 'c' || c[2] == 'C') && - (c[3] == 'r' || c[3] == 'R')) { + (c[1] == 'e' || c[1] == 'E') && + (c[2] == 'c' || c[2] == 'C') && + (c[3] == 'r' || c[3] == 'R')) { session->command = KVSTORAGE_CMD_DECR; session->arg_data.value = -1; } else if (g_ascii_strncasecmp (c, "quit", 4) == 0) { session->command = KVSTORAGE_CMD_QUIT; } - else if (g_ascii_strncasecmp (c, "sync", - 4) == 0 || g_ascii_strncasecmp (c, "save", 4) == 0) { + else if (g_ascii_strncasecmp (c, "sync", 4) == 0 || g_ascii_strncasecmp (c, "save", 4) == 0) { session->command = KVSTORAGE_CMD_SYNC; } } else if (len == 6) { if ((c[0] == 'i' || c[0] == 'I') && - (c[1] == 'n' || c[1] == 'N') && - (c[2] == 'c' || c[2] == 'C') && - (c[3] == 'r' || c[3] == 'R') && - (c[4] == 'b' || c[4] == 'B') && - (c[5] == 'y' || c[5] == 'Y')) { + (c[1] == 'n' || c[1] == 'N') && + (c[2] == 'c' || c[2] == 'C') && + (c[3] == 'r' || c[3] == 'R') && + (c[4] == 'b' || c[4] == 'B') && + (c[5] == 'y' || c[5] == 'Y')) { session->command = KVSTORAGE_CMD_INCR; session->arg_data.value = 1; } else if ((c[0] == 'd' || c[0] == 'D') && - (c[1] == 'e' || c[1] == 'E') && - (c[2] == 'c' || c[2] == 'C') && - (c[3] == 'r' || c[3] == 'R') && - (c[4] == 'b' || c[4] == 'B') && - (c[5] == 'y' || c[5] == 'Y')) { + (c[1] == 'e' || c[1] == 'E') && + (c[2] == 'c' || c[2] == 'C') && + (c[3] == 'r' || c[3] == 'R') && + (c[4] == 'b' || c[4] == 'B') && + (c[5] == 'y' || c[5] == 'Y')) { session->command = KVSTORAGE_CMD_DECR; session->arg_data.value = -1; } else if ((c[0] == 'd' || c[0] == 'D') && - (c[1] == 'e' || c[1] == 'E') && - (c[2] == 'l' || c[2] == 'L') && - (c[3] == 'e' || c[3] == 'E') && - (c[4] == 't' || c[4] == 'T') && - (c[5] == 'e' || c[5] == 'E')) { + (c[1] == 'e' || c[1] == 'E') && + (c[2] == 'l' || c[2] == 'L') && + (c[3] == 'e' || c[3] == 'E') && + (c[4] == 't' || c[4] == 'T') && + (c[5] == 'e' || c[5] == 'E')) { session->command = KVSTORAGE_CMD_DELETE; } else if (g_ascii_strncasecmp (c, "select", 6) == 0) { @@ -248,9 +238,9 @@ parse_kvstorage_command (struct kvstorage_session *session, gchar *c, guint len) static gboolean parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in) { - gchar *p, *c, *end; - gint state = 0, next_state = 0; - gboolean is_redis; + gchar *p, *c, *end; + gint state = 0, next_state = 0; + gboolean is_redis; p = in->begin; end = in->begin + in->len; @@ -263,7 +253,7 @@ parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in) case 0: /* At this state we try to read identifier of storage */ if (g_ascii_isdigit (*p)) { - p++; + p ++; } else { if (g_ascii_isspace (*p) && p != c) { @@ -294,7 +284,7 @@ parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in) case 1: /* At this state we parse command */ if (g_ascii_isalpha (*p) && p != end) { - p++; + p ++; } else { if (parse_kvstorage_command (session, c, p - c)) { @@ -325,21 +315,20 @@ parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in) case 2: /* Read and store key */ if (!g_ascii_isspace (*p) && end != p) { - p++; + p ++; } else { if (p == c) { return FALSE; } else { - session->key = rspamd_mempool_alloc (session->pool, - p - c + 1); + session->key = rspamd_mempool_alloc (session->pool, p - c + 1); rspamd_strlcpy (session->key, c, p - c + 1); session->keylen = p - c; /* Now we must select next state based on command */ if (session->command == KVSTORAGE_CMD_SET || - session->command == KVSTORAGE_CMD_INCR || - session->command == KVSTORAGE_CMD_DECR) { + session->command == KVSTORAGE_CMD_INCR || + session->command == KVSTORAGE_CMD_DECR) { /* Read flags */ state = 99; if (is_redis) { @@ -366,7 +355,7 @@ parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in) case 3: /* Read flags */ if (g_ascii_isdigit (*p)) { - p++; + p ++; } else { if (g_ascii_isspace (*p)) { @@ -388,7 +377,7 @@ parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in) case 4: /* Read exptime */ if (g_ascii_isdigit (*p)) { - p++; + p ++; } else { if (g_ascii_isspace (*p)) { @@ -404,7 +393,7 @@ parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in) case 5: /* Read size or incr/decr values */ if (g_ascii_isdigit (*p)) { - p++; + p ++; } else { if (g_ascii_isspace (*p) || p >= end - 1) { @@ -415,8 +404,7 @@ parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in) if (p != c) { session->arg_data.value = strtoul (c, NULL, 10); if (session->command == KVSTORAGE_CMD_DECR) { - session->arg_data.value = - -session->arg_data.value; + session->arg_data.value = -session->arg_data.value; } } else if (session->command == KVSTORAGE_CMD_INCR) { @@ -436,7 +424,7 @@ parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in) case 6: /* Read index of storage */ if (g_ascii_isdigit (*p)) { - p++; + p ++; } else { if (g_ascii_isspace (*p) || end == p) { @@ -451,7 +439,7 @@ parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in) case 7: /* Read arguments count */ if (g_ascii_isdigit (*p)) { - p++; + p ++; } else { if (g_ascii_isspace (*p) || end == p) { @@ -469,7 +457,7 @@ parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in) case 99: /* Skip spaces state */ if (g_ascii_isspace (*p)) { - p++; + p ++; } else { c = p; @@ -490,44 +478,33 @@ parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in) static gboolean kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis) { - gint r; - gchar outbuf[BUFSIZ], intbuf[sizeof ("9223372036854775807")]; - gboolean res; - struct rspamd_kv_element *elt; - guint eltlen; - glong longval; + gint r; + gchar outbuf[BUFSIZ], intbuf[sizeof ("9223372036854775807")]; + gboolean res; + struct rspamd_kv_element *elt; + guint eltlen; + glong longval; if (session->command == KVSTORAGE_CMD_SET) { session->state = KVSTORAGE_STATE_READ_DATA; - rspamd_set_dispatcher_policy (session->dispather, - BUFFER_CHARACTER, - session->arg_data.length); + rspamd_set_dispatcher_policy (session->dispather, BUFFER_CHARACTER, session->arg_data.length); } else if (session->command == KVSTORAGE_CMD_GET) { - elt = rspamd_kv_storage_lookup (session->cf->storage, - session->key, - session->keylen, - session->now); + elt = rspamd_kv_storage_lookup (session->cf->storage, session->key, session->keylen, session->now); if (elt == NULL) { RW_R_UNLOCK (&session->cf->storage->rwlock); if (!is_redis) { - return rspamd_dispatcher_write (session->dispather, - ERROR_NOT_FOUND, - sizeof (ERROR_NOT_FOUND) - 1, - FALSE, - TRUE); + return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND, + sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE); } else { return rspamd_dispatcher_write (session->dispather, "$-1" CRLF, - sizeof ("$-1" CRLF) - 1, FALSE, TRUE); + sizeof ("$-1" CRLF) - 1, FALSE, TRUE); } } else { if (elt->flags & KV_ELT_INTEGER) { - eltlen = rspamd_snprintf (intbuf, - sizeof (intbuf), - "%l", - ELT_LONG (elt)); + eltlen = rspamd_snprintf (intbuf, sizeof (intbuf), "%l", ELT_LONG (elt)); } else { @@ -535,43 +512,34 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis) } if (!is_redis) { - r = rspamd_snprintf (outbuf, - sizeof (outbuf), - "VALUE %s %ud %ud" CRLF, - ELT_KEY (elt), - elt->flags, - eltlen); + r = rspamd_snprintf (outbuf, sizeof (outbuf), "VALUE %s %ud %ud" CRLF, + ELT_KEY (elt), elt->flags, eltlen); } else { r = rspamd_snprintf (outbuf, sizeof (outbuf), "$%ud" CRLF, eltlen); } if (!rspamd_dispatcher_write (session->dispather, outbuf, - r, TRUE, FALSE)) { + r, TRUE, FALSE)) { RW_R_UNLOCK (&session->cf->storage->rwlock); return FALSE; } if (elt->flags & KV_ELT_INTEGER) { - if (!rspamd_dispatcher_write (session->dispather, intbuf, - eltlen, TRUE, TRUE)) { + if (!rspamd_dispatcher_write (session->dispather, intbuf, eltlen, TRUE, TRUE)) { RW_R_UNLOCK (&session->cf->storage->rwlock); return FALSE; } } else { - if (!rspamd_dispatcher_write (session->dispather, - ELT_DATA (elt), eltlen, TRUE, TRUE)) { + if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), eltlen, TRUE, TRUE)) { RW_R_UNLOCK (&session->cf->storage->rwlock); return FALSE; } } session->elt = elt; if (!is_redis) { - res = rspamd_dispatcher_write (session->dispather, - CRLF "END" CRLF, - sizeof (CRLF "END" CRLF) - 1, - FALSE, - TRUE); + res = rspamd_dispatcher_write (session->dispather, CRLF "END" CRLF, + sizeof (CRLF "END" CRLF) - 1, FALSE, TRUE); } else { res = rspamd_dispatcher_write (session->dispather, CRLF, @@ -585,58 +553,42 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis) } } else if (session->command == KVSTORAGE_CMD_DELETE) { - elt = rspamd_kv_storage_delete (session->cf->storage, - session->key, - session->keylen); + elt = rspamd_kv_storage_delete (session->cf->storage, session->key, session->keylen); if (elt != NULL) { if ((elt->flags & KV_ELT_DIRTY) == 0) { /* Free memory if backend has deleted this element */ g_slice_free1 (ELT_SIZE (elt), elt); } if (!is_redis) { - return rspamd_dispatcher_write (session->dispather, - "DELETED" CRLF, - sizeof ("DELETED" CRLF) - 1, - FALSE, - TRUE); + return rspamd_dispatcher_write (session->dispather, "DELETED" CRLF, + sizeof ("DELETED" CRLF) - 1, FALSE, TRUE); } else { return rspamd_dispatcher_write (session->dispather, ":1" CRLF, - sizeof (":1" CRLF) - 1, FALSE, TRUE); + sizeof (":1" CRLF) - 1, FALSE, TRUE); } } else { if (!is_redis) { - return rspamd_dispatcher_write (session->dispather, - ERROR_NOT_FOUND, - sizeof (ERROR_NOT_FOUND) - 1, - FALSE, - TRUE); + return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND, + sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE); } else { return rspamd_dispatcher_write (session->dispather, ":0" CRLF, - sizeof (":0" CRLF) - 1, FALSE, TRUE); + sizeof (":0" CRLF) - 1, FALSE, TRUE); } } } - else if (session->command == KVSTORAGE_CMD_INCR || session->command == - KVSTORAGE_CMD_DECR) { + else if (session->command == KVSTORAGE_CMD_INCR || session->command == KVSTORAGE_CMD_DECR) { longval = session->arg_data.value; - if (!rspamd_kv_storage_increment (session->cf->storage, session->key, - session->keylen, &longval)) { + if (!rspamd_kv_storage_increment (session->cf->storage, session->key, session->keylen, &longval)) { if (!is_redis) { - return rspamd_dispatcher_write (session->dispather, - ERROR_NOT_FOUND, - sizeof (ERROR_NOT_FOUND) - 1, - FALSE, - TRUE); + return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND, + sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE); } else { - return rspamd_dispatcher_write (session->dispather, - "-ERR not found" CRLF, - sizeof ("-ERR not found" CRLF) - 1, - FALSE, - TRUE); + return rspamd_dispatcher_write (session->dispather, "-ERR not found" CRLF, + sizeof ("-ERR not found" CRLF) - 1, FALSE, TRUE); } } else { @@ -649,61 +601,41 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis) longval); } if (!rspamd_dispatcher_write (session->dispather, outbuf, - r, FALSE, FALSE)) { + r, FALSE, FALSE)) { return FALSE; } } } else if (session->command == KVSTORAGE_CMD_SYNC) { - if (session->cf->storage->backend == NULL || - session->cf->storage->backend->sync_func == NULL) { + if (session->cf->storage->backend == NULL || session->cf->storage->backend->sync_func == NULL) { if (!is_redis) { - return rspamd_dispatcher_write (session->dispather, - ERROR_COMMON, - sizeof (ERROR_COMMON) - 1, - FALSE, - TRUE); + return rspamd_dispatcher_write (session->dispather, ERROR_COMMON, + sizeof (ERROR_COMMON) - 1, FALSE, TRUE); } else { - return rspamd_dispatcher_write (session->dispather, - "-ERR unsupported" CRLF, - sizeof ("-ERR unsupported" CRLF) - 1, - FALSE, - TRUE); + return rspamd_dispatcher_write (session->dispather, "-ERR unsupported" CRLF, + sizeof ("-ERR unsupported" CRLF) - 1, FALSE, TRUE); } } else { - if (session->cf->storage->backend->sync_func (session->cf->storage-> - backend)) { + if (session->cf->storage->backend->sync_func (session->cf->storage->backend)) { if (!is_redis) { - return rspamd_dispatcher_write (session->dispather, - "SYNCED" CRLF, - sizeof ("SYNCED" CRLF) - 1, - FALSE, - TRUE); + return rspamd_dispatcher_write (session->dispather, "SYNCED" CRLF, + sizeof ("SYNCED" CRLF) - 1, FALSE, TRUE); } else { - return rspamd_dispatcher_write (session->dispather, - "+OK" CRLF, - sizeof ("+OK" CRLF) - 1, - FALSE, - TRUE); + return rspamd_dispatcher_write (session->dispather, "+OK" CRLF, + sizeof ("+OK" CRLF) - 1, FALSE, TRUE); } } else { if (!is_redis) { - return rspamd_dispatcher_write (session->dispather, - "NOT_SYNCED" CRLF, - sizeof ("NOT_SYNCED" CRLF) - 1, - FALSE, - TRUE); + return rspamd_dispatcher_write (session->dispather, "NOT_SYNCED" CRLF, + sizeof ("NOT_SYNCED" CRLF) - 1, FALSE, TRUE); } else { - return rspamd_dispatcher_write (session->dispather, - "-ERR not synced" CRLF, - sizeof ("-ERR not synced" CRLF) - 1, - FALSE, - TRUE); + return rspamd_dispatcher_write (session->dispather, "-ERR not synced" CRLF, + sizeof ("-ERR not synced" CRLF) - 1, FALSE, TRUE); } } } @@ -711,11 +643,11 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis) else if (session->command == KVSTORAGE_CMD_SELECT) { if (!is_redis) { return rspamd_dispatcher_write (session->dispather, "SELECTED" CRLF, - sizeof ("SELECTED" CRLF) - 1, FALSE, TRUE); + sizeof ("SELECTED" CRLF) - 1, FALSE, TRUE); } else { return rspamd_dispatcher_write (session->dispather, "+OK" CRLF, - sizeof ("+OK" CRLF) - 1, FALSE, TRUE); + sizeof ("+OK" CRLF) - 1, FALSE, TRUE); } } else if (session->command == KVSTORAGE_CMD_QUIT) { @@ -730,8 +662,8 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis) static gboolean kvstorage_read_arglen (f_str_t *in, guint *len) { - gchar *p = in->begin, *end = in->begin + in->len, *c; - gint state = 0; + gchar *p = in->begin, *end = in->begin + in->len, *c; + gint state = 0; c = p; while (p < end) { @@ -741,14 +673,14 @@ kvstorage_read_arglen (f_str_t *in, guint *len) return FALSE; } else { - p++; + p ++; c = p; state = 1; } break; case 1: if (g_ascii_isdigit (*p) && p != end - 1) { - p++; + p ++; } else { if (p != end - 1) { @@ -799,12 +731,12 @@ kvstorage_check_argnum (struct kvstorage_session *session) static gboolean kvstorage_read_socket (f_str_t * in, void *arg) { - struct kvstorage_session *session = (struct kvstorage_session *) arg; - struct kvstorage_worker_thread *thr; - gint r; - guint arglen = 0; - gchar outbuf[BUFSIZ]; - gboolean is_redis; + struct kvstorage_session *session = (struct kvstorage_session *) arg; + struct kvstorage_worker_thread *thr; + gint r; + guint arglen = 0; + gchar outbuf[BUFSIZ]; + gboolean is_redis; if (in->len == 0) { /* Skip empty commands */ @@ -817,22 +749,16 @@ kvstorage_read_socket (f_str_t * in, void *arg) case KVSTORAGE_STATE_READ_CMD: /* Update timestamp */ session->now = time (NULL); - if (!parse_kvstorage_line (session, in)) { + if (! parse_kvstorage_line (session, in)) { thr_info ("%ud: unknown command: %V", thr->id, in); if (!is_redis) { - return rspamd_dispatcher_write (session->dispather, - ERROR_UNKNOWN_COMMAND, - sizeof (ERROR_UNKNOWN_COMMAND) - 1, - FALSE, - TRUE); + return rspamd_dispatcher_write (session->dispather, ERROR_UNKNOWN_COMMAND, + sizeof (ERROR_UNKNOWN_COMMAND) - 1, FALSE, TRUE); } else { - r = rspamd_snprintf (outbuf, - sizeof (outbuf), - "-ERR unknown command '%V'" CRLF, - in); + r = rspamd_snprintf (outbuf, sizeof (outbuf), "-ERR unknown command '%V'" CRLF, in); return rspamd_dispatcher_write (session->dispather, outbuf, - r, FALSE, TRUE); + r, FALSE, TRUE); } } else { @@ -840,18 +766,12 @@ kvstorage_read_socket (f_str_t * in, void *arg) if (session->cf == NULL) { thr_info ("%ud: bad keystorage: %ud", thr->id, session->id); if (!is_redis) { - return rspamd_dispatcher_write (session->dispather, - ERROR_INVALID_KEYSTORAGE, - sizeof (ERROR_INVALID_KEYSTORAGE) - 1, - FALSE, - TRUE); + return rspamd_dispatcher_write (session->dispather, ERROR_INVALID_KEYSTORAGE, + sizeof (ERROR_INVALID_KEYSTORAGE) - 1, FALSE, TRUE); } else { - return rspamd_dispatcher_write (session->dispather, - "-ERR unknown keystorage" CRLF, - sizeof ("-ERR unknown keystorage" CRLF) - 1, - FALSE, - TRUE); + return rspamd_dispatcher_write (session->dispather, "-ERR unknown keystorage" CRLF, + sizeof ("-ERR unknown keystorage" CRLF) - 1, FALSE, TRUE); } } if (session->state != KVSTORAGE_STATE_READ_ARGLEN) { @@ -860,59 +780,44 @@ kvstorage_read_socket (f_str_t * in, void *arg) } break; case KVSTORAGE_STATE_READ_ARGLEN: - if (!kvstorage_read_arglen (in, &arglen)) { + if (! kvstorage_read_arglen (in, &arglen)) { session->state = KVSTORAGE_STATE_READ_CMD; - r = rspamd_snprintf (outbuf, - sizeof (outbuf), - "-ERR unknown arglen '%V'" CRLF, - in); + r = rspamd_snprintf (outbuf, sizeof (outbuf), "-ERR unknown arglen '%V'" CRLF, in); return rspamd_dispatcher_write (session->dispather, outbuf, - r, FALSE, TRUE); + r, FALSE, TRUE); } else { session->state = KVSTORAGE_STATE_READ_ARG; - rspamd_set_dispatcher_policy (session->dispather, - BUFFER_CHARACTER, - arglen); + rspamd_set_dispatcher_policy (session->dispather, BUFFER_CHARACTER, arglen); } break; case KVSTORAGE_STATE_READ_ARG: if (session->argnum == 0) { /* Read command */ - if (!parse_kvstorage_command (session, in->begin, in->len)) { + if (! parse_kvstorage_command (session, in->begin, in->len)) { session->state = KVSTORAGE_STATE_READ_CMD; - r = rspamd_snprintf (outbuf, - sizeof (outbuf), - "-ERR unknown command '%V'" CRLF, - in); + r = rspamd_snprintf (outbuf, sizeof (outbuf), "-ERR unknown command '%V'" CRLF, in); return rspamd_dispatcher_write (session->dispather, outbuf, - r, FALSE, TRUE); + r, FALSE, TRUE); } else { - if (!kvstorage_check_argnum (session)) { + if (! kvstorage_check_argnum (session)) { session->state = KVSTORAGE_STATE_READ_CMD; - r = rspamd_snprintf (outbuf, - sizeof (outbuf), - "-ERR invalid argnum for command '%V': %ud" CRLF, - in, - session->argc); + r = rspamd_snprintf (outbuf, sizeof (outbuf), "-ERR invalid argnum for command '%V': %ud" CRLF, + in, session->argc); return rspamd_dispatcher_write (session->dispather, outbuf, - r, FALSE, TRUE); + r, FALSE, TRUE); } else { if (session->argnum == session->argc - 1) { session->state = KVSTORAGE_STATE_READ_CMD; - rspamd_set_dispatcher_policy (session->dispather, - BUFFER_LINE, - -1); + rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1); return kvstorage_process_command (session, TRUE); } else { - session->argnum++; + session->argnum ++; session->state = KVSTORAGE_STATE_READ_ARGLEN; - rspamd_set_dispatcher_policy (session->dispather, - BUFFER_LINE, - -1); + rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1); } } } @@ -924,28 +829,21 @@ kvstorage_read_socket (f_str_t * in, void *arg) session->keylen = in->len; if (session->argnum == session->argc - 1) { session->state = KVSTORAGE_STATE_READ_CMD; - rspamd_set_dispatcher_policy (session->dispather, - BUFFER_LINE, - -1); + rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1); return kvstorage_process_command (session, TRUE); } else { - session->argnum++; + session->argnum ++; session->state = KVSTORAGE_STATE_READ_ARGLEN; - rspamd_set_dispatcher_policy (session->dispather, - BUFFER_LINE, - -1); + rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1); } } else { /* Special case for select command */ session->state = KVSTORAGE_STATE_READ_CMD; - rspamd_strlcpy (outbuf, in->begin, MIN (sizeof (outbuf), - in->len)); + rspamd_strlcpy (outbuf, in->begin, MIN (sizeof (outbuf), in->len)); session->id = strtoul (outbuf, NULL, 10); - rspamd_set_dispatcher_policy (session->dispather, - BUFFER_LINE, - -1); + rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1); return kvstorage_process_command (session, TRUE); } } @@ -953,37 +851,25 @@ kvstorage_read_socket (f_str_t * in, void *arg) /* We get datablock for set command */ if (session->command == KVSTORAGE_CMD_SET && session->argc == 3) { session->state = KVSTORAGE_STATE_READ_CMD; - rspamd_set_dispatcher_policy (session->dispather, - BUFFER_LINE, - -1); - if (rspamd_kv_storage_insert (session->cf->storage, - session->key, session->keylen, - in->begin, in->len, - session->flags, session->expire)) { - return rspamd_dispatcher_write (session->dispather, - "+OK" CRLF, - sizeof ("+OK" CRLF) - 1, - FALSE, - TRUE); + rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1); + if (rspamd_kv_storage_insert (session->cf->storage, session->key, session->keylen, + in->begin, in->len, + session->flags, session->expire)) { + return rspamd_dispatcher_write (session->dispather, "+OK" CRLF, + sizeof ("+OK" CRLF) - 1, FALSE, TRUE); } else { - return rspamd_dispatcher_write (session->dispather, - "-ERR not stored" CRLF, - sizeof ("-ERR not stored" CRLF) - 1, - FALSE, - TRUE); + return rspamd_dispatcher_write (session->dispather, "-ERR not stored" CRLF, + sizeof ("-ERR not stored" CRLF) - 1, FALSE, TRUE); } } - else if (session->command == KVSTORAGE_CMD_SET && session->argc == - 4) { + else if (session->command == KVSTORAGE_CMD_SET && session->argc == 4) { /* It is expire argument */ session->state = KVSTORAGE_STATE_READ_CMD; rspamd_strtol (in->begin, in->len, (glong *)&session->expire); - session->argnum++; + session->argnum ++; session->state = KVSTORAGE_STATE_READ_ARGLEN; - rspamd_set_dispatcher_policy (session->dispather, - BUFFER_LINE, - -1); + rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1); } else { session->state = KVSTORAGE_STATE_READ_CMD; @@ -991,9 +877,7 @@ kvstorage_read_socket (f_str_t * in, void *arg) if (session->command == KVSTORAGE_CMD_DECR) { session->arg_data.value = -session->arg_data.value; } - rspamd_set_dispatcher_policy (session->dispather, - BUFFER_LINE, - -1); + rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1); return kvstorage_process_command (session, TRUE); } } @@ -1001,25 +885,16 @@ kvstorage_read_socket (f_str_t * in, void *arg) /* We get datablock for set command */ if (session->command == KVSTORAGE_CMD_SET && session->argc == 4) { session->state = KVSTORAGE_STATE_READ_CMD; - rspamd_set_dispatcher_policy (session->dispather, - BUFFER_LINE, - -1); - if (rspamd_kv_storage_insert (session->cf->storage, - session->key, session->keylen, - in->begin, in->len, - session->flags, session->expire)) { - return rspamd_dispatcher_write (session->dispather, - "+OK" CRLF, - sizeof ("+OK" CRLF) - 1, - FALSE, - TRUE); + rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1); + if (rspamd_kv_storage_insert (session->cf->storage, session->key, session->keylen, + in->begin, in->len, + session->flags, session->expire)) { + return rspamd_dispatcher_write (session->dispather, "+OK" CRLF, + sizeof ("+OK" CRLF) - 1, FALSE, TRUE); } else { - return rspamd_dispatcher_write (session->dispather, - "-ERR not stored" CRLF, - sizeof ("-ERR not stored" CRLF) - 1, - FALSE, - TRUE); + return rspamd_dispatcher_write (session->dispather, "-ERR not stored" CRLF, + sizeof ("-ERR not stored" CRLF) - 1, FALSE, TRUE); } } } @@ -1027,36 +902,26 @@ kvstorage_read_socket (f_str_t * in, void *arg) case KVSTORAGE_STATE_READ_DATA: session->state = KVSTORAGE_STATE_READ_CMD; rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1); - if (rspamd_kv_storage_insert (session->cf->storage, session->key, - session->keylen, - in->begin, in->len, - session->flags, session->expire)) { + if (rspamd_kv_storage_insert (session->cf->storage, session->key, session->keylen, + in->begin, in->len, + session->flags, session->expire)) { if (!is_redis) { - return rspamd_dispatcher_write (session->dispather, - "STORED" CRLF, - sizeof ("STORED" CRLF) - 1, - FALSE, - TRUE); + return rspamd_dispatcher_write (session->dispather, "STORED" CRLF, + sizeof ("STORED" CRLF) - 1, FALSE, TRUE); } else { return rspamd_dispatcher_write (session->dispather, "+OK" CRLF, - sizeof ("+OK" CRLF) - 1, FALSE, TRUE); + sizeof ("+OK" CRLF) - 1, FALSE, TRUE); } } else { if (!is_redis) { - return rspamd_dispatcher_write (session->dispather, - ERROR_NOT_STORED, - sizeof (ERROR_NOT_STORED) - 1, - FALSE, - TRUE); + return rspamd_dispatcher_write (session->dispather, ERROR_NOT_STORED, + sizeof (ERROR_NOT_STORED) - 1, FALSE, TRUE); } else { - return rspamd_dispatcher_write (session->dispather, - "-ERR not stored" CRLF, - sizeof ("-ERR not stored" CRLF) - 1, - FALSE, - TRUE); + return rspamd_dispatcher_write (session->dispather, "-ERR not stored" CRLF, + sizeof ("-ERR not stored" CRLF) - 1, FALSE, TRUE); } } @@ -1072,18 +937,17 @@ kvstorage_read_socket (f_str_t * in, void *arg) static gboolean kvstorage_write_socket (void *arg) { - struct kvstorage_session *session = (struct kvstorage_session *) arg; + struct kvstorage_session *session = (struct kvstorage_session *) arg; if (session->elt) { if ((session->elt->flags & KV_ELT_NEED_INSERT) != 0) { /* Insert to cache and free element */ session->elt->flags &= ~KV_ELT_NEED_INSERT; RW_R_UNLOCK (&session->cf->storage->rwlock); - rspamd_kv_storage_insert_cache (session->cf->storage, - ELT_KEY (session->elt), - session->elt->keylen, ELT_DATA (session->elt), - session->elt->size, session->elt->flags, - session->elt->expire, NULL); + rspamd_kv_storage_insert_cache (session->cf->storage, ELT_KEY (session->elt), + session->elt->keylen, ELT_DATA (session->elt), + session->elt->size, session->elt->flags, + session->elt->expire, NULL); g_free (session->elt); session->elt = NULL; return TRUE; @@ -1102,8 +966,8 @@ kvstorage_write_socket (void *arg) static void kvstorage_err_socket (GError * err, void *arg) { - struct kvstorage_session *session = (struct kvstorage_session *) arg; - struct kvstorage_worker_thread *thr; + struct kvstorage_session *session = (struct kvstorage_session *) arg; + struct kvstorage_worker_thread *thr; thr = session->thr; if (err->code != -1) { @@ -1126,15 +990,14 @@ kvstorage_err_socket (GError * err, void *arg) static void thr_accept_socket (gint fd, short what, void *arg) { - struct kvstorage_worker_thread *thr = (struct kvstorage_worker_thread *)arg; - union sa_union su; - socklen_t addrlen = sizeof (su.ss); - gint nfd; - struct kvstorage_session *session; + struct kvstorage_worker_thread *thr = (struct kvstorage_worker_thread *)arg; + union sa_union su; + socklen_t addrlen = sizeof (su.ss); + gint nfd; + struct kvstorage_session *session; g_mutex_lock (thr->accept_mtx); - if ((nfd = - accept_from_socket (fd, (struct sockaddr *)&su.ss, &addrlen)) == -1) { + if ((nfd = accept_from_socket (fd, (struct sockaddr *)&su.ss, &addrlen)) == -1) { thr_warn ("%ud: accept failed: %s", thr->id, strerror (errno)); g_mutex_unlock (thr->accept_mtx); return; @@ -1151,14 +1014,9 @@ thr_accept_socket (gint fd, short what, void *arg) session->state = KVSTORAGE_STATE_READ_CMD; session->thr = thr; session->sock = nfd; - session->dispather = rspamd_create_dispatcher (thr->ev_base, - nfd, - BUFFER_LINE, - kvstorage_read_socket, - kvstorage_write_socket, - kvstorage_err_socket, - thr->tv, - session); + session->dispather = rspamd_create_dispatcher (thr->ev_base, nfd, BUFFER_LINE, + kvstorage_read_socket, kvstorage_write_socket, + kvstorage_err_socket, thr->tv, session); g_mutex_unlock (thr->accept_mtx); session->elt = NULL; @@ -1168,7 +1026,7 @@ thr_accept_socket (gint fd, short what, void *arg) } else if (su.ss.ss_family == AF_INET) { memcpy (&session->client_addr, &su.s4.sin_addr, - sizeof (struct in_addr)); + sizeof (struct in_addr)); } } @@ -1178,8 +1036,8 @@ thr_accept_socket (gint fd, short what, void *arg) static void thr_term_socket (gint fd, short what, void *arg) { - struct kvstorage_worker_thread *thr = (struct kvstorage_worker_thread *)arg; - struct timeval tv; + struct kvstorage_worker_thread *thr = (struct kvstorage_worker_thread *)arg; + struct timeval tv; if (read (fd, &tv, sizeof (struct timeval)) != sizeof (struct timeval)) { thr_err ("cannot read data from socket: %s", strerror (errno)); @@ -1197,26 +1055,18 @@ thr_term_socket (gint fd, short what, void *arg) static gpointer kvstorage_thread (gpointer ud) { - struct kvstorage_worker_thread *thr = ud; + struct kvstorage_worker_thread *thr = ud; /* Block signals as it is dispatcher deity */ sigprocmask (SIG_BLOCK, thr->signals, NULL); /* Init thread specific events */ thr->ev_base = event_init (); - event_set (&thr->bind_ev, - thr->worker->cf->listen_sock, - EV_READ | EV_PERSIST, - thr_accept_socket, - (void *)thr); + event_set (&thr->bind_ev, thr->worker->cf->listen_sock, EV_READ | EV_PERSIST, thr_accept_socket, (void *)thr); event_base_set (thr->ev_base, &thr->bind_ev); event_add (&thr->bind_ev, NULL); - event_set (&thr->term_ev, - thr->term_sock[0], - EV_READ | EV_PERSIST, - thr_term_socket, - (void *)thr); + event_set (&thr->term_ev, thr->term_sock[0], EV_READ | EV_PERSIST, thr_term_socket, (void *)thr); event_base_set (thr->ev_base, &thr->term_ev); event_add (&thr->term_ev, NULL); @@ -1229,17 +1079,12 @@ kvstorage_thread (gpointer ud) * Create new thread, set it detached */ static struct kvstorage_worker_thread * -create_kvstorage_thread (struct rspamd_worker *worker, - struct kvstorage_worker_ctx *ctx, - guint id, - sigset_t *signals) +create_kvstorage_thread (struct rspamd_worker *worker, struct kvstorage_worker_ctx *ctx, guint id, sigset_t *signals) { - struct kvstorage_worker_thread *new; - GError *err = NULL; + struct kvstorage_worker_thread *new; + GError *err = NULL; - new = - rspamd_mempool_alloc (ctx->pool, - sizeof (struct kvstorage_worker_thread)); + new = rspamd_mempool_alloc (ctx->pool, sizeof (struct kvstorage_worker_thread)); new->ctx = ctx; new->worker = worker; new->tv = &ctx->io_timeout; @@ -1257,14 +1102,10 @@ create_kvstorage_thread (struct rspamd_worker *worker, #if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) new->thr = g_thread_create (kvstorage_thread, new, FALSE, &err); #else - gchar *name; + gchar *name; - name = rspamd_mempool_alloc (ctx->pool, - sizeof ("kvstorage_thread") + sizeof ("4294967296") - 1); - rspamd_snprintf (name, - sizeof ("kvstorage_thread") + sizeof ("4294967296") - 1, - "kvstorage_thread%d", - id); + name = rspamd_mempool_alloc (ctx->pool, sizeof ("kvstorage_thread") + sizeof ("4294967296") - 1); + rspamd_snprintf (name, sizeof ("kvstorage_thread") + sizeof ("4294967296") - 1, "kvstorage_thread%d", id); new->thr = g_thread_new (name, kvstorage_thread, new); #endif @@ -1284,18 +1125,17 @@ create_kvstorage_thread (struct rspamd_worker *worker, void start_keystorage (struct rspamd_worker *worker) { - struct sigaction signals; - struct kvstorage_worker_ctx *ctx = worker->ctx; - guint i; - struct kvstorage_worker_thread *thr; - struct timeval tv; - GList *cur; + struct sigaction signals; + struct kvstorage_worker_ctx *ctx = worker->ctx; + guint i; + struct kvstorage_worker_thread *thr; + struct timeval tv; + GList *cur; gperf_profiler_init (worker->srv->cfg, "kvstorage"); if (!g_thread_supported ()) { - msg_err ( - "threads support is not supported on your system so kvstorage is not functionable"); + msg_err ("threads support is not supported on your system so kvstorage is not functionable"); exit (EXIT_SUCCESS); } /* Create socketpair */ @@ -1308,8 +1148,7 @@ start_keystorage (struct rspamd_worker *worker) #if _EVENT_NUMERIC_VERSION > 0x02000000 if (evthread_use_pthreads () == -1) { - msg_err ( - "threads support is not supported in your libevent so kvstorage is not functionable"); + msg_err ("threads support is not supported in your libevent so kvstorage is not functionable"); exit (EXIT_SUCCESS); } #endif @@ -1337,7 +1176,7 @@ start_keystorage (struct rspamd_worker *worker) #endif /* Start workers threads */ - for (i = 0; i < worker->cf->count; i++) { + for (i = 0; i < worker->cf->count; i ++) { thr = create_kvstorage_thread (worker, ctx, i, &signals.sa_mask); if (thr != NULL) { ctx->threads = g_list_prepend (ctx->threads, thr); @@ -1346,7 +1185,7 @@ start_keystorage (struct rspamd_worker *worker) sigprocmask (SIG_BLOCK, &signals.sa_mask, NULL); /* Signal processing cycle */ - for (;; ) { + for (;;) { msg_debug ("calling sigsuspend"); sigemptyset (&signals.sa_mask); sigsuspend (&signals.sa_mask); @@ -1358,11 +1197,9 @@ start_keystorage (struct rspamd_worker *worker) cur = ctx->threads; while (cur) { thr = cur->data; - while (write (thr->term_sock[1], &tv, - sizeof (struct timeval)) == -1) { + while (write (thr->term_sock[1], &tv, sizeof (struct timeval)) == -1) { if (errno != EAGAIN) { - msg_err ("write to term socket failed: %s", - strerror (errno)); + msg_err ("write to term socket failed: %s", strerror (errno)); abort (); } } @@ -1374,16 +1211,13 @@ start_keystorage (struct rspamd_worker *worker) soft_wanna_die = 0; tv.tv_sec = SOFT_SHUTDOWN_TIME; tv.tv_usec = 0; - msg_info ("worker's shutdown is pending in %d sec", - SOFT_SHUTDOWN_TIME); + msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME); cur = ctx->threads; while (cur) { thr = cur->data; - while (write (thr->term_sock[1], &tv, - sizeof (struct timeval)) == -1) { + while (write (thr->term_sock[1], &tv, sizeof (struct timeval)) == -1) { if (errno != EAGAIN) { - msg_err ("write to term socket failed: %s", - strerror (errno)); + msg_err ("write to term socket failed: %s", strerror (errno)); abort (); } } |