Changeset 59
- Timestamp:
- 04/26/10 14:03:24 (3 years ago)
- Location:
- branches/threaded/src
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/threaded/src/bench.c
r54 r59 35 35 #define MB (1024*1024) 36 36 37 #define NITER (20*1000) /* Number of iterations */ 37 #define NCHUNKS (100) 38 #define NITER (10) /* Number of iterations */ 38 39 39 40 … … 95 96 sec = current.tv_sec - last.tv_sec; 96 97 usec = current.tv_usec - last.tv_usec; 97 return (float)(((double)sec + usec*1e-6)/(double)NITER*1e6); 98 } 99 100 101 int main(void) { 102 int nbytes, cbytes; 103 void *src, *srccpy, *dest, *dest2; 104 size_t i; 105 struct timeval last, current; 106 float tmemcpy, tshuf, tunshuf; 107 int *_src; 108 int *_srccpy; 109 int rshift = 22; /* For random data */ 110 //int rshift = 20; /* For random data */ 111 int clevel; 112 int doshuffle = 1; /* Shuffle? */ 113 int fd; 114 int status; 115 char *filename = "25Kelem-4B-typesize.data"; 116 unsigned int size = 100*1000; /* Buffer size */ 117 unsigned int elsize = 4; /* Datatype size */ 118 unsigned char *orig, *round; 119 120 src = malloc(size); 121 srccpy = malloc(size); 122 dest = malloc(size); 123 dest2 = malloc(size); 124 98 return (float)(((double)sec + usec*1e-6)/((double)NITER*NCHUNKS)*1e6); 99 } 100 101 102 int get_value(int i, int rshift) { 103 int v; 104 105 v = (i<<26)^(i<<18)^(i<<11)^(i<<3)^i; 106 v &= (1 << (32-rshift)) - 1; 107 return v; 108 } 109 110 111 void init_buffer(void *src, int size, int rshift) { 112 int i, *_src = (int *)src; 113 114 /* To have reproducible results */ 125 115 srand(1); 126 116 127 117 /* Initialize the original buffer */ 128 _src = (int *)src; 129 _srccpy = (int *)srccpy; 130 elsize = sizeof(int); 131 for(i = 0; i < size/elsize; ++i) { 118 for (i = 0; i < size/sizeof(int); ++i) { 132 119 /* Choose one below */ 133 120 /* _src[i] = 1; */ … … 136 123 /* _src[i] = i * 1/.3; */ 137 124 /* _src[i] = i; */ 138 _src[i] = rand() >> rshift; 139 } 140 141 /* For data coming from a file */ 142 /* fd = open(filename, 0); */ 143 /* status = read(fd, src, size); */ 144 /* if (status == -1) { */ 145 /* perror(NULL); */ 146 /* } */ 147 /* close(fd); */ 125 //_src[i] = 
rand() >> rshift; 126 _src[i] = get_value(i, rshift); 127 } 128 } 129 130 131 int main(void) { 132 int nbytes, cbytes; 133 void *src, *srccpy; 134 void **dest[NCHUNKS], *dest2; 135 size_t i, j; 136 struct timeval last, current; 137 float tmemcpy, tshuf, tunshuf; 138 int *_src; 139 int *_srccpy; 140 int rshift = 12; /* For random data */ 141 int clevel; 142 int doshuffle = 1; /* Shuffle? */ 143 int fd; 144 int status; 145 unsigned int size = 1024*1024; /* Buffer size */ 146 unsigned int elsize = 8; /* Datatype size */ 147 unsigned char *orig, *round; 148 149 /* The number of threads */ 150 blosc_init_threads(4); 151 152 src = malloc(size); 153 srccpy = malloc(size); 154 dest2 = malloc(size); 155 156 /* Initialize buffers */ 157 init_buffer(src, size, rshift); 158 memcpy(srccpy, src, size); 159 for (j = 0; j < NCHUNKS; j++) { 160 dest[j] = malloc(size); 161 } 148 162 149 163 printf("********************** Setup info *****************************\n"); 150 164 printf("Blosc version: %s (%s)\n", BLOSC_VERSION_STRING, BLOSC_VERSION_DATE); 151 /* printf("Using random data with %d significant bits (out of 32)\n", 32-rshift); */ 152 printf("Using data coming from file: %s\n", filename); 165 printf("Using random data with %d significant bits (out of 32)\n", 32-rshift); 153 166 printf("Dataset size: %d bytes\t Type size: %d bytes\n", size, elsize); 154 167 printf("Shuffle active? %s\n", doshuffle ? 
"Yes" : "No"); 155 168 printf("********************** Running benchmarks *********************\n"); 156 169 157 memcpy(srccpy, src, size);158 159 170 gettimeofday(&last, NULL); 160 171 for (i = 0; i < NITER; i++) { 161 memcpy(srccpy, src, size); 172 for (j = 0; j < NCHUNKS; j++) { 173 memcpy(dest[j], src, size); 174 } 162 175 } 163 176 gettimeofday(&current, NULL); 164 177 tmemcpy = getseconds(last, current); 165 printf("memcpy:\t\t %6.1f us, %.1f MB/s\n", tmemcpy, size/(tmemcpy*MB/1e6)); 178 printf("memcpy(write):\t\t %6.1f us, %.1f MB/s\n", 179 tmemcpy, size/(tmemcpy*MB/1e6)); 180 181 gettimeofday(&last, NULL); 182 for (i = 0; i < NITER; i++) { 183 for (j = 0; j < NCHUNKS; j++) { 184 memcpy(dest2, dest[j], size); 185 } 186 } 187 gettimeofday(&current, NULL); 188 tmemcpy = getseconds(last, current); 189 printf("memcpy(read):\t\t %6.1f us, %.1f MB/s\n", 190 tmemcpy, size/(tmemcpy*MB/1e6)); 166 191 167 192 for (clevel=1; clevel<10; clevel++) { … … 171 196 gettimeofday(&last, NULL); 172 197 for (i = 0; i < NITER; i++) { 173 cbytes = blosc_compress(clevel, doshuffle, elsize, size, src, dest); 198 for (j = 0; j < NCHUNKS; j++) { 199 cbytes = blosc_compress(clevel, doshuffle, elsize, size, src, dest[j]); 200 } 174 201 } 175 202 gettimeofday(&current, NULL); 176 203 tshuf = getseconds(last, current); 177 printf("compression :\t %6.1f us, %.1f MB/s\t ",204 printf("compression(write):\t %6.1f us, %.1f MB/s\t ", 178 205 tshuf, size/(tshuf*MB/1e6)); 179 206 printf("Final bytes: %d ", cbytes); … … 183 210 printf("\n"); 184 211 212 /* Compressor was unable to compress. Copy the buffer manually. 
*/ 213 if (cbytes == 0) { 214 for (j = 0; j < NCHUNKS; j++) { 215 memcpy(dest[j], src, size); 216 } 217 } 218 185 219 gettimeofday(&last, NULL); 186 for (i = 0; i < NITER; i++) 187 if (cbytes == 0) { 188 memcpy(dest2, src, size); 189 nbytes = size; 190 } 191 else { 192 nbytes = blosc_decompress(dest, dest2, size); 193 } 220 for (i = 0; i < NITER; i++) { 221 for (j = 0; j < NCHUNKS; j++) { 222 if (cbytes == 0) { 223 memcpy(dest2, dest[j], size); 224 nbytes = size; 225 } 226 else { 227 nbytes = blosc_decompress(dest[j], dest2, size); 228 } 229 } 230 } 194 231 gettimeofday(&current, NULL); 195 232 tunshuf = getseconds(last, current); 196 printf("decompression :\t %6.1f us, %.1f MB/s\t ",233 printf("decompression(read):\t %6.1f us, %.1f MB/s\t ", 197 234 tunshuf, nbytes/(tunshuf*MB/1e6)); 198 235 if (nbytes < 0) { … … 218 255 219 256 out: 220 free(src); free(srccpy); free(dest); free(dest2); 257 free(src); free(srccpy); free(dest2); 258 for (i = 0; i < NITER; i++) { 259 free(dest[i]); 260 } 221 261 return 0; 222 262 } -
branches/threaded/src/blosc.c
r55 r59 37 37 38 38 /* The number of threads */ 39 #define NUM_THREADS 4 40 41 /* Global variables for compressing/shuffling actions */ 42 int clevel; /* Compression level */ 43 int do_shuffle; /* Shuffle is active? */ 44 39 #define MAX_THREADS 64 40 41 42 /* Global variables for threads */ 43 int nthreads; /* number of desired threads */ 44 int giveup; /* should (de-)compression give up? */ 45 int init_done = 0; /* init threads has been done yet? */ 46 int nblock = -1; /* block counter */ 47 pthread_t threads[MAX_THREADS]; /* opaque structure for threads */ 48 int tids[MAX_THREADS]; /* ID per each threads */ 49 pthread_attr_t ct_attr; /* creation time attributes for threads */ 50 51 /* Syncronization variables */ 52 pthread_mutex_t count_mutex; 53 pthread_barrier_t barr_init; 54 pthread_barrier_t barr_inter; 55 pthread_barrier_t barr_finish; 56 57 /* Structure for parameters in (de-)compression threads */ 58 struct thread_data { 59 size_t typesize; 60 size_t blocksize; 61 int compress; 62 int clevel; 63 int shuffle; 64 int ntbytes; 65 unsigned int nbytes; 66 unsigned int nblocks; 67 unsigned int leftover; 68 unsigned int *bstarts; /* start pointers for each block */ 69 unsigned char *src; 70 unsigned char *dest; 71 unsigned char *tmp[MAX_THREADS]; 72 unsigned char *tmp2[MAX_THREADS]; 73 } params; 74 75 76 /* Convenience functions for creating and releasing temporaries */ 77 void 78 create_temporaries(void) 79 { 80 int tid; 81 size_t blocksize = params.blocksize; 82 /* Extended blocksize for temporary destination. Extended blocksize 83 is only useful for compression in parallel mode, but it doesn't 84 hurt other modes either. 
*/ 85 size_t ebsize = blocksize + params.typesize*sizeof(int); 86 unsigned char *tmp, *tmp2; 87 88 /* Create temporary area for each thread */ 89 for (tid = 0; tid < nthreads; tid++) { 90 #ifdef _WIN32 91 tmp = (unsigned char *)_aligned_malloc(blocksize, 16); 92 tmp2 = (unsigned char *)_aligned_malloc(ebsize, 16); 93 #elif defined __APPLE__ 94 /* Mac OS X guarantees 16-byte alignment in small allocs */ 95 tmp = (unsigned char *)malloc(blocksize); 96 tmp2 = (unsigned char *)malloc(ebsize); 97 #else 98 posix_memalign((void **)&tmp, 16, blocksize); 99 posix_memalign((void **)&tmp2, 16, ebsize); 100 #endif /* _WIN32 */ 101 params.tmp[tid] = tmp; 102 params.tmp2[tid] = tmp2; 103 } 104 } 105 106 107 void 108 release_temporaries(void) 109 { 110 int tid; 111 unsigned char *tmp, *tmp2; 112 113 /* Release buffers */ 114 for (tid = 0; tid < nthreads; tid++) { 115 tmp = params.tmp[tid]; 116 tmp2 = params.tmp2[tid]; 117 #ifdef _WIN32 118 _aligned_free(tmp); 119 _aligned_free(tmp2); 120 #else 121 free(tmp); 122 free(tmp2); 123 #endif /* _WIN32 */ 124 } 125 } 45 126 46 127 47 128 /* Shuffle & Compress a single block */ 48 static size_t49 _blosc_c(int clevel, int doshuffle, size_t typesize, size_t blocksize,50 int leftoverblock, unsigned int ctbytes, unsigned int nbytes,51 unsigned char* _src, unsigned char* _dest, unsigned char *tmp)129 static int 130 blosc_c(size_t blocksize, int leftoverblock, 131 unsigned int ntbytes, unsigned int nbytes, 132 unsigned char *src, unsigned char *dest, unsigned char *tmp) 52 133 { 53 134 size_t j, neblock, nsplits; 54 int cbytes, maxout; 55 unsigned int btbytes = 0; 56 unsigned char* _tmp; 57 58 if (doshuffle && (typesize > 1)) { 135 int cbytes; /* number of compressed bytes in split */ 136 int ctbytes = 0; /* number of compressed bytes in block */ 137 int maxout; 138 unsigned char *_tmp; 139 size_t typesize = params.typesize; 140 141 if (params.shuffle && (typesize > 1)) { 59 142 /* Shuffle this block (this makes sense only if typesize > 1) */ 
60 shuffle(typesize, blocksize, _src, tmp);143 shuffle(typesize, blocksize, src, tmp); 61 144 _tmp = tmp; 62 145 } 63 146 else { 64 _tmp = _src;147 _tmp = src; 65 148 } 66 149 … … 76 159 neblock = blocksize / nsplits; 77 160 for (j = 0; j < nsplits; j++) { 78 _dest += sizeof(int);79 btbytes += sizeof(int);161 dest += sizeof(int); 162 ntbytes += sizeof(int); 80 163 ctbytes += sizeof(int); 81 164 maxout = neblock; 82 if ( ctbytes+maxout > nbytes) {83 maxout = nbytes - ctbytes; /* avoid buffer overrun */165 if (ntbytes+maxout > nbytes) { 166 maxout = nbytes - ntbytes; /* avoid buffer overrun */ 84 167 if (maxout <= 0) { 85 168 return 0; /* non-compressible block */ 86 169 } 87 170 } 88 cbytes = blosclz_compress( clevel, _tmp+j*neblock, neblock,89 _dest, maxout-1);171 cbytes = blosclz_compress(params.clevel, _tmp+j*neblock, neblock, 172 dest, maxout-1); 90 173 if (cbytes >= maxout) { 91 174 /* Buffer overrun caused by blosclz_compress (should never happen) */ … … 100 183 /* Before doing the copy, check that we are not running into a 101 184 buffer overflow. 
*/ 102 if (( ctbytes+neblock) > nbytes) {185 if ((ntbytes+neblock) > nbytes) { 103 186 return 0; /* Non-compressible data */ 104 187 } 105 memcpy( _dest, _tmp+j*neblock, neblock);188 memcpy(dest, _tmp+j*neblock, neblock); 106 189 cbytes = neblock; 107 190 } 108 ((unsigned int *)( _dest))[-1] = cbytes;109 _dest += cbytes;110 btbytes += cbytes;191 ((unsigned int *)(dest))[-1] = cbytes; 192 dest += cbytes; 193 ntbytes += cbytes; 111 194 ctbytes += cbytes; 112 195 } /* Closes j < nsplits */ 113 196 114 return btbytes; 115 } 116 117 118 /* Structure for parameters in compression routines */ 119 struct thread_data_c{ 120 size_t typesize; 121 size_t blocksize; 122 int thread_id; 123 int clevel; 124 int doshuffle; 125 int cbytes; 126 unsigned int tblocks; 127 unsigned int thblocks; 128 unsigned int leftover; 129 unsigned char *src; 130 unsigned char *tmp; 131 unsigned char *tmp2; 132 unsigned int *thstarts; 133 }; 134 135 136 /* Compress & shuffle several blocks in a single thread */ 137 void *t_blosc_c(void *params) 138 { 139 /* Parameters */ 140 struct thread_data_c *my_params = (struct thread_data_c *)params; 141 int thread_id = my_params->thread_id; 142 int clevel = my_params->clevel; 143 int doshuffle = my_params->doshuffle; 144 size_t typesize = my_params->typesize; 145 size_t blocksize = my_params->blocksize; 146 unsigned int tblocks = my_params->tblocks; 147 unsigned int thblocks = my_params->thblocks; 148 unsigned int leftover = my_params->leftover; 149 unsigned char *src = my_params->src; 150 unsigned char *tmp = my_params->tmp; 151 unsigned char *tmp2 = my_params->tmp2; 152 unsigned int *thstarts = my_params->thstarts; 153 /* Other local vars */ 154 int j, cbytes, ntbytes = 0; 155 unsigned int bsize = blocksize; 156 unsigned int leftoverblock = 0; 157 unsigned int sstart; 158 159 160 for (j = 0; j < thblocks; j++) { 161 if (((thread_id*thblocks+j) == (tblocks - 1)) && (leftover > 0)) { 162 bsize = leftover; 163 leftoverblock = 1; 164 } 165 sstart = 
(thread_id*thblocks+j)*blocksize; 166 cbytes = _blosc_c(clevel, doshuffle, typesize, bsize, leftoverblock, 167 (unsigned int)ntbytes, thblocks*blocksize, 168 src+sstart, tmp2, tmp); 169 if (cbytes <= 0) { 170 ntbytes = cbytes; /* _blosc_c failure or uncompressible data */ 171 break; 172 } 173 thstarts[j] = ntbytes; 174 ntbytes += cbytes; 175 tmp2 += cbytes; 176 /* Break if we have reached the end of the blocks */ 177 if ((thread_id*thblocks+j) == (tblocks - 1)) { 178 break; 179 } 180 } 181 182 my_params->cbytes = ntbytes; 183 pthread_exit((void *)my_params); 197 return ctbytes; 184 198 } 185 199 186 200 187 201 unsigned int 188 blosc_compress(int clevel, int doshuffle, size_t typesize, size_t nbytes,202 blosc_compress(int clevel, int shuffle, size_t typesize, size_t nbytes, 189 203 const void *src, void *dest) 190 204 { 191 205 unsigned char *_dest=NULL; /* current pos for destination buffer */ 192 206 unsigned char *flags; /* flags for header */ 207 unsigned int nblocks; /* number of total blocks in buffer */ 208 unsigned int leftover; /* extra bytes at end of buffer */ 193 209 unsigned int *bstarts; /* start pointers for each block */ 194 size_t tblocks; /* number of total blocks in buffer */195 size_t leftover; /* extra bytes at end of buffer */196 size_t thblocks; /* number of total blocks per thread */197 size_t leftover2, leftover3; /* helpers */198 210 size_t blocksize; /* length of the block in bytes */ 199 size_t bsize; /* corrected blocksize for last block */ 200 unsigned int ctbytes = 0; /* the number of bytes in output buffer */ 201 unsigned int *ctbytes_; /* placeholder for bytes in output buffer */ 202 int cbytes; /* temporary compressed buffer length */ 203 int leftoverblock; /* left over block? 
*/ 204 unsigned int i, j; /* local index variables */ 205 char *too_long_message = "The impossible happened: buffer overflow!"; 206 size_t t; 207 pthread_t threads[NUM_THREADS]; 208 unsigned char *tmp[NUM_THREADS], *tmp2[NUM_THREADS]; 209 struct thread_data_c params[NUM_THREADS]; 210 unsigned int *thstarts[NUM_THREADS]; /* start pointers for each thread */ 211 int rc; 212 void *status; 213 pthread_attr_t attr; 214 int nthreads = NUM_THREADS; 215 int giveup = 0; /* sentinel */ 211 unsigned int ntbytes = 0; /* the number of compressed bytes */ 212 unsigned int *ntbytes_; /* placeholder for bytes in output buffer */ 213 unsigned int i; /* local index variables */ 216 214 217 215 /* Compression level */ … … 227 225 228 226 /* Shuffle */ 229 if ( doshuffle != 0 && doshuffle != 1) {230 fprintf(stderr, "` doshuffle` parameter must be either 0 or 1!\n");227 if (shuffle != 0 && shuffle != 1) { 228 fprintf(stderr, "`shuffle` parameter must be either 0 or 1!\n"); 231 229 return -10; 232 230 } … … 244 242 245 243 /* Compute number of blocks in buffer */ 246 tblocks = nbytes / blocksize;244 nblocks = nbytes / blocksize; 247 245 leftover = nbytes % blocksize; 248 tblocks = (leftover>0)? tblocks+1: tblocks; 249 /* Blocks per thread */ 250 thblocks = tblocks / nthreads; 251 leftover2 = tblocks % nthreads; 252 thblocks = (leftover2>0)? thblocks+1: thblocks; 253 /* Final correction for the number of threads */ 254 if (nthreads > (tblocks / thblocks)) { 255 nthreads = tblocks / thblocks; 256 leftover3 = tblocks % thblocks; 257 nthreads = (leftover3>0)? nthreads+1: nthreads; 258 } 246 nblocks = (leftover>0)? 
nblocks+1: nblocks; 259 247 260 248 /* Check typesize limits */ … … 275 263 _dest[3] = (unsigned char)typesize; /* type size */ 276 264 _dest += 4; 277 ctbytes += 4;278 ((unsigned int *)_dest)[0] = nbytes; /* size of the chunk*/265 ntbytes += 4; 266 ((unsigned int *)_dest)[0] = nbytes; /* size of the buffer */ 279 267 ((unsigned int *)_dest)[1] = blocksize; /* block size */ 280 ctbytes_ = (unsigned int *)(_dest+8); /* compressed chunk size (pointer)*/268 ntbytes_ = (unsigned int *)(_dest+8); /* compressed buffer size */ 281 269 _dest += sizeof(int)*3; 282 ctbytes += sizeof(int)*3;270 ntbytes += sizeof(int)*3; 283 271 bstarts = (unsigned int *)_dest; /* starts for every block */ 284 _dest += sizeof(int)* tblocks; /* book space for pointers to */285 ctbytes += sizeof(int)*tblocks; /* every block in output */286 287 if ( doshuffle == 1) {272 _dest += sizeof(int)*nblocks; /* book space for pointers to */ 273 ntbytes += sizeof(int)*nblocks; /* every block in output */ 274 275 if (shuffle == 1) { 288 276 /* Shuffle is active */ 289 277 *flags |= 0x1; /* bit 0 set to one in flags */ 290 278 } 291 279 292 /* Initialize and set thread detached attribute */ 293 pthread_attr_init(&attr); 294 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); 295 296 for(t=0; t < nthreads; t++){ 297 /* Create temporary area for each thread */ 298 #ifdef _WIN32 299 tmp[t] = (unsigned char*)_aligned_malloc(blocksize, 16); 300 tmp2[t] = (unsigned char*)_aligned_malloc(blocksize*thblocks, 16); 301 thstarts[t] = (unsigned int*)_aligned_malloc(sizeof(int)*thblocks, 16); 302 #elif defined __APPLE__ 303 /* Mac OS X guarantees 16-byte alignment in small allocs */ 304 tmp[t] = (unsigned char *)malloc(blocksize); 305 tmp2[t] = (unsigned char *)malloc(blocksize*thblocks); 306 thstarts[t] = (unsigned int*)malloc(sizeof(int)*thblocks); 307 #else 308 rc = posix_memalign((void **)&tmp[t], 16, blocksize); 309 rc = posix_memalign((void **)&tmp2[t], 16, blocksize*thblocks); 310 rc = 
posix_memalign((void **)&thstarts[t], 16, sizeof(int)*thblocks); 311 #endif /* _WIN32 */ 312 /* Populate parameters for thread */ 313 params[t].thread_id = t; 314 params[t].clevel = clevel; 315 params[t].doshuffle = doshuffle; 316 params[t].typesize = typesize; 317 params[t].blocksize = blocksize; 318 params[t].tblocks = tblocks; 319 params[t].thblocks = thblocks; 320 if (t == (nthreads - 1) && leftover > 0) { 321 params[t].leftover = leftover; 322 } 323 else { 324 params[t].leftover = 0; 325 } 326 params[t].src = (unsigned char *)src; 327 params[t].tmp = tmp[t]; 328 params[t].tmp2 = tmp2[t]; 329 params[t].thstarts = thstarts[t]; 330 rc = pthread_create(&threads[t], &attr, t_blosc_c, (void *)&params[t]); 331 if (rc){ 332 fprintf(stderr, "ERROR; return code from pthread_create() is %d\n", rc); 333 fprintf(stderr, "\tError detail: %s\n", strerror(rc)); 334 exit(-1); 335 } 336 } /* Close t < nthreads */ 337 338 /* Wait for threads to finish (join threads) */ 339 pthread_attr_destroy(&attr); /* free attribute */ 340 for(t=0; t<nthreads; t++){ 341 rc = pthread_join(threads[t], &status); 342 if (rc) { 343 fprintf(stderr, "ERROR; return code from pthread_join() is %d\n", rc); 344 fprintf(stderr, "\tError detail: %s\n", strerror(rc)); 345 exit(-1); 346 } 347 if (giveup) { 348 /* We have been signaled to give up. Wait for all threads to finish. 
*/ 349 continue; 350 } 351 cbytes = ((struct thread_data_c *)status)->cbytes; 352 /* Check result of compression */ 353 if (cbytes < 0) { 354 fprintf(stderr, "%s\n", too_long_message); 355 ctbytes = cbytes; /* error in _blosc_c */ 356 giveup = 1; 357 } 358 else if ((cbytes == 0) | (ctbytes+cbytes >= nbytes)) { 359 ctbytes = 0; /* uncompressible data */ 360 giveup = 1; 361 } 362 else { 363 /* Compression process seems right */ 364 /* Copy block starts */ 365 for (j=0; j<thblocks; j++) { 366 /* Check that we do not exceed the end of the blocks */ 367 if ((t*thblocks+j) < tblocks) { 368 bstarts[t*thblocks+j] = thstarts[t][j]+ctbytes; 369 } 370 } 371 /* Copy thread's private buffer to destination buffer */ 372 memcpy(_dest, tmp2[t], cbytes); 373 _dest += cbytes; 374 ctbytes += cbytes; /* update compressed bytes */ 375 } 376 } /* Close t < nthreads */ 377 378 /* Release buffers */ 379 for(t=0; t < nthreads; t++){ 380 #ifdef _WIN32 381 _aligned_free(tmp[t]); 382 _aligned_free(tmp2[t]); 383 _aligned_free(thstarts[t]); 384 #else 385 free(tmp[t]); 386 free(tmp2[t]); 387 free(thstarts[t]); 388 #endif /* _WIN32 */ 389 } 390 391 assert(ctbytes < nbytes); 392 *ctbytes_ = ctbytes; /* set the number of compressed bytes in header */ 393 return ctbytes; 280 /* Populate parameters for compression routines */ 281 params.compress = 1; 282 params.clevel = clevel; 283 params.shuffle = shuffle; 284 params.typesize = typesize; 285 params.blocksize = blocksize; 286 params.ntbytes = ntbytes; 287 params.nbytes = nbytes; 288 params.nblocks = nblocks; 289 params.leftover = leftover; 290 params.bstarts = bstarts; 291 params.src = (unsigned char *)src; 292 params.dest = (unsigned char *)dest; 293 294 /* Do the actual compression */ 295 ntbytes = do_job(); 296 /* Set the number of compressed bytes in header */ 297 *ntbytes_ = ntbytes; 298 299 assert(ntbytes < nbytes); 300 return ntbytes; 394 301 } 395 302 396 303 397 304 /* Decompress & unshuffle a single block */ 398 static size_t 399 
_blosc_d(int dounshuffle, size_t typesize, size_t blocksize, int leftoverblock, 400 unsigned char* _src, unsigned char* _dest, 401 unsigned char *tmp, unsigned char *tmp2) 402 { 403 size_t j, neblock, nsplits; 404 size_t nbytes, cbytes, ctbytes = 0, ntbytes = 0; 405 unsigned char* _tmp; 406 407 if (dounshuffle && (typesize > 1)) { 305 static int 306 blosc_d(size_t blocksize, int leftoverblock, 307 unsigned char *src, unsigned char *dest, 308 unsigned char *tmp, unsigned char *tmp2) 309 { 310 int j, neblock, nsplits; 311 int nbytes; /* number of decompressed bytes in split */ 312 int cbytes; /* number of compressed bytes in split */ 313 int ctbytes = 0; /* number of compressed bytes in block */ 314 int ntbytes = 0; /* number of uncompressed bytes in block */ 315 unsigned char *_tmp; 316 size_t typesize = params.typesize; 317 318 if (shuffle && (typesize > 1)) { 408 319 _tmp = tmp; 409 320 } 410 321 else { 411 _tmp = _dest;322 _tmp = dest; 412 323 } 413 324 … … 422 333 neblock = blocksize / nsplits; 423 334 for (j = 0; j < nsplits; j++) { 424 cbytes = ((unsigned int *)( _src))[0]; /* amount of compressed bytes */425 _src += sizeof(int);335 cbytes = ((unsigned int *)(src))[0]; /* amount of compressed bytes */ 336 src += sizeof(int); 426 337 ctbytes += sizeof(int); 427 338 /* Uncompress */ 428 339 if (cbytes == neblock) { 429 memcpy(_tmp, _src, neblock);340 memcpy(_tmp, src, neblock); 430 341 nbytes = neblock; 431 342 } 432 343 else { 433 nbytes = blosclz_decompress( _src, cbytes, _tmp, neblock);344 nbytes = blosclz_decompress(src, cbytes, _tmp, neblock); 434 345 if (nbytes != neblock) { 435 346 return -2; 436 347 } 437 348 } 438 _src += cbytes;349 src += cbytes; 439 350 ctbytes += cbytes; 440 _tmp += n eblock;351 _tmp += nbytes; 441 352 ntbytes += nbytes; 442 353 } /* Closes j < nsplits */ 443 354 444 if ( dounshuffle && (typesize > 1)) {445 if ((uintptr_t) _dest % 16 == 0) {446 /* 16-bytes aligned _dest. SSE2 unshuffle will work. 
*/447 unshuffle(typesize, blocksize, tmp, _dest);355 if (shuffle && (typesize > 1)) { 356 if ((uintptr_t)dest % 16 == 0) { 357 /* 16-bytes aligned dest. SSE2 unshuffle will work. */ 358 unshuffle(typesize, blocksize, tmp, dest); 448 359 } 449 360 else { 450 /* _dest is not aligned. Use tmp2, which is aligned, and copy. */361 /* dest is not aligned. Use tmp2, which is aligned, and copy. */ 451 362 unshuffle(typesize, blocksize, tmp, tmp2); 452 memcpy(_dest, tmp2, blocksize); 453 } 454 } 455 456 return ctbytes; 457 } 458 459 460 /* Structure for parameters in decompression routines */ 461 struct thread_data_d{ 462 size_t typesize; 463 size_t blocksize; 464 int thread_id; 465 int dounshuffle; 466 int cbytes; 467 unsigned int tblocks; 468 unsigned int thblocks; 469 unsigned int leftover; 470 unsigned int *bstarts; /* start pointers for each block */ 471 unsigned char *src; 472 unsigned char *dest; 473 unsigned char *tmp; 474 unsigned char *tmp2; 475 }; 476 477 478 /* Decompress & unshuffle several blocks in a single thread */ 479 void *t_blosc_d(void *params) 480 { 481 /* Parameters */ 482 struct thread_data_d *my_params = (struct thread_data_d *)params; 483 int thread_id = my_params->thread_id; 484 int dounshuffle = my_params->dounshuffle; 485 size_t typesize = my_params->typesize; 486 size_t blocksize = my_params->blocksize; 487 unsigned int tblocks = my_params->tblocks; 488 unsigned int thblocks = my_params->thblocks; 489 unsigned int *bstarts = my_params->bstarts; 490 unsigned int leftover = my_params->leftover; 491 unsigned char *src = my_params->src; 492 unsigned char *dest = my_params->dest; 493 unsigned char *tmp = my_params->tmp; 494 unsigned char *tmp2 = my_params->tmp2; 495 /* Other local vars */ 496 int j, cbytes, ntbytes = 0; 497 unsigned int bsize = blocksize; 498 unsigned int leftoverblock = 0; 499 500 for (j = 0; j < thblocks; j++) { 501 if (((thread_id*thblocks+j) == (tblocks - 1)) && (leftover > 0)) { 502 bsize = leftover; 503 leftoverblock = 1; 504 } 
505 cbytes = _blosc_d(dounshuffle, typesize, bsize, leftoverblock, 506 src+bstarts[j], dest+j*blocksize, tmp, tmp2); 507 if (cbytes < 0) { 508 ntbytes = cbytes; /* _blosc_d failure */ 509 goto out; 510 } 511 ntbytes += cbytes; 512 /* Break if we have reached the end of the blocks */ 513 if ((thread_id*thblocks+j) == (tblocks - 1)) { 514 goto out; 515 } 516 } 517 518 out: 519 my_params->cbytes = ntbytes; 520 if (NUM_THREADS > 1) { 521 pthread_exit((void *)my_params); 522 } 523 else { 524 return (void *)my_params; 525 } 363 memcpy(dest, tmp2, blocksize); 364 } 365 } 366 367 /* Return the number of uncompressed bytes */ 368 return ntbytes; 526 369 } 527 370 … … 530 373 blosc_decompress(const void *src, void *dest, size_t dest_size) 531 374 { 532 unsigned char *_src=NULL; /* alias for source buffer */533 unsigned char *_dest=NULL; /* alias for destination buffer */375 unsigned char *_src=NULL; /* current pos for source buffer */ 376 unsigned char *_dest=NULL; /* current pos for destination buffer */ 534 377 unsigned char version, versionlz; /* versions for compressed header */ 535 378 unsigned char flags; /* flags for header */ 536 size_t leftover; /* extra bytes at end of buffer*/537 size_t tblocks; /* number of total blocks in buffer*/538 size_t thblocks; /* number of total blocks per thread*/539 size_t leftover2, leftover3; /* helpers*/379 int shuffle = 0; /* do unshuffle? 
*/ 380 int ntbytes; /* the number of uncompressed bytes */ 381 unsigned int nblocks; /* number of total blocks in buffer */ 382 unsigned int leftover; /* extra bytes at end of buffer */ 540 383 unsigned int *bstarts; /* start pointers for each block */ 541 int nbytes, cbytes, ntbytes = 0; 542 int dounshuffle = 0; 543 unsigned int typesize, blocksize, ctbytes; 544 size_t t; 545 pthread_t threads[NUM_THREADS]; 546 unsigned char *tmp[NUM_THREADS], *tmp2[NUM_THREADS]; 547 struct thread_data_d params[NUM_THREADS]; 548 int rc; 549 void *status; 550 pthread_attr_t attr; 551 int nthreads = NUM_THREADS; 552 int giveup; 384 unsigned int typesize, blocksize, nbytes, ctbytes; 385 unsigned char *tmp, *tmp2; 386 int tid; 553 387 554 388 _src = (unsigned char *)(src); … … 561 395 typesize = (unsigned int)_src[3]; /* typesize */ 562 396 _src += 4; 563 nbytes = ((unsigned int *)_src)[0]; /* chunksize */397 nbytes = ((unsigned int *)_src)[0]; /* buffer size */ 564 398 blocksize = ((unsigned int *)_src)[1]; /* block size */ 565 ctbytes = ((unsigned int *)_src)[2]; /* compressed chunksize */399 ctbytes = ((unsigned int *)_src)[2]; /* compressed buffer size */ 566 400 _src += sizeof(int)*3; 401 bstarts = (unsigned int *)_src; 567 402 /* Compute some params */ 568 403 /* Total blocks */ 569 tblocks = nbytes / blocksize;404 nblocks = nbytes / blocksize; 570 405 leftover = nbytes % blocksize; 571 tblocks = (leftover>0)? tblocks+1: tblocks; 572 /* Blocks per thread */ 573 thblocks = tblocks / nthreads; 574 leftover2 = tblocks % nthreads; 575 thblocks = (leftover2>0)? thblocks+1: thblocks; 576 /* Final correction for the number of threads */ 577 if (nthreads > (tblocks / thblocks)) { 578 nthreads = tblocks / thblocks; 579 leftover3 = tblocks % thblocks; 580 nthreads = (leftover3>0)? nthreads+1: nthreads; 581 } 582 bstarts = (unsigned int *)_src; 583 _src += sizeof(int)*tblocks; 406 nblocks = (leftover>0)? 
nblocks+1: nblocks; 407 _src += sizeof(int)*nblocks; 584 408 585 409 /* Check zero typesizes */ … … 595 419 if ((flags & 0x1) == 1) { 596 420 /* Input is shuffled. Unshuffle it. */ 597 dounshuffle = 1; 598 } 599 600 /* Create temporary area for each thread */ 601 for(t=0; t<nthreads; t++){ 602 #ifdef _WIN32 603 tmp[t] = (unsigned char*)_aligned_malloc(blocksize, 16); 604 tmp2[t] = (unsigned char*)_aligned_malloc(blocksize, 16); 605 #elif defined __APPLE__ 606 /* Mac OS X guarantees 16-byte alignment in small allocs */ 607 tmp[t] = (unsigned char *)malloc(blocksize); 608 tmp2[t] = (unsigned char *)malloc(blocksize); 609 #else 610 rc = posix_memalign((void **)&tmp[t], 16, blocksize); 611 rc = posix_memalign((void **)&tmp2[t], 16, blocksize); 612 #endif /* _WIN32 */ 613 } 421 shuffle = 1; 422 } 423 424 /* Populate parameters for decompression routines */ 425 params.compress = 0; 426 params.clevel = 0; /* specific for compression */ 427 params.shuffle = shuffle; 428 params.typesize = typesize; 429 params.blocksize = blocksize; 430 params.ntbytes = 0; 431 params.nbytes = nbytes; 432 params.nblocks = nblocks; 433 params.leftover = leftover; 434 params.bstarts = bstarts; 435 params.src = (unsigned char *)src; 436 params.dest = (unsigned char *)dest; 437 438 /* Do the actual decompression */ 439 ntbytes = do_job(); 440 441 assert(ntbytes <= dest_size); 442 return ntbytes; 443 } 444 445 446 /* Do the compression or decompression of the buffer depending on the 447 global params. 
*/ 448 int 449 do_job(void) { 450 int ntbytes; 451 452 create_temporaries(); 453 454 if (nthreads == 1) { 455 ntbytes = serial_blosc(); 456 } 457 else { 458 ntbytes = parallel_blosc(); 459 } 460 461 release_temporaries(); 462 463 return ntbytes; 464 } 465 466 467 /* Serial version for compression/decompression */ 468 int 469 serial_blosc(void) 470 { 471 int j, bsize, leftoverblock, cbytes; 472 int compress = params.compress; 473 int clevel = params.clevel; 474 int shuffle = params.shuffle; 475 unsigned int typesize = params.typesize; 476 unsigned int blocksize = params.blocksize; 477 int ntbytes = params.ntbytes; 478 int nbytes = params.nbytes; 479 unsigned int nblocks = params.nblocks; 480 int leftover = params.nbytes % params.blocksize; 481 unsigned int *bstarts = params.bstarts; 482 unsigned char *src = params.src; 483 unsigned char *dest = params.dest; 484 unsigned char *tmp = params.tmp[0]; /* tmp for thread 0 */ 485 unsigned char *tmp2 = params.tmp2[0]; /* tmp2 for thread 0 */ 486 487 for (j = 0; j < nblocks; j++) { 488 if (compress) { 489 bstarts[j] = ntbytes; 490 } 491 bsize = blocksize; 492 leftoverblock = 0; 493 if ((j == nblocks - 1) && (leftover > 0)) { 494 bsize = leftover; 495 leftoverblock = 1; 496 } 497 if (compress) { 498 cbytes = blosc_c(bsize, leftoverblock, ntbytes, nbytes, 499 src+j*blocksize, dest+bstarts[j], tmp); 500 if (cbytes == 0) { 501 ntbytes = 0; /* uncompressible data */ 502 break; 503 } 504 } 505 else { 506 cbytes = blosc_d(bsize, leftoverblock, 507 src+bstarts[j], dest+j*blocksize, tmp, tmp2); 508 } 509 if (cbytes < 0) { 510 ntbytes = cbytes; /* error in blosc_c / blosc_d */ 511 break; 512 } 513 ntbytes += cbytes; 514 } 515 516 /* Final check for ntbytes (only in compression mode) */ 517 if (compress) { 518 if (ntbytes == nbytes) { 519 ntbytes = 0; /* non-compressible data */ 520 } 521 else if (ntbytes > nbytes) { 522 fprintf(stderr, "The impossible happened: buffer overflow!\n"); 523 ntbytes = -5; /* too large buffer */ 524 } 525 } 
/* Close j < nblocks */ 526 527 return ntbytes; 528 } 529 530 531 /* Threaded version for compression/decompression */ 532 int 533 parallel_blosc(void) 534 { 535 int rc; 536 537 /* Synchronization point for all threads (wait for initialization) */ 538 rc = pthread_barrier_wait(&barr_init); 539 if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) { 540 printf("Could not wait on barrier (init)\n"); 541 exit(-1); 542 } 543 /* Synchronization point for all threads (wait for finalization) */ 544 rc = pthread_barrier_wait(&barr_finish); 545 if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) { 546 printf("Could not wait on barrier (finish)\n"); 547 exit(-1); 548 } 549 550 /* Return the total bytes decompressed in threads */ 551 return params.ntbytes; 552 } 553 554 555 /* Decompress & unshuffle several blocks in a single thread */ 556 void *t_blosc(void *tids) 557 { 558 int tid = *(int *)tids; 559 int cbytes, ntdest; 560 unsigned int tblocks; /* number of blocks per thread */ 561 unsigned int leftover2; 562 int nblock_; /* private copy of nblock */ 563 int tblock; /* limit block on a thread */ 564 int rc; 565 unsigned int bsize, leftoverblock; 566 567 while (1) { 568 /* Meeting point for all threads (wait for initialization) */ 569 rc = pthread_barrier_wait(&barr_init); 570 if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) { 571 printf("Could not wait on barrier (init)\n"); 572 exit(-1); 573 } 574 575 /* Get parameters for this thread */ 576 size_t typesize = params.typesize; 577 size_t blocksize = params.blocksize; 578 size_t ebsize = blocksize + params.typesize*sizeof(int); 579 int compress = params.compress; 580 int clevel = params.clevel; 581 int shuffle = params.shuffle; 582 unsigned int nbytes = params.nbytes; 583 unsigned int ntbytes = params.ntbytes; 584 unsigned int nblocks = params.nblocks; 585 unsigned int leftover = params.leftover; 586 unsigned int *bstarts = params.bstarts; 587 unsigned char *src = params.src; 588 unsigned char *dest = params.dest; 589 
unsigned char *tmp = params.tmp[tid]; 590 unsigned char *tmp2 = params.tmp2[tid]; 591 592 giveup = 0; 593 if (compress) { 594 /* Compression always has to follow the block order */ 595 pthread_mutex_lock(&count_mutex); 596 nblock++; 597 nblock_ = nblock; 598 pthread_mutex_unlock(&count_mutex); 599 tblock = nblocks; 600 } 601 else { 602 /* Decompression can happen using any order. We choose 603 sequential block order on each thread */ 604 605 /* Blocks per thread */ 606 tblocks = nblocks / nthreads; 607 leftover2 = nblocks % nthreads; 608 tblocks = (leftover2>0)? tblocks+1: tblocks; 609 610 nblock_ = tid*tblocks; 611 tblock = nblock_ + tblocks; 612 if (tblock > nblocks) { 613 tblock = nblocks; 614 } 615 ntbytes = 0; 616 } 617 618 /* Loop over blocks */ 619 leftoverblock = 0; 620 while ((nblock_ < tblock) && !giveup) { 621 bsize = blocksize; 622 if (nblock_ == (nblocks - 1) && (leftover > 0)) { 623 bsize = leftover; 624 leftoverblock = 1; 625 } 626 if (compress) { 627 cbytes = blosc_c(bsize, leftoverblock, 0, ebsize, 628 src+nblock_*blocksize, tmp2, tmp); 629 } 630 else { 631 cbytes = blosc_d(bsize, leftoverblock, 632 src+bstarts[nblock_], dest+nblock_*blocksize, 633 tmp, tmp2); 634 } 635 636 /* Check whether current thread has to giveup */ 637 /* This is critical in order to not overwrite variables */ 638 if (giveup != 0) { 639 break; 640 } 641 642 /* Check results for the decompressed block */ 643 if (cbytes < 0) { /* compr/decompr failure */ 644 /* Set cbytes code error first */ 645 pthread_mutex_lock(&count_mutex); 646 params.ntbytes = cbytes; 647 pthread_mutex_unlock(&count_mutex); 648 giveup = 1; /* give up (de-)compressing after */ 649 break; 650 } 651 652 if (compress) { 653 /* Start critical section */ 654 pthread_mutex_lock(&count_mutex); 655 if (cbytes == 0) { /* uncompressible buffer */ 656 params.ntbytes = 0; 657 pthread_mutex_unlock(&count_mutex); 658 giveup = 1; /* give up compressing */ 659 break; 660 } 661 bstarts[nblock_] = params.ntbytes; /* update 
block start counter */ 662 ntdest = params.ntbytes; 663 if (ntdest+cbytes > nbytes) { /* uncompressible buffer */ 664 params.ntbytes = 0; 665 pthread_mutex_unlock(&count_mutex); 666 giveup = 1; 667 break; 668 } 669 nblock++; 670 nblock_ = nblock; 671 params.ntbytes += cbytes; /* update return bytes counter */ 672 pthread_mutex_unlock(&count_mutex); 673 /* End of critical section */ 674 675 /* Copy the compressed buffer to destination */ 676 memcpy(dest+ntdest, tmp2, cbytes); 677 } 678 else { 679 nblock_++; 680 } 681 /* Update counter for this thread */ 682 ntbytes += cbytes; 683 684 } /* closes while (nblock_) */ 685 686 if (!compress && !giveup) { 687 /* Update global counter for all threads (decompression only) */ 688 pthread_mutex_lock(&count_mutex); 689 params.ntbytes += ntbytes; 690 pthread_mutex_unlock(&count_mutex); 691 } 692 693 /* Meeting point for all threads (wait for finalization) */ 694 rc = pthread_barrier_wait(&barr_finish); 695 if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) { 696 printf("Could not wait on barrier (finish)\n"); 697 exit(-1); 698 } 699 /* Reset block counter */ 700 nblock = -1; 701 702 } /* closes while(1) */ 703 704 /* This should never be reached, but anyway */ 705 return(0); 706 } 707 708 709 int 710 blosc_init_threads(int nthreads_) 711 { 712 int tid, rc; 713 714 //printf("Init!\n"); 715 if (init_done) { 716 //printf("init already done!\n"); 717 return(0); 718 } 719 720 nthreads = nthreads_; 721 if (nthreads > MAX_THREADS) { 722 fprintf(stderr, "Error. nthreads cannot be larger than MAX_THREADS (%d)", 723 MAX_THREADS); 724 exit(-1); 725 } 726 else if (nthreads <= 0) { 727 fprintf(stderr, "Error. 
nthreads must be a positive integer"); 728 exit(-1); 729 } 730 731 /* Initialize mutex and condition variable objects */ 732 pthread_mutex_init(&count_mutex, NULL); 733 734 /* Barrier initialization */ 735 pthread_barrier_init(&barr_init, NULL, nthreads+1); 736 pthread_barrier_init(&barr_inter, NULL, nthreads); 737 pthread_barrier_init(&barr_finish, NULL, nthreads+1); 614 738 615 739 /* Initialize and set thread detached attribute */ 616 pthread_attr_init(&attr); 617 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); 618 619 for(t=0; t < nthreads; t++){ 620 params[t].thread_id = t; 621 params[t].dounshuffle = dounshuffle; 622 params[t].typesize = typesize; 623 params[t].blocksize = blocksize; 624 params[t].tblocks = tblocks; 625 params[t].thblocks = thblocks; 626 params[t].bstarts = bstarts + t*thblocks; 627 if (t == (nthreads - 1) && leftover > 0) { 628 params[t].leftover = leftover; 629 } 630 else { 631 params[t].leftover = 0; 632 } 633 params[t].src = (unsigned char *)src; 634 params[t].dest = (unsigned char *)dest + t*thblocks*blocksize; 635 params[t].tmp = tmp[t]; 636 params[t].tmp2 = tmp2[t]; 637 if (nthreads > 1) { 638 rc = pthread_create(&threads[t], &attr, t_blosc_d, (void *)¶ms[t]); 639 if (rc){ 640 fprintf(stderr, "ERROR; return code from pthread_create() is %d\n", rc); 641 fprintf(stderr, "\tError detail: %s\n", strerror(rc)); 642 exit(-1); 643 } 644 } 645 else { 646 status = t_blosc_d((void *)¶ms[t]); 647 cbytes = ((struct thread_data_d *)status)->cbytes; 648 if (cbytes < 0) { 649 nbytes = cbytes; /* _blosc_d failure */ 650 goto out; 651 } 652 ntbytes += cbytes; /* update decompressed bytes */ 653 } 654 } 655 656 /* Wait for threads to finish (join threads) */ 657 pthread_attr_destroy(&attr); /* free attribute */ 658 if (nthreads > 1) { 659 for(t=0; t < nthreads; t++){ 660 rc = pthread_join(threads[t], &status); 661 if (rc) { 662 fprintf(stderr, "ERROR; return code from pthread_join() is %d\n", rc); 663 fprintf(stderr, "\tError detail: %s\n", 
strerror(rc)); 664 exit(-1); 665 } 666 if (giveup) { 667 continue; 668 } 669 cbytes = ((struct thread_data_d *)status)->cbytes; 670 if (cbytes < 0) { 671 nbytes = cbytes; /* _blosc_d failure */ 672 giveup = 1; 673 } 674 ntbytes += cbytes; /* update decompressed bytes */ 675 } 676 } 677 678 out: 679 /* Release buffers */ 680 for(t=0; t < nthreads; t++){ 681 #ifdef _WIN32 682 _aligned_free(tmp[t]); 683 _aligned_free(tmp2[t]); 684 #else 685 free(tmp[t]); 686 free(tmp2[t]); 687 #endif /* _WIN32 */ 688 } 689 690 /* assert(ntbytes+bstarts[0] == ctbytes); */ 691 return nbytes; 692 } 740 pthread_attr_init(&ct_attr); 741 pthread_attr_setdetachstate(&ct_attr, PTHREAD_CREATE_DETACHED); 742 743 /* Finally, create the threads in detached state */ 744 for (tid = 0; tid < nthreads; tid++) { 745 tids[tid] = tid; 746 rc = pthread_create(&threads[tid], &ct_attr, t_blosc, (void *)&tids[tid]); 747 if (rc) { 748 fprintf(stderr, "ERROR; return code from pthread_create() is %d\n", rc); 749 fprintf(stderr, "\tError detail: %s\n", strerror(rc)); 750 exit(-1); 751 } 752 } 753 754 init_done = 1; /* Initialization done! */ 755 756 return(0); 757 } -
branches/threaded/src/blosc.h
/* The combined blosc and blosclz formats.
   The whole expansion is parenthesized so that the macro can be used
   safely inside larger expressions (e.g. `x == BLOSC_VERSION_CFORMAT`).
   NOTE(review): combining two format numbers with `&` looks unintended;
   `|` is presumably what was meant -- confirm before relying on the value. */
#define BLOSC_VERSION_CFORMAT ((BLOSC_VERSION_FORMAT << 8) & (BLOSCLZ_VERSION_FORMAT))


/* Initialize pool of threads and other structures */
int blosc_init_threads(int nthreads);

Note: See TracChangeset
for help on using the changeset viewer.
![(please configure the [header_logo] section in trac.ini)](/images/blosc-logo-small.png)