source: trunk/src/blosc.c @ 114

Revision 114, 29.5 KB checked in by faltet, 4 years ago (diff)

Fixes for supporting MINGW compiler.

Line 
1/*********************************************************************
2  Blosc - Blocked Shuffling and Compression Library
3
4  Author: Francesc Alted (faltet@pytables.org)
5  Creation date: 2009-05-20
6
7  See LICENSES/BLOSC.txt for details about copyright and rights to use.
8**********************************************************************/
9
10
11#include <stdlib.h>
12#include <stdio.h>
13#include <string.h>
14#include <sys/types.h>
15#include <sys/stat.h>
16#include <assert.h>
17#include <pthread.h>
18#include "blosc.h"
19#include "blosclz.h"
20#include "shuffle.h"
21
22#if defined(_WIN32) && !defined(__MINGW32__)
23  #include <windows.h>
24  #include "stdint-windows.h"
25#else
26  #include <stdint.h>
27  #include <unistd.h>
28  #include <inttypes.h>
29#endif  /* _WIN32 */
30
31
32
33/* Minimal buffer size to be compressed.  blosc_compress() returns 0 for
   inputs smaller than this; it is also the floor applied to a forced
   blocksize and the minimum number of elements per block required before
   a block is split by typesize. */
34#define MIN_BUFFERSIZE 128       /* Cannot be smaller than 66 */
35
36/* Maximum typesize before considering buffer as a stream of bytes.
   The typesize is stored in a single header byte by blosc_compress(). */
37#define MAX_TYPESIZE 255         /* Cannot be larger than 255 */
38
39/* The maximum number of splits in a block for compression.  Blocks are
   only split (one split per byte of the type) when typesize does not
   exceed this value. */
40#define MAX_SPLITS 16            /* Cannot be larger than 128 */
41
42/* The maximum number of threads.  Sizes the static arrays threads[],
   tids[] and the per-thread tmp/tmp2 pointer tables in `params`. */
43#define MAX_THREADS 64
44
45/* The size of L1 cache.  32 KB is quite common nowadays.  Used by
   compute_blocksize() as the base unit for the automatic blocksize. */
46#define L1 (32*1024)
47
48
49/* Global variables for main logic */
50int32_t init_temps_done = 0;    /* temporaries for compr/decompr initialized? */
51size_t force_blocksize = 0;     /* forced blocksize; 0 means automatic */
52
53/* Global variables for threads */
54int32_t nthreads = 1;            /* number of desired threads in pool */
55int32_t init_threads_done = 0;   /* pool of threads initialized? */
56int32_t end_threads = 0;         /* should existing threads end? */
57int32_t init_sentinels_done = 0; /* sentinels initialized? */
58int32_t giveup_code;             /* 1: running fine; 0: incompressible
                                    buffer; <0: error code when giving up */
59int32_t nblock;                  /* shared block counter (compression) */
60pthread_t threads[MAX_THREADS];  /* opaque structure for threads */
61int32_t tids[MAX_THREADS];       /* ID per each thread */
62pthread_attr_t ct_attr;          /* creation time attributes for threads */
63
/* Use POSIX barriers only when the platform advertises them; otherwise a
   mutex/condition-variable rendezvous is used instead. */
64#if defined(_POSIX_BARRIERS) && (_POSIX_BARRIERS - 20012L) >= 0
65#define _POSIX_BARRIERS_MINE
66#endif
67
68/* Synchronization variables */
69pthread_mutex_t count_mutex;     /* protects nblock, giveup_code and
                                    params.ntbytes in worker threads */
70#ifdef _POSIX_BARRIERS_MINE
71pthread_barrier_t barr_init;     /* start-of-job rendezvous (nthreads+1) */
72pthread_barrier_t barr2_init;    /* initialised/destroyed only; never
                                    waited on in this file */
73pthread_barrier_t barr_finish;   /* end-of-job rendezvous (nthreads+1) */
74#else
75int32_t count_threads = 0;       /* participants currently parked at the
                                    rendezvous */
76int32_t count2_threads = 0;      /* not referenced elsewhere in this file */
77pthread_mutex_t count_threads_mutex;
78pthread_cond_t count_threads_cv;
79pthread_mutex_t count2_threads_mutex;  /* initialised/destroyed only */
80pthread_cond_t count2_threads_cv;      /* initialised/destroyed only */
81#endif
82
83
84/* Structure for parameters in (de-)compression threads.  The single
   global instance `params` is filled in by blosc_compress() /
   blosc_decompress() and then read by the serial and threaded workers. */
