/* * Copyright (c) 2017-present, Facebook, Inc. * All rights reserved. * * This source code is licensed under both the BSD-style license (found in the * LICENSE file in the root directory of this source tree) and the GPLv2 (found * in the COPYING file in the root directory of this source tree). */ #include /* fprintf */ #include /* malloc, free */ #include /* pthread functions */ #include /* memset */ #include "zstd_internal.h" #include "util.h" #define DISPLAY(...) fprintf(stderr, __VA_ARGS__) #define PRINT(...) fprintf(stdout, __VA_ARGS__) #define DEBUG(l, ...) { if (g_displayLevel>=l) { DISPLAY(__VA_ARGS__); } } #define FILE_CHUNK_SIZE 4 << 20 #define MAX_NUM_JOBS 2 #define stdinmark "/*stdin*\\" #define stdoutmark "/*stdout*\\" #define MAX_PATH 256 #define DEFAULT_DISPLAY_LEVEL 1 #define DEFAULT_COMPRESSION_LEVEL 6 #define MAX_COMPRESSION_LEVEL_CHANGE 2 #define CONVERGENCE_LOWER_BOUND 5 #define CLEVEL_DECREASE_COOLDOWN 5 #define CHANGE_BY_TWO_THRESHOLD 0.1 #define CHANGE_BY_ONE_THRESHOLD 0.65 #ifndef DEBUG_MODE static int g_displayLevel = DEFAULT_DISPLAY_LEVEL; #else static int g_displayLevel = DEBUG_MODE; #endif static unsigned g_compressionLevel = DEFAULT_COMPRESSION_LEVEL; static UTIL_time_t g_startTime; static size_t g_streamedSize = 0; static unsigned g_useProgressBar = 1; static UTIL_freq_t g_ticksPerSecond; static unsigned g_forceCompressionLevel = 0; static unsigned g_minCLevel = 1; static unsigned g_maxCLevel; typedef struct { void* start; size_t size; size_t capacity; } buffer_t; typedef struct { size_t filled; buffer_t buffer; } inBuff_t; typedef struct { buffer_t src; buffer_t dst; unsigned jobID; unsigned lastJobPlusOne; size_t compressedSize; size_t dictSize; } jobDescription; typedef struct { pthread_mutex_t pMutex; int noError; } mutex_t; typedef struct { pthread_cond_t pCond; int noError; } cond_t; typedef struct { unsigned compressionLevel; unsigned numJobs; unsigned nextJobID; unsigned threadError; /* * JobIDs for the next jobs to be created, compressed, and written */ unsigned jobReadyID; unsigned jobCompressedID; unsigned jobWriteID; unsigned allJobsCompleted; /* * counter for how many jobs in a row the compression level has not changed * if the counter becomes >= CONVERGENCE_LOWER_BOUND, the next time the * compression level tries to change (by non-zero amount) resets the counter * to 1 and does not apply the change */ unsigned convergenceCounter; /* * cooldown counter in order to prevent rapid successive decreases in compression level * whenever compression level is decreased, cooldown is set to CLEVEL_DECREASE_COOLDOWN * whenever adaptCompressionLevel() is called and cooldown != 0, it is decremented * as long as cooldown != 0, the compression level cannot be decreased */ unsigned cooldown; /* * XWaitYCompletion * Range from 0.0 to 1.0 * if the value is not 1.0, then this implies that thread X waited on thread Y to finish * and thread Y was XWaitYCompletion finished at the time of the wait (i.e. compressWaitWriteCompletion=0.5 * implies that the compression thread waited on the write thread and it was only 50% finished writing a job) */ double createWaitCompressionCompletion; double compressWaitCreateCompletion; double compressWaitWriteCompletion; double writeWaitCompressionCompletion; /* * Completion values * Range from 0.0 to 1.0 * Jobs are divided into mini-chunks in order to measure completion * these values are updated each time a thread finishes its operation on the * mini-chunk (i.e. finishes writing out, compressing, etc. this mini-chunk). */ double compressionCompletion; double writeCompletion; double createCompletion; mutex_t jobCompressed_mutex; cond_t jobCompressed_cond; mutex_t jobReady_mutex; cond_t jobReady_cond; mutex_t allJobsCompleted_mutex; cond_t allJobsCompleted_cond; mutex_t jobWrite_mutex; cond_t jobWrite_cond; mutex_t compressionCompletion_mutex; mutex_t createCompletion_mutex; mutex_t writeCompletion_mutex; mutex_t compressionLevel_mutex; size_t lastDictSize; inBuff_t input; jobDescription* jobs; ZSTD_CCtx* cctx; } adaptCCtx; typedef struct { adaptCCtx* ctx; FILE* dstFile; } outputThreadArg; typedef struct { FILE* srcFile; adaptCCtx* ctx; outputThreadArg* otArg; } fcResources; static void freeCompressionJobs(adaptCCtx* ctx) { unsigned u; for (u=0; unumJobs; u++) { jobDescription job = ctx->jobs[u]; free(job.dst.start); free(job.src.start); } } static int destroyMutex(mutex_t* mutex) { if (mutex->noError) { int const ret = pthread_mutex_destroy(&mutex->pMutex); return ret; } return 0; } static int destroyCond(cond_t* cond) { if (cond->noError) { int const ret = pthread_cond_destroy(&cond->pCond); return ret; } return 0; } static int freeCCtx(adaptCCtx* ctx) { if (!ctx) return 0; { int error = 0; error |= destroyMutex(&ctx->jobCompressed_mutex); error |= destroyCond(&ctx->jobCompressed_cond); error |= destroyMutex(&ctx->jobReady_mutex); error |= destroyCond(&ctx->jobReady_cond); error |= destroyMutex(&ctx->allJobsCompleted_mutex); error |= destroyCond(&ctx->allJobsCompleted_cond); error |= destroyMutex(&ctx->jobWrite_mutex); error |= destroyCond(&ctx->jobWrite_cond); error |= destroyMutex(&ctx->compressionCompletion_mutex); error |= destroyMutex(&ctx->createCompletion_mutex); error |= destroyMutex(&ctx->writeCompletion_mutex); error |= destroyMutex(&ctx->compressionLevel_mutex); error |= ZSTD_isError(ZSTD_freeCCtx(ctx->cctx)); free(ctx->input.buffer.start); if (ctx->jobs){ freeCompressionJobs(ctx); free(ctx->jobs); } free(ctx); return error; } } static int initMutex(mutex_t* mutex) { int const ret = pthread_mutex_init(&mutex->pMutex, NULL); mutex->noError = !ret; return ret; } static int initCond(cond_t* cond) { int const ret = pthread_cond_init(&cond->pCond, NULL); cond->noError = !ret; return ret; } static int initCCtx(adaptCCtx* ctx, unsigned numJobs) { ctx->compressionLevel = g_compressionLevel; { int pthreadError = 0; pthreadError |= initMutex(&ctx->jobCompressed_mutex); pthreadError |= initCond(&ctx->jobCompressed_cond); pthreadError |= initMutex(&ctx->jobReady_mutex); pthreadError |= initCond(&ctx->jobReady_cond); pthreadError |= initMutex(&ctx->allJobsCompleted_mutex); pthreadError |= initCond(&ctx->allJobsCompleted_cond); pthreadError |= initMutex(&ctx->jobWrite_mutex); pthreadError |= initCond(&ctx->jobWrite_cond); pthreadError |= initMutex(&ctx->compressionCompletion_mutex); pthreadError |= initMutex(&ctx->createCompletion_mutex); pthreadError |= initMutex(&ctx->writeCompletion_mutex); pthreadError |= initMutex(&ctx->compressionLevel_mutex); if (pthreadError) return pthreadError; } ctx->numJobs = numJobs; ctx->jobReadyID = 0; ctx->jobCompressedID = 0; ctx->jobWriteID = 0; ctx->lastDictSize = 0; ctx->createWaitCompressionCompletion = 1; ctx->compressWaitCreateCompletion = 1; ctx->compressWaitWriteCompletion = 1; ctx->writeWaitCompressionCompletion = 1; ctx->createCompletion = 1; ctx->writeCompletion = 1; ctx->compressionCompletion = 1; ctx->convergenceCounter = 0; ctx->cooldown = 0; ctx->jobs = calloc(1, numJobs*sizeof(jobDescription)); if (!ctx->jobs) { DISPLAY("Error: could not allocate space for jobs during context creation\n"); return 1; } /* initializing jobs */ { unsigned jobNum; for (jobNum=0; jobNumjobs[jobNum]; job->src.start = malloc(2 * FILE_CHUNK_SIZE); job->dst.start = malloc(ZSTD_compressBound(FILE_CHUNK_SIZE)); job->lastJobPlusOne = 0; if (!job->src.start || !job->dst.start) { DISPLAY("Could not allocate buffers for jobs\n"); return 1; } job->src.capacity = FILE_CHUNK_SIZE; job->dst.capacity = ZSTD_compressBound(FILE_CHUNK_SIZE); } } ctx->nextJobID = 0; ctx->threadError = 0; ctx->allJobsCompleted = 0; ctx->cctx = ZSTD_createCCtx(); if (!ctx->cctx) { DISPLAY("Error: could not allocate ZSTD_CCtx\n"); return 1; } ctx->input.filled = 0; ctx->input.buffer.capacity = 2 * FILE_CHUNK_SIZE; ctx->input.buffer.start = malloc(ctx->input.buffer.capacity); if (!ctx->input.buffer.start) { DISPLAY("Error: could not allocate input buffer\n"); return 1; } return 0; } static adaptCCtx* createCCtx(unsigned numJobs) { adaptCCtx* const ctx = calloc(1, sizeof(adaptCCtx)); if (ctx == NULL) { DISPLAY("Error: could not allocate space for context\n"); return NULL; } { int const error = initCCtx(ctx, numJobs); if (error) { freeCCtx(ctx); return NULL; } return ctx; } } static void signalErrorToThreads(adaptCCtx* ctx) { ctx->threadError = 1; pthread_mutex_lock(&ctx->jobReady_mutex.pMutex); pthread_cond_signal(&ctx->jobReady_cond.pCond); pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex); pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex); pthread_cond_broadcast(&ctx->jobCompressed_cond.pCond); pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex); pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex); pthread_cond_signal(&ctx->jobWrite_cond.pCond); pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex); pthread_mutex_lock(&ctx->allJobsCompleted_mutex.pMutex); pthread_cond_signal(&ctx->allJobsCompleted_cond.pCond); pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex); } static void waitUntilAllJobsCompleted(adaptCCtx* ctx) { if (!ctx) return; pthread_mutex_lock(&ctx->allJobsCompleted_mutex.pMutex); while (ctx->allJobsCompleted == 0 && !ctx->threadError) { pthread_cond_wait(&ctx->allJobsCompleted_cond.pCond, &ctx->allJobsCompleted_mutex.pMutex); } pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex); } /* map completion percentages to values for changing compression level */ static unsigned convertCompletionToChange(double completion) { if (completion < CHANGE_BY_TWO_THRESHOLD) { return 2; } else if (completion < CHANGE_BY_ONE_THRESHOLD) { return 1; } else { return 0; } } /* * Compression level is changed depending on which part of the compression process is lagging * Currently, three theads exist for job creation, compression, and file writing respectively. * adaptCompressionLevel() increments or decrements compression level based on which of the threads is lagging * job creation or file writing lag => increased compression level * compression thread lag => decreased compression level * detecting which thread is lagging is done by keeping track of how many calls each thread makes to pthread_cond_wait */ static void adaptCompressionLevel(adaptCCtx* ctx) { double createWaitCompressionCompletion; double compressWaitCreateCompletion; double compressWaitWriteCompletion; double writeWaitCompressionCompletion; double const threshold = 0.00001; unsigned prevCompressionLevel; pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex); prevCompressionLevel = ctx->compressionLevel; pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex); if (g_forceCompressionLevel) { pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex); ctx->compressionLevel = g_compressionLevel; pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex); return; } DEBUG(2, "adapting compression level %u\n", prevCompressionLevel); /* read and reset completion measurements */ pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex); DEBUG(2, "createWaitCompressionCompletion %f\n", ctx->createWaitCompressionCompletion); DEBUG(2, "writeWaitCompressionCompletion %f\n", ctx->writeWaitCompressionCompletion); createWaitCompressionCompletion = ctx->createWaitCompressionCompletion; writeWaitCompressionCompletion = ctx->writeWaitCompressionCompletion; pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex); pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex); DEBUG(2, "compressWaitWriteCompletion %f\n", ctx->compressWaitWriteCompletion); compressWaitWriteCompletion = ctx->compressWaitWriteCompletion; pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex); pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex); DEBUG(2, "compressWaitCreateCompletion %f\n", ctx->compressWaitCreateCompletion); compressWaitCreateCompletion = ctx->compressWaitCreateCompletion; pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex); DEBUG(2, "convergence counter: %u\n", ctx->convergenceCounter); assert(g_minCLevel <= prevCompressionLevel && g_maxCLevel >= prevCompressionLevel); /* adaptation logic */ if (ctx->cooldown) ctx->cooldown--; if ((1-createWaitCompressionCompletion > threshold || 1-writeWaitCompressionCompletion > threshold) && ctx->cooldown == 0) { /* create or write waiting on compression */ /* use whichever one waited less because it was slower */ double const completion = MAX(createWaitCompressionCompletion, writeWaitCompressionCompletion); unsigned const change = convertCompletionToChange(completion); unsigned const boundChange = MIN(change, prevCompressionLevel - g_minCLevel); if (ctx->convergenceCounter >= CONVERGENCE_LOWER_BOUND && boundChange != 0) { /* reset convergence counter, might have been a spike */ ctx->convergenceCounter = 0; DEBUG(2, "convergence counter reset, no change applied\n"); } else if (boundChange != 0) { pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex); ctx->compressionLevel -= boundChange; pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex); ctx->cooldown = CLEVEL_DECREASE_COOLDOWN; ctx->convergenceCounter = 1; DEBUG(2, "create or write threads waiting on compression, tried to decrease compression level by %u\n\n", boundChange); } } else if (1-compressWaitWriteCompletion > threshold || 1-compressWaitCreateCompletion > threshold) { /* compress waiting on write */ double const completion = MIN(compressWaitWriteCompletion, compressWaitCreateCompletion); unsigned const change = convertCompletionToChange(completion); unsigned const boundChange = MIN(change, g_maxCLevel - prevCompressionLevel); if (ctx->convergenceCounter >= CONVERGENCE_LOWER_BOUND && boundChange != 0) { /* reset convergence counter, might have been a spike */ ctx->convergenceCounter = 0; DEBUG(2, "convergence counter reset, no change applied\n"); } else if (boundChange != 0) { pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex); ctx->compressionLevel += boundChange; pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex); ctx->cooldown = 0; ctx->convergenceCounter = 1; DEBUG(2, "compress waiting on write or create, tried to increase compression level by %u\n\n", boundChange); } } pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex); if (ctx->compressionLevel == prevCompressionLevel) { ctx->convergenceCounter++; } pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex); } static size_t getUseableDictSize(unsigned compressionLevel) { ZSTD_parameters const params = ZSTD_getParams(compressionLevel, 0, 0); unsigned const overlapLog = compressionLevel >= (unsigned)ZSTD_maxCLevel() ? 0 : 3; size_t const overlapSize = 1 << (params.cParams.windowLog - overlapLog); return overlapSize; } static void* compressionThread(void* arg) { adaptCCtx* const ctx = (adaptCCtx*)arg; unsigned currJob = 0; for ( ; ; ) { unsigned const currJobIndex = currJob % ctx->numJobs; jobDescription* const job = &ctx->jobs[currJobIndex]; DEBUG(2, "starting compression for job %u\n", currJob); { /* check if compression thread will have to wait */ unsigned willWaitForCreate = 0; unsigned willWaitForWrite = 0; pthread_mutex_lock(&ctx->jobReady_mutex.pMutex); if (currJob + 1 > ctx->jobReadyID) willWaitForCreate = 1; pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex); pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex); if (currJob - ctx->jobWriteID >= ctx->numJobs) willWaitForWrite = 1; pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex); pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex); if (willWaitForCreate) { DEBUG(2, "compression will wait for create on job %u\n", currJob); ctx->compressWaitCreateCompletion = ctx->createCompletion; DEBUG(2, "create completion %f\n", ctx->compressWaitCreateCompletion); } else { ctx->compressWaitCreateCompletion = 1; } pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex); pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex); if (willWaitForWrite) { DEBUG(2, "compression will wait for write on job %u\n", currJob); ctx->compressWaitWriteCompletion = ctx->writeCompletion; DEBUG(2, "write completion %f\n", ctx->compressWaitWriteCompletion); } else { ctx->compressWaitWriteCompletion = 1; } pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex); } /* wait until job is ready */ pthread_mutex_lock(&ctx->jobReady_mutex.pMutex); while (currJob + 1 > ctx->jobReadyID && !ctx->threadError) { pthread_cond_wait(&ctx->jobReady_cond.pCond, &ctx->jobReady_mutex.pMutex); } pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex); /* wait until job previously in this space is written */ pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex); while (currJob - ctx->jobWriteID >= ctx->numJobs && !ctx->threadError) { pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex); } pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex); /* reset compression completion */ pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex); ctx->compressionCompletion = 0; pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex); /* adapt compression level */ if (currJob) adaptCompressionLevel(ctx); pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex); DEBUG(2, "job %u compressed with level %u\n", currJob, ctx->compressionLevel); pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex); /* compress the data */ { size_t const compressionBlockSize = ZSTD_BLOCKSIZE_MAX; /* 128 KB */ unsigned cLevel; unsigned blockNum = 0; size_t remaining = job->src.size; size_t srcPos = 0; size_t dstPos = 0; pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex); cLevel = ctx->compressionLevel; pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex); /* reset compressed size */ job->compressedSize = 0; DEBUG(2, "calling ZSTD_compressBegin()\n"); /* begin compression */ { size_t const useDictSize = MIN(getUseableDictSize(cLevel), job->dictSize); size_t const dictModeError = ZSTD_setCCtxParameter(ctx->cctx, ZSTD_p_forceRawDict, 1); ZSTD_parameters params = ZSTD_getParams(cLevel, 0, useDictSize); params.cParams.windowLog = 23; { size_t const initError = ZSTD_compressBegin_advanced(ctx->cctx, job->src.start + job->dictSize - useDictSize, useDictSize, params, 0); size_t const windowSizeError = ZSTD_setCCtxParameter(ctx->cctx, ZSTD_p_forceWindow, 1); if (ZSTD_isError(dictModeError) || ZSTD_isError(initError) || ZSTD_isError(windowSizeError)) { DISPLAY("Error: something went wrong while starting compression\n"); signalErrorToThreads(ctx); return arg; } } } DEBUG(2, "finished with ZSTD_compressBegin()\n"); do { size_t const actualBlockSize = MIN(remaining, compressionBlockSize); /* continue compression */ if (currJob != 0 || blockNum != 0) { /* not first block of first job flush/overwrite the frame header */ size_t const hSize = ZSTD_compressContinue(ctx->cctx, job->dst.start + dstPos, job->dst.capacity - dstPos, job->src.start + job->dictSize + srcPos, 0); if (ZSTD_isError(hSize)) { DISPLAY("Error: something went wrong while continuing compression\n"); job->compressedSize = hSize; signalErrorToThreads(ctx); return arg; } ZSTD_invalidateRepCodes(ctx->cctx); } { size_t const ret = (job->lastJobPlusOne == currJob + 1 && remaining == actualBlockSize) ? ZSTD_compressEnd (ctx->cctx, job->dst.start + dstPos, job->dst.capacity - dstPos, job->src.start + job->dictSize + srcPos, actualBlockSize) : ZSTD_compressContinue(ctx->cctx, job->dst.start + dstPos, job->dst.capacity - dstPos, job->src.start + job->dictSize + srcPos, actualBlockSize); if (ZSTD_isError(ret)) { DISPLAY("Error: something went wrong during compression: %s\n", ZSTD_getErrorName(ret)); signalErrorToThreads(ctx); return arg; } job->compressedSize += ret; remaining -= actualBlockSize; srcPos += actualBlockSize; dstPos += ret; blockNum++; /* update completion */ pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex); ctx->compressionCompletion = 1 - (double)remaining/job->src.size; pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex); } } while (remaining != 0); job->dst.size = job->compressedSize; } pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex); ctx->jobCompressedID++; pthread_cond_broadcast(&ctx->jobCompressed_cond.pCond); pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex); if (job->lastJobPlusOne == currJob + 1 || ctx->threadError) { /* finished compressing all jobs */ break; } DEBUG(2, "finished compressing job %u\n", currJob); currJob++; } return arg; } static void displayProgress(unsigned cLevel, unsigned last) { UTIL_time_t currTime; UTIL_getTime(&currTime); if (!g_useProgressBar) return; { double const timeElapsed = (double)(UTIL_getSpanTimeMicro(g_ticksPerSecond, g_startTime, currTime) / 1000.0); double const sizeMB = (double)g_streamedSize / (1 << 20); double const avgCompRate = sizeMB * 1000 / timeElapsed; fprintf(stderr, "\r| Comp. Level: %2u | Time Elapsed: %7.2f s | Data Size: %7.1f MB | Avg Comp. Rate: %6.2f MB/s |", cLevel, timeElapsed/1000.0, sizeMB, avgCompRate); if (last) { fprintf(stderr, "\n"); } else { fflush(stderr); } } } static void* outputThread(void* arg) { outputThreadArg* const otArg = (outputThreadArg*)arg; adaptCCtx* const ctx = otArg->ctx; FILE* const dstFile = otArg->dstFile; unsigned currJob = 0; for ( ; ; ) { unsigned const currJobIndex = currJob % ctx->numJobs; jobDescription* const job = &ctx->jobs[currJobIndex]; unsigned willWaitForCompress = 0; DEBUG(2, "starting write for job %u\n", currJob); pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex); if (currJob + 1 > ctx->jobCompressedID) willWaitForCompress = 1; pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex); pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex); if (willWaitForCompress) { /* write thread is waiting on compression thread */ ctx->writeWaitCompressionCompletion = ctx->compressionCompletion; DEBUG(2, "writer thread waiting for nextJob: %u, writeWaitCompressionCompletion %f\n", currJob, ctx->writeWaitCompressionCompletion); } else { ctx->writeWaitCompressionCompletion = 1; } pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex); pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex); while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) { pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex); } pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex); /* reset write completion */ pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex); ctx->writeCompletion = 0; pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex); { size_t const compressedSize = job->compressedSize; size_t remaining = compressedSize; if (ZSTD_isError(compressedSize)) { DISPLAY("Error: an error occurred during compression\n"); signalErrorToThreads(ctx); return arg; } { size_t const blockSize = MAX(compressedSize >> 7, 1 << 10); size_t pos = 0; for ( ; ; ) { size_t const writeSize = MIN(remaining, blockSize); size_t const ret = fwrite(job->dst.start + pos, 1, writeSize, dstFile); if (ret != writeSize) break; pos += ret; remaining -= ret; /* update completion variable for writing */ pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex); ctx->writeCompletion = 1 - (double)remaining/compressedSize; pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex); if (remaining == 0) break; } if (pos != compressedSize) { DISPLAY("Error: an error occurred during file write operation\n"); signalErrorToThreads(ctx); return arg; } } } { unsigned cLevel; pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex); cLevel = ctx->compressionLevel; pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex); displayProgress(cLevel, job->lastJobPlusOne == currJob + 1); } pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex); ctx->jobWriteID++; pthread_cond_signal(&ctx->jobWrite_cond.pCond); pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex); if (job->lastJobPlusOne == currJob + 1 || ctx->threadError) { /* finished with all jobs */ pthread_mutex_lock(&ctx->allJobsCompleted_mutex.pMutex); ctx->allJobsCompleted = 1; pthread_cond_signal(&ctx->allJobsCompleted_cond.pCond); pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex); break; } DEBUG(2, "finished writing job %u\n", currJob); currJob++; } return arg; } static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last) { unsigned const nextJob = ctx->nextJobID; unsigned const nextJobIndex = nextJob % ctx->numJobs; jobDescription* const job = &ctx->jobs[nextJobIndex]; job->src.size = srcSize; job->jobID = nextJob; if (last) job->lastJobPlusOne = nextJob + 1; { /* swap buffer */ void* const copy = job->src.start; job->src.start = ctx->input.buffer.start; ctx->input.buffer.start = copy; } job->dictSize = ctx->lastDictSize; ctx->nextJobID++; /* if not on the last job, reuse data as dictionary in next job */ if (!last) { size_t const oldDictSize = ctx->lastDictSize; memcpy(ctx->input.buffer.start, job->src.start + oldDictSize, srcSize); ctx->lastDictSize = srcSize; ctx->input.filled = srcSize; } /* signal job ready */ pthread_mutex_lock(&ctx->jobReady_mutex.pMutex); ctx->jobReadyID++; pthread_cond_signal(&ctx->jobReady_cond.pCond); pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex); return 0; } static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadArg* otArg) { /* early error check to exit */ if (!ctx || !srcFile || !otArg) { return 1; } /* create output thread */ { pthread_t out; if (pthread_create(&out, NULL, &outputThread, otArg)) { DISPLAY("Error: could not create output thread\n"); signalErrorToThreads(ctx); return 1; } else if (pthread_detach(out)) { DISPLAY("Error: could not detach output thread\n"); signalErrorToThreads(ctx); return 1; } } /* create compression thread */ { pthread_t compression; if (pthread_create(&compression, NULL, &compressionThread, ctx)) { DISPLAY("Error: could not create compression thread\n"); signalErrorToThreads(ctx); return 1; } else if (pthread_detach(compression)) { DISPLAY("Error: could not detach compression thread\n"); signalErrorToThreads(ctx); return 1; } } { unsigned currJob = 0; /* creating jobs */ for ( ; ; ) { size_t pos = 0; size_t const readBlockSize = 1 << 15; size_t remaining = FILE_CHUNK_SIZE; unsigned const nextJob = ctx->nextJobID; unsigned willWaitForCompress = 0; DEBUG(2, "starting creation of job %u\n", currJob); pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex); if (nextJob - ctx->jobCompressedID >= ctx->numJobs) willWaitForCompress = 1; pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex); pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex); if (willWaitForCompress) { /* creation thread is waiting, take measurement of completion */ ctx->createWaitCompressionCompletion = ctx->compressionCompletion; DEBUG(2, "create thread waiting for nextJob: %u, createWaitCompressionCompletion %f\n", nextJob, ctx->createWaitCompressionCompletion); } else { ctx->createWaitCompressionCompletion = 1; } pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex); /* wait until the job has been compressed */ pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex); while (nextJob - ctx->jobCompressedID >= ctx->numJobs && !ctx->threadError) { pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex); } pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex); /* reset create completion */ pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex); ctx->createCompletion = 0; pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex); while (remaining != 0 && !feof(srcFile)) { size_t const ret = fread(ctx->input.buffer.start + ctx->input.filled + pos, 1, readBlockSize, srcFile); if (ret != readBlockSize && !feof(srcFile)) { /* error could not read correct number of bytes */ DISPLAY("Error: problem occurred during read from src file\n"); signalErrorToThreads(ctx); return 1; } pos += ret; remaining -= ret; pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex); ctx->createCompletion = 1 - (double)remaining/((size_t)FILE_CHUNK_SIZE); pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex); } if (remaining != 0 && !feof(srcFile)) { DISPLAY("Error: problem occurred during read from src file\n"); signalErrorToThreads(ctx); return 1; } g_streamedSize += pos; /* reading was fine, now create the compression job */ { int const last = feof(srcFile); int const error = createCompressionJob(ctx, pos, last); if (error != 0) { signalErrorToThreads(ctx); return error; } } DEBUG(2, "finished creating job %u\n", currJob); currJob++; if (feof(srcFile)) { break; } } } /* success -- created all jobs */ return 0; } static fcResources createFileCompressionResources(const char* const srcFilename, const char* const dstFilenameOrNull) { fcResources fcr; unsigned const stdinUsed = !strcmp(srcFilename, stdinmark); FILE* const srcFile = stdinUsed ? stdin : fopen(srcFilename, "rb"); const char* const outFilenameIntermediate = (stdinUsed && !dstFilenameOrNull) ? stdoutmark : dstFilenameOrNull; const char* outFilename = outFilenameIntermediate; char fileAndSuffix[MAX_PATH]; size_t const numJobs = MAX_NUM_JOBS; memset(&fcr, 0, sizeof(fcr)); if (!outFilenameIntermediate) { if (snprintf(fileAndSuffix, MAX_PATH, "%s.zst", srcFilename) + 1 > MAX_PATH) { DISPLAY("Error: output filename is too long\n"); return fcr; } outFilename = fileAndSuffix; } { unsigned const stdoutUsed = !strcmp(outFilename, stdoutmark); FILE* const dstFile = stdoutUsed ? stdout : fopen(outFilename, "wb"); fcr.otArg = malloc(sizeof(outputThreadArg)); if (!fcr.otArg) { DISPLAY("Error: could not allocate space for output thread argument\n"); return fcr; } fcr.otArg->dstFile = dstFile; } /* checking for errors */ if (!fcr.otArg->dstFile || !srcFile) { DISPLAY("Error: some file(s) could not be opened\n"); return fcr; } /* creating context */ fcr.ctx = createCCtx(numJobs); fcr.otArg->ctx = fcr.ctx; fcr.srcFile = srcFile; return fcr; } static int freeFileCompressionResources(fcResources* fcr) { int ret = 0; waitUntilAllJobsCompleted(fcr->ctx); ret |= (fcr->srcFile != NULL) ? fclose(fcr->srcFile) : 0; ret |= (fcr->ctx != NULL) ? freeCCtx(fcr->ctx) : 0; if (fcr->otArg) { ret |= (fcr->otArg->dstFile != stdout) ? fclose(fcr->otArg->dstFile) : 0; free(fcr->otArg); /* no need to freeCCtx() on otArg->ctx because it should be the same context */ } return ret; } static int compressFilename(const char* const srcFilename, const char* const dstFilenameOrNull) { int ret = 0; fcResources fcr = createFileCompressionResources(srcFilename, dstFilenameOrNull); UTIL_getTime(&g_startTime); g_streamedSize = 0; ret |= performCompression(fcr.ctx, fcr.srcFile, fcr.otArg); ret |= freeFileCompressionResources(&fcr); return ret; } static int compressFilenames(const char** filenameTable, unsigned numFiles, unsigned forceStdout) { int ret = 0; unsigned fileNum; for (fileNum=0; fileNum MAX_UINT */ static unsigned readU32FromChar(const char** stringPtr) { unsigned result = 0; while ((**stringPtr >='0') && (**stringPtr <='9')) result *= 10, result += **stringPtr - '0', (*stringPtr)++ ; if ((**stringPtr=='K') || (**stringPtr=='M')) { result <<= 10; if (**stringPtr=='M') result <<= 10; (*stringPtr)++ ; if (**stringPtr=='i') (*stringPtr)++; if (**stringPtr=='B') (*stringPtr)++; } return result; } static void help(const char* progPath) { PRINT("Usage:\n"); PRINT(" %s [options] [file(s)]\n", progPath); PRINT("\n"); PRINT("Options:\n"); PRINT(" -oFILE : specify the output file name\n"); PRINT(" -i# : provide initial compression level -- default %d, must be in the range [L, U] where L and U are bound values (see below for defaults)\n", DEFAULT_COMPRESSION_LEVEL); PRINT(" -h : display help/information\n"); PRINT(" -f : force the compression level to stay constant\n"); PRINT(" -c : force write to stdout\n"); PRINT(" -p : hide progress bar\n"); PRINT(" -q : quiet mode -- do not show progress bar or other information\n"); PRINT(" -l# : provide lower bound for compression level -- default 1\n"); PRINT(" -u# : provide upper bound for compression level -- default %u\n", ZSTD_maxCLevel()); } /* return 0 if successful, else return error */ int main(int argCount, const char* argv[]) { const char* outFilename = NULL; const char** filenameTable = (const char**)malloc(argCount*sizeof(const char*)); unsigned filenameIdx = 0; unsigned forceStdout = 0; unsigned providedInitialCLevel = 0; int ret = 0; int argNum; filenameTable[0] = stdinmark; g_maxCLevel = ZSTD_maxCLevel(); UTIL_initTimer(&g_ticksPerSecond); if (filenameTable == NULL) { DISPLAY("Error: could not allocate sapce for filename table.\n"); return 1; } for (argNum=1; argNum 1) { switch (argument[1]) { case 'o': argument += 2; outFilename = argument; break; case 'i': argument += 2; g_compressionLevel = readU32FromChar(&argument); providedInitialCLevel = 1; break; case 'h': help(argv[0]); goto _main_exit; case 'p': g_useProgressBar = 0; break; case 'c': forceStdout = 1; outFilename = stdoutmark; break; case 'f': g_forceCompressionLevel = 1; break; case 'q': g_useProgressBar = 0; g_displayLevel = 0; break; case 'l': argument += 2; g_minCLevel = readU32FromChar(&argument); break; case 'u': argument += 2; g_maxCLevel = readU32FromChar(&argument); break; default: DISPLAY("Error: invalid argument provided\n"); ret = 1; goto _main_exit; } continue; } /* regular files to be compressed */ filenameTable[filenameIdx++] = argument; } /* check initial, max, and min compression levels */ { unsigned const minMaxInconsistent = g_minCLevel > g_maxCLevel; unsigned const initialNotInRange = g_minCLevel > g_compressionLevel || g_maxCLevel < g_compressionLevel; if (minMaxInconsistent || (initialNotInRange && providedInitialCLevel)) { DISPLAY("Error: provided compression level parameters are invalid\n"); ret = 1; goto _main_exit; } else if (initialNotInRange) { g_compressionLevel = g_minCLevel; } } /* error checking with number of files */ if (filenameIdx > 1 && (outFilename != NULL && strcmp(outFilename, stdoutmark))) { DISPLAY("Error: multiple input files provided, cannot use specified output file\n"); ret = 1; goto _main_exit; } /* compress files */ if (filenameIdx <= 1) { ret |= compressFilename(filenameTable[0], outFilename); } else { ret |= compressFilenames(filenameTable, filenameIdx, forceStdout); } _main_exit: free(filenameTable); return ret; }