source: trunk/src/blosc.c @ 82

Revision 82, 26.1 KB checked in by faltet, 3 years ago (diff)

Modifications for properly including stdint-windows.h on Windows.

Line 
1/*********************************************************************
2  Blosc - Blocked Shuffling and Compression Library
3
4  Author: Francesc Alted (faltet@pytables.org)
5  Creation date: 2009-05-20
6
7  See LICENSES/BLOSC.txt for details about copyright and rights to use.
8**********************************************************************/
9
10
11#include <stdlib.h>
12#include <stdio.h>
13#include <string.h>
14#include <sys/types.h>
15#include <sys/stat.h>
16#include <assert.h>
17#include <pthread.h>
18#include "blosc.h"
19#include "blosclz.h"
20#include "shuffle.h"
21
22#ifdef _WIN32
23  #include <windows.h>
24  #include "stdint-windows.h"
25#else
26  #include <stdint.h>
27  #include <unistd.h>
28  #include <inttypes.h>
29#endif  /* _WIN32 */
30
31
32
33/* Minimal buffer size to be compressed */
/* Also used as the lower bound for a forced blocksize and as the minimum
   number of elements per block required before a block is split. */
34#define MIN_BUFFERSIZE 128       /* Cannot be smaller than 66 */
35
36/* Maximum typesize before considering buffer as a stream of bytes. */
/* The typesize is written to a single byte of the compressed header. */
37#define MAX_TYPESIZE 255         /* Cannot be larger than 255 */
38
39/* The maximum number of splits in a block for compression */
/* A block is split into `typesize` parts only while typesize <= MAX_SPLITS. */
40#define MAX_SPLITS 16            /* Cannot be larger than 128 */
41
42/* The maximum number of threads (for some static arrays) */
43#define MAX_THREADS 64
44
45/* The size of L1 cache.  32 KB is quite common nowadays. */
/* compute_blocksize() derives the automatic blocksize from L1 * typesize. */
46#define L1 (32*1024)
47
48
49/* Global variables for main logic */
50int32_t init_temps_done = 0;    /* temporaries for compr/decompr initialized? */
51size_t force_blocksize = 0;     /* should we force the use of a blocksize? */
52
53/* Global variables for threads */
54int32_t nthreads = 1;            /* number of desired threads in pool */
55int32_t init_threads_done = 0;   /* pool of threads initialized? */
56int32_t end_threads = 0;         /* should existing threads end? */
/* giveup has no explicit initializer; every worker sets it to 0 at the
   start of a job in t_blosc(). */
57int32_t giveup;                  /* should (de-)compression give up? */
/* -1 means no block has been handed out yet; compression workers
   pre-increment it under count_mutex to claim the next block. */
58int32_t nblock = -1;             /* block counter */
59pthread_t threads[MAX_THREADS];  /* opaque structure for threads */
60int32_t tids[MAX_THREADS];       /* ID per each thread */
61pthread_attr_t ct_attr;          /* creation time attributes for threads */
62
63/* Synchronization variables */
/* count_mutex guards nblock and params.ntbytes updates in the workers.
   barr_init and barr_finish are sized for the workers plus the calling
   thread.  barr_inter is initialized and destroyed but never waited on in
   the code visible in this file. */
64pthread_mutex_t count_mutex;
65pthread_barrier_t barr_init;
66pthread_barrier_t barr_inter;
67pthread_barrier_t barr_finish;
68
69/* Structure for parameters in (de-)compression threads */
70struct thread_data {
71  size_t typesize;
72  size_t blocksize;
73  int32_t compress;
74  int32_t clevel;
75  int32_t shuffle;
76  int32_t ntbytes;
77  uint32_t nbytes;
78  uint32_t nblocks;
79  uint32_t leftover;
80  uint32_t *bstarts;             /* start pointers for each block */
81  uint8_t *src;
82  uint8_t *dest;
83  uint8_t *tmp[MAX_THREADS];
84  uint8_t *tmp2[MAX_THREADS];
85} params;
86
87
/* Snapshot of the settings the current temporaries were allocated for.
   do_job() compares it against the live settings to decide whether the
   temporaries have to be re-created. */
