Changeset 51


Ignore:
Timestamp:
03/30/10 06:35:57 (3 years ago)
Author:
faltet
Message:

First version of threaded blosc. Performance increase has not been detected yet (rather the contrary :-/).

Location:
branches/threaded/src
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • branches/threaded/src/blosc.c

    r49 r51  
    1414#include <sys/types.h> 
    1515#include <sys/stat.h> 
     16#include <assert.h> 
    1617#include "blosc.h" 
    1718#include "blosclz.h" 
     
    2223  #include <stdint.h> 
    2324  #include <unistd.h> 
     25  #include <pthread.h> 
    2426#endif  /* _WIN32 */ 
    2527 
     
    3436#define MAXSPLITS 16         /* Cannot be larger than 128 */ 
    3537 
     38/* The number of threads */ 
     39#define NUM_THREADS 2 
    3640 
    3741/* Global variables for compressing/shuffling actions */ 
     
    120124  unsigned char *_dest=NULL;       /* alias for destination buffer */ 
    121125  unsigned char *flags;            /* flags for header */ 
    122   unsigned int *starts;            /* start pointers for each block */ 
     126  unsigned int *bstarts;           /* start pointers for each block */ 
    123127  size_t nblocks;                  /* number of complete blocks in buffer */ 
    124128  size_t tblocks;                  /* number of total blocks in buffer */ 
     
    202206  _dest += sizeof(int)*3; 
    203207  ctbytes += sizeof(int)*3; 
    204   starts = (unsigned int *)_dest;          /* starts for every block */ 
     208  bstarts = (unsigned int *)_dest;         /* starts for every block */ 
    205209  _dest += sizeof(int)*tblocks;            /* book space for pointers to */ 
    206210  ctbytes += sizeof(int)*tblocks;          /* every block in output */ 
     
    212216 
    213217  for (j = 0; j < tblocks; j++) { 
    214     starts[j] = ctbytes; 
     218    bstarts[j] = ctbytes; 
    215219    bsize = blocksize; 
    216220    leftoverblock = 0; 
     
    318322 
    319323  return ctbytes; 
     324} 
     325 
     326 
     327/* Structure for parameters in decompression routines */ 
     328struct thread_data_d{ 
     329  size_t typesize; 
     330  size_t blocksize; 
     331  int thread_id; 
     332  int dounshuffle; 
     333  int cbytes; 
     334  unsigned int tblocks; 
     335  unsigned int thblocks; 
     336  unsigned int leftover; 
     337  unsigned int *bstarts;             /* start pointers for each block */ 
     338  unsigned char *src; 
     339  unsigned char *dest; 
     340  unsigned char *tmp; 
     341  unsigned char *tmp2; 
     342}; 
     343 
     344 
     345/* Decompress & unshuffle several blocks in a single thread */ 
     346void *t_blosc_d(void *params) 
     347{ 
     348  /* Parameters */ 
     349  struct thread_data_d *my_params = (struct thread_data_d *)params; 
     350  int thread_id = my_params->thread_id; 
     351  int dounshuffle = my_params->dounshuffle; 
     352  size_t typesize = my_params->typesize; 
     353  size_t blocksize = my_params->blocksize; 
     354  unsigned int tblocks = my_params->tblocks; 
     355  unsigned int thblocks = my_params->thblocks; 
     356  unsigned int *bstarts = my_params->bstarts; 
     357  unsigned int leftover = my_params->leftover; 
     358  unsigned char *src = my_params->src; 
     359  unsigned char *dest = my_params->dest; 
     360  unsigned char *tmp = my_params->tmp; 
     361  unsigned char *tmp2 = my_params->tmp2; 
     362  /* Other local vars */ 
     363  int j, cbytes, ntbytes = 0; 
     364  unsigned int bsize = blocksize; 
     365  unsigned int leftoverblock = 0; 
     366 
     367  for (j = 0; j < thblocks; j++) { 
     368    if (((thread_id*thblocks+j) == (tblocks - 1)) && (leftover > 0)) { 
     369      bsize = leftover; 
     370      leftoverblock = 1; 
     371    } 
     372    /* printf("dounshuffle, typesize, bsize, leftoverblock-->%d,%d,%d,%d\n", */ 
     373    /*        dounshuffle, typesize, bsize, leftoverblock); */ 
     374    /* printf("src, dest, tmp, tmp2, thread-->%p,%p,%p,%p,%d,%d\n", */ 
     375    /*        src+bstarts[j], dest+j*blocksize, tmp, tmp2, thread_id, j); */ 
     376    cbytes = _blosc_d(dounshuffle, typesize, bsize, leftoverblock, 
     377                      src+bstarts[j], dest+j*blocksize, tmp, tmp2); 
     378    //printf("cbytes2-->%d\n", cbytes); 
     379    if (cbytes < 0) { 
     380      //printf("Ei!!"); 
     381      ntbytes = cbytes;          /* _blosc_d failure */ 
     382      break; 
     383      //goto out; 
     384    } 
     385    ntbytes += cbytes; 
     386    /* Break if we have reached the end of the blocks */ 
     387    if ((thread_id*thblocks+j) == (tblocks - 1)) { 
     388      break; 
     389    } 
     390  } 
     391 
     392 out: 
     393  //printf("ntbytes-->%d\n", ntbytes); 
     394  my_params->cbytes = ntbytes; 
     395  pthread_exit((void *)my_params); 
    320396} 
    321397 
     
    331407  size_t nblocks;                    /* number of complete blocks in buffer */ 
    332408  size_t tblocks;                    /* number of total blocks in buffer */ 
    333   size_t j; 
    334   size_t nbytes, ntbytes = 0; 
    335   int cbytes; 
    336   unsigned char *tmp, *tmp2; 
     409  size_t thblocks;                   /* number of total blocks per thread */ 
     410  unsigned int *bstarts;             /* start pointers for each block */ 
     411  size_t nblocks2, leftover2; 
     412  size_t t; 
     413  int nbytes, cbytes, ntbytes = 0; 
    337414  int dounshuffle = 0; 
    338   unsigned int typesize, blocksize, bsize, ctbytes_; 
    339   int leftoverblock;               /* left over block? */ 
     415  unsigned int typesize, blocksize, ctbytes; 
     416  pthread_t threads[NUM_THREADS]; 
     417  unsigned char *tmp[NUM_THREADS], *tmp2[NUM_THREADS]; 
     418  struct thread_data_d params[NUM_THREADS]; 
     419  int rc; 
     420  void *status; 
     421  pthread_attr_t attr; 
    340422 
    341423  _src = (unsigned char *)(src); 
     
    350432  nbytes = ((unsigned int *)_src)[0];       /* chunk size */ 
    351433  blocksize = ((unsigned int *)_src)[1];    /* block size */ 
    352   ctbytes_ = ((unsigned int *)_src)[2];     /* compressed chunk size */ 
     434  ctbytes = ((unsigned int *)_src)[2];      /* compressed chunk size */ 
    353435  _src += sizeof(int)*3; 
    354436  /* Compute some params */ 
     437  /* Total blocks */ 
    355438  nblocks = nbytes / blocksize; 
    356439  leftover = nbytes % blocksize; 
    357440  tblocks = (leftover>0)? nblocks+1: nblocks; 
    358   _src += sizeof(int)*tblocks;              /* skip starts of blocks */ 
     441  //printf("tblocks-->%d\n", tblocks); 
     442  /* Blocks per thread */ 
     443  nblocks2 = tblocks / NUM_THREADS; 
     444  leftover2 = tblocks % NUM_THREADS; 
     445  thblocks = (leftover2>0)? nblocks2+1: nblocks2; 
     446  //printf("thblocks-->%d\n", thblocks); 
     447  bstarts = (unsigned int *)_src; 
     448  _src += sizeof(int)*tblocks; 
    359449 
    360450  /* Check zero typesizes */ 
     
    364454 
    365455  if (nbytes > dest_size) { 
    366     /* This should never happen, but just in case. */ 
     456    /* This should never happen but just in case */ 
    367457    return -1; 
    368458  } 
     
    373463  } 
    374464 
    375   /* Create temporary area */ 
     465  /* Create temporary area for each thread */ 
     466  for(t=0; t<NUM_THREADS; t++){ 
    376467#ifdef _WIN32 
    377   tmp = (unsigned char*)_aligned_malloc(blocksize, 16); 
    378   tmp2 = (unsigned char*)_aligned_malloc(blocksize, 16); 
     468    tmp[t] = (unsigned char*)_aligned_malloc(blocksize, 16); 
     469    tmp2[t] = (unsigned char*)_aligned_malloc(blocksize, 16); 
    379470#elif defined __APPLE__ 
    380   /* Mac OS X guarantees 16-byte alignment in small allocs */ 
    381   tmp = (unsigned char *)(malloc(blocksize)); 
    382   tmp2 = (unsigned char *)(malloc(blocksize)); 
     471    /* Mac OS X guarantees 16-byte alignment in small allocs */ 
     472    tmp[t] = (unsigned char *)malloc(blocksize); 
     473    tmp2[t] = (unsigned char *)malloc(blocksize); 
    383474#else 
    384   posix_memalign((void **)&tmp, 16, blocksize); 
    385   posix_memalign((void **)&tmp2, 16, blocksize); 
     475    posix_memalign((void **)&tmp[t], 16, blocksize); 
     476    posix_memalign((void **)&tmp2[t], 16, blocksize); 
    386477#endif  /* _WIN32 */ 
    387  
    388   for (j = 0; j < tblocks; j++) { 
    389     bsize = blocksize; 
    390     leftoverblock = 0; 
    391     if ((j == tblocks - 1) && (leftover > 0)) { 
    392       bsize = leftover; 
    393       leftoverblock = 1; 
    394     } 
    395     cbytes = _blosc_d(dounshuffle, typesize, bsize, leftoverblock, 
    396                       _src, _dest, tmp, tmp2); 
     478  } 
     479 
     480  /* Initialize and set thread detached attribute */ 
     481  pthread_attr_init(&attr); 
     482  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); 
     483 
     484  for(t=0; t < NUM_THREADS; t++){ 
     485    params[t].thread_id = t; 
     486    params[t].dounshuffle = dounshuffle; 
     487    params[t].typesize = typesize; 
     488    params[t].blocksize = blocksize; 
     489    params[t].tblocks = tblocks; 
     490    params[t].thblocks = thblocks; 
     491    params[t].bstarts = bstarts + t*thblocks; 
     492    if (t == (NUM_THREADS - 1) && leftover > 0) { 
     493      params[t].leftover = leftover; 
     494    } 
     495    else { 
     496      params[t].leftover = 0; 
     497    } 
     498    //printf("Thread: %d\tbstarts: %d\n", t, bstarts[t*thblocks]); 
     499    params[t].src = (unsigned char *)src;   // + bstarts[t*thblocks]; 
     500    params[t].dest = (unsigned char *)dest + t*thblocks*blocksize; 
     501    params[t].tmp = tmp[t]; 
     502    params[t].tmp2 = tmp2[t]; 
     503    //printf("In main: creating thread %ld\n", t); 
     504    rc = pthread_create(&threads[t], &attr, t_blosc_d, (void *)&params[t]); 
     505    if (rc){ 
     506      fprintf(stderr, "ERROR; return code from pthread_create() is %d\n", rc); 
     507      exit(-1); 
     508    } 
     509  } 
     510 
     511  /* Wait for threads to finish (join threads) */ 
     512  pthread_attr_destroy(&attr);    /* free attribute */ 
     513  for(t=0; t < NUM_THREADS; t++){ 
     514    rc = pthread_join(threads[t], &status); 
     515    if (rc) { 
     516      fprintf(stderr, "ERROR; return code from pthread_join() is %d\n", rc); 
     517      exit(-1); 
     518    } 
     519    cbytes = ((struct thread_data_d *)status)->cbytes; 
     520    //printf("Thread %d.  cbytes-->%d\n", t, cbytes); 
    397521    if (cbytes < 0) { 
    398522      ntbytes = cbytes;          /* _blosc_d failure */ 
    399523      goto out; 
    400524    } 
    401     _src += cbytes; 
    402     _dest += blocksize; 
    403     ntbytes += blocksize; 
     525    ntbytes += cbytes;          /* update decompressed bytes */ 
    404526  } 
    405527 
    406528 out: 
     529  for(t=0; t<NUM_THREADS; t++){ 
    407530#ifdef _WIN32 
    408   _aligned_free(tmp); 
    409   _aligned_free(tmp2); 
     531    _aligned_free(tmp[t]); 
     532    _aligned_free(tmp2[t]); 
    410533#else 
    411   free(tmp); 
    412   free(tmp2); 
     534    free(tmp[t]); 
     535    free(tmp2[t]); 
    413536#endif  /* _WIN32 */ 
     537  } 
     538 
     539  //printf("ntbytes, ctbytes-->%d, %d\n", ntbytes, ctbytes); 
     540  assert(ntbytes+bstarts[0] == ctbytes); 
    414541  return nbytes; 
    415542} 
    416  
    417  
  • branches/threaded/src/blosc.h

    r49 r51  
    1313/* Version numbers */ 
    1414#define BLOSC_VERSION_MAJOR    0    /* for major interface/format changes  */ 
    15 #define BLOSC_VERSION_MINOR    8    /* for minor interface/format changes  */ 
    16 #define BLOSC_VERSION_RELEASE  2    /* for tweaks, bug-fixes, or development */ 
     15#define BLOSC_VERSION_MINOR    9    /* for minor interface/format changes  */ 
     16#define BLOSC_VERSION_RELEASE  0    /* for tweaks, bug-fixes, or development */ 
    1717 
    18 #define BLOSC_VERSION_STRING   "0.8.2"  /* string version.  Sync with above! */ 
     18#define BLOSC_VERSION_STRING   "0.9.0.dev"  /* string version.  Sync with above! */ 
    1919#define BLOSC_VERSION_DATE     "2010-03-30"      /* date version */ 
    2020 
Note: See TracChangeset for help on using the changeset viewer.