source: branches/threaded/src/blosc.c @ 55

Revision 55, 21.4 KB checked in by faltet, 3 years ago (diff)

Added a threaded version for the compression routine.

The speed-up is a little better than with decompression, but still,
too small for typical blocksizes (8 KB ~ 128 KB). Mmm, I'm afraid
that I'll need to implement some kind of thread pool in order to avoid
thread creation overhead.

Line 
1/*********************************************************************
2  Blosc - Blocked Shuffling and Compression Library
3
4  Author: Francesc Alted (faltet@pytables.org)
5  Creation date: 2009-05-20
6
7  See LICENSES/BLOSC.txt for details about copyright and rights to use.
8**********************************************************************/
9
10
11#include <stdlib.h>
12#include <stdio.h>
13#include <string.h>
14#include <sys/types.h>
15#include <sys/stat.h>
16#include <assert.h>
17#include "blosc.h"
18#include "blosclz.h"
19#include "shuffle.h"
20#ifdef _WIN32
21  #include <windows.h>
22#else
23  #include <stdint.h>
24  #include <unistd.h>
25  #include <pthread.h>
26#endif  /* _WIN32 */
27
28
/* Initial block size in bytes.  blosc_compress() doubles it once per
   compression level from 4 up to `clevel`, as long as it stays <= nbytes. */
#define BLOCKSIZE (4*1024)      /* 4 KB (page size) */

/* Types wider than this are compressed as a plain stream of bytes.  The
   chunk header keeps typesize in a single byte (0 standing for this value),
   so it cannot be larger than 256. */
#define MAXTYPESIZE 256

/* Largest typesize for which a block is cut into one slice per byte of the
   type before compression.  NOTE(review): the original note says this
   cannot be larger than 128; the reason is not visible in this file. */
#define MAXSPLITS 16

/* Upper bound on worker threads; fewer are started when there are not
   enough blocks to give every thread at least one. */
#define NUM_THREADS 4

/* Global compressing/shuffling settings.  NOTE(review): the functions in
   this file take these as parameters (which shadow the globals) and never
   read them; other translation units may still refer to them. */
