source: branches/threaded/src/blosc.c @ 63

Revision 63, 24.3 KB checked in by faltet, 3 years ago (diff)

Optimizations for small buffers.

1.- When buffer size is <= 128 KB, then do not split in blocks

2.- When buffer size is <= 128 KB, always use the serial code

Line 
1/*********************************************************************
2  Blosc - Blocked Shuffling and Compression Library
3
4  Author: Francesc Alted (faltet@pytables.org)
5  Creation date: 2009-05-20
6
7  See LICENSES/BLOSC.txt for details about copyright and rights to use.
8**********************************************************************/
9
10
11#include <stdlib.h>
12#include <stdio.h>
13#include <string.h>
14#include <sys/types.h>
15#include <sys/stat.h>
16#include <assert.h>
17#include "blosc.h"
18#include "blosclz.h"
19#include "shuffle.h"
20#ifdef _WIN32
21  #include <windows.h>
22#else
23  #include <stdint.h>
24  #include <unistd.h>
25  #include <pthread.h>
26#endif  /* _WIN32 */
27
28
/* Starting point for the blocksize computation: compute_blocksize()
   doubles this value according to the compression level. */
#define SP_BLOCKSIZE (2*1024)

/* Buffers not larger than this are kept as one single block (see
   compute_blocksize()) and are always handled by the serial code (see
   do_job()). */
#define MIN_BLOCKSIZE (128*1024)

/* Maximum typesize before considering buffer as a stream of bytes.
   The typesize is stored in a single header byte, with 0 standing for
   MAXTYPESIZE, hence the upper limit. */
#define MAXTYPESIZE 256         /* Cannot be larger than 256 */
#if MAXTYPESIZE > 256
  #error "MAXTYPESIZE cannot be larger than 256"
#endif

/* The maximum number of splits in a block for compression */
#define MAXSPLITS 16         /* Cannot be larger than 128 */
#if MAXSPLITS > 128
  #error "MAXSPLITS cannot be larger than 128"
#endif

/* The maximum number of threads in the pool; it sizes the per-thread
   arrays (threads, tids, temporaries). */
#define MAX_THREADS 64
#if MAX_THREADS < 1
  #error "MAX_THREADS must be at least 1"
#endif
43
44
45/* Global variables for threads */
46int nthreads = 1;               /* number of desired threads in pool */
47int init_threads_done = 0;      /* pool of threads initialized? */
48int end_threads = 0;            /* should exisiting threads end? */
49int init_temps_done = 0;        /* temporaries in threads initialized? */
50int giveup;                     /* should (de-)compression give up? */
51int nblock = -1;                /* block counter */
52pthread_t threads[MAX_THREADS]; /* opaque structure for threads */
53int tids[MAX_THREADS];          /* ID per each thread */
54pthread_attr_t ct_attr;         /* creation time attributes for threads */
55
56/* Syncronization variables */
57pthread_mutex_t count_mutex;
58pthread_barrier_t barr_init;
59pthread_barrier_t barr_inter;
60pthread_barrier_t barr_finish;
61
62/* Structure for parameters in (de-)compression threads */
63struct thread_data {
64  size_t typesize;
65  size_t blocksize;
66  int compress;
67  int clevel;
68  int shuffle;
69  int ntbytes;
70  unsigned int nbytes;
71  unsigned int nblocks;
72  unsigned int leftover;
73  unsigned int *bstarts;             /* start pointers for each block */
74  unsigned char *src;
75  unsigned char *dest;
76  unsigned char *tmp[MAX_THREADS];
77  unsigned char *tmp2[MAX_THREADS];
78} params;
79
80
/* Settings the temporaries were last allocated for.  do_job() compares
   them against the current job to decide whether the temporaries have to
   be re-created. */
struct temp_data {
  int nthreads;        /* threads the temporaries were allocated for */
  size_t typesize;     /* typesize in effect at allocation time */
  size_t blocksize;    /* blocksize in effect at allocation time */
};