struct temp_data {
  int32_t nthreads;              /* thread count at allocation time */
  size_t typesize;               /* typesize at allocation time */
  size_t blocksize;              /* blocksize at allocation time */
} current_temp;
94
95
96
97/* Shuffle & compress a single block */
98static int blosc_c(size_t blocksize, int32_t leftoverblock,
99                   uint32_t ntbytes, uint32_t nbytes,
100                   uint8_t *src, uint8_t *dest, uint8_t *tmp)
101{
102  size_t j, neblock, nsplits;
103  int32_t cbytes;                   /* number of compressed bytes in split */
104  int32_t ctbytes = 0;              /* number of compressed bytes in block */
105  int32_t maxout;
106  uint8_t *_tmp;
107  size_t typesize = params.typesize;
108
109  if (params.shuffle && (typesize > 1)) {
110    /* Shuffle this block (this makes sense only if typesize > 1) */
111    shuffle(typesize, blocksize, src, tmp);
112    _tmp = tmp;
113  }
114  else {
115    _tmp = src;
116  }
117
118  /* Compress for each shuffled slice split for this block. */
119  /* If typesize is too large, neblock is too small or we are in a
120     leftover block, do not split at all. */
121  if ((typesize <= MAX_SPLITS) && (blocksize/typesize) >= MIN_BUFFERSIZE &&
122      (!leftoverblock)) {
123    nsplits = typesize;
124  }
125  else {
126    nsplits = 1;
127  }
128  neblock = blocksize / nsplits;
129  for (j = 0; j < nsplits; j++) {
130    dest += sizeof(int32_t);
131    ntbytes += sizeof(int32_t);
132    ctbytes += sizeof(int32_t);
133    maxout = neblock;
134    if (ntbytes+maxout > nbytes) {
135      maxout = nbytes - ntbytes;   /* avoid buffer overrun */
136      if (maxout <= 0) {
137        return 0;                  /* non-compressible block */
138      }
139    }
140    cbytes = blosclz_compress(params.clevel, _tmp+j*neblock, neblock,
141                              dest, maxout-1);
142    if (cbytes >= maxout) {
143      /* Buffer overrun caused by blosclz_compress (should never happen) */
144      return -1;
145    }
146    else if (cbytes < 0) {
147      /* cbytes should never be negative */
148      return -2;
149    }
150    else if (cbytes == 0) {
151      /* The compressor has been unable to compress data significantly. */
152      /* Before doing the copy, check that we are not running into a
153         buffer overflow. */
154      if ((ntbytes+neblock) > nbytes) {
155        return 0;    /* Non-compressible data */
156      }
157      memcpy(dest, _tmp+j*neblock, neblock);
158      cbytes = neblock;
159    }
160    ((uint32_t *)(dest))[-1] = cbytes;
161    dest += cbytes;
162    ntbytes += cbytes;
163    ctbytes += cbytes;
164  }  /* Closes j < nsplits */
165
166  return ctbytes;
167}
168
169
170/* Decompress & unshuffle a single block */
171static int blosc_d(size_t blocksize, int32_t leftoverblock,
172                   uint8_t *src, uint8_t *dest, uint8_t *tmp, uint8_t *tmp2)
173{
174  int32_t j, neblock, nsplits;
175  int32_t nbytes;                /* number of decompressed bytes in split */
176  int32_t cbytes;                /* number of compressed bytes in split */
177  int32_t ctbytes = 0;           /* number of compressed bytes in block */
178  int32_t ntbytes = 0;           /* number of uncompressed bytes in block */
179  uint8_t *_tmp;
180  size_t typesize = params.typesize;
181  size_t shuffle = params.shuffle;
182
183  if (shuffle && (typesize > 1)) {
184    _tmp = tmp;
185  }
186  else {
187    _tmp = dest;
188  }
189
190  /* Compress for each shuffled slice split for this block. */
191  if ((typesize <= MAX_SPLITS) && (blocksize/typesize) >= MIN_BUFFERSIZE &&
192      (!leftoverblock)) {
193    nsplits = typesize;
194  }
195  else {
196    nsplits = 1;
197  }
198  neblock = blocksize / nsplits;
199  for (j = 0; j < nsplits; j++) {
200    cbytes = ((uint32_t *)(src))[0];   /* amount of compressed bytes */
201    src += sizeof(int32_t);
202    ctbytes += sizeof(int32_t);
203    /* Uncompress */
204    if (cbytes == neblock) {
205      memcpy(_tmp, src, neblock);
206      nbytes = neblock;
207    }
208    else {
209      nbytes = blosclz_decompress(src, cbytes, _tmp, neblock);
210      if (nbytes != neblock) {
211        return -2;
212      }
213    }
214    src += cbytes;
215    ctbytes += cbytes;
216    _tmp += nbytes;
217    ntbytes += nbytes;
218  } /* Closes j < nsplits */
219
220  if (shuffle && (typesize > 1)) {
221    if ((uintptr_t)dest % 16 == 0) {
222      /* 16-bytes aligned dest.  SSE2 unshuffle will work. */
223      unshuffle(typesize, blocksize, tmp, dest);
224    }
225    else {
226      /* dest is not aligned.  Use tmp2, which is aligned, and copy. */
227      unshuffle(typesize, blocksize, tmp, tmp2);
228      memcpy(dest, tmp2, blocksize);
229    }
230  }
231
232  /* Return the number of uncompressed bytes */
233  return ntbytes;
234}
235
236
237/* Serial version for compression/decompression */
238int serial_blosc(void)
239{
240  uint32_t j, bsize, leftoverblock;
241  int32_t cbytes;
242  int32_t compress = params.compress;
243  size_t blocksize = params.blocksize;
244  int32_t ntbytes = params.ntbytes;
245  int32_t nbytes = params.nbytes;
246  uint32_t nblocks = params.nblocks;
247  int32_t leftover = params.nbytes % params.blocksize;
248  uint32_t *bstarts = params.bstarts;
249  uint8_t *src = params.src;
250  uint8_t *dest = params.dest;
251  uint8_t *tmp = params.tmp[0];     /* tmp for thread 0 */
252  uint8_t *tmp2 = params.tmp2[0];   /* tmp2 for thread 0 */
253
254  for (j = 0; j < nblocks; j++) {
255    if (compress) {
256      bstarts[j] = ntbytes;
257    }
258    bsize = blocksize;
259    leftoverblock = 0;
260    if ((j == nblocks - 1) && (leftover > 0)) {
261      bsize = leftover;
262      leftoverblock = 1;
263    }
264    if (compress) {
265      cbytes = blosc_c(bsize, leftoverblock, ntbytes, nbytes,
266                       src+j*blocksize, dest+bstarts[j], tmp);
267      if (cbytes == 0) {
268        ntbytes = 0;              /* uncompressible data */
269        break;
270      }
271    }
272    else {
273      cbytes = blosc_d(bsize, leftoverblock,
274                       src+bstarts[j], dest+j*blocksize, tmp, tmp2);
275    }
276    if (cbytes < 0) {
277      ntbytes = cbytes;         /* error in blosc_c / blosc_d */
278      break;
279    }
280    ntbytes += cbytes;
281  }
282
283  /* Final check for ntbytes (only in compression mode) */
284  if (compress) {
285    if (ntbytes == nbytes) {
286      ntbytes = 0;               /* non-compressible data */
287    }
288    else if (ntbytes > nbytes) {
289      fprintf(stderr, "The impossible happened: buffer overflow!\n");
290      ntbytes = -5;               /* too large buffer */
291    }
292  }  /* Close j < nblocks */
293
294  return ntbytes;
295}
296
297
298/* Threaded version for compression/decompression */
299int parallel_blosc(void)
300{
301  int32_t rc;
302
303  /* Synchronization point for all threads (wait for initialization) */
304  rc = pthread_barrier_wait(&barr_init);
305  if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {
306    printf("Could not wait on barrier (init)\n");
307    exit(-1);
308  }
309  /* Synchronization point for all threads (wait for finalization) */
310  rc = pthread_barrier_wait(&barr_finish);
311  if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {
312    printf("Could not wait on barrier (finish)\n");
313    exit(-1);
314  }
315
316  /* Return the total bytes decompressed in threads */
317  return params.ntbytes;
318}
319
320
321/* Convenience functions for creating and releasing temporaries */
322void create_temporaries(void)
323{
324  int32_t tid;
325  size_t typesize = params.typesize;
326  size_t blocksize = params.blocksize;
327  /* Extended blocksize for temporary destination.  Extended blocksize
328   is only useful for compression in parallel mode, but it doesn't
329   hurt other modes either. */
330  size_t ebsize = blocksize + typesize*sizeof(int32_t);
331  uint8_t *tmp, *tmp2;
332
333  /* Create temporary area for each thread */
334  for (tid = 0; tid < nthreads; tid++) {
335#ifdef _WIN32
336    tmp = (uint8_t *)_aligned_malloc(blocksize, 16);
337    tmp2 = (uint8_t *)_aligned_malloc(ebsize, 16);
338#elif defined __APPLE__
339    /* Mac OS X guarantees 16-byte alignment in small allocs */
340    tmp = (uint8_t *)malloc(blocksize);
341    tmp2 = (uint8_t *)malloc(ebsize);
342#else
343    posix_memalign((void **)&tmp, 16, blocksize);
344    posix_memalign((void **)&tmp2, 16, ebsize);
345#endif  /* _WIN32 */
346    params.tmp[tid] = tmp;
347    params.tmp2[tid] = tmp2;
348  }
349
350  init_temps_done = 1;
351  /* Update params for current temporaries */
352  current_temp.nthreads = nthreads;
353  current_temp.typesize = typesize;
354  current_temp.blocksize = blocksize;
355
356}
357
358
359void release_temporaries(void)
360{
361  int32_t tid;
362  uint8_t *tmp, *tmp2;
363
364  /* Release buffers */
365  for (tid = 0; tid < nthreads; tid++) {
366    tmp = params.tmp[tid];
367    tmp2 = params.tmp2[tid];
368#ifdef _WIN32
369    _aligned_free(tmp);
370    _aligned_free(tmp2);
371#else
372    free(tmp);
373    free(tmp2);
374#endif  /* _WIN32 */
375  }
376
377  init_temps_done = 0;
378
379}
380
381
382/* Do the compression or decompression of the buffer depending on the
383   global params. */
384int do_job(void) {
385  int32_t ntbytes;
386
387  /* Initialize/reset temporaries if needed */
388  if (!init_temps_done) {
389    create_temporaries();
390  }
391  else if (current_temp.nthreads != nthreads ||
392           current_temp.typesize != params.typesize ||
393           current_temp.blocksize != params.blocksize) {
394    release_temporaries();
395    create_temporaries();
396  }
397
398  /* Run the serial version when nthreads is 1 or when the buffers are
399     not much larger than blocksize */
400  if (nthreads == 1 || (params.nbytes / params.blocksize) <= 1) {
401    ntbytes = serial_blosc();
402  }
403  else {
404    ntbytes = parallel_blosc();
405  }
406
407  return ntbytes;
408}
409
410
411size_t compute_blocksize(int32_t clevel, size_t typesize, size_t nbytes)
412{
413  size_t blocksize;
414
415  blocksize = nbytes;           /* Start by a whole buffer as blocksize */
416
417  if (force_blocksize) {
418    blocksize = force_blocksize;
419    /* Check that forced blocksize is not too small nor too large */
420    if (blocksize < MIN_BUFFERSIZE) {
421      blocksize = MIN_BUFFERSIZE;
422    }
423  }
424  else if (nbytes >= L1*typesize) {
425    blocksize = L1 * typesize;
426    if (clevel <= 3) {
427      blocksize /= 4;
428    }
429    else if (clevel <= 6) {
430      blocksize /= 2;
431    }
432    else if (clevel < 9) {
433      blocksize *= 1;
434    }
435    else {
436      blocksize *= 2;
437    }
438  }
439
440  /* Check that blocksize is not too large */
441  if (blocksize > nbytes) {
442    blocksize = nbytes;
443  }
444
445  /* blocksize must be a multiple of the typesize */
446  blocksize = blocksize / typesize * typesize;
447
448  return blocksize;
449}
450
451
452unsigned int blosc_compress(int clevel, int shuffle, size_t typesize,
453                            size_t nbytes, const void *src, void *dest)
454{
455  uint8_t *_dest=NULL;         /* current pos for destination buffer */
456  uint8_t *flags;              /* flags for header */
457  uint32_t nblocks;            /* number of total blocks in buffer */
458  uint32_t leftover;           /* extra bytes at end of buffer */
459  uint32_t *bstarts;           /* start pointers for each block */
460  size_t blocksize;            /* length of the block in bytes */
461  uint32_t ntbytes = 0;        /* the number of compressed bytes */
462  uint32_t *ntbytes_;          /* placeholder for bytes in output buffer */
463
464  /* Compression level */
465  if (clevel < 0 || clevel > 9) {
466    /* If clevel not in 0..9, print an error */
467    fprintf(stderr, "`clevel` parameter must be between 0 and 9!\n");
468    return -10;
469  }
470  else if (clevel == 0) {
471    /* No compression wanted.  Just return without doing anything else. */
472    return 0;
473  }
474  else if (nbytes < MIN_BUFFERSIZE) {
475    /* Too little buffer.  Just return without doing anything else. */
476    return 0;
477  }
478
479  /* Shuffle */
480  if (shuffle != 0 && shuffle != 1) {
481    fprintf(stderr, "`shuffle` parameter must be either 0 or 1!\n");
482    return -10;
483  }
484
485  /* Get the blocksize */
486  blocksize = compute_blocksize(clevel, typesize, nbytes);
487
488  /* Compute number of blocks in buffer */
489  nblocks = nbytes / blocksize;
490  leftover = nbytes % blocksize;
491  nblocks = (leftover>0)? nblocks+1: nblocks;
492
493  /* Check typesize limits */
494  if (typesize > MAX_TYPESIZE) {
495    /* If typesize is too large, treat buffer as an 1-byte stream. */
496    typesize = 1;
497  }
498
499  _dest = (uint8_t *)(dest);
500  /* Write header for this block */
501  _dest[0] = BLOSC_VERSION_FORMAT;         /* blosc format version */
502  _dest[1] = BLOSCLZ_VERSION_FORMAT;       /* blosclz format version */
503  flags = _dest+2;                         /* flags */
504  _dest[2] = 0;                            /* zeroes flags */
505  _dest[3] = (uint8_t)typesize;            /* type size */
506  _dest += 4;
507  ntbytes += 4;
508  ((uint32_t *)_dest)[0] = nbytes;         /* size of the buffer */
509  ((uint32_t *)_dest)[1] = blocksize;      /* block size */
510  ntbytes_ = (uint32_t *)(_dest+8);        /* compressed buffer size */
511  _dest += sizeof(int32_t)*3;
512  ntbytes += sizeof(int32_t)*3;
513  bstarts = (uint32_t *)_dest;             /* starts for every block */
514  _dest += sizeof(int32_t)*nblocks;        /* book space for pointers to */
515  ntbytes += sizeof(int32_t)*nblocks;      /* every block in output */
516
517  if (shuffle == 1) {
518    /* Shuffle is active */
519    *flags |= 0x1;                         /* bit 0 set to one in flags */
520  }
521
522  /* Populate parameters for compression routines */
523  params.compress = 1;
524  params.clevel = clevel;
525  params.shuffle = shuffle;
526  params.typesize = typesize;
527  params.blocksize = blocksize;
528  params.ntbytes = ntbytes;
529  params.nbytes = nbytes;
530  params.nblocks = nblocks;
531  params.leftover = leftover;
532  params.bstarts = bstarts;
533  params.src = (uint8_t *)src;
534  params.dest = (uint8_t *)dest;
535
536  /* Do the actual compression */
537  ntbytes = do_job();
538  /* Set the number of compressed bytes in header */
539  *ntbytes_ = ntbytes;
540
541  assert((int32_t)ntbytes < (int32_t)nbytes);
542  return ntbytes;
543}
544
545
546unsigned int blosc_decompress(const void *src, void *dest, size_t dest_size)
547{
548  uint8_t *_src=NULL;            /* current pos for source buffer */
549  uint8_t *_dest=NULL;           /* current pos for destination buffer */
550  uint8_t version, versionlz;    /* versions for compressed header */
551  uint8_t flags;                 /* flags for header */
552  int32_t shuffle = 0;           /* do unshuffle? */
553  int32_t ntbytes;               /* the number of uncompressed bytes */
554  uint32_t nblocks;              /* number of total blocks in buffer */
555  uint32_t leftover;             /* extra bytes at end of buffer */
556  uint32_t *bstarts;             /* start pointers for each block */
557  uint32_t typesize, blocksize, nbytes, ctbytes;
558
559  _src = (uint8_t *)(src);
560  _dest = (uint8_t *)(dest);
561
562  /* Read the header block */
563  version = _src[0];                   /* blosc format version */
564  versionlz = _src[1];                 /* blosclz format version */
565  flags = _src[2];                     /* flags */
566  typesize = (uint32_t)_src[3];        /* typesize */
567  _src += 4;
568  nbytes = ((uint32_t *)_src)[0];      /* buffer size */
569  blocksize = ((uint32_t *)_src)[1];   /* block size */
570  ctbytes = ((uint32_t *)_src)[2];     /* compressed buffer size */
571  _src += sizeof(int32_t)*3;
572  bstarts = (uint32_t *)_src;
573  /* Compute some params */
574  /* Total blocks */
575  nblocks = nbytes / blocksize;
576  leftover = nbytes % blocksize;
577  nblocks = (leftover>0)? nblocks+1: nblocks;
578  _src += sizeof(int32_t)*nblocks;
579
580  /* Check zero typesizes.  From Blosc version format 2 on, this value
581   has been reserved for future use (most probably to indicate
582   uncompressible buffers). */
583  if ((version == 1) && (typesize == 0)) {
584    typesize = 256;             /* 0 means 256 in format version 1 */
585  }
586
587  if (nbytes > dest_size) {
588    /* This should never happen but just in case */
589    return -1;
590  }
591
592  if ((flags & 0x1) == 1) {
593    /* Input is shuffled.  Unshuffle it. */
594    shuffle = 1;
595  }
596
597  /* Populate parameters for decompression routines */
598  params.compress = 0;
599  params.clevel = 0;            /* specific for compression */
600  params.shuffle = shuffle;
601  params.typesize = typesize;
602  params.blocksize = blocksize;
603  params.ntbytes = 0;
604  params.nbytes = nbytes;
605  params.nblocks = nblocks;
606  params.leftover = leftover;
607  params.bstarts = bstarts;
608  params.src = (uint8_t *)src;
609  params.dest = (uint8_t *)dest;
610
611  /* Do the actual decompression */
612  ntbytes = do_job();
613
614  assert(ntbytes <= (int32_t)dest_size);
615  return ntbytes;
616}
617
618
619/* Worker thread body: compress, or decompress & unshuffle, several blocks
   in a single thread.

   `tids` points to this worker's int32_t thread id, which selects its
   scratch buffers in params.tmp[] / params.tmp2[].

   The worker loops forever over jobs: it waits at barr_init, returns if
   end_threads is set, takes a private copy of the job parameters from
   `params`, processes blocks, waits at barr_finish and resets the shared
   block counter.  Always returns 0 (NULL). */
