21 #include <boost/uuid/uuid_generators.hpp>
22 #include <boost/uuid/uuid_io.hpp>
24 #include "hazelcast/util/export.h"
25 #include "hazelcast/client/member_selectors.h"
26 #include "hazelcast/client/proxy/ProxyImpl.h"
27 #include "hazelcast/client/member.h"
28 #include "hazelcast/client/spi/ClientContext.h"
29 #include "hazelcast/client/spi/impl/ClientInvocation.h"
30 #include "hazelcast/util/exception_util.h"
31 #include "hazelcast/client/spi/impl/ClientExecutionServiceImpl.h"
34 #include "hazelcast/client/protocol/codec/codecs.h"
36 #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
38 #pragma warning(disable: 4251)
51 friend class spi::ProxyManager;
/** Service-name string constant for this proxy type; its value is "hz:impl:executorService". */
53 static constexpr
const char *SERVICE_NAME =
"hz:impl:executorService";
/**
 * Constructs a promise for a submitted task.
 *
 * @param future       future carrying the optional task result; share() is called on it here and the
 *                     resulting shared_future is what this promise stores
 * @param uuid         identifier of the submitted task; it is encoded into the cancel requests
 * @param partition_id partition the task was submitted to; invoke_cancel_request tests it with
 *                     "> -1" before building the cancel-on-partition request
 * @param member       uuid of the target member, used by the cancel-on-member request
 * @param context      client context passed to ClientInvocation::create when cancelling
 * @param invocation   invocation of the original submit; wait_invoked() is called on it before a
 *                     cancel request is sent
 */
