Commit 0e35744

Add distributed::reduce() algorithm
1 parent 35ead92 commit 0e35744

3 files changed: +503 -0 lines changed
include/boost/compute/distributed/reduce.hpp

Lines changed: 359 additions & 0 deletions
@@ -0,0 +1,359 @@
//---------------------------------------------------------------------------//
// Copyright (c) 2016 Jakub Szuppe <j.szuppe@gmail.com>
//
// Distributed under the Boost Software License, Version 1.0
// See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt
//
// See http://boostorg.github.com/compute for more information.
//---------------------------------------------------------------------------//

#ifndef BOOST_COMPUTE_DISTRIBUTED_REDUCE_HPP
#define BOOST_COMPUTE_DISTRIBUTED_REDUCE_HPP

#include <vector>

#include <boost/assert.hpp>
#include <boost/utility/enable_if.hpp>

#include <boost/mpl/and.hpp>
#include <boost/mpl/not.hpp>
#include <boost/mpl/or.hpp>

#include <boost/compute/buffer.hpp>
#include <boost/compute/algorithm/reduce.hpp>
#include <boost/compute/algorithm/copy_n.hpp>
#include <boost/compute/iterator/buffer_iterator.hpp>
#include <boost/compute/type_traits/is_device_iterator.hpp>
#include <boost/compute/type_traits/result_of.hpp>

#include <boost/compute/distributed/command_queue.hpp>
#include <boost/compute/distributed/vector.hpp>

namespace boost {
namespace compute {
namespace distributed {
namespace detail {

template<class OutputIterator>
inline ::boost::compute::command_queue&
final_reduce_queue(OutputIterator result,
                   command_queue &queue,
                   typename boost::enable_if_c<
                       !is_device_iterator<OutputIterator>::value
                   >::type* = 0)
{
    (void) result;

    // a CPU device queue is preferred; if there is none, the first
    // device queue is used
    size_t queue_idx = 0;
    for(size_t i = 0; i < queue.size(); i++)
    {
        if(queue.get(i).get_device().type() & ::boost::compute::device::cpu)
        {
            queue_idx = i;
            break;
        }
    }
    return queue.get(queue_idx);
}

template<class OutputIterator>
inline ::boost::compute::command_queue&
final_reduce_queue(OutputIterator result,
                   command_queue &queue,
                   typename boost::enable_if_c<
                       is_device_iterator<OutputIterator>::value
                   >::type* = 0)
{
    // first, find all device queues that can be used with the result iterator
    const ::boost::compute::context& result_context =
        result.get_buffer().get_context();
    std::vector<size_t> compatible_queues;
    for(size_t i = 0; i < queue.size(); i++)
    {
        if(queue.get(i).get_context() == result_context)
        {
            compatible_queues.push_back(i);
        }
    }
    BOOST_ASSERT_MSG(
        !compatible_queues.empty(),
        "There is no device command queue that can be used to copy to result"
    );

    // then choose a device queue from the compatible device queues

    // a CPU device queue is preferred; if there is none, the first
    // compatible device queue is used
    size_t queue_idx = compatible_queues[0];
    for(size_t i = 0; i < compatible_queues.size(); i++)
    {
        size_t n = compatible_queues[i];
        if(queue.get(n).get_device().type() & ::boost::compute::device::cpu)
        {
            queue_idx = n;
            break;
        }
    }
    return queue.get(queue_idx);
}

template<
    class InputType, weight_func weight, class Alloc,
    class OutputIterator,
    class BinaryFunction
>
inline void
dispatch_reduce(const vector<InputType, weight, Alloc> &input,
                OutputIterator result,
                BinaryFunction function,
                command_queue &queue)
{
    typedef typename
        boost::compute::result_of<BinaryFunction(InputType, InputType)>::type
        result_type;

    // find the device queue for the final reduction
    ::boost::compute::command_queue& device_queue =
        final_reduce_queue(result, queue);

    ::boost::compute::buffer parts_results_device(
        device_queue.get_context(), input.parts() * sizeof(result_type)
    );

    // if all device queues are in the same OpenCL context we can save
    // the partial reductions directly into the parts_results_device buffer
    size_t reduced = 0;
    if(queue.one_context())
    {
        // reduce each part of the input vector
        for(size_t i = 0; i < input.parts(); i++)
        {
            if(input.begin(i) != input.end(i)) {
                ::boost::compute::reduce(
                    input.begin(i),
                    input.end(i),
                    ::boost::compute::make_buffer_iterator<result_type>(
                        parts_results_device, reduced
                    ),
                    function,
                    queue.get(i)
                );
                reduced++;
            }
        }
    }
    else
    {
        // reduce each part of the input vector into host memory, then copy
        // the partial results into the buffer used for the final reduction
        std::vector<result_type> parts_results_host(input.parts());
        for(size_t i = 0; i < input.parts(); i++)
        {
            if(input.begin(i) != input.end(i)) {
                ::boost::compute::reduce(
                    input.begin(i),
                    input.end(i),
                    &parts_results_host[reduced],
                    function,
                    queue.get(i)
                );
                reduced++;
            }
        }
        ::boost::compute::copy_n(
            parts_results_host.begin(),
            reduced,
            ::boost::compute::make_buffer_iterator<result_type>(
                parts_results_device
            ),
            device_queue
        );
    }
    // final reduction
    ::boost::compute::reduce(
        ::boost::compute::make_buffer_iterator<result_type>(
            parts_results_device
        ),
        ::boost::compute::make_buffer_iterator<result_type>(
            parts_results_device, reduced
        ),
        result,
        function,
        device_queue
    );
}

// special case for when OutputIterator is a host iterator
// and binary operator is plus<T>
template<
    class InputType, weight_func weight, class Alloc,
    class OutputIterator,
    class T
>
inline void
dispatch_reduce(const vector<InputType, weight, Alloc> &input,
                OutputIterator result,
                ::boost::compute::plus<T> function,
                command_queue &queue,
                typename boost::enable_if_c<
                    !is_device_iterator<OutputIterator>::value
                >::type* = 0)
{
    // reduce each part of input vector
    std::vector<T> parts_results_host(input.parts());
    for(size_t i = 0; i < input.parts(); i++)
    {
        ::boost::compute::reduce(
            input.begin(i),
            input.end(i),
            &parts_results_host[i],
            function,
            queue.get(i)
        );
    }

    // final reduction
    *result = parts_results_host[0];
    for(size_t i = 1; i < input.parts(); i++)
    {
        *result += static_cast<T>(parts_results_host[i]);
    }
}

// special case for when OutputIterator is a host iterator
// and binary operator is min<T>
template<
    class InputType, weight_func weight, class Alloc,
    class OutputIterator,
    class T
>
inline void
dispatch_reduce(const vector<InputType, weight, Alloc> &input,
                OutputIterator result,
                ::boost::compute::min<T> function,
                command_queue &queue,
                typename boost::enable_if_c<
                    !is_device_iterator<OutputIterator>::value
                >::type* = 0)
{
    // reduce each part of input vector
    std::vector<T> parts_results_host(input.parts());
    for(size_t i = 0; i < input.parts(); i++)
    {
        ::boost::compute::reduce(
            input.begin(i),
            input.end(i),
            &parts_results_host[i],
            function,
            queue.get(i)
        );
    }

    // final reduction
    *result = parts_results_host[0];
    for(size_t i = 1; i < input.parts(); i++)
    {
        *result = (std::min)(static_cast<T>(*result), parts_results_host[i]);
    }
}

// special case for when OutputIterator is a host iterator
// and binary operator is max<T>
template<
    class InputType, weight_func weight, class Alloc,
    class OutputIterator,
    class T
>
inline void
dispatch_reduce(const vector<InputType, weight, Alloc> &input,
                OutputIterator result,
                ::boost::compute::max<T> function,
                command_queue &queue,
                typename boost::enable_if_c<
                    !is_device_iterator<OutputIterator>::value
                >::type* = 0)
{
    // reduce each part of input vector
    std::vector<T> parts_results_host(input.parts());
    for(size_t i = 0; i < input.parts(); i++)
    {
        ::boost::compute::reduce(
            input.begin(i),
            input.end(i),
            &parts_results_host[i],
            function,
            queue.get(i)
        );
    }

    // final reduction
    *result = parts_results_host[0];
    for(size_t i = 1; i < input.parts(); i++)
    {
        *result = (std::max)(static_cast<T>(*result), parts_results_host[i]);
    }
}

} // end detail namespace

/// Returns the result of applying \p function to the elements in the
/// \p input vector.
///
/// If no function is specified, \c plus will be used.
///
/// \param input input vector
/// \param result iterator pointing to the output
/// \param function binary reduction function
/// \param queue distributed command queue to perform the operation
///
/// The distributed command queue \p queue has to span the same set of compute
/// devices as the distributed command queue used to create the \p input
/// vector.
///
/// If \p result is a device iterator, its underlying buffer must be allocated
/// in the context of at least one device command queue from \p queue.
///
/// The \c reduce() algorithm assumes that the binary reduction function is
/// associative. When used with non-associative functions, the result may
/// be non-deterministic and vary in precision. Notably, this affects the
/// \c plus<float>() function as floating-point addition is not associative
/// and may produce slightly different results than a serial algorithm.
///
/// This algorithm supports both host and device iterators for the
/// result argument. This allows values to be reduced and copied
/// to the host all with a single function call.
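///
/// A minimal usage sketch (the names \c vec and \c dist_queue are
/// illustrative and assume an already constructed distributed vector of
/// \c float and the distributed command queue used to create it):
/// \code
/// // sum all elements of vec and store the result in a host variable
/// float sum = 0.f;
/// boost::compute::distributed::reduce(
///     vec, &sum, boost::compute::plus<float>(), dist_queue
/// );
/// \endcode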
template<
    class InputType, weight_func weight, class Alloc,
    class OutputIterator,
    class BinaryFunction
>
inline void
reduce(const vector<InputType, weight, Alloc> &input,
       OutputIterator result,
       BinaryFunction function,
       command_queue &queue)
{
    if(input.empty()) {
        return;
    }

    detail::dispatch_reduce(input, result, function, queue);
}

/// \overload
template<
    class InputType, weight_func weight, class Alloc,
    class OutputIterator
>
inline void
reduce(const vector<InputType, weight, Alloc> &input,
       OutputIterator result,
       command_queue &queue)
{
    return reduce(input, result, ::boost::compute::plus<InputType>(), queue);
}

} // end distributed namespace
} // end compute namespace
} // end boost namespace

#endif /* BOOST_COMPUTE_DISTRIBUTED_REDUCE_HPP */

test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
@@ -85,6 +85,7 @@ add_compute_test("distributed.context" test_distributed_context.cpp)
add_compute_test("distributed.command_queue" test_distributed_command_queue.cpp)
add_compute_test("distributed.vector" test_distributed_vector.cpp)
add_compute_test("distributed.copy" test_distributed_copy.cpp)
+add_compute_test("distributed.reduce" test_distributed_reduce.cpp)
add_compute_test("distributed.transform" test_distributed_transform.cpp)

add_compute_test("utility.extents" test_extents.cpp)
