| author    | Justin Bedo <cu@cua0.org>                        | 2023-02-09 09:35:55 +1100 |
|-----------|--------------------------------------------------|---------------------------|
| committer | Justin Bedo <cu@cua0.org>                        | 2023-02-09 09:36:19 +1100 |
| commit    | 12d46be8c70205662d96fc26aee19fba06fd7ddf (patch) |                           |
| tree      | 1bca999dde086ff020aa56cb6e39d7aa2e3af1af /pca.c  |                           |
| parent    | d55277694dc2e8317dd4c0ea990a2e2bf41f6d4d (diff)  |                           |
switch to multicore version
Diffstat (limited to 'pca.c')
| -rw-r--r-- | pca.c | 32309 | 
1 file changed, 30202 insertions, 2107 deletions
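The diff below replaces the sequential C backend with Futhark's multicore backend: `FUTHARK_BACKEND_c` becomes `FUTHARK_BACKEND_multicore`, and the generated API gains `futhark_context_config_set_num_threads`. As a minimal sketch of how host code might use the new knob, the snippet here assumes the standard Futhark C API lifecycle calls (`futhark_context_config_new`, `futhark_context_config_free`) and a generated header named `pca.h`; neither appears in this hunk.

```c
// Sketch only: host-side setup against the multicore backend generated in pca.c.
// Assumes a generated header "pca.h" and the usual Futhark context lifecycle calls.
#include "pca.h"

int main(void) {
  struct futhark_context_config *cfg = futhark_context_config_new();
#ifdef FUTHARK_BACKEND_multicore
  // New in this commit: choose how many worker threads the scheduler starts.
  futhark_context_config_set_num_threads(cfg, 4);
#endif
  struct futhark_context *ctx = futhark_context_new(cfg);
  // ... call the generated entry points (e.g. the PCA routines) here ...
  futhark_context_free(ctx);
  futhark_context_config_free(cfg);
  return 0;
}
```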
@@ -40,6 +40,7 @@ int futhark_context_config_set_tuning_param(struct futhark_context_config *cfg,
 struct futhark_context;
 struct futhark_context *futhark_context_new(struct futhark_context_config *cfg);
 void futhark_context_free(struct futhark_context *cfg);
+void futhark_context_config_set_num_threads(struct futhark_context_config *cfg, int n);
 void futhark_context_config_set_debugging(struct futhark_context_config *cfg, int flag);
 void futhark_context_config_set_profiling(struct futhark_context_config *cfg, int flag);
 void futhark_context_config_set_logging(struct futhark_context_config *cfg, int flag);
@@ -74,7 +75,7 @@ void futhark_context_set_logging_file(struct futhark_context *ctx, FILE *f);
 void futhark_context_pause_profiling(struct futhark_context *ctx);
 void futhark_context_unpause_profiling(struct futhark_context *ctx);
 int futhark_context_clear_caches(struct futhark_context *ctx);
-#define FUTHARK_BACKEND_c
+#define FUTHARK_BACKEND_multicore
 #define FUTHARK_SUCCESS 0
 #define FUTHARK_PROGRAM_ERROR 2
 #define FUTHARK_OUT_OF_MEMORY 3
@@ -4886,12 +4887,1180 @@ struct memblock {
 struct constants {
     int dummy;
 };
+static int32_t dlossziwithacc_locks_mem_realtype_24530[100151];
+static int32_t dlossziwithacc_locks_mem_realtype_24545[100151];
+static int32_t dlossziwithacc_locks_mem_realtype_24546[100151];
+static int32_t dlossziwithacc_locks_mem_realtype_24613[100151];
+static int32_t dlossziwithacc_locks_mem_realtype_24630[100151];
+static int32_t pcaWithQuantileziwithacc_locks_mem_realtype_24828[100151];
+static int32_t pcaWithQuantileziwithacc_locks_mem_realtype_24843[100151];
+static int32_t pcaWithQuantileziwithacc_locks_mem_realtype_24844[100151];
+static int32_t pcaWithQuantileziwithacc_locks_mem_realtype_24911[100151];
+static int32_t pcaWithQuantileziwithacc_locks_mem_realtype_24928[100151];
 static const int num_tuning_params = 0;
 static const char *tuning_param_names[1];
 static const char *tuning_param_vars[1];
 static const char *tuning_param_classes[1];
 static int64_t *tuning_param_defaults[1];
-// Start of backends/c.h
+// start of scheduler.h
+
+// First, the API that the generated code will access.  In principle,
+// we could then compile the scheduler separately and link an object
+// file with the generated code.  In practice, we will embed all of
+// this in the generated code.
+
+// Scheduler handle.
+struct scheduler;
+
+// Initialise a scheduler (and start worker threads).
+static int scheduler_init(struct scheduler *scheduler,
+                          int num_workers,
+                          double kappa);
+
+// Shut down a scheduler (and destroy worker threads).
+static int scheduler_destroy(struct scheduler *scheduler);
+
+// Figure out the smallest amount of work that amortises task
+// creation.
+static int determine_kappa(double *kappa);
+
+// How a segop should be scheduled.
+enum scheduling {
+  DYNAMIC,
+  STATIC
+};
+
+// How a given task should be executed.  Filled out by the scheduler
+// and passed to the segop function
+struct scheduler_info {
+  int64_t iter_pr_subtask;
+  int64_t remainder;
+  int nsubtasks;
+  enum scheduling sched;
+  int wake_up_threads;
+
+  int64_t *task_time;
+  int64_t *task_iter;
+};
+
+// A segop function.  This is what you hand the scheduler for
+// execution.
+typedef int (*segop_fn)(void* args,
+                        int64_t iterations,
+                        int tid,
+                        struct scheduler_info info);
+
+// A task for the scheduler to execute.
+struct scheduler_segop {
+  void *args;
+  segop_fn top_level_fn;
+  segop_fn nested_fn;
+  int64_t iterations;
+  enum scheduling sched;
+
+  // Pointers to timer and iter associated with the task
+  int64_t *task_time;
+  int64_t *task_iter;
+
+  // For debugging
+  const char* name;
+};
+
+static inline int scheduler_prepare_task(struct scheduler *scheduler,
+                                         struct scheduler_segop *task);
+
+typedef int (*parloop_fn)(void* args,
+                          int64_t start,
+                          int64_t end,
+                          int subtask_id,
+                          int tid);
+
+// A parallel parloop task.
+struct scheduler_parloop {
+  void* args;
+  parloop_fn fn;
+  int64_t iterations;
+  struct scheduler_info info;
+
+  // For debugging
+  const char* name;
+};
+
+static inline int scheduler_execute_task(struct scheduler *scheduler,
+                                         struct scheduler_parloop *task);
+
+// Then the API implementation.
+
+#include <signal.h>
+
+#if defined(_WIN32)
+#include <windows.h>
+#elif defined(__APPLE__)
+#include <sys/sysctl.h>
+// For getting cpu usage of threads
+#include <mach/mach.h>
+#include <sys/resource.h>
+#elif defined(__linux__)
+#include <sys/sysinfo.h>
+#include <sys/resource.h>
+#include <signal.h>
+#elif defined(__EMSCRIPTEN__)
+#include <emscripten/threading.h>
+#include <sys/sysinfo.h>
+#include <sys/resource.h>
+#include <signal.h>
+#endif
+
+/* Multicore Utility functions */
+
+/* A wrapper for getting rusage on Linux and MacOS */
+/* TODO maybe figure out this for windows */
+static inline int getrusage_thread(struct rusage *rusage)
+{
+  int err = -1;
+#if  defined(__APPLE__)
+    thread_basic_info_data_t info = { 0 };
+    mach_msg_type_number_t info_count = THREAD_BASIC_INFO_COUNT;
+    kern_return_t kern_err;
+
+    kern_err = thread_info(mach_thread_self(),
+                           THREAD_BASIC_INFO,
+                           (thread_info_t)&info,
+                           &info_count);
+    if (kern_err == KERN_SUCCESS) {
+        memset(rusage, 0, sizeof(struct rusage));
+        rusage->ru_utime.tv_sec = info.user_time.seconds;
+        rusage->ru_utime.tv_usec = info.user_time.microseconds;
+        rusage->ru_stime.tv_sec = info.system_time.seconds;
+        rusage->ru_stime.tv_usec = info.system_time.microseconds;
+        err = 0;
+    } else {
+        errno = EINVAL;
+    }
+#elif defined(__linux__) || __EMSCRIPTEN__
+    err = getrusage(RUSAGE_THREAD, rusage);
+#endif
+    return err;
+}
+
+/* returns the number of logical cores */
+static int num_processors()
+{
+#if  defined(_WIN32)
+/* https://docs.microsoft.com/en-us/windows/win32/api/sysinfoapi/ns-sysinfoapi-system_info */
+    SYSTEM_INFO sysinfo;
+    GetSystemInfo(&sysinfo);
+    int ncores = sysinfo.dwNumberOfProcessors;
+    fprintf(stderr, "Found %d cores on your Windows machine\n Is that correct?\n", ncores);
+    return ncores;
+#elif defined(__APPLE__)
+    int ncores;
+    size_t ncores_size = sizeof(ncores);
+    CHECK_ERRNO(sysctlbyname("hw.logicalcpu", &ncores, &ncores_size, NULL, 0),
+                "sysctlbyname (hw.logicalcpu)");
+    return ncores;
+#elif defined(__linux__)
+  return get_nprocs();
+#elif __EMSCRIPTEN__
+  return emscripten_num_logical_cores();
+#else
+  fprintf(stderr, "operating system not recognised\n");
+  return -1;
+#endif
+}
+
+static unsigned int g_seed;
+
+// Used to seed the generator.
+static inline void fast_srand(unsigned int seed) {
+    g_seed = seed;
+}
+
+// Compute a pseudorandom integer.
+// Output value in range [0, 32767]
+static inline unsigned int fast_rand(void) {
+    g_seed = (214013*g_seed+2531011);
+    return (g_seed>>16)&0x7FFF;
+}
+
+struct subtask_queue {
+  int capacity;             // Size of the buffer.
+  int first;                // Index of the start of the ring buffer.
+  int num_used;             // Number of used elements in the buffer.
+  struct subtask **buffer;
+
+  pthread_mutex_t mutex;    // Mutex used for synchronisation.
+  pthread_cond_t cond;      // Condition variable used for synchronisation.
+  int dead;
+
+#if defined(MCPROFILE)
+  /* Profiling fields */
+  uint64_t time_enqueue;
+  uint64_t time_dequeue;
+  uint64_t n_dequeues;
+  uint64_t n_enqueues;
+#endif
+};
+
+/* A subtask that can be executed by a worker */
+struct subtask {
+  /* The parloop function */
+  parloop_fn fn;
+  /* Execution parameters */
+  void* args;
+  int64_t start, end;
+  int id;
+
+  /* Dynamic scheduling parameters */
+  int chunkable;
+  int64_t chunk_size;
+
+  /* Shared variables across subtasks */
+  volatile int *counter; // Counter for ongoing subtasks
+  // Shared task timers and iterators
+  int64_t *task_time;
+  int64_t *task_iter;
+
+  /* For debugging */
+  const char *name;
+};
+
+
+struct worker {
+  pthread_t thread;
+  struct scheduler *scheduler;  /* Reference to the scheduler struct the worker belongs to */
+  struct subtask_queue q;
+  int dead;
+  int tid;                      /* Just a thread id */
+
+  /* "thread local" time fields used for online algorithm */
+  uint64_t timer;
+  uint64_t total;
+  int nested; /* How nested the current computation is */
+
+  // Profiling fields
+  int output_usage;            /* Whether to dump thread usage */
+  uint64_t time_spent_working; /* Time spent in parloop functions */
+};
+
+static inline void output_worker_usage(struct worker *worker)
+{
+  struct rusage usage;
+  CHECK_ERRNO(getrusage_thread(&usage), "getrusage_thread");
+  struct timeval user_cpu_time = usage.ru_utime;
+  struct timeval sys_cpu_time = usage.ru_stime;
+  fprintf(stderr, "tid: %2d - work time %10llu us - user time: %10llu us - sys: %10llu us\n",
+          worker->tid,
+          (long long unsigned)worker->time_spent_working / 1000,
+          (long long unsigned)(user_cpu_time.tv_sec * 1000000 + user_cpu_time.tv_usec),
+          (long long unsigned)(sys_cpu_time.tv_sec * 1000000 + sys_cpu_time.tv_usec));
+}
+
+/* Doubles the size of the queue */
+static inline int subtask_queue_grow_queue(struct subtask_queue *subtask_queue) {
+
+  int new_capacity = 2 * subtask_queue->capacity;
+#ifdef MCDEBUG
+  fprintf(stderr, "Growing queue to %d\n", subtask_queue->capacity * 2);
+#endif
+
+  struct subtask **new_buffer = calloc(new_capacity, sizeof(struct subtask*));
+  for (int i = 0; i < subtask_queue->num_used; i++) {
+    new_buffer[i] = subtask_queue->buffer[(subtask_queue->first + i) % subtask_queue->capacity];
+  }
+
+  free(subtask_queue->buffer);
+  subtask_queue->buffer = new_buffer;
+  subtask_queue->capacity = new_capacity;
+  subtask_queue->first = 0;
+
+  return 0;
+}
+
+// Initialise a job queue with the given capacity.  The queue starts out
+// empty.  Returns non-zero on error.
+static inline int subtask_queue_init(struct subtask_queue *subtask_queue, int capacity)
+{
+  assert(subtask_queue != NULL);
+  memset(subtask_queue, 0, sizeof(struct subtask_queue));
+
+  subtask_queue->capacity = capacity;
+  subtask_queue->buffer = calloc(capacity, sizeof(struct subtask*));
+  if (subtask_queue->buffer == NULL) {
+    return -1;
+  }
+
+  CHECK_ERRNO(pthread_mutex_init(&subtask_queue->mutex, NULL), "pthread_mutex_init");
+  CHECK_ERRNO(pthread_cond_init(&subtask_queue->cond, NULL), "pthread_cond_init");
+
+  return 0;
+}
+
+// Destroy the job queue.  Blocks until the queue is empty before it
+// is destroyed.
+static inline int subtask_queue_destroy(struct subtask_queue *subtask_queue)
+{
+  assert(subtask_queue != NULL);
+
+  CHECK_ERR(pthread_mutex_lock(&subtask_queue->mutex), "pthread_mutex_lock");
+
+  while (subtask_queue->num_used != 0) {
+    CHECK_ERR(pthread_cond_wait(&subtask_queue->cond, &subtask_queue->mutex), "pthread_cond_wait");
+  }
+
+  // Queue is now empty.  Let's kill it!
+  subtask_queue->dead = 1;
+  free(subtask_queue->buffer);
+  CHECK_ERR(pthread_cond_broadcast(&subtask_queue->cond), "pthread_cond_broadcast");
+  CHECK_ERR(pthread_mutex_unlock(&subtask_queue->mutex), "pthread_mutex_unlock");
+
+  return 0;
+}
+
+static inline void dump_queue(struct worker *worker)
+{
+  struct subtask_queue *subtask_queue = &worker->q;
+  CHECK_ERR(pthread_mutex_lock(&subtask_queue->mutex), "pthread_mutex_lock");
+  for (int i = 0; i < subtask_queue->num_used; i++) {
+    struct subtask * subtask = subtask_queue->buffer[(subtask_queue->first + i) % subtask_queue->capacity];
+    printf("queue tid %d with %d task %s\n", worker->tid, i, subtask->name);
+  }
+  CHECK_ERR(pthread_cond_broadcast(&subtask_queue->cond), "pthread_cond_broadcast");
+  CHECK_ERR(pthread_mutex_unlock(&subtask_queue->mutex), "pthread_mutex_unlock");
+}
+
+// Push an element onto the end of the job queue.  Blocks if the
+// subtask_queue is full (its size is equal to its capacity).  Returns
+// non-zero on error.  It is an error to push a job onto a queue that
+// has been destroyed.
+static inline int subtask_queue_enqueue(struct worker *worker, struct subtask *subtask)
+{
+  assert(worker != NULL);
+  struct subtask_queue *subtask_queue = &worker->q;
+
+#ifdef MCPROFILE
+  uint64_t start = get_wall_time();
+#endif
+
+  CHECK_ERR(pthread_mutex_lock(&subtask_queue->mutex), "pthread_mutex_lock");
+  // Wait until there is room in the subtask_queue.
+  while (subtask_queue->num_used == subtask_queue->capacity && !subtask_queue->dead) {
+    if (subtask_queue->num_used == subtask_queue->capacity) {
+      CHECK_ERR(subtask_queue_grow_queue(subtask_queue), "subtask_queue_grow_queue");
+      continue;
+    }
+    CHECK_ERR(pthread_cond_wait(&subtask_queue->cond, &subtask_queue->mutex), "pthread_cond_wait");
+  }
+
+  if (subtask_queue->dead) {
+    CHECK_ERR(pthread_mutex_unlock(&subtask_queue->mutex), "pthread_mutex_unlock");
+    return -1;
+  }
+
+  // If we made it past the loop, there is room in the subtask_queue.
+  subtask_queue->buffer[(subtask_queue->first + subtask_queue->num_used) % subtask_queue->capacity] = subtask;
+  subtask_queue->num_used++;
+
+#ifdef MCPROFILE
+  uint64_t end = get_wall_time();
+  subtask_queue->time_enqueue += (end - start);
+  subtask_queue->n_enqueues++;
+#endif
+  // Broadcast a reader (if any) that there is now an element.
+  CHECK_ERR(pthread_cond_broadcast(&subtask_queue->cond), "pthread_cond_broadcast");
+  CHECK_ERR(pthread_mutex_unlock(&subtask_queue->mutex), "pthread_mutex_unlock");
+
+  return 0;
+}
+
+
+/* Like subtask_queue_dequeue, but with two differences:
+   1) the subtask is stolen from the __front__ of the queue
+   2) returns immediately if there are no subtasks queued,
+      as we don't want to block on another worker's queue
+*/
+static inline int subtask_queue_steal(struct worker *worker,
+                                      struct subtask **subtask)
+{
+  struct subtask_queue *subtask_queue = &worker->q;
+  assert(subtask_queue != NULL);
+
+#ifdef MCPROFILE
+  uint64_t start = get_wall_time();
+#endif
+  CHECK_ERR(pthread_mutex_lock(&subtask_queue->mutex), "pthread_mutex_lock");
+
+  if (subtask_queue->num_used == 0) {
+    CHECK_ERR(pthread_cond_broadcast(&subtask_queue->cond), "pthread_cond_broadcast");
+    CHECK_ERR(pthread_mutex_unlock(&subtask_queue->mutex), "pthread_mutex_unlock");
+    return 1;
+  }
+
+  if (subtask_queue->dead) {
+    CHECK_ERR(pthread_mutex_unlock(&subtask_queue->mutex), "pthread_mutex_unlock");
+    return -1;
+  }
+
+  // Tasks get stolen from the "front"
+  struct subtask *cur_back = subtask_queue->buffer[subtask_queue->first];
+  struct subtask *new_subtask = NULL;
+  int remaining_iter = cur_back->end - cur_back->start;
+  // If subtask is chunkable, we steal half of the iterations
+  if (cur_back->chunkable && remaining_iter > 1) {
+      int64_t half = remaining_iter / 2;
+      new_subtask = malloc(sizeof(struct subtask));
+      *new_subtask = *cur_back;
+      new_subtask->start = cur_back->end - half;
+      cur_back->end = new_subtask->start;
+      __atomic_fetch_add(cur_back->counter, 1, __ATOMIC_RELAXED);
+  } else {
+    new_subtask = cur_back;
+    subtask_queue->num_used--;
+    subtask_queue->first = (subtask_queue->first + 1) % subtask_queue->capacity;
+  }
+  *subtask = new_subtask;
+
+  if (*subtask == NULL) {
+    CHECK_ERR(pthread_mutex_unlock(&subtask_queue->mutex), "pthread_mutex_unlock");
+    return 1;
+  }
+
+#ifdef MCPROFILE
+  uint64_t end = get_wall_time();
+  subtask_queue->time_dequeue += (end - start);
+  subtask_queue->n_dequeues++;
+#endif
+
+  // Broadcast a writer (if any) that there is now room for more.
+  CHECK_ERR(pthread_cond_broadcast(&subtask_queue->cond), "pthread_cond_broadcast");
+  CHECK_ERR(pthread_mutex_unlock(&subtask_queue->mutex), "pthread_mutex_unlock");
+
+  return 0;
+}
+
+
+// Pop an element from the back of the job queue.
+// Optional argument can be provided to block or not
+static inline int subtask_queue_dequeue(struct worker *worker,
+                                        struct subtask **subtask, int blocking)
+{
+  assert(worker != NULL);
+  struct subtask_queue *subtask_queue = &worker->q;
+
+#ifdef MCPROFILE
+  uint64_t start = get_wall_time();
+#endif
+
+  CHECK_ERR(pthread_mutex_lock(&subtask_queue->mutex), "pthread_mutex_lock");
+  if (subtask_queue->num_used == 0 && !blocking) {
+    CHECK_ERR(pthread_mutex_unlock(&subtask_queue->mutex), "pthread_mutex_unlock");
+    return 1;
+  }
+  // Try to steal some work while the subtask_queue is empty
+  while (subtask_queue->num_used == 0 && !subtask_queue->dead) {
+    pthread_cond_wait(&subtask_queue->cond, &subtask_queue->mutex);
+  }
+
+  if (subtask_queue->dead) {
+    CHECK_ERR(pthread_mutex_unlock(&subtask_queue->mutex), "pthread_mutex_unlock");
+    return -1;
+  }
+
+  // dequeue pops from the back
+  *subtask = subtask_queue->buffer[(subtask_queue->first + subtask_queue->num_used - 1) % subtask_queue->capacity];
+  subtask_queue->num_used--;
+
+  if (*subtask == NULL) {
+    assert(!"got NULL ptr");
+    CHECK_ERR(pthread_mutex_unlock(&subtask_queue->mutex), "pthread_mutex_unlock");
+    return -1;
+  }
+
+#ifdef MCPROFILE
+  uint64_t end = get_wall_time();
+  subtask_queue->time_dequeue += (end - start);
+  subtask_queue->n_dequeues++;
+#endif
+
+  // Broadcast a writer (if any) that there is now room for more.
+  CHECK_ERR(pthread_cond_broadcast(&subtask_queue->cond), "pthread_cond_broadcast");
+  CHECK_ERR(pthread_mutex_unlock(&subtask_queue->mutex), "pthread_mutex_unlock");
+
+  return 0;
+}
+
+static inline int subtask_queue_is_empty(struct subtask_queue *subtask_queue)
+{
+  return subtask_queue->num_used == 0;
+}
+
+/* Scheduler definitions */
+
+struct scheduler {
+  struct worker *workers;
+  int num_threads;
+  int minimum_chunk_size;
+
+
+  // If there is work to steal => active_work > 0
+  volatile int active_work;
+
+  // Only one error can be returned at a time now.  Maybe we can
+  // provide a stack-like structure for pushing errors onto if we wish
+  // to backpropagate multiple errors
+  volatile int error;
+
+  // kappa time unit in nanoseconds
+  double kappa;
+};
+
+
+// Thread local variable worker struct
+// Note that accesses to TLS variables are expensive
+// Minimize direct references to this variable
+__thread struct worker* worker_local = NULL;
+
+static int64_t total_now(int64_t total, int64_t time) {
+  return total + (get_wall_time_ns() - time);
+}
+
+static int random_other_worker(struct scheduler *scheduler, int my_id) {
+  int my_num_workers = scheduler->num_threads;
+  assert(my_num_workers != 1);
+  int i = fast_rand() % (my_num_workers - 1);
+  if (i >= my_id) {
+    i++;
+  }
+#ifdef MCDEBUG
+  assert(i >= 0);
+  assert(i < my_num_workers);
+  assert(i != my_id);
+#endif
+
+  return i;
+}
+
+static inline int64_t compute_chunk_size(int64_t minimum_chunk_size, double kappa, struct subtask* subtask)
+{
+  double C = (double)*subtask->task_time / (double)*subtask->task_iter;
+  if (C == 0.0F) C += DBL_EPSILON;
+  return smax64((int64_t)(kappa / C), minimum_chunk_size);
+}
+
+/* Takes a chunk from subtask and enqueues the remaining iterations onto the worker's queue */
+/* A no-op if the subtask is not chunkable */
+static inline struct subtask* chunk_subtask(struct worker* worker, struct subtask *subtask)
+{
+  if (subtask->chunkable) {
+    // Do we have information from previous runs available
+    if (*subtask->task_iter > 0) {
+      subtask->chunk_size = compute_chunk_size(worker->scheduler->minimum_chunk_size,
+                                               worker->scheduler->kappa,
+                                               subtask);
+      assert(subtask->chunk_size > 0);
+    }
+    int64_t remaining_iter = subtask->end - subtask->start;
+    assert(remaining_iter > 0);
+    if (remaining_iter > subtask->chunk_size) {
+      struct subtask *new_subtask = malloc(sizeof(struct subtask));
+      *new_subtask = *subtask;
+      // increment the subtask join counter to account for new subtask
+      __atomic_fetch_add(subtask->counter, 1, __ATOMIC_RELAXED);
+      // Update range parameters
+      subtask->end = subtask->start + subtask->chunk_size;
+      new_subtask->start = subtask->end;
+      subtask_queue_enqueue(worker, new_subtask);
+    }
+  }
+  return subtask;
+}
+
+static inline int run_subtask(struct worker* worker, struct subtask* subtask)
+{
+  assert(subtask != NULL);
+  assert(worker != NULL);
+
+  subtask = chunk_subtask(worker, subtask);
+  worker->total = 0;
+  worker->timer = get_wall_time_ns();
+#if defined(MCPROFILE)
+  int64_t start = worker->timer;
+#endif
+  worker->nested++;
+  int err = subtask->fn(subtask->args, subtask->start, subtask->end,
+                        subtask->id,
+                        worker->tid);
+  worker->nested--;
+  // Some error occurred during some other subtask
+  // so we just clean-up and return
+  if (worker->scheduler->error != 0) {
+    // Even a failed task counts as finished.
+    __atomic_fetch_sub(subtask->counter, 1, __ATOMIC_RELAXED);
+    free(subtask);
+    return 0;
+  }
+  if (err != 0) {
+    __atomic_store_n(&worker->scheduler->error, err, __ATOMIC_RELAXED);
+  }
+  // Total sequential time spent
+  int64_t time_elapsed = total_now(worker->total, worker->timer);
+#if defined(MCPROFILE)
+  worker->time_spent_working += get_wall_time_ns() - start;
+#endif
+  int64_t iter = subtask->end - subtask->start;
+  // report measurements
+  // These updates should really be done using a single atomic CAS operation
+  __atomic_fetch_add(subtask->task_time, time_elapsed, __ATOMIC_RELAXED);
+  __atomic_fetch_add(subtask->task_iter, iter, __ATOMIC_RELAXED);
+  // We need a fence here, since if the counter is decremented before either
+  // of the two above are updated bad things can happen, e.g. if they are stack-allocated
+  __atomic_thread_fence(__ATOMIC_SEQ_CST);
+  __atomic_fetch_sub(subtask->counter, 1, __ATOMIC_RELAXED);
+  free(subtask);
+  return 0;
+}
+
+
+static inline int is_small(struct scheduler_segop *task, struct scheduler *scheduler, int *nsubtasks)
+{
+  int64_t time = *task->task_time;
+  int64_t iter = *task->task_iter;
+
+  if (task->sched == DYNAMIC || iter == 0) {
+    *nsubtasks = scheduler->num_threads;
+    return 0;
+  }
+
+  // Estimate the constant C
+  double C = (double)time / (double)iter;
+  double cur_task_iter = (double) task->iterations;
+
+  // Returns true if the task is small i.e.
+  // if the number of iterations times C is smaller
+  // than the overhead of subtask creation
+  if (C == 0.0F || C * cur_task_iter < scheduler->kappa) {
+    *nsubtasks = 1;
+    return 1;
+  }
+
+  // Else compute how many subtasks this task should create
+  int64_t min_iter_pr_subtask = smax64(scheduler->kappa / C, 1);
+  *nsubtasks = smin64(smax64(task->iterations / min_iter_pr_subtask, 1), scheduler->num_threads);
+
+  return 0;
+}
+
+// TODO make this prettier
+static inline struct subtask* create_subtask(parloop_fn fn,
+                                             void* args,
+                                             const char* name,
+                                             volatile int* counter,
+                                             int64_t *timer,
+                                             int64_t *iter,
+                                             int64_t start, int64_t end,
+                                             int chunkable,
+                                             int64_t chunk_size,
+                                             int id)
+{
+  struct subtask* subtask = malloc(sizeof(struct subtask));
+  if (subtask == NULL) {
+    assert(!"malloc failed in create_subtask");
+    return NULL;
+  }
+  subtask->fn         = fn;
+  subtask->args       = args;
+
+  subtask->counter    = counter;
+  subtask->task_time  = timer;
+  subtask->task_iter  = iter;
+
+  subtask->start      = start;
+  subtask->end        = end;
+  subtask->id         = id;
+  subtask->chunkable  = chunkable;
+  subtask->chunk_size = chunk_size;
+
+  subtask->name       = name;
+  return subtask;
+}
+
+static int dummy_counter = 0;
+static int64_t dummy_timer = 0;
+static int64_t dummy_iter = 0;
+
+static int dummy_fn(void *args, int64_t start, int64_t end, int subtask_id, int tid) {
+  (void)args;
+  (void)start;
+  (void)end;
+  (void)subtask_id;
+  (void)tid;
+  return 0;
+}
+
+// Wake up threads that are blocking, by pushing a dummy task
+// onto their queue
+static inline void wake_up_threads(struct scheduler *scheduler, int start_tid, int end_tid) {
+
+#if defined(MCDEBUG)
+  assert(start_tid >= 1);
+  assert(end_tid <= scheduler->num_threads);
+#endif
+  for (int i = start_tid; i < end_tid; i++) {
+    struct subtask *subtask = create_subtask(dummy_fn, NULL, "dummy_fn",
+                                            &dummy_counter,
+                                            &dummy_timer, &dummy_iter,
+                                            0, 0,
+                                            0, 0,
+                                            0);
+    CHECK_ERR(subtask_queue_enqueue(&scheduler->workers[i], subtask), "subtask_queue_enqueue");
+  }
+}
+
+static inline int is_finished(struct worker *worker) {
+  return worker->dead && subtask_queue_is_empty(&worker->q);
+}
+
+// Try to steal from a random queue
+static inline int steal_from_random_worker(struct worker* worker)
+{
+  int my_id = worker->tid;
+  struct scheduler* scheduler = worker->scheduler;
+  int k = random_other_worker(scheduler, my_id);
+  struct worker *worker_k = &scheduler->workers[k];
+  struct subtask* subtask =  NULL;
+  int retval = subtask_queue_steal(worker_k, &subtask);
+  if (retval == 0) {
+    subtask_queue_enqueue(worker, subtask);
+    return 1;
+  }
+  return 0;
+}
+
+
+static inline void *scheduler_worker(void* args)
+{
+  struct worker *worker = (struct worker*) args;
+  struct scheduler *scheduler = worker->scheduler;
+  worker_local = worker;
+  struct subtask *subtask = NULL;
+
+  while(!is_finished(worker)) {
+    if (!subtask_queue_is_empty(&worker->q)) {
+      int retval = subtask_queue_dequeue(worker, &subtask, 0);
+      if (retval == 0) {
+        assert(subtask != NULL);
+        CHECK_ERR(run_subtask(worker, subtask), "run_subtask");
+      } // else someone stole our work
+
+    } else if (scheduler->active_work) { /* steal */
+      while (!is_finished(worker) && scheduler->active_work) {
+        if (steal_from_random_worker(worker)) {
+          break;
+        }
+      }
+    } else { /* go back to sleep and wait for work */
+      int retval = subtask_queue_dequeue(worker, &subtask, 1);
+      if (retval == 0) {
+        assert(subtask != NULL);
+        CHECK_ERR(run_subtask(worker, subtask), "run_subtask");
+      }
+    }
+  }
+
+  assert(subtask_queue_is_empty(&worker->q));
+#if defined(MCPROFILE)
+  if (worker->output_usage)
+    output_worker_usage(worker);
+#endif
+  return NULL;
+}
+
+
+static inline int scheduler_execute_parloop(struct scheduler *scheduler,
+                                            struct scheduler_parloop *task,
+                                            int64_t *timer)
+{
+
+  struct worker *worker = worker_local;
+
+  struct scheduler_info info = task->info;
+  int64_t iter_pr_subtask = info.iter_pr_subtask;
+  int64_t remainder = info.remainder;
+  int nsubtasks = info.nsubtasks;
+  volatile int join_counter = nsubtasks;
+
+  // Shared timer used to sum up all
+  // sequential work from each subtask
+  int64_t task_timer = 0;
+  int64_t task_iter = 0;
+
+  enum scheduling sched = info.sched;
+  /* If each subtask should be processed in chunks */
+  int chunkable = sched == STATIC ? 0 : 1;
+  int64_t chunk_size = scheduler->minimum_chunk_size; // The initial chunk size when no info is available
+
+
+  if (info.wake_up_threads || sched == DYNAMIC)
+    __atomic_add_fetch(&scheduler->active_work, nsubtasks, __ATOMIC_RELAXED);
+
+  int64_t start = 0;
+  int64_t end = iter_pr_subtask + (int64_t)(remainder != 0);
+  for (int subtask_id = 0; subtask_id < nsubtasks; subtask_id++) {
+    struct subtask *subtask = create_subtask(task->fn, task->args, task->name,
+                                              &join_counter,
+                                              &task_timer, &task_iter,
+                                              start, end,
+                                              chunkable, chunk_size,
+                                              subtask_id);
+    assert(subtask != NULL);
+    // In most cases we will never have more subtasks than workers,
+    // but there can be exceptions (e.g. the kappa tuning function).
+    struct worker *subtask_worker =
+      worker->nested
+      ? &scheduler->workers[worker->tid]
+      : &scheduler->workers[subtask_id % scheduler->num_threads];
+    CHECK_ERR(subtask_queue_enqueue(subtask_worker, subtask),
+              "subtask_queue_enqueue");
+    // Update range params
+    start = end;
+    end += iter_pr_subtask + ((subtask_id + 1) < remainder);
+  }
+
+  if (info.wake_up_threads) {
+    wake_up_threads(scheduler, nsubtasks, scheduler->num_threads);
+  }
+
+  // Join (wait for subtasks to finish)
+  while(join_counter != 0) {
+    if (!subtask_queue_is_empty(&worker->q)) {
+      struct subtask *subtask = NULL;
+      int err = subtask_queue_dequeue(worker, &subtask, 0);
+      if (err == 0 ) {
+        CHECK_ERR(run_subtask(worker, subtask), "run_subtask");
+      }
+    } else {
+      if (steal_from_random_worker(worker)) {
+        struct subtask *subtask = NULL;
+        int err = subtask_queue_dequeue(worker, &subtask, 0);
+        if (err == 0) {
+          CHECK_ERR(run_subtask(worker, subtask), "run_subtask");
+        }
+      }
+    }
+  }
+
+
+  if (info.wake_up_threads || sched == DYNAMIC) {
+    __atomic_sub_fetch(&scheduler->active_work, nsubtasks, __ATOMIC_RELAXED);
+  }
+
+  // Write back timing results of all sequential work
+  (*timer) += task_timer;
+  return scheduler->error;
+}
+
+
+static inline int scheduler_execute_task(struct scheduler *scheduler,
+                                         struct scheduler_parloop *task)
+{
+
+  struct worker *worker = worker_local;
+
+  int err = 0;
+
+  // How much sequential work was performed by the task
+  int64_t task_timer = 0;
+
+  /* Execute task sequentially or in parallel based on decision made earlier */
+  if (task->info.nsubtasks == 1) {
+    int64_t start = get_wall_time_ns();
+    err = task->fn(task->args, 0, task->iterations, 0, worker->tid);
+    int64_t end = get_wall_time_ns();
+    task_timer = end - start;
+    worker->time_spent_working += task_timer;
+    // Report time measurements
+    // TODO the update of both of these should really be a single atomic!!
+    __atomic_fetch_add(task->info.task_time, task_timer, __ATOMIC_RELAXED);
+    __atomic_fetch_add(task->info.task_iter, task->iterations, __ATOMIC_RELAXED);
+  } else {
+    // Add "before" time if we already are inside a task
+    int64_t time_before = 0;
+    if (worker->nested > 0) {
+      time_before = total_now(worker->total, worker->timer);
+    }
+
+    err = scheduler_execute_parloop(scheduler, task, &task_timer);
+
+    // Report time measurements
+    // TODO the update of both of these should really be a single atomic!!
+    __atomic_fetch_add(task->info.task_time, task_timer, __ATOMIC_RELAXED);
+    __atomic_fetch_add(task->info.task_iter, task->iterations, __ATOMIC_RELAXED);
+
+    // Update timers to account for new timings
+    worker->total = time_before + task_timer;
+    worker->timer = get_wall_time_ns();
+  }
+
+
+  return err;
+}
+
+/* Decide how to schedule the incoming task, i.e. how many subtasks to create
+   and whether to run the sequential or (potentially nested) parallel code body */
+static inline int scheduler_prepare_task(struct scheduler* scheduler,
+                                         struct scheduler_segop *task)
+{
+  assert(task != NULL);
+
+  struct worker *worker = worker_local;
+  struct scheduler_info info;
+  info.task_time = task->task_time;
+  info.task_iter = task->task_iter;
+
+  int nsubtasks;
+  // Decide if task should be scheduled sequentially
+  if (is_small(task, scheduler, &nsubtasks)) {
+    info.iter_pr_subtask = task->iterations;
+    info.remainder = 0;
+    info.nsubtasks = nsubtasks;
+    return task->top_level_fn(task->args, task->iterations, worker->tid, info);
+  } else {
+    info.iter_pr_subtask = task->iterations / nsubtasks;
+    info.remainder = task->iterations % nsubtasks;
+    info.sched = task->sched;
+    switch (task->sched) {
+    case STATIC:
+      info.nsubtasks = info.iter_pr_subtask == 0 ? info.remainder : ((task->iterations - info.remainder) / info.iter_pr_subtask);
+      break;
+    case DYNAMIC:
+      // As any thread can take any subtask, we are being safe with using
+      // an upper bound on the number of tasks such that the task allocates enough memory
+      info.nsubtasks = info.iter_pr_subtask == 0 ? info.remainder : nsubtasks;
+      break;
+    default:
+      assert(!"Got unknown scheduling");
+    }
+  }
+
+  info.wake_up_threads = 0;
+  // We only use the nested parallel segop function if we can't exhaust all cores
+  // using the outermost level
+  if (task->nested_fn != NULL && info.nsubtasks < scheduler->num_threads && info.nsubtasks == task->iterations) {
+    if (worker->nested == 0)
+      info.wake_up_threads = 1;
+    return task->nested_fn(task->args, task->iterations, worker->tid, info);
+  }
+
+  return task->top_level_fn(task->args, task->iterations, worker->tid, info);
+}
+
+// Now some code for finding the proper value of kappa on a given
+// machine (the smallest amount of work that amortises the cost of
+// task creation).
+
+struct tuning_struct {
+  int32_t *free_tuning_res;
+  int32_t *array;
+};
+
+// Reduction function over an integer array
+static int tuning_loop(void *args, int64_t start, int64_t end,
+                                     int flat_tid, int tid) {
+  (void)flat_tid;
+  (void)tid;
+
+  int err = 0;
+  struct tuning_struct *tuning_struct = (struct tuning_struct *) args;
+  int32_t *array = tuning_struct->array;
+  int32_t *tuning_res = tuning_struct->free_tuning_res;
+
+  int32_t sum = 0;
+  for (int i = start; i < end; i++) {
+    int32_t y = array[i];
+    sum = add32(sum, y);
+  }
+  *tuning_res = sum;
+  return err;
+}
+
+// The main entry point for the tuning process.  Sets the provided
+// variable ``kappa``.
+static int determine_kappa(double *kappa) {
+  int err = 0;
+
+  int64_t iterations = 100000000;
+  int64_t tuning_time = 0;
+  int64_t tuning_iter = 0;
+
+  int32_t *array = malloc(sizeof(int32_t) * iterations);
+  for (int64_t i = 0; i < iterations; i++) {
+    array[i] = fast_rand();
+  }
+
+  int64_t start_tuning = get_wall_time_ns();
+  /* **************************** */
+  /* Run sequential reduce first' */
+  /* |
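For orientation, the comments in this diff describe how the generated code drives the scheduler: a segop is packaged as a `struct scheduler_segop` and handed to `scheduler_prepare_task`, which either runs it sequentially or fans it out as parloop subtasks. The stub below is an illustrative sketch of that calling convention only; the `example_*` names are hypothetical, and a real segop body would build a `scheduler_parloop` and call `scheduler_execute_task`.

```c
// Illustrative only: handing a segop to the scheduler API declared above.
// The example_* names are hypothetical; real generated code wires in its own closures.
static int64_t example_time = 0, example_iter = 0;  // shared profiling accumulators

// A segop body: receives the scheduling decision in `info`.  A real body would
// split `iterations` across info.nsubtasks parloop subtasks via scheduler_execute_task.
static int example_top_level(void *args, int64_t iterations, int tid,
                             struct scheduler_info info) {
  (void)args; (void)iterations; (void)tid; (void)info;
  return 0;
}

static int example_run(struct scheduler *sched, int64_t n) {
  struct scheduler_segop task;
  task.args         = NULL;
  task.top_level_fn = example_top_level;
  task.nested_fn    = NULL;            // no nested version in this sketch
  task.iterations   = n;
  task.sched        = STATIC;
  task.task_time    = &example_time;   // the scheduler updates these to learn kappa-relative cost
  task.task_iter    = &example_iter;
  task.name         = "example_segop";
  return scheduler_prepare_task(sched, &task);
}
```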
