Przeglądaj źródła

* Fix threading in kvstorage.

Rspamd now can detect and work with libevent-2.
tags/0.4.5
Vsevolod Stakhov 12 lat temu
rodzic
commit
c750ae5859
6 zmienionych plików z 87 dodań i 72 usunięć
  1. 10
    0
      CMakeLists.txt
  2. 3
    0
      config.h.in
  3. 3
    3
      src/buffer.c
  4. 67
    69
      src/kvstorage_server.c
  5. 2
    0
      src/kvstorage_server.h
  6. 2
    0
      src/util.c

+ 10
- 0
CMakeLists.txt Wyświetl plik

@@ -672,6 +672,13 @@ ELSE(HAVE_SIGINFO_H)
CHECK_SYMBOL_EXISTS(SA_SIGINFO "signal.h" HAVE_SA_SIGINFO)
ENDIF(HAVE_SIGINFO_H)

# Some hack for libevent 2.0
CHECK_C_SOURCE_COMPILES ("#include <event.h>
#if _EVENT_NUMERIC_VERSION < 0x02000000
#error Unsupported
#endif
int main() { return 0;}" HAVE_LIBEVENT2)

IF(NOT CMAKE_SYSTEM_NAME STREQUAL "SunOS")
IF(HAVE_CLOCK_GETTIME)
CHECK_SYMBOL_EXISTS(CLOCK_PROCESS_CPUTIME_ID time.h HAVE_CLOCK_PROCESS_CPUTIME_ID)
@@ -810,6 +817,9 @@ IF(LIBDB_LIBRARY)
TARGET_LINK_LIBRARIES(rspamd db-4)
ENDIF(LIBDB_LIBRARY)
TARGET_LINK_LIBRARIES(rspamd event)
IF(HAVE_LIBEVENT2)
TARGET_LINK_LIBRARIES(rspamd event_pthreads)
ENDIF(HAVE_LIBEVENT2)
TARGET_LINK_LIBRARIES(rspamd pcre)

TARGET_LINK_LIBRARIES(rspamd ${CMAKE_REQUIRED_LIBRARIES})

+ 3
- 0
config.h.in Wyświetl plik

@@ -375,6 +375,9 @@
# include <siginfo.h>
#endif
#include <event.h>
#if _EVENT_NUMERIC_VERSION > 0x02000000
# include <event2/thread.h>
#endif
#include <glib.h>



+ 3
- 3
src/buffer.c Wyświetl plik

@@ -178,7 +178,7 @@ static gboolean
write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed)
{
GList *cur;
GError *err;
GError *err = NULL;
rspamd_buffer_t *buf;
ssize_t r;

@@ -264,7 +264,7 @@ static void
read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read)
{
ssize_t r;
GError *err;
GError *err = NULL;
f_str_t res;
gchar *c, *b;
gchar *end;
@@ -453,7 +453,7 @@ static void
dispatcher_cb (gint fd, short what, void *arg)
{
rspamd_io_dispatcher_t *d = (rspamd_io_dispatcher_t *) arg;
GError *err;
GError *err = NULL;

debug_ip("in dispatcher callback, what: %d, fd: %d", (gint)what, fd);


+ 67
- 69
src/kvstorage_server.c Wyświetl plik

@@ -37,10 +37,10 @@
#define ERROR_NOT_FOUND "NOT_FOUND" CRLF
#define ERROR_INVALID_KEYSTORAGE "CLIENT_ERROR storage does not exists" CRLF

/* This is required for normal signals processing */
static GList *global_evbases = NULL;
static struct event_base *main_base = NULL;

static sig_atomic_t wanna_die = 0;
static sig_atomic_t do_reopen_log = 0;
static sig_atomic_t soft_wanna_die = 0;

/* Logging functions */
#define thr_err(...) do { \
@@ -69,64 +69,20 @@ static void
sig_handler (gint signo, siginfo_t *info, void *unused)
#endif
{
struct timeval tv;
GList *cur;

switch (signo) {
case SIGUSR1:
reopen_log (rspamd_main->logger);
do_reopen_log = 1;
break;
case SIGINT:
case SIGTERM:
if (!wanna_die) {
wanna_die = 1;
tv.tv_sec = 0;
tv.tv_usec = 0;
cur = global_evbases;
while (cur) {
event_base_loopexit (cur->data, &tv);
}
event_base_loopexit (main_base, &tv);
#ifdef WITH_GPERF_TOOLS
ProfilerStop ();
#endif
}
wanna_die = 1;
break;
case SIGUSR2:
soft_wanna_die = 1;
break;
}
}

/*
* Config reload is designed by sending sigusr to active workers and pending shutdown of them
*/
static void
sigusr_handler (gint fd, short what, void *arg)
{
struct rspamd_worker *worker = (struct rspamd_worker *)arg;
/* Do not accept new connections, preparing to end worker's process */
struct timeval tv;
GList *cur;
struct kvstorage_worker_ctx *ctx;
struct kvstorage_worker_thread *thr;

ctx = worker->ctx;
if (! wanna_die) {
tv.tv_sec = SOFT_SHUTDOWN_TIME;
tv.tv_usec = 0;
event_del (&worker->sig_ev);
msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
cur = ctx->threads;
while (cur) {
thr = cur->data;
if (thr->ev_base != NULL) {
event_del (&thr->bind_ev);
event_base_loopexit (thr->ev_base, &tv);
}
}
event_base_loopexit (ctx->ev_base, &tv);
}
return;
}

gpointer
init_kvstorage_worker (void)
{
@@ -164,6 +120,7 @@ free_kvstorage_session (struct kvstorage_session *session)
rspamd_remove_dispatcher (session->dispather);
memory_pool_delete (session->pool);
close (session->sock);
g_slice_free1 (sizeof (struct kvstorage_session), session);
}

/**
@@ -460,8 +417,10 @@ kvstorage_err_socket (GError * err, void *arg)
struct kvstorage_worker_thread *thr;

thr = session->thr;
thr_info ("%ud: abnormally closing connection from: %s, error: %s",
if (err->code != -1) {
thr_info ("%ud: abnormally closing connection from: %s, error: %s",
thr->id, inet_ntoa (session->client_addr), err->message);
}
g_error_free (err);
free_kvstorage_session (session);
}
@@ -478,35 +437,35 @@ thr_accept_socket (gint fd, short what, void *arg)
gint nfd;
struct kvstorage_session *session;

g_static_mutex_lock (thr->accept_mtx);
if ((nfd = accept_from_socket (fd, (struct sockaddr *)&su.ss, &addrlen)) == -1) {
thr_warn ("%ud: accept failed: %s", thr->id, strerror (errno));
g_static_mutex_unlock (thr->accept_mtx);
return;
}
/* Check for EAGAIN */
if (nfd == 0) {
g_static_mutex_unlock (thr->accept_mtx);
return;
}

session = g_malloc (sizeof (struct kvstorage_session));
session = g_slice_alloc (sizeof (struct kvstorage_session));
session->pool = memory_pool_new (memory_pool_get_size ());
session->state = KVSTORAGE_STATE_READ_CMD;
session->thr = thr;
session->sock = nfd;
session->dispather = rspamd_create_dispatcher (thr->ctx->ev_base, nfd, BUFFER_LINE,
session->dispather = rspamd_create_dispatcher (thr->ev_base, nfd, BUFFER_LINE,
kvstorage_read_socket, NULL,
kvstorage_err_socket, thr->tv, session);
session->dispather->strip_eol = TRUE;

if (su.ss.ss_family == AF_UNIX) {
thr_info ("%ud: accepted connection from unix socket", thr->id);
session->client_addr.s_addr = INADDR_NONE;
}
else if (su.ss.ss_family == AF_INET) {
thr_info ("%ud: accepted connection from %s port %d", thr->id,
inet_ntoa (su.s4.sin_addr), ntohs (su.s4.sin_port));
memcpy (&session->client_addr, &su.s4.sin_addr,
sizeof (struct in_addr));
}
g_static_mutex_unlock (thr->accept_mtx);
}

/**
@@ -542,6 +501,7 @@ create_kvstorage_thread (struct rspamd_worker *worker, struct kvstorage_worker_c
new->worker = worker;
new->tv = &ctx->io_timeout;
new->log_mtx = &ctx->log_mtx;
new->accept_mtx = &ctx->accept_mtx;
new->id = id;
new->thr = g_thread_create (kvstorage_thread, new, FALSE, &err);
new->ev_base = NULL;
@@ -563,6 +523,8 @@ start_kvstorage_worker (struct rspamd_worker *worker)
struct kvstorage_worker_ctx *ctx = worker->ctx;
guint i;
struct kvstorage_worker_thread *thr;
struct timeval tv;
GList *cur;

gperf_profiler_init (worker->srv->cfg, "kvstorage");

@@ -576,7 +538,6 @@ start_kvstorage_worker (struct rspamd_worker *worker)
exit (EXIT_SUCCESS);
}
worker->srv->pid = getpid ();
ctx->ev_base = event_init ();
ctx->threads = NULL;

g_thread_init (NULL);
@@ -586,7 +547,6 @@ start_kvstorage_worker (struct rspamd_worker *worker)
exit (EXIT_SUCCESS);
}
#endif
main_base = ctx->ev_base;

/* Set kvstorage options */
if ( !config_kvstorage_worker (worker)) {
@@ -597,22 +557,60 @@ start_kvstorage_worker (struct rspamd_worker *worker)
init_signals (&signals, sig_handler);
sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);

/* SIGUSR2 handler */
signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *)worker);
event_base_set (ctx->ev_base, &worker->sig_ev);
signal_add (&worker->sig_ev, NULL);
/* Set umask */
umask (S_IWGRP | S_IWOTH | S_IROTH | S_IRGRP);

