summaryrefslogtreecommitdiffstats
path: root/contrib/zstd/pool.c
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/zstd/pool.c')
-rw-r--r--contrib/zstd/pool.c81
1 files changed, 54 insertions, 27 deletions
diff --git a/contrib/zstd/pool.c b/contrib/zstd/pool.c
index aa4b4de0d..f3d9d0854 100644
--- a/contrib/zstd/pool.c
+++ b/contrib/zstd/pool.c
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016-2020, Yann Collet, Facebook, Inc.
+ * Copyright (c) Meta Platforms, Inc. and affiliates.
* All rights reserved.
*
* This source code is licensed under both the BSD-style license (found in the
@@ -10,9 +10,9 @@
/* ====== Dependencies ======= */
-#include <stddef.h> /* size_t */
+#include "zstd_deps.h" /* size_t */
#include "debug.h" /* assert */
-#include "zstd_internal.h" /* ZSTD_malloc, ZSTD_free */
+#include "zstd_internal.h" /* ZSTD_customCalloc, ZSTD_customFree */
#include "pool.h"
/* ====== Compiler specifics ====== */
@@ -86,7 +86,7 @@ static void* POOL_thread(void* opaque) {
{ POOL_job const job = ctx->queue[ctx->queueHead];
ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize;
ctx->numThreadsBusy++;
- ctx->queueEmpty = ctx->queueHead == ctx->queueTail;
+ ctx->queueEmpty = (ctx->queueHead == ctx->queueTail);
/* Unlock the mutex, signal a pusher, and run the job */
ZSTD_pthread_cond_signal(&ctx->queuePushCond);
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
@@ -96,33 +96,37 @@ static void* POOL_thread(void* opaque) {
/* If the intended queue size was 0, signal after finishing job */
ZSTD_pthread_mutex_lock(&ctx->queueMutex);
ctx->numThreadsBusy--;
- if (ctx->queueSize == 1) {
- ZSTD_pthread_cond_signal(&ctx->queuePushCond);
- }
+ ZSTD_pthread_cond_signal(&ctx->queuePushCond);
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
}
} /* for (;;) */
assert(0); /* Unreachable */
}
+/* ZSTD_createThreadPool() : public access point */
+POOL_ctx* ZSTD_createThreadPool(size_t numThreads) {
+ return POOL_create (numThreads, 0);
+}
+
POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) {
return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem);
}
POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize,
- ZSTD_customMem customMem) {
+ ZSTD_customMem customMem)
+{
POOL_ctx* ctx;
/* Check parameters */
if (!numThreads) { return NULL; }
/* Allocate the context and zero initialize */
- ctx = (POOL_ctx*)ZSTD_calloc(sizeof(POOL_ctx), customMem);
+ ctx = (POOL_ctx*)ZSTD_customCalloc(sizeof(POOL_ctx), customMem);
if (!ctx) { return NULL; }
/* Initialize the job queue.
* It needs one extra space since one space is wasted to differentiate
* empty and full queues.
*/
ctx->queueSize = queueSize + 1;
- ctx->queue = (POOL_job*)ZSTD_malloc(ctx->queueSize * sizeof(POOL_job), customMem);
+ ctx->queue = (POOL_job*)ZSTD_customCalloc(ctx->queueSize * sizeof(POOL_job), customMem);
ctx->queueHead = 0;
ctx->queueTail = 0;
ctx->numThreadsBusy = 0;
@@ -136,7 +140,7 @@ POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize,
}
ctx->shutdown = 0;
/* Allocate space for the thread handles */
- ctx->threads = (ZSTD_pthread_t*)ZSTD_malloc(numThreads * sizeof(ZSTD_pthread_t), customMem);
+ ctx->threads = (ZSTD_pthread_t*)ZSTD_customCalloc(numThreads * sizeof(ZSTD_pthread_t), customMem);
ctx->threadCapacity = 0;
ctx->customMem = customMem;
/* Check for errors */
@@ -169,7 +173,7 @@ static void POOL_join(POOL_ctx* ctx) {
/* Join all of the threads */
{ size_t i;
for (i = 0; i < ctx->threadCapacity; ++i) {
- ZSTD_pthread_join(ctx->threads[i], NULL); /* note : could fail */
+ ZSTD_pthread_join(ctx->threads[i]); /* note : could fail */
} }
}
@@ -179,14 +183,27 @@ void POOL_free(POOL_ctx *ctx) {
ZSTD_pthread_mutex_destroy(&ctx->queueMutex);
ZSTD_pthread_cond_destroy(&ctx->queuePushCond);
ZSTD_pthread_cond_destroy(&ctx->queuePopCond);
- ZSTD_free(ctx->queue, ctx->customMem);
- ZSTD_free(ctx->threads, ctx->customMem);
- ZSTD_free(ctx, ctx->customMem);
+ ZSTD_customFree(ctx->queue, ctx->customMem);
+ ZSTD_customFree(ctx->threads, ctx->customMem);
+ ZSTD_customFree(ctx, ctx->customMem);
}
+/*! POOL_joinJobs() :
+ * Waits for all queued jobs to finish executing.
+ */
+void POOL_joinJobs(POOL_ctx* ctx) {
+ ZSTD_pthread_mutex_lock(&ctx->queueMutex);
+ while(!ctx->queueEmpty || ctx->numThreadsBusy > 0) {
+ ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
+ }
+ ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
+}
+void ZSTD_freeThreadPool (ZSTD_threadPool* pool) {
+ POOL_free (pool);
+}
-size_t POOL_sizeof(POOL_ctx *ctx) {
+size_t POOL_sizeof(const POOL_ctx* ctx) {
if (ctx==NULL) return 0; /* supports sizeof NULL */
return sizeof(*ctx)
+ ctx->queueSize * sizeof(POOL_job)
@@ -203,11 +220,11 @@ static int POOL_resize_internal(POOL_ctx* ctx, size_t numThreads)
return 0;
}
/* numThreads > threadCapacity */
- { ZSTD_pthread_t* const threadPool = (ZSTD_pthread_t*)ZSTD_malloc(numThreads * sizeof(ZSTD_pthread_t), ctx->customMem);
+ { ZSTD_pthread_t* const threadPool = (ZSTD_pthread_t*)ZSTD_customCalloc(numThreads * sizeof(ZSTD_pthread_t), ctx->customMem);
if (!threadPool) return 1;
/* replace existing thread pool */
- memcpy(threadPool, ctx->threads, ctx->threadCapacity * sizeof(*threadPool));
- ZSTD_free(ctx->threads, ctx->customMem);
+ ZSTD_memcpy(threadPool, ctx->threads, ctx->threadCapacity * sizeof(*threadPool));
+ ZSTD_customFree(ctx->threads, ctx->customMem);
ctx->threads = threadPool;
/* Initialize additional threads */
{ size_t threadId;
@@ -251,9 +268,12 @@ static int isQueueFull(POOL_ctx const* ctx) {
}
-static void POOL_add_internal(POOL_ctx* ctx, POOL_function function, void *opaque)
+static void
+POOL_add_internal(POOL_ctx* ctx, POOL_function function, void *opaque)
{
- POOL_job const job = {function, opaque};
+ POOL_job job;
+ job.function = function;
+ job.opaque = opaque;
assert(ctx != NULL);
if (ctx->shutdown) return;
@@ -301,21 +321,28 @@ int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque)
struct POOL_ctx_s {
int dummy;
};
-static POOL_ctx g_ctx;
+static POOL_ctx g_poolCtx;
POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) {
return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem);
}
-POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem) {
+POOL_ctx*
+POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem)
+{
(void)numThreads;
(void)queueSize;
(void)customMem;
- return &g_ctx;
+ return &g_poolCtx;
}
void POOL_free(POOL_ctx* ctx) {
- assert(!ctx || ctx == &g_ctx);
+ assert(!ctx || ctx == &g_poolCtx);
+ (void)ctx;
+}
+
+void POOL_joinJobs(POOL_ctx* ctx){
+ assert(!ctx || ctx == &g_poolCtx);
(void)ctx;
}
@@ -335,9 +362,9 @@ int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque) {
return 1;
}
-size_t POOL_sizeof(POOL_ctx* ctx) {
+size_t POOL_sizeof(const POOL_ctx* ctx) {
if (ctx==NULL) return 0; /* supports sizeof NULL */
- assert(ctx == &g_ctx);
+ assert(ctx == &g_poolCtx);
return sizeof(*ctx);
}