/*********************************************************************
  Blosc - Blocked Shuffling and Compression Library

  Author: Francesc Alted <francesc@blosc.org>
  Creation date: 2009-05-20

  See LICENSE.txt for details about copyright and rights to use.
**********************************************************************/


#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <assert.h>

#include "fastcopy.h"

#if defined(USING_CMAKE)
  #include "config.h"
#endif /*  USING_CMAKE */
#include "blosc.h"
#include "shuffle.h"
#include "blosclz.h"
#if defined(HAVE_LZ4)
  #include "lz4.h"
  #include "lz4hc.h"
#endif /*  HAVE_LZ4 */
#if defined(HAVE_SNAPPY)
  #include "snappy-c.h"
#endif /*  HAVE_SNAPPY */
#if defined(HAVE_ZLIB)
  #include "zlib.h"
#endif /*  HAVE_ZLIB */
#if defined(HAVE_ZSTD)
  #include "zstd.h"
#endif /*  HAVE_ZSTD */

#if defined(_WIN32) && !defined(__MINGW32__)
  #include <windows.h>
  #include <malloc.h>

  /* stdint.h only available in VS2010 (VC++ 16.0) and newer */
  #if defined(_MSC_VER) && _MSC_VER < 1600
    #include "win32/stdint-windows.h"
  #else
    #include <stdint.h>
  #endif

  #include <process.h>
  #define getpid _getpid
#else
  #include <stdint.h>
  #include <unistd.h>
  #include <inttypes.h>
#endif  /* _WIN32 */

/* Include the win32/pthread.h library for all the Windows builds. See #224. */
#if defined(_WIN32)
  #include "win32/pthread.h"
  #include "win32/pthread.c"
#else
  #include <pthread.h>
#endif


/* Some useful units */
#define KB 1024
#define MB (1024 * (KB))

/* Minimum buffer size to be compressed */
#define MIN_BUFFERSIZE 128       /* Cannot be smaller than 66 */

/* The maximum number of splits in a block for compression */
#define MAX_SPLITS 16            /* Cannot be larger than 128 */

/* The size of L1 cache.  32 KB is quite common nowadays. */
#define L1 (32 * (KB))

/* Have problems using posix barriers when symbol value is 200112L */
/* This requires more investigation, but will work for the moment */
#if defined(_POSIX_BARRIERS) && ( (_POSIX_BARRIERS - 20012L) >= 0 && _POSIX_BARRIERS != 200112L)
#define _POSIX_BARRIERS_MINE
#endif
/* Synchronization variables */


struct blosc_context {
  int32_t compress;               /* 1 if we are doing compression 0 if decompress */

  const uint8_t* src;
  uint8_t* dest;                  /* The current pos in the destination buffer */
  uint8_t* header_flags;          /* Flags for header */
  int compversion;                /* Compressor version byte, only used during decompression */
  int32_t sourcesize;             /* Number of bytes in source buffer (or uncompressed bytes in compressed file) */
  int32_t compressedsize;         /* Number of bytes of compressed data (only used when decompressing) */
  int32_t nblocks;                /* Number of total blocks in buffer */
  int32_t leftover;               /* Extra bytes at end of buffer */
  int32_t blocksize;              /* Length of the block in bytes */
  int32_t typesize;               /* Type size */
  int32_t num_output_bytes;       /* Counter for the number of output bytes */
  int32_t destsize;               /* Maximum size for destination buffer */
  uint8_t* bstarts;               /* Start of the buffer past header info */
  int32_t compcode;               /* Compressor code to use */
  int clevel;                     /* Compression level (1-9) */
  /* Function to use for decompression.  Only used when decompression */
  int (*decompress_func)(const void* input, int compressed_length, void* output,
                         int maxout);

  /* Threading */
  int32_t numthreads;
  int32_t threads_started;
  int32_t end_threads;
  pthread_t threads[BLOSC_MAX_THREADS];
  int32_t tids[BLOSC_MAX_THREADS];
  pthread_mutex_t count_mutex;
  #ifdef _POSIX_BARRIERS_MINE
  pthread_barrier_t barr_init;
  pthread_barrier_t barr_finish;
  #else
  int32_t count_threads;
  pthread_mutex_t count_threads_mutex;
  pthread_cond_t count_threads_cv;
  #endif
  #if !defined(_WIN32)
  pthread_attr_t ct_attr;            /* creation time attrs for threads */
  #endif
  int32_t thread_giveup_code;               /* error code when give up */
  int32_t thread_nblock;                    /* block counter */
};

struct thread_context {
  struct blosc_context* parent_context;
  int32_t tid;
  uint8_t* tmp;
  uint8_t* tmp2;
  uint8_t* tmp3;
  int32_t tmpblocksize; /* Used to keep track of how big the temporary buffers are */
};

/* Global context for non-contextual API */
static struct blosc_context* g_global_context;
static pthread_mutex_t* global_comp_mutex;
static int32_t g_compressor = BLOSC_BLOSCLZ;  /* the compressor to use by default */
static int32_t g_threads = 1;
static int32_t g_force_blocksize = 0;
static int32_t g_initlib = 0;
static int32_t g_atfork_registered = 0;
static int32_t g_splitmode = BLOSC_FORWARD_COMPAT_SPLIT;



/* Wrapped function to adjust the number of threads used by blosc */
int blosc_set_nthreads_(struct blosc_context*);

/* Releases the global threadpool */
int blosc_release_threadpool(struct blosc_context* context);

/* Macros for synchronization */

/* Wait until all threads are initialized */
#ifdef _POSIX_BARRIERS_MINE
#define WAIT_INIT(RET_VAL, CONTEXT_PTR)  \
  rc = pthread_barrier_wait(&CONTEXT_PTR->barr_init); \
  if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) { \
    printf("Could not wait on barrier (init): %d\n", rc); \
    return((RET_VAL));                            \
  }
#else
#define WAIT_INIT(RET_VAL, CONTEXT_PTR)   \
  pthread_mutex_lock(&CONTEXT_PTR->count_threads_mutex); \
  if (CONTEXT_PTR->count_threads < CONTEXT_PTR->numthreads) { \
    CONTEXT_PTR->count_threads++;  \
    pthread_cond_wait(&CONTEXT_PTR->count_threads_cv, &CONTEXT_PTR->count_threads_mutex); \
  } \
  else { \
    pthread_cond_broadcast(&CONTEXT_PTR->count_threads_cv); \
  } \
  pthread_mutex_unlock(&CONTEXT_PTR->count_threads_mutex);
#endif

/* Wait for all threads to finish */
#ifdef _POSIX_BARRIERS_MINE
#define WAIT_FINISH(RET_VAL, CONTEXT_PTR)   \
  rc = pthread_barrier_wait(&CONTEXT_PTR->barr_finish); \
  if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) { \
    printf("Could not wait on barrier (finish)\n"); \
    return((RET_VAL));                              \
  }
#else
#define WAIT_FINISH(RET_VAL, CONTEXT_PTR)                           \
  pthread_mutex_lock(&CONTEXT_PTR->count_threads_mutex); \
  if (CONTEXT_PTR->count_threads > 0) { \
    CONTEXT_PTR->count_threads--; \
    pthread_cond_wait(&CONTEXT_PTR->count_threads_cv, &CONTEXT_PTR->count_threads_mutex); \
  } \
  else { \
    pthread_cond_broadcast(&CONTEXT_PTR->count_threads_cv); \
  } \
  pthread_mutex_unlock(&CONTEXT_PTR->count_threads_mutex);
#endif


/* A function for aligned malloc that is portable */
static uint8_t *my_malloc(size_t size)
{
  void *block = NULL;
  int res = 0;

/* Do an alignment to 32 bytes because AVX2 is supported */
#if defined(_WIN32)
  /* A (void *) cast needed for avoiding a warning with MINGW :-/ */
  block = (void *)_aligned_malloc(size, 32);
#elif _POSIX_C_SOURCE >= 200112L || _XOPEN_SOURCE >= 600
  /* Platform does have an implementation of posix_memalign */
  res = posix_memalign(&block, 32, size);
#else
  block = malloc(size);
#endif  /* _WIN32 */

  if (block == NULL || res != 0) {
    printf("Error allocating memory!");
    return NULL;
  }

  return (uint8_t *)block;
}


/* Release memory booked by my_malloc */
static void my_free(void *block)
{
#if defined(_WIN32)
    _aligned_free(block);
#else
    free(block);
#endif  /* _WIN32 */
}


/* Copy 4 bytes from `*pa` to int32_t, changing endianness if necessary. */
static int32_t sw32_(const uint8_t *pa)
{
  int32_t idest;
  uint8_t *dest = (uint8_t *)&idest;
  int i = 1;                    /* for big/little endian detection */
  char *p = (char *)&i;

  if (p[0] != 1) {
    /* big endian */
    dest[0] = pa[3];
    dest[1] = pa[2];
    dest[2] = pa[1];
    dest[3] = pa[0];
  }
  else {
    /* little endian */
    dest[0] = pa[0];
    dest[1] = pa[1];
    dest[2] = pa[2];
    dest[3] = pa[3];
  }
  return idest;
}


/* Copy 4 bytes from `*pa` to `*dest`, changing endianness if necessary. */
static void _sw32(uint8_t* dest, int32_t a)
{
  uint8_t *pa = (uint8_t *)&a;
  int i = 1;                    /* for big/little endian detection */
  char *p = (char *)&i;

  if (p[0] != 1) {
    /* big endian */
    dest[0] = pa[3];
    dest[1] = pa[2];
    dest[2] = pa[1];
    dest[3] = pa[0];
  }
  else {
    /* little endian */
    dest[0] = pa[0];
    dest[1] = pa[1];
    dest[2] = pa[2];
    dest[3] = pa[3];
  }
}

