diff options
author | Justin Bedo <cu@cua0.org> | 2023-02-09 09:35:55 +1100 |
---|---|---|
committer | Justin Bedo <cu@cua0.org> | 2023-02-09 09:36:19 +1100 |
commit | 12d46be8c70205662d96fc26aee19fba06fd7ddf (patch) | |
tree | 1bca999dde086ff020aa56cb6e39d7aa2e3af1af | |
parent | d55277694dc2e8317dd4c0ea990a2e2bf41f6d4d (diff) |
switch to multicore version
-rw-r--r-- | pca.c | 32309 | ||||
-rw-r--r-- | pca.h | 3 |
2 files changed, 30204 insertions, 2108 deletions
@@ -40,6 +40,7 @@ int futhark_context_config_set_tuning_param(struct futhark_context_config *cfg, struct futhark_context; struct futhark_context *futhark_context_new(struct futhark_context_config *cfg); void futhark_context_free(struct futhark_context *cfg); +void futhark_context_config_set_num_threads(struct futhark_context_config *cfg, int n); void futhark_context_config_set_debugging(struct futhark_context_config *cfg, int flag); void futhark_context_config_set_profiling(struct futhark_context_config *cfg, int flag); void futhark_context_config_set_logging(struct futhark_context_config *cfg, int flag); @@ -74,7 +75,7 @@ void futhark_context_set_logging_file(struct futhark_context *ctx, FILE *f); void futhark_context_pause_profiling(struct futhark_context *ctx); void futhark_context_unpause_profiling(struct futhark_context *ctx); int futhark_context_clear_caches(struct futhark_context *ctx); -#define FUTHARK_BACKEND_c +#define FUTHARK_BACKEND_multicore #define FUTHARK_SUCCESS 0 #define FUTHARK_PROGRAM_ERROR 2 #define FUTHARK_OUT_OF_MEMORY 3 @@ -4886,12 +4887,1180 @@ struct memblock { struct constants { int dummy; }; +static int32_t dlossziwithacc_locks_mem_realtype_24530[100151]; +static int32_t dlossziwithacc_locks_mem_realtype_24545[100151]; +static int32_t dlossziwithacc_locks_mem_realtype_24546[100151]; +static int32_t dlossziwithacc_locks_mem_realtype_24613[100151]; +static int32_t dlossziwithacc_locks_mem_realtype_24630[100151]; +static int32_t pcaWithQuantileziwithacc_locks_mem_realtype_24828[100151]; +static int32_t pcaWithQuantileziwithacc_locks_mem_realtype_24843[100151]; +static int32_t pcaWithQuantileziwithacc_locks_mem_realtype_24844[100151]; +static int32_t pcaWithQuantileziwithacc_locks_mem_realtype_24911[100151]; +static int32_t pcaWithQuantileziwithacc_locks_mem_realtype_24928[100151]; static const int num_tuning_params = 0; static const char *tuning_param_names[1]; static const char *tuning_param_vars[1]; static const char *tuning_param_classes[1]; static int64_t *tuning_param_defaults[1]; -// Start of backends/c.h +// start of scheduler.h + +// First, the API that the generated code will access. In principle, +// we could then compile the scheduler separately and link an object +// file with the generated code. In practice, we will embed all of +// this in the generated code. + +// Scheduler handle. +struct scheduler; + +// Initialise a scheduler (and start worker threads). +static int scheduler_init(struct scheduler *scheduler, + int num_workers, + double kappa); + +// Shut down a scheduler (and destroy worker threads). +static int scheduler_destroy(struct scheduler *scheduler); + +// Figure out the smallest amount of work that amortises task +// creation. +static int determine_kappa(double *kappa); + +// How a segop should be scheduled. +enum scheduling { + DYNAMIC, + STATIC +}; + +// How a given task should be executed. Filled out by the scheduler +// and passed to the segop function +struct scheduler_info { + int64_t iter_pr_subtask; + int64_t remainder; + int nsubtasks; + enum scheduling sched; + int wake_up_threads; + + int64_t *task_time; + int64_t *task_iter; +}; + +// A segop function. This is what you hand the scheduler for +// execution. +typedef int (*segop_fn)(void* args, + int64_t iterations, + int tid, + struct scheduler_info info); + +// A task for the scheduler to execute. +struct scheduler_segop { + void *args; + segop_fn top_level_fn; + segop_fn nested_fn; + int64_t iterations; + enum scheduling sched; + + // Pointers to timer and iter associated with the task + int64_t *task_time; + int64_t *task_iter; + + // For debugging + const char* name; +}; + +static inline int scheduler_prepare_task(struct scheduler *scheduler, + struct scheduler_segop *task); + +typedef int (*parloop_fn)(void* args, + int64_t start, + int64_t end, + int subtask_id, + int tid); + +// A parallel parloop task. +struct scheduler_parloop { + void* args; + parloop_fn fn; + int64_t iterations; + struct scheduler_info info; + + // For debugging + const char* name; +}; + +static inline int scheduler_execute_task(struct scheduler *scheduler, + struct scheduler_parloop *task); + +// Then the API implementation. + +#include <signal.h> + +#if defined(_WIN32) +#include <windows.h> +#elif defined(__APPLE__) +#include <sys/sysctl.h> +// For getting cpu usage of threads +#include <mach/mach.h> +#include <sys/resource.h> +#elif defined(__linux__) +#include <sys/sysinfo.h> +#include <sys/resource.h> +#include <signal.h> +#elif defined(__EMSCRIPTEN__) +#include <emscripten/threading.h> +#include <sys/sysinfo.h> +#include <sys/resource.h> +#include <signal.h> +#endif + +/* Multicore Utility functions */ + +/* A wrapper for getting rusage on Linux and MacOS */ +/* TODO maybe figure out this for windows */ +static inline int getrusage_thread(struct rusage *rusage) +{ + int err = -1; +#if defined(__APPLE__) + thread_basic_info_data_t info = { 0 }; + mach_msg_type_number_t info_count = THREAD_BASIC_INFO_COUNT; + kern_return_t kern_err; + + kern_err = thread_info(mach_thread_self(), + THREAD_BASIC_INFO, + (thread_info_t)&info, + &info_count); + if (kern_err == KERN_SUCCESS) { + memset(rusage, 0, sizeof(struct rusage)); + rusage->ru_utime.tv_sec = info.user_time.seconds; + rusage->ru_utime.tv_usec = info.user_time.microseconds; + rusage->ru_stime.tv_sec = info.system_time.seconds; + rusage->ru_stime.tv_usec = info.system_time.microseconds; + err = 0; + } else { + errno = EINVAL; + } +#elif defined(__linux__) || __EMSCRIPTEN__ + err = getrusage(RUSAGE_THREAD, rusage); +#endif + return err; +} + +/* returns the number of logical cores */ +static int num_processors() +{ +#if defined(_WIN32) +/* https://docs.microsoft.com/en-us/windows/win32/api/sysinfoapi/ns-sysinfoapi-system_info */ + SYSTEM_INFO sysinfo; + GetSystemInfo(&sysinfo); + int ncores = sysinfo.dwNumberOfProcessors; + fprintf(stderr, "Found %d cores on your Windows machine\n Is that correct?\n", ncores); + return ncores; +#elif defined(__APPLE__) + int ncores; + size_t ncores_size = sizeof(ncores); + CHECK_ERRNO(sysctlbyname("hw.logicalcpu", &ncores, &ncores_size, NULL, 0), + "sysctlbyname (hw.logicalcpu)"); + return ncores; +#elif defined(__linux__) + return get_nprocs(); +#elif __EMSCRIPTEN__ + return emscripten_num_logical_cores(); +#else + fprintf(stderr, "operating system not recognised\n"); + return -1; +#endif +} + +static unsigned int g_seed; + +// Used to seed the generator. +static inline void fast_srand(unsigned int seed) { + g_seed = seed; +} + +// Compute a pseudorandom integer. +// Output value in range [0, 32767] +static inline unsigned int fast_rand(void) { + g_seed = (214013*g_seed+2531011); + return (g_seed>>16)&0x7FFF; +} + +struct subtask_queue { + int capacity; // Size of the buffer. + int first; // Index of the start of the ring buffer. + int num_used; // Number of used elements in the buffer. + struct subtask **buffer; + + pthread_mutex_t mutex; // Mutex used for synchronisation. + pthread_cond_t cond; // Condition variable used for synchronisation. + int dead; + +#if defined(MCPROFILE) + /* Profiling fields */ + uint64_t time_enqueue; + uint64_t time_dequeue; + uint64_t n_dequeues; + uint64_t n_enqueues; +#endif +}; + +/* A subtask that can be executed by a worker */ +struct subtask { + /* The parloop function */ + parloop_fn fn; + /* Execution parameters */ + void* args; + int64_t start, end; + int id; + + /* Dynamic scheduling parameters */ + int chunkable; + int64_t chunk_size; + + /* Shared variables across subtasks */ + volatile int *counter; // Counter for ongoing subtasks + // Shared task timers and iterators + int64_t *task_time; + int64_t *task_iter; + + /* For debugging */ + const char *name; +}; + + +struct worker { + pthread_t thread; + struct scheduler *scheduler; /* Reference to the scheduler struct the worker belongs to*/ + struct subtask_queue q; + int dead; + int tid; /* Just a thread id */ + + /* "thread local" time fields used for online algorithm */ + uint64_t timer; + uint64_t total; + int nested; /* How nested the current computation is */ + + // Profiling fields + int output_usage; /* Whether to dump thread usage */ + uint64_t time_spent_working; /* Time spent in parloop functions */ +}; + +static inline void output_worker_usage(struct worker *worker) +{ + struct rusage usage; + CHECK_ERRNO(getrusage_thread(&usage), "getrusage_thread"); + struct timeval user_cpu_time = usage.ru_utime; + struct timeval sys_cpu_time = usage.ru_stime; + fprintf(stderr, "tid: %2d - work time %10llu us - user time: %10llu us - sys: %10llu us\n", + worker->tid, + (long long unsigned)worker->time_spent_working / 1000, + (long long unsigned)(user_cpu_time.tv_sec * 1000000 + user_cpu_time.tv_usec), + (long long unsigned)(sys_cpu_time.tv_sec * 1000000 + sys_cpu_time.tv_usec)); +} + +/* Doubles the size of the queue */ +static inline int subtask_queue_grow_queue(struct subtask_queue *subtask_queue) { + + int new_capacity = 2 * subtask_queue->capacity; +#ifdef MCDEBUG + fprintf(stderr, "Growing queue to %d\n", subtask_queue->capacity * 2); +#endif + + struct subtask **new_buffer = calloc(new_capacity, sizeof(struct subtask*)); + for (int i = 0; i < subtask_queue->num_used; i++) { + new_buffer[i] = subtask_queue->buffer[(subtask_queue->first + i) % subtask_queue->capacity]; + } + + free(subtask_queue->buffer); + subtask_queue->buffer = new_buffer; + subtask_queue->capacity = new_capacity; + subtask_queue->first = 0; + + return 0; +} + +// Initialise a job queue with the given capacity. The queue starts out +// empty. Returns non-zero on error. +static inline int subtask_queue_init(struct subtask_queue *subtask_queue, int capacity) +{ + assert(subtask_queue != NULL); + memset(subtask_queue, 0, sizeof(struct subtask_queue)); + + subtask_queue->capacity = capacity; + subtask_queue->buffer = calloc(capacity, sizeof(struct subtask*)); + if (subtask_queue->buffer == NULL) { + return -1; + } + + CHECK_ERRNO(pthread_mutex_init(&subtask_queue->mutex, NULL), "pthread_mutex_init"); + CHECK_ERRNO(pthread_cond_init(&subtask_queue->cond, NULL), "pthread_cond_init"); + + return 0; +} + +// Destroy the job queue. Blocks until the queue is empty before it +// is destroyed. +static inline int subtask_queue_destroy(struct subtask_queue *subtask_queue) +{ + assert(subtask_queue != NULL); + + CHECK_ERR(pthread_mutex_lock(&subtask_queue->mutex), "pthread_mutex_lock"); + + while (subtask_queue->num_used != 0) { + CHECK_ERR(pthread_cond_wait(&subtask_queue->cond, &subtask_queue->mutex), "pthread_cond_wait"); + } + + // Queue is now empty. Let's kill it! + subtask_queue->dead = 1; + free(subtask_queue->buffer); + CHECK_ERR(pthread_cond_broadcast(&subtask_queue->cond), "pthread_cond_broadcast"); + CHECK_ERR(pthread_mutex_unlock(&subtask_queue->mutex), "pthread_mutex_unlock"); + + return 0; +} + +static inline void dump_queue(struct worker *worker) +{ + struct subtask_queue *subtask_queue = &worker->q; + CHECK_ERR(pthread_mutex_lock(&subtask_queue->mutex), "pthread_mutex_lock"); + for (int i = 0; i < subtask_queue->num_used; i++) { + struct subtask * subtask = subtask_queue->buffer[(subtask_queue->first + i) % subtask_queue->capacity]; + printf("queue tid %d with %d task %s\n", worker->tid, i, subtask->name); + } + CHECK_ERR(pthread_cond_broadcast(&subtask_queue->cond), "pthread_cond_broadcast"); + CHECK_ERR(pthread_mutex_unlock(&subtask_queue->mutex), "pthread_mutex_unlock"); +} + +// Push an element onto the end of the job queue. Blocks if the +// subtask_queue is full (its size is equal to its capacity). Returns +// non-zero on error. It is an error to push a job onto a queue that +// has been destroyed. +static inline int subtask_queue_enqueue(struct worker *worker, struct subtask *subtask ) +{ + assert(worker != NULL); + struct subtask_queue *subtask_queue = &worker->q; + +#ifdef MCPROFILE + uint64_t start = get_wall_time(); +#endif + + CHECK_ERR(pthread_mutex_lock(&subtask_queue->mutex), "pthread_mutex_lock"); + // Wait until there is room in the subtask_queue. + while (subtask_queue->num_used == subtask_queue->capacity && !subtask_queue->dead) { + if (subtask_queue->num_used == subtask_queue->capacity) { + CHECK_ERR(subtask_queue_grow_queue(subtask_queue), "subtask_queue_grow_queue"); + continue; + } + CHECK_ERR(pthread_cond_wait(&subtask_queue->cond, &subtask_queue->mutex), "pthread_cond_wait"); + } + + if (subtask_queue->dead) { + CHECK_ERR(pthread_mutex_unlock(&subtask_queue->mutex), "pthread_mutex_unlock"); + return -1; + } + + // If we made it past the loop, there is room in the subtask_queue. + subtask_queue->buffer[(subtask_queue->first + subtask_queue->num_used) % subtask_queue->capacity] = subtask; + subtask_queue->num_used++; + +#ifdef MCPROFILE + uint64_t end = get_wall_time(); + subtask_queue->time_enqueue += (end - start); + subtask_queue->n_enqueues++; +#endif + // Broadcast a reader (if any) that there is now an element. + CHECK_ERR(pthread_cond_broadcast(&subtask_queue->cond), "pthread_cond_broadcast"); + CHECK_ERR(pthread_mutex_unlock(&subtask_queue->mutex), "pthread_mutex_unlock"); + + return 0; +} + + +/* Like subtask_queue_dequeue, but with two differences: + 1) the subtask is stolen from the __front__ of the queue + 2) returns immediately if there is no subtasks queued, + as we dont' want to block on another workers queue and +*/ +static inline int subtask_queue_steal(struct worker *worker, + struct subtask **subtask) +{ + struct subtask_queue *subtask_queue = &worker->q; + assert(subtask_queue != NULL); + +#ifdef MCPROFILE + uint64_t start = get_wall_time(); +#endif + CHECK_ERR(pthread_mutex_lock(&subtask_queue->mutex), "pthread_mutex_lock"); + + if (subtask_queue->num_used == 0) { + CHECK_ERR(pthread_cond_broadcast(&subtask_queue->cond), "pthread_cond_broadcast"); + CHECK_ERR(pthread_mutex_unlock(&subtask_queue->mutex), "pthread_mutex_unlock"); + return 1; + } + + if (subtask_queue->dead) { + CHECK_ERR(pthread_mutex_unlock(&subtask_queue->mutex), "pthread_mutex_unlock"); + return -1; + } + + // Tasks gets stolen from the "front" + struct subtask *cur_back = subtask_queue->buffer[subtask_queue->first]; + struct subtask *new_subtask = NULL; + int remaining_iter = cur_back->end - cur_back->start; + // If subtask is chunkable, we steal half of the iterations + if (cur_back->chunkable && remaining_iter > 1) { + int64_t half = remaining_iter / 2; + new_subtask = malloc(sizeof(struct subtask)); + *new_subtask = *cur_back; + new_subtask->start = cur_back->end - half; + cur_back->end = new_subtask->start; + __atomic_fetch_add(cur_back->counter, 1, __ATOMIC_RELAXED); + } else { + new_subtask = cur_back; + subtask_queue->num_used--; + subtask_queue->first = (subtask_queue->first + 1) % subtask_queue->capacity; + } + *subtask = new_subtask; + + if (*subtask == NULL) { + CHECK_ERR(pthread_mutex_unlock(&subtask_queue->mutex), "pthred_mutex_unlock"); + return 1; + } + +#ifdef MCPROFILE + uint64_t end = get_wall_time(); + subtask_queue->time_dequeue += (end - start); + subtask_queue->n_dequeues++; +#endif + + // Broadcast a writer (if any) that there is now room for more. + CHECK_ERR(pthread_cond_broadcast(&subtask_queue->cond), "pthread_cond_broadcast"); + CHECK_ERR(pthread_mutex_unlock(&subtask_queue->mutex), "pthread_mutex_unlock"); + + return 0; +} + + +// Pop an element from the back of the job queue. +// Optional argument can be provided to block or not +static inline int subtask_queue_dequeue(struct worker *worker, + struct subtask **subtask, int blocking) +{ + assert(worker != NULL); + struct subtask_queue *subtask_queue = &worker->q; + +#ifdef MCPROFILE + uint64_t start = get_wall_time(); +#endif + + CHECK_ERR(pthread_mutex_lock(&subtask_queue->mutex), "pthread_mutex_lock"); + if (subtask_queue->num_used == 0 && !blocking) { + CHECK_ERR(pthread_mutex_unlock(&subtask_queue->mutex), "pthread_mutex_unlock"); + return 1; + } + // Try to steal some work while the subtask_queue is empty + while (subtask_queue->num_used == 0 && !subtask_queue->dead) { + pthread_cond_wait(&subtask_queue->cond, &subtask_queue->mutex); + } + + if (subtask_queue->dead) { + CHECK_ERR(pthread_mutex_unlock(&subtask_queue->mutex), "pthread_mutex_unlock"); + return -1; + } + + // dequeue pops from the back + *subtask = subtask_queue->buffer[(subtask_queue->first + subtask_queue->num_used - 1) % subtask_queue->capacity]; + subtask_queue->num_used--; + + if (*subtask == NULL) { + assert(!"got NULL ptr"); + CHECK_ERR(pthread_mutex_unlock(&subtask_queue->mutex), "pthred_mutex_unlock"); + return -1; + } + +#ifdef MCPROFILE + uint64_t end = get_wall_time(); + subtask_queue->time_dequeue += (end - start); + subtask_queue->n_dequeues++; +#endif + + // Broadcast a writer (if any) that there is now room for more. + CHECK_ERR(pthread_cond_broadcast(&subtask_queue->cond), "pthread_cond_broadcast"); + CHECK_ERR(pthread_mutex_unlock(&subtask_queue->mutex), "pthread_mutex_unlock"); + + return 0; +} + +static inline int subtask_queue_is_empty(struct subtask_queue *subtask_queue) +{ + return subtask_queue->num_used == 0; +} + +/* Scheduler definitions */ + +struct scheduler { + struct worker *workers; + int num_threads; + int minimum_chunk_size; + + + // If there is work to steal => active_work > 0 + volatile int active_work; + + // Only one error can be returned at the time now. Maybe we can + // provide a stack like structure for pushing errors onto if we wish + // to backpropagte multiple errors + volatile int error; + + // kappa time unit in nanoseconds + double kappa; +}; + + +// Thread local variable worker struct +// Note that, accesses to tls variables are expensive +// Minimize direct references to this variable +__thread struct worker* worker_local = NULL; + +static int64_t total_now(int64_t total, int64_t time) { + return total + (get_wall_time_ns() - time); +} + +static int random_other_worker(struct scheduler *scheduler, int my_id) { + int my_num_workers = scheduler->num_threads; + assert(my_num_workers != 1); + int i = fast_rand() % (my_num_workers - 1); + if (i >= my_id) { + i++; + } +#ifdef MCDEBUG + assert(i >= 0); + assert(i < my_num_workers); + assert(i != my_id); +#endif + + return i; +} + +static inline int64_t compute_chunk_size(int64_t minimum_chunk_size, double kappa, struct subtask* subtask) +{ + double C = (double)*subtask->task_time / (double)*subtask->task_iter; + if (C == 0.0F) C += DBL_EPSILON; + return smax64((int64_t)(kappa / C), minimum_chunk_size); +} + +/* Takes a chunk from subtask and enqueues the remaining iterations onto the worker's queue */ +/* A no-op if the subtask is not chunkable */ +static inline struct subtask* chunk_subtask(struct worker* worker, struct subtask *subtask) +{ + if (subtask->chunkable) { + // Do we have information from previous runs avaliable + if (*subtask->task_iter > 0) { + subtask->chunk_size = compute_chunk_size(worker->scheduler->minimum_chunk_size, + worker->scheduler->kappa, + subtask); + assert(subtask->chunk_size > 0); + } + int64_t remaining_iter = subtask->end - subtask->start; + assert(remaining_iter > 0); + if (remaining_iter > subtask->chunk_size) { + struct subtask *new_subtask = malloc(sizeof(struct subtask)); + *new_subtask = *subtask; + // increment the subtask join counter to account for new subtask + __atomic_fetch_add(subtask->counter, 1, __ATOMIC_RELAXED); + // Update range parameters + subtask->end = subtask->start + subtask->chunk_size; + new_subtask->start = subtask->end; + subtask_queue_enqueue(worker, new_subtask); + } + } + return subtask; +} + +static inline int run_subtask(struct worker* worker, struct subtask* subtask) +{ + assert(subtask != NULL); + assert(worker != NULL); + + subtask = chunk_subtask(worker, subtask); + worker->total = 0; + worker->timer = get_wall_time_ns(); +#if defined(MCPROFILE) + int64_t start = worker->timer; +#endif + worker->nested++; + int err = subtask->fn(subtask->args, subtask->start, subtask->end, + subtask->id, + worker->tid); + worker->nested--; + // Some error occured during some other subtask + // so we just clean-up and return + if (worker->scheduler->error != 0) { + // Even a failed task counts as finished. + __atomic_fetch_sub(subtask->counter, 1, __ATOMIC_RELAXED); + free(subtask); + return 0; + } + if (err != 0) { + __atomic_store_n(&worker->scheduler->error, err, __ATOMIC_RELAXED); + } + // Total sequential time spent + int64_t time_elapsed = total_now(worker->total, worker->timer); +#if defined(MCPROFILE) + worker->time_spent_working += get_wall_time_ns() - start; +#endif + int64_t iter = subtask->end - subtask->start; + // report measurements + // These updates should really be done using a single atomic CAS operation + __atomic_fetch_add(subtask->task_time, time_elapsed, __ATOMIC_RELAXED); + __atomic_fetch_add(subtask->task_iter, iter, __ATOMIC_RELAXED); + // We need a fence here, since if the counter is decremented before either + // of the two above are updated bad things can happen, e.g. if they are stack-allocated + __atomic_thread_fence(__ATOMIC_SEQ_CST); + __atomic_fetch_sub(subtask->counter, 1, __ATOMIC_RELAXED); + free(subtask); + return 0; +} + + +static inline int is_small(struct scheduler_segop *task, struct scheduler *scheduler, int *nsubtasks) +{ + int64_t time = *task->task_time; + int64_t iter = *task->task_iter; + + if (task->sched == DYNAMIC || iter == 0) { + *nsubtasks = scheduler->num_threads; + return 0; + } + + // Estimate the constant C + double C = (double)time / (double)iter; + double cur_task_iter = (double) task->iterations; + + // Returns true if the task is small i.e. + // if the number of iterations times C is smaller + // than the overhead of subtask creation + if (C == 0.0F || C * cur_task_iter < scheduler->kappa) { + *nsubtasks = 1; + return 1; + } + + // Else compute how many subtasks this tasks should create + int64_t min_iter_pr_subtask = smax64(scheduler->kappa / C, 1); + *nsubtasks = smin64(smax64(task->iterations / min_iter_pr_subtask, 1), scheduler->num_threads); + + return 0; +} + +// TODO make this prettier +static inline struct subtask* create_subtask(parloop_fn fn, + void* args, + const char* name, + volatile int* counter, + int64_t *timer, + int64_t *iter, + int64_t start, int64_t end, + int chunkable, + int64_t chunk_size, + int id) +{ + struct subtask* subtask = malloc(sizeof(struct subtask)); + if (subtask == NULL) { + assert(!"malloc failed in create_subtask"); + return NULL; + } + subtask->fn = fn; + subtask->args = args; + + subtask->counter = counter; + subtask->task_time = timer; + subtask->task_iter = iter; + + subtask->start = start; + subtask->end = end; + subtask->id = id; + subtask->chunkable = chunkable; + subtask->chunk_size = chunk_size; + + subtask->name = name; + return subtask; +} + +static int dummy_counter = 0; +static int64_t dummy_timer = 0; +static int64_t dummy_iter = 0; + +static int dummy_fn(void *args, int64_t start, int64_t end, int subtask_id, int tid) { + (void)args; + (void)start; + (void)end; + (void)subtask_id; + (void)tid; + return 0; +} + +// Wake up threads, who are blocking by pushing a dummy task +// onto their queue +static inline void wake_up_threads(struct scheduler *scheduler, int start_tid, int end_tid) { + +#if defined(MCDEBUG) + assert(start_tid >= 1); + assert(end_tid <= scheduler->num_threads); +#endif + for (int i = start_tid; i < end_tid; i++) { + struct subtask *subtask = create_subtask(dummy_fn, NULL, "dummy_fn", + &dummy_counter, + &dummy_timer, &dummy_iter, + 0, 0, + 0, 0, + 0); + CHECK_ERR(subtask_queue_enqueue(&scheduler->workers[i], subtask), "subtask_queue_enqueue"); + } +} + +static inline int is_finished(struct worker *worker) { + return worker->dead && subtask_queue_is_empty(&worker->q); +} + +// Try to steal from a random queue +static inline int steal_from_random_worker(struct worker* worker) +{ + int my_id = worker->tid; + struct scheduler* scheduler = worker->scheduler; + int k = random_other_worker(scheduler, my_id); + struct worker *worker_k = &scheduler->workers[k]; + struct subtask* subtask = NULL; + int retval = subtask_queue_steal(worker_k, &subtask); + if (retval == 0) { + subtask_queue_enqueue(worker, subtask); + return 1; + } + return 0; +} + + +static inline void *scheduler_worker(void* args) +{ + struct worker *worker = (struct worker*) args; + struct scheduler *scheduler = worker->scheduler; + worker_local = worker; + struct subtask *subtask = NULL; + + while(!is_finished(worker)) { + if (!subtask_queue_is_empty(&worker->q)) { + int retval = subtask_queue_dequeue(worker, &subtask, 0); + if (retval == 0) { + assert(subtask != NULL); + CHECK_ERR(run_subtask(worker, subtask), "run_subtask"); + } // else someone stole our work + + } else if (scheduler->active_work) { /* steal */ + while (!is_finished(worker) && scheduler->active_work) { + if (steal_from_random_worker(worker)) { + break; + } + } + } else { /* go back to sleep and wait for work */ + int retval = subtask_queue_dequeue(worker, &subtask, 1); + if (retval == 0) { + assert(subtask != NULL); + CHECK_ERR(run_subtask(worker, subtask), "run_subtask"); + } + } + } + + assert(subtask_queue_is_empty(&worker->q)); +#if defined(MCPROFILE) + if (worker->output_usage) + output_worker_usage(worker); +#endif + return NULL; +} + + +static inline int scheduler_execute_parloop(struct scheduler *scheduler, + struct scheduler_parloop *task, + int64_t *timer) +{ + + struct worker *worker = worker_local; + + struct scheduler_info info = task->info; + int64_t iter_pr_subtask = info.iter_pr_subtask; + int64_t remainder = info.remainder; + int nsubtasks = info.nsubtasks; + volatile int join_counter = nsubtasks; + + // Shared timer used to sum up all + // sequential work from each subtask + int64_t task_timer = 0; + int64_t task_iter = 0; + + enum scheduling sched = info.sched; + /* If each subtasks should be processed in chunks */ + int chunkable = sched == STATIC ? 0 : 1; + int64_t chunk_size = scheduler->minimum_chunk_size; // The initial chunk size when no info is avaliable + + + if (info.wake_up_threads || sched == DYNAMIC) + __atomic_add_fetch(&scheduler->active_work, nsubtasks, __ATOMIC_RELAXED); + + int64_t start = 0; + int64_t end = iter_pr_subtask + (int64_t)(remainder != 0); + for (int subtask_id = 0; subtask_id < nsubtasks; subtask_id++) { + struct subtask *subtask = create_subtask(task->fn, task->args, task->name, + &join_counter, + &task_timer, &task_iter, + start, end, + chunkable, chunk_size, + subtask_id); + assert(subtask != NULL); + // In most cases we will never have more subtasks than workers, + // but there can be exceptions (e.g. the kappa tuning function). + struct worker *subtask_worker = + worker->nested + ? &scheduler->workers[worker->tid] + : &scheduler->workers[subtask_id % scheduler->num_threads]; + CHECK_ERR(subtask_queue_enqueue(subtask_worker, subtask), + "subtask_queue_enqueue"); + // Update range params + start = end; + end += iter_pr_subtask + ((subtask_id + 1) < remainder); + } + + if (info.wake_up_threads) { + wake_up_threads(scheduler, nsubtasks, scheduler->num_threads); + } + + // Join (wait for subtasks to finish) + while(join_counter != 0) { + if (!subtask_queue_is_empty(&worker->q)) { + struct subtask *subtask = NULL; + int err = subtask_queue_dequeue(worker, &subtask, 0); + if (err == 0 ) { + CHECK_ERR(run_subtask(worker, subtask), "run_subtask"); + } + } else { + if (steal_from_random_worker(worker)) { + struct subtask *subtask = NULL; + int err = subtask_queue_dequeue(worker, &subtask, 0); + if (err == 0) { + CHECK_ERR(run_subtask(worker, subtask), "run_subtask"); + } + } + } + } + + + if (info.wake_up_threads || sched == DYNAMIC) { + __atomic_sub_fetch(&scheduler->active_work, nsubtasks, __ATOMIC_RELAXED); + } + + // Write back timing results of all sequential work + (*timer) += task_timer; + return scheduler->error; +} + + +static inline int scheduler_execute_task(struct scheduler *scheduler, + struct scheduler_parloop *task) +{ + + struct worker *worker = worker_local; + + int err = 0; + + // How much sequential work was performed by the task + int64_t task_timer = 0; + + /* Execute task sequential or parallel based on decision made earlier */ + if (task->info.nsubtasks == 1) { + int64_t start = get_wall_time_ns(); + err = task->fn(task->args, 0, task->iterations, 0, worker->tid); + int64_t end = get_wall_time_ns(); + task_timer = end - start; + worker->time_spent_working += task_timer; + // Report time measurements + // TODO the update of both of these should really be a single atomic!! + __atomic_fetch_add(task->info.task_time, task_timer, __ATOMIC_RELAXED); + __atomic_fetch_add(task->info.task_iter, task->iterations, __ATOMIC_RELAXED); + } else { + // Add "before" time if we already are inside a task + int64_t time_before = 0; + if (worker->nested > 0) { + time_before = total_now(worker->total, worker->timer); + } + + err = scheduler_execute_parloop(scheduler, task, &task_timer); + + // Report time measurements + // TODO the update of both of these should really be a single atomic!! + __atomic_fetch_add(task->info.task_time, task_timer, __ATOMIC_RELAXED); + __atomic_fetch_add(task->info.task_iter, task->iterations, __ATOMIC_RELAXED); + + // Update timers to account for new timings + worker->total = time_before + task_timer; + worker->timer = get_wall_time_ns(); + } + + + return err; +} + +/* Decide on how schedule the incoming task i.e. how many subtasks and + to run sequential or (potentially nested) parallel code body */ +static inline int scheduler_prepare_task(struct scheduler* scheduler, + struct scheduler_segop *task) +{ + assert(task != NULL); + + struct worker *worker = worker_local; + struct scheduler_info info; + info.task_time = task->task_time; + info.task_iter = task->task_iter; + + int nsubtasks; + // Decide if task should be scheduled sequentially + if (is_small(task, scheduler, &nsubtasks)) { + info.iter_pr_subtask = task->iterations; + info.remainder = 0; + info.nsubtasks = nsubtasks; + return task->top_level_fn(task->args, task->iterations, worker->tid, info); + } else { + info.iter_pr_subtask = task->iterations / nsubtasks; + info.remainder = task->iterations % nsubtasks; + info.sched = task->sched; + switch (task->sched) { + case STATIC: + info.nsubtasks = info.iter_pr_subtask == 0 ? info.remainder : ((task->iterations - info.remainder) / info.iter_pr_subtask); + break; + case DYNAMIC: + // As any thread can take any subtasks, we are being safe with using + // an upper bound on the number of tasks such that the task allocate enough memory + info.nsubtasks = info.iter_pr_subtask == 0 ? info.remainder : nsubtasks; + break; + default: + assert(!"Got unknown scheduling"); + } + } + + info.wake_up_threads = 0; + // We only use the nested parallel segop function if we can't exchaust all cores + // using the outer most level + if (task->nested_fn != NULL && info.nsubtasks < scheduler->num_threads && info.nsubtasks == task->iterations) { + if (worker->nested == 0) + info.wake_up_threads = 1; + return task->nested_fn(task->args, task->iterations, worker->tid, info); + } + + return task->top_level_fn(task->args, task->iterations, worker->tid, info); +} + +// Now some code for finding the proper value of kappa on a given +// machine (the smallest amount of work that amortises the cost of +// task creation). + +struct tuning_struct { + int32_t *free_tuning_res; + int32_t *array; +}; + +// Reduction function over an integer array +static int tuning_loop(void *args, int64_t start, int64_t end, + int flat_tid, int tid) { + (void)flat_tid; + (void)tid; + + int err = 0; + struct tuning_struct *tuning_struct = (struct tuning_struct *) args; + int32_t *array = tuning_struct->array; + int32_t *tuning_res = tuning_struct->free_tuning_res; + + int32_t sum = 0; + for (int i = start; i < end; i++) { + int32_t y = array[i]; + sum = add32(sum, y); + } + *tuning_res = sum; + return err; +} + +// The main entry point for the tuning process. Sets the provided +// variable ``kappa``. +static int determine_kappa(double *kappa) { + int err = 0; + + int64_t iterations = 100000000; + int64_t tuning_time = 0; + int64_t tuning_iter = 0; + + int32_t *array = malloc(sizeof(int32_t) * iterations); + for (int64_t i = 0; i < iterations; i++) { + array[i] = fast_rand(); + } + + int64_t start_tuning = get_wall_time_ns(); + /* **************************** */ + /* Run sequential reduce first' */ + /* ** |