Hazelcast C++ Client
Hazelcast C++ Client Library
client_impl.cpp
1 /*
2  * Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /*
18  * Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
19  *
20  * Licensed under the Apache License, Version 2.0 (the "License");
21  * you may not use this file except in compliance with the License.
22  * You may obtain a copy of the License at
23  *
24  * http://www.apache.org/licenses/LICENSE-2.0
25  *
26  * Unless required by applicable law or agreed to in writing, software
27  * distributed under the License is distributed on an "AS IS" BASIS,
28  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
29  * See the License for the specific language governing permissions and
30  * limitations under the License.
31  */
32 
33 #include <utility>
34 #include <vector>
35 #include <functional>
36 
37 #include <boost/format.hpp>
38 
39 #include "hazelcast/util/Util.h"
40 #include "hazelcast/util/IOUtil.h"
41 #include "hazelcast/client/hazelcast_client.h"
42 #include "hazelcast/client/transaction_context.h"
43 #include "hazelcast/client/cluster.h"
44 #include "hazelcast/client/spi/lifecycle_service.h"
45 #include "hazelcast/client/lifecycle_listener.h"
46 #include "hazelcast/client/exception/protocol_exceptions.h"
47 #include "hazelcast/client/aws/aws_client.h"
48 #include "hazelcast/client/spi/impl/discovery/cloud_discovery.h"
49 #include "hazelcast/client/impl/hazelcast_client_instance_impl.h"
50 #include "hazelcast/client/impl/ClientLockReferenceIdGenerator.h"
51 #include "hazelcast/client/spi/impl/ClientInvocationServiceImpl.h"
52 #include "hazelcast/client/spi/impl/ClientPartitionServiceImpl.h"
53 #include "hazelcast/client/spi/impl/ClientExecutionServiceImpl.h"
54 #include "hazelcast/client/spi/impl/sequence/CallIdFactory.h"
55 #include "hazelcast/client/spi/impl/DefaultAddressProvider.h"
56 #include "hazelcast/client/spi/impl/discovery/remote_address_provider.h"
57 #include "hazelcast/client/spi/impl/listener/listener_service_impl.h"
58 #include "hazelcast/client/load_balancer.h"
59 #include "hazelcast/client/connection/ClientConnectionManagerImpl.h"
60 #include "hazelcast/client/proxy/flake_id_generator_impl.h"
61 #include "hazelcast/logger.h"
62 #include "hazelcast/client/member_selectors.h"
63 #include "hazelcast/client/client_properties.h"
64 
65 #ifndef HAZELCAST_VERSION
66 #define HAZELCAST_VERSION "NOT_FOUND"
67 #endif
68 
69 namespace hazelcast {
70  namespace client {
71  hazelcast_client::hazelcast_client() : client_impl_(new impl::hazelcast_client_instance_impl(client_config())) {
72  client_impl_->start();
73  }
74 
75  hazelcast_client::hazelcast_client(client_config config) : client_impl_(
76  new impl::hazelcast_client_instance_impl(std::move(config))) {
77  client_impl_->start();
78  }
79 
80  const std::string &hazelcast_client::get_name() const {
81  return client_impl_->get_name();
82  }
83 
84  client_config &hazelcast_client::get_client_config() {
85  return client_impl_->get_client_config();
86  }
87 
88  transaction_context hazelcast_client::new_transaction_context() {
89  return client_impl_->new_transaction_context();
90  }
91 
92  transaction_context hazelcast_client::new_transaction_context(const transaction_options &options) {
93  return client_impl_->new_transaction_context(options);
94  }
95 
96  cluster &hazelcast_client::get_cluster() {
97  return client_impl_->get_cluster();
98  }
99 
100  boost::uuids::uuid hazelcast_client::add_lifecycle_listener(lifecycle_listener &&lifecycle_listener) {
101  return client_impl_->add_lifecycle_listener(std::move(lifecycle_listener));
102  }
103 
104  bool hazelcast_client::remove_lifecycle_listener(const boost::uuids::uuid &registration_id) {
105  return client_impl_->remove_lifecycle_listener(registration_id);
106  }
107 
108  boost::future<void> hazelcast_client::shutdown() {
109  return boost::async([=]() { client_impl_->shutdown(); });
110  }
111 
112  spi::lifecycle_service &hazelcast_client::get_lifecycle_service() {
113  return client_impl_->get_lifecycle_service();
114  }
115 
116  local_endpoint hazelcast_client::get_local_endpoint() const {
117  return client_impl_->get_local_endpoint();
118  }
119 
120  hazelcast_client::~hazelcast_client() = default;
121 
122  cp::cp_subsystem &hazelcast_client::get_cp_subsystem() {
123  return client_impl_->get_cp_subsystem();
124  }
125 
126  const boost::string_view version() {
127  return HAZELCAST_VERSION;
128  }
129 
130  namespace impl {
131  std::atomic<int32_t> hazelcast_client_instance_impl::CLIENT_ID(0);
132 
133  hazelcast_client_instance_impl::hazelcast_client_instance_impl(client_config config)
134  : client_config_(std::move(config)), client_properties_(client_config_.get_properties()),
135  client_context_(*this),
136  serialization_service_(client_config_.get_serialization_config()),
137  cluster_service_(client_context_),
138  transaction_manager_(client_context_), cluster_(cluster_service_),
139  lifecycle_service_(client_context_, client_config_.get_lifecycle_listeners()),
140  proxy_manager_(client_context_),
141  id_(++CLIENT_ID), random_generator_(std::random_device{}()),
142  uuid_generator_{random_generator_},
143  cp_subsystem_(client_context_), proxy_session_manager_(client_context_) {
144  auto &name = client_config_.get_instance_name();
145  if (name) {
146  instance_name_ = *name;
147  } else {
148  std::ostringstream out;
149  out << "hz.client_" << id_;
150  instance_name_ = out.str();
151  }
152 
153  auto logger_config = client_config_.get_logger_config();
154  logger_ = std::make_shared<logger>(instance_name_, client_config_.get_cluster_name(),
155  logger_config.level(), logger_config.handler());
156 
157  execution_service_ = init_execution_service();
158 
159  initalize_near_cache_manager();
160 
161  int32_t maxAllowedConcurrentInvocations = client_properties_.get_integer(
162  client_properties_.get_max_concurrent_invocations());
163  int64_t backofftimeoutMs = client_properties_.get_long(
164  client_properties_.get_backpressure_backoff_timeout_millis());
165  bool isBackPressureEnabled = maxAllowedConcurrentInvocations != INT32_MAX;
166  call_id_sequence_ = spi::impl::sequence::CallIdFactory::new_call_id_sequence(isBackPressureEnabled,
167  maxAllowedConcurrentInvocations,
168  backofftimeoutMs);
169 
170  auto address_provider = create_address_provider();
171 
172  connection_manager_ = std::make_shared<connection::ClientConnectionManagerImpl>(
173  client_context_, std::move(address_provider));
174 
175  cluster_listener_.reset(new spi::impl::listener::cluster_view_listener(client_context_));
176 
177  partition_service_.reset(new spi::impl::ClientPartitionServiceImpl(client_context_));
178 
179  invocation_service_.reset(new spi::impl::ClientInvocationServiceImpl(client_context_));
180 
181  listener_service_ = init_listener_service();
182 
183  proxy_manager_.init();
184 
185  lock_reference_id_generator_.reset(new impl::ClientLockReferenceIdGenerator());
186 
187  statistics_.reset(new statistics::Statistics(client_context_));
188  }
189 
190  hazelcast_client_instance_impl::~hazelcast_client_instance_impl() {
191  shutdown();
192  }
193 
194  void hazelcast_client_instance_impl::start() {
195  lifecycle_service_.fire_lifecycle_event(lifecycle_event::STARTING);
196 
197  try {
198  if (!lifecycle_service_.start()) {
199  lifecycle_service_.shutdown();
200  BOOST_THROW_EXCEPTION(exception::illegal_state("hazelcast_client",
201  "hazelcast_client could not be started!"));
202  }
203  } catch (std::exception &) {
204  lifecycle_service_.shutdown();
205  throw;
206  }
207  }
208 
209  client_config &hazelcast_client_instance_impl::get_client_config() {
210  return client_config_;
211  }
212 
213  cluster &hazelcast_client_instance_impl::get_cluster() {
214  return cluster_;
215  }
216 
217  boost::uuids::uuid
218  hazelcast_client_instance_impl::add_lifecycle_listener(lifecycle_listener &&lifecycle_listener) {
219  return lifecycle_service_.add_listener(std::move(lifecycle_listener));
220  }
221 
222  bool hazelcast_client_instance_impl::remove_lifecycle_listener(const boost::uuids::uuid &registration_id) {
223  return lifecycle_service_.remove_listener(registration_id);
224  }
225 
226  void hazelcast_client_instance_impl::shutdown() {
227  lifecycle_service_.shutdown();
228  }
229 
230  transaction_context hazelcast_client_instance_impl::new_transaction_context() {
231  transaction_options defaultOptions;
232  return new_transaction_context(defaultOptions);
233  }
234 
235  transaction_context
236  hazelcast_client_instance_impl::new_transaction_context(const transaction_options &options) {
237  return transaction_context(transaction_manager_, options);
238  }
239 
240  internal::nearcache::NearCacheManager &hazelcast_client_instance_impl::get_near_cache_manager() {
241  return *near_cache_manager_;
242  }
243 
244  serialization::pimpl::SerializationService &hazelcast_client_instance_impl::get_serialization_service() {
245  return serialization_service_;
246  }
247 
248  const protocol::ClientExceptionFactory &hazelcast_client_instance_impl::get_exception_factory() const {
249  return exception_factory_;
250  }
251 
252  std::shared_ptr<spi::impl::listener::listener_service_impl>
253  hazelcast_client_instance_impl::init_listener_service() {
254  auto eventThreadCount = client_properties_.get_integer(client_properties_.get_event_thread_count());
255  return std::make_shared<spi::impl::listener::listener_service_impl>(client_context_, eventThreadCount);
256  }
257 
258  std::shared_ptr<spi::impl::ClientExecutionServiceImpl>
259  hazelcast_client_instance_impl::init_execution_service() {
260  return std::make_shared<spi::impl::ClientExecutionServiceImpl>(instance_name_, client_properties_,
261  client_config_.get_executor_pool_size(),
262  lifecycle_service_);
263  }
264 
265  void hazelcast_client_instance_impl::on_cluster_restart() {
266  HZ_LOG(*logger_, info,
267  "Clearing local state of the client, because of a cluster restart");
268 
269  near_cache_manager_->clear_all_near_caches();
270  //clear the member list
271  cluster_service_.clear_member_list();
272  }
273 
274  std::unique_ptr<connection::AddressProvider> hazelcast_client_instance_impl::create_address_provider() {
275  config::client_network_config &networkConfig = get_client_config().get_network_config();
276  config::client_aws_config &awsConfig = networkConfig.get_aws_config();
277  config::cloud_config &cloudConfig = networkConfig.get_cloud_config();
278 
279  auto addresses = networkConfig.get_addresses();
280  bool addressListProvided = !addresses.empty();
281  bool awsDiscoveryEnabled = awsConfig.is_enabled();
282  bool cloud_enabled = cloudConfig.enabled;
283 
284  check_discovery_configuration_consistency(addressListProvided, awsDiscoveryEnabled, cloud_enabled);
285 
286  auto connect_timeout = networkConfig.get_connection_timeout();
287  if (cloud_enabled) {
288  auto cloud_provider = std::make_shared<spi::impl::discovery::cloud_discovery>(cloudConfig,
289  client_properties_.get_string(client_properties_.cloud_base_url()),
290  connect_timeout);
291  return std::unique_ptr<connection::AddressProvider>(
292  new spi::impl::discovery::remote_address_provider([=]() {
293  return cloud_provider->get_addresses();
294  }, true));
295  }
296 
297  if (awsDiscoveryEnabled) {
298  auto aws_addr_provider = std::make_shared<aws::aws_client>(connect_timeout, awsConfig,
299  client_properties_, *logger_);
300  return std::unique_ptr<connection::AddressProvider>(
301  new spi::impl::discovery::remote_address_provider([=]() {
302  return aws_addr_provider->get_addresses();
303  }, !awsConfig.is_inside_aws()));
304  }
305 
306  return std::unique_ptr<connection::AddressProvider>(
307  new spi::impl::DefaultAddressProvider(networkConfig));
308  }
309 
310  const std::string &hazelcast_client_instance_impl::get_name() const {
311  return instance_name_;
312  }
313 
314  spi::lifecycle_service &hazelcast_client_instance_impl::get_lifecycle_service() {
315  return lifecycle_service_;
316  }
317 
318  const std::shared_ptr<ClientLockReferenceIdGenerator> &
319  hazelcast_client_instance_impl::get_lock_reference_id_generator() const {
320  return lock_reference_id_generator_;
321  }
322 
323  spi::ProxyManager &hazelcast_client_instance_impl::get_proxy_manager() {
324  return proxy_manager_;
325  }
326 
327  void hazelcast_client_instance_impl::initalize_near_cache_manager() {
328  near_cache_manager_.reset(
329  new internal::nearcache::NearCacheManager(execution_service_, serialization_service_,
330  *logger_));
331  }
332 
333  local_endpoint hazelcast_client_instance_impl::get_local_endpoint() const {
334  return cluster_service_.get_local_client();
335  }
336 
337  template<>
338  boost::shared_future<std::shared_ptr<imap>>
339  hazelcast_client_instance_impl::get_distributed_object(const std::string &name) {
340  auto nearCacheConfig = client_config_.get_near_cache_config(name);
341  if (nearCacheConfig) {
342  return proxy_manager_.get_or_create_proxy<map::NearCachedClientMapProxy<
343  serialization::pimpl::data, serialization::pimpl::data >>(
344  imap::SERVICE_NAME, name).then(boost::launch::sync,
345  [=](boost::shared_future<std::shared_ptr<
346  map::NearCachedClientMapProxy<
347  serialization::pimpl::data, serialization::pimpl::data>>>
348  f) {
349  return std::static_pointer_cast<imap>(f.get());
350  });
351  } else {
352  return proxy_manager_.get_or_create_proxy<imap>(imap::SERVICE_NAME, name);
353  }
354  }
355 
356  const std::shared_ptr<logger> &hazelcast_client_instance_impl::get_logger() const {
357  return logger_;
358  }
359 
360  boost::uuids::uuid hazelcast_client_instance_impl::random_uuid() {
361  std::lock_guard<std::mutex> g(uuid_generator_lock_);
362  return uuid_generator_();
363  }
364 
365  cp::cp_subsystem &hazelcast_client_instance_impl::get_cp_subsystem() {
366  return cp_subsystem_;
367  }
368 
369  void hazelcast_client_instance_impl::check_discovery_configuration_consistency(bool address_list_provided,
370  bool aws_enabled,
371  bool cloud_enabled) {
372  int count = 0;
373  if (address_list_provided) count++;
374  if (aws_enabled) count++;
375  if (cloud_enabled) count++;
376  if (count > 1) {
377  throw exception::illegal_state(
378  "hazelcast_client_instance_impl::check_discovery_configuration_consistency",
379  (boost::format("Only one discovery method can be enabled at a time. cluster "
380  "members given explicitly : %1%, aws discovery: %2%, hazelcast.cloud enabled : %3%")
381  % address_list_provided % aws_enabled % cloud_enabled).str());
382  }
383  }
384 
385  BaseEventHandler::~BaseEventHandler() = default;
386 
387  BaseEventHandler::BaseEventHandler() : logger_(nullptr) {
388  }
389 
390  void BaseEventHandler::set_logger(logger *lg) {
391  BaseEventHandler::logger_ = lg;
392  }
393 
394  logger *BaseEventHandler::get_logger() const {
395  return logger_;
396  }
397  }
398 
399  constexpr int address::ID;
400 
401  address::address() : host_("localhost"), type_(IPV4), scope_id_(0) {
402  }
403 
404  address::address(std::string url, int port)
405  : host_(std::move(url)), port_(port), type_(IPV4), scope_id_(0) {
406  }
407 
408  address::address(std::string hostname, int port, unsigned long scope_id) : host_(std::move(hostname)),
409  port_(port),
410  type_(IPV6), scope_id_(scope_id) {
411  }
412 
413  bool address::operator==(const address &rhs) const {
414  return rhs.port_ == port_ && rhs.type_ == type_ && 0 == rhs.host_.compare(host_);
415  }
416 
417  bool address::operator!=(const address &rhs) const {
418  return !(*this == rhs);
419  }
420 
421  int address::get_port() const {
422  return port_;
423  }
424 
425  const std::string &address::get_host() const {
426  return host_;
427  }
428 
429  bool address::operator<(const address &rhs) const {
430  if (host_ < rhs.host_) {
431  return true;
432  }
433  if (rhs.host_ < host_) {
434  return false;
435  }
436  if (port_ < rhs.port_) {
437  return true;
438  }
439  if (rhs.port_ < port_) {
440  return false;
441  }
442  return type_ < rhs.type_;
443  }
444 
445  bool address::is_ip_v4() const {
446  return type_ == IPV4;
447  }
448 
449  unsigned long address::get_scope_id() const {
450  return scope_id_;
451  }
452 
453  std::string address::to_string() const {
454  std::ostringstream out;
455  out << "Address[" << get_host() << ":" << get_port() << "]";
456  return out.str();
457  }
458 
459  std::ostream &operator<<(std::ostream &stream, const address &address) {
460  return stream << address.to_string();
461  }
462 
463  namespace serialization {
464  int32_t hz_serializer<address>::get_factory_id() {
465  return F_ID;
466  }
467 
468  int32_t hz_serializer<address>::get_class_id() {
469  return ADDRESS;
470  }
471 
472  void hz_serializer<address>::write_data(const address &object, object_data_output &out) {
473  out.write<int32_t>(object.port_);
474  out.write<byte>(object.type_);
475  out.write(object.host_);
476  }
477 
478  address hz_serializer<address>::read_data(object_data_input &in) {
479  address object;
480  object.port_ = in.read<int32_t>();
481  object.type_ = in.read<byte>();
482  object.host_ = in.read<std::string>();
483  return object;
484  }
485  }
486 
487  iexecutor_service::iexecutor_service(const std::string &name, spi::ClientContext *context) : ProxyImpl(
488  SERVICE_NAME, name, context), consecutive_submits_(0), last_submit_time_(0) {
489  }
490 
491  std::vector<member>
492  iexecutor_service::select_members(const member_selector &member_selector) {
493  std::vector<member> selected;
494  std::vector<member> members = get_context().get_client_cluster_service().get_member_list();
495  for (const member &member : members) {
496  if (member_selector.select(member)) {
497  selected.push_back(member);
498  }
499  }
500  if (selected.empty()) {
501  BOOST_THROW_EXCEPTION(exception::rejected_execution("IExecutorService::selectMembers",
502  "No member could be selected with member selector"));
503  }
504  return selected;
505  }
506 
507  std::pair<boost::future<protocol::ClientMessage>, std::shared_ptr<spi::impl::ClientInvocation>>
508  iexecutor_service::invoke_on_target(protocol::ClientMessage &&request, boost::uuids::uuid target) {
509  try {
510  std::shared_ptr<spi::impl::ClientInvocation> clientInvocation = spi::impl::ClientInvocation::create(
511  get_context(), request, get_name(), target);
512  return std::make_pair(clientInvocation->invoke(), clientInvocation);
513  } catch (exception::iexception &) {
514  util::exception_util::rethrow(std::current_exception());
515  }
516  return std::pair<boost::future<protocol::ClientMessage>, std::shared_ptr<spi::impl::ClientInvocation>>();
517  }
518 
519  std::pair<boost::future<protocol::ClientMessage>, std::shared_ptr<spi::impl::ClientInvocation>>
520  iexecutor_service::invoke_on_partition_owner(protocol::ClientMessage &&request, int partition_id) {
521  try {
522  std::shared_ptr<spi::impl::ClientInvocation> clientInvocation = spi::impl::ClientInvocation::create(
523  get_context(), request, get_name(), partition_id);
524  return std::make_pair(clientInvocation->invoke(), clientInvocation);
525  } catch (exception::iexception &) {
526  util::exception_util::rethrow(std::current_exception());
527  }
528  return std::pair<boost::future<protocol::ClientMessage>, std::shared_ptr<spi::impl::ClientInvocation>>();
529  }
530 
531  bool iexecutor_service::is_sync_computation(bool prevent_sync) {
532  int64_t now = util::current_time_millis();
533 
534  int64_t last = last_submit_time_;
535  last_submit_time_ = now;
536 
537  if (last + MIN_TIME_RESOLUTION_OF_CONSECUTIVE_SUBMITS < now) {
538  consecutive_submits_ = 0;
539  return false;
540  }
541 
542  return !prevent_sync && (consecutive_submits_++ % MAX_CONSECUTIVE_SUBMITS == 0);
543  }
544 
545  address iexecutor_service::get_member_address(const member &member) {
546  auto m = get_context().get_client_cluster_service().get_member(member.get_uuid());
547  if (!m) {
548  throw (exception::exception_builder<exception::hazelcast_>(
549  "IExecutorService::getMemberAddress(Member)") << member << " is not available!").build();
550  }
551  return m->get_address();
552  }
553 
554  int iexecutor_service::random_partition_id() {
555  auto &partitionService = get_context().get_partition_service();
556  return rand() % partitionService.get_partition_count();
557  }
558 
560  auto request = protocol::codec::executorservice_shutdown_encode(
561  get_name());
562  invoke(request);
563  }
564 
565  boost::future<bool> iexecutor_service::is_shutdown() {
566  auto request = protocol::codec::executorservice_isshutdown_encode(
567  get_name());
568  return invoke_and_get_future<bool>(
569  request);
570  }
571 
572  boost::future<bool> iexecutor_service::is_terminated() {
573  return is_shutdown();
574  }
575 
576  const std::string client_properties::PROP_HEARTBEAT_TIMEOUT = "hazelcast_client_heartbeat_timeout";
577  const std::string client_properties::PROP_HEARTBEAT_TIMEOUT_DEFAULT = "60000";
578  const std::string client_properties::PROP_HEARTBEAT_INTERVAL = "hazelcast_client_heartbeat_interval";
579  const std::string client_properties::PROP_HEARTBEAT_INTERVAL_DEFAULT = "5000";
580  const std::string client_properties::PROP_REQUEST_RETRY_COUNT = "hazelcast_client_request_retry_count";
581  const std::string client_properties::PROP_REQUEST_RETRY_COUNT_DEFAULT = "20";
582  const std::string client_properties::PROP_REQUEST_RETRY_WAIT_TIME = "hazelcast_client_request_retry_wait_time";
583  const std::string client_properties::PROP_REQUEST_RETRY_WAIT_TIME_DEFAULT = "1";
584 
585  const std::string client_properties::PROP_AWS_MEMBER_PORT = "hz-port";
586  const std::string client_properties::PROP_AWS_MEMBER_PORT_DEFAULT = "5701";
587 
588  const std::string client_properties::INVOCATION_RETRY_PAUSE_MILLIS = "hazelcast.client.invocation.retry.pause.millis";
589  const std::string client_properties::INVOCATION_RETRY_PAUSE_MILLIS_DEFAULT = "1000";
590 
591  const std::string client_properties::INVOCATION_TIMEOUT_SECONDS = "hazelcast.client.invocation.timeout.seconds";
592  const std::string client_properties::INVOCATION_TIMEOUT_SECONDS_DEFAULT = "120";
593 
594  const std::string client_properties::EVENT_THREAD_COUNT = "hazelcast.client.event.thread.count";
595  const std::string client_properties::EVENT_THREAD_COUNT_DEFAULT = "5";
596 
597  const std::string client_properties::INTERNAL_EXECUTOR_POOL_SIZE = "hazelcast.client.internal.executor.pool.size";
598  const std::string client_properties::INTERNAL_EXECUTOR_POOL_SIZE_DEFAULT = "3";
599 
600  const std::string client_properties::SHUFFLE_MEMBER_LIST = "hazelcast.client.shuffle.member.list";
601  const std::string client_properties::SHUFFLE_MEMBER_LIST_DEFAULT = "true";
602 
603  const std::string client_properties::MAX_CONCURRENT_INVOCATIONS = "hazelcast.client.max.concurrent.invocations";
604  const std::string client_properties::MAX_CONCURRENT_INVOCATIONS_DEFAULT = util::IOUtil::to_string<int32_t>(
605  INT32_MAX);
606 
607  const std::string client_properties::BACKPRESSURE_BACKOFF_TIMEOUT_MILLIS = "hazelcast.client.invocation.backoff.timeout.millis";
608  const std::string client_properties::BACKPRESSURE_BACKOFF_TIMEOUT_MILLIS_DEFAULT = "-1";
609 
610  const std::string client_properties::STATISTICS_ENABLED = "hazelcast.client.statistics.enabled";
611  const std::string client_properties::STATISTICS_ENABLED_DEFAULT = "false";
612 
613  const std::string client_properties::STATISTICS_PERIOD_SECONDS = "hazelcast.client.statistics.period.seconds";
614  const std::string client_properties::STATISTICS_PERIOD_SECONDS_DEFAULT = "3";
615 
616  client_property::client_property(const std::string &name, const std::string &default_value)
617  : name_(name), default_value_(default_value) {
618  }
619 
620  const std::string &client_property::get_name() const {
621  return name_;
622  }
623 
624  const std::string &client_property::get_default_value() const {
625  return default_value_;
626  }
627 
629 #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
630  #pragma warning(push)
631 #pragma warning(disable: 4996) //for 'getenv': This function or variable may be unsafe.
632 #endif
633  return std::getenv(name_.c_str());
634 #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
635  #pragma warning(pop)
636 #endif
637  }
638 
639  client_properties::client_properties(const std::unordered_map<std::string, std::string> &properties)
640  : heartbeat_timeout_(PROP_HEARTBEAT_TIMEOUT, PROP_HEARTBEAT_TIMEOUT_DEFAULT),
641  heartbeat_interval_(PROP_HEARTBEAT_INTERVAL, PROP_HEARTBEAT_INTERVAL_DEFAULT),
642  retry_count_(PROP_REQUEST_RETRY_COUNT, PROP_REQUEST_RETRY_COUNT_DEFAULT),
643  retry_wait_time_(PROP_REQUEST_RETRY_WAIT_TIME, PROP_REQUEST_RETRY_WAIT_TIME_DEFAULT),
644  aws_member_port_(PROP_AWS_MEMBER_PORT, PROP_AWS_MEMBER_PORT_DEFAULT),
645  invocation_retry_pause_millis_(INVOCATION_RETRY_PAUSE_MILLIS,
646  INVOCATION_RETRY_PAUSE_MILLIS_DEFAULT),
647  invocation_timeout_seconds_(INVOCATION_TIMEOUT_SECONDS,
648  INVOCATION_TIMEOUT_SECONDS_DEFAULT),
649  event_thread_count_(EVENT_THREAD_COUNT, EVENT_THREAD_COUNT_DEFAULT),
650  internal_executor_pool_size_(INTERNAL_EXECUTOR_POOL_SIZE,
651  INTERNAL_EXECUTOR_POOL_SIZE_DEFAULT),
652  shuffle_member_list_(SHUFFLE_MEMBER_LIST, SHUFFLE_MEMBER_LIST_DEFAULT),
653  max_concurrent_invocations_(MAX_CONCURRENT_INVOCATIONS,
654  MAX_CONCURRENT_INVOCATIONS_DEFAULT),
655  backpressure_backoff_timeout_millis_(BACKPRESSURE_BACKOFF_TIMEOUT_MILLIS,
656  BACKPRESSURE_BACKOFF_TIMEOUT_MILLIS_DEFAULT),
657  statistics_enabled_(STATISTICS_ENABLED, STATISTICS_ENABLED_DEFAULT),
658  statistics_period_seconds_(STATISTICS_PERIOD_SECONDS, STATISTICS_PERIOD_SECONDS_DEFAULT),
659  backup_timeout_millis_(OPERATION_BACKUP_TIMEOUT_MILLIS, OPERATION_BACKUP_TIMEOUT_MILLIS_DEFAULT),
660  fail_on_indeterminate_state_(FAIL_ON_INDETERMINATE_OPERATION_STATE,
661  FAIL_ON_INDETERMINATE_OPERATION_STATE_DEFAULT),
662  cloud_base_url_(CLOUD_URL_BASE, CLOUD_URL_BASE_DEFAULT),
663  properties_map_(properties) {
664  }
665 
666  const client_property &client_properties::get_heartbeat_timeout() const {
667  return heartbeat_timeout_;
668  }
669 
670  const client_property &client_properties::get_heartbeat_interval() const {
671  return heartbeat_interval_;
672  }
673 
674  const client_property &client_properties::get_aws_member_port() const {
675  return aws_member_port_;
676  }
677 
678  const client_property &client_properties::get_invocation_retry_pause_millis() const {
679  return invocation_retry_pause_millis_;
680  }
681 
682  const client_property &client_properties::get_invocation_timeout_seconds() const {
683  return invocation_timeout_seconds_;
684  }
685 
686  const client_property &client_properties::get_event_thread_count() const {
687  return event_thread_count_;
688  }
689 
690  const client_property &client_properties::get_internal_executor_pool_size() const {
691  return internal_executor_pool_size_;
692  }
693 
694  const client_property &client_properties::get_shuffle_member_list() const {
695  return shuffle_member_list_;
696  }
697 
698  const client_property &client_properties::get_max_concurrent_invocations() const {
699  return max_concurrent_invocations_;
700  }
701 
702  const client_property &client_properties::get_backpressure_backoff_timeout_millis() const {
703  return backpressure_backoff_timeout_millis_;
704  }
705 
706  const client_property &client_properties::get_statistics_enabled() const {
707  return statistics_enabled_;
708  }
709 
710  const client_property &client_properties::get_statistics_period_seconds() const {
711  return statistics_period_seconds_;
712  }
713 
714  std::string client_properties::get_string(const client_property &property) const {
715  std::unordered_map<std::string, std::string>::const_iterator valueIt = properties_map_.find(
716  property.get_name());
717  if (valueIt != properties_map_.end()) {
718  return valueIt->second;
719  }
720 
721  const char *value = property.get_system_property();
722  if (value != NULL) {
723  return value;
724  }
725 
726  return property.get_default_value();
727  }
728 
729  bool client_properties::get_boolean(const client_property &property) const {
730  return util::IOUtil::to_value<bool>(get_string(property));
731  }
732 
733  int32_t client_properties::get_integer(const client_property &property) const {
734  return util::IOUtil::to_value<int32_t>(get_string(property));
735  }
736 
737  int64_t client_properties::get_long(const client_property &property) const {
738  return util::IOUtil::to_value<int64_t>(get_string(property));
739  }
740 
741  const client_property &client_properties::backup_timeout_millis() const {
742  return backup_timeout_millis_;
743  }
744 
745  const client_property &client_properties::fail_on_indeterminate_state() const {
746  return fail_on_indeterminate_state_;
747  }
748 
749  const client_property &client_properties::cloud_base_url() const {
750  return cloud_base_url_;
751  }
752 
753  namespace exception {
754  iexception::iexception(const std::string &exception_name, const std::string &source,
755  const std::string &message, const std::string &details, int32_t error_no,
756  std::exception_ptr cause, bool is_runtime, bool retryable)
757  : src_(source), msg_(message), details_(details), error_code_(error_no), cause_(cause),
758  runtime_exception_(is_runtime), retryable_(retryable), report_((boost::format(
759  "%1% {%2%. Error code:%3%, Details:%4%.} at %5%.") % exception_name % message % error_no %
760  details % source).str()) {
761  }
762 
763  iexception::~iexception() noexcept = default;
764 
765  char const *iexception::what() const noexcept {
766  return report_.c_str();
767  }
768 
769  const std::string &iexception::get_source() const {
770  return src_;
771  }
772 
773  const std::string &iexception::get_message() const {
774  return msg_;
775  }
776 
777  std::ostream &operator<<(std::ostream &os, const iexception &exception) {
778  os << exception.what();
779  return os;
780  }
781 
782  const std::string &iexception::get_details() const {
783  return details_;
784  }
785 
786  int32_t iexception::get_error_code() const {
787  return error_code_;
788  }
789 
790  bool iexception::is_runtime() const {
791  return runtime_exception_;
792  }
793 
794  bool iexception::is_retryable() const {
795  return retryable_;
796  }
797 
798  iexception::iexception() = default;
799 
800  retryable_hazelcast::retryable_hazelcast(const std::string &source,
801  const std::string &message,
802  const std::string &details,
803  std::exception_ptr cause)
804  : retryable_hazelcast(
805  "retryable_hazelcast", protocol::RETRYABLE_HAZELCAST, source, message, details, cause, true,
806  true) {}
807 
808  retryable_hazelcast::retryable_hazelcast(const std::string &error_name, int32_t error_code,
809  const std::string &source,
810  const std::string &message,
811  const std::string &details, std::exception_ptr cause,
812  bool runtime, bool retryable) : hazelcast_(error_name,
813  error_code,
814  source,
815  message,
816  details,
817  cause,
818  runtime,
819  retryable) {}
820 
821  member_left::member_left(const std::string &source, const std::string &message,
822  const std::string &details, std::exception_ptr cause)
823  : execution("member_left", protocol::MEMBER_LEFT, source, message, details,
824  cause, false, true) {}
825 
826  consistency_lost::consistency_lost(const std::string &source, const std::string &message,
827  const std::string &details, std::exception_ptr cause)
828  : hazelcast_("consistency_lost", protocol::CONSISTENCY_LOST_EXCEPTION, source, message,
829  details, cause, true, false) {}
830  }
831  }
832 
833  boost::future<client::hazelcast_client> new_client() {
834  return boost::async([]() { return client::hazelcast_client(); });
835  }
836 
837  boost::future<client::hazelcast_client> new_client(client::client_config config) {
838  return boost::async(
839  [](client::client_config &&c) {
840  return client::hazelcast_client(std::move(c));
841  },
842  std::move(config)
843  );
844  }
845 }
846 
847 namespace std {
848  std::size_t hash<hazelcast::client::address>::operator()(const hazelcast::client::address &address) const noexcept {
849  return std::hash<std::string>()(address.get_host()) + std::hash<int>()(address.get_port()) +
850  std::hash<unsigned long>()(address.type_);
851  }
852 }
Represents an address of a client or member in the cluster.
Definition: address.h:36
const std::string & get_host() const
bool operator==(const address &address) const
bool operator!=(const address &address) const
hazelcast_client configuration class.
static const std::string PROP_HEARTBEAT_INTERVAL
Time interval in milliseconds between the heartbeats sent by the client to the nodes.
static const std::string PROP_REQUEST_RETRY_COUNT
Client will retry requests which are either inherently retryable (idempotent client) or client_network_con...
static const std::string PROP_REQUEST_RETRY_WAIT_TIME
Client will retry requests which are either inherently retryable (idempotent client) or client_network_con...
static const std::string MAX_CONCURRENT_INVOCATIONS
The maximum number of concurrent invocations allowed.
static const std::string SHUFFLE_MEMBER_LIST
Client shuffles the given member list to prevent all clients from connecting to the same node when this pr...
static const std::string STATISTICS_PERIOD_SECONDS
The period, in seconds, at which statistics are sent to the cluster.
static const std::string EVENT_THREAD_COUNT
Number of threads used to handle incoming event packets.
static const std::string STATISTICS_ENABLED
Used to enable client statistics collection.
static const std::string INVOCATION_RETRY_PAUSE_MILLIS
Pause time between each retry cycle of an invocation in milliseconds.
std::string get_string(const client_property &property) const
Returns the configured value of a ClientProperty as std::string.
int32_t get_integer(const client_property &property) const
Returns the configured int32_t value of a ClientProperty.
static const std::string PROP_AWS_MEMBER_PORT
The discovery mechanism will discover only IP addresses.
static const std::string PROP_HEARTBEAT_TIMEOUT
Client will be sending heartbeat messages to members and this is the timeout.
static const std::string INVOCATION_TIMEOUT_SECONDS
When an invocation gets an exception because :
bool get_boolean(const client_property &property) const
Returns the configured boolean value of a ClientProperty.
int64_t get_long(const client_property &property) const
Returns the configured int64_t value of a ClientProperty.
static const std::string BACKPRESSURE_BACKOFF_TIMEOUT_MILLIS
Control the maximum timeout in millis to wait for an invocation space to be available.
A client property is a configuration for hazelcast client.
const char * get_system_property() const
Gets the system environment property value of the property.
Hazelcast cluster interface.
Definition: cluster.h:36
Base class for all exceptions originating from Hazelcast methods.
Definition: iexception.h:48
boost::future< bool > is_shutdown()
Returns.
boost::future< bool > is_terminated()
Returns.
void shutdown()
Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will...
Listener object for listening lifecycle events of hazelcast instance.
The Client interface allows to get information about a connected client's socket address,...
Provides a context to do transactional operations; so beginning/committing transactions,...
Contains the configuration for a Hazelcast transaction.
CP Subsystem is a component of Hazelcast that builds a strongly consistent layer for a set of distrib...
Definition: cp.h:1300