/*
 * Conversion routines between compressor and compression libraries
 */

/* Return the library code associated with the compressor name */
static int compname_to_clibcode(const char *compname)
{
  if (strcmp(compname, BLOSC_BLOSCLZ_COMPNAME) == 0)
    return BLOSC_BLOSCLZ_LIB;
  if (strcmp(compname, BLOSC_LZ4_COMPNAME) == 0)
    return BLOSC_LZ4_LIB;
  if (strcmp(compname, BLOSC_LZ4HC_COMPNAME) == 0)
    return BLOSC_LZ4_LIB;
  if (strcmp(compname, BLOSC_SNAPPY_COMPNAME) == 0)
    return BLOSC_SNAPPY_LIB;
  if (strcmp(compname, BLOSC_ZLIB_COMPNAME) == 0)
    return BLOSC_ZLIB_LIB;
  if (strcmp(compname, BLOSC_ZSTD_COMPNAME) == 0)
    return BLOSC_ZSTD_LIB;
  return -1;
}

/* Return the library name associated with the compressor code */
static const char *clibcode_to_clibname(int clibcode)
{
  if (clibcode == BLOSC_BLOSCLZ_LIB) return BLOSC_BLOSCLZ_LIBNAME;
  if (clibcode == BLOSC_LZ4_LIB) return BLOSC_LZ4_LIBNAME;
  if (clibcode == BLOSC_SNAPPY_LIB) return BLOSC_SNAPPY_LIBNAME;
  if (clibcode == BLOSC_ZLIB_LIB) return BLOSC_ZLIB_LIBNAME;
  if (clibcode == BLOSC_ZSTD_LIB) return BLOSC_ZSTD_LIBNAME;
  return NULL;                  /* should never happen */
}


/*
 * Conversion routines between compressor names and compressor codes
 */

/* Get the compressor name associated with the compressor code */
int blosc_compcode_to_compname(int compcode, const char **compname)
{
  int code = -1;    /* -1 means non-existent compressor code */
  const char *name = NULL;

  /* Map the compressor code */
  if (compcode == BLOSC_BLOSCLZ)
    name = BLOSC_BLOSCLZ_COMPNAME;
  else if (compcode == BLOSC_LZ4)
    name = BLOSC_LZ4_COMPNAME;
  else if (compcode == BLOSC_LZ4HC)
    name = BLOSC_LZ4HC_COMPNAME;
  else if (compcode == BLOSC_SNAPPY)
    name = BLOSC_SNAPPY_COMPNAME;
  else if (compcode == BLOSC_ZLIB)
    name = BLOSC_ZLIB_COMPNAME;
  else if (compcode == BLOSC_ZSTD)
    name = BLOSC_ZSTD_COMPNAME;

  *compname = name;

  /* Guess if there is support for this code */
  if (compcode == BLOSC_BLOSCLZ)
    code = BLOSC_BLOSCLZ;
#if defined(HAVE_LZ4)
  else if (compcode == BLOSC_LZ4)
    code = BLOSC_LZ4;
  else if (compcode == BLOSC_LZ4HC)
    code = BLOSC_LZ4HC;
#endif /*  HAVE_LZ4 */
#if defined(HAVE_SNAPPY)
  else if (compcode == BLOSC_SNAPPY)
    code = BLOSC_SNAPPY;
#endif /*  HAVE_SNAPPY */
#if defined(HAVE_ZLIB)
  else if (compcode == BLOSC_ZLIB)
    code = BLOSC_ZLIB;
#endif /*  HAVE_ZLIB */
#if defined(HAVE_ZSTD)
  else if (compcode == BLOSC_ZSTD)
    code = BLOSC_ZSTD;
#endif /*  HAVE_ZSTD */

  return code;
}

/* Get the compressor code for the compressor name. -1 if it is not available */
int blosc_compname_to_compcode(const char *compname)
{
  int code = -1;  /* -1 means non-existent compressor code */

  if (strcmp(compname, BLOSC_BLOSCLZ_COMPNAME) == 0) {
    code = BLOSC_BLOSCLZ;
  }
#if defined(HAVE_LZ4)
  else if (strcmp(compname, BLOSC_LZ4_COMPNAME) == 0) {
    code = BLOSC_LZ4;
  }
  else if (strcmp(compname, BLOSC_LZ4HC_COMPNAME) == 0) {
    code = BLOSC_LZ4HC;
  }
#endif /*  HAVE_LZ4 */
#if defined(HAVE_SNAPPY)
  else if (strcmp(compname, BLOSC_SNAPPY_COMPNAME) == 0) {
    code = BLOSC_SNAPPY;
  }
#endif /*  HAVE_SNAPPY */
#if defined(HAVE_ZLIB)
  else if (strcmp(compname, BLOSC_ZLIB_COMPNAME) == 0) {
    code = BLOSC_ZLIB;
  }
#endif /*  HAVE_ZLIB */
#if defined(HAVE_ZSTD)
  else if (strcmp(compname, BLOSC_ZSTD_COMPNAME) == 0) {
    code = BLOSC_ZSTD;
  }
#endif /*  HAVE_ZSTD */

return code;
}


#if defined(HAVE_LZ4)
static int lz4_wrap_compress(const char* input, size_t input_length,
                             char* output, size_t maxout, int accel)
{
  int cbytes;
  cbytes = LZ4_compress_fast(input, output, (int)input_length, (int)maxout,
                             accel);
  return cbytes;
}

static int lz4hc_wrap_compress(const char* input, size_t input_length,
                               char* output, size_t maxout, int clevel)
{
  int cbytes;
  if (input_length > (size_t)(UINT32_C(2)<<30))
    return -1;   /* input larger than 2 GB is not supported */
  /* clevel for lz4hc goes up to 12, at least in LZ4 1.7.5
   * but levels larger than 9 do not buy much compression. */
  cbytes = LZ4_compress_HC(input, output, (int)input_length, (int)maxout,
                           clevel);
  return cbytes;
}

static int lz4_wrap_decompress(const void* input, int compressed_length,
                               void* output, int maxout)
{
  return LZ4_decompress_safe(input, output, compressed_length, maxout);
}

#endif /* HAVE_LZ4 */

#if defined(HAVE_SNAPPY)
static int snappy_wrap_compress(const char* input, size_t input_length,
                                char* output, size_t maxout)
{
  snappy_status status;
  size_t cl = maxout;
  status = snappy_compress(input, input_length, output, &cl);
  if (status != SNAPPY_OK){
    return 0;
  }
  return (int)cl;
}

static int snappy_wrap_decompress(const void* input, int compressed_length,
                                  void* output, int maxout)
{
  snappy_status status;
  size_t ul = maxout;
  status = snappy_uncompress(input, compressed_length, output, &ul);
  if (status != SNAPPY_OK){
    return 0;
  }
  return (int)ul;
}
#endif /* HAVE_SNAPPY */

#if defined(HAVE_ZLIB)
/* zlib is not very respectful with sharing name space with others.
 Fortunately, its names do not collide with those already in blosc. */
static int zlib_wrap_compress(const char* input, size_t input_length,
                              char* output, size_t maxout, int clevel)
{
  int status;
  uLongf cl = maxout;
  status = compress2(
             (Bytef*)output, &cl, (Bytef*)input, (uLong)input_length, clevel);
  if (status != Z_OK){
    return 0;
  }
  return (int)cl;
}

static int zlib_wrap_decompress(const void* input, int compressed_length,
                                void* output, int maxout) {
  int status;
  uLongf ul = maxout;
  status = uncompress(
             (Bytef*)output, &ul, (Bytef*)input, (uLong)compressed_length);
  if (status != Z_OK){
    return 0;
  }
  return (int)ul;
}
#endif /*  HAVE_ZLIB */

#if defined(HAVE_ZSTD)
static int zstd_wrap_compress(const char* input, size_t input_length,
                              char* output, size_t maxout, int clevel) {
  size_t code;
  clevel = (clevel < 9) ? clevel * 2 - 1 : ZSTD_maxCLevel();
  /* Make the level 8 close enough to maxCLevel */
  if (clevel == 8) clevel = ZSTD_maxCLevel() - 2;
  code = ZSTD_compress(
      (void*)output, maxout, (void*)input, input_length, clevel);
  if (ZSTD_isError(code)) {
    return 0;
  }
  return (int)code;
}

static int zstd_wrap_decompress(const void* input, int compressed_length,
                                void* output, int maxout) {
  size_t code;
  code = ZSTD_decompress(
      (void*)output, maxout, (void*)input, compressed_length);
  if (ZSTD_isError(code)) {
    return 0;
  }
  return (int)code;
}
#endif /*  HAVE_ZSTD */

static int initialize_decompress_func(struct blosc_context* context) {
  int8_t header_flags = *(context->header_flags);
  int32_t compformat = (header_flags & 0xe0) >> 5;
  int compversion = context->compversion;

  if (compformat == BLOSC_BLOSCLZ_FORMAT) {
    if (compversion != BLOSC_BLOSCLZ_VERSION_FORMAT) {
      return -9;
    }
    context->decompress_func = &blosclz_decompress;
    return 0;
  }
#if defined(HAVE_LZ4)
  if (compformat == BLOSC_LZ4_FORMAT) {
    if (compversion != BLOSC_LZ4_VERSION_FORMAT) {
      return -9;
    }
    context->decompress_func = &lz4_wrap_decompress;
    return 0;
  }
#endif /*  HAVE_LZ4 */
#if defined(HAVE_SNAPPY)
  if (compformat == BLOSC_SNAPPY_FORMAT) {
    if (compversion != BLOSC_SNAPPY_VERSION_FORMAT) {
      return -9;
    }
    context->decompress_func = &snappy_wrap_decompress;
    return 0;
  }
#endif /*  HAVE_SNAPPY */
#if defined(HAVE_ZLIB)
  if (compformat == BLOSC_ZLIB_FORMAT) {
    if (compversion != BLOSC_ZLIB_VERSION_FORMAT) {
      return -9;
    }
    context->decompress_func = &zlib_wrap_decompress;
    return 0;
  }
#endif /*  HAVE_ZLIB */
#if defined(HAVE_ZSTD)
  if (compformat == BLOSC_ZSTD_FORMAT) {
    if (compversion != BLOSC_ZSTD_VERSION_FORMAT) {
      return -9;
    }
    context->decompress_func = &zstd_wrap_decompress;
    return 0;
  }
#endif /*  HAVE_ZSTD */
  return -5; /* signals no decompression support */
}

