Changeset 54


Ignore:
Timestamp:
03/30/10 14:36:01 (3 years ago)
Author:
faltet
Message:

New version for accelerating the decompressor when NUM_THREADS=1.

Some code cleanup done too, and added a README-threaded.txt file for
telling my experience with threaded Blosc to those who are interested.

Location:
branches/threaded
Files:
1 added
2 edited

Legend:

Unmodified
Added
Removed
  • branches/threaded/src/bench.c

    r49 r54  
    3636 
    3737#define NITER  (20*1000)               /* Number of iterations */ 
    38 //#define NITER  (1)               /* Number of iterations */ 
    3938 
    4039 
     
    109108  int *_srccpy; 
    110109  int rshift = 22;              /* For random data */ 
     110  //int rshift = 20;              /* For random data */ 
    111111  int clevel; 
    112112  int doshuffle = 1;            /* Shuffle? */ 
     
    126126 
    127127  /* Initialize the original buffer */ 
    128   /* _src = (int *)src; */ 
    129   /* _srccpy = (int *)srccpy; */ 
    130   /* elsize = sizeof(int); */ 
    131   /* for(i = 0; i < size/elsize; ++i) { */ 
    132   /*   /\* Choose one below *\/ */ 
    133   /*   /\* _src[i] = 1; *\/ */ 
    134   /*   /\* _src[i] = 0x01010101; *\/ */ 
    135   /*   /\* _src[i] = 0x01020304; *\/ */ 
    136   /*   /\* _src[i] = i * 1/.3; *\/ */ 
    137   /*   /\* _src[i] = i; *\/ */ 
    138   /*   _src[i] = rand() >> rshift; */ 
     128  _src = (int *)src; 
     129  _srccpy = (int *)srccpy; 
     130  elsize = sizeof(int); 
     131  for(i = 0; i < size/elsize; ++i) { 
     132    /* Choose one below */ 
     133    /* _src[i] = 1; */ 
     134    /* _src[i] = 0x01010101; */ 
     135    /* _src[i] = 0x01020304; */ 
     136    /* _src[i] = i * 1/.3; */ 
     137    /* _src[i] = i; */ 
     138    _src[i] = rand() >> rshift; 
     139  } 
     140 
     141  /* For data coming from a file */ 
     142  /* fd = open(filename, 0); */ 
     143  /* status = read(fd, src, size); */ 
     144  /* if (status == -1) { */ 
     145  /*   perror(NULL); */ 
    139146  /* } */ 
    140  
    141   /* For data coming from a file */ 
    142   fd = open(filename, 0); 
    143   status = read(fd, src, size); 
    144   if (status == -1) { 
    145     perror(NULL); 
    146   } 
    147   close(fd); 
     147  /* close(fd); */ 
    148148 
    149149  printf("********************** Setup info *****************************\n"); 
     
    166166 
    167167  for (clevel=1; clevel<10; clevel++) { 
    168   //for (clevel=9; clevel<10; clevel++) { 
    169168 
    170169    printf("Compression level: %d\n", clevel); 
     
    200199      printf("FAILED.  Error code: %d\n", nbytes); 
    201200    } 
    202     printf("Orig bytes: %d\tFinal bytes: %d\n", cbytes, nbytes); 
     201    /* printf("Orig bytes: %d\tFinal bytes: %d\n", cbytes, nbytes); */ 
    203202 
    204203    /* Check if data has had a good roundtrip */ 
  • branches/threaded/src/blosc.c

    r51 r54  
    370370      leftoverblock = 1; 
    371371    } 
    372     /* printf("dounshuffle, typesize, bsize, leftoverblock-->%d,%d,%d,%d\n", */ 
    373     /*        dounshuffle, typesize, bsize, leftoverblock); */ 
    374     /* printf("src, dest, tmp, tmp2, thread-->%p,%p,%p,%p,%d,%d\n", */ 
    375     /*        src+bstarts[j], dest+j*blocksize, tmp, tmp2, thread_id, j); */ 
    376372    cbytes = _blosc_d(dounshuffle, typesize, bsize, leftoverblock, 
    377373                      src+bstarts[j], dest+j*blocksize, tmp, tmp2); 
    378     //printf("cbytes2-->%d\n", cbytes); 
    379374    if (cbytes < 0) { 
    380       //printf("Ei!!"); 
    381375      ntbytes = cbytes;          /* _blosc_d failure */ 
    382       break; 
    383       //goto out; 
     376      goto out; 
    384377    } 
    385378    ntbytes += cbytes; 
    386379    /* Break if we have reached the end of the blocks */ 
    387380    if ((thread_id*thblocks+j) == (tblocks - 1)) { 
    388       break; 
     381      goto out; 
    389382    } 
    390383  } 
    391384 
    392385 out: 
    393   //printf("ntbytes-->%d\n", ntbytes); 
    394386  my_params->cbytes = ntbytes; 
    395   pthread_exit((void *)my_params); 
     387  if (NUM_THREADS > 1) { 
     388    pthread_exit((void *)my_params); 
     389  } 
     390  else { 
     391    return (void *)my_params; 
     392  } 
    396393} 
    397394 
     
    405402  unsigned char flags;               /* flags for header */ 
    406403  size_t leftover;                   /* extra bytes at end of buffer */ 
    407   size_t nblocks;                    /* number of complete blocks in buffer */ 
    408404  size_t tblocks;                    /* number of total blocks in buffer */ 
    409405  size_t thblocks;                   /* number of total blocks per thread */ 
    410406  unsigned int *bstarts;             /* start pointers for each block */ 
    411   size_t nblocks2, leftover2; 
     407  size_t leftover2, leftover3; 
    412408  size_t t; 
    413409  int nbytes, cbytes, ntbytes = 0; 
     
    420416  void *status; 
    421417  pthread_attr_t attr; 
     418  int nthreads = NUM_THREADS; 
    422419 
    423420  _src = (unsigned char *)(src); 
     
    436433  /* Compute some params */ 
    437434  /* Total blocks */ 
    438   nblocks = nbytes / blocksize; 
     435  tblocks = nbytes / blocksize; 
    439436  leftover = nbytes % blocksize; 
    440   tblocks = (leftover>0)? nblocks+1: nblocks; 
    441   //printf("tblocks-->%d\n", tblocks); 
     437  tblocks = (leftover>0)? tblocks+1: tblocks; 
    442438  /* Blocks per thread */ 
    443   nblocks2 = tblocks / NUM_THREADS; 
    444   leftover2 = tblocks % NUM_THREADS; 
    445   thblocks = (leftover2>0)? nblocks2+1: nblocks2; 
    446   //printf("thblocks-->%d\n", thblocks); 
     439  thblocks = tblocks / nthreads; 
     440  leftover2 = tblocks % nthreads; 
     441  thblocks = (leftover2>0)? thblocks+1: thblocks; 
     442  /* Final correction for the number of threads */ 
     443  if (nthreads > (tblocks / thblocks)) { 
     444    nthreads = tblocks / thblocks; 
     445    leftover3 = tblocks % thblocks; 
     446    nthreads = (leftover3>0)? nthreads+1: nthreads; 
     447  } 
    447448  bstarts = (unsigned int *)_src; 
    448449  _src += sizeof(int)*tblocks; 
     
    464465 
    465466  /* Create temporary area for each thread */ 
    466   for(t=0; t<NUM_THREADS; t++){ 
     467  for(t=0; t<nthreads; t++){ 
    467468#ifdef _WIN32 
    468469    tmp[t] = (unsigned char*)_aligned_malloc(blocksize, 16); 
     
    482483  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); 
    483484 
    484   for(t=0; t < NUM_THREADS; t++){ 
     485  for(t=0; t < nthreads; t++){ 
    485486    params[t].thread_id = t; 
    486487    params[t].dounshuffle = dounshuffle; 
     
    490491    params[t].thblocks = thblocks; 
    491492    params[t].bstarts = bstarts + t*thblocks; 
    492     if (t == (NUM_THREADS - 1) && leftover > 0) { 
     493    if (t == (nthreads - 1) && leftover > 0) { 
    493494      params[t].leftover = leftover; 
    494495    } 
     
    496497      params[t].leftover = 0; 
    497498    } 
    498     //printf("Thread: %d\tbstarts: %d\n", t, bstarts[t*thblocks]); 
    499     params[t].src = (unsigned char *)src;   // + bstarts[t*thblocks]; 
     499    params[t].src = (unsigned char *)src; 
    500500    params[t].dest = (unsigned char *)dest + t*thblocks*blocksize; 
    501501    params[t].tmp = tmp[t]; 
    502502    params[t].tmp2 = tmp2[t]; 
    503     //printf("In main: creating thread %ld\n", t); 
    504     rc = pthread_create(&threads[t], &attr, t_blosc_d, (void *)&params[t]); 
    505     if (rc){ 
    506       fprintf(stderr, "ERROR; return code from pthread_create() is %d\n", rc); 
    507       exit(-1); 
     503    if (NUM_THREADS > 1) { 
     504      rc = pthread_create(&threads[t], &attr, t_blosc_d, (void *)&params[t]); 
     505      if (rc){ 
     506        fprintf(stderr, "ERROR; return code from pthread_create() is %d\n", rc); 
     507        fprintf(stderr, "\tError detail: %s\n", strerror(rc)); 
     508        exit(-1); 
     509      } 
     510    } 
     511    else { 
     512      t_blosc_d((void *)&params[t]); 
    508513    } 
    509514  } 
     
    511516  /* Wait for threads to finish (join threads) */ 
    512517  pthread_attr_destroy(&attr);    /* free attribute */ 
    513   for(t=0; t < NUM_THREADS; t++){ 
     518  if (NUM_THREADS > 1) { 
     519  for(t=0; t < nthreads; t++){ 
    514520    rc = pthread_join(threads[t], &status); 
    515521    if (rc) { 
    516522      fprintf(stderr, "ERROR; return code from pthread_join() is %d\n", rc); 
     523      fprintf(stderr, "\tError detail: %s\n", strerror(rc)); 
    517524      exit(-1); 
    518525    } 
    519526    cbytes = ((struct thread_data_d *)status)->cbytes; 
    520     //printf("Thread %d.  cbytes-->%d\n", t, cbytes); 
    521527    if (cbytes < 0) { 
    522       ntbytes = cbytes;          /* _blosc_d failure */ 
     528      nbytes = cbytes;          /* _blosc_d failure */ 
    523529      goto out; 
    524530    } 
    525531    ntbytes += cbytes;          /* update decompressed bytes */ 
    526532  } 
     533  } 
    527534 
    528535 out: 
    529   for(t=0; t<NUM_THREADS; t++){ 
     536  for(t=0; t < nthreads; t++){ 
    530537#ifdef _WIN32 
    531538    _aligned_free(tmp[t]); 
     
    537544  } 
    538545 
    539   //printf("ntbytes, ctbytes-->%d, %d\n", ntbytes, ctbytes); 
    540   assert(ntbytes+bstarts[0] == ctbytes); 
     546  /* assert(ntbytes+bstarts[0] == ctbytes); */ 
    541547  return nbytes; 
    542548} 
Note: See TracChangeset for help on using the changeset viewer.