Hazelcast C++ Client
Hazelcast C++ Client Library
client_impl.cpp
1 /*
2  * Copyright (c) 2008-2022, Hazelcast, Inc. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <utility>
18 #include <vector>
19 #include <functional>
20 
21 #include <boost/format.hpp>
22 
23 #include "hazelcast/util/Util.h"
24 #include "hazelcast/util/IOUtil.h"
25 #include "hazelcast/client/hazelcast_client.h"
26 #include "hazelcast/client/transaction_context.h"
27 #include "hazelcast/client/cluster.h"
28 #include "hazelcast/client/spi/lifecycle_service.h"
29 #include "hazelcast/client/lifecycle_listener.h"
30 #include "hazelcast/client/exception/protocol_exceptions.h"
31 #include "hazelcast/client/aws/aws_client.h"
32 #include "hazelcast/client/spi/impl/discovery/cloud_discovery.h"
33 #include "hazelcast/client/impl/hazelcast_client_instance_impl.h"
34 #include "hazelcast/client/impl/ClientLockReferenceIdGenerator.h"
35 #include "hazelcast/client/spi/impl/ClientInvocationServiceImpl.h"
36 #include "hazelcast/client/spi/impl/ClientPartitionServiceImpl.h"
37 #include "hazelcast/client/spi/impl/ClientExecutionServiceImpl.h"
38 #include "hazelcast/client/spi/impl/sequence/CallIdFactory.h"
39 #include "hazelcast/client/spi/impl/DefaultAddressProvider.h"
40 #include "hazelcast/client/spi/impl/discovery/remote_address_provider.h"
41 #include "hazelcast/client/spi/impl/listener/listener_service_impl.h"
42 #include "hazelcast/client/load_balancer.h"
43 #include "hazelcast/client/connection/ClientConnectionManagerImpl.h"
44 #include "hazelcast/client/proxy/flake_id_generator_impl.h"
45 #include "hazelcast/logger.h"
46 #include "hazelcast/client/member_selectors.h"
47 #include "hazelcast/client/client_properties.h"
48 #include "hazelcast/client/big_decimal.h"
49 #include "hazelcast/client/local_time.h"
50 #include "hazelcast/client/local_date.h"
51 #include "hazelcast/client/local_date_time.h"
52 #include "hazelcast/client/offset_date_time.h"
53 #ifndef HAZELCAST_VERSION
54 #define HAZELCAST_VERSION "NOT_FOUND"
55 #endif
56 
57 namespace hazelcast {
58 namespace client {
59 hazelcast_client::hazelcast_client()
60  : client_impl_(
61  new impl::hazelcast_client_instance_impl(*this, client_config()))
62 {
63  client_impl_->start();
64 }
65 
66 hazelcast_client::hazelcast_client(client_config config)
67  : client_impl_(
68  new impl::hazelcast_client_instance_impl(*this, std::move(config)))
69 {
70  client_impl_->start();
71 }
72 
73 const std::string&
74 hazelcast_client::get_name() const
75 {
76  return client_impl_->get_name();
77 }
78 
80 hazelcast_client::get_client_config()
81 {
82  return client_impl_->get_client_config();
83 }
84 
86 hazelcast_client::new_transaction_context()
87 {
88  return client_impl_->new_transaction_context();
89 }
90 
92 hazelcast_client::new_transaction_context(const transaction_options& options)
93 {
94  return client_impl_->new_transaction_context(options);
95 }
96 
97 cluster&
98 hazelcast_client::get_cluster()
99 {
100  return client_impl_->get_cluster();
101 }
102 
103 boost::uuids::uuid
104 hazelcast_client::add_lifecycle_listener(
106 {
107  return client_impl_->add_lifecycle_listener(std::move(lifecycle_listener));
108 }
109 
110 bool
111 hazelcast_client::remove_lifecycle_listener(
112  const boost::uuids::uuid& registration_id)
113 {
114  return client_impl_->remove_lifecycle_listener(registration_id);
115 }
116 
117 boost::future<void>
118 hazelcast_client::shutdown()
119 {
120  return boost::async([=]() { client_impl_->shutdown(); });
121 }
122 
123 spi::lifecycle_service&
124 hazelcast_client::get_lifecycle_service()
125 {
126  return client_impl_->get_lifecycle_service();
127 }
128 
130 hazelcast_client::get_local_endpoint() const
131 {
132  return client_impl_->get_local_endpoint();
133 }
134 
135 hazelcast_client::~hazelcast_client() = default;
136 
138 hazelcast_client::get_cp_subsystem()
139 {
140  return client_impl_->get_cp_subsystem();
141 }
142 
144 hazelcast_client::get_sql()
145 {
146  return client_impl_->get_sql();
147 }
148 
149 const boost::string_view
150 version()
151 {
152  return HAZELCAST_VERSION;
153 }
154 
155 namespace impl {
156 std::atomic<int32_t> hazelcast_client_instance_impl::CLIENT_ID(0);
157 
158 hazelcast_client_instance_impl::hazelcast_client_instance_impl(
160  client_config config)
161  : client_config_(std::move(config))
162  , client_properties_(client_config_.get_properties())
163  , client_context_(*this)
164  , serialization_service_(client_config_.get_serialization_config())
165  , cluster_service_(client_context_)
166  , transaction_manager_(client_context_)
167  , cluster_(cluster_service_)
168  , lifecycle_service_(client_context_,
169  client_config_.get_lifecycle_listeners())
170  , proxy_manager_(client_context_)
171  , id_(++CLIENT_ID)
172  , random_generator_(std::random_device{}())
173  , uuid_generator_{ random_generator_ }
174  , cp_subsystem_(client_context_)
175  , sql_service_(client_context_)
176  , proxy_session_manager_(client_context_)
177 {
178  auto& name = client_config_.get_instance_name();
179  if (name) {
180  instance_name_ = *name;
181  } else {
182  std::ostringstream out;
183  out << "hz.client_" << id_;
184  instance_name_ = out.str();
185  }
186 
187  auto logger_config = client_config_.get_logger_config();
188  logger_ = std::make_shared<logger>(instance_name_,
189  client_config_.get_cluster_name(),
190  logger_config.level(),
191  logger_config.handler());
192 
193  execution_service_ = init_execution_service();
194 
195  initalize_near_cache_manager();
196 
197  int32_t maxAllowedConcurrentInvocations = client_properties_.get_integer(
198  client_properties_.get_max_concurrent_invocations());
199  int64_t backofftimeoutMs = client_properties_.get_long(
200  client_properties_.get_backpressure_backoff_timeout_millis());
201  bool isBackPressureEnabled = maxAllowedConcurrentInvocations != INT32_MAX;
202  call_id_sequence_ =
203  spi::impl::sequence::CallIdFactory::new_call_id_sequence(
204  isBackPressureEnabled,
205  maxAllowedConcurrentInvocations,
206  backofftimeoutMs);
207 
208  auto address_provider = create_address_provider();
209 
210  connection_manager_ =
211  std::make_shared<connection::ClientConnectionManagerImpl>(
212  client_context_, std::move(address_provider));
213 
214  cluster_listener_.reset(
215  new spi::impl::listener::cluster_view_listener(client_context_));
216 
217  partition_service_.reset(
218  new spi::impl::ClientPartitionServiceImpl(client_context_));
219 
220  invocation_service_.reset(
221  new spi::impl::ClientInvocationServiceImpl(client_context_));
222 
223  listener_service_ = init_listener_service();
224 
225  proxy_manager_.init();
226 
227  lock_reference_id_generator_.reset(
228  new impl::ClientLockReferenceIdGenerator());
229 
230  statistics_.reset(new statistics::Statistics(client_context_));
231 }
232 
233 hazelcast_client_instance_impl::~hazelcast_client_instance_impl()
234 {
235  shutdown();
236 }
237 
238 void
239 hazelcast_client_instance_impl::start()
240 {
241  lifecycle_service_.fire_lifecycle_event(lifecycle_event::STARTING);
242 
243  try {
244  if (!lifecycle_service_.start()) {
245  lifecycle_service_.shutdown();
246  BOOST_THROW_EXCEPTION(exception::illegal_state(
247  "hazelcast_client", "hazelcast_client could not be started!"));
248  }
249  } catch (std::exception&) {
250  lifecycle_service_.shutdown();
251  throw;
252  }
253 }
254 
255 client_config&
256 hazelcast_client_instance_impl::get_client_config()
257 {
258  return client_config_;
259 }
260 
261 cluster&
262 hazelcast_client_instance_impl::get_cluster()
263 {
264  return cluster_;
265 }
266 
267 boost::uuids::uuid
268 hazelcast_client_instance_impl::add_lifecycle_listener(
269  lifecycle_listener&& lifecycle_listener)
270 {
271  return lifecycle_service_.add_listener(std::move(lifecycle_listener));
272 }
273 
274 bool
275 hazelcast_client_instance_impl::remove_lifecycle_listener(
276  const boost::uuids::uuid& registration_id)
277 {
278  return lifecycle_service_.remove_listener(registration_id);
279 }
280 
281 void
282 hazelcast_client_instance_impl::shutdown()
283 {
284  lifecycle_service_.shutdown();
285 }
286 
287 transaction_context
288 hazelcast_client_instance_impl::new_transaction_context()
289 {
290  transaction_options defaultOptions;
291  return new_transaction_context(defaultOptions);
292 }
293 
294 transaction_context
295 hazelcast_client_instance_impl::new_transaction_context(
296  const transaction_options& options)
297 {
298  return transaction_context(transaction_manager_, options);
299 }
300 
301 internal::nearcache::NearCacheManager&
302 hazelcast_client_instance_impl::get_near_cache_manager()
303 {
304  return *near_cache_manager_;
305 }
306 
307 serialization::pimpl::SerializationService&
308 hazelcast_client_instance_impl::get_serialization_service()
309 {
310  return serialization_service_;
311 }
312 
313 const protocol::ClientExceptionFactory&
314 hazelcast_client_instance_impl::get_exception_factory() const
315 {
316  return exception_factory_;
317 }
318 
319 std::shared_ptr<spi::impl::listener::listener_service_impl>
320 hazelcast_client_instance_impl::init_listener_service()
321 {
322  auto eventThreadCount = client_properties_.get_integer(
323  client_properties_.get_event_thread_count());
324  return std::make_shared<spi::impl::listener::listener_service_impl>(
325  client_context_, eventThreadCount);
326 }
327 
328 std::shared_ptr<spi::impl::ClientExecutionServiceImpl>
329 hazelcast_client_instance_impl::init_execution_service()
330 {
331  return std::make_shared<spi::impl::ClientExecutionServiceImpl>(
332  instance_name_,
333  client_properties_,
334  client_config_.get_executor_pool_size(),
335  lifecycle_service_);
336 }
337 
338 void
339 hazelcast_client_instance_impl::on_cluster_restart()
340 {
341  HZ_LOG(*logger_,
342  info,
343  "Clearing local state of the client, because of a cluster restart");
344 
345  near_cache_manager_->clear_all_near_caches();
346  // clear the member list
347  cluster_service_.clear_member_list();
348 }
349 
350 std::unique_ptr<connection::AddressProvider>
351 hazelcast_client_instance_impl::create_address_provider()
352 {
353  config::client_network_config& networkConfig =
354  get_client_config().get_network_config();
355  config::client_aws_config& awsConfig = networkConfig.get_aws_config();
356  config::cloud_config& cloudConfig = networkConfig.get_cloud_config();
357 
358  auto addresses = networkConfig.get_addresses();
359  bool addressListProvided = !addresses.empty();
360  bool awsDiscoveryEnabled = awsConfig.is_enabled();
361  bool cloud_enabled = cloudConfig.enabled;
362 
363  check_discovery_configuration_consistency(
364  addressListProvided, awsDiscoveryEnabled, cloud_enabled);
365 
366  auto connect_timeout = networkConfig.get_connection_timeout();
367  if (cloud_enabled) {
368  auto cloud_provider =
369  std::make_shared<spi::impl::discovery::cloud_discovery>(
370  cloudConfig,
371  client_properties_.get_string(client_properties_.cloud_base_url()),
372  connect_timeout);
373  return std::unique_ptr<connection::AddressProvider>(
374  new spi::impl::discovery::remote_address_provider(
375  [=]() { return cloud_provider->get_addresses(); }, true));
376  }
377 
378  if (awsDiscoveryEnabled) {
379  auto aws_addr_provider = std::make_shared<aws::aws_client>(
380  connect_timeout, awsConfig, client_properties_, *logger_);
381  return std::unique_ptr<connection::AddressProvider>(
382  new spi::impl::discovery::remote_address_provider(
383  [=]() { return aws_addr_provider->get_addresses(); },
384  !awsConfig.is_inside_aws()));
385  }
386 
387  return std::unique_ptr<connection::AddressProvider>(
388  new spi::impl::DefaultAddressProvider(networkConfig));
389 }
390 
391 const std::string&
392 hazelcast_client_instance_impl::get_name() const
393 {
394  return instance_name_;
395 }
396 
397 spi::lifecycle_service&
398 hazelcast_client_instance_impl::get_lifecycle_service()
399 {
400  return lifecycle_service_;
401 }
402 
403 const std::shared_ptr<ClientLockReferenceIdGenerator>&
404 hazelcast_client_instance_impl::get_lock_reference_id_generator() const
405 {
406  return lock_reference_id_generator_;
407 }
408 
409 spi::ProxyManager&
410 hazelcast_client_instance_impl::get_proxy_manager()
411 {
412  return proxy_manager_;
413 }
414 
415 void
416 hazelcast_client_instance_impl::initalize_near_cache_manager()
417 {
418  near_cache_manager_.reset(new internal::nearcache::NearCacheManager(
419  execution_service_, serialization_service_, *logger_));
420 }
421 
422 local_endpoint
423 hazelcast_client_instance_impl::get_local_endpoint() const
424 {
425  return cluster_service_.get_local_client();
426 }
427 
428 template<>
429 boost::shared_future<std::shared_ptr<imap>>
430 hazelcast_client_instance_impl::get_distributed_object(const std::string& name)
431 {
432  auto nearCacheConfig = client_config_.get_near_cache_config(name);
433  if (nearCacheConfig) {
434  return proxy_manager_
435  .get_or_create_proxy<
436  map::NearCachedClientMapProxy<serialization::pimpl::data,
437  serialization::pimpl::data>>(
438  imap::SERVICE_NAME, name)
439  .then(
440  boost::launch::sync,
441  [=](
442  boost::shared_future<std::shared_ptr<
443  map::NearCachedClientMapProxy<serialization::pimpl::data,
444  serialization::pimpl::data>>> f) {
445  return std::static_pointer_cast<imap>(f.get());
446  });
447  } else {
448  return proxy_manager_.get_or_create_proxy<imap>(imap::SERVICE_NAME,
449  name);
450  }
451 }
452 
453 const std::shared_ptr<logger>&
454 hazelcast_client_instance_impl::get_logger() const
455 {
456  return logger_;
457 }
458 
459 boost::uuids::uuid
460 hazelcast_client_instance_impl::random_uuid()
461 {
462  std::lock_guard<std::mutex> g(uuid_generator_lock_);
463  return uuid_generator_();
464 }
465 
466 cp::cp_subsystem&
467 hazelcast_client_instance_impl::get_cp_subsystem()
468 {
469  return cp_subsystem_;
470 }
471 
472 sql::sql_service&
473 hazelcast_client_instance_impl::get_sql()
474 {
475  return sql_service_;
476 }
477 
478 void
479 hazelcast_client_instance_impl::check_discovery_configuration_consistency(
480  bool address_list_provided,
481  bool aws_enabled,
482  bool cloud_enabled)
483 {
484  int count = 0;
485  if (address_list_provided)
486  count++;
487  if (aws_enabled)
488  count++;
489  if (cloud_enabled)
490  count++;
491  if (count > 1) {
492  throw exception::illegal_state(
493  "hazelcast_client_instance_impl::check_discovery_configuration_"
494  "consistency",
495  (boost::format(
496  "Only one discovery method can be enabled at a time. cluster "
497  "members given explicitly : %1%, aws discovery: %2%, "
498  "hazelcast.cloud enabled : %3%") %
499  address_list_provided % aws_enabled % cloud_enabled)
500  .str());
501  }
502 }
503 
504 BaseEventHandler::~BaseEventHandler() = default;
505 
506 BaseEventHandler::BaseEventHandler()
507  : logger_(nullptr)
508 {}
509 
510 void
511 BaseEventHandler::set_logger(logger* lg)
512 {
513  BaseEventHandler::logger_ = lg;
514 }
515 
516 logger*
517 BaseEventHandler::get_logger() const
518 {
519  return logger_;
520 }
521 } // namespace impl
522 
523 constexpr int address::ID;
524 
525 address::address()
526  : host_("localhost")
527  , type_(IPV4)
528  , scope_id_(0)
529 {}
530 
531 address::address(std::string url, int port)
532  : host_(std::move(url))
533  , port_(port)
534  , type_(IPV4)
535  , scope_id_(0)
536 {}
537 
538 address::address(std::string hostname, int port, unsigned long scope_id)
539  : host_(std::move(hostname))
540  , port_(port)
541  , type_(IPV6)
542  , scope_id_(scope_id)
543 {}
544 
545 bool
546 address::operator==(const address& rhs) const
547 {
548  return rhs.port_ == port_ && rhs.type_ == type_ &&
549  0 == rhs.host_.compare(host_);
550 }
551 
552 bool
553 address::operator!=(const address& rhs) const
554 {
555  return !(*this == rhs);
556 }
557 
558 int
560 {
561  return port_;
562 }
563 
564 const std::string&
566 {
567  return host_;
568 }
569 
570 bool
571 address::operator<(const address& rhs) const
572 {
573  if (host_ < rhs.host_) {
574  return true;
575  }
576  if (rhs.host_ < host_) {
577  return false;
578  }
579  if (port_ < rhs.port_) {
580  return true;
581  }
582  if (rhs.port_ < port_) {
583  return false;
584  }
585  return type_ < rhs.type_;
586 }
587 
588 bool
590 {
591  return type_ == IPV4;
592 }
593 
594 unsigned long
595 address::get_scope_id() const
596 {
597  return scope_id_;
598 }
599 
600 std::string
601 address::to_string() const
602 {
603  std::ostringstream out;
604  out << "Address[" << get_host() << ":" << get_port() << "]";
605  return out.str();
606 }
607 
608 std::ostream&
609 operator<<(std::ostream& stream, const address& address)
610 {
611  return stream << address.to_string();
612 }
613 
614 bool
615 operator==(const big_decimal& lhs, const big_decimal& rhs)
616 {
617  return lhs.unscaled == rhs.unscaled && lhs.scale == rhs.scale;
618 }
619 
620 bool
621 operator<(const big_decimal& lhs, const big_decimal& rhs)
622 {
623  if (lhs.scale != rhs.scale) {
624  return lhs.scale < rhs.scale;
625  }
626  return lhs.unscaled < rhs.unscaled;
627 }
628 
629 bool
630 operator==(const local_time& lhs, const local_time& rhs)
631 {
632  return lhs.hours == rhs.hours && lhs.minutes == rhs.minutes &&
633  lhs.seconds == rhs.seconds && lhs.nanos == rhs.nanos;
634 }
635 
/**
 * Hashes a value with the std::hash specialisation for its type.
 *
 * @tparam T type for which std::hash<T> is usable.
 * @param v value to hash.
 * @return std::hash<T>()(v).
 */
