source: trunk/src/blosc.c @ 85

Revision 85, 26.2 KB checked in by faltet, 4 years ago (diff)

For optimization level 1, divide the blocksize by 8 instead of 4. This gives significantly better decompression speed at the expense of a lower compression ratio and compression speed.

Line 
1/*********************************************************************
  Blosc - Blocked Shuffling and Compression Library
3
4  Author: Francesc Alted (faltet@pytables.org)
5  Creation date: 2009-05-20
6
7  See LICENSES/BLOSC.txt for details about copyright and rights to use.
8**********************************************************************/
9
10
11#include <stdlib.h>
12#include <stdio.h>
13#include <string.h>
14#include <sys/types.h>
15#include <sys/stat.h>
16#include <assert.h>
17#include <pthread.h>
18#include "blosc.h"
19#include "blosclz.h"
20#include "shuffle.h"
21
22#ifdef _WIN32
23  #include <windows.h>
24  #include "stdint-windows.h"
25#else
26  #include <stdint.h>
27  #include <unistd.h>
28  #include <inttypes.h>
29#endif  /* _WIN32 */
30
31
32
/* Minimal buffer size to be compressed.  blosc_compress() returns 0 for
   smaller buffers, compute_blocksize() never forces a smaller blocksize,
   and a block is only split when each split would hold at least this many
   bytes. */
#define MIN_BUFFERSIZE 128       /* Cannot be smaller than 66 */

/* Maximum typesize before considering buffer as a stream of bytes.
   blosc_compress() stores the typesize in a single header byte and
   treats larger typesizes as 1. */
#define MAX_TYPESIZE 255         /* Cannot be larger than 255 */

/* The maximum number of splits in a block for compression.  A block is
   split into `typesize` independently compressed pieces only when
   typesize <= MAX_SPLITS. */
#define MAX_SPLITS 16            /* Cannot be larger than 128 */

/* The maximum number of threads (for some static arrays).  Also the
   upper bound accepted by blosc_set_nthreads(). */
#define MAX_THREADS 64

/* The size of L1 cache.  32 KB is quite common nowadays.
   compute_blocksize() derives its automatic blocksize from L1 * typesize. */
#define L1 (32*1024)


/* Global variables for main logic */
int32_t init_temps_done = 0;    /* temporaries for compr/decompr initialized? */
size_t force_blocksize = 0;     /* should we force the use of a blocksize? (0 = automatic) */

/* Global variables for threads */
int32_t nthreads = 1;            /* number of desired threads in pool */
int32_t init_threads_done = 0;   /* pool of threads initialized? */
int32_t end_threads = 0;         /* should existing threads end? */
/* NOTE(review): giveup is read and written by several worker threads
   without holding count_mutex; this looks like a data race -- confirm. */
int32_t giveup;                  /* should (de-)compression give up? */
int32_t nblock = -1;             /* block counter (next block to compress), reset to -1 after each job */
pthread_t threads[MAX_THREADS];  /* opaque structure for threads */
int32_t tids[MAX_THREADS];       /* ID per each thread */
pthread_attr_t ct_attr;          /* creation time attributes for threads */

/* Synchronization variables */
pthread_mutex_t count_mutex;     /* guards nblock and params.ntbytes updates */
pthread_barrier_t barr_init;     /* workers + caller: start of a job */
/* NOTE(review): barr_inter is initialized/destroyed but never waited on
   in the code visible here. */
pthread_barrier_t barr_inter;
pthread_barrier_t barr_finish;   /* workers + caller: end of a job */
68
69/* Structure for parameters in (de-)compression threads */
70struct thread_data {
71  size_t typesize;
72  size_t blocksize;
73  int32_t compress;
74  int32_t clevel;
75  int32_t shuffle;
76  int32_t ntbytes;
77  uint32_t nbytes;
78  uint32_t nblocks;
79  uint32_t leftover;
80  uint32_t *bstarts;             /* start pointers for each block */
81  uint8_t *src;
82  uint8_t *dest;
83  uint8_t *tmp[MAX_THREADS];
84  uint8_t *tmp2[MAX_THREADS];
85} params;
86
87
/* Snapshot of the settings the current temporaries were allocated for.
   do_job() compares it with the live settings to decide whether the
   per-thread buffers have to be recreated. */