620void *t_blosc(void *tids)
621{
622  int32_t tid = *(int32_t *)tids;
623  int32_t cbytes, ntdest;
624  uint32_t tblocks;              /* number of blocks per thread */
625  uint32_t leftover2;
626  uint32_t tblock;               /* limit block on a thread */
627  uint32_t nblock_;              /* private copy of nblock */
628  int32_t rc;
629  uint32_t bsize, leftoverblock;
630  /* Parameters for threads */
631  size_t blocksize;
632  size_t ebsize;
633  int32_t compress;
634  uint32_t nbytes;
635  uint32_t ntbytes;
636  uint32_t nblocks;
637  uint32_t leftover;
638  uint32_t *bstarts;
639  uint8_t *src;
640  uint8_t *dest;
641  uint8_t *tmp;
642  uint8_t *tmp2;
643
644  while (1) {
645    /* Meeting point for all threads (wait for initialization) */
646    rc = pthread_barrier_wait(&barr_init);
647    if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {
648      printf("Could not wait on barrier (init)\n");
649      exit(-1);
650    }
    /* end_threads is raised before the barrier is released when the pool
       is being shut down */
651    if (end_threads) {
652      return(0);
653    }
654
655    /* Get parameters for this thread */
656    blocksize = params.blocksize;
    /* Same extended size that create_temporaries() uses for tmp2 */
657    ebsize = blocksize + params.typesize*sizeof(int32_t);
658    compress = params.compress;
659    nbytes = params.nbytes;
660    ntbytes = params.ntbytes;
661    nblocks = params.nblocks;
662    leftover = params.leftover;
663    bstarts = params.bstarts;
664    src = params.src;
665    dest = params.dest;
666    tmp = params.tmp[tid];
667    tmp2 = params.tmp2[tid];
668
    /* NOTE(review): every worker clears the shared flag here without
       holding count_mutex, so a worker leaving the barrier late could
       clear a giveup that a faster worker has already set -- confirm. */
669    giveup = 0;
670    if (compress) {
671      /* Compression always has to follow the block order */
      /* Claim the next unprocessed block from the shared counter */
672      pthread_mutex_lock(&count_mutex);
673      nblock++;
674      nblock_ = nblock;
675      pthread_mutex_unlock(&count_mutex);
676      tblock = nblocks;
677    }
678    else {
679      /* Decompression can happen using any order.  We choose
680       sequential block order on each thread */
681
682      /* Blocks per thread */
      /* Ceiling of nblocks / nthreads */
683      tblocks = nblocks / nthreads;
684      leftover2 = nblocks % nthreads;
685      tblocks = (leftover2>0)? tblocks+1: tblocks;
686
      /* This thread handles the contiguous range [nblock_, tblock) */
687      nblock_ = tid*tblocks;
688      tblock = nblock_ + tblocks;
689      if (tblock > nblocks) {
690        tblock = nblocks;
691      }
692      ntbytes = 0;
693    }
694
695    /* Loop over blocks */
696    leftoverblock = 0;
697    while ((nblock_ < tblock) && !giveup) {
698      bsize = blocksize;
      /* Only the very last block of the buffer can be a leftover one */
699      if (nblock_ == (nblocks - 1) && (leftover > 0)) {
700        bsize = leftover;
701        leftoverblock = 1;
702      }
703      if (compress) {
        /* Compress into the private tmp2 buffer (ebsize bytes); the result
           is copied to dest once its final offset is known */
704        cbytes = blosc_c(bsize, leftoverblock, 0, ebsize,
705                         src+nblock_*blocksize, tmp2, tmp);
706      }
707      else {
708        cbytes = blosc_d(bsize, leftoverblock,
709                         src+bstarts[nblock_], dest+nblock_*blocksize,
710                         tmp, tmp2);
711      }
712
713      /* Check whether current thread has to giveup */
714      /* This is critical in order to not overwrite variables */
715      if (giveup != 0) {
716        break;
717      }
718
719      /* Check results for the decompressed block */
720      if (cbytes < 0) {            /* compr/decompr failure */
721        /* Set cbytes code error first */
722        pthread_mutex_lock(&count_mutex);
723        params.ntbytes = cbytes;
724        pthread_mutex_unlock(&count_mutex);
725        giveup = 1;                 /* give up (de-)compressing after */
726        break;
727      }
728
729      if (compress) {
730        /* Start critical section */
731        pthread_mutex_lock(&count_mutex);
732        if (cbytes == 0) {            /* uncompressible buffer */
733          params.ntbytes = 0;
734          pthread_mutex_unlock(&count_mutex);
735          giveup = 1;                 /* give up compressing */
736          break;
737        }
738        bstarts[nblock_] = params.ntbytes;   /* update block start counter */
        /* Remember where this block goes; the copy happens after unlocking */
739        ntdest = params.ntbytes;
740        if (ntdest+cbytes > (int32_t)nbytes) {   /* uncompressible buffer */
741          params.ntbytes = 0;
742          pthread_mutex_unlock(&count_mutex);
743          giveup = 1;
744          break;
745        }
        /* Claim the next block while still holding the mutex */
746        nblock++;
747        nblock_ = nblock;
748        params.ntbytes += cbytes;       /* update return bytes counter */
749        pthread_mutex_unlock(&count_mutex);
750        /* End of critical section */
751
752        /* Copy the compressed buffer to destination */
753        memcpy(dest+ntdest, tmp2, cbytes);
754      }
755      else {
756        nblock_++;
757      }
758      /* Update counter for this thread */
759      ntbytes += cbytes;
760
761    } /* closes while (nblock_) */
762
763    if (!compress && !giveup) {
764      /* Update global counter for all threads (decompression only) */
765      pthread_mutex_lock(&count_mutex);
766      params.ntbytes += ntbytes;
767      pthread_mutex_unlock(&count_mutex);
768    }
769
770    /* Meeting point for all threads (wait for finalization) */
771    rc = pthread_barrier_wait(&barr_finish);
772    if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {
773      printf("Could not wait on barrier (finish)\n");
774      exit(-1);
775    }
776    /* Reset block counter */
    /* Every worker performs this reset before waiting for the next job */
777    nblock = -1;
778
779  }  /* closes while(1) */
780
781  /* This should never be reached, but anyway */
782  return(0);
783}
784
785
786int init_threads(void)
787{
788  int32_t tid, rc;
789
790  /* Initialize mutex and condition variable objects */
791  pthread_mutex_init(&count_mutex, NULL);
792
793  /* Barrier initialization */
794  pthread_barrier_init(&barr_init, NULL, nthreads+1);
795  pthread_barrier_init(&barr_inter, NULL, nthreads);
796  pthread_barrier_init(&barr_finish, NULL, nthreads+1);
797
798  /* Initialize and set thread detached attribute */
799  pthread_attr_init(&ct_attr);
800  pthread_attr_setdetachstate(&ct_attr, PTHREAD_CREATE_JOINABLE);
801
802  /* Finally, create the threads in detached state */
803  for (tid = 0; tid < nthreads; tid++) {
804    tids[tid] = tid;
805    rc = pthread_create(&threads[tid], &ct_attr, t_blosc, (void *)&tids[tid]);
806    if (rc) {
807      fprintf(stderr, "ERROR; return code from pthread_create() is %d\n", rc);
808      fprintf(stderr, "\tError detail: %s\n", strerror(rc));
809      exit(-1);
810    }
811  }
812
813  init_threads_done = 1;                 /* Initialization done! */
814
815  return(0);
816}
817
818
819int blosc_set_nthreads(int nthreads_new)
820{
821  int32_t nthreads_old = nthreads;
822  int32_t t, rc;
823  void *status;
824
825  if (nthreads_new > MAX_THREADS) {
826    fprintf(stderr, "Error.  nthreads cannot be larger than MAX_THREADS (%d)",
827            MAX_THREADS);
828    return -1;
829  }
830  else if (nthreads_new <= 0) {
831    fprintf(stderr, "Error.  nthreads must be a positive integer");
832    return -1;
833  }
834  else if (nthreads_new != nthreads) {
835    if (nthreads > 1 && init_threads_done) {
836      /* Tell all existing threads to end */
837      end_threads = 1;
838      rc = pthread_barrier_wait(&barr_init);
839      if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {
840        printf("Could not wait on barrier (init)\n");
841        exit(-1);
842      }
843      /* Join exiting threads */
844      for (t=0; t<nthreads; t++) {
845        rc = pthread_join(threads[t], &status);
846        if (rc) {
847          fprintf(stderr, "ERROR; return code from pthread_join() is %d\n", rc);
848          fprintf(stderr, "\tError detail: %s\n", strerror(rc));
849          exit(-1);
850        }
851      }
852      init_threads_done = 0;
853      end_threads = 0;
854    }
855    nthreads = nthreads_new;
856    if (nthreads > 1) {
857      /* Launch a new pool of threads */
858      init_threads();
859    }
860    return nthreads_old;
861  }
862  return 0;
863}
864
865
866/* Free possible memory temporaries and thread resources */
867void blosc_free_resources(void)
868{
869  int32_t t, rc;
870  void *status;
871
872  /* Release temporaries */
873  if (init_temps_done) {
874    release_temporaries();
875  }
876
877  /* Finish the possible thread pool */
878  if (nthreads > 1 && init_threads_done) {
879    /* Tell all existing threads to end */
880    end_threads = 1;
881    rc = pthread_barrier_wait(&barr_init);
882    if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {
883      exit(-1);
884    }
885    /* Join exiting threads */
886    for (t=0; t<nthreads; t++) {
887      rc = pthread_join(threads[t], &status);
888      if (rc) {
889        fprintf(stderr, "ERROR; return code from pthread_join() is %d\n", rc);
890        fprintf(stderr, "\tError detail: %s\n", strerror(rc));
891        exit(-1);
892      }
893    }
894
895    /* Release mutex and condition variable objects */
896    pthread_mutex_destroy(&count_mutex);
897
898    /* Barriers */
899    pthread_barrier_destroy(&barr_init);
900    pthread_barrier_destroy(&barr_inter);
901    pthread_barrier_destroy(&barr_finish);
902
903    /* Thread attributes */
904    pthread_attr_destroy(&ct_attr);
905
906    init_threads_done = 0;
907    end_threads = 0;
908  }
909}
910
911
912/* Force the use of a specific blocksize.  If 0, an automatic
913   blocksize will be used (the default). */
914void blosc_set_blocksize(size_t size)
915{
916  force_blocksize = size;
917}
918
Note: See TracBrowser for help on using the repository browser.