/* Compute acceleration for blosclz */
static int get_accel(const struct blosc_context* context) {
  int32_t clevel = context->clevel;

  if (context->compcode == BLOSC_LZ4) {
    /* This acceleration setting based on discussions held in:
     * https://groups.google.com/forum/#!topic/lz4c/zosy90P8MQw
     */
    return (10 - clevel);
  }
  return 1;
}


/* Shuffle & compress a single block */
static int blosc_c(const struct blosc_context* context, int32_t blocksize,
                   int32_t leftoverblock, int32_t ntbytes, int32_t maxbytes,
                   const uint8_t *src, uint8_t *dest, uint8_t *tmp,
                   uint8_t *tmp2)
{
  int8_t header_flags = *(context->header_flags);
  int dont_split = (header_flags & 0x10) >> 4;
  int32_t j, neblock, nsplits;
  int32_t cbytes;                   /* number of compressed bytes in split */
  int32_t ctbytes = 0;              /* number of compressed bytes in block */
  int32_t maxout;
  int32_t typesize = context->typesize;
  const uint8_t *_tmp = src;
  const char *compname;
  int accel;
  int bscount;
  int doshuffle = (header_flags & BLOSC_DOSHUFFLE) && (typesize > 1);
  int dobitshuffle = ((header_flags & BLOSC_DOBITSHUFFLE) &&
                      (blocksize >= typesize));

  if (doshuffle) {
    /* Byte shuffling only makes sense if typesize > 1 */
    blosc_internal_shuffle(typesize, blocksize, src, tmp);
    _tmp = tmp;
  }
  /* We don't allow more than 1 filter at the same time (yet) */
  else if (dobitshuffle) {
    bscount = blosc_internal_bitshuffle(typesize, blocksize, src, tmp, tmp2);
    if (bscount < 0)
      return bscount;
    _tmp = tmp;
  }

  /* Calculate acceleration for different compressors */
  accel = get_accel(context);

  /* The number of splits for this block */
  if (!dont_split && !leftoverblock) {
    nsplits = typesize;
  }
  else {
    nsplits = 1;
  }
  neblock = blocksize / nsplits;
  for (j = 0; j < nsplits; j++) {
    dest += sizeof(int32_t);
    ntbytes += (int32_t)sizeof(int32_t);
    ctbytes += (int32_t)sizeof(int32_t);
    maxout = neblock;
    #if defined(HAVE_SNAPPY)
    if (context->compcode == BLOSC_SNAPPY) {
      /* TODO perhaps refactor this to keep the value stashed somewhere */
      maxout = snappy_max_compressed_length(neblock);
    }
    #endif /*  HAVE_SNAPPY */
    if (ntbytes+maxout > maxbytes) {
      maxout = maxbytes - ntbytes;   /* avoid buffer overrun */
      if (maxout <= 0) {
        return 0;                  /* non-compressible block */
      }
    }
    if (context->compcode == BLOSC_BLOSCLZ) {
      cbytes = blosclz_compress(context->clevel, _tmp+j*neblock, neblock,
                                dest, maxout, !dont_split);
    }
    #if defined(HAVE_LZ4)
    else if (context->compcode == BLOSC_LZ4) {
      cbytes = lz4_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
                                 (char *)dest, (size_t)maxout, accel);
    }
    else if (context->compcode == BLOSC_LZ4HC) {
      cbytes = lz4hc_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
                                   (char *)dest, (size_t)maxout,
                                   context->clevel);
    }
    #endif /* HAVE_LZ4 */
    #if defined(HAVE_SNAPPY)
    else if (context->compcode == BLOSC_SNAPPY) {
      cbytes = snappy_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
                                    (char *)dest, (size_t)maxout);
    }
    #endif /* HAVE_SNAPPY */
    #if defined(HAVE_ZLIB)
    else if (context->compcode == BLOSC_ZLIB) {
      cbytes = zlib_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
                                  (char *)dest, (size_t)maxout,
                                  context->clevel);
    }
    #endif /* HAVE_ZLIB */
    #if defined(HAVE_ZSTD)
    else if (context->compcode == BLOSC_ZSTD) {
      cbytes = zstd_wrap_compress((char*)_tmp + j * neblock, (size_t)neblock,
                                  (char*)dest, (size_t)maxout, context->clevel);
    }
    #endif /* HAVE_ZSTD */

    else {
      blosc_compcode_to_compname(context->compcode, &compname);
      if (compname == NULL) {
          compname = "(null)";
      }
      fprintf(stderr, "Blosc has not been compiled with '%s' ", compname);
      fprintf(stderr, "compression support.  Please use one having it.");
      return -5;    /* signals no compression support */
    }

    if (cbytes > maxout) {
      /* Buffer overrun caused by compression (should never happen) */
      return -1;
    }
    else if (cbytes < 0) {
      /* cbytes should never be negative */
      return -2;
    }
    else if (cbytes == 0 || cbytes == neblock) {
      /* The compressor has been unable to compress data at all. */
      /* Before doing the copy, check that we are not running into a
         buffer overflow. */
      if ((ntbytes+neblock) > maxbytes) {
        return 0;    /* Non-compressible data */
      }
      fastcopy(dest, _tmp + j * neblock, neblock);
      cbytes = neblock;
    }
    _sw32(dest - 4, cbytes);
    dest += cbytes;
    ntbytes += cbytes;
    ctbytes += cbytes;
  }  /* Closes j < nsplits */

  return ctbytes;
}

/* Decompress & unshuffle a single block */
static int blosc_d(struct blosc_context* context, int32_t blocksize,
                   int32_t leftoverblock, const uint8_t* base_src,
                   int32_t src_offset, uint8_t* dest, uint8_t* tmp,
                   uint8_t* tmp2) {
  int8_t header_flags = *(context->header_flags);
  int dont_split = (header_flags & 0x10) >> 4;
  int32_t j, neblock, nsplits;
  int32_t nbytes;                /* number of decompressed bytes in split */
  const int32_t compressedsize = context->compressedsize;
  int32_t cbytes;                /* number of compressed bytes in split */
  int32_t ctbytes = 0;           /* number of compressed bytes in block */
  int32_t ntbytes = 0;           /* number of uncompressed bytes in block */
  uint8_t *_tmp = dest;
  int32_t typesize = context->typesize;
  int bscount;
  int doshuffle = (header_flags & BLOSC_DOSHUFFLE) && (typesize > 1);
  int dobitshuffle = ((header_flags & BLOSC_DOBITSHUFFLE) &&
                      (blocksize >= typesize));
  const uint8_t* src;

  if (doshuffle || dobitshuffle) {
    _tmp = tmp;
  }

  /* The number of splits for this block */
  if (!dont_split &&
      /* For compatibility with before the introduction of the split flag */
      ((typesize <= MAX_SPLITS) && (blocksize/typesize) >= MIN_BUFFERSIZE) &&
      !leftoverblock) {
    nsplits = typesize;
  }
  else {
    nsplits = 1;
  }

  neblock = blocksize / nsplits;
  for (j = 0; j < nsplits; j++) {
    /* Validate src_offset */
    if (src_offset < 0 || src_offset > compressedsize - sizeof(int32_t)) {
      return -1;
    }
    cbytes = sw32_(base_src + src_offset); /* amount of compressed bytes */
    src_offset += sizeof(int32_t);
    /* Validate cbytes */
    if (cbytes < 0 || cbytes > context->compressedsize - src_offset) {
      return -1;
    }
    ctbytes += (int32_t)sizeof(int32_t);
    src = base_src + src_offset;
    /* Uncompress */
    if (cbytes == neblock) {
      fastcopy(_tmp, src, neblock);
      nbytes = neblock;
    }
    else {
      nbytes = context->decompress_func(src, cbytes, _tmp, neblock);
      /* Check that decompressed bytes number is correct */
      if (nbytes != neblock) {
        return -2;
      }
    }
    src_offset += cbytes;
    ctbytes += cbytes;
    _tmp += nbytes;
    ntbytes += nbytes;
  } /* Closes j < nsplits */

  if (doshuffle) {
    blosc_internal_unshuffle(typesize, blocksize, tmp, dest);
  }
  else if (dobitshuffle) {
    bscount = blosc_internal_bitunshuffle(typesize, blocksize, tmp, dest, tmp2);
    if (bscount < 0)
      return bscount;
  }

  /* Return the number of uncompressed bytes */
  return ntbytes;
}