/* Start workers threads */
g_static_mutex_init (&ctx->log_mtx);
g_static_mutex_init (&ctx->accept_mtx);
for (i = 0; i < worker->cf->count; i ++) {
thr = create_kvstorage_thread (worker, ctx, i);
ctx->threads = g_list_prepend (ctx->threads, thr);
}

/* Set umask */
umask (S_IWGRP | S_IWOTH | S_IROTH | S_IRGRP);

event_base_loop (ctx->ev_base, 0);
sigprocmask (SIG_BLOCK, &signals.sa_mask, NULL);
/* Signal processing cycle */
for (;;) {
msg_debug ("calling sigsuspend");
sigemptyset (&signals.sa_mask);
sigsuspend (&signals.sa_mask);
if (wanna_die == 1) {
wanna_die = 0;
tv.tv_sec = 0;
tv.tv_usec = 0;
msg_info ("worker's immediately shutdown is requested");
cur = ctx->threads;
while (cur) {
thr = cur->data;
if (thr->ev_base != NULL) {
event_del (&thr->bind_ev);
event_base_loopexit (thr->ev_base, &tv);
}
cur = g_list_next (cur);
}
break;
}
else if (soft_wanna_die == 1) {
soft_wanna_die = 0;
tv.tv_sec = SOFT_SHUTDOWN_TIME;
tv.tv_usec = 0;
msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
cur = ctx->threads;
while (cur) {
thr = cur->data;
if (thr->ev_base != NULL) {
event_del (&thr->bind_ev);
event_base_loopexit (thr->ev_base, &tv);
}
cur = g_list_next (cur);
}
break;
}
else if (do_reopen_log == 1) {
do_reopen_log = 0;
reopen_log (rspamd_main->logger);
}
}

close_log (rspamd_main->logger);
exit (EXIT_SUCCESS);

+ 2
- 0
src/kvstorage_server.h Wyświetl plik

@@ -38,6 +38,7 @@ struct kvstorage_worker_ctx {
memory_pool_t *pool;
struct event_base *ev_base;
GStaticMutex log_mtx;
GStaticMutex accept_mtx;
};

struct kvstorage_worker_thread {
@@ -48,6 +49,7 @@ struct kvstorage_worker_thread {
GThread *thr;
struct event_base *ev_base;
GStaticMutex *log_mtx;
GStaticMutex *accept_mtx;
guint id;
};


+ 2
- 0
src/util.c Wyświetl plik

@@ -1239,6 +1239,8 @@ process_to_str (enum process_type type)
return "lmtp";
case TYPE_SMTP:
return "smtp";
case TYPE_KVSTORAGE:
return "keystorage";
default:
return "unknown";
}

Ładowanie…
Anuluj
Zapisz