45 #ifndef KOKKOS_HPX_HPP 46 #define KOKKOS_HPX_HPP 48 #include <Kokkos_Macros.hpp> 49 #if defined(KOKKOS_ENABLE_HPX) 51 #include <Kokkos_Core_fwd.hpp> 53 #include <Kokkos_HostSpace.hpp> 57 #ifdef KOKKOS_ENABLE_HBWSPACE 58 #include <Kokkos_HBWSpace.hpp> 61 #include <HPX/Kokkos_HPX_ChunkedRoundRobinExecutor.hpp> 62 #include <Kokkos_HostSpace.hpp> 64 #include <Kokkos_MemoryTraits.hpp> 66 #include <Kokkos_ScratchSpace.hpp> 67 #include <Kokkos_TaskScheduler.hpp> 68 #include <impl/Kokkos_ConcurrentBitset.hpp> 69 #include <impl/Kokkos_FunctorAdapter.hpp> 70 #include <impl/Kokkos_FunctorAnalysis.hpp> 71 #include <impl/Kokkos_Tools.hpp> 72 #include <impl/Kokkos_Tags.hpp> 73 #include <impl/Kokkos_TaskQueue.hpp> 74 #include <impl/Kokkos_ExecSpaceInitializer.hpp> 76 #include <KokkosExp_MDRangePolicy.hpp> 78 #include <hpx/apply.hpp> 79 #include <hpx/hpx_start.hpp> 80 #include <hpx/include/util.hpp> 81 #include <hpx/lcos/local/barrier.hpp> 82 #include <hpx/lcos/local/latch.hpp> 83 #include <hpx/parallel/algorithms/for_loop.hpp> 84 #include <hpx/parallel/algorithms/reduce.hpp> 85 #include <hpx/parallel/executors/static_chunk_size.hpp> 86 #include <hpx/runtime.hpp> 87 #include <hpx/runtime/threads/run_as_hpx_thread.hpp> 88 #include <hpx/runtime/threads/threadmanager.hpp> 89 #include <hpx/runtime/thread_pool_helpers.hpp> 91 #include <Kokkos_UniqueToken.hpp> 98 #include <type_traits> 113 #ifndef KOKKOS_HPX_IMPLEMENTATION 114 #define KOKKOS_HPX_IMPLEMENTATION 1 117 #if (KOKKOS_HPX_IMPLEMENTATION < 0) || (KOKKOS_HPX_IMPLEMENTATION > 2) 118 #error "You have chosen an invalid value for KOKKOS_HPX_IMPLEMENTATION" 145 class thread_buffer {
146 static constexpr std::size_t m_cache_line_size = 64;
148 std::size_t m_num_threads;
149 std::size_t m_size_per_thread;
150 std::size_t m_size_total;
153 void pad_to_cache_line(std::size_t &size) {
154 size = ((size + m_cache_line_size - 1) / m_cache_line_size) *
161 m_size_per_thread(0),
164 thread_buffer(
const std::size_t num_threads,
165 const std::size_t size_per_thread) {
166 resize(num_threads, size_per_thread);
168 ~thread_buffer() {
delete[] m_data; }
170 thread_buffer(
const thread_buffer &) =
delete;
171 thread_buffer(thread_buffer &&) =
delete;
172 thread_buffer &operator=(
const thread_buffer &) =
delete;
173 thread_buffer &operator=(thread_buffer) =
delete;
// Re-dimensions the buffer for `num_threads` slots of `size_per_thread`
// bytes. Both values are stored, and the stored per-thread size is then
// passed through pad_to_cache_line() (presumably rounding it up to a
// multiple of m_cache_line_size -- that helper is only partly visible here).
175 void resize(
const std::size_t num_threads,
176 const std::size_t size_per_thread) {
177 m_num_threads = num_threads;
178 m_size_per_thread = size_per_thread;
180 pad_to_cache_line(m_size_per_thread);
// Total requirement: thread count times the (padded) per-thread size.
182 std::size_t size_total_new = m_num_threads * m_size_per_thread;
// A new array is only allocated when the requirement exceeds the recorded
// total; the recorded total is then updated to the new requirement.
184 if (m_size_total < size_total_new) {
// NOTE(review): no release of the previous m_data is visible in this excerpt
// before it is overwritten -- confirm the old array is freed.
186 m_data =
new char[size_total_new];
187 m_size_total = size_total_new;
191 char *
get(std::size_t thread_num) {
192 assert(thread_num < m_num_threads);
193 if (m_data ==
nullptr) {
196 return &m_data[thread_num * m_size_per_thread];
199 std::size_t size_per_thread() const noexcept {
return m_size_per_thread; }
200 std::size_t size_total() const noexcept {
return m_size_total; }
207 static bool m_hpx_initialized;
208 static std::atomic<uint32_t> m_next_instance_id;
209 uint32_t m_instance_id = 0;
211 #if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH) 213 enum class instance_mode { global, independent };
214 instance_mode m_mode;
217 static std::atomic<uint32_t> m_active_parallel_region_count;
219 struct instance_data {
220 instance_data() =
default;
221 instance_data(hpx::shared_future<void> future) : m_future(future) {}
222 Kokkos::Impl::thread_buffer m_buffer;
223 hpx::shared_future<void> m_future = hpx::make_ready_future<void>();
226 mutable std::shared_ptr<instance_data> m_independent_instance_data;
227 static instance_data m_global_instance_data;
229 std::reference_wrapper<Kokkos::Impl::thread_buffer> m_buffer;
230 std::reference_wrapper<hpx::shared_future<void>> m_future;
232 static Kokkos::Impl::thread_buffer m_global_buffer;
236 using execution_space = HPX;
237 using memory_space = HostSpace;
238 using device_type = Kokkos::Device<execution_space, memory_space>;
239 using array_layout = LayoutRight;
240 using size_type = memory_space::size_type;
241 using scratch_memory_space = ScratchMemorySpace<HPX>;
243 #if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH) 247 m_mode(instance_mode::global),
248 m_buffer(m_global_instance_data.m_buffer),
249 m_future(m_global_instance_data.m_future) {}
251 HPX(instance_mode mode)
252 : m_instance_id(mode == instance_mode::independent ? m_next_instance_id++
255 m_independent_instance_data(mode == instance_mode::independent
256 ? (new instance_data())
258 m_buffer(mode == instance_mode::independent
259 ? m_independent_instance_data->m_buffer
260 : m_global_instance_data.m_buffer),
261 m_future(mode == instance_mode::independent
262 ? m_independent_instance_data->m_future
263 : m_global_instance_data.m_future) {}
265 HPX(hpx::shared_future<void> future)
266 : m_instance_id(m_next_instance_id++),
267 m_mode(instance_mode::independent),
269 m_independent_instance_data(new instance_data(future)),
270 m_buffer(m_independent_instance_data->m_buffer),
271 m_future(m_independent_instance_data->m_future) {}
273 HPX(
const HPX &other)
274 : m_instance_id(other.m_instance_id),
275 m_mode(other.m_mode),
276 m_independent_instance_data(other.m_independent_instance_data),
277 m_buffer(other.m_buffer),
278 m_future(other.m_future) {}
280 HPX &operator=(
const HPX &other) {
282 other.m_mode == instance_mode::independent ? m_next_instance_id++ : 0;
283 m_mode = other.m_mode;
284 m_independent_instance_data = other.m_independent_instance_data;
285 m_buffer = m_mode == instance_mode::independent
286 ? m_independent_instance_data->m_buffer
287 : m_global_instance_data.m_buffer;
288 m_future = m_mode == instance_mode::independent
289 ? m_independent_instance_data->m_future
290 : m_global_instance_data.m_future;
// Writes a one-line description of this backend to the supplied stream.
//
// The output goes to `os` (the stream the caller passed in) rather than
// unconditionally to std::cout, so callers that collect configuration text
// into a string stream or a log file actually receive it.
// The unnamed bool argument is accepted and ignored.
static void print_configuration(std::ostream &os, const bool = false) {
  os << "HPX backend" << std::endl;
}
301 uint32_t impl_instance_id() const noexcept {
return m_instance_id; }
303 #if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH) 304 static bool in_parallel(HPX
const &instance = HPX()) noexcept {
305 return !instance.impl_get_future().is_ready();
308 static bool in_parallel(HPX
const & = HPX()) noexcept {
return false; }
311 #if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH) 312 static void impl_decrement_active_parallel_region_count() {
313 --m_active_parallel_region_count;
316 static void impl_increment_active_parallel_region_count() {
317 ++m_active_parallel_region_count;
// Blocks until the future returned by impl_get_future() has completed.
320 void impl_fence_instance()
const {
// When get_self_ptr() yields nullptr the wait is executed through
// run_as_hpx_thread (presumably because the caller is not itself running
// on an HPX thread -- confirm against the HPX documentation).
321 if (hpx::threads::get_self_ptr() ==
nullptr) {
322 hpx::threads::run_as_hpx_thread([
this]() { impl_get_future().wait(); });
// Direct wait on the same future.
// NOTE(review): the branch separator between the wait above and this one
// is not visible in this excerpt; presumably this is the alternative path.
324 impl_get_future().wait();
328 void impl_fence_all_instances()
const {
329 hpx::util::yield_while(
330 []() {
return m_active_parallel_region_count.load() != 0; });
335 #if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH) 336 if (m_mode == instance_mode::global) {
337 impl_fence_all_instances();
339 impl_fence_instance();
344 static bool is_asynchronous(HPX
const & = HPX()) noexcept {
345 #if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH) 352 static std::vector<HPX> partition(...) {
354 "Kokkos::Experimental::HPX::partition_master: can't partition an HPX " 356 return std::vector<HPX>();
359 template <
typename F>
360 static void partition_master(F
const &,
int requested_num_partitions = 0,
362 if (requested_num_partitions > 1) {
364 "Kokkos::Experimental::HPX::partition_master: can't partition an " 369 static int concurrency();
370 static void impl_initialize(
int thread_count);
371 static void impl_initialize();
372 static bool impl_is_initialized() noexcept;
373 static
void impl_finalize();
375 static
int impl_thread_pool_size() noexcept {
376 hpx::runtime *rt = hpx::get_runtime_ptr();
380 if (hpx::threads::get_self_ptr() ==
nullptr) {
381 return hpx::resource::get_thread_pool(0).get_os_thread_count();
383 return hpx::this_thread::get_pool()->get_os_thread_count();
388 static int impl_thread_pool_rank() noexcept {
389 hpx::runtime *rt = hpx::get_runtime_ptr();
393 if (hpx::threads::get_self_ptr() ==
nullptr) {
396 return hpx::this_thread::get_pool()->get_pool_index();
401 static int impl_thread_pool_size(
int depth) {
403 return impl_thread_pool_size();
409 static int impl_max_hardware_threads() noexcept {
410 return hpx::threads::hardware_concurrency();
413 static int impl_hardware_thread_id() noexcept {
414 return hpx::get_worker_thread_num();
417 Kokkos::Impl::thread_buffer &impl_get_buffer() const noexcept {
418 #if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH) 419 return m_buffer.get();
421 return m_global_buffer;
425 #if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH) 426 hpx::shared_future<void> &impl_get_future() const noexcept {
431 #if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH) 432 struct KOKKOS_ATTRIBUTE_NODISCARD reset_on_exit_parallel {
434 reset_on_exit_parallel(HPX
const &space) : m_space(space) {}
435 ~reset_on_exit_parallel() {
438 m_space.m_independent_instance_data.reset();
440 HPX::impl_decrement_active_parallel_region_count();
445 static constexpr
const char *name() noexcept {
return "HPX"; }
452 struct DeviceTypeTraits<
Kokkos::Experimental::HPX> {
453 static constexpr DeviceType
id = DeviceType::HPX;
460 class HPXSpaceInitializer :
public ExecSpaceInitializerBase {
462 HPXSpaceInitializer() =
default;
463 ~HPXSpaceInitializer() =
default;
464 void initialize(
const InitArguments &args)
final;
465 void finalize(
const bool)
final;
467 void print_configuration(std::ostream &msg, const
bool detail) final;
470 #if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH) 471 template <
typename Closure>
472 inline void dispatch_execute_task(Closure *closure,
473 Kokkos::Experimental::HPX
const &instance,
474 bool force_synchronous =
false) {
475 Kokkos::Experimental::HPX::impl_increment_active_parallel_region_count();
477 if (hpx::threads::get_self_ptr() ==
nullptr) {
478 hpx::threads::run_as_hpx_thread([closure, &instance]() {
479 hpx::shared_future<void> &fut = instance.impl_get_future();
480 Closure closure_copy = *closure;
481 fut = fut.then([closure_copy](hpx::shared_future<void> &&) {
482 closure_copy.execute_task();
486 hpx::shared_future<void> &fut = instance.impl_get_future();
487 Closure closure_copy = *closure;
488 fut = fut.then([closure_copy](hpx::shared_future<void> &&) {
489 closure_copy.execute_task();
493 if (force_synchronous) {
// Runs closure->execute_task(), routing the call through
// run_as_hpx_thread when get_self_ptr() reports nullptr.
// The HPX instance argument is unnamed and unused in the visible part of
// this overload.
// NOTE(review): part of the parameter list and the branch structure are
// not visible in this excerpt.
498 template <
typename Closure>
499 inline void dispatch_execute_task(Closure *closure,
500 Kokkos::Experimental::HPX
const &,
// With KOKKOS_ENABLE_HPX_ASYNC_DISPATCH defined, the active parallel
// region counter is incremented before the task is run.
502 #if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH) 503 Kokkos::Experimental::HPX::impl_increment_active_parallel_region_count();
506 if (hpx::threads::get_self_ptr() ==
nullptr) {
// The closure pointer is captured by value; the pointee is used in place.
507 hpx::threads::run_as_hpx_thread([closure]() { closure->execute_task(); });
// Direct invocation (presumably the alternative branch -- separator not
// visible here).
509 closure->execute_task();
519 struct MemorySpaceAccess<
Kokkos::Experimental::HPX::memory_space,
520 Kokkos::Experimental::HPX::scratch_memory_space> {
521 enum :
bool { assignable =
false };
522 enum :
bool { accessible =
true };
523 enum :
bool { deepcopy =
false };
532 class UniqueToken<HPX, UniqueTokenScope::Instance> {
536 buffer_type m_buffer_view;
537 uint32_t
volatile *m_buffer;
540 using execution_space = HPX;
541 using size_type = int;
546 UniqueToken(execution_space
const & = execution_space()) noexcept
547 : m_count(execution_space::impl_max_hardware_threads()),
548 m_buffer_view(buffer_type()),
551 UniqueToken(size_type max_size, execution_space
const & = execution_space())
552 : m_count(max_size > execution_space::impl_max_hardware_threads()
553 ? execution_space::impl_max_hardware_threads()
556 max_size > execution_space::impl_max_hardware_threads()
558 : buffer_type(
"UniqueToken::m_buffer_view",
559 ::
Kokkos::Impl::concurrent_bitset::buffer_bound(
561 m_buffer(m_buffer_view.data()) {}
564 KOKKOS_INLINE_FUNCTION
565 int size() const noexcept {
return m_count; }
568 KOKKOS_INLINE_FUNCTION
570 #if defined(KOKKOS_ACTIVE_EXECUTION_MEMORY_SPACE_HOST) 571 if (m_buffer ==
nullptr) {
572 return execution_space::impl_hardware_thread_id();
574 const ::Kokkos::pair<int, int> result =
575 ::Kokkos::Impl::concurrent_bitset::acquire_bounded(
576 m_buffer, m_count, ::Kokkos::Impl::clock_tic() % m_count);
578 if (result.first < 0) {
580 "UniqueToken<HPX> failure to acquire tokens, no tokens " 591 KOKKOS_INLINE_FUNCTION
592 void release(
int i)
const noexcept {
593 #if defined(KOKKOS_ACTIVE_EXECUTION_MEMORY_SPACE_HOST) 594 if (m_buffer !=
nullptr) {
595 ::Kokkos::Impl::concurrent_bitset::release(m_buffer, i);
604 class UniqueToken<HPX, UniqueTokenScope::Global> {
606 using execution_space = HPX;
607 using size_type = int;
608 UniqueToken(execution_space
const & = execution_space()) noexcept {}
613 int size() const noexcept {
return HPX::impl_max_hardware_threads(); }
614 int acquire() const noexcept {
return HPX::impl_hardware_thread_id(); }
615 void release(
int)
const noexcept {}
623 struct HPXTeamMember {
625 using execution_space = Kokkos::Experimental::HPX;
626 using scratch_memory_space =
630 scratch_memory_space m_team_shared;
638 KOKKOS_INLINE_FUNCTION
639 const scratch_memory_space &team_shmem()
const {
640 return m_team_shared.set_team_thread_mode(0, 1, 0);
643 KOKKOS_INLINE_FUNCTION
644 const execution_space::scratch_memory_space &team_scratch(
const int)
const {
645 return m_team_shared.set_team_thread_mode(0, 1, 0);
648 KOKKOS_INLINE_FUNCTION
649 const execution_space::scratch_memory_space &thread_scratch(
const int)
const {
650 return m_team_shared.set_team_thread_mode(0, team_size(), team_rank());
653 KOKKOS_INLINE_FUNCTION
int league_rank() const noexcept {
654 return m_league_rank;
657 KOKKOS_INLINE_FUNCTION
int league_size() const noexcept {
658 return m_league_size;
661 KOKKOS_INLINE_FUNCTION
int team_rank() const noexcept {
return m_team_rank; }
662 KOKKOS_INLINE_FUNCTION
int team_size() const noexcept {
return m_team_size; }
664 template <
class... Properties>
665 constexpr KOKKOS_INLINE_FUNCTION HPXTeamMember(
666 const TeamPolicyInternal<Kokkos::Experimental::HPX, Properties...>
668 const int team_rank,
const int league_rank,
void *scratch,
669 int scratch_size) noexcept
670 : m_team_shared(scratch, scratch_size, scratch, scratch_size),
671 m_league_size(policy.league_size()),
672 m_league_rank(league_rank),
673 m_team_size(policy.team_size()),
674 m_team_rank(team_rank) {}
676 KOKKOS_INLINE_FUNCTION
677 void team_barrier()
const {}
679 template <
class ValueType>
680 KOKKOS_INLINE_FUNCTION
void team_broadcast(ValueType &,
const int &)
const {
681 static_assert(std::is_trivially_default_constructible<ValueType>(),
682 "Only trivial constructible types can be broadcasted");
685 template <
class Closure,
class ValueType>
686 KOKKOS_INLINE_FUNCTION
void team_broadcast(
const Closure &, ValueType &,
688 static_assert(std::is_trivially_default_constructible<ValueType>(),
689 "Only trivial constructible types can be broadcasted");
692 template <
class ValueType,
class JoinOp>
693 KOKKOS_INLINE_FUNCTION ValueType team_reduce(
const ValueType &value,
694 const JoinOp &)
const {
698 template <
class ReducerType>
699 KOKKOS_INLINE_FUNCTION
700 typename std::enable_if<is_reducer<ReducerType>::value>::type
701 team_reduce(
const ReducerType &)
const {}
703 template <
typename Type>
704 KOKKOS_INLINE_FUNCTION Type
705 team_scan(
const Type &value, Type *
const global_accum =
nullptr)
const {
707 Kokkos::atomic_fetch_add(global_accum, value);
714 template <
class... Properties>
715 class TeamPolicyInternal<
Kokkos::Experimental::HPX, Properties...>
716 :
public PolicyTraits<Properties...> {
717 using traits = PolicyTraits<Properties...>;
721 std::size_t m_team_scratch_size[2];
722 std::size_t m_thread_scratch_size[2];
727 using execution_policy = TeamPolicyInternal;
729 using member_type = HPXTeamMember;
732 using execution_space = Kokkos::Experimental::HPX;
736 template <
class FunctorType>
737 inline static int team_size_max(
const FunctorType &) {
741 template <
class FunctorType>
742 inline static int team_size_recommended(
const FunctorType &) {
746 template <
class FunctorType>
747 inline static int team_size_recommended(
const FunctorType &,
const int &) {
751 template <
class FunctorType>
752 int team_size_max(
const FunctorType &,
const ParallelForTag &)
const {
756 template <
class FunctorType>
757 int team_size_max(
const FunctorType &,
const ParallelReduceTag &)
const {
761 template <
class FunctorType,
class ReducerType>
762 int team_size_max(
const FunctorType &,
const ReducerType &,
763 const ParallelReduceTag &)
const {
767 template <
class FunctorType>
768 int team_size_recommended(
const FunctorType &,
const ParallelForTag &)
const {
772 template <
class FunctorType>
773 int team_size_recommended(
const FunctorType &,
774 const ParallelReduceTag &)
const {
778 template <
class FunctorType,
class ReducerType>
779 int team_size_recommended(
const FunctorType &,
const ReducerType &,
780 const ParallelReduceTag &)
const {
784 static int vector_length_max() {
return 1; }
786 inline int impl_vector_length() noexcept {
return 1; }
787 inline bool impl_auto_team_size() noexcept {
return false; }
788 inline bool impl_auto_vector_length() noexcept {
return false; }
789 inline void impl_set_vector_length(
int) noexcept {}
790 inline void impl_set_team_size(
int) noexcept {}
793 inline void init(
const int league_size_request,
const int team_size_request) {
794 m_league_size = league_size_request;
795 const int max_team_size = 1;
798 team_size_request > max_team_size ? max_team_size : team_size_request;
800 if (m_chunk_size > 0) {
801 if (!Impl::is_integral_power_of_two(m_chunk_size))
802 Kokkos::abort(
"TeamPolicy blocking granularity must be power of two");
804 int new_chunk_size = 1;
805 while (new_chunk_size * 4 * Kokkos::Experimental::HPX::concurrency() <
810 if (new_chunk_size < 128) {
812 while ((new_chunk_size * Kokkos::Experimental::HPX::concurrency() <
814 (new_chunk_size < 128))
818 m_chunk_size = new_chunk_size;
823 inline int team_size()
const {
return m_team_size; }
824 inline int league_size()
const {
return m_league_size; }
826 inline size_t scratch_size(
const int &level,
int team_size_ = -1)
const {
827 if (team_size_ < 0) {
828 team_size_ = m_team_size;
830 return m_team_scratch_size[level] +
831 team_size_ * m_thread_scratch_size[level];
834 inline static int scratch_size_max(
int level) {
835 return (level == 0 ? 1024 * 32 :
840 template <
class ExecSpace,
class... OtherProperties>
841 friend class TeamPolicyInternal;
843 const typename traits::execution_space &space()
const {
844 static typename traits::execution_space m_space;
848 template <
class... OtherProperties>
849 TeamPolicyInternal(
const TeamPolicyInternal<Kokkos::Experimental::HPX,
850 OtherProperties...> &p) {
851 m_league_size = p.m_league_size;
852 m_team_size = p.m_team_size;
853 m_team_scratch_size[0] = p.m_team_scratch_size[0];
854 m_thread_scratch_size[0] = p.m_thread_scratch_size[0];
855 m_team_scratch_size[1] = p.m_team_scratch_size[1];
856 m_thread_scratch_size[1] = p.m_thread_scratch_size[1];
857 m_chunk_size = p.m_chunk_size;
860 TeamPolicyInternal(
const typename traits::execution_space &,
861 int league_size_request,
int team_size_request,
863 : m_team_scratch_size{0, 0},
864 m_thread_scratch_size{0, 0},
866 init(league_size_request, team_size_request);
869 TeamPolicyInternal(
const typename traits::execution_space &,
870 int league_size_request,
const Kokkos::AUTO_t &,
872 : m_team_scratch_size{0, 0},
873 m_thread_scratch_size{0, 0},
875 init(league_size_request, 1);
878 TeamPolicyInternal(
const typename traits::execution_space &space,
879 int league_size_request,
880 const Kokkos::AUTO_t &,
881 const Kokkos::AUTO_t & )
882 : m_team_scratch_size{0, 0},
883 m_thread_scratch_size{0, 0},
885 init(league_size_request, 1);
888 TeamPolicyInternal(
const typename traits::execution_space &space,
889 int league_size_request,
int team_size_request,
890 const Kokkos::AUTO_t &
892 : m_team_scratch_size{0, 0},
893 m_thread_scratch_size{0, 0},
895 init(league_size_request, team_size_request);
898 TeamPolicyInternal(
int league_size_request,
899 const Kokkos::AUTO_t &,
900 const Kokkos::AUTO_t & )
901 : m_team_scratch_size{0, 0},
902 m_thread_scratch_size{0, 0},
904 init(league_size_request, 1);
907 TeamPolicyInternal(
int league_size_request,
int team_size_request,
908 const Kokkos::AUTO_t &
910 : m_team_scratch_size{0, 0},
911 m_thread_scratch_size{0, 0},
913 init(league_size_request, team_size_request);
916 TeamPolicyInternal(
int league_size_request,
int team_size_request,
918 : m_team_scratch_size{0, 0},
919 m_thread_scratch_size{0, 0},
921 init(league_size_request, team_size_request);
924 TeamPolicyInternal(
int league_size_request,
const Kokkos::AUTO_t &,
926 : m_team_scratch_size{0, 0},
927 m_thread_scratch_size{0, 0},
929 init(league_size_request, 1);
932 inline int chunk_size()
const {
return m_chunk_size; }
934 inline TeamPolicyInternal &set_chunk_size(
935 typename traits::index_type chunk_size_) {
936 m_chunk_size = chunk_size_;
940 inline TeamPolicyInternal &set_scratch_size(
const int &level,
941 const PerTeamValue &per_team) {
942 m_team_scratch_size[level] = per_team.value;
946 inline TeamPolicyInternal &set_scratch_size(
947 const int &level,
const PerThreadValue &per_thread) {
948 m_thread_scratch_size[level] = per_thread.value;
952 inline TeamPolicyInternal &set_scratch_size(
953 const int &level,
const PerTeamValue &per_team,
954 const PerThreadValue &per_thread) {
955 m_team_scratch_size[level] = per_team.value;
956 m_thread_scratch_size[level] = per_thread.value;
966 template <
class FunctorType,
class... Traits>
967 class ParallelFor<FunctorType,
Kokkos::RangePolicy<Traits...>,
968 Kokkos::Experimental::HPX> {
971 using WorkTag =
typename Policy::work_tag;
972 using WorkRange =
typename Policy::WorkRange;
973 using Member =
typename Policy::member_type;
975 const FunctorType m_functor;
976 const Policy m_policy;
978 template <
class TagType>
979 static typename std::enable_if<std::is_same<TagType, void>::value>::type
980 execute_functor(
const FunctorType &functor,
const Member i) {
984 template <
class TagType>
985 static typename std::enable_if<!std::is_same<TagType, void>::value>::type
986 execute_functor(
const FunctorType &functor,
const Member i) {
991 template <
class TagType>
992 static typename std::enable_if<std::is_same<TagType, void>::value>::type
993 execute_functor_range(
const FunctorType &functor,
const Member i_begin,
994 const Member i_end) {
995 for (Member i = i_begin; i < i_end; ++i) {
1000 template <
class TagType>
1001 static typename std::enable_if<!std::is_same<TagType, void>::value>::type
1002 execute_functor_range(
const FunctorType &functor,
const Member i_begin,
1003 const Member i_end) {
1005 for (Member i = i_begin; i < i_end; ++i) {
1011 void execute()
const {
1012 Kokkos::Impl::dispatch_execute_task(
this, m_policy.space());
1015 void execute_task()
const {
1017 #if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH) 1018 Kokkos::Experimental::HPX::reset_on_exit_parallel reset_on_exit(
1022 #if KOKKOS_HPX_IMPLEMENTATION == 0 1023 using hpx::parallel::for_loop;
1024 using hpx::parallel::execution::par;
1025 using hpx::parallel::execution::static_chunk_size;
1027 for_loop(par.with(static_chunk_size(m_policy.chunk_size())),
1028 m_policy.begin(), m_policy.end(), [
this](
const Member i) {
1029 execute_functor<WorkTag>(m_functor, i);
1032 #elif KOKKOS_HPX_IMPLEMENTATION == 1 1034 using hpx::lcos::local::latch;
1036 const int num_tasks =
1037 (m_policy.end() - m_policy.begin() + m_policy.chunk_size() - 1) /
1038 m_policy.chunk_size();
1039 latch num_tasks_remaining(num_tasks);
1040 ChunkedRoundRobinExecutor exec(num_tasks);
1042 for (Member i_begin = m_policy.begin(); i_begin < m_policy.end();
1043 i_begin += m_policy.chunk_size()) {
1044 apply(exec, [
this, &num_tasks_remaining, i_begin]() {
1045 const Member i_end =
1046 (std::min)(i_begin + m_policy.chunk_size(), m_policy.end());
1047 execute_functor_range<WorkTag>(m_functor, i_begin, i_end);
1049 num_tasks_remaining.count_down(1);
1053 num_tasks_remaining.wait();
1055 #elif KOKKOS_HPX_IMPLEMENTATION == 2 1056 using hpx::parallel::for_loop_strided;
1057 using hpx::parallel::execution::par;
1058 using hpx::parallel::execution::static_chunk_size;
1060 const int num_tasks =
1061 (m_policy.end() - m_policy.begin() + m_policy.chunk_size() - 1) /
1062 m_policy.chunk_size();
1063 ChunkedRoundRobinExecutor exec(num_tasks);
1066 par.on(exec).with(static_chunk_size(1)), m_policy.begin(),
1067 m_policy.end(), m_policy.chunk_size(), [
this](
const Member i_begin) {
1068 const Member i_end =
1069 (std::min)(i_begin + m_policy.chunk_size(), m_policy.end());
1070 execute_functor_range<WorkTag>(m_functor, i_begin, i_end);
1075 inline ParallelFor(
const FunctorType &arg_functor, Policy arg_policy)
1076 : m_functor(arg_functor), m_policy(arg_policy) {}
1079 template <
class FunctorType,
class... Traits>
1080 class ParallelFor<FunctorType,
Kokkos::MDRangePolicy<Traits...>,
1081 Kokkos::Experimental::HPX> {
1083 using MDRangePolicy = Kokkos::MDRangePolicy<Traits...>;
1084 using Policy =
typename MDRangePolicy::impl_range_policy;
1085 using WorkTag =
typename MDRangePolicy::work_tag;
1086 using WorkRange =
typename Policy::WorkRange;
1087 using Member =
typename Policy::member_type;
1088 using iterate_type =
1089 typename Kokkos::Impl::HostIterateTile<MDRangePolicy, FunctorType,
1092 const FunctorType m_functor;
1093 const MDRangePolicy m_mdr_policy;
1094 const Policy m_policy;
1097 void execute()
const { dispatch_execute_task(
this, m_mdr_policy.space()); }
1099 inline void execute_task()
const {
1101 #if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH) 1102 Kokkos::Experimental::HPX::reset_on_exit_parallel reset_on_exit(
1103 m_mdr_policy.space());
1106 #if KOKKOS_HPX_IMPLEMENTATION == 0 1107 using hpx::parallel::for_loop;
1108 using hpx::parallel::execution::par;
1109 using hpx::parallel::execution::static_chunk_size;
1111 for_loop(par.with(static_chunk_size(m_policy.chunk_size())),
1112 m_policy.begin(), m_policy.end(), [
this](
const Member i) {
1113 iterate_type(m_mdr_policy, m_functor)(i);
1116 #elif KOKKOS_HPX_IMPLEMENTATION == 1 1118 using hpx::lcos::local::latch;
1120 const int num_tasks =
1121 (m_policy.end() - m_policy.begin() + m_policy.chunk_size() - 1) /
1122 m_policy.chunk_size();
1123 latch num_tasks_remaining(num_tasks);
1124 ChunkedRoundRobinExecutor exec(num_tasks);
1126 for (Member i_begin = m_policy.begin(); i_begin < m_policy.end();
1127 i_begin += m_policy.chunk_size()) {
1128 apply(exec, [
this, &num_tasks_remaining, i_begin]() {
1129 const Member i_end =
1130 (std::min)(i_begin + m_policy.chunk_size(), m_policy.end());
1131 for (Member i = i_begin; i < i_end; ++i) {
1132 iterate_type(m_mdr_policy, m_functor)(i);
1135 num_tasks_remaining.count_down(1);
1139 num_tasks_remaining.wait();
1141 #elif KOKKOS_HPX_IMPLEMENTATION == 2 1142 using hpx::parallel::for_loop_strided;
1143 using hpx::parallel::execution::par;
1144 using hpx::parallel::execution::static_chunk_size;
1146 const int num_tasks =
1147 (m_policy.end() - m_policy.begin() + m_policy.chunk_size() - 1) /
1148 m_policy.chunk_size();
1149 ChunkedRoundRobinExecutor exec(num_tasks);
1152 par.on(exec).with(static_chunk_size(1)), m_policy.begin(),
1153 m_policy.end(), m_policy.chunk_size(), [
this](
const Member i_begin) {
1154 const Member i_end =
1155 (std::min)(i_begin + m_policy.chunk_size(), m_policy.end());
1156 for (Member i = i_begin; i < i_end; ++i) {
1157 iterate_type(m_mdr_policy, m_functor)(i);
1163 inline ParallelFor(
const FunctorType &arg_functor, MDRangePolicy arg_policy)
1164 : m_functor(arg_functor),
1165 m_mdr_policy(arg_policy),
1166 m_policy(Policy(0, m_mdr_policy.m_num_tiles).set_chunk_size(1)) {}
1167 template <
typename Policy,
typename Functor>
1168 static int max_tile_size_product(
const Policy &,
const Functor &) {
1182 template <
class FunctorType,
class ReducerType,
class... Traits>
1183 class ParallelReduce<FunctorType,
Kokkos::RangePolicy<Traits...>, ReducerType,
1184 Kokkos::Experimental::HPX> {
1187 using WorkTag =
typename Policy::work_tag;
1188 using WorkRange =
typename Policy::WorkRange;
1189 using Member =
typename Policy::member_type;
1191 FunctorAnalysis<FunctorPatternInterface::REDUCE, Policy, FunctorType>;
1192 using ReducerConditional =
1193 Kokkos::Impl::if_c<std::is_same<InvalidType, ReducerType>::value,
1194 FunctorType, ReducerType>;
1195 using ReducerTypeFwd =
typename ReducerConditional::type;
1197 typename Kokkos::Impl::if_c<std::is_same<InvalidType, ReducerType>::value,
1198 WorkTag,
void>::type;
1199 using ValueInit = Kokkos::Impl::FunctorValueInit<ReducerTypeFwd, WorkTagFwd>;
1200 using ValueFinal = Kokkos::Impl::FunctorFinal<ReducerTypeFwd, WorkTagFwd>;
1201 using ValueJoin = Kokkos::Impl::FunctorValueJoin<ReducerTypeFwd, WorkTagFwd>;
1202 using ValueOps = Kokkos::Impl::FunctorValueOps<ReducerTypeFwd, WorkTagFwd>;
1203 using value_type =
typename Analysis::value_type;
1204 using pointer_type =
typename Analysis::pointer_type;
1205 using reference_type =
typename Analysis::reference_type;
1207 const FunctorType m_functor;
1208 const Policy m_policy;
1209 const ReducerType m_reducer;
1210 const pointer_type m_result_ptr;
1212 bool m_force_synchronous;
1214 template <
class TagType>
1216 typename std::enable_if<std::is_same<TagType, void>::value>::type
1217 execute_functor(
const FunctorType &functor,
const Member i,
1218 reference_type update) {
1222 template <
class TagType>
1224 typename std::enable_if<!std::is_same<TagType, void>::value>::type
1225 execute_functor(
const FunctorType &functor,
const Member i,
1226 reference_type update) {
1228 functor(t, i, update);
1231 template <
class TagType>
1232 inline typename std::enable_if<std::is_same<TagType, void>::value>::type
1233 execute_functor_range(reference_type update,
const Member i_begin,
1234 const Member i_end)
const {
1235 for (Member i = i_begin; i < i_end; ++i) {
1236 m_functor(i, update);
1240 template <
class TagType>
1241 inline typename std::enable_if<!std::is_same<TagType, void>::value>::type
1242 execute_functor_range(reference_type update,
const Member i_begin,
1243 const Member i_end)
const {
1246 for (Member i = i_begin; i < i_end; ++i) {
1247 m_functor(t, i, update);
1251 class value_type_wrapper {
1253 std::size_t m_value_size;
1254 char *m_value_buffer;
1257 value_type_wrapper() : m_value_size(0), m_value_buffer(nullptr) {}
1259 value_type_wrapper(
const std::size_t value_size)
1260 : m_value_size(value_size), m_value_buffer(new char[m_value_size]) {}
1262 value_type_wrapper(
const value_type_wrapper &other)
1263 : m_value_size(0), m_value_buffer(nullptr) {
1264 if (
this != &other) {
1265 m_value_buffer =
new char[other.m_value_size];
1266 m_value_size = other.m_value_size;
1268 std::copy(other.m_value_buffer, other.m_value_buffer + m_value_size,
1273 ~value_type_wrapper() {
delete[] m_value_buffer; }
1275 value_type_wrapper(value_type_wrapper &&other)
1276 : m_value_size(0), m_value_buffer(nullptr) {
1277 if (
this != &other) {
1278 m_value_buffer = other.m_value_buffer;
1279 m_value_size = other.m_value_size;
1281 other.m_value_buffer =
nullptr;
1282 other.m_value_size = 0;
1286 value_type_wrapper &operator=(
const value_type_wrapper &other) {
1287 if (
this != &other) {
1288 delete[] m_value_buffer;
1289 m_value_buffer =
new char[other.m_value_size];
1290 m_value_size = other.m_value_size;
1292 std::copy(other.m_value_buffer, other.m_value_buffer + m_value_size,
1299 value_type_wrapper &operator=(value_type_wrapper &&other) {
1300 if (
this != &other) {
1301 delete[] m_value_buffer;
1302 m_value_buffer = other.m_value_buffer;
1303 m_value_size = other.m_value_size;
1305 other.m_value_buffer =
nullptr;
1306 other.m_value_size = 0;
1312 pointer_type pointer()
const {
1313 return reinterpret_cast<pointer_type
>(m_value_buffer);
1316 reference_type reference()
const {
1317 return ValueOps::reference(
1318 reinterpret_cast<pointer_type>(m_value_buffer));
1323 void execute()
const {
1324 if (m_policy.end() <= m_policy.begin()) {
1326 ValueInit::init(ReducerConditional::select(m_functor, m_reducer),
1328 ValueFinal::final(ReducerConditional::select(m_functor, m_reducer),
1333 dispatch_execute_task(
this, m_policy.space(), m_force_synchronous);
1336 inline void execute_task()
const {
1338 #if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH) 1339 Kokkos::Experimental::HPX::reset_on_exit_parallel reset_on_exit(
1343 const std::size_t value_size =
1344 Analysis::value_size(ReducerConditional::select(m_functor, m_reducer));
1346 #if KOKKOS_HPX_IMPLEMENTATION == 0 1351 using hpx::parallel::for_loop;
1352 using hpx::parallel::reduction;
1353 using hpx::parallel::execution::par;
1354 using hpx::parallel::execution::static_chunk_size;
1356 value_type_wrapper final_value(value_size);
1357 value_type_wrapper identity(value_size);
1359 ValueInit::init(ReducerConditional::select(m_functor, m_reducer),
1360 final_value.pointer());
1361 ValueInit::init(ReducerConditional::select(m_functor, m_reducer),
1362 identity.pointer());
1364 for_loop(par.with(static_chunk_size(m_policy.chunk_size())),
1365 m_policy.begin(), m_policy.end(),
1366 reduction(final_value, identity,
1367 [
this](value_type_wrapper &a,
1368 value_type_wrapper &b) -> value_type_wrapper & {
1370 ReducerConditional::select(m_functor, m_reducer),
1371 a.pointer(), b.pointer());
1374 [
this](Member i, value_type_wrapper &update) {
1375 execute_functor<WorkTag>(m_functor, i, update.reference());
1378 pointer_type final_value_ptr = final_value.pointer();
1380 #elif KOKKOS_HPX_IMPLEMENTATION == 1 1381 const int num_worker_threads = Kokkos::Experimental::HPX::concurrency();
1383 thread_buffer &buffer = m_policy.space().impl_get_buffer();
1384 buffer.resize(num_worker_threads, value_size);
1387 using hpx::lcos::local::latch;
1390 latch num_tasks_remaining(num_worker_threads);
1391 ChunkedRoundRobinExecutor exec(num_worker_threads);
1393 for (
int t = 0; t < num_worker_threads; ++t) {
1394 apply(exec, [
this, &num_tasks_remaining, &buffer, t]() {
1395 ValueInit::init(ReducerConditional::select(m_functor, m_reducer),
1396 reinterpret_cast<pointer_type>(buffer.get(t)));
1398 num_tasks_remaining.count_down(1);
1402 num_tasks_remaining.wait();
1405 const int num_tasks =
1406 (m_policy.end() - m_policy.begin() + m_policy.chunk_size() - 1) /
1407 m_policy.chunk_size();
1408 latch num_tasks_remaining(num_tasks);
1409 ChunkedRoundRobinExecutor exec(num_tasks);
1411 for (Member i_begin = m_policy.begin(); i_begin < m_policy.end();
1412 i_begin += m_policy.chunk_size()) {
1413 apply(exec, [
this, &num_tasks_remaining, &buffer, i_begin]() {
1414 reference_type update =
1415 ValueOps::reference(reinterpret_cast<pointer_type>(buffer.get(
1416 Kokkos::Experimental::HPX::impl_hardware_thread_id())));
1417 const Member i_end =
1418 (std::min)(i_begin + m_policy.chunk_size(), m_policy.end());
1419 execute_functor_range<WorkTag>(update, i_begin, i_end);
1421 num_tasks_remaining.count_down(1);
1425 num_tasks_remaining.wait();
1427 for (
int i = 1; i < num_worker_threads; ++i) {
1428 ValueJoin::join(ReducerConditional::select(m_functor, m_reducer),
1429 reinterpret_cast<pointer_type>(buffer.get(0)),
1430 reinterpret_cast<pointer_type>(buffer.get(i)));
1433 pointer_type final_value_ptr =
1434 reinterpret_cast<pointer_type
>(buffer.get(0));
1436 #elif KOKKOS_HPX_IMPLEMENTATION == 2 1437 const int num_worker_threads = Kokkos::Experimental::HPX::concurrency();
1439 thread_buffer &buffer = m_policy.space().impl_get_buffer();
1440 buffer.resize(num_worker_threads, value_size);
1442 using hpx::parallel::for_loop;
1443 using hpx::parallel::for_loop_strided;
1444 using hpx::parallel::execution::par;
1445 using hpx::parallel::execution::static_chunk_size;
1448 ChunkedRoundRobinExecutor exec(num_worker_threads);
1450 for_loop(par.on(exec).with(static_chunk_size(1)), std::size_t(0),
1451 num_worker_threads, [
this, &buffer](
const std::size_t t) {
1453 ReducerConditional::select(m_functor, m_reducer),
1454 reinterpret_cast<pointer_type>(buffer.get(t)));
1458 const int num_tasks =
1459 (m_policy.end() - m_policy.begin() + m_policy.chunk_size() - 1) /
1460 m_policy.chunk_size();
1461 ChunkedRoundRobinExecutor exec(num_tasks);
1464 par.on(exec).with(static_chunk_size(1)), m_policy.begin(),
1465 m_policy.end(), m_policy.chunk_size(),
1466 [
this, &buffer](
const Member i_begin) {
1467 reference_type update =
1468 ValueOps::reference(reinterpret_cast<pointer_type>(buffer.get(
1469 Kokkos::Experimental::HPX::impl_hardware_thread_id())));
1470 const Member i_end =
1471 (std::min)(i_begin + m_policy.chunk_size(), m_policy.end());
1472 execute_functor_range<WorkTag>(update, i_begin, i_end);
1475 for (
int i = 1; i < num_worker_threads; ++i) {
1476 ValueJoin::join(ReducerConditional::select(m_functor, m_reducer),
1477 reinterpret_cast<pointer_type>(buffer.get(0)),
1478 reinterpret_cast<pointer_type>(buffer.get(i)));
1481 pointer_type final_value_ptr =
1482 reinterpret_cast<pointer_type
>(buffer.get(0));
1485 Kokkos::Impl::FunctorFinal<ReducerTypeFwd, WorkTagFwd>::final(
1486 ReducerConditional::select(m_functor, m_reducer), final_value_ptr);
1488 if (m_result_ptr !=
nullptr) {
1489 const int n = Analysis::value_count(
1490 ReducerConditional::select(m_functor, m_reducer));
1492 for (
int j = 0; j < n; ++j) {
1493 m_result_ptr[j] = final_value_ptr[j];
// Constructs a RangePolicy reduction whose result is written through a view.
//
// Participates in overload resolution only when ViewType is a Kokkos view and
// ReducerType is not a reducer type (see the enable_if default argument).
// The functor and policy are copied, the reducer slot is filled with
// InvalidType(), and the result pointer is taken from arg_view.data().
// m_force_synchronous is set when the view's tracker has no allocation record
// (presumably an unmanaged / scalar result -- confirm against impl_track()).
1498 template <
class ViewType>
1499 inline ParallelReduce(
1500 const FunctorType &arg_functor, Policy arg_policy,
1501 const ViewType &arg_view,
1502 typename std::enable_if<Kokkos::is_view<ViewType>::value &&
1503 !Kokkos::is_reducer_type<ReducerType>::value,
1504 void *>::type =
nullptr)
1505 : m_functor(arg_functor),
1506 m_policy(arg_policy),
1507 m_reducer(InvalidType()),
1508 m_result_ptr(arg_view.data()),
1509 m_force_synchronous(!arg_view.impl_track().has_record()) {}
1511 inline ParallelReduce(
const FunctorType &arg_functor, Policy arg_policy,
1512 const ReducerType &reducer)
1513 : m_functor(arg_functor),
1514 m_policy(arg_policy),
1516 m_result_ptr(reducer.view().data()),
1517 m_force_synchronous(!reducer.view().impl_track().has_record()) {}
1520 template <
class FunctorType,
class ReducerType,
class... Traits>
1521 class ParallelReduce<FunctorType,
Kokkos::MDRangePolicy<Traits...>, ReducerType,
1522 Kokkos::Experimental::HPX> {
1524 using MDRangePolicy = Kokkos::MDRangePolicy<Traits...>;
1525 using Policy =
typename MDRangePolicy::impl_range_policy;
1526 using WorkTag =
typename MDRangePolicy::work_tag;
1527 using WorkRange =
typename Policy::WorkRange;
1528 using Member =
typename Policy::member_type;
1529 using Analysis = FunctorAnalysis<FunctorPatternInterface::REDUCE,
1530 MDRangePolicy, FunctorType>;
1531 using ReducerConditional =
1532 Kokkos::Impl::if_c<std::is_same<InvalidType, ReducerType>::value,
1533 FunctorType, ReducerType>;
1534 using ReducerTypeFwd =
typename ReducerConditional::type;
1536 typename Kokkos::Impl::if_c<std::is_same<InvalidType, ReducerType>::value,
1537 WorkTag,
void>::type;
1538 using ValueInit = Kokkos::Impl::FunctorValueInit<ReducerTypeFwd, WorkTagFwd>;
1539 using ValueFinal = Kokkos::Impl::FunctorFinal<ReducerTypeFwd, WorkTagFwd>;
1540 using ValueJoin = Kokkos::Impl::FunctorValueJoin<ReducerTypeFwd, WorkTagFwd>;
1541 using ValueOps = Kokkos::Impl::FunctorValueOps<ReducerTypeFwd, WorkTagFwd>;
1542 using pointer_type =
typename Analysis::pointer_type;
1543 using value_type =
typename Analysis::value_type;
1544 using reference_type =
typename Analysis::reference_type;
1545 using iterate_type =
1546 typename Kokkos::Impl::HostIterateTile<MDRangePolicy, FunctorType,
1547 WorkTag, reference_type>;
1549 const FunctorType m_functor;
1550 const MDRangePolicy m_mdr_policy;
1551 const Policy m_policy;
1552 const ReducerType m_reducer;
1553 const pointer_type m_result_ptr;
1555 bool m_force_synchronous;
// Entry point for the MDRangePolicy reduction: hands this closure to
// dispatch_execute_task together with the MDRange policy's execution-space
// instance and the force-synchronous flag.
1558 void execute()
const {
1559 dispatch_execute_task(
this, m_mdr_policy.space(), m_force_synchronous);
1562 inline void execute_task()
const {
1564 #if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH) 1565 Kokkos::Experimental::HPX::reset_on_exit_parallel reset_on_exit(
1566 m_mdr_policy.space());
1569 const int num_worker_threads = Kokkos::Experimental::HPX::concurrency();
1570 const std::size_t value_size =
1571 Analysis::value_size(ReducerConditional::select(m_functor, m_reducer));
1573 thread_buffer &buffer = m_mdr_policy.space().impl_get_buffer();
1574 buffer.resize(num_worker_threads, value_size);
1576 #if KOKKOS_HPX_IMPLEMENTATION == 0 1577 using hpx::parallel::for_loop;
1578 using hpx::parallel::execution::par;
1579 using hpx::parallel::execution::static_chunk_size;
1581 for_loop(par, 0, num_worker_threads, [
this, &buffer](std::size_t t) {
1582 ValueInit::init(ReducerConditional::select(m_functor, m_reducer),
1583 reinterpret_cast<pointer_type>(buffer.get(t)));
1586 for_loop(par.with(static_chunk_size(m_policy.chunk_size())),
1587 m_policy.begin(), m_policy.end(), [
this, &buffer](
const Member i) {
1588 reference_type update = ValueOps::reference(
1589 reinterpret_cast<pointer_type>(buffer.get(
1590 Kokkos::Experimental::HPX::impl_hardware_thread_id())));
1591 iterate_type(m_mdr_policy, m_functor, update)(i);
1594 #elif KOKKOS_HPX_IMPLEMENTATION == 1 1596 using hpx::lcos::local::latch;
1599 latch num_tasks_remaining(num_worker_threads);
1600 ChunkedRoundRobinExecutor exec(num_worker_threads);
1602 for (
int t = 0; t < num_worker_threads; ++t) {
1603 apply(exec, [
this, &buffer, &num_tasks_remaining, t]() {
1604 ValueInit::init(ReducerConditional::select(m_functor, m_reducer),
1605 reinterpret_cast<pointer_type>(buffer.get(t)));
1607 num_tasks_remaining.count_down(1);
1611 num_tasks_remaining.wait();
1614 const int num_tasks =
1615 (m_policy.end() - m_policy.begin() + m_policy.chunk_size() - 1) /
1616 m_policy.chunk_size();
1617 latch num_tasks_remaining(num_tasks);
1618 ChunkedRoundRobinExecutor exec(num_tasks);
1620 for (Member i_begin = m_policy.begin(); i_begin < m_policy.end();
1621 i_begin += m_policy.chunk_size()) {
1622 apply(exec, [
this, &num_tasks_remaining, &buffer, i_begin]() {
1623 reference_type update =
1624 ValueOps::reference(reinterpret_cast<pointer_type>(buffer.get(
1625 Kokkos::Experimental::HPX::impl_hardware_thread_id())));
1626 const Member i_end =
1627 (std::min)(i_begin + m_policy.chunk_size(), m_policy.end());
1629 for (Member i = i_begin; i < i_end; ++i) {
1630 iterate_type(m_mdr_policy, m_functor, update)(i);
1633 num_tasks_remaining.count_down(1);
1637 num_tasks_remaining.wait();
1639 #elif KOKKOS_HPX_IMPLEMENTATION == 2 1640 using hpx::parallel::for_loop;
1641 using hpx::parallel::for_loop_strided;
1642 using hpx::parallel::execution::par;
1643 using hpx::parallel::execution::static_chunk_size;
1646 ChunkedRoundRobinExecutor exec(num_worker_threads);
1648 for_loop(par.on(exec).with(static_chunk_size(1)), std::size_t(0),
1649 num_worker_threads, [
this, &buffer](
const std::size_t t) {
1651 ReducerConditional::select(m_functor, m_reducer),
1652 reinterpret_cast<pointer_type>(buffer.get(t)));
1656 const int num_tasks =
1657 (m_policy.end() - m_policy.begin() + m_policy.chunk_size() - 1) /
1658 m_policy.chunk_size();
1659 ChunkedRoundRobinExecutor exec(num_tasks);
1662 par.on(exec).with(static_chunk_size(1)), m_policy.begin(),
1663 m_policy.end(), m_policy.chunk_size(),
1664 [
this, &buffer](
const Member i_begin) {
1665 reference_type update =
1666 ValueOps::reference(reinterpret_cast<pointer_type>(buffer.get(
1667 Kokkos::Experimental::HPX::impl_hardware_thread_id())));
1668 const Member i_end =
1669 (std::min)(i_begin + m_policy.chunk_size(), m_policy.end());
1671 for (Member i = i_begin; i < i_end; ++i) {
1672 iterate_type(m_mdr_policy, m_functor, update)(i);
1677 for (
int i = 1; i < num_worker_threads; ++i) {
1678 ValueJoin::join(ReducerConditional::select(m_functor, m_reducer),
1679 reinterpret_cast<pointer_type>(buffer.get(0)),
1680 reinterpret_cast<pointer_type>(buffer.get(i)));
1683 Kokkos::Impl::FunctorFinal<ReducerTypeFwd, WorkTagFwd>::final(
1684 ReducerConditional::select(m_functor, m_reducer),
1685 reinterpret_cast<pointer_type>(buffer.get(0)));
1687 if (m_result_ptr !=
nullptr) {
1688 const int n = Analysis::value_count(
1689 ReducerConditional::select(m_functor, m_reducer));
1691 for (
int j = 0; j < n; ++j) {
1692 m_result_ptr[j] =
reinterpret_cast<pointer_type
>(buffer.get(0))[j];
// Constructs an MDRangePolicy reduction whose result is written through a
// view.
//
// Participates in overload resolution only when ViewType is a Kokkos view and
// ReducerType is not a reducer type. The multi-dimensional policy is stored
// as-is; the flat policy that is actually iterated covers the tile indices
// [0, m_mdr_policy.m_num_tiles) with a chunk size of 1. Note that m_policy is
// initialised from the already-stored m_mdr_policy member, so it relies on
// m_mdr_policy being declared before m_policy in the class.
// m_force_synchronous is set when the view's tracker has no allocation record
// (presumably an unmanaged / scalar result -- confirm against impl_track()).
1697 template <
class ViewType>
1698 inline ParallelReduce(
1699 const FunctorType &arg_functor, MDRangePolicy arg_policy,
1700 const ViewType &arg_view,
1701 typename std::enable_if<Kokkos::is_view<ViewType>::value &&
1702 !Kokkos::is_reducer_type<ReducerType>::value,
1703 void *>::type =
nullptr)
1704 : m_functor(arg_functor),
1705 m_mdr_policy(arg_policy),
1706 m_policy(Policy(0, m_mdr_policy.m_num_tiles).set_chunk_size(1)),
1707 m_reducer(InvalidType()),
1708 m_result_ptr(arg_view.data()),
1709 m_force_synchronous(!arg_view.impl_track().has_record()) {}
1711 inline ParallelReduce(
const FunctorType &arg_functor,
1712 MDRangePolicy arg_policy,
const ReducerType &reducer)
1713 : m_functor(arg_functor),
1714 m_mdr_policy(arg_policy),
1715 m_policy(Policy(0, m_mdr_policy.m_num_tiles).set_chunk_size(1)),
1717 m_result_ptr(reducer.view().data()),
1718 m_force_synchronous(!reducer.view().impl_track().has_record()) {}
1719 template <
typename Policy,
typename Functor>
1720 static int max_tile_size_product(
const Policy &,
const Functor &) {
1735 template <
class FunctorType,
class... Traits>
1736 class ParallelScan<FunctorType,
Kokkos::RangePolicy<Traits...>,
1737 Kokkos::Experimental::HPX> {
1740 using WorkTag =
typename Policy::work_tag;
1741 using WorkRange =
typename Policy::WorkRange;
1742 using Member =
typename Policy::member_type;
1744 FunctorAnalysis<FunctorPatternInterface::SCAN, Policy, FunctorType>;
1745 using ValueInit = Kokkos::Impl::FunctorValueInit<FunctorType, WorkTag>;
1746 using ValueJoin = Kokkos::Impl::FunctorValueJoin<FunctorType, WorkTag>;
1747 using ValueOps = Kokkos::Impl::FunctorValueOps<FunctorType, WorkTag>;
1748 using pointer_type =
typename Analysis::pointer_type;
1749 using reference_type =
typename Analysis::reference_type;
1750 using value_type =
typename Analysis::value_type;
1752 const FunctorType m_functor;
1753 const Policy m_policy;
1755 template <
class TagType>
1757 typename std::enable_if<std::is_same<TagType, void>::value>::type
1758 execute_functor_range(
const FunctorType &functor,
const Member i_begin,
1759 const Member i_end, reference_type update,
1761 for (Member i = i_begin; i < i_end; ++i) {
1762 functor(i, update,
final);
1766 template <
class TagType>
1768 typename std::enable_if<!std::is_same<TagType, void>::value>::type
1769 execute_functor_range(
const FunctorType &functor,
const Member i_begin,
1770 const Member i_end, reference_type update,
1773 for (Member i = i_begin; i < i_end; ++i) {
1774 functor(t, i, update,
final);
// Entry point for the scan: hands this closure to dispatch_execute_task
// together with the policy's execution-space instance. No force-synchronous
// flag is passed here, so the dispatcher's default applies.
1779 void execute()
const { dispatch_execute_task(
this, m_policy.space()); }
1781 inline void execute_task()
const {
1783 #if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH) 1784 Kokkos::Experimental::HPX::reset_on_exit_parallel reset_on_exit(
1788 const int num_worker_threads = Kokkos::Experimental::HPX::concurrency();
1789 const int value_count = Analysis::value_count(m_functor);
1790 const std::size_t value_size = Analysis::value_size(m_functor);
1792 thread_buffer &buffer = m_policy.space().impl_get_buffer();
1793 buffer.resize(num_worker_threads, 2 * value_size);
1796 using hpx::lcos::local::barrier;
1797 using hpx::lcos::local::latch;
1799 barrier bar(num_worker_threads);
1800 latch num_tasks_remaining(num_worker_threads);
1801 ChunkedRoundRobinExecutor exec(num_worker_threads);
1803 for (
int t = 0; t < num_worker_threads; ++t) {
1804 apply(exec, [
this, &bar, &buffer, &num_tasks_remaining,
1805 num_worker_threads, value_count, value_size, t]() {
1806 reference_type update_sum = ValueInit::init(
1807 m_functor, reinterpret_cast<pointer_type>(buffer.get(t)));
1809 const WorkRange range(m_policy, t, num_worker_threads);
1810 execute_functor_range<WorkTag>(m_functor, range.begin(), range.end(),
1816 ValueInit::init(m_functor, reinterpret_cast<pointer_type>(
1817 buffer.get(0) + value_size));
1819 for (
int i = 1; i < num_worker_threads; ++i) {
1820 pointer_type ptr_1_prev =
1821 reinterpret_cast<pointer_type
>(buffer.get(i - 1));
1822 pointer_type ptr_2_prev =
1823 reinterpret_cast<pointer_type
>(buffer.get(i - 1) + value_size);
1824 pointer_type ptr_2 =
1825 reinterpret_cast<pointer_type
>(buffer.get(i) + value_size);
1827 for (
int j = 0; j < value_count; ++j) {
1828 ptr_2[j] = ptr_2_prev[j];
1831 ValueJoin::join(m_functor, ptr_2, ptr_1_prev);
1837 reference_type update_base = ValueOps::reference(
1838 reinterpret_cast<pointer_type>(buffer.get(t) + value_size));
1840 execute_functor_range<WorkTag>(m_functor, range.begin(), range.end(),
1843 num_tasks_remaining.count_down(1);
1847 num_tasks_remaining.wait();
// Stores copies of the scan functor and the range policy; no other work is
// done at construction time.
1850 inline ParallelScan(
const FunctorType &arg_functor,
const Policy &arg_policy)
1851 : m_functor(arg_functor), m_policy(arg_policy) {}
1854 template <
class FunctorType,
class ReturnType,
class... Traits>
1855 class ParallelScanWithTotal<FunctorType,
Kokkos::RangePolicy<Traits...>,
1859 using WorkTag =
typename Policy::work_tag;
1860 using WorkRange =
typename Policy::WorkRange;
1861 using Member =
typename Policy::member_type;
1863 FunctorAnalysis<FunctorPatternInterface::SCAN, Policy, FunctorType>;
1864 using ValueInit = Kokkos::Impl::FunctorValueInit<FunctorType, WorkTag>;
1865 using ValueJoin = Kokkos::Impl::FunctorValueJoin<FunctorType, WorkTag>;
1866 using ValueOps = Kokkos::Impl::FunctorValueOps<FunctorType, WorkTag>;
1867 using pointer_type =
typename Analysis::pointer_type;
1868 using reference_type =
typename Analysis::reference_type;
1869 using value_type =
typename Analysis::value_type;
1871 const FunctorType m_functor;
1872 const Policy m_policy;
1875 template <
class TagType>
1877 typename std::enable_if<std::is_same<TagType, void>::value>::type
1878 execute_functor_range(
const FunctorType &functor,
const Member i_begin,
1879 const Member i_end, reference_type update,
1881 for (Member i = i_begin; i < i_end; ++i) {
1882 functor(i, update,
final);
1886 template <
class TagType>
1888 typename std::enable_if<!std::is_same<TagType, void>::value>::type
1889 execute_functor_range(
const FunctorType &functor,
const Member i_begin,
1890 const Member i_end, reference_type update,
1893 for (Member i = i_begin; i < i_end; ++i) {
1894 functor(t, i, update,
final);
// Entry point for the scan-with-total: hands this closure to
// dispatch_execute_task together with the policy's execution-space instance.
// NOTE(review): execute_task() assigns the total to m_returnvalue, which is
// bound to a caller-supplied argument, and no force-synchronous flag is
// passed here. If the dispatcher can run the task asynchronously, the caller
// must keep that object alive until completion -- confirm against
// dispatch_execute_task.
1899 void execute()
const { dispatch_execute_task(
this, m_policy.space()); }
1901 inline void execute_task()
const {
1903 #if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH) 1904 Kokkos::Experimental::HPX::reset_on_exit_parallel reset_on_exit(
1908 const int num_worker_threads = Kokkos::Experimental::HPX::concurrency();
1909 const int value_count = Analysis::value_count(m_functor);
1910 const std::size_t value_size = Analysis::value_size(m_functor);
1912 thread_buffer &buffer = m_policy.space().impl_get_buffer();
1913 buffer.resize(num_worker_threads, 2 * value_size);
1916 using hpx::lcos::local::barrier;
1917 using hpx::lcos::local::latch;
1919 barrier bar(num_worker_threads);
1920 latch num_tasks_remaining(num_worker_threads);
1921 ChunkedRoundRobinExecutor exec(num_worker_threads);
1923 for (
int t = 0; t < num_worker_threads; ++t) {
1924 apply(exec, [
this, &bar, &buffer, &num_tasks_remaining,
1925 num_worker_threads, value_count, value_size, t]() {
1926 reference_type update_sum = ValueInit::init(
1927 m_functor, reinterpret_cast<pointer_type>(buffer.get(t)));
1929 const WorkRange range(m_policy, t, num_worker_threads);
1930 execute_functor_range<WorkTag>(m_functor, range.begin(), range.end(),
1936 ValueInit::init(m_functor, reinterpret_cast<pointer_type>(
1937 buffer.get(0) + value_size));
1939 for (
int i = 1; i < num_worker_threads; ++i) {
1940 pointer_type ptr_1_prev =
1941 reinterpret_cast<pointer_type
>(buffer.get(i - 1));
1942 pointer_type ptr_2_prev =
1943 reinterpret_cast<pointer_type
>(buffer.get(i - 1) + value_size);
1944 pointer_type ptr_2 =
1945 reinterpret_cast<pointer_type
>(buffer.get(i) + value_size);
1947 for (
int j = 0; j < value_count; ++j) {
1948 ptr_2[j] = ptr_2_prev[j];
1951 ValueJoin::join(m_functor, ptr_2, ptr_1_prev);
1957 reference_type update_base = ValueOps::reference(
1958 reinterpret_cast<pointer_type>(buffer.get(t) + value_size));
1960 execute_functor_range<WorkTag>(m_functor, range.begin(), range.end(),
1963 if (t == num_worker_threads - 1) {
1964 m_returnvalue = update_base;
1967 num_tasks_remaining.count_down(1);
1971 num_tasks_remaining.wait();
1974 inline ParallelScanWithTotal(
const FunctorType &arg_functor,
1975 const Policy &arg_policy,
1977 : m_functor(arg_functor),
1978 m_policy(arg_policy),
1979 m_returnvalue(arg_returnvalue) {}
1986 template <
class FunctorType,
class... Properties>
1987 class ParallelFor<FunctorType,
Kokkos::TeamPolicy<Properties...>,
1988 Kokkos::Experimental::HPX> {
1990 using Policy = TeamPolicyInternal<Kokkos::Experimental::HPX, Properties...>;
1991 using WorkTag =
typename Policy::work_tag;
1992 using Member =
typename Policy::member_type;
1995 const FunctorType m_functor;
1996 const Policy m_policy;
1998 const std::size_t m_shared;
2000 template <
class TagType>
2002 typename std::enable_if<std::is_same<TagType, void>::value>::type
2003 execute_functor(
const FunctorType &functor,
const Policy &policy,
2004 const int league_rank,
char *local_buffer,
2005 const std::size_t local_buffer_size) {
2006 functor(Member(policy, 0, league_rank, local_buffer, local_buffer_size));
2009 template <
class TagType>
2011 typename std::enable_if<!std::is_same<TagType, void>::value>::type
2012 execute_functor(
const FunctorType &functor,
const Policy &policy,
2013 const int league_rank,
char *local_buffer,
2014 const std::size_t local_buffer_size) {
2016 functor(t, Member(policy, 0, league_rank, local_buffer, local_buffer_size));
2019 template <
class TagType>
2021 typename std::enable_if<std::is_same<TagType, void>::value>::type
2022 execute_functor_range(
const FunctorType &functor,
const Policy &policy,
2023 const int league_rank_begin,
2024 const int league_rank_end,
char *local_buffer,
2025 const std::size_t local_buffer_size) {
2026 for (
int league_rank = league_rank_begin; league_rank < league_rank_end;
2028 functor(Member(policy, 0, league_rank, local_buffer, local_buffer_size));
2032 template <
class TagType>
2034 typename std::enable_if<!std::is_same<TagType, void>::value>::type
2035 execute_functor_range(
const FunctorType &functor,
const Policy &policy,
2036 const int league_rank_begin,
2037 const int league_rank_end,
char *local_buffer,
2038 const std::size_t local_buffer_size) {
2040 for (
int league_rank = league_rank_begin; league_rank < league_rank_end;
2043 Member(policy, 0, league_rank, local_buffer, local_buffer_size));
// Entry point for the team parallel-for: hands this closure to
// dispatch_execute_task together with the policy's execution-space instance.
2048 void execute()
const { dispatch_execute_task(
this, m_policy.space()); }
2050 inline void execute_task()
const {
2052 #if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH) 2053 Kokkos::Experimental::HPX::reset_on_exit_parallel reset_on_exit(
2057 const int num_worker_threads = Kokkos::Experimental::HPX::concurrency();
2059 thread_buffer &buffer = m_policy.space().impl_get_buffer();
2060 buffer.resize(num_worker_threads, m_shared);
2062 #if KOKKOS_HPX_IMPLEMENTATION == 0 2063 using hpx::parallel::for_loop;
2064 using hpx::parallel::execution::par;
2065 using hpx::parallel::execution::static_chunk_size;
2068 par.with(static_chunk_size(m_policy.chunk_size())), 0,
2069 m_policy.league_size(), [
this, &buffer](
const int league_rank) {
2070 execute_functor<WorkTag>(
2071 m_functor, m_policy, league_rank,
2072 buffer.get(Kokkos::Experimental::HPX::impl_hardware_thread_id()),
2076 #elif KOKKOS_HPX_IMPLEMENTATION == 1 2078 using hpx::lcos::local::latch;
2080 const int num_tasks = (m_policy.league_size() + m_policy.chunk_size() - 1) /
2081 m_policy.chunk_size();
2082 latch num_tasks_remaining(num_tasks);
2083 ChunkedRoundRobinExecutor exec(num_tasks);
2085 for (
int league_rank_begin = 0; league_rank_begin < m_policy.league_size();
2086 league_rank_begin += m_policy.chunk_size()) {
2087 apply(exec, [
this, &buffer, &num_tasks_remaining, league_rank_begin]() {
2088 const int league_rank_end = (std::min)(
2089 league_rank_begin + m_policy.chunk_size(), m_policy.league_size());
2090 execute_functor_range<WorkTag>(
2091 m_functor, m_policy, league_rank_begin, league_rank_end,
2092 buffer.get(Kokkos::Experimental::HPX::impl_hardware_thread_id()),
2095 num_tasks_remaining.count_down(1);
2099 num_tasks_remaining.wait();
2101 #elif KOKKOS_HPX_IMPLEMENTATION == 2 2102 using hpx::parallel::for_loop_strided;
2103 using hpx::parallel::execution::par;
2104 using hpx::parallel::execution::static_chunk_size;
2106 const int num_tasks = (m_policy.league_size() + m_policy.chunk_size() - 1) /
2107 m_policy.chunk_size();
2108 ChunkedRoundRobinExecutor exec(num_tasks);
2111 par.on(exec).with(static_chunk_size(1)), 0, m_policy.league_size(),
2112 m_policy.chunk_size(), [
this, &buffer](
const int league_rank_begin) {
2113 const int league_rank_end =
2114 (std::min)(league_rank_begin + m_policy.chunk_size(),
2115 m_policy.league_size());
2116 execute_functor_range<WorkTag>(
2117 m_functor, m_policy, league_rank_begin, league_rank_end,
2118 buffer.get(Kokkos::Experimental::HPX::impl_hardware_thread_id()),
// Stores copies of the functor and team policy, caches the league size, and
// precomputes the per-thread scratch requirement m_shared as the sum of the
// policy's scratch_size(0) and scratch_size(1) plus the functor's own
// team-shared-memory request for the policy's team size.
2124 ParallelFor(
const FunctorType &arg_functor,
const Policy &arg_policy)
2125 : m_functor(arg_functor),
2126 m_policy(arg_policy),
2127 m_league(arg_policy.league_size()),
2128 m_shared(arg_policy.scratch_size(0) + arg_policy.scratch_size(1) +
2129 FunctorTeamShmemSize<FunctorType>::value(
2130 arg_functor, arg_policy.team_size())) {}
2133 template <
class FunctorType,
class ReducerType,
class... Properties>
2134 class ParallelReduce<FunctorType,
Kokkos::TeamPolicy<Properties...>,
2135 ReducerType, Kokkos::Experimental::HPX> {
2137 using Policy = TeamPolicyInternal<Kokkos::Experimental::HPX, Properties...>;
2139 FunctorAnalysis<FunctorPatternInterface::REDUCE, Policy, FunctorType>;
2140 using Member =
typename Policy::member_type;
2141 using WorkTag =
typename Policy::work_tag;
2142 using ReducerConditional =
2143 Kokkos::Impl::if_c<std::is_same<InvalidType, ReducerType>::value,
2144 FunctorType, ReducerType>;
2145 using ReducerTypeFwd =
typename ReducerConditional::type;
2147 typename Kokkos::Impl::if_c<std::is_same<InvalidType, ReducerType>::value,
2148 WorkTag,
void>::type;
2149 using ValueInit = Kokkos::Impl::FunctorValueInit<ReducerTypeFwd, WorkTagFwd>;
2150 using ValueFinal = Kokkos::Impl::FunctorFinal<ReducerTypeFwd, WorkTagFwd>;
2151 using ValueJoin = Kokkos::Impl::FunctorValueJoin<ReducerTypeFwd, WorkTagFwd>;
2152 using ValueOps = Kokkos::Impl::FunctorValueOps<ReducerTypeFwd, WorkTagFwd>;
2153 using pointer_type =
typename Analysis::pointer_type;
2154 using reference_type =
typename Analysis::reference_type;
2155 using value_type =
typename Analysis::value_type;
2157 const FunctorType m_functor;
2159 const Policy m_policy;
2160 const ReducerType m_reducer;
2161 pointer_type m_result_ptr;
2162 const std::size_t m_shared;
2164 bool m_force_synchronous;
2166 template <
class TagType>
2168 typename std::enable_if<std::is_same<TagType, void>::value>::type
2169 execute_functor(
const FunctorType &functor,
const Policy &policy,
2170 const int league_rank,
char *local_buffer,
2171 const std::size_t local_buffer_size,
2172 reference_type update) {
2173 functor(Member(policy, 0, league_rank, local_buffer, local_buffer_size),
2177 template <
class TagType>
2179 typename std::enable_if<!std::is_same<TagType, void>::value>::type
2180 execute_functor(
const FunctorType &functor,
const Policy &policy,
2181 const int league_rank,
char *local_buffer,
2182 const std::size_t local_buffer_size,
2183 reference_type update) {
2185 functor(t, Member(policy, 0, league_rank, local_buffer, local_buffer_size),
2189 template <
class TagType>
2191 typename std::enable_if<std::is_same<TagType, void>::value>::type
2192 execute_functor_range(
const FunctorType &functor,
const Policy &policy,
2193 const int league_rank_begin,
2194 const int league_rank_end,
char *local_buffer,
2195 const std::size_t local_buffer_size,
2196 reference_type update) {
2197 for (
int league_rank = league_rank_begin; league_rank < league_rank_end;
2199 functor(Member(policy, 0, league_rank, local_buffer, local_buffer_size),
2204 template <
class TagType>
2206 typename std::enable_if<!std::is_same<TagType, void>::value>::type
2207 execute_functor_range(
const FunctorType &functor,
const Policy &policy,
2208 const int league_rank_begin,
2209 const int league_rank_end,
char *local_buffer,
2210 const std::size_t local_buffer_size,
2211 reference_type update) {
2213 for (
int league_rank = league_rank_begin; league_rank < league_rank_end;
2216 Member(policy, 0, league_rank, local_buffer, local_buffer_size),
2222 void execute()
const {
2223 if (m_policy.league_size() * m_policy.team_size() == 0) {
2225 ValueInit::init(ReducerConditional::select(m_functor, m_reducer),
2227 ValueFinal::final(ReducerConditional::select(m_functor, m_reducer),
2232 dispatch_execute_task(
this, m_policy.space());
2235 inline void execute_task()
const {
2237 #if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH) 2238 Kokkos::Experimental::HPX::reset_on_exit_parallel reset_on_exit(
2242 const int num_worker_threads = Kokkos::Experimental::HPX::concurrency();
2243 const std::size_t value_size =
2244 Analysis::value_size(ReducerConditional::select(m_functor, m_reducer));
2246 thread_buffer &buffer = m_policy.space().impl_get_buffer();
2247 buffer.resize(num_worker_threads, value_size + m_shared);
2249 #if KOKKOS_HPX_IMPLEMENTATION == 0 2250 using hpx::parallel::for_loop;
2251 using hpx::parallel::execution::par;
2253 for_loop(par, 0, num_worker_threads, [
this, &buffer](
const std::size_t t) {
2254 ValueInit::init(ReducerConditional::select(m_functor, m_reducer),
2255 reinterpret_cast<pointer_type>(buffer.get(t)));
2258 using hpx::parallel::execution::static_chunk_size;
2260 hpx::parallel::for_loop(
2261 par.with(static_chunk_size(m_policy.chunk_size())), 0,
2262 m_policy.league_size(),
2263 [
this, &buffer, value_size](
const int league_rank) {
2264 std::size_t t = Kokkos::Experimental::HPX::impl_hardware_thread_id();
2265 reference_type update = ValueOps::reference(
2266 reinterpret_cast<pointer_type>(buffer.get(t)));
2268 execute_functor<WorkTag>(m_functor, m_policy, league_rank,
2269 buffer.get(t) + value_size, m_shared,
2273 #elif KOKKOS_HPX_IMPLEMENTATION == 1 2275 using hpx::lcos::local::latch;
2278 latch num_tasks_remaining(num_worker_threads);
2279 ChunkedRoundRobinExecutor exec(num_worker_threads);
2281 for (
int t = 0; t < num_worker_threads; ++t) {
2282 apply(exec, [
this, &buffer, &num_tasks_remaining, t]() {
2283 ValueInit::init(ReducerConditional::select(m_functor, m_reducer),
2284 reinterpret_cast<pointer_type>(buffer.get(t)));
2286 num_tasks_remaining.count_down(1);
2290 num_tasks_remaining.wait();
2293 const int num_tasks = (m_policy.league_size() + m_policy.chunk_size() - 1) /
2294 m_policy.chunk_size();
2295 latch num_tasks_remaining(num_tasks);
2296 ChunkedRoundRobinExecutor exec(num_tasks);
2298 for (
int league_rank_begin = 0; league_rank_begin < m_policy.league_size();
2299 league_rank_begin += m_policy.chunk_size()) {
2300 apply(exec, [
this, &buffer, &num_tasks_remaining, league_rank_begin,
2302 std::size_t t = Kokkos::Experimental::HPX::impl_hardware_thread_id();
2303 reference_type update =
2304 ValueOps::reference(reinterpret_cast<pointer_type>(buffer.get(t)));
2305 const int league_rank_end = (std::min)(
2306 league_rank_begin + m_policy.chunk_size(), m_policy.league_size());
2307 execute_functor_range<WorkTag>(
2308 m_functor, m_policy, league_rank_begin, league_rank_end,
2309 buffer.get(t) + value_size, m_shared, update);
2311 num_tasks_remaining.count_down(1);
2315 num_tasks_remaining.wait();
2317 #elif KOKKOS_HPX_IMPLEMENTATION == 2 2318 using hpx::parallel::for_loop;
2319 using hpx::parallel::for_loop_strided;
2320 using hpx::parallel::execution::par;
2321 using hpx::parallel::execution::static_chunk_size;
2324 ChunkedRoundRobinExecutor exec(num_worker_threads);
2326 for_loop(par.on(exec).with(static_chunk_size(1)), 0, num_worker_threads,
2327 [
this, &buffer](std::size_t
const t) {
2329 ReducerConditional::select(m_functor, m_reducer),
2330 reinterpret_cast<pointer_type>(buffer.get(t)));
2334 const int num_tasks = (m_policy.league_size() + m_policy.chunk_size() - 1) /
2335 m_policy.chunk_size();
2336 ChunkedRoundRobinExecutor exec(num_tasks);
2339 par.on(exec).with(static_chunk_size(1)), 0, m_policy.league_size(),
2340 m_policy.chunk_size(),
2341 [
this, &buffer, value_size](
int const league_rank_begin) {
2342 std::size_t t = Kokkos::Experimental::HPX::impl_hardware_thread_id();
2343 reference_type update = ValueOps::reference(
2344 reinterpret_cast<pointer_type>(buffer.get(t)));
2345 const int league_rank_end =
2346 (std::min)(league_rank_begin + m_policy.chunk_size(),
2347 m_policy.league_size());
2348 execute_functor_range<WorkTag>(
2349 m_functor, m_policy, league_rank_begin, league_rank_end,
2350 buffer.get(t) + value_size, m_shared, update);
2354 const pointer_type ptr =
reinterpret_cast<pointer_type
>(buffer.get(0));
2355 for (
int t = 1; t < num_worker_threads; ++t) {
2356 ValueJoin::join(ReducerConditional::select(m_functor, m_reducer), ptr,
2357 reinterpret_cast<pointer_type>(buffer.get(t)));
2360 Kokkos::Impl::FunctorFinal<ReducerTypeFwd, WorkTagFwd>::final(
2361 ReducerConditional::select(m_functor, m_reducer), ptr);
2364 const int n = Analysis::value_count(
2365 ReducerConditional::select(m_functor, m_reducer));
2367 for (
int j = 0; j < n; ++j) {
2368 m_result_ptr[j] = ptr[j];
// Constructor overload taking a result View. The trailing enable_if default
// argument restricts it to the case where ViewType is a Kokkos View and
// ReducerType is not a reducer type.
//
// Initializes, in order: the functor copy, the league size, the policy copy,
// a placeholder reducer (InvalidType), the raw result pointer taken from
// arg_result.data(), the shared-memory size, and the force-synchronous flag.
2373 template <
class ViewType>
2375 const FunctorType &arg_functor,
const Policy &arg_policy,
2376 const ViewType &arg_result,
2377 typename std::enable_if<Kokkos::is_view<ViewType>::value &&
2378 !Kokkos::is_reducer_type<ReducerType>::value,
2379 void *>::type =
nullptr)
2380 : m_functor(arg_functor),
2381 m_league(arg_policy.league_size()),
2382 m_policy(arg_policy),
2383 m_reducer(InvalidType()),
2384 m_result_ptr(arg_result.data()),
// Shared size = scratch level 0 + scratch level 1 + the functor's own team
// shared-memory request for the policy's team size.
2385 m_shared(arg_policy.scratch_size(0) + arg_policy.scratch_size(1) +
2386 FunctorTeamShmemSize<FunctorType>::value(
2387 m_functor, arg_policy.team_size())),
// Set when the result View has no tracking record.
2388 m_force_synchronous(!arg_result.impl_track().has_record()) {}
// Constructor overload taking a reducer object. The result pointer and the
// force-synchronous flag are both derived from reducer.view().
//
// NOTE(review): arg_policy is taken by value here, whereas the View-based
// overload takes it by const reference — confirm whether the copy is
// intentional.
2390 inline ParallelReduce(
const FunctorType &arg_functor, Policy arg_policy,
2391 const ReducerType &reducer)
2392 : m_functor(arg_functor),
2393 m_league(arg_policy.league_size()),
2394 m_policy(arg_policy),
2396 m_result_ptr(reducer.view().data()),
// Shared size = scratch level 0 + scratch level 1 + the functor's own team
// shared-memory request for the policy's team size.
2397 m_shared(arg_policy.scratch_size(0) + arg_policy.scratch_size(1) +
2398 FunctorTeamShmemSize<FunctorType>::value(
2399 arg_functor, arg_policy.team_size())),
// Set when the reducer's view has no tracking record.
2400 m_force_synchronous(!reducer.view().impl_track().has_record()) {}
// Single-bound TeamThreadRange overload for HPX team members.
// Takes the team member handle `thread` and an iteration `count` and returns
// a TeamThreadRangeBoundariesStruct<iType, HPXTeamMember>.
2407 template <
typename iType>
2408 KOKKOS_INLINE_FUNCTION
2409 Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>
2410 TeamThreadRange(
const Impl::HPXTeamMember &thread,
const iType &count) {
2411 return Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>(
2415 template <
typename iType1,
typename iType2>
2416 KOKKOS_INLINE_FUNCTION Impl::TeamThreadRangeBoundariesStruct<
2417 typename std::common_type<iType1, iType2>::type, Impl::HPXTeamMember>
2418 TeamThreadRange(
const Impl::HPXTeamMember &thread,
const iType1 &i_begin,
2419 const iType2 &i_end) {
2420 using iType =
typename std::common_type<iType1, iType2>::type;
2421 return Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>(
2422 thread, iType(i_begin), iType(i_end));
// Single-bound TeamVectorRange overload for HPX team members.
// Takes the team member handle `thread` and an iteration `count`. Note that
// the return type is TeamThreadRangeBoundariesStruct, i.e. the same
// boundaries type that TeamThreadRange produces.
2425 template <
typename iType>
2426 KOKKOS_INLINE_FUNCTION
2427 Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>
2428 TeamVectorRange(
const Impl::HPXTeamMember &thread,
const iType &count) {
2429 return Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>(
2433 template <
typename iType1,
typename iType2>
2434 KOKKOS_INLINE_FUNCTION Impl::TeamThreadRangeBoundariesStruct<
2435 typename std::common_type<iType1, iType2>::type, Impl::HPXTeamMember>
2436 TeamVectorRange(
const Impl::HPXTeamMember &thread,
const iType1 &i_begin,
2437 const iType2 &i_end) {
2438 using iType =
typename std::common_type<iType1, iType2>::type;
2439 return Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>(
2440 thread, iType(i_begin), iType(i_end));
// Single-bound ThreadVectorRange overload for HPX team members.
// Takes the team member handle `thread` and an iteration `count` and returns
// a ThreadVectorRangeBoundariesStruct<iType, HPXTeamMember>.
2443 template <
typename iType>
2444 KOKKOS_INLINE_FUNCTION
2445 Impl::ThreadVectorRangeBoundariesStruct<iType, Impl::HPXTeamMember>
2446 ThreadVectorRange(
const Impl::HPXTeamMember &thread,
const iType &count) {
2447 return Impl::ThreadVectorRangeBoundariesStruct<iType, Impl::HPXTeamMember>(
2451 template <
typename iType1,
typename iType2>
2452 KOKKOS_INLINE_FUNCTION Impl::ThreadVectorRangeBoundariesStruct<
2453 typename std::common_type<iType1, iType2>::type, Impl::HPXTeamMember>
2454 ThreadVectorRange(
const Impl::HPXTeamMember &thread,
const iType1 &i_begin,
2455 const iType2 &i_end) {
2456 using iType =
typename std::common_type<iType1, iType2>::type;
2457 return Impl::ThreadVectorRangeBoundariesStruct<iType, Impl::HPXTeamMember>(
2458 thread, iType(i_begin), iType(i_end));
2461 KOKKOS_INLINE_FUNCTION
2462 Impl::ThreadSingleStruct<Impl::HPXTeamMember> PerTeam(
2463 const Impl::HPXTeamMember &thread) {
2464 return Impl::ThreadSingleStruct<Impl::HPXTeamMember>(thread);
2467 KOKKOS_INLINE_FUNCTION
2468 Impl::VectorSingleStruct<Impl::HPXTeamMember> PerThread(
2469 const Impl::HPXTeamMember &thread) {
2470 return Impl::VectorSingleStruct<Impl::HPXTeamMember>(thread);
// Nested loop over a TeamThreadRange on an HPX team member.
// Iterates i from loop_boundaries.start up to (excluding) loop_boundaries.end
// in steps of loop_boundaries.increment. `lambda` is the user callable,
// presumably invoked once per index — TODO confirm against the loop body.
2478 template <
typename iType,
class Lambda>
2480 const Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>
2482 const Lambda &lambda) {
2483 for (iType i = loop_boundaries.start; i < loop_boundaries.end;
2484 i += loop_boundaries.increment)
// Nested reduction over a TeamThreadRange into a plain value.
// `result` is an in/out reference: any value it held on entry is discarded.
// `lambda` is the user callable, presumably invoked per index with `result`
// — TODO confirm against the loop body.
2494 template <
typename iType,
class Lambda,
typename ValueType>
2495 KOKKOS_INLINE_FUNCTION
void parallel_reduce(
2496 const Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>
2498 const Lambda &lambda, ValueType &result) {
// Start from a value-initialized ValueType.
2499 result = ValueType();
// Walk [start, end) in steps of `increment`.
2500 for (iType i = loop_boundaries.start; i < loop_boundaries.end;
2501 i += loop_boundaries.increment) {
// Nested loop over a ThreadVectorRange on an HPX team member.
// Iterates i from loop_boundaries.start up to (excluding) loop_boundaries.end
// in steps of loop_boundaries.increment; the loop is preceded by a section
// guarded by KOKKOS_ENABLE_PRAGMA_IVDEP. `lambda` is the user callable,
// presumably invoked once per index — TODO confirm against the loop body.
2511 template <
typename iType,
class Lambda>
2513 const Impl::ThreadVectorRangeBoundariesStruct<iType, Impl::HPXTeamMember>
2515 const Lambda &lambda) {
2516 #ifdef KOKKOS_ENABLE_PRAGMA_IVDEP 2519 for (iType i = loop_boundaries.start; i < loop_boundaries.end;
2520 i += loop_boundaries.increment) {
// Nested reduction over a ThreadVectorRange into a plain value.
// `result` is an in/out reference: any value it held on entry is discarded.
// `lambda` is the user callable, presumably invoked per index with `result`
// — TODO confirm against the loop body.
2531 template <
typename iType,
class Lambda,
typename ValueType>
2532 KOKKOS_INLINE_FUNCTION
void parallel_reduce(
2533 const Impl::ThreadVectorRangeBoundariesStruct<iType, Impl::HPXTeamMember>
2535 const Lambda &lambda, ValueType &result) {
// Start from a value-initialized ValueType.
2536 result = ValueType();
// The loop below is preceded by a KOKKOS_ENABLE_PRAGMA_IVDEP-guarded section.
2537 #ifdef KOKKOS_ENABLE_PRAGMA_IVDEP 2540 for (iType i = loop_boundaries.start; i < loop_boundaries.end;
2541 i += loop_boundaries.increment) {
2546 template <
typename iType,
class Lambda,
typename ReducerType>
2547 KOKKOS_INLINE_FUNCTION
void parallel_reduce(
2548 const Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>
2550 const Lambda &lambda,
const ReducerType &reducer) {
2551 reducer.init(reducer.reference());
2552 for (iType i = loop_boundaries.start; i < loop_boundaries.end;
2553 i += loop_boundaries.increment) {
2554 lambda(i, reducer.reference());
// Nested reduction over a ThreadVectorRange using a reducer object.
// The reducer's referenced value is reset via reducer.init(); `lambda` is
// then called for each index in [start, end), stepping by `increment`, with
// that same referenced value as the accumulator.
2558 template <
typename iType,
class Lambda,
typename ReducerType>
2559 KOKKOS_INLINE_FUNCTION
void parallel_reduce(
2560 const Impl::ThreadVectorRangeBoundariesStruct<iType, Impl::HPXTeamMember>
2562 const Lambda &lambda,
const ReducerType &reducer) {
2563 reducer.init(reducer.reference());
// The loop below is preceded by a KOKKOS_ENABLE_PRAGMA_IVDEP-guarded section.
2564 #ifdef KOKKOS_ENABLE_PRAGMA_IVDEP 2567 for (iType i = loop_boundaries.start; i < loop_boundaries.end;
2568 i += loop_boundaries.increment) {
2569 lambda(i, reducer.reference());
2573 template <
typename iType,
class FunctorType>
2574 KOKKOS_INLINE_FUNCTION
void parallel_scan(
2575 Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>
const 2577 const FunctorType &lambda) {
2578 using value_type =
typename Kokkos::Impl::FunctorAnalysis<
2579 Kokkos::Impl::FunctorPatternInterface::SCAN, void,
2580 FunctorType>::value_type;
2582 value_type scan_val = value_type();
2585 for (iType i = loop_boundaries.start; i < loop_boundaries.end;
2586 i += loop_boundaries.increment) {
2587 lambda(i, scan_val,
false);
2591 scan_val = loop_boundaries.thread.team_scan(scan_val);
2593 for (iType i = loop_boundaries.start; i < loop_boundaries.end;
2594 i += loop_boundaries.increment) {
2595 lambda(i, scan_val,
true);
// Nested scan over a ThreadVectorRange.
// The value type comes from FunctorValueTraits<FunctorType, void>. The
// accumulator starts value-initialized and `lambda` is called for each index
// in [start, end), stepping by `increment`, always with final == true.
2610 template <
typename iType,
class FunctorType>
2611 KOKKOS_INLINE_FUNCTION
void parallel_scan(
2612 const Impl::ThreadVectorRangeBoundariesStruct<iType, Impl::HPXTeamMember>
2614 const FunctorType &lambda) {
2615 using ValueTraits = Kokkos::Impl::FunctorValueTraits<FunctorType, void>;
2616 using value_type =
typename ValueTraits::value_type;
2618 value_type scan_val = value_type();
// The loop below is preceded by a KOKKOS_ENABLE_PRAGMA_IVDEP-guarded section.
2620 #ifdef KOKKOS_ENABLE_PRAGMA_IVDEP 2623 for (iType i = loop_boundaries.start; i < loop_boundaries.end;
2624 i += loop_boundaries.increment) {
2625 lambda(i, scan_val,
true);
// Nested scan over a ThreadVectorRange with an explicit reducer.
// Only participates in overload resolution when ReducerType satisfies
// Kokkos::is_reducer. The accumulator is a local of the reducer's value_type
// initialized through reducer.init(); `lambda` is then called for each index
// in [start, end), stepping by `increment`, always with final == true.
2632 template <
typename iType,
class FunctorType,
typename ReducerType>
2633 KOKKOS_INLINE_FUNCTION
2634 typename std::enable_if<Kokkos::is_reducer<ReducerType>::value>::type
2635 parallel_scan(
const Impl::ThreadVectorRangeBoundariesStruct<
2636 iType, Impl::HPXTeamMember> &loop_boundaries,
2637 const FunctorType &lambda,
const ReducerType &reducer) {
2638 typename ReducerType::value_type scan_val;
2639 reducer.init(scan_val);
// The loop below is preceded by a KOKKOS_ENABLE_PRAGMA_IVDEP-guarded section.
2641 #ifdef KOKKOS_ENABLE_PRAGMA_IVDEP 2644 for (iType i = loop_boundaries.start; i < loop_boundaries.end;
2645 i += loop_boundaries.increment) {
2646 lambda(i, scan_val,
true);
// single() overload selected by a VectorSingleStruct tag (the type returned
// by PerThread). The tag parameter is unnamed and therefore unused; `lambda`
// is the user callable.
2650 template <
class FunctorType>
2651 KOKKOS_INLINE_FUNCTION
void single(
2652 const Impl::VectorSingleStruct<Impl::HPXTeamMember> &,
2653 const FunctorType &lambda) {
// single() overload selected by a ThreadSingleStruct tag (the type returned
// by PerTeam). The tag parameter is unnamed and therefore unused; `lambda`
// is the user callable.
2657 template <
class FunctorType>
2658 KOKKOS_INLINE_FUNCTION
void single(
2659 const Impl::ThreadSingleStruct<Impl::HPXTeamMember> &,
2660 const FunctorType &lambda) {
// single() overload selected by a VectorSingleStruct tag (the type returned
// by PerThread) that additionally takes a mutable reference `val`.
// The tag parameter is unnamed and therefore unused; `val` is presumably
// passed to `lambda` — TODO confirm against the body.
2664 template <
class FunctorType,
class ValueType>
2665 KOKKOS_INLINE_FUNCTION
void single(
2666 const Impl::VectorSingleStruct<Impl::HPXTeamMember> &,
2667 const FunctorType &lambda, ValueType &val) {
// single() overload selected by a ThreadSingleStruct tag (the type returned
// by PerTeam) that additionally takes a mutable reference `val`.
// The tag parameter is unnamed and therefore unused; `val` is presumably
// passed to `lambda` — TODO confirm against the body.
2671 template <
class FunctorType,
class ValueType>
2672 KOKKOS_INLINE_FUNCTION
void single(
2673 const Impl::ThreadSingleStruct<Impl::HPXTeamMember> &,
2674 const FunctorType &lambda, ValueType &val) {
2680 #include <HPX/Kokkos_HPX_Task.hpp>
KOKKOS_INLINE_FUNCTION void release(size_type) const
release a value previously obtained from acquire()
Scratch memory space associated with an execution space.
View to an array of data.
Memory management for host memory.
Declaration of various MemoryLayout options.
KOKKOS_INLINE_FUNCTION size_type size() const
upper bound for acquired values, i.e. 0 <= value < size()
UniqueToken(execution_space const &=execution_space())
create an object sized for the concurrency of the given instance
Declaration of parallel operators.
KOKKOS_INLINE_FUNCTION size_type acquire() const
acquire value such that 0 <= value < size()
void parallel_for(const ExecPolicy &policy, const FunctorType &functor, const std::string &str="", typename std::enable_if< Kokkos::Impl::is_execution_policy< ExecPolicy >::value >::type *=nullptr)
Execute functor in parallel according to the execution policy.
Execution policy for work over a range of an integral type.