/* Serial version for compression/decompression */
static int serial_blosc(struct blosc_context* context)
{
  int32_t j, bsize, leftoverblock;
  int32_t cbytes;

  int32_t ebsize = context->blocksize + context->typesize * (int32_t)sizeof(int32_t);
  int32_t ntbytes = context->num_output_bytes;

  uint8_t *tmp = my_malloc(context->blocksize + ebsize);
  uint8_t *tmp2 = tmp + context->blocksize;

  for (j = 0; j < context->nblocks; j++) {
    if (context->compress && !(*(context->header_flags) & BLOSC_MEMCPYED)) {
      _sw32(context->bstarts + j * 4, ntbytes);
    }
    bsize = context->blocksize;
    leftoverblock = 0;
    if ((j == context->nblocks - 1) && (context->leftover > 0)) {
      bsize = context->leftover;
      leftoverblock = 1;
    }
    if (context->compress) {
      if (*(context->header_flags) & BLOSC_MEMCPYED) {
        /* We want to memcpy only */
        fastcopy(context->dest + BLOSC_MAX_OVERHEAD + j * context->blocksize,
                 context->src + j * context->blocksize, bsize);
        cbytes = bsize;
      }
      else {
        /* Regular compression */
        cbytes = blosc_c(context, bsize, leftoverblock, ntbytes,
                         context->destsize, context->src+j*context->blocksize,
                         context->dest+ntbytes, tmp, tmp2);
        if (cbytes == 0) {
          ntbytes = 0;              /* incompressible data */
          break;
        }
      }
    }
    else {
      if (*(context->header_flags) & BLOSC_MEMCPYED) {
        /* We want to memcpy only */
        fastcopy(context->dest + j * context->blocksize,
                 context->src + BLOSC_MAX_OVERHEAD + j * context->blocksize, bsize);
        cbytes = bsize;
      }
      else {
        /* Regular decompression */
        cbytes = blosc_d(context, bsize, leftoverblock, context->src,
                         sw32_(context->bstarts + j * 4),
                         context->dest + j * context->blocksize, tmp, tmp2);
      }
    }
    if (cbytes < 0) {
      ntbytes = cbytes;         /* error in blosc_c or blosc_d */
      break;
    }
    ntbytes += cbytes;
  }

  /* Free temporaries */
  my_free(tmp);

  return ntbytes;
}


/* Threaded version for compression/decompression */
static int parallel_blosc(struct blosc_context* context)
{
  int rc;
  (void)rc;  // just to avoid 'unused-variable' warning

  /* Check whether we need to restart threads */
  if (blosc_set_nthreads_(context) < 0) {
    return -1;
  }

  /* Set sentinels */
  context->thread_giveup_code = 1;
  context->thread_nblock = -1;

  /* Synchronization point for all threads (wait for initialization) */
  WAIT_INIT(-1, context);

  /* Synchronization point for all threads (wait for finalization) */
  WAIT_FINISH(-1, context);

  if (context->thread_giveup_code > 0) {
    /* Return the total bytes (de-)compressed in threads */
    return context->num_output_bytes;
  }
  else {
    /* Compression/decompression gave up.  Return error code. */
    return context->thread_giveup_code;
  }
}


/* Do the compression or decompression of the buffer depending on the
   global params. */
static int do_job(struct blosc_context* context)
{
  int32_t ntbytes;

  /* Run the serial version when nthreads is 1 or when the buffers are
     not much larger than blocksize */
  if (context->numthreads == 1 || (context->sourcesize / context->blocksize) <= 1) {
    ntbytes = serial_blosc(context);
  }
  else {
    ntbytes = parallel_blosc(context);
  }

  return ntbytes;
}


/* Whether a codec is meant for High Compression Ratios */
#define HCR(codec) (  \
             ((codec) == BLOSC_LZ4HC) ||                  \
             ((codec) == BLOSC_ZLIB) ||                   \
             ((codec) == BLOSC_ZSTD) ? 1 : 0 )


/* Conditions for splitting a block before compressing with a codec. */
static int split_block(int compcode, int typesize, int blocksize) {
  int splitblock = -1;

  switch (g_splitmode) {
    case BLOSC_ALWAYS_SPLIT:
      splitblock = 1;
      break;
    case BLOSC_NEVER_SPLIT:
      splitblock = 0;
      break;
    case BLOSC_AUTO_SPLIT:
      /* Normally all the compressors designed for speed benefit from a
         split.  However, in conducted benchmarks LZ4 seems that it runs
         faster if we don't split, which is quite surprising. */
      splitblock= (((compcode == BLOSC_BLOSCLZ) ||
                    (compcode == BLOSC_SNAPPY)) &&
                   (typesize <= MAX_SPLITS) &&
                   (blocksize / typesize) >= MIN_BUFFERSIZE);
      break;
    case BLOSC_FORWARD_COMPAT_SPLIT:
      /* The zstd support was introduced at the same time than the split flag, so
       * there should be not a problem with not splitting bloscks with it */
      splitblock = ((compcode != BLOSC_ZSTD) &&
                    (typesize <= MAX_SPLITS) &&
                    (blocksize / typesize) >= MIN_BUFFERSIZE);
      break;
    default:
      fprintf(stderr, "Split mode %d not supported", g_splitmode);
  }
  return splitblock;
}


static int32_t compute_blocksize(struct blosc_context* context, int32_t clevel,
                                 int32_t typesize, int32_t nbytes,
                                 int32_t forced_blocksize)
{
  int32_t blocksize;

  /* Protection against very small buffers */
  if (nbytes < (int32_t)typesize) {
    return 1;
  }

  blocksize = nbytes;           /* Start by a whole buffer as blocksize */

  if (forced_blocksize) {
    blocksize = forced_blocksize;
    /* Check that forced blocksize is not too small */
    if (blocksize < MIN_BUFFERSIZE) {
      blocksize = MIN_BUFFERSIZE;
    }
    /* Check that forced blocksize is not too large */
    if (blocksize > BLOSC_MAX_BLOCKSIZE) {
      blocksize = BLOSC_MAX_BLOCKSIZE;
    }
  }
  else if (nbytes >= L1) {
    blocksize = L1;

    /* For HCR codecs, increase the block sizes by a factor of 2 because they
       are meant for compressing large blocks (i.e. they show a big overhead
       when compressing small ones). */
    if (HCR(context->compcode)) {
      blocksize *= 2;
    }

    switch (clevel) {
      case 0:
        /* Case of plain copy */
        blocksize /= 4;
        break;
      case 1:
        blocksize /= 2;
        break;
      case 2:
        blocksize *= 1;
        break;
      case 3:
        blocksize *= 2;
        break;
      case 4:
      case 5:
        blocksize *= 4;
        break;
      case 6:
      case 7:
      case 8:
        blocksize *= 8;
        break;
      case 9:
        blocksize *= 8;
        if (HCR(context->compcode)) {
          blocksize *= 2;
        }
        break;
      default:
        assert(0);
        break;
    }
  }

  /* Enlarge the blocksize for splittable codecs */
  if (clevel > 0 && split_block(context->compcode, typesize, blocksize)) {
    if (blocksize > (1 << 18)) {
      /* Do not use a too large split buffer (> 256 KB) for splitting codecs */
      blocksize = (1 << 18);
    }
    blocksize *= typesize;
    if (blocksize < (1 << 16)) {
      /* Do not use a too small blocksize (< 64 KB) when typesize is small */
      blocksize = (1 << 16);
    }
    if (blocksize > 1024 * 1024) {
      /* But do not exceed 1 MB per thread (having this capacity in L3 is normal in modern CPUs) */
      blocksize = 1024 * 1024;
    }

  }

  /* Check that blocksize is not too large */
  if (blocksize > (int32_t)nbytes) {
    blocksize = nbytes;
  }

  /* blocksize *must absolutely* be a multiple of the typesize */
  if (blocksize > typesize) {
    blocksize = blocksize / typesize * typesize;
  }

  return blocksize;
}

static int initialize_context_compression(struct blosc_context* context,
                          int clevel,
                          int doshuffle,
                          size_t typesize,
                          size_t sourcesize,
                          const void* src,
                          void* dest,
                          size_t destsize,
                          int32_t compressor,
                          int32_t blocksize,
                          int32_t numthreads)
{
  char *envvar = NULL;
  int warnlvl = 0;
  /* Set parameters */
  context->compress = 1;
  context->src = (const uint8_t*)src;
  context->dest = (uint8_t *)(dest);
  context->num_output_bytes = 0;
  context->destsize = (int32_t)destsize;
  context->sourcesize = sourcesize;
  context->typesize = typesize;
  context->compcode = compressor;
  context->numthreads = numthreads;
  context->end_threads = 0;
  context->clevel = clevel;

  envvar = getenv("BLOSC_WARN");
  if (envvar != NULL) {
    warnlvl = strtol(envvar, NULL, 10);
  }

  /* Check buffer size limits */
  if (sourcesize > BLOSC_MAX_BUFFERSIZE) {
    if (warnlvl > 0) {
      fprintf(stderr, "Input buffer size cannot exceed %d bytes\n",
              BLOSC_MAX_BUFFERSIZE);
    }
    return 0;
  }
  if (destsize < BLOSC_MAX_OVERHEAD) {
    if (warnlvl > 0) {
      fprintf(stderr, "Output buffer size should be larger than %d bytes\n",
              BLOSC_MAX_OVERHEAD);
    }
    return 0;
  }

  /* Compression level */
  if (clevel < 0 || clevel > 9) {
    fprintf(stderr, "`clevel` parameter must be between 0 and 9!\n");
    return -10;
  }

  /* Shuffle */
  if (doshuffle != 0 && doshuffle != 1 && doshuffle != 2) {
    fprintf(stderr, "`shuffle` parameter must be either 0, 1 or 2!\n");
    return -10;
  }

  /* Check typesize limits */
  if (context->typesize > BLOSC_MAX_TYPESIZE) {
    /* If typesize is too large, treat buffer as an 1-byte stream. */
    context->typesize = 1;
  }

  /* Get the blocksize */
  context->blocksize = compute_blocksize(context, clevel, (int32_t)context->typesize, context->sourcesize, blocksize);

  /* Compute number of blocks in buffer */
  context->nblocks = context->sourcesize / context->blocksize;
  context->leftover = context->sourcesize % context->blocksize;
  context->nblocks = (context->leftover > 0) ? (context->nblocks + 1) : context->nblocks;

  return 1;
}