60 executor_promise(boost::future<boost::optional<T>> &future, boost::uuids::uuid uuid,
int partition_id,
61 boost::uuids::uuid
member, spi::ClientContext &context,
62 const std::shared_ptr<spi::impl::ClientInvocation> &invocation)
63 : shared_future_(future.share()), uuid_(uuid), partition_id_(partition_id), member_uuid_(
member),
64 context_(context), invocation_(invocation) {}
66 bool cancel(
bool may_interrupt_if_running) {
67 if (shared_future_.is_ready()) {
72 return invoke_cancel_request(may_interrupt_if_running);
74 util::exception_util::rethrow(std::current_exception());
79 boost::shared_future<boost::optional<T>> get_future() {
80 return shared_future_;
// Shared view of the task result, obtained in the constructor via future.share().
84 boost::shared_future<boost::optional<T>> shared_future_;
// Identifier of the submitted task; encoded into both cancel requests.
85 boost::uuids::uuid uuid_;
// Uuid of the member addressed by the cancel-on-member request.
87 boost::uuids::uuid member_uuid_;
// Client context handed to ClientInvocation::create when a cancel request is built.
88 spi::ClientContext &context_;
// Invocation of the original submit; wait_invoked() is called on it before cancelling.
89 std::shared_ptr<spi::impl::ClientInvocation> invocation_;
91 bool invoke_cancel_request(
bool may_interrupt_if_running) {
92 invocation_->wait_invoked();
94 if (partition_id_ > -1) {
95 auto request = protocol::codec::executorservice_cancelonpartition_encode(uuid_, may_interrupt_if_running);
96 std::shared_ptr<spi::impl::ClientInvocation> clientInvocation = spi::impl::ClientInvocation::create(
97 context_, request, boost::uuids::to_string(uuid_), partition_id_);
98 return clientInvocation->invoke().get().get_first_fixed_sized_field<
bool>();
100 auto request = protocol::codec::executorservice_cancelonmember_encode(
101 uuid_, member_uuid_, may_interrupt_if_running);
102 std::shared_ptr<spi::impl::ClientInvocation> clientInvocation = spi::impl::ClientInvocation::create(
103 context_, request, boost::uuids::to_string(uuid_), member_uuid_);
104 return clientInvocation->invoke().get().get_first_fixed_sized_field<
bool>();
118 template<
typename HazelcastSerializable>
119 void execute(
const HazelcastSerializable &command) {
120 submit<HazelcastSerializable, executor_marker>(command);
130 template<
typename HazelcastSerializable>
131 void execute(
const HazelcastSerializable &command,
134 int selectedMember = rand() % (int) members.size();
135 execute_on_member<HazelcastSerializable>(command, members[selectedMember]);
144 template<
typename HazelcastSerializable,
typename K>
146 submit_to_key_owner<HazelcastSerializable, K>(command, key);
155 template<
typename HazelcastSerializable>
157 submit_to_member<HazelcastSerializable, executor_marker>(command,
member);
166 template<
typename HazelcastSerializable>
168 for (std::vector<member>::const_iterator it = members.begin(); it != members.end(); ++it) {
169 submit_to_member<HazelcastSerializable, executor_marker>(command, *it);
180 template<
typename HazelcastSerializable>
184 execute_on_members<HazelcastSerializable>(command, members);
192 template<
typename HazelcastSerializable>
194 std::vector<member> memberList = get_context().get_client_cluster_service().get_member_list();
195 for (std::vector<member>::const_iterator it = memberList.begin(); it != memberList.end(); ++it) {
196 submit_to_member<HazelcastSerializable, executor_marker>(command, *it);
209 template<
typename HazelcastSerializable,
typename T,
typename K>
212 return submit_to_key_owner_internal<HazelcastSerializable, T, K>(task, key,
false);
224 template<
typename HazelcastSerializable,
typename T>
227 return submit_to_target_internal<HazelcastSerializable, T>(task,
member,
false);
239 template<
typename HazelcastSerializable,
typename T>
240 std::unordered_map<member, executor_promise<T>>
242 std::unordered_map<member, executor_promise<T>> futureMap;
243 for (
auto &
member : members) {
244 auto f = submit_to_target_internal<HazelcastSerializable, T>(task,
member,
true);
246 futureMap.emplace(
member, std::move(f));
261 template<
typename HazelcastSerializable,
typename T>
262 std::unordered_map<member, executor_promise<T>>
266 return submit_to_members<HazelcastSerializable, T>(task, members);
277 template<
typename HazelcastSerializable,
typename T>
278 std::unordered_map<member, executor_promise<T>>
280 std::unordered_map<member, executor_promise<T>> futureMap;
281 for (
const auto &m : get_context().get_client_cluster_service().get_member_list()) {
282 auto f = submit_to_target_internal<HazelcastSerializable, T>(task, m,
true);
284 futureMap.emplace(m, std::move(f));
302 template<
typename HazelcastSerializable,
typename T>
304 submit(
const HazelcastSerializable &task) {
305 serialization::pimpl::data task_data = to_data<HazelcastSerializable>(task);
307 if (task_data.has_partition_hash()) {
308 int partitionId = get_partition_id(task_data);
310 return submit_to_partition_internal<T>(task_data,
false, partitionId);
312 return submit_to_random_internal<T>(task_data,
false);
326 template<
typename HazelcastSerializable,
typename T>
330 int selectedMember = rand() % (int) members.size();
331 return submit_to_member<HazelcastSerializable, T>(task, members[selectedMember]);
/**
 * Declared here, defined out of line.
 * NOTE(review): presumably resolves to true once this executor has been shut down — confirm
 * against the implementation.
 *
 * @return future holding a bool
 */
350 boost::future<bool> is_shutdown();
/**
 * Declared here, defined out of line.
 * NOTE(review): presumably resolves to true once all tasks have completed after shutdown —
 * confirm against the implementation.
 *
 * @return future holding a bool
 */
359 boost::future<bool> is_terminated();
/**
 * Empty tag type. The execute* methods pass it as the result type T to submit/submit_to_member,
 * and the check_sync overloads are selected with enable_if on whether T is executor_marker.
 */
364 struct executor_marker {};
/**
 * Resolves a member_selector to a list of members. Declared here, defined out of line.
 *
 * NOTE(review): the selector-based execute/submit overloads compute
 * "rand() % (int) members.size()" on a members vector that presumably comes from this function.
 * An empty result would make that a modulo by zero unless this function rejects empty
 * selections — confirm.
 *
 * @param member_selector selector used to choose the members
 * @return the selected members
 */
366 std::vector<member> select_members(
const member_selector &member_selector);
370 submit_to_partition_internal(
const serialization::pimpl::data &task_data,
bool prevent_sync,
int partition_id) {
371 auto uuid = context_.random_uuid();
373 auto f = invoke_on_partition_internal(task_data, partition_id, uuid);
375 return check_sync<T>(f, uuid, partition_id, prevent_sync);
378 std::pair<boost::future<protocol::ClientMessage>, std::shared_ptr<spi::impl::ClientInvocation>>
379 invoke_on_partition_internal(
const serialization::pimpl::data &task_data,
int partition_id,
380 boost::uuids::uuid uuid) {
381 return invoke_on_partition_owner(
382 protocol::codec::executorservice_submittopartition_encode(name_, uuid, task_data), partition_id);
385 template<
typename HazelcastSerializable,
typename T,
typename K>
387 submit_to_key_owner_internal(
const HazelcastSerializable &task,
const K &key,
bool prevent_sync) {
389 serialization::pimpl::data dataKey = to_data<K>(key);
391 int partitionId = get_partition_id(dataKey);
393 return submit_to_partition_internal<T>(to_data<HazelcastSerializable>(task), prevent_sync, partitionId);
398 submit_to_random_internal(
const serialization::pimpl::data &task_data,
bool prevent_sync) {
400 int partitionId = random_partition_id();
402 return submit_to_partition_internal<T>(task_data, prevent_sync, partitionId);
405 template<
typename HazelcastSerializable,
typename T>
406 executor_promise<T> submit_to_target_internal(
const HazelcastSerializable &task,
const member &member,
408 boost::uuids::uuid uuid = context_.random_uuid();
410 auto f = invoke_on_target_internal<HazelcastSerializable>(task, member, uuid);
412 return check_sync<T>(f, uuid, -1, member, prevent_sync);
415 template<
typename HazelcastSerializable>
416 std::pair<boost::future<protocol::ClientMessage>, std::shared_ptr<spi::impl::ClientInvocation>>
417 invoke_on_target_internal(
const HazelcastSerializable &task,
const member &member,
418 boost::uuids::uuid uuid) {
419 return invoke_on_target(
420 protocol::codec::executorservice_submittomember_encode(name_, uuid, to_data(task), member.get_uuid()),
/**
 * Sends a request addressed by partition id. Declared here, defined out of line.
 * invoke_on_partition_internal calls it with the encoded submit-to-partition message.
 *
 * @param request      encoded client message, taken by rvalue reference
 * @param partition_id id of the partition the request is addressed to
 * @return pair of the response future and the invocation object
 */
424 std::pair<boost::future<protocol::ClientMessage>, std::shared_ptr<spi::impl::ClientInvocation>>
425 invoke_on_partition_owner(protocol::ClientMessage &&request,
int partition_id);
/**
 * Sends a request addressed by member uuid. Declared here, defined out of line.
 * invoke_on_target_internal calls it with the encoded submit-to-member message.
 *
 * @param request encoded client message, taken by rvalue reference
 * @param target  uuid of the target (presumably the receiving member — confirm)
 * @return pair of the response future and the invocation object
 */
427 std::pair<boost::future<protocol::ClientMessage>, std::shared_ptr<spi::impl::ClientInvocation>>
428 invoke_on_target(protocol::ClientMessage &&request, boost::uuids::uuid target);
432 retrieve_result_from_message(serialization::pimpl::SerializationService *serialization_service,
433 boost::future<protocol::ClientMessage> f) {
436 return serialization_service->to_object<T>(msg.get_nullable<serialization::pimpl::data>().get_ptr());
442 std::pair<boost::future<protocol::ClientMessage>, std::shared_ptr<spi::impl::ClientInvocation>> &future_pair,
443 boost::uuids::uuid uuid,
int partition_id,
bool prevent_sync) {
444 return check_sync<T>(future_pair, uuid, partition_id, member(), prevent_sync);
448 boost::future<boost::optional<T>>
449 retrieve_result_sync(boost::future<protocol::ClientMessage> future) {
451 auto response = retrieve_result_from_message<T>(&(get_serialization_service()), std::move(future));
452 return boost::make_ready_future(response);
453 }
catch (exception::iexception &) {
454 return boost::make_exceptional_future<boost::optional<T>>(boost::current_exception());
459 typename std::enable_if<!std::is_same<executor_marker, T>::value, executor_promise<T>>::type
461 std::pair<boost::future<protocol::ClientMessage>, std::shared_ptr<spi::impl::ClientInvocation>> &future_pair,
462 boost::uuids::uuid uuid,
int partition_id,
const member &member,
bool prevent_sync) {
463 bool sync = is_sync_computation(prevent_sync);
464 boost::future<boost::optional<T>> objectFuture;
466 objectFuture = retrieve_result_sync<T>(std::move(future_pair.first));
468 serialization::pimpl::SerializationService *serializationService = &get_serialization_service();
469 objectFuture = future_pair.first.then(boost::launch::sync,
470 [=](boost::future<protocol::ClientMessage> f) {
471 return retrieve_result_from_message<T>(
472 serializationService, std::move(f));
476 return executor_promise<T>(objectFuture, uuid, partition_id, member.get_uuid(), get_context(), future_pair.second);
480 typename std::enable_if<std::is_same<executor_marker, T>::value, executor_promise<T>>::type
482 std::pair<boost::future<protocol::ClientMessage>, std::shared_ptr<spi::impl::ClientInvocation>> &future_pair,
483 boost::uuids::uuid uuid,
int partition_id,
const member &member,
bool prevent_sync) {
484 bool sync = is_sync_computation(prevent_sync);
486 future_pair.first.get();
489 return executor_promise<T>(get_context());
/**
 * Declared here, defined out of line. Both check_sync overloads call it with their prevent_sync
 * argument and store the result in a local named "sync".
 * NOTE(review): presumably decides whether the response is waited for synchronously
 * (retrieve_result_sync) or chained with a continuation, possibly using the consecutive-submit
 * counters declared below — confirm against the implementation.
 *
 * @param prevent_sync flag forwarded from the submit_* internals
 */
492 bool is_sync_computation(
bool prevent_sync);
/**
 * Returns an address for the given member. Declared here, defined out of line; no caller is
 * visible in this part of the file.
 *
 * @param member member whose address is requested
 */
494 address get_member_address(
const member &member);
/**
 * Returns a partition id. Declared here, defined out of line. submit_to_random_internal uses the
 * returned value as the partition to submit the task to.
 */
496 int random_partition_id();
// NOTE(review): unit is not stated here — presumably milliseconds between consecutive submits; confirm.
498 static const int32_t MIN_TIME_RESOLUTION_OF_CONSECUTIVE_SUBMITS = 10;
// NOTE(review): presumably the number of back-to-back submits allowed before a submit is made synchronous; confirm.
499 static const int32_t MAX_CONSECUTIVE_SUBMITS = 100;

// Atomic counter; by its name it tracks consecutive submits (its use is not visible in this chunk).
501 std::atomic<int32_t> consecutive_submits_;
// Atomic timestamp; by its name it records the last submit time (unit and use not visible in this chunk).
502 std::atomic<int64_t> last_submit_time_;
507 #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
Base class for all exceptions originating from Hazelcast methods.
Distributed implementation of java.util.concurrent.ExecutorService.
executor_promise< T > submit(const HazelcastSerializable &task)
Submits a task for execution and returns an executor_promise representing that task.
std::unordered_map< member, executor_promise< T > > submit_to_members(const HazelcastSerializable &task, const std::vector< member > &members)
Submits a task to given members and returns a map of Member-executor_promise pairs representing pending completion of the task on each member.
std::unordered_map< member, executor_promise< T > > submit_to_members(const HazelcastSerializable &task, const member_selector &member_selector)
Submits a task to selected members and returns a map of Member-executor_promise pairs representing pending completion of the task on each member.
void execute_on_all_members(const HazelcastSerializable &command)
Executes a task on all of the known cluster members.
executor_promise< T > submit_to_member(const HazelcastSerializable &task, const member &member)
Submits a task to the specified member and returns an executor_promise representing that task.
void execute_on_members(const HazelcastSerializable &command, const std::vector< member > &members)
Executes a task on each of the specified members.
std::unordered_map< member, executor_promise< T > > submit_to_all_members(const HazelcastSerializable &task)
Submits task to all cluster members and returns a map of Member-executor_promise pairs representing pending completion of the task on each member.
executor_promise< T > submit_to_key_owner(const HazelcastSerializable &task, const K &key)
Submits a task to the owner of the specified key and returns an executor_promise representing that task.
void execute(const HazelcastSerializable &command)
Executes the given command at some time in the future.
void execute(const HazelcastSerializable &command, const member_selector &member_selector)
Executes a task on a randomly selected member.
void execute_on_member(const HazelcastSerializable &command, const member &member)
Executes a task on the specified member.
void execute_on_key_owner(const HazelcastSerializable &command, const K &key)
Executes a task on the owner of the specified key.
void execute_on_members(const HazelcastSerializable &command, const member_selector &member_selector)
Executes a task on each of the selected members.
executor_promise< T > submit(const HazelcastSerializable &task, const member_selector &member_selector)
Submits a task to a randomly selected member and returns an executor_promise representing that task.