85struct thread_data {
86  size_t typesize;               /* size in bytes of one element */
87  size_t blocksize;              /* size in bytes of a full block */
88  int32_t compress;              /* 1: compress, 0: decompress */
89  int32_t clevel;                /* compression level (compression only) */
90  int32_t shuffle;               /* 1 if the shuffle filter is active */
91  int32_t ntbytes;               /* running count of bytes produced */
92  uint32_t nbytes;               /* size of the uncompressed buffer */
93  uint32_t nblocks;              /* number of blocks, leftover included */
94  uint32_t leftover;             /* nbytes % blocksize */
95  uint32_t *bstarts;             /* start pointers for each block */
96  uint8_t *src;                  /* source buffer */
97  uint8_t *dest;                 /* destination buffer */
98  uint8_t *tmp[MAX_THREADS];     /* per-thread scratch, blocksize bytes */
99  uint8_t *tmp2[MAX_THREADS];    /* per-thread scratch, extended blocksize */
100} params;
101
102
103/* Structure for parameters meant for keeping track of current temporaries.
   create_temporaries() records here the values the scratch buffers were
   allocated for; do_job() compares them with the live values to decide
   whether the buffers must be re-created. */
104struct temp_data {
105  int32_t nthreads;              /* number of tmp/tmp2 pairs allocated */
106  size_t typesize;               /* typesize used to size tmp2 */
107  size_t blocksize;              /* blocksize used to size tmp and tmp2 */
108} current_temp;
109
110
111
112/* Shuffle & compress a single block */
113static int blosc_c(size_t blocksize, int32_t leftoverblock,
114                   uint32_t ntbytes, uint32_t nbytes,
115                   uint8_t *src, uint8_t *dest, uint8_t *tmp)
116{
117  size_t j, neblock, nsplits;
118  int32_t cbytes;                   /* number of compressed bytes in split */
119  int32_t ctbytes = 0;              /* number of compressed bytes in block */
120  int32_t maxout;
121  uint8_t *_tmp;
122  size_t typesize = params.typesize;
123
124  if (params.shuffle && (typesize > 1)) {
125    /* Shuffle this block (this makes sense only if typesize > 1) */
126    shuffle(typesize, blocksize, src, tmp);
127    _tmp = tmp;
128  }
129  else {
130    _tmp = src;
131  }
132
133  /* Compress for each shuffled slice split for this block. */
134  /* If typesize is too large, neblock is too small or we are in a
135     leftover block, do not split at all. */
136  if ((typesize <= MAX_SPLITS) && (blocksize/typesize) >= MIN_BUFFERSIZE &&
137      (!leftoverblock)) {
138    nsplits = typesize;
139  }
140  else {
141    nsplits = 1;
142  }
143  neblock = blocksize / nsplits;
144  for (j = 0; j < nsplits; j++) {
145    dest += sizeof(int32_t);
146    ntbytes += sizeof(int32_t);
147    ctbytes += sizeof(int32_t);
148    maxout = neblock;
149    if (ntbytes+maxout > nbytes) {
150      maxout = nbytes - ntbytes;   /* avoid buffer overrun */
151      if (maxout <= 0) {
152        return 0;                  /* non-compressible block */
153      }
154    }
155    cbytes = blosclz_compress(params.clevel, _tmp+j*neblock, neblock,
156                              dest, maxout-1);
157    if (cbytes >= maxout) {
158      /* Buffer overrun caused by blosclz_compress (should never happen) */
159      return -1;
160    }
161    else if (cbytes < 0) {
162      /* cbytes should never be negative */
163      return -2;
164    }
165    else if (cbytes == 0) {
166      /* The compressor has been unable to compress data significantly. */
167      /* Before doing the copy, check that we are not running into a
168         buffer overflow. */
169      if ((ntbytes+neblock) > nbytes) {
170        return 0;    /* Non-compressible data */
171      }
172      memcpy(dest, _tmp+j*neblock, neblock);
173      cbytes = neblock;
174    }
175    ((uint32_t *)(dest))[-1] = cbytes;
176    dest += cbytes;
177    ntbytes += cbytes;
178    ctbytes += cbytes;
179  }  /* Closes j < nsplits */
180
181  return ctbytes;
182}
183
184
185/* Decompress & unshuffle a single block */
186static int blosc_d(size_t blocksize, int32_t leftoverblock,
187                   uint8_t *src, uint8_t *dest, uint8_t *tmp, uint8_t *tmp2)
188{
189  int32_t j, neblock, nsplits;
190  int32_t nbytes;                /* number of decompressed bytes in split */
191  int32_t cbytes;                /* number of compressed bytes in split */
192  int32_t ctbytes = 0;           /* number of compressed bytes in block */
193  int32_t ntbytes = 0;           /* number of uncompressed bytes in block */
194  uint8_t *_tmp;
195  size_t typesize = params.typesize;
196  size_t shuffle = params.shuffle;
197
198  if (shuffle && (typesize > 1)) {
199    _tmp = tmp;
200  }
201  else {
202    _tmp = dest;
203  }
204
205  /* Compress for each shuffled slice split for this block. */
206  if ((typesize <= MAX_SPLITS) && (blocksize/typesize) >= MIN_BUFFERSIZE &&
207      (!leftoverblock)) {
208    nsplits = typesize;
209  }
210  else {
211    nsplits = 1;
212  }
213  neblock = blocksize / nsplits;
214  for (j = 0; j < nsplits; j++) {
215    cbytes = ((uint32_t *)(src))[0];   /* amount of compressed bytes */
216    src += sizeof(int32_t);
217    ctbytes += sizeof(int32_t);
218    /* Uncompress */
219    if (cbytes == neblock) {
220      memcpy(_tmp, src, neblock);
221      nbytes = neblock;
222    }
223    else {
224      nbytes = blosclz_decompress(src, cbytes, _tmp, neblock);
225      if (nbytes != neblock) {
226        return -2;
227      }
228    }
229    src += cbytes;
230    ctbytes += cbytes;
231    _tmp += nbytes;
232    ntbytes += nbytes;
233  } /* Closes j < nsplits */
234
235  if (shuffle && (typesize > 1)) {
236    if ((uintptr_t)dest % 16 == 0) {
237      /* 16-bytes aligned dest.  SSE2 unshuffle will work. */
238      unshuffle(typesize, blocksize, tmp, dest);
239    }
240    else {
241      /* dest is not aligned.  Use tmp2, which is aligned, and copy. */
242      unshuffle(typesize, blocksize, tmp, tmp2);
243      memcpy(dest, tmp2, blocksize);
244    }
245  }
246
247  /* Return the number of uncompressed bytes */
248  return ntbytes;
249}
250
251
252/* Serial version for compression/decompression */
253int serial_blosc(void)
254{
255  uint32_t j, bsize, leftoverblock;
256  int32_t cbytes;
257  int32_t compress = params.compress;
258  size_t blocksize = params.blocksize;
259  int32_t ntbytes = params.ntbytes;
260  int32_t nbytes = params.nbytes;
261  uint32_t nblocks = params.nblocks;
262  int32_t leftover = params.nbytes % params.blocksize;
263  uint32_t *bstarts = params.bstarts;
264  uint8_t *src = params.src;
265  uint8_t *dest = params.dest;
266  uint8_t *tmp = params.tmp[0];     /* tmp for thread 0 */
267  uint8_t *tmp2 = params.tmp2[0];   /* tmp2 for thread 0 */
268
269  for (j = 0; j < nblocks; j++) {
270    if (compress) {
271      bstarts[j] = ntbytes;
272    }
273    bsize = blocksize;
274    leftoverblock = 0;
275    if ((j == nblocks - 1) && (leftover > 0)) {
276      bsize = leftover;
277      leftoverblock = 1;
278    }
279    if (compress) {
280      cbytes = blosc_c(bsize, leftoverblock, ntbytes, nbytes,
281                       src+j*blocksize, dest+bstarts[j], tmp);
282      if (cbytes == 0) {
283        ntbytes = 0;              /* uncompressible data */
284        break;
285      }
286    }
287    else {
288      cbytes = blosc_d(bsize, leftoverblock,
289                       src+bstarts[j], dest+j*blocksize, tmp, tmp2);
290    }
291    if (cbytes < 0) {
292      ntbytes = cbytes;         /* error in blosc_c / blosc_d */
293      break;
294    }
295    ntbytes += cbytes;
296  }
297
298  /* Final check for ntbytes (only in compression mode) */
299  if (compress) {
300    if (ntbytes == nbytes) {
301      ntbytes = 0;               /* non-compressible data */
302    }
303    else if (ntbytes > nbytes) {
304      fprintf(stderr, "The impossible happened: buffer overflow!\n");
305      ntbytes = -5;               /* too large buffer */
306    }
307  }  /* Close j < nblocks */
308
309  return ntbytes;
310}
311
312
313/* Threaded version for compression/decompression */
314int parallel_blosc(void)
315{
316  int32_t rc;
317
318  /* Synchronization point for all threads (wait for initialization) */
319#ifdef _POSIX_BARRIERS_MINE
320  rc = pthread_barrier_wait(&barr_init);
321  if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {
322    printf("Could not wait on barrier (init)\n");
323    exit(-1);
324  }
325#else
326  pthread_mutex_lock(&count_threads_mutex);
327  if (count_threads < nthreads) {
328    count_threads++;
329    pthread_cond_wait(&count_threads_cv, &count_threads_mutex);
330  }
331  else {
332    pthread_cond_broadcast(&count_threads_cv);
333  }
334  pthread_mutex_unlock(&count_threads_mutex);
335#endif
336
337  /* Synchronization point for all threads (wait for finalization) */
338#ifdef _POSIX_BARRIERS_MINE
339  rc = pthread_barrier_wait(&barr_finish);
340  if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {
341    printf("Could not wait on barrier (finish)\n");
342    exit(-1);
343  }
344#else
345  pthread_mutex_lock(&count_threads_mutex);
346  if (count_threads > 0) {
347    count_threads--;
348    pthread_cond_wait(&count_threads_cv, &count_threads_mutex);
349  }
350  else {
351    pthread_cond_broadcast(&count_threads_cv);
352  }
353  pthread_mutex_unlock(&count_threads_mutex);
354#endif
355
356  if (giveup_code > 0) {
357    /* Return the total bytes (de-)compressed in threads */
358    return params.ntbytes;
359  }
360  else {
361    /* Compression/decompression gave up.  Return error code. */
362    return giveup_code;
363  }
364}
365
366
367/* Convenience functions for creating and releasing temporaries */
368void create_temporaries(void)
369{
370  int32_t tid;
371  size_t typesize = params.typesize;
372  size_t blocksize = params.blocksize;
373  /* Extended blocksize for temporary destination.  Extended blocksize
374   is only useful for compression in parallel mode, but it doesn't
375   hurt other modes either. */
376  size_t ebsize = blocksize + typesize*sizeof(int32_t);
377  uint8_t *tmp, *tmp2;
378
379  /* Create temporary area for each thread */
380  for (tid = 0; tid < nthreads; tid++) {
381#if defined(_WIN32)
382    tmp = (uint8_t *)_aligned_malloc(blocksize, 16);
383    tmp2 = (uint8_t *)_aligned_malloc(ebsize, 16);
384#elif defined __APPLE__
385    /* Mac OS X guarantees 16-byte alignment in small allocs */
386    tmp = (uint8_t *)malloc(blocksize);
387    tmp2 = (uint8_t *)malloc(ebsize);
388#else
389    posix_memalign((void **)&tmp, 16, blocksize);
390    posix_memalign((void **)&tmp2, 16, ebsize);
391#endif  /* _WIN32 */
392    params.tmp[tid] = tmp;
393    params.tmp2[tid] = tmp2;
394  }
395
396  init_temps_done = 1;
397  /* Update params for current temporaries */
398  current_temp.nthreads = nthreads;
399  current_temp.typesize = typesize;
400  current_temp.blocksize = blocksize;
401
402}
403
404
405void release_temporaries(void)
406{
407  int32_t tid;
408  uint8_t *tmp, *tmp2;
409
410  /* Release buffers */
411  for (tid = 0; tid < nthreads; tid++) {
412    tmp = params.tmp[tid];
413    tmp2 = params.tmp2[tid];
414#if defined(_WIN32)
415    _aligned_free(tmp);
416    _aligned_free(tmp2);
417#else
418    free(tmp);
419    free(tmp2);
420#endif  /* _WIN32 */
421  }
422
423  init_temps_done = 0;
424
425}
426
427
428/* Do the compression or decompression of the buffer depending on the
429   global params. */
430int do_job(void) {
431  int32_t ntbytes;
432
433  /* Initialize/reset temporaries if needed */
434  if (!init_temps_done) {
435    create_temporaries();
436  }
437  else if (current_temp.nthreads != nthreads ||
438           current_temp.typesize != params.typesize ||
439           current_temp.blocksize != params.blocksize) {
440    release_temporaries();
441    create_temporaries();
442  }
443
444  /* Run the serial version when nthreads is 1 or when the buffers are
445     not much larger than blocksize */
446  if (nthreads == 1 || (params.nbytes / params.blocksize) <= 1) {
447    ntbytes = serial_blosc();
448  }
449  else {
450    ntbytes = parallel_blosc();
451  }
452
453  return ntbytes;
454}
455
456
457size_t compute_blocksize(int32_t clevel, size_t typesize, size_t nbytes)
458{
459  size_t blocksize;
460
461  blocksize = nbytes;           /* Start by a whole buffer as blocksize */
462
463  if (force_blocksize) {
464    blocksize = force_blocksize;
465    /* Check that forced blocksize is not too small nor too large */
466    if (blocksize < MIN_BUFFERSIZE) {
467      blocksize = MIN_BUFFERSIZE;
468    }
469  }
470  else if (nbytes >= L1*typesize) {
471    blocksize = L1 * typesize;
472    if (clevel <= 3) {
473      blocksize /= 8;
474    }
475    else if (clevel <= 5) {
476      blocksize /= 4;
477    }
478    else if (clevel <= 6) {
479      blocksize /= 2;
480    }
481    else if (clevel < 9) {
482      blocksize *= 1;
483    }
484    else {
485      blocksize *= 2;
486    }
487  }
488
489  /* Check that blocksize is not too large */
490  if (blocksize > nbytes) {
491    blocksize = nbytes;
492  }
493
494  /* blocksize must be a multiple of the typesize */
495  blocksize = blocksize / typesize * typesize;
496
497  return blocksize;
498}
499
500
501unsigned int blosc_compress(int clevel, int shuffle, size_t typesize,
502                            size_t nbytes, const void *src, void *dest)
503{
504  uint8_t *_dest=NULL;         /* current pos for destination buffer */
505  uint8_t *flags;              /* flags for header */
506  uint32_t nblocks;            /* number of total blocks in buffer */
507  uint32_t leftover;           /* extra bytes at end of buffer */
508  uint32_t *bstarts;           /* start pointers for each block */
509  size_t blocksize;            /* length of the block in bytes */
510  uint32_t ntbytes = 0;        /* the number of compressed bytes */
511  uint32_t *ntbytes_;          /* placeholder for bytes in output buffer */
512
513  /* Compression level */
514  if (clevel < 0 || clevel > 9) {
515    /* If clevel not in 0..9, print an error */
516    fprintf(stderr, "`clevel` parameter must be between 0 and 9!\n");
517    return -10;
518  }
519  else if (clevel == 0) {
520    /* No compression wanted.  Just return without doing anything else. */
521    return 0;
522  }
523  else if (nbytes < MIN_BUFFERSIZE) {
524    /* Too little buffer.  Just return without doing anything else. */
525    return 0;
526  }
527
528  /* Shuffle */
529  if (shuffle != 0 && shuffle != 1) {
530    fprintf(stderr, "`shuffle` parameter must be either 0 or 1!\n");
531    return -10;
532  }
533
534  /* Get the blocksize */
535  blocksize = compute_blocksize(clevel, typesize, nbytes);
536
537  /* Compute number of blocks in buffer */
538  nblocks = nbytes / blocksize;
539  leftover = nbytes % blocksize;
540  nblocks = (leftover>0)? nblocks+1: nblocks;
541
542  /* Check typesize limits */
543  if (typesize > MAX_TYPESIZE) {
544    /* If typesize is too large, treat buffer as an 1-byte stream. */
545    typesize = 1;
546  }
547
548  _dest = (uint8_t *)(dest);
549  /* Write header for this block */
550  _dest[0] = BLOSC_VERSION_FORMAT;         /* blosc format version */
551  _dest[1] = BLOSCLZ_VERSION_FORMAT;       /* blosclz format version */
552  flags = _dest+2;                         /* flags */
553  _dest[2] = 0;                            /* zeroes flags */
554  _dest[3] = (uint8_t)typesize;            /* type size */
555  _dest += 4;
556  ntbytes += 4;
557  ((uint32_t *)_dest)[0] = nbytes;         /* size of the buffer */
558  ((uint32_t *)_dest)[1] = blocksize;      /* block size */
559  ntbytes_ = (uint32_t *)(_dest+8);        /* compressed buffer size */
560  _dest += sizeof(int32_t)*3;
561  ntbytes += sizeof(int32_t)*3;
562  bstarts = (uint32_t *)_dest;             /* starts for every block */
563  _dest += sizeof(int32_t)*nblocks;        /* book space for pointers to */
564  ntbytes += sizeof(int32_t)*nblocks;      /* every block in output */
565
566  if (shuffle == 1) {
567    /* Shuffle is active */
568    *flags |= 0x1;                         /* bit 0 set to one in flags */
569  }
570
571  /* Populate parameters for compression routines */
572  params.compress = 1;
573  params.clevel = clevel;
574  params.shuffle = shuffle;
575  params.typesize = typesize;
576  params.blocksize = blocksize;
577  params.ntbytes = ntbytes;
578  params.nbytes = nbytes;
579  params.nblocks = nblocks;
580  params.leftover = leftover;
581  params.bstarts = bstarts;
582  params.src = (uint8_t *)src;
583  params.dest = (uint8_t *)dest;
584
585  /* Do the actual compression */
586  ntbytes = do_job();
587  /* Set the number of compressed bytes in header */
588  *ntbytes_ = ntbytes;
589
590  assert((int32_t)ntbytes < (int32_t)nbytes);
591  return ntbytes;
592}
593
594
595unsigned int blosc_decompress(const void *src, void *dest, size_t dest_size)
596{
597  uint8_t *_src=NULL;            /* current pos for source buffer */
598  uint8_t *_dest=NULL;           /* current pos for destination buffer */
599  uint8_t version, versionlz;    /* versions for compressed header */
600  uint8_t flags;                 /* flags for header */
601  int32_t shuffle = 0;           /* do unshuffle? */
602  int32_t ntbytes;               /* the number of uncompressed bytes */
603  uint32_t nblocks;              /* number of total blocks in buffer */
604  uint32_t leftover;             /* extra bytes at end of buffer */
605  uint32_t *bstarts;             /* start pointers for each block */
606  uint32_t typesize, blocksize, nbytes, ctbytes;
607
608  _src = (uint8_t *)(src);
609  _dest = (uint8_t *)(dest);
610
611  /* Read the header block */
612  version = _src[0];                   /* blosc format version */
613  versionlz = _src[1];                 /* blosclz format version */
614  flags = _src[2];                     /* flags */
615  typesize = (uint32_t)_src[3];        /* typesize */
616  _src += 4;
617  nbytes = ((uint32_t *)_src)[0];      /* buffer size */
618  blocksize = ((uint32_t *)_src)[1];   /* block size */
619  ctbytes = ((uint32_t *)_src)[2];     /* compressed buffer size */
620  _src += sizeof(int32_t)*3;
621  bstarts = (uint32_t *)_src;
622  /* Compute some params */
623  /* Total blocks */
624  nblocks = nbytes / blocksize;
625  leftover = nbytes % blocksize;
626  nblocks = (leftover>0)? nblocks+1: nblocks;
627  _src += sizeof(int32_t)*nblocks;
628
629  /* Check zero typesizes.  From Blosc version format 2 on, this value
630   has been reserved for future use (most probably to indicate
631   uncompressible buffers). */
632  if ((version == 1) && (typesize == 0)) {
633    typesize = 256;             /* 0 means 256 in format version 1 */
634  }
635
636  if (nbytes > dest_size) {
637    /* This should never happen but just in case */
638    return -1;
639  }
640
641  if ((flags & 0x1) == 1) {
642    /* Input is shuffled.  Unshuffle it. */
643    shuffle = 1;
644  }
645
646  /* Populate parameters for decompression routines */
647  params.compress = 0;
648  params.clevel = 0;            /* specific for compression */
649  params.shuffle = shuffle;
650  params.typesize = typesize;
651  params.blocksize = blocksize;
652  params.ntbytes = 0;
653  params.nbytes = nbytes;
654  params.nblocks = nblocks;
655  params.leftover = leftover;
656  params.bstarts = bstarts;
657  params.src = (uint8_t *)src;
658  params.dest = (uint8_t *)dest;
659
660  /* Do the actual decompression */
661  ntbytes = do_job();
662
663  assert(ntbytes <= (int32_t)dest_size);
664  return ntbytes;
665}
666
667
668/* Worker-thread entry point: compress or decompress several blocks in a
   single thread.

   `tids` points to this worker's int32_t id, which selects its tmp/tmp2
   scratch buffers in `params`.  The worker loops forever; on each
   iteration it
     1. waits at the "init" rendezvous for a new job (or a stop request),
     2. returns if `end_threads` is set,
     3. processes blocks of the job described by the global `params`,
     4. waits at the "finish" rendezvous.
   Results are reported through params.ntbytes and giveup_code
   (1: fine, 0: incompressible buffer, <0: error). */
