Hazelcast C++ Client
Hazelcast C++ Client Library
iexecutor_service.h
1 /*
2  * Copyright (c) 2008-2022, Hazelcast, Inc. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include <vector>
19 #include <atomic>
20 
21 #include <boost/uuid/uuid_generators.hpp>
22 #include <boost/uuid/uuid_io.hpp>
23 
24 #include "hazelcast/util/export.h"
25 #include "hazelcast/client/member_selectors.h"
26 #include "hazelcast/client/proxy/ProxyImpl.h"
27 #include "hazelcast/client/member.h"
28 #include "hazelcast/client/spi/ClientContext.h"
29 #include "hazelcast/client/spi/impl/ClientInvocation.h"
30 #include "hazelcast/util/exception_util.h"
31 #include "hazelcast/client/spi/impl/ClientExecutionServiceImpl.h"
32 
33 // CODECs
34 #include "hazelcast/client/protocol/codec/codecs.h"
35 
36 #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
37 #pragma warning(push)
38 #pragma warning(disable : 4251) // for dll export
39 #endif
40 
41 namespace hazelcast {
42 namespace client {
50 class HAZELCAST_API iexecutor_service : public proxy::ProxyImpl
51 {
52  friend class spi::ProxyManager;
53 
54 public:
55  static constexpr const char* SERVICE_NAME = "hz:impl:executorService";
56 
57  template<typename T>
59  {
60  public:
61  executor_promise(spi::ClientContext& context)
62  : context_(context)
63  {}
64 
66  boost::future<boost::optional<T>>& future,
67  boost::uuids::uuid uuid,
68  int partition_id,
69  boost::uuids::uuid member,
70  spi::ClientContext& context,
71  const std::shared_ptr<spi::impl::ClientInvocation>& invocation)
72  : shared_future_(future.share())
73  , uuid_(uuid)
74  , partition_id_(partition_id)
75  , member_uuid_(member)
76  , context_(context)
77  , invocation_(invocation)
78  {}
79 
80  bool cancel(bool may_interrupt_if_running)
81  {
82  if (shared_future_.is_ready()) {
83  return false;
84  }
85 
86  try {
87  return invoke_cancel_request(may_interrupt_if_running);
88  } catch (exception::iexception&) {
89  util::exception_util::rethrow(std::current_exception());
90  }
91  return false;
92  }
93 
94  boost::shared_future<boost::optional<T>> get_future()
95  {
96  return shared_future_;
97  }
98 
99  private:
100  boost::shared_future<boost::optional<T>> shared_future_;
101  boost::uuids::uuid uuid_;
102  int partition_id_;
103  boost::uuids::uuid member_uuid_;
104  spi::ClientContext& context_;
105  std::shared_ptr<spi::impl::ClientInvocation> invocation_;
106 
107  bool invoke_cancel_request(bool may_interrupt_if_running)
108  {
109  invocation_->wait_invoked();
110 
111  if (partition_id_ > -1) {
112  auto request =
113  protocol::codec::executorservice_cancelonpartition_encode(
114  uuid_, may_interrupt_if_running);
115  std::shared_ptr<spi::impl::ClientInvocation> clientInvocation =
116  spi::impl::ClientInvocation::create(
117  context_,
118  request,
119  boost::uuids::to_string(uuid_),
120  partition_id_);
121  return clientInvocation->invoke()
122  .get()
123  .get_first_fixed_sized_field<bool>();
124  } else {
125  auto request =
126  protocol::codec::executorservice_cancelonmember_encode(
127  uuid_, member_uuid_, may_interrupt_if_running);
128  std::shared_ptr<spi::impl::ClientInvocation> clientInvocation =
129  spi::impl::ClientInvocation::create(
130  context_,
131  request,
132  boost::uuids::to_string(uuid_),
133  member_uuid_);
134  return clientInvocation->invoke()
135  .get()
136  .get_first_fixed_sized_field<bool>();
137  }
138  }
139  };
140 
150  template<typename HazelcastSerializable>
151  void execute(const HazelcastSerializable& command)
152  {
153  submit<HazelcastSerializable, executor_marker>(command);
154  }
155 
164  template<typename HazelcastSerializable>
165  void execute(const HazelcastSerializable& command,
167  {
168  std::vector<member> members = select_members(member_selector);
169  int selectedMember = rand() % (int)members.size();
170  execute_on_member<HazelcastSerializable>(command,
171  members[selectedMember]);
172  }
173 
180  template<typename HazelcastSerializable, typename K>
181  void execute_on_key_owner(const HazelcastSerializable& command,
182  const K& key)
183  {
184  submit_to_key_owner<HazelcastSerializable, K>(command, key);
185  }
186 
193  template<typename HazelcastSerializable>
194  void execute_on_member(const HazelcastSerializable& command,
195  const member& member)
196  {
197  submit_to_member<HazelcastSerializable, executor_marker>(command,
198  member);
199  }
200 
207  template<typename HazelcastSerializable>
208  void execute_on_members(const HazelcastSerializable& command,
209  const std::vector<member>& members)
210  {
211  for (std::vector<member>::const_iterator it = members.begin();
212  it != members.end();
213  ++it) {
214  submit_to_member<HazelcastSerializable, executor_marker>(command,
215  *it);
216  }
217  }
218 
226  template<typename HazelcastSerializable>
227  void execute_on_members(const HazelcastSerializable& command,
229  {
230  std::vector<member> members = select_members(member_selector);
231  execute_on_members<HazelcastSerializable>(command, members);
232  }
233 
239  template<typename HazelcastSerializable>
240  void execute_on_all_members(const HazelcastSerializable& command)
241  {
242  std::vector<member> memberList =
243  get_context().get_client_cluster_service().get_member_list();
244  for (std::vector<member>::const_iterator it = memberList.begin();
245  it != memberList.end();
246  ++it) {
247  submit_to_member<HazelcastSerializable, executor_marker>(command,
248  *it);
249  }
250  }
251 
261  template<typename HazelcastSerializable, typename T, typename K>
262  executor_promise<T> submit_to_key_owner(const HazelcastSerializable& task,
263  const K& key)
264  {
265  return submit_to_key_owner_internal<HazelcastSerializable, T, K>(
266  task, key, false);
267  }
268 
278  template<typename HazelcastSerializable, typename T>
279  executor_promise<T> submit_to_member(const HazelcastSerializable& task,
280  const member& member)
281  {
282  return submit_to_target_internal<HazelcastSerializable, T>(
283  task, member, false);
284  }
285 
297  template<typename HazelcastSerializable, typename T>
298  std::unordered_map<member, executor_promise<T>> submit_to_members(
299  const HazelcastSerializable& task,
300  const std::vector<member>& members)
301  {
302  std::unordered_map<member, executor_promise<T>> futureMap;
303  for (auto& member : members) {
304  auto f = submit_to_target_internal<HazelcastSerializable, T>(
305  task, member, true);
306  // no need to check if emplace is success since member is unique
307  futureMap.emplace(member, std::move(f));
308  }
309  return futureMap;
310  }
311 
324  template<typename HazelcastSerializable, typename T>
325  std::unordered_map<member, executor_promise<T>> submit_to_members(
326  const HazelcastSerializable& task,
328  {
329  std::vector<member> members = select_members(member_selector);
330  return submit_to_members<HazelcastSerializable, T>(task, members);
331  }
332 
343  template<typename HazelcastSerializable, typename T>
344  std::unordered_map<member, executor_promise<T>> submit_to_all_members(
345  const HazelcastSerializable& task)
346  {
347  std::unordered_map<member, executor_promise<T>> futureMap;
348  for (const auto& m :
349  get_context().get_client_cluster_service().get_member_list()) {
350  auto f = submit_to_target_internal<HazelcastSerializable, T>(
351  task, m, true);
352  // no need to check if emplace is success since member is unique
353  futureMap.emplace(m, std::move(f));
354  }
355  return futureMap;
356  }
357 
372  template<typename HazelcastSerializable, typename T>
373  executor_promise<T> submit(const HazelcastSerializable& task)
374  {
375  serialization::pimpl::data task_data =
376  to_data<HazelcastSerializable>(task);
377 
378  if (task_data.has_partition_hash()) {
379  int partitionId = get_partition_id(task_data);
380 
381  return submit_to_partition_internal<T>(
382  task_data, false, partitionId);
383  } else {
384  return submit_to_random_internal<T>(task_data, false);
385  }
386  }
387 
398  template<typename HazelcastSerializable, typename T>
399  executor_promise<T> submit(const HazelcastSerializable& task,
401  {
402  std::vector<member> members = select_members(member_selector);
403  int selectedMember = rand() % (int)members.size();
404  return submit_to_member<HazelcastSerializable, T>(
405  task, members[selectedMember]);
406  }
407 
417  void shutdown();
418 
424  boost::future<bool> is_shutdown();
425 
433  boost::future<bool> is_terminated();
434 
435 private:
436  iexecutor_service(const std::string& name, spi::ClientContext* context);
437 
438  struct executor_marker
439  {};
440 
441  std::vector<member> select_members(const member_selector& member_selector);
442 
443  template<typename T>
444  executor_promise<T> submit_to_partition_internal(
445  const serialization::pimpl::data& task_data,
446  bool prevent_sync,
447  int partition_id)
448  {
449  auto uuid = context_.random_uuid();
450 
451  auto f = invoke_on_partition_internal(task_data, partition_id, uuid);
452 
453  return check_sync<T>(f, uuid, partition_id, prevent_sync);
454  }
455 
456  std::pair<boost::future<protocol::ClientMessage>,
457  std::shared_ptr<spi::impl::ClientInvocation>>
458  invoke_on_partition_internal(const serialization::pimpl::data& task_data,
459  int partition_id,
460  boost::uuids::uuid uuid)
461  {
462  return invoke_on_partition_owner(
463  protocol::codec::executorservice_submittopartition_encode(
464  name_, uuid, task_data),
465  partition_id);
466  }
467 
468  template<typename HazelcastSerializable, typename T, typename K>
469  executor_promise<T> submit_to_key_owner_internal(
470  const HazelcastSerializable& task,
471  const K& key,
472  bool prevent_sync)
473  {
474 
475  serialization::pimpl::data dataKey = to_data<K>(key);
476 
477  int partitionId = get_partition_id(dataKey);
478 
479  return submit_to_partition_internal<T>(
480  to_data<HazelcastSerializable>(task), prevent_sync, partitionId);
481  }
482 
483  template<typename T>
484  executor_promise<T> submit_to_random_internal(
485  const serialization::pimpl::data& task_data,
486  bool prevent_sync)
487  {
488 
489  int partitionId = random_partition_id();
490 
491  return submit_to_partition_internal<T>(
492  task_data, prevent_sync, partitionId);
493  }
494 
495  template<typename HazelcastSerializable, typename T>
496  executor_promise<T> submit_to_target_internal(
497  const HazelcastSerializable& task,
498  const member& member,
499  bool prevent_sync)
500  {
501  boost::uuids::uuid uuid = context_.random_uuid();
502 
503  auto f =
504  invoke_on_target_internal<HazelcastSerializable>(task, member, uuid);
505 
506  return check_sync<T>(f, uuid, -1, member, prevent_sync);
507  }
508 
509  template<typename HazelcastSerializable>
510  std::pair<boost::future<protocol::ClientMessage>,
511  std::shared_ptr<spi::impl::ClientInvocation>>
512  invoke_on_target_internal(const HazelcastSerializable& task,
513  const member& member,
514  boost::uuids::uuid uuid)
515  {
516  return invoke_on_target(
517  protocol::codec::executorservice_submittomember_encode(
518  name_, uuid, to_data(task), member.get_uuid()),
519  member.get_uuid());
520  }
521 
522  std::pair<boost::future<protocol::ClientMessage>,
523  std::shared_ptr<spi::impl::ClientInvocation>>
524  invoke_on_partition_owner(protocol::ClientMessage&& request,
525  int partition_id);
526 
527  std::pair<boost::future<protocol::ClientMessage>,
528  std::shared_ptr<spi::impl::ClientInvocation>>
529  invoke_on_target(protocol::ClientMessage&& request,
530  boost::uuids::uuid target);
531 
532  template<typename T>
533  executor_promise<T> check_sync(
534  std::pair<boost::future<protocol::ClientMessage>,
535  std::shared_ptr<spi::impl::ClientInvocation>>& future_pair,
536  boost::uuids::uuid uuid,
537  int partition_id,
538  bool prevent_sync)
539  {
540  return check_sync<T>(
541  future_pair, uuid, partition_id, member(), prevent_sync);
542  }
543 
544  template<typename T>
545  typename std::enable_if<!std::is_same<executor_marker, T>::value,
546  executor_promise<T>>::type
547  check_sync(
548  std::pair<boost::future<protocol::ClientMessage>,
549  std::shared_ptr<spi::impl::ClientInvocation>>& future_pair,
550  boost::uuids::uuid uuid,
551  int partition_id,
552  const member& member,
553  bool prevent_sync)
554  {
555  bool sync = is_sync_computation(prevent_sync);
556  auto objectFuture = to_object<T>(
557  decode<serialization::pimpl::data>(std::move(future_pair.first)));
558  if (sync) {
559  objectFuture.wait();
560  }
561 
562  return executor_promise<T>(objectFuture,
563  uuid,
564  partition_id,
565  member.get_uuid(),
566  get_context(),
567  future_pair.second);
568  }
569 
570  template<typename T>
571  typename std::enable_if<std::is_same<executor_marker, T>::value,
572  executor_promise<T>>::type
573  check_sync(
574  std::pair<boost::future<protocol::ClientMessage>,
575  std::shared_ptr<spi::impl::ClientInvocation>>& future_pair,
576  boost::uuids::uuid uuid,
577  int partition_id,
578  const member& member,
579  bool prevent_sync)
580  {
581  bool sync = is_sync_computation(prevent_sync);
582  if (sync) {
583  future_pair.first.get();
584  }
585 
586  return executor_promise<T>(get_context());
587  }
588 
589  bool is_sync_computation(bool prevent_sync);
590 
591  address get_member_address(const member& member);
592 
593  int random_partition_id();
594 
595  static const int32_t MIN_TIME_RESOLUTION_OF_CONSECUTIVE_SUBMITS = 10;
596  static const int32_t MAX_CONSECUTIVE_SUBMITS = 100;
597 
598  std::atomic<int32_t> consecutive_submits_;
599  std::atomic<int64_t> last_submit_time_;
600 };
601 } // namespace client
602 } // namespace hazelcast
603 
604 #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
605 #pragma warning(pop)
606 #endif
Base class for all exception originated from Hazelcast methods.
Definition: iexception.h:49
Distributed implementation of java.util.concurrent.ExecutorService.
executor_promise< T > submit(const HazelcastSerializable &task)
Submits a task for execution and returns a executor_promise representing that task.
std::unordered_map< member, executor_promise< T > > submit_to_members(const HazelcastSerializable &task, const std::vector< member > &members)
Submits a task to given members and returns map of Member-executor_promise pairs representing pending...
std::unordered_map< member, executor_promise< T > > submit_to_members(const HazelcastSerializable &task, const member_selector &member_selector)
Submits a task to selected members and returns a map of Member-executor_promise pairs representing pe...
void execute_on_all_members(const HazelcastSerializable &command)
Executes a task on all of the known cluster members.
executor_promise< T > submit_to_member(const HazelcastSerializable &task, const member &member)
Submits a task to the specified member and returns a executor_promise representing that task.
void execute_on_members(const HazelcastSerializable &command, const std::vector< member > &members)
Executes a task on each of the specified members.
std::unordered_map< member, executor_promise< T > > submit_to_all_members(const HazelcastSerializable &task)
Submits task to all cluster members and returns a map of Member-executor_promise pairs representing p...
executor_promise< T > submit_to_key_owner(const HazelcastSerializable &task, const K &key)
Submits a task to the owner of the specified key and returns a executor_promise representing that tas...
void execute(const HazelcastSerializable &command)
Executes the given command at some time in the future.
void execute(const HazelcastSerializable &command, const member_selector &member_selector)
Executes a task on a randomly selected member.
void execute_on_member(const HazelcastSerializable &command, const member &member)
Executes a task on the specified member.
void execute_on_key_owner(const HazelcastSerializable &command, const K &key)
Executes a task on the owner of the specified key.
void execute_on_members(const HazelcastSerializable &command, const member_selector &member_selector)
Executes a task on each of the selected members.
executor_promise< T > submit(const HazelcastSerializable &task, const member_selector &member_selector)
Submits a task to a randomly selected member and returns a executor_promise representing that task.
hz_cluster member class.
Definition: member.h:62