aboutsummaryrefslogtreecommitdiffstats
path: root/src/kvstorage_server.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/kvstorage_server.c')
-rw-r--r--src/kvstorage_server.c626
1 files changed, 396 insertions, 230 deletions
diff --git a/src/kvstorage_server.c b/src/kvstorage_server.c
index 75ada2c77..ecf86f7b3 100644
--- a/src/kvstorage_server.c
+++ b/src/kvstorage_server.c
@@ -45,22 +45,31 @@ static sig_atomic_t do_reopen_log = 0;
static sig_atomic_t soft_wanna_die = 0;
/* Logging functions */
-#define thr_err(...) do { \
- g_mutex_lock (thr->log_mtx); \
- rspamd_common_log_function(rspamd_main->logger, G_LOG_LEVEL_CRITICAL, __FUNCTION__, __VA_ARGS__); \
- g_mutex_unlock (thr->log_mtx); \
+#define thr_err(...) do { \
+ g_mutex_lock (thr->log_mtx); \
+ rspamd_common_log_function (rspamd_main->logger, \
+ G_LOG_LEVEL_CRITICAL, \
+ __FUNCTION__, \
+ __VA_ARGS__); \
+ g_mutex_unlock (thr->log_mtx); \
} while (0)
-#define thr_warn(...) do { \
- g_mutex_lock (thr->log_mtx); \
- rspamd_common_log_function(rspamd_main->logger, G_LOG_LEVEL_WARNING, __FUNCTION__, __VA_ARGS__); \
- g_mutex_unlock (thr->log_mtx); \
+#define thr_warn(...) do { \
+ g_mutex_lock (thr->log_mtx); \
+ rspamd_common_log_function (rspamd_main->logger, \
+ G_LOG_LEVEL_WARNING, \
+ __FUNCTION__, \
+ __VA_ARGS__); \
+ g_mutex_unlock (thr->log_mtx); \
} while (0)
-#define thr_info(...) do { \
- g_mutex_lock (thr->log_mtx); \
- rspamd_common_log_function(rspamd_main->logger, G_LOG_LEVEL_INFO, __FUNCTION__, __VA_ARGS__); \
- g_mutex_unlock (thr->log_mtx); \
+#define thr_info(...) do { \
+ g_mutex_lock (thr->log_mtx); \
+ rspamd_common_log_function (rspamd_main->logger, \
+ G_LOG_LEVEL_INFO, \
+ __FUNCTION__, \
+ __VA_ARGS__); \
+ g_mutex_unlock (thr->log_mtx); \
} while (0)
/* Init functions */
@@ -68,14 +77,14 @@ gpointer init_keystorage (void);
void start_keystorage (struct rspamd_worker *worker);
worker_t keystorage_worker = {
- "keystorage", /* Name */
- init_keystorage, /* Init function */
- start_keystorage, /* Start function */
- TRUE, /* Has socket */
- FALSE, /* Non unique */
- TRUE, /* Non threaded */
- FALSE, /* Non killable */
- SOCK_STREAM /* TCP socket */
+ "keystorage", /* Name */
+ init_keystorage, /* Init function */
+ start_keystorage, /* Start function */
+ TRUE, /* Has socket */
+ FALSE, /* Non unique */
+ TRUE, /* Non threaded */
+ FALSE, /* Non killable */
+ SOCK_STREAM /* TCP socket */
};
#ifndef HAVE_SA_SIGINFO
@@ -103,8 +112,8 @@ sig_handler (gint signo, siginfo_t *info, void *unused)
gpointer
init_keystorage (void)
{
- struct kvstorage_worker_ctx *ctx;
- GQuark type;
+ struct kvstorage_worker_ctx *ctx;
+ GQuark type;
type = g_quark_try_string ("keystorage");
ctx = g_malloc0 (sizeof (struct kvstorage_worker_ctx));
@@ -114,9 +123,9 @@ init_keystorage (void)
ctx->timeout_raw = 300000;
register_worker_opt (type, "timeout", xml_handle_seconds, ctx,
- G_STRUCT_OFFSET (struct kvstorage_worker_ctx, timeout_raw));
+ G_STRUCT_OFFSET (struct kvstorage_worker_ctx, timeout_raw));
register_worker_opt (type, "redis", xml_handle_boolean, ctx,
- G_STRUCT_OFFSET (struct kvstorage_worker_ctx, is_redis));
+ G_STRUCT_OFFSET (struct kvstorage_worker_ctx, is_redis));
return ctx;
}
@@ -124,7 +133,7 @@ init_keystorage (void)
static gboolean
config_kvstorage_worker (struct rspamd_worker *worker)
{
- struct kvstorage_worker_ctx *ctx = worker->ctx;
+ struct kvstorage_worker_ctx *ctx = worker->ctx;
/* Init timeval */
msec_to_tv (ctx->timeout_raw, &ctx->io_timeout);
@@ -153,18 +162,18 @@ parse_kvstorage_command (struct kvstorage_session *session, gchar *c, guint len)
if (len == 3) {
/* Set or get command */
if ((c[0] == 'g' || c[0] == 'G') &&
- (c[1] == 'e' || c[1] == 'E') &&
- (c[2] == 't' || c[2] == 'T')) {
+ (c[1] == 'e' || c[1] == 'E') &&
+ (c[2] == 't' || c[2] == 'T')) {
session->command = KVSTORAGE_CMD_GET;
}
else if ((c[0] == 's' || c[0] == 'S') &&
- (c[1] == 'e' || c[1] == 'E') &&
- (c[2] == 't' || c[2] == 'T')) {
+ (c[1] == 'e' || c[1] == 'E') &&
+ (c[2] == 't' || c[2] == 'T')) {
session->command = KVSTORAGE_CMD_SET;
}
else if ((c[0] == 'd' || c[0] == 'D') &&
- (c[1] == 'e' || c[1] == 'E') &&
- (c[2] == 'l' || c[2] == 'L')) {
+ (c[1] == 'e' || c[1] == 'E') &&
+ (c[2] == 'l' || c[2] == 'L')) {
session->command = KVSTORAGE_CMD_DELETE;
}
else {
@@ -174,51 +183,52 @@ parse_kvstorage_command (struct kvstorage_session *session, gchar *c, guint len)
}
else if (len == 4) {
if ((c[0] == 'i' || c[0] == 'I') &&
- (c[1] == 'n' || c[1] == 'N') &&
- (c[2] == 'c' || c[2] == 'C') &&
- (c[3] == 'r' || c[3] == 'R')) {
+ (c[1] == 'n' || c[1] == 'N') &&
+ (c[2] == 'c' || c[2] == 'C') &&
+ (c[3] == 'r' || c[3] == 'R')) {
session->command = KVSTORAGE_CMD_INCR;
session->arg_data.value = 1;
}
else if ((c[0] == 'd' || c[0] == 'D') &&
- (c[1] == 'e' || c[1] == 'E') &&
- (c[2] == 'c' || c[2] == 'C') &&
- (c[3] == 'r' || c[3] == 'R')) {
+ (c[1] == 'e' || c[1] == 'E') &&
+ (c[2] == 'c' || c[2] == 'C') &&
+ (c[3] == 'r' || c[3] == 'R')) {
session->command = KVSTORAGE_CMD_DECR;
session->arg_data.value = -1;
}
else if (g_ascii_strncasecmp (c, "quit", 4) == 0) {
session->command = KVSTORAGE_CMD_QUIT;
}
- else if (g_ascii_strncasecmp (c, "sync", 4) == 0 || g_ascii_strncasecmp (c, "save", 4) == 0) {
+ else if (g_ascii_strncasecmp (c, "sync",
+ 4) == 0 || g_ascii_strncasecmp (c, "save", 4) == 0) {
session->command = KVSTORAGE_CMD_SYNC;
}
}
else if (len == 6) {
if ((c[0] == 'i' || c[0] == 'I') &&
- (c[1] == 'n' || c[1] == 'N') &&
- (c[2] == 'c' || c[2] == 'C') &&
- (c[3] == 'r' || c[3] == 'R') &&
- (c[4] == 'b' || c[4] == 'B') &&
- (c[5] == 'y' || c[5] == 'Y')) {
+ (c[1] == 'n' || c[1] == 'N') &&
+ (c[2] == 'c' || c[2] == 'C') &&
+ (c[3] == 'r' || c[3] == 'R') &&
+ (c[4] == 'b' || c[4] == 'B') &&
+ (c[5] == 'y' || c[5] == 'Y')) {
session->command = KVSTORAGE_CMD_INCR;
session->arg_data.value = 1;
}
else if ((c[0] == 'd' || c[0] == 'D') &&
- (c[1] == 'e' || c[1] == 'E') &&
- (c[2] == 'c' || c[2] == 'C') &&
- (c[3] == 'r' || c[3] == 'R') &&
- (c[4] == 'b' || c[4] == 'B') &&
- (c[5] == 'y' || c[5] == 'Y')) {
+ (c[1] == 'e' || c[1] == 'E') &&
+ (c[2] == 'c' || c[2] == 'C') &&
+ (c[3] == 'r' || c[3] == 'R') &&
+ (c[4] == 'b' || c[4] == 'B') &&
+ (c[5] == 'y' || c[5] == 'Y')) {
session->command = KVSTORAGE_CMD_DECR;
session->arg_data.value = -1;
}
else if ((c[0] == 'd' || c[0] == 'D') &&
- (c[1] == 'e' || c[1] == 'E') &&
- (c[2] == 'l' || c[2] == 'L') &&
- (c[3] == 'e' || c[3] == 'E') &&
- (c[4] == 't' || c[4] == 'T') &&
- (c[5] == 'e' || c[5] == 'E')) {
+ (c[1] == 'e' || c[1] == 'E') &&
+ (c[2] == 'l' || c[2] == 'L') &&
+ (c[3] == 'e' || c[3] == 'E') &&
+ (c[4] == 't' || c[4] == 'T') &&
+ (c[5] == 'e' || c[5] == 'E')) {
session->command = KVSTORAGE_CMD_DELETE;
}
else if (g_ascii_strncasecmp (c, "select", 6) == 0) {
@@ -238,9 +248,9 @@ parse_kvstorage_command (struct kvstorage_session *session, gchar *c, guint len)
static gboolean
parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in)
{
- gchar *p, *c, *end;
- gint state = 0, next_state = 0;
- gboolean is_redis;
+ gchar *p, *c, *end;
+ gint state = 0, next_state = 0;
+ gboolean is_redis;
p = in->begin;
end = in->begin + in->len;
@@ -253,7 +263,7 @@ parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in)
case 0:
/* At this state we try to read identifier of storage */
if (g_ascii_isdigit (*p)) {
- p ++;
+ p++;
}
else {
if (g_ascii_isspace (*p) && p != c) {
@@ -284,7 +294,7 @@ parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in)
case 1:
/* At this state we parse command */
if (g_ascii_isalpha (*p) && p != end) {
- p ++;
+ p++;
}
else {
if (parse_kvstorage_command (session, c, p - c)) {
@@ -315,20 +325,21 @@ parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in)
case 2:
/* Read and store key */
if (!g_ascii_isspace (*p) && end != p) {
- p ++;
+ p++;
}
else {
if (p == c) {
return FALSE;
}
else {
- session->key = rspamd_mempool_alloc (session->pool, p - c + 1);
+ session->key = rspamd_mempool_alloc (session->pool,
+ p - c + 1);
rspamd_strlcpy (session->key, c, p - c + 1);
session->keylen = p - c;
/* Now we must select next state based on command */
if (session->command == KVSTORAGE_CMD_SET ||
- session->command == KVSTORAGE_CMD_INCR ||
- session->command == KVSTORAGE_CMD_DECR) {
+ session->command == KVSTORAGE_CMD_INCR ||
+ session->command == KVSTORAGE_CMD_DECR) {
/* Read flags */
state = 99;
if (is_redis) {
@@ -355,7 +366,7 @@ parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in)
case 3:
/* Read flags */
if (g_ascii_isdigit (*p)) {
- p ++;
+ p++;
}
else {
if (g_ascii_isspace (*p)) {
@@ -377,7 +388,7 @@ parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in)
case 4:
/* Read exptime */
if (g_ascii_isdigit (*p)) {
- p ++;
+ p++;
}
else {
if (g_ascii_isspace (*p)) {
@@ -393,7 +404,7 @@ parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in)
case 5:
/* Read size or incr/decr values */
if (g_ascii_isdigit (*p)) {
- p ++;
+ p++;
}
else {
if (g_ascii_isspace (*p) || p >= end - 1) {
@@ -404,7 +415,8 @@ parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in)
if (p != c) {
session->arg_data.value = strtoul (c, NULL, 10);
if (session->command == KVSTORAGE_CMD_DECR) {
- session->arg_data.value = -session->arg_data.value;
+ session->arg_data.value =
+ -session->arg_data.value;
}
}
else if (session->command == KVSTORAGE_CMD_INCR) {
@@ -424,7 +436,7 @@ parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in)
case 6:
/* Read index of storage */
if (g_ascii_isdigit (*p)) {
- p ++;
+ p++;
}
else {
if (g_ascii_isspace (*p) || end == p) {
@@ -439,7 +451,7 @@ parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in)
case 7:
/* Read arguments count */
if (g_ascii_isdigit (*p)) {
- p ++;
+ p++;
}
else {
if (g_ascii_isspace (*p) || end == p) {
@@ -457,7 +469,7 @@ parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in)
case 99:
/* Skip spaces state */
if (g_ascii_isspace (*p)) {
- p ++;
+ p++;
}
else {
c = p;
@@ -478,33 +490,44 @@ parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in)
static gboolean
kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
{
- gint r;
- gchar outbuf[BUFSIZ], intbuf[sizeof ("9223372036854775807")];
- gboolean res;
- struct rspamd_kv_element *elt;
- guint eltlen;
- glong longval;
+ gint r;
+ gchar outbuf[BUFSIZ], intbuf[sizeof ("9223372036854775807")];
+ gboolean res;
+ struct rspamd_kv_element *elt;
+ guint eltlen;
+ glong longval;
if (session->command == KVSTORAGE_CMD_SET) {
session->state = KVSTORAGE_STATE_READ_DATA;
- rspamd_set_dispatcher_policy (session->dispather, BUFFER_CHARACTER, session->arg_data.length);
+ rspamd_set_dispatcher_policy (session->dispather,
+ BUFFER_CHARACTER,
+ session->arg_data.length);
}
else if (session->command == KVSTORAGE_CMD_GET) {
- elt = rspamd_kv_storage_lookup (session->cf->storage, session->key, session->keylen, session->now);
+ elt = rspamd_kv_storage_lookup (session->cf->storage,
+ session->key,
+ session->keylen,
+ session->now);
if (elt == NULL) {
RW_R_UNLOCK (&session->cf->storage->rwlock);
if (!is_redis) {
- return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND,
- sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE);
+ return rspamd_dispatcher_write (session->dispather,
+ ERROR_NOT_FOUND,
+ sizeof (ERROR_NOT_FOUND) - 1,
+ FALSE,
+ TRUE);
}
else {
return rspamd_dispatcher_write (session->dispather, "$-1" CRLF,
- sizeof ("$-1" CRLF) - 1, FALSE, TRUE);
+ sizeof ("$-1" CRLF) - 1, FALSE, TRUE);
}
}
else {
if (elt->flags & KV_ELT_INTEGER) {
- eltlen = rspamd_snprintf (intbuf, sizeof (intbuf), "%l", ELT_LONG (elt));
+ eltlen = rspamd_snprintf (intbuf,
+ sizeof (intbuf),
+ "%l",
+ ELT_LONG (elt));
}
else {
@@ -512,34 +535,43 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
}
if (!is_redis) {
- r = rspamd_snprintf (outbuf, sizeof (outbuf), "VALUE %s %ud %ud" CRLF,
- ELT_KEY (elt), elt->flags, eltlen);
+ r = rspamd_snprintf (outbuf,
+ sizeof (outbuf),
+ "VALUE %s %ud %ud" CRLF,
+ ELT_KEY (elt),
+ elt->flags,
+ eltlen);
}
else {
r = rspamd_snprintf (outbuf, sizeof (outbuf), "$%ud" CRLF,
eltlen);
}
if (!rspamd_dispatcher_write (session->dispather, outbuf,
- r, TRUE, FALSE)) {
+ r, TRUE, FALSE)) {
RW_R_UNLOCK (&session->cf->storage->rwlock);
return FALSE;
}
if (elt->flags & KV_ELT_INTEGER) {
- if (!rspamd_dispatcher_write (session->dispather, intbuf, eltlen, TRUE, TRUE)) {
+ if (!rspamd_dispatcher_write (session->dispather, intbuf,
+ eltlen, TRUE, TRUE)) {
RW_R_UNLOCK (&session->cf->storage->rwlock);
return FALSE;
}
}
else {
- if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), eltlen, TRUE, TRUE)) {
+ if (!rspamd_dispatcher_write (session->dispather,
+ ELT_DATA (elt), eltlen, TRUE, TRUE)) {
RW_R_UNLOCK (&session->cf->storage->rwlock);
return FALSE;
}
}
session->elt = elt;
if (!is_redis) {
- res = rspamd_dispatcher_write (session->dispather, CRLF "END" CRLF,
- sizeof (CRLF "END" CRLF) - 1, FALSE, TRUE);
+ res = rspamd_dispatcher_write (session->dispather,
+ CRLF "END" CRLF,
+ sizeof (CRLF "END" CRLF) - 1,
+ FALSE,
+ TRUE);
}
else {
res = rspamd_dispatcher_write (session->dispather, CRLF,
@@ -553,42 +585,58 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
}
}
else if (session->command == KVSTORAGE_CMD_DELETE) {
- elt = rspamd_kv_storage_delete (session->cf->storage, session->key, session->keylen);
+ elt = rspamd_kv_storage_delete (session->cf->storage,
+ session->key,
+ session->keylen);
if (elt != NULL) {
if ((elt->flags & KV_ELT_DIRTY) == 0) {
/* Free memory if backend has deleted this element */
g_slice_free1 (ELT_SIZE (elt), elt);
}
if (!is_redis) {
- return rspamd_dispatcher_write (session->dispather, "DELETED" CRLF,
- sizeof ("DELETED" CRLF) - 1, FALSE, TRUE);
+ return rspamd_dispatcher_write (session->dispather,
+ "DELETED" CRLF,
+ sizeof ("DELETED" CRLF) - 1,
+ FALSE,
+ TRUE);
}
else {
return rspamd_dispatcher_write (session->dispather, ":1" CRLF,
- sizeof (":1" CRLF) - 1, FALSE, TRUE);
+ sizeof (":1" CRLF) - 1, FALSE, TRUE);
}
}
else {
if (!is_redis) {
- return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND,
- sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE);
+ return rspamd_dispatcher_write (session->dispather,
+ ERROR_NOT_FOUND,
+ sizeof (ERROR_NOT_FOUND) - 1,
+ FALSE,
+ TRUE);
}
else {
return rspamd_dispatcher_write (session->dispather, ":0" CRLF,
- sizeof (":0" CRLF) - 1, FALSE, TRUE);
+ sizeof (":0" CRLF) - 1, FALSE, TRUE);
}
}
}
- else if (session->command == KVSTORAGE_CMD_INCR || session->command == KVSTORAGE_CMD_DECR) {
+ else if (session->command == KVSTORAGE_CMD_INCR || session->command ==
+ KVSTORAGE_CMD_DECR) {
longval = session->arg_data.value;
- if (!rspamd_kv_storage_increment (session->cf->storage, session->key, session->keylen, &longval)) {
+ if (!rspamd_kv_storage_increment (session->cf->storage, session->key,
+ session->keylen, &longval)) {
if (!is_redis) {
- return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND,
- sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE);
+ return rspamd_dispatcher_write (session->dispather,
+ ERROR_NOT_FOUND,
+ sizeof (ERROR_NOT_FOUND) - 1,
+ FALSE,
+ TRUE);
}
else {
- return rspamd_dispatcher_write (session->dispather, "-ERR not found" CRLF,
- sizeof ("-ERR not found" CRLF) - 1, FALSE, TRUE);
+ return rspamd_dispatcher_write (session->dispather,
+ "-ERR not found" CRLF,
+ sizeof ("-ERR not found" CRLF) - 1,
+ FALSE,
+ TRUE);
}
}
else {
@@ -601,41 +649,61 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
longval);
}
if (!rspamd_dispatcher_write (session->dispather, outbuf,
- r, FALSE, FALSE)) {
+ r, FALSE, FALSE)) {
return FALSE;
}
}
}
else if (session->command == KVSTORAGE_CMD_SYNC) {
- if (session->cf->storage->backend == NULL || session->cf->storage->backend->sync_func == NULL) {
+ if (session->cf->storage->backend == NULL ||
+ session->cf->storage->backend->sync_func == NULL) {
if (!is_redis) {
- return rspamd_dispatcher_write (session->dispather, ERROR_COMMON,
- sizeof (ERROR_COMMON) - 1, FALSE, TRUE);
+ return rspamd_dispatcher_write (session->dispather,
+ ERROR_COMMON,
+ sizeof (ERROR_COMMON) - 1,
+ FALSE,
+ TRUE);
}
else {
- return rspamd_dispatcher_write (session->dispather, "-ERR unsupported" CRLF,
- sizeof ("-ERR unsupported" CRLF) - 1, FALSE, TRUE);
+ return rspamd_dispatcher_write (session->dispather,
+ "-ERR unsupported" CRLF,
+ sizeof ("-ERR unsupported" CRLF) - 1,
+ FALSE,
+ TRUE);
}
}
else {
- if (session->cf->storage->backend->sync_func (session->cf->storage->backend)) {
+ if (session->cf->storage->backend->sync_func (session->cf->storage->
+ backend)) {
if (!is_redis) {
- return rspamd_dispatcher_write (session->dispather, "SYNCED" CRLF,
- sizeof ("SYNCED" CRLF) - 1, FALSE, TRUE);
+ return rspamd_dispatcher_write (session->dispather,
+ "SYNCED" CRLF,
+ sizeof ("SYNCED" CRLF) - 1,
+ FALSE,
+ TRUE);
}
else {
- return rspamd_dispatcher_write (session->dispather, "+OK" CRLF,
- sizeof ("+OK" CRLF) - 1, FALSE, TRUE);
+ return rspamd_dispatcher_write (session->dispather,
+ "+OK" CRLF,
+ sizeof ("+OK" CRLF) - 1,
+ FALSE,
+ TRUE);
}
}
else {
if (!is_redis) {
- return rspamd_dispatcher_write (session->dispather, "NOT_SYNCED" CRLF,
- sizeof ("NOT_SYNCED" CRLF) - 1, FALSE, TRUE);
+ return rspamd_dispatcher_write (session->dispather,
+ "NOT_SYNCED" CRLF,
+ sizeof ("NOT_SYNCED" CRLF) - 1,
+ FALSE,
+ TRUE);
}
else {
- return rspamd_dispatcher_write (session->dispather, "-ERR not synced" CRLF,
- sizeof ("-ERR not synced" CRLF) - 1, FALSE, TRUE);
+ return rspamd_dispatcher_write (session->dispather,
+ "-ERR not synced" CRLF,
+ sizeof ("-ERR not synced" CRLF) - 1,
+ FALSE,
+ TRUE);
}
}
}
@@ -643,11 +711,11 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
else if (session->command == KVSTORAGE_CMD_SELECT) {
if (!is_redis) {
return rspamd_dispatcher_write (session->dispather, "SELECTED" CRLF,
- sizeof ("SELECTED" CRLF) - 1, FALSE, TRUE);
+ sizeof ("SELECTED" CRLF) - 1, FALSE, TRUE);
}
else {
return rspamd_dispatcher_write (session->dispather, "+OK" CRLF,
- sizeof ("+OK" CRLF) - 1, FALSE, TRUE);
+ sizeof ("+OK" CRLF) - 1, FALSE, TRUE);
}
}
else if (session->command == KVSTORAGE_CMD_QUIT) {
@@ -662,8 +730,8 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
static gboolean
kvstorage_read_arglen (f_str_t *in, guint *len)
{
- gchar *p = in->begin, *end = in->begin + in->len, *c;
- gint state = 0;
+ gchar *p = in->begin, *end = in->begin + in->len, *c;
+ gint state = 0;
c = p;
while (p < end) {
@@ -673,14 +741,14 @@ kvstorage_read_arglen (f_str_t *in, guint *len)
return FALSE;
}
else {
- p ++;
+ p++;
c = p;
state = 1;
}
break;
case 1:
if (g_ascii_isdigit (*p) && p != end - 1) {
- p ++;
+ p++;
}
else {
if (p != end - 1) {
@@ -731,12 +799,12 @@ kvstorage_check_argnum (struct kvstorage_session *session)
static gboolean
kvstorage_read_socket (f_str_t * in, void *arg)
{
- struct kvstorage_session *session = (struct kvstorage_session *) arg;
- struct kvstorage_worker_thread *thr;
- gint r;
- guint arglen = 0;
- gchar outbuf[BUFSIZ];
- gboolean is_redis;
+ struct kvstorage_session *session = (struct kvstorage_session *) arg;
+ struct kvstorage_worker_thread *thr;
+ gint r;
+ guint arglen = 0;
+ gchar outbuf[BUFSIZ];
+ gboolean is_redis;
if (in->len == 0) {
/* Skip empty commands */
@@ -749,16 +817,22 @@ kvstorage_read_socket (f_str_t * in, void *arg)
case KVSTORAGE_STATE_READ_CMD:
/* Update timestamp */
session->now = time (NULL);
- if (! parse_kvstorage_line (session, in)) {
+ if (!parse_kvstorage_line (session, in)) {
thr_info ("%ud: unknown command: %V", thr->id, in);
if (!is_redis) {
- return rspamd_dispatcher_write (session->dispather, ERROR_UNKNOWN_COMMAND,
- sizeof (ERROR_UNKNOWN_COMMAND) - 1, FALSE, TRUE);
+ return rspamd_dispatcher_write (session->dispather,
+ ERROR_UNKNOWN_COMMAND,
+ sizeof (ERROR_UNKNOWN_COMMAND) - 1,
+ FALSE,
+ TRUE);
}
else {
- r = rspamd_snprintf (outbuf, sizeof (outbuf), "-ERR unknown command '%V'" CRLF, in);
+ r = rspamd_snprintf (outbuf,
+ sizeof (outbuf),
+ "-ERR unknown command '%V'" CRLF,
+ in);
return rspamd_dispatcher_write (session->dispather, outbuf,
- r, FALSE, TRUE);
+ r, FALSE, TRUE);
}
}
else {
@@ -766,12 +840,18 @@ kvstorage_read_socket (f_str_t * in, void *arg)
if (session->cf == NULL) {
thr_info ("%ud: bad keystorage: %ud", thr->id, session->id);
if (!is_redis) {
- return rspamd_dispatcher_write (session->dispather, ERROR_INVALID_KEYSTORAGE,
- sizeof (ERROR_INVALID_KEYSTORAGE) - 1, FALSE, TRUE);
+ return rspamd_dispatcher_write (session->dispather,
+ ERROR_INVALID_KEYSTORAGE,
+ sizeof (ERROR_INVALID_KEYSTORAGE) - 1,
+ FALSE,
+ TRUE);
}
else {
- return rspamd_dispatcher_write (session->dispather, "-ERR unknown keystorage" CRLF,
- sizeof ("-ERR unknown keystorage" CRLF) - 1, FALSE, TRUE);
+ return rspamd_dispatcher_write (session->dispather,
+ "-ERR unknown keystorage" CRLF,
+ sizeof ("-ERR unknown keystorage" CRLF) - 1,
+ FALSE,
+ TRUE);
}
}
if (session->state != KVSTORAGE_STATE_READ_ARGLEN) {
@@ -780,44 +860,59 @@ kvstorage_read_socket (f_str_t * in, void *arg)
}
break;
case KVSTORAGE_STATE_READ_ARGLEN:
- if (! kvstorage_read_arglen (in, &arglen)) {
+ if (!kvstorage_read_arglen (in, &arglen)) {
session->state = KVSTORAGE_STATE_READ_CMD;
- r = rspamd_snprintf (outbuf, sizeof (outbuf), "-ERR unknown arglen '%V'" CRLF, in);
+ r = rspamd_snprintf (outbuf,
+ sizeof (outbuf),
+ "-ERR unknown arglen '%V'" CRLF,
+ in);
return rspamd_dispatcher_write (session->dispather, outbuf,
- r, FALSE, TRUE);
+ r, FALSE, TRUE);
}
else {
session->state = KVSTORAGE_STATE_READ_ARG;
- rspamd_set_dispatcher_policy (session->dispather, BUFFER_CHARACTER, arglen);
+ rspamd_set_dispatcher_policy (session->dispather,
+ BUFFER_CHARACTER,
+ arglen);
}
break;
case KVSTORAGE_STATE_READ_ARG:
if (session->argnum == 0) {
/* Read command */
- if (! parse_kvstorage_command (session, in->begin, in->len)) {
+ if (!parse_kvstorage_command (session, in->begin, in->len)) {
session->state = KVSTORAGE_STATE_READ_CMD;
- r = rspamd_snprintf (outbuf, sizeof (outbuf), "-ERR unknown command '%V'" CRLF, in);
+ r = rspamd_snprintf (outbuf,
+ sizeof (outbuf),
+ "-ERR unknown command '%V'" CRLF,
+ in);
return rspamd_dispatcher_write (session->dispather, outbuf,
- r, FALSE, TRUE);
+ r, FALSE, TRUE);
}
else {
- if (! kvstorage_check_argnum (session)) {
+ if (!kvstorage_check_argnum (session)) {
session->state = KVSTORAGE_STATE_READ_CMD;
- r = rspamd_snprintf (outbuf, sizeof (outbuf), "-ERR invalid argnum for command '%V': %ud" CRLF,
- in, session->argc);
+ r = rspamd_snprintf (outbuf,
+ sizeof (outbuf),
+ "-ERR invalid argnum for command '%V': %ud" CRLF,
+ in,
+ session->argc);
return rspamd_dispatcher_write (session->dispather, outbuf,
- r, FALSE, TRUE);
+ r, FALSE, TRUE);
}
else {
if (session->argnum == session->argc - 1) {
session->state = KVSTORAGE_STATE_READ_CMD;
- rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
+ rspamd_set_dispatcher_policy (session->dispather,
+ BUFFER_LINE,
+ -1);
return kvstorage_process_command (session, TRUE);
}
else {
- session->argnum ++;
+ session->argnum++;
session->state = KVSTORAGE_STATE_READ_ARGLEN;
- rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
+ rspamd_set_dispatcher_policy (session->dispather,
+ BUFFER_LINE,
+ -1);
}
}
}
@@ -829,21 +924,28 @@ kvstorage_read_socket (f_str_t * in, void *arg)
session->keylen = in->len;
if (session->argnum == session->argc - 1) {
session->state = KVSTORAGE_STATE_READ_CMD;
- rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
+ rspamd_set_dispatcher_policy (session->dispather,
+ BUFFER_LINE,
+ -1);
return kvstorage_process_command (session, TRUE);
}
else {
- session->argnum ++;
+ session->argnum++;
session->state = KVSTORAGE_STATE_READ_ARGLEN;
- rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
+ rspamd_set_dispatcher_policy (session->dispather,
+ BUFFER_LINE,
+ -1);
}
}
else {
/* Special case for select command */
session->state = KVSTORAGE_STATE_READ_CMD;
- rspamd_strlcpy (outbuf, in->begin, MIN (sizeof (outbuf), in->len));
+ rspamd_strlcpy (outbuf, in->begin, MIN (sizeof (outbuf),
+ in->len));
session->id = strtoul (outbuf, NULL, 10);
- rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
+ rspamd_set_dispatcher_policy (session->dispather,
+ BUFFER_LINE,
+ -1);
return kvstorage_process_command (session, TRUE);
}
}
@@ -851,25 +953,37 @@ kvstorage_read_socket (f_str_t * in, void *arg)
/* We get datablock for set command */
if (session->command == KVSTORAGE_CMD_SET && session->argc == 3) {
session->state = KVSTORAGE_STATE_READ_CMD;
- rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
- if (rspamd_kv_storage_insert (session->cf->storage, session->key, session->keylen,
- in->begin, in->len,
- session->flags, session->expire)) {
- return rspamd_dispatcher_write (session->dispather, "+OK" CRLF,
- sizeof ("+OK" CRLF) - 1, FALSE, TRUE);
+ rspamd_set_dispatcher_policy (session->dispather,
+ BUFFER_LINE,
+ -1);
+ if (rspamd_kv_storage_insert (session->cf->storage,
+ session->key, session->keylen,
+ in->begin, in->len,
+ session->flags, session->expire)) {
+ return rspamd_dispatcher_write (session->dispather,
+ "+OK" CRLF,
+ sizeof ("+OK" CRLF) - 1,
+ FALSE,
+ TRUE);
}
else {
- return rspamd_dispatcher_write (session->dispather, "-ERR not stored" CRLF,
- sizeof ("-ERR not stored" CRLF) - 1, FALSE, TRUE);
+ return rspamd_dispatcher_write (session->dispather,
+ "-ERR not stored" CRLF,
+ sizeof ("-ERR not stored" CRLF) - 1,
+ FALSE,
+ TRUE);
}
}
- else if (session->command == KVSTORAGE_CMD_SET && session->argc == 4) {
+ else if (session->command == KVSTORAGE_CMD_SET && session->argc ==
+ 4) {
/* It is expire argument */
session->state = KVSTORAGE_STATE_READ_CMD;
rspamd_strtol (in->begin, in->len, (glong *)&session->expire);
- session->argnum ++;
+ session->argnum++;
session->state = KVSTORAGE_STATE_READ_ARGLEN;
- rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
+ rspamd_set_dispatcher_policy (session->dispather,
+ BUFFER_LINE,
+ -1);
}
else {
session->state = KVSTORAGE_STATE_READ_CMD;
@@ -877,7 +991,9 @@ kvstorage_read_socket (f_str_t * in, void *arg)
if (session->command == KVSTORAGE_CMD_DECR) {
session->arg_data.value = -session->arg_data.value;
}
- rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
+ rspamd_set_dispatcher_policy (session->dispather,
+ BUFFER_LINE,
+ -1);
return kvstorage_process_command (session, TRUE);
}
}
@@ -885,16 +1001,25 @@ kvstorage_read_socket (f_str_t * in, void *arg)
/* We get datablock for set command */
if (session->command == KVSTORAGE_CMD_SET && session->argc == 4) {
session->state = KVSTORAGE_STATE_READ_CMD;
- rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
- if (rspamd_kv_storage_insert (session->cf->storage, session->key, session->keylen,
- in->begin, in->len,
- session->flags, session->expire)) {
- return rspamd_dispatcher_write (session->dispather, "+OK" CRLF,
- sizeof ("+OK" CRLF) - 1, FALSE, TRUE);
+ rspamd_set_dispatcher_policy (session->dispather,
+ BUFFER_LINE,
+ -1);
+ if (rspamd_kv_storage_insert (session->cf->storage,
+ session->key, session->keylen,
+ in->begin, in->len,
+ session->flags, session->expire)) {
+ return rspamd_dispatcher_write (session->dispather,
+ "+OK" CRLF,
+ sizeof ("+OK" CRLF) - 1,
+ FALSE,
+ TRUE);
}
else {
- return rspamd_dispatcher_write (session->dispather, "-ERR not stored" CRLF,
- sizeof ("-ERR not stored" CRLF) - 1, FALSE, TRUE);
+ return rspamd_dispatcher_write (session->dispather,
+ "-ERR not stored" CRLF,
+ sizeof ("-ERR not stored" CRLF) - 1,
+ FALSE,
+ TRUE);
}
}
}
@@ -902,26 +1027,36 @@ kvstorage_read_socket (f_str_t * in, void *arg)
case KVSTORAGE_STATE_READ_DATA:
session->state = KVSTORAGE_STATE_READ_CMD;
rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
- if (rspamd_kv_storage_insert (session->cf->storage, session->key, session->keylen,
- in->begin, in->len,
- session->flags, session->expire)) {
+ if (rspamd_kv_storage_insert (session->cf->storage, session->key,
+ session->keylen,
+ in->begin, in->len,
+ session->flags, session->expire)) {
if (!is_redis) {
- return rspamd_dispatcher_write (session->dispather, "STORED" CRLF,
- sizeof ("STORED" CRLF) - 1, FALSE, TRUE);
+ return rspamd_dispatcher_write (session->dispather,
+ "STORED" CRLF,
+ sizeof ("STORED" CRLF) - 1,
+ FALSE,
+ TRUE);
}
else {
return rspamd_dispatcher_write (session->dispather, "+OK" CRLF,
- sizeof ("+OK" CRLF) - 1, FALSE, TRUE);
+ sizeof ("+OK" CRLF) - 1, FALSE, TRUE);
}
}
else {
if (!is_redis) {
- return rspamd_dispatcher_write (session->dispather, ERROR_NOT_STORED,
- sizeof (ERROR_NOT_STORED) - 1, FALSE, TRUE);
+ return rspamd_dispatcher_write (session->dispather,
+ ERROR_NOT_STORED,
+ sizeof (ERROR_NOT_STORED) - 1,
+ FALSE,
+ TRUE);
}
else {
- return rspamd_dispatcher_write (session->dispather, "-ERR not stored" CRLF,
- sizeof ("-ERR not stored" CRLF) - 1, FALSE, TRUE);
+ return rspamd_dispatcher_write (session->dispather,
+ "-ERR not stored" CRLF,
+ sizeof ("-ERR not stored" CRLF) - 1,
+ FALSE,
+ TRUE);
}
}
@@ -937,17 +1072,18 @@ kvstorage_read_socket (f_str_t * in, void *arg)
static gboolean
kvstorage_write_socket (void *arg)
{
- struct kvstorage_session *session = (struct kvstorage_session *) arg;
+ struct kvstorage_session *session = (struct kvstorage_session *) arg;
if (session->elt) {
if ((session->elt->flags & KV_ELT_NEED_INSERT) != 0) {
/* Insert to cache and free element */
session->elt->flags &= ~KV_ELT_NEED_INSERT;
RW_R_UNLOCK (&session->cf->storage->rwlock);
- rspamd_kv_storage_insert_cache (session->cf->storage, ELT_KEY (session->elt),
- session->elt->keylen, ELT_DATA (session->elt),
- session->elt->size, session->elt->flags,
- session->elt->expire, NULL);
+ rspamd_kv_storage_insert_cache (session->cf->storage,
+ ELT_KEY (session->elt),
+ session->elt->keylen, ELT_DATA (session->elt),
+ session->elt->size, session->elt->flags,
+ session->elt->expire, NULL);
g_free (session->elt);
session->elt = NULL;
return TRUE;
@@ -966,8 +1102,8 @@ kvstorage_write_socket (void *arg)
static void
kvstorage_err_socket (GError * err, void *arg)
{
- struct kvstorage_session *session = (struct kvstorage_session *) arg;
- struct kvstorage_worker_thread *thr;
+ struct kvstorage_session *session = (struct kvstorage_session *) arg;
+ struct kvstorage_worker_thread *thr;
thr = session->thr;
if (err->code != -1) {
@@ -990,14 +1126,15 @@ kvstorage_err_socket (GError * err, void *arg)
static void
thr_accept_socket (gint fd, short what, void *arg)
{
- struct kvstorage_worker_thread *thr = (struct kvstorage_worker_thread *)arg;
- union sa_union su;
- socklen_t addrlen = sizeof (su.ss);
- gint nfd;
- struct kvstorage_session *session;
+ struct kvstorage_worker_thread *thr = (struct kvstorage_worker_thread *)arg;
+ union sa_union su;
+ socklen_t addrlen = sizeof (su.ss);
+ gint nfd;
+ struct kvstorage_session *session;
g_mutex_lock (thr->accept_mtx);
- if ((nfd = accept_from_socket (fd, (struct sockaddr *)&su.ss, &addrlen)) == -1) {
+ if ((nfd =
+ accept_from_socket (fd, (struct sockaddr *)&su.ss, &addrlen)) == -1) {
thr_warn ("%ud: accept failed: %s", thr->id, strerror (errno));
g_mutex_unlock (thr->accept_mtx);
return;
@@ -1014,9 +1151,14 @@ thr_accept_socket (gint fd, short what, void *arg)
session->state = KVSTORAGE_STATE_READ_CMD;
session->thr = thr;
session->sock = nfd;
- session->dispather = rspamd_create_dispatcher (thr->ev_base, nfd, BUFFER_LINE,
- kvstorage_read_socket, kvstorage_write_socket,
- kvstorage_err_socket, thr->tv, session);
+ session->dispather = rspamd_create_dispatcher (thr->ev_base,
+ nfd,
+ BUFFER_LINE,
+ kvstorage_read_socket,
+ kvstorage_write_socket,
+ kvstorage_err_socket,
+ thr->tv,
+ session);
g_mutex_unlock (thr->accept_mtx);
session->elt = NULL;
@@ -1026,7 +1168,7 @@ thr_accept_socket (gint fd, short what, void *arg)
}
else if (su.ss.ss_family == AF_INET) {
memcpy (&session->client_addr, &su.s4.sin_addr,
- sizeof (struct in_addr));
+ sizeof (struct in_addr));
}
}
@@ -1036,8 +1178,8 @@ thr_accept_socket (gint fd, short what, void *arg)
static void
thr_term_socket (gint fd, short what, void *arg)
{
- struct kvstorage_worker_thread *thr = (struct kvstorage_worker_thread *)arg;
- struct timeval tv;
+ struct kvstorage_worker_thread *thr = (struct kvstorage_worker_thread *)arg;
+ struct timeval tv;
if (read (fd, &tv, sizeof (struct timeval)) != sizeof (struct timeval)) {
thr_err ("cannot read data from socket: %s", strerror (errno));
@@ -1055,18 +1197,26 @@ thr_term_socket (gint fd, short what, void *arg)
static gpointer
kvstorage_thread (gpointer ud)
{
- struct kvstorage_worker_thread *thr = ud;
+ struct kvstorage_worker_thread *thr = ud;
/* Block signals as it is dispatcher deity */
sigprocmask (SIG_BLOCK, thr->signals, NULL);
/* Init thread specific events */
thr->ev_base = event_init ();
- event_set (&thr->bind_ev, thr->worker->cf->listen_sock, EV_READ | EV_PERSIST, thr_accept_socket, (void *)thr);
+ event_set (&thr->bind_ev,
+ thr->worker->cf->listen_sock,
+ EV_READ | EV_PERSIST,
+ thr_accept_socket,
+ (void *)thr);
event_base_set (thr->ev_base, &thr->bind_ev);
event_add (&thr->bind_ev, NULL);
- event_set (&thr->term_ev, thr->term_sock[0], EV_READ | EV_PERSIST, thr_term_socket, (void *)thr);
+ event_set (&thr->term_ev,
+ thr->term_sock[0],
+ EV_READ | EV_PERSIST,
+ thr_term_socket,
+ (void *)thr);
event_base_set (thr->ev_base, &thr->term_ev);
event_add (&thr->term_ev, NULL);
@@ -1079,12 +1229,17 @@ kvstorage_thread (gpointer ud)
* Create new thread, set it detached
*/
static struct kvstorage_worker_thread *
-create_kvstorage_thread (struct rspamd_worker *worker, struct kvstorage_worker_ctx *ctx, guint id, sigset_t *signals)
+create_kvstorage_thread (struct rspamd_worker *worker,
+ struct kvstorage_worker_ctx *ctx,
+ guint id,
+ sigset_t *signals)
{
- struct kvstorage_worker_thread *new;
- GError *err = NULL;
+ struct kvstorage_worker_thread *new;
+ GError *err = NULL;
- new = rspamd_mempool_alloc (ctx->pool, sizeof (struct kvstorage_worker_thread));
+ new =
+ rspamd_mempool_alloc (ctx->pool,
+ sizeof (struct kvstorage_worker_thread));
new->ctx = ctx;
new->worker = worker;
new->tv = &ctx->io_timeout;
@@ -1102,10 +1257,14 @@ create_kvstorage_thread (struct rspamd_worker *worker, struct kvstorage_worker_c
#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
new->thr = g_thread_create (kvstorage_thread, new, FALSE, &err);
#else
- gchar *name;
+ gchar *name;
- name = rspamd_mempool_alloc (ctx->pool, sizeof ("kvstorage_thread") + sizeof ("4294967296") - 1);
- rspamd_snprintf (name, sizeof ("kvstorage_thread") + sizeof ("4294967296") - 1, "kvstorage_thread%d", id);
+ name = rspamd_mempool_alloc (ctx->pool,
+ sizeof ("kvstorage_thread") + sizeof ("4294967296") - 1);
+ rspamd_snprintf (name,
+ sizeof ("kvstorage_thread") + sizeof ("4294967296") - 1,
+ "kvstorage_thread%d",
+ id);
new->thr = g_thread_new (name, kvstorage_thread, new);
#endif
@@ -1125,17 +1284,18 @@ create_kvstorage_thread (struct rspamd_worker *worker, struct kvstorage_worker_c
void
start_keystorage (struct rspamd_worker *worker)
{
- struct sigaction signals;
- struct kvstorage_worker_ctx *ctx = worker->ctx;
- guint i;
- struct kvstorage_worker_thread *thr;
- struct timeval tv;
- GList *cur;
+ struct sigaction signals;
+ struct kvstorage_worker_ctx *ctx = worker->ctx;
+ guint i;
+ struct kvstorage_worker_thread *thr;
+ struct timeval tv;
+ GList *cur;
gperf_profiler_init (worker->srv->cfg, "kvstorage");
if (!g_thread_supported ()) {
- msg_err ("threads support is not supported on your system so kvstorage is not functionable");
+ msg_err (
+ "threads support is not supported on your system so kvstorage is not functionable");
exit (EXIT_SUCCESS);
}
/* Create socketpair */
@@ -1148,7 +1308,8 @@ start_keystorage (struct rspamd_worker *worker)
#if _EVENT_NUMERIC_VERSION > 0x02000000
if (evthread_use_pthreads () == -1) {
- msg_err ("threads support is not supported in your libevent so kvstorage is not functionable");
+ msg_err (
+ "threads support is not supported in your libevent so kvstorage is not functionable");
exit (EXIT_SUCCESS);
}
#endif
@@ -1176,7 +1337,7 @@ start_keystorage (struct rspamd_worker *worker)
#endif
/* Start workers threads */
- for (i = 0; i < worker->cf->count; i ++) {
+ for (i = 0; i < worker->cf->count; i++) {
thr = create_kvstorage_thread (worker, ctx, i, &signals.sa_mask);
if (thr != NULL) {
ctx->threads = g_list_prepend (ctx->threads, thr);
@@ -1185,7 +1346,7 @@ start_keystorage (struct rspamd_worker *worker)
sigprocmask (SIG_BLOCK, &signals.sa_mask, NULL);
/* Signal processing cycle */
- for (;;) {
+ for (;; ) {
msg_debug ("calling sigsuspend");
sigemptyset (&signals.sa_mask);
sigsuspend (&signals.sa_mask);
@@ -1197,9 +1358,11 @@ start_keystorage (struct rspamd_worker *worker)
cur = ctx->threads;
while (cur) {
thr = cur->data;
- while (write (thr->term_sock[1], &tv, sizeof (struct timeval)) == -1) {
+ while (write (thr->term_sock[1], &tv,
+ sizeof (struct timeval)) == -1) {
if (errno != EAGAIN) {
- msg_err ("write to term socket failed: %s", strerror (errno));
+ msg_err ("write to term socket failed: %s",
+ strerror (errno));
abort ();
}
}
@@ -1211,13 +1374,16 @@ start_keystorage (struct rspamd_worker *worker)
soft_wanna_die = 0;
tv.tv_sec = SOFT_SHUTDOWN_TIME;
tv.tv_usec = 0;
- msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
+ msg_info ("worker's shutdown is pending in %d sec",
+ SOFT_SHUTDOWN_TIME);
cur = ctx->threads;
while (cur) {
thr = cur->data;
- while (write (thr->term_sock[1], &tv, sizeof (struct timeval)) == -1) {
+ while (write (thr->term_sock[1], &tv,
+ sizeof (struct timeval)) == -1) {
if (errno != EAGAIN) {
- msg_err ("write to term socket failed: %s", strerror (errno));
+ msg_err ("write to term socket failed: %s",
+ strerror (errno));
abort ();
}
}