int clevel;                     /* compression level */
int do_shuffle;                 /* is shuffle active? */
44
45
46
47/* Shuffle & Compress a single block */
48static size_t
49_blosc_c(int clevel, int doshuffle, size_t typesize, size_t blocksize,
50         int leftoverblock, unsigned int ctbytes, unsigned int nbytes,
51         unsigned char* _src, unsigned char* _dest, unsigned char *tmp)
52{
53  size_t j, neblock, nsplits;
54  int cbytes, maxout;
55  unsigned int btbytes = 0;
56  unsigned char* _tmp;
57
58  if (doshuffle && (typesize > 1)) {
59    /* Shuffle this block (this makes sense only if typesize > 1) */
60    shuffle(typesize, blocksize, _src, tmp);
61    _tmp = tmp;
62  }
63  else {
64    _tmp = _src;
65  }
66
67  /* Compress for each shuffled slice split for this block. */
68  /* If the number of bytes of type is too large, or we are in a leftover
69     block, do not split at all. */
70  if ((typesize <= MAXSPLITS) && (!leftoverblock)) {
71    nsplits = typesize;
72  }
73  else {
74    nsplits = 1;
75  }
76  neblock = blocksize / nsplits;
77  for (j = 0; j < nsplits; j++) {
78    _dest += sizeof(int);
79    btbytes += sizeof(int);
80    ctbytes += sizeof(int);
81    maxout = neblock;
82    if (ctbytes+maxout > nbytes) {
83      maxout = nbytes - ctbytes;   /* avoid buffer overrun */
84      if (maxout <= 0) {
85        return 0;                  /* non-compressible block */
86      }
87    }
88    cbytes = blosclz_compress(clevel, _tmp+j*neblock, neblock,
89                              _dest, maxout-1);
90    if (cbytes >= maxout) {
91      /* Buffer overrun caused by blosclz_compress (should never happen) */
92      return -1;
93    }
94    else if (cbytes < 0) {
95      /* cbytes should never be negative */
96      return -2;
97    }
98    else if (cbytes == 0) {
99      /* The compressor has been unable to compress data significantly. */
100      /* Before doing the copy, check that we are not running into a
101         buffer overflow. */
102      if ((ctbytes+neblock) > nbytes) {
103        return 0;    /* Non-compressible data */
104      }
105      memcpy(_dest, _tmp+j*neblock, neblock);
106      cbytes = neblock;
107    }
108    ((unsigned int *)(_dest))[-1] = cbytes;
109    _dest += cbytes;
110    btbytes += cbytes;
111    ctbytes += cbytes;
112  }  /* Closes j < nsplits */
113
114  return btbytes;
115}
116
117
/* Arguments for one compression worker (t_blosc_c), plus its result slots. */
struct thread_data_c {
  size_t typesize;          /* bytes per data item */
  size_t blocksize;         /* bytes in a full block */
  int thread_id;            /* 0-based index of this worker */
  int clevel;               /* compression level */
  int doshuffle;            /* shuffle blocks before compressing? */
  int cbytes;               /* OUT: bytes produced, 0 or negative on failure */
  unsigned int tblocks;     /* total number of blocks in the buffer */
  unsigned int thblocks;    /* number of blocks handed to each worker */
  unsigned int leftover;    /* length of the final partial block, or 0 */
  unsigned char *src;       /* start of the whole input buffer */
  unsigned char *tmp;       /* scratch area used for shuffling a block */
  unsigned char *tmp2;      /* private output area for this worker */
  unsigned int *thstarts;   /* OUT: offset of each block inside tmp2 */
};
134
135
136/* Compress & shuffle several blocks in a single thread */
137void *t_blosc_c(void *params)
138{
139  /* Parameters */
140  struct thread_data_c *my_params = (struct thread_data_c *)params;
141  int thread_id = my_params->thread_id;
142  int clevel = my_params->clevel;
143  int doshuffle = my_params->doshuffle;
144  size_t typesize = my_params->typesize;
145  size_t blocksize = my_params->blocksize;
146  unsigned int tblocks = my_params->tblocks;
147  unsigned int thblocks = my_params->thblocks;
148  unsigned int leftover = my_params->leftover;
149  unsigned char *src = my_params->src;
150  unsigned char *tmp = my_params->tmp;
151  unsigned char *tmp2 = my_params->tmp2;
152  unsigned int *thstarts = my_params->thstarts;
153  /* Other local vars */
154  int j, cbytes, ntbytes = 0;
155  unsigned int bsize = blocksize;
156  unsigned int leftoverblock = 0;
157  unsigned int sstart;
158
159
160  for (j = 0; j < thblocks; j++) {
161    if (((thread_id*thblocks+j) == (tblocks - 1)) && (leftover > 0)) {
162      bsize = leftover;
163      leftoverblock = 1;
164    }
165    sstart = (thread_id*thblocks+j)*blocksize;
166    cbytes = _blosc_c(clevel, doshuffle, typesize, bsize, leftoverblock,
167                      (unsigned int)ntbytes, thblocks*blocksize,
168                      src+sstart, tmp2, tmp);
169    if (cbytes <= 0) {
170      ntbytes = cbytes;          /* _blosc_c failure or uncompressible data */
171      break;
172    }
173    thstarts[j] = ntbytes;
174    ntbytes += cbytes;
175    tmp2 += cbytes;
176    /* Break if we have reached the end of the blocks */
177    if ((thread_id*thblocks+j) == (tblocks - 1)) {
178      break;
179    }
180  }
181
182  my_params->cbytes = ntbytes;
183  pthread_exit((void *)my_params);
184}
185
186
187unsigned int
188blosc_compress(int clevel, int doshuffle, size_t typesize, size_t nbytes,
189               const void *src, void *dest)
190{
191  unsigned char *_dest=NULL;       /* current pos for destination buffer */
192  unsigned char *flags;            /* flags for header */
193  unsigned int *bstarts;           /* start pointers for each block */
194  size_t tblocks;                  /* number of total blocks in buffer */
195  size_t leftover;                 /* extra bytes at end of buffer */
196  size_t thblocks;                 /* number of total blocks per thread */
197  size_t leftover2, leftover3;     /* helpers */
198  size_t blocksize;                /* length of the block in bytes */
199  size_t bsize;                    /* corrected blocksize for last block */
200  unsigned int ctbytes = 0;        /* the number of bytes in output buffer */
201  unsigned int *ctbytes_;          /* placeholder for bytes in output buffer */
202  int cbytes;                      /* temporary compressed buffer length */
203  int leftoverblock;               /* left over block? */
204  unsigned int i, j;               /* local index variables */
205  char *too_long_message = "The impossible happened: buffer overflow!";
206  size_t t;
207  pthread_t threads[NUM_THREADS];
208  unsigned char *tmp[NUM_THREADS], *tmp2[NUM_THREADS];
209  struct thread_data_c params[NUM_THREADS];
210  unsigned int *thstarts[NUM_THREADS];   /* start pointers for each thread */
211  int rc;
212  void *status;
213  pthread_attr_t attr;
214  int nthreads = NUM_THREADS;
215  int giveup = 0;                  /* sentinel */
216
217  /* Compression level */
218  if (clevel < 0 || clevel > 9) {
219    /* If clevel not in 0..9, print an error */
220    fprintf(stderr, "`clevel` parameter must be between 0 and 9!\n");
221    return -10;
222  }
223  else if (clevel == 0) {
224    /* No compression wanted.  Just return without doing anything else. */
225    return 0;
226  }
227
228  /* Shuffle */
229  if (doshuffle != 0 && doshuffle != 1) {
230    fprintf(stderr, "`doshuffle` parameter must be either 0 or 1!\n");
231    return -10;
232  }
233
234  /* Compute a blocksize depending on the optimization level */
235  blocksize = BLOCKSIZE;
236  /* 3 first optimization levels will not change blocksize */
237  for (i=4; i<=(unsigned int)clevel; i++) {
238    /* Escape if blocksize grows more than nbytes */
239    if (blocksize*2 > nbytes) break;
240    blocksize *= 2;
241  }
242  /* blocksize must be a multiple of the typesize */
243  blocksize = blocksize / typesize * typesize;
244
245  /* Compute number of blocks in buffer */
246  tblocks = nbytes / blocksize;
247  leftover = nbytes % blocksize;
248  tblocks = (leftover>0)? tblocks+1: tblocks;
249  /* Blocks per thread */
250  thblocks = tblocks / nthreads;
251  leftover2 = tblocks % nthreads;
252  thblocks = (leftover2>0)? thblocks+1: thblocks;
253  /* Final correction for the number of threads */
254  if (nthreads > (tblocks / thblocks)) {
255    nthreads = tblocks / thblocks;
256    leftover3 = tblocks % thblocks;
257    nthreads = (leftover3>0)? nthreads+1: nthreads;
258  }
259
260  /* Check typesize limits */
261  if (typesize == MAXTYPESIZE) {
262    typesize = 0;               /* zero means MAXTYPESIZE */
263  }
264  else if (typesize > MAXTYPESIZE) {
265    /* If typesize is too large, treat buffer as an 1-byte stream. */
266    typesize = 1;
267  }
268
269  _dest = (unsigned char *)(dest);
270  /* Write header for this block */
271  _dest[0] = BLOSC_VERSION_FORMAT;         /* blosc format version */
272  _dest[1] = BLOSCLZ_VERSION_FORMAT;       /* blosclz format version */
273  flags = _dest+2;                         /* flags */
274  _dest[2] = 0;                            /* zeroes flags */
275  _dest[3] = (unsigned char)typesize;      /* type size */
276  _dest += 4;
277  ctbytes += 4;
278  ((unsigned int *)_dest)[0] = nbytes;     /* size of the chunk */
279  ((unsigned int *)_dest)[1] = blocksize;  /* block size */
280  ctbytes_ = (unsigned int *)(_dest+8);    /* compressed chunk size (pointer) */
281  _dest += sizeof(int)*3;
282  ctbytes += sizeof(int)*3;
283  bstarts = (unsigned int *)_dest;         /* starts for every block */
284  _dest += sizeof(int)*tblocks;            /* book space for pointers to */
285  ctbytes += sizeof(int)*tblocks;          /* every block in output */
286
287  if (doshuffle == 1) {
288    /* Shuffle is active */
289    *flags |= 0x1;                         /* bit 0 set to one in flags */
290  }
291
292  /* Initialize and set thread detached attribute */
293  pthread_attr_init(&attr);
294  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
295
296  for(t=0; t < nthreads; t++){
297  /* Create temporary area for each thread */
298#ifdef _WIN32
299    tmp[t] = (unsigned char*)_aligned_malloc(blocksize, 16);
300    tmp2[t] = (unsigned char*)_aligned_malloc(blocksize*thblocks, 16);
301    thstarts[t] = (unsigned int*)_aligned_malloc(sizeof(int)*thblocks, 16);
302#elif defined __APPLE__
303    /* Mac OS X guarantees 16-byte alignment in small allocs */
304    tmp[t] = (unsigned char *)malloc(blocksize);
305    tmp2[t] = (unsigned char *)malloc(blocksize*thblocks);
306    thstarts[t] = (unsigned int*)malloc(sizeof(int)*thblocks);
307#else
308    rc = posix_memalign((void **)&tmp[t], 16, blocksize);
309    rc = posix_memalign((void **)&tmp2[t], 16, blocksize*thblocks);
310    rc = posix_memalign((void **)&thstarts[t], 16, sizeof(int)*thblocks);
311#endif  /* _WIN32 */
312    /* Populate parameters for thread */
313    params[t].thread_id = t;
314    params[t].clevel = clevel;
315    params[t].doshuffle = doshuffle;
316    params[t].typesize = typesize;
317    params[t].blocksize = blocksize;
318    params[t].tblocks = tblocks;
319    params[t].thblocks = thblocks;
320    if (t == (nthreads - 1) && leftover > 0) {
321      params[t].leftover = leftover;
322    }
323    else {
324      params[t].leftover = 0;
325    }
326    params[t].src = (unsigned char *)src;
327    params[t].tmp = tmp[t];
328    params[t].tmp2 = tmp2[t];
329    params[t].thstarts = thstarts[t];
330    rc = pthread_create(&threads[t], &attr, t_blosc_c, (void *)&params[t]);
331    if (rc){
332      fprintf(stderr, "ERROR; return code from pthread_create() is %d\n", rc);
333      fprintf(stderr, "\tError detail: %s\n", strerror(rc));
334      exit(-1);
335    }
336  }  /* Close t < nthreads */
337
338  /* Wait for threads to finish (join threads) */
339  pthread_attr_destroy(&attr);    /* free attribute */
340  for(t=0; t<nthreads; t++){
341    rc = pthread_join(threads[t], &status);
342    if (rc) {
343      fprintf(stderr, "ERROR; return code from pthread_join() is %d\n", rc);
344      fprintf(stderr, "\tError detail: %s\n", strerror(rc));
345      exit(-1);
346    }
347    if (giveup) {
348      /* We have been signaled to give up.  Wait for all threads to finish. */
349      continue;
350    }
351    cbytes = ((struct thread_data_c *)status)->cbytes;
352    /* Check result of compression */
353    if (cbytes < 0) {
354      fprintf(stderr, "%s\n", too_long_message);
355      ctbytes = cbytes;         /* error in _blosc_c */
356      giveup = 1;
357    }
358    else if ((cbytes == 0) | (ctbytes+cbytes >= nbytes)) {
359      ctbytes = 0;              /* uncompressible data */
360      giveup = 1;
361    }
362    else {
363      /* Compression process seems right */
364      /* Copy block starts */
365      for (j=0; j<thblocks; j++) {
366        /* Check that we do not exceed the end of the blocks */
367        if ((t*thblocks+j) < tblocks) {
368          bstarts[t*thblocks+j] = thstarts[t][j]+ctbytes;
369        }
370      }
371      /* Copy thread's private buffer to destination buffer */
372      memcpy(_dest, tmp2[t], cbytes);
373      _dest += cbytes;
374      ctbytes += cbytes;          /* update compressed bytes */
375    }
376  }  /* Close t < nthreads */
377
378  /* Release buffers */
379  for(t=0; t < nthreads; t++){
380#ifdef _WIN32
381    _aligned_free(tmp[t]);
382    _aligned_free(tmp2[t]);
383    _aligned_free(thstarts[t]);
384#else
385    free(tmp[t]);
386    free(tmp2[t]);
387    free(thstarts[t]);
388#endif  /* _WIN32 */
389  }
390
391  assert(ctbytes < nbytes);
392  *ctbytes_ = ctbytes;   /* set the number of compressed bytes in header */
393  return ctbytes;
394}
395
396
397/* Decompress & unshuffle a single block */
398static size_t
399_blosc_d(int dounshuffle, size_t typesize, size_t blocksize, int leftoverblock,
400         unsigned char* _src, unsigned char* _dest,
401         unsigned char *tmp, unsigned char *tmp2)
402{
403  size_t j, neblock, nsplits;
404  size_t nbytes, cbytes, ctbytes = 0, ntbytes = 0;
405  unsigned char* _tmp;
406
407  if (dounshuffle && (typesize > 1)) {
408    _tmp = tmp;
409  }
410  else {
411    _tmp = _dest;
412  }
413
414  /* Compress for each shuffled slice split for this block. */
415  /* If the number of bytes is too large, do not split all. */
416  if ((typesize <= MAXSPLITS) && (!leftoverblock)) {
417    nsplits = typesize;
418  }
419  else {
420    nsplits = 1;
421  }
422  neblock = blocksize / nsplits;
423  for (j = 0; j < nsplits; j++) {
424    cbytes = ((unsigned int *)(_src))[0]; /* amount of compressed bytes */
425    _src += sizeof(int);
426    ctbytes += sizeof(int);
427    /* Uncompress */
428    if (cbytes == neblock) {
429      memcpy(_tmp, _src, neblock);
430      nbytes = neblock;
431    }
432    else {
433      nbytes = blosclz_decompress(_src, cbytes, _tmp, neblock);
434      if (nbytes != neblock) {
435        return -2;
436      }
437    }
438    _src += cbytes;
439    ctbytes += cbytes;
440    _tmp += neblock;
441    ntbytes += nbytes;
442  } /* Closes j < nsplits */
443
444  if (dounshuffle && (typesize > 1)) {
445    if ((uintptr_t)_dest % 16 == 0) {
446      /* 16-bytes aligned _dest.  SSE2 unshuffle will work. */
447      unshuffle(typesize, blocksize, tmp, _dest);
448    }
449    else {
450      /* _dest is not aligned.  Use tmp2, which is aligned, and copy. */
451      unshuffle(typesize, blocksize, tmp, tmp2);
452      memcpy(_dest, tmp2, blocksize);
453    }
454  }
455
456  return ctbytes;
457}
458
459
/* Arguments for one decompression worker (t_blosc_d), plus its result slot. */
struct thread_data_d {
  size_t typesize;          /* bytes per data item */
  size_t blocksize;         /* bytes in a full block */
  int thread_id;            /* 0-based index of this worker */
  int dounshuffle;          /* unshuffle blocks after decompressing? */
  int cbytes;               /* OUT: compressed bytes consumed, negative on error */
  unsigned int tblocks;     /* total number of blocks in the buffer */
  unsigned int thblocks;    /* number of blocks handed to each worker */
  unsigned int leftover;    /* length of the final partial block, or 0 */
  unsigned int *bstarts;    /* start offsets of this worker's blocks */
  unsigned char *src;       /* start of the whole compressed buffer */
  unsigned char *dest;      /* where this worker's first block goes */
  unsigned char *tmp;       /* scratch area for decompression */
  unsigned char *tmp2;      /* aligned scratch area for unshuffling */
};
476
477
478/* Decompress & unshuffle several blocks in a single thread */
479void *t_blosc_d(void *params)
480{
481  /* Parameters */
482  struct thread_data_d *my_params = (struct thread_data_d *)params;
483  int thread_id = my_params->thread_id;
484  int dounshuffle = my_params->dounshuffle;
485  size_t typesize = my_params->typesize;
486  size_t blocksize = my_params->blocksize;
487  unsigned int tblocks = my_params->tblocks;
488  unsigned int thblocks = my_params->thblocks;
489  unsigned int *bstarts = my_params->bstarts;
490  unsigned int leftover = my_params->leftover;
491  unsigned char *src = my_params->src;
492  unsigned char *dest = my_params->dest;
493  unsigned char *tmp = my_params->tmp;
494  unsigned char *tmp2 = my_params->tmp2;
495  /* Other local vars */
496  int j, cbytes, ntbytes = 0;
497  unsigned int bsize = blocksize;
498  unsigned int leftoverblock = 0;
499
500  for (j = 0; j < thblocks; j++) {
501    if (((thread_id*thblocks+j) == (tblocks - 1)) && (leftover > 0)) {
502      bsize = leftover;
503      leftoverblock = 1;
504    }
505    cbytes = _blosc_d(dounshuffle, typesize, bsize, leftoverblock,
506                      src+bstarts[j], dest+j*blocksize, tmp, tmp2);
507    if (cbytes < 0) {
508      ntbytes = cbytes;          /* _blosc_d failure */
509      goto out;
510    }
511    ntbytes += cbytes;
512    /* Break if we have reached the end of the blocks */
513    if ((thread_id*thblocks+j) == (tblocks - 1)) {
514      goto out;
515    }
516  }
517
518 out:
519  my_params->cbytes = ntbytes;
520  if (NUM_THREADS > 1) {
521    pthread_exit((void *)my_params);
522  }
523  else {
524    return (void *)my_params;
525  }
526}
527
528
529unsigned int
530blosc_decompress(const void *src, void *dest, size_t dest_size)
531{
532  unsigned char *_src=NULL;          /* alias for source buffer */
533  unsigned char *_dest=NULL;         /* alias for destination buffer */
534  unsigned char version, versionlz;  /* versions for compressed header */
535  unsigned char flags;               /* flags for header */
536  size_t leftover;                   /* extra bytes at end of buffer */
537  size_t tblocks;                    /* number of total blocks in buffer */
538  size_t thblocks;                   /* number of total blocks per thread */
539  size_t leftover2, leftover3;       /* helpers */
540  unsigned int *bstarts;             /* start pointers for each block */
541  int nbytes, cbytes, ntbytes = 0;
542  int dounshuffle = 0;
543  unsigned int typesize, blocksize, ctbytes;
544  size_t t;
545  pthread_t threads[NUM_THREADS];
546  unsigned char *tmp[NUM_THREADS], *tmp2[NUM_THREADS];
547  struct thread_data_d params[NUM_THREADS];
548  int rc;
549  void *status;
550  pthread_attr_t attr;
551  int nthreads = NUM_THREADS;
552  int giveup;
553
554  _src = (unsigned char *)(src);
555  _dest = (unsigned char *)(dest);
556
557  /* Read the header block */
558  version = _src[0];                        /* blosc format version */
559  versionlz = _src[1];                      /* blosclz format version */
560  flags = _src[2];                          /* flags */
561  typesize = (unsigned int)_src[3];         /* typesize */
562  _src += 4;
563  nbytes = ((unsigned int *)_src)[0];       /* chunk size */
564  blocksize = ((unsigned int *)_src)[1];    /* block size */
565  ctbytes = ((unsigned int *)_src)[2];      /* compressed chunk size */
566  _src += sizeof(int)*3;
567  /* Compute some params */
568  /* Total blocks */
569  tblocks = nbytes / blocksize;
570  leftover = nbytes % blocksize;
571  tblocks = (leftover>0)? tblocks+1: tblocks;
572  /* Blocks per thread */
573  thblocks = tblocks / nthreads;
574  leftover2 = tblocks % nthreads;
575  thblocks = (leftover2>0)? thblocks+1: thblocks;
576  /* Final correction for the number of threads */
577  if (nthreads > (tblocks / thblocks)) {
578    nthreads = tblocks / thblocks;
579    leftover3 = tblocks % thblocks;
580    nthreads = (leftover3>0)? nthreads+1: nthreads;
581  }
582  bstarts = (unsigned int *)_src;
583  _src += sizeof(int)*tblocks;
584
585  /* Check zero typesizes */
586  if (typesize == 0) {
587    typesize = MAXTYPESIZE;
588  }
589
590  if (nbytes > dest_size) {
591    /* This should never happen but just in case */
592    return -1;
593  }
594
595  if ((flags & 0x1) == 1) {
596    /* Input is shuffled.  Unshuffle it. */
597    dounshuffle = 1;
598  }
599
600  /* Create temporary area for each thread */
601  for(t=0; t<nthreads; t++){
602#ifdef _WIN32
603    tmp[t] = (unsigned char*)_aligned_malloc(blocksize, 16);
604    tmp2[t] = (unsigned char*)_aligned_malloc(blocksize, 16);
605#elif defined __APPLE__
606    /* Mac OS X guarantees 16-byte alignment in small allocs */
607    tmp[t] = (unsigned char *)malloc(blocksize);
608    tmp2[t] = (unsigned char *)malloc(blocksize);
609#else
610    rc = posix_memalign((void **)&tmp[t], 16, blocksize);
611    rc = posix_memalign((void **)&tmp2[t], 16, blocksize);
612#endif  /* _WIN32 */
613  }
614
615  /* Initialize and set thread detached attribute */
616  pthread_attr_init(&attr);
617  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
618
619  for(t=0; t < nthreads; t++){
620    params[t].thread_id = t;
621    params[t].dounshuffle = dounshuffle;
622    params[t].typesize = typesize;
623    params[t].blocksize = blocksize;
624    params[t].tblocks = tblocks;
625    params[t].thblocks = thblocks;
626    params[t].bstarts = bstarts + t*thblocks;
627    if (t == (nthreads - 1) && leftover > 0) {
628      params[t].leftover = leftover;
629    }
630    else {
631      params[t].leftover = 0;
632    }
633    params[t].src = (unsigned char *)src;
634    params[t].dest = (unsigned char *)dest + t*thblocks*blocksize;
635    params[t].tmp = tmp[t];
636    params[t].tmp2 = tmp2[t];
637    if (nthreads > 1) {
638      rc = pthread_create(&threads[t], &attr, t_blosc_d, (void *)&params[t]);
639      if (rc){
640        fprintf(stderr, "ERROR; return code from pthread_create() is %d\n", rc);
641        fprintf(stderr, "\tError detail: %s\n", strerror(rc));
642        exit(-1);
643      }
644    }
645    else {
646      status = t_blosc_d((void *)&params[t]);
647      cbytes = ((struct thread_data_d *)status)->cbytes;
648      if (cbytes < 0) {
649        nbytes = cbytes;          /* _blosc_d failure */
650        goto out;
651      }
652      ntbytes += cbytes;          /* update decompressed bytes */
653    }
654  }
655
656  /* Wait for threads to finish (join threads) */
657  pthread_attr_destroy(&attr);    /* free attribute */
658  if (nthreads > 1) {
659    for(t=0; t < nthreads; t++){
660      rc = pthread_join(threads[t], &status);
661      if (rc) {
662        fprintf(stderr, "ERROR; return code from pthread_join() is %d\n", rc);
663        fprintf(stderr, "\tError detail: %s\n", strerror(rc));
664        exit(-1);
665      }
666      if (giveup) {
667        continue;
668      }
669      cbytes = ((struct thread_data_d *)status)->cbytes;
670      if (cbytes < 0) {
671        nbytes = cbytes;          /* _blosc_d failure */
672        giveup = 1;
673      }
674      ntbytes += cbytes;          /* update decompressed bytes */
675    }
676  }
677
678 out:
679  /* Release buffers */
680  for(t=0; t < nthreads; t++){
681#ifdef _WIN32
682    _aligned_free(tmp[t]);
683    _aligned_free(tmp2[t]);
684#else
685    free(tmp[t]);
686    free(tmp2[t]);
687#endif  /* _WIN32 */
688  }
689
690  /* assert(ntbytes+bstarts[0] == ctbytes); */
691  return nbytes;
692}
Note: See TracBrowser for help on using the repository browser.