50class HAZELCAST_API iexecutor_service :
public proxy::ProxyImpl
52 friend class spi::ProxyManager;
55 static constexpr const char* SERVICE_NAME =
"hz:impl:executorService";
58 class executor_promise
61 executor_promise(spi::ClientContext& context)
66 boost::future<boost::optional<T>>& future,
67 boost::uuids::uuid uuid,
70 spi::ClientContext& context,
71 const std::shared_ptr<spi::impl::ClientInvocation>& invocation)
72 : shared_future_(future.share())
74 , partition_id_(partition_id)
77 , invocation_(invocation)
80 bool cancel(
bool may_interrupt_if_running)
82 if (shared_future_.is_ready()) {
87 return invoke_cancel_request(may_interrupt_if_running);
89 util::exception_util::rethrow(std::current_exception());
94 boost::shared_future<boost::optional<T>> get_future()
96 return shared_future_;
100 boost::shared_future<boost::optional<T>> shared_future_;
101 boost::uuids::uuid uuid_;
103 boost::uuids::uuid member_uuid_;
104 spi::ClientContext& context_;
105 std::shared_ptr<spi::impl::ClientInvocation> invocation_;
107 bool invoke_cancel_request(
bool may_interrupt_if_running)
109 invocation_->wait_invoked();
111 if (partition_id_ > -1) {
113 protocol::codec::executorservice_cancelonpartition_encode(
114 uuid_, may_interrupt_if_running);
115 std::shared_ptr<spi::impl::ClientInvocation> clientInvocation =
116 spi::impl::ClientInvocation::create(
119 boost::uuids::to_string(uuid_),
121 return clientInvocation->invoke()
123 .get_first_fixed_sized_field<
bool>();
126 protocol::codec::executorservice_cancelonmember_encode(
127 uuid_, member_uuid_, may_interrupt_if_running);
128 std::shared_ptr<spi::impl::ClientInvocation> clientInvocation =
129 spi::impl::ClientInvocation::create(
132 boost::uuids::to_string(uuid_),
134 return clientInvocation->invoke()
136 .get_first_fixed_sized_field<
bool>();
150 template<
typename HazelcastSerializable>
151 void execute(
const HazelcastSerializable& command)
164 template<
typename HazelcastSerializable>
165 void execute(
const HazelcastSerializable& command,
169 int selectedMember = rand() % (int)members.size();
171 members[selectedMember]);
180 template<
typename HazelcastSerializable,
typename K>
193 template<
typename HazelcastSerializable>
207 template<
typename HazelcastSerializable>
209 const std::vector<member>& members)
211 for (std::vector<member>::const_iterator it = members.begin();
226 template<
typename HazelcastSerializable>
239 template<
typename HazelcastSerializable>
242 std::vector<member> memberList =
243 get_context().get_client_cluster_service().get_member_list();
244 for (std::vector<member>::const_iterator it = memberList.begin();
245 it != memberList.end();
261 template<
typename HazelcastSerializable,
typename T,
typename K>
265 return submit_to_key_owner_internal<HazelcastSerializable, T, K>(
278 template<
typename HazelcastSerializable,
typename T>
282 return submit_to_target_internal<HazelcastSerializable, T>(
297 template<
typename HazelcastSerializable,
typename T>
299 const HazelcastSerializable& task,
300 const std::vector<member>& members)
302 std::unordered_map<member, executor_promise<T>> futureMap;
303 for (
auto&
member : members) {
304 auto f = submit_to_target_internal<HazelcastSerializable, T>(
307 futureMap.emplace(
member, std::move(f));
324 template<
typename HazelcastSerializable,
typename T>
326 const HazelcastSerializable& task,
343 template<
typename HazelcastSerializable,
typename T>
345 const HazelcastSerializable& task)
347 std::unordered_map<member, executor_promise<T>> futureMap;
349 get_context().get_client_cluster_service().get_member_list()) {
350 auto f = submit_to_target_internal<HazelcastSerializable, T>(
353 futureMap.emplace(m, std::move(f));
372 template<
typename HazelcastSerializable,
typename T>
375 serialization::pimpl::data task_data =
376 to_data<HazelcastSerializable>(task);
378 if (task_data.has_partition_hash()) {
379 int partitionId = get_partition_id(task_data);
381 return submit_to_partition_internal<T>(
382 task_data,
false, partitionId);
384 return submit_to_random_internal<T>(task_data,
false);
398 template<
typename HazelcastSerializable,
typename T>
403 int selectedMember = rand() % (int)members.size();
405 task, members[selectedMember]);
424 boost::future<bool> is_shutdown();
433 boost::future<bool> is_terminated();
438 struct executor_marker
441 std::vector<member> select_members(
const member_selector& member_selector);
444 executor_promise<T> submit_to_partition_internal(
445 const serialization::pimpl::data& task_data,
449 auto uuid = context_.random_uuid();
451 auto f = invoke_on_partition_internal(task_data, partition_id, uuid);
453 return check_sync<T>(f, uuid, partition_id, prevent_sync);
456 std::pair<boost::future<protocol::ClientMessage>,
457 std::shared_ptr<spi::impl::ClientInvocation>>
458 invoke_on_partition_internal(
const serialization::pimpl::data& task_data,
460 boost::uuids::uuid uuid)
462 return invoke_on_partition_owner(
463 protocol::codec::executorservice_submittopartition_encode(
464 name_, uuid, task_data),
468 template<
typename HazelcastSerializable,
typename T,
typename K>
469 executor_promise<T> submit_to_key_owner_internal(
470 const HazelcastSerializable& task,
475 serialization::pimpl::data dataKey = to_data<K>(key);
477 int partitionId = get_partition_id(dataKey);
479 return submit_to_partition_internal<T>(
480 to_data<HazelcastSerializable>(task), prevent_sync, partitionId);
484 executor_promise<T> submit_to_random_internal(
485 const serialization::pimpl::data& task_data,
489 int partitionId = random_partition_id();
491 return submit_to_partition_internal<T>(
492 task_data, prevent_sync, partitionId);
495 template<
typename HazelcastSerializable,
typename T>
496 executor_promise<T> submit_to_target_internal(
497 const HazelcastSerializable& task,
498 const member& member,
501 boost::uuids::uuid uuid = context_.random_uuid();
504 invoke_on_target_internal<HazelcastSerializable>(task, member, uuid);
506 return check_sync<T>(f, uuid, -1, member, prevent_sync);
509 template<
typename HazelcastSerializable>
510 std::pair<boost::future<protocol::ClientMessage>,
511 std::shared_ptr<spi::impl::ClientInvocation>>
512 invoke_on_target_internal(
const HazelcastSerializable& task,
513 const member& member,
514 boost::uuids::uuid uuid)
516 return invoke_on_target(
517 protocol::codec::executorservice_submittomember_encode(
518 name_, uuid, to_data(task), member.get_uuid()),
522 std::pair<boost::future<protocol::ClientMessage>,
523 std::shared_ptr<spi::impl::ClientInvocation>>
524 invoke_on_partition_owner(protocol::ClientMessage&& request,
527 std::pair<boost::future<protocol::ClientMessage>,
528 std::shared_ptr<spi::impl::ClientInvocation>>
529 invoke_on_target(protocol::ClientMessage&& request,
530 boost::uuids::uuid target);
533 executor_promise<T> check_sync(
534 std::pair<boost::future<protocol::ClientMessage>,
535 std::shared_ptr<spi::impl::ClientInvocation>>& future_pair,
536 boost::uuids::uuid uuid,
540 return check_sync<T>(
541 future_pair, uuid, partition_id, member(), prevent_sync);
545 typename std::enable_if<!std::is_same<executor_marker, T>::value,
546 executor_promise<T>>::type
548 std::pair<boost::future<protocol::ClientMessage>,
549 std::shared_ptr<spi::impl::ClientInvocation>>& future_pair,
550 boost::uuids::uuid uuid,
552 const member& member,
555 bool sync = is_sync_computation(prevent_sync);
556 auto objectFuture = to_object<T>(
557 decode<serialization::pimpl::data>(std::move(future_pair.first)));
562 return executor_promise<T>(objectFuture,
571 typename std::enable_if<std::is_same<executor_marker, T>::value,
572 executor_promise<T>>::type
574 std::pair<boost::future<protocol::ClientMessage>,
575 std::shared_ptr<spi::impl::ClientInvocation>>& future_pair,
576 boost::uuids::uuid uuid,
578 const member& member,
581 bool sync = is_sync_computation(prevent_sync);
583 future_pair.first.get();
586 return executor_promise<T>(get_context());
589 bool is_sync_computation(
bool prevent_sync);
591 address get_member_address(
const member& member);
593 int random_partition_id();
595 static const int32_t MIN_TIME_RESOLUTION_OF_CONSECUTIVE_SUBMITS = 10;
596 static const int32_t MAX_CONSECUTIVE_SUBMITS = 100;
598 std::atomic<int32_t> consecutive_submits_;
599 std::atomic<int64_t> last_submit_time_;