zstdmt_compress.c
/*
 * Copyright (c) 2016-present, Yann Collet, Facebook, Inc.
 * All rights reserved.
 *
 * This source code is licensed under both the BSD-style license (found in the
 * LICENSE file in the root directory of this source tree) and the GPLv2 (found
 * in the COPYING file in the root directory of this source tree).
 * You may select, at your option, one of the above-listed licenses.
 */


/* ======   Tuning parameters   ====== */
#define ZSTDMT_NBTHREADS_MAX 256
#define ZSTDMT_OVERLAPLOG_DEFAULT 6
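
/* Editorial note : ZSTDMT_NBTHREADS_MAX caps the worker count accepted by
 * ZSTDMT_createCCtx_advanced() below. ZSTDMT_OVERLAPLOG_DEFAULT lives on a
 * 0..9 scale where 9 means "overlap a full window" and each decrement halves
 * the overlap ; with the default of 6, consecutive jobs overlap by
 * windowSize >> (9-6) = windowSize/8 (see the overlapSize computations in
 * ZSTDMT_compress_advanced_internal() and ZSTDMT_initCStream_internal()). */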


/* ======   Compiler specifics   ====== */
#if defined(_MSC_VER)
#  pragma warning(disable : 4204)   /* disable: C4204: non-constant aggregate initializer */
#endif


/* ======   Dependencies   ====== */
#include <string.h>           /* memcpy, memset */
#include "pool.h"             /* threadpool */
#include "threading.h"        /* mutex */
#include "zstd_internal.h"    /* MIN, ERROR, ZSTD_*, ZSTD_highbit32 */
#include "zstdmt_compress.h"


/* ======   Debug   ====== */
#if defined(ZSTD_DEBUG) && (ZSTD_DEBUG>=2)

#  include <stdio.h>
#  include <unistd.h>
#  include <sys/times.h>
#  define DEBUGLOGRAW(l, ...) if (l<=ZSTD_DEBUG) { fprintf(stderr, __VA_ARGS__); }

#  define DEBUG_PRINTHEX(l,p,n) {                 \
    unsigned debug_u;                             \
    for (debug_u=0; debug_u<(n); debug_u++)       \
        DEBUGLOGRAW(l, "%02X ", ((const unsigned char*)(p))[debug_u]); \
    DEBUGLOGRAW(l, " \n");                        \
}

static unsigned long long GetCurrentClockTimeMicroseconds(void)
{
    static clock_t _ticksPerSecond = 0;
    if (_ticksPerSecond <= 0) _ticksPerSecond = sysconf(_SC_CLK_TCK);

    {   struct tms junk; clock_t newTicks = (clock_t) times(&junk);
        return ((((unsigned long long)newTicks)*(1000000))/_ticksPerSecond); }
}

#define MUTEX_WAIT_TIME_DLEVEL 6
#define PTHREAD_MUTEX_LOCK(mutex) {               \
    if (ZSTD_DEBUG>=MUTEX_WAIT_TIME_DLEVEL) {     \
        unsigned long long const beforeTime = GetCurrentClockTimeMicroseconds(); \
        pthread_mutex_lock(mutex);                \
        {   unsigned long long const afterTime = GetCurrentClockTimeMicroseconds(); \
            unsigned long long const elapsedTime = (afterTime-beforeTime); \
            if (elapsedTime > 1000) {  /* or whatever threshold you like; I'm using 1 millisecond here */ \
                DEBUGLOG(MUTEX_WAIT_TIME_DLEVEL, "Thread took %llu microseconds to acquire mutex %s \n", \
                    elapsedTime, #mutex);         \
    }   }   }                                     \
    else pthread_mutex_lock(mutex);               \
}

#else

#  define PTHREAD_MUTEX_LOCK(m) pthread_mutex_lock(m)
#  define DEBUG_PRINTHEX(l,p,n) {}

#endif


/* =====   Buffer Pool   ===== */
/* a single Buffer Pool can be invoked from multiple threads in parallel */

typedef struct buffer_s {
    void* start;
    size_t size;
} buffer_t;

static const buffer_t g_nullBuffer = { NULL, 0 };

typedef struct ZSTDMT_bufferPool_s {
    pthread_mutex_t poolMutex;
    size_t bufferSize;
    unsigned totalBuffers;
    unsigned nbBuffers;
    ZSTD_customMem cMem;
    buffer_t bTable[1];   /* variable size */
} ZSTDMT_bufferPool;

static ZSTDMT_bufferPool* ZSTDMT_createBufferPool(unsigned nbThreads, ZSTD_customMem cMem)
{
    unsigned const maxNbBuffers = 2*nbThreads + 3;
    ZSTDMT_bufferPool* const bufPool = (ZSTDMT_bufferPool*)ZSTD_calloc(
        sizeof(ZSTDMT_bufferPool) + (maxNbBuffers-1) * sizeof(buffer_t), cMem);
    if (bufPool==NULL) return NULL;
    if (pthread_mutex_init(&bufPool->poolMutex, NULL)) {
        ZSTD_free(bufPool, cMem);
        return NULL;
    }
    bufPool->bufferSize = 64 KB;
    bufPool->totalBuffers = maxNbBuffers;
    bufPool->nbBuffers = 0;
    bufPool->cMem = cMem;
    return bufPool;
}

static void ZSTDMT_freeBufferPool(ZSTDMT_bufferPool* bufPool)
{
    unsigned u;
    if (!bufPool) return;   /* compatibility with free on NULL */
    for (u=0; u<bufPool->totalBuffers; u++)
        ZSTD_free(bufPool->bTable[u].start, bufPool->cMem);
    pthread_mutex_destroy(&bufPool->poolMutex);
    ZSTD_free(bufPool, bufPool->cMem);
}

/* only works at initialization, not during compression */
static size_t ZSTDMT_sizeof_bufferPool(ZSTDMT_bufferPool* bufPool)
{
    size_t const poolSize = sizeof(*bufPool)
                          + (bufPool->totalBuffers - 1) * sizeof(buffer_t);
    unsigned u;
    size_t totalBufferSize = 0;
    pthread_mutex_lock(&bufPool->poolMutex);
    for (u=0; u<bufPool->totalBuffers; u++)
        totalBufferSize += bufPool->bTable[u].size;
    pthread_mutex_unlock(&bufPool->poolMutex);
    return poolSize + totalBufferSize;
}

static void ZSTDMT_setBufferSize(ZSTDMT_bufferPool* bufPool, size_t bSize)
{
    bufPool->bufferSize = bSize;
}

/** ZSTDMT_getBuffer() :
 *  assumption : bufPool must be valid */
static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool)
{
    size_t const bSize = bufPool->bufferSize;
    DEBUGLOG(5, "ZSTDMT_getBuffer");
    pthread_mutex_lock(&bufPool->poolMutex);
    if (bufPool->nbBuffers) {   /* try to use an existing buffer */
        buffer_t const buf = bufPool->bTable[--(bufPool->nbBuffers)];
        size_t const availBufferSize = buf.size;
        if ((availBufferSize >= bSize) & (availBufferSize <= 10*bSize)) {
            /* large enough, but not too much */
            pthread_mutex_unlock(&bufPool->poolMutex);
            return buf;
        }
        /* size conditions not respected : scratch this buffer, create new one */
        DEBUGLOG(5, "existing buffer does not meet size conditions => freeing");
        ZSTD_free(buf.start, bufPool->cMem);
    }
    pthread_mutex_unlock(&bufPool->poolMutex);
    /* create new buffer */
    DEBUGLOG(5, "create a new buffer");
    {   buffer_t buffer;
        void* const start = ZSTD_malloc(bSize, bufPool->cMem);
        buffer.start = start;   /* note : start can be NULL if malloc fails ! */
        buffer.size = (start==NULL) ? 0 : bSize;
        return buffer;
    }
}

/* store buffer for later re-use, up to pool capacity */
static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf)
{
    if (buf.start == NULL) return;   /* compatible with release on NULL */
    DEBUGLOG(5, "ZSTDMT_releaseBuffer");
    pthread_mutex_lock(&bufPool->poolMutex);
    if (bufPool->nbBuffers < bufPool->totalBuffers) {
        bufPool->bTable[bufPool->nbBuffers++] = buf;   /* stored for later use */
        pthread_mutex_unlock(&bufPool->poolMutex);
        return;
    }
    pthread_mutex_unlock(&bufPool->poolMutex);
    /* Reached bufferPool capacity (should not happen) */
    DEBUGLOG(5, "buffer pool capacity reached => freeing ");
    ZSTD_free(buf.start, bufPool->cMem);
}
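
/* Editorial addition : an illustrative borrow/return round-trip on the buffer
 * pool above, not part of the original file. Error handling is reduced to a
 * NULL check. Kept under #if 0 so it is never compiled. */
#if 0
static void exampleBufferRoundTrip(ZSTDMT_bufferPool* bufPool)
{
    buffer_t buf = ZSTDMT_getBuffer(bufPool);   /* re-uses a pooled buffer, or mallocs a new one */
    if (buf.start == NULL) return;              /* allocation failure : buf.size is 0 */
    memset(buf.start, 0, buf.size);             /* ... use the buffer ... */
    ZSTDMT_releaseBuffer(bufPool, buf);         /* stored for re-use, or freed if the pool is full */
}
#endif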

/* Sets parameters relevant to the compression job, initializing others to
 * default values. Notably, nbThreads should probably be zero. */
static ZSTD_CCtx_params ZSTDMT_makeJobCCtxParams(ZSTD_CCtx_params const params)
{
    ZSTD_CCtx_params jobParams;
    memset(&jobParams, 0, sizeof(jobParams));

    jobParams.cParams = params.cParams;
    jobParams.fParams = params.fParams;
    jobParams.compressionLevel = params.compressionLevel;

    jobParams.ldmParams = params.ldmParams;
    return jobParams;
}


/* =====   CCtx Pool   ===== */
/* a single CCtx Pool can be invoked from multiple threads in parallel */

typedef struct {
    pthread_mutex_t poolMutex;
    unsigned totalCCtx;
    unsigned availCCtx;
    ZSTD_customMem cMem;
    ZSTD_CCtx* cctx[1];   /* variable size */
} ZSTDMT_CCtxPool;

/* note : all CCtx borrowed from the pool should be released back to the pool _before_ freeing the pool */
static void ZSTDMT_freeCCtxPool(ZSTDMT_CCtxPool* pool)
{
    unsigned u;
    for (u=0; u<pool->totalCCtx; u++)
        ZSTD_freeCCtx(pool->cctx[u]);   /* note : compatible with free on NULL */
    pthread_mutex_destroy(&pool->poolMutex);
    ZSTD_free(pool, pool->cMem);
}

/* ZSTDMT_createCCtxPool() :
 * implies nbThreads >= 1 , checked by caller ZSTDMT_createCCtx() */
static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(unsigned nbThreads,
                                              ZSTD_customMem cMem)
{
    ZSTDMT_CCtxPool* const cctxPool = (ZSTDMT_CCtxPool*) ZSTD_calloc(
        sizeof(ZSTDMT_CCtxPool) + (nbThreads-1)*sizeof(ZSTD_CCtx*), cMem);
    if (!cctxPool) return NULL;
    if (pthread_mutex_init(&cctxPool->poolMutex, NULL)) {
        ZSTD_free(cctxPool, cMem);
        return NULL;
    }
    cctxPool->cMem = cMem;
    cctxPool->totalCCtx = nbThreads;
    cctxPool->availCCtx = 1;   /* at least one cctx for single-thread mode */
    cctxPool->cctx[0] = ZSTD_createCCtx_advanced(cMem);
    if (!cctxPool->cctx[0]) { ZSTDMT_freeCCtxPool(cctxPool); return NULL; }
    DEBUGLOG(3, "cctxPool created, with %u threads", nbThreads);
    return cctxPool;
}

/* only works during initialization phase, not during compression */
static size_t ZSTDMT_sizeof_CCtxPool(ZSTDMT_CCtxPool* cctxPool)
{
    pthread_mutex_lock(&cctxPool->poolMutex);
    {   unsigned const nbThreads = cctxPool->totalCCtx;
        size_t const poolSize = sizeof(*cctxPool)
                              + (nbThreads-1)*sizeof(ZSTD_CCtx*);
        unsigned u;
        size_t totalCCtxSize = 0;
        for (u=0; u<nbThreads; u++) {
            totalCCtxSize += ZSTD_sizeof_CCtx(cctxPool->cctx[u]);
        }
        pthread_mutex_unlock(&cctxPool->poolMutex);
        return poolSize + totalCCtxSize;
    }
}

static ZSTD_CCtx* ZSTDMT_getCCtx(ZSTDMT_CCtxPool* cctxPool)
{
    DEBUGLOG(5, "ZSTDMT_getCCtx");
    pthread_mutex_lock(&cctxPool->poolMutex);
    if (cctxPool->availCCtx) {
        cctxPool->availCCtx--;
        {   ZSTD_CCtx* const cctx = cctxPool->cctx[cctxPool->availCCtx];
            pthread_mutex_unlock(&cctxPool->poolMutex);
            return cctx;
    }   }
    pthread_mutex_unlock(&cctxPool->poolMutex);
    DEBUGLOG(5, "create one more CCtx");
    return ZSTD_createCCtx_advanced(cctxPool->cMem);   /* note : can be NULL, when creation fails ! */
}

static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx)
{
    if (cctx==NULL) return;   /* compatibility with release on NULL */
    pthread_mutex_lock(&pool->poolMutex);
    if (pool->availCCtx < pool->totalCCtx)
        pool->cctx[pool->availCCtx++] = cctx;
    else {
        /* pool overflow : should not happen, since totalCCtx==nbThreads */
        DEBUGLOG(5, "CCtx pool overflow : free cctx");
        ZSTD_freeCCtx(cctx);
    }
    pthread_mutex_unlock(&pool->poolMutex);
}
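
/* Editorial addition : the CCtx pool follows the same borrow/release
 * discipline as the buffer pool ; an illustrative sketch, not part of the
 * original file. A borrowed cctx may be freshly created when the pool is
 * empty, so NULL must be checked. Kept under #if 0. */
#if 0
static size_t exampleCCtxRoundTrip(ZSTDMT_CCtxPool* cctxPool)
{
    ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(cctxPool);
    if (cctx == NULL) return ERROR(memory_allocation);
    /* ... run one compression job with cctx ... */
    ZSTDMT_releaseCCtx(cctxPool, cctx);   /* returned to the pool, or freed on overflow */
    return 0;
}
#endif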


/* =====   Thread worker   ===== */

typedef struct {
    buffer_t src;
    const void* srcStart;
    size_t   dictSize;
    size_t   srcSize;
    buffer_t dstBuff;
    size_t   cSize;
    size_t   dstFlushed;
    unsigned firstChunk;
    unsigned lastChunk;
    unsigned jobCompleted;
    unsigned jobScanned;
    pthread_mutex_t* jobCompleted_mutex;
    pthread_cond_t* jobCompleted_cond;
    ZSTD_CCtx_params params;
    const ZSTD_CDict* cdict;
    ZSTDMT_CCtxPool* cctxPool;
    ZSTDMT_bufferPool* bufPool;
    unsigned long long fullFrameSize;
} ZSTDMT_jobDescription;

/* ZSTDMT_compressChunk() : POOL_function type */
void ZSTDMT_compressChunk(void* jobDescription)
{
    ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription;
    ZSTD_CCtx* cctx = ZSTDMT_getCCtx(job->cctxPool);
    const void* const src = (const char*)job->srcStart + job->dictSize;
    buffer_t dstBuff = job->dstBuff;
    DEBUGLOG(5, "job (first:%u) (last:%u) : dictSize %u, srcSize %u",
                 job->firstChunk, job->lastChunk, (U32)job->dictSize, (U32)job->srcSize);

    if (cctx==NULL) {
        job->cSize = ERROR(memory_allocation);
        goto _endJob;
    }

    if (dstBuff.start == NULL) {
        dstBuff = ZSTDMT_getBuffer(job->bufPool);
        if (dstBuff.start==NULL) {
            job->cSize = ERROR(memory_allocation);
            goto _endJob;
        }
        job->dstBuff = dstBuff;
    }

    if (job->cdict) {   /* should only happen for first segment */
        size_t const initError = ZSTD_compressBegin_usingCDict_advanced(cctx, job->cdict, job->params.fParams, job->fullFrameSize);
        DEBUGLOG(5, "using CDict");
        if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; }
    } else {   /* srcStart points at reloaded section */
        if (!job->firstChunk) job->params.fParams.contentSizeFlag = 0;   /* ensure no srcSize control */
        {   ZSTD_CCtx_params jobParams = job->params;
            size_t const forceWindowError =
                ZSTD_CCtxParam_setParameter(&jobParams, ZSTD_p_forceMaxWindow, !job->firstChunk);
            /* Force loading dictionary in "content-only" mode (no header analysis) */
            size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, job->srcStart, job->dictSize, ZSTD_dm_rawContent, jobParams, job->fullFrameSize);
            if (ZSTD_isError(initError) || ZSTD_isError(forceWindowError)) {
                job->cSize = initError;
                goto _endJob;
            }
    }   }

    if (!job->firstChunk) {   /* flush and overwrite frame header when it's not first segment */
        size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.size, src, 0);
        if (ZSTD_isError(hSize)) { job->cSize = hSize; goto _endJob; }
        ZSTD_invalidateRepCodes(cctx);
    }

    DEBUGLOG(5, "Compressing : ");
    DEBUG_PRINTHEX(4, job->srcStart, 12);
    job->cSize = (job->lastChunk) ?
                 ZSTD_compressEnd     (cctx, dstBuff.start, dstBuff.size, src, job->srcSize) :
                 ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.size, src, job->srcSize);
    DEBUGLOG(5, "compressed %u bytes into %u bytes (first:%u) (last:%u)",
                (unsigned)job->srcSize, (unsigned)job->cSize, job->firstChunk, job->lastChunk);
    DEBUGLOG(5, "dstBuff.size : %u ; => %s", (U32)dstBuff.size, ZSTD_getErrorName(job->cSize));

_endJob:
    ZSTDMT_releaseCCtx(job->cctxPool, cctx);
    ZSTDMT_releaseBuffer(job->bufPool, job->src);
    job->src = g_nullBuffer; job->srcStart = NULL;
    PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex);
    job->jobCompleted = 1;
    job->jobScanned = 0;
    pthread_cond_signal(job->jobCompleted_cond);
    pthread_mutex_unlock(job->jobCompleted_mutex);
}
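
/* Editorial note : the job lifecycle implemented above is :
 * 1) a producer fills a ZSTDMT_jobDescription and posts it with POOL_add() ;
 * 2) a worker thread runs ZSTDMT_compressChunk(), borrowing a cctx and a dst
 *    buffer from the shared pools ;
 * 3) on exit (success or error), the worker sets jobCompleted=1 under
 *    jobCompleted_mutex and signals jobCompleted_cond ;
 * 4) a consumer (the collect loop of ZSTDMT_compress_advanced_internal(), or
 *    ZSTDMT_flushNextJob() in streaming mode) waits on that condition, then
 *    reads job->cSize, which is either a compressed size or a ZSTD error code. */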


/* ------------------------------------------ */
/* =====   Multi-threaded compression   ===== */
/* ------------------------------------------ */

typedef struct {
    buffer_t buffer;
    size_t filled;
} inBuff_t;

struct ZSTDMT_CCtx_s {
    POOL_ctx* factory;
    ZSTDMT_jobDescription* jobs;
    ZSTDMT_bufferPool* bufPool;
    ZSTDMT_CCtxPool* cctxPool;
    pthread_mutex_t jobCompleted_mutex;
    pthread_cond_t jobCompleted_cond;
    size_t targetSectionSize;
    size_t inBuffSize;
    size_t dictSize;
    size_t targetDictSize;
    inBuff_t inBuff;
    ZSTD_CCtx_params params;
    XXH64_state_t xxhState;
    unsigned jobIDMask;
    unsigned doneJobID;
    unsigned nextJobID;
    unsigned frameEnded;
    unsigned allJobsCompleted;
    unsigned long long frameContentSize;
    ZSTD_customMem cMem;
    ZSTD_CDict* cdictLocal;
    const ZSTD_CDict* cdict;
};

static ZSTDMT_jobDescription* ZSTDMT_allocJobsTable(U32* nbJobsPtr, ZSTD_customMem cMem)
{
    U32 const nbJobsLog2 = ZSTD_highbit32(*nbJobsPtr) + 1;
    U32 const nbJobs = 1 << nbJobsLog2;
    *nbJobsPtr = nbJobs;
    return (ZSTDMT_jobDescription*) ZSTD_calloc(
                            nbJobs * sizeof(ZSTDMT_jobDescription), cMem);
}
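
/* Editorial note : ZSTDMT_allocJobsTable() rounds the requested count up to
 * the smallest power of 2 strictly larger than *nbJobsPtr, so that
 * (jobID & jobIDMask) can serve as a cheap ring-buffer index. For example, a
 * request of 6 gives nbJobsLog2 = ZSTD_highbit32(6)+1 = 3, hence 8 slots and
 * a mask of 7 ; a request of 8 gives 16 slots. ZSTD_calloc() zeroes the table. */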

/* Internal only */
size_t ZSTDMT_initializeCCtxParameters(ZSTD_CCtx_params* params, unsigned nbThreads)
{
    params->nbThreads = nbThreads;
    params->overlapSizeLog = ZSTDMT_OVERLAPLOG_DEFAULT;
    params->jobSize = 0;
    return 0;
}

ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem)
{
    ZSTDMT_CCtx* mtctx;
    U32 nbJobs = nbThreads + 2;
    DEBUGLOG(3, "ZSTDMT_createCCtx_advanced");

    if (nbThreads < 1) return NULL;
    nbThreads = MIN(nbThreads , ZSTDMT_NBTHREADS_MAX);
    if ((cMem.customAlloc!=NULL) ^ (cMem.customFree!=NULL))
        /* invalid custom allocator */
        return NULL;

    mtctx = (ZSTDMT_CCtx*) ZSTD_calloc(sizeof(ZSTDMT_CCtx), cMem);
    if (!mtctx) return NULL;
    ZSTDMT_initializeCCtxParameters(&mtctx->params, nbThreads);
    mtctx->cMem = cMem;
    mtctx->allJobsCompleted = 1;
    mtctx->factory = POOL_create_advanced(nbThreads, 0, cMem);
    mtctx->jobs = ZSTDMT_allocJobsTable(&nbJobs, cMem);
    mtctx->jobIDMask = nbJobs - 1;
    mtctx->bufPool = ZSTDMT_createBufferPool(nbThreads, cMem);
    mtctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads, cMem);
    if (!mtctx->factory | !mtctx->jobs | !mtctx->bufPool | !mtctx->cctxPool) {
        ZSTDMT_freeCCtx(mtctx);
        return NULL;
    }
    if (pthread_mutex_init(&mtctx->jobCompleted_mutex, NULL)) {
        ZSTDMT_freeCCtx(mtctx);
        return NULL;
    }
    if (pthread_cond_init(&mtctx->jobCompleted_cond, NULL)) {
        ZSTDMT_freeCCtx(mtctx);
        return NULL;
    }
    DEBUGLOG(3, "mt_cctx created, for %u threads", nbThreads);
    return mtctx;
}

ZSTDMT_CCtx* ZSTDMT_createCCtx(unsigned nbThreads)
{
    return ZSTDMT_createCCtx_advanced(nbThreads, ZSTD_defaultCMem);
}

/* ZSTDMT_releaseAllJobResources() :
 * note : ensure all workers are killed first ! */
static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx)
{
    unsigned jobID;
    DEBUGLOG(3, "ZSTDMT_releaseAllJobResources");
    for (jobID=0; jobID <= mtctx->jobIDMask; jobID++) {
        ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].dstBuff);
        mtctx->jobs[jobID].dstBuff = g_nullBuffer;
        ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].src);
        mtctx->jobs[jobID].src = g_nullBuffer;
    }
    memset(mtctx->jobs, 0, (mtctx->jobIDMask+1)*sizeof(ZSTDMT_jobDescription));
    ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->inBuff.buffer);
    mtctx->inBuff.buffer = g_nullBuffer;
    mtctx->allJobsCompleted = 1;
}

size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx)
{
    if (mtctx==NULL) return 0;   /* compatible with free on NULL */
    POOL_free(mtctx->factory);
    if (!mtctx->allJobsCompleted) ZSTDMT_releaseAllJobResources(mtctx);   /* stop workers first */
    ZSTDMT_freeBufferPool(mtctx->bufPool);   /* release job resources into pools first */
    ZSTD_free(mtctx->jobs, mtctx->cMem);
    ZSTDMT_freeCCtxPool(mtctx->cctxPool);
    ZSTD_freeCDict(mtctx->cdictLocal);
    pthread_mutex_destroy(&mtctx->jobCompleted_mutex);
    pthread_cond_destroy(&mtctx->jobCompleted_cond);
    ZSTD_free(mtctx, mtctx->cMem);
    return 0;
}

size_t ZSTDMT_sizeof_CCtx(ZSTDMT_CCtx* mtctx)
{
    if (mtctx == NULL) return 0;   /* supports sizeof NULL */
    return sizeof(*mtctx)
            + POOL_sizeof(mtctx->factory)
            + ZSTDMT_sizeof_bufferPool(mtctx->bufPool)
            + (mtctx->jobIDMask+1) * sizeof(ZSTDMT_jobDescription)
            + ZSTDMT_sizeof_CCtxPool(mtctx->cctxPool)
            + ZSTD_sizeof_CDict(mtctx->cdictLocal);
}

/* Internal only */
size_t ZSTDMT_CCtxParam_setMTCtxParameter(
    ZSTD_CCtx_params* params, ZSTDMT_parameter parameter, unsigned value) {
    switch(parameter)
    {
    case ZSTDMT_p_sectionSize :
        params->jobSize = value;
        return 0;
    case ZSTDMT_p_overlapSectionLog :
        DEBUGLOG(4, "ZSTDMT_p_overlapSectionLog : %u", value);
        params->overlapSizeLog = (value >= 9) ? 9 : value;
        return 0;
    default :
        return ERROR(parameter_unsupported);
    }
}

size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, unsigned value)
{
    switch(parameter)
    {
    case ZSTDMT_p_sectionSize :
        return ZSTDMT_CCtxParam_setMTCtxParameter(&mtctx->params, parameter, value);
    case ZSTDMT_p_overlapSectionLog :
        return ZSTDMT_CCtxParam_setMTCtxParameter(&mtctx->params, parameter, value);
    default :
        return ERROR(parameter_unsupported);
    }
}

/* ------------------------------------------ */
/* =====   Multi-threaded compression   ===== */
/* ------------------------------------------ */

static unsigned computeNbChunks(size_t srcSize, unsigned windowLog, unsigned nbThreads) {
    size_t const chunkSizeTarget = (size_t)1 << (windowLog + 2);
    size_t const chunkMaxSize = chunkSizeTarget << 2;
    size_t const passSizeMax = chunkMaxSize * nbThreads;
    unsigned const multiplier = (unsigned)(srcSize / passSizeMax) + 1;
    unsigned const nbChunksLarge = multiplier * nbThreads;
    unsigned const nbChunksMax = (unsigned)(srcSize / chunkSizeTarget) + 1;
    unsigned const nbChunksSmall = MIN(nbChunksMax, nbThreads);
    return (multiplier>1) ? nbChunksLarge : nbChunksSmall;
}
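
/* Editorial note : a worked example of computeNbChunks(). With windowLog=21
 * (2 MB window), chunkSizeTarget = 1<<23 = 8 MB and chunkMaxSize = 32 MB.
 * For srcSize = 100 MB and nbThreads = 4 : passSizeMax = 128 MB, so
 * multiplier = 1 and the "small" path applies : nbChunksMax = 100/8+1 = 13,
 * clamped to nbThreads, hence 4 chunks of ~25 MB. For srcSize = 300 MB :
 * multiplier = 300/128+1 = 3, hence nbChunksLarge = 3*4 = 12 chunks. */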

static size_t ZSTDMT_compress_advanced_internal(
                ZSTDMT_CCtx* mtctx,
                void* dst, size_t dstCapacity,
          const void* src, size_t srcSize,
          const ZSTD_CDict* cdict,
                ZSTD_CCtx_params const params)
{
    ZSTD_CCtx_params const jobParams = ZSTDMT_makeJobCCtxParams(params);
    unsigned const overlapRLog = (params.overlapSizeLog>9) ? 0 : 9-params.overlapSizeLog;
    size_t const overlapSize = (overlapRLog>=9) ? 0 : (size_t)1 << (params.cParams.windowLog - overlapRLog);
    unsigned nbChunks = computeNbChunks(srcSize, params.cParams.windowLog, params.nbThreads);
    size_t const proposedChunkSize = (srcSize + (nbChunks-1)) / nbChunks;
    size_t const avgChunkSize = ((proposedChunkSize & 0x1FFFF) < 0x7FFF) ? proposedChunkSize + 0xFFFF : proposedChunkSize;   /* avoid too small last block */
    const char* const srcStart = (const char*)src;
    size_t remainingSrcSize = srcSize;
    unsigned const compressWithinDst = (dstCapacity >= ZSTD_compressBound(srcSize)) ? nbChunks : (unsigned)(dstCapacity / ZSTD_compressBound(avgChunkSize));   /* presumes avgChunkSize >= 256 KB, which should be the case */
    size_t frameStartPos = 0, dstBufferPos = 0;
    XXH64_state_t xxh64;

    assert(jobParams.nbThreads == 0);
    assert(mtctx->cctxPool->totalCCtx == params.nbThreads);

    DEBUGLOG(4, "nbChunks : %2u   (chunkSize : %u bytes)   ", nbChunks, (U32)avgChunkSize);
    if (nbChunks==1) {   /* fallback to single-thread mode */
        ZSTD_CCtx* const cctx = mtctx->cctxPool->cctx[0];
        if (cdict) return ZSTD_compress_usingCDict_advanced(cctx, dst, dstCapacity, src, srcSize, cdict, jobParams.fParams);
        return ZSTD_compress_advanced_internal(cctx, dst, dstCapacity, src, srcSize, NULL, 0, jobParams);
    }
    assert(avgChunkSize >= 256 KB);   /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), which is required for compressWithinDst */
    ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(avgChunkSize) );
    XXH64_reset(&xxh64, 0);

    if (nbChunks > mtctx->jobIDMask+1) {   /* enlarge job table */
        U32 nbJobs = nbChunks;
        ZSTD_free(mtctx->jobs, mtctx->cMem);
        mtctx->jobIDMask = 0;
        mtctx->jobs = ZSTDMT_allocJobsTable(&nbJobs, mtctx->cMem);
        if (mtctx->jobs==NULL) return ERROR(memory_allocation);
        mtctx->jobIDMask = nbJobs - 1;
    }

    {   unsigned u;
        for (u=0; u<nbChunks; u++) {
            size_t const chunkSize = MIN(remainingSrcSize, avgChunkSize);
            size_t const dstBufferCapacity = ZSTD_compressBound(chunkSize);
            buffer_t const dstAsBuffer = { (char*)dst + dstBufferPos, dstBufferCapacity };
            buffer_t const dstBuffer = u < compressWithinDst ? dstAsBuffer : g_nullBuffer;
            size_t dictSize = u ? overlapSize : 0;

            mtctx->jobs[u].src = g_nullBuffer;
            mtctx->jobs[u].srcStart = srcStart + frameStartPos - dictSize;
            mtctx->jobs[u].dictSize = dictSize;
            mtctx->jobs[u].srcSize = chunkSize;
            mtctx->jobs[u].cdict = mtctx->nextJobID==0 ? cdict : NULL;
            mtctx->jobs[u].fullFrameSize = srcSize;
            mtctx->jobs[u].params = jobParams;
            /* do not calculate checksum within sections, but write it in header for first section */
            if (u!=0) mtctx->jobs[u].params.fParams.checksumFlag = 0;
            mtctx->jobs[u].dstBuff = dstBuffer;
            mtctx->jobs[u].cctxPool = mtctx->cctxPool;
            mtctx->jobs[u].bufPool = mtctx->bufPool;
            mtctx->jobs[u].firstChunk = (u==0);
            mtctx->jobs[u].lastChunk = (u==nbChunks-1);
            mtctx->jobs[u].jobCompleted = 0;
            mtctx->jobs[u].jobCompleted_mutex = &mtctx->jobCompleted_mutex;
            mtctx->jobs[u].jobCompleted_cond = &mtctx->jobCompleted_cond;

            if (params.fParams.checksumFlag) {
                XXH64_update(&xxh64, srcStart + frameStartPos, chunkSize);
            }

            DEBUGLOG(5, "posting job %u   (%u bytes)", u, (U32)chunkSize);
            DEBUG_PRINTHEX(6, mtctx->jobs[u].srcStart, 12);
            POOL_add(mtctx->factory, ZSTDMT_compressChunk, &mtctx->jobs[u]);

            frameStartPos += chunkSize;
            dstBufferPos += dstBufferCapacity;
            remainingSrcSize -= chunkSize;
    }   }

    /* collect result */
    {   size_t error = 0, dstPos = 0;
        unsigned chunkID;
        for (chunkID=0; chunkID<nbChunks; chunkID++) {
            DEBUGLOG(5, "waiting for chunk %u ", chunkID);
            PTHREAD_MUTEX_LOCK(&mtctx->jobCompleted_mutex);
            while (mtctx->jobs[chunkID].jobCompleted==0) {
                DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", chunkID);
                pthread_cond_wait(&mtctx->jobCompleted_cond, &mtctx->jobCompleted_mutex);
            }
            pthread_mutex_unlock(&mtctx->jobCompleted_mutex);
            DEBUGLOG(5, "ready to write chunk %u ", chunkID);

            mtctx->jobs[chunkID].srcStart = NULL;
            {   size_t const cSize = mtctx->jobs[chunkID].cSize;
                if (ZSTD_isError(cSize)) error = cSize;
                if ((!error) && (dstPos + cSize > dstCapacity)) error = ERROR(dstSize_tooSmall);
                if (chunkID) {   /* note : chunk 0 is written directly at dst, which is correct position */
                    if (!error)
                        memmove((char*)dst + dstPos, mtctx->jobs[chunkID].dstBuff.start, cSize);   /* may overlap when chunk compressed within dst */
                    if (chunkID >= compressWithinDst) {   /* chunk compressed into its own buffer, which must be released */
                        DEBUGLOG(5, "releasing buffer %u>=%u", chunkID, compressWithinDst);
                        ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[chunkID].dstBuff);
                    }
                    mtctx->jobs[chunkID].dstBuff = g_nullBuffer;
                }
                dstPos += cSize;
            }
        }   /* for (chunkID=0; chunkID<nbChunks; chunkID++) */

        DEBUGLOG(4, "checksumFlag : %u ", params.fParams.checksumFlag);
        if (params.fParams.checksumFlag) {
            U32 const checksum = (U32)XXH64_digest(&xxh64);
            if (dstPos + 4 > dstCapacity) {
                error = ERROR(dstSize_tooSmall);
            } else {
                DEBUGLOG(4, "writing checksum : %08X \n", checksum);
                MEM_writeLE32((char*)dst + dstPos, checksum);
                dstPos += 4;
        }   }

        if (!error) DEBUGLOG(4, "compressed size : %u  ", (U32)dstPos);
        return error ? error : dstPos;
    }
}

size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx,
                                void* dst, size_t dstCapacity,
                          const void* src, size_t srcSize,
                          const ZSTD_CDict* cdict,
                                ZSTD_parameters const params,
                                unsigned overlapLog)
{
    ZSTD_CCtx_params cctxParams = mtctx->params;
    cctxParams.cParams = params.cParams;
    cctxParams.fParams = params.fParams;
    cctxParams.overlapSizeLog = overlapLog;
    return ZSTDMT_compress_advanced_internal(mtctx,
                                             dst, dstCapacity,
                                             src, srcSize,
                                             cdict, cctxParams);
}

size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
                           void* dst, size_t dstCapacity,
                     const void* src, size_t srcSize,
                           int compressionLevel)
{
    U32 const overlapLog = (compressionLevel >= ZSTD_maxCLevel()) ? 9 : ZSTDMT_OVERLAPLOG_DEFAULT;
    ZSTD_parameters params = ZSTD_getParams(compressionLevel, srcSize, 0);
    params.fParams.contentSizeFlag = 1;
    return ZSTDMT_compress_advanced(mtctx, dst, dstCapacity, src, srcSize, NULL, params, overlapLog);
}
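
/* Editorial addition : an illustrative one-shot usage sketch, not part of the
 * original file. Thread count and compression level are arbitrary example
 * values ; for a guaranteed fit, dstCapacity should be at least
 * ZSTD_compressBound(srcSize). Kept under #if 0 so it is never compiled. */
#if 0
static size_t exampleOneShot(void* dst, size_t dstCapacity,
                             const void* src, size_t srcSize)
{
    ZSTDMT_CCtx* const mtctx = ZSTDMT_createCCtx(4 /* nbThreads */);
    size_t cSize;
    if (mtctx == NULL) return ERROR(memory_allocation);
    cSize = ZSTDMT_compressCCtx(mtctx, dst, dstCapacity, src, srcSize, 3 /* level */);
    ZSTDMT_freeCCtx(mtctx);
    return cSize;   /* compressed size, or a ZSTD error code */
}
#endif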


/* ====================================== */
/* =======      Streaming API     ======= */
/* ====================================== */

static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* zcs)
{
    DEBUGLOG(4, "ZSTDMT_waitForAllJobsCompleted");
    while (zcs->doneJobID < zcs->nextJobID) {
        unsigned const jobID = zcs->doneJobID & zcs->jobIDMask;
        PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
        while (zcs->jobs[jobID].jobCompleted==0) {
            DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", zcs->doneJobID);   /* we want to block when waiting for data to flush */
            pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex);
        }
        pthread_mutex_unlock(&zcs->jobCompleted_mutex);
        zcs->doneJobID++;
    }
}

size_t ZSTDMT_initCStream_internal(
        ZSTDMT_CCtx* zcs,
        const void* dict, size_t dictSize, ZSTD_dictMode_e dictMode,
        const ZSTD_CDict* cdict, ZSTD_CCtx_params params,
        unsigned long long pledgedSrcSize)
{
    DEBUGLOG(4, "ZSTDMT_initCStream_internal");
    /* params are supposed to be fully validated at this point */
    assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams)));
    assert(!((dict) && (cdict)));   /* either dict or cdict, not both */
    assert(zcs->cctxPool->totalCCtx == params.nbThreads);

    if (params.nbThreads==1) {
        ZSTD_CCtx_params const singleThreadParams = ZSTDMT_makeJobCCtxParams(params);
        DEBUGLOG(4, "single thread mode");
        assert(singleThreadParams.nbThreads == 0);
        return ZSTD_initCStream_internal(zcs->cctxPool->cctx[0],
                                         dict, dictSize, cdict,
                                         singleThreadParams, pledgedSrcSize);
    }

    if (zcs->allJobsCompleted == 0) {   /* previous compression not correctly finished */
        ZSTDMT_waitForAllJobsCompleted(zcs);
        ZSTDMT_releaseAllJobResources(zcs);
        zcs->allJobsCompleted = 1;
    }

    zcs->params = params;
    zcs->frameContentSize = pledgedSrcSize;
    if (dict) {
        DEBUGLOG(4,"cdictLocal: %08X", (U32)(size_t)zcs->cdictLocal);
        ZSTD_freeCDict(zcs->cdictLocal);
        zcs->cdictLocal = ZSTD_createCDict_advanced(dict, dictSize,
                                                    ZSTD_dlm_byCopy, dictMode, /* note : a loadPrefix becomes an internal CDict */
                                                    params.cParams, zcs->cMem);
        zcs->cdict = zcs->cdictLocal;
        if (zcs->cdictLocal == NULL) return ERROR(memory_allocation);
    } else {
        DEBUGLOG(4,"cdictLocal: %08X", (U32)(size_t)zcs->cdictLocal);
        ZSTD_freeCDict(zcs->cdictLocal);
        zcs->cdictLocal = NULL;
        zcs->cdict = cdict;
    }

    zcs->targetDictSize = (params.overlapSizeLog==0) ? 0 : (size_t)1 << (params.cParams.windowLog - (9 - params.overlapSizeLog));
    DEBUGLOG(4, "overlapLog : %u ", params.overlapSizeLog);
    DEBUGLOG(4, "overlap Size : %u KB", (U32)(zcs->targetDictSize>>10));
    zcs->targetSectionSize = params.jobSize ? params.jobSize : (size_t)1 << (params.cParams.windowLog + 2);
    zcs->targetSectionSize = MAX(ZSTDMT_SECTION_SIZE_MIN, zcs->targetSectionSize);
    zcs->targetSectionSize = MAX(zcs->targetDictSize, zcs->targetSectionSize);
    DEBUGLOG(4, "Section Size : %u KB", (U32)(zcs->targetSectionSize>>10));
    zcs->inBuffSize = zcs->targetDictSize + zcs->targetSectionSize;
    ZSTDMT_setBufferSize(zcs->bufPool, MAX(zcs->inBuffSize, ZSTD_compressBound(zcs->targetSectionSize)) );
    zcs->inBuff.buffer = g_nullBuffer;
    zcs->dictSize = 0;
    zcs->doneJobID = 0;
    zcs->nextJobID = 0;
    zcs->frameEnded = 0;
    zcs->allJobsCompleted = 0;
    if (params.fParams.checksumFlag) XXH64_reset(&zcs->xxhState, 0);
    return 0;
}
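
/* Editorial note : a worked sizing example for the streaming setup above,
 * assuming ZSTDMT_SECTION_SIZE_MIN (from zstdmt_compress.h) does not dominate.
 * With windowLog=21 and overlapSizeLog=6 : targetDictSize = 1<<(21-(9-6)) =
 * 256 KB ; targetSectionSize defaults to 1<<23 = 8 MB, which also exceeds
 * targetDictSize ; inBuffSize = 256 KB + 8 MB. Pool buffers are then sized to
 * MAX(inBuffSize, ZSTD_compressBound(8 MB)), so the same pool can serve both
 * input buffers and compressed-output buffers. */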

size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* mtctx,
                                   const void* dict, size_t dictSize,
                                   ZSTD_parameters params,
                                   unsigned long long pledgedSrcSize)
{
    ZSTD_CCtx_params cctxParams = mtctx->params;
    DEBUGLOG(5, "ZSTDMT_initCStream_advanced");
    cctxParams.cParams = params.cParams;
    cctxParams.fParams = params.fParams;
    return ZSTDMT_initCStream_internal(mtctx, dict, dictSize, ZSTD_dm_auto, NULL,
                                       cctxParams, pledgedSrcSize);
}

size_t ZSTDMT_initCStream_usingCDict(ZSTDMT_CCtx* mtctx,
                                     const ZSTD_CDict* cdict,
                                     ZSTD_frameParameters fParams,
                                     unsigned long long pledgedSrcSize)
{
    ZSTD_CCtx_params cctxParams = mtctx->params;
    cctxParams.cParams = ZSTD_getCParamsFromCDict(cdict);
    cctxParams.fParams = fParams;
    if (cdict==NULL) return ERROR(dictionary_wrong);   /* method incompatible with NULL cdict */
    return ZSTDMT_initCStream_internal(mtctx, NULL, 0 /*dictSize*/, ZSTD_dm_auto, cdict,
                                       cctxParams, pledgedSrcSize);
}

/* ZSTDMT_resetCStream() :
 * pledgedSrcSize is optional and can be zero == unknown */
size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize)
{
    if (zcs->params.nbThreads==1)
        return ZSTD_resetCStream(zcs->cctxPool->cctx[0], pledgedSrcSize);
    return ZSTDMT_initCStream_internal(zcs, NULL, 0, ZSTD_dm_auto, 0, zcs->params,
                                       pledgedSrcSize);
}

size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) {
    ZSTD_parameters const params = ZSTD_getParams(compressionLevel, 0, 0);
    ZSTD_CCtx_params cctxParams = zcs->params;
    cctxParams.cParams = params.cParams;
    cctxParams.fParams = params.fParams;
    return ZSTDMT_initCStream_internal(zcs, NULL, 0, ZSTD_dm_auto, NULL, cctxParams, 0);
}

static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsigned endFrame)
{
    unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;

    DEBUGLOG(4, "preparing job %u to compress %u bytes with %u preload ",
                zcs->nextJobID, (U32)srcSize, (U32)zcs->dictSize);
    zcs->jobs[jobID].src = zcs->inBuff.buffer;
    zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
    zcs->jobs[jobID].srcSize = srcSize;
    zcs->jobs[jobID].dictSize = zcs->dictSize;
    assert(zcs->inBuff.filled >= srcSize + zcs->dictSize);
    zcs->jobs[jobID].params = zcs->params;
    /* do not calculate checksum within sections, but write it in header for first section */
    if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0;
    zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL;
    zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize;
    zcs->jobs[jobID].dstBuff = g_nullBuffer;
    zcs->jobs[jobID].cctxPool = zcs->cctxPool;
    zcs->jobs[jobID].bufPool = zcs->bufPool;
    zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0);
    zcs->jobs[jobID].lastChunk = endFrame;
    zcs->jobs[jobID].jobCompleted = 0;
    zcs->jobs[jobID].dstFlushed = 0;
    zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex;
    zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond;

    if (zcs->params.fParams.checksumFlag)
        XXH64_update(&zcs->xxhState, (const char*)zcs->inBuff.buffer.start + zcs->dictSize, srcSize);

    /* get a new buffer for next input */
    if (!endFrame) {
        size_t const newDictSize = MIN(srcSize + zcs->dictSize, zcs->targetDictSize);
        zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->bufPool);
        if (zcs->inBuff.buffer.start == NULL) {   /* not enough memory to allocate next input buffer */
            zcs->jobs[jobID].jobCompleted = 1;
            zcs->nextJobID++;
            ZSTDMT_waitForAllJobsCompleted(zcs);
            ZSTDMT_releaseAllJobResources(zcs);
            return ERROR(memory_allocation);
        }
        zcs->inBuff.filled -= srcSize + zcs->dictSize - newDictSize;
        memmove(zcs->inBuff.buffer.start,
                (const char*)zcs->jobs[jobID].srcStart + zcs->dictSize + srcSize - newDictSize,
                zcs->inBuff.filled);
        zcs->dictSize = newDictSize;
    } else {   /* if (endFrame==1) */
        zcs->inBuff.buffer = g_nullBuffer;
        zcs->inBuff.filled = 0;
        zcs->dictSize = 0;
        zcs->frameEnded = 1;
        if (zcs->nextJobID == 0) {
            /* single chunk exception : checksum is calculated directly within worker thread */
            zcs->params.fParams.checksumFlag = 0;
    }   }

    DEBUGLOG(4, "posting job %u : %u bytes  (end:%u) (note : doneJob = %u=>%u)",
                zcs->nextJobID,
                (U32)zcs->jobs[jobID].srcSize,
                zcs->jobs[jobID].lastChunk,
                zcs->doneJobID,
                zcs->doneJobID & zcs->jobIDMask);
    POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]);   /* this call is blocking when thread worker pool is exhausted */
    zcs->nextJobID++;
    return 0;
}

/* ZSTDMT_flushNextJob() :
 * output : will be updated with amount of data flushed .
 * blockToFlush : if >0, the function will block and wait if there is no data available to flush .
 * @return : amount of data remaining within internal buffer, 1 if unknown but > 0, 0 if no more, or an error code */
static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush)
{
    unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask;
    if (zcs->doneJobID == zcs->nextJobID) return 0;   /* all flushed ! */
    PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
    while (zcs->jobs[wJobID].jobCompleted==0) {
        DEBUGLOG(5, "waiting for jobCompleted signal from job %u", zcs->doneJobID);
        if (!blockToFlush) { pthread_mutex_unlock(&zcs->jobCompleted_mutex); return 0; }   /* nothing ready to be flushed => skip */
        pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex);   /* block when nothing available to flush */
    }
    pthread_mutex_unlock(&zcs->jobCompleted_mutex);
    /* compression job completed : output can be flushed */
    {   ZSTDMT_jobDescription job = zcs->jobs[wJobID];
        if (!job.jobScanned) {
            if (ZSTD_isError(job.cSize)) {
                DEBUGLOG(5, "compression error detected ");
                ZSTDMT_waitForAllJobsCompleted(zcs);
                ZSTDMT_releaseAllJobResources(zcs);
                return job.cSize;
            }
            DEBUGLOG(5, "zcs->params.fParams.checksumFlag : %u ", zcs->params.fParams.checksumFlag);
            if (zcs->params.fParams.checksumFlag) {
                if (zcs->frameEnded && (zcs->doneJobID+1 == zcs->nextJobID)) {   /* write checksum at end of last section */
                    U32 const checksum = (U32)XXH64_digest(&zcs->xxhState);
                    DEBUGLOG(5, "writing checksum : %08X \n", checksum);
                    MEM_writeLE32((char*)job.dstBuff.start + job.cSize, checksum);
                    job.cSize += 4;
                    zcs->jobs[wJobID].cSize += 4;
            }   }
            zcs->jobs[wJobID].jobScanned = 1;
        }
        {   size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
            DEBUGLOG(5, "Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID);
            memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
            output->pos += toWrite;
            job.dstFlushed += toWrite;
        }
        if (job.dstFlushed == job.cSize) {   /* output buffer fully flushed => move to next one */
            ZSTDMT_releaseBuffer(zcs->bufPool, job.dstBuff);
            zcs->jobs[wJobID].dstBuff = g_nullBuffer;
            zcs->jobs[wJobID].jobCompleted = 0;
            zcs->doneJobID++;
        } else {
            zcs->jobs[wJobID].dstFlushed = job.dstFlushed;
        }
        /* return value : how many bytes left in buffer ; fake it to 1 if unknown but >0 */
        if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed);
        if (zcs->doneJobID < zcs->nextJobID) return 1;   /* still some buffer to flush */
        zcs->allJobsCompleted = zcs->frameEnded;   /* frame completed and entirely flushed */
        return 0;   /* everything flushed */
}   }
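
/* Editorial note : ZSTDMT_flushNextJob() operates on a local copy of the job
 * slot ; only the fields that must persist (cSize after the checksum append,
 * jobScanned, dstFlushed, dstBuff) are written back to zcs->jobs[wJobID].
 * Its nonzero return value is a lower bound on the data left to flush : exact
 * for the job currently being drained, and collapsed to 1 when later jobs are
 * pending but their compressed sizes are not yet known. */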

/** ZSTDMT_compressStream_generic() :
 *  internal use only
 *  assumption : output and input are valid (pos <= size)
 * @return : minimum amount of data remaining to flush, 0 if none */
size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
                                     ZSTD_outBuffer* output,
                                     ZSTD_inBuffer* input,
                                     ZSTD_EndDirective endOp)
{
    size_t const newJobThreshold = mtctx->dictSize + mtctx->targetSectionSize;
    assert(output->pos <= output->size);
    assert(input->pos  <= input->size);
    if ((mtctx->frameEnded) && (endOp==ZSTD_e_continue)) {
        /* current frame being ended. Only flush/end are allowed. Or start new frame with init */
        return ERROR(stage_wrong);
    }
    if (mtctx->params.nbThreads==1) {   /* delegate to single-thread (synchronous) */
        return ZSTD_compressStream_generic(mtctx->cctxPool->cctx[0], output, input, endOp);
    }

    /* single-pass shortcut (note : this is synchronous-mode) */
    if ( (mtctx->nextJobID==0)      /* just started */
      && (mtctx->inBuff.filled==0)  /* nothing buffered */
      && (endOp==ZSTD_e_end)        /* end order */
      && (output->size - output->pos >= ZSTD_compressBound(input->size - input->pos)) ) {   /* enough room */
        size_t const cSize = ZSTDMT_compress_advanced_internal(mtctx,
                (char*)output->dst + output->pos, output->size - output->pos,
                (const char*)input->src + input->pos, input->size - input->pos,
                mtctx->cdict, mtctx->params);
        if (ZSTD_isError(cSize)) return cSize;
        input->pos = input->size;
        output->pos += cSize;
        ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->inBuff.buffer);   /* was allocated in initStream */
        mtctx->allJobsCompleted = 1;
        mtctx->frameEnded = 1;
        return 0;
    }

    /* fill input buffer */
    if (input->size > input->pos) {   /* support NULL input */
        if (mtctx->inBuff.buffer.start == NULL) {
            mtctx->inBuff.buffer = ZSTDMT_getBuffer(mtctx->bufPool);
            if (mtctx->inBuff.buffer.start == NULL) return ERROR(memory_allocation);
            mtctx->inBuff.filled = 0;
        }
        {   size_t const toLoad = MIN(input->size - input->pos, mtctx->inBuffSize - mtctx->inBuff.filled);
            DEBUGLOG(5, "inBuff:%08X;  inBuffSize=%u;  ToCopy=%u", (U32)(size_t)mtctx->inBuff.buffer.start, (U32)mtctx->inBuffSize, (U32)toLoad);
            memcpy((char*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled, (const char*)input->src + input->pos, toLoad);
            input->pos += toLoad;
            mtctx->inBuff.filled += toLoad;
    }   }

    if ( (mtctx->inBuff.filled >= newJobThreshold)   /* filled enough : let's compress */
      && (mtctx->nextJobID <= mtctx->doneJobID + mtctx->jobIDMask) ) {   /* avoid overwriting job round buffer */
        CHECK_F( ZSTDMT_createCompressionJob(mtctx, mtctx->targetSectionSize, 0 /* endFrame */) );
    }

    /* check for potential compressed data ready to be flushed */
    CHECK_F( ZSTDMT_flushNextJob(mtctx, output, (mtctx->inBuff.filled == mtctx->inBuffSize) /* blockToFlush */) );   /* block if it wasn't possible to create new job due to saturation */

    if (input->pos < input->size)   /* input not consumed : do not flush yet */
        endOp = ZSTD_e_continue;

    switch(endOp)
    {
        case ZSTD_e_flush:
            return ZSTDMT_flushStream(mtctx, output);
        case ZSTD_e_end:
            return ZSTDMT_endStream(mtctx, output);
        case ZSTD_e_continue:
            return 1;
        default:
            return ERROR(GENERIC);   /* invalid endDirective */
    }
}

size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
{
    CHECK_F( ZSTDMT_compressStream_generic(zcs, output, input, ZSTD_e_continue) );

    /* recommended next input size : fill current input buffer */
    return zcs->inBuffSize - zcs->inBuff.filled;   /* note : could be zero when input buffer is fully filled and no more availability to create new job */
}

static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned endFrame)
{
    size_t const srcSize = zcs->inBuff.filled - zcs->dictSize;

    if ( ((srcSize > 0) || (endFrame && !zcs->frameEnded))
       && (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) {
        CHECK_F( ZSTDMT_createCompressionJob(zcs, srcSize, endFrame) );
    }

    /* check if there is any data available to flush */
    return ZSTDMT_flushNextJob(zcs, output, 1 /* blockToFlush */);
}

size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output)
{
    DEBUGLOG(5, "ZSTDMT_flushStream");
    if (zcs->params.nbThreads==1)
        return ZSTD_flushStream(zcs->cctxPool->cctx[0], output);
    return ZSTDMT_flushStream_internal(zcs, output, 0 /* endFrame */);
}

size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output)
{
    DEBUGLOG(4, "ZSTDMT_endStream");
    if (zcs->params.nbThreads==1)
        return ZSTD_endStream(zcs->cctxPool->cctx[0], output);
    return ZSTDMT_flushStream_internal(zcs, output, 1 /* endFrame */);
}
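
/* Editorial addition : an illustrative streaming usage sketch, not part of the
 * original file. It assumes dstCapacity >= ZSTD_compressBound(srcSize), so the
 * flush loop cannot stall on a full output buffer ; real callers would refill
 * `input` and drain `output` incrementally. Kept under #if 0. */
#if 0
static size_t exampleStream(ZSTDMT_CCtx* mtctx,
                            void* dst, size_t dstCapacity,
                            const void* src, size_t srcSize)
{
    ZSTD_outBuffer output = { dst, dstCapacity, 0 };
    ZSTD_inBuffer  input  = { src, srcSize, 0 };
    CHECK_F( ZSTDMT_initCStream(mtctx, 3 /* compression level */) );
    while (input.pos < input.size) {
        CHECK_F( ZSTDMT_compressStream(mtctx, &output, &input) );   /* feed, and opportunistically flush */
    }
    {   size_t remaining;
        do {
            remaining = ZSTDMT_endStream(mtctx, &output);   /* flush remaining jobs, write epilogue */
            if (ZSTD_isError(remaining)) return remaining;
        } while (remaining > 0);
    }
    return output.pos;   /* number of compressed bytes written */
}
#endif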