diff options
Diffstat (limited to 'contrib/zstd/pool.c')
-rw-r--r-- | contrib/zstd/pool.c | 245 |
1 files changed, 167 insertions, 78 deletions
diff --git a/contrib/zstd/pool.c b/contrib/zstd/pool.c index 5f19f331b..aa4b4de0d 100644 --- a/contrib/zstd/pool.c +++ b/contrib/zstd/pool.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-present, Yann Collet, Facebook, Inc. + * Copyright (c) 2016-2020, Yann Collet, Facebook, Inc. * All rights reserved. * * This source code is licensed under both the BSD-style license (found in the @@ -10,8 +10,9 @@ /* ====== Dependencies ======= */ -#include <stddef.h> /* size_t */ -#include <stdlib.h> /* malloc, calloc, free */ +#include <stddef.h> /* size_t */ +#include "debug.h" /* assert */ +#include "zstd_internal.h" /* ZSTD_malloc, ZSTD_free */ #include "pool.h" /* ====== Compiler specifics ====== */ @@ -33,8 +34,9 @@ typedef struct POOL_job_s { struct POOL_ctx_s { ZSTD_customMem customMem; /* Keep track of the threads */ - pthread_t *threads; - size_t numThreads; + ZSTD_pthread_t* threads; + size_t threadCapacity; + size_t threadLimit; /* The queue is a circular buffer */ POOL_job *queue; @@ -48,34 +50,37 @@ struct POOL_ctx_s { int queueEmpty; /* The mutex protects the queue */ - pthread_mutex_t queueMutex; + ZSTD_pthread_mutex_t queueMutex; /* Condition variable for pushers to wait on when the queue is full */ - pthread_cond_t queuePushCond; + ZSTD_pthread_cond_t queuePushCond; /* Condition variables for poppers to wait on when the queue is empty */ - pthread_cond_t queuePopCond; + ZSTD_pthread_cond_t queuePopCond; /* Indicates if the queue is shutting down */ int shutdown; }; /* POOL_thread() : - Work thread for the thread pool. - Waits for jobs and executes them. - @returns : NULL on failure else non-null. -*/ + * Work thread for the thread pool. + * Waits for jobs and executes them. + * @returns : NULL on failure else non-null. + */ static void* POOL_thread(void* opaque) { POOL_ctx* const ctx = (POOL_ctx*)opaque; if (!ctx) { return NULL; } for (;;) { /* Lock the mutex and wait for a non-empty queue or until shutdown */ - pthread_mutex_lock(&ctx->queueMutex); - - while (ctx->queueEmpty && !ctx->shutdown) { - pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex); - } - /* empty => shutting down: so stop */ - if (ctx->queueEmpty) { - pthread_mutex_unlock(&ctx->queueMutex); - return opaque; + ZSTD_pthread_mutex_lock(&ctx->queueMutex); + + while ( ctx->queueEmpty + || (ctx->numThreadsBusy >= ctx->threadLimit) ) { + if (ctx->shutdown) { + /* even if !queueEmpty, (possible if numThreadsBusy >= threadLimit), + * a few threads will be shutdown while !queueEmpty, + * but enough threads will remain active to finish the queue */ + ZSTD_pthread_mutex_unlock(&ctx->queueMutex); + return opaque; + } + ZSTD_pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex); } /* Pop a job off the queue */ { POOL_job const job = ctx->queue[ctx->queueHead]; @@ -83,62 +88,69 @@ static void* POOL_thread(void* opaque) { ctx->numThreadsBusy++; ctx->queueEmpty = ctx->queueHead == ctx->queueTail; /* Unlock the mutex, signal a pusher, and run the job */ - pthread_mutex_unlock(&ctx->queueMutex); - pthread_cond_signal(&ctx->queuePushCond); + ZSTD_pthread_cond_signal(&ctx->queuePushCond); + ZSTD_pthread_mutex_unlock(&ctx->queueMutex); job.function(job.opaque); /* If the intended queue size was 0, signal after finishing job */ + ZSTD_pthread_mutex_lock(&ctx->queueMutex); + ctx->numThreadsBusy--; if (ctx->queueSize == 1) { - pthread_mutex_lock(&ctx->queueMutex); - ctx->numThreadsBusy--; - pthread_mutex_unlock(&ctx->queueMutex); - pthread_cond_signal(&ctx->queuePushCond); - } } + ZSTD_pthread_cond_signal(&ctx->queuePushCond); + } + ZSTD_pthread_mutex_unlock(&ctx->queueMutex); + } } /* for (;;) */ - /* Unreachable */ + assert(0); /* Unreachable */ } -POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) { +POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) { return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem); } -POOL_ctx *POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem) { - POOL_ctx *ctx; - /* Check the parameters */ +POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, + ZSTD_customMem customMem) { + POOL_ctx* ctx; + /* Check parameters */ if (!numThreads) { return NULL; } /* Allocate the context and zero initialize */ - ctx = (POOL_ctx *)ZSTD_calloc(sizeof(POOL_ctx), customMem); + ctx = (POOL_ctx*)ZSTD_calloc(sizeof(POOL_ctx), customMem); if (!ctx) { return NULL; } /* Initialize the job queue. - * It needs one extra space since one space is wasted to differentiate empty - * and full queues. + * It needs one extra space since one space is wasted to differentiate + * empty and full queues. */ ctx->queueSize = queueSize + 1; - ctx->queue = (POOL_job*) malloc(ctx->queueSize * sizeof(POOL_job)); + ctx->queue = (POOL_job*)ZSTD_malloc(ctx->queueSize * sizeof(POOL_job), customMem); ctx->queueHead = 0; ctx->queueTail = 0; ctx->numThreadsBusy = 0; ctx->queueEmpty = 1; - (void)pthread_mutex_init(&ctx->queueMutex, NULL); - (void)pthread_cond_init(&ctx->queuePushCond, NULL); - (void)pthread_cond_init(&ctx->queuePopCond, NULL); + { + int error = 0; + error |= ZSTD_pthread_mutex_init(&ctx->queueMutex, NULL); + error |= ZSTD_pthread_cond_init(&ctx->queuePushCond, NULL); + error |= ZSTD_pthread_cond_init(&ctx->queuePopCond, NULL); + if (error) { POOL_free(ctx); return NULL; } + } ctx->shutdown = 0; /* Allocate space for the thread handles */ - ctx->threads = (pthread_t*)ZSTD_malloc(numThreads * sizeof(pthread_t), customMem); - ctx->numThreads = 0; + ctx->threads = (ZSTD_pthread_t*)ZSTD_malloc(numThreads * sizeof(ZSTD_pthread_t), customMem); + ctx->threadCapacity = 0; ctx->customMem = customMem; /* Check for errors */ if (!ctx->threads || !ctx->queue) { POOL_free(ctx); return NULL; } /* Initialize the threads */ { size_t i; for (i = 0; i < numThreads; ++i) { - if (pthread_create(&ctx->threads[i], NULL, &POOL_thread, ctx)) { - ctx->numThreads = i; + if (ZSTD_pthread_create(&ctx->threads[i], NULL, &POOL_thread, ctx)) { + ctx->threadCapacity = i; POOL_free(ctx); return NULL; } } - ctx->numThreads = numThreads; + ctx->threadCapacity = numThreads; + ctx->threadLimit = numThreads; } return ctx; } @@ -146,80 +158,146 @@ POOL_ctx *POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customM /*! POOL_join() : Shutdown the queue, wake any sleeping threads, and join all of the threads. */ -static void POOL_join(POOL_ctx *ctx) { +static void POOL_join(POOL_ctx* ctx) { /* Shut down the queue */ - pthread_mutex_lock(&ctx->queueMutex); + ZSTD_pthread_mutex_lock(&ctx->queueMutex); ctx->shutdown = 1; - pthread_mutex_unlock(&ctx->queueMutex); + ZSTD_pthread_mutex_unlock(&ctx->queueMutex); /* Wake up sleeping threads */ - pthread_cond_broadcast(&ctx->queuePushCond); - pthread_cond_broadcast(&ctx->queuePopCond); + ZSTD_pthread_cond_broadcast(&ctx->queuePushCond); + ZSTD_pthread_cond_broadcast(&ctx->queuePopCond); /* Join all of the threads */ { size_t i; - for (i = 0; i < ctx->numThreads; ++i) { - pthread_join(ctx->threads[i], NULL); + for (i = 0; i < ctx->threadCapacity; ++i) { + ZSTD_pthread_join(ctx->threads[i], NULL); /* note : could fail */ } } } void POOL_free(POOL_ctx *ctx) { if (!ctx) { return; } POOL_join(ctx); - pthread_mutex_destroy(&ctx->queueMutex); - pthread_cond_destroy(&ctx->queuePushCond); - pthread_cond_destroy(&ctx->queuePopCond); + ZSTD_pthread_mutex_destroy(&ctx->queueMutex); + ZSTD_pthread_cond_destroy(&ctx->queuePushCond); + ZSTD_pthread_cond_destroy(&ctx->queuePopCond); ZSTD_free(ctx->queue, ctx->customMem); ZSTD_free(ctx->threads, ctx->customMem); ZSTD_free(ctx, ctx->customMem); } + + size_t POOL_sizeof(POOL_ctx *ctx) { if (ctx==NULL) return 0; /* supports sizeof NULL */ return sizeof(*ctx) + ctx->queueSize * sizeof(POOL_job) - + ctx->numThreads * sizeof(pthread_t); + + ctx->threadCapacity * sizeof(ZSTD_pthread_t); +} + + +/* @return : 0 on success, 1 on error */ +static int POOL_resize_internal(POOL_ctx* ctx, size_t numThreads) +{ + if (numThreads <= ctx->threadCapacity) { + if (!numThreads) return 1; + ctx->threadLimit = numThreads; + return 0; + } + /* numThreads > threadCapacity */ + { ZSTD_pthread_t* const threadPool = (ZSTD_pthread_t*)ZSTD_malloc(numThreads * sizeof(ZSTD_pthread_t), ctx->customMem); + if (!threadPool) return 1; + /* replace existing thread pool */ + memcpy(threadPool, ctx->threads, ctx->threadCapacity * sizeof(*threadPool)); + ZSTD_free(ctx->threads, ctx->customMem); + ctx->threads = threadPool; + /* Initialize additional threads */ + { size_t threadId; + for (threadId = ctx->threadCapacity; threadId < numThreads; ++threadId) { + if (ZSTD_pthread_create(&threadPool[threadId], NULL, &POOL_thread, ctx)) { + ctx->threadCapacity = threadId; + return 1; + } } + } } + /* successfully expanded */ + ctx->threadCapacity = numThreads; + ctx->threadLimit = numThreads; + return 0; +} + +/* @return : 0 on success, 1 on error */ +int POOL_resize(POOL_ctx* ctx, size_t numThreads) +{ + int result; + if (ctx==NULL) return 1; + ZSTD_pthread_mutex_lock(&ctx->queueMutex); + result = POOL_resize_internal(ctx, numThreads); + ZSTD_pthread_cond_broadcast(&ctx->queuePopCond); + ZSTD_pthread_mutex_unlock(&ctx->queueMutex); + return result; } /** * Returns 1 if the queue is full and 0 otherwise. * - * If the queueSize is 1 (the pool was created with an intended queueSize of 0), - * then a queue is empty if there is a thread free and no job is waiting. + * When queueSize is 1 (pool was created with an intended queueSize of 0), + * then a queue is empty if there is a thread free _and_ no job is waiting. */ static int isQueueFull(POOL_ctx const* ctx) { if (ctx->queueSize > 1) { return ctx->queueHead == ((ctx->queueTail + 1) % ctx->queueSize); } else { - return ctx->numThreadsBusy == ctx->numThreads || + return (ctx->numThreadsBusy == ctx->threadLimit) || !ctx->queueEmpty; } } -void POOL_add(void* ctxVoid, POOL_function function, void *opaque) { - POOL_ctx* const ctx = (POOL_ctx*)ctxVoid; - if (!ctx) { return; } - pthread_mutex_lock(&ctx->queueMutex); - { POOL_job const job = {function, opaque}; +static void POOL_add_internal(POOL_ctx* ctx, POOL_function function, void *opaque) +{ + POOL_job const job = {function, opaque}; + assert(ctx != NULL); + if (ctx->shutdown) return; - /* Wait until there is space in the queue for the new job */ - while (isQueueFull(ctx) && !ctx->shutdown) { - pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex); - } - /* The queue is still going => there is space */ - if (!ctx->shutdown) { - ctx->queueEmpty = 0; - ctx->queue[ctx->queueTail] = job; - ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize; - } + ctx->queueEmpty = 0; + ctx->queue[ctx->queueTail] = job; + ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize; + ZSTD_pthread_cond_signal(&ctx->queuePopCond); +} + +void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque) +{ + assert(ctx != NULL); + ZSTD_pthread_mutex_lock(&ctx->queueMutex); + /* Wait until there is space in the queue for the new job */ + while (isQueueFull(ctx) && (!ctx->shutdown)) { + ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex); } - pthread_mutex_unlock(&ctx->queueMutex); - pthread_cond_signal(&ctx->queuePopCond); + POOL_add_internal(ctx, function, opaque); + ZSTD_pthread_mutex_unlock(&ctx->queueMutex); } + +int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque) +{ + assert(ctx != NULL); + ZSTD_pthread_mutex_lock(&ctx->queueMutex); + if (isQueueFull(ctx)) { + ZSTD_pthread_mutex_unlock(&ctx->queueMutex); + return 0; + } + POOL_add_internal(ctx, function, opaque); + ZSTD_pthread_mutex_unlock(&ctx->queueMutex); + return 1; +} + + #else /* ZSTD_MULTITHREAD not defined */ + +/* ========================== */ /* No multi-threading support */ +/* ========================== */ + -/* We don't need any data, but if it is empty malloc() might return NULL. */ +/* We don't need any data, but if it is empty, malloc() might return NULL. */ struct POOL_ctx_s { int dummy; }; @@ -241,9 +319,20 @@ void POOL_free(POOL_ctx* ctx) { (void)ctx; } -void POOL_add(void* ctx, POOL_function function, void* opaque) { +int POOL_resize(POOL_ctx* ctx, size_t numThreads) { + (void)ctx; (void)numThreads; + return 0; +} + +void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque) { + (void)ctx; + function(opaque); +} + +int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque) { (void)ctx; function(opaque); + return 1; } size_t POOL_sizeof(POOL_ctx* ctx) { |