static int write_compression_header(struct blosc_context* context, int clevel, int doshuffle)
{
  int32_t compformat;
  int dont_split;

  /* Write version header for this block */
  context->dest[0] = BLOSC_VERSION_FORMAT;           /* blosc format version */

  /* Write compressor format */
  compformat = -1;
  switch (context->compcode)
  {
  case BLOSC_BLOSCLZ:
    compformat = BLOSC_BLOSCLZ_FORMAT;
    context->dest[1] = BLOSC_BLOSCLZ_VERSION_FORMAT; /* blosclz format version */
    break;

#if defined(HAVE_LZ4)
  case BLOSC_LZ4:
    compformat = BLOSC_LZ4_FORMAT;
    context->dest[1] = BLOSC_LZ4_VERSION_FORMAT;  /* lz4 format version */
    break;
  case BLOSC_LZ4HC:
    compformat = BLOSC_LZ4HC_FORMAT;
    context->dest[1] = BLOSC_LZ4HC_VERSION_FORMAT; /* lz4hc is the same as lz4 */
    break;
#endif /* HAVE_LZ4 */

#if defined(HAVE_SNAPPY)
  case BLOSC_SNAPPY:
    compformat = BLOSC_SNAPPY_FORMAT;
    context->dest[1] = BLOSC_SNAPPY_VERSION_FORMAT;    /* snappy format version */
    break;
#endif /* HAVE_SNAPPY */

#if defined(HAVE_ZLIB)
  case BLOSC_ZLIB:
    compformat = BLOSC_ZLIB_FORMAT;
    context->dest[1] = BLOSC_ZLIB_VERSION_FORMAT;      /* zlib format version */
    break;
#endif /* HAVE_ZLIB */

#if defined(HAVE_ZSTD)
  case BLOSC_ZSTD:
    compformat = BLOSC_ZSTD_FORMAT;
    context->dest[1] = BLOSC_ZSTD_VERSION_FORMAT;      /* zstd format version */
    break;
#endif /* HAVE_ZSTD */

  default:
  {
    const char *compname;
    compname = clibcode_to_clibname(compformat);
    if (compname == NULL) {
        compname = "(null)";
    }
    fprintf(stderr, "Blosc has not been compiled with '%s' ", compname);
    fprintf(stderr, "compression support.  Please use one having it.");
    return -5;    /* signals no compression support */
    break;
  }
  }

  context->header_flags = context->dest+2;  /* flags */
  context->dest[2] = 0;  /* zeroes flags */
  context->dest[3] = (uint8_t)context->typesize;  /* type size */
  _sw32(context->dest + 4, context->sourcesize);  /* size of the buffer */
  _sw32(context->dest + 8, context->blocksize);  /* block size */
  context->bstarts = context->dest + 16;  /* starts for every block */
  context->num_output_bytes = 16 + sizeof(int32_t)*context->nblocks;  /* space for header and pointers */

  if (context->clevel == 0) {
    /* Compression level 0 means buffer to be memcpy'ed */
    *(context->header_flags) |= BLOSC_MEMCPYED;
    context->num_output_bytes = 16;      /* space just for header */
  }

  if (context->sourcesize < MIN_BUFFERSIZE) {
    /* Buffer is too small.  Try memcpy'ing. */
    *(context->header_flags) |= BLOSC_MEMCPYED;
    context->num_output_bytes = 16;      /* space just for header */
  }

  if (doshuffle == BLOSC_SHUFFLE) {
    /* Byte-shuffle is active */
    *(context->header_flags) |= BLOSC_DOSHUFFLE;     /* bit 0 set to one in flags */
  }

  if (doshuffle == BLOSC_BITSHUFFLE) {
    /* Bit-shuffle is active */
    *(context->header_flags) |= BLOSC_DOBITSHUFFLE;  /* bit 2 set to one in flags */
  }

  dont_split = !split_block(context->compcode, context->typesize,
                            context->blocksize);
  *(context->header_flags) |= dont_split << 4;  /* dont_split is in bit 4 */
  *(context->header_flags) |= compformat << 5;  /* compressor format starts at bit 5 */

  return 1;
}


int blosc_compress_context(struct blosc_context* context)
{
  int32_t ntbytes = 0;

  if ((*(context->header_flags) & BLOSC_MEMCPYED) &&
      (context->sourcesize + BLOSC_MAX_OVERHEAD > context->destsize)) {
    return 0;   /* data cannot be copied without overrun destination */
  }

  /* Do the actual compression */
  ntbytes = do_job(context);
  if (ntbytes < 0) {
    return -1;
  }
  if ((ntbytes == 0) && (context->sourcesize + BLOSC_MAX_OVERHEAD <= context->destsize)) {
    /* Last chance for fitting `src` buffer in `dest`.  Update flags and force a copy. */
    *(context->header_flags) |= BLOSC_MEMCPYED;
    context->num_output_bytes = BLOSC_MAX_OVERHEAD;  /* reset the output bytes in previous step */
    ntbytes = do_job(context);
    if (ntbytes < 0) {
      return -1;
    }
  }

  /* Set the number of compressed bytes in header */
  _sw32(context->dest + 12, ntbytes);

  assert(ntbytes <= context->destsize);
  return ntbytes;
}

/* The public routine for compression with context. */
int blosc_compress_ctx(int clevel, int doshuffle, size_t typesize,
                       size_t nbytes, const void* src, void* dest,
                       size_t destsize, const char* compressor,
                       size_t blocksize, int numinternalthreads)
{
  int error, result;
  struct blosc_context context;

  context.threads_started = 0;
  error = initialize_context_compression(&context, clevel, doshuffle, typesize,
					 nbytes, src, dest, destsize,
					 blosc_compname_to_compcode(compressor),
					 blocksize, numinternalthreads);
  if (error <= 0) { return error; }

  error = write_compression_header(&context, clevel, doshuffle);
  if (error <= 0) { return error; }

  result = blosc_compress_context(&context);

  if (numinternalthreads > 1)
  {
    blosc_release_threadpool(&context);
  }

  return result;
}

/* The public routine for compression.  See blosc.h for docstrings. */
int blosc_compress(int clevel, int doshuffle, size_t typesize, size_t nbytes,
                   const void *src, void *dest, size_t destsize)
{
  int result;
  char* envvar;

  /* Check if should initialize */
  if (!g_initlib) blosc_init();

  /* Check for environment variables */
  envvar = getenv("BLOSC_CLEVEL");
  if (envvar != NULL) {
    long value;
    value = strtol(envvar, NULL, 10);
    if ((value != EINVAL) && (value >= 0)) {
      clevel = (int)value;
    }
  }

  envvar = getenv("BLOSC_SHUFFLE");
  if (envvar != NULL) {
    if (strcmp(envvar, "NOSHUFFLE") == 0) {
      doshuffle = BLOSC_NOSHUFFLE;
    }
    if (strcmp(envvar, "SHUFFLE") == 0) {
      doshuffle = BLOSC_SHUFFLE;
    }
    if (strcmp(envvar, "BITSHUFFLE") == 0) {
      doshuffle = BLOSC_BITSHUFFLE;
    }
  }

  envvar = getenv("BLOSC_TYPESIZE");
  if (envvar != NULL) {
    long value;
    value = strtol(envvar, NULL, 10);
    if ((value != EINVAL) && (value > 0)) {
      typesize = (int)value;
    }
  }

  envvar = getenv("BLOSC_COMPRESSOR");
  if (envvar != NULL) {
    result = blosc_set_compressor(envvar);
    if (result < 0) { return result; }
  }

  envvar = getenv("BLOSC_BLOCKSIZE");
  if (envvar != NULL) {
    long blocksize;
    blocksize = strtol(envvar, NULL, 10);
    if ((blocksize != EINVAL) && (blocksize > 0)) {
      blosc_set_blocksize((size_t)blocksize);
    }
  }

  envvar = getenv("BLOSC_NTHREADS");
  if (envvar != NULL) {
    long nthreads;
    nthreads = strtol(envvar, NULL, 10);
    if ((nthreads != EINVAL) && (nthreads > 0)) {
      result = blosc_set_nthreads((int)nthreads);
      if (result < 0) { return result; }
    }
  }

  envvar = getenv("BLOSC_SPLITMODE");
  if (envvar != NULL) {
    if (strcmp(envvar, "FORWARD_COMPAT") == 0) {
      blosc_set_splitmode(BLOSC_FORWARD_COMPAT_SPLIT);
    }
    else if (strcmp(envvar, "AUTO") == 0) {
      blosc_set_splitmode(BLOSC_AUTO_SPLIT);
    }
    else if (strcmp(envvar, "ALWAYS") == 0) {
      blosc_set_splitmode(BLOSC_ALWAYS_SPLIT);
    }
    else if (strcmp(envvar, "NEVER") == 0) {
      blosc_set_splitmode(BLOSC_NEVER_SPLIT);
    }
    else {
      fprintf(stderr, "BLOSC_SPLITMODE environment variable '%s' not recognized\n", envvar);
      return -1;
    }
  }

  /* Check for a BLOSC_NOLOCK environment variable.  It is important
     that this should be the last env var so that it can take the
     previous ones into account */
  envvar = getenv("BLOSC_NOLOCK");
  if (envvar != NULL) {
    const char *compname;
    blosc_compcode_to_compname(g_compressor, &compname);
    result = blosc_compress_ctx(clevel, doshuffle, typesize,
				nbytes, src, dest, destsize,
				compname, g_force_blocksize, g_threads);
    return result;
  }

  pthread_mutex_lock(global_comp_mutex);

  do {
    result = initialize_context_compression(g_global_context, clevel, doshuffle,
                                           typesize, nbytes, src, dest, destsize,
                                           g_compressor, g_force_blocksize,
                                           g_threads);
    if (result <= 0) { break; }

    result = write_compression_header(g_global_context, clevel, doshuffle);
    if (result <= 0) { break; }

    result = blosc_compress_context(g_global_context);
  } while (0);

  pthread_mutex_unlock(global_comp_mutex);

  return result;
}