template<typename T>
std::size_t
hash_value(const T& v)
{
    const std::hash<T> hasher{};
    return hasher(v);
}
646 
647 bool
648 operator<(const local_time& lhs, const local_time& rhs)
649 {
650  if (lhs.hours < rhs.hours) {
651  return true;
652  }
653  if (rhs.hours < lhs.hours) {
654  return false;
655  }
656  if (lhs.minutes < rhs.minutes) {
657  return true;
658  }
659  if (rhs.minutes < lhs.minutes) {
660  return false;
661  }
662  if (lhs.seconds < rhs.seconds) {
663  return true;
664  }
665  if (rhs.seconds < lhs.seconds) {
666  return false;
667  }
668  return lhs.nanos < rhs.nanos;
669 }
670 
671 bool
672 operator==(const local_date& lhs, const local_date& rhs)
673 {
674  return lhs.year == rhs.year && lhs.month == rhs.month &&
675  lhs.day_of_month == rhs.day_of_month;
676 }
677 
678 bool
679 operator<(const local_date& lhs, const local_date& rhs)
680 {
681  if (lhs.year < rhs.year) {
682  return true;
683  }
684  if (rhs.year < lhs.year) {
685  return false;
686  }
687  if (lhs.month < rhs.month) {
688  return true;
689  }
690  if (rhs.month < lhs.month) {
691  return false;
692  }
693  return lhs.day_of_month < rhs.day_of_month;
694 }
695 
696 bool
697 operator==(const local_date_time& lhs, const local_date_time& rhs)
698 {
699  return lhs.date == rhs.date && lhs.time == rhs.time;
700 }
701 
702 bool
703 operator<(const local_date_time& lhs, const local_date_time& rhs)
704 {
705  if (lhs.date < rhs.date) {
706  return true;
707  }
708  if (rhs.date < lhs.date) {
709  return false;
710  }
711  return lhs.time < rhs.time;
712 }
713 
714 bool
715 operator==(const offset_date_time& lhs, const offset_date_time& rhs)
716 {
717  return lhs.date_time == rhs.date_time &&
718  lhs.zone_offset_in_seconds == rhs.zone_offset_in_seconds;
719 }
720 
721 bool
722 operator<(const offset_date_time& lhs, const offset_date_time& rhs)
723 {
724  if (lhs.date_time < rhs.date_time) {
725  return true;
726  }
727  if (rhs.date_time < lhs.date_time) {
728  return false;
729  }
730  return lhs.zone_offset_in_seconds < rhs.zone_offset_in_seconds;
731 }
732 
733 namespace pimpl {
734 
/**
 * Negates, in place, a big-endian two's-complement number stored one byte
 * per element (most significant byte first).
 *
 * The bytes are inverted and 1 is then added starting from the last (least
 * significant) byte; the carry moves towards the front for as long as a byte
 * wraps to zero. An empty vector is left unchanged.
 *
 * @param a bytes to negate; modified in place, size is never changed.
 */
void
twos_complement(std::vector<int8_t>& a)
{
    // Invert every bit.
    for (auto& item : a) {
        item = static_cast<int8_t>(~item);
    }

    // Add 1. Reverse iterators avoid converting size() to a signed index.
    for (auto it = a.rbegin(); it != a.rend(); ++it) {
        *it = static_cast<int8_t>(static_cast<uint8_t>(*it) + 1u);
        if (*it != 0) {
            // No wrap-around, so there is no carry left to propagate.
            break;
        }
    }
}
754 
755 boost::multiprecision::cpp_int
756 from_bytes(std::vector<int8_t> v)
757 {
758  boost::multiprecision::cpp_int i;
759  bool is_negative = v[0] < 0;
760  if (is_negative) {
761  twos_complement(v);
762  }
763  import_bits(i, v.begin(), v.end(), 8);
764  if (is_negative) {
765  return -i;
766  }
767  return i;
768 }
769 
770 std::vector<int8_t>
771 to_bytes(const boost::multiprecision::cpp_int& i)
772 {
773  std::vector<int8_t> v;
774  export_bits(i, std::back_inserter(v), 8);
775  if (i < 0) {
776  twos_complement(v);
777  if (v[0] > 0) {
778  // add -1 as the most significant to have a negative sign bit
779  v.insert(v.begin(), -1);
780  }
781  } else {
782  // add 0 as the most significant byte to have a positive sign bit
783  if (v[0] < 0) {
784  v.insert(v.begin(), 0);
785  }
786  }
787  return v;
788 }
789 } // namespace pimpl
790 
791 namespace serialization {
792 int32_t
793 hz_serializer<address>::get_factory_id()
794 {
795  return F_ID;
796 }
797 
798 int32_t
799 hz_serializer<address>::get_class_id()
800 {
801  return ADDRESS;
802 }
803 
804 void
805 hz_serializer<address>::write_data(const address& object,
806  object_data_output& out)
807 {
808  out.write<int32_t>(object.port_);
809  out.write<byte>(object.type_);
810  out.write(object.host_);
811 }
812 
813 address
814 hz_serializer<address>::read_data(object_data_input& in)
815 {
816  address object;
817  object.port_ = in.read<int32_t>();
818  object.type_ = in.read<byte>();
819  object.host_ = in.read<std::string>();
820  return object;
821 }
822 } // namespace serialization
823 
824 iexecutor_service::iexecutor_service(const std::string& name,
825  spi::ClientContext* context)
826  : ProxyImpl(SERVICE_NAME, name, context)
827  , consecutive_submits_(0)
828  , last_submit_time_(0)
829 {}
830 
831 std::vector<member>
832 iexecutor_service::select_members(const member_selector& member_selector)
833 {
834  std::vector<member> selected;
835  std::vector<member> members =
836  get_context().get_client_cluster_service().get_member_list();
837  for (const member& member : members) {
838  if (member_selector.select(member)) {
839  selected.push_back(member);
840  }
841  }
842  if (selected.empty()) {
843  BOOST_THROW_EXCEPTION(exception::rejected_execution(
844  "IExecutorService::selectMembers",
845  "No member could be selected with member selector"));
846  }
847  return selected;
848 }
849 
850 std::pair<boost::future<protocol::ClientMessage>,
851  std::shared_ptr<spi::impl::ClientInvocation>>
852 iexecutor_service::invoke_on_target(protocol::ClientMessage&& request,
853  boost::uuids::uuid target)
854 {
855  try {
856  std::shared_ptr<spi::impl::ClientInvocation> clientInvocation =
857  spi::impl::ClientInvocation::create(
858  get_context(), request, get_name(), target);
859  return std::make_pair(clientInvocation->invoke(), clientInvocation);
860  } catch (exception::iexception&) {
861  util::exception_util::rethrow(std::current_exception());
862  }
863  return std::pair<boost::future<protocol::ClientMessage>,
864  std::shared_ptr<spi::impl::ClientInvocation>>();
865 }
866 
867 std::pair<boost::future<protocol::ClientMessage>,
868  std::shared_ptr<spi::impl::ClientInvocation>>
869 iexecutor_service::invoke_on_partition_owner(protocol::ClientMessage&& request,
870  int partition_id)
871 {
872  try {
873  std::shared_ptr<spi::impl::ClientInvocation> clientInvocation =
874  spi::impl::ClientInvocation::create(
875  get_context(), request, get_name(), partition_id);
876  return std::make_pair(clientInvocation->invoke(), clientInvocation);
877  } catch (exception::iexception&) {
878  util::exception_util::rethrow(std::current_exception());
879  }
880  return std::pair<boost::future<protocol::ClientMessage>,
881  std::shared_ptr<spi::impl::ClientInvocation>>();
882 }
883 
884 bool
885 iexecutor_service::is_sync_computation(bool prevent_sync)
886 {
887  int64_t now = util::current_time_millis();
888 
889  int64_t last = last_submit_time_;
890  last_submit_time_ = now;
891 
892  if (last + MIN_TIME_RESOLUTION_OF_CONSECUTIVE_SUBMITS < now) {
893  consecutive_submits_ = 0;
894  return false;
895  }
896 
897  return !prevent_sync &&
898  (consecutive_submits_++ % MAX_CONSECUTIVE_SUBMITS == 0);
899 }
900 
901 address
902 iexecutor_service::get_member_address(const member& member)
903 {
904  auto m =
905  get_context().get_client_cluster_service().get_member(member.get_uuid());
906  if (!m) {
907  throw(exception::exception_builder<exception::hazelcast_>(
908  "IExecutorService::getMemberAddress(Member)")
909  << member << " is not available!")
910  .build();
911  }
912  return m->get_address();
913 }
914 
915 int
916 iexecutor_service::random_partition_id()
917 {
918  auto& partitionService = get_context().get_partition_service();
919  return rand() % partitionService.get_partition_count();
920 }
921 
922 void
924 {
925  auto request = protocol::codec::executorservice_shutdown_encode(get_name());
926  invoke(request);
927 }
928 
929 boost::future<bool>
931 {
932  auto request =
933  protocol::codec::executorservice_isshutdown_encode(get_name());
934  return invoke_and_get_future<bool>(request);
935 }
936 
937 boost::future<bool>
939 {
940  return is_shutdown();
941 }
942 
944  "hazelcast_client_heartbeat_timeout";
945 const std::string client_properties::PROP_HEARTBEAT_TIMEOUT_DEFAULT = "60000";
947  "hazelcast_client_heartbeat_interval";
948 const std::string client_properties::PROP_HEARTBEAT_INTERVAL_DEFAULT = "5000";
950  "hazelcast_client_request_retry_count";
951 const std::string client_properties::PROP_REQUEST_RETRY_COUNT_DEFAULT = "20";
953  "hazelcast_client_request_retry_wait_time";
954 const std::string client_properties::PROP_REQUEST_RETRY_WAIT_TIME_DEFAULT = "1";
955 
956 const std::string client_properties::PROP_AWS_MEMBER_PORT = "hz-port";
957 const std::string client_properties::PROP_AWS_MEMBER_PORT_DEFAULT = "5701";
958 
960  "hazelcast.client.invocation.retry.pause.millis";
961 const std::string client_properties::INVOCATION_RETRY_PAUSE_MILLIS_DEFAULT =
962  "1000";
963 
965  "hazelcast.client.invocation.timeout.seconds";
966 const std::string client_properties::INVOCATION_TIMEOUT_SECONDS_DEFAULT = "120";
967 
968 const std::string client_properties::EVENT_THREAD_COUNT =
969  "hazelcast.client.event.thread.count";
970 const std::string client_properties::EVENT_THREAD_COUNT_DEFAULT = "5";
971 
972 const std::string client_properties::INTERNAL_EXECUTOR_POOL_SIZE =
973  "hazelcast.client.internal.executor.pool.size";
974 const std::string client_properties::INTERNAL_EXECUTOR_POOL_SIZE_DEFAULT = "3";
975 
976 const std::string client_properties::SHUFFLE_MEMBER_LIST =
977  "hazelcast.client.shuffle.member.list";
978 const std::string client_properties::SHUFFLE_MEMBER_LIST_DEFAULT = "true";
979 
981  "hazelcast.client.max.concurrent.invocations";
982 const std::string client_properties::MAX_CONCURRENT_INVOCATIONS_DEFAULT =
983  util::IOUtil::to_string<int32_t>(INT32_MAX);
984 
986  "hazelcast.client.invocation.backoff.timeout.millis";
987 const std::string
988  client_properties::BACKPRESSURE_BACKOFF_TIMEOUT_MILLIS_DEFAULT = "-1";
989 
990 const std::string client_properties::STATISTICS_ENABLED =
991  "hazelcast.client.statistics.enabled";
992 const std::string client_properties::STATISTICS_ENABLED_DEFAULT = "false";
993 
995  "hazelcast.client.statistics.period.seconds";
996 const std::string client_properties::STATISTICS_PERIOD_SECONDS_DEFAULT = "3";
997 
998 client_property::client_property(const std::string& name,
999  const std::string& default_value)
1000  : name_(name)
1001  , default_value_(default_value)
1002 {}
1003 
1004 const std::string&
1005 client_property::get_name() const
1006 {
1007  return name_;
1008 }
1009 
1010 const std::string&
1011 client_property::get_default_value() const
1012 {
1013  return default_value_;
1014 }
1015 
1016 const char*
1018 {
1019 #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
1020 #pragma warning(push)
1021 #pragma warning( \
1022  disable : 4996) // for 'getenv': This function or variable may be unsafe.
1023 #endif
1024  return std::getenv(name_.c_str());
1025 #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
1026 #pragma warning(pop)
1027 #endif
1028 }
1029 
1030 client_properties::client_properties(
1031  const std::unordered_map<std::string, std::string>& properties)
1032  : heartbeat_timeout_(PROP_HEARTBEAT_TIMEOUT, PROP_HEARTBEAT_TIMEOUT_DEFAULT)
1033  , heartbeat_interval_(PROP_HEARTBEAT_INTERVAL,
1034  PROP_HEARTBEAT_INTERVAL_DEFAULT)
1035  , retry_count_(PROP_REQUEST_RETRY_COUNT, PROP_REQUEST_RETRY_COUNT_DEFAULT)
1036  , retry_wait_time_(PROP_REQUEST_RETRY_WAIT_TIME,
1037  PROP_REQUEST_RETRY_WAIT_TIME_DEFAULT)
1038  , aws_member_port_(PROP_AWS_MEMBER_PORT, PROP_AWS_MEMBER_PORT_DEFAULT)
1039  , invocation_retry_pause_millis_(INVOCATION_RETRY_PAUSE_MILLIS,
1040  INVOCATION_RETRY_PAUSE_MILLIS_DEFAULT)
1041  , invocation_timeout_seconds_(INVOCATION_TIMEOUT_SECONDS,
1042  INVOCATION_TIMEOUT_SECONDS_DEFAULT)
1043  , event_thread_count_(EVENT_THREAD_COUNT, EVENT_THREAD_COUNT_DEFAULT)
1044  , internal_executor_pool_size_(INTERNAL_EXECUTOR_POOL_SIZE,
1045  INTERNAL_EXECUTOR_POOL_SIZE_DEFAULT)
1046  , shuffle_member_list_(SHUFFLE_MEMBER_LIST, SHUFFLE_MEMBER_LIST_DEFAULT)
1047  , max_concurrent_invocations_(MAX_CONCURRENT_INVOCATIONS,
1048  MAX_CONCURRENT_INVOCATIONS_DEFAULT)
1049  , backpressure_backoff_timeout_millis_(
1050  BACKPRESSURE_BACKOFF_TIMEOUT_MILLIS,
1051  BACKPRESSURE_BACKOFF_TIMEOUT_MILLIS_DEFAULT)
1052  , statistics_enabled_(STATISTICS_ENABLED, STATISTICS_ENABLED_DEFAULT)
1053  , statistics_period_seconds_(STATISTICS_PERIOD_SECONDS,
1054  STATISTICS_PERIOD_SECONDS_DEFAULT)
1055  , backup_timeout_millis_(OPERATION_BACKUP_TIMEOUT_MILLIS,
1056  OPERATION_BACKUP_TIMEOUT_MILLIS_DEFAULT)
1057  , fail_on_indeterminate_state_(FAIL_ON_INDETERMINATE_OPERATION_STATE,
1058  FAIL_ON_INDETERMINATE_OPERATION_STATE_DEFAULT)
1059  , cloud_base_url_(CLOUD_URL_BASE, CLOUD_URL_BASE_DEFAULT)
1060  , properties_map_(properties)
1061 {}
1062 
1063 const client_property&
1064 client_properties::get_heartbeat_timeout() const
1065 {
1066  return heartbeat_timeout_;
1067 }
1068 
1069 const client_property&
1070 client_properties::get_heartbeat_interval() const
1071 {
1072  return heartbeat_interval_;
1073 }
1074 
1075 const client_property&
1076 client_properties::get_aws_member_port() const
1077 {
1078  return aws_member_port_;
1079 }
1080 
1081 const client_property&
1082 client_properties::get_invocation_retry_pause_millis() const
1083 {
1084  return invocation_retry_pause_millis_;
1085 }
1086 
1087 const client_property&
1088 client_properties::get_invocation_timeout_seconds() const
1089 {
1090  return invocation_timeout_seconds_;
1091 }
1092 
1093 const client_property&
1094 client_properties::get_event_thread_count() const
1095 {
1096  return event_thread_count_;
1097 }
1098 
1099 const client_property&
1100 client_properties::get_internal_executor_pool_size() const
1101 {
1102  return internal_executor_pool_size_;
1103 }
1104 
1105 const client_property&
1106 client_properties::get_shuffle_member_list() const
1107 {
1108  return shuffle_member_list_;
1109 }
1110 
1111 const client_property&
1112 client_properties::get_max_concurrent_invocations() const
1113 {
1114  return max_concurrent_invocations_;
1115 }
1116 
1117 const client_property&
1118 client_properties::get_backpressure_backoff_timeout_millis() const
1119 {
1120  return backpressure_backoff_timeout_millis_;
1121 }
1122 
1123 const client_property&
1124 client_properties::get_statistics_enabled() const
1125 {
1126  return statistics_enabled_;
1127 }
1128 
1129 const client_property&
1130 client_properties::get_statistics_period_seconds() const
1131 {
1132  return statistics_period_seconds_;
1133 }
1134 
1135 std::string
1137 {
1138  std::unordered_map<std::string, std::string>::const_iterator valueIt =
1139  properties_map_.find(property.get_name());
1140  if (valueIt != properties_map_.end()) {
1141  return valueIt->second;
1142  }
1143 
1144  const char* value = property.get_system_property();
1145  if (value != NULL) {
1146  return value;
1147  }
1148 
1149  return property.get_default_value();
1150 }
1151 
1152 bool
1154 {
1155  return util::IOUtil::to_value<bool>(get_string(property));
1156 }
1157 
1158 int32_t
1160 {
1161  return util::IOUtil::to_value<int32_t>(get_string(property));
1162 }
1163 
1164 int64_t
1166 {
1167  return util::IOUtil::to_value<int64_t>(get_string(property));
1168 }
1169 
1170 const client_property&
1171 client_properties::backup_timeout_millis() const
1172 {
1173  return backup_timeout_millis_;
1174 }
1175 
1176 const client_property&
1177 client_properties::fail_on_indeterminate_state() const
1178 {
1179  return fail_on_indeterminate_state_;
1180 }
1181 
1182 const client_property&
1183 client_properties::cloud_base_url() const
1184 {
1185  return cloud_base_url_;
1186 }
1187 
1188 namespace exception {
1189 iexception::iexception(std::string exception_name,
1190  std::string source,
1191  std::string message,
1192  std::string details,
1193  int32_t error_no,
1194  std::exception_ptr cause,
1195  bool is_runtime,
1196  bool retryable)
1197  : src_(std::move(source))
1198  , msg_(std::move(message))
1199  , details_(std::move(details))
1200  , error_code_(error_no)
1201  , cause_(std::move(cause))
1202  , runtime_exception_(is_runtime)
1203  , retryable_(retryable)
1204  , report_((boost::format("%1% {%2%. Error code:%3%, Details:%4%.} at %5%.") %
1205  exception_name % msg_ % error_code_ % details_ % src_)
1206  .str())
1207 {}
1208 
1209 iexception::~iexception() noexcept = default;
1210 
1211 char const*
1212 iexception::what() const noexcept
1213 {
1214  return report_.c_str();
1215 }
1216 
1217 const std::string&
1218 iexception::get_source() const
1219 {
1220  return src_;
1221 }
1222 
1223 const std::string&
1224 iexception::get_message() const
1225 {
1226  return msg_;
1227 }
1228 
1229 std::ostream&
1230 operator<<(std::ostream& os, const iexception& exception)
1231 {
1232  os << exception.what();
1233  return os;
1234 }
1235 
1236 const std::string&
1237 iexception::get_details() const
1238 {
1239  return details_;
1240 }
1241 
1242 int32_t
1243 iexception::get_error_code() const
1244 {
1245  return error_code_;
1246 }
1247 
1248 bool
1249 iexception::is_runtime() const
1250 {
1251  return runtime_exception_;
1252 }
1253 
1254 bool
1255 iexception::is_retryable() const
1256 {
1257  return retryable_;
1258 }
1259 
1260 iexception::iexception() = default;
1261 
1262 retryable_hazelcast::retryable_hazelcast(std::string source,
1263  std::string message,
1264  std::string details,
1265  std::exception_ptr cause)
1266  : retryable_hazelcast("retryable_hazelcast",
1267  protocol::RETRYABLE_HAZELCAST,
1268  std::move(source),
1269  std::move(message),
1270  std::move(details),
1271  std::move(cause),
1272  true,
1273  true)
1274 {}
1275 
1276 retryable_hazelcast::retryable_hazelcast(std::string error_name,
1277  int32_t error_code,
1278  std::string source,
1279  std::string message,
1280  std::string details,
1281  std::exception_ptr cause,
1282  bool runtime,
1283  bool retryable)
1284  : hazelcast_(std::move(error_name),
1285  error_code,
1286  std::move(source),
1287  std::move(message),
1288  std::move(details),
1289  std::move(cause),
1290  runtime,
1291  retryable)
1292 {}
1293 
1294 member_left::member_left(std::string source,
1295  std::string message,
1296  std::string details,
1297  std::exception_ptr cause)
1298  : execution("member_left",
1299  protocol::MEMBER_LEFT,
1300  std::move(source),
1301  std::move(message),
1302  std::move(details),
1303  std::move(cause),
1304  false,
1305  true)
1306 {}
1307 
1308 consistency_lost::consistency_lost(std::string source,
1309  std::string message,
1310  std::string details,
1311  std::exception_ptr cause)
1312  : hazelcast_("consistency_lost",
1313  protocol::CONSISTENCY_LOST_EXCEPTION,
1314  std::move(source),
1315  std::move(message),
1316  std::move(details),
1317  std::move(cause),
1318  true,
1319  false)
1320 {}
1321 
1322 query::query(std::string source,
1323  std::string message,
1324  std::string details,
1325  std::exception_ptr cause)
1326  : hazelcast_(std::move(source),
1327  std::move(message),
1328  std::move(details),
1329  std::move(cause),
1330  false)
1331 {
1332 }
1333 
1334 query::query(int32_t code,
1335  std::string message,
1336  std::exception_ptr cause,
1337  boost::uuids::uuid originating_member_id,
1338  std::string suggestion)
1339  : hazelcast_("", std::move(message), "", std::move(cause))
1340  , code_(code)
1341  , suggestion_(std::move(suggestion))
1342  , originating_member_uuid_(originating_member_id)
1343 {
1344 }
1345 
1346 int32_t
1347 query::code() const
1348 {
1349  return code_;
1350 }
1351 
1352 const std::string&
1353 query::suggestion() const
1354 {
1355  return suggestion_;
1356 }
1357 
1358 const boost::uuids::uuid&
1359 query::originating_member_uuid() const
1360 {
1361  return originating_member_uuid_;
1362 }
1363 
1364 } // namespace exception
1365 } // namespace client
1366 
1367 boost::future<client::hazelcast_client>
1368 new_client()
1369 {
1370  return boost::async([]() { return client::hazelcast_client(); });
1371 }
1372 
1373 boost::future<client::hazelcast_client>
1374 new_client(client::client_config config)
1375 {
1376  return boost::async(
1377  [](client::client_config&& c) {
1378  return client::hazelcast_client(std::move(c));
1379  },
1380  std::move(config));
1381 }
1382 } // namespace hazelcast
1383 
1384 namespace std {
1385 std::size_t
1386 hash<hazelcast::client::address>::operator()(
1387  const hazelcast::client::address& address) const noexcept
1388 {
1389  std::size_t seed = 0;
1390  boost::hash_combine(seed, address.get_host());
1391  boost::hash_combine(seed, address.get_port());
1392  boost::hash_combine(seed, address.type_);
1393  return seed;
1394 }
1395 
1396 std::size_t
1397 hash<hazelcast::client::big_decimal>::operator()(
1398  const hazelcast::client::big_decimal& dec) const
1399 {
1400  std::size_t seed = 0;
1401  boost::hash_combine(seed, dec.unscaled);
1402  boost::hash_combine(seed, dec.scale);
1403  return seed;
1404 }
1405 
1406 std::size_t
1407 hash<hazelcast::client::local_time>::operator()(
1408  const hazelcast::client::local_time& v) const
1409 {
1410  std::size_t seed = 0;
1411  boost::hash_combine(seed, v.hours);
1412  boost::hash_combine(seed, v.minutes);
1413  boost::hash_combine(seed, v.seconds);
1414  boost::hash_combine(seed, v.nanos);
1415  return seed;
1416 }
1417 
1418 std::size_t
1419 hash<hazelcast::client::local_date>::operator()(
1420  const hazelcast::client::local_date& v) const
1421 {
1422  std::size_t seed = 0;
1423  boost::hash_combine(seed, v.year);
1424  boost::hash_combine(seed, v.month);
1425  boost::hash_combine(seed, v.day_of_month);
1426  return seed;
1427 }
1428 
1429 std::size_t
1430 hash<hazelcast::client::local_date_time>::operator()(
1431  const hazelcast::client::local_date_time& v) const
1432 {
1433  std::size_t seed = 0;
1434  boost::hash_combine<hazelcast::client::local_date>(seed, v.date);
1435  boost::hash_combine<hazelcast::client::local_time>(seed, v.time);
1436  return seed;
1437 }
1438 
1439 std::size_t
1440 hash<hazelcast::client::offset_date_time>::operator()(
1441  const hazelcast::client::offset_date_time& v) const
1442 {
1443  std::size_t seed = 0;
1444  boost::hash_combine<hazelcast::client::local_date_time>(seed, v.date_time);
1445  boost::hash_combine(seed, v.zone_offset_in_seconds);
1446  return seed;
1447 }
1448 } // namespace std
Represents an address of a client or member in the cluster.
Definition: address.h:37
const std::string & get_host() const
bool operator==(const address &address) const
bool operator!=(const address &address) const
hazelcast_client configuration class.
static const std::string PROP_HEARTBEAT_INTERVAL
Time interval in milliseconds between the heartbeats sent by the client to the nodes.
static const std::string PROP_REQUEST_RETRY_COUNT
Client will retry requests which are either inherently retryable (idempotent client) or client_network_con...
static const std::string PROP_REQUEST_RETRY_WAIT_TIME
Client will retry requests which are either inherently retryable (idempotent client) or client_network_con...
static const std::string MAX_CONCURRENT_INVOCATIONS
The maximum number of concurrent invocations allowed.
static const std::string SHUFFLE_MEMBER_LIST
Client shuffles the given member list to prevent all clients from connecting to the same node when this pr...
static const std::string STATISTICS_PERIOD_SECONDS
The period in seconds at which the statistics are sent to the cluster.
static const std::string EVENT_THREAD_COUNT
Number of the threads to handle the incoming event packets.
static const std::string STATISTICS_ENABLED
Used to enable the client statistics collection.
static const std::string INVOCATION_RETRY_PAUSE_MILLIS
Pause time between each retry cycle of an invocation in milliseconds.
std::string get_string(const client_property &property) const
Returns the configured value of a ClientProperty as std::string.
int32_t get_integer(const client_property &property) const
Returns the configured int32_t value of a ClientProperty.
static const std::string PROP_AWS_MEMBER_PORT
The discovery mechanism will discover only IP addresses.
static const std::string PROP_HEARTBEAT_TIMEOUT
Client will be sending heartbeat messages to members and this is the timeout.
static const std::string INVOCATION_TIMEOUT_SECONDS
When an invocation gets an exception because :
bool get_boolean(const client_property &property) const
Returns the configured boolean value of a ClientProperty.
int64_t get_long(const client_property &property) const
Returns the configured int64_t value of a ClientProperty.
static const std::string BACKPRESSURE_BACKOFF_TIMEOUT_MILLIS
Control the maximum timeout in millis to wait for an invocation space to be available.
A client property is a configuration for hazelcast client.
const char * get_system_property() const
Gets the system environment property value of the property.
Hazelcast cluster interface.
Definition: cluster.h:37
Base class for all exceptions originating from Hazelcast methods.
Definition: iexception.h:49
boost::future< bool > is_shutdown()
Returns.
boost::future< bool > is_terminated()
Returns.
void shutdown()
Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will...
Listener object for listening lifecycle events of hazelcast instance.
The Client interface allows getting information about a connected client's socket address,...
A service to execute SQL statements.
Definition: sql_service.h:89
Provides a context to do transactional operations; so beginning/committing transactions,...
Contains the configuration for a Hazelcast transaction.
CP Subsystem is a component of Hazelcast that builds a strongly consistent layer for a set of distrib...
Definition: cp.h:1407
An arbitrary precision and scale floating point number.
Definition: big_decimal.h:44
A date-time without a time-zone in the ISO-8601 calendar system, such as.
A date without a time-zone in the ISO-8601 calendar system, such as.
Definition: local_date.h:53
uint8_t day_of_month
minimum value is 1 maximum value is 31
Definition: local_date.h:68
int32_t year
minimum value is -999999999 maximum value is 999999999
Definition: local_date.h:58
uint8_t month
minimum value is 1 maximum value is 12
Definition: local_date.h:63
A time without a time-zone in the ISO-8601 calendar system, such as.
Definition: local_time.h:53
uint8_t hours
the hour-of-day to represent, from 0 to 23
Definition: local_time.h:57
uint8_t seconds
the second-of-minute to represent, from 0 to 59
Definition: local_time.h:65
int32_t nanos
the nanosecond-of-second to represent, from 0 to 999,999,999
Definition: local_time.h:69
uint8_t minutes
the minute-of-hour to represent, from 0 to 59
Definition: local_time.h:61
A date-time with an offset from UTC/Greenwich in the ISO-8601 calendar system, such as.
int32_t zone_offset_in_seconds
The offset from UTC/Greenwich.
local_date_time date_time
The local date-time.