Hazelcast C++ Client
Hazelcast C++ Client Library
spi.cpp
1 /*
2  * Copyright (c) 2008-2022, Hazelcast, Inc. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <utility>
18 
19 #include <boost/uuid/uuid_hash.hpp>
20 #include <boost/functional/hash.hpp>
21 #include <boost/property_tree/ptree.hpp>
22 #include <boost/property_tree/json_parser.hpp>
23 
24 #include "hazelcast/client/hazelcast_client.h"
25 #include <hazelcast/client/protocol/codec/ErrorCodec.h>
26 #include <hazelcast/client/spi/impl/ListenerMessageCodec.h>
27 #include <hazelcast/client/spi/impl/ClientClusterServiceImpl.h>
28 #include <hazelcast/client/spi/impl/listener/cluster_view_listener.h>
29 #include <hazelcast/client/spi/impl/listener/listener_service_impl.h>
30 #include <hazelcast/client/spi/impl/discovery/remote_address_provider.h>
31 #include <hazelcast/client/spi/impl/discovery/cloud_discovery.h>
32 #include <hazelcast/util/AddressUtil.h>
33 #include "hazelcast/client/member_selectors.h"
34 #include "hazelcast/client/lifecycle_event.h"
35 #include "hazelcast/client/initial_membership_event.h"
36 #include "hazelcast/client/membership_event.h"
37 #include "hazelcast/client/lifecycle_listener.h"
38 #include "hazelcast/client/spi/ProxyManager.h"
39 #include "hazelcast/client/spi/ClientProxy.h"
40 #include "hazelcast/client/spi/ClientContext.h"
41 #include "hazelcast/client/spi/impl/ClientInvocation.h"
42 #include "hazelcast/client/spi/impl/ClientInvocationServiceImpl.h"
43 #include "hazelcast/client/impl/hazelcast_client_instance_impl.h"
44 #include "hazelcast/client/spi/impl/ClientPartitionServiceImpl.h"
45 #include "hazelcast/client/spi/impl/DefaultAddressProvider.h"
46 #include "hazelcast/client/spi/impl/sequence/CallIdSequenceWithBackpressure.h"
47 #include "hazelcast/client/spi/impl/sequence/CallIdSequenceWithoutBackpressure.h"
48 #include "hazelcast/client/spi/impl/sequence/FailFastCallIdSequence.h"
49 #include "hazelcast/client/spi/impl/sequence/CallIdFactory.h"
50 #include "hazelcast/client/connection/ClientConnectionManagerImpl.h"
51 #include "hazelcast/util/AddressHelper.h"
52 #include "hazelcast/util/HashUtil.h"
53 #include "hazelcast/util/concurrent/BackoffIdleStrategy.h"
54 #ifdef HZ_BUILD_WITH_SSL
55 #include <hazelcast/util/SyncHttpsClient.h>
56 #endif //HZ_BUILD_WITH_SSL
57 
58 namespace hazelcast {
59  namespace client {
60  const std::unordered_set<member> &initial_membership_event::get_members() const {
61  return members_;
62  }
63 
65  return cluster_;
66  }
67 
68  initial_membership_event::initial_membership_event(cluster &cluster, std::unordered_set<member> members) : cluster_(
69  cluster), members_(std::move(members)) {
70  }
71 
73  : state_(state) {
74  }
75 
77  return state_;
78  }
79 
80  namespace spi {
81  ProxyManager::ProxyManager(ClientContext &context) : client_(context) {
82  }
83 
84  void ProxyManager::init() {
85  }
86 
87  void ProxyManager::destroy() {
88  std::lock_guard<std::recursive_mutex> guard(lock_);
89  for (auto &p : proxies_) {
90  try {
91  auto proxy = p.second.get();
92  p.second.get()->on_shutdown();
93  } catch (std::exception &se) {
94  auto &lg = client_.get_logger();
95  HZ_LOG(lg, finest,
96  boost::str(boost::format("Proxy was not created, "
97  "hence onShutdown can be called. Exception: %1%")
98  % se.what())
99  );
100  }
101  }
102  proxies_.clear();
103  }
104 
105  boost::future<void> ProxyManager::initialize(const std::shared_ptr<ClientProxy> &client_proxy) {
106  auto clientMessage = protocol::codec::client_createproxy_encode(client_proxy->get_name(),
107  client_proxy->get_service_name());
108  return spi::impl::ClientInvocation::create(client_, clientMessage,
109  client_proxy->get_service_name())->invoke().then(
110  boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
111  f.get();
112  client_proxy->on_initialize();
113  });
114  }
115 
116  boost::future<void> ProxyManager::destroy_proxy(ClientProxy &proxy) {
117  DefaultObjectNamespace objectNamespace(proxy.get_service_name(), proxy.get_name());
118  std::shared_ptr<ClientProxy> registeredProxy;
119  {
120  std::lock_guard<std::recursive_mutex> guard(lock_);
121  auto it = proxies_.find(objectNamespace);
122  registeredProxy = it == proxies_.end() ? nullptr : it->second.get();
123  if (it != proxies_.end()) {
124  proxies_.erase(it);
125  }
126  }
127 
128  try {
129  if (registeredProxy) {
130  try {
131  proxy.destroy_locally();
132  return proxy.destroy_remotely();
133  } catch (exception::iexception &) {
134  proxy.destroy_remotely();
135  throw;
136  }
137  }
138  if (&proxy != registeredProxy.get()) {
139  // The given proxy is stale and was already destroyed, but the caller
140  // may have allocated local resources in the context of this stale proxy
141  // instance after it was destroyed, so we have to cleanup it locally one
142  // more time to make sure there are no leaking local resources.
143  proxy.destroy_locally();
144  }
145  } catch (...) {
146  if (&proxy != registeredProxy.get()) {
147  // The given proxy is stale and was already destroyed, but the caller
148  // may have allocated local resources in the context of this stale proxy
149  // instance after it was destroyed, so we have to cleanup it locally one
150  // more time to make sure there are no leaking local resources.
151  proxy.destroy_locally();
152  }
153  throw;
154  }
155  return boost::make_ready_future();
156  }
157 
158  ClientContext::ClientContext(const client::hazelcast_client &hazelcast_client) : hazelcast_client_(
159  *hazelcast_client.client_impl_) {
160  }
161 
162  ClientContext::ClientContext(client::impl::hazelcast_client_instance_impl &hazelcast_client)
163  : hazelcast_client_(hazelcast_client) {
164  }
165 
166  serialization::pimpl::SerializationService &ClientContext::get_serialization_service() {
167  return hazelcast_client_.serialization_service_;
168  }
169 
170  impl::ClientClusterServiceImpl & ClientContext::get_client_cluster_service() {
171  return hazelcast_client_.cluster_service_;
172  }
173 
174  impl::ClientInvocationServiceImpl &ClientContext::get_invocation_service() {
175  return *hazelcast_client_.invocation_service_;
176  }
177 
178  client_config &ClientContext::get_client_config() {
179  return hazelcast_client_.client_config_;
180  }
181 
182  impl::ClientPartitionServiceImpl & ClientContext::get_partition_service() {
183  return *hazelcast_client_.partition_service_;
184  }
185 
186  lifecycle_service &ClientContext::get_lifecycle_service() {
187  return hazelcast_client_.lifecycle_service_;
188  }
189 
190  spi::impl::listener::listener_service_impl &ClientContext::get_client_listener_service() {
191  return *hazelcast_client_.listener_service_;
192  }
193 
194  connection::ClientConnectionManagerImpl &ClientContext::get_connection_manager() {
195  return *hazelcast_client_.connection_manager_;
196  }
197 
198  internal::nearcache::NearCacheManager &ClientContext::get_near_cache_manager() {
199  return *hazelcast_client_.near_cache_manager_;
200  }
201 
202  client_properties &ClientContext::get_client_properties() {
203  return hazelcast_client_.client_properties_;
204  }
205 
206  cluster &ClientContext::get_cluster() {
207  return hazelcast_client_.cluster_;
208  }
209 
210  std::shared_ptr<impl::sequence::CallIdSequence> &ClientContext::get_call_id_sequence() const {
211  return hazelcast_client_.call_id_sequence_;
212  }
213 
214  const protocol::ClientExceptionFactory &ClientContext::get_client_exception_factory() const {
215  return hazelcast_client_.get_exception_factory();
216  }
217 
218  const std::string &ClientContext::get_name() const {
219  return hazelcast_client_.get_name();
220  }
221 
222  impl::ClientExecutionServiceImpl &ClientContext::get_client_execution_service() const {
223  return *hazelcast_client_.execution_service_;
224  }
225 
226  const std::shared_ptr<client::impl::ClientLockReferenceIdGenerator> &
227  ClientContext::get_lock_reference_id_generator() {
228  return hazelcast_client_.get_lock_reference_id_generator();
229  }
230 
231  std::shared_ptr<client::impl::hazelcast_client_instance_impl>
232  ClientContext::get_hazelcast_client_implementation() {
233  return hazelcast_client_.shared_from_this();
234  }
235 
236  spi::ProxyManager &ClientContext::get_proxy_manager() {
237  return hazelcast_client_.get_proxy_manager();
238  }
239 
240  logger &ClientContext::get_logger() {
241  return *hazelcast_client_.logger_;
242  }
243 
244  client::impl::statistics::Statistics &ClientContext::get_clientstatistics() {
245  return *hazelcast_client_.statistics_;
246  }
247 
248  spi::impl::listener::cluster_view_listener &ClientContext::get_cluster_view_listener() {
249  return *hazelcast_client_.cluster_listener_;
250  }
251 
252  boost::uuids::uuid ClientContext::random_uuid() {
253  return hazelcast_client_.random_uuid();
254  }
255 
256  cp::internal::session::proxy_session_manager &ClientContext::get_proxy_session_manager() {
257  return hazelcast_client_.proxy_session_manager_;
258  }
259 
260  lifecycle_service::lifecycle_service(ClientContext &client_context,
261  const std::vector<lifecycle_listener> &listeners) :
262  client_context_(client_context), listeners_(),
263  shutdown_completed_latch_(1) {
264  for (const auto &listener: listeners) {
265  add_listener(lifecycle_listener(listener));
266  }
267  }
268 
269  bool lifecycle_service::start() {
270  bool expected = false;
271  if (!active_.compare_exchange_strong(expected, true)) {
272  return false;
273  }
274 
275  fire_lifecycle_event(lifecycle_event::STARTED);
276 
277  client_context_.get_client_execution_service().start();
278 
279  client_context_.get_client_listener_service().start();
280 
281  client_context_.get_invocation_service().start();
282 
283  client_context_.get_client_cluster_service().start();
284 
285  client_context_.get_cluster_view_listener().start();
286 
287  if (!client_context_.get_connection_manager().start()) {
288  return false;
289  }
290 
291  auto &connectionStrategyConfig = client_context_.get_client_config().get_connection_strategy_config();
292  if (!connectionStrategyConfig.is_async_start()) {
293  // The client needs to open connections to all members before any services requiring internal listeners start
294  wait_for_initial_membership_event();
295  client_context_.get_connection_manager().connect_to_all_cluster_members();
296  }
297 
298  client_context_.get_invocation_service().add_backup_listener();
299 
300  client_context_.get_clientstatistics().start();
301 
302  return true;
303  }
304 
305  void lifecycle_service::shutdown() {
306  bool expected = true;
307  if (!active_.compare_exchange_strong(expected, false)) {
308  shutdown_completed_latch_.wait();
309  return;
310  }
311  try {
312  fire_lifecycle_event(lifecycle_event::SHUTTING_DOWN);
313  client_context_.get_proxy_session_manager().shutdown();
314  client_context_.get_clientstatistics().shutdown();
315  client_context_.get_proxy_manager().destroy();
316  client_context_.get_connection_manager().shutdown();
317  client_context_.get_client_cluster_service().shutdown();
318  client_context_.get_invocation_service().shutdown();
319  client_context_.get_client_listener_service().shutdown();
320  client_context_.get_near_cache_manager().destroy_all_near_caches();
321  fire_lifecycle_event(lifecycle_event::SHUTDOWN);
322  client_context_.get_client_execution_service().shutdown();
323  client_context_.get_serialization_service().dispose();
324  shutdown_completed_latch_.count_down();
325  } catch (std::exception &e) {
326  HZ_LOG(client_context_.get_logger(), info,
327  boost::str(boost::format("An exception occured during LifecycleService shutdown. %1%")
328  % e.what())
329  );
330  shutdown_completed_latch_.count_down();
331  }
332  }
333 
334  boost::uuids::uuid lifecycle_service::add_listener(lifecycle_listener &&lifecycle_listener) {
335  std::lock_guard<std::mutex> lg(listener_lock_);
336  const auto id = uuid_generator_();
337  listeners_.emplace(id, std::move(lifecycle_listener));
338  return id;
339  }
340 
341  bool lifecycle_service::remove_listener(const boost::uuids::uuid &registration_id) {
342  std::lock_guard<std::mutex> guard(listener_lock_);
343  return listeners_.erase(registration_id) == 1;
344  }
345 
346  void lifecycle_service::fire_lifecycle_event(const lifecycle_event &lifecycle_event) {
347  std::lock_guard<std::mutex> guard(listener_lock_);
348  logger &lg = client_context_.get_logger();
349 
350  std::function<void(lifecycle_listener &)> fire_one;
351 
352  switch (lifecycle_event.get_state()) {
353  case lifecycle_event::STARTING : {
354  // convert the date string from "2016-04-20" to 20160420
355  std::string date(HAZELCAST_STRINGIZE(HAZELCAST_GIT_COMMIT_DATE));
356  util::git_date_to_hazelcast_log_date(date);
357  std::string commitId(HAZELCAST_STRINGIZE(HAZELCAST_GIT_COMMIT_ID));
358  commitId.erase(std::remove(commitId.begin(), commitId.end(), '"'), commitId.end());
359 
360  HZ_LOG(lg, info,
361  (boost::format("(%1%:%2%) LifecycleService::LifecycleEvent Client (%3%) is STARTING") %
362  date % commitId % client_context_.get_connection_manager().get_client_uuid()).str());
363  char msg[100];
364  util::hz_snprintf(msg, 100, "(%s:%s) LifecycleService::LifecycleEvent STARTING", date.c_str(),
365  commitId.c_str());
366  HZ_LOG(lg, info, msg);
367 
368  fire_one = [](lifecycle_listener &listener) {
369  listener.starting_();
370  };
371  break;
372  }
373  case lifecycle_event::STARTED : {
374  HZ_LOG(lg, info, "LifecycleService::LifecycleEvent STARTED");
375 
376  fire_one = [](lifecycle_listener &listener) {
377  listener.started_();
378  };
379  break;
380  }
381  case lifecycle_event::SHUTTING_DOWN : {
382  HZ_LOG(lg, info, "LifecycleService::LifecycleEvent SHUTTING_DOWN");
383 
384  fire_one = [](lifecycle_listener &listener) {
385  listener.shutting_down_();
386  };
387  break;
388  }
389  case lifecycle_event::SHUTDOWN : {
390  HZ_LOG(lg, info, "LifecycleService::LifecycleEvent SHUTDOWN");
391 
392  fire_one = [](lifecycle_listener &listener) {
393  listener.shutdown_();
394  };
395  break;
396  }
397  case lifecycle_event::CLIENT_CONNECTED : {
398  HZ_LOG(lg, info, "LifecycleService::LifecycleEvent CLIENT_CONNECTED");
399 
400  fire_one = [](lifecycle_listener &listener) {
401  listener.connected_();
402  };
403  break;
404  }
405  case lifecycle_event::CLIENT_DISCONNECTED : {
406  HZ_LOG(lg, info, "LifecycleService::LifecycleEvent CLIENT_DISCONNECTED");
407 
408  fire_one = [](lifecycle_listener &listener) {
409  listener.disconnected_();
410  };
411  break;
412  }
413  }
414 
415  for (auto &item: listeners_) {
416  fire_one(item.second);
417  }
418  }
419 
420  bool lifecycle_service::is_running() {
421  return active_;
422  }
423 
424  lifecycle_service::~lifecycle_service() {
425  if (active_) {
426  shutdown();
427  }
428  }
429 
430  void lifecycle_service::wait_for_initial_membership_event() const {
431  client_context_.get_client_cluster_service().wait_initial_member_list_fetched();
432  }
433 
434  DefaultObjectNamespace::DefaultObjectNamespace(const std::string &service, const std::string &object)
435  : service_name_(service), object_name_(object) {
436 
437  }
438 
439  const std::string &DefaultObjectNamespace::get_service_name() const {
440  return service_name_;
441  }
442 
443  const std::string &DefaultObjectNamespace::get_object_name() const {
444  return object_name_;
445  }
446 
447  bool DefaultObjectNamespace::operator==(const DefaultObjectNamespace &rhs) const {
448  return service_name_ == rhs.service_name_ && object_name_ == rhs.object_name_;
449  }
450 
451  ClientProxy::ClientProxy(const std::string &name, const std::string &service_name, ClientContext &context)
452  : name_(name), service_name_(service_name), context_(context) {}
453 
454  ClientProxy::~ClientProxy() = default;
455 
456  const std::string &ClientProxy::get_name() const {
457  return name_;
458  }
459 
460  const std::string &ClientProxy::get_service_name() const {
461  return service_name_;
462  }
463 
464  ClientContext &ClientProxy::get_context() {
465  return context_;
466  }
467 
468  void ClientProxy::on_destroy() {
469  }
470 
471  boost::future<void> ClientProxy::destroy() {
472  return get_context().get_proxy_manager().destroy_proxy(*this);
473  }
474 
475  void ClientProxy::destroy_locally() {
476  if (pre_destroy()) {
477  try {
478  on_destroy();
479  post_destroy();
480  } catch (exception::iexception &) {
481  post_destroy();
482  throw;
483  }
484  }
485  }
486 
487  bool ClientProxy::pre_destroy() {
488  return true;
489  }
490 
491  void ClientProxy::post_destroy() {
492  }
493 
494  void ClientProxy::on_initialize() {
495  }
496 
497  void ClientProxy::on_shutdown() {
498  }
499 
500  serialization::pimpl::SerializationService &ClientProxy::get_serialization_service() {
501  return context_.get_serialization_service();
502  }
503 
504  boost::future<void> ClientProxy::destroy_remotely() {
505  auto clientMessage = protocol::codec::client_destroyproxy_encode(
506  get_name(), get_service_name());
507  return spi::impl::ClientInvocation::create(get_context(), std::make_shared<protocol::ClientMessage>(
508  std::move(clientMessage)), get_name())->invoke().then(
509  boost::launch::sync, [](boost::future<protocol::ClientMessage> f) { f.get(); });
510  }
511 
512  boost::future<boost::uuids::uuid>
513  ClientProxy::register_listener(std::shared_ptr<impl::ListenerMessageCodec> listener_message_codec,
514  std::shared_ptr<client::impl::BaseEventHandler> handler) {
515  handler->set_logger(&get_context().get_logger());
516  return get_context().get_client_listener_service().register_listener(listener_message_codec,
517  handler);
518  }
519 
520  boost::future<bool> ClientProxy::deregister_listener(boost::uuids::uuid registration_id) {
521  return get_context().get_client_listener_service().deregister_listener(registration_id);
522  }
523 
524  namespace impl {
525  boost::uuids::uuid
526  ListenerMessageCodec::decode_add_response(protocol::ClientMessage &msg) const {
527  return msg.get_first_uuid();
528  }
529 
530  bool ListenerMessageCodec::decode_remove_response(protocol::ClientMessage &msg) const {
531  return msg.get_first_fixed_sized_field<bool>();
532  }
533 
534  ClientInvocationServiceImpl::ClientInvocationServiceImpl(ClientContext &client)
535  : client_(client), logger_(client.get_logger()),
536  invocation_timeout_(std::chrono::seconds(client.get_client_properties().get_integer(
537  client.get_client_properties().get_invocation_timeout_seconds()))),
538  invocation_retry_pause_(std::chrono::milliseconds(client.get_client_properties().get_long(
539  client.get_client_properties().get_invocation_retry_pause_millis()))),
540  smart_routing_(client.get_client_config().get_network_config().is_smart_routing()),
541  backup_acks_enabled_(smart_routing_ && client.get_client_config().backup_acks_enabled()),
542  fail_on_indeterminate_operation_state_(client.get_client_properties().get_boolean(client.get_client_properties().fail_on_indeterminate_state())),
543  backup_timeout_(std::chrono::milliseconds(client.get_client_properties().get_integer(client.get_client_properties().backup_timeout_millis()))) {}
544 
545  void ClientInvocationServiceImpl::start() {
546  }
547 
548  void ClientInvocationServiceImpl::add_backup_listener() {
549  if (this->backup_acks_enabled_) {
550  auto &listener_service = this->client_.get_client_listener_service();
551  listener_service.register_listener(std::make_shared<BackupListenerMessageCodec>(),
552  std::make_shared<noop_backup_event_handler>()).get();
553  }
554  }
555 
556  void ClientInvocationServiceImpl::shutdown() {
557  is_shutdown_.store(true);
558  }
559 
560  std::chrono::milliseconds ClientInvocationServiceImpl::get_invocation_timeout() const {
561  return invocation_timeout_;
562  }
563 
564  std::chrono::milliseconds ClientInvocationServiceImpl::get_invocation_retry_pause() const {
565  return invocation_retry_pause_;
566  }
567 
568  bool ClientInvocationServiceImpl::is_redo_operation() {
569  return client_.get_client_config().is_redo_operation();
570  }
571 
572  void
573  ClientInvocationServiceImpl::handle_client_message(const std::shared_ptr<ClientInvocation> &invocation,
574  const std::shared_ptr<protocol::ClientMessage> &response) {
575  try {
576  if (protocol::codec::ErrorCodec::EXCEPTION_MESSAGE_TYPE == response->get_message_type()) {
577  auto error_holder = protocol::codec::ErrorCodec::decode(*response);
578  invocation->notify_exception(
579  client_.get_client_exception_factory().create_exception(error_holder));
580  } else {
581  invocation->notify(response);
582  }
583  } catch (std::exception &e) {
584  HZ_LOG(logger_, severe,
585  boost::str(boost::format("Failed to process response for %1%. %2%")
586  % *invocation % e.what())
587  );
588  }
589  }
590 
591  bool ClientInvocationServiceImpl::send(const std::shared_ptr<impl::ClientInvocation>& invocation,
592  const std::shared_ptr<connection::Connection>& connection) {
593  if (is_shutdown_) {
594  BOOST_THROW_EXCEPTION(
595  exception::hazelcast_client_not_active("ClientInvocationServiceImpl::send",
596  "Client is shut down"));
597  }
598 
599  if (backup_acks_enabled_) {
600  invocation->get_client_message()->add_flag(protocol::ClientMessage::BACKUP_AWARE_FLAG);
601  }
602 
603  write_to_connection(*connection, invocation);
604  invocation->set_send_connection(connection);
605  return true;
606  }
607 
608  void ClientInvocationServiceImpl::write_to_connection(connection::Connection &connection,
609  const std::shared_ptr<ClientInvocation> &client_invocation) {
610  auto clientMessage = client_invocation->get_client_message();
611  connection.write(client_invocation);
612  }
613 
614  void ClientInvocationServiceImpl::check_invocation_allowed() {
615  client_.get_connection_manager().check_invocation_allowed();
616  }
617 
618  bool ClientInvocationServiceImpl::invoke(std::shared_ptr<ClientInvocation> invocation) {
619  auto connection = client_.get_connection_manager().get_random_connection();
620  if (!connection) {
621  HZ_LOG(logger_, finest, "No connection found to invoke");
622  return false;
623  }
624  return send(invocation, connection);
625  }
626 
627  DefaultAddressProvider::DefaultAddressProvider(config::client_network_config &network_config)
628  : network_config_(network_config) {}
629 
630  std::vector<address> DefaultAddressProvider::load_addresses() {
631  std::vector<address> addresses = network_config_.get_addresses();
632  if (addresses.empty()) {
633  addresses.emplace_back("127.0.0.1", 5701);
634  }
635 
636  // TODO Implement AddressHelper to add alternative ports for the same address
637 
638  return addresses;
639  }
640 
641  boost::optional<address> DefaultAddressProvider::translate(const address &addr) {
642  return addr;
643  }
644 
645  bool DefaultAddressProvider::is_default_provider() {
646  return true;
647  }
648 
649  const boost::shared_ptr<ClientClusterServiceImpl::member_list_snapshot> ClientClusterServiceImpl::EMPTY_SNAPSHOT(
650  new ClientClusterServiceImpl::member_list_snapshot{-1});
651 
652  constexpr boost::chrono::milliseconds ClientClusterServiceImpl::INITIAL_MEMBERS_TIMEOUT;
653  const endpoint_qualifier ClientClusterServiceImpl::CLIENT{1, ""};
654  const endpoint_qualifier ClientClusterServiceImpl::MEMBER{0, ""};
655 
656  ClientClusterServiceImpl::ClientClusterServiceImpl(hazelcast::client::spi::ClientContext &client)
657  : client_(client), member_list_snapshot_(EMPTY_SNAPSHOT),
658  labels_(client.get_client_config().get_labels()),
659  initial_list_fetched_latch_(1) {
660  }
661 
662  boost::uuids::uuid ClientClusterServiceImpl::add_membership_listener_without_init(
663  membership_listener &&listener) {
664  std::lock_guard<std::mutex> g(listeners_lock_);
665  auto id = client_.random_uuid();
666  listeners_.emplace(id, std::move(listener));
667  return id;
668  }
669 
670  boost::optional<member> ClientClusterServiceImpl::get_member(boost::uuids::uuid uuid) const {
671  assert(!uuid.is_nil());
672  auto members_view_ptr = member_list_snapshot_.load();
673  const auto it = members_view_ptr->members.find(uuid);
674  if (it == members_view_ptr->members.end()) {
675  return boost::none;
676  }
677  return {it->second};
678  }
679 
680  std::vector<member> ClientClusterServiceImpl::get_member_list() const {
681  auto members_view_ptr = member_list_snapshot_.load();
682  std::vector<member> result;
683  result.reserve(members_view_ptr->members.size());
684  for (const auto &e : members_view_ptr->members) {
685  result.emplace_back(e.second);
686  }
687  return result;
688  }
689 
690  void ClientClusterServiceImpl::start() {
691  for (auto &listener : client_.get_client_config().get_membership_listeners()) {
692  add_membership_listener(membership_listener(listener));
693  }
694  }
695 
696  void ClientClusterServiceImpl::fire_initial_membership_event(const initial_membership_event &event) {
697  std::lock_guard<std::mutex> g(listeners_lock_);
698 
699  for (auto &item : listeners_) {
700  membership_listener &listener = item.second;
701  if (listener.init_) {
702  listener.init_(event);
703  }
704  }
705  }
706 
707  void ClientClusterServiceImpl::shutdown() {
708  initial_list_fetched_latch_.try_count_down();
709  }
710 
711  boost::uuids::uuid
712  ClientClusterServiceImpl::add_membership_listener(membership_listener &&listener) {
713  std::lock_guard<std::mutex> cluster_view_g(cluster_view_lock_);
714 
715  auto id = add_membership_listener_without_init(std::move(listener));
716 
717  std::lock_guard<std::mutex> listeners_g(listeners_lock_);
718  auto added_listener = listeners_[id];
719 
720  if (added_listener.init_) {
721  auto &cluster = client_.get_cluster();
722  auto members_ptr = member_list_snapshot_.load();
723  if (!members_ptr->members.empty()) {
724  std::unordered_set<member> members;
725  for (const auto &e : members_ptr->members) {
726  members.insert(e.second);
727  }
728  added_listener.init_(initial_membership_event(cluster, members));
729  }
730  }
731 
732  return id;
733  }
734 
735  bool ClientClusterServiceImpl::remove_membership_listener(boost::uuids::uuid registration_id) {
736  std::lock_guard<std::mutex> g(listeners_lock_);
737  return listeners_.erase(registration_id) == 1;
738  }
739 
740  std::vector<member>
741  ClientClusterServiceImpl::get_members(const member_selector &selector) const {
742  std::vector<member> result;
743  for (auto &&member : get_member_list()) {
744  if (selector.select(member)) {
745  result.emplace_back(std::move(member));
746  }
747  }
748 
749  return result;
750  }
751 
752  local_endpoint ClientClusterServiceImpl::get_local_client() const {
753  connection::ClientConnectionManagerImpl &cm = client_.get_connection_manager();
754  auto connection = cm.get_random_connection();
755  auto inetSocketAddress = connection ? connection->get_local_socket_address() : boost::none;
756  auto uuid = cm.get_client_uuid();
757  return local_endpoint(uuid, std::move(inetSocketAddress), client_.get_name(), labels_);
758  }
759 
760  void ClientClusterServiceImpl::clear_member_list_version() {
761  std::lock_guard<std::mutex> g(cluster_view_lock_);
762  auto &lg = client_.get_logger();
763  HZ_LOG(lg, finest, "Resetting the member list version ");
764  auto cluster_view_snapshot = member_list_snapshot_.load();
765  //This check is necessary so that `clear_member_list_version` when handling auth response will not
766  //intervene with client failover logic
767  if (cluster_view_snapshot != EMPTY_SNAPSHOT) {
768  member_list_snapshot_.store(boost::shared_ptr<member_list_snapshot>(
769  new member_list_snapshot{0, cluster_view_snapshot->members}));
770  }
771  }
772 
773  std::vector<membership_event> ClientClusterServiceImpl::clear_member_list_and_return_events() {
774  std::lock_guard<std::mutex> g(cluster_view_lock_);
775 
776  auto &lg = client_.get_logger();
777  HZ_LOG(lg, finest, "Resetting the member list");
778 
779  auto previous_list = member_list_snapshot_.load()->members;
780 
781  member_list_snapshot_.store(
782  boost::shared_ptr<member_list_snapshot>(new member_list_snapshot{0}));
783 
784  return detect_membership_events(previous_list,
785  std::unordered_map<boost::uuids::uuid, member, boost::hash<boost::uuids::uuid>>());
786  }
787 
788  void ClientClusterServiceImpl::clear_member_list() {
789  auto events = clear_member_list_and_return_events();
790  fire_events(std::move(events));
791  }
792 
793  void
794  ClientClusterServiceImpl::handle_event(int32_t version, const std::vector<member> &member_infos) {
795  auto &lg = client_.get_logger();
796  HZ_LOG(lg, finest,
797  boost::str(boost::format("Handling new snapshot with membership version: %1%, "
798  "membersString %2%")
799  % version
800  % members_string(create_snapshot(version, member_infos)))
801  );
802  auto cluster_view_snapshot = member_list_snapshot_.load();
803  if (cluster_view_snapshot == EMPTY_SNAPSHOT) {
804  std::lock_guard<std::mutex> g(cluster_view_lock_);
805  cluster_view_snapshot = member_list_snapshot_.load();
806  if (cluster_view_snapshot == EMPTY_SNAPSHOT) {
807  //this means this is the first time client connected to cluster
808  apply_initial_state(version, member_infos);
809  initial_list_fetched_latch_.count_down();
810  return;
811  }
812  }
813 
814  std::vector<membership_event> events;
815  if (version >= cluster_view_snapshot->version) {
816  std::lock_guard<std::mutex> g(cluster_view_lock_);
817  cluster_view_snapshot = member_list_snapshot_.load();
818  if (version >= cluster_view_snapshot->version) {
819  auto prev_members = cluster_view_snapshot->members;
820  auto snapshot = boost::make_shared<member_list_snapshot>(create_snapshot(version, member_infos));
821  member_list_snapshot_.store(snapshot);
822  events = detect_membership_events(prev_members, snapshot->members);
823  }
824  }
825 
826  fire_events(std::move(events));
827  }
828 
829  ClientClusterServiceImpl::member_list_snapshot
830  ClientClusterServiceImpl::create_snapshot(int32_t version, const std::vector<member> &members) {
831  member_list_snapshot result;
832  result.version = version;
833  for (auto &m : members) {
834  auto const &address_map = m.address_map();
835  if (address_map.empty()) {
836  result.members.insert({m.get_uuid(), m});
837  } else {
838  auto found = address_map.find(CLIENT);
839  address member_address;
840  if (found != address_map.end()) {
841  member_address = found->second;
842  } else {
843  found = address_map.find(MEMBER);
844  assert(found != address_map.end());
845  member_address = found->second;
846  }
847  member new_member(member_address, m.get_uuid(), m.is_lite_member(), m.get_attributes(), m.address_map());
848  result.members.emplace(new_member.get_uuid(), std::move(new_member));
849  }
850  }
851 
852  return result;
853  }
854 
855  std::string
856  ClientClusterServiceImpl::members_string(const ClientClusterServiceImpl::member_list_snapshot& snapshot) {
857  std::stringstream out;
858  auto const &members = snapshot.members;
859  out << std::endl << std::endl << "Members [" << members.size() << "] {";
860  for (auto const &e : members) {
861  out << std::endl << "\t" << e.second;
862  }
863  out << std::endl << "}" << std::endl;
864  return out.str();
865  }
866 
867  void
868  ClientClusterServiceImpl::apply_initial_state(int32_t version, const std::vector<member> &member_infos) {
869  auto snapshot = boost::make_shared<member_list_snapshot>(create_snapshot(version, member_infos));
870  member_list_snapshot_.store(snapshot);
871  HZ_LOG(client_.get_logger(), info, members_string(*snapshot));
872  std::unordered_set<member> members;
873  for(auto const &e : snapshot->members) {
874  members.insert(e.second);
875  }
876  std::lock_guard<std::mutex> g(listeners_lock_);
877  for (auto &item : listeners_) {
878  membership_listener &listener = item.second;
879  if (listener.init_) {
880  listener.init_(initial_membership_event(client_.get_cluster(), members));
881  }
882  }
883  }
884 
885  std::vector<membership_event> ClientClusterServiceImpl::detect_membership_events(
886  std::unordered_map<boost::uuids::uuid, member, boost::hash<boost::uuids::uuid>> previous_members,
887  const std::unordered_map<boost::uuids::uuid, member, boost::hash<boost::uuids::uuid>>& current_members) {
888  std::vector<member> new_members;
889 
890  for (auto const & e : current_members) {
891  if (!previous_members.erase(e.first)) {
892  new_members.emplace_back(e.second);
893  }
894  }
895 
896  std::vector<membership_event> events;
897 
898  // removal events should be added before added events
899  for (auto const &e : previous_members) {
900  events.emplace_back(client_.get_cluster(), e.second, membership_event::membership_event_type::MEMBER_LEFT, current_members);
901  auto connection = client_.get_connection_manager().get_connection(e.second.get_uuid());
902  if (connection) {
903  connection->close("", std::make_exception_ptr(exception::target_disconnected(
904  "ClientClusterServiceImpl::detect_membership_events", (boost::format(
905  "The client has closed the connection to this member, after receiving a member left event from the cluster. %1%") %
906  *connection).str())));
907  }
908  }
909  for (auto const &member : new_members) {
910  events.emplace_back(client_.get_cluster(), member, membership_event::membership_event_type::MEMBER_JOINED, current_members);
911  }
912 
913  if (!events.empty()) {
914  auto snapshot = member_list_snapshot_.load();
915  if (!snapshot->members.empty()) {
916  HZ_LOG(client_.get_logger(), info, members_string(*snapshot));
917  }
918  }
919  return events;
920  }
921 
922  void ClientClusterServiceImpl::fire_events(std::vector<membership_event> events) {
923  std::lock_guard<std::mutex> g(listeners_lock_);
924 
925  for (auto const &event : events) {
926  for (auto &item : listeners_) {
927  membership_listener &listener = item.second;
928  if (event.get_event_type() == membership_event::membership_event_type::MEMBER_JOINED) {
929  listener.joined_(event);
930  } else {
931  listener.left_(event);
932  }
933  }
934  }
935  }
936 
937  void ClientClusterServiceImpl::wait_initial_member_list_fetched() const {
938  // safe to const cast here since latch operations are already thread safe ops.
939  if ((const_cast<boost::latch&>(initial_list_fetched_latch_)).wait_for(INITIAL_MEMBERS_TIMEOUT) == boost::cv_status::timeout) {
940  BOOST_THROW_EXCEPTION(exception::illegal_state(
941  "ClientClusterServiceImpl::wait_initial_member_list_fetched",
942  "Could not get initial member list from cluster!"));
943  }
944  }
945 
946  bool
947  ClientInvocationServiceImpl::invoke_on_connection(const std::shared_ptr<ClientInvocation> &invocation,
948  const std::shared_ptr<connection::Connection> &connection) {
949  return send(invocation, connection);
950  }
951 
952  bool ClientInvocationServiceImpl::invoke_on_partition_owner(
953  const std::shared_ptr<ClientInvocation> &invocation, int partition_id) {
954  auto partition_owner = client_.get_partition_service().get_partition_owner(partition_id);
955  if (partition_owner.is_nil()) {
956  HZ_LOG(logger_, finest,
957  boost::str(boost::format("Partition owner is not assigned yet for partition %1%")
958  % partition_id)
959  );
960  return false;
961  }
962  return invoke_on_target(invocation, partition_owner);
963  }
964 
965  bool ClientInvocationServiceImpl::invoke_on_target(const std::shared_ptr<ClientInvocation> &invocation,
966  boost::uuids::uuid uuid) {
967  assert (!uuid.is_nil());
968  auto connection = client_.get_connection_manager().get_connection(uuid);
969  if (!connection) {
970  HZ_LOG(logger_, finest,
971  boost::str(boost::format("Client is not connected to target : %1%")
972  % uuid)
973  );
974  return false;
975  }
976  return send(invocation, connection);
977  }
978 
979  bool ClientInvocationServiceImpl::is_smart_routing() const {
980  return smart_routing_;
981  }
982 
983  const std::chrono::milliseconds &ClientInvocationServiceImpl::get_backup_timeout() const {
984  return backup_timeout_;
985  }
986 
987  bool ClientInvocationServiceImpl::fail_on_indeterminate_state() const {
988  return fail_on_indeterminate_operation_state_;
989  }
990 
991  ClientExecutionServiceImpl::ClientExecutionServiceImpl(const std::string &name,
992  const client_properties &properties,
993  int32_t pool_size,
994  spi::lifecycle_service &service)
995  : lifecycle_service_(service), client_properties_(properties) {}
996 
997  void ClientExecutionServiceImpl::start() {
998  int internalPoolSize = client_properties_.get_integer(
999  client_properties_.get_internal_executor_pool_size());
1000  if (internalPoolSize <= 0) {
1001  internalPoolSize = util::IOUtil::to_value<int>(
1002  client_properties::INTERNAL_EXECUTOR_POOL_SIZE_DEFAULT);
1003  }
1004 
1005  internal_executor_.reset(new hazelcast::util::hz_thread_pool(internalPoolSize));
1006 
1007  user_executor_.reset(new hazelcast::util::hz_thread_pool());
1008  }
1009 
1010  void ClientExecutionServiceImpl::shutdown() {
1011  shutdown_thread_pool(internal_executor_.get());
1012  shutdown_thread_pool(user_executor_.get());
1013  }
1014 
1015  util::hz_thread_pool &ClientExecutionServiceImpl::get_user_executor() {
1016  return *user_executor_;
1017  }
1018 
1019  void ClientExecutionServiceImpl::shutdown_thread_pool(hazelcast::util::hz_thread_pool *pool) {
1020  if (!pool) {
1021  return;
1022  }
1023  pool->close();
1024  }
1025 
1026  constexpr int ClientInvocation::MAX_FAST_INVOCATION_COUNT;
1027  constexpr int ClientInvocation::UNASSIGNED_PARTITION;
1028 
1029  ClientInvocation::ClientInvocation(spi::ClientContext &client_context,
1030  std::shared_ptr<protocol::ClientMessage> &&message,
1031  const std::string &name,
1032  int partition,
1033  const std::shared_ptr<connection::Connection> &conn,
1034  boost::uuids::uuid uuid) :
1035  logger_(client_context.get_logger()),
1036  lifecycle_service_(client_context.get_lifecycle_service()),
1037  invocation_service_(client_context.get_invocation_service()),
1038  execution_service_(client_context.get_client_execution_service().shared_from_this()),
1039  call_id_sequence_(client_context.get_call_id_sequence()),
1040  uuid_(uuid),
1041  partition_id_(partition),
1042  start_time_(std::chrono::steady_clock::now()),
1043  retry_pause_(invocation_service_.get_invocation_retry_pause()),
1044  object_name_(name),
1045  connection_(conn), bound_to_single_connection_(conn != nullptr),
1046  invoke_count_(0), urgent_(false), smart_routing_(invocation_service_.is_smart_routing()) {
1047  message->set_partition_id(partition_id_);
1048  client_message_ = boost::make_shared<std::shared_ptr<protocol::ClientMessage>>(message);
1049  set_send_connection(nullptr);
1050  }
1051 
1052  ClientInvocation::~ClientInvocation() = default;
1053 
1054  boost::future<protocol::ClientMessage> ClientInvocation::invoke() {
1055  assert (client_message_.load());
1056  // for back pressure
1057  call_id_sequence_->next();
1058  invoke_on_selection();
1059  if (!lifecycle_service_.is_running()) {
1060  return invocation_promise_.get_future().then([](boost::future<protocol::ClientMessage> f) {
1061  return f.get();
1062  });
1063  }
1064  auto id_seq = call_id_sequence_;
1065  return invocation_promise_.get_future().then(execution_service_->get_user_executor(),
1066  [=](boost::future<protocol::ClientMessage> f) {
1067  id_seq->complete();
1068  return f.get();
1069  });
1070  }
1071 
1072  boost::future<protocol::ClientMessage> ClientInvocation::invoke_urgent() {
1073  assert(client_message_.load());
1074  urgent_ = true;
1075  // for back pressure
1076  call_id_sequence_->force_next();
1077  invoke_on_selection();
1078  if (!lifecycle_service_.is_running()) {
1079  return invocation_promise_.get_future().then([](boost::future<protocol::ClientMessage> f) {
1080  return f.get();
1081  });
1082  }
1083  auto id_seq = call_id_sequence_;
1084  return invocation_promise_.get_future().then(execution_service_->get_user_executor(),
1085  [=](boost::future<protocol::ClientMessage> f) {
1086  id_seq->complete();
1087  return f.get();
1088  });
1089  }
1090 
1091  void ClientInvocation::invoke_on_selection() {
1092  try {
1093  invoke_count_++;
1094  if (!urgent_) {
1095  invocation_service_.check_invocation_allowed();
1096  }
1097 
1098  if (is_bind_to_single_connection()) {
1099  bool invoked = false;
1100  auto conn = connection_.lock();
1101  if (conn) {
1102  invoked = invocation_service_.invoke_on_connection(shared_from_this(), conn);
1103  }
1104  if (!invoked) {
1105  std::string message;
1106  if (conn) {
1107  message = (boost::format("Could not invoke on connection %1%") % *conn).str();
1108  } else {
1109  message = "Could not invoke. Bound to a connection that is deleted already.";
1110  }
1111  notify_exception(std::make_exception_ptr(
1112  exception::io("ClientInvocation::invoke_on_selection", message)));
1113  }
1114  return;
1115  }
1116 
1117  bool invoked = false;
1118  if (smart_routing_) {
1119  if (partition_id_ != -1) {
1120  invoked = invocation_service_.invoke_on_partition_owner(shared_from_this(), partition_id_);
1121  } else if (!uuid_.is_nil()) {
1122  invoked = invocation_service_.invoke_on_target(shared_from_this(), uuid_);
1123  } else {
1124  invoked = invocation_service_.invoke(shared_from_this());
1125  }
1126  if (!invoked) {
1127  invoked = invocation_service_.invoke(shared_from_this());
1128  }
1129  } else {
1130  invoked = invocation_service_.invoke(shared_from_this());
1131  }
1132  if (!invoked) {
1133  notify_exception(std::make_exception_ptr(exception::io("No connection found to invoke")));
1134  }
1135  } catch (exception::iexception &) {
1136  notify_exception(std::current_exception());
1137  } catch (std::exception &) {
1138  assert(false);
1139  }
1140  }
1141 
1142  bool ClientInvocation::is_bind_to_single_connection() const {
1143  return bound_to_single_connection_;
1144  }
1145 
1146  void ClientInvocation::run() {
1147  retry();
1148  }
1149 
1150  void ClientInvocation::retry() {
1151  // retry modifies the client message and should not reuse the client message.
1152  // It could be the case that it is in write queue of the connection.
1153  client_message_ = boost::make_shared<std::shared_ptr<protocol::ClientMessage>>(copy_message());
1154 
1155  try {
1156  invoke_on_selection();
1157  } catch (exception::iexception &e) {
1158  set_exception(e, boost::current_exception());
1159  } catch (std::exception &) {
1160  assert(false);
1161  }
1162  }
1163 
1164  void ClientInvocation::set_exception(const std::exception &e, boost::exception_ptr exception_ptr) {
1165  invoked_or_exception_set_.store(true);
1166  try {
1167  auto send_conn = send_connection_.load();
1168  if (send_conn) {
1169  auto connection = send_conn->lock();
1170  if (connection) {
1171  auto call_id = client_message_.load()->get()->get_correlation_id();
1172  boost::asio::post(connection->get_socket().get_executor(), [=]() {
1173  connection->deregister_invocation(call_id);
1174  });
1175  }
1176  }
1177  invocation_promise_.set_exception(std::move(exception_ptr));
1178  } catch (boost::promise_already_satisfied &se) {
1179  if (!event_handler_) {
1180  HZ_LOG(logger_, finest,
1181  boost::str(boost::format("Failed to set the exception for invocation. "
1182  "%1%, %2% Exception to be set: %3%")
1183  % se.what() % *this % e.what())
1184  );
1185  }
1186  }
1187  }
1188 
1189  void ClientInvocation::notify_exception(std::exception_ptr exception) {
1190  erase_invocation();
1191  try {
1192  std::rethrow_exception(exception);
1193  } catch (exception::iexception &iex) {
1194  log_exception(iex);
1195 
1196  if (!lifecycle_service_.is_running()) {
1197  try {
1198  std::throw_with_nested(boost::enable_current_exception(
1199  exception::hazelcast_client_not_active(iex.get_source(),
1200  "Client is shutting down")));
1201  } catch (exception::iexception &e) {
1202  set_exception(e, boost::current_exception());
1203  }
1204  return;
1205  }
1206 
1207  if (!should_retry(iex)) {
1208  set_exception(iex, boost::current_exception());
1209  return;
1210  }
1211 
1212  auto timePassed = std::chrono::steady_clock::now() - start_time_;
1213  if (timePassed > invocation_service_.get_invocation_timeout()) {
1214  HZ_LOG(logger_, finest,
1215  boost::str(boost::format("Exception will not be retried because "
1216  "invocation timed out. %1%") % iex.what())
1217  );
1218 
1219  auto now = std::chrono::steady_clock::now();
1220 
1221  auto timeoutException = (exception::exception_builder<exception::operation_timeout>(
1222  "ClientInvocation::newoperation_timeout_exception") << *this
1223  << " timed out because exception occurred after client invocation timeout "
1224  << std::chrono::duration_cast<std::chrono::milliseconds>(invocation_service_.get_invocation_timeout()).count()
1225  << "msecs. Last exception:" << iex
1226  << " Current time :" << util::StringUtil::time_to_string(now) << ". "
1227  << "Start time: " << util::StringUtil::time_to_string(start_time_)
1228  << ". Total elapsed time: " <<
1229  std::chrono::duration_cast<std::chrono::milliseconds>(now - start_time_).count()
1230  << " ms. ").build();
1231  try {
1232  BOOST_THROW_EXCEPTION(timeoutException);
1233  } catch (...) {
1234  set_exception(timeoutException, boost::current_exception());
1235  }
1236 
1237  return;
1238  }
1239 
1240  try {
1241  execute();
1242  } catch (std::exception &e) {
1243  set_exception(e, boost::current_exception());
1244  }
1245  } catch (...) {
1246  assert(false);
1247  }
1248  }
1249 
1250  void ClientInvocation::erase_invocation() const {
1251  if (!this->event_handler_) {
1252  auto sent_connection = get_send_connection();
1253  if (sent_connection) {
1254  auto this_invocation = shared_from_this();
1255  boost::asio::post(sent_connection->get_socket().get_executor(), [=] () {
1256  sent_connection->invocations.erase(this_invocation->get_client_message()->get_correlation_id());
1257  });
1258  }
1259  }
1260  }
1261 
1262  bool ClientInvocation::should_retry(exception::iexception &exception) {
1263  auto errorCode = exception.get_error_code();
1264  if (is_bind_to_single_connection() && (errorCode == protocol::IO || errorCode == protocol::TARGET_DISCONNECTED)) {
1265  return false;
1266  }
1267 
1268  if (!uuid_.is_nil() && errorCode == protocol::TARGET_NOT_MEMBER) {
1269  //when invocation send to a specific member
1270  //if target is no longer a member, we should not retry
1271  //note that this exception could come from the server
1272  return false;
1273  }
1274 
1275  if (errorCode == protocol::IO || errorCode == protocol::HAZELCAST_INSTANCE_NOT_ACTIVE || exception.is_retryable()) {
1276  return true;
1277  }
1278  if (errorCode == protocol::TARGET_DISCONNECTED) {
1279  return client_message_.load()->get()->is_retryable() || invocation_service_.is_redo_operation();
1280  }
1281  return false;
1282  }
1283 
1284  std::ostream &operator<<(std::ostream &os, const ClientInvocation &invocation) {
1285  std::ostringstream target;
1286  if (invocation.is_bind_to_single_connection()) {
1287  auto conn = invocation.connection_.lock();
1288  if (conn) {
1289  target << "connection " << *conn;
1290  }
1291  } else if (invocation.partition_id_ != -1) {
1292  target << "partition " << invocation.partition_id_;
1293  } else if (!invocation.uuid_.is_nil()) {
1294  target << "uuid " << boost::to_string(invocation.uuid_);
1295  } else {
1296  target << "random";
1297  }
1298  os << "ClientInvocation{" << "requestMessage = " << *invocation.client_message_.load()->get()
1299  << ", objectName = "
1300  << invocation.object_name_ << ", target = " << target.str() << ", sendConnection = ";
1301  auto sendConnection = invocation.get_send_connection();
1302  if (sendConnection) {
1303  os << *sendConnection;
1304  } else {
1305  os << "nullptr";
1306  }
1307  os << ", backup_acks_expected_ = " << static_cast<int>(invocation.backup_acks_expected_)
1308  << ", backup_acks_received = " << invocation.backup_acks_received_;
1309 
1310  if (invocation.pending_response_) {
1311  os << ", pending_response: " << *invocation.pending_response_;
1312  }
1313 
1314  os << '}';
1315 
1316  return os;
1317  }
1318 
1319  std::shared_ptr<ClientInvocation> ClientInvocation::create(spi::ClientContext &client_context,
1320  std::shared_ptr<protocol::ClientMessage> &&client_message,
1321  const std::string &object_name,
1322  int partition_id) {
1323  return std::shared_ptr<ClientInvocation>(
1324  new ClientInvocation(client_context, std::move(client_message), object_name, partition_id));
1325  }
1326 
1327  std::shared_ptr<ClientInvocation> ClientInvocation::create(spi::ClientContext &client_context,
1328  std::shared_ptr<protocol::ClientMessage> &&client_message,
1329  const std::string &object_name,
1330  const std::shared_ptr<connection::Connection> &connection) {
1331  return std::shared_ptr<ClientInvocation>(
1332  new ClientInvocation(client_context, std::move(client_message), object_name, UNASSIGNED_PARTITION,
1333  connection));
1334  }
1335 
1336 
1337  std::shared_ptr<ClientInvocation> ClientInvocation::create(spi::ClientContext &client_context,
1338  std::shared_ptr<protocol::ClientMessage> &&client_message,
1339  const std::string &object_name,
1340  boost::uuids::uuid uuid) {
1341  return std::shared_ptr<ClientInvocation>(
1342  new ClientInvocation(client_context, std::move(client_message), object_name, UNASSIGNED_PARTITION,
1343  nullptr, uuid));
1344  }
1345 
1346  std::shared_ptr<ClientInvocation> ClientInvocation::create(spi::ClientContext &client_context,
1347  protocol::ClientMessage &client_message,
1348  const std::string &object_name,
1349  int partition_id) {
1350  return create(client_context, std::make_shared<protocol::ClientMessage>(std::move(client_message)),
1351  object_name, partition_id);
1352  }
1353 
1354  std::shared_ptr<ClientInvocation> ClientInvocation::create(spi::ClientContext &client_context,
1355  protocol::ClientMessage &client_message,
1356  const std::string &object_name,
1357  const std::shared_ptr<connection::Connection> &connection) {
1358  return create(client_context, std::make_shared<protocol::ClientMessage>(std::move(client_message)),
1359  object_name, connection);
1360  }
1361 
1362  std::shared_ptr<ClientInvocation> ClientInvocation::create(spi::ClientContext &client_context,
1363  protocol::ClientMessage &client_message,
1364  const std::string &object_name,
1365  boost::uuids::uuid uuid) {
1366  return create(client_context, std::make_shared<protocol::ClientMessage>(std::move(client_message)),
1367  object_name, uuid);
1368  }
1369 
1370  std::shared_ptr<connection::Connection> ClientInvocation::get_send_connection() const {
1371  return send_connection_.load()->lock();
1372  }
1373 
1374  void ClientInvocation::wait_invoked() const {
1375  //it could be either invoked or cancelled before invoked
1376  while (!invoked_or_exception_set_) {
1377  std::this_thread::sleep_for(retry_pause_);
1378  }
1379  }
1380 
1381  void
1382  ClientInvocation::set_send_connection(const std::shared_ptr<connection::Connection> &conn) {
1383  send_connection_.store(boost::make_shared<std::weak_ptr<connection::Connection>>(conn));
1384  invoked_or_exception_set_.store(true);
1385  }
1386 
1387  void ClientInvocation::notify(const std::shared_ptr<protocol::ClientMessage> &msg) {
1388  if (!msg) {
1389  BOOST_THROW_EXCEPTION(exception::illegal_argument("response can't be null"));
1390  }
1391 
1392  int8_t expected_backups = msg->get_number_of_backups();
1393 
1394  // if a regular response comes and there are backups, we need to wait for the backups
1395  // when the backups complete, the response will be send by the last backup or backup-timeout-handle mechanism kicks on
1396  if (expected_backups > backup_acks_received_) {
1397  // so the invocation has backups and since not all backups have completed, we need to wait
1398  // (it could be that backups arrive earlier than the response)
1399 
1400  pending_response_received_time_ = std::chrono::steady_clock::now();
1401 
1402  backup_acks_expected_ = expected_backups;
1403 
1404  // it is very important that the response is set after the backupsAcksExpected is set, else the system
1405  // can assume the invocation is complete because there is a response and no backups need to respond
1406  pending_response_ = msg;
1407 
1408  // we are done since not all backups have completed. Therefore we should not notify the future
1409  return;
1410  }
1411 
1412  // we are going to notify the future that a response is available; this can happen when:
1413  // - we had a regular operation (so no backups we need to wait for) that completed
1414  // - we had a backup-aware operation that has completed, but also all its backups have completed
1415  complete(msg);
1416  }
1417 
1418  void ClientInvocation::complete(const std::shared_ptr<protocol::ClientMessage> &msg) {
1419  try {
1420  // TODO: move msg content here?
1421  this->invocation_promise_.set_value(*msg);
1422  } catch (std::exception &e) {
1423  HZ_LOG(logger_, warning,
1424  boost::str(boost::format("Failed to set the response for invocation. "
1425  "Dropping the response. %1%, %2% Response: %3%")
1426  % e.what() % *this % *msg)
1427  );
1428  }
1429  this->erase_invocation();
1430  }
1431 
1432  std::shared_ptr<protocol::ClientMessage> ClientInvocation::get_client_message() const {
1433  return *client_message_.load();
1434  }
1435 
1436  const std::shared_ptr<EventHandler<protocol::ClientMessage> > &
1437  ClientInvocation::get_event_handler() const {
1438  return event_handler_;
1439  }
1440 
1441  void ClientInvocation::set_event_handler(
1442  const std::shared_ptr<EventHandler<protocol::ClientMessage> > &handler) {
1443  ClientInvocation::event_handler_ = handler;
1444  }
1445 
1446  void ClientInvocation::execute() {
1447  auto this_invocation = shared_from_this();
1448  auto command = [=]() {
1449  this_invocation->run();
1450  };
1451 
1452  // first we force a new invocation slot because we are going to return our old invocation slot immediately after
1453  // It is important that we first 'force' taking a new slot; otherwise it could be that a sneaky invocation gets
1454  // through that takes our slot!
1455  int64_t callId = call_id_sequence_->force_next();
1456  client_message_.load()->get()->set_correlation_id(callId);
1457 
1458  //we release the old slot
1459  call_id_sequence_->complete();
1460 
1461  if (invoke_count_ < MAX_FAST_INVOCATION_COUNT) {
1462  // fast retry for the first few invocations
1463  execution_service_->execute(command);
1464  } else {
1465  // progressive retry delay
1466  int64_t delayMillis = util::min<int64_t>(static_cast<int64_t>(1) << (invoke_count_ - MAX_FAST_INVOCATION_COUNT),
1467  std::chrono::duration_cast<std::chrono::milliseconds>(retry_pause_).count());
1468  retry_timer_ = execution_service_->schedule(command, std::chrono::milliseconds(delayMillis));
1469  }
1470  }
1471 
1472  const std::string ClientInvocation::get_name() const {
1473  return "ClientInvocation";
1474  }
1475 
1476  std::shared_ptr<protocol::ClientMessage> ClientInvocation::copy_message() {
1477  return std::make_shared<protocol::ClientMessage>(**client_message_.load());
1478  }
1479 
1480  boost::promise<protocol::ClientMessage> &ClientInvocation::get_promise() {
1481  return invocation_promise_;
1482  }
1483 
1484  void ClientInvocation::log_exception(exception::iexception &e) {
1485  HZ_LOG(logger_, finest,
1486  boost::str(boost::format("Invocation got an exception %1%, invoke count : %2%, "
1487  "exception : %3%")
1488  % *this % invoke_count_.load() % e)
1489  );
1490  }
1491 
1492  void ClientInvocation::notify_backup() {
1493  ++backup_acks_received_;
1494 
1495  if (!pending_response_) {
1496  // no pendingResponse has been set, so we are done since the invocation on the primary needs to complete first
1497  return;
1498  }
1499 
1500  // if a pendingResponse is set, then the backupsAcksExpected has been set (so we can now safely read backupsAcksExpected)
1501  if (backup_acks_expected_ != backup_acks_received_) {
1502  // we managed to complete a backup, but we were not the one completing the last backup, so we are done
1503  return;
1504  }
1505 
1506  // we are the lucky one since we just managed to complete the last backup for this invocation and since the
1507  // pendingResponse is set, we can set it on the future
1508  complete_with_pending_response();
1509  }
1510 
1511  void
1512  ClientInvocation::detect_and_handle_backup_timeout(const std::chrono::milliseconds &backup_timeout) {
1513  // if the backups have completed, we are done; this also filters out all non backup-aware operations
1514  // since the backupsAcksExpected will always be equal to the backupsAcksReceived
1515  if (backup_acks_expected_ == backup_acks_received_) {
1516  return;
1517  }
1518 
1519  // if no response has yet been received, we we are done; we are only going to re-invoke an operation
1520  // if the response of the primary has been received, but the backups have not replied
1521  if (!pending_response_) {
1522  return;
1523  }
1524 
1525  // if this has not yet expired (so has not been in the system for a too long period) we ignore it
1526  if (pending_response_received_time_ + backup_timeout >= std::chrono::steady_clock::now()) {
1527  return;
1528  }
1529 
1530  if (invocation_service_.fail_on_indeterminate_state()) {
1531  auto exception = boost::enable_current_exception((exception::exception_builder<exception::indeterminate_operation_state>(
1532  "ClientInvocation::detect_and_handle_backup_timeout") << *this
1533  << " failed because backup acks missed.").build());
1534  notify_exception(std::make_exception_ptr(exception));
1535  return;
1536  }
1537 
1538  // the backups have not yet completed, but we are going to release the future anyway if a pendingResponse has been set
1539  complete_with_pending_response();
1540  }
1541 
1542  void ClientInvocation::complete_with_pending_response() {
1543  complete(pending_response_);
1544  }
1545 
1546  ClientContext &impl::ClientTransactionManagerServiceImpl::get_client() const {
1547  return client_;
1548  }
1549 
1550  ClientTransactionManagerServiceImpl::ClientTransactionManagerServiceImpl(ClientContext &client)
1551  : client_(client) {}
1552 
1553  std::shared_ptr<connection::Connection> ClientTransactionManagerServiceImpl::connect() {
1554  auto &invocationService = client_.get_invocation_service();
1555  auto startTime = std::chrono::steady_clock::now();
1556  auto invocationTimeout = invocationService.get_invocation_timeout();
1557  client_config &clientConfig = client_.get_client_config();
1558  bool smartRouting = clientConfig.get_network_config().is_smart_routing();
1559 
1560  while (client_.get_lifecycle_service().is_running()) {
1561  try {
1562  auto connection = client_.get_connection_manager().get_random_connection();
1563  if (!connection) {
1564  throw_exception(smartRouting);
1565  }
1566  return connection;
1567  } catch (exception::hazelcast_client_offline &) {
1568  throw;
1569  } catch (exception::iexception &) {
1570  if (std::chrono::steady_clock::now() - startTime > invocationTimeout) {
1571  std::rethrow_exception(
1572  new_operation_timeout_exception(std::current_exception(), invocationTimeout,
1573  startTime));
1574  }
1575  }
1576  std::this_thread::sleep_for(invocationService.get_invocation_retry_pause());
1577  }
1578  BOOST_THROW_EXCEPTION(
1579  exception::hazelcast_client_not_active("ClientTransactionManagerServiceImpl::connect",
1580  "Client is shutdown"));
1581  }
1582 
1583  std::exception_ptr
1584  ClientTransactionManagerServiceImpl::new_operation_timeout_exception(std::exception_ptr cause,
1585  std::chrono::milliseconds invocation_timeout,
1586  std::chrono::steady_clock::time_point start_time) {
1587  std::ostringstream sb;
1588  auto now = std::chrono::steady_clock::now();
1589  sb
1590  << "Creating transaction context timed out because exception occurred after client invocation timeout "
1591  << std::chrono::duration_cast<std::chrono::milliseconds>(invocation_timeout).count() << " ms. " << "Current time: "
1592  << util::StringUtil::time_to_string(std::chrono::steady_clock::now()) << ". " << "Start time: "
1593  << util::StringUtil::time_to_string(start_time) << ". Total elapsed time: "
1594  << std::chrono::duration_cast<std::chrono::milliseconds>(now - start_time).count() << " ms. ";
1595  try {
1596  std::rethrow_exception(cause);
1597  } catch (...) {
1598  try {
1599  std::throw_with_nested(boost::enable_current_exception(exception::operation_timeout(
1600  "ClientTransactionManagerServiceImpl::newoperation_timeout_exception", sb.str())));
1601  } catch (...) {
1602  return std::current_exception();
1603  }
1604  }
1605  return nullptr;
1606  }
1607 
1608  void ClientTransactionManagerServiceImpl::throw_exception(bool smart_routing) {
1609  auto &client_config = client_.get_client_config();
1610  auto &connection_strategy_Config = client_config.get_connection_strategy_config();
1611  auto reconnect_mode = connection_strategy_Config.get_reconnect_mode();
1612  if (reconnect_mode == config::client_connection_strategy_config::reconnect_mode::ASYNC) {
1613  BOOST_THROW_EXCEPTION(exception::hazelcast_client_offline(
1614  "ClientTransactionManagerServiceImpl::throw_exception", ""));
1615  }
1616  if (smart_routing) {
1617  auto members = client_.get_cluster().get_members();
1618  std::ostringstream msg;
1619  if (members.empty()) {
1620  msg << "No address was return by the LoadBalancer since there are no members in the cluster";
1621  } else {
1622  msg << "No address was return by the LoadBalancer. "
1623  "But the cluster contains the following members:{\n";
1624  for (auto const &m : members) {
1625  msg << '\t' << m << '\n';
1626  }
1627  msg << "}";
1628  }
1629  BOOST_THROW_EXCEPTION(exception::illegal_state(
1630  "ClientTransactionManagerServiceImpl::throw_exception", msg.str()));
1631  }
1632  BOOST_THROW_EXCEPTION(exception::illegal_state(
1633  "ClientTransactionManagerServiceImpl::throw_exception",
1634  "No active connection is found"));
1635  }
1636 
1637  ClientPartitionServiceImpl::ClientPartitionServiceImpl(ClientContext &client)
1638  : client_(client), logger_(client.get_logger()), partition_count_(0),
1639  partition_table_(boost::shared_ptr<partition_table>(new partition_table{0, -1})) {
1640  }
1641 
1642  void ClientPartitionServiceImpl::handle_event(int32_t connection_id, int32_t version,
1643  const std::vector<std::pair<boost::uuids::uuid, std::vector<int>>> &partitions) {
1644  HZ_LOG(logger_, finest,
1645  boost::str(boost::format("Handling new partition table with partitionStateVersion: %1%") % version)
1646  );
1647 
1648  while (true) {
1649  auto current = partition_table_.load();
1650  if (!should_be_applied(connection_id, version, partitions, *current)) {
1651  return;
1652  }
1653  if (partition_table_.compare_exchange_strong(current, boost::shared_ptr<partition_table>(
1654  new partition_table{connection_id, version, convert_to_map(partitions)}))) {
1655  HZ_LOG(logger_, finest,
1656  boost::str(boost::format("Applied partition table with partitionStateVersion : %1%") % version)
1657  );
1658  return;
1659  }
1660 
1661  }
1662  }
1663 
1664  boost::uuids::uuid ClientPartitionServiceImpl::get_partition_owner(int32_t partition_id) {
1665  auto table_ptr = partition_table_.load();
1666  auto it = table_ptr->partitions.find(partition_id);
1667  if (it != table_ptr->partitions.end()) {
1668  return it->second;
1669  }
1670  return boost::uuids::nil_uuid();
1671  }
1672 
1673  int32_t ClientPartitionServiceImpl::get_partition_id(const serialization::pimpl::data &key) {
1674  int32_t pc = get_partition_count();
1675  if (pc <= 0) {
1676  return 0;
1677  }
1678  int hash = key.get_partition_hash();
1679  return util::HashUtil::hash_to_index(hash, pc);
1680  }
1681 
1682  int32_t ClientPartitionServiceImpl::get_partition_count() {
1683  return partition_count_.load();
1684  }
1685 
1686  std::shared_ptr<client::impl::Partition> ClientPartitionServiceImpl::get_partition(int partition_id) {
1687  return std::shared_ptr<client::impl::Partition>(new PartitionImpl(partition_id, client_, *this));
1688  }
1689 
1690  bool ClientPartitionServiceImpl::check_and_set_partition_count(int32_t new_partition_count) {
1691  int32_t expected = 0;
1692  if (partition_count_.compare_exchange_strong(expected, new_partition_count)) {
1693  return true;
1694  }
1695  return partition_count_.load() == new_partition_count;
1696  }
1697 
1698  bool
1699  ClientPartitionServiceImpl::should_be_applied(int32_t connection_id, int32_t version,
1700  const std::vector<std::pair<boost::uuids::uuid, std::vector<int>>> &partitions,
1701  const partition_table &current) {
1702  auto &lg = client_.get_logger();
1703  if (partitions.empty()) {
1704  if (logger_.enabled(logger::level::finest)) {
1705  log_failure(connection_id, version, current, "response is empty");
1706  }
1707  return false;
1708  }
1709  if (!current.connection_id || connection_id != current.connection_id) {
1710  HZ_LOG(lg, finest,
1711  ([&current, connection_id](){
1712  auto frmt = boost::format("Event coming from a new connection. Old connection id: %1%, "
1713  "new connection %2%");
1714 
1715  if (current.connection_id) {
1716  frmt = frmt % current.connection_id;
1717  } else {
1718  frmt = frmt % "none";
1719  }
1720 
1721  return boost::str(frmt % connection_id);
1722  })()
1723  );
1724 
1725  return true;
1726  }
1727  if (version <= current.version) {
1728  if (lg.enabled(logger::level::finest)) {
1729  log_failure(connection_id, version, current, "response state version is old");
1730  }
1731  return false;
1732  }
1733  return true;
1734  }
1735 
1736  void ClientPartitionServiceImpl::log_failure(int32_t connection_id, int32_t version,
1737  const ClientPartitionServiceImpl::partition_table &current,
1738  const std::string &cause) {
1739  HZ_LOG(logger_, finest,
1740  [&](){
1741  auto frmt = boost::format(" We will not apply the response, since %1% ."
1742  " Response is from connection with id %2%. "
1743  "Current connection id is %3%, response state version:%4%. "
1744  "Current state version: %5%");
1745  if (current.connection_id) {
1746  return boost::str(frmt % cause % connection_id % current.connection_id % version %
1747  current.version);
1748  }
1749  else {
1750  return boost::str(frmt % cause % connection_id % "nullptr" % version % current.version);
1751  }
1752  }()
1753  );
1754  }
1755 
1756  void ClientPartitionServiceImpl::reset() {
1757  partition_table_.store(nullptr);
1758  }
1759 
1760  std::unordered_map<int32_t, boost::uuids::uuid> ClientPartitionServiceImpl::convert_to_map(
1761  const std::vector<std::pair<boost::uuids::uuid, std::vector<int>>> &partitions) {
1762  std::unordered_map<int32_t, boost::uuids::uuid> new_partitions;
1763  for (auto const &e : partitions) {
1764  for (auto pid: e.second) {
1765  new_partitions.insert({pid, e.first});
1766  }
1767  }
1768  return new_partitions;
1769  }
1770 
1771  int ClientPartitionServiceImpl::PartitionImpl::get_partition_id() const {
1772  return partition_id_;
1773  }
1774 
1775  boost::optional<member> ClientPartitionServiceImpl::PartitionImpl::get_owner() const {
1776  auto owner = partition_service_.get_partition_owner(partition_id_);
1777  if (!owner.is_nil()) {
1778  return client_.get_client_cluster_service().get_member(owner);
1779  }
1780  return boost::none;
1781  }
1782 
1783  ClientPartitionServiceImpl::PartitionImpl::PartitionImpl(int partition_id, ClientContext &client,
1784  ClientPartitionServiceImpl &partition_service)
1785  : partition_id_(partition_id), client_(client), partition_service_(partition_service) {
1786  }
1787 
1788  namespace sequence {
1789  CallIdSequenceWithoutBackpressure::CallIdSequenceWithoutBackpressure() : head_(0) {}
1790 
1791  CallIdSequenceWithoutBackpressure::~CallIdSequenceWithoutBackpressure() = default;
1792 
1793  int32_t CallIdSequenceWithoutBackpressure::get_max_concurrent_invocations() const {
1794  return INT32_MAX;
1795  }
1796 
1797  int64_t CallIdSequenceWithoutBackpressure::next() {
1798  return force_next();
1799  }
1800 
1801  int64_t CallIdSequenceWithoutBackpressure::force_next() {
1802  return ++head_;
1803  }
1804 
1805  void CallIdSequenceWithoutBackpressure::complete() {
1806  // no-op
1807  }
1808 
1809  int64_t CallIdSequenceWithoutBackpressure::get_last_call_id() {
1810  return head_;
1811  }
1812 
1813  // TODO: see if we can utilize std::hardware_destructive_interference_size
1814  AbstractCallIdSequence::AbstractCallIdSequence(int32_t max_concurrent_invocations) {
1815  std::ostringstream out;
1816  out << "maxConcurrentInvocations should be a positive number. maxConcurrentInvocations="
1817  << max_concurrent_invocations;
1818  this->max_concurrent_invocations_ = util::Preconditions::check_positive(max_concurrent_invocations,
1819  out.str());
1820 
1821  for (size_t i = 0; i < longs_.size(); ++i) {
1822  longs_[i] = 0;
1823  }
1824  }
1825 
1826  AbstractCallIdSequence::~AbstractCallIdSequence() = default;
1827 
1828  int32_t AbstractCallIdSequence::get_max_concurrent_invocations() const {
1829  return max_concurrent_invocations_;
1830  }
1831 
1832  int64_t AbstractCallIdSequence::next() {
1833  if (!has_space()) {
1834  handle_no_space_left();
1835  }
1836  return force_next();
1837  }
1838 
1839  int64_t AbstractCallIdSequence::force_next() {
1840  return ++longs_[INDEX_HEAD];
1841  }
1842 
1843  void AbstractCallIdSequence::complete() {
1844  ++longs_[INDEX_TAIL];
1845  assert(longs_[INDEX_TAIL] <= longs_[INDEX_HEAD]);
1846  }
1847 
1848  int64_t AbstractCallIdSequence::get_last_call_id() {
1849  return longs_[INDEX_HEAD];
1850  }
1851 
1852  bool AbstractCallIdSequence::has_space() {
1853  return longs_[INDEX_HEAD] - longs_[INDEX_TAIL] < max_concurrent_invocations_;
1854  }
1855 
1856  int64_t AbstractCallIdSequence::get_tail() {
1857  return longs_[INDEX_TAIL];
1858  }
1859 
1860  const std::unique_ptr<util::concurrent::IdleStrategy> CallIdSequenceWithBackpressure::IDLER(
1861  new util::concurrent::BackoffIdleStrategy(
1862  0, 0, std::chrono::duration_cast<std::chrono::nanoseconds>(
1863  std::chrono::microseconds(1000)).count(),
1864  std::chrono::duration_cast<std::chrono::nanoseconds>(
1865  std::chrono::microseconds(MAX_DELAY_MS * 1000)).count()));
1866 
1867  CallIdSequenceWithBackpressure::CallIdSequenceWithBackpressure(int32_t max_concurrent_invocations,
1868  int64_t backoff_timeout_ms)
1869  : AbstractCallIdSequence(max_concurrent_invocations) {
1870  std::ostringstream out;
1871  out << "backoffTimeoutMs should be a positive number. backoffTimeoutMs=" << backoff_timeout_ms;
1872  util::Preconditions::check_positive(backoff_timeout_ms, out.str());
1873 
1874  backoff_timeout_nanos_ = std::chrono::duration_cast<std::chrono::nanoseconds>(
1875  std::chrono::milliseconds(backoff_timeout_ms)).count();
1876  }
1877 
1878  void CallIdSequenceWithBackpressure::handle_no_space_left() {
1879  auto start = std::chrono::steady_clock::now();
1880  for (int64_t idleCount = 0;; idleCount++) {
1881  int64_t elapsedNanos = std::chrono::duration_cast<std::chrono::nanoseconds>(
1882  std::chrono::steady_clock::now() - start).count();
1883  if (elapsedNanos > backoff_timeout_nanos_) {
1884  throw (exception::exception_builder<exception::hazelcast_overload>(
1885  "CallIdSequenceWithBackpressure::handleNoSpaceLeft")
1886  << "Timed out trying to acquire another call ID."
1887  << " maxConcurrentInvocations = " << get_max_concurrent_invocations()
1888  << ", backoffTimeout = "
1889  << std::chrono::microseconds(backoff_timeout_nanos_ / 1000).count()
1890  << " msecs, elapsed:"
1891  << std::chrono::microseconds(elapsedNanos / 1000).count() << " msecs").build();
1892  }
1893  IDLER->idle(idleCount);
1894  if (has_space()) {
1895  return;
1896  }
1897 
1898  }
1899  }
1900 
1901  FailFastCallIdSequence::FailFastCallIdSequence(int32_t max_concurrent_invocations)
1902  : AbstractCallIdSequence(max_concurrent_invocations) {}
1903 
1904  void FailFastCallIdSequence::handle_no_space_left() {
1905  throw (exception::exception_builder<exception::hazelcast_overload>(
1906  "FailFastCallIdSequence::handleNoSpaceLeft")
1907  << "Maximum invocation count is reached. maxConcurrentInvocations = "
1908  << get_max_concurrent_invocations()).build();
1909 
1910  }
1911 
1912  std::unique_ptr<CallIdSequence> CallIdFactory::new_call_id_sequence(bool is_back_pressure_enabled,
1913  int32_t max_allowed_concurrent_invocations,
1914  int64_t backoff_timeout_ms) {
1915  if (!is_back_pressure_enabled) {
1916  return std::unique_ptr<CallIdSequence>(new CallIdSequenceWithoutBackpressure());
1917  } else if (backoff_timeout_ms <= 0) {
1918  return std::unique_ptr<CallIdSequence>(
1919  new FailFastCallIdSequence(max_allowed_concurrent_invocations));
1920  } else {
1921  return std::unique_ptr<CallIdSequence>(
1922  new CallIdSequenceWithBackpressure(max_allowed_concurrent_invocations,
1923  backoff_timeout_ms));
1924  }
1925  }
1926  }
1927 
1928  namespace listener {
1929  listener_service_impl::listener_service_impl(ClientContext &client_context,
1930  int32_t event_thread_count)
1931  : client_context_(client_context),
1932  serialization_service_(client_context.get_serialization_service()),
1933  logger_(client_context.get_logger()),
1934  client_connection_manager_(client_context.get_connection_manager()),
1935  number_of_event_threads_(event_thread_count),
1936  smart_(client_context.get_client_config().get_network_config().is_smart_routing()) {
1937  auto &invocationService = client_context.get_invocation_service();
1938  invocation_timeout_ = invocationService.get_invocation_timeout();
1939  invocation_retry_pause_ = invocationService.get_invocation_retry_pause();
1940  }
1941 
1942  bool listener_service_impl::registers_local_only() const {
1943  return smart_;
1944  }
1945 
1946  boost::future<boost::uuids::uuid>
1947  listener_service_impl::register_listener(
1948  std::shared_ptr<ListenerMessageCodec> listener_message_codec,
1949  std::shared_ptr<client::impl::BaseEventHandler> handler) {
1950  auto task = boost::packaged_task<boost::uuids::uuid()>([=]() {
1951  return register_listener_internal(listener_message_codec, handler);
1952  });
1953  auto f = task.get_future();
1954  boost::asio::post(registration_executor_->get_executor(), std::move(task));
1955  return f;
1956  }
1957 
1958  boost::future<bool> listener_service_impl::deregister_listener(boost::uuids::uuid registration_id) {
1959  util::Preconditions::check_not_nill(registration_id, "Nil userRegistrationId is not allowed!");
1960 
1961  boost::packaged_task<bool()> task([=]() {
1962  return deregister_listener_internal(registration_id);
1963  });
1964  auto f = task.get_future();
1965  boost::asio::post(registration_executor_->get_executor(), std::move(task));
1966  return f;
1967  }
1968 
1969  void listener_service_impl::connection_added(
1970  const std::shared_ptr<connection::Connection> connection) {
1971  boost::asio::post(registration_executor_->get_executor(), [=]() { connection_added_internal(connection); });
1972  }
1973 
1974  void listener_service_impl::connection_removed(
1975  const std::shared_ptr<connection::Connection> connection) {
1976  boost::asio::post(registration_executor_->get_executor(), [=]() { connection_removed_internal(connection); });
1977  }
1978 
1979  void
1980  listener_service_impl::remove_event_handler(int64_t call_id,
1981  const std::shared_ptr<connection::Connection> &connection) {
1982  boost::asio::post(connection->get_socket().get_executor(),
1983  std::packaged_task<void()>([=]() {
1984  connection->deregister_invocation(call_id);
1985  }));
1986  }
1987 
1988  void listener_service_impl::handle_client_message(
1989  const std::shared_ptr<ClientInvocation> invocation,
1990  const std::shared_ptr<protocol::ClientMessage> response) {
1991  try {
1992  auto partitionId = response->get_partition_id();
1993  if (partitionId == -1) {
1994  // execute on random thread on the thread pool
1995  boost::asio::post(event_executor_->get_executor(), [=]() { process_event_message(invocation, response); });
1996  return;
1997  }
1998 
1999  // process on certain thread which is same for the partition id
2000  boost::asio::post(event_strands_[partitionId % event_strands_.size()],
2001  [=]() { process_event_message(invocation, response); });
2002 
2003  } catch (const std::exception &e) {
2004  if (client_context_.get_lifecycle_service().is_running()) {
2005  HZ_LOG(logger_, warning,
2006  boost::str(boost::format("Delivery of event message to event handler failed. %1%, %2%, %3%")
2007  % e.what() % *response % *invocation)
2008  );
2009  }
2010  }
2011  }
2012 
2013  void listener_service_impl::shutdown() {
2014  ClientExecutionServiceImpl::shutdown_thread_pool(event_executor_.get());
2015  ClientExecutionServiceImpl::shutdown_thread_pool(registration_executor_.get());
2016  }
2017 
2018  void listener_service_impl::start() {
2019  event_executor_.reset(new hazelcast::util::hz_thread_pool(number_of_event_threads_));
2020  registration_executor_.reset(new hazelcast::util::hz_thread_pool(1));
2021 
2022  for (int i = 0; i < number_of_event_threads_; ++i) {
2023  event_strands_.emplace_back(event_executor_->get_executor());
2024  }
2025 
2026  client_connection_manager_.add_connection_listener(shared_from_this());
2027  }
2028 
2029  boost::uuids::uuid listener_service_impl::register_listener_internal(
2030  std::shared_ptr<ListenerMessageCodec> listener_message_codec,
2031  std::shared_ptr<client::impl::BaseEventHandler> handler) {
2032  auto user_registration_id = client_context_.random_uuid();
2033 
2034  std::shared_ptr<listener_registration> registration(new listener_registration{listener_message_codec, handler});
2035  registrations_.put(user_registration_id, registration);
2036  for (auto const &connection : client_connection_manager_.get_active_connections()) {
2037  try {
2038  invoke(registration, connection);
2039  } catch (exception::iexception &e) {
2040  if (connection->is_alive()) {
2041  deregister_listener_internal(user_registration_id);
2042  BOOST_THROW_EXCEPTION((exception::exception_builder<exception::hazelcast_>(
2043  "ClientListenerService::RegisterListenerTask::call")
2044  << "Listener can not be added " << e).build());
2045  }
2046  }
2047  }
2048  return user_registration_id;
2049  }
2050 
2051  bool
2052  listener_service_impl::deregister_listener_internal(boost::uuids::uuid user_registration_id) {
2053  auto listenerRegistration = registrations_.get(user_registration_id);
2054  if (!listenerRegistration) {
2055  return false;
2056  }
2057  bool successful = true;
2058 
2059  auto listener_registrations = listenerRegistration->registrations.entry_set();
2060  for (auto it = listener_registrations.begin();it != listener_registrations.end();) {
2061  const auto &registration = it->second;
2062  const auto& subscriber = it->first;
2063  try {
2064  const auto &listenerMessageCodec = listenerRegistration->codec;
2065  auto serverRegistrationId = registration->server_registration_id;
2066  auto request = listenerMessageCodec->encode_remove_request(serverRegistrationId);
2067  auto invocation = ClientInvocation::create(client_context_,request, "",
2068  subscriber);
2069  invocation->invoke().get();
2070 
2071  remove_event_handler(registration->call_id, subscriber);
2072 
2073  it = listener_registrations.erase(it);
2074  } catch (exception::iexception &e) {
2075  ++it;
2076  if (subscriber->is_alive()) {
2077  successful = false;
2078  std::ostringstream endpoint;
2079  if (subscriber->get_remote_address()) {
2080  endpoint << *subscriber->get_remote_address();
2081  } else {
2082  endpoint << "null";
2083  }
2084  HZ_LOG(logger_, warning,
2085  boost::str(boost::format("ClientListenerService::deregisterListenerInternal "
2086  "Deregistration of listener with ID %1% "
2087  "has failed to address %2% %3%")
2088  % user_registration_id
2089  % subscriber->get_remote_address() % e)
2090  );
2091  }
2092  }
2093  }
2094  if (successful) {
2095  registrations_.remove(user_registration_id);
2096  }
2097  return successful;
2098  }
2099 
2100  void listener_service_impl::connection_added_internal(
2101  const std::shared_ptr<connection::Connection> &connection) {
2102  for (const auto &listener_registration : registrations_.values()) {
2103  invoke_from_internal_thread(listener_registration, connection);
2104  }
2105  }
2106 
2107  void listener_service_impl::connection_removed_internal(
2108  const std::shared_ptr<connection::Connection> &connection) {
2109  for (auto &registry : registrations_.values()) {
2110  registry->registrations.remove(connection);
2111  }
2112  }
2113 
2114  void
2115  listener_service_impl::invoke_from_internal_thread(
2116  const std::shared_ptr<listener_registration> &listener_registration,
2117  const std::shared_ptr<connection::Connection> &connection) {
2118  try {
2119  invoke(listener_registration, connection);
2120  } catch (exception::iexception &e) {
2121  HZ_LOG(logger_, warning,
2122  boost::str(boost::format("Listener with pointer %1% can not be added to "
2123  "a new connection: %2%, reason: %3%")
2124  % listener_registration.get() % *connection % e)
2125  );
2126  }
2127  }
2128 
2129  void
2130  listener_service_impl::invoke(const std::shared_ptr<listener_registration> &listener_registration,
2131  const std::shared_ptr<connection::Connection> &connection) {
2132  if (listener_registration->registrations.contains_key(connection)) {
2133  return;
2134  }
2135 
2136  const auto &codec = listener_registration->codec;
2137  auto request = codec->encode_add_request(registers_local_only());
2138  const auto &handler = listener_registration->handler;
2139  handler->before_listener_register();
2140 
2141  auto invocation = ClientInvocation::create(client_context_,
2142  std::make_shared<protocol::ClientMessage>(std::move(request)), "",
2143  connection);
2144  invocation->set_event_handler(handler);
2145  auto clientMessage = invocation->invoke_urgent().get();
2146 
2147  auto serverRegistrationId = codec->decode_add_response(clientMessage);
2148  handler->on_listener_register();
2149  int64_t correlationId = invocation->get_client_message()->get_correlation_id();
2150 
2151  (*listener_registration).registrations.put(connection, std::shared_ptr<connection_registration>(
2152  new connection_registration{serverRegistrationId, correlationId}));
2153  }
2154 
2155  void listener_service_impl::process_event_message(
2156  const std::shared_ptr<ClientInvocation> invocation,
2157  const std::shared_ptr<protocol::ClientMessage> response) {
2158  auto eventHandler = invocation->get_event_handler();
2159  if (!eventHandler) {
2160  if (client_context_.get_lifecycle_service().is_running()) {
2161  HZ_LOG(logger_, warning,
2162  boost::str(boost::format("No eventHandler for invocation. "
2163  "Ignoring this invocation response. %1%")
2164  % *invocation)
2165  );
2166  }
2167 
2168  return;
2169  }
2170 
2171  try {
2172  eventHandler->handle(*response);
2173  } catch (std::exception &e) {
2174  if (client_context_.get_lifecycle_service().is_running()) {
2175  HZ_LOG(logger_, warning,
2176  boost::str(boost::format("Delivery of event message to event handler failed. %1%, %2%, %3%")
2177  % e.what() % *response % *invocation)
2178  );
2179  }
2180  }
2181  }
2182 
2183  listener_service_impl::~listener_service_impl() = default;
2184 
2185  void cluster_view_listener::start() {
2186  client_context_.get_connection_manager().add_connection_listener(shared_from_this());
2187  }
2188 
2189  void cluster_view_listener::connection_added(const std::shared_ptr<connection::Connection> connection) {
2190  try_register(connection);
2191  }
2192 
2193  void cluster_view_listener::connection_removed(const std::shared_ptr<connection::Connection> connection) {
2194  try_reregister_to_random_connection(connection->get_connection_id());
2195  }
2196 
2197  cluster_view_listener::cluster_view_listener(ClientContext &client_context) : client_context_(
2198  client_context) {}
2199 
2200  void cluster_view_listener::try_register(std::shared_ptr<connection::Connection> connection) {
2201  int32_t expected_id = -1;
2202  if (!listener_added_connection_id_.compare_exchange_strong(expected_id,
2203  connection->get_connection_id())) {
2204  // already registering/registered to another connection
2205  return;
2206  }
2207 
2208  auto invocation = ClientInvocation::create(client_context_,
2209  std::make_shared<protocol::ClientMessage>(
2210  protocol::codec::client_addclusterviewlistener_encode()), "", connection);
2211 
2212  auto handler = std::make_shared<event_handler>(connection->get_connection_id(), *this);
2213  invocation->set_event_handler(handler);
2214  handler->before_listener_register();
2215 
2216  std::weak_ptr<cluster_view_listener> weak_self = shared_from_this();
2217  auto conn_id = connection->get_connection_id();
2218 
2219  invocation->invoke_urgent().then(
2220  [weak_self, handler, conn_id](boost::future<protocol::ClientMessage> f) {
2221  auto self = weak_self.lock();
2222  if (!self)
2223  return;
2224 
2225  if (f.has_value()) {
2226  handler->on_listener_register();
2227  return;
2228  }
2229 
2230  //completes with exception, listener needs to be reregistered
2231  self->try_reregister_to_random_connection(conn_id);
2232  });
2233 
2234  }
2235 
2236  void cluster_view_listener::try_reregister_to_random_connection(int32_t old_connection_id) {
2237  if (!listener_added_connection_id_.compare_exchange_strong(old_connection_id, -1)) {
2238  //somebody else already trying to reregister
2239  return;
2240  }
2241  auto new_connection = client_context_.get_connection_manager().get_random_connection();
2242  if (new_connection) {
2243  try_register(new_connection);
2244  }
2245  }
2246 
2247  cluster_view_listener::~cluster_view_listener() = default;
2248 
2249  void
2250  cluster_view_listener::event_handler::handle_membersview(int32_t version,
2251  const std::vector<member> &member_infos) {
2252  view_listener.client_context_.get_client_cluster_service().handle_event(version, member_infos);
2253  }
2254 
2255  void
2256  cluster_view_listener::event_handler::handle_partitionsview(int32_t version,
2257  const std::vector<std::pair<boost::uuids::uuid, std::vector<int>>> &partitions) {
2258  view_listener.client_context_.get_partition_service().handle_event(connection_id, version, partitions);
2259  }
2260 
2261  void cluster_view_listener::event_handler::before_listener_register() {
2262  view_listener.client_context_.get_client_cluster_service().clear_member_list_version();
2263  auto &lg = view_listener.client_context_.get_logger();
2264  HZ_LOG(lg, finest,
2265  boost::str(boost::format(
2266  "Register attempt of cluster_view_listener::event_handler to connection with id %1%") %
2267  connection_id));
2268  }
2269 
2270  void cluster_view_listener::event_handler::on_listener_register() {
2271  auto &lg = view_listener.client_context_.get_logger();
2272  HZ_LOG(lg, finest,
2273  boost::str(boost::format(
2274  "Registered cluster_view_listener::event_handler to connection with id %1%") %
2275  connection_id));
2276  }
2277 
2278  cluster_view_listener::event_handler::event_handler(int connectionId,
2279  cluster_view_listener &viewListener)
2280  : connection_id(connectionId), view_listener(viewListener) {}
2281  }
2282 
2283  protocol::ClientMessage
2284  ClientInvocationServiceImpl::BackupListenerMessageCodec::encode_add_request(bool local_only) const {
2285  return protocol::codec::client_localbackuplistener_encode();
2286  }
2287 
2288  protocol::ClientMessage ClientInvocationServiceImpl::BackupListenerMessageCodec::encode_remove_request(
2289  boost::uuids::uuid real_registration_id) const {
2290  assert(0);
2291  return protocol::ClientMessage(0);
2292  }
2293 
2294  void ClientInvocationServiceImpl::noop_backup_event_handler::handle_backup(
2295  int64_t source_invocation_correlation_id) {
2296  assert(0);
2297  }
2298 
2299  namespace discovery {
2300  remote_address_provider::remote_address_provider(
2301  std::function<std::unordered_map<address, address>()> addr_map_method,
2302  bool use_public) : refresh_address_map_(std::move(addr_map_method)),
2303  use_public_(use_public) {}
2304 
2305  std::vector<address> remote_address_provider::load_addresses() {
2306  auto address_map = refresh_address_map_();
2307  std::lock_guard<std::mutex> guard(lock_);
2308  private_to_public_ = address_map;
2309  std::vector<address> addresses;
2310  addresses.reserve(address_map.size());
2311  for (const auto &addr_pair : address_map) {
2312  addresses.push_back(addr_pair.first);
2313  }
2314  return addresses;
2315  }
2316 
2317  boost::optional<address> remote_address_provider::translate(const address &addr) {
2318  // if it is inside cloud, return private address otherwise we need to translate it.
2319  if (!use_public_) {
2320  return addr;
2321  }
2322 
2323  {
2324  std::lock_guard<std::mutex> guard(lock_);
2325  auto found = private_to_public_.find(addr);
2326  if (found != private_to_public_.end()) {
2327  return found->second;
2328  }
2329  }
2330 
2331  auto address_map = refresh_address_map_();
2332 
2333  std::lock_guard<std::mutex> guard(lock_);
2334  private_to_public_ = address_map;
2335 
2336  auto found = private_to_public_.find(addr);
2337  if (found != private_to_public_.end()) {
2338  return found->second;
2339  }
2340 
2341  return boost::none;
2342  }
2343 
2344 #ifdef HZ_BUILD_WITH_SSL
2345  cloud_discovery::cloud_discovery(config::cloud_config &config, std::string cloud_base_url,
2346  std::chrono::steady_clock::duration timeout)
2347  : cloud_config_(config), cloud_base_url_(cloud_base_url), timeout_(timeout) {}
2348 #else
2349  cloud_discovery::cloud_discovery(config::cloud_config &config, std::string cloud_base_url,
2350  std::chrono::steady_clock::duration timeout) {}
2351 #endif // HZ_BUILD_WITH_SSL
2352 
2353  std::unordered_map<address, address> cloud_discovery::get_addresses() {
2354 #ifdef HZ_BUILD_WITH_SSL
2355  try {
2356  util::SyncHttpsClient httpsConnection(cloud_base_url_, std::string(CLOUD_URL_PATH) +
2357  cloud_config_.discovery_token, timeout_);
2358  auto &conn_stream = httpsConnection.connect_and_get_response();
2359  return parse_json_response(conn_stream);
2360  } catch (std::exception &e) {
2361  std::throw_with_nested(boost::enable_current_exception(
2362  exception::illegal_state("cloud_discovery::get_addresses",
2363  e.what())));
2364  }
2365 #else
2366  util::Preconditions::check_ssl("cloud_discovery::get_addresses");
2367  return std::unordered_map<address, address>();
2368 #endif
2369  }
2370 
2371  std::unordered_map<address, address>
2372  cloud_discovery::parse_json_response(std::istream &conn_stream) {
2373  namespace pt = boost::property_tree;
2374 
2375  pt::ptree root;
2376  pt::read_json(conn_stream, root);
2377 
2378  std::unordered_map<address, address> addresses;
2379  for (const auto &item : root) {
2380  auto private_address = item.second.get<std::string>(PRIVATE_ADDRESS_PROPERTY);
2381  auto public_address = item.second.get<std::string>(PUBLIC_ADDRESS_PROPERTY);
2382 
2383  address public_addr = create_address(public_address, -1);
2384  //if it is not explicitly given, create the private address with public addresses port
2385  auto private_addr = create_address(private_address, public_addr.get_port());
2386  addresses.emplace(std::move(private_addr), std::move(public_addr));
2387  }
2388 
2389  return addresses;
2390  }
2391 
2392  address cloud_discovery::create_address(const std::string &hostname, int default_port) {
2393  auto address_holder = util::AddressUtil::get_address_holder(hostname, default_port);
2394  auto scoped_hostname = util::AddressHelper::get_scoped_hostname(address_holder);
2395  return address(std::move(scoped_hostname), address_holder.get_port());
2396  }
2397  }
2398  }
2399  }
2400  }
2401 }
2402 
2403 namespace std {
2404  bool less<hazelcast::client::spi::DefaultObjectNamespace>::operator()(
2405  const hazelcast::client::spi::DefaultObjectNamespace &lhs,
2406  const hazelcast::client::spi::DefaultObjectNamespace &rhs) const {
2407  int result = lhs.get_service_name().compare(rhs.get_service_name());
2408  if (result < 0) {
2409  return true;
2410  }
2411 
2412  if (result > 0) {
2413  return false;
2414  }
2415 
2416  return lhs.get_object_name().compare(rhs.get_object_name()) < 0;
2417  }
2418 
2419  std::size_t
2420  hash<hazelcast::client::spi::DefaultObjectNamespace>::operator()(
2421  const hazelcast::client::spi::DefaultObjectNamespace &k) const noexcept {
2422  return std::hash<std::string>()(k.get_service_name() + k.get_object_name());
2423  }
2424 }
2425 
2426 
Hazelcast cluster interface.
Definition: cluster.h:36
const std::unordered_set< member > & get_members() const
Returns an immutable set of ordered members at the moment this MembershipListener is registered.
Definition: spi.cpp:60
cluster & get_cluster()
Returns the cluster of the event.
Definition: spi.cpp:64
lifecycle_state get_state() const
Definition: spi.cpp:76
lifecycle_event(lifecycle_state state)
Constructor.
Definition: spi.cpp:72