21 #include <boost/uuid/uuid_generators.hpp>
22 #include <boost/uuid/uuid_io.hpp>
24 #include "hazelcast/util/export.h"
25 #include "hazelcast/client/member_selectors.h"
26 #include "hazelcast/client/proxy/ProxyImpl.h"
27 #include "hazelcast/client/member.h"
28 #include "hazelcast/client/spi/ClientContext.h"
29 #include "hazelcast/client/spi/impl/ClientInvocation.h"
30 #include "hazelcast/util/exception_util.h"
31 #include "hazelcast/client/spi/impl/ClientExecutionServiceImpl.h"
34 #include "hazelcast/client/protocol/codec/codecs.h"
36 #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
38 #pragma warning(disable : 4251)
// Grants spi::ProxyManager access to this class's non-public members.
52 friend class spi::ProxyManager;
// Compile-time string constant holding the service name
// "hz:impl:executorService".
55 static constexpr
const char* SERVICE_NAME =
"hz:impl:executorService";
// Constructor parameter list and member-initializer list (fragment: the
// declarator line and some parameters/initializers are not visible here).
// NOTE(review): presumably the executor_promise<T> constructor - confirm.
66 boost::future<boost::optional<T>>& future,
67 boost::uuids::uuid uuid,
70 spi::ClientContext& context,
71 const std::shared_ptr<spi::impl::ClientInvocation>& invocation)
// share() converts the incoming boost::future into a shared_future, which is
// what the promise stores.
72 : shared_future_(future.share())
74 , partition_id_(partition_id)
77 , invocation_(invocation)
// Attempts to cancel the task behind this promise.
// @param may_interrupt_if_running forwarded unchanged to
//        invoke_cancel_request().
// @return the result of invoke_cancel_request() on the path visible here.
80 bool cancel(
bool may_interrupt_if_running)
// Checks first whether the result is already available; the body of this
// branch is not visible in this listing.
82 if (shared_future_.is_ready()) {
87 return invoke_cancel_request(may_interrupt_if_running);
// Hands the currently handled exception to util::exception_util::rethrow.
// NOTE(review): presumably inside a catch handler - confirm.
89 util::exception_util::rethrow(std::current_exception());
// Accessor for the stored shared future.
// @return a copy of shared_future_ (returned by value).
94 boost::shared_future<boost::optional<T>> get_future()
96 return shared_future_;
// Shared future holding the optional task result; returned by get_future().
100 boost::shared_future<boost::optional<T>> shared_future_;
// Task identifier; used as the uuid argument of the cancel requests.
101 boost::uuids::uuid uuid_;
// Member identifier; passed to the member-based cancel request.
103 boost::uuids::uuid member_uuid_;
// Stored by reference, so the referenced ClientContext must outlive this
// object.
104 spi::ClientContext& context_;
// The invocation that submitted the task; wait_invoked() is called on it
// before a cancel request is sent.
105 std::shared_ptr<spi::impl::ClientInvocation> invocation_;
// Sends a cancel request for the task identified by uuid_ and returns the
// first fixed-size bool field of the response.
// @param may_interrupt_if_running encoded into the cancel request.
107 bool invoke_cancel_request(
bool may_interrupt_if_running)
// Calls wait_invoked() on the original submit invocation before cancelling.
109 invocation_->wait_invoked();
// A partition id greater than -1 selects the partition-based cancel codec.
// NOTE(review): the member-based encode further down is presumably the else
// branch; the branch line itself is not visible in this listing.
111 if (partition_id_ > -1) {
113 protocol::codec::executorservice_cancelonpartition_encode(
114 uuid_, may_interrupt_if_running);
115 std::shared_ptr<spi::impl::ClientInvocation> clientInvocation =
116 spi::impl::ClientInvocation::create(
// The string form of the task uuid is one of the create() arguments.
119 boost::uuids::to_string(uuid_),
121 return clientInvocation->invoke()
123 .get_first_fixed_sized_field<
bool>();
// Member-based cancel: the request carries both the task uuid and the member
// uuid.
126 protocol::codec::executorservice_cancelonmember_encode(
127 uuid_, member_uuid_, may_interrupt_if_running);
128 std::shared_ptr<spi::impl::ClientInvocation> clientInvocation =
129 spi::impl::ClientInvocation::create(
132 boost::uuids::to_string(uuid_),
134 return clientInvocation->invoke()
136 .get_first_fixed_sized_field<
bool>();
// Executes a command without exposing a result: forwards to submit<> with
// executor_marker as the result type and discards what submit<> returns.
// @tparam HazelcastSerializable type of the command object.
// @param command the command to submit.
150 template<
typename HazelcastSerializable>
151 void execute(
const HazelcastSerializable& command)
153 submit<HazelcastSerializable, executor_marker>(command);
// Overload of execute() that takes a further argument after `command` (its
// declaration line is not visible here) and runs the command on one member
// chosen at random.
// @tparam HazelcastSerializable type of the command object.
164 template<
typename HazelcastSerializable>
165 void execute(
const HazelcastSerializable& command,
// Picks an index in [0, members.size()) with rand().
// NOTE(review): `members` is defined on a line not visible here; the modulo
// requires it to be non-empty - confirm an emptiness check precedes this.
169 int selectedMember = rand() % (int)members.size();
170 execute_on_member<HazelcastSerializable>(command,
171 members[selectedMember]);
// Fragment of a function template whose declarator is not visible here.
// NOTE(review): presumably execute_on_key_owner(command, key) - confirm.
// The visible body forwards to submit_to_key_owner<> and discards its result.
// @tparam HazelcastSerializable type of the command object.
// @tparam K type of the key.
180 template<
typename HazelcastSerializable,
typename K>
184 submit_to_key_owner<HazelcastSerializable, K>(command, key);
// Fragment of a function template whose declarator is not visible here.
// NOTE(review): presumably execute_on_member(command, member) - confirm.
// The visible body forwards to submit_to_member<> with executor_marker as the
// result type.
// @tparam HazelcastSerializable type of the command object.
193 template<
typename HazelcastSerializable>
197 submit_to_member<HazelcastSerializable, executor_marker>(command,
// Fragment: function template taking a list of members (declarator not
// visible). NOTE(review): presumably execute_on_members(command, members).
// Iterates over `members` and calls submit_to_member<> with executor_marker
// as the result type inside the loop.
// @tparam HazelcastSerializable type of the command object.
// @param members the members to iterate over.
207 template<
typename HazelcastSerializable>
209 const std::vector<member>& members)
211 for (std::vector<member>::const_iterator it = members.begin();
214 submit_to_member<HazelcastSerializable, executor_marker>(command,
// Fragment of a function template whose declarator is not visible here.
// The visible body delegates to the execute_on_members<> overload that takes
// a `members` collection.
// NOTE(review): `members` is defined on a line not visible here - presumably
// the result of a member selection; confirm.
// @tparam HazelcastSerializable type of the command object.
226 template<
typename HazelcastSerializable>
231 execute_on_members<HazelcastSerializable>(command, members);
// Fragment of a function template whose declarator is not visible here.
// NOTE(review): presumably execute_on_all_members(command) - confirm.
// @tparam HazelcastSerializable type of the command object.
239 template<
typename HazelcastSerializable>
// Takes a copy of the member list from the client cluster service, then
// iterates over that copy.
242 std::vector<member> memberList =
243 get_context().get_client_cluster_service().get_member_list();
244 for (std::vector<member>::const_iterator it = memberList.begin();
245 it != memberList.end();
// Inside the loop: submit with executor_marker as the result type.
247 submit_to_member<HazelcastSerializable, executor_marker>(command,
// Fragment of a function template whose declarator is not visible here.
// NOTE(review): presumably submit_to_key_owner(task, key) - confirm.
// Returns whatever submit_to_key_owner_internal<> returns.
// @tparam HazelcastSerializable type of the task object.
// @tparam T result type of the task.
// @tparam K type of the key.
261 template<
typename HazelcastSerializable,
typename T,
typename K>
265 return submit_to_key_owner_internal<HazelcastSerializable, T, K>(
// Fragment of a function template whose declarator is not visible here.
// NOTE(review): presumably submit_to_member(task, member) - confirm.
// Returns whatever submit_to_target_internal<> returns.
// @tparam HazelcastSerializable type of the task object.
// @tparam T result type of the task.
278 template<
typename HazelcastSerializable,
typename T>
282 return submit_to_target_internal<HazelcastSerializable, T>(
// Submits `task` once per entry of `members` and collects the resulting
// promises in a map keyed by member (declarator and return statement are not
// visible in this listing).
// @tparam HazelcastSerializable type of the task object.
// @tparam T result type of the task.
// @param task the task to submit.
// @param members the members to submit to.
297 template<
typename HazelcastSerializable,
typename T>
299 const HazelcastSerializable& task,
300 const std::vector<member>& members)
302 std::unordered_map<member, executor_promise<T>> futureMap;
// Note: the loop variable is itself named `member`, which hides the type
// name `member` inside the loop body.
303 for (
auto&
member : members) {
304 auto f = submit_to_target_internal<HazelcastSerializable, T>(
// The promise is moved into the map rather than copied.
307 futureMap.emplace(
member, std::move(f));
// Fragment: overload taking `task` plus a further argument that is not
// visible here. The visible body delegates to the submit_to_members<>
// overload that takes a `members` collection and returns its result.
// NOTE(review): `members` is defined on a line not visible here - presumably
// the result of a member selection; confirm.
// @tparam HazelcastSerializable type of the task object.
// @tparam T result type of the task.
324 template<
typename HazelcastSerializable,
typename T>
326 const HazelcastSerializable& task,
330 return submit_to_members<HazelcastSerializable, T>(task, members);
// Fragment of a function template whose declarator is not visible here.
// NOTE(review): presumably submit_to_all_members(task) - confirm.
// Submits `task` once per member reported by the client cluster service and
// collects the promises in a map keyed by member.
// @tparam HazelcastSerializable type of the task object.
// @tparam T result type of the task.
343 template<
typename HazelcastSerializable,
typename T>
345 const HazelcastSerializable& task)
347 std::unordered_map<member, executor_promise<T>> futureMap;
// Range expression of the loop: the member list obtained from the cluster
// service (the `for` header line itself is not visible).
349 get_context().get_client_cluster_service().get_member_list()) {
350 auto f = submit_to_target_internal<HazelcastSerializable, T>(
// The promise is moved into the map rather than copied.
353 futureMap.emplace(m, std::move(f));
// Fragment of a function template whose declarator is not visible here.
// NOTE(review): presumably submit(task) - confirm.
// Serializes the task, then routes it either to a specific partition or to a
// random one.
// @tparam HazelcastSerializable type of the task object.
// @tparam T result type of the task.
372 template<
typename HazelcastSerializable,
typename T>
375 serialization::pimpl::data task_data =
376 to_data<HazelcastSerializable>(task);
// When the serialized task carries a partition hash, the partition id is
// derived from the data and the task is submitted to that partition.
378 if (task_data.has_partition_hash()) {
379 int partitionId = get_partition_id(task_data);
381 return submit_to_partition_internal<T>(
382 task_data,
false, partitionId);
// Otherwise the task goes through submit_to_random_internal; `false` is
// passed as the second argument on both paths.
384 return submit_to_random_internal<T>(task_data,
false);
// Fragment of a function template whose declarator is not visible here.
// NOTE(review): presumably submit(task, member_selector) - confirm.
// Submits the task to one member picked at random from `members`.
// @tparam HazelcastSerializable type of the task object.
// @tparam T result type of the task.
398 template<
typename HazelcastSerializable,
typename T>
// Picks an index in [0, members.size()) with rand().
// NOTE(review): `members` is defined on a line not visible here; the modulo
// requires it to be non-empty - confirm an emptiness check precedes this.
403 int selectedMember = rand() % (int)members.size();
404 return submit_to_member<HazelcastSerializable, T>(
405 task, members[selectedMember]);
// Declaration only (defined elsewhere). Returns a future of bool.
424 boost::future<bool> is_shutdown();
// Declaration only (defined elsewhere). Returns a future of bool.
433 boost::future<bool> is_terminated();
// Tag type used as the result-type template argument by the execute* paths;
// the check_sync overloads are selected on whether T is this type.
438 struct executor_marker
// Declaration only (defined elsewhere). Takes a member_selector and returns
// a vector of members by value.
441 std::vector<member> select_members(
const member_selector& member_selector);
// Submits already-serialized task data to a given partition (some parameter
// lines are not visible in this listing).
// @param task_data the serialized task.
// @return the promise produced by check_sync<T>.
444 executor_promise<T> submit_to_partition_internal(
445 const serialization::pimpl::data& task_data,
// A fresh uuid is generated per submission; it is passed both to the
// invocation and to check_sync.
449 auto uuid = context_.random_uuid();
451 auto f = invoke_on_partition_internal(task_data, partition_id, uuid);
453 return check_sync<T>(f, uuid, partition_id, prevent_sync);
// Encodes a submit-to-partition request and sends it via
// invoke_on_partition_owner (a parameter line and the trailing arguments are
// not visible in this listing).
// @param task_data the serialized task.
// @param uuid the task identifier encoded into the request.
// @return a pair of the response future and the invocation.
456 std::pair<boost::future<protocol::ClientMessage>,
457 std::shared_ptr<spi::impl::ClientInvocation>>
458 invoke_on_partition_internal(
const serialization::pimpl::data& task_data,
460 boost::uuids::uuid uuid)
462 return invoke_on_partition_owner(
463 protocol::codec::executorservice_submittopartition_encode(
464 name_, uuid, task_data),
// Submits a task to the partition derived from a key (the key and
// prevent_sync parameter lines are not visible in this listing).
// @tparam HazelcastSerializable type of the task object.
// @tparam T result type of the task.
// @tparam K type of the key.
// @param task the task to serialize and submit.
468 template<
typename HazelcastSerializable,
typename T,
typename K>
469 executor_promise<T> submit_to_key_owner_internal(
470 const HazelcastSerializable& task,
// The key is serialized first; the partition id is computed from the
// serialized key, not from the task.
475 serialization::pimpl::data dataKey = to_data<K>(key);
477 int partitionId = get_partition_id(dataKey);
479 return submit_to_partition_internal<T>(
480 to_data<HazelcastSerializable>(task), prevent_sync, partitionId);
// Submits already-serialized task data to the partition returned by
// random_partition_id() (the prevent_sync parameter line is not visible in
// this listing).
// @param task_data the serialized task.
484 executor_promise<T> submit_to_random_internal(
485 const serialization::pimpl::data& task_data,
489 int partitionId = random_partition_id();
491 return submit_to_partition_internal<T>(
492 task_data, prevent_sync, partitionId);
// Submits a task to a specific member (the prevent_sync parameter line and
// the declaration of `f` are not visible in this listing).
// @tparam HazelcastSerializable type of the task object.
// @tparam T result type of the task.
// @param task the task to submit.
// @param member the target member.
// @return the promise produced by check_sync<T>.
495 template<
typename HazelcastSerializable,
typename T>
496 executor_promise<T> submit_to_target_internal(
497 const HazelcastSerializable& task,
498 const member& member,
// A fresh uuid is generated per submission.
501 boost::uuids::uuid uuid = context_.random_uuid();
504 invoke_on_target_internal<HazelcastSerializable>(task, member, uuid);
// -1 is passed as the partition id for member-targeted submissions.
506 return check_sync<T>(f, uuid, -1, member, prevent_sync);
// Encodes a submit-to-member request and sends it via invoke_on_target (the
// trailing arguments of that call are not visible in this listing).
// @tparam HazelcastSerializable type of the task object.
// @param task the task; serialized with to_data() while encoding.
// @param member the target member; its uuid is encoded into the request.
// @param uuid the task identifier encoded into the request.
// @return a pair of the response future and the invocation.
509 template<
typename HazelcastSerializable>
510 std::pair<boost::future<protocol::ClientMessage>,
511 std::shared_ptr<spi::impl::ClientInvocation>>
512 invoke_on_target_internal(
const HazelcastSerializable& task,
513 const member& member,
514 boost::uuids::uuid uuid)
516 return invoke_on_target(
517 protocol::codec::executorservice_submittomember_encode(
518 name_, uuid, to_data(task), member.get_uuid()),
// Declaration fragment (the remaining parameters are not visible here).
// Takes the request message by rvalue reference and returns a pair of the
// response future and the invocation.
522 std::pair<boost::future<protocol::ClientMessage>,
523 std::shared_ptr<spi::impl::ClientInvocation>>
524 invoke_on_partition_owner(protocol::ClientMessage&& request,
// Declaration only (defined elsewhere).
// @param request the request message, taken by rvalue reference.
// @param target uuid identifying the target of the invocation.
// @return a pair of the response future and the invocation.
527 std::pair<boost::future<protocol::ClientMessage>,
528 std::shared_ptr<spi::impl::ClientInvocation>>
529 invoke_on_target(protocol::ClientMessage&& request,
530 boost::uuids::uuid target);
// Convenience overload without a member argument: forwards to the
// five-argument check_sync<T>, supplying a default-constructed member (some
// parameter lines are not visible in this listing).
// @param future_pair the response future and invocation, taken by non-const
//        reference.
// @param uuid the task identifier.
533 executor_promise<T> check_sync(
534 std::pair<boost::future<protocol::ClientMessage>,
535 std::shared_ptr<spi::impl::ClientInvocation>>& future_pair,
536 boost::uuids::uuid uuid,
540 return check_sync<T>(
541 future_pair, uuid, partition_id, member(), prevent_sync);
// check_sync overload enabled only when T is NOT executor_marker, i.e. when
// the caller wants a result (the function name line and some parameters are
// not visible in this listing).
545 typename std::enable_if<!std::is_same<executor_marker, T>::value,
546 executor_promise<T>>::type
548 std::pair<boost::future<protocol::ClientMessage>,
549 std::shared_ptr<spi::impl::ClientInvocation>>& future_pair,
550 boost::uuids::uuid uuid,
552 const member& member,
// NOTE(review): how `sync` is used is on lines not visible here.
555 bool sync = is_sync_computation(prevent_sync);
// The response future is moved out of future_pair, decoded to serialized
// data, and then converted to a future of T.
556 auto objectFuture = to_object<T>(
557 decode<serialization::pimpl::data>(std::move(future_pair.first)));
562 return executor_promise<T>(objectFuture,
// check_sync overload enabled only when T IS executor_marker, i.e. for the
// execute* paths that expose no result (the function name line and some
// parameters are not visible in this listing).
571 typename std::enable_if<std::is_same<executor_marker, T>::value,
572 executor_promise<T>>::type
574 std::pair<boost::future<protocol::ClientMessage>,
575 std::shared_ptr<spi::impl::ClientInvocation>>& future_pair,
576 boost::uuids::uuid uuid,
578 const member& member,
581 bool sync = is_sync_computation(prevent_sync);
// Waits for the response and discards its value.
// NOTE(review): presumably guarded by `sync`; the guarding line is not
// visible here - confirm.
583 future_pair.first.get();
// The returned promise is built from the context only.
586 return executor_promise<T>(get_context());
// Declaration only (defined elsewhere). Its result is stored as `sync` by the
// check_sync overloads.
589 bool is_sync_computation(
bool prevent_sync);
// Declaration only (defined elsewhere). Takes a member and returns an
// address.
591 address get_member_address(
const member& member);
// Declaration only (defined elsewhere). Its result is used as the partition
// id by submit_to_random_internal.
593 int random_partition_id();
// NOTE(review): the unit of this resolution value is not stated in the
// visible code - confirm against the definition of is_sync_computation.
595 static const int32_t MIN_TIME_RESOLUTION_OF_CONSECUTIVE_SUBMITS = 10;
596 static const int32_t MAX_CONSECUTIVE_SUBMITS = 100;
// Atomic counters. NOTE(review): presumably maintained by
// is_sync_computation to track consecutive submits - confirm.
598 std::atomic<int32_t> consecutive_submits_;
599 std::atomic<int64_t> last_submit_time_;
604 #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
Base class for all exceptions originating from Hazelcast methods.
Distributed implementation of java.util.concurrent.ExecutorService.
executor_promise< T > submit(const HazelcastSerializable &task)
Submits a task for execution and returns an executor_promise representing that task.
std::unordered_map< member, executor_promise< T > > submit_to_members(const HazelcastSerializable &task, const std::vector< member > &members)
Submits a task to the given members and returns a map of Member-executor_promise pairs representing pending...
std::unordered_map< member, executor_promise< T > > submit_to_members(const HazelcastSerializable &task, const member_selector &member_selector)
Submits a task to selected members and returns a map of Member-executor_promise pairs representing pe...
void execute_on_all_members(const HazelcastSerializable &command)
Executes a task on all of the known cluster members.
executor_promise< T > submit_to_member(const HazelcastSerializable &task, const member &member)
Submits a task to the specified member and returns an executor_promise representing that task.
void execute_on_members(const HazelcastSerializable &command, const std::vector< member > &members)
Executes a task on each of the specified members.
std::unordered_map< member, executor_promise< T > > submit_to_all_members(const HazelcastSerializable &task)
Submits a task to all cluster members and returns a map of Member-executor_promise pairs representing p...
executor_promise< T > submit_to_key_owner(const HazelcastSerializable &task, const K &key)
Submits a task to the owner of the specified key and returns an executor_promise representing that tas...
void execute(const HazelcastSerializable &command)
Executes the given command at some time in the future.
void execute(const HazelcastSerializable &command, const member_selector &member_selector)
Executes a task on a randomly selected member.
void execute_on_member(const HazelcastSerializable &command, const member &member)
Executes a task on the specified member.
void execute_on_key_owner(const HazelcastSerializable &command, const K &key)
Executes a task on the owner of the specified key.
void execute_on_members(const HazelcastSerializable &command, const member_selector &member_selector)
Executes a task on each of the selected members.
executor_promise< T > submit(const HazelcastSerializable &task, const member_selector &member_selector)
Submits a task to a randomly selected member and returns an executor_promise representing that task.