Changeset 89


Ignore:
Timestamp:
05/26/10 13:12:55 (4 years ago)
Author:
faltet
Message:

A replacement for barriers for Mac OSX, or other systems not implementing them, has been carried out. Fixes #4.

Location:
trunk/src
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/blosc.c

    r85 r89  
    6161pthread_attr_t ct_attr;          /* creation time attributes for threads */ 
    6262 
     63#if defined(_POSIX_BARRIERS) && (_POSIX_BARRIERS - 20012L) >= 0 
     64#define _POSIX_BARRIERS_MINE 
     65#endif 
     66 
    6367/* Syncronization variables */ 
    6468pthread_mutex_t count_mutex; 
     69#ifdef _POSIX_BARRIERS_MINE 
    6570pthread_barrier_t barr_init; 
    66 pthread_barrier_t barr_inter; 
    6771pthread_barrier_t barr_finish; 
     72#else 
     73int32_t count_threads = 0; 
     74pthread_mutex_t count_threads_mutex; 
     75pthread_cond_t count_threads_cv; 
     76#endif 
     77 
    6878 
    6979/* Structure for parameters in (de-)compression threads */ 
     
    302312 
    303313  /* Synchronization point for all threads (wait for initialization) */ 
     314#ifdef _POSIX_BARRIERS_MINE 
    304315  rc = pthread_barrier_wait(&barr_init); 
    305316  if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) { 
     
    307318    exit(-1); 
    308319  } 
     320#else 
     321  pthread_mutex_lock(&count_threads_mutex); 
     322  if (count_threads < nthreads) { 
     323    count_threads++; 
     324    pthread_cond_wait(&count_threads_cv, &count_threads_mutex); 
     325  } 
     326  else { 
     327    pthread_cond_broadcast(&count_threads_cv); 
     328  } 
     329  pthread_mutex_unlock(&count_threads_mutex); 
     330#endif 
     331 
    309332  /* Synchronization point for all threads (wait for finalization) */ 
     333#ifdef _POSIX_BARRIERS_MINE 
    310334  rc = pthread_barrier_wait(&barr_finish); 
    311335  if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) { 
     
    313337    exit(-1); 
    314338  } 
     339#else 
     340  pthread_mutex_lock(&count_threads_mutex); 
     341  if (count_threads > 0) { 
     342    count_threads--; 
     343    pthread_cond_wait(&count_threads_cv, &count_threads_mutex); 
     344  } 
     345  else { 
     346    pthread_cond_broadcast(&count_threads_cv); 
     347  } 
     348  pthread_mutex_unlock(&count_threads_mutex); 
     349#endif 
    315350 
    316351  /* Return the total bytes decompressed in threads */ 
     
    647682  while (1) { 
    648683    /* Meeting point for all threads (wait for initialization) */ 
     684#ifdef _POSIX_BARRIERS_MINE 
    649685    rc = pthread_barrier_wait(&barr_init); 
    650686    if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) { 
     
    652688      exit(-1); 
    653689    } 
     690#else 
     691    pthread_mutex_lock(&count_threads_mutex); 
     692    if (count_threads < nthreads) { 
     693      count_threads++; 
     694      pthread_cond_wait(&count_threads_cv, &count_threads_mutex); 
     695    } 
     696    else { 
     697      pthread_cond_broadcast(&count_threads_cv); 
     698    } 
     699    pthread_mutex_unlock(&count_threads_mutex); 
     700#endif 
     701 
    654702    if (end_threads) { 
    655703      return(0); 
     
    772820 
    773821    /* Meeting point for all threads (wait for finalization) */ 
     822#ifdef _POSIX_BARRIERS_MINE 
    774823    rc = pthread_barrier_wait(&barr_finish); 
    775824    if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) { 
     
    777826      exit(-1); 
    778827    } 
     828#else 
     829    pthread_mutex_lock(&count_threads_mutex); 
     830    if (count_threads > 0) { 
     831      count_threads--; 
     832      pthread_cond_wait(&count_threads_cv, &count_threads_mutex); 
     833    } 
     834    else { 
     835      pthread_cond_broadcast(&count_threads_cv); 
     836    } 
     837    pthread_mutex_unlock(&count_threads_mutex); 
     838#endif 
     839 
    779840    /* Reset block counter */ 
    780841    nblock = -1; 
     
    795856 
    796857  /* Barrier initialization */ 
     858#ifdef _POSIX_BARRIERS_MINE 
    797859  pthread_barrier_init(&barr_init, NULL, nthreads+1); 
    798   pthread_barrier_init(&barr_inter, NULL, nthreads); 
    799860  pthread_barrier_init(&barr_finish, NULL, nthreads+1); 
     861#else 
     862  pthread_mutex_init(&count_threads_mutex, NULL); 
     863  pthread_cond_init(&count_threads_cv, NULL); 
     864#endif 
    800865 
    801866  /* Initialize and set thread detached attribute */ 
     
    837902  else if (nthreads_new != nthreads) { 
    838903    if (nthreads > 1 && init_threads_done) { 
    839       /* Tell all existing threads to end */ 
     904      /* Tell all existing threads to finish */ 
    840905      end_threads = 1; 
     906#ifdef _POSIX_BARRIERS_MINE 
    841907      rc = pthread_barrier_wait(&barr_init); 
    842908      if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) { 
     
    844910        exit(-1); 
    845911      } 
     912#else 
     913      pthread_mutex_lock(&count_threads_mutex); 
     914      if (count_threads < nthreads) { 
     915        count_threads++; 
     916        pthread_cond_wait(&count_threads_cv, &count_threads_mutex); 
     917      } 
     918      else { 
     919        pthread_cond_broadcast(&count_threads_cv); 
     920        count_threads = 0;      /* Reset count to 0 */ 
     921      } 
     922      pthread_mutex_unlock(&count_threads_mutex); 
     923#endif 
     924 
    846925      /* Join exiting threads */ 
    847926      for (t=0; t<nthreads; t++) { 
     
    880959  /* Finish the possible thread pool */ 
    881960  if (nthreads > 1 && init_threads_done) { 
    882     /* Tell all existing threads to end */ 
     961    /* Tell all existing threads to finish */ 
    883962    end_threads = 1; 
     963#ifdef _POSIX_BARRIERS_MINE 
    884964    rc = pthread_barrier_wait(&barr_init); 
    885965    if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) { 
     966        printf("Could not wait on barrier (init)\n"); 
    886967      exit(-1); 
    887968    } 
     969#else 
     970    pthread_mutex_lock(&count_threads_mutex); 
     971    if (count_threads < nthreads) { 
     972      count_threads++; 
     973      pthread_cond_wait(&count_threads_cv, &count_threads_mutex); 
     974    } 
     975    else { 
     976      pthread_cond_broadcast(&count_threads_cv); 
     977      count_threads = 0;      /* Reset count to 0 */ 
     978    } 
     979    pthread_mutex_unlock(&count_threads_mutex); 
     980#endif 
     981 
    888982    /* Join exiting threads */ 
    889983    for (t=0; t<nthreads; t++) { 
     
    900994 
    901995    /* Barriers */ 
     996#ifdef _POSIX_BARRIERS_MINE 
    902997    pthread_barrier_destroy(&barr_init); 
    903     pthread_barrier_destroy(&barr_inter); 
    904998    pthread_barrier_destroy(&barr_finish); 
     999#else 
     1000    pthread_mutex_destroy(&count_threads_mutex); 
     1001    pthread_cond_destroy(&count_threads_cv); 
     1002#endif 
    9051003 
    9061004    /* Thread attributes */ 
  • trunk/src/blosc.h

    r87 r89  
    3232  `nthreads` is 1, then the serial version is chosen and a possible 
    3333  previous existing pool is ended.  Returns the previous number of 
    34   threads. 
     34  threads.  If this is not called, `nthreads` is set to 1 internally. 
    3535 
    3636*/ 
Note: See TracChangeset for help on using the changeset viewer.