Changeset 54
- Timestamp:
- 03/30/10 14:36:01 (3 years ago)
- Location:
- branches/threaded
- Files:
-
- 1 added
- 2 edited
-
README-threaded.txt (added)
-
src/bench.c (modified) (5 diffs)
-
src/blosc.c (modified) (10 diffs)
Legend:
- Unmodified
- Added
- Removed
-
branches/threaded/src/bench.c
r49 r54 36 36 37 37 #define NITER (20*1000) /* Number of iterations */ 38 //#define NITER (1) /* Number of iterations */39 38 40 39 … … 109 108 int *_srccpy; 110 109 int rshift = 22; /* For random data */ 110 //int rshift = 20; /* For random data */ 111 111 int clevel; 112 112 int doshuffle = 1; /* Shuffle? */ … … 126 126 127 127 /* Initialize the original buffer */ 128 /* _src = (int *)src; */ 129 /* _srccpy = (int *)srccpy; */ 130 /* elsize = sizeof(int); */ 131 /* for(i = 0; i < size/elsize; ++i) { */ 132 /* /\* Choose one below *\/ */ 133 /* /\* _src[i] = 1; *\/ */ 134 /* /\* _src[i] = 0x01010101; *\/ */ 135 /* /\* _src[i] = 0x01020304; *\/ */ 136 /* /\* _src[i] = i * 1/.3; *\/ */ 137 /* /\* _src[i] = i; *\/ */ 138 /* _src[i] = rand() >> rshift; */ 128 _src = (int *)src; 129 _srccpy = (int *)srccpy; 130 elsize = sizeof(int); 131 for(i = 0; i < size/elsize; ++i) { 132 /* Choose one below */ 133 /* _src[i] = 1; */ 134 /* _src[i] = 0x01010101; */ 135 /* _src[i] = 0x01020304; */ 136 /* _src[i] = i * 1/.3; */ 137 /* _src[i] = i; */ 138 _src[i] = rand() >> rshift; 139 } 140 141 /* For data coming from a file */ 142 /* fd = open(filename, 0); */ 143 /* status = read(fd, src, size); */ 144 /* if (status == -1) { */ 145 /* perror(NULL); */ 139 146 /* } */ 140 141 /* For data coming from a file */ 142 fd = open(filename, 0); 143 status = read(fd, src, size); 144 if (status == -1) { 145 perror(NULL); 146 } 147 close(fd); 147 /* close(fd); */ 148 148 149 149 printf("********************** Setup info *****************************\n"); … … 166 166 167 167 for (clevel=1; clevel<10; clevel++) { 168 //for (clevel=9; clevel<10; clevel++) {169 168 170 169 printf("Compression level: %d\n", clevel); … … 200 199 printf("FAILED. Error code: %d\n", nbytes); 201 200 } 202 printf("Orig bytes: %d\tFinal bytes: %d\n", cbytes, nbytes);201 /* printf("Orig bytes: %d\tFinal bytes: %d\n", cbytes, nbytes); */ 203 202 204 203 /* Check if data has had a good roundtrip */ -
branches/threaded/src/blosc.c
r51 r54 370 370 leftoverblock = 1; 371 371 } 372 /* printf("dounshuffle, typesize, bsize, leftoverblock-->%d,%d,%d,%d\n", */373 /* dounshuffle, typesize, bsize, leftoverblock); */374 /* printf("src, dest, tmp, tmp2, thread-->%p,%p,%p,%p,%d,%d\n", */375 /* src+bstarts[j], dest+j*blocksize, tmp, tmp2, thread_id, j); */376 372 cbytes = _blosc_d(dounshuffle, typesize, bsize, leftoverblock, 377 373 src+bstarts[j], dest+j*blocksize, tmp, tmp2); 378 //printf("cbytes2-->%d\n", cbytes);379 374 if (cbytes < 0) { 380 //printf("Ei!!");381 375 ntbytes = cbytes; /* _blosc_d failure */ 382 break; 383 //goto out; 376 goto out; 384 377 } 385 378 ntbytes += cbytes; 386 379 /* Break if we have reached the end of the blocks */ 387 380 if ((thread_id*thblocks+j) == (tblocks - 1)) { 388 break;381 goto out; 389 382 } 390 383 } 391 384 392 385 out: 393 //printf("ntbytes-->%d\n", ntbytes);394 386 my_params->cbytes = ntbytes; 395 pthread_exit((void *)my_params); 387 if (NUM_THREADS > 1) { 388 pthread_exit((void *)my_params); 389 } 390 else { 391 return (void *)my_params; 392 } 396 393 } 397 394 … … 405 402 unsigned char flags; /* flags for header */ 406 403 size_t leftover; /* extra bytes at end of buffer */ 407 size_t nblocks; /* number of complete blocks in buffer */408 404 size_t tblocks; /* number of total blocks in buffer */ 409 405 size_t thblocks; /* number of total blocks per thread */ 410 406 unsigned int *bstarts; /* start pointers for each block */ 411 size_t nblocks2, leftover2;407 size_t leftover2, leftover3; 412 408 size_t t; 413 409 int nbytes, cbytes, ntbytes = 0; … … 420 416 void *status; 421 417 pthread_attr_t attr; 418 int nthreads = NUM_THREADS; 422 419 423 420 _src = (unsigned char *)(src); … … 436 433 /* Compute some params */ 437 434 /* Total blocks */ 438 nblocks = nbytes / blocksize;435 tblocks = nbytes / blocksize; 439 436 leftover = nbytes % blocksize; 440 tblocks = (leftover>0)? nblocks+1: nblocks; 441 //printf("tblocks-->%d\n", tblocks); 437 tblocks = (leftover>0)? tblocks+1: tblocks; 442 438 /* Blocks per thread */ 443 nblocks2 = tblocks / NUM_THREADS; 444 leftover2 = tblocks % NUM_THREADS; 445 thblocks = (leftover2>0)? nblocks2+1: nblocks2; 446 //printf("thblocks-->%d\n", thblocks); 439 thblocks = tblocks / nthreads; 440 leftover2 = tblocks % nthreads; 441 thblocks = (leftover2>0)? thblocks+1: thblocks; 442 /* Final correction for the number of threads */ 443 if (nthreads > (tblocks / thblocks)) { 444 nthreads = tblocks / thblocks; 445 leftover3 = tblocks % thblocks; 446 nthreads = (leftover3>0)? nthreads+1: nthreads; 447 } 447 448 bstarts = (unsigned int *)_src; 448 449 _src += sizeof(int)*tblocks; … … 464 465 465 466 /* Create temporary area for each thread */ 466 for(t=0; t< NUM_THREADS; t++){467 for(t=0; t<nthreads; t++){ 467 468 #ifdef _WIN32 468 469 tmp[t] = (unsigned char*)_aligned_malloc(blocksize, 16); … … 482 483 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); 483 484 484 for(t=0; t < NUM_THREADS; t++){485 for(t=0; t < nthreads; t++){ 485 486 params[t].thread_id = t; 486 487 params[t].dounshuffle = dounshuffle; … … 490 491 params[t].thblocks = thblocks; 491 492 params[t].bstarts = bstarts + t*thblocks; 492 if (t == ( NUM_THREADS- 1) && leftover > 0) {493 if (t == (nthreads - 1) && leftover > 0) { 493 494 params[t].leftover = leftover; 494 495 } … … 496 497 params[t].leftover = 0; 497 498 } 498 //printf("Thread: %d\tbstarts: %d\n", t, bstarts[t*thblocks]); 499 params[t].src = (unsigned char *)src; // + bstarts[t*thblocks]; 499 params[t].src = (unsigned char *)src; 500 500 params[t].dest = (unsigned char *)dest + t*thblocks*blocksize; 501 501 params[t].tmp = tmp[t]; 502 502 params[t].tmp2 = tmp2[t]; 503 //printf("In main: creating thread %ld\n", t); 504 rc = pthread_create(&threads[t], &attr, t_blosc_d, (void *)¶ms[t]); 505 if (rc){ 506 fprintf(stderr, "ERROR; return code from pthread_create() is %d\n", rc); 507 exit(-1); 503 if (NUM_THREADS > 1) { 504 rc = pthread_create(&threads[t], &attr, t_blosc_d, (void *)¶ms[t]); 505 if (rc){ 506 fprintf(stderr, "ERROR; return code from pthread_create() is %d\n", rc); 507 fprintf(stderr, "\tError detail: %s\n", strerror(rc)); 508 exit(-1); 509 } 510 } 511 else { 512 t_blosc_d((void *)¶ms[t]); 508 513 } 509 514 } … … 511 516 /* Wait for threads to finish (join threads) */ 512 517 pthread_attr_destroy(&attr); /* free attribute */ 513 for(t=0; t < NUM_THREADS; t++){ 518 if (NUM_THREADS > 1) { 519 for(t=0; t < nthreads; t++){ 514 520 rc = pthread_join(threads[t], &status); 515 521 if (rc) { 516 522 fprintf(stderr, "ERROR; return code from pthread_join() is %d\n", rc); 523 fprintf(stderr, "\tError detail: %s\n", strerror(rc)); 517 524 exit(-1); 518 525 } 519 526 cbytes = ((struct thread_data_d *)status)->cbytes; 520 //printf("Thread %d. cbytes-->%d\n", t, cbytes);521 527 if (cbytes < 0) { 522 n tbytes = cbytes; /* _blosc_d failure */528 nbytes = cbytes; /* _blosc_d failure */ 523 529 goto out; 524 530 } 525 531 ntbytes += cbytes; /* update decompressed bytes */ 526 532 } 533 } 527 534 528 535 out: 529 for(t=0; t <NUM_THREADS; t++){536 for(t=0; t < nthreads; t++){ 530 537 #ifdef _WIN32 531 538 _aligned_free(tmp[t]); … … 537 544 } 538 545 539 //printf("ntbytes, ctbytes-->%d, %d\n", ntbytes, ctbytes); 540 assert(ntbytes+bstarts[0] == ctbytes); 546 /* assert(ntbytes+bstarts[0] == ctbytes); */ 541 547 return nbytes; 542 548 }
Note: See TracChangeset
for help on using the changeset viewer.
![(please configure the [header_logo] section in trac.ini)](/images/blosc-logo-small.png)