diff options
Diffstat (limited to 'src/kvstorage_server.c')
-rw-r--r-- | src/kvstorage_server.c | 626 |
1 files changed, 396 insertions, 230 deletions
diff --git a/src/kvstorage_server.c b/src/kvstorage_server.c index 75ada2c77..ecf86f7b3 100644 --- a/src/kvstorage_server.c +++ b/src/kvstorage_server.c @@ -45,22 +45,31 @@ static sig_atomic_t do_reopen_log = 0; static sig_atomic_t soft_wanna_die = 0; /* Logging functions */ -#define thr_err(...) do { \ - g_mutex_lock (thr->log_mtx); \ - rspamd_common_log_function(rspamd_main->logger, G_LOG_LEVEL_CRITICAL, __FUNCTION__, __VA_ARGS__); \ - g_mutex_unlock (thr->log_mtx); \ +#define thr_err(...) do { \ + g_mutex_lock (thr->log_mtx); \ + rspamd_common_log_function (rspamd_main->logger, \ + G_LOG_LEVEL_CRITICAL, \ + __FUNCTION__, \ + __VA_ARGS__); \ + g_mutex_unlock (thr->log_mtx); \ } while (0) -#define thr_warn(...) do { \ - g_mutex_lock (thr->log_mtx); \ - rspamd_common_log_function(rspamd_main->logger, G_LOG_LEVEL_WARNING, __FUNCTION__, __VA_ARGS__); \ - g_mutex_unlock (thr->log_mtx); \ +#define thr_warn(...) do { \ + g_mutex_lock (thr->log_mtx); \ + rspamd_common_log_function (rspamd_main->logger, \ + G_LOG_LEVEL_WARNING, \ + __FUNCTION__, \ + __VA_ARGS__); \ + g_mutex_unlock (thr->log_mtx); \ } while (0) -#define thr_info(...) do { \ - g_mutex_lock (thr->log_mtx); \ - rspamd_common_log_function(rspamd_main->logger, G_LOG_LEVEL_INFO, __FUNCTION__, __VA_ARGS__); \ - g_mutex_unlock (thr->log_mtx); \ +#define thr_info(...) do { \ + g_mutex_lock (thr->log_mtx); \ + rspamd_common_log_function (rspamd_main->logger, \ + G_LOG_LEVEL_INFO, \ + __FUNCTION__, \ + __VA_ARGS__); \ + g_mutex_unlock (thr->log_mtx); \ } while (0) /* Init functions */ @@ -68,14 +77,14 @@ gpointer init_keystorage (void); void start_keystorage (struct rspamd_worker *worker); worker_t keystorage_worker = { - "keystorage", /* Name */ - init_keystorage, /* Init function */ - start_keystorage, /* Start function */ - TRUE, /* Has socket */ - FALSE, /* Non unique */ - TRUE, /* Non threaded */ - FALSE, /* Non killable */ - SOCK_STREAM /* TCP socket */ + "keystorage", /* Name */ + init_keystorage, /* Init function */ + start_keystorage, /* Start function */ + TRUE, /* Has socket */ + FALSE, /* Non unique */ + TRUE, /* Non threaded */ + FALSE, /* Non killable */ + SOCK_STREAM /* TCP socket */ }; #ifndef HAVE_SA_SIGINFO @@ -103,8 +112,8 @@ sig_handler (gint signo, siginfo_t *info, void *unused) gpointer init_keystorage (void) { - struct kvstorage_worker_ctx *ctx; - GQuark type; + struct kvstorage_worker_ctx *ctx; + GQuark type; type = g_quark_try_string ("keystorage"); ctx = g_malloc0 (sizeof (struct kvstorage_worker_ctx)); @@ -114,9 +123,9 @@ init_keystorage (void) ctx->timeout_raw = 300000; register_worker_opt (type, "timeout", xml_handle_seconds, ctx, - G_STRUCT_OFFSET (struct kvstorage_worker_ctx, timeout_raw)); + G_STRUCT_OFFSET (struct kvstorage_worker_ctx, timeout_raw)); register_worker_opt (type, "redis", xml_handle_boolean, ctx, - G_STRUCT_OFFSET (struct kvstorage_worker_ctx, is_redis)); + G_STRUCT_OFFSET (struct kvstorage_worker_ctx, is_redis)); return ctx; } @@ -124,7 +133,7 @@ init_keystorage (void) static gboolean config_kvstorage_worker (struct rspamd_worker *worker) { - struct kvstorage_worker_ctx *ctx = worker->ctx; + struct kvstorage_worker_ctx *ctx = worker->ctx; /* Init timeval */ msec_to_tv (ctx->timeout_raw, &ctx->io_timeout); @@ -153,18 +162,18 @@ parse_kvstorage_command (struct kvstorage_session *session, gchar *c, guint len) if (len == 3) { /* Set or get command */ if ((c[0] == 'g' || c[0] == 'G') && - (c[1] == 'e' || c[1] == 'E') && - (c[2] == 't' || c[2] == 'T')) { + (c[1] == 'e' || c[1] == 'E') && + (c[2] == 't' || c[2] == 'T')) { session->command = KVSTORAGE_CMD_GET; } else if ((c[0] == 's' || c[0] == 'S') && - (c[1] == 'e' || c[1] == 'E') && - (c[2] == 't' || c[2] == 'T')) { + (c[1] == 'e' || c[1] == 'E') && + (c[2] == 't' || c[2] == 'T')) { session->command = KVSTORAGE_CMD_SET; } else if ((c[0] == 'd' || c[0] == 'D') && - (c[1] == 'e' || c[1] == 'E') && - (c[2] == 'l' || c[2] == 'L')) { + (c[1] == 'e' || c[1] == 'E') && + (c[2] == 'l' || c[2] == 'L')) { session->command = KVSTORAGE_CMD_DELETE; } else { @@ -174,51 +183,52 @@ parse_kvstorage_command (struct kvstorage_session *session, gchar *c, guint len) } else if (len == 4) { if ((c[0] == 'i' || c[0] == 'I') && - (c[1] == 'n' || c[1] == 'N') && - (c[2] == 'c' || c[2] == 'C') && - (c[3] == 'r' || c[3] == 'R')) { + (c[1] == 'n' || c[1] == 'N') && + (c[2] == 'c' || c[2] == 'C') && + (c[3] == 'r' || c[3] == 'R')) { session->command = KVSTORAGE_CMD_INCR; session->arg_data.value = 1; } else if ((c[0] == 'd' || c[0] == 'D') && - (c[1] == 'e' || c[1] == 'E') && - (c[2] == 'c' || c[2] == 'C') && - (c[3] == 'r' || c[3] == 'R')) { + (c[1] == 'e' || c[1] == 'E') && + (c[2] == 'c' || c[2] == 'C') && + (c[3] == 'r' || c[3] == 'R')) { session->command = KVSTORAGE_CMD_DECR; session->arg_data.value = -1; } else if (g_ascii_strncasecmp (c, "quit", 4) == 0) { session->command = KVSTORAGE_CMD_QUIT; } - else if (g_ascii_strncasecmp (c, "sync", 4) == 0 || g_ascii_strncasecmp (c, "save", 4) == 0) { + else if (g_ascii_strncasecmp (c, "sync", + 4) == 0 || g_ascii_strncasecmp (c, "save", 4) == 0) { session->command = KVSTORAGE_CMD_SYNC; } } else if (len == 6) { if ((c[0] == 'i' || c[0] == 'I') && - (c[1] == 'n' || c[1] == 'N') && - (c[2] == 'c' || c[2] == 'C') && - (c[3] == 'r' || c[3] == 'R') && - (c[4] == 'b' || c[4] == 'B') && - (c[5] == 'y' || c[5] == 'Y')) { + (c[1] == 'n' || c[1] == 'N') && + (c[2] == 'c' || c[2] == 'C') && + (c[3] == 'r' || c[3] == 'R') && + (c[4] == 'b' || c[4] == 'B') && + (c[5] == 'y' || c[5] == 'Y')) { session->command = KVSTORAGE_CMD_INCR; session->arg_data.value = 1; } else if ((c[0] == 'd' || c[0] == 'D') && - (c[1] == 'e' || c[1] == 'E') && - (c[2] == 'c' || c[2] == 'C') && - (c[3] == 'r' || c[3] == 'R') && - (c[4] == 'b' || c[4] == 'B') && - (c[5] == 'y' || c[5] == 'Y')) { + (c[1] == 'e' || c[1] == 'E') && + (c[2] == 'c' || c[2] == 'C') && + (c[3] == 'r' || c[3] == 'R') && + (c[4] == 'b' || c[4] == 'B') && + (c[5] == 'y' || c[5] == 'Y')) { session->command = KVSTORAGE_CMD_DECR; session->arg_data.value = -1; } else if ((c[0] == 'd' || c[0] == 'D') && - (c[1] == 'e' || c[1] == 'E') && - (c[2] == 'l' || c[2] == 'L') && - (c[3] == 'e' || c[3] == 'E') && - (c[4] == 't' || c[4] == 'T') && - (c[5] == 'e' || c[5] == 'E')) { + (c[1] == 'e' || c[1] == 'E') && + (c[2] == 'l' || c[2] == 'L') && + (c[3] == 'e' || c[3] == 'E') && + (c[4] == 't' || c[4] == 'T') && + (c[5] == 'e' || c[5] == 'E')) { session->command = KVSTORAGE_CMD_DELETE; } else if (g_ascii_strncasecmp (c, "select", 6) == 0) { @@ -238,9 +248,9 @@ parse_kvstorage_command (struct kvstorage_session *session, gchar *c, guint len) static gboolean parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in) { - gchar *p, *c, *end; - gint state = 0, next_state = 0; - gboolean is_redis; + gchar *p, *c, *end; + gint state = 0, next_state = 0; + gboolean is_redis; p = in->begin; end = in->begin + in->len; @@ -253,7 +263,7 @@ parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in) case 0: /* At this state we try to read identifier of storage */ if (g_ascii_isdigit (*p)) { - p ++; + p++; } else { if (g_ascii_isspace (*p) && p != c) { @@ -284,7 +294,7 @@ parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in) case 1: /* At this state we parse command */ if (g_ascii_isalpha (*p) && p != end) { - p ++; + p++; } else { if (parse_kvstorage_command (session, c, p - c)) { @@ -315,20 +325,21 @@ parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in) case 2: /* Read and store key */ if (!g_ascii_isspace (*p) && end != p) { - p ++; + p++; } else { if (p == c) { return FALSE; } else { - session->key = rspamd_mempool_alloc (session->pool, p - c + 1); + session->key = rspamd_mempool_alloc (session->pool, + p - c + 1); rspamd_strlcpy (session->key, c, p - c + 1); session->keylen = p - c; /* Now we must select next state based on command */ if (session->command == KVSTORAGE_CMD_SET || - session->command == KVSTORAGE_CMD_INCR || - session->command == KVSTORAGE_CMD_DECR) { + session->command == KVSTORAGE_CMD_INCR || + session->command == KVSTORAGE_CMD_DECR) { /* Read flags */ state = 99; if (is_redis) { @@ -355,7 +366,7 @@ parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in) case 3: /* Read flags */ if (g_ascii_isdigit (*p)) { - p ++; + p++; } else { if (g_ascii_isspace (*p)) { @@ -377,7 +388,7 @@ parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in) case 4: /* Read exptime */ if (g_ascii_isdigit (*p)) { - p ++; + p++; } else { if (g_ascii_isspace (*p)) { @@ -393,7 +404,7 @@ parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in) case 5: /* Read size or incr/decr values */ if (g_ascii_isdigit (*p)) { - p ++; + p++; } else { if (g_ascii_isspace (*p) || p >= end - 1) { @@ -404,7 +415,8 @@ parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in) if (p != c) { session->arg_data.value = strtoul (c, NULL, 10); if (session->command == KVSTORAGE_CMD_DECR) { - session->arg_data.value = -session->arg_data.value; + session->arg_data.value = + -session->arg_data.value; } } else if (session->command == KVSTORAGE_CMD_INCR) { @@ -424,7 +436,7 @@ parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in) case 6: /* Read index of storage */ if (g_ascii_isdigit (*p)) { - p ++; + p++; } else { if (g_ascii_isspace (*p) || end == p) { @@ -439,7 +451,7 @@ parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in) case 7: /* Read arguments count */ if (g_ascii_isdigit (*p)) { - p ++; + p++; } else { if (g_ascii_isspace (*p) || end == p) { @@ -457,7 +469,7 @@ parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in) case 99: /* Skip spaces state */ if (g_ascii_isspace (*p)) { - p ++; + p++; } else { c = p; @@ -478,33 +490,44 @@ parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in) static gboolean kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis) { - gint r; - gchar outbuf[BUFSIZ], intbuf[sizeof ("9223372036854775807")]; - gboolean res; - struct rspamd_kv_element *elt; - guint eltlen; - glong longval; + gint r; + gchar outbuf[BUFSIZ], intbuf[sizeof ("9223372036854775807")]; + gboolean res; + struct rspamd_kv_element *elt; + guint eltlen; + glong longval; if (session->command == KVSTORAGE_CMD_SET) { session->state = KVSTORAGE_STATE_READ_DATA; - rspamd_set_dispatcher_policy (session->dispather, BUFFER_CHARACTER, session->arg_data.length); + rspamd_set_dispatcher_policy (session->dispather, + BUFFER_CHARACTER, + session->arg_data.length); } else if (session->command == KVSTORAGE_CMD_GET) { - elt = rspamd_kv_storage_lookup (session->cf->storage, session->key, session->keylen, session->now); + elt = rspamd_kv_storage_lookup (session->cf->storage, + session->key, + session->keylen, + session->now); if (elt == NULL) { RW_R_UNLOCK (&session->cf->storage->rwlock); if (!is_redis) { - return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND, - sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE); + return rspamd_dispatcher_write (session->dispather, + ERROR_NOT_FOUND, + sizeof (ERROR_NOT_FOUND) - 1, + FALSE, + TRUE); } else { return rspamd_dispatcher_write (session->dispather, "$-1" CRLF, - sizeof ("$-1" CRLF) - 1, FALSE, TRUE); + sizeof ("$-1" CRLF) - 1, FALSE, TRUE); } } else { if (elt->flags & KV_ELT_INTEGER) { - eltlen = rspamd_snprintf (intbuf, sizeof (intbuf), "%l", ELT_LONG (elt)); + eltlen = rspamd_snprintf (intbuf, + sizeof (intbuf), + "%l", + ELT_LONG (elt)); } else { @@ -512,34 +535,43 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis) } if (!is_redis) { - r = rspamd_snprintf (outbuf, sizeof (outbuf), "VALUE %s %ud %ud" CRLF, - ELT_KEY (elt), elt->flags, eltlen); + r = rspamd_snprintf (outbuf, + sizeof (outbuf), + "VALUE %s %ud %ud" CRLF, + ELT_KEY (elt), + elt->flags, + eltlen); } else { r = rspamd_snprintf (outbuf, sizeof (outbuf), "$%ud" CRLF, eltlen); } if (!rspamd_dispatcher_write (session->dispather, outbuf, - r, TRUE, FALSE)) { + r, TRUE, FALSE)) { RW_R_UNLOCK (&session->cf->storage->rwlock); return FALSE; } if (elt->flags & KV_ELT_INTEGER) { - if (!rspamd_dispatcher_write (session->dispather, intbuf, eltlen, TRUE, TRUE)) { + if (!rspamd_dispatcher_write (session->dispather, intbuf, + eltlen, TRUE, TRUE)) { RW_R_UNLOCK (&session->cf->storage->rwlock); return FALSE; } } else { - if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), eltlen, TRUE, TRUE)) { + if (!rspamd_dispatcher_write (session->dispather, + ELT_DATA (elt), eltlen, TRUE, TRUE)) { RW_R_UNLOCK (&session->cf->storage->rwlock); return FALSE; } } session->elt = elt; if (!is_redis) { - res = rspamd_dispatcher_write (session->dispather, CRLF "END" CRLF, - sizeof (CRLF "END" CRLF) - 1, FALSE, TRUE); + res = rspamd_dispatcher_write (session->dispather, + CRLF "END" CRLF, + sizeof (CRLF "END" CRLF) - 1, + FALSE, + TRUE); } else { res = rspamd_dispatcher_write (session->dispather, CRLF, @@ -553,42 +585,58 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis) } } else if (session->command == KVSTORAGE_CMD_DELETE) { - elt = rspamd_kv_storage_delete (session->cf->storage, session->key, session->keylen); + elt = rspamd_kv_storage_delete (session->cf->storage, + session->key, + session->keylen); if (elt != NULL) { if ((elt->flags & KV_ELT_DIRTY) == 0) { /* Free memory if backend has deleted this element */ g_slice_free1 (ELT_SIZE (elt), elt); } if (!is_redis) { - return rspamd_dispatcher_write (session->dispather, "DELETED" CRLF, - sizeof ("DELETED" CRLF) - 1, FALSE, TRUE); + return rspamd_dispatcher_write (session->dispather, + "DELETED" CRLF, + sizeof ("DELETED" CRLF) - 1, + FALSE, + TRUE); } else { return rspamd_dispatcher_write (session->dispather, ":1" CRLF, - sizeof (":1" CRLF) - 1, FALSE, TRUE); + sizeof (":1" CRLF) - 1, FALSE, TRUE); } } else { if (!is_redis) { - return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND, - sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE); + return rspamd_dispatcher_write (session->dispather, + ERROR_NOT_FOUND, + sizeof (ERROR_NOT_FOUND) - 1, + FALSE, + TRUE); } else { return rspamd_dispatcher_write (session->dispather, ":0" CRLF, - sizeof (":0" CRLF) - 1, FALSE, TRUE); + sizeof (":0" CRLF) - 1, FALSE, TRUE); } } } - else if (session->command == KVSTORAGE_CMD_INCR || session->command == KVSTORAGE_CMD_DECR) { + else if (session->command == KVSTORAGE_CMD_INCR || session->command == + KVSTORAGE_CMD_DECR) { longval = session->arg_data.value; - if (!rspamd_kv_storage_increment (session->cf->storage, session->key, session->keylen, &longval)) { + if (!rspamd_kv_storage_increment (session->cf->storage, session->key, + session->keylen, &longval)) { if (!is_redis) { - return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND, - sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE); + return rspamd_dispatcher_write (session->dispather, + ERROR_NOT_FOUND, + sizeof (ERROR_NOT_FOUND) - 1, + FALSE, + TRUE); } else { - return rspamd_dispatcher_write (session->dispather, "-ERR not found" CRLF, - sizeof ("-ERR not found" CRLF) - 1, FALSE, TRUE); + return rspamd_dispatcher_write (session->dispather, + "-ERR not found" CRLF, + sizeof ("-ERR not found" CRLF) - 1, + FALSE, + TRUE); } } else { @@ -601,41 +649,61 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis) longval); } if (!rspamd_dispatcher_write (session->dispather, outbuf, - r, FALSE, FALSE)) { + r, FALSE, FALSE)) { return FALSE; } } } else if (session->command == KVSTORAGE_CMD_SYNC) { - if (session->cf->storage->backend == NULL || session->cf->storage->backend->sync_func == NULL) { + if (session->cf->storage->backend == NULL || + session->cf->storage->backend->sync_func == NULL) { if (!is_redis) { - return rspamd_dispatcher_write (session->dispather, ERROR_COMMON, - sizeof (ERROR_COMMON) - 1, FALSE, TRUE); + return rspamd_dispatcher_write (session->dispather, + ERROR_COMMON, + sizeof (ERROR_COMMON) - 1, + FALSE, + TRUE); } else { - return rspamd_dispatcher_write (session->dispather, "-ERR unsupported" CRLF, - sizeof ("-ERR unsupported" CRLF) - 1, FALSE, TRUE); + return rspamd_dispatcher_write (session->dispather, + "-ERR unsupported" CRLF, + sizeof ("-ERR unsupported" CRLF) - 1, + FALSE, + TRUE); } } else { - if (session->cf->storage->backend->sync_func (session->cf->storage->backend)) { + if (session->cf->storage->backend->sync_func (session->cf->storage-> + backend)) { if (!is_redis) { - return rspamd_dispatcher_write (session->dispather, "SYNCED" CRLF, - sizeof ("SYNCED" CRLF) - 1, FALSE, TRUE); + return rspamd_dispatcher_write (session->dispather, + "SYNCED" CRLF, + sizeof ("SYNCED" CRLF) - 1, + FALSE, + TRUE); } else { - return rspamd_dispatcher_write (session->dispather, "+OK" CRLF, - sizeof ("+OK" CRLF) - 1, FALSE, TRUE); + return rspamd_dispatcher_write (session->dispather, + "+OK" CRLF, + sizeof ("+OK" CRLF) - 1, + FALSE, + TRUE); } } else { if (!is_redis) { - return rspamd_dispatcher_write (session->dispather, "NOT_SYNCED" CRLF, - sizeof ("NOT_SYNCED" CRLF) - 1, FALSE, TRUE); + return rspamd_dispatcher_write (session->dispather, + "NOT_SYNCED" CRLF, + sizeof ("NOT_SYNCED" CRLF) - 1, + FALSE, + TRUE); } else { - return rspamd_dispatcher_write (session->dispather, "-ERR not synced" CRLF, - sizeof ("-ERR not synced" CRLF) - 1, FALSE, TRUE); + return rspamd_dispatcher_write (session->dispather, + "-ERR not synced" CRLF, + sizeof ("-ERR not synced" CRLF) - 1, + FALSE, + TRUE); } } } @@ -643,11 +711,11 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis) else if (session->command == KVSTORAGE_CMD_SELECT) { if (!is_redis) { return rspamd_dispatcher_write (session->dispather, "SELECTED" CRLF, - sizeof ("SELECTED" CRLF) - 1, FALSE, TRUE); + sizeof ("SELECTED" CRLF) - 1, FALSE, TRUE); } else { return rspamd_dispatcher_write (session->dispather, "+OK" CRLF, - sizeof ("+OK" CRLF) - 1, FALSE, TRUE); + sizeof ("+OK" CRLF) - 1, FALSE, TRUE); } } else if (session->command == KVSTORAGE_CMD_QUIT) { @@ -662,8 +730,8 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis) static gboolean kvstorage_read_arglen (f_str_t *in, guint *len) { - gchar *p = in->begin, *end = in->begin + in->len, *c; - gint state = 0; + gchar *p = in->begin, *end = in->begin + in->len, *c; + gint state = 0; c = p; while (p < end) { @@ -673,14 +741,14 @@ kvstorage_read_arglen (f_str_t *in, guint *len) return FALSE; } else { - p ++; + p++; c = p; state = 1; } break; case 1: if (g_ascii_isdigit (*p) && p != end - 1) { - p ++; + p++; } else { if (p != end - 1) { @@ -731,12 +799,12 @@ kvstorage_check_argnum (struct kvstorage_session *session) static gboolean kvstorage_read_socket (f_str_t * in, void *arg) { - struct kvstorage_session *session = (struct kvstorage_session *) arg; - struct kvstorage_worker_thread *thr; - gint r; - guint arglen = 0; - gchar outbuf[BUFSIZ]; - gboolean is_redis; + struct kvstorage_session *session = (struct kvstorage_session *) arg; + struct kvstorage_worker_thread *thr; + gint r; + guint arglen = 0; + gchar outbuf[BUFSIZ]; + gboolean is_redis; if (in->len == 0) { /* Skip empty commands */ @@ -749,16 +817,22 @@ kvstorage_read_socket (f_str_t * in, void *arg) case KVSTORAGE_STATE_READ_CMD: /* Update timestamp */ session->now = time (NULL); - if (! parse_kvstorage_line (session, in)) { + if (!parse_kvstorage_line (session, in)) { thr_info ("%ud: unknown command: %V", thr->id, in); if (!is_redis) { - return rspamd_dispatcher_write (session->dispather, ERROR_UNKNOWN_COMMAND, - sizeof (ERROR_UNKNOWN_COMMAND) - 1, FALSE, TRUE); + return rspamd_dispatcher_write (session->dispather, + ERROR_UNKNOWN_COMMAND, + sizeof (ERROR_UNKNOWN_COMMAND) - 1, + FALSE, + TRUE); } else { - r = rspamd_snprintf (outbuf, sizeof (outbuf), "-ERR unknown command '%V'" CRLF, in); + r = rspamd_snprintf (outbuf, + sizeof (outbuf), + "-ERR unknown command '%V'" CRLF, + in); return rspamd_dispatcher_write (session->dispather, outbuf, - r, FALSE, TRUE); + r, FALSE, TRUE); } } else { @@ -766,12 +840,18 @@ kvstorage_read_socket (f_str_t * in, void *arg) if (session->cf == NULL) { thr_info ("%ud: bad keystorage: %ud", thr->id, session->id); if (!is_redis) { - return rspamd_dispatcher_write (session->dispather, ERROR_INVALID_KEYSTORAGE, - sizeof (ERROR_INVALID_KEYSTORAGE) - 1, FALSE, TRUE); + return rspamd_dispatcher_write (session->dispather, + ERROR_INVALID_KEYSTORAGE, + sizeof (ERROR_INVALID_KEYSTORAGE) - 1, + FALSE, + TRUE); } else { - return rspamd_dispatcher_write (session->dispather, "-ERR unknown keystorage" CRLF, - sizeof ("-ERR unknown keystorage" CRLF) - 1, FALSE, TRUE); + return rspamd_dispatcher_write (session->dispather, + "-ERR unknown keystorage" CRLF, + sizeof ("-ERR unknown keystorage" CRLF) - 1, + FALSE, + TRUE); } } if (session->state != KVSTORAGE_STATE_READ_ARGLEN) { @@ -780,44 +860,59 @@ kvstorage_read_socket (f_str_t * in, void *arg) } break; case KVSTORAGE_STATE_READ_ARGLEN: - if (! kvstorage_read_arglen (in, &arglen)) { + if (!kvstorage_read_arglen (in, &arglen)) { session->state = KVSTORAGE_STATE_READ_CMD; - r = rspamd_snprintf (outbuf, sizeof (outbuf), "-ERR unknown arglen '%V'" CRLF, in); + r = rspamd_snprintf (outbuf, + sizeof (outbuf), + "-ERR unknown arglen '%V'" CRLF, + in); return rspamd_dispatcher_write (session->dispather, outbuf, - r, FALSE, TRUE); + r, FALSE, TRUE); } else { session->state = KVSTORAGE_STATE_READ_ARG; - rspamd_set_dispatcher_policy (session->dispather, BUFFER_CHARACTER, arglen); + rspamd_set_dispatcher_policy (session->dispather, + BUFFER_CHARACTER, + arglen); } break; case KVSTORAGE_STATE_READ_ARG: if (session->argnum == 0) { /* Read command */ - if (! parse_kvstorage_command (session, in->begin, in->len)) { + if (!parse_kvstorage_command (session, in->begin, in->len)) { session->state = KVSTORAGE_STATE_READ_CMD; - r = rspamd_snprintf (outbuf, sizeof (outbuf), "-ERR unknown command '%V'" CRLF, in); + r = rspamd_snprintf (outbuf, + sizeof (outbuf), + "-ERR unknown command '%V'" CRLF, + in); return rspamd_dispatcher_write (session->dispather, outbuf, - r, FALSE, TRUE); + r, FALSE, TRUE); } else { - if (! kvstorage_check_argnum (session)) { + if (!kvstorage_check_argnum (session)) { session->state = KVSTORAGE_STATE_READ_CMD; - r = rspamd_snprintf (outbuf, sizeof (outbuf), "-ERR invalid argnum for command '%V': %ud" CRLF, - in, session->argc); + r = rspamd_snprintf (outbuf, + sizeof (outbuf), + "-ERR invalid argnum for command '%V': %ud" CRLF, + in, + session->argc); return rspamd_dispatcher_write (session->dispather, outbuf, - r, FALSE, TRUE); + r, FALSE, TRUE); } else { if (session->argnum == session->argc - 1) { session->state = KVSTORAGE_STATE_READ_CMD; - rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1); + rspamd_set_dispatcher_policy (session->dispather, + BUFFER_LINE, + -1); return kvstorage_process_command (session, TRUE); } else { - session->argnum ++; + session->argnum++; session->state = KVSTORAGE_STATE_READ_ARGLEN; - rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1); + rspamd_set_dispatcher_policy (session->dispather, + BUFFER_LINE, + -1); } } } @@ -829,21 +924,28 @@ kvstorage_read_socket (f_str_t * in, void *arg) session->keylen = in->len; if (session->argnum == session->argc - 1) { session->state = KVSTORAGE_STATE_READ_CMD; - rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1); + rspamd_set_dispatcher_policy (session->dispather, + BUFFER_LINE, + -1); return kvstorage_process_command (session, TRUE); } else { - session->argnum ++; + session->argnum++; session->state = KVSTORAGE_STATE_READ_ARGLEN; - rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1); + rspamd_set_dispatcher_policy (session->dispather, + BUFFER_LINE, + -1); } } else { /* Special case for select command */ session->state = KVSTORAGE_STATE_READ_CMD; - rspamd_strlcpy (outbuf, in->begin, MIN (sizeof (outbuf), in->len)); + rspamd_strlcpy (outbuf, in->begin, MIN (sizeof (outbuf), + in->len)); session->id = strtoul (outbuf, NULL, 10); - rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1); + rspamd_set_dispatcher_policy (session->dispather, + BUFFER_LINE, + -1); return kvstorage_process_command (session, TRUE); } } @@ -851,25 +953,37 @@ kvstorage_read_socket (f_str_t * in, void *arg) /* We get datablock for set command */ if (session->command == KVSTORAGE_CMD_SET && session->argc == 3) { session->state = KVSTORAGE_STATE_READ_CMD; - rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1); - if (rspamd_kv_storage_insert (session->cf->storage, session->key, session->keylen, - in->begin, in->len, - session->flags, session->expire)) { - return rspamd_dispatcher_write (session->dispather, "+OK" CRLF, - sizeof ("+OK" CRLF) - 1, FALSE, TRUE); + rspamd_set_dispatcher_policy (session->dispather, + BUFFER_LINE, + -1); + if (rspamd_kv_storage_insert (session->cf->storage, + session->key, session->keylen, + in->begin, in->len, + session->flags, session->expire)) { + return rspamd_dispatcher_write (session->dispather, + "+OK" CRLF, + sizeof ("+OK" CRLF) - 1, + FALSE, + TRUE); } else { - return rspamd_dispatcher_write (session->dispather, "-ERR not stored" CRLF, - sizeof ("-ERR not stored" CRLF) - 1, FALSE, TRUE); + return rspamd_dispatcher_write (session->dispather, + "-ERR not stored" CRLF, + sizeof ("-ERR not stored" CRLF) - 1, + FALSE, + TRUE); } } - else if (session->command == KVSTORAGE_CMD_SET && session->argc == 4) { + else if (session->command == KVSTORAGE_CMD_SET && session->argc == + 4) { /* It is expire argument */ session->state = KVSTORAGE_STATE_READ_CMD; rspamd_strtol (in->begin, in->len, (glong *)&session->expire); - session->argnum ++; + session->argnum++; session->state = KVSTORAGE_STATE_READ_ARGLEN; - rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1); + rspamd_set_dispatcher_policy (session->dispather, + BUFFER_LINE, + -1); } else { session->state = KVSTORAGE_STATE_READ_CMD; @@ -877,7 +991,9 @@ kvstorage_read_socket (f_str_t * in, void *arg) if (session->command == KVSTORAGE_CMD_DECR) { session->arg_data.value = -session->arg_data.value; } - rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1); + rspamd_set_dispatcher_policy (session->dispather, + BUFFER_LINE, + -1); return kvstorage_process_command (session, TRUE); } } @@ -885,16 +1001,25 @@ kvstorage_read_socket (f_str_t * in, void *arg) /* We get datablock for set command */ if (session->command == KVSTORAGE_CMD_SET && session->argc == 4) { session->state = KVSTORAGE_STATE_READ_CMD; - rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1); - if (rspamd_kv_storage_insert (session->cf->storage, session->key, session->keylen, - in->begin, in->len, - session->flags, session->expire)) { - return rspamd_dispatcher_write (session->dispather, "+OK" CRLF, - sizeof ("+OK" CRLF) - 1, FALSE, TRUE); + rspamd_set_dispatcher_policy (session->dispather, + BUFFER_LINE, + -1); + if (rspamd_kv_storage_insert (session->cf->storage, + session->key, session->keylen, + in->begin, in->len, + session->flags, session->expire)) { + return rspamd_dispatcher_write (session->dispather, + "+OK" CRLF, + sizeof ("+OK" CRLF) - 1, + FALSE, + TRUE); } else { - return rspamd_dispatcher_write (session->dispather, "-ERR not stored" CRLF, - sizeof ("-ERR not stored" CRLF) - 1, FALSE, TRUE); + return rspamd_dispatcher_write (session->dispather, + "-ERR not stored" CRLF, + sizeof ("-ERR not stored" CRLF) - 1, + FALSE, + TRUE); } } } @@ -902,26 +1027,36 @@ kvstorage_read_socket (f_str_t * in, void *arg) case KVSTORAGE_STATE_READ_DATA: session->state = KVSTORAGE_STATE_READ_CMD; rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1); - if (rspamd_kv_storage_insert (session->cf->storage, session->key, session->keylen, - in->begin, in->len, - session->flags, session->expire)) { + if (rspamd_kv_storage_insert (session->cf->storage, session->key, + session->keylen, + in->begin, in->len, + session->flags, session->expire)) { if (!is_redis) { - return rspamd_dispatcher_write (session->dispather, "STORED" CRLF, - sizeof ("STORED" CRLF) - 1, FALSE, TRUE); + return rspamd_dispatcher_write (session->dispather, + "STORED" CRLF, + sizeof ("STORED" CRLF) - 1, + FALSE, + TRUE); } else { return rspamd_dispatcher_write (session->dispather, "+OK" CRLF, - sizeof ("+OK" CRLF) - 1, FALSE, TRUE); + sizeof ("+OK" CRLF) - 1, FALSE, TRUE); } } else { if (!is_redis) { - return rspamd_dispatcher_write (session->dispather, ERROR_NOT_STORED, - sizeof (ERROR_NOT_STORED) - 1, FALSE, TRUE); + return rspamd_dispatcher_write (session->dispather, + ERROR_NOT_STORED, + sizeof (ERROR_NOT_STORED) - 1, + FALSE, + TRUE); } else { - return rspamd_dispatcher_write (session->dispather, "-ERR not stored" CRLF, - sizeof ("-ERR not stored" CRLF) - 1, FALSE, TRUE); + return rspamd_dispatcher_write (session->dispather, + "-ERR not stored" CRLF, + sizeof ("-ERR not stored" CRLF) - 1, + FALSE, + TRUE); } } @@ -937,17 +1072,18 @@ kvstorage_read_socket (f_str_t * in, void *arg) static gboolean kvstorage_write_socket (void *arg) { - struct kvstorage_session *session = (struct kvstorage_session *) arg; + struct kvstorage_session *session = (struct kvstorage_session *) arg; if (session->elt) { if ((session->elt->flags & KV_ELT_NEED_INSERT) != 0) { /* Insert to cache and free element */ session->elt->flags &= ~KV_ELT_NEED_INSERT; RW_R_UNLOCK (&session->cf->storage->rwlock); - rspamd_kv_storage_insert_cache (session->cf->storage, ELT_KEY (session->elt), - session->elt->keylen, ELT_DATA (session->elt), - session->elt->size, session->elt->flags, - session->elt->expire, NULL); + rspamd_kv_storage_insert_cache (session->cf->storage, + ELT_KEY (session->elt), + session->elt->keylen, ELT_DATA (session->elt), + session->elt->size, session->elt->flags, + session->elt->expire, NULL); g_free (session->elt); session->elt = NULL; return TRUE; @@ -966,8 +1102,8 @@ kvstorage_write_socket (void *arg) static void kvstorage_err_socket (GError * err, void *arg) { - struct kvstorage_session *session = (struct kvstorage_session *) arg; - struct kvstorage_worker_thread *thr; + struct kvstorage_session *session = (struct kvstorage_session *) arg; + struct kvstorage_worker_thread *thr; thr = session->thr; if (err->code != -1) { @@ -990,14 +1126,15 @@ kvstorage_err_socket (GError * err, void *arg) static void thr_accept_socket (gint fd, short what, void *arg) { - struct kvstorage_worker_thread *thr = (struct kvstorage_worker_thread *)arg; - union sa_union su; - socklen_t addrlen = sizeof (su.ss); - gint nfd; - struct kvstorage_session *session; + struct kvstorage_worker_thread *thr = (struct kvstorage_worker_thread *)arg; + union sa_union su; + socklen_t addrlen = sizeof (su.ss); + gint nfd; + struct kvstorage_session *session; g_mutex_lock (thr->accept_mtx); - if ((nfd = accept_from_socket (fd, (struct sockaddr *)&su.ss, &addrlen)) == -1) { + if ((nfd = + accept_from_socket (fd, (struct sockaddr *)&su.ss, &addrlen)) == -1) { thr_warn ("%ud: accept failed: %s", thr->id, strerror (errno)); g_mutex_unlock (thr->accept_mtx); return; @@ -1014,9 +1151,14 @@ thr_accept_socket (gint fd, short what, void *arg) session->state = KVSTORAGE_STATE_READ_CMD; session->thr = thr; session->sock = nfd; - session->dispather = rspamd_create_dispatcher (thr->ev_base, nfd, BUFFER_LINE, - kvstorage_read_socket, kvstorage_write_socket, - kvstorage_err_socket, thr->tv, session); + session->dispather = rspamd_create_dispatcher (thr->ev_base, + nfd, + BUFFER_LINE, + kvstorage_read_socket, + kvstorage_write_socket, + kvstorage_err_socket, + thr->tv, + session); g_mutex_unlock (thr->accept_mtx); session->elt = NULL; @@ -1026,7 +1168,7 @@ thr_accept_socket (gint fd, short what, void *arg) } else if (su.ss.ss_family == AF_INET) { memcpy (&session->client_addr, &su.s4.sin_addr, - sizeof (struct in_addr)); + sizeof (struct in_addr)); } } @@ -1036,8 +1178,8 @@ thr_accept_socket (gint fd, short what, void *arg) static void thr_term_socket (gint fd, short what, void *arg) { - struct kvstorage_worker_thread *thr = (struct kvstorage_worker_thread *)arg; - struct timeval tv; + struct kvstorage_worker_thread *thr = (struct kvstorage_worker_thread *)arg; + struct timeval tv; if (read (fd, &tv, sizeof (struct timeval)) != sizeof (struct timeval)) { thr_err ("cannot read data from socket: %s", strerror (errno)); @@ -1055,18 +1197,26 @@ thr_term_socket (gint fd, short what, void *arg) static gpointer kvstorage_thread (gpointer ud) { - struct kvstorage_worker_thread *thr = ud; + struct kvstorage_worker_thread *thr = ud; /* Block signals as it is dispatcher deity */ sigprocmask (SIG_BLOCK, thr->signals, NULL); /* Init thread specific events */ thr->ev_base = event_init (); - event_set (&thr->bind_ev, thr->worker->cf->listen_sock, EV_READ | EV_PERSIST, thr_accept_socket, (void *)thr); + event_set (&thr->bind_ev, + thr->worker->cf->listen_sock, + EV_READ | EV_PERSIST, + thr_accept_socket, + (void *)thr); event_base_set (thr->ev_base, &thr->bind_ev); event_add (&thr->bind_ev, NULL); - event_set (&thr->term_ev, thr->term_sock[0], EV_READ | EV_PERSIST, thr_term_socket, (void *)thr); + event_set (&thr->term_ev, + thr->term_sock[0], + EV_READ | EV_PERSIST, + thr_term_socket, + (void *)thr); event_base_set (thr->ev_base, &thr->term_ev); event_add (&thr->term_ev, NULL); @@ -1079,12 +1229,17 @@ kvstorage_thread (gpointer ud) * Create new thread, set it detached */ static struct kvstorage_worker_thread * -create_kvstorage_thread (struct rspamd_worker *worker, struct kvstorage_worker_ctx *ctx, guint id, sigset_t *signals) +create_kvstorage_thread (struct rspamd_worker *worker, + struct kvstorage_worker_ctx *ctx, + guint id, + sigset_t *signals) { - struct kvstorage_worker_thread *new; - GError *err = NULL; + struct kvstorage_worker_thread *new; + GError *err = NULL; - new = rspamd_mempool_alloc (ctx->pool, sizeof (struct kvstorage_worker_thread)); + new = + rspamd_mempool_alloc (ctx->pool, + sizeof (struct kvstorage_worker_thread)); new->ctx = ctx; new->worker = worker; new->tv = &ctx->io_timeout; @@ -1102,10 +1257,14 @@ create_kvstorage_thread (struct rspamd_worker *worker, struct kvstorage_worker_c #if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) new->thr = g_thread_create (kvstorage_thread, new, FALSE, &err); #else - gchar *name; + gchar *name; - name = rspamd_mempool_alloc (ctx->pool, sizeof ("kvstorage_thread") + sizeof ("4294967296") - 1); - rspamd_snprintf (name, sizeof ("kvstorage_thread") + sizeof ("4294967296") - 1, "kvstorage_thread%d", id); + name = rspamd_mempool_alloc (ctx->pool, + sizeof ("kvstorage_thread") + sizeof ("4294967296") - 1); + rspamd_snprintf (name, + sizeof ("kvstorage_thread") + sizeof ("4294967296") - 1, + "kvstorage_thread%d", + id); new->thr = g_thread_new (name, kvstorage_thread, new); #endif @@ -1125,17 +1284,18 @@ create_kvstorage_thread (struct rspamd_worker *worker, struct kvstorage_worker_c void start_keystorage (struct rspamd_worker *worker) { - struct sigaction signals; - struct kvstorage_worker_ctx *ctx = worker->ctx; - guint i; - struct kvstorage_worker_thread *thr; - struct timeval tv; - GList *cur; + struct sigaction signals; + struct kvstorage_worker_ctx *ctx = worker->ctx; + guint i; + struct kvstorage_worker_thread *thr; + struct timeval tv; + GList *cur; gperf_profiler_init (worker->srv->cfg, "kvstorage"); if (!g_thread_supported ()) { - msg_err ("threads support is not supported on your system so kvstorage is not functionable"); + msg_err ( + "threads support is not supported on your system so kvstorage is not functionable"); exit (EXIT_SUCCESS); } /* Create socketpair */ @@ -1148,7 +1308,8 @@ start_keystorage (struct rspamd_worker *worker) #if _EVENT_NUMERIC_VERSION > 0x02000000 if (evthread_use_pthreads () == -1) { - msg_err ("threads support is not supported in your libevent so kvstorage is not functionable"); + msg_err ( + "threads support is not supported in your libevent so kvstorage is not functionable"); exit (EXIT_SUCCESS); } #endif @@ -1176,7 +1337,7 @@ start_keystorage (struct rspamd_worker *worker) #endif /* Start workers threads */ - for (i = 0; i < worker->cf->count; i ++) { + for (i = 0; i < worker->cf->count; i++) { thr = create_kvstorage_thread (worker, ctx, i, &signals.sa_mask); if (thr != NULL) { ctx->threads = g_list_prepend (ctx->threads, thr); @@ -1185,7 +1346,7 @@ start_keystorage (struct rspamd_worker *worker) sigprocmask (SIG_BLOCK, &signals.sa_mask, NULL); /* Signal processing cycle */ - for (;;) { + for (;; ) { msg_debug ("calling sigsuspend"); sigemptyset (&signals.sa_mask); sigsuspend (&signals.sa_mask); @@ -1197,9 +1358,11 @@ start_keystorage (struct rspamd_worker *worker) cur = ctx->threads; while (cur) { thr = cur->data; - while (write (thr->term_sock[1], &tv, sizeof (struct timeval)) == -1) { + while (write (thr->term_sock[1], &tv, + sizeof (struct timeval)) == -1) { if (errno != EAGAIN) { - msg_err ("write to term socket failed: %s", strerror (errno)); + msg_err ("write to term socket failed: %s", + strerror (errno)); abort (); } } @@ -1211,13 +1374,16 @@ start_keystorage (struct rspamd_worker *worker) soft_wanna_die = 0; tv.tv_sec = SOFT_SHUTDOWN_TIME; tv.tv_usec = 0; - msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME); + msg_info ("worker's shutdown is pending in %d sec", + SOFT_SHUTDOWN_TIME); cur = ctx->threads; while (cur) { thr = cur->data; - while (write (thr->term_sock[1], &tv, sizeof (struct timeval)) == -1) { + while (write (thr->term_sock[1], &tv, + sizeof (struct timeval)) == -1) { if (errno != EAGAIN) { - msg_err ("write to term socket failed: %s", strerror (errno)); + msg_err ("write to term socket failed: %s", + strerror (errno)); abort (); } } |