source: branches/threaded/src/blosc.c @ 74

Revision 74, 26.3 KB checked in by faltet, 3 years ago (diff)

Removed SP_BLOCKSIZE constant and use min_buffersize_parallel only.

This is simpler and new benchmarks reveal that there is not much sense
to have very small blocks like 2 KB.

Following benchmark results, the hash value for blosclz has been
limited so that it does not exceed 13.

RevLine 
/*********************************************************************
  Blosc - Blocked Shuffling and Compression Library

  Author: Francesc Alted (faltet@pytables.org)
  Creation date: 2009-05-20

  See LICENSES/BLOSC.txt for details about copyright and rights to use.
**********************************************************************/
[3]9
[22]10
[3]11#include <stdlib.h>
12#include <stdio.h>
13#include <string.h>
[22]14#include <sys/types.h>
15#include <sys/stat.h>
[51]16#include <assert.h>
[22]17#include "blosc.h"
[3]18#include "blosclz.h"
[8]19#include "shuffle.h"
[73]20
[42]21#ifdef _WIN32
[26]22  #include <windows.h>
23#else
24  #include <stdint.h>
25  #include <unistd.h>
[42]26#endif  /* _WIN32 */
[73]27
[68]28#include <pthread.h>
[3]29
[20]30
/* Smallest buffer size (in bytes) that will be compressed. */
#define MIN_BUFFERSIZE 128       /* Cannot be smaller than 66 */

/* Largest typesize handled as such; beyond it the buffer is considered
   a plain stream of bytes. */
#define MAX_TYPESIZE 256         /* Cannot be larger than 256 */

/* Upper limit for the number of splits of a block during compression. */
#define MAX_SPLITS 16            /* Cannot be larger than 128 */

/* Upper limit for the number of threads (it sizes some static arrays). */
#define MAX_THREADS 64


/* Buffer size threshold for parallelization.  A buffer has to be
   *strictly larger* than this value for the threaded code path to be
   used. */
size_t min_buffersize_parallel = 256 * 1024;
[73]48
[65]49/* Global variables for main logic */
50int init_temps_done = 0;        /* temporaries for compr/decompr initialized? */
51size_t force_blocksize = 0;     /* should we force the use of a blocksize? */
52
[59]53/* Global variables for threads */
[60]54int nthreads = 1;               /* number of desired threads in pool */
55int init_threads_done = 0;      /* pool of threads initialized? */
56int end_threads = 0;            /* should exisiting threads end? */
[59]57int giveup;                     /* should (de-)compression give up? */
58int nblock = -1;                /* block counter */
59pthread_t threads[MAX_THREADS]; /* opaque structure for threads */
[61]60int tids[MAX_THREADS];          /* ID per each thread */
[59]61pthread_attr_t ct_attr;         /* creation time attributes for threads */
[11]62
[59]63/* Syncronization variables */
64pthread_mutex_t count_mutex;
65pthread_barrier_t barr_init;
66pthread_barrier_t barr_inter;
67pthread_barrier_t barr_finish;
[11]68
[59]69/* Structure for parameters in (de-)compression threads */
70struct thread_data {
71  size_t typesize;
72  size_t blocksize;
73  int compress;
74  int clevel;
75  int shuffle;
76  int ntbytes;
77  unsigned int nbytes;
78  unsigned int nblocks;
79  unsigned int leftover;
80  unsigned int *bstarts;             /* start pointers for each block */
81  unsigned char *src;
82  unsigned char *dest;
83  unsigned char *tmp[MAX_THREADS];
84  unsigned char *tmp2[MAX_THREADS];
85} params;
86
87
/* Settings the currently allocated temporaries were created for.  They
   are compared against the job parameters to decide whether the
   temporaries have to be re-created. */
