Hazelcast C++ Client
Hazelcast C++ Client Library
iexecutor_service.h
1 /*
2  * Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include <vector>
19 #include <atomic>
20 
21 #include <boost/uuid/uuid_generators.hpp>
22 #include <boost/uuid/uuid_io.hpp>
23 
24 #include "hazelcast/util/export.h"
25 #include "hazelcast/client/member_selectors.h"
26 #include "hazelcast/client/proxy/ProxyImpl.h"
27 #include "hazelcast/client/member.h"
28 #include "hazelcast/client/spi/ClientContext.h"
29 #include "hazelcast/client/spi/impl/ClientInvocation.h"
30 #include "hazelcast/util/exception_util.h"
31 #include "hazelcast/client/spi/impl/ClientExecutionServiceImpl.h"
32 
33 // CODECs
34 #include "hazelcast/client/protocol/codec/codecs.h"
35 
36 #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
37 #pragma warning(push)
38 #pragma warning(disable: 4251) //for dll export
39 #endif
40 
41 namespace hazelcast {
42  namespace client {
50  class HAZELCAST_API iexecutor_service : public proxy::ProxyImpl {
51  friend class spi::ProxyManager;
52  public:
53  static constexpr const char *SERVICE_NAME = "hz:impl:executorService";
54 
55  template<typename T>
57  public:
58  executor_promise(spi::ClientContext &context) : context_(context) {}
59 
60  executor_promise(boost::future<boost::optional<T>> &future, boost::uuids::uuid uuid, int partition_id,
61  boost::uuids::uuid member, spi::ClientContext &context,
62  const std::shared_ptr<spi::impl::ClientInvocation> &invocation)
63  : shared_future_(future.share()), uuid_(uuid), partition_id_(partition_id), member_uuid_(member),
64  context_(context), invocation_(invocation) {}
65 
66  bool cancel(bool may_interrupt_if_running) {
67  if (shared_future_.is_ready()) {
68  return false;
69  }
70 
71  try {
72  return invoke_cancel_request(may_interrupt_if_running);
73  } catch (exception::iexception &) {
74  util::exception_util::rethrow(std::current_exception());
75  }
76  return false;
77  }
78 
79  boost::shared_future<boost::optional<T>> get_future() {
80  return shared_future_;
81  }
82 
83  private:
84  boost::shared_future<boost::optional<T>> shared_future_;
85  boost::uuids::uuid uuid_;
86  int partition_id_;
87  boost::uuids::uuid member_uuid_;
88  spi::ClientContext &context_;
89  std::shared_ptr<spi::impl::ClientInvocation> invocation_;
90 
91  bool invoke_cancel_request(bool may_interrupt_if_running) {
92  invocation_->wait_invoked();
93 
94  if (partition_id_ > -1) {
95  auto request = protocol::codec::executorservice_cancelonpartition_encode(uuid_, may_interrupt_if_running);
96  std::shared_ptr<spi::impl::ClientInvocation> clientInvocation = spi::impl::ClientInvocation::create(
97  context_, request, boost::uuids::to_string(uuid_), partition_id_);
98  return clientInvocation->invoke().get().get_first_fixed_sized_field<bool>();
99  } else {
100  auto request = protocol::codec::executorservice_cancelonmember_encode(
101  uuid_, member_uuid_, may_interrupt_if_running);
102  std::shared_ptr<spi::impl::ClientInvocation> clientInvocation = spi::impl::ClientInvocation::create(
103  context_, request, boost::uuids::to_string(uuid_), member_uuid_);
104  return clientInvocation->invoke().get().get_first_fixed_sized_field<bool>();
105  }
106  }
107  };
108 
118  template<typename HazelcastSerializable>
119  void execute(const HazelcastSerializable &command) {
120  submit<HazelcastSerializable, executor_marker>(command);
121  }
122 
130  template<typename HazelcastSerializable>
131  void execute(const HazelcastSerializable &command,
133  std::vector<member> members = select_members(member_selector);
134  int selectedMember = rand() % (int) members.size();
135  execute_on_member<HazelcastSerializable>(command, members[selectedMember]);
136  }
137 
144  template<typename HazelcastSerializable, typename K>
145  void execute_on_key_owner(const HazelcastSerializable &command, const K &key) {
146  submit_to_key_owner<HazelcastSerializable, K>(command, key);
147  }
148 
155  template<typename HazelcastSerializable>
156  void execute_on_member(const HazelcastSerializable &command, const member &member) {
157  submit_to_member<HazelcastSerializable, executor_marker>(command, member);
158  }
159 
166  template<typename HazelcastSerializable>
167  void execute_on_members(const HazelcastSerializable &command, const std::vector<member> &members) {
168  for (std::vector<member>::const_iterator it = members.begin(); it != members.end(); ++it) {
169  submit_to_member<HazelcastSerializable, executor_marker>(command, *it);
170  }
171  }
172 
180  template<typename HazelcastSerializable>
181  void execute_on_members(const HazelcastSerializable &command,
183  std::vector<member> members = select_members(member_selector);
184  execute_on_members<HazelcastSerializable>(command, members);
185  }
186 
192  template<typename HazelcastSerializable>
193  void execute_on_all_members(const HazelcastSerializable &command) {
194  std::vector<member> memberList = get_context().get_client_cluster_service().get_member_list();
195  for (std::vector<member>::const_iterator it = memberList.begin(); it != memberList.end(); ++it) {
196  submit_to_member<HazelcastSerializable, executor_marker>(command, *it);
197  }
198  }
199 
209  template<typename HazelcastSerializable, typename T, typename K>
210  executor_promise<T>
211  submit_to_key_owner(const HazelcastSerializable &task, const K &key) {
212  return submit_to_key_owner_internal<HazelcastSerializable, T, K>(task, key, false);
213  }
214 
224  template<typename HazelcastSerializable, typename T>
225  executor_promise<T>
226  submit_to_member(const HazelcastSerializable &task, const member &member) {
227  return submit_to_target_internal<HazelcastSerializable, T>(task, member, false);
228  }
229 
239  template<typename HazelcastSerializable, typename T>
240  std::unordered_map<member, executor_promise<T>>
241  submit_to_members(const HazelcastSerializable &task, const std::vector<member> &members) {
242  std::unordered_map<member, executor_promise<T>> futureMap;
243  for (auto &member : members) {
244  auto f = submit_to_target_internal<HazelcastSerializable, T>(task, member, true);
245  // no need to check if emplace is success since member is unique
246  futureMap.emplace(member, std::move(f));
247  }
248  return futureMap;
249  }
250 
261  template<typename HazelcastSerializable, typename T>
262  std::unordered_map<member, executor_promise<T>>
263  submit_to_members(const HazelcastSerializable &task,
265  std::vector<member> members = select_members(member_selector);
266  return submit_to_members<HazelcastSerializable, T>(task, members);
267  }
268 
277  template<typename HazelcastSerializable, typename T>
278  std::unordered_map<member, executor_promise<T>>
279  submit_to_all_members(const HazelcastSerializable &task) {
280  std::unordered_map<member, executor_promise<T>> futureMap;
281  for (const auto &m : get_context().get_client_cluster_service().get_member_list()) {
282  auto f = submit_to_target_internal<HazelcastSerializable, T>(task, m, true);
283  // no need to check if emplace is success since member is unique
284  futureMap.emplace(m, std::move(f));
285  }
286  return futureMap;
287  }
288 
302  template<typename HazelcastSerializable, typename T>
303  executor_promise<T>
304  submit(const HazelcastSerializable &task) {
305  serialization::pimpl::data task_data = to_data<HazelcastSerializable>(task);
306 
307  if (task_data.has_partition_hash()) {
308  int partitionId = get_partition_id(task_data);
309 
310  return submit_to_partition_internal<T>(task_data, false, partitionId);
311  } else {
312  return submit_to_random_internal<T>(task_data, false);
313  }
314  }
315 
326  template<typename HazelcastSerializable, typename T>
327  executor_promise<T>
328  submit(const HazelcastSerializable &task, const member_selector &member_selector) {
329  std::vector<member> members = select_members(member_selector);
330  int selectedMember = rand() % (int) members.size();
331  return submit_to_member<HazelcastSerializable, T>(task, members[selectedMember]);
332  }
333 
343  void shutdown();
344 
350  boost::future<bool> is_shutdown();
351 
359  boost::future<bool> is_terminated();
360 
361  private:
362  iexecutor_service(const std::string &name, spi::ClientContext *context);
363 
364  struct executor_marker {};
365 
366  std::vector<member> select_members(const member_selector &member_selector);
367 
368  template<typename T>
369  executor_promise<T>
370  submit_to_partition_internal(const serialization::pimpl::data &task_data, bool prevent_sync, int partition_id) {
371  auto uuid = context_.random_uuid();
372 
373  auto f = invoke_on_partition_internal(task_data, partition_id, uuid);
374 
375  return check_sync<T>(f, uuid, partition_id, prevent_sync);
376  }
377 
378  std::pair<boost::future<protocol::ClientMessage>, std::shared_ptr<spi::impl::ClientInvocation>>
379  invoke_on_partition_internal(const serialization::pimpl::data &task_data, int partition_id,
380  boost::uuids::uuid uuid) {
381  return invoke_on_partition_owner(
382  protocol::codec::executorservice_submittopartition_encode(name_, uuid, task_data), partition_id);
383  }
384 
385  template<typename HazelcastSerializable, typename T, typename K>
386  executor_promise<T>
387  submit_to_key_owner_internal(const HazelcastSerializable &task, const K &key, bool prevent_sync) {
388 
389  serialization::pimpl::data dataKey = to_data<K>(key);
390 
391  int partitionId = get_partition_id(dataKey);
392 
393  return submit_to_partition_internal<T>(to_data<HazelcastSerializable>(task), prevent_sync, partitionId);
394  }
395 
396  template<typename T>
397  executor_promise<T>
398  submit_to_random_internal(const serialization::pimpl::data &task_data, bool prevent_sync) {
399 
400  int partitionId = random_partition_id();
401 
402  return submit_to_partition_internal<T>(task_data, prevent_sync, partitionId);
403  }
404 
405  template<typename HazelcastSerializable, typename T>
406  executor_promise<T> submit_to_target_internal(const HazelcastSerializable &task, const member &member,
407  bool prevent_sync) {
408  boost::uuids::uuid uuid = context_.random_uuid();
409 
410  auto f = invoke_on_target_internal<HazelcastSerializable>(task, member, uuid);
411 
412  return check_sync<T>(f, uuid, -1, member, prevent_sync);
413  }
414 
415  template<typename HazelcastSerializable>
416  std::pair<boost::future<protocol::ClientMessage>, std::shared_ptr<spi::impl::ClientInvocation>>
417  invoke_on_target_internal(const HazelcastSerializable &task, const member &member,
418  boost::uuids::uuid uuid) {
419  return invoke_on_target(
420  protocol::codec::executorservice_submittomember_encode(name_, uuid, to_data(task), member.get_uuid()),
421  member.get_uuid());
422  }
423 
424  std::pair<boost::future<protocol::ClientMessage>, std::shared_ptr<spi::impl::ClientInvocation>>
425  invoke_on_partition_owner(protocol::ClientMessage &&request, int partition_id);
426 
427  std::pair<boost::future<protocol::ClientMessage>, std::shared_ptr<spi::impl::ClientInvocation>>
428  invoke_on_target(protocol::ClientMessage &&request, boost::uuids::uuid target);
429 
430  template<typename T>
431  boost::optional<T>
432  retrieve_result_from_message(serialization::pimpl::SerializationService *serialization_service,
433  boost::future<protocol::ClientMessage> f) {
434  auto msg = f.get();
435  msg.skip_frame();
436  return serialization_service->to_object<T>(msg.get_nullable<serialization::pimpl::data>().get_ptr());
437  }
438 
439  template<typename T>
440  executor_promise<T>
441  check_sync(
442  std::pair<boost::future<protocol::ClientMessage>, std::shared_ptr<spi::impl::ClientInvocation>> &future_pair,
443  boost::uuids::uuid uuid, int partition_id, bool prevent_sync) {
444  return check_sync<T>(future_pair, uuid, partition_id, member(), prevent_sync);
445  }
446 
447  template<typename T>
448  boost::future<boost::optional<T>>
449  retrieve_result_sync(boost::future<protocol::ClientMessage> future) {
450  try {
451  auto response = retrieve_result_from_message<T>(&(get_serialization_service()), std::move(future));
452  return boost::make_ready_future(response);
453  } catch (exception::iexception &) {
454  return boost::make_exceptional_future<boost::optional<T>>(boost::current_exception());
455  }
456  }
457 
458  template<typename T>
459  typename std::enable_if<!std::is_same<executor_marker, T>::value, executor_promise<T>>::type
460  check_sync(
461  std::pair<boost::future<protocol::ClientMessage>, std::shared_ptr<spi::impl::ClientInvocation>> &future_pair,
462  boost::uuids::uuid uuid, int partition_id, const member &member, bool prevent_sync) {
463  bool sync = is_sync_computation(prevent_sync);
464  boost::future<boost::optional<T>> objectFuture;
465  if (sync) {
466  objectFuture = retrieve_result_sync<T>(std::move(future_pair.first));
467  } else {
468  serialization::pimpl::SerializationService *serializationService = &get_serialization_service();
469  objectFuture = future_pair.first.then(boost::launch::sync,
470  [=](boost::future<protocol::ClientMessage> f) {
471  return retrieve_result_from_message<T>(
472  serializationService, std::move(f));
473  });
474  }
475 
476  return executor_promise<T>(objectFuture, uuid, partition_id, member.get_uuid(), get_context(), future_pair.second);
477  }
478 
479  template<typename T>
480  typename std::enable_if<std::is_same<executor_marker, T>::value, executor_promise<T>>::type
481  check_sync(
482  std::pair<boost::future<protocol::ClientMessage>, std::shared_ptr<spi::impl::ClientInvocation>> &future_pair,
483  boost::uuids::uuid uuid, int partition_id, const member &member, bool prevent_sync) {
484  bool sync = is_sync_computation(prevent_sync);
485  if (sync) {
486  future_pair.first.get();
487  }
488 
489  return executor_promise<T>(get_context());
490  }
491 
492  bool is_sync_computation(bool prevent_sync);
493 
494  address get_member_address(const member &member);
495 
496  int random_partition_id();
497 
498  static const int32_t MIN_TIME_RESOLUTION_OF_CONSECUTIVE_SUBMITS = 10;
499  static const int32_t MAX_CONSECUTIVE_SUBMITS = 100;
500 
501  std::atomic<int32_t> consecutive_submits_;
502  std::atomic<int64_t> last_submit_time_;
503  };
504  }
505 }
506 
507 #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
508 #pragma warning(pop)
509 #endif
510 
511 
Base class for all exception originated from Hazelcast methods.
Definition: iexception.h:48
Distributed implementation of java.util.concurrent.ExecutorService.
executor_promise< T > submit(const HazelcastSerializable &task)
Submits a task for execution and returns a executor_promise representing that task.
std::unordered_map< member, executor_promise< T > > submit_to_members(const HazelcastSerializable &task, const std::vector< member > &members)
Submits a task to given members and returns map of Member-executor_promise pairs representing pending...
std::unordered_map< member, executor_promise< T > > submit_to_members(const HazelcastSerializable &task, const member_selector &member_selector)
Submits a task to selected members and returns a map of Member-executor_promise pairs representing pe...
void execute_on_all_members(const HazelcastSerializable &command)
Executes a task on all of the known cluster members.
executor_promise< T > submit_to_member(const HazelcastSerializable &task, const member &member)
Submits a task to the specified member and returns a executor_promise representing that task.
void execute_on_members(const HazelcastSerializable &command, const std::vector< member > &members)
Executes a task on each of the specified members.
std::unordered_map< member, executor_promise< T > > submit_to_all_members(const HazelcastSerializable &task)
Submits task to all cluster members and returns a map of Member-executor_promise pairs representing p...
executor_promise< T > submit_to_key_owner(const HazelcastSerializable &task, const K &key)
Submits a task to the owner of the specified key and returns a executor_promise representing that tas...
void execute(const HazelcastSerializable &command)
Executes the given command at some time in the future.
void execute(const HazelcastSerializable &command, const member_selector &member_selector)
Executes a task on a randomly selected member.
void execute_on_member(const HazelcastSerializable &command, const member &member)
Executes a task on the specified member.
void execute_on_key_owner(const HazelcastSerializable &command, const K &key)
Executes a task on the owner of the specified key.
void execute_on_members(const HazelcastSerializable &command, const member_selector &member_selector)
Executes a task on each of the selected members.
executor_promise< T > submit(const HazelcastSerializable &task, const member_selector &member_selector)
Submits a task to a randomly selected member and returns a executor_promise representing that task.
hz_cluster member class.
Definition: member.h:39