-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Add try_put_and_wait documentation #1514
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
b13a759
Add try_put_and_wait documentation
kboyarinov 98a59c0
Fix indentation
kboyarinov 4415f10
Apply comments from the review
kboyarinov 62c6b0c
computted -> computed
kboyarinov 386ef09
Fix PR review comments
kboyarinov 558f86c
Fix comments
kboyarinov File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,324 @@ | ||
| .. _try_put_and_wait: | ||
|
|
||
| Waiting for Single Messages in Flow Graph | ||
| ========================================= | ||
|
|
||
| .. contents:: | ||
| :local: | ||
| :depth: 1 | ||
|
|
||
| Description | ||
| *********** | ||
|
|
||
| This feature adds a new ``try_put_and_wait`` interface to the receiving nodes in the Flow Graph. | ||
| This function puts a message as an input into a Flow Graph and waits until all work related to | ||
| that message is complete. | ||
| ``try_put_and_wait`` may reduce latency compared to calling ``graph::wait_for_all`` since | ||
| ``graph::wait_for_all`` waits for all work, including work that is unrelated to the input message, to complete. | ||
|
|
||
| ``node.try_put_and_wait(msg)`` performs ``node.try_put(msg)`` on the node and waits until the work on ``msg`` is completed. | ||
| Therefore, the following conditions are true: | ||
|
|
||
| * Any task initiated by any node in the Flow Graph that involves working with ``msg`` or any other intermediate result | ||
| computed from ``msg`` is completed. | ||
| * No intermediate results computed from ``msg`` remain in any buffers in the graph. | ||
|
|
||
| .. caution:: | ||
|
|
||
| To prevent ``try_put_and_wait`` calls from infinite waiting, avoid using buffering nodes at the end of the Flow Graph since the final result | ||
| will not be automatically consumed by the Flow Graph. | ||
|
|
||
| .. caution:: | ||
|
|
||
| The ``multifunction_node`` and ``async_node`` classes are not currently supported by this feature. Including one of these nodes in the | ||
| Flow Graph may cause ``try_put_and_wait`` to exit early, even if the computations on the initial input message are | ||
| still in progress. | ||
|
|
||
| API | ||
| *** | ||
|
|
||
| Header | ||
| ------ | ||
|
|
||
| .. code:: cpp | ||
|
|
||
| #define TBB_PREVIEW_FLOW_GRAPH_FEATURES // macro option 1 | ||
| #define TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT // macro option 2 | ||
| #include <oneapi/tbb/flow_graph.h> | ||
|
|
||
| Synopsis | ||
| -------- | ||
|
|
||
| .. code:: cpp | ||
|
|
||
| namespace oneapi { | ||
| namespace tbb { | ||
| template <typename Output, typename Policy = /*default-policy*/> | ||
| class continue_node { | ||
| public: | ||
| bool try_put_and_wait(const continue_msg& input); | ||
| }; // class continue_node | ||
|
|
||
| template <typename Input, typename Output = continue_msg, typename Policy = /*default-policy*/> | ||
| class function_node { | ||
| public: | ||
| bool try_put_and_wait(const Input& input); | ||
| }; // class function_node | ||
|
|
||
| template <typename T> | ||
| class overwrite_node { | ||
| public: | ||
| bool try_put_and_wait(const T& input); | ||
| }; // class overwrite_node | ||
|
|
||
| template <typename T> | ||
| class write_once_node { | ||
| public: | ||
| bool try_put_and_wait(const T& input); | ||
| }; // class write_once_node | ||
|
|
||
| template <typename T> | ||
| class buffer_node { | ||
| public: | ||
| bool try_put_and_wait(const T& input); | ||
| }; // class buffer_node | ||
|
|
||
| template <typename T> | ||
| class queue_node { | ||
| public: | ||
| bool try_put_and_wait(const T& input); | ||
| }; // class queue_node | ||
|
|
||
| template <typename T, typename Compare = std::less<T>> | ||
| class priority_queue_node { | ||
| public: | ||
| bool try_put_and_wait(const T& input); | ||
| }; // class priority_queue_node | ||
|
|
||
| template <typename T> | ||
| class sequencer_node { | ||
| public: | ||
| bool try_put_and_wait(const T& input); | ||
| }; // class sequencer_node | ||
|
|
||
| template <typename T, typename DecrementType = continue_msg> | ||
| class limiter_node { | ||
| public: | ||
| bool try_put_and_wait(const T& input); | ||
| }; // class limiter_node | ||
|
|
||
| template <typename T> | ||
| class broadcast_node { | ||
| public: | ||
| bool try_put_and_wait(const T& input); | ||
| }; // class broadcast_node | ||
|
|
||
| template <typename TupleType> | ||
| class split_node { | ||
| public: | ||
| bool try_put_and_wait(const TupleType& input); | ||
| }; // class split_node | ||
| } // namespace tbb | ||
| } // namespace oneapi | ||
|
|
||
| Member Functions | ||
| ---------------- | ||
|
|
||
| .. code:: cpp | ||
|
|
||
| template <typename Output, typename Policy> | ||
| bool continue_node<Output, Policy>::try_put_and_wait(const continue_msg& input) | ||
|
|
||
| **Effects**: Increments the count of input signals received. If the incremented count is equal to the number | ||
| of known predecessors, performs the ``body`` function object execution. | ||
|
|
||
| Waits for the completion of the ``input`` in the Flow Graph, meaning all tasks created by each node and | ||
| related to ``input`` are executed, and no related objects remain in any buffer within the graph. | ||
|
|
||
| **Returns**: ``true``. | ||
|
|
||
| .. code:: cpp | ||
|
|
||
| template <typename Input, typename Output, typename Policy> | ||
| bool function_node<Input, Output, Policy>::try_put_and_wait(const Input& input) | ||
|
|
||
| **Effects**: If the concurrency limit allows, executes the user-provided body on the incoming message ``input``. | ||
| Otherwise, depending on the ``Policy`` of the node, either queues the incoming message ``input`` or rejects it. | ||
|
|
||
| Waits for the completion of the ``input`` in the Flow Graph, meaning all tasks created by each node and | ||
| related to ``input`` are executed, and no related objects remain in any buffer within the graph. | ||
|
|
||
| **Returns**: ``true`` if the input is accepted, ``false`` otherwise. | ||
|
|
||
| .. code:: cpp | ||
|
|
||
| template <typename T> | ||
| bool overwrite_node<T>::try_put_and_wait(const T& input) | ||
|
|
||
| **Effects**: Stores ``input`` in the internal single-item buffer and broadcasts it to all successors. | ||
|
|
||
| Waits for the completion of the ``input`` in the Flow Graph, meaning all tasks created by each node and | ||
| related to ``input`` are executed, and no related objects remain in any buffer within the graph. | ||
|
|
||
| **Returns**: ``true``. | ||
|
|
||
| .. caution:: | ||
|
|
||
| Since the input element is not retrieved from ``overwrite_node`` once accepted by the successor, | ||
| retrieve it by explicitly calling the ``clear()`` method or by overwriting with another element to prevent | ||
| ``try_put_and_wait`` from indefinite waiting. | ||
|
|
||
| .. code:: cpp | ||
|
|
||
| template <typename T> | ||
| bool write_once_node<T>::try_put_and_wait(const T& input) | ||
|
|
||
| **Effects**: Stores ``input`` in the internal single-item buffer if it does not contain a valid value already. | ||
| If a new value is set, the node broadcasts it to all successors. | ||
|
|
||
| Waits for the completion of the ``input`` in the Flow Graph, meaning all tasks created by each node and | ||
| related to ``input`` are executed, and no related objects remain in any buffer within the graph. | ||
|
|
||
| **Returns**: ``true`` for the first time after construction or a call to ``clear()``. | ||
|
|
||
| .. caution:: | ||
|
|
||
| Since the input element is not retrieved from the ``write_once_node`` once accepted by the successor, | ||
| retrieve it by explicitly calling the ``clear()`` method to prevent ``try_put_and_wait`` from indefinite waiting. | ||
|
|
||
| .. code:: cpp | ||
|
|
||
| template <typename T> | ||
| bool buffer_node<T>::try_put_and_wait(const T& input) | ||
|
|
||
| **Effects**: Adds ``input`` to the set of items managed by the node and tries forwarding it to a successor. | ||
|
|
||
| Waits for the completion of the ``input`` in the Flow Graph, meaning all tasks created by each node and | ||
| related to ``input`` are executed, and no related objects remain in any buffer within the graph. | ||
|
|
||
| **Returns**: ``true``. | ||
|
|
||
| .. code:: cpp | ||
|
|
||
| template <typename T> | ||
| bool queue_node<T>::try_put_and_wait(const T& input) | ||
|
|
||
| **Effects**: Adds ``input`` to the set of items managed by the node and tries forwarding the least recently added item | ||
| to a successor. | ||
|
|
||
| Waits for the completion of the ``input`` in the Flow Graph, meaning all tasks created by each node and | ||
| related to ``input`` are executed, and no related objects remain in any buffer within the graph. | ||
|
|
||
| **Returns**: ``true``. | ||
|
|
||
| .. code:: cpp | ||
|
|
||
| template <typename T, typename Compare> | ||
| bool priority_queue_node<T>::try_put_and_wait(const T& input) | ||
|
|
||
| **Effects**: Adds ``input`` to the ``priority_queue_node`` and attempts to forward the item with the highest | ||
| priority among all items added to the node but not yet forwarded to the successors. | ||
|
|
||
| Waits for the completion of the ``input`` in the Flow Graph, meaning all tasks created by each node and | ||
| related to ``input`` are executed, and no related objects remain in any buffer within the graph. | ||
|
|
||
| **Returns**: ``true``. | ||
|
|
||
| .. code:: cpp | ||
|
|
||
| template <typename T> | ||
| bool sequencer_node<T>::try_put_and_wait(const T& input) | ||
|
|
||
| **Effects**: Adds ``input`` to the ``sequencer_node`` and tries forwarding the next item in sequence to a successor. | ||
|
|
||
| Waits for the completion of the ``input`` in the Flow Graph, meaning all tasks created by each node and | ||
| related to ``input`` are executed, and no related objects remain in any buffer within the graph. | ||
|
|
||
| **Returns**: ``true``. | ||
|
|
||
| .. code:: cpp | ||
|
|
||
| template <typename T, typename DecrementType> | ||
| bool limiter_node<T, DecrementType>::try_put_and_wait(const T& input) | ||
|
|
||
| **Effects**: If the broadcast count is below the threshold, broadcasts ``input`` to all successors. | ||
|
|
||
| Waits for the completion of the ``input`` in the Flow Graph, meaning all tasks created by each node and | ||
| related to ``input`` are executed, and no related objects remain in any buffer within the graph. | ||
|
|
||
| **Returns**: ``true`` if ``input`` is broadcasted; ``false`` otherwise. | ||
|
|
||
| .. code:: cpp | ||
|
|
||
| template <typename T> | ||
| bool broadcast_node<T>::try_put_and_wait(const T& input) | ||
|
|
||
| **Effects**: Broadcasts ``input`` to all successors. | ||
|
|
||
| Waits for the completion of the ``input`` in the Flow Graph, meaning all tasks created by each node and | ||
| related to ``input`` are executed, and no related objects remain in any buffer within the graph. | ||
|
|
||
| **Returns**: ``true`` even if the node cannot successfully forward the message to any of its successors. | ||
|
|
||
| .. code:: cpp | ||
|
|
||
| template <typename TupleType> | ||
| bool split_node<TupleType>::try_put_and_wait(const TupleType& input); | ||
|
|
||
| **Effects**: Broadcasts each element in the incoming tuple to the nodes connected to the ``split_node`` output ports. | ||
| The element at index ``i`` of ``input`` is broadcasted through the output port number ``i``. | ||
|
|
||
| Waits for the completion of the ``input`` in the Flow Graph, meaning all tasks created by each node and | ||
| related to ``input`` are executed, and no related objects remain in any buffer within the graph. | ||
|
|
||
| **Returns**: ``true``. | ||
|
|
||
| Example | ||
| ******* | ||
|
|
||
| .. code:: cpp | ||
|
|
||
| #define TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT | ||
| #include <oneapi/tbb/flow_graph.h> | ||
| #include <oneapi/tbb/parallel_for.h> | ||
|
|
||
| struct f1_body; | ||
| struct f2_body; | ||
| struct f3_body; | ||
| struct f4_body; | ||
|
|
||
| int main() { | ||
| using namespace oneapi::tbb; | ||
|
|
||
| flow::graph g; | ||
| flow::broadcast_node<int> start_node(g); | ||
|
|
||
| flow::function_node<int, int> f1(g, flow::unlimited, f1_body{}); | ||
| flow::function_node<int, int> f2(g, flow::unlimited, f2_body{}); | ||
| flow::function_node<int, int> f3(g, flow::unlimited, f3_body{}); | ||
|
|
||
| flow::join_node<std::tuple<int, int>> join(g); | ||
|
|
||
| flow::function_node<std::tuple<int, int>, int> f4(g, flow::serial, f4_body{}); | ||
|
|
||
| flow::make_edge(start_node, f1); | ||
| flow::make_edge(f1, f2); | ||
|
|
||
| flow::make_edge(start_node, f3); | ||
|
|
||
| flow::make_edge(f2, flow::input_port<0>(join)); | ||
| flow::make_edge(f3, flow::input_port<1>(join)); | ||
|
|
||
| flow::make_edge(join, f4); | ||
|
|
||
| // Submit work into the graph | ||
| parallel_for(0, 100, [](int input) { | ||
| start_node.try_put_and_wait(input); | ||
|
|
||
| // Post processing the result of input | ||
| }); | ||
| } | ||
|
|
||
| Each iteration of ``parallel_for`` submits an input into the Flow Graph. After returning from ``try_put_and_wait(input)``, it is | ||
| guaranteed that all of the work related to the completion of ``input`` is done by all of the nodes in the graph. Tasks related to inputs | ||
| submitted by other calls are not guaranteed to be completed. | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.