struct temp_data {
  int32_t nthreads;              /* how many buffer pairs were allocated */
  size_t typesize;               /* typesize at allocation time */
  size_t blocksize;              /* blocksize at allocation time */
};

/* Settings of the temporaries currently held in params.tmp/params.tmp2. */
struct temp_data current_temp;
94
95
96
97/* Shuffle & compress a single block */
98static int blosc_c(size_t blocksize, int32_t leftoverblock,
99                   uint32_t ntbytes, uint32_t nbytes,
100                   uint8_t *src, uint8_t *dest, uint8_t *tmp)
101{
102  size_t j, neblock, nsplits;
103  int32_t cbytes;                   /* number of compressed bytes in split */
104  int32_t ctbytes = 0;              /* number of compressed bytes in block */
105  int32_t maxout;
106  uint8_t *_tmp;
107  size_t typesize = params.typesize;
108
109  if (params.shuffle && (typesize > 1)) {
110    /* Shuffle this block (this makes sense only if typesize > 1) */
111    shuffle(typesize, blocksize, src, tmp);
112    _tmp = tmp;
113  }
114  else {
115    _tmp = src;
116  }
117
118  /* Compress for each shuffled slice split for this block. */
119  /* If typesize is too large, neblock is too small or we are in a
120     leftover block, do not split at all. */
121  if ((typesize <= MAX_SPLITS) && (blocksize/typesize) >= MIN_BUFFERSIZE &&
122      (!leftoverblock)) {
123    nsplits = typesize;
124  }
125  else {
126    nsplits = 1;
127  }
128  neblock = blocksize / nsplits;
129  for (j = 0; j < nsplits; j++) {
130    dest += sizeof(int32_t);
131    ntbytes += sizeof(int32_t);
132    ctbytes += sizeof(int32_t);
133    maxout = neblock;
134    if (ntbytes+maxout > nbytes) {
135      maxout = nbytes - ntbytes;   /* avoid buffer overrun */
136      if (maxout <= 0) {
137        return 0;                  /* non-compressible block */
138      }
139    }
140    cbytes = blosclz_compress(params.clevel, _tmp+j*neblock, neblock,
141                              dest, maxout-1);
142    if (cbytes >= maxout) {
143      /* Buffer overrun caused by blosclz_compress (should never happen) */
144      return -1;
145    }
146    else if (cbytes < 0) {
147      /* cbytes should never be negative */
148      return -2;
149    }
150    else if (cbytes == 0) {
151      /* The compressor has been unable to compress data significantly. */
152      /* Before doing the copy, check that we are not running into a
153         buffer overflow. */
154      if ((ntbytes+neblock) > nbytes) {
155        return 0;    /* Non-compressible data */
156      }
157      memcpy(dest, _tmp+j*neblock, neblock);
158      cbytes = neblock;
159    }
160    ((uint32_t *)(dest))[-1] = cbytes;
161    dest += cbytes;
162    ntbytes += cbytes;
163    ctbytes += cbytes;
164  }  /* Closes j < nsplits */
165
166  return ctbytes;
167}
168
169
170/* Decompress & unshuffle a single block */
171static int blosc_d(size_t blocksize, int32_t leftoverblock,
172                   uint8_t *src, uint8_t *dest, uint8_t *tmp, uint8_t *tmp2)
173{
174  int32_t j, neblock, nsplits;
175  int32_t nbytes;                /* number of decompressed bytes in split */
176  int32_t cbytes;                /* number of compressed bytes in split */
177  int32_t ctbytes = 0;           /* number of compressed bytes in block */
178  int32_t ntbytes = 0;           /* number of uncompressed bytes in block */
179  uint8_t *_tmp;
180  size_t typesize = params.typesize;
181  size_t shuffle = params.shuffle;
182
183  if (shuffle && (typesize > 1)) {
184    _tmp = tmp;
185  }
186  else {
187    _tmp = dest;
188  }
189
190  /* Compress for each shuffled slice split for this block. */
191  if ((typesize <= MAX_SPLITS) && (blocksize/typesize) >= MIN_BUFFERSIZE &&
192      (!leftoverblock)) {
193    nsplits = typesize;
194  }
195  else {
196    nsplits = 1;
197  }
198  neblock = blocksize / nsplits;
199  for (j = 0; j < nsplits; j++) {
200    cbytes = ((uint32_t *)(src))[0];   /* amount of compressed bytes */
201    src += sizeof(int32_t);
202    ctbytes += sizeof(int32_t);
203    /* Uncompress */
204    if (cbytes == neblock) {
205      memcpy(_tmp, src, neblock);
206      nbytes = neblock;
207    }
208    else {
209      nbytes = blosclz_decompress(src, cbytes, _tmp, neblock);
210      if (nbytes != neblock) {
211        return -2;
212      }
213    }
214    src += cbytes;
215    ctbytes += cbytes;
216    _tmp += nbytes;
217    ntbytes += nbytes;
218  } /* Closes j < nsplits */
219
220  if (shuffle && (typesize > 1)) {
221    if ((uintptr_t)dest % 16 == 0) {
222      /* 16-bytes aligned dest.  SSE2 unshuffle will work. */
223      unshuffle(typesize, blocksize, tmp, dest);
224    }
225    else {
226      /* dest is not aligned.  Use tmp2, which is aligned, and copy. */
227      unshuffle(typesize, blocksize, tmp, tmp2);
228      memcpy(dest, tmp2, blocksize);
229    }
230  }
231
232  /* Return the number of uncompressed bytes */
233  return ntbytes;
234}
235
236
237/* Serial version for compression/decompression */
238int serial_blosc(void)
239{
240  uint32_t j, bsize, leftoverblock;
241  int32_t cbytes;
242  int32_t compress = params.compress;
243  size_t blocksize = params.blocksize;
244  int32_t ntbytes = params.ntbytes;
245  int32_t nbytes = params.nbytes;
246  uint32_t nblocks = params.nblocks;
247  int32_t leftover = params.nbytes % params.blocksize;
248  uint32_t *bstarts = params.bstarts;
249  uint8_t *src = params.src;
250  uint8_t *dest = params.dest;
251  uint8_t *tmp = params.tmp[0];     /* tmp for thread 0 */
252  uint8_t *tmp2 = params.tmp2[0];   /* tmp2 for thread 0 */
253
254  for (j = 0; j < nblocks; j++) {
255    if (compress) {
256      bstarts[j] = ntbytes;
257    }
258    bsize = blocksize;
259    leftoverblock = 0;
260    if ((j == nblocks - 1) && (leftover > 0)) {
261      bsize = leftover;
262      leftoverblock = 1;
263    }
264    if (compress) {
265      cbytes = blosc_c(bsize, leftoverblock, ntbytes, nbytes,
266                       src+j*blocksize, dest+bstarts[j], tmp);
267      if (cbytes == 0) {
268        ntbytes = 0;              /* uncompressible data */
269        break;
270      }
271    }
272    else {
273      cbytes = blosc_d(bsize, leftoverblock,
274                       src+bstarts[j], dest+j*blocksize, tmp, tmp2);
275    }
276    if (cbytes < 0) {
277      ntbytes = cbytes;         /* error in blosc_c / blosc_d */
278      break;
279    }
280    ntbytes += cbytes;
281  }
282
283  /* Final check for ntbytes (only in compression mode) */
284  if (compress) {
285    if (ntbytes == nbytes) {
286      ntbytes = 0;               /* non-compressible data */
287    }
288    else if (ntbytes > nbytes) {
289      fprintf(stderr, "The impossible happened: buffer overflow!\n");
290      ntbytes = -5;               /* too large buffer */
291    }
292  }  /* Close j < nblocks */
293
294  return ntbytes;
295}
296
297
298/* Threaded version for compression/decompression */
299int parallel_blosc(void)
300{
301  int32_t rc;
302
303  /* Synchronization point for all threads (wait for initialization) */
304  rc = pthread_barrier_wait(&barr_init);
305  if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {
306    printf("Could not wait on barrier (init)\n");
307    exit(-1);
308  }
309  /* Synchronization point for all threads (wait for finalization) */
310  rc = pthread_barrier_wait(&barr_finish);
311  if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {
312    printf("Could not wait on barrier (finish)\n");
313    exit(-1);
314  }
315
316  /* Return the total bytes decompressed in threads */
317  return params.ntbytes;
318}
319
320
321/* Convenience functions for creating and releasing temporaries */
322void create_temporaries(void)
323{
324  int32_t tid;
325  size_t typesize = params.typesize;
326  size_t blocksize = params.blocksize;
327  /* Extended blocksize for temporary destination.  Extended blocksize
328   is only useful for compression in parallel mode, but it doesn't
329   hurt other modes either. */
330  size_t ebsize = blocksize + typesize*sizeof(int32_t);
331  uint8_t *tmp, *tmp2;
332
333  /* Create temporary area for each thread */
334  for (tid = 0; tid < nthreads; tid++) {
335#ifdef _WIN32
336    tmp = (uint8_t *)_aligned_malloc(blocksize, 16);
337    tmp2 = (uint8_t *)_aligned_malloc(ebsize, 16);
338#elif defined __APPLE__
339    /* Mac OS X guarantees 16-byte alignment in small allocs */
340    tmp = (uint8_t *)malloc(blocksize);
341    tmp2 = (uint8_t *)malloc(ebsize);
342#else
343    posix_memalign((void **)&tmp, 16, blocksize);
344    posix_memalign((void **)&tmp2, 16, ebsize);
345#endif  /* _WIN32 */
346    params.tmp[tid] = tmp;
347    params.tmp2[tid] = tmp2;
348  }
349
350  init_temps_done = 1;
351  /* Update params for current temporaries */
352  current_temp.nthreads = nthreads;
353  current_temp.typesize = typesize;
354  current_temp.blocksize = blocksize;
355
356}
357
358
359void release_temporaries(void)
360{
361  int32_t tid;
362  uint8_t *tmp, *tmp2;
363
364  /* Release buffers */
365  for (tid = 0; tid < nthreads; tid++) {
366    tmp = params.tmp[tid];
367    tmp2 = params.tmp2[tid];
368#ifdef _WIN32
369    _aligned_free(tmp);
370    _aligned_free(tmp2);
371#else
372    free(tmp);
373    free(tmp2);
374#endif  /* _WIN32 */
375  }
376
377  init_temps_done = 0;
378
379}
380
381
382/* Do the compression or decompression of the buffer depending on the
383   global params. */
384int do_job(void) {
385  int32_t ntbytes;
386
387  /* Initialize/reset temporaries if needed */
388  if (!init_temps_done) {
389    create_temporaries();
390  }
391  else if (current_temp.nthreads != nthreads ||
392           current_temp.typesize != params.typesize ||
393           current_temp.blocksize != params.blocksize) {
394    release_temporaries();
395    create_temporaries();
396  }
397
398  /* Run the serial version when nthreads is 1 or when the buffers are
399     not much larger than blocksize */
400  if (nthreads == 1 || (params.nbytes / params.blocksize) <= 1) {
401    ntbytes = serial_blosc();
402  }
403  else {
404    ntbytes = parallel_blosc();
405  }
406
407  return ntbytes;
408}
409
410
411size_t compute_blocksize(int32_t clevel, size_t typesize, size_t nbytes)
412{
413  size_t blocksize;
414
415  blocksize = nbytes;           /* Start by a whole buffer as blocksize */
416
417  if (force_blocksize) {
418    blocksize = force_blocksize;
419    /* Check that forced blocksize is not too small nor too large */
420    if (blocksize < MIN_BUFFERSIZE) {
421      blocksize = MIN_BUFFERSIZE;
422    }
423  }
424  else if (nbytes >= L1*typesize) {
425    blocksize = L1 * typesize;
426    if (clevel == 1) {
427      blocksize /= 8;
428    }
429    else if (clevel <= 3) {
430      blocksize /= 4;
431    }
432    else if (clevel <= 6) {
433      blocksize /= 2;
434    }
435    else if (clevel < 9) {
436      blocksize *= 1;
437    }
438    else {
439      blocksize *= 2;
440    }
441  }
442
443  /* Check that blocksize is not too large */
444  if (blocksize > nbytes) {
445    blocksize = nbytes;
446  }
447
448  /* blocksize must be a multiple of the typesize */
449  blocksize = blocksize / typesize * typesize;
450
451  return blocksize;
452}
453
454
455unsigned int blosc_compress(int clevel, int shuffle, size_t typesize,
456                            size_t nbytes, const void *src, void *dest)
457{
458  uint8_t *_dest=NULL;         /* current pos for destination buffer */
459  uint8_t *flags;              /* flags for header */
460  uint32_t nblocks;            /* number of total blocks in buffer */
461  uint32_t leftover;           /* extra bytes at end of buffer */
462  uint32_t *bstarts;           /* start pointers for each block */
463  size_t blocksize;            /* length of the block in bytes */
464  uint32_t ntbytes = 0;        /* the number of compressed bytes */
465  uint32_t *ntbytes_;          /* placeholder for bytes in output buffer */
466
467  /* Compression level */
468  if (clevel < 0 || clevel > 9) {
469    /* If clevel not in 0..9, print an error */
470    fprintf(stderr, "`clevel` parameter must be between 0 and 9!\n");
471    return -10;
472  }
473  else if (clevel == 0) {
474    /* No compression wanted.  Just return without doing anything else. */
475    return 0;
476  }
477  else if (nbytes < MIN_BUFFERSIZE) {
478    /* Too little buffer.  Just return without doing anything else. */
479    return 0;
480  }
481
482  /* Shuffle */
483  if (shuffle != 0 && shuffle != 1) {
484    fprintf(stderr, "`shuffle` parameter must be either 0 or 1!\n");
485    return -10;
486  }
487
488  /* Get the blocksize */
489  blocksize = compute_blocksize(clevel, typesize, nbytes);
490
491  /* Compute number of blocks in buffer */
492  nblocks = nbytes / blocksize;
493  leftover = nbytes % blocksize;
494  nblocks = (leftover>0)? nblocks+1: nblocks;
495
496  /* Check typesize limits */
497  if (typesize > MAX_TYPESIZE) {
498    /* If typesize is too large, treat buffer as an 1-byte stream. */
499    typesize = 1;
500  }
501
502  _dest = (uint8_t *)(dest);
503  /* Write header for this block */
504  _dest[0] = BLOSC_VERSION_FORMAT;         /* blosc format version */
505  _dest[1] = BLOSCLZ_VERSION_FORMAT;       /* blosclz format version */
506  flags = _dest+2;                         /* flags */
507  _dest[2] = 0;                            /* zeroes flags */
508  _dest[3] = (uint8_t)typesize;            /* type size */
509  _dest += 4;
510  ntbytes += 4;
511  ((uint32_t *)_dest)[0] = nbytes;         /* size of the buffer */
512  ((uint32_t *)_dest)[1] = blocksize;      /* block size */
513  ntbytes_ = (uint32_t *)(_dest+8);        /* compressed buffer size */
514  _dest += sizeof(int32_t)*3;
515  ntbytes += sizeof(int32_t)*3;
516  bstarts = (uint32_t *)_dest;             /* starts for every block */
517  _dest += sizeof(int32_t)*nblocks;        /* book space for pointers to */
518  ntbytes += sizeof(int32_t)*nblocks;      /* every block in output */
519
520  if (shuffle == 1) {
521    /* Shuffle is active */
522    *flags |= 0x1;                         /* bit 0 set to one in flags */
523  }
524
525  /* Populate parameters for compression routines */
526  params.compress = 1;
527  params.clevel = clevel;
528  params.shuffle = shuffle;
529  params.typesize = typesize;
530  params.blocksize = blocksize;
531  params.ntbytes = ntbytes;
532  params.nbytes = nbytes;
533  params.nblocks = nblocks;
534  params.leftover = leftover;
535  params.bstarts = bstarts;
536  params.src = (uint8_t *)src;
537  params.dest = (uint8_t *)dest;
538
539  /* Do the actual compression */
540  ntbytes = do_job();
541  /* Set the number of compressed bytes in header */
542  *ntbytes_ = ntbytes;
543
544  assert((int32_t)ntbytes < (int32_t)nbytes);
545  return ntbytes;
546}
547
548
549unsigned int blosc_decompress(const void *src, void *dest, size_t dest_size)
550{
551  uint8_t *_src=NULL;            /* current pos for source buffer */
552  uint8_t *_dest=NULL;           /* current pos for destination buffer */
553  uint8_t version, versionlz;    /* versions for compressed header */
554  uint8_t flags;                 /* flags for header */
555  int32_t shuffle = 0;           /* do unshuffle? */
556  int32_t ntbytes;               /* the number of uncompressed bytes */
557  uint32_t nblocks;              /* number of total blocks in buffer */
558  uint32_t leftover;             /* extra bytes at end of buffer */
559  uint32_t *bstarts;             /* start pointers for each block */
560  uint32_t typesize, blocksize, nbytes, ctbytes;
561
562  _src = (uint8_t *)(src);
563  _dest = (uint8_t *)(dest);
564
565  /* Read the header block */
566  version = _src[0];                   /* blosc format version */
567  versionlz = _src[1];                 /* blosclz format version */
568  flags = _src[2];                     /* flags */
569  typesize = (uint32_t)_src[3];        /* typesize */
570  _src += 4;
571  nbytes = ((uint32_t *)_src)[0];      /* buffer size */
572  blocksize = ((uint32_t *)_src)[1];   /* block size */
573  ctbytes = ((uint32_t *)_src)[2];     /* compressed buffer size */
574  _src += sizeof(int32_t)*3;
575  bstarts = (uint32_t *)_src;
576  /* Compute some params */
577  /* Total blocks */
578  nblocks = nbytes / blocksize;
579  leftover = nbytes % blocksize;
580  nblocks = (leftover>0)? nblocks+1: nblocks;
581  _src += sizeof(int32_t)*nblocks;
582
583  /* Check zero typesizes.  From Blosc version format 2 on, this value
584   has been reserved for future use (most probably to indicate
585   uncompressible buffers). */
586  if ((version == 1) && (typesize == 0)) {
587    typesize = 256;             /* 0 means 256 in format version 1 */
588  }
589
590  if (nbytes > dest_size) {
591    /* This should never happen but just in case */
592    return -1;
593  }
594
595  if ((flags & 0x1) == 1) {
596    /* Input is shuffled.  Unshuffle it. */
597    shuffle = 1;
598  }
599
600  /* Populate parameters for decompression routines */
601  params.compress = 0;
602  params.clevel = 0;            /* specific for compression */
603  params.shuffle = shuffle;
604  params.typesize = typesize;
605  params.blocksize = blocksize;
606  params.ntbytes = 0;
607  params.nbytes = nbytes;
608  params.nblocks = nblocks;
609  params.leftover = leftover;
610  params.bstarts = bstarts;
611  params.src = (uint8_t *)src;
612  params.dest = (uint8_t *)dest;
613
614  /* Do the actual decompression */
615  ntbytes = do_job();
616
617  assert(ntbytes <= (int32_t)dest_size);
618  return ntbytes;
619}
620
621
/* Worker thread body: (de-)compress blocks of each job posted in `params`.

   tids -- points to this thread's index in [0, nthreads), used to pick
           params.tmp[tid]/params.tmp2[tid].

   Each job starts when the thread passes barr_init.  If end_threads is
   set at that point, the thread returns 0 (NULL).  Otherwise:
   - Compression: threads claim block indices one at a time from the
     shared `nblock` counter under count_mutex.  Each compresses its block
     into its own tmp2 (output limited to ebsize).  Then, under the mutex,
     it reserves space at the current params.ntbytes, records that offset
     in bstarts[], and copies the block to dest outside the mutex.  Blocks
     therefore land in the order their space is reserved; bstarts[] keeps
     track of where each one went.
   - Decompression: each thread handles a contiguous range of
     ceil(nblocks / nthreads) blocks and adds its byte count to
     params.ntbytes at the end.
   On failure params.ntbytes is set to the error code (or 0 for
   non-compressible data) and `giveup` tells the other threads to stop.
   The job ends at barr_finish. */