struct temp_data {
  int nthreads;       /* thread count at creation time */
  size_t typesize;    /* typesize at creation time */
  size_t blocksize;   /* blocksize at creation time */
} current_temp;
[59]94
95
96
[73]97/* Shuffle & compress a single block */
[59]98static int
99blosc_c(size_t blocksize, int leftoverblock,
100        unsigned int ntbytes, unsigned int nbytes,
101        unsigned char *src, unsigned char *dest, unsigned char *tmp)
[55]102{
[22]103  size_t j, neblock, nsplits;
[59]104  int cbytes;                   /* number of compressed bytes in split */
105  int ctbytes = 0;              /* number of compressed bytes in block */
106  int maxout;
107  unsigned char *_tmp;
108  size_t typesize = params.typesize;
[9]109
[59]110  if (params.shuffle && (typesize > 1)) {
[22]111    /* Shuffle this block (this makes sense only if typesize > 1) */
[59]112    shuffle(typesize, blocksize, src, tmp);
[20]113    _tmp = tmp;
[11]114  }
[20]115  else {
[59]116    _tmp = src;
[20]117  }
[9]118
[22]119  /* Compress for each shuffled slice split for this block. */
[65]120  /* If typesize is too large, neblock is too small or we are in a
121     leftover block, do not split at all. */
[66]122  if ((typesize <= MAX_SPLITS) && (blocksize/typesize) >= MIN_BUFFERSIZE &&
[65]123      (!leftoverblock)) {
[22]124    nsplits = typesize;
125  }
126  else {
127    nsplits = 1;
128  }
129  neblock = blocksize / nsplits;
130  for (j = 0; j < nsplits; j++) {
[59]131    dest += sizeof(int);
132    ntbytes += sizeof(int);
[22]133    ctbytes += sizeof(int);
134    maxout = neblock;
[59]135    if (ntbytes+maxout > nbytes) {
136      maxout = nbytes - ntbytes;   /* avoid buffer overrun */
[42]137      if (maxout <= 0) {
138        return 0;                  /* non-compressible block */
139      }
[22]140    }
[59]141    cbytes = blosclz_compress(params.clevel, _tmp+j*neblock, neblock,
142                              dest, maxout-1);
[55]143    if (cbytes >= maxout) {
[22]144      /* Buffer overrun caused by blosclz_compress (should never happen) */
145      return -1;
146    }
[42]147    else if (cbytes < 0) {
[22]148      /* cbytes should never be negative */
149      return -2;
150    }
[55]151    else if (cbytes == 0) {
152      /* The compressor has been unable to compress data significantly. */
153      /* Before doing the copy, check that we are not running into a
154         buffer overflow. */
[59]155      if ((ntbytes+neblock) > nbytes) {
[22]156        return 0;    /* Non-compressible data */
[18]157      }
[59]158      memcpy(dest, _tmp+j*neblock, neblock);
[9]159      cbytes = neblock;
160    }
[59]161    ((unsigned int *)(dest))[-1] = cbytes;
162    dest += cbytes;
163    ntbytes += cbytes;
[22]164    ctbytes += cbytes;
[43]165  }  /* Closes j < nsplits */
[9]166
[59]167  return ctbytes;
[9]168}
169
170
[67]171/* Decompress & unshuffle a single block */
172static int
173blosc_d(size_t blocksize, int leftoverblock,
174        unsigned char *src, unsigned char *dest,
175        unsigned char *tmp, unsigned char *tmp2)
176{
177  int j, neblock, nsplits;
178  int nbytes;                /* number of decompressed bytes in split */
179  int cbytes;                /* number of compressed bytes in split */
180  int ctbytes = 0;           /* number of compressed bytes in block */
181  int ntbytes = 0;           /* number of uncompressed bytes in block */
182  unsigned char *_tmp;
183  size_t typesize = params.typesize;
184  size_t shuffle = params.shuffle;
185
186  if (shuffle && (typesize > 1)) {
187    _tmp = tmp;
188  }
189  else {
190    _tmp = dest;
191  }
192
193  /* Compress for each shuffled slice split for this block. */
194  if ((typesize <= MAX_SPLITS) && (blocksize/typesize) >= MIN_BUFFERSIZE &&
195      (!leftoverblock)) {
196    nsplits = typesize;
197  }
198  else {
199    nsplits = 1;
200  }
201  neblock = blocksize / nsplits;
202  for (j = 0; j < nsplits; j++) {
203    cbytes = ((unsigned int *)(src))[0]; /* amount of compressed bytes */
204    src += sizeof(int);
205    ctbytes += sizeof(int);
206    /* Uncompress */
207    if (cbytes == neblock) {
208      memcpy(_tmp, src, neblock);
209      nbytes = neblock;
210    }
211    else {
212      nbytes = blosclz_decompress(src, cbytes, _tmp, neblock);
213      if (nbytes != neblock) {
214        return -2;
215      }
216    }
217    src += cbytes;
218    ctbytes += cbytes;
219    _tmp += nbytes;
220    ntbytes += nbytes;
221  } /* Closes j < nsplits */
222
223  if (shuffle && (typesize > 1)) {
224    if ((uintptr_t)dest % 16 == 0) {
225      /* 16-bytes aligned dest.  SSE2 unshuffle will work. */
226      unshuffle(typesize, blocksize, tmp, dest);
227    }
228    else {
229      /* dest is not aligned.  Use tmp2, which is aligned, and copy. */
230      unshuffle(typesize, blocksize, tmp, tmp2);
231      memcpy(dest, tmp2, blocksize);
232    }
233  }
234
235  /* Return the number of uncompressed bytes */
236  return ntbytes;
237}
238
239
240/* Serial version for compression/decompression */
241int
242serial_blosc(void)
243{
244  unsigned int j, bsize, leftoverblock;
245  int cbytes;
246  int compress = params.compress;
247  size_t blocksize = params.blocksize;
248  int ntbytes = params.ntbytes;
249  int nbytes = params.nbytes;
250  unsigned int nblocks = params.nblocks;
251  int leftover = params.nbytes % params.blocksize;
252  unsigned int *bstarts = params.bstarts;
253  unsigned char *src = params.src;
254  unsigned char *dest = params.dest;
255  unsigned char *tmp = params.tmp[0];     /* tmp for thread 0 */
256  unsigned char *tmp2 = params.tmp2[0];   /* tmp2 for thread 0 */
257
258  for (j = 0; j < nblocks; j++) {
259    if (compress) {
260      bstarts[j] = ntbytes;
261    }
262    bsize = blocksize;
263    leftoverblock = 0;
264    if ((j == nblocks - 1) && (leftover > 0)) {
265      bsize = leftover;
266      leftoverblock = 1;
267    }
268    if (compress) {
269      cbytes = blosc_c(bsize, leftoverblock, ntbytes, nbytes,
270                       src+j*blocksize, dest+bstarts[j], tmp);
271      if (cbytes == 0) {
272        ntbytes = 0;              /* uncompressible data */
273        break;
274      }
275    }
276    else {
277      cbytes = blosc_d(bsize, leftoverblock,
278                       src+bstarts[j], dest+j*blocksize, tmp, tmp2);
279    }
280    if (cbytes < 0) {
281      ntbytes = cbytes;         /* error in blosc_c / blosc_d */
282      break;
283    }
284    ntbytes += cbytes;
285  }
286
287  /* Final check for ntbytes (only in compression mode) */
288  if (compress) {
289    if (ntbytes == nbytes) {
290      ntbytes = 0;               /* non-compressible data */
291    }
292    else if (ntbytes > nbytes) {
293      fprintf(stderr, "The impossible happened: buffer overflow!\n");
294      ntbytes = -5;               /* too large buffer */
295    }
296  }  /* Close j < nblocks */
297
298  return ntbytes;
299}
300
301
302/* Threaded version for compression/decompression */
303int
304parallel_blosc(void)
305{
306  int rc;
307
308  /* Synchronization point for all threads (wait for initialization) */
309  rc = pthread_barrier_wait(&barr_init);
310  if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {
311    printf("Could not wait on barrier (init)\n");
312    exit(-1);
313  }
314  /* Synchronization point for all threads (wait for finalization) */
315  rc = pthread_barrier_wait(&barr_finish);
316  if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {
317    printf("Could not wait on barrier (finish)\n");
318    exit(-1);
319  }
320
321  /* Return the total bytes decompressed in threads */
322  return params.ntbytes;
323}
324
325
326/* Convenience functions for creating and releasing temporaries */
327void
328create_temporaries(void)
329{
330  int tid;
331  size_t typesize = params.typesize;
332  size_t blocksize = params.blocksize;
333  /* Extended blocksize for temporary destination.  Extended blocksize
334   is only useful for compression in parallel mode, but it doesn't
335   hurt other modes either. */
336  size_t ebsize = blocksize + typesize*sizeof(int);
337  unsigned char *tmp, *tmp2;
338
339  /* Create temporary area for each thread */
340  for (tid = 0; tid < nthreads; tid++) {
341#ifdef _WIN32
342    tmp = (unsigned char *)_aligned_malloc(blocksize, 16);
343    tmp2 = (unsigned char *)_aligned_malloc(ebsize, 16);
344#elif defined __APPLE__
345    /* Mac OS X guarantees 16-byte alignment in small allocs */
346    tmp = (unsigned char *)malloc(blocksize);
347    tmp2 = (unsigned char *)malloc(ebsize);
348#else
349    posix_memalign((void **)&tmp, 16, blocksize);
350    posix_memalign((void **)&tmp2, 16, ebsize);
351#endif  /* _WIN32 */
352    params.tmp[tid] = tmp;
353    params.tmp2[tid] = tmp2;
354  }
355
356  init_temps_done = 1;
357  /* Update params for current temporaries */
358  current_temp.nthreads = nthreads;
359  current_temp.typesize = typesize;
360  current_temp.blocksize = blocksize;
361
362}
363
364
365void
366release_temporaries(void)
367{
368  int tid;
369  unsigned char *tmp, *tmp2;
370
371  /* Release buffers */
372  for (tid = 0; tid < nthreads; tid++) {
373    tmp = params.tmp[tid];
374    tmp2 = params.tmp2[tid];
375#ifdef _WIN32
376    _aligned_free(tmp);
377    _aligned_free(tmp2);
378#else
379    free(tmp);
380    free(tmp2);
381#endif  /* _WIN32 */
382  }
383
384  init_temps_done = 0;
385
386}
387
388
389/* Do the compression or decompression of the buffer depending on the
390   global params. */
391int
392do_job(void) {
393  int ntbytes;
394
395  /* Initialize/reset temporaries if needed */
396  if (!init_temps_done) {
397    create_temporaries();
398  }
399  else if (current_temp.nthreads != nthreads ||
400           current_temp.typesize != params.typesize ||
401           current_temp.blocksize != params.blocksize) {
402    release_temporaries();
403    create_temporaries();
404  }
405
406  /* Run the serial version when nthreads is 1 or when the buffers are
407     just too small */
[74]408  if (nthreads == 1 || params.nbytes <= min_buffersize_parallel) {
[67]409    ntbytes = serial_blosc();
410  }
411  else {
412    ntbytes = parallel_blosc();
413  }
414
415  return ntbytes;
416}
417
418
[63]419size_t
420compute_blocksize(int clevel, size_t typesize, size_t nbytes)
421{
422  size_t blocksize;
423  int i;
424
[65]425  if (force_blocksize) {
426    blocksize = force_blocksize;
427    /* Check that forced blocksize is not too small nor too large */
[66]428    if (blocksize < MIN_BUFFERSIZE) {
429      blocksize = MIN_BUFFERSIZE;
[65]430    }
431    if (blocksize > nbytes) {
432      blocksize = nbytes;
433    }
434  }
[74]435  else if (nbytes > min_buffersize_parallel) {
436    blocksize = min_buffersize_parallel;
437    /* Duplicate blocksize for compression levels 8 and 9 */
438    if (clevel > 7) {
[63]439      blocksize *= 2;
440    }
441  }
442  else {
443    /* Buffer size is too small for splitting it in blocks */
444    blocksize = nbytes;
445  }
[65]446
[63]447  /* blocksize must be a multiple of the typesize */
448  blocksize = blocksize / typesize * typesize;
449
450  return blocksize;
451}
452
453
[3]454unsigned int
[59]455blosc_compress(int clevel, int shuffle, size_t typesize, size_t nbytes,
[39]456               const void *src, void *dest)
[3]457{
[55]458  unsigned char *_dest=NULL;       /* current pos for destination buffer */
[42]459  unsigned char *flags;            /* flags for header */
[59]460  unsigned int nblocks;            /* number of total blocks in buffer */
461  unsigned int leftover;           /* extra bytes at end of buffer */
[51]462  unsigned int *bstarts;           /* start pointers for each block */
[42]463  size_t blocksize;                /* length of the block in bytes */
[59]464  unsigned int ntbytes = 0;        /* the number of compressed bytes */
465  unsigned int *ntbytes_;          /* placeholder for bytes in output buffer */
[3]466
[39]467  /* Compression level */
468  if (clevel < 0 || clevel > 9) {
469    /* If clevel not in 0..9, print an error */
470    fprintf(stderr, "`clevel` parameter must be between 0 and 9!\n");
471    return -10;
472  }
473  else if (clevel == 0) {
[35]474    /* No compression wanted.  Just return without doing anything else. */
[39]475    return 0;
[35]476  }
[65]477  else if (nbytes < MIN_BUFFERSIZE) {
478    /* Too little buffer.  Just return without doing anything else. */
479    return 0;
480  }
[35]481
[39]482  /* Shuffle */
[59]483  if (shuffle != 0 && shuffle != 1) {
484    fprintf(stderr, "`shuffle` parameter must be either 0 or 1!\n");
[39]485    return -10;
486  }
487
[63]488  /* Get the blocksize */
489  blocksize = compute_blocksize(clevel, typesize, nbytes);
[23]490
[55]491  /* Compute number of blocks in buffer */
[59]492  nblocks = nbytes / blocksize;
[20]493  leftover = nbytes % blocksize;
[59]494  nblocks = (leftover>0)? nblocks+1: nblocks;
[46]495
496  /* Check typesize limits */
[66]497  if (typesize == MAX_TYPESIZE) {
498    typesize = 0;               /* zero means MAX_TYPESIZE */
[46]499  }
[66]500  else if (typesize > MAX_TYPESIZE) {
[46]501    /* If typesize is too large, treat buffer as an 1-byte stream. */
[42]502    typesize = 1;
503  }
504
[55]505  _dest = (unsigned char *)(dest);
[23]506  /* Write header for this block */
[42]507  _dest[0] = BLOSC_VERSION_FORMAT;         /* blosc format version */
508  _dest[1] = BLOSCLZ_VERSION_FORMAT;       /* blosclz format version */
[46]509  flags = _dest+2;                         /* flags */
510  _dest[2] = 0;                            /* zeroes flags */
511  _dest[3] = (unsigned char)typesize;      /* type size */
[42]512  _dest += 4;
[59]513  ntbytes += 4;
514  ((unsigned int *)_dest)[0] = nbytes;     /* size of the buffer */
[46]515  ((unsigned int *)_dest)[1] = blocksize;  /* block size */
[59]516  ntbytes_ = (unsigned int *)(_dest+8);    /* compressed buffer size */
[35]517  _dest += sizeof(int)*3;
[59]518  ntbytes += sizeof(int)*3;
[51]519  bstarts = (unsigned int *)_dest;         /* starts for every block */
[59]520  _dest += sizeof(int)*nblocks;            /* book space for pointers to */
521  ntbytes += sizeof(int)*nblocks;          /* every block in output */
[3]522
[59]523  if (shuffle == 1) {
[42]524    /* Shuffle is active */
525    *flags |= 0x1;                         /* bit 0 set to one in flags */
[9]526  }
527
[59]528  /* Populate parameters for compression routines */
529  params.compress = 1;
530  params.clevel = clevel;
531  params.shuffle = shuffle;
532  params.typesize = typesize;
533  params.blocksize = blocksize;
534  params.ntbytes = ntbytes;
535  params.nbytes = nbytes;
536  params.nblocks = nblocks;
537  params.leftover = leftover;
538  params.bstarts = bstarts;
539  params.src = (unsigned char *)src;
540  params.dest = (unsigned char *)dest;
[55]541
[59]542  /* Do the actual compression */
543  ntbytes = do_job();
544  /* Set the number of compressed bytes in header */
545  *ntbytes_ = ntbytes;
[55]546
[63]547  assert(ntbytes < (int)nbytes);
[59]548  return ntbytes;
[3]549}
550
551
552unsigned int
[22]553blosc_decompress(const void *src, void *dest, size_t dest_size)
[3]554{
[59]555  unsigned char *_src=NULL;          /* current pos for source buffer */
556  unsigned char *_dest=NULL;         /* current pos for destination buffer */
[42]557  unsigned char version, versionlz;  /* versions for compressed header */
558  unsigned char flags;               /* flags for header */
[59]559  int shuffle = 0;                   /* do unshuffle? */
560  int ntbytes;                       /* the number of uncompressed bytes */
561  unsigned int nblocks;              /* number of total blocks in buffer */
562  unsigned int leftover;             /* extra bytes at end of buffer */
[51]563  unsigned int *bstarts;             /* start pointers for each block */
[59]564  unsigned int typesize, blocksize, nbytes, ctbytes;
[3]565
[8]566  _src = (unsigned char *)(src);
[3]567  _dest = (unsigned char *)(dest);
568
[22]569  /* Read the header block */
[42]570  version = _src[0];                        /* blosc format version */
571  versionlz = _src[1];                      /* blosclz format version */
[46]572  flags = _src[2];                          /* flags */
573  typesize = (unsigned int)_src[3];         /* typesize */
[35]574  _src += 4;
[59]575  nbytes = ((unsigned int *)_src)[0];       /* buffer size */
[46]576  blocksize = ((unsigned int *)_src)[1];    /* block size */
[59]577  ctbytes = ((unsigned int *)_src)[2];      /* compressed buffer size */
[35]578  _src += sizeof(int)*3;
[59]579  bstarts = (unsigned int *)_src;
[35]580  /* Compute some params */
[51]581  /* Total blocks */
[59]582  nblocks = nbytes / blocksize;
[35]583  leftover = nbytes % blocksize;
[59]584  nblocks = (leftover>0)? nblocks+1: nblocks;
585  _src += sizeof(int)*nblocks;
[20]586
[46]587  /* Check zero typesizes */
588  if (typesize == 0) {
[66]589    typesize = MAX_TYPESIZE;
[46]590  }
591
[10]592  if (nbytes > dest_size) {
[51]593    /* This should never happen but just in case */
[10]594    return -1;
595  }
[3]596
[39]597  if ((flags & 0x1) == 1) {
[22]598    /* Input is shuffled.  Unshuffle it. */
[59]599    shuffle = 1;
[11]600  }
[20]601
[59]602  /* Populate parameters for decompression routines */
603  params.compress = 0;
604  params.clevel = 0;            /* specific for compression */
605  params.shuffle = shuffle;
606  params.typesize = typesize;
607  params.blocksize = blocksize;
608  params.ntbytes = 0;
609  params.nbytes = nbytes;
610  params.nblocks = nblocks;
611  params.leftover = leftover;
612  params.bstarts = bstarts;
613  params.src = (unsigned char *)src;
614  params.dest = (unsigned char *)dest;
615
616  /* Do the actual decompression */
617  ntbytes = do_job();
618
[63]619  assert(ntbytes <= (int)dest_size);
[59]620  return ntbytes;
621}
622
623
624/* Decompress & unshuffle several blocks in a single thread */
625void *t_blosc(void *tids)
626{
627  int tid = *(int *)tids;
628  int cbytes, ntdest;
629  unsigned int tblocks;              /* number of blocks per thread */
630  unsigned int leftover2;
[67]631  unsigned int tblock;               /* limit block on a thread */
632  unsigned int nblock_;              /* private copy of nblock */
[59]633  int rc;
634  unsigned int bsize, leftoverblock;
[68]635  /* Parameters for threads */
636  size_t blocksize;
637  size_t ebsize;
638  int compress;
639  unsigned int nbytes;
640  unsigned int ntbytes;
641  unsigned int nblocks;
642  unsigned int leftover;
643  unsigned int *bstarts;
644  unsigned char *src;
645  unsigned char *dest;
646  unsigned char *tmp;
647  unsigned char *tmp2;
[59]648
649  while (1) {
650    /* Meeting point for all threads (wait for initialization) */
651    rc = pthread_barrier_wait(&barr_init);
652    if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {
653      printf("Could not wait on barrier (init)\n");
654      exit(-1);
655    }
[60]656    if (end_threads) {
657      return(0);
658    }
[59]659
660    /* Get parameters for this thread */
[68]661    blocksize = params.blocksize;
662    ebsize = blocksize + params.typesize*sizeof(int);
663    compress = params.compress;
664    nbytes = params.nbytes;
665    ntbytes = params.ntbytes;
666    nblocks = params.nblocks;
667    leftover = params.leftover;
668    bstarts = params.bstarts;
669    src = params.src;
670    dest = params.dest;
671    tmp = params.tmp[tid];
672    tmp2 = params.tmp2[tid];
[59]673
674    giveup = 0;
675    if (compress) {
676      /* Compression always has to follow the block order */
677      pthread_mutex_lock(&count_mutex);
678      nblock++;
679      nblock_ = nblock;
680      pthread_mutex_unlock(&count_mutex);
681      tblock = nblocks;
682    }
683    else {
684      /* Decompression can happen using any order.  We choose
685       sequential block order on each thread */
686
687      /* Blocks per thread */
688      tblocks = nblocks / nthreads;
689      leftover2 = nblocks % nthreads;
690      tblocks = (leftover2>0)? tblocks+1: tblocks;
691
692      nblock_ = tid*tblocks;
693      tblock = nblock_ + tblocks;
694      if (tblock > nblocks) {
695        tblock = nblocks;
[55]696      }
[59]697      ntbytes = 0;
698    }
699
700    /* Loop over blocks */
701    leftoverblock = 0;
702    while ((nblock_ < tblock) && !giveup) {
703      bsize = blocksize;
704      if (nblock_ == (nblocks - 1) && (leftover > 0)) {
705        bsize = leftover;
706        leftoverblock = 1;
[55]707      }
[59]708      if (compress) {
709        cbytes = blosc_c(bsize, leftoverblock, 0, ebsize,
710                         src+nblock_*blocksize, tmp2, tmp);
[55]711      }
[59]712      else {
713        cbytes = blosc_d(bsize, leftoverblock,
714                         src+bstarts[nblock_], dest+nblock_*blocksize,
715                         tmp, tmp2);
716      }
717
718      /* Check whether current thread has to giveup */
719      /* This is critical in order to not overwrite variables */
720      if (giveup != 0) {
721        break;
722      }
723
724      /* Check results for the decompressed block */
725      if (cbytes < 0) {            /* compr/decompr failure */
726        /* Set cbytes code error first */
727        pthread_mutex_lock(&count_mutex);
728        params.ntbytes = cbytes;
729        pthread_mutex_unlock(&count_mutex);
730        giveup = 1;                 /* give up (de-)compressing after */
731        break;
732      }
733
734      if (compress) {
735        /* Start critical section */
736        pthread_mutex_lock(&count_mutex);
737        if (cbytes == 0) {            /* uncompressible buffer */
738          params.ntbytes = 0;
739          pthread_mutex_unlock(&count_mutex);
740          giveup = 1;                 /* give up compressing */
741          break;
742        }
743        bstarts[nblock_] = params.ntbytes;   /* update block start counter */
744        ntdest = params.ntbytes;
745        if (ntdest+cbytes > nbytes) {         /* uncompressible buffer */
746          params.ntbytes = 0;
747          pthread_mutex_unlock(&count_mutex);
748          giveup = 1;
749          break;
750        }
751        nblock++;
752        nblock_ = nblock;
753        params.ntbytes += cbytes;       /* update return bytes counter */
754        pthread_mutex_unlock(&count_mutex);
755        /* End of critical section */
756
757        /* Copy the compressed buffer to destination */
758        memcpy(dest+ntdest, tmp2, cbytes);
759      }
760      else {
761        nblock_++;
762      }
763      /* Update counter for this thread */
764      ntbytes += cbytes;
765
766    } /* closes while (nblock_) */
767
768    if (!compress && !giveup) {
769      /* Update global counter for all threads (decompression only) */
770      pthread_mutex_lock(&count_mutex);
771      params.ntbytes += ntbytes;
772      pthread_mutex_unlock(&count_mutex);
[51]773    }
[59]774
775    /* Meeting point for all threads (wait for finalization) */
776    rc = pthread_barrier_wait(&barr_finish);
777    if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {
778      printf("Could not wait on barrier (finish)\n");
779      exit(-1);
780    }
781    /* Reset block counter */
782    nblock = -1;
783
784  }  /* closes while(1) */
785
786  /* This should never be reached, but anyway */
787  return(0);
788}
789
790
791int
[60]792init_threads()
[59]793{
794  int tid, rc;
795
796  /* Initialize mutex and condition variable objects */
797  pthread_mutex_init(&count_mutex, NULL);
798
799  /* Barrier initialization */
800  pthread_barrier_init(&barr_init, NULL, nthreads+1);
801  pthread_barrier_init(&barr_inter, NULL, nthreads);
802  pthread_barrier_init(&barr_finish, NULL, nthreads+1);
803
804  /* Initialize and set thread detached attribute */
805  pthread_attr_init(&ct_attr);
[60]806  pthread_attr_setdetachstate(&ct_attr, PTHREAD_CREATE_JOINABLE);
[59]807
808  /* Finally, create the threads in detached state */
809  for (tid = 0; tid < nthreads; tid++) {
810    tids[tid] = tid;
811    rc = pthread_create(&threads[tid], &ct_attr, t_blosc, (void *)&tids[tid]);
812    if (rc) {
813      fprintf(stderr, "ERROR; return code from pthread_create() is %d\n", rc);
814      fprintf(stderr, "\tError detail: %s\n", strerror(rc));
815      exit(-1);
816    }
817  }
818
[60]819  init_threads_done = 1;                 /* Initialization done! */
[59]820
821  return(0);
[3]822}
[60]823
824
825int
826blosc_set_nthreads(int nthreads_new)
827{
828  int nthreads_old = nthreads;
829  int t, rc;
830  void *status;
831
832  if (nthreads_new > MAX_THREADS) {
833    fprintf(stderr, "Error.  nthreads cannot be larger than MAX_THREADS (%d)",
834            MAX_THREADS);
835    return -1;
836  }
837  else if (nthreads_new <= 0) {
838    fprintf(stderr, "Error.  nthreads must be a positive integer");
839    return -1;
840  }
841  else if (nthreads_new != nthreads) {
842    if (nthreads > 1 && init_threads_done) {
843      /* Tell all existing threads to end */
844      end_threads = 1;
845      rc = pthread_barrier_wait(&barr_init);
846      if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {
847        printf("Could not wait on barrier (init)\n");
848        exit(-1);
849      }
850      /* Join exiting threads */
851      for (t=0; t<nthreads; t++) {
852        rc = pthread_join(threads[t], &status);
853        if (rc) {
854          fprintf(stderr, "ERROR; return code from pthread_join() is %d\n", rc);
855          fprintf(stderr, "\tError detail: %s\n", strerror(rc));
856          exit(-1);
857        }
858      }
859      init_threads_done = 0;
860      end_threads = 0;
861    }
862    nthreads = nthreads_new;
863    if (nthreads > 1) {
864      /* Launch a new pool of threads */
865      init_threads();
866    }
867    return nthreads_old;
868  }
869  return 0;
870}
871
[64]872
873/* Free possible memory temporaries and thread resources */
874void
875blosc_free_resources(void)
876{
877  int t, rc;
878  void *status;
879
880  /* Release temporaries */
881  if (init_temps_done) {
882    release_temporaries();
883  }
884
885  /* Finish the possible thread pool */
886  if (nthreads > 1 && init_threads_done) {
887    /* Tell all existing threads to end */
888    end_threads = 1;
889    rc = pthread_barrier_wait(&barr_init);
890    if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {
891      exit(-1);
892    }
893    /* Join exiting threads */
894    for (t=0; t<nthreads; t++) {
895      rc = pthread_join(threads[t], &status);
896      if (rc) {
897        fprintf(stderr, "ERROR; return code from pthread_join() is %d\n", rc);
898        fprintf(stderr, "\tError detail: %s\n", strerror(rc));
899        exit(-1);
900      }
901    }
902
903    /* Release mutex and condition variable objects */
904    pthread_mutex_destroy(&count_mutex);
905
906    /* Barriers */
907    pthread_barrier_destroy(&barr_init);
908    pthread_barrier_destroy(&barr_inter);
909    pthread_barrier_destroy(&barr_finish);
910
911    /* Thread attributes */
912    pthread_attr_destroy(&ct_attr);
913
914    init_threads_done = 0;
915    end_threads = 0;
916  }
917}
[65]918
919
920/* Force the use of a specific blocksize.  If 0, an automatic
921   blocksize will be used (the default). */
922void
[73]923blosc_set_blocksize(size_t size)
[65]924{
[73]925  force_blocksize = size;
[65]926}
[73]927
928
929/* Set the minimal buffer size before threads can run in parallel */
930void
931blosc_set_min_buffersize_parallel(size_t size)
932{
933  min_buffersize_parallel = size;
934}
Note: See TracBrowser for help on using the repository browser.