669void *t_blosc(void *tids)
670{
671  int32_t tid = *(int32_t *)tids;
672  int32_t cbytes, ntdest;
673  uint32_t tblocks;              /* number of blocks per thread */
674  uint32_t leftover2;
675  uint32_t tblock;               /* limit block on a thread */
676  uint32_t nblock_;              /* private copy of nblock */
677  int32_t rc;
678  uint32_t bsize, leftoverblock;
679  /* Parameters for threads */
680  size_t blocksize;
681  size_t ebsize;
682  int32_t compress;
683  uint32_t nbytes;
684  uint32_t ntbytes;
685  uint32_t nblocks;
686  uint32_t leftover;
687  uint32_t *bstarts;
688  uint8_t *src;
689  uint8_t *dest;
690  uint8_t *tmp;
691  uint8_t *tmp2;
692
693  while (1) {
694
      /* NOTE(review): this store is done by every worker without holding
         count_mutex -- confirm it cannot race with the locked check below. */
695    init_sentinels_done = 0;     /* sentinels have to be initialised yet */
696   
697    /* Meeting point for all threads (wait for initialization) */
698#ifdef _POSIX_BARRIERS_MINE
699    rc = pthread_barrier_wait(&barr_init);
700    if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {
701      printf("Could not wait on barrier (init)\n");
702      exit(-1);
703    }
704#else
      /* Fallback rendezvous: the first `nthreads` arrivals park on the
         condition variable; the next one wakes everybody up. */
705    pthread_mutex_lock(&count_threads_mutex);
706    if (count_threads < nthreads) {
707      count_threads++;
708      pthread_cond_wait(&count_threads_cv, &count_threads_mutex);
709    }
710    else {
711      pthread_cond_broadcast(&count_threads_cv);
712    }
713    pthread_mutex_unlock(&count_threads_mutex);
714#endif
715
716    /* Check if thread has been asked to return */
717    if (end_threads) {
718      return(0);
719    }
720
      /* The first worker to get here resets the per-job shared state */
721    pthread_mutex_lock(&count_mutex);
722    if (!init_sentinels_done) {
723      /* Set sentinels and other global variables */
724      giveup_code = 1;            /* no error code initially */
725      nblock = -1;                /* block counter */
726      init_sentinels_done = 1;    /* sentinels have been initialised */
727    }
728    pthread_mutex_unlock(&count_mutex);
729
730    /* Get parameters for this thread before entering the main loop */
731    blocksize = params.blocksize;
      /* Size of tmp2: one block plus room for the per-split length fields */
732    ebsize = blocksize + params.typesize*sizeof(int32_t);
733    compress = params.compress;
734    nbytes = params.nbytes;
735    nblocks = params.nblocks;
736    leftover = params.leftover;
737    bstarts = params.bstarts;
738    src = params.src;
739    dest = params.dest;
740    tmp = params.tmp[tid];
741    tmp2 = params.tmp2[tid];
742
743    ntbytes = 0;                /* only useful for decompression */
744
745    if (compress) {
746      /* Blocks are handed out one at a time through the shared `nblock`
         counter; each worker grabs the next unprocessed block. */
747      pthread_mutex_lock(&count_mutex);
748      nblock++;
749      nblock_ = nblock;
750      pthread_mutex_unlock(&count_mutex);
751      tblock = nblocks;
752    }
753    else {
754      /* Decompression can happen using any order.  We choose
755       sequential block order on each thread */
756
757      /* Blocks per thread (rounded up) */
758      tblocks = nblocks / nthreads;
759      leftover2 = nblocks % nthreads;
760      tblocks = (leftover2>0)? tblocks+1: tblocks;
761
        /* This worker handles the blocks in [nblock_, tblock) */
762      nblock_ = tid*tblocks;
763      tblock = nblock_ + tblocks;
764      if (tblock > nblocks) {
765        tblock = nblocks;
766      }
767    }
768
769    /* Loop over blocks */
770    leftoverblock = 0;
771    while ((nblock_ < tblock) && giveup_code > 0) {
772      bsize = blocksize;
        /* The very last block may be shorter than the rest */
773      if (nblock_ == (nblocks - 1) && (leftover > 0)) {
774        bsize = leftover;
775        leftoverblock = 1;
776      }
777      if (compress) {
          /* Compress into the private tmp2 buffer (limited to ebsize bytes);
             the result is copied to dest further below. */
778        cbytes = blosc_c(bsize, leftoverblock, 0, ebsize,
779                         src+nblock_*blocksize, tmp2, tmp);
780      }
781      else {
782        cbytes = blosc_d(bsize, leftoverblock,
783                         src+bstarts[nblock_], dest+nblock_*blocksize,
784                         tmp, tmp2);
785      }
786
787      /* Check whether current thread has to giveup */
788      if (giveup_code <= 0) {
789        break;
790      }
791
792      /* Check results for the compressed/decompressed block */
793      if (cbytes < 0) {            /* compr/decompr failure */
794        /* Set giveup_code error */
795        pthread_mutex_lock(&count_mutex);
796        giveup_code = cbytes;
797        pthread_mutex_unlock(&count_mutex);
798        break;
799      }
800
801      if (compress) {
802        /* Start critical section: reserve room in dest for this block and
           pick the next block to work on. */
803        pthread_mutex_lock(&count_mutex);
804        ntdest = params.ntbytes;
805        bstarts[nblock_] = ntdest;    /* update block start counter */
806        if ( (cbytes == 0) || (ntdest+cbytes > (int32_t)nbytes) ) {
807          giveup_code = 0;             /* uncompressible buffer */
808          pthread_mutex_unlock(&count_mutex);
809          break;
810        }
811        nblock++;
812        nblock_ = nblock;
813        params.ntbytes += cbytes;       /* update return bytes counter */
814        pthread_mutex_unlock(&count_mutex);
815        /* End of critical section */
816
817        /* Copy the compressed buffer to destination (outside the lock; the
           [ntdest, ntdest+cbytes) range was reserved above) */
818        memcpy(dest+ntdest, tmp2, cbytes);
819      }
820      else {
821        nblock_++;
822        /* Update counter for this thread */
823        ntbytes += cbytes;
824      }
825
826    } /* closes while (nblock_) */
827
828    /* Sum up all the bytes decompressed */
829    if (!compress && giveup_code > 0) {
830      /* Update global counter for all threads (decompression only) */
831      pthread_mutex_lock(&count_mutex);
832      params.ntbytes += ntbytes;
833      pthread_mutex_unlock(&count_mutex);
834    }
835
836    /* Meeting point for all threads (wait for finalization) */
837#ifdef _POSIX_BARRIERS_MINE
838    rc = pthread_barrier_wait(&barr_finish);
839    if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {
840      printf("Could not wait on barrier (finish)\n");
841      exit(-1);
842    }
843#else
      /* Fallback rendezvous: arrivals count back down to zero; the one
         that finds the counter at zero wakes everybody up. */
844    pthread_mutex_lock(&count_threads_mutex);
845    if (count_threads > 0) {
846      count_threads--;
847      pthread_cond_wait(&count_threads_cv, &count_threads_mutex);
848    }
849    else {
850      pthread_cond_broadcast(&count_threads_cv);
851    }
852    pthread_mutex_unlock(&count_threads_mutex);
853#endif
854
855  }  /* closes while(1) */
856
857  /* This should never be reached, but anyway */
858  return(0);
859}
860
861
862int init_threads(void)
863{
864  int32_t tid, rc;
865
866  /* Initialize mutex and condition variable objects */
867  pthread_mutex_init(&count_mutex, NULL);
868
869  /* Barrier initialization */
870#ifdef _POSIX_BARRIERS_MINE
871  pthread_barrier_init(&barr_init, NULL, nthreads+1);
872  pthread_barrier_init(&barr2_init, NULL, nthreads);
873  pthread_barrier_init(&barr_finish, NULL, nthreads+1);
874#else
875  pthread_mutex_init(&count_threads_mutex, NULL);
876  pthread_cond_init(&count_threads_cv, NULL);
877  pthread_mutex_init(&count2_threads_mutex, NULL);
878  pthread_cond_init(&count2_threads_cv, NULL);
879#endif
880
881  /* Initialize and set thread detached attribute */
882  pthread_attr_init(&ct_attr);
883  pthread_attr_setdetachstate(&ct_attr, PTHREAD_CREATE_JOINABLE);
884
885  /* Finally, create the threads in detached state */
886  for (tid = 0; tid < nthreads; tid++) {
887    tids[tid] = tid;
888    rc = pthread_create(&threads[tid], &ct_attr, t_blosc, (void *)&tids[tid]);
889    if (rc) {
890      fprintf(stderr, "ERROR; return code from pthread_create() is %d\n", rc);
891      fprintf(stderr, "\tError detail: %s\n", strerror(rc));
892      exit(-1);
893    }
894  }
895
896  init_threads_done = 1;                 /* Initialization done! */
897
898  return(0);
899}
900
901
902int blosc_set_nthreads(int nthreads_new)
903{
904  int32_t nthreads_old = nthreads;
905  int32_t t, rc;
906  void *status;
907
908  if (nthreads_new > MAX_THREADS) {
909    fprintf(stderr, "Error.  nthreads cannot be larger than MAX_THREADS (%d)",
910            MAX_THREADS);
911    return -1;
912  }
913  else if (nthreads_new <= 0) {
914    fprintf(stderr, "Error.  nthreads must be a positive integer");
915    return -1;
916  }
917  else if (nthreads_new != nthreads) {
918    if (nthreads > 1 && init_threads_done) {
919      /* Tell all existing threads to finish */
920      end_threads = 1;
921#ifdef _POSIX_BARRIERS_MINE
922      rc = pthread_barrier_wait(&barr_init);
923      if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {
924        printf("Could not wait on barrier (init)\n");
925        exit(-1);
926      }
927#else
928      pthread_mutex_lock(&count_threads_mutex);
929      if (count_threads < nthreads) {
930        count_threads++;
931        pthread_cond_wait(&count_threads_cv, &count_threads_mutex);
932      }
933      else {
934        pthread_cond_broadcast(&count_threads_cv);
935        count_threads = 0;      /* Reset count to 0 */
936      }
937      pthread_mutex_unlock(&count_threads_mutex);
938#endif
939
940      /* Join exiting threads */
941      for (t=0; t<nthreads; t++) {
942        rc = pthread_join(threads[t], &status);
943        if (rc) {
944          fprintf(stderr, "ERROR; return code from pthread_join() is %d\n", rc);
945          fprintf(stderr, "\tError detail: %s\n", strerror(rc));
946          exit(-1);
947        }
948      }
949      init_threads_done = 0;
950      end_threads = 0;
951    }
952    nthreads = nthreads_new;
953    if (nthreads > 1) {
954      /* Launch a new pool of threads */
955      init_threads();
956    }
957  }
958  return nthreads_old;
959}
960
961
962/* Free possible memory temporaries and thread resources */
963void blosc_free_resources(void)
964{
965  int32_t t, rc;
966  void *status;
967
968  /* Release temporaries */
969  if (init_temps_done) {
970    release_temporaries();
971  }
972
973  /* Finish the possible thread pool */
974  if (nthreads > 1 && init_threads_done) {
975    /* Tell all existing threads to finish */
976    end_threads = 1;
977#ifdef _POSIX_BARRIERS_MINE
978    rc = pthread_barrier_wait(&barr_init);
979    if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {
980        printf("Could not wait on barrier (init)\n");
981      exit(-1);
982    }
983#else
984    pthread_mutex_lock(&count_threads_mutex);
985    if (count_threads < nthreads) {
986      count_threads++;
987      pthread_cond_wait(&count_threads_cv, &count_threads_mutex);
988    }
989    else {
990      pthread_cond_broadcast(&count_threads_cv);
991      count_threads = 0;      /* Reset count to 0 */
992    }
993    pthread_mutex_unlock(&count_threads_mutex);
994#endif
995
996    /* Join exiting threads */
997    for (t=0; t<nthreads; t++) {
998      rc = pthread_join(threads[t], &status);
999      if (rc) {
1000        fprintf(stderr, "ERROR; return code from pthread_join() is %d\n", rc);
1001        fprintf(stderr, "\tError detail: %s\n", strerror(rc));
1002        exit(-1);
1003      }
1004    }
1005
1006    /* Release mutex and condition variable objects */
1007    pthread_mutex_destroy(&count_mutex);
1008
1009    /* Barriers */
1010#ifdef _POSIX_BARRIERS_MINE
1011    pthread_barrier_destroy(&barr_init);
1012    pthread_barrier_destroy(&barr2_init);
1013    pthread_barrier_destroy(&barr_finish);
1014#else
1015    pthread_mutex_destroy(&count_threads_mutex);
1016    pthread_cond_destroy(&count_threads_cv);
1017    pthread_mutex_destroy(&count2_threads_mutex);
1018    pthread_cond_destroy(&count2_threads_cv);
1019#endif
1020
1021    /* Thread attributes */
1022    pthread_attr_destroy(&ct_attr);
1023
1024    init_threads_done = 0;
1025    end_threads = 0;
1026  }
1027}
1028
1029
1030/* Force the use of a specific blocksize.  If 0, an automatic
1031   blocksize will be used (the default). */
1032void blosc_set_blocksize(size_t size)
1033{
1034  force_blocksize = size;
1035}
1036
Note: See TracBrowser for help on using the repository browser.