void *t_blosc(void *tids)
{
  int32_t tid = *(int32_t *)tids;
  int32_t cbytes, ntdest;
  uint32_t tblocks;              /* number of blocks per thread */
  uint32_t leftover2;
  uint32_t tblock;               /* limit block on a thread */
  uint32_t nblock_;              /* private copy of nblock */
  int32_t rc;
  uint32_t bsize, leftoverblock;
  /* Parameters for threads */
  size_t blocksize;
  size_t ebsize;
  int32_t compress;
  uint32_t nbytes;
  uint32_t ntbytes;
  uint32_t nblocks;
  uint32_t leftover;
  uint32_t *bstarts;
  uint8_t *src;
  uint8_t *dest;
  uint8_t *tmp;
  uint8_t *tmp2;

  while (1) {
    /* Meeting point for all threads (wait for initialization) */
    rc = pthread_barrier_wait(&barr_init);
    if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {
      printf("Could not wait on barrier (init)\n");
      exit(-1);
    }
    if (end_threads) {
      return(0);
    }

    /* Get parameters for this thread */
    blocksize = params.blocksize;
    /* Room for a whole compressed block plus its per-split 4-byte length
       prefixes (same size as tmp2 in create_temporaries()) */
    ebsize = blocksize + params.typesize*sizeof(int32_t);
    compress = params.compress;
    nbytes = params.nbytes;
    ntbytes = params.ntbytes;
    nblocks = params.nblocks;
    leftover = params.leftover;
    bstarts = params.bstarts;
    src = params.src;
    dest = params.dest;
    tmp = params.tmp[tid];
    tmp2 = params.tmp2[tid];

    /* NOTE(review): every worker clears giveup here without count_mutex,
       so a slow worker may overwrite a giveup = 1 already set by a fast
       one in the same job -- confirm whether this race matters. */
    giveup = 0;
    if (compress) {
      /* Compression always has to follow the block order */
      /* Claim the first block for this thread from the shared counter */
      pthread_mutex_lock(&count_mutex);
      nblock++;
      nblock_ = nblock;
      pthread_mutex_unlock(&count_mutex);
      tblock = nblocks;
    }
    else {
      /* Decompression can happen using any order.  We choose
       sequential block order on each thread */

      /* Blocks per thread */
      tblocks = nblocks / nthreads;
      leftover2 = nblocks % nthreads;
      tblocks = (leftover2>0)? tblocks+1: tblocks;

      nblock_ = tid*tblocks;
      tblock = nblock_ + tblocks;
      if (tblock > nblocks) {
        tblock = nblocks;
      }
      ntbytes = 0;
    }

    /* Loop over blocks */
    /* leftoverblock is never reset inside the loop; that is fine because
       the partial block is always the last index processed. */
    leftoverblock = 0;
    while ((nblock_ < tblock) && !giveup) {
      bsize = blocksize;
      if (nblock_ == (nblocks - 1) && (leftover > 0)) {
        bsize = leftover;
        leftoverblock = 1;
      }
      if (compress) {
        /* Compress into this thread's tmp2; it is copied to dest below */
        cbytes = blosc_c(bsize, leftoverblock, 0, ebsize,
                         src+nblock_*blocksize, tmp2, tmp);
      }
      else {
        cbytes = blosc_d(bsize, leftoverblock,
                         src+bstarts[nblock_], dest+nblock_*blocksize,
                         tmp, tmp2);
      }

      /* Check whether current thread has to giveup */
      /* This is critical in order to not overwrite variables */
      if (giveup != 0) {
        break;
      }

      /* Check results for the (de-)compressed block */
      if (cbytes < 0) {            /* compr/decompr failure */
        /* Set cbytes code error first */
        pthread_mutex_lock(&count_mutex);
        params.ntbytes = cbytes;
        pthread_mutex_unlock(&count_mutex);
        giveup = 1;                 /* give up (de-)compressing after */
        break;
      }

      if (compress) {
        /* Start critical section */
        pthread_mutex_lock(&count_mutex);
        if (cbytes == 0) {            /* uncompressible buffer */
          params.ntbytes = 0;
          pthread_mutex_unlock(&count_mutex);
          giveup = 1;                 /* give up compressing */
          break;
        }
        bstarts[nblock_] = params.ntbytes;   /* update block start counter */
        ntdest = params.ntbytes;
        if (ntdest+cbytes > (int32_t)nbytes) {   /* uncompressible buffer */
          params.ntbytes = 0;
          pthread_mutex_unlock(&count_mutex);
          giveup = 1;
          break;
        }
        /* Claim the next block and reserve output space for this one */
        nblock++;
        nblock_ = nblock;
        params.ntbytes += cbytes;       /* update return bytes counter */
        pthread_mutex_unlock(&count_mutex);
        /* End of critical section */

        /* Copy the compressed buffer to destination */
        memcpy(dest+ntdest, tmp2, cbytes);
      }
      else {
        nblock_++;
      }
      /* Update counter for this thread (only used for decompression) */
      ntbytes += cbytes;

    } /* closes while (nblock_) */

    if (!compress && !giveup) {
      /* Update global counter for all threads (decompression only) */
      pthread_mutex_lock(&count_mutex);
      params.ntbytes += ntbytes;
      pthread_mutex_unlock(&count_mutex);
    }

    /* Meeting point for all threads (wait for finalization) */
    rc = pthread_barrier_wait(&barr_finish);
    if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {
      printf("Could not wait on barrier (finish)\n");
      exit(-1);
    }
    /* Reset block counter (written by every worker, outside the mutex) */
    nblock = -1;

  }  /* closes while(1) */

  /* This should never be reached, but anyway */
  return(0);
}
787
788
789int init_threads(void)
790{
791  int32_t tid, rc;
792
793  /* Initialize mutex and condition variable objects */
794  pthread_mutex_init(&count_mutex, NULL);
795
796  /* Barrier initialization */
797  pthread_barrier_init(&barr_init, NULL, nthreads+1);
798  pthread_barrier_init(&barr_inter, NULL, nthreads);
799  pthread_barrier_init(&barr_finish, NULL, nthreads+1);
800
801  /* Initialize and set thread detached attribute */
802  pthread_attr_init(&ct_attr);
803  pthread_attr_setdetachstate(&ct_attr, PTHREAD_CREATE_JOINABLE);
804
805  /* Finally, create the threads in detached state */
806  for (tid = 0; tid < nthreads; tid++) {
807    tids[tid] = tid;
808    rc = pthread_create(&threads[tid], &ct_attr, t_blosc, (void *)&tids[tid]);
809    if (rc) {
810      fprintf(stderr, "ERROR; return code from pthread_create() is %d\n", rc);
811      fprintf(stderr, "\tError detail: %s\n", strerror(rc));
812      exit(-1);
813    }
814  }
815
816  init_threads_done = 1;                 /* Initialization done! */
817
818  return(0);
819}
820
821
822int blosc_set_nthreads(int nthreads_new)
823{
824  int32_t nthreads_old = nthreads;
825  int32_t t, rc;
826  void *status;
827
828  if (nthreads_new > MAX_THREADS) {
829    fprintf(stderr, "Error.  nthreads cannot be larger than MAX_THREADS (%d)",
830            MAX_THREADS);
831    return -1;
832  }
833  else if (nthreads_new <= 0) {
834    fprintf(stderr, "Error.  nthreads must be a positive integer");
835    return -1;
836  }
837  else if (nthreads_new != nthreads) {
838    if (nthreads > 1 && init_threads_done) {
839      /* Tell all existing threads to end */
840      end_threads = 1;
841      rc = pthread_barrier_wait(&barr_init);
842      if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {
843        printf("Could not wait on barrier (init)\n");
844        exit(-1);
845      }
846      /* Join exiting threads */
847      for (t=0; t<nthreads; t++) {
848        rc = pthread_join(threads[t], &status);
849        if (rc) {
850          fprintf(stderr, "ERROR; return code from pthread_join() is %d\n", rc);
851          fprintf(stderr, "\tError detail: %s\n", strerror(rc));
852          exit(-1);
853        }
854      }
855      init_threads_done = 0;
856      end_threads = 0;
857    }
858    nthreads = nthreads_new;
859    if (nthreads > 1) {
860      /* Launch a new pool of threads */
861      init_threads();
862    }
863    return nthreads_old;
864  }
865  return 0;
866}
867
868
869/* Free possible memory temporaries and thread resources */
870void blosc_free_resources(void)
871{
872  int32_t t, rc;
873  void *status;
874
875  /* Release temporaries */
876  if (init_temps_done) {
877    release_temporaries();
878  }
879
880  /* Finish the possible thread pool */
881  if (nthreads > 1 && init_threads_done) {
882    /* Tell all existing threads to end */
883    end_threads = 1;
884    rc = pthread_barrier_wait(&barr_init);
885    if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {
886      exit(-1);
887    }
888    /* Join exiting threads */
889    for (t=0; t<nthreads; t++) {
890      rc = pthread_join(threads[t], &status);
891      if (rc) {
892        fprintf(stderr, "ERROR; return code from pthread_join() is %d\n", rc);
893        fprintf(stderr, "\tError detail: %s\n", strerror(rc));
894        exit(-1);
895      }
896    }
897
898    /* Release mutex and condition variable objects */
899    pthread_mutex_destroy(&count_mutex);
900
901    /* Barriers */
902    pthread_barrier_destroy(&barr_init);
903    pthread_barrier_destroy(&barr_inter);
904    pthread_barrier_destroy(&barr_finish);
905
906    /* Thread attributes */
907    pthread_attr_destroy(&ct_attr);
908
909    init_threads_done = 0;
910    end_threads = 0;
911  }
912}
913
914
915/* Force the use of a specific blocksize.  If 0, an automatic
916   blocksize will be used (the default). */
917void blosc_set_blocksize(size_t size)
918{
919  force_blocksize = size;
920}
921
Note: See TracBrowser for help on using the repository browser.