Changeset 59


Ignore:
Timestamp:
04/26/10 14:03:24 (4 years ago)
Author:
faltet
Message:

New version that implements a pool of threads for (de-)compression.

The pool avoids the thread initialization time. Also, both compression
and decompression are supported now in threaded mode.

The benchmark has also been updated to mimic more closely how Blosc is used in
PyTables' computing mode (tables.Expr).

Location:
branches/threaded/src
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • branches/threaded/src/bench.c

    r54 r59  
    3535#define MB  (1024*1024) 
    3636 
    37 #define NITER  (20*1000)               /* Number of iterations */ 
     37#define NCHUNKS (100) 
     38#define NITER  (10)               /* Number of iterations */ 
    3839 
    3940 
     
    9596  sec = current.tv_sec - last.tv_sec; 
    9697  usec = current.tv_usec - last.tv_usec; 
    97   return (float)(((double)sec + usec*1e-6)/(double)NITER*1e6); 
    98 } 
    99  
    100  
    101 int main(void) { 
    102   int nbytes, cbytes; 
    103   void *src, *srccpy, *dest, *dest2; 
    104   size_t i; 
    105   struct timeval last, current; 
    106   float tmemcpy, tshuf, tunshuf; 
    107   int *_src; 
    108   int *_srccpy; 
    109   int rshift = 22;              /* For random data */ 
    110   //int rshift = 20;              /* For random data */ 
    111   int clevel; 
    112   int doshuffle = 1;            /* Shuffle? */ 
    113   int fd; 
    114   int status; 
    115   char *filename = "25Kelem-4B-typesize.data"; 
    116   unsigned int size = 100*1000; /* Buffer size */ 
    117   unsigned int elsize = 4;      /* Datatype size */ 
    118   unsigned char *orig, *round; 
    119  
    120   src = malloc(size); 
    121   srccpy = malloc(size); 
    122   dest = malloc(size); 
    123   dest2 = malloc(size); 
    124  
     98  return (float)(((double)sec + usec*1e-6)/((double)NITER*NCHUNKS)*1e6); 
     99} 
     100 
     101 
     102int get_value(int i, int rshift) { 
     103  int v; 
     104 
     105  v = (i<<26)^(i<<18)^(i<<11)^(i<<3)^i; 
     106  v &= (1 << (32-rshift)) - 1; 
     107  return v; 
     108} 
     109 
     110 
     111void init_buffer(void *src, int size, int rshift) { 
     112  int i, *_src = (int *)src; 
     113 
     114  /* To have reproducible results */ 
    125115  srand(1); 
    126116 
    127117  /* Initialize the original buffer */ 
    128   _src = (int *)src; 
    129   _srccpy = (int *)srccpy; 
    130   elsize = sizeof(int); 
    131   for(i = 0; i < size/elsize; ++i) { 
     118  for (i = 0; i < size/sizeof(int); ++i) { 
    132119    /* Choose one below */ 
    133120    /* _src[i] = 1; */ 
     
    136123    /* _src[i] = i * 1/.3; */ 
    137124    /* _src[i] = i; */ 
    138     _src[i] = rand() >> rshift; 
    139   } 
    140  
    141   /* For data coming from a file */ 
    142   /* fd = open(filename, 0); */ 
    143   /* status = read(fd, src, size); */ 
    144   /* if (status == -1) { */ 
    145   /*   perror(NULL); */ 
    146   /* } */ 
    147   /* close(fd); */ 
     125    //_src[i] = rand() >> rshift; 
     126    _src[i] = get_value(i, rshift); 
     127  } 
     128} 
     129 
     130 
     131int main(void) { 
     132  int nbytes, cbytes; 
     133  void *src, *srccpy; 
     134  void **dest[NCHUNKS], *dest2; 
     135  size_t i, j; 
     136  struct timeval last, current; 
     137  float tmemcpy, tshuf, tunshuf; 
     138  int *_src; 
     139  int *_srccpy; 
     140  int rshift = 12;                /* For random data */ 
     141  int clevel; 
     142  int doshuffle = 1;              /* Shuffle? */ 
     143  int fd; 
     144  int status; 
     145  unsigned int size = 1024*1024;  /* Buffer size */ 
     146  unsigned int elsize = 8;        /* Datatype size */ 
     147  unsigned char *orig, *round; 
     148 
     149  /* The number of threads */ 
     150  blosc_init_threads(4); 
     151 
     152  src = malloc(size); 
     153  srccpy = malloc(size); 
     154  dest2 = malloc(size); 
     155 
     156  /* Initialize buffers */ 
     157  init_buffer(src, size, rshift); 
     158  memcpy(srccpy, src, size); 
     159  for (j = 0; j < NCHUNKS; j++) { 
     160    dest[j] = malloc(size); 
     161  } 
    148162 
    149163  printf("********************** Setup info *****************************\n"); 
    150164  printf("Blosc version: %s (%s)\n", BLOSC_VERSION_STRING, BLOSC_VERSION_DATE); 
    151 /*  printf("Using random data with %d significant bits (out of 32)\n", 32-rshift); */ 
    152   printf("Using data coming from file: %s\n", filename); 
     165  printf("Using random data with %d significant bits (out of 32)\n", 32-rshift); 
    153166  printf("Dataset size: %d bytes\t Type size: %d bytes\n", size, elsize); 
    154167  printf("Shuffle active?  %s\n", doshuffle ? "Yes" : "No"); 
    155168  printf("********************** Running benchmarks *********************\n"); 
    156169 
    157   memcpy(srccpy, src, size); 
    158  
    159170  gettimeofday(&last, NULL); 
    160171  for (i = 0; i < NITER; i++) { 
    161     memcpy(srccpy, src, size); 
     172    for (j = 0; j < NCHUNKS; j++) { 
     173      memcpy(dest[j], src, size); 
     174    } 
    162175  } 
    163176  gettimeofday(&current, NULL); 
    164177  tmemcpy = getseconds(last, current); 
    165   printf("memcpy:\t\t %6.1f us, %.1f MB/s\n", tmemcpy, size/(tmemcpy*MB/1e6)); 
     178  printf("memcpy(write):\t\t %6.1f us, %.1f MB/s\n", 
     179         tmemcpy, size/(tmemcpy*MB/1e6)); 
     180 
     181  gettimeofday(&last, NULL); 
     182  for (i = 0; i < NITER; i++) { 
     183    for (j = 0; j < NCHUNKS; j++) { 
     184      memcpy(dest2, dest[j], size); 
     185    } 
     186  } 
     187  gettimeofday(&current, NULL); 
     188  tmemcpy = getseconds(last, current); 
     189  printf("memcpy(read):\t\t %6.1f us, %.1f MB/s\n", 
     190         tmemcpy, size/(tmemcpy*MB/1e6)); 
    166191 
    167192  for (clevel=1; clevel<10; clevel++) { 
     
    171196    gettimeofday(&last, NULL); 
    172197    for (i = 0; i < NITER; i++) { 
    173       cbytes = blosc_compress(clevel, doshuffle, elsize, size, src, dest); 
     198      for (j = 0; j < NCHUNKS; j++) { 
     199        cbytes = blosc_compress(clevel, doshuffle, elsize, size, src, dest[j]); 
     200      } 
    174201    } 
    175202    gettimeofday(&current, NULL); 
    176203    tshuf = getseconds(last, current); 
    177     printf("compression:\t %6.1f us, %.1f MB/s\t  ", 
     204    printf("compression(write):\t %6.1f us, %.1f MB/s\t  ", 
    178205           tshuf, size/(tshuf*MB/1e6)); 
    179206    printf("Final bytes: %d  ", cbytes); 
     
    183210    printf("\n"); 
    184211 
     212    /* Compressor was unable to compress.  Copy the buffer manually. */ 
     213    if (cbytes == 0) { 
     214      for (j = 0; j < NCHUNKS; j++) { 
     215        memcpy(dest[j], src, size); 
     216      } 
     217    } 
     218 
    185219    gettimeofday(&last, NULL); 
    186     for (i = 0; i < NITER; i++) 
    187       if (cbytes == 0) { 
    188         memcpy(dest2, src, size); 
    189         nbytes = size; 
    190       } 
    191       else { 
    192         nbytes = blosc_decompress(dest, dest2, size); 
    193       } 
     220    for (i = 0; i < NITER; i++) { 
     221      for (j = 0; j < NCHUNKS; j++) { 
     222        if (cbytes == 0) { 
     223          memcpy(dest2, dest[j], size); 
     224          nbytes = size; 
     225        } 
     226        else { 
     227          nbytes = blosc_decompress(dest[j], dest2, size); 
     228        } 
     229      } 
     230    } 
    194231    gettimeofday(&current, NULL); 
    195232    tunshuf = getseconds(last, current); 
    196     printf("decompression:\t %6.1f us, %.1f MB/s\t  ", 
     233    printf("decompression(read):\t %6.1f us, %.1f MB/s\t  ", 
    197234           tunshuf, nbytes/(tunshuf*MB/1e6)); 
    198235    if (nbytes < 0) { 
     
    218255 
    219256 out: 
    220   free(src); free(srccpy); free(dest); free(dest2); 
     257  free(src); free(srccpy); free(dest2); 
     258  for (i = 0; i < NITER; i++) { 
     259    free(dest[i]); 
     260  } 
    221261  return 0; 
    222262} 
  • branches/threaded/src/blosc.c

    r55 r59  
    3737 
    3838/* The number of threads */ 
    39 #define NUM_THREADS 4 
    40  
    41 /* Global variables for compressing/shuffling actions */ 
    42 int clevel;                     /* Compression level */ 
    43 int do_shuffle;                 /* Shuffle is active? */ 
    44  
     39#define MAX_THREADS 64 
     40 
     41 
     42/* Global variables for threads */ 
     43int nthreads;                   /* number of desired threads */ 
     44int giveup;                     /* should (de-)compression give up? */ 
     45int init_done = 0;              /* init threads has been done yet? */ 
     46int nblock = -1;                /* block counter */ 
     47pthread_t threads[MAX_THREADS]; /* opaque structure for threads */ 
     48int tids[MAX_THREADS];          /* ID for each thread */ 
     49pthread_attr_t ct_attr;         /* creation time attributes for threads */ 
     50 
     51/* Synchronization variables */ 
     52pthread_mutex_t count_mutex; 
     53pthread_barrier_t barr_init; 
     54pthread_barrier_t barr_inter; 
     55pthread_barrier_t barr_finish; 
     56 
     57/* Structure for parameters in (de-)compression threads */ 
     58struct thread_data { 
     59  size_t typesize; 
     60  size_t blocksize; 
     61  int compress; 
     62  int clevel; 
     63  int shuffle; 
     64  int ntbytes; 
     65  unsigned int nbytes; 
     66  unsigned int nblocks; 
     67  unsigned int leftover; 
     68  unsigned int *bstarts;             /* start pointers for each block */ 
     69  unsigned char *src; 
     70  unsigned char *dest; 
     71  unsigned char *tmp[MAX_THREADS]; 
     72  unsigned char *tmp2[MAX_THREADS]; 
     73} params; 
     74 
     75 
     76/* Convenience functions for creating and releasing temporaries */ 
     77void 
     78create_temporaries(void) 
     79{ 
     80  int tid; 
     81  size_t blocksize = params.blocksize; 
     82  /* Extended blocksize for temporary destination.  Extended blocksize 
     83   is only useful for compression in parallel mode, but it doesn't 
     84   hurt other modes either. */ 
     85  size_t ebsize = blocksize + params.typesize*sizeof(int); 
     86  unsigned char *tmp, *tmp2; 
     87 
     88  /* Create temporary area for each thread */ 
     89  for (tid = 0; tid < nthreads; tid++) { 
     90#ifdef _WIN32 
     91    tmp = (unsigned char *)_aligned_malloc(blocksize, 16); 
     92    tmp2 = (unsigned char *)_aligned_malloc(ebsize, 16); 
     93#elif defined __APPLE__ 
     94    /* Mac OS X guarantees 16-byte alignment in small allocs */ 
     95    tmp = (unsigned char *)malloc(blocksize); 
     96    tmp2 = (unsigned char *)malloc(ebsize); 
     97#else 
     98    posix_memalign((void **)&tmp, 16, blocksize); 
     99    posix_memalign((void **)&tmp2, 16, ebsize); 
     100#endif  /* _WIN32 */ 
     101    params.tmp[tid] = tmp; 
     102    params.tmp2[tid] = tmp2; 
     103  } 
     104} 
     105 
     106 
     107void 
     108release_temporaries(void) 
     109{ 
     110  int tid; 
     111  unsigned char *tmp, *tmp2; 
     112 
     113  /* Release buffers */ 
     114  for (tid = 0; tid < nthreads; tid++) { 
     115    tmp = params.tmp[tid]; 
     116    tmp2 = params.tmp2[tid]; 
     117#ifdef _WIN32 
     118    _aligned_free(tmp); 
     119    _aligned_free(tmp2); 
     120#else 
     121    free(tmp); 
     122    free(tmp2); 
     123#endif  /* _WIN32 */ 
     124  } 
     125} 
    45126 
    46127 
    47128/* Shuffle & Compress a single block */ 
    48 static size_t 
    49 _blosc_c(int clevel, int doshuffle, size_t typesize, size_t blocksize, 
    50          int leftoverblock, unsigned int ctbytes, unsigned int nbytes, 
    51          unsigned char* _src, unsigned char* _dest, unsigned char *tmp) 
     129static int 
     130blosc_c(size_t blocksize, int leftoverblock, 
     131        unsigned int ntbytes, unsigned int nbytes, 
     132        unsigned char *src, unsigned char *dest, unsigned char *tmp) 
    52133{ 
    53134  size_t j, neblock, nsplits; 
    54   int cbytes, maxout; 
    55   unsigned int btbytes = 0; 
    56   unsigned char* _tmp; 
    57  
    58   if (doshuffle && (typesize > 1)) { 
     135  int cbytes;                   /* number of compressed bytes in split */ 
     136  int ctbytes = 0;              /* number of compressed bytes in block */ 
     137  int maxout; 
     138  unsigned char *_tmp; 
     139  size_t typesize = params.typesize; 
     140 
     141  if (params.shuffle && (typesize > 1)) { 
    59142    /* Shuffle this block (this makes sense only if typesize > 1) */ 
    60     shuffle(typesize, blocksize, _src, tmp); 
     143    shuffle(typesize, blocksize, src, tmp); 
    61144    _tmp = tmp; 
    62145  } 
    63146  else { 
    64     _tmp = _src; 
     147    _tmp = src; 
    65148  } 
    66149 
     
    76159  neblock = blocksize / nsplits; 
    77160  for (j = 0; j < nsplits; j++) { 
    78     _dest += sizeof(int); 
    79     btbytes += sizeof(int); 
     161    dest += sizeof(int); 
     162    ntbytes += sizeof(int); 
    80163    ctbytes += sizeof(int); 
    81164    maxout = neblock; 
    82     if (ctbytes+maxout > nbytes) { 
    83       maxout = nbytes - ctbytes;   /* avoid buffer overrun */ 
     165    if (ntbytes+maxout > nbytes) { 
     166      maxout = nbytes - ntbytes;   /* avoid buffer overrun */ 
    84167      if (maxout <= 0) { 
    85168        return 0;                  /* non-compressible block */ 
    86169      } 
    87170    } 
    88     cbytes = blosclz_compress(clevel, _tmp+j*neblock, neblock, 
    89                               _dest, maxout-1); 
     171    cbytes = blosclz_compress(params.clevel, _tmp+j*neblock, neblock, 
     172                              dest, maxout-1); 
    90173    if (cbytes >= maxout) { 
    91174      /* Buffer overrun caused by blosclz_compress (should never happen) */ 
     
    100183      /* Before doing the copy, check that we are not running into a 
    101184         buffer overflow. */ 
    102       if ((ctbytes+neblock) > nbytes) { 
     185      if ((ntbytes+neblock) > nbytes) { 
    103186        return 0;    /* Non-compressible data */ 
    104187      } 
    105       memcpy(_dest, _tmp+j*neblock, neblock); 
     188      memcpy(dest, _tmp+j*neblock, neblock); 
    106189      cbytes = neblock; 
    107190    } 
    108     ((unsigned int *)(_dest))[-1] = cbytes; 
    109     _dest += cbytes; 
    110     btbytes += cbytes; 
     191    ((unsigned int *)(dest))[-1] = cbytes; 
     192    dest += cbytes; 
     193    ntbytes += cbytes; 
    111194    ctbytes += cbytes; 
    112195  }  /* Closes j < nsplits */ 
    113196 
    114   return btbytes; 
    115 } 
    116  
    117  
    118 /* Structure for parameters in compression routines */ 
    119 struct thread_data_c{ 
    120   size_t typesize; 
    121   size_t blocksize; 
    122   int thread_id; 
    123   int clevel; 
    124   int doshuffle; 
    125   int cbytes; 
    126   unsigned int tblocks; 
    127   unsigned int thblocks; 
    128   unsigned int leftover; 
    129   unsigned char *src; 
    130   unsigned char *tmp; 
    131   unsigned char *tmp2; 
    132   unsigned int *thstarts; 
    133 }; 
    134  
    135  
    136 /* Compress & shuffle several blocks in a single thread */ 
    137 void *t_blosc_c(void *params) 
    138 { 
    139   /* Parameters */ 
    140   struct thread_data_c *my_params = (struct thread_data_c *)params; 
    141   int thread_id = my_params->thread_id; 
    142   int clevel = my_params->clevel; 
    143   int doshuffle = my_params->doshuffle; 
    144   size_t typesize = my_params->typesize; 
    145   size_t blocksize = my_params->blocksize; 
    146   unsigned int tblocks = my_params->tblocks; 
    147   unsigned int thblocks = my_params->thblocks; 
    148   unsigned int leftover = my_params->leftover; 
    149   unsigned char *src = my_params->src; 
    150   unsigned char *tmp = my_params->tmp; 
    151   unsigned char *tmp2 = my_params->tmp2; 
    152   unsigned int *thstarts = my_params->thstarts; 
    153   /* Other local vars */ 
    154   int j, cbytes, ntbytes = 0; 
    155   unsigned int bsize = blocksize; 
    156   unsigned int leftoverblock = 0; 
    157   unsigned int sstart; 
    158  
    159  
    160   for (j = 0; j < thblocks; j++) { 
    161     if (((thread_id*thblocks+j) == (tblocks - 1)) && (leftover > 0)) { 
    162       bsize = leftover; 
    163       leftoverblock = 1; 
    164     } 
    165     sstart = (thread_id*thblocks+j)*blocksize; 
    166     cbytes = _blosc_c(clevel, doshuffle, typesize, bsize, leftoverblock, 
    167                       (unsigned int)ntbytes, thblocks*blocksize, 
    168                       src+sstart, tmp2, tmp); 
    169     if (cbytes <= 0) { 
    170       ntbytes = cbytes;          /* _blosc_c failure or uncompressible data */ 
    171       break; 
    172     } 
    173     thstarts[j] = ntbytes; 
    174     ntbytes += cbytes; 
    175     tmp2 += cbytes; 
    176     /* Break if we have reached the end of the blocks */ 
    177     if ((thread_id*thblocks+j) == (tblocks - 1)) { 
    178       break; 
    179     } 
    180   } 
    181  
    182   my_params->cbytes = ntbytes; 
    183   pthread_exit((void *)my_params); 
     197  return ctbytes; 
    184198} 
    185199 
    186200 
    187201unsigned int 
    188 blosc_compress(int clevel, int doshuffle, size_t typesize, size_t nbytes, 
     202blosc_compress(int clevel, int shuffle, size_t typesize, size_t nbytes, 
    189203               const void *src, void *dest) 
    190204{ 
    191205  unsigned char *_dest=NULL;       /* current pos for destination buffer */ 
    192206  unsigned char *flags;            /* flags for header */ 
     207  unsigned int nblocks;            /* number of total blocks in buffer */ 
     208  unsigned int leftover;           /* extra bytes at end of buffer */ 
    193209  unsigned int *bstarts;           /* start pointers for each block */ 
    194   size_t tblocks;                  /* number of total blocks in buffer */ 
    195   size_t leftover;                 /* extra bytes at end of buffer */ 
    196   size_t thblocks;                 /* number of total blocks per thread */ 
    197   size_t leftover2, leftover3;     /* helpers */ 
    198210  size_t blocksize;                /* length of the block in bytes */ 
    199   size_t bsize;                    /* corrected blocksize for last block */ 
    200   unsigned int ctbytes = 0;        /* the number of bytes in output buffer */ 
    201   unsigned int *ctbytes_;          /* placeholder for bytes in output buffer */ 
    202   int cbytes;                      /* temporary compressed buffer length */ 
    203   int leftoverblock;               /* left over block? */ 
    204   unsigned int i, j;               /* local index variables */ 
    205   char *too_long_message = "The impossible happened: buffer overflow!"; 
    206   size_t t; 
    207   pthread_t threads[NUM_THREADS]; 
    208   unsigned char *tmp[NUM_THREADS], *tmp2[NUM_THREADS]; 
    209   struct thread_data_c params[NUM_THREADS]; 
    210   unsigned int *thstarts[NUM_THREADS];   /* start pointers for each thread */ 
    211   int rc; 
    212   void *status; 
    213   pthread_attr_t attr; 
    214   int nthreads = NUM_THREADS; 
    215   int giveup = 0;                  /* sentinel */ 
     211  unsigned int ntbytes = 0;        /* the number of compressed bytes */ 
     212  unsigned int *ntbytes_;          /* placeholder for bytes in output buffer */ 
     213  unsigned int i;                  /* local index variables */ 
    216214 
    217215  /* Compression level */ 
     
    227225 
    228226  /* Shuffle */ 
    229   if (doshuffle != 0 && doshuffle != 1) { 
    230     fprintf(stderr, "`doshuffle` parameter must be either 0 or 1!\n"); 
     227  if (shuffle != 0 && shuffle != 1) { 
     228    fprintf(stderr, "`shuffle` parameter must be either 0 or 1!\n"); 
    231229    return -10; 
    232230  } 
     
    244242 
    245243  /* Compute number of blocks in buffer */ 
    246   tblocks = nbytes / blocksize; 
     244  nblocks = nbytes / blocksize; 
    247245  leftover = nbytes % blocksize; 
    248   tblocks = (leftover>0)? tblocks+1: tblocks; 
    249   /* Blocks per thread */ 
    250   thblocks = tblocks / nthreads; 
    251   leftover2 = tblocks % nthreads; 
    252   thblocks = (leftover2>0)? thblocks+1: thblocks; 
    253   /* Final correction for the number of threads */ 
    254   if (nthreads > (tblocks / thblocks)) { 
    255     nthreads = tblocks / thblocks; 
    256     leftover3 = tblocks % thblocks; 
    257     nthreads = (leftover3>0)? nthreads+1: nthreads; 
    258   } 
     246  nblocks = (leftover>0)? nblocks+1: nblocks; 
    259247 
    260248  /* Check typesize limits */ 
     
    275263  _dest[3] = (unsigned char)typesize;      /* type size */ 
    276264  _dest += 4; 
    277   ctbytes += 4; 
    278   ((unsigned int *)_dest)[0] = nbytes;     /* size of the chunk */ 
     265  ntbytes += 4; 
     266  ((unsigned int *)_dest)[0] = nbytes;     /* size of the buffer */ 
    279267  ((unsigned int *)_dest)[1] = blocksize;  /* block size */ 
    280   ctbytes_ = (unsigned int *)(_dest+8);    /* compressed chunk size (pointer) */ 
     268  ntbytes_ = (unsigned int *)(_dest+8);    /* compressed buffer size */ 
    281269  _dest += sizeof(int)*3; 
    282   ctbytes += sizeof(int)*3; 
     270  ntbytes += sizeof(int)*3; 
    283271  bstarts = (unsigned int *)_dest;         /* starts for every block */ 
    284   _dest += sizeof(int)*tblocks;            /* book space for pointers to */ 
    285   ctbytes += sizeof(int)*tblocks;          /* every block in output */ 
    286  
    287   if (doshuffle == 1) { 
     272  _dest += sizeof(int)*nblocks;            /* book space for pointers to */ 
     273  ntbytes += sizeof(int)*nblocks;          /* every block in output */ 
     274 
     275  if (shuffle == 1) { 
    288276    /* Shuffle is active */ 
    289277    *flags |= 0x1;                         /* bit 0 set to one in flags */ 
    290278  } 
    291279 
    292   /* Initialize and set thread detached attribute */ 
    293   pthread_attr_init(&attr); 
    294   pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); 
    295  
    296   for(t=0; t < nthreads; t++){ 
    297   /* Create temporary area for each thread */ 
    298 #ifdef _WIN32 
    299     tmp[t] = (unsigned char*)_aligned_malloc(blocksize, 16); 
    300     tmp2[t] = (unsigned char*)_aligned_malloc(blocksize*thblocks, 16); 
    301     thstarts[t] = (unsigned int*)_aligned_malloc(sizeof(int)*thblocks, 16); 
    302 #elif defined __APPLE__ 
    303     /* Mac OS X guarantees 16-byte alignment in small allocs */ 
    304     tmp[t] = (unsigned char *)malloc(blocksize); 
    305     tmp2[t] = (unsigned char *)malloc(blocksize*thblocks); 
    306     thstarts[t] = (unsigned int*)malloc(sizeof(int)*thblocks); 
    307 #else 
    308     rc = posix_memalign((void **)&tmp[t], 16, blocksize); 
    309     rc = posix_memalign((void **)&tmp2[t], 16, blocksize*thblocks); 
    310     rc = posix_memalign((void **)&thstarts[t], 16, sizeof(int)*thblocks); 
    311 #endif  /* _WIN32 */ 
    312     /* Populate parameters for thread */ 
    313     params[t].thread_id = t; 
    314     params[t].clevel = clevel; 
    315     params[t].doshuffle = doshuffle; 
    316     params[t].typesize = typesize; 
    317     params[t].blocksize = blocksize; 
    318     params[t].tblocks = tblocks; 
    319     params[t].thblocks = thblocks; 
    320     if (t == (nthreads - 1) && leftover > 0) { 
    321       params[t].leftover = leftover; 
    322     } 
    323     else { 
    324       params[t].leftover = 0; 
    325     } 
    326     params[t].src = (unsigned char *)src; 
    327     params[t].tmp = tmp[t]; 
    328     params[t].tmp2 = tmp2[t]; 
    329     params[t].thstarts = thstarts[t]; 
    330     rc = pthread_create(&threads[t], &attr, t_blosc_c, (void *)&params[t]); 
    331     if (rc){ 
    332       fprintf(stderr, "ERROR; return code from pthread_create() is %d\n", rc); 
    333       fprintf(stderr, "\tError detail: %s\n", strerror(rc)); 
    334       exit(-1); 
    335     } 
    336   }  /* Close t < nthreads */ 
    337  
    338   /* Wait for threads to finish (join threads) */ 
    339   pthread_attr_destroy(&attr);    /* free attribute */ 
    340   for(t=0; t<nthreads; t++){ 
    341     rc = pthread_join(threads[t], &status); 
    342     if (rc) { 
    343       fprintf(stderr, "ERROR; return code from pthread_join() is %d\n", rc); 
    344       fprintf(stderr, "\tError detail: %s\n", strerror(rc)); 
    345       exit(-1); 
    346     } 
    347     if (giveup) { 
    348       /* We have been signaled to give up.  Wait for all threads to finish. */ 
    349       continue; 
    350     } 
    351     cbytes = ((struct thread_data_c *)status)->cbytes; 
    352     /* Check result of compression */ 
    353     if (cbytes < 0) { 
    354       fprintf(stderr, "%s\n", too_long_message); 
    355       ctbytes = cbytes;         /* error in _blosc_c */ 
    356       giveup = 1; 
    357     } 
    358     else if ((cbytes == 0) | (ctbytes+cbytes >= nbytes)) { 
    359       ctbytes = 0;              /* uncompressible data */ 
    360       giveup = 1; 
    361     } 
    362     else { 
    363       /* Compression process seems right */ 
    364       /* Copy block starts */ 
    365       for (j=0; j<thblocks; j++) { 
    366         /* Check that we do not exceed the end of the blocks */ 
    367         if ((t*thblocks+j) < tblocks) { 
    368           bstarts[t*thblocks+j] = thstarts[t][j]+ctbytes; 
    369         } 
    370       } 
    371       /* Copy thread's private buffer to destination buffer */ 
    372       memcpy(_dest, tmp2[t], cbytes); 
    373       _dest += cbytes; 
    374       ctbytes += cbytes;          /* update compressed bytes */ 
    375     } 
    376   }  /* Close t < nthreads */ 
    377  
    378   /* Release buffers */ 
    379   for(t=0; t < nthreads; t++){ 
    380 #ifdef _WIN32 
    381     _aligned_free(tmp[t]); 
    382     _aligned_free(tmp2[t]); 
    383     _aligned_free(thstarts[t]); 
    384 #else 
    385     free(tmp[t]); 
    386     free(tmp2[t]); 
    387     free(thstarts[t]); 
    388 #endif  /* _WIN32 */ 
    389   } 
    390  
    391   assert(ctbytes < nbytes); 
    392   *ctbytes_ = ctbytes;   /* set the number of compressed bytes in header */ 
    393   return ctbytes; 
     280  /* Populate parameters for compression routines */ 
     281  params.compress = 1; 
     282  params.clevel = clevel; 
     283  params.shuffle = shuffle; 
     284  params.typesize = typesize; 
     285  params.blocksize = blocksize; 
     286  params.ntbytes = ntbytes; 
     287  params.nbytes = nbytes; 
     288  params.nblocks = nblocks; 
     289  params.leftover = leftover; 
     290  params.bstarts = bstarts; 
     291  params.src = (unsigned char *)src; 
     292  params.dest = (unsigned char *)dest; 
     293 
     294  /* Do the actual compression */ 
     295  ntbytes = do_job(); 
     296  /* Set the number of compressed bytes in header */ 
     297  *ntbytes_ = ntbytes; 
     298 
     299  assert(ntbytes < nbytes); 
     300  return ntbytes; 
    394301} 
    395302 
    396303 
    397304/* Decompress & unshuffle a single block */ 
    398 static size_t 
    399 _blosc_d(int dounshuffle, size_t typesize, size_t blocksize, int leftoverblock, 
    400          unsigned char* _src, unsigned char* _dest, 
    401          unsigned char *tmp, unsigned char *tmp2) 
    402 { 
    403   size_t j, neblock, nsplits; 
    404   size_t nbytes, cbytes, ctbytes = 0, ntbytes = 0; 
    405   unsigned char* _tmp; 
    406  
    407   if (dounshuffle && (typesize > 1)) { 
     305static int 
     306blosc_d(size_t blocksize, int leftoverblock, 
     307        unsigned char *src, unsigned char *dest, 
     308        unsigned char *tmp, unsigned char *tmp2) 
     309{ 
     310  int j, neblock, nsplits; 
     311  int nbytes;                /* number of decompressed bytes in split */ 
     312  int cbytes;                /* number of compressed bytes in split */ 
     313  int ctbytes = 0;           /* number of compressed bytes in block */ 
     314  int ntbytes = 0;           /* number of uncompressed bytes in block */ 
     315  unsigned char *_tmp; 
     316  size_t typesize = params.typesize; 
     317 
     318  if (shuffle && (typesize > 1)) { 
    408319    _tmp = tmp; 
    409320  } 
    410321  else { 
    411     _tmp = _dest; 
     322    _tmp = dest; 
    412323  } 
    413324 
     
    422333  neblock = blocksize / nsplits; 
    423334  for (j = 0; j < nsplits; j++) { 
    424     cbytes = ((unsigned int *)(_src))[0]; /* amount of compressed bytes */ 
    425     _src += sizeof(int); 
     335    cbytes = ((unsigned int *)(src))[0]; /* amount of compressed bytes */ 
     336    src += sizeof(int); 
    426337    ctbytes += sizeof(int); 
    427338    /* Uncompress */ 
    428339    if (cbytes == neblock) { 
    429       memcpy(_tmp, _src, neblock); 
     340      memcpy(_tmp, src, neblock); 
    430341      nbytes = neblock; 
    431342    } 
    432343    else { 
    433       nbytes = blosclz_decompress(_src, cbytes, _tmp, neblock); 
     344      nbytes = blosclz_decompress(src, cbytes, _tmp, neblock); 
    434345      if (nbytes != neblock) { 
    435346        return -2; 
    436347      } 
    437348    } 
    438     _src += cbytes; 
     349    src += cbytes; 
    439350    ctbytes += cbytes; 
    440     _tmp += neblock; 
     351    _tmp += nbytes; 
    441352    ntbytes += nbytes; 
    442353  } /* Closes j < nsplits */ 
    443354 
    444   if (dounshuffle && (typesize > 1)) { 
    445     if ((uintptr_t)_dest % 16 == 0) { 
    446       /* 16-bytes aligned _dest.  SSE2 unshuffle will work. */ 
    447       unshuffle(typesize, blocksize, tmp, _dest); 
     355  if (shuffle && (typesize > 1)) { 
     356    if ((uintptr_t)dest % 16 == 0) { 
     357      /* 16-bytes aligned dest.  SSE2 unshuffle will work. */ 
     358      unshuffle(typesize, blocksize, tmp, dest); 
    448359    } 
    449360    else { 
    450       /* _dest is not aligned.  Use tmp2, which is aligned, and copy. */ 
     361      /* dest is not aligned.  Use tmp2, which is aligned, and copy. */ 
    451362      unshuffle(typesize, blocksize, tmp, tmp2); 
    452       memcpy(_dest, tmp2, blocksize); 
    453     } 
    454   } 
    455  
    456   return ctbytes; 
    457 } 
    458  
    459  
    460 /* Structure for parameters in decompression routines */ 
    461 struct thread_data_d{ 
    462   size_t typesize; 
    463   size_t blocksize; 
    464   int thread_id; 
    465   int dounshuffle; 
    466   int cbytes; 
    467   unsigned int tblocks; 
    468   unsigned int thblocks; 
    469   unsigned int leftover; 
    470   unsigned int *bstarts;             /* start pointers for each block */ 
    471   unsigned char *src; 
    472   unsigned char *dest; 
    473   unsigned char *tmp; 
    474   unsigned char *tmp2; 
    475 }; 
    476  
    477  
    478 /* Decompress & unshuffle several blocks in a single thread */ 
    479 void *t_blosc_d(void *params) 
    480 { 
    481   /* Parameters */ 
    482   struct thread_data_d *my_params = (struct thread_data_d *)params; 
    483   int thread_id = my_params->thread_id; 
    484   int dounshuffle = my_params->dounshuffle; 
    485   size_t typesize = my_params->typesize; 
    486   size_t blocksize = my_params->blocksize; 
    487   unsigned int tblocks = my_params->tblocks; 
    488   unsigned int thblocks = my_params->thblocks; 
    489   unsigned int *bstarts = my_params->bstarts; 
    490   unsigned int leftover = my_params->leftover; 
    491   unsigned char *src = my_params->src; 
    492   unsigned char *dest = my_params->dest; 
    493   unsigned char *tmp = my_params->tmp; 
    494   unsigned char *tmp2 = my_params->tmp2; 
    495   /* Other local vars */ 
    496   int j, cbytes, ntbytes = 0; 
    497   unsigned int bsize = blocksize; 
    498   unsigned int leftoverblock = 0; 
    499  
    500   for (j = 0; j < thblocks; j++) { 
    501     if (((thread_id*thblocks+j) == (tblocks - 1)) && (leftover > 0)) { 
    502       bsize = leftover; 
    503       leftoverblock = 1; 
    504     } 
    505     cbytes = _blosc_d(dounshuffle, typesize, bsize, leftoverblock, 
    506                       src+bstarts[j], dest+j*blocksize, tmp, tmp2); 
    507     if (cbytes < 0) { 
    508       ntbytes = cbytes;          /* _blosc_d failure */ 
    509       goto out; 
    510     } 
    511     ntbytes += cbytes; 
    512     /* Break if we have reached the end of the blocks */ 
    513     if ((thread_id*thblocks+j) == (tblocks - 1)) { 
    514       goto out; 
    515     } 
    516   } 
    517  
    518  out: 
    519   my_params->cbytes = ntbytes; 
    520   if (NUM_THREADS > 1) { 
    521     pthread_exit((void *)my_params); 
    522   } 
    523   else { 
    524     return (void *)my_params; 
    525   } 
     363      memcpy(dest, tmp2, blocksize); 
     364    } 
     365  } 
     366 
     367  /* Return the number of uncompressed bytes */ 
     368  return ntbytes; 
    526369} 
    527370 
     
    530373blosc_decompress(const void *src, void *dest, size_t dest_size) 
    531374{ 
    532   unsigned char *_src=NULL;          /* alias for source buffer */ 
    533   unsigned char *_dest=NULL;         /* alias for destination buffer */ 
     375  unsigned char *_src=NULL;          /* current pos for source buffer */ 
     376  unsigned char *_dest=NULL;         /* current pos for destination buffer */ 
    534377  unsigned char version, versionlz;  /* versions for compressed header */ 
    535378  unsigned char flags;               /* flags for header */ 
    536   size_t leftover;                   /* extra bytes at end of buffer */ 
    537   size_t tblocks;                    /* number of total blocks in buffer */ 
    538   size_t thblocks;                   /* number of total blocks per thread */ 
    539   size_t leftover2, leftover3;       /* helpers */ 
     379  int shuffle = 0;                   /* do unshuffle? */ 
     380  int ntbytes;                       /* the number of uncompressed bytes */ 
     381  unsigned int nblocks;              /* number of total blocks in buffer */ 
     382  unsigned int leftover;             /* extra bytes at end of buffer */ 
    540383  unsigned int *bstarts;             /* start pointers for each block */ 
    541   int nbytes, cbytes, ntbytes = 0; 
    542   int dounshuffle = 0; 
    543   unsigned int typesize, blocksize, ctbytes; 
    544   size_t t; 
    545   pthread_t threads[NUM_THREADS]; 
    546   unsigned char *tmp[NUM_THREADS], *tmp2[NUM_THREADS]; 
    547   struct thread_data_d params[NUM_THREADS]; 
    548   int rc; 
    549   void *status; 
    550   pthread_attr_t attr; 
    551   int nthreads = NUM_THREADS; 
    552   int giveup; 
     384  unsigned int typesize, blocksize, nbytes, ctbytes; 
     385  unsigned char *tmp, *tmp2; 
     386  int tid; 
    553387 
    554388  _src = (unsigned char *)(src); 
     
    561395  typesize = (unsigned int)_src[3];         /* typesize */ 
    562396  _src += 4; 
    563   nbytes = ((unsigned int *)_src)[0];       /* chunk size */ 
     397  nbytes = ((unsigned int *)_src)[0];       /* buffer size */ 
    564398  blocksize = ((unsigned int *)_src)[1];    /* block size */ 
    565   ctbytes = ((unsigned int *)_src)[2];      /* compressed chunk size */ 
     399  ctbytes = ((unsigned int *)_src)[2];      /* compressed buffer size */ 
    566400  _src += sizeof(int)*3; 
     401  bstarts = (unsigned int *)_src; 
    567402  /* Compute some params */ 
    568403  /* Total blocks */ 
    569   tblocks = nbytes / blocksize; 
     404  nblocks = nbytes / blocksize; 
    570405  leftover = nbytes % blocksize; 
    571   tblocks = (leftover>0)? tblocks+1: tblocks; 
    572   /* Blocks per thread */ 
    573   thblocks = tblocks / nthreads; 
    574   leftover2 = tblocks % nthreads; 
    575   thblocks = (leftover2>0)? thblocks+1: thblocks; 
    576   /* Final correction for the number of threads */ 
    577   if (nthreads > (tblocks / thblocks)) { 
    578     nthreads = tblocks / thblocks; 
    579     leftover3 = tblocks % thblocks; 
    580     nthreads = (leftover3>0)? nthreads+1: nthreads; 
    581   } 
    582   bstarts = (unsigned int *)_src; 
    583   _src += sizeof(int)*tblocks; 
     406  nblocks = (leftover>0)? nblocks+1: nblocks; 
     407  _src += sizeof(int)*nblocks; 
    584408 
    585409  /* Check zero typesizes */ 
     
    595419  if ((flags & 0x1) == 1) { 
    596420    /* Input is shuffled.  Unshuffle it. */ 
    597     dounshuffle = 1; 
    598   } 
    599  
    600   /* Create temporary area for each thread */ 
    601   for(t=0; t<nthreads; t++){ 
    602 #ifdef _WIN32 
    603     tmp[t] = (unsigned char*)_aligned_malloc(blocksize, 16); 
    604     tmp2[t] = (unsigned char*)_aligned_malloc(blocksize, 16); 
    605 #elif defined __APPLE__ 
    606     /* Mac OS X guarantees 16-byte alignment in small allocs */ 
    607     tmp[t] = (unsigned char *)malloc(blocksize); 
    608     tmp2[t] = (unsigned char *)malloc(blocksize); 
    609 #else 
    610     rc = posix_memalign((void **)&tmp[t], 16, blocksize); 
    611     rc = posix_memalign((void **)&tmp2[t], 16, blocksize); 
    612 #endif  /* _WIN32 */ 
    613   } 
     421    shuffle = 1; 
     422  } 
     423 
     424  /* Populate parameters for decompression routines */ 
     425  params.compress = 0; 
     426  params.clevel = 0;            /* specific for compression */ 
     427  params.shuffle = shuffle; 
     428  params.typesize = typesize; 
     429  params.blocksize = blocksize; 
     430  params.ntbytes = 0; 
     431  params.nbytes = nbytes; 
     432  params.nblocks = nblocks; 
     433  params.leftover = leftover; 
     434  params.bstarts = bstarts; 
     435  params.src = (unsigned char *)src; 
     436  params.dest = (unsigned char *)dest; 
     437 
     438  /* Do the actual decompression */ 
     439  ntbytes = do_job(); 
     440 
     441  assert(ntbytes <= dest_size); 
     442  return ntbytes; 
     443} 
     444 
     445 
     446/* Do the compression or decompression of the buffer depending on the 
     447   global params. */ 
     448int 
     449do_job(void) { 
     450  int ntbytes; 
     451 
     452  create_temporaries(); 
     453 
     454  if (nthreads == 1) { 
     455    ntbytes = serial_blosc(); 
     456  } 
     457  else { 
     458    ntbytes = parallel_blosc(); 
     459  } 
     460 
     461  release_temporaries(); 
     462 
     463  return ntbytes; 
     464} 
     465 
     466 
     467/* Serial version for compression/decompression */ 
     468int 
     469serial_blosc(void) 
     470{ 
     471  int j, bsize, leftoverblock, cbytes; 
     472  int compress = params.compress; 
     473  int clevel = params.clevel; 
     474  int shuffle = params.shuffle; 
     475  unsigned int typesize = params.typesize; 
     476  unsigned int blocksize = params.blocksize; 
     477  int ntbytes = params.ntbytes; 
     478  int nbytes = params.nbytes; 
     479  unsigned int nblocks = params.nblocks; 
     480  int leftover = params.nbytes % params.blocksize; 
     481  unsigned int *bstarts = params.bstarts; 
     482  unsigned char *src = params.src; 
     483  unsigned char *dest = params.dest; 
     484  unsigned char *tmp = params.tmp[0];     /* tmp for thread 0 */ 
     485  unsigned char *tmp2 = params.tmp2[0];   /* tmp2 for thread 0 */ 
     486 
     487  for (j = 0; j < nblocks; j++) { 
     488    if (compress) { 
     489      bstarts[j] = ntbytes; 
     490    } 
     491    bsize = blocksize; 
     492    leftoverblock = 0; 
     493    if ((j == nblocks - 1) && (leftover > 0)) { 
     494      bsize = leftover; 
     495      leftoverblock = 1; 
     496    } 
     497    if (compress) { 
     498      cbytes = blosc_c(bsize, leftoverblock, ntbytes, nbytes, 
     499                       src+j*blocksize, dest+bstarts[j], tmp); 
     500      if (cbytes == 0) { 
     501        ntbytes = 0;              /* uncompressible data */ 
     502        break; 
     503      } 
     504    } 
     505    else { 
     506      cbytes = blosc_d(bsize, leftoverblock, 
     507                       src+bstarts[j], dest+j*blocksize, tmp, tmp2); 
     508    } 
     509    if (cbytes < 0) { 
     510      ntbytes = cbytes;         /* error in blosc_c / blosc_d */ 
     511      break; 
     512    } 
     513    ntbytes += cbytes; 
     514  } 
     515 
     516  /* Final check for ntbytes (only in compression mode) */ 
     517  if (compress) { 
     518    if (ntbytes == nbytes) { 
     519      ntbytes = 0;               /* non-compressible data */ 
     520    } 
     521    else if (ntbytes > nbytes) { 
     522      fprintf(stderr, "The impossible happened: buffer overflow!\n"); 
     523      ntbytes = -5;               /* too large buffer */ 
     524    } 
     525  }  /* Close j < nblocks */ 
     526 
     527  return ntbytes; 
     528} 
     529 
     530 
     531/* Threaded version for compression/decompression */ 
     532int 
     533parallel_blosc(void) 
     534{ 
     535  int rc; 
     536 
     537  /* Synchronization point for all threads (wait for initialization) */ 
     538  rc = pthread_barrier_wait(&barr_init); 
     539  if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) { 
     540    printf("Could not wait on barrier (init)\n"); 
     541    exit(-1); 
     542  } 
     543  /* Synchronization point for all threads (wait for finalization) */ 
     544  rc = pthread_barrier_wait(&barr_finish); 
     545  if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) { 
     546    printf("Could not wait on barrier (finish)\n"); 
     547    exit(-1); 
     548  } 
     549 
     550  /* Return the total bytes decompressed in threads */ 
     551  return params.ntbytes; 
     552} 
     553 
     554 
     555/* Decompress & unshuffle several blocks in a single thread */ 
     556void *t_blosc(void *tids) 
     557{ 
     558  int tid = *(int *)tids; 
     559  int cbytes, ntdest; 
     560  unsigned int tblocks;              /* number of blocks per thread */ 
     561  unsigned int leftover2; 
     562  int nblock_;                       /* private copy of nblock */ 
     563  int tblock;                        /* limit block on a thread */ 
     564  int rc; 
     565  unsigned int bsize, leftoverblock; 
     566 
     567  while (1) { 
     568    /* Meeting point for all threads (wait for initialization) */ 
     569    rc = pthread_barrier_wait(&barr_init); 
     570    if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) { 
     571      printf("Could not wait on barrier (init)\n"); 
     572      exit(-1); 
     573    } 
     574 
     575    /* Get parameters for this thread */ 
     576    size_t typesize = params.typesize; 
     577    size_t blocksize = params.blocksize; 
     578    size_t ebsize = blocksize + params.typesize*sizeof(int); 
     579    int compress = params.compress; 
     580    int clevel = params.clevel; 
     581    int shuffle = params.shuffle; 
     582    unsigned int nbytes = params.nbytes; 
     583    unsigned int ntbytes = params.ntbytes; 
     584    unsigned int nblocks = params.nblocks; 
     585    unsigned int leftover = params.leftover; 
     586    unsigned int *bstarts = params.bstarts; 
     587    unsigned char *src = params.src; 
     588    unsigned char *dest = params.dest; 
     589    unsigned char *tmp = params.tmp[tid]; 
     590    unsigned char *tmp2 = params.tmp2[tid]; 
     591 
     592    giveup = 0; 
     593    if (compress) { 
     594      /* Compression always has to follow the block order */ 
     595      pthread_mutex_lock(&count_mutex); 
     596      nblock++; 
     597      nblock_ = nblock; 
     598      pthread_mutex_unlock(&count_mutex); 
     599      tblock = nblocks; 
     600    } 
     601    else { 
     602      /* Decompression can happen using any order.  We choose 
     603       sequential block order on each thread */ 
     604 
     605      /* Blocks per thread */ 
     606      tblocks = nblocks / nthreads; 
     607      leftover2 = nblocks % nthreads; 
     608      tblocks = (leftover2>0)? tblocks+1: tblocks; 
     609 
     610      nblock_ = tid*tblocks; 
     611      tblock = nblock_ + tblocks; 
     612      if (tblock > nblocks) { 
     613        tblock = nblocks; 
     614      } 
     615      ntbytes = 0; 
     616    } 
     617 
     618    /* Loop over blocks */ 
     619    leftoverblock = 0; 
     620    while ((nblock_ < tblock) && !giveup) { 
     621      bsize = blocksize; 
     622      if (nblock_ == (nblocks - 1) && (leftover > 0)) { 
     623        bsize = leftover; 
     624        leftoverblock = 1; 
     625      } 
     626      if (compress) { 
     627        cbytes = blosc_c(bsize, leftoverblock, 0, ebsize, 
     628                         src+nblock_*blocksize, tmp2, tmp); 
     629      } 
     630      else { 
     631        cbytes = blosc_d(bsize, leftoverblock, 
     632                         src+bstarts[nblock_], dest+nblock_*blocksize, 
     633                         tmp, tmp2); 
     634      } 
     635 
     636      /* Check whether current thread has to giveup */ 
     637      /* This is critical in order to not overwrite variables */ 
     638      if (giveup != 0) { 
     639        break; 
     640      } 
     641 
     642      /* Check results for the decompressed block */ 
     643      if (cbytes < 0) {            /* compr/decompr failure */ 
     644        /* Set cbytes code error first */ 
     645        pthread_mutex_lock(&count_mutex); 
     646        params.ntbytes = cbytes; 
     647        pthread_mutex_unlock(&count_mutex); 
     648        giveup = 1;                 /* give up (de-)compressing after */ 
     649        break; 
     650      } 
     651 
     652      if (compress) { 
     653        /* Start critical section */ 
     654        pthread_mutex_lock(&count_mutex); 
     655        if (cbytes == 0) {            /* uncompressible buffer */ 
     656          params.ntbytes = 0; 
     657          pthread_mutex_unlock(&count_mutex); 
     658          giveup = 1;                 /* give up compressing */ 
     659          break; 
     660        } 
     661        bstarts[nblock_] = params.ntbytes;   /* update block start counter */ 
     662        ntdest = params.ntbytes; 
     663        if (ntdest+cbytes > nbytes) {         /* uncompressible buffer */ 
     664          params.ntbytes = 0; 
     665          pthread_mutex_unlock(&count_mutex); 
     666          giveup = 1; 
     667          break; 
     668        } 
     669        nblock++; 
     670        nblock_ = nblock; 
     671        params.ntbytes += cbytes;       /* update return bytes counter */ 
     672        pthread_mutex_unlock(&count_mutex); 
     673        /* End of critical section */ 
     674 
     675        /* Copy the compressed buffer to destination */ 
     676        memcpy(dest+ntdest, tmp2, cbytes); 
     677      } 
     678      else { 
     679        nblock_++; 
     680      } 
     681      /* Update counter for this thread */ 
     682      ntbytes += cbytes; 
     683 
     684    } /* closes while (nblock_) */ 
     685 
     686    if (!compress && !giveup) { 
     687      /* Update global counter for all threads (decompression only) */ 
     688      pthread_mutex_lock(&count_mutex); 
     689      params.ntbytes += ntbytes; 
     690      pthread_mutex_unlock(&count_mutex); 
     691    } 
     692 
     693    /* Meeting point for all threads (wait for finalization) */ 
     694    rc = pthread_barrier_wait(&barr_finish); 
     695    if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) { 
     696      printf("Could not wait on barrier (finish)\n"); 
     697      exit(-1); 
     698    } 
     699    /* Reset block counter */ 
     700    nblock = -1; 
     701 
     702  }  /* closes while(1) */ 
     703 
     704  /* This should never be reached, but anyway */ 
     705  return(0); 
     706} 
     707 
     708 
     709int 
     710blosc_init_threads(int nthreads_) 
     711{ 
     712  int tid, rc; 
     713 
     714  //printf("Init!\n"); 
     715  if (init_done) { 
     716    //printf("init already done!\n"); 
     717    return(0); 
     718  } 
     719 
     720  nthreads = nthreads_; 
     721  if (nthreads > MAX_THREADS) { 
     722    fprintf(stderr, "Error.  nthreads cannot be larger than MAX_THREADS (%d)", 
     723            MAX_THREADS); 
     724    exit(-1); 
     725  } 
     726  else if (nthreads <= 0) { 
     727    fprintf(stderr, "Error.  nthreads must be a positive integer"); 
     728    exit(-1); 
     729  } 
     730 
     731  /* Initialize mutex and condition variable objects */ 
     732  pthread_mutex_init(&count_mutex, NULL); 
     733 
     734  /* Barrier initialization */ 
     735  pthread_barrier_init(&barr_init, NULL, nthreads+1); 
     736  pthread_barrier_init(&barr_inter, NULL, nthreads); 
     737  pthread_barrier_init(&barr_finish, NULL, nthreads+1); 
    614738 
    615739  /* Initialize and set thread detached attribute */ 
    616   pthread_attr_init(&attr); 
    617   pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); 
    618  
    619   for(t=0; t < nthreads; t++){ 
    620     params[t].thread_id = t; 
    621     params[t].dounshuffle = dounshuffle; 
    622     params[t].typesize = typesize; 
    623     params[t].blocksize = blocksize; 
    624     params[t].tblocks = tblocks; 
    625     params[t].thblocks = thblocks; 
    626     params[t].bstarts = bstarts + t*thblocks; 
    627     if (t == (nthreads - 1) && leftover > 0) { 
    628       params[t].leftover = leftover; 
    629     } 
    630     else { 
    631       params[t].leftover = 0; 
    632     } 
    633     params[t].src = (unsigned char *)src; 
    634     params[t].dest = (unsigned char *)dest + t*thblocks*blocksize; 
    635     params[t].tmp = tmp[t]; 
    636     params[t].tmp2 = tmp2[t]; 
    637     if (nthreads > 1) { 
    638       rc = pthread_create(&threads[t], &attr, t_blosc_d, (void *)&params[t]); 
    639       if (rc){ 
    640         fprintf(stderr, "ERROR; return code from pthread_create() is %d\n", rc); 
    641         fprintf(stderr, "\tError detail: %s\n", strerror(rc)); 
    642         exit(-1); 
    643       } 
    644     } 
    645     else { 
    646       status = t_blosc_d((void *)&params[t]); 
    647       cbytes = ((struct thread_data_d *)status)->cbytes; 
    648       if (cbytes < 0) { 
    649         nbytes = cbytes;          /* _blosc_d failure */ 
    650         goto out; 
    651       } 
    652       ntbytes += cbytes;          /* update decompressed bytes */ 
    653     } 
    654   } 
    655  
    656   /* Wait for threads to finish (join threads) */ 
    657   pthread_attr_destroy(&attr);    /* free attribute */ 
    658   if (nthreads > 1) { 
    659     for(t=0; t < nthreads; t++){ 
    660       rc = pthread_join(threads[t], &status); 
    661       if (rc) { 
    662         fprintf(stderr, "ERROR; return code from pthread_join() is %d\n", rc); 
    663         fprintf(stderr, "\tError detail: %s\n", strerror(rc)); 
    664         exit(-1); 
    665       } 
    666       if (giveup) { 
    667         continue; 
    668       } 
    669       cbytes = ((struct thread_data_d *)status)->cbytes; 
    670       if (cbytes < 0) { 
    671         nbytes = cbytes;          /* _blosc_d failure */ 
    672         giveup = 1; 
    673       } 
    674       ntbytes += cbytes;          /* update decompressed bytes */ 
    675     } 
    676   } 
    677  
    678  out: 
    679   /* Release buffers */ 
    680   for(t=0; t < nthreads; t++){ 
    681 #ifdef _WIN32 
    682     _aligned_free(tmp[t]); 
    683     _aligned_free(tmp2[t]); 
    684 #else 
    685     free(tmp[t]); 
    686     free(tmp2[t]); 
    687 #endif  /* _WIN32 */ 
    688   } 
    689  
    690   /* assert(ntbytes+bstarts[0] == ctbytes); */ 
    691   return nbytes; 
    692 } 
     740  pthread_attr_init(&ct_attr); 
     741  pthread_attr_setdetachstate(&ct_attr, PTHREAD_CREATE_DETACHED); 
     742 
     743  /* Finally, create the threads in detached state */ 
     744  for (tid = 0; tid < nthreads; tid++) { 
     745    tids[tid] = tid; 
     746    rc = pthread_create(&threads[tid], &ct_attr, t_blosc, (void *)&tids[tid]); 
     747    if (rc) { 
     748      fprintf(stderr, "ERROR; return code from pthread_create() is %d\n", rc); 
     749      fprintf(stderr, "\tError detail: %s\n", strerror(rc)); 
     750      exit(-1); 
     751    } 
     752  } 
     753 
     754  init_done = 1;                /* Initialization done! */ 
     755 
     756  return(0); 
     757} 
  • branches/threaded/src/blosc.h

    r51 r59  
    2525/* The combined blosc and blosclz formats */ 
    2626#define BLOSC_VERSION_CFORMAT (BLOSC_VERSION_FORMAT << 8) & (BLOSCLZ_VERSION_FORMAT) 
     27 
     28 
     29/* Initialize pool of threads and other structures */ 
     30int blosc_init_threads(int nthreads); 
    2731 
    2832 
Note: See TracChangeset for help on using the changeset viewer.