Hazelcast C++ Client
Hazelcast C++ Client Library
Loading...
Searching...
No Matches
iexecutor_service.h
1/*
2 * Copyright (c) 2008-2025, Hazelcast, Inc. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16#pragma once
17
18#include <vector>
19#include <atomic>
20
21#include <boost/uuid/uuid_generators.hpp>
22#include <boost/uuid/uuid_io.hpp>
23
24#include "hazelcast/util/export.h"
25#include "hazelcast/client/member_selectors.h"
26#include "hazelcast/client/proxy/ProxyImpl.h"
27#include "hazelcast/client/member.h"
28#include "hazelcast/client/spi/ClientContext.h"
29#include "hazelcast/client/spi/impl/ClientInvocation.h"
30#include "hazelcast/util/exception_util.h"
31#include "hazelcast/client/spi/impl/ClientExecutionServiceImpl.h"
32
33// CODECs
34#include "hazelcast/client/protocol/codec/codecs.h"
35
36#if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
37#pragma warning(push)
38#pragma warning(disable : 4251) // for dll export
39#endif
40
41namespace hazelcast {
42namespace client {
50class HAZELCAST_API iexecutor_service : public proxy::ProxyImpl
51{
52 friend class spi::ProxyManager;
53
54public:
55 static constexpr const char* SERVICE_NAME = "hz:impl:executorService";
56
57 template<typename T>
58 class executor_promise
59 {
60 public:
61 executor_promise(spi::ClientContext& context)
62 : context_(context)
63 {}
64
65 executor_promise(
66 boost::future<boost::optional<T>>& future,
67 boost::uuids::uuid uuid,
68 int partition_id,
69 boost::uuids::uuid member,
70 spi::ClientContext& context,
71 const std::shared_ptr<spi::impl::ClientInvocation>& invocation)
72 : shared_future_(future.share())
73 , uuid_(uuid)
74 , partition_id_(partition_id)
75 , member_uuid_(member)
76 , context_(context)
77 , invocation_(invocation)
78 {}
79
80 bool cancel(bool may_interrupt_if_running)
81 {
82 if (shared_future_.is_ready()) {
83 return false;
84 }
85
86 try {
87 return invoke_cancel_request(may_interrupt_if_running);
88 } catch (exception::iexception&) {
89 util::exception_util::rethrow(std::current_exception());
90 }
91 return false;
92 }
93
94 boost::shared_future<boost::optional<T>> get_future()
95 {
96 return shared_future_;
97 }
98
99 private:
100 boost::shared_future<boost::optional<T>> shared_future_;
101 boost::uuids::uuid uuid_;
102 int partition_id_;
103 boost::uuids::uuid member_uuid_;
104 spi::ClientContext& context_;
105 std::shared_ptr<spi::impl::ClientInvocation> invocation_;
106
107 bool invoke_cancel_request(bool may_interrupt_if_running)
108 {
109 invocation_->wait_invoked();
110
111 if (partition_id_ > -1) {
112 auto request =
113 protocol::codec::executorservice_cancelonpartition_encode(
114 uuid_, may_interrupt_if_running);
115 std::shared_ptr<spi::impl::ClientInvocation> clientInvocation =
116 spi::impl::ClientInvocation::create(
117 context_,
118 request,
119 boost::uuids::to_string(uuid_),
120 partition_id_);
121 return clientInvocation->invoke()
122 .get()
123 .get_first_fixed_sized_field<bool>();
124 } else {
125 auto request =
126 protocol::codec::executorservice_cancelonmember_encode(
127 uuid_, member_uuid_, may_interrupt_if_running);
128 std::shared_ptr<spi::impl::ClientInvocation> clientInvocation =
129 spi::impl::ClientInvocation::create(
130 context_,
131 request,
132 boost::uuids::to_string(uuid_),
133 member_uuid_);
134 return clientInvocation->invoke()
135 .get()
136 .get_first_fixed_sized_field<bool>();
137 }
138 }
139 };
140
150 template<typename HazelcastSerializable>
151 void execute(const HazelcastSerializable& command)
152 {
154 }
155
164 template<typename HazelcastSerializable>
165 void execute(const HazelcastSerializable& command,
167 {
168 std::vector<member> members = select_members(member_selector);
169 int selectedMember = rand() % (int)members.size();
171 members[selectedMember]);
172 }
173
180 template<typename HazelcastSerializable, typename K>
181 void execute_on_key_owner(const HazelcastSerializable& command,
182 const K& key)
183 {
185 }
186
193 template<typename HazelcastSerializable>
194 void execute_on_member(const HazelcastSerializable& command,
195 const member& member)
196 {
198 member);
199 }
200
207 template<typename HazelcastSerializable>
208 void execute_on_members(const HazelcastSerializable& command,
209 const std::vector<member>& members)
210 {
211 for (std::vector<member>::const_iterator it = members.begin();
212 it != members.end();
213 ++it) {
215 *it);
216 }
217 }
218
226 template<typename HazelcastSerializable>
227 void execute_on_members(const HazelcastSerializable& command,
229 {
230 std::vector<member> members = select_members(member_selector);
232 }
233
239 template<typename HazelcastSerializable>
240 void execute_on_all_members(const HazelcastSerializable& command)
241 {
242 std::vector<member> memberList =
243 get_context().get_client_cluster_service().get_member_list();
244 for (std::vector<member>::const_iterator it = memberList.begin();
245 it != memberList.end();
246 ++it) {
248 *it);
249 }
250 }
251
261 template<typename HazelcastSerializable, typename T, typename K>
262 executor_promise<T> submit_to_key_owner(const HazelcastSerializable& task,
263 const K& key)
264 {
265 return submit_to_key_owner_internal<HazelcastSerializable, T, K>(
266 task, key, false);
267 }
268
278 template<typename HazelcastSerializable, typename T>
279 executor_promise<T> submit_to_member(const HazelcastSerializable& task,
280 const member& member)
281 {
282 return submit_to_target_internal<HazelcastSerializable, T>(
283 task, member, false);
284 }
285
297 template<typename HazelcastSerializable, typename T>
298 std::unordered_map<member, executor_promise<T>> submit_to_members(
299 const HazelcastSerializable& task,
300 const std::vector<member>& members)
301 {
302 std::unordered_map<member, executor_promise<T>> futureMap;
303 for (auto& member : members) {
304 auto f = submit_to_target_internal<HazelcastSerializable, T>(
305 task, member, true);
306 // no need to check if emplace is success since member is unique
307 futureMap.emplace(member, std::move(f));
308 }
309 return futureMap;
310 }
311
324 template<typename HazelcastSerializable, typename T>
325 std::unordered_map<member, executor_promise<T>> submit_to_members(
326 const HazelcastSerializable& task,
328 {
329 std::vector<member> members = select_members(member_selector);
331 }
332
343 template<typename HazelcastSerializable, typename T>
344 std::unordered_map<member, executor_promise<T>> submit_to_all_members(
345 const HazelcastSerializable& task)
346 {
347 std::unordered_map<member, executor_promise<T>> futureMap;
348 for (const auto& m :
349 get_context().get_client_cluster_service().get_member_list()) {
350 auto f = submit_to_target_internal<HazelcastSerializable, T>(
351 task, m, true);
352 // no need to check if emplace is success since member is unique
353 futureMap.emplace(m, std::move(f));
354 }
355 return futureMap;
356 }
357
372 template<typename HazelcastSerializable, typename T>
373 executor_promise<T> submit(const HazelcastSerializable& task)
374 {
375 serialization::pimpl::data task_data =
376 to_data<HazelcastSerializable>(task);
377
378 if (task_data.has_partition_hash()) {
379 int partitionId = get_partition_id(task_data);
380
381 return submit_to_partition_internal<T>(
382 task_data, false, partitionId);
383 } else {
384 return submit_to_random_internal<T>(task_data, false);
385 }
386 }
387
398 template<typename HazelcastSerializable, typename T>
399 executor_promise<T> submit(const HazelcastSerializable& task,
401 {
402 std::vector<member> members = select_members(member_selector);
403 int selectedMember = rand() % (int)members.size();
405 task, members[selectedMember]);
406 }
407
417 void shutdown();
418
424 boost::future<bool> is_shutdown();
425
433 boost::future<bool> is_terminated();
434
435private:
436 iexecutor_service(const std::string& name, spi::ClientContext* context);
437
438 struct executor_marker
439 {};
440
441 std::vector<member> select_members(const member_selector& member_selector);
442
443 template<typename T>
444 executor_promise<T> submit_to_partition_internal(
445 const serialization::pimpl::data& task_data,
446 bool prevent_sync,
447 int partition_id)
448 {
449 auto uuid = context_.random_uuid();
450
451 auto f = invoke_on_partition_internal(task_data, partition_id, uuid);
452
453 return check_sync<T>(f, uuid, partition_id, prevent_sync);
454 }
455
456 std::pair<boost::future<protocol::ClientMessage>,
457 std::shared_ptr<spi::impl::ClientInvocation>>
458 invoke_on_partition_internal(const serialization::pimpl::data& task_data,
459 int partition_id,
460 boost::uuids::uuid uuid)
461 {
462 return invoke_on_partition_owner(
463 protocol::codec::executorservice_submittopartition_encode(
464 name_, uuid, task_data),
465 partition_id);
466 }
467
468 template<typename HazelcastSerializable, typename T, typename K>
469 executor_promise<T> submit_to_key_owner_internal(
470 const HazelcastSerializable& task,
471 const K& key,
472 bool prevent_sync)
473 {
474
475 serialization::pimpl::data dataKey = to_data<K>(key);
476
477 int partitionId = get_partition_id(dataKey);
478
479 return submit_to_partition_internal<T>(
480 to_data<HazelcastSerializable>(task), prevent_sync, partitionId);
481 }
482
483 template<typename T>
484 executor_promise<T> submit_to_random_internal(
485 const serialization::pimpl::data& task_data,
486 bool prevent_sync)
487 {
488
489 int partitionId = random_partition_id();
490
491 return submit_to_partition_internal<T>(
492 task_data, prevent_sync, partitionId);
493 }
494
495 template<typename HazelcastSerializable, typename T>
496 executor_promise<T> submit_to_target_internal(
497 const HazelcastSerializable& task,
498 const member& member,
499 bool prevent_sync)
500 {
501 boost::uuids::uuid uuid = context_.random_uuid();
502
503 auto f =
504 invoke_on_target_internal<HazelcastSerializable>(task, member, uuid);
505
506 return check_sync<T>(f, uuid, -1, member, prevent_sync);
507 }
508
509 template<typename HazelcastSerializable>
510 std::pair<boost::future<protocol::ClientMessage>,
511 std::shared_ptr<spi::impl::ClientInvocation>>
512 invoke_on_target_internal(const HazelcastSerializable& task,
513 const member& member,
514 boost::uuids::uuid uuid)
515 {
516 return invoke_on_target(
517 protocol::codec::executorservice_submittomember_encode(
518 name_, uuid, to_data(task), member.get_uuid()),
519 member.get_uuid());
520 }
521
522 std::pair<boost::future<protocol::ClientMessage>,
523 std::shared_ptr<spi::impl::ClientInvocation>>
524 invoke_on_partition_owner(protocol::ClientMessage&& request,
525 int partition_id);
526
527 std::pair<boost::future<protocol::ClientMessage>,
528 std::shared_ptr<spi::impl::ClientInvocation>>
529 invoke_on_target(protocol::ClientMessage&& request,
530 boost::uuids::uuid target);
531
532 template<typename T>
533 executor_promise<T> check_sync(
534 std::pair<boost::future<protocol::ClientMessage>,
535 std::shared_ptr<spi::impl::ClientInvocation>>& future_pair,
536 boost::uuids::uuid uuid,
537 int partition_id,
538 bool prevent_sync)
539 {
540 return check_sync<T>(
541 future_pair, uuid, partition_id, member(), prevent_sync);
542 }
543
544 template<typename T>
545 typename std::enable_if<!std::is_same<executor_marker, T>::value,
546 executor_promise<T>>::type
547 check_sync(
548 std::pair<boost::future<protocol::ClientMessage>,
549 std::shared_ptr<spi::impl::ClientInvocation>>& future_pair,
550 boost::uuids::uuid uuid,
551 int partition_id,
552 const member& member,
553 bool prevent_sync)
554 {
555 bool sync = is_sync_computation(prevent_sync);
556 auto objectFuture = to_object<T>(
557 decode<serialization::pimpl::data>(std::move(future_pair.first)));
558 if (sync) {
559 objectFuture.wait();
560 }
561
562 return executor_promise<T>(objectFuture,
563 uuid,
564 partition_id,
565 member.get_uuid(),
566 get_context(),
567 future_pair.second);
568 }
569
570 template<typename T>
571 typename std::enable_if<std::is_same<executor_marker, T>::value,
572 executor_promise<T>>::type
573 check_sync(
574 std::pair<boost::future<protocol::ClientMessage>,
575 std::shared_ptr<spi::impl::ClientInvocation>>& future_pair,
576 boost::uuids::uuid uuid,
577 int partition_id,
578 const member& member,
579 bool prevent_sync)
580 {
581 bool sync = is_sync_computation(prevent_sync);
582 if (sync) {
583 future_pair.first.get();
584 }
585
586 return executor_promise<T>(get_context());
587 }
588
589 bool is_sync_computation(bool prevent_sync);
590
591 address get_member_address(const member& member);
592
593 int random_partition_id();
594
595 static const int32_t MIN_TIME_RESOLUTION_OF_CONSECUTIVE_SUBMITS = 10;
596 static const int32_t MAX_CONSECUTIVE_SUBMITS = 100;
597
598 std::atomic<int32_t> consecutive_submits_;
599 std::atomic<int64_t> last_submit_time_;
600};
601} // namespace client
602} // namespace hazelcast
603
604#if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
605#pragma warning(pop)
606#endif
Base class for all exception originated from Hazelcast methods.
Definition iexception.h:49
Distributed implementation of java.util.concurrent.ExecutorService.
void execute_on_all_members(const HazelcastSerializable &command)
Executes a task on all of the known cluster members.
void execute_on_members(const HazelcastSerializable &command, const std::vector< member > &members)
Executes a task on each of the specified members.
executor_promise< T > submit_to_member(const HazelcastSerializable &task, const member &member)
Submits a task to the specified member and returns a executor_promise representing that task.
std::unordered_map< member, executor_promise< T > > submit_to_members(const HazelcastSerializable &task, const member_selector &member_selector)
Submits a task to selected members and returns a map of Member-executor_promise pairs representing pe...
executor_promise< T > submit_to_key_owner(const HazelcastSerializable &task, const K &key)
Submits a task to the owner of the specified key and returns a executor_promise representing that tas...
std::unordered_map< member, executor_promise< T > > submit_to_all_members(const HazelcastSerializable &task)
Submits task to all cluster members and returns a map of Member-executor_promise pairs representing p...
void execute(const HazelcastSerializable &command)
Executes the given command at some time in the future.
void execute(const HazelcastSerializable &command, const member_selector &member_selector)
Executes a task on a randomly selected member.
executor_promise< T > submit(const HazelcastSerializable &task)
Submits a task for execution and returns a executor_promise representing that task.
void execute_on_member(const HazelcastSerializable &command, const member &member)
Executes a task on the specified member.
std::unordered_map< member, executor_promise< T > > submit_to_members(const HazelcastSerializable &task, const std::vector< member > &members)
Submits a task to given members and returns map of Member-executor_promise pairs representing pending...
void execute_on_key_owner(const HazelcastSerializable &command, const K &key)
Executes a task on the owner of the specified key.
void execute_on_members(const HazelcastSerializable &command, const member_selector &member_selector)
Executes a task on each of the selected members.
executor_promise< T > submit(const HazelcastSerializable &task, const member_selector &member_selector)
Submits a task to a randomly selected member and returns a executor_promise representing that task.
hz_cluster member class.
Definition member.h:62