4 changes: 3 additions & 1 deletion src/tbb/arena.cpp
@@ -1,5 +1,6 @@
/*
Copyright (c) 2005-2025 Intel Corporation
Copyright (c) 2025 UXL Foundation Contributors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -274,6 +275,7 @@ arena::arena(threading_control* control, unsigned num_slots, unsigned num_reserv
my_slots[i].init_task_streams(i);
my_slots[i].my_default_task_dispatcher = new(base_td_pointer + i) task_dispatcher(this);
my_slots[i].my_is_occupied.store(false, std::memory_order_relaxed);
my_slots[i].has_skipped_tasks.store(false, std::memory_order_relaxed);
}
my_fifo_task_stream.initialize(my_num_slots);
my_resume_task_stream.initialize(my_num_slots);
@@ -372,7 +374,7 @@ bool arena::has_tasks() {
std::size_t n = my_limit.load(std::memory_order_acquire);
bool tasks_are_available = false;
for (std::size_t k = 0; k < n && !tasks_are_available; ++k) {
tasks_are_available = !my_slots[k].is_empty();
tasks_are_available = my_slots[k].has_tasks();
}
tasks_are_available = tasks_are_available || has_enqueued_tasks() || !my_resume_task_stream.empty();
#if __TBB_CRITICAL_TASKS
107 changes: 46 additions & 61 deletions src/tbb/arena_slot.cpp
@@ -1,5 +1,6 @@
/*
Copyright (c) 2005-2021 Intel Corporation
Copyright (c) 2005-2025 Intel Corporation
Copyright (c) 2025 UXL Foundation Contributors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -25,21 +26,22 @@ namespace r1 {
//------------------------------------------------------------------------
// Arena Slot
//------------------------------------------------------------------------
d1::task* arena_slot::get_task_impl(size_t T, execution_data_ext& ed, bool& tasks_omitted, isolation_type isolation) {
d1::task* arena_slot::get_task_impl(size_t T, execution_data_ext& ed, bool& tasks_skipped, isolation_type isolation) {
__TBB_ASSERT(tail.load(std::memory_order_relaxed) <= T || is_local_task_pool_quiescent(),
"Is it safe to get a task at position T?");

d1::task* result = task_pool_ptr[T];
__TBB_ASSERT(!is_poisoned( result ), "The poisoned task is going to be processed");
__TBB_ASSERT(!is_poisoned( result ), "A poisoned task is going to be processed");

if (!result) {
return nullptr;
}
bool omit = isolation != no_isolation && isolation != task_accessor::isolation(*result);
if (!omit && !task_accessor::is_proxy_task(*result)) {
bool skip = isolation != no_isolation && isolation != task_accessor::isolation(*result);
if (!skip && !task_accessor::is_proxy_task(*result)) {
return result;
} else if (omit) {
tasks_omitted = true;
} else if (skip) {
has_skipped_tasks.store(true, std::memory_order_relaxed);
tasks_skipped = true;
return nullptr;
}

@@ -52,7 +54,7 @@ d1::task* arena_slot::get_task_impl(size_t T, execution_data_ext& ed, bool& task
// Proxy was empty, so it's our responsibility to free it
tp.allocator.delete_object(&tp, ed);

if ( tasks_omitted ) {
if ( tasks_skipped ) {
task_pool_ptr[T] = nullptr;
}
return nullptr;
@@ -65,8 +67,8 @@ d1::task* arena_slot::get_task(execution_data_ext& ed, isolation_type isolation)
// The bounds of available tasks in the task pool. H0 is only used when the head bound is reached.
std::size_t H0 = (std::size_t)-1, T = T0;
d1::task* result = nullptr;
bool task_pool_empty = false;
bool tasks_omitted = false;
bool all_tasks_checked = false;
bool tasks_skipped = false;
do {
__TBB_ASSERT( !result, nullptr );
// The full fence is required to sync the store of `tail` with the load of `head` (write-read barrier)
@@ -81,67 +83,54 @@ d1::task* arena_slot::get_task(execution_data_ext& ed, isolation_type isolation)
__TBB_ASSERT( H0 == head.load(std::memory_order_relaxed)
&& T == tail.load(std::memory_order_relaxed)
&& H0 == T + 1, "victim/thief arbitration algorithm failure" );
reset_task_pool_and_leave();
(tasks_skipped) ? release_task_pool() : reset_task_pool_and_leave();
// No tasks in the task pool.
task_pool_empty = true;
all_tasks_checked = true;
break;
} else if ( H0 == T ) {
// There is only one task in the task pool.
reset_task_pool_and_leave();
task_pool_empty = true;
// There is only one task in the task pool. If it can be taken, we want to reset the pool
if ( isolation != no_isolation && task_pool_ptr[T] &&
isolation != task_accessor::isolation(*task_pool_ptr[T]) ) {
// The task will be skipped due to isolation mismatch
has_skipped_tasks.store(true, std::memory_order_relaxed);
tasks_skipped = true;
}
(tasks_skipped) ? release_task_pool() : reset_task_pool_and_leave();
all_tasks_checked = true;
} else {
// Release task pool if there are still some tasks.
// After the release, the tail will be less than T, thus a thief
// will not attempt to get a task at position T.
release_task_pool();
}
}
result = get_task_impl( T, ed, tasks_omitted, isolation );
result = get_task_impl( T, ed, tasks_skipped, isolation );
if ( result ) {
poison_pointer( task_pool_ptr[T] );
// If some tasks were skipped, we need to make a hole in position T.
if ( tasks_skipped ) {
// If all tasks have been checked, the taken task must be at the H0 position
__TBB_ASSERT( !all_tasks_checked || H0 == T, nullptr );
task_pool_ptr[T] = nullptr;
} else {
poison_pointer( task_pool_ptr[T] );
}
break;
} else if ( !tasks_omitted ) {
} else if ( !tasks_skipped ) {
poison_pointer( task_pool_ptr[T] );
__TBB_ASSERT( T0 == T+1, nullptr );
T0 = T;
}
} while ( !result && !task_pool_empty );

if ( tasks_omitted ) {
if ( task_pool_empty ) {
// All tasks have been checked. The task pool should be in reset state.
// We just restore the bounds for the available tasks.
// TODO: Does it have sense to move them to the beginning of the task pool?
__TBB_ASSERT( is_quiescent_local_task_pool_reset(), nullptr );
if ( result ) {
// If we have a task, it should be at H0 position.
__TBB_ASSERT( H0 == T, nullptr );
++H0;
}
__TBB_ASSERT( H0 <= T0, nullptr );
if ( H0 < T0 ) {
// Restore the task pool if there are some tasks.
head.store(H0, std::memory_order_relaxed);
tail.store(T0, std::memory_order_relaxed);
// The release fence is used in publish_task_pool.
publish_task_pool();
// Synchronize with snapshot as we published some tasks.
ed.task_disp->m_thread_data->my_arena->advertise_new_work<arena::wakeup>();
}
} else {
// A task has been obtained. We need to make a hole in position T.
__TBB_ASSERT( is_task_pool_published(), nullptr );
__TBB_ASSERT( result, nullptr );
task_pool_ptr[T] = nullptr;
tail.store(T0, std::memory_order_release);
// Synchronize with snapshot as we published some tasks.
// TODO: consider some approach not to call wakeup for each time. E.g. check if the tail reached the head.
ed.task_disp->m_thread_data->my_arena->advertise_new_work<arena::wakeup>();
}
} while ( !result && !all_tasks_checked );

if ( tasks_skipped ) {
__TBB_ASSERT( is_task_pool_published(), nullptr ); // the pool was not reset
tail.store(T0, std::memory_order_release);
Contributor:

Do I understand correctly that we do not need to restore the head here, since that is a stealing thread's responsibility and is done in steal_task?

@akukanov (Contributor, Author), Jul 23, 2025:

That is correct.

Generally, H0 represents the state of the pool head as it was seen by the owner; it might get outdated at any time. The core principle, therefore, is that the owner only works with the tail and does not change the head.

Indeed, if there was no conflict for the last task, the owner has no idea what the proper value for the head should be. And in case of a conflict, the pool lock is taken and the head is re-read, so we can be sure that there are no skipped tasks beyond the head and nothing needs to change.

Prior to the patch, there was a store of H0 to the head - but it was done at a point where the pool is temporarily quiescent, and therefore it was safe. It "optimizes" the case when the task at the head is taken while others were skipped. In the patch, the pool is not reset if tasks were skipped, as that would also mislead observers, so this optimization cannot be safely performed anymore.
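A minimal sketch of the ownership rule described in this thread (the owner moves only the tail, thieves move only the head), using hypothetical names and a plain mutex in place of oneTBB's lock-free pool protocol; it is an illustration of the invariant, not the actual arena_slot code:

```cpp
// Illustration only: hypothetical toy_task_pool, not the oneTBB arena_slot code.
// A mutex stands in for the lock-free victim/thief arbitration of the real pool.
#include <atomic>
#include <cstddef>
#include <mutex>
#include <vector>

struct toy_task { int id; };

class toy_task_pool {
    std::vector<toy_task*> slots = std::vector<toy_task*>(64, nullptr); // fixed capacity for brevity
    std::atomic<std::size_t> head{0};  // advanced only by thieves (steal side)
    std::atomic<std::size_t> tail{0};  // advanced/retracted only by the owner
    std::mutex pool_lock;              // plays the role of lock_task_pool()/unlock_task_pool()

public:
    // Owner side: publish a task by bumping the tail.
    void owner_push(toy_task* t) {
        std::lock_guard<std::mutex> g(pool_lock);
        std::size_t tl = tail.load(std::memory_order_relaxed);
        slots[tl] = t;
        tail.store(tl + 1, std::memory_order_release);
    }

    // Owner side: take the newest task; note that head is never written here.
    toy_task* owner_pop() {
        std::lock_guard<std::mutex> g(pool_lock);
        std::size_t hd = head.load(std::memory_order_relaxed);
        std::size_t tl = tail.load(std::memory_order_relaxed);
        if (hd >= tl) return nullptr;            // pool looks empty to the owner
        tail.store(tl - 1, std::memory_order_relaxed);
        return slots[tl - 1];
    }

    // Thief side: take the oldest task; only thieves ever move the head.
    toy_task* thief_steal() {
        std::lock_guard<std::mutex> g(pool_lock);
        std::size_t hd = head.load(std::memory_order_relaxed);
        std::size_t tl = tail.load(std::memory_order_relaxed);
        if (hd >= tl) return nullptr;
        head.store(hd + 1, std::memory_order_relaxed);
        return slots[hd];
    }
};
```

Because the owner never writes the head in this scheme, putting skipped tasks back into the visible bounds only requires restoring the tail, which mirrors the tail.store(T0, ...) above.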

}
// At this point, skipped tasks - if any - are back in the pool bounds
has_skipped_tasks.store(false, std::memory_order_relaxed);

__TBB_ASSERT( (std::intptr_t)tail.load(std::memory_order_relaxed) >= 0, nullptr );
__TBB_ASSERT( result || tasks_omitted || is_quiescent_local_task_pool_reset(), nullptr );
__TBB_ASSERT( result || tasks_skipped || is_quiescent_local_task_pool_reset(), nullptr );
return result;
}

@@ -153,7 +142,7 @@ d1::task* arena_slot::steal_task(arena& a, isolation_type isolation, std::size_t
d1::task* result = nullptr;
std::size_t H = head.load(std::memory_order_relaxed); // mirror
std::size_t H0 = H;
bool tasks_omitted = false;
bool tasks_skipped = false;
do {
// The full fence is required to sync the store of `head` with the load of `tail` (write-read barrier)
H = ++head;
@@ -181,8 +170,8 @@ d1::task* arena_slot::steal_task(arena& a, isolation_type isolation, std::size_t
}
// The task cannot be executed either due to isolation or proxy constraints.
result = nullptr;
tasks_omitted = true;
} else if (!tasks_omitted) {
tasks_skipped = true;
} else if (!tasks_skipped) {
// Cleanup the task pool from holes until a task is skipped.
__TBB_ASSERT( H0 == H-1, nullptr );
poison_pointer( victim_pool[H0] );
@@ -193,8 +182,8 @@ d1::task* arena_slot::steal_task(arena& a, isolation_type isolation, std::size_t

// emit "task was consumed" signal
poison_pointer( victim_pool[H-1] );
if (tasks_omitted) {
// Some proxies in the task pool have been omitted. Set the stolen task to nullptr.
if (tasks_skipped) {
// Some proxies in the task pool have been skipped. Set the stolen task to nullptr.
victim_pool[H-1] = nullptr;
// The release store synchronizes the victim_pool update(the store of nullptr).
head.store( /*dead: H = */ H0, std::memory_order_release );
Expand All @@ -206,10 +195,6 @@ d1::task* arena_slot::steal_task(arena& a, isolation_type isolation, std::size_t
__TBB_cl_evict(&victim_slot.head);
__TBB_cl_evict(&victim_slot.tail);
#endif
if (tasks_omitted) {
// Synchronize with snapshot as the head and tail can be bumped which can falsely trigger EMPTY state
a.advertise_new_work<arena::wakeup>();
}
return result;
}

53 changes: 36 additions & 17 deletions src/tbb/arena_slot.h
@@ -1,5 +1,6 @@
/*
Copyright (c) 2005-2025 Intel Corporation
Copyright (c) 2025 UXL Foundation Contributors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -45,14 +46,18 @@ static d1::task** const EmptyTaskPool = nullptr;
static d1::task** const LockedTaskPool = reinterpret_cast<d1::task**>(~std::intptr_t(0));

struct alignas(max_nfs_size) arena_slot_shared_state {
//! Scheduler of the thread attached to the slot
/** Marks the slot as busy, and is used to iterate through the schedulers belonging to this arena **/
//! The flag indicates whether the slot is used by a thread.
/** Marks the slot as idle or busy, and is used when threads join and leave the arena **/
std::atomic<bool> my_is_occupied;

// Synchronization of access to Task pool
//! The flag indicates that the task pool might temporarily exclude some valid tasks.
/** Set by the owning thread in get_task(), tested during exhaustive task search in has_tasks(). **/
std::atomic<bool> has_skipped_tasks;

//! Synchronization of access to the task pool.
/** Also is used to specify if the slot is empty or locked:
0 - empty
-1 - locked **/
EmptyTaskPool === nullptr
LockedTaskPool === a "pointer" with all bits set to 1 **/
std::atomic<d1::task**> task_pool;

//! Index of the first ready task in the deque.
@@ -169,9 +174,28 @@ class arena_slot : private arena_slot_shared_state, private arena_slot_private_s
return task_pool.load(std::memory_order_relaxed) != EmptyTaskPool;
}

bool is_empty() const {
return task_pool.load(std::memory_order_relaxed) == EmptyTaskPool ||
head.load(std::memory_order_relaxed) >= tail.load(std::memory_order_relaxed);
bool has_tasks() {
d1::task** the_task_pool = task_pool.load(std::memory_order_relaxed);
if ( the_task_pool == EmptyTaskPool ) {
return false;
}
std::size_t hd = head.load(std::memory_order_relaxed), tl = tail.load(std::memory_order_relaxed);
if ( (std::intptr_t)hd >= (std::intptr_t)tl ) {
// Since some tasks might be temporarily out of the visible pool bounds, lock the pool to examine closely
bool skipped_tasks = false;
the_task_pool = lock_task_pool();
if ( the_task_pool == EmptyTaskPool ) {
return false;
}
hd = head.load(std::memory_order_relaxed);
tl = tail.load(std::memory_order_relaxed);
skipped_tasks = has_skipped_tasks.load(std::memory_order_relaxed);
unlock_task_pool(the_task_pool);
if ( (std::intptr_t)hd >= (std::intptr_t)tl && !skipped_tasks ) {
return false;
}
}
return true;
}

bool is_occupied() const {
@@ -202,10 +226,10 @@ class arena_slot : private arena_slot_shared_state, private arena_slot_private_s
//! Get a task from the local pool at specified location T.
/** Returns the pointer to the task or nullptr if the task cannot be executed,
e.g. proxy has been deallocated or isolation constraint is not met.
tasks_omitted tells if some tasks have been omitted.
tasks_skipped tells if some tasks have been skipped.
Called only by the pool owner. The caller should guarantee that the
position T is not available for a thief. **/
d1::task* get_task_impl(size_t T, execution_data_ext& ed, bool& tasks_omitted, isolation_type isolation);
d1::task* get_task_impl(size_t T, execution_data_ext& ed, bool& tasks_skipped, isolation_type isolation);

//! Makes sure that the task pool can accommodate at least n more elements
/** If necessary relocates existing task pointers or grows the ready task deque.
@@ -288,7 +312,6 @@ class arena_slot : private arena_slot_shared_state, private arena_slot_private_s
if (!is_task_pool_published()) {
return; // we are not in arena - nothing to lock
}
bool sync_prepare_done = false;
for( atomic_backoff b;;b.pause() ) {
#if TBB_USE_ASSERT
// Local copy of the arena slot task pool pointer is necessary for the next
@@ -301,11 +324,8 @@ class arena_slot : private arena_slot_shared_state, private arena_slot_private_s
task_pool.compare_exchange_strong(expected, LockedTaskPool ) ) {
// We acquired our own slot
break;
} else if( !sync_prepare_done ) {
// Start waiting
sync_prepare_done = true;
}
// Someone else acquired a lock, so pause and do exponential backoff.
// Someone else acquired the lock, so pause and do exponential backoff.
}
__TBB_ASSERT( task_pool.load(std::memory_order_relaxed) == LockedTaskPool, "not really acquired task pool" );
}
@@ -339,7 +359,7 @@ class arena_slot : private arena_slot_shared_state, private arena_slot_private_s
// We've locked victim's task pool
break;
}
// Someone else acquired a lock, so pause and do exponential backoff.
// Someone else acquired the lock, so pause and do exponential backoff.
backoff.pause();
}
__TBB_ASSERT(victim_task_pool == EmptyTaskPool ||
@@ -376,7 +396,6 @@ class arena_slot : private arena_slot_shared_state, private arena_slot_private_s
//! Leave the task pool
/** Leaving task pool automatically releases the task pool if it is locked. **/
void leave_task_pool() {
__TBB_ASSERT(is_task_pool_published(), "Not in arena");
// Do not reset my_arena_index. It will be used to (attempt to) re-acquire the slot next time
__TBB_ASSERT(task_pool.load(std::memory_order_relaxed) == LockedTaskPool, "Task pool must be locked when leaving arena");
__TBB_ASSERT(is_quiescent_local_task_pool_empty(), "Cannot leave arena when the task pool is not empty");