1#define EXTENDED_DEBUG 0
5extern "C" int syscall(
int);
18#define log_message(stuff) do { print(nullptr) << gettid() << ": " << stuff << "\n"; } while (0)
24#define log_message(stuff) do { } while (0)
128 }
else if (threads < 1) {
201 const char *bytes = ((
const char *)&this->zero_marker);
203 while (bytes <
limit && *bytes == 0) {
213 char *bytes = ((
char *)&this->zero_marker);
227 const char *name =
job->task.name ?
job->task.name :
"<no name>";
228 const char *
parent_name =
job->parent_job ? (
job->parent_job->task.name ?
job->parent_job->task.name :
"<no name>") :
"<no parent job>";
229 log_message(
prefix << name <<
"[" <<
job <<
"] serial: " <<
job->task.serial <<
" active_workers: " <<
job->active_workers <<
" min: " <<
job->task.
min <<
" extent: " <<
job->task.extent <<
" siblings: " <<
job->siblings <<
" sibling count: " <<
job->sibling_count <<
" min_threads " <<
job->task.min_threads <<
" next_sempaphore: " <<
job->next_semaphore <<
" threads_reserved: " <<
job->threads_reserved <<
" parent_job: " <<
parent_name <<
"[" <<
job->parent_job <<
"]");
230 for (
int i = 0;
i <
job->task.num_semaphores;
i++) {
231 log_message(indent <<
" semaphore " << (
void *)
job->task.semaphores[
i].semaphore <<
" count " <<
job->task.semaphores[
i].count <<
" val " << *(
int *)
job->task.semaphores[
i].semaphore);
238 while (
job !=
nullptr) {
248#define print_job(job, indent, prefix) do { } while (0)
249#define dump_job_state() do { } while (0)
265 prev_ptr = &
job->next_job;
268 *prev_ptr =
job->next_job;
269 job->task.extent = 0;
293 work *parent_job =
job->parent_job;
296 if (parent_job ==
nullptr) {
313 log_message(
"Cannot run job " <<
job->task.name <<
" on this thread.");
321 if (
job->make_runnable()) {
327 prev_ptr = &(
job->next_job);
359 job->active_workers++;
361 if (
job->parent_job ==
nullptr) {
365 job->parent_job->threads_reserved +=
job->task.min_threads;
366 log_message(
"Reserved " <<
job->task.min_threads <<
" on " <<
job->parent_job->task.name <<
" for " <<
job->task.name <<
" giving " <<
job->parent_job->threads_reserved <<
" of " <<
job->parent_job->task.min_threads);
371 if (
job->task.serial) {
373 *prev_ptr =
job->next_job;
382 job->make_runnable()) {
403 job->task.extent = 0;
404 }
else if (
job->task.extent > 0) {
416 if (
job->task.extent == 0) {
417 *prev_ptr =
job->next_job;
434 log_message(
"Saw thread pool saw error from task: " << (
int)result);
437 bool wake_owners =
false;
441 job->exit_status = result;
443 for (
int i = 0;
i <
job->sibling_count;
i++) {
446 job->siblings[
i].exit_status = result;
447 wake_owners |= (
job->active_workers == 0 &&
job->siblings[
i].owner_is_sleeping);
453 if (
job->parent_job ==
nullptr) {
457 job->parent_job->threads_reserved -=
job->task.min_threads;
458 log_message(
"Returned " <<
job->task.min_threads <<
" to " <<
job->parent_job->task.name <<
" for " <<
job->task.name <<
" giving " <<
job->parent_job->threads_reserved <<
" of " <<
job->parent_job->task.min_threads);
462 job->active_workers--;
511 if (jobs[
i].task.min_threads == 0) {
517 if (jobs[
i].task.num_semaphores != 0) {
521 if (jobs[
i].task.serial) {
539 log_message(
"enqueue_work_already_locked adding one to min_threads.");
558 log_message(
"enqueue_work_already_locked job " << jobs[0].task.name <<
" with min_threads " << min_threads <<
" task_parent " <<
task_parent->task.name <<
" task_parent->task.min_threads " <<
task_parent->task.min_threads <<
" task_parent->threads_reserved " <<
task_parent->threads_reserved);
561 "Logic error: thread over commit.\n");
625WEAK __attribute__((destructor))
void halide_thread_pool_cleanup() {
636 int min,
int extent,
uint8_t *closure,
638 return f(
user_context, min, extent, closure, task_parent);
642 int min,
int size,
uint8_t *closure) {
676 work *jobs = (
work *)__builtin_alloca(
sizeof(
work) * num_tasks);
678 for (
int i = 0; i < num_tasks; i++) {
684 jobs[i].
task = *tasks++;
694 if (num_tasks == 0) {
701 for (
int i = 0; i < num_tasks; i++) {
715 halide_error(
nullptr,
"halide_set_num_threads: must be >= 0.");
765 Halide::Runtime::Internal::Synchronization::atomic_store_release(&sem->
value, &n);
771 int old_val = Halide::Runtime::Internal::Synchronization::atomic_fetch_add_acquire_release(&sem->
value, n);
773 if (old_val == 0 && n != 0) {
791 Halide::Runtime::Internal::Synchronization::atomic_load_acquire(&sem->
value, &expected);
793 desired = expected - n;
794 }
while (desired >= 0 &&
795 !Halide::Runtime::Internal::Synchronization::atomic_cas_weak_relacq_relaxed(&sem->
value, &expected, &desired));
841 int min,
int size,
uint8_t *closure) {
846 int min,
int size,
uint8_t *closure,
void *task_parent) {
int halide_default_do_task(void *user_context, halide_task_t f, int idx, uint8_t *closure)
int(* halide_semaphore_release_t)(struct halide_semaphore_t *, int)
int halide_do_loop_task(void *user_context, halide_loop_task_t f, int min, int extent, uint8_t *closure, void *task_parent)
bool halide_default_semaphore_try_acquire(struct halide_semaphore_t *, int n)
void halide_cond_wait(struct halide_cond *cond, struct halide_mutex *mutex)
int(* halide_do_par_for_t)(void *, halide_task_t, int, int, uint8_t *)
Set a custom method for performing a parallel for loop.
int halide_default_do_par_for(void *user_context, halide_task_t task, int min, int size, uint8_t *closure)
The default versions of the parallel runtime functions.
int halide_default_do_loop_task(void *user_context, halide_loop_task_t f, int min, int extent, uint8_t *closure, void *task_parent)
int(* halide_task_t)(void *user_context, int task_number, uint8_t *closure)
Define halide_do_par_for to replace the default thread pool implementation.
void halide_mutex_lock(struct halide_mutex *mutex)
A basic set of mutex and condition variable functions, which call platform specific code for mutual e...
int halide_do_task(void *user_context, halide_task_t f, int idx, uint8_t *closure)
int halide_default_semaphore_init(struct halide_semaphore_t *, int n)
void halide_mutex_unlock(struct halide_mutex *mutex)
struct halide_thread * halide_spawn_thread(void(*f)(void *), void *closure)
Spawn a thread.
int(* halide_do_loop_task_t)(void *, halide_loop_task_t, int, int, uint8_t *, void *)
The version of do_task called for loop tasks.
bool(* halide_semaphore_try_acquire_t)(struct halide_semaphore_t *, int)
int(* halide_loop_task_t)(void *user_context, int min, int extent, uint8_t *closure, void *task_parent)
A task representing a serial for loop evaluated over some range.
int halide_default_semaphore_release(struct halide_semaphore_t *, int n)
void halide_join_thread(struct halide_thread *)
Join a thread.
@ halide_error_code_success
There was no error.
void halide_cond_broadcast(struct halide_cond *cond)
int(* halide_do_task_t)(void *, halide_task_t, int, uint8_t *)
If you use the default do_par_for, you can still set a custom handler to perform each individual task...
int halide_default_do_parallel_tasks(void *user_context, int num_tasks, struct halide_parallel_task_t *tasks, void *task_parent)
int(* halide_semaphore_init_t)(struct halide_semaphore_t *, int)
void halide_error(void *user_context, const char *)
Halide calls this function on runtime errors (for example bounds checking failures).
int(* halide_do_parallel_tasks_t)(void *, int, struct halide_parallel_task_t *, void *task_parent)
Provide an entire custom tasking runtime via function pointers.
WEAK halide_semaphore_release_t custom_semaphore_release
WEAK halide_semaphore_init_t custom_semaphore_init
WEAK int default_desired_num_threads()
WEAK halide_do_task_t custom_do_task
WEAK halide_do_par_for_t custom_do_par_for
ALWAYS_INLINE int clamp_num_threads(int threads)
WEAK void worker_thread(void *)
WEAK void enqueue_work_already_locked(int num_jobs, work *jobs, work *task_parent)
WEAK halide_do_parallel_tasks_t custom_do_parallel_tasks
WEAK void worker_thread_already_locked(work *owned_job)
WEAK halide_do_loop_task_t custom_do_loop_task
WEAK work_queue_t work_queue
WEAK halide_semaphore_try_acquire_t custom_semaphore_try_acquire
This file defines the class FunctionDAG, which is our representation of a Halide pipeline,...
Internal::ConstantInterval cast(Type t, const Internal::ConstantInterval &a)
Cast operators for ConstantIntervals.
WEAK int halide_host_cpu_count()
__UINTPTR_TYPE__ uintptr_t
unsigned __INT8_TYPE__ uint8_t
void halide_thread_yield()
void * memset(void *s, int val, size_t n)
#define halide_abort_if_false(user_context, cond)
char * getenv(const char *)
int64_t min
The lower and upper bound of the interval.
void wait(halide_mutex *mutex)
int desired_threads_working
ALWAYS_INLINE bool running() const
halide_cond_with_spinning wake_b_team
halide_cond_with_spinning wake_owners
halide_thread * threads[MAX_THREADS]
ALWAYS_INLINE void reset()
halide_cond_with_spinning wake_a_team
ALWAYS_INLINE void assert_zeroed() const
ALWAYS_INLINE bool running() const
ALWAYS_INLINE bool make_runnable()
halide_parallel_task_t task
Cross platform condition variable.
A parallel task to be passed to halide_do_parallel_tasks.
struct halide_semaphore_acquire_t * semaphores
struct halide_semaphore_t * semaphore
An opaque struct representing a semaphore.
WEAK void halide_set_custom_parallel_runtime(halide_do_par_for_t do_par_for, halide_do_task_t do_task, halide_do_loop_task_t do_loop_task, halide_do_parallel_tasks_t do_parallel_tasks, halide_semaphore_init_t semaphore_init, halide_semaphore_try_acquire_t semaphore_try_acquire, halide_semaphore_release_t semaphore_release)
WEAK int halide_default_semaphore_release(halide_semaphore_t *s, int n)
WEAK halide_do_task_t halide_set_custom_do_task(halide_do_task_t f)
WEAK bool halide_default_semaphore_try_acquire(halide_semaphore_t *s, int n)
WEAK bool halide_semaphore_try_acquire(struct halide_semaphore_t *sema, int count)
WEAK int halide_default_do_parallel_tasks(void *user_context, int num_tasks, struct halide_parallel_task_t *tasks, void *task_parent)
WEAK int halide_get_num_threads()
Get or set the number of threads used by Halide's thread pool.
WEAK halide_do_loop_task_t halide_set_custom_do_loop_task(halide_do_loop_task_t f)
WEAK int halide_do_par_for(void *user_context, halide_task_t f, int min, int size, uint8_t *closure)
WEAK int halide_semaphore_init(struct halide_semaphore_t *sema, int count)
WEAK int halide_default_do_loop_task(void *user_context, halide_loop_task_t f, int min, int extent, uint8_t *closure, void *task_parent)
#define log_message(stuff)
WEAK halide_do_par_for_t halide_set_custom_do_par_for(halide_do_par_for_t f)
#define print_job(job, indent, prefix)
WEAK int halide_do_loop_task(void *user_context, halide_loop_task_t f, int min, int size, uint8_t *closure, void *task_parent)
WEAK int halide_do_parallel_tasks(void *user_context, int num_tasks, struct halide_parallel_task_t *tasks, void *task_parent)
Enqueue some number of the tasks described above and wait for them to complete.
WEAK void halide_shutdown_thread_pool()
WEAK int halide_default_semaphore_init(halide_semaphore_t *s, int n)
WEAK int halide_default_do_par_for(void *user_context, halide_task_t f, int min, int size, uint8_t *closure)
The default versions of the parallel runtime functions.
WEAK int halide_do_task(void *user_context, halide_task_t f, int idx, uint8_t *closure)
WEAK int halide_default_do_task(void *user_context, halide_task_t f, int idx, uint8_t *closure)
WEAK int halide_set_num_threads(int n)
WEAK int halide_semaphore_release(struct halide_semaphore_t *sema, int count)