diff options
Diffstat (limited to 'contrib/zstd/pool.c')
-rw-r--r-- | contrib/zstd/pool.c | 81 |
1 files changed, 54 insertions, 27 deletions
diff --git a/contrib/zstd/pool.c b/contrib/zstd/pool.c index aa4b4de0d..f3d9d0854 100644 --- a/contrib/zstd/pool.c +++ b/contrib/zstd/pool.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2020, Yann Collet, Facebook, Inc. + * Copyright (c) Meta Platforms, Inc. and affiliates. * All rights reserved. * * This source code is licensed under both the BSD-style license (found in the @@ -10,9 +10,9 @@ /* ====== Dependencies ======= */ -#include <stddef.h> /* size_t */ +#include "zstd_deps.h" /* size_t */ #include "debug.h" /* assert */ -#include "zstd_internal.h" /* ZSTD_malloc, ZSTD_free */ +#include "zstd_internal.h" /* ZSTD_customCalloc, ZSTD_customFree */ #include "pool.h" /* ====== Compiler specifics ====== */ @@ -86,7 +86,7 @@ static void* POOL_thread(void* opaque) { { POOL_job const job = ctx->queue[ctx->queueHead]; ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize; ctx->numThreadsBusy++; - ctx->queueEmpty = ctx->queueHead == ctx->queueTail; + ctx->queueEmpty = (ctx->queueHead == ctx->queueTail); /* Unlock the mutex, signal a pusher, and run the job */ ZSTD_pthread_cond_signal(&ctx->queuePushCond); ZSTD_pthread_mutex_unlock(&ctx->queueMutex); @@ -96,33 +96,37 @@ static void* POOL_thread(void* opaque) { /* If the intended queue size was 0, signal after finishing job */ ZSTD_pthread_mutex_lock(&ctx->queueMutex); ctx->numThreadsBusy--; - if (ctx->queueSize == 1) { - ZSTD_pthread_cond_signal(&ctx->queuePushCond); - } + ZSTD_pthread_cond_signal(&ctx->queuePushCond); ZSTD_pthread_mutex_unlock(&ctx->queueMutex); } } /* for (;;) */ assert(0); /* Unreachable */ } +/* ZSTD_createThreadPool() : public access point */ +POOL_ctx* ZSTD_createThreadPool(size_t numThreads) { + return POOL_create (numThreads, 0); +} + POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) { return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem); } POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, - ZSTD_customMem customMem) { + ZSTD_customMem customMem) +{ POOL_ctx* ctx; /* Check parameters */ if (!numThreads) { return NULL; } /* Allocate the context and zero initialize */ - ctx = (POOL_ctx*)ZSTD_calloc(sizeof(POOL_ctx), customMem); + ctx = (POOL_ctx*)ZSTD_customCalloc(sizeof(POOL_ctx), customMem); if (!ctx) { return NULL; } /* Initialize the job queue. * It needs one extra space since one space is wasted to differentiate * empty and full queues. */ ctx->queueSize = queueSize + 1; - ctx->queue = (POOL_job*)ZSTD_malloc(ctx->queueSize * sizeof(POOL_job), customMem); + ctx->queue = (POOL_job*)ZSTD_customCalloc(ctx->queueSize * sizeof(POOL_job), customMem); ctx->queueHead = 0; ctx->queueTail = 0; ctx->numThreadsBusy = 0; @@ -136,7 +140,7 @@ POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, } ctx->shutdown = 0; /* Allocate space for the thread handles */ - ctx->threads = (ZSTD_pthread_t*)ZSTD_malloc(numThreads * sizeof(ZSTD_pthread_t), customMem); + ctx->threads = (ZSTD_pthread_t*)ZSTD_customCalloc(numThreads * sizeof(ZSTD_pthread_t), customMem); ctx->threadCapacity = 0; ctx->customMem = customMem; /* Check for errors */ @@ -169,7 +173,7 @@ static void POOL_join(POOL_ctx* ctx) { /* Join all of the threads */ { size_t i; for (i = 0; i < ctx->threadCapacity; ++i) { - ZSTD_pthread_join(ctx->threads[i], NULL); /* note : could fail */ + ZSTD_pthread_join(ctx->threads[i]); /* note : could fail */ } } } @@ -179,14 +183,27 @@ void POOL_free(POOL_ctx *ctx) { ZSTD_pthread_mutex_destroy(&ctx->queueMutex); ZSTD_pthread_cond_destroy(&ctx->queuePushCond); ZSTD_pthread_cond_destroy(&ctx->queuePopCond); - ZSTD_free(ctx->queue, ctx->customMem); - ZSTD_free(ctx->threads, ctx->customMem); - ZSTD_free(ctx, ctx->customMem); + ZSTD_customFree(ctx->queue, ctx->customMem); + ZSTD_customFree(ctx->threads, ctx->customMem); + ZSTD_customFree(ctx, ctx->customMem); } +/*! POOL_joinJobs() : + * Waits for all queued jobs to finish executing. + */ +void POOL_joinJobs(POOL_ctx* ctx) { + ZSTD_pthread_mutex_lock(&ctx->queueMutex); + while(!ctx->queueEmpty || ctx->numThreadsBusy > 0) { + ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex); + } + ZSTD_pthread_mutex_unlock(&ctx->queueMutex); +} +void ZSTD_freeThreadPool (ZSTD_threadPool* pool) { + POOL_free (pool); +} -size_t POOL_sizeof(POOL_ctx *ctx) { +size_t POOL_sizeof(const POOL_ctx* ctx) { if (ctx==NULL) return 0; /* supports sizeof NULL */ return sizeof(*ctx) + ctx->queueSize * sizeof(POOL_job) @@ -203,11 +220,11 @@ static int POOL_resize_internal(POOL_ctx* ctx, size_t numThreads) return 0; } /* numThreads > threadCapacity */ - { ZSTD_pthread_t* const threadPool = (ZSTD_pthread_t*)ZSTD_malloc(numThreads * sizeof(ZSTD_pthread_t), ctx->customMem); + { ZSTD_pthread_t* const threadPool = (ZSTD_pthread_t*)ZSTD_customCalloc(numThreads * sizeof(ZSTD_pthread_t), ctx->customMem); if (!threadPool) return 1; /* replace existing thread pool */ - memcpy(threadPool, ctx->threads, ctx->threadCapacity * sizeof(*threadPool)); - ZSTD_free(ctx->threads, ctx->customMem); + ZSTD_memcpy(threadPool, ctx->threads, ctx->threadCapacity * sizeof(*threadPool)); + ZSTD_customFree(ctx->threads, ctx->customMem); ctx->threads = threadPool; /* Initialize additional threads */ { size_t threadId; @@ -251,9 +268,12 @@ static int isQueueFull(POOL_ctx const* ctx) { } -static void POOL_add_internal(POOL_ctx* ctx, POOL_function function, void *opaque) +static void +POOL_add_internal(POOL_ctx* ctx, POOL_function function, void *opaque) { - POOL_job const job = {function, opaque}; + POOL_job job; + job.function = function; + job.opaque = opaque; assert(ctx != NULL); if (ctx->shutdown) return; @@ -301,21 +321,28 @@ int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque) struct POOL_ctx_s { int dummy; }; -static POOL_ctx g_ctx; +static POOL_ctx g_poolCtx; POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) { return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem); } -POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem) { +POOL_ctx* +POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem) +{ (void)numThreads; (void)queueSize; (void)customMem; - return &g_ctx; + return &g_poolCtx; } void POOL_free(POOL_ctx* ctx) { - assert(!ctx || ctx == &g_ctx); + assert(!ctx || ctx == &g_poolCtx); + (void)ctx; +} + +void POOL_joinJobs(POOL_ctx* ctx){ + assert(!ctx || ctx == &g_poolCtx); (void)ctx; } @@ -335,9 +362,9 @@ int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque) { return 1; } -size_t POOL_sizeof(POOL_ctx* ctx) { +size_t POOL_sizeof(const POOL_ctx* ctx) { if (ctx==NULL) return 0; /* supports sizeof NULL */ - assert(ctx == &g_ctx); + assert(ctx == &g_poolCtx); return sizeof(*ctx); } |