226 changes: 226 additions & 0 deletions rfcs/proposed/flow_graph_fold_node/README.md

Contributor:
First of all, since the node is supposed to handle multiple input streams, we need to decide how similar these streams must be. Should the same initial value be used, or should it be per stream? Should the same binary operation be used, or can/should it be separate for each stream? Should the types of input and output be the same for all streams, or can those differ as well?

Of course I see that in the proposal all those things are the same for all streams. But it is not discussed, rather assumed as given.

Contributor:
Use cases should help in answering this question. Probably, the same type and same initializer value will make sense for most use cases, but we should add support for this choice.

Contributor:
A related question is on the number of input and output ports. Do we expect all streams to go into a single port, or should there be a single port per stream, or even many-to-many? Of course we can add a preceding indexer_node for the port-per-stream setup in front of a single input port (and it would also nicely create tagged_msges). But if the 1-1 correspondence is expected to be the typical use, maybe we would not want to complicate it. Also there is no "deindexer" node to put after a single output port.

Contributor:
This is an interesting point. We have a split_node that is complementary to join_node, joining and splitting tuples. But nothing that is complementary to an indexer_node. The indexer_node was introduced to deliver tagged messages into a single input of a functional node. So far, I don't think we've seen the need to automatically send indexed messages to different output ports.

Contributor:
Do we see the case of folding a single stream as important enough, and would it be sufficiently easy to do with a node designed to handle multiple streams? Perhaps some code samples could show that.

Contributor:
I think it is important to outline some use cases which motivate the creation of the fold node. It will guide the design and help to prevent "suboptimal" decisions.

A few simplified use case ideas to consider:

* compute the word/token count in a text file read line by line.
* compute the sum of floating-point values, assuming that the sum might be many orders of magnitude bigger than some individual values.
* "reduce by key": for a key-value sequence, compute the value sums for all different keys.

@@ -0,0 +1,226 @@
# Folding node in the Flow Graph

## Introduction

The current oneTBB Flow Graph API provides basic functional nodes for expressing common user scenarios:

* `continue_node`, which converts multiple input signals (usually from several different predecessors) into a single signal sent to each of its successors.
* `function_node`, which converts each input message from one or several predecessors into a single output message.
* `multifunction_node`, which converts each input message into some number of output messages (zero messages are also allowed) sent to one or several successors.

However, the use case of converting multiple input messages (an input stream) into a single output is not straightforward to express using only the existing API.

Such an API is extremely useful for expressing _reductions_ (or _folds_) in the oneTBB Flow Graph: a transformation of a sequence of
objects (_fold input stream_) into a single object (_fold result_), starting from a user-defined initial value (_fold initializer_) and repeatedly applying a user-defined binary operation (_fold operation_).
Consider the basic example:

*_S = sum(1, n) = 0 + 1 + 2 + 3 + 4 + ... + n_*

The _fold input stream_ is the sequence _(1, 2, 3, 4, ..., n)_, the _fold result_ is _S_, the _fold initializer_ is _0_, and the _fold operation_ is binary _+_.
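
For reference, the same fold can be expressed sequentially with the C++ standard library; the node proposed below is intended as a streaming, parallel counterpart of this pattern:

```cpp
#include <functional>
#include <numeric>
#include <vector>

int main() {
    std::vector<int> stream = {1, 2, 3, 4, 5};

    // fold initializer = 0, fold operation = binary +
    int S = std::accumulate(stream.begin(), stream.end(), 0, std::plus<int>{});
    return S == 15 ? 0 : 1; // S is the fold result
}
```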

This paper proposes to add a new functional node to the oneTBB Flow Graph that would allow the user to specify the _fold initializer_ and the _fold operation_
and would compute the _fold_ on multiple _fold input stream_s in parallel (both several operations within a single stream and different streams can be
computed in parallel).

## Proposal

The new ``oneapi::tbb::flow::fold_node<InputType, OutputType>`` is a oneTBB Flow Graph functional node that computes a parallel _fold_
on multiple input streams. The concurrency limit (similar to other functional nodes), the _fold initializer_, and the binary _fold operation_
are specified when constructing the node:

```cpp
tbb::flow::graph g;

tbb::flow::fold_node<int, int> f_node(g, tbb::flow::unlimited,
    /*fold initializer = */0,
    /*fold operation = */std::plus<int>{});
```

Since the ``fold_node`` needs to operate on multiple _fold input stream_s in parallel, an identifier of the stream should be provided
as an input message together with the actual data to compute. It is proposed to use ``tbb::flow::tagged_msg`` class for that purpose

Contributor:
Suggested change
as an input message together with the actual data to compute. It is proposed to use ``tbb::flow::tagged_msg`` class for that purpose
as an input message together with the actual data to compute. It is proposed to use [``tbb::flow::tagged_msg``](https://oneapi-spec.uxlfoundation.org/specifications/oneapi/latest/elements/onetbb/source/flow_graph/tagged_msg_cls) class for that purpose

as an input for the ``fold_node`` that would be sent by the predecessor or by manual ``try_put`` calls:
Comment on lines +40 to +41
Contributor:
tagged_msg was designed to hold data of multiple types, it's a 'variant' on steroids. If we decide to require a single data type for all streams, tagged_msg will likely be not that convenient to use and may also add some overheads. In other words, I think the way to combine the data with the stream index will depend on the overall node design.


```cpp
tbb::flow::graph g;

tbb::flow::fold_node<int, int> f_node(g, tbb::flow::unlimited,
    /*fold initializer = */0,
    /*fold operation = */std::plus<int>{});

tbb::flow::buffer_node<int> buf(g);

tbb::flow::make_edge(f_node, buf);

std::size_t stream1_index = 1, stream2_index = 2;

std::vector<int> stream1 = {1, 3, 5, 7, 9};
std::vector<int> stream2 = {2, 4, 6, 8, 10};

using fold_input = typename tbb::flow::fold_node<int, int>::input_type;

// Submit stream1
for (auto item : stream1) {
    f_node.try_put(input_type{stream1_index, item});
    // Contributor suggested change:
    // -    f_node.try_put(input_type{stream1_index, item});
    // +    f_node.try_put(fold_input{stream1_index, item});
}

for (auto item : stream2) {
    f_node.try_put(input_type{stream2_index, item});
    // Contributor suggested change:
    // -    f_node.try_put(input_type{stream2_index, item});
    // +    f_node.try_put(fold_input{stream2_index, item});
}

int result = 0;
buf.try_get(result);

// result is expected to be either 25 (result of the first stream fold) or
// 30 (result of the second stream fold)
```

Explicit specification of ``tagged_msg`` as the ``Input`` template parameter, as well as allowing ``tagged_msg`` as a fold operation argument,
are [open questions](#process-specific-information).

Another important aspect: since the ``fold_node`` takes the elements of the _fold input stream_ one by one rather than as an entire stream, and
computes a partial fold for each element, the node cannot tell when the computed result is final, i.e. when all of the elements in the stream
have been processed. To handle this, a special signal from the user is required, indicating that no more elements of the input stream
with the specified tag are expected. Some special type defined by the Flow Graph is needed for that purpose, e.g. ``tbb::flow::fold_stream_end``.
A ``fold_stream_end`` instance should be explicitly submitted to the node once the end of the input stream is reached:

```cpp
// Submit stream1
for (auto item : stream1) {
    f_node.try_put(input_type{stream1_index, item});
}
f_node.try_put(tbb::flow::fold_stream_end{stream1_index});

for (auto item : stream2) {
    f_node.try_put(input_type{stream2_index, item});
}
f_node.try_put(tbb::flow::fold_stream_end{stream2_index});

int result = 0;
buf.try_get(result);

// result is expected to be either 25 (result of the first stream fold) or
// 30 (result of the second stream fold)
```

From the implementation perspective, there can be multiple approaches to implementing the ``fold_stream_end``, and the choice affects the user API.
Some of them are considered in a [separate section](#fold_stream_end-implementation-approaches).
Comment on lines +105 to +106
Contributor:
It seems that all the approaches are based on a message counter. The question then is, at which point the total number of messages to be folded (the stream "size") becomes known. If it is known a priori, or if we want to compute partial folds for each K messages, the counter might be set even at construction. If the stream size is dynamic but known before the first message, we may require that the stream is somehow "initialized" to set that counter.

The most flexible, but also the most complicated and risky approach is to get the stream size at any point, or to get the end of stream message. If the message does not carry any counter, there are problems with potential "still in the flight" messages that might be missed. If the message does carry a counter, what to do if more messages were already received and folded? Also, the message type is kind of problematic, as both the stream size and the end of stream message are likely of different type than the stream data.

A possible design alternative is to have a dedicated port for the "end of stream" signals.


## ``fold_stream_end`` Implementation Approaches

Consider a possible implementation of the ``fold_node`` described in this paper as a wrapper over ``tbb::flow::multifunction_node`` combined with
a concurrent hash table for storing the partial results. Let's also consider the ``multifunction_node`` to be ``tbb::flow::unlimited`` for simplicity.
Since the elements of the input stream and the ``fold_stream_end`` indicator are submitted in the same manner, the current TBB implementation
of ``multifunction_node`` would create and spawn _N + 1_ tasks, where _N_ is the number of elements in the input stream. Since the tasks are processed by
the TBB scheduler in an unspecified order, it is possible that the task processing the ``fold_stream_end`` is executed before other tasks processing
the input stream.

### ``fold_stream_end`` containing the number of elements in the stream

The first implementation approach suggests that ``fold_stream_end`` carry the number of elements in the corresponding input stream.
The ``fold_node`` should maintain an internal *signed* integer counter of elements for each input stream. When the node
processes an element that is not a ``fold_stream_end``, the counter is increased by 1. When the node
processes the ``fold_stream_end`` element, the counter is decreased by _N_, where _N_ is the number of elements in the stream,
stored as part of the ``fold_stream_end``.

In this case, an internal counter not equal to _0_ indicates that there are still elements to process; once it reaches _0_, the result
of the fold is considered complete and can be propagated to the ``fold_node`` successors.
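
A minimal sketch of the counting logic described above; ``stream_state``, ``on_element``, and ``on_stream_end`` are hypothetical names used only to illustrate the bookkeeping, not the actual node implementation:

```cpp
#include <atomic>

// Hypothetical per-stream state, one instance per stream tag.
struct stream_state {
    // Signed: incremented per received element, decremented by the
    // element count carried by fold_stream_end.
    std::atomic<long long> counter{0};
};

// Called after one regular element of the stream has been folded.
// Returns true when the fold for this stream is complete.
bool on_element(stream_state& s) {
    return (s.counter.fetch_add(1) + 1) == 0;
}

// Called when fold_stream_end carrying the element count n is received.
bool on_stream_end(stream_state& s, long long n) {
    return (s.counter.fetch_sub(n) - n) == 0;
}
```
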
Comment on lines +125 to +126
Contributor:
Making the single counter for both expected (negative count) and received (positive count) messages, which triggers at 0, has a downside - it loses the information about the number of messages and cannot pass it down. This value was computed as a byproduct of folding, and in theory it might not be known anywhere else, so it could possibly be useful in the subsequent processing.


### Automated approach

The second implementation approach suggests that ``fold_stream_end`` be a simple flag type and that no task be spawned when this flag is received.
The ``fold_node`` in that case should maintain, for each input stream, a counter of the spawned tasks and a flag indicating that
the ``fold_stream_end`` was received. Each executed task should decrease the reference counter upon completion and check whether the end flag was received.
When the node processes the ``fold_stream_end``, it sets the end flag to ``true`` and checks whether all of the tasks have been processed.
The result is considered complete and is propagated to successors once both the end flag is set to ``true`` and all of the tasks have been processed, i.e.
the corresponding reference counter equals _0_.
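
A similar sketch for this approach (again with hypothetical names; a real implementation would also need to handle the race between setting the end flag and the completion checks performed by in-flight tasks):

```cpp
#include <atomic>
#include <cstddef>

// Hypothetical per-stream state for the flag-based approach.
struct stream_state {
    std::atomic<std::size_t> tasks_in_flight{0}; // tasks spawned for this stream
    std::atomic<bool> end_received{false};       // fold_stream_end was seen
};

// Called when a regular element is received and a task is spawned for it.
void on_element_received(stream_state& s) {
    s.tasks_in_flight.fetch_add(1);
}

// Called at the end of a task that folded one element.
// Returns true when the fold for this stream is complete.
bool on_task_finished(stream_state& s) {
    std::size_t remaining = s.tasks_in_flight.fetch_sub(1) - 1;
    return s.end_received.load() && remaining == 0;
}

// Called when fold_stream_end is received; no task is spawned for it.
bool on_stream_end(stream_state& s) {
    s.end_received.store(true);
    return s.tasks_in_flight.load() == 0;
}
```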

## Process Specific Information

### Propagating the stream index to the successors

It may be useful for the successors of the ``fold_node`` to identify the input stream whose result was received:

```cpp
tbb::flow::graph g;

tbb::flow::fold_node<int, tbb::flow::tagged_msg<std::size_t, int>> fold(g, ...);
tbb::flow::function_node<tbb::flow::tagged_msg<std::size_t, int>, int> postprocess(g, tbb::flow::unlimited,
    [](const tbb::flow::tagged_msg<std::size_t, int>& tagged_input) {
        // tagged_input.tag() - a unique tag of the input stream
        // tbb::flow::cast_to<int>(tagged_input) - the fold result
    });
```

One of the solutions is to provide flexibility:

* If the ``OutputType`` template argument of the ``fold_node`` is specified as a specialization of ``tagged_msg`` - the tag should be propagated to the
successors. In this case the successors should be ready to take the same ``tagged_msg`` (be a ``receiver<tagged_msg<...>>``).
* If the ``OutputType`` template argument of the ``fold_node`` is not a specialization of ``tagged_msg`` - the tag should *not* be propagated and the
successors are not expected to be receivers of ``tagged_msg``.

### Propagation of the stream index to the node body

As with the previous question, for some use cases it may be useful to propagate the index of the input stream to the _fold operation_ in order to
implement specific logic for different stream tags:

```cpp
tbb::flow::graph g;

using fold_node_type = tbb::flow::fold_node<int, int>;
using input_type = typename fold_node_type::input_type;

fold_node_type fold(g, tbb::flow::unlimited,
    /*fold initializer = */0,
    [](int lhs, const input_type& rhs) {
        if (rhs.tag() == 0) {
            // Fold logic for tag 0
            return lhs + tbb::flow::cast_to<int>(rhs);
        } else {
            // Fold logic for any other streams
            return (lhs / 2) + (tbb::flow::cast_to<int>(rhs) / 2);
        }
    });
```

It is also an open question whether ``input_type`` should be allowed as the first, the second, or both operands of the _fold operation_.

Contributor:
Alternatively, there can be a separate optional argument for the stream index.


Similar to the previous open question, a flexible approach can be applied to allow both use cases (see the sketch after this list):
* If the invocation of the _fold operation_ associated with the ``fold_node`` with an argument of type ``input_type`` (i.e. ``tagged_msg``)
is well-formed - the index of the stream is propagated to the body.
* Otherwise - the index of the input stream is not propagated to the _fold operation_.
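
One possible way to implement this detection is sketched below, assuming the node internally dispatches to the user body; ``invoke_fold_body`` is a hypothetical helper, not part of the proposed API:

```cpp
#include <type_traits>

// Hypothetical dispatch helper: call the body with the tagged input if such a
// call is well-formed, otherwise pass only the extracted value.
template <typename Body, typename Output, typename TaggedInput, typename Value>
Output invoke_fold_body(Body& body, const Output& partial,
                        const TaggedInput& tagged_input, const Value& value) {
    if constexpr (std::is_invocable_r_v<Output, Body&, const Output&, const TaggedInput&>) {
        return body(partial, tagged_input); // body also wants the stream tag
    } else {
        return body(partial, value);        // body only needs the data
    }
}
```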

### Should ``InputType`` and ``OutputType`` be different?
Contributor:
I think yes, generally these should be different types. If I remember correctly, one of our motivating scenarios is computing a histogram for a data sequence; it is obviously of a different type than the data elements.


The basic use case of computing a fold is reducing a set of objects of a single type to a single object of the same type, but in general
there can be folds whose _fold operation_ returns a different type. To support such use cases, it should be allowed to specify different
``InputType`` and ``OutputType`` template arguments.

But in that case the requirements for the _fold operation_ should be clearly defined: since the partial results would be of type ``OutputType``,
``fold_operation(OutputType, InputType)`` should be defined. Whether other variations should be defined as well is an open question.
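
For example, building a histogram (one of the scenarios mentioned in the review discussion) folds integer inputs into a ``std::vector<std::size_t>`` result; a sketch assuming the interface proposed above:

```cpp
// Sketch assuming the proposed fold_node interface: InputType is int,
// OutputType is std::vector<std::size_t>, and the fold operation takes
// the partial result on the left and the next element on the right.
tbb::flow::graph g;

tbb::flow::fold_node<int, std::vector<std::size_t>> histogram_node(g, tbb::flow::unlimited,
    /*fold initializer = */std::vector<std::size_t>(10, 0),
    /*fold operation = */[](std::vector<std::size_t> partial, int value) {
        ++partial.at(static_cast<std::size_t>(value) % 10); // count values per bucket
        return partial;
    });
```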

### Computation of the single fold

This question is not related to the ``fold_node`` design itself, only to its possible implementation on top of
``multifunction_node<InputType, std::tuple<OutputType>>`` and a ``concurrent_hash_map<std::size_t, partial-result-holder>``
storing the partial results of the fold.

In that case, each task would need either to calculate the initial fold ``fold_operation(fold_initializer, input)`` and put it into the hash map
if there is no partial result yet, or to combine its input with the partial result already stored there.

The first thread calculating the fold would insert an _empty_ element into the partial results hash map and keep holding the ``accessor``
instance (with the underlying mutex) while computing the initial fold ``fold_operation(fold_initializer, input)``.
If a single fold is not a lightweight operation, this may negatively affect performance because the mutex inside the accessor is held during the computation.

Another option is to check whether a partial result is present and, if there is no element, calculate ``result = fold_operation(fold_initializer, input)``
without holding an ``accessor`` and then try to update the partial result. If another thread simultaneously inserts another partial result
with the same index, the current thread needs to take that result into account and calculate ``fold_operation(prev_result, result)`` one more time,
ideally without holding the lock. Further attempts to publish the partial result may also fail because other threads put their results first.
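
A simplified sketch of this option for the case where ``InputType`` and ``OutputType`` are both ``int`` (so partial results can be combined with the same operation); unlike the fully lock-free variant described above, the final combine here still happens under a briefly held ``accessor``:

```cpp
#include <cstddef>
#include <tbb/concurrent_hash_map.h>

using partials_map = tbb::concurrent_hash_map<std::size_t, int>;

// The potentially expensive fold of the incoming element runs outside of any
// lock; the accessor is acquired only for the short insert-or-combine step.
template <typename FoldOp>
void update_partial(partials_map& partials, std::size_t tag, int input,
                    int fold_initializer, FoldOp fold_operation) {
    int candidate = fold_operation(fold_initializer, input);

    partials_map::accessor acc;
    if (partials.insert(acc, tag)) {
        // No partial result for this tag yet; store the candidate.
        acc->second = candidate;
    } else {
        // Another thread already published a partial result; combine with it.
        acc->second = fold_operation(acc->second, candidate);
    }
}
```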

On the one hand, holding the accessor while calculating the _fold operation_ can negatively affect performance; on the other hand,
trying to update the result after each calculation and failing can also have a negative effect because of the recalculation on each failure.

Another approach is to use ``std::atomic`` as the partial result inside the hash map, but it would significantly narrow the set of types that can
be used as the ``InputType`` and ``OutputType`` arguments of the ``fold_node``.