struct temp_data current_temp;
87
88
89
90/* Shuffle & Compress a single block */
91static int
92blosc_c(size_t blocksize, int leftoverblock,
93        unsigned int ntbytes, unsigned int nbytes,
94        unsigned char *src, unsigned char *dest, unsigned char *tmp)
95{
96  size_t j, neblock, nsplits;
97  int cbytes;                   /* number of compressed bytes in split */
98  int ctbytes = 0;              /* number of compressed bytes in block */
99  int maxout;
100  unsigned char *_tmp;
101  size_t typesize = params.typesize;
102
103  if (params.shuffle && (typesize > 1)) {
104    /* Shuffle this block (this makes sense only if typesize > 1) */
105    shuffle(typesize, blocksize, src, tmp);
106    _tmp = tmp;
107  }
108  else {
109    _tmp = src;
110  }
111
112  /* Compress for each shuffled slice split for this block. */
113  /* If the number of bytes of type is too large, or we are in a leftover
114     block, do not split at all. */
115  if ((typesize <= MAXSPLITS) && (!leftoverblock)) {
116    nsplits = typesize;
117  }
118  else {
119    nsplits = 1;
120  }
121  neblock = blocksize / nsplits;
122  for (j = 0; j < nsplits; j++) {
123    dest += sizeof(int);
124    ntbytes += sizeof(int);
125    ctbytes += sizeof(int);
126    maxout = neblock;
127    if (ntbytes+maxout > nbytes) {
128      maxout = nbytes - ntbytes;   /* avoid buffer overrun */
129      if (maxout <= 0) {
130        return 0;                  /* non-compressible block */
131      }
132    }
133    cbytes = blosclz_compress(params.clevel, _tmp+j*neblock, neblock,
134                              dest, maxout-1);
135    if (cbytes >= maxout) {
136      /* Buffer overrun caused by blosclz_compress (should never happen) */
137      return -1;
138    }
139    else if (cbytes < 0) {
140      /* cbytes should never be negative */
141      return -2;
142    }
143    else if (cbytes == 0) {
144      /* The compressor has been unable to compress data significantly. */
145      /* Before doing the copy, check that we are not running into a
146         buffer overflow. */
147      if ((ntbytes+neblock) > nbytes) {
148        return 0;    /* Non-compressible data */
149      }
150      memcpy(dest, _tmp+j*neblock, neblock);
151      cbytes = neblock;
152    }
153    ((unsigned int *)(dest))[-1] = cbytes;
154    dest += cbytes;
155    ntbytes += cbytes;
156    ctbytes += cbytes;
157  }  /* Closes j < nsplits */
158
159  return ctbytes;
160}
161
162
163size_t
164compute_blocksize(int clevel, size_t typesize, size_t nbytes)
165{
166  size_t blocksize;
167  int i;
168
169  if (nbytes > MIN_BLOCKSIZE) {
170    /* Compute a blocksize depending on the optimization level */
171    blocksize = SP_BLOCKSIZE;
172    /* 3 first optimization levels will not change blocksize */
173    for (i=2; i<=(unsigned int)clevel; i++) {
174      /* Escape if blocksize grows more than nbytes */
175      if (blocksize*2 > nbytes) break;
176      blocksize *= 2;
177    }
178  }
179  else {
180    /* Buffer size is too small for splitting it in blocks */
181    blocksize = nbytes;
182  }
183  /* blocksize must be a multiple of the typesize */
184  blocksize = blocksize / typesize * typesize;
185
186  return blocksize;
187}
188
189
190unsigned int
191blosc_compress(int clevel, int shuffle, size_t typesize, size_t nbytes,
192               const void *src, void *dest)
193{
194  unsigned char *_dest=NULL;       /* current pos for destination buffer */
195  unsigned char *flags;            /* flags for header */
196  unsigned int nblocks;            /* number of total blocks in buffer */
197  unsigned int leftover;           /* extra bytes at end of buffer */
198  unsigned int *bstarts;           /* start pointers for each block */
199  size_t blocksize;                /* length of the block in bytes */
200  unsigned int ntbytes = 0;        /* the number of compressed bytes */
201  unsigned int *ntbytes_;          /* placeholder for bytes in output buffer */
202
203  /* Compression level */
204  if (clevel < 0 || clevel > 9) {
205    /* If clevel not in 0..9, print an error */
206    fprintf(stderr, "`clevel` parameter must be between 0 and 9!\n");
207    return -10;
208  }
209  else if (clevel == 0) {
210    /* No compression wanted.  Just return without doing anything else. */
211    return 0;
212  }
213
214  /* Shuffle */
215  if (shuffle != 0 && shuffle != 1) {
216    fprintf(stderr, "`shuffle` parameter must be either 0 or 1!\n");
217    return -10;
218  }
219
220  /* Get the blocksize */
221  blocksize = compute_blocksize(clevel, typesize, nbytes);
222
223  /* Compute number of blocks in buffer */
224  nblocks = nbytes / blocksize;
225  leftover = nbytes % blocksize;
226  nblocks = (leftover>0)? nblocks+1: nblocks;
227
228  /* Check typesize limits */
229  if (typesize == MAXTYPESIZE) {
230    typesize = 0;               /* zero means MAXTYPESIZE */
231  }
232  else if (typesize > MAXTYPESIZE) {
233    /* If typesize is too large, treat buffer as an 1-byte stream. */
234    typesize = 1;
235  }
236
237  _dest = (unsigned char *)(dest);
238  /* Write header for this block */
239  _dest[0] = BLOSC_VERSION_FORMAT;         /* blosc format version */
240  _dest[1] = BLOSCLZ_VERSION_FORMAT;       /* blosclz format version */
241  flags = _dest+2;                         /* flags */
242  _dest[2] = 0;                            /* zeroes flags */
243  _dest[3] = (unsigned char)typesize;      /* type size */
244  _dest += 4;
245  ntbytes += 4;
246  ((unsigned int *)_dest)[0] = nbytes;     /* size of the buffer */
247  ((unsigned int *)_dest)[1] = blocksize;  /* block size */
248  ntbytes_ = (unsigned int *)(_dest+8);    /* compressed buffer size */
249  _dest += sizeof(int)*3;
250  ntbytes += sizeof(int)*3;
251  bstarts = (unsigned int *)_dest;         /* starts for every block */
252  _dest += sizeof(int)*nblocks;            /* book space for pointers to */
253  ntbytes += sizeof(int)*nblocks;          /* every block in output */
254
255  if (shuffle == 1) {
256    /* Shuffle is active */
257    *flags |= 0x1;                         /* bit 0 set to one in flags */
258  }
259
260  /* Populate parameters for compression routines */
261  params.compress = 1;
262  params.clevel = clevel;
263  params.shuffle = shuffle;
264  params.typesize = typesize;
265  params.blocksize = blocksize;
266  params.ntbytes = ntbytes;
267  params.nbytes = nbytes;
268  params.nblocks = nblocks;
269  params.leftover = leftover;
270  params.bstarts = bstarts;
271  params.src = (unsigned char *)src;
272  params.dest = (unsigned char *)dest;
273
274  /* Do the actual compression */
275  ntbytes = do_job();
276  /* Set the number of compressed bytes in header */
277  *ntbytes_ = ntbytes;
278
279  assert(ntbytes < (int)nbytes);
280  return ntbytes;
281}
282
283
284/* Decompress & unshuffle a single block */
285static int
286blosc_d(size_t blocksize, int leftoverblock,
287        unsigned char *src, unsigned char *dest,
288        unsigned char *tmp, unsigned char *tmp2)
289{
290  int j, neblock, nsplits;
291  int nbytes;                /* number of decompressed bytes in split */
292  int cbytes;                /* number of compressed bytes in split */
293  int ctbytes = 0;           /* number of compressed bytes in block */
294  int ntbytes = 0;           /* number of uncompressed bytes in block */
295  unsigned char *_tmp;
296  size_t typesize = params.typesize;
297  size_t shuffle = params.shuffle;
298
299  if (shuffle && (typesize > 1)) {
300    _tmp = tmp;
301  }
302  else {
303    _tmp = dest;
304  }
305
306  /* Compress for each shuffled slice split for this block. */
307  /* If the number of bytes is too large, do not split all. */
308  if ((typesize <= MAXSPLITS) && (!leftoverblock)) {
309    nsplits = typesize;
310  }
311  else {
312    nsplits = 1;
313  }
314  neblock = blocksize / nsplits;
315  for (j = 0; j < nsplits; j++) {
316    cbytes = ((unsigned int *)(src))[0]; /* amount of compressed bytes */
317    src += sizeof(int);
318    ctbytes += sizeof(int);
319    /* Uncompress */
320    if (cbytes == neblock) {
321      memcpy(_tmp, src, neblock);
322      nbytes = neblock;
323    }
324    else {
325      nbytes = blosclz_decompress(src, cbytes, _tmp, neblock);
326      if (nbytes != neblock) {
327        return -2;
328      }
329    }
330    src += cbytes;
331    ctbytes += cbytes;
332    _tmp += nbytes;
333    ntbytes += nbytes;
334  } /* Closes j < nsplits */
335
336  if (shuffle && (typesize > 1)) {
337    if ((uintptr_t)dest % 16 == 0) {
338      /* 16-bytes aligned dest.  SSE2 unshuffle will work. */
339      unshuffle(typesize, blocksize, tmp, dest);
340    }
341    else {
342      /* dest is not aligned.  Use tmp2, which is aligned, and copy. */
343      unshuffle(typesize, blocksize, tmp, tmp2);
344      memcpy(dest, tmp2, blocksize);
345    }
346  }
347
348  /* Return the number of uncompressed bytes */
349  return ntbytes;
350}
351
352
353unsigned int
354blosc_decompress(const void *src, void *dest, size_t dest_size)
355{
356  unsigned char *_src=NULL;          /* current pos for source buffer */
357  unsigned char *_dest=NULL;         /* current pos for destination buffer */
358  unsigned char version, versionlz;  /* versions for compressed header */
359  unsigned char flags;               /* flags for header */
360  int shuffle = 0;                   /* do unshuffle? */
361  int ntbytes;                       /* the number of uncompressed bytes */
362  unsigned int nblocks;              /* number of total blocks in buffer */
363  unsigned int leftover;             /* extra bytes at end of buffer */
364  unsigned int *bstarts;             /* start pointers for each block */
365  unsigned int typesize, blocksize, nbytes, ctbytes;
366  unsigned char *tmp, *tmp2;
367  int tid;
368
369  _src = (unsigned char *)(src);
370  _dest = (unsigned char *)(dest);
371
372  /* Read the header block */
373  version = _src[0];                        /* blosc format version */
374  versionlz = _src[1];                      /* blosclz format version */
375  flags = _src[2];                          /* flags */
376  typesize = (unsigned int)_src[3];         /* typesize */
377  _src += 4;
378  nbytes = ((unsigned int *)_src)[0];       /* buffer size */
379  blocksize = ((unsigned int *)_src)[1];    /* block size */
380  ctbytes = ((unsigned int *)_src)[2];      /* compressed buffer size */
381  _src += sizeof(int)*3;
382  bstarts = (unsigned int *)_src;
383  /* Compute some params */
384  /* Total blocks */
385  nblocks = nbytes / blocksize;
386  leftover = nbytes % blocksize;
387  nblocks = (leftover>0)? nblocks+1: nblocks;
388  _src += sizeof(int)*nblocks;
389
390  /* Check zero typesizes */
391  if (typesize == 0) {
392    typesize = MAXTYPESIZE;
393  }
394
395  if (nbytes > dest_size) {
396    /* This should never happen but just in case */
397    return -1;
398  }
399
400  if ((flags & 0x1) == 1) {
401    /* Input is shuffled.  Unshuffle it. */
402    shuffle = 1;
403  }
404
405  /* Populate parameters for decompression routines */
406  params.compress = 0;
407  params.clevel = 0;            /* specific for compression */
408  params.shuffle = shuffle;
409  params.typesize = typesize;
410  params.blocksize = blocksize;
411  params.ntbytes = 0;
412  params.nbytes = nbytes;
413  params.nblocks = nblocks;
414  params.leftover = leftover;
415  params.bstarts = bstarts;
416  params.src = (unsigned char *)src;
417  params.dest = (unsigned char *)dest;
418
419  /* Do the actual decompression */
420  ntbytes = do_job();
421
422  assert(ntbytes <= (int)dest_size);
423  return ntbytes;
424}
425
426
427/* Convenience functions for creating and releasing temporaries */
428void
429create_temporaries(void)
430{
431  int tid;
432  size_t typesize = params.typesize;
433  size_t blocksize = params.blocksize;
434  /* Extended blocksize for temporary destination.  Extended blocksize
435   is only useful for compression in parallel mode, but it doesn't
436   hurt other modes either. */
437  size_t ebsize = blocksize + typesize*sizeof(int);
438  unsigned char *tmp, *tmp2;
439
440  /* Create temporary area for each thread */
441  for (tid = 0; tid < nthreads; tid++) {
442#ifdef _WIN32
443    tmp = (unsigned char *)_aligned_malloc(blocksize, 16);
444    tmp2 = (unsigned char *)_aligned_malloc(ebsize, 16);
445#elif defined __APPLE__
446    /* Mac OS X guarantees 16-byte alignment in small allocs */
447    tmp = (unsigned char *)malloc(blocksize);
448    tmp2 = (unsigned char *)malloc(ebsize);
449#else
450    posix_memalign((void **)&tmp, 16, blocksize);
451    posix_memalign((void **)&tmp2, 16, ebsize);
452#endif  /* _WIN32 */
453    params.tmp[tid] = tmp;
454    params.tmp2[tid] = tmp2;
455  }
456
457  init_temps_done = 1;
458  /* Update params for current temporaries */
459  current_temp.nthreads = nthreads;
460  current_temp.typesize = typesize;
461  current_temp.blocksize = blocksize;
462
463}
464
465
466void
467release_temporaries(void)
468{
469  int tid;
470  unsigned char *tmp, *tmp2;
471
472  /* Release buffers */
473  for (tid = 0; tid < nthreads; tid++) {
474    tmp = params.tmp[tid];
475    tmp2 = params.tmp2[tid];
476#ifdef _WIN32
477    _aligned_free(tmp);
478    _aligned_free(tmp2);
479#else
480    free(tmp);
481    free(tmp2);
482#endif  /* _WIN32 */
483  }
484
485  init_temps_done = 0;
486
487}
488
489
490/* Do the compression or decompression of the buffer depending on the
491   global params. */
492int
493do_job(void) {
494  int ntbytes;
495
496  /* Initialize/reset temporaries if needed */
497  if (!init_temps_done) {
498    create_temporaries();
499  }
500  else if (current_temp.nthreads != nthreads ||
501           current_temp.typesize != params.typesize ||
502           current_temp.blocksize != params.blocksize) {
503    release_temporaries();
504    create_temporaries();
505  }
506
507  if (nthreads == 1 || params.nbytes <= MIN_BLOCKSIZE) {
508    ntbytes = serial_blosc();
509  }
510  else {
511    ntbytes = parallel_blosc();
512  }
513
514  return ntbytes;
515}
516
517
518/* Serial version for compression/decompression */
519int
520serial_blosc(void)
521{
522  int j, bsize, leftoverblock, cbytes;
523  int compress = params.compress;
524  int clevel = params.clevel;
525  int shuffle = params.shuffle;
526  unsigned int typesize = params.typesize;
527  unsigned int blocksize = params.blocksize;
528  int ntbytes = params.ntbytes;
529  int nbytes = params.nbytes;
530  unsigned int nblocks = params.nblocks;
531  int leftover = params.nbytes % params.blocksize;
532  unsigned int *bstarts = params.bstarts;
533  unsigned char *src = params.src;
534  unsigned char *dest = params.dest;
535  unsigned char *tmp = params.tmp[0];     /* tmp for thread 0 */
536  unsigned char *tmp2 = params.tmp2[0];   /* tmp2 for thread 0 */
537
538  for (j = 0; j < nblocks; j++) {
539    if (compress) {
540      bstarts[j] = ntbytes;
541    }
542    bsize = blocksize;
543    leftoverblock = 0;
544    if ((j == nblocks - 1) && (leftover > 0)) {
545      bsize = leftover;
546      leftoverblock = 1;
547    }
548    if (compress) {
549      cbytes = blosc_c(bsize, leftoverblock, ntbytes, nbytes,
550                       src+j*blocksize, dest+bstarts[j], tmp);
551      if (cbytes == 0) {
552        ntbytes = 0;              /* uncompressible data */
553        break;
554      }
555    }
556    else {
557      cbytes = blosc_d(bsize, leftoverblock,
558                       src+bstarts[j], dest+j*blocksize, tmp, tmp2);
559    }
560    if (cbytes < 0) {
561      ntbytes = cbytes;         /* error in blosc_c / blosc_d */
562      break;
563    }
564    ntbytes += cbytes;
565  }
566
567  /* Final check for ntbytes (only in compression mode) */
568  if (compress) {
569    if (ntbytes == nbytes) {
570      ntbytes = 0;               /* non-compressible data */
571    }
572    else if (ntbytes > nbytes) {
573      fprintf(stderr, "The impossible happened: buffer overflow!\n");
574      ntbytes = -5;               /* too large buffer */
575    }
576  }  /* Close j < nblocks */
577
578  return ntbytes;
579}
580
581
582/* Threaded version for compression/decompression */
583int
584parallel_blosc(void)
585{
586  int rc;
587
588  /* Synchronization point for all threads (wait for initialization) */
589  rc = pthread_barrier_wait(&barr_init);
590  if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {
591    printf("Could not wait on barrier (init)\n");
592    exit(-1);
593  }
594  /* Synchronization point for all threads (wait for finalization) */
595  rc = pthread_barrier_wait(&barr_finish);
596  if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {
597    printf("Could not wait on barrier (finish)\n");
598    exit(-1);
599  }
600
601  /* Return the total bytes decompressed in threads */
602  return params.ntbytes;
603}
604
605
606/* Decompress & unshuffle several blocks in a single thread */
607void *t_blosc(void *tids)
608{
609  int tid = *(int *)tids;
610  int cbytes, ntdest;
611  unsigned int tblocks;              /* number of blocks per thread */
612  unsigned int leftover2;
613  int nblock_;                       /* private copy of nblock */
614  int tblock;                        /* limit block on a thread */
615  int rc;
616  unsigned int bsize, leftoverblock;
617
618  while (1) {
619    /* Meeting point for all threads (wait for initialization) */
620    rc = pthread_barrier_wait(&barr_init);
621    if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {
622      printf("Could not wait on barrier (init)\n");
623      exit(-1);
624    }
625    if (end_threads) {
626      return(0);
627    }
628
629    /* Get parameters for this thread */
630    size_t typesize = params.typesize;
631    size_t blocksize = params.blocksize;
632    size_t ebsize = blocksize + params.typesize*sizeof(int);
633    int compress = params.compress;
634    int clevel = params.clevel;
635    int shuffle = params.shuffle;
636    unsigned int nbytes = params.nbytes;
637    unsigned int ntbytes = params.ntbytes;
638    unsigned int nblocks = params.nblocks;
639    unsigned int leftover = params.leftover;
640    unsigned int *bstarts = params.bstarts;
641    unsigned char *src = params.src;
642    unsigned char *dest = params.dest;
643    unsigned char *tmp = params.tmp[tid];
644    unsigned char *tmp2 = params.tmp2[tid];
645
646    giveup = 0;
647    if (compress) {
648      /* Compression always has to follow the block order */
649      pthread_mutex_lock(&count_mutex);
650      nblock++;
651      nblock_ = nblock;
652      pthread_mutex_unlock(&count_mutex);
653      tblock = nblocks;
654    }
655    else {
656      /* Decompression can happen using any order.  We choose
657       sequential block order on each thread */
658
659      /* Blocks per thread */
660      tblocks = nblocks / nthreads;
661      leftover2 = nblocks % nthreads;
662      tblocks = (leftover2>0)? tblocks+1: tblocks;
663
664      nblock_ = tid*tblocks;
665      tblock = nblock_ + tblocks;
666      if (tblock > nblocks) {
667        tblock = nblocks;
668      }
669      ntbytes = 0;
670    }
671
672    /* Loop over blocks */
673    leftoverblock = 0;
674    while ((nblock_ < tblock) && !giveup) {
675      bsize = blocksize;
676      if (nblock_ == (nblocks - 1) && (leftover > 0)) {
677        bsize = leftover;
678        leftoverblock = 1;
679      }
680      if (compress) {
681        cbytes = blosc_c(bsize, leftoverblock, 0, ebsize,
682                         src+nblock_*blocksize, tmp2, tmp);
683      }
684      else {
685        cbytes = blosc_d(bsize, leftoverblock,
686                         src+bstarts[nblock_], dest+nblock_*blocksize,
687                         tmp, tmp2);
688      }
689
690      /* Check whether current thread has to giveup */
691      /* This is critical in order to not overwrite variables */
692      if (giveup != 0) {
693        break;
694      }
695
696      /* Check results for the decompressed block */
697      if (cbytes < 0) {            /* compr/decompr failure */
698        /* Set cbytes code error first */
699        pthread_mutex_lock(&count_mutex);
700        params.ntbytes = cbytes;
701        pthread_mutex_unlock(&count_mutex);
702        giveup = 1;                 /* give up (de-)compressing after */
703        break;
704      }
705
706      if (compress) {
707        /* Start critical section */
708        pthread_mutex_lock(&count_mutex);
709        if (cbytes == 0) {            /* uncompressible buffer */
710          params.ntbytes = 0;
711          pthread_mutex_unlock(&count_mutex);
712          giveup = 1;                 /* give up compressing */
713          break;
714        }
715        bstarts[nblock_] = params.ntbytes;   /* update block start counter */
716        ntdest = params.ntbytes;
717        if (ntdest+cbytes > nbytes) {         /* uncompressible buffer */
718          params.ntbytes = 0;
719          pthread_mutex_unlock(&count_mutex);
720          giveup = 1;
721          break;
722        }
723        nblock++;
724        nblock_ = nblock;
725        params.ntbytes += cbytes;       /* update return bytes counter */
726        pthread_mutex_unlock(&count_mutex);
727        /* End of critical section */
728
729        /* Copy the compressed buffer to destination */
730        memcpy(dest+ntdest, tmp2, cbytes);
731      }
732      else {
733        nblock_++;
734      }
735      /* Update counter for this thread */
736      ntbytes += cbytes;
737
738    } /* closes while (nblock_) */
739
740    if (!compress && !giveup) {
741      /* Update global counter for all threads (decompression only) */
742      pthread_mutex_lock(&count_mutex);
743      params.ntbytes += ntbytes;
744      pthread_mutex_unlock(&count_mutex);
745    }
746
747    /* Meeting point for all threads (wait for finalization) */
748    rc = pthread_barrier_wait(&barr_finish);
749    if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {
750      printf("Could not wait on barrier (finish)\n");
751      exit(-1);
752    }
753    /* Reset block counter */
754    nblock = -1;
755
756  }  /* closes while(1) */
757
758  /* This should never be reached, but anyway */
759  return(0);
760}
761
762
763int
764init_threads()
765{
766  int tid, rc;
767
768  /* Initialize mutex and condition variable objects */
769  pthread_mutex_init(&count_mutex, NULL);
770
771  /* Barrier initialization */
772  pthread_barrier_init(&barr_init, NULL, nthreads+1);
773  pthread_barrier_init(&barr_inter, NULL, nthreads);
774  pthread_barrier_init(&barr_finish, NULL, nthreads+1);
775
776  /* Initialize and set thread detached attribute */
777  pthread_attr_init(&ct_attr);
778  pthread_attr_setdetachstate(&ct_attr, PTHREAD_CREATE_JOINABLE);
779
780  /* Finally, create the threads in detached state */
781  for (tid = 0; tid < nthreads; tid++) {
782    tids[tid] = tid;
783    rc = pthread_create(&threads[tid], &ct_attr, t_blosc, (void *)&tids[tid]);
784    if (rc) {
785      fprintf(stderr, "ERROR; return code from pthread_create() is %d\n", rc);
786      fprintf(stderr, "\tError detail: %s\n", strerror(rc));
787      exit(-1);
788    }
789  }
790
791  init_threads_done = 1;                 /* Initialization done! */
792
793  return(0);
794}
795
796
797int
798blosc_set_nthreads(int nthreads_new)
799{
800  int nthreads_old = nthreads;
801  int t, rc;
802  void *status;
803
804  if (nthreads_new > MAX_THREADS) {
805    fprintf(stderr, "Error.  nthreads cannot be larger than MAX_THREADS (%d)",
806            MAX_THREADS);
807    return -1;
808  }
809  else if (nthreads_new <= 0) {
810    fprintf(stderr, "Error.  nthreads must be a positive integer");
811    return -1;
812  }
813  else if (nthreads_new != nthreads) {
814    if (nthreads > 1 && init_threads_done) {
815      /* Tell all existing threads to end */
816      end_threads = 1;
817      rc = pthread_barrier_wait(&barr_init);
818      if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {
819        printf("Could not wait on barrier (init)\n");
820        exit(-1);
821      }
822      /* Join exiting threads */
823      for (t=0; t<nthreads; t++) {
824        rc = pthread_join(threads[t], &status);
825        if (rc) {
826          fprintf(stderr, "ERROR; return code from pthread_join() is %d\n", rc);
827          fprintf(stderr, "\tError detail: %s\n", strerror(rc));
828          exit(-1);
829        }
830      }
831      init_threads_done = 0;
832      end_threads = 0;
833    }
834    nthreads = nthreads_new;
835    if (nthreads > 1) {
836      /* Launch a new pool of threads */
837      init_threads();
838    }
839    return nthreads_old;
840  }
841  return 0;
842}
843
Note: See TracBrowser for help on using the repository browser.