static int blosc_run_decompression_with_context(struct blosc_context* context,
                                                const void* src,
                                                void* dest,
                                                size_t destsize,
                                                int numinternalthreads)
{
  uint8_t version;
  int32_t ntbytes;

  context->compress = 0;
  context->src = (const uint8_t*)src;
  context->dest = (uint8_t*)dest;
  context->destsize = destsize;
  context->num_output_bytes = 0;
  context->numthreads = numinternalthreads;
  context->end_threads = 0;

  /* Read the header block */
  version = context->src[0];                        /* blosc format version */
  context->compversion = context->src[1];

  context->header_flags = (uint8_t*)(context->src + 2);           /* flags */
  context->typesize = (int32_t)context->src[3];      /* typesize */
  context->sourcesize = sw32_(context->src + 4);     /* buffer size */
  context->blocksize = sw32_(context->src + 8);      /* block size */
  context->compressedsize = sw32_(context->src + 12); /* compressed buffer size */
  context->bstarts = (uint8_t*)(context->src + 16);

  if (context->sourcesize == 0) {
    /* Source buffer was empty, so we are done */
    return 0;
  }

  if (context->blocksize <= 0 || context->blocksize > destsize ||
      context->blocksize > BLOSC_MAX_BLOCKSIZE || context->typesize <= 0 ||
      context->typesize > BLOSC_MAX_TYPESIZE) {
    return -1;
  }

  if (version != BLOSC_VERSION_FORMAT) {
    /* Version from future */
    return -1;
  }
  if (*context->header_flags & 0x08) {
    /* compressor flags from the future */
    return -1;
  }

  /* Compute some params */
  /* Total blocks */
  context->nblocks = context->sourcesize / context->blocksize;
  context->leftover = context->sourcesize % context->blocksize;
  context->nblocks = (context->leftover>0)? context->nblocks+1: context->nblocks;

  /* Check that we have enough space to decompress */
  if (context->sourcesize > (int32_t)destsize) {
    return -1;
  }

  if (*(context->header_flags) & BLOSC_MEMCPYED) {
    /* Validate that compressed size is equal to decompressed size + header
       size. */
    if (context->sourcesize + BLOSC_MAX_OVERHEAD != context->compressedsize) {
      return -1;
    }
  } else {
    ntbytes = initialize_decompress_func(context);
    if (ntbytes != 0) return ntbytes;

    /* Validate that compressed size is large enough to hold the bstarts array */
    if (context->nblocks > (context->compressedsize - 16) / 4) {
      return -1;
    }
  }

  /* Do the actual decompression */
  ntbytes = do_job(context);
  if (ntbytes < 0) {
    return -1;
  }

  assert(ntbytes <= (int32_t)destsize);
  return ntbytes;
}

int blosc_decompress_ctx(const void* src, void* dest, size_t destsize,
                         int numinternalthreads) {
  int result;
  struct blosc_context context;

  context.threads_started = 0;
  result = blosc_run_decompression_with_context(&context, src, dest, destsize,
                                                numinternalthreads);

  if (numinternalthreads > 1)
  {
    blosc_release_threadpool(&context);
  }

  return result;
}

int blosc_decompress(const void* src, void* dest, size_t destsize) {
  int result;
  char* envvar;
  long nthreads;

  /* Check if should initialize */
  if (!g_initlib) blosc_init();

  /* Check for a BLOSC_NTHREADS environment variable */
  envvar = getenv("BLOSC_NTHREADS");
  if (envvar != NULL) {
    nthreads = strtol(envvar, NULL, 10);
    if ((nthreads != EINVAL) && (nthreads > 0)) {
      result = blosc_set_nthreads((int)nthreads);
      if (result < 0) { return result; }
    }
  }

  /* Check for a BLOSC_NOLOCK environment variable.  It is important
     that this should be the last env var so that it can take the
     previous ones into account */
  envvar = getenv("BLOSC_NOLOCK");
  if (envvar != NULL) {
    result = blosc_decompress_ctx(src, dest, destsize, g_threads);
    return result;
  }

  pthread_mutex_lock(global_comp_mutex);

  result = blosc_run_decompression_with_context(g_global_context, src, dest,
                                                destsize, g_threads);

  pthread_mutex_unlock(global_comp_mutex);

  return result;
}

int blosc_getitem(const void* src, int start, int nitems, void* dest) {
  uint8_t *_src=NULL;               /* current pos for source buffer */
  uint8_t version, compversion;     /* versions for compressed header */
  uint8_t flags;                    /* flags for header */
  int32_t ntbytes = 0;              /* the number of uncompressed bytes */
  int32_t nblocks;                  /* number of total blocks in buffer */
  int32_t leftover;                 /* extra bytes at end of buffer */
  uint8_t *bstarts;                 /* start pointers for each block */
  int32_t typesize, blocksize, nbytes, compressedsize;
  int32_t j, bsize, bsize2, leftoverblock;
  int32_t cbytes, startb, stopb;
  int stop = start + nitems;
  uint8_t *tmp;
  uint8_t *tmp2;
  uint8_t *tmp3;
  int32_t ebsize;
  struct blosc_context context = {0};

  _src = (uint8_t *)(src);

  /* Read the header block */
  version = _src[0];                        /* blosc format version */
  compversion = _src[1];
  flags = _src[2];                          /* flags */
  typesize = (int32_t)_src[3];              /* typesize */
  nbytes = sw32_(_src + 4);                 /* buffer size */
  blocksize = sw32_(_src + 8);              /* block size */
  compressedsize = sw32_(_src + 12); /* compressed buffer size */

  if (version != BLOSC_VERSION_FORMAT)
    return -9;

  if (blocksize <= 0 || blocksize > nbytes || blocksize > BLOSC_MAX_BLOCKSIZE ||
      typesize <= 0 || typesize > BLOSC_MAX_TYPESIZE) {
    return -1;
  }

  /* Compute some params */
  /* Total blocks */
  nblocks = nbytes / blocksize;
  leftover = nbytes % blocksize;
  nblocks = (leftover>0)? nblocks+1: nblocks;

  /* Only initialize the fields blosc_d uses */
  context.typesize = typesize;
  context.header_flags = &flags;
  context.compversion = compversion;
  context.compressedsize = compressedsize;
  if (flags & BLOSC_MEMCPYED) {
    if (nbytes + BLOSC_MAX_OVERHEAD != compressedsize) {
      return -1;
    }
  } else {
    ntbytes = initialize_decompress_func(&context);
    if (ntbytes != 0) return ntbytes;

    if (nblocks >= (compressedsize - 16) / 4) {
      return -1;
    }
  }

  ebsize = blocksize + typesize * (int32_t)sizeof(int32_t);
  tmp = my_malloc(blocksize + ebsize + blocksize);
  tmp2 = tmp + blocksize;
  tmp3 = tmp + blocksize + ebsize;

  _src += 16;
  bstarts = _src;
  _src += sizeof(int32_t)*nblocks;

  /* Check region boundaries */
  if ((start < 0) || (start*typesize > nbytes)) {
    fprintf(stderr, "`start` out of bounds");
    return -1;
  }

  if ((stop < 0) || (stop*typesize > nbytes)) {
    fprintf(stderr, "`start`+`nitems` out of bounds");
    return -1;
  }

  for (j = 0; j < nblocks; j++) {
    bsize = blocksize;
    leftoverblock = 0;
    if ((j == nblocks - 1) && (leftover > 0)) {
      bsize = leftover;
      leftoverblock = 1;
    }

    /* Compute start & stop for each block */
    startb = start * typesize - j * blocksize;
    stopb = stop * typesize - j * blocksize;
    if ((startb >= (int)blocksize) || (stopb <= 0)) {
      continue;
    }
    if (startb < 0) {
      startb = 0;
    }
    if (stopb > (int)blocksize) {
      stopb = blocksize;
    }
    bsize2 = stopb - startb;

    /* Do the actual data copy */
    if (flags & BLOSC_MEMCPYED) {
      /* We want to memcpy only */
      fastcopy((uint8_t *) dest + ntbytes,
               (uint8_t *) src + BLOSC_MAX_OVERHEAD + j * blocksize + startb, bsize2);
      cbytes = bsize2;
    }
    else {
      /* Regular decompression.  Put results in tmp2. */
      cbytes = blosc_d(&context, bsize, leftoverblock,
                       (uint8_t *)src, sw32_(bstarts + j * 4),
                       tmp2, tmp, tmp3);
      if (cbytes < 0) {
        ntbytes = cbytes;
        break;
      }
      /* Copy to destination */
      fastcopy((uint8_t *) dest + ntbytes, tmp2 + startb, bsize2);
      cbytes = bsize2;
    }
    ntbytes += cbytes;
  }

  my_free(tmp);

  return ntbytes;
}

