Changeset 51
- Timestamp:
- 03/30/10 06:35:57 (3 years ago)
- Location:
- branches/threaded/src
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/threaded/src/blosc.c
r49 r51 14 14 #include <sys/types.h> 15 15 #include <sys/stat.h> 16 #include <assert.h> 16 17 #include "blosc.h" 17 18 #include "blosclz.h" … … 22 23 #include <stdint.h> 23 24 #include <unistd.h> 25 #include <pthread.h> 24 26 #endif /* _WIN32 */ 25 27 … … 34 36 #define MAXSPLITS 16 /* Cannot be larger than 128 */ 35 37 38 /* The number of threads */ 39 #define NUM_THREADS 2 36 40 37 41 /* Global variables for compressing/shuffling actions */ … … 120 124 unsigned char *_dest=NULL; /* alias for destination buffer */ 121 125 unsigned char *flags; /* flags for header */ 122 unsigned int * starts;/* start pointers for each block */126 unsigned int *bstarts; /* start pointers for each block */ 123 127 size_t nblocks; /* number of complete blocks in buffer */ 124 128 size_t tblocks; /* number of total blocks in buffer */ … … 202 206 _dest += sizeof(int)*3; 203 207 ctbytes += sizeof(int)*3; 204 starts = (unsigned int *)_dest;/* starts for every block */208 bstarts = (unsigned int *)_dest; /* starts for every block */ 205 209 _dest += sizeof(int)*tblocks; /* book space for pointers to */ 206 210 ctbytes += sizeof(int)*tblocks; /* every block in output */ … … 212 216 213 217 for (j = 0; j < tblocks; j++) { 214 starts[j] = ctbytes;218 bstarts[j] = ctbytes; 215 219 bsize = blocksize; 216 220 leftoverblock = 0; … … 318 322 319 323 return ctbytes; 324 } 325 326 327 /* Structure for parameters in decompression routines */ 328 struct thread_data_d{ 329 size_t typesize; 330 size_t blocksize; 331 int thread_id; 332 int dounshuffle; 333 int cbytes; 334 unsigned int tblocks; 335 unsigned int thblocks; 336 unsigned int leftover; 337 unsigned int *bstarts; /* start pointers for each block */ 338 unsigned char *src; 339 unsigned char *dest; 340 unsigned char *tmp; 341 unsigned char *tmp2; 342 }; 343 344 345 /* Decompress & unshuffle several blocks in a single thread */ 346 void *t_blosc_d(void *params) 347 { 348 /* Parameters */ 349 struct thread_data_d *my_params = (struct thread_data_d *)params; 350 int thread_id = my_params->thread_id; 351 int dounshuffle = my_params->dounshuffle; 352 size_t typesize = my_params->typesize; 353 size_t blocksize = my_params->blocksize; 354 unsigned int tblocks = my_params->tblocks; 355 unsigned int thblocks = my_params->thblocks; 356 unsigned int *bstarts = my_params->bstarts; 357 unsigned int leftover = my_params->leftover; 358 unsigned char *src = my_params->src; 359 unsigned char *dest = my_params->dest; 360 unsigned char *tmp = my_params->tmp; 361 unsigned char *tmp2 = my_params->tmp2; 362 /* Other local vars */ 363 int j, cbytes, ntbytes = 0; 364 unsigned int bsize = blocksize; 365 unsigned int leftoverblock = 0; 366 367 for (j = 0; j < thblocks; j++) { 368 if (((thread_id*thblocks+j) == (tblocks - 1)) && (leftover > 0)) { 369 bsize = leftover; 370 leftoverblock = 1; 371 } 372 /* printf("dounshuffle, typesize, bsize, leftoverblock-->%d,%d,%d,%d\n", */ 373 /* dounshuffle, typesize, bsize, leftoverblock); */ 374 /* printf("src, dest, tmp, tmp2, thread-->%p,%p,%p,%p,%d,%d\n", */ 375 /* src+bstarts[j], dest+j*blocksize, tmp, tmp2, thread_id, j); */ 376 cbytes = _blosc_d(dounshuffle, typesize, bsize, leftoverblock, 377 src+bstarts[j], dest+j*blocksize, tmp, tmp2); 378 //printf("cbytes2-->%d\n", cbytes); 379 if (cbytes < 0) { 380 //printf("Ei!!"); 381 ntbytes = cbytes; /* _blosc_d failure */ 382 break; 383 //goto out; 384 } 385 ntbytes += cbytes; 386 /* Break if we have reached the end of the blocks */ 387 if ((thread_id*thblocks+j) == (tblocks - 1)) { 388 break; 389 } 390 } 391 392 out: 393 //printf("ntbytes-->%d\n", ntbytes); 394 my_params->cbytes = ntbytes; 395 pthread_exit((void *)my_params); 320 396 } 321 397 … … 331 407 size_t nblocks; /* number of complete blocks in buffer */ 332 408 size_t tblocks; /* number of total blocks in buffer */ 333 size_t j; 334 size_t nbytes, ntbytes = 0; 335 int cbytes; 336 unsigned char *tmp, *tmp2; 409 size_t thblocks; /* number of total blocks per thread */ 410 unsigned int *bstarts; /* start pointers for each block */ 411 size_t nblocks2, leftover2; 412 size_t t; 413 int nbytes, cbytes, ntbytes = 0; 337 414 int dounshuffle = 0; 338 unsigned int typesize, blocksize, bsize, ctbytes_; 339 int leftoverblock; /* left over block? */ 415 unsigned int typesize, blocksize, ctbytes; 416 pthread_t threads[NUM_THREADS]; 417 unsigned char *tmp[NUM_THREADS], *tmp2[NUM_THREADS]; 418 struct thread_data_d params[NUM_THREADS]; 419 int rc; 420 void *status; 421 pthread_attr_t attr; 340 422 341 423 _src = (unsigned char *)(src); … … 350 432 nbytes = ((unsigned int *)_src)[0]; /* chunk size */ 351 433 blocksize = ((unsigned int *)_src)[1]; /* block size */ 352 ctbytes _ = ((unsigned int *)_src)[2];/* compressed chunk size */434 ctbytes = ((unsigned int *)_src)[2]; /* compressed chunk size */ 353 435 _src += sizeof(int)*3; 354 436 /* Compute some params */ 437 /* Total blocks */ 355 438 nblocks = nbytes / blocksize; 356 439 leftover = nbytes % blocksize; 357 440 tblocks = (leftover>0)? nblocks+1: nblocks; 358 _src += sizeof(int)*tblocks; /* skip starts of blocks */ 441 //printf("tblocks-->%d\n", tblocks); 442 /* Blocks per thread */ 443 nblocks2 = tblocks / NUM_THREADS; 444 leftover2 = tblocks % NUM_THREADS; 445 thblocks = (leftover2>0)? nblocks2+1: nblocks2; 446 //printf("thblocks-->%d\n", thblocks); 447 bstarts = (unsigned int *)_src; 448 _src += sizeof(int)*tblocks; 359 449 360 450 /* Check zero typesizes */ … … 364 454 365 455 if (nbytes > dest_size) { 366 /* This should never happen , but just in case.*/456 /* This should never happen but just in case */ 367 457 return -1; 368 458 } … … 373 463 } 374 464 375 /* Create temporary area */ 465 /* Create temporary area for each thread */ 466 for(t=0; t<NUM_THREADS; t++){ 376 467 #ifdef _WIN32 377 tmp= (unsigned char*)_aligned_malloc(blocksize, 16);378 tmp2= (unsigned char*)_aligned_malloc(blocksize, 16);468 tmp[t] = (unsigned char*)_aligned_malloc(blocksize, 16); 469 tmp2[t] = (unsigned char*)_aligned_malloc(blocksize, 16); 379 470 #elif defined __APPLE__ 380 /* Mac OS X guarantees 16-byte alignment in small allocs */381 tmp = (unsigned char *)(malloc(blocksize));382 tmp2 = (unsigned char *)(malloc(blocksize));471 /* Mac OS X guarantees 16-byte alignment in small allocs */ 472 tmp[t] = (unsigned char *)malloc(blocksize); 473 tmp2[t] = (unsigned char *)malloc(blocksize); 383 474 #else 384 posix_memalign((void **)&tmp, 16, blocksize);385 posix_memalign((void **)&tmp2, 16, blocksize);475 posix_memalign((void **)&tmp[t], 16, blocksize); 476 posix_memalign((void **)&tmp2[t], 16, blocksize); 386 477 #endif /* _WIN32 */ 387 388 for (j = 0; j < tblocks; j++) { 389 bsize = blocksize; 390 leftoverblock = 0; 391 if ((j == tblocks - 1) && (leftover > 0)) { 392 bsize = leftover; 393 leftoverblock = 1; 394 } 395 cbytes = _blosc_d(dounshuffle, typesize, bsize, leftoverblock, 396 _src, _dest, tmp, tmp2); 478 } 479 480 /* Initialize and set thread detached attribute */ 481 pthread_attr_init(&attr); 482 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); 483 484 for(t=0; t < NUM_THREADS; t++){ 485 params[t].thread_id = t; 486 params[t].dounshuffle = dounshuffle; 487 params[t].typesize = typesize; 488 params[t].blocksize = blocksize; 489 params[t].tblocks = tblocks; 490 params[t].thblocks = thblocks; 491 params[t].bstarts = bstarts + t*thblocks; 492 if (t == (NUM_THREADS - 1) && leftover > 0) { 493 params[t].leftover = leftover; 494 } 495 else { 496 params[t].leftover = 0; 497 } 498 //printf("Thread: %d\tbstarts: %d\n", t, bstarts[t*thblocks]); 499 params[t].src = (unsigned char *)src; // + bstarts[t*thblocks]; 500 params[t].dest = (unsigned char *)dest + t*thblocks*blocksize; 501 params[t].tmp = tmp[t]; 502 params[t].tmp2 = tmp2[t]; 503 //printf("In main: creating thread %ld\n", t); 504 rc = pthread_create(&threads[t], &attr, t_blosc_d, (void *)¶ms[t]); 505 if (rc){ 506 fprintf(stderr, "ERROR; return code from pthread_create() is %d\n", rc); 507 exit(-1); 508 } 509 } 510 511 /* Wait for threads to finish (join threads) */ 512 pthread_attr_destroy(&attr); /* free attribute */ 513 for(t=0; t < NUM_THREADS; t++){ 514 rc = pthread_join(threads[t], &status); 515 if (rc) { 516 fprintf(stderr, "ERROR; return code from pthread_join() is %d\n", rc); 517 exit(-1); 518 } 519 cbytes = ((struct thread_data_d *)status)->cbytes; 520 //printf("Thread %d. cbytes-->%d\n", t, cbytes); 397 521 if (cbytes < 0) { 398 522 ntbytes = cbytes; /* _blosc_d failure */ 399 523 goto out; 400 524 } 401 _src += cbytes; 402 _dest += blocksize; 403 ntbytes += blocksize; 525 ntbytes += cbytes; /* update decompressed bytes */ 404 526 } 405 527 406 528 out: 529 for(t=0; t<NUM_THREADS; t++){ 407 530 #ifdef _WIN32 408 _aligned_free(tmp);409 _aligned_free(tmp2);531 _aligned_free(tmp[t]); 532 _aligned_free(tmp2[t]); 410 533 #else 411 free(tmp);412 free(tmp2);534 free(tmp[t]); 535 free(tmp2[t]); 413 536 #endif /* _WIN32 */ 537 } 538 539 //printf("ntbytes, ctbytes-->%d, %d\n", ntbytes, ctbytes); 540 assert(ntbytes+bstarts[0] == ctbytes); 414 541 return nbytes; 415 542 } 416 417 -
branches/threaded/src/blosc.h
r49 r51 13 13 /* Version numbers */ 14 14 #define BLOSC_VERSION_MAJOR 0 /* for major interface/format changes */ 15 #define BLOSC_VERSION_MINOR 8/* for minor interface/format changes */16 #define BLOSC_VERSION_RELEASE 2/* for tweaks, bug-fixes, or development */15 #define BLOSC_VERSION_MINOR 9 /* for minor interface/format changes */ 16 #define BLOSC_VERSION_RELEASE 0 /* for tweaks, bug-fixes, or development */ 17 17 18 #define BLOSC_VERSION_STRING "0. 8.2" /* string version. Sync with above! */18 #define BLOSC_VERSION_STRING "0.9.0.dev" /* string version. Sync with above! */ 19 19 #define BLOSC_VERSION_DATE "2010-03-30" /* date version */ 20 20
Note: See TracChangeset
for help on using the changeset viewer.
![(please configure the [header_logo] section in trac.ini)](/images/blosc-logo-small.png)