/* Decompress & unshuffle several blocks in a single thread */
static void *t_blosc(void *ctxt)
{
  struct thread_context* context = (struct thread_context*)ctxt;
  int32_t cbytes, ntdest;
  int32_t tblocks;              /* number of blocks per thread */
  int32_t leftover2;
  int32_t tblock;               /* limit block on a thread */
  int32_t nblock_;              /* private copy of nblock */
  int32_t bsize, leftoverblock;
  /* Parameters for threads */
  int32_t blocksize;
  int32_t ebsize;
  int32_t compress;
  int32_t maxbytes;
  int32_t ntbytes;
  int32_t flags;
  int32_t nblocks;
  int32_t leftover;
  uint8_t *bstarts;
  const uint8_t *src;
  uint8_t *dest;
  uint8_t *tmp;
  uint8_t *tmp2;
  uint8_t *tmp3;
  int rc;
  (void)rc;  // just to avoid 'unused-variable' warning

  while(1)
  {
    /* Synchronization point for all threads (wait for initialization) */
    WAIT_INIT(NULL, context->parent_context);

    if(context->parent_context->end_threads)
    {
      break;
    }

    /* Get parameters for this thread before entering the main loop */
    blocksize = context->parent_context->blocksize;
    ebsize = blocksize + context->parent_context->typesize * (int32_t)sizeof(int32_t);
    compress = context->parent_context->compress;
    flags = *(context->parent_context->header_flags);
    maxbytes = context->parent_context->destsize;
    nblocks = context->parent_context->nblocks;
    leftover = context->parent_context->leftover;
    bstarts = context->parent_context->bstarts;
    src = context->parent_context->src;
    dest = context->parent_context->dest;

    if (blocksize > context->tmpblocksize)
    {
      my_free(context->tmp);
      context->tmp = my_malloc(blocksize + ebsize + blocksize);
      context->tmp2 = context->tmp + blocksize;
      context->tmp3 = context->tmp + blocksize + ebsize;
    }

    tmp = context->tmp;
    tmp2 = context->tmp2;
    tmp3 = context->tmp3;

    ntbytes = 0;                /* only useful for decompression */

    if (compress && !(flags & BLOSC_MEMCPYED)) {
      /* Compression always has to follow the block order */
      pthread_mutex_lock(&context->parent_context->count_mutex);
      context->parent_context->thread_nblock++;
      nblock_ = context->parent_context->thread_nblock;
      pthread_mutex_unlock(&context->parent_context->count_mutex);
      tblock = nblocks;
    }
    else {
      /* Decompression can happen using any order.  We choose
       sequential block order on each thread */

      /* Blocks per thread */
      tblocks = nblocks / context->parent_context->numthreads;
      leftover2 = nblocks % context->parent_context->numthreads;
      tblocks = (leftover2>0)? tblocks+1: tblocks;

      nblock_ = context->tid*tblocks;
      tblock = nblock_ + tblocks;
      if (tblock > nblocks) {
        tblock = nblocks;
      }
    }

    /* Loop over blocks */
    leftoverblock = 0;
    while ((nblock_ < tblock) && context->parent_context->thread_giveup_code > 0) {
      bsize = blocksize;
      if (nblock_ == (nblocks - 1) && (leftover > 0)) {
        bsize = leftover;
        leftoverblock = 1;
      }
      if (compress) {
        if (flags & BLOSC_MEMCPYED) {
          /* We want to memcpy only */
          fastcopy(dest + BLOSC_MAX_OVERHEAD + nblock_ * blocksize,
                   src + nblock_ * blocksize, bsize);
          cbytes = bsize;
        }
        else {
          /* Regular compression */
          cbytes = blosc_c(context->parent_context, bsize, leftoverblock, 0, ebsize,
                           src+nblock_*blocksize, tmp2, tmp, tmp3);
        }
      }
      else {
        if (flags & BLOSC_MEMCPYED) {
          /* We want to memcpy only */
          fastcopy(dest + nblock_ * blocksize,
                   src + BLOSC_MAX_OVERHEAD + nblock_ * blocksize, bsize);
          cbytes = bsize;
        }
        else {
          cbytes = blosc_d(context->parent_context, bsize, leftoverblock,
                           src, sw32_(bstarts + nblock_ * 4),
                           dest+nblock_*blocksize,
                           tmp, tmp2);
        }
      }

      /* Check whether current thread has to giveup */
      if (context->parent_context->thread_giveup_code <= 0) {
        break;
      }

      /* Check results for the compressed/decompressed block */
      if (cbytes < 0) {            /* compr/decompr failure */
        /* Set giveup_code error */
        pthread_mutex_lock(&context->parent_context->count_mutex);
        context->parent_context->thread_giveup_code = cbytes;
        pthread_mutex_unlock(&context->parent_context->count_mutex);
        break;
      }

      if (compress && !(flags & BLOSC_MEMCPYED)) {
        /* Start critical section */
        pthread_mutex_lock(&context->parent_context->count_mutex);
        ntdest = context->parent_context->num_output_bytes;
        _sw32(bstarts + nblock_ * 4, ntdest); /* update block start counter */
        if ( (cbytes == 0) || (ntdest+cbytes > maxbytes) ) {
          context->parent_context->thread_giveup_code = 0;  /* incompressible buffer */
          pthread_mutex_unlock(&context->parent_context->count_mutex);
          break;
        }
        context->parent_context->thread_nblock++;
        nblock_ = context->parent_context->thread_nblock;
        context->parent_context->num_output_bytes += cbytes;           /* update return bytes counter */
        pthread_mutex_unlock(&context->parent_context->count_mutex);
        /* End of critical section */

        /* Copy the compressed buffer to destination */
        fastcopy(dest + ntdest, tmp2, cbytes);
      }
      else {
        nblock_++;
        /* Update counter for this thread */
        ntbytes += cbytes;
      }

    } /* closes while (nblock_) */

    /* Sum up all the bytes decompressed */
    if ((!compress || (flags & BLOSC_MEMCPYED)) && context->parent_context->thread_giveup_code > 0) {
      /* Update global counter for all threads (decompression only) */
      pthread_mutex_lock(&context->parent_context->count_mutex);
      context->parent_context->num_output_bytes += ntbytes;
      pthread_mutex_unlock(&context->parent_context->count_mutex);
    }

    /* Meeting point for all threads (wait for finalization) */
    WAIT_FINISH(NULL, context->parent_context);
  }

  /* Cleanup our working space and context */
  my_free(context->tmp);
  my_free(context);

  return(NULL);
}


static int init_threads(struct blosc_context* context)
{
  int32_t tid;
  int rc2;
  int32_t ebsize;
  struct thread_context* thread_context;

  /* Initialize mutex and condition variable objects */
  pthread_mutex_init(&context->count_mutex, NULL);

  /* Set context thread sentinels */
  context->thread_giveup_code = 1;
  context->thread_nblock = -1;

  /* Barrier initialization */
#ifdef _POSIX_BARRIERS_MINE
  pthread_barrier_init(&context->barr_init, NULL, context->numthreads+1);
  pthread_barrier_init(&context->barr_finish, NULL, context->numthreads+1);
#else
  pthread_mutex_init(&context->count_threads_mutex, NULL);
  pthread_cond_init(&context->count_threads_cv, NULL);
  context->count_threads = 0;      /* Reset threads counter */
#endif

#if !defined(_WIN32)
  /* Initialize and set thread detached attribute */
  pthread_attr_init(&context->ct_attr);
  pthread_attr_setdetachstate(&context->ct_attr, PTHREAD_CREATE_JOINABLE);
#endif

  /* Finally, create the threads in detached state */
  for (tid = 0; tid < context->numthreads; tid++) {
    context->tids[tid] = tid;

    /* Create a thread context thread owns context (will destroy when finished) */
    thread_context = (struct thread_context*)my_malloc(sizeof(struct thread_context));
    thread_context->parent_context = context;
    thread_context->tid = tid;

    ebsize = context->blocksize + context->typesize * (int32_t)sizeof(int32_t);
    thread_context->tmp = my_malloc(context->blocksize + ebsize + context->blocksize);
    thread_context->tmp2 = thread_context->tmp + context->blocksize;
    thread_context->tmp3 = thread_context->tmp + context->blocksize + ebsize;
    thread_context->tmpblocksize = context->blocksize;

#if !defined(_WIN32)
    rc2 = pthread_create(&context->threads[tid], &context->ct_attr, t_blosc, (void *)thread_context);
#else
    rc2 = pthread_create(&context->threads[tid], NULL, t_blosc, (void *)thread_context);
#endif
    if (rc2) {
      fprintf(stderr, "ERROR; return code from pthread_create() is %d\n", rc2);
      fprintf(stderr, "\tError detail: %s\n", strerror(rc2));
      return(-1);
    }
  }


  return(0);
}

int blosc_get_nthreads(void)
{
  int ret = g_threads;

  return ret;
}

int blosc_set_nthreads(int nthreads_new)
{
  int ret = g_threads;

  /* Check if should initialize */
  if (!g_initlib) blosc_init();

  if (nthreads_new != ret){
    /* Re-initialize Blosc */
    blosc_destroy();
    blosc_init();
    g_threads = nthreads_new;
  }

  return ret;
}

int blosc_set_nthreads_(struct blosc_context* context)
{
  if (context->numthreads > BLOSC_MAX_THREADS) {
    fprintf(stderr,
            "Error.  nthreads cannot be larger than BLOSC_MAX_THREADS (%d)",
            BLOSC_MAX_THREADS);
    return -1;
  }
  else if (context->numthreads <= 0) {
    fprintf(stderr, "Error.  nthreads must be a positive integer");
    return -1;
  }

  /* Launch a new pool of threads */
  if (context->numthreads > 1 && context->numthreads != context->threads_started) {
    blosc_release_threadpool(context);
    if (init_threads(context) < 0) {
      return -1;
    }
  }

  /* We have now started the threads */
  context->threads_started = context->numthreads;

  return context->numthreads;
}

const char* blosc_get_compressor(void)
{
  const char* compname;
  blosc_compcode_to_compname(g_compressor, &compname);

  return compname;
}

int blosc_set_compressor(const char *compname)
{
  int code = blosc_compname_to_compcode(compname);

  g_compressor = code;

  /* Check if should initialize */
  if (!g_initlib) blosc_init();

  return code;
}

const char* blosc_list_compressors(void)
{
  static int compressors_list_done = 0;
  static char ret[256];

  if (compressors_list_done) return ret;
  ret[0] = '\0';
  strcat(ret, BLOSC_BLOSCLZ_COMPNAME);
#if defined(HAVE_LZ4)
  strcat(ret, ","); strcat(ret, BLOSC_LZ4_COMPNAME);
  strcat(ret, ","); strcat(ret, BLOSC_LZ4HC_COMPNAME);
#endif /* HAVE_LZ4 */
#if defined(HAVE_SNAPPY)
  strcat(ret, ","); strcat(ret, BLOSC_SNAPPY_COMPNAME);
#endif /* HAVE_SNAPPY */
#if defined(HAVE_ZLIB)
  strcat(ret, ","); strcat(ret, BLOSC_ZLIB_COMPNAME);
#endif /* HAVE_ZLIB */
#if defined(HAVE_ZSTD)
  strcat(ret, ","); strcat(ret, BLOSC_ZSTD_COMPNAME);
#endif /* HAVE_ZSTD */
  compressors_list_done = 1;
  return ret;
}

const char* blosc_get_version_string(void)
{
  return BLOSC_VERSION_STRING;
}

int blosc_get_complib_info(const char *compname, char **complib, char **version)
{
  int clibcode;
  const char *clibname;
  const char *clibversion = "unknown";

#if (defined(HAVE_LZ4) && defined(LZ4_VERSION_MAJOR)) || (defined(HAVE_SNAPPY) && defined(SNAPPY_VERSION)) || defined(ZSTD_VERSION_MAJOR)
  char sbuffer[256];
#endif

  clibcode = compname_to_clibcode(compname);
  clibname = clibcode_to_clibname(clibcode);

  /* complib version */
  if (clibcode == BLOSC_BLOSCLZ_LIB) {
    clibversion = BLOSCLZ_VERSION_STRING;
  }
#if defined(HAVE_LZ4)
  else if (clibcode == BLOSC_LZ4_LIB) {
#if defined(LZ4_VERSION_MAJOR)
    sprintf(sbuffer, "%d.%d.%d",
            LZ4_VERSION_MAJOR, LZ4_VERSION_MINOR, LZ4_VERSION_RELEASE);
    clibversion = sbuffer;
#endif /* LZ4_VERSION_MAJOR */
  }
#endif /* HAVE_LZ4 */
#if defined(HAVE_SNAPPY)
  else if (clibcode == BLOSC_SNAPPY_LIB) {
#if defined(SNAPPY_VERSION)
    sprintf(sbuffer, "%d.%d.%d", SNAPPY_MAJOR, SNAPPY_MINOR, SNAPPY_PATCHLEVEL);
    clibversion = sbuffer;
#endif /* SNAPPY_VERSION */
  }
#endif /* HAVE_SNAPPY */
#if defined(HAVE_ZLIB)
  else if (clibcode == BLOSC_ZLIB_LIB) {
    clibversion = ZLIB_VERSION;
  }
#endif /* HAVE_ZLIB */
#if defined(HAVE_ZSTD)
  else if (clibcode == BLOSC_ZSTD_LIB) {
    sprintf(sbuffer, "%d.%d.%d",
            ZSTD_VERSION_MAJOR, ZSTD_VERSION_MINOR, ZSTD_VERSION_RELEASE);
    clibversion = sbuffer;
  }
#endif /* HAVE_ZSTD */
  else {
    /* Unsupported library */
    if (complib != NULL) *complib = NULL;
    if (version != NULL) *version = NULL;
    return -1;
  }

  if (complib != NULL) *complib = strdup(clibname);
  if (version != NULL) *version = strdup(clibversion);

  return clibcode;
}

/* Return `nbytes`, `cbytes` and `blocksize` from a compressed buffer. */
void blosc_cbuffer_sizes(const void *cbuffer, size_t *nbytes,
                         size_t *cbytes, size_t *blocksize)
{
  uint8_t *_src = (uint8_t *)(cbuffer);    /* current pos for source buffer */
  uint8_t version = _src[0];               /* version of header */

  if (version != BLOSC_VERSION_FORMAT) {
    *nbytes = *blocksize = *cbytes = 0;
    return;
  }

  /* Read the interesting values */
  *nbytes = (size_t)sw32_(_src + 4);       /* uncompressed buffer size */
  *blocksize = (size_t)sw32_(_src + 8);    /* block size */
  *cbytes = (size_t)sw32_(_src + 12);      /* compressed buffer size */
}

int blosc_cbuffer_validate(const void* cbuffer, size_t cbytes, size_t* nbytes) {
  size_t header_cbytes, header_blocksize;
  if (cbytes < BLOSC_MIN_HEADER_LENGTH) return -1;
  blosc_cbuffer_sizes(cbuffer, nbytes, &header_cbytes, &header_blocksize);
  if (header_cbytes != cbytes) return -1;
  if (*nbytes > BLOSC_MAX_BUFFERSIZE) return -1;
  return 0;
}

/* Return `typesize` and `flags` from a compressed buffer. */
void blosc_cbuffer_metainfo(const void *cbuffer, size_t *typesize,
                            int *flags)
{
  uint8_t *_src = (uint8_t *)(cbuffer);  /* current pos for source buffer */

  uint8_t version = _src[0];               /* version of header */

  if (version != BLOSC_VERSION_FORMAT) {
    *flags = *typesize = 0;
    return;
  }

  /* Read the interesting values */
  *flags = (int)_src[2] & 7;             /* first three flags */
  *typesize = (size_t)_src[3];           /* typesize */
}


/* Return version information from a compressed buffer. */
void blosc_cbuffer_versions(const void *cbuffer, int *version,
                            int *versionlz)
{
  uint8_t *_src = (uint8_t *)(cbuffer);  /* current pos for source buffer */

  /* Read the version info */
  *version = (int)_src[0];         /* blosc format version */
  *versionlz = (int)_src[1];       /* Lempel-Ziv compressor format version */
}


/* Return the compressor library/format used in a compressed buffer. */
const char *blosc_cbuffer_complib(const void *cbuffer)
{
  uint8_t *_src = (uint8_t *)(cbuffer);  /* current pos for source buffer */
  int clibcode;
  const char *complib;

  /* Read the compressor format/library info */
  clibcode = (_src[2] & 0xe0) >> 5;
  complib = clibcode_to_clibname(clibcode);
  return complib;
}

/* Get the internal blocksize to be used during compression.  0 means
   that an automatic blocksize is computed internally. */
int blosc_get_blocksize(void)
{
  return (int)g_force_blocksize;
}

/* Force the use of a specific blocksize.  If 0, an automatic
   blocksize will be used (the default). */
void blosc_set_blocksize(size_t size)
{
  g_force_blocksize = (int32_t)size;
}

/* Force the use of a specific split mode. */
void blosc_set_splitmode(int mode)
{
  g_splitmode = mode;
}

/* Child global context is invalid and pool threads no longer exist post-fork.
 * Discard the old, inconsistent global context and global context mutex and
 * mark as uninitialized.  Subsequent calls through `blosc_*` interfaces will
 * trigger re-init of the global context.
 *
 * All pthread interfaces have undefined behavior in child handler in current
 * posix standards: https://pubs.opengroup.org/onlinepubs/9699919799/
 */
void blosc_atfork_child(void) {
  if (!g_initlib) return;

  g_initlib = 0;

  my_free(global_comp_mutex);
  global_comp_mutex = NULL;

  my_free(g_global_context);
  g_global_context = NULL;

}

void blosc_init(void)
{
  /* Return if we are already initialized */
  if (g_initlib) return;

  global_comp_mutex = (pthread_mutex_t*)my_malloc(sizeof(pthread_mutex_t));
  pthread_mutex_init(global_comp_mutex, NULL);

  g_global_context = (struct blosc_context*)my_malloc(sizeof(struct blosc_context));
  g_global_context->threads_started = 0;

  #if !defined(_WIN32)
  /* atfork handlers are only be registered once, though multiple re-inits may
   * occur via blosc_destroy/blosc_init.  */
  if (!g_atfork_registered) {
    g_atfork_registered = 1;
    pthread_atfork(NULL, NULL, &blosc_atfork_child);
  }
  #endif

  g_initlib = 1;
}

void blosc_destroy(void)
{
  /* Return if Blosc is not initialized */
  if (!g_initlib) return;

  g_initlib = 0;

  blosc_release_threadpool(g_global_context);
  my_free(g_global_context);
  g_global_context = NULL;

  pthread_mutex_destroy(global_comp_mutex);
  my_free(global_comp_mutex);
  global_comp_mutex = NULL;
}

int blosc_release_threadpool(struct blosc_context* context)
{
  int32_t t;
  void* status;
  int rc;
  int rc2;
  (void)rc;  // just to avoid 'unused-variable' warning

  if (context->threads_started > 0)
  {
    /* Tell all existing threads to finish */
    context->end_threads = 1;

    /* Sync threads */
    WAIT_INIT(-1, context);

    /* Join exiting threads */
    for (t=0; t<context->threads_started; t++) {
      rc2 = pthread_join(context->threads[t], &status);
      if (rc2) {
        fprintf(stderr, "ERROR; return code from pthread_join() is %d\n", rc2);
        fprintf(stderr, "\tError detail: %s\n", strerror(rc2));
      }
    }

    /* Release mutex and condition variable objects */
    pthread_mutex_destroy(&context->count_mutex);

    /* Barriers */
  #ifdef _POSIX_BARRIERS_MINE
      pthread_barrier_destroy(&context->barr_init);
      pthread_barrier_destroy(&context->barr_finish);
  #else
      pthread_mutex_destroy(&context->count_threads_mutex);
      pthread_cond_destroy(&context->count_threads_cv);
  #endif

      /* Thread attributes */
  #if !defined(_WIN32)
      pthread_attr_destroy(&context->ct_attr);
  #endif

  }

  context->threads_started = 0;

  return 0;
}

int blosc_free_resources(void)
{
  /* Return if Blosc is not initialized */
  if (!g_initlib) return -1;

  return blosc_release_threadpool(g_global_context);
}