Hazelcast C++ Client
Hazelcast C++ Client Library
spi.cpp
1 /*
2  * Copyright (c) 2008-2022, Hazelcast, Inc. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <utility>
18 
19 #include <boost/uuid/uuid_hash.hpp>
20 #include <boost/functional/hash.hpp>
21 #include <boost/property_tree/ptree.hpp>
22 #include <boost/property_tree/json_parser.hpp>
23 
24 #include "hazelcast/client/hazelcast_client.h"
25 #include <hazelcast/client/protocol/codec/ErrorCodec.h>
26 #include <hazelcast/client/spi/impl/ListenerMessageCodec.h>
27 #include <hazelcast/client/spi/impl/ClientClusterServiceImpl.h>
28 #include <hazelcast/client/spi/impl/listener/cluster_view_listener.h>
29 #include <hazelcast/client/spi/impl/listener/listener_service_impl.h>
30 #include <hazelcast/client/spi/impl/discovery/remote_address_provider.h>
31 #include <hazelcast/client/spi/impl/discovery/cloud_discovery.h>
32 #include <hazelcast/util/AddressUtil.h>
33 #include "hazelcast/client/member_selectors.h"
34 #include "hazelcast/client/lifecycle_event.h"
35 #include "hazelcast/client/initial_membership_event.h"
36 #include "hazelcast/client/membership_event.h"
37 #include "hazelcast/client/lifecycle_listener.h"
38 #include "hazelcast/client/spi/ProxyManager.h"
39 #include "hazelcast/client/spi/ClientProxy.h"
40 #include "hazelcast/client/spi/ClientContext.h"
41 #include "hazelcast/client/spi/impl/ClientInvocation.h"
42 #include "hazelcast/client/spi/impl/ClientInvocationServiceImpl.h"
43 #include "hazelcast/client/impl/hazelcast_client_instance_impl.h"
44 #include "hazelcast/client/spi/impl/ClientPartitionServiceImpl.h"
45 #include "hazelcast/client/spi/impl/DefaultAddressProvider.h"
46 #include "hazelcast/client/spi/impl/sequence/CallIdSequenceWithBackpressure.h"
47 #include "hazelcast/client/spi/impl/sequence/CallIdSequenceWithoutBackpressure.h"
48 #include "hazelcast/client/spi/impl/sequence/FailFastCallIdSequence.h"
49 #include "hazelcast/client/spi/impl/sequence/CallIdFactory.h"
50 #include "hazelcast/client/connection/ClientConnectionManagerImpl.h"
51 #include "hazelcast/util/AddressHelper.h"
52 #include "hazelcast/util/HashUtil.h"
53 #include "hazelcast/util/concurrent/BackoffIdleStrategy.h"
54 #ifdef HZ_BUILD_WITH_SSL
55 #include <hazelcast/util/SyncHttpsClient.h>
56 #endif // HZ_BUILD_WITH_SSL
57 
58 namespace hazelcast {
59 namespace client {
60 const std::unordered_set<member>&
62 {
63  return members_;
64 }
65 
66 cluster&
68 {
69  return cluster_;
70 }
71 
72 initial_membership_event::initial_membership_event(
74  std::unordered_set<member> members)
75  : cluster_(cluster)
76  , members_(std::move(members))
77 {}
78 
80  : state_(state)
81 {}
82 
85 {
86  return state_;
87 }
88 
89 namespace spi {
90 ProxyManager::ProxyManager(ClientContext& context)
91  : client_(context)
92 {}
93 
94 void
95 ProxyManager::init()
96 {}
97 
98 void
99 ProxyManager::destroy()
100 {
101  std::lock_guard<std::recursive_mutex> guard(lock_);
102  for (auto& p : proxies_) {
103  try {
104  auto proxy = p.second.get();
105  p.second.get()->on_shutdown();
106  } catch (std::exception& se) {
107  auto& lg = client_.get_logger();
108  HZ_LOG(
109  lg,
110  finest,
111  boost::str(boost::format(
112  "Proxy was not created, "
113  "hence onShutdown can be called. Exception: %1%") %
114  se.what()));
115  }
116  }
117  proxies_.clear();
118 }
119 
120 boost::future<void>
121 ProxyManager::initialize(const std::shared_ptr<ClientProxy>& client_proxy)
122 {
123  auto clientMessage = protocol::codec::client_createproxy_encode(
124  client_proxy->get_name(), client_proxy->get_service_name());
125  return spi::impl::ClientInvocation::create(
126  client_, clientMessage, client_proxy->get_service_name())
127  ->invoke()
128  .then(boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
129  f.get();
130  client_proxy->on_initialize();
131  });
132 }
133 
134 boost::future<void>
135 ProxyManager::destroy_proxy(ClientProxy& proxy)
136 {
137  DefaultObjectNamespace objectNamespace(proxy.get_service_name(),
138  proxy.get_name());
139  std::shared_ptr<ClientProxy> registeredProxy;
140  {
141  std::lock_guard<std::recursive_mutex> guard(lock_);
142  auto it = proxies_.find(objectNamespace);
143  registeredProxy = it == proxies_.end() ? nullptr : it->second.get();
144  if (it != proxies_.end()) {
145  proxies_.erase(it);
146  }
147  }
148 
149  try {
150  if (registeredProxy) {
151  try {
152  proxy.destroy_locally();
153  return proxy.destroy_remotely();
154  } catch (exception::iexception&) {
155  proxy.destroy_remotely();
156  throw;
157  }
158  }
159  if (&proxy != registeredProxy.get()) {
160  // The given proxy is stale and was already destroyed, but the
161  // caller may have allocated local resources in the context of this
162  // stale proxy instance after it was destroyed, so we have to
163  // cleanup it locally one more time to make sure there are no
164  // leaking local resources.
165  proxy.destroy_locally();
166  }
167  } catch (...) {
168  if (&proxy != registeredProxy.get()) {
169  // The given proxy is stale and was already destroyed, but the
170  // caller may have allocated local resources in the context of this
171  // stale proxy instance after it was destroyed, so we have to
172  // cleanup it locally one more time to make sure there are no
173  // leaking local resources.
174  proxy.destroy_locally();
175  }
176  throw;
177  }
178  return boost::make_ready_future();
179 }
180 
181 ClientContext::ClientContext(const client::hazelcast_client& hazelcast_client)
182  : hazelcast_client_(*hazelcast_client.client_impl_)
183 {}
184 
185 ClientContext::ClientContext(
186  client::impl::hazelcast_client_instance_impl& hazelcast_client)
187  : hazelcast_client_(hazelcast_client)
188 {}
189 
190 serialization::pimpl::SerializationService&
191 ClientContext::get_serialization_service()
192 {
193  return hazelcast_client_.serialization_service_;
194 }
195 
196 impl::ClientClusterServiceImpl&
197 ClientContext::get_client_cluster_service()
198 {
199  return hazelcast_client_.cluster_service_;
200 }
201 
202 impl::ClientInvocationServiceImpl&
203 ClientContext::get_invocation_service()
204 {
205  return *hazelcast_client_.invocation_service_;
206 }
207 
208 client_config&
209 ClientContext::get_client_config()
210 {
211  return hazelcast_client_.client_config_;
212 }
213 
214 impl::ClientPartitionServiceImpl&
215 ClientContext::get_partition_service()
216 {
217  return *hazelcast_client_.partition_service_;
218 }
219 
220 lifecycle_service&
221 ClientContext::get_lifecycle_service()
222 {
223  return hazelcast_client_.lifecycle_service_;
224 }
225 
226 spi::impl::listener::listener_service_impl&
227 ClientContext::get_client_listener_service()
228 {
229  return *hazelcast_client_.listener_service_;
230 }
231 
232 connection::ClientConnectionManagerImpl&
233 ClientContext::get_connection_manager()
234 {
235  return *hazelcast_client_.connection_manager_;
236 }
237 
238 internal::nearcache::NearCacheManager&
239 ClientContext::get_near_cache_manager()
240 {
241  return *hazelcast_client_.near_cache_manager_;
242 }
243 
244 client_properties&
245 ClientContext::get_client_properties()
246 {
247  return hazelcast_client_.client_properties_;
248 }
249 
250 cluster&
251 ClientContext::get_cluster()
252 {
253  return hazelcast_client_.cluster_;
254 }
255 
256 std::shared_ptr<impl::sequence::CallIdSequence>&
257 ClientContext::get_call_id_sequence() const
258 {
259  return hazelcast_client_.call_id_sequence_;
260 }
261 
262 const protocol::ClientExceptionFactory&
263 ClientContext::get_client_exception_factory() const
264 {
265  return hazelcast_client_.get_exception_factory();
266 }
267 
268 const std::string&
269 ClientContext::get_name() const
270 {
271  return hazelcast_client_.get_name();
272 }
273 
274 impl::ClientExecutionServiceImpl&
275 ClientContext::get_client_execution_service() const
276 {
277  return *hazelcast_client_.execution_service_;
278 }
279 
280 const std::shared_ptr<client::impl::ClientLockReferenceIdGenerator>&
281 ClientContext::get_lock_reference_id_generator()
282 {
283  return hazelcast_client_.get_lock_reference_id_generator();
284 }
285 
286 std::shared_ptr<client::impl::hazelcast_client_instance_impl>
287 ClientContext::get_hazelcast_client_implementation()
288 {
289  return hazelcast_client_.shared_from_this();
290 }
291 
292 spi::ProxyManager&
293 ClientContext::get_proxy_manager()
294 {
295  return hazelcast_client_.get_proxy_manager();
296 }
297 
298 logger&
299 ClientContext::get_logger()
300 {
301  return *hazelcast_client_.logger_;
302 }
303 
304 client::impl::statistics::Statistics&
305 ClientContext::get_clientstatistics()
306 {
307  return *hazelcast_client_.statistics_;
308 }
309 
310 spi::impl::listener::cluster_view_listener&
311 ClientContext::get_cluster_view_listener()
312 {
313  return *hazelcast_client_.cluster_listener_;
314 }
315 
316 boost::uuids::uuid
317 ClientContext::random_uuid()
318 {
319  return hazelcast_client_.random_uuid();
320 }
321 
322 cp::internal::session::proxy_session_manager&
323 ClientContext::get_proxy_session_manager()
324 {
325  return hazelcast_client_.proxy_session_manager_;
326 }
327 
328 lifecycle_service::lifecycle_service(
329  ClientContext& client_context,
330  const std::vector<lifecycle_listener>& listeners)
331  : client_context_(client_context)
332  , listeners_()
333  , shutdown_completed_latch_(1)
334 {
335  for (const auto& listener : listeners) {
336  add_listener(lifecycle_listener(listener));
337  }
338 }
339 
340 bool
341 lifecycle_service::start()
342 {
343  bool expected = false;
344  if (!active_.compare_exchange_strong(expected, true)) {
345  return false;
346  }
347 
348  fire_lifecycle_event(lifecycle_event::STARTED);
349 
350  client_context_.get_client_execution_service().start();
351 
352  client_context_.get_client_listener_service().start();
353 
354  client_context_.get_invocation_service().start();
355 
356  client_context_.get_client_cluster_service().start();
357 
358  client_context_.get_cluster_view_listener().start();
359 
360  if (!client_context_.get_connection_manager().start()) {
361  return false;
362  }
363 
364  auto& connectionStrategyConfig =
365  client_context_.get_client_config().get_connection_strategy_config();
366  if (!connectionStrategyConfig.is_async_start()) {
367  // The client needs to open connections to all members before any
368  // services requiring internal listeners start
369  wait_for_initial_membership_event();
370  client_context_.get_connection_manager()
371  .connect_to_all_cluster_members();
372  }
373 
374  client_context_.get_invocation_service().add_backup_listener();
375 
376  client_context_.get_clientstatistics().start();
377 
378  return true;
379 }
380 
381 void
382 lifecycle_service::shutdown()
383 {
384  bool expected = true;
385  if (!active_.compare_exchange_strong(expected, false)) {
386  shutdown_completed_latch_.wait();
387  return;
388  }
389  try {
390  fire_lifecycle_event(lifecycle_event::SHUTTING_DOWN);
391  client_context_.get_proxy_session_manager().shutdown();
392  client_context_.get_clientstatistics().shutdown();
393  client_context_.get_proxy_manager().destroy();
394  client_context_.get_connection_manager().shutdown();
395  client_context_.get_client_cluster_service().shutdown();
396  client_context_.get_invocation_service().shutdown();
397  client_context_.get_client_listener_service().shutdown();
398  client_context_.get_near_cache_manager().destroy_all_near_caches();
399  fire_lifecycle_event(lifecycle_event::SHUTDOWN);
400  client_context_.get_client_execution_service().shutdown();
401  client_context_.get_serialization_service().dispose();
402  shutdown_completed_latch_.count_down();
403  } catch (std::exception& e) {
404  HZ_LOG(
405  client_context_.get_logger(),
406  info,
407  boost::str(
408  boost::format(
409  "An exception occured during LifecycleService shutdown. %1%") %
410  e.what()));
411  shutdown_completed_latch_.count_down();
412  }
413 }
414 
415 boost::uuids::uuid
416 lifecycle_service::add_listener(lifecycle_listener&& lifecycle_listener)
417 {
418  std::lock_guard<std::mutex> lg(listener_lock_);
419  const auto id = uuid_generator_();
420  listeners_.emplace(id, std::move(lifecycle_listener));
421  return id;
422 }
423 
424 bool
425 lifecycle_service::remove_listener(const boost::uuids::uuid& registration_id)
426 {
427  std::lock_guard<std::mutex> guard(listener_lock_);
428  return listeners_.erase(registration_id) == 1;
429 }
430 
431 void
432 lifecycle_service::fire_lifecycle_event(const lifecycle_event& lifecycle_event)
433 {
434  std::lock_guard<std::mutex> guard(listener_lock_);
435  logger& lg = client_context_.get_logger();
436 
437  std::function<void(lifecycle_listener&)> fire_one;
438 
439  switch (lifecycle_event.get_state()) {
440  case lifecycle_event::STARTING: {
441  // convert the date string from "2016-04-20" to 20160420
442  std::string date(HAZELCAST_STRINGIZE(HAZELCAST_GIT_COMMIT_DATE));
443  util::git_date_to_hazelcast_log_date(date);
444  std::string commitId(HAZELCAST_STRINGIZE(HAZELCAST_GIT_COMMIT_ID));
445  commitId.erase(std::remove(commitId.begin(), commitId.end(), '"'),
446  commitId.end());
447 
448  HZ_LOG(lg,
449  info,
450  (boost::format("(%1%:%2%) LifecycleService::LifecycleEvent "
451  "Client (%3%) is STARTING") %
452  date % commitId %
453  client_context_.get_connection_manager().get_client_uuid())
454  .str());
455  char msg[100];
456  util::hz_snprintf(
457  msg,
458  100,
459  "(%s:%s) LifecycleService::LifecycleEvent STARTING",
460  date.c_str(),
461  commitId.c_str());
462  HZ_LOG(lg, info, msg);
463 
464  fire_one = [](lifecycle_listener& listener) {
465  listener.starting_();
466  };
467  break;
468  }
469  case lifecycle_event::STARTED: {
470  HZ_LOG(lg, info, "LifecycleService::LifecycleEvent STARTED");
471 
472  fire_one = [](lifecycle_listener& listener) {
473  listener.started_();
474  };
475  break;
476  }
477  case lifecycle_event::SHUTTING_DOWN: {
478  HZ_LOG(lg, info, "LifecycleService::LifecycleEvent SHUTTING_DOWN");
479 
480  fire_one = [](lifecycle_listener& listener) {
481  listener.shutting_down_();
482  };
483  break;
484  }
485  case lifecycle_event::SHUTDOWN: {
486  HZ_LOG(lg, info, "LifecycleService::LifecycleEvent SHUTDOWN");
487 
488  fire_one = [](lifecycle_listener& listener) {
489  listener.shutdown_();
490  };
491  break;
492  }
493  case lifecycle_event::CLIENT_CONNECTED: {
494  HZ_LOG(
495  lg, info, "LifecycleService::LifecycleEvent CLIENT_CONNECTED");
496 
497  fire_one = [](lifecycle_listener& listener) {
498  listener.connected_();
499  };
500  break;
501  }
502  case lifecycle_event::CLIENT_DISCONNECTED: {
503  HZ_LOG(
504  lg, info, "LifecycleService::LifecycleEvent CLIENT_DISCONNECTED");
505 
506  fire_one = [](lifecycle_listener& listener) {
507  listener.disconnected_();
508  };
509  break;
510  }
511  }
512 
513  for (auto& item : listeners_) {
514  fire_one(item.second);
515  }
516 }
517 
518 bool
519 lifecycle_service::is_running()
520 {
521  return active_;
522 }
523 
524 lifecycle_service::~lifecycle_service()
525 {
526  if (active_) {
527  shutdown();
528  }
529 }
530 
531 void
532 lifecycle_service::wait_for_initial_membership_event() const
533 {
534  client_context_.get_client_cluster_service()
535  .wait_initial_member_list_fetched();
536 }
537 
538 DefaultObjectNamespace::DefaultObjectNamespace(const std::string& service,
539  const std::string& object)
540  : service_name_(service)
541  , object_name_(object)
542 {}
543 
544 const std::string&
545 DefaultObjectNamespace::get_service_name() const
546 {
547  return service_name_;
548 }
549 
550 const std::string&
551 DefaultObjectNamespace::get_object_name() const
552 {
553  return object_name_;
554 }
555 
556 bool
557 DefaultObjectNamespace::operator==(const DefaultObjectNamespace& rhs) const
558 {
559  return service_name_ == rhs.service_name_ &&
560  object_name_ == rhs.object_name_;
561 }
562 
563 ClientProxy::ClientProxy(const std::string& name,
564  const std::string& service_name,
565  ClientContext& context)
566  : name_(name)
567  , service_name_(service_name)
568  , context_(context)
569 {}
570 
571 ClientProxy::~ClientProxy() = default;
572 
573 const std::string&
574 ClientProxy::get_name() const
575 {
576  return name_;
577 }
578 
579 const std::string&
580 ClientProxy::get_service_name() const
581 {
582  return service_name_;
583 }
584 
585 ClientContext&
586 ClientProxy::get_context()
587 {
588  return context_;
589 }
590 
591 void
592 ClientProxy::on_destroy()
593 {}
594 
595 boost::future<void>
596 ClientProxy::destroy()
597 {
598  return get_context().get_proxy_manager().destroy_proxy(*this);
599 }
600 
601 void
602 ClientProxy::destroy_locally()
603 {
604  if (pre_destroy()) {
605  try {
606  on_destroy();
607  post_destroy();
608  } catch (exception::iexception&) {
609  post_destroy();
610  throw;
611  }
612  }
613 }
614 
615 bool
616 ClientProxy::pre_destroy()
617 {
618  return true;
619 }
620 
621 void
622 ClientProxy::post_destroy()
623 {}
624 
625 void
626 ClientProxy::on_initialize()
627 {}
628 
629 void
630 ClientProxy::on_shutdown()
631 {}
632 
633 serialization::pimpl::SerializationService&
634 ClientProxy::get_serialization_service()
635 {
636  return context_.get_serialization_service();
637 }
638 
639 boost::future<void>
640 ClientProxy::destroy_remotely()
641 {
642  auto clientMessage = protocol::codec::client_destroyproxy_encode(
643  get_name(), get_service_name());
644  return spi::impl::ClientInvocation::create(
645  get_context(),
646  std::make_shared<protocol::ClientMessage>(
647  std::move(clientMessage)),
648  get_name())
649  ->invoke()
650  .then(boost::launch::sync,
651  [](boost::future<protocol::ClientMessage> f) { f.get(); });
652 }
653 
654 boost::future<boost::uuids::uuid>
655 ClientProxy::register_listener(
656  std::shared_ptr<impl::ListenerMessageCodec> listener_message_codec,
657  std::shared_ptr<client::impl::BaseEventHandler> handler)
658 {
659  handler->set_logger(&get_context().get_logger());
660  return get_context().get_client_listener_service().register_listener(
661  listener_message_codec, handler);
662 }
663 
664 boost::future<bool>
665 ClientProxy::deregister_listener(boost::uuids::uuid registration_id)
666 {
667  return get_context().get_client_listener_service().deregister_listener(
668  registration_id);
669 }
670 
671 namespace impl {
672 boost::uuids::uuid
673 ListenerMessageCodec::decode_add_response(protocol::ClientMessage& msg) const
674 {
675  return msg.get_first_uuid();
676 }
677 
678 bool
679 ListenerMessageCodec::decode_remove_response(protocol::ClientMessage& msg) const
680 {
681  return msg.get_first_fixed_sized_field<bool>();
682 }
683 
684 ClientInvocationServiceImpl::ClientInvocationServiceImpl(ClientContext& client)
685  : client_(client)
686  , logger_(client.get_logger())
687  , invocation_timeout_(
688  std::chrono::seconds(client.get_client_properties().get_integer(
689  client.get_client_properties().get_invocation_timeout_seconds())))
690  , invocation_retry_pause_(
691  std::chrono::milliseconds(client.get_client_properties().get_long(
692  client.get_client_properties().get_invocation_retry_pause_millis())))
693  , smart_routing_(
694  client.get_client_config().get_network_config().is_smart_routing())
695  , backup_acks_enabled_(smart_routing_ &&
696  client.get_client_config().backup_acks_enabled())
697  , fail_on_indeterminate_operation_state_(
698  client.get_client_properties().get_boolean(
699  client.get_client_properties().fail_on_indeterminate_state()))
700  , backup_timeout_(
701  std::chrono::milliseconds(client.get_client_properties().get_integer(
702  client.get_client_properties().backup_timeout_millis())))
703 {}
704 
705 void
706 ClientInvocationServiceImpl::start()
707 {}
708 
709 void
710 ClientInvocationServiceImpl::add_backup_listener()
711 {
712  if (this->backup_acks_enabled_) {
713  auto& listener_service = this->client_.get_client_listener_service();
714  listener_service
715  .register_listener(std::make_shared<BackupListenerMessageCodec>(),
716  std::make_shared<noop_backup_event_handler>())
717  .get();
718  }
719 }
720 
721 void
722 ClientInvocationServiceImpl::shutdown()
723 {
724  is_shutdown_.store(true);
725 }
726 
727 std::chrono::milliseconds
728 ClientInvocationServiceImpl::get_invocation_timeout() const
729 {
730  return invocation_timeout_;
731 }
732 
733 std::chrono::milliseconds
734 ClientInvocationServiceImpl::get_invocation_retry_pause() const
735 {
736  return invocation_retry_pause_;
737 }
738 
739 bool
740 ClientInvocationServiceImpl::is_redo_operation()
741 {
742  return client_.get_client_config().is_redo_operation();
743 }
744 
745 void
746 ClientInvocationServiceImpl::handle_client_message(
747  const std::shared_ptr<ClientInvocation>& invocation,
748  const std::shared_ptr<protocol::ClientMessage>& response)
749 {
750  try {
751  if (protocol::codec::ErrorCodec::EXCEPTION_MESSAGE_TYPE ==
752  response->get_message_type()) {
753  auto error_holder = protocol::codec::ErrorCodec::decode(*response);
754  invocation->notify_exception(
755  client_.get_client_exception_factory().create_exception(
756  error_holder));
757  } else {
758  invocation->notify(response);
759  }
760  } catch (std::exception& e) {
761  HZ_LOG(
762  logger_,
763  severe,
764  boost::str(boost::format("Failed to process response for %1%. %2%") %
765  *invocation % e.what()));
766  }
767 }
768 
769 bool
770 ClientInvocationServiceImpl::send(
771  const std::shared_ptr<impl::ClientInvocation>& invocation,
772  const std::shared_ptr<connection::Connection>& connection)
773 {
774  if (is_shutdown_) {
775  BOOST_THROW_EXCEPTION(exception::hazelcast_client_not_active(
776  "ClientInvocationServiceImpl::send", "Client is shut down"));
777  }
778 
779  if (backup_acks_enabled_) {
780  invocation->get_client_message()->add_flag(
781  protocol::ClientMessage::BACKUP_AWARE_FLAG);
782  }
783 
784  write_to_connection(*connection, invocation);
785  invocation->set_send_connection(connection);
786  return true;
787 }
788 
789 void
790 ClientInvocationServiceImpl::write_to_connection(
791  connection::Connection& connection,
792  const std::shared_ptr<ClientInvocation>& client_invocation)
793 {
794  auto clientMessage = client_invocation->get_client_message();
795  connection.write(client_invocation);
796 }
797 
798 void
799 ClientInvocationServiceImpl::check_invocation_allowed()
800 {
801  client_.get_connection_manager().check_invocation_allowed();
802 }
803 
804 bool
805 ClientInvocationServiceImpl::invoke(
806  std::shared_ptr<ClientInvocation> invocation)
807 {
808  auto connection = client_.get_connection_manager().get_random_connection();
809  if (!connection) {
810  HZ_LOG(logger_, finest, "No connection found to invoke");
811  return false;
812  }
813  return send(invocation, connection);
814 }
815 
816 DefaultAddressProvider::DefaultAddressProvider(
817  config::client_network_config& network_config)
818  : network_config_(network_config)
819 {}
820 
821 std::vector<address>
822 DefaultAddressProvider::load_addresses()
823 {
824  std::vector<address> addresses = network_config_.get_addresses();
825  if (addresses.empty()) {
826  addresses.emplace_back("127.0.0.1", 5701);
827  }
828 
829  // TODO Implement AddressHelper to add alternative ports for the same
830  // address
831 
832  return addresses;
833 }
834 
835 boost::optional<address>
836 DefaultAddressProvider::translate(const address& addr)
837 {
838  return addr;
839 }
840 
841 bool
842 DefaultAddressProvider::is_default_provider()
843 {
844  return true;
845 }
846 
847 const boost::shared_ptr<ClientClusterServiceImpl::member_list_snapshot>
848  ClientClusterServiceImpl::EMPTY_SNAPSHOT(
849  new ClientClusterServiceImpl::member_list_snapshot{ -1 });
850 
851 constexpr boost::chrono::milliseconds
852  ClientClusterServiceImpl::INITIAL_MEMBERS_TIMEOUT;
853 const endpoint_qualifier ClientClusterServiceImpl::CLIENT{ 1, "" };
854 const endpoint_qualifier ClientClusterServiceImpl::MEMBER{ 0, "" };
855 
856 ClientClusterServiceImpl::ClientClusterServiceImpl(
857  hazelcast::client::spi::ClientContext& client)
858  : client_(client)
859  , member_list_snapshot_(EMPTY_SNAPSHOT)
860  , labels_(client.get_client_config().get_labels())
861  , initial_list_fetched_latch_(1)
862 {}
863 
864 boost::uuids::uuid
865 ClientClusterServiceImpl::add_membership_listener_without_init(
866  membership_listener&& listener)
867 {
868  std::lock_guard<std::mutex> g(listeners_lock_);
869  auto id = client_.random_uuid();
870  listeners_.emplace(id, std::move(listener));
871  return id;
872 }
873 
874 boost::optional<member>
875 ClientClusterServiceImpl::get_member(boost::uuids::uuid uuid) const
876 {
877  assert(!uuid.is_nil());
878  auto members_view_ptr = member_list_snapshot_.load();
879  const auto it = members_view_ptr->members.find(uuid);
880  if (it == members_view_ptr->members.end()) {
881  return boost::none;
882  }
883  return { it->second };
884 }
885 
886 std::vector<member>
887 ClientClusterServiceImpl::get_member_list() const
888 {
889  auto members_view_ptr = member_list_snapshot_.load();
890  std::vector<member> result;
891  result.reserve(members_view_ptr->members.size());
892  for (const auto& e : members_view_ptr->members) {
893  result.emplace_back(e.second);
894  }
895  return result;
896 }
897 
898 void
899 ClientClusterServiceImpl::start()
900 {
901  for (auto& listener :
902  client_.get_client_config().get_membership_listeners()) {
903  add_membership_listener(membership_listener(listener));
904  }
905 }
906 
907 void
908 ClientClusterServiceImpl::fire_initial_membership_event(
909  const initial_membership_event& event)
910 {
911  std::lock_guard<std::mutex> g(listeners_lock_);
912 
913  for (auto& item : listeners_) {
914  membership_listener& listener = item.second;
915  if (listener.init_) {
916  listener.init_(event);
917  }
918  }
919 }
920 
921 void
922 ClientClusterServiceImpl::shutdown()
923 {
924  initial_list_fetched_latch_.try_count_down();
925 }
926 
927 boost::uuids::uuid
928 ClientClusterServiceImpl::add_membership_listener(
929  membership_listener&& listener)
930 {
931  std::lock_guard<std::mutex> cluster_view_g(cluster_view_lock_);
932 
933  auto id = add_membership_listener_without_init(std::move(listener));
934 
935  std::lock_guard<std::mutex> listeners_g(listeners_lock_);
936  auto added_listener = listeners_[id];
937 
938  if (added_listener.init_) {
939  auto& cluster = client_.get_cluster();
940  auto members_ptr = member_list_snapshot_.load();
941  if (!members_ptr->members.empty()) {
942  std::unordered_set<member> members;
943  for (const auto& e : members_ptr->members) {
944  members.insert(e.second);
945  }
946  added_listener.init_(initial_membership_event(cluster, members));
947  }
948  }
949 
950  return id;
951 }
952 
953 bool
954 ClientClusterServiceImpl::remove_membership_listener(
955  boost::uuids::uuid registration_id)
956 {
957  std::lock_guard<std::mutex> g(listeners_lock_);
958  return listeners_.erase(registration_id) == 1;
959 }
960 
961 std::vector<member>
962 ClientClusterServiceImpl::get_members(const member_selector& selector) const
963 {
964  std::vector<member> result;
965  for (auto&& member : get_member_list()) {
966  if (selector.select(member)) {
967  result.emplace_back(std::move(member));
968  }
969  }
970 
971  return result;
972 }
973 
974 local_endpoint
975 ClientClusterServiceImpl::get_local_client() const
976 {
977  connection::ClientConnectionManagerImpl& cm =
978  client_.get_connection_manager();
979  auto connection = cm.get_random_connection();
980  auto inetSocketAddress =
981  connection ? connection->get_local_socket_address() : boost::none;
982  auto uuid = cm.get_client_uuid();
983  return local_endpoint(
984  uuid, std::move(inetSocketAddress), client_.get_name(), labels_);
985 }
986 
987 void
988 ClientClusterServiceImpl::clear_member_list_version()
989 {
990  std::lock_guard<std::mutex> g(cluster_view_lock_);
991  auto& lg = client_.get_logger();
992  HZ_LOG(lg, finest, "Resetting the member list version ");
993  auto cluster_view_snapshot = member_list_snapshot_.load();
994  // This check is necessary so that `clear_member_list_version` when handling
995  // auth response will not intervene with client failover logic
996  if (cluster_view_snapshot != EMPTY_SNAPSHOT) {
997  member_list_snapshot_.store(boost::shared_ptr<member_list_snapshot>(
998  new member_list_snapshot{ 0, cluster_view_snapshot->members }));
999  }
1000 }
1001 
1002 std::vector<membership_event>
1003 ClientClusterServiceImpl::clear_member_list_and_return_events()
1004 {
1005  std::lock_guard<std::mutex> g(cluster_view_lock_);
1006 
1007  auto& lg = client_.get_logger();
1008  HZ_LOG(lg, finest, "Resetting the member list");
1009 
1010  auto previous_list = member_list_snapshot_.load()->members;
1011 
1012  member_list_snapshot_.store(
1013  boost::shared_ptr<member_list_snapshot>(new member_list_snapshot{ 0 }));
1014 
1015  return detect_membership_events(
1016  previous_list,
1017  std::unordered_map<boost::uuids::uuid,
1018  member,
1019  boost::hash<boost::uuids::uuid>>());
1020 }
1021 
1022 void
1023 ClientClusterServiceImpl::clear_member_list()
1024 {
1025  auto events = clear_member_list_and_return_events();
1026  fire_events(std::move(events));
1027 }
1028 
1029 void
1030 ClientClusterServiceImpl::handle_event(int32_t version,
1031  const std::vector<member>& member_infos)
1032 {
1033  auto& lg = client_.get_logger();
1034  HZ_LOG(
1035  lg,
1036  finest,
1037  boost::str(
1038  boost::format("Handling new snapshot with membership version: %1%, "
1039  "membersString %2%") %
1040  version % members_string(create_snapshot(version, member_infos))));
1041  auto cluster_view_snapshot = member_list_snapshot_.load();
1042  if (cluster_view_snapshot == EMPTY_SNAPSHOT) {
1043  std::lock_guard<std::mutex> g(cluster_view_lock_);
1044  cluster_view_snapshot = member_list_snapshot_.load();
1045  if (cluster_view_snapshot == EMPTY_SNAPSHOT) {
1046  // this means this is the first time client connected to cluster
1047  apply_initial_state(version, member_infos);
1048  initial_list_fetched_latch_.count_down();
1049  return;
1050  }
1051  }
1052 
1053  std::vector<membership_event> events;
1054  if (version >= cluster_view_snapshot->version) {
1055  std::lock_guard<std::mutex> g(cluster_view_lock_);
1056  cluster_view_snapshot = member_list_snapshot_.load();
1057  if (version >= cluster_view_snapshot->version) {
1058  auto prev_members = cluster_view_snapshot->members;
1059  auto snapshot = boost::make_shared<member_list_snapshot>(
1060  create_snapshot(version, member_infos));
1061  member_list_snapshot_.store(snapshot);
1062  events = detect_membership_events(prev_members, snapshot->members);
1063  }
1064  }
1065 
1066  fire_events(std::move(events));
1067 }
1068 
1069 ClientClusterServiceImpl::member_list_snapshot
1070 ClientClusterServiceImpl::create_snapshot(int32_t version,
1071  const std::vector<member>& members)
1072 {
1073  member_list_snapshot result;
1074  result.version = version;
1075  for (auto& m : members) {
1076  auto const& address_map = m.address_map();
1077  if (address_map.empty()) {
1078  result.members.insert({ m.get_uuid(), m });
1079  } else {
1080  auto found = address_map.find(CLIENT);
1081  address member_address;
1082  if (found != address_map.end()) {
1083  member_address = found->second;
1084  } else {
1085  found = address_map.find(MEMBER);
1086  assert(found != address_map.end());
1087  member_address = found->second;
1088  }
1089  member new_member(member_address,
1090  m.get_uuid(),
1091  m.is_lite_member(),
1092  m.get_attributes(),
1093  m.address_map(),
1094  m.get_version());
1095  result.members.emplace(new_member.get_uuid(),
1096  std::move(new_member));
1097  }
1098  }
1099 
1100  return result;
1101 }
1102 
1103 std::string
1104 ClientClusterServiceImpl::members_string(
1105  const ClientClusterServiceImpl::member_list_snapshot& snapshot)
1106 {
1107  std::stringstream out;
1108  auto const& members = snapshot.members;
1109  out << std::endl << std::endl << "Members [" << members.size() << "] {";
1110  for (auto const& e : members) {
1111  out << std::endl << "\t" << e.second;
1112  }
1113  out << std::endl << "}" << std::endl;
1114  return out.str();
1115 }
1116 
1117 void
1118 ClientClusterServiceImpl::apply_initial_state(
1119  int32_t version,
1120  const std::vector<member>& member_infos)
1121 {
1122  auto snapshot = boost::make_shared<member_list_snapshot>(
1123  create_snapshot(version, member_infos));
1124  member_list_snapshot_.store(snapshot);
1125  HZ_LOG(client_.get_logger(), info, members_string(*snapshot));
1126  std::unordered_set<member> members;
1127  for (auto const& e : snapshot->members) {
1128  members.insert(e.second);
1129  }
1130  std::lock_guard<std::mutex> g(listeners_lock_);
1131  for (auto& item : listeners_) {
1132  membership_listener& listener = item.second;
1133  if (listener.init_) {
1134  listener.init_(
1135  initial_membership_event(client_.get_cluster(), members));
1136  }
1137  }
1138 }
1139 
1140 std::vector<membership_event>
1141 ClientClusterServiceImpl::detect_membership_events(
1142  std::unordered_map<boost::uuids::uuid,
1143  member,
1144  boost::hash<boost::uuids::uuid>> previous_members,
1145  const std::unordered_map<boost::uuids::uuid,
1146  member,
1147  boost::hash<boost::uuids::uuid>>& current_members)
1148 {
1149  std::vector<member> new_members;
1150 
1151  for (auto const& e : current_members) {
1152  if (!previous_members.erase(e.first)) {
1153  new_members.emplace_back(e.second);
1154  }
1155  }
1156 
1157  std::vector<membership_event> events;
1158 
1159  // removal events should be added before added events
1160  for (auto const& e : previous_members) {
1161  events.emplace_back(
1162  client_.get_cluster(),
1163  e.second,
1164  membership_event::membership_event_type::MEMBER_LEFT,
1165  current_members);
1166  auto connection =
1167  client_.get_connection_manager().get_connection(e.second.get_uuid());
1168  if (connection) {
1169  connection->close(
1170  "",
1171  std::make_exception_ptr(exception::target_disconnected(
1172  "ClientClusterServiceImpl::detect_membership_events",
1173  (boost::format(
1174  "The client has closed the connection to this member, after "
1175  "receiving a member left event from the cluster. %1%") %
1176  *connection)
1177  .str())));
1178  }
1179  }
1180  for (auto const& member : new_members) {
1181  events.emplace_back(
1182  client_.get_cluster(),
1183  member,
1184  membership_event::membership_event_type::MEMBER_JOINED,
1185  current_members);
1186  }
1187 
1188  if (!events.empty()) {
1189  auto snapshot = member_list_snapshot_.load();
1190  if (!snapshot->members.empty()) {
1191  HZ_LOG(client_.get_logger(), info, members_string(*snapshot));
1192  }
1193  }
1194  return events;
1195 }
1196 
1197 void
1198 ClientClusterServiceImpl::fire_events(std::vector<membership_event> events)
1199 {
1200  std::lock_guard<std::mutex> g(listeners_lock_);
1201 
1202  for (auto const& event : events) {
1203  for (auto& item : listeners_) {
1204  membership_listener& listener = item.second;
1205  if (event.get_event_type() ==
1206  membership_event::membership_event_type::MEMBER_JOINED) {
1207  listener.joined_(event);
1208  } else {
1209  listener.left_(event);
1210  }
1211  }
1212  }
1213 }
1214 
1215 void
1216 ClientClusterServiceImpl::wait_initial_member_list_fetched() const
1217 {
1218  // safe to const cast here since latch operations are already thread safe
1219  // ops.
1220  if ((const_cast<boost::latch&>(initial_list_fetched_latch_))
1221  .wait_for(INITIAL_MEMBERS_TIMEOUT) == boost::cv_status::timeout) {
1222  BOOST_THROW_EXCEPTION(exception::illegal_state(
1223  "ClientClusterServiceImpl::wait_initial_member_list_fetched",
1224  "Could not get initial member list from cluster!"));
1225  }
1226 }
1227 
1228 bool
1229 ClientInvocationServiceImpl::invoke_on_connection(
1230  const std::shared_ptr<ClientInvocation>& invocation,
1231  const std::shared_ptr<connection::Connection>& connection)
1232 {
1233  return send(invocation, connection);
1234 }
1235 
1236 bool
1237 ClientInvocationServiceImpl::invoke_on_partition_owner(
1238  const std::shared_ptr<ClientInvocation>& invocation,
1239  int partition_id)
1240 {
1241  auto partition_owner =
1242  client_.get_partition_service().get_partition_owner(partition_id);
1243  if (partition_owner.is_nil()) {
1244  HZ_LOG(logger_,
1245  finest,
1246  boost::str(
1247  boost::format(
1248  "Partition owner is not assigned yet for partition %1%") %
1249  partition_id));
1250  return false;
1251  }
1252  return invoke_on_target(invocation, partition_owner);
1253 }
1254 
1255 bool
1256 ClientInvocationServiceImpl::invoke_on_target(
1257  const std::shared_ptr<ClientInvocation>& invocation,
1258  boost::uuids::uuid uuid)
1259 {
1260  assert(!uuid.is_nil());
1261  auto connection = client_.get_connection_manager().get_connection(uuid);
1262  if (!connection) {
1263  HZ_LOG(
1264  logger_,
1265  finest,
1266  boost::str(boost::format("Client is not connected to target : %1%") %
1267  uuid));
1268  return false;
1269  }
1270  return send(invocation, connection);
1271 }
1272 
1273 bool
1274 ClientInvocationServiceImpl::is_smart_routing() const
1275 {
1276  return smart_routing_;
1277 }
1278 
1279 const std::chrono::milliseconds&
1280 ClientInvocationServiceImpl::get_backup_timeout() const
1281 {
1282  return backup_timeout_;
1283 }
1284 
1285 bool
1286 ClientInvocationServiceImpl::fail_on_indeterminate_state() const
1287 {
1288  return fail_on_indeterminate_operation_state_;
1289 }
1290 
1291 ClientExecutionServiceImpl::ClientExecutionServiceImpl(
1292  const std::string& name,
1293  const client_properties& properties,
1294  int32_t pool_size,
1295  spi::lifecycle_service& service)
1296  : lifecycle_service_(service)
1297  , client_properties_(properties)
1298 {}
1299 
1300 void
1301 ClientExecutionServiceImpl::start()
1302 {
1303  int internalPoolSize = client_properties_.get_integer(
1304  client_properties_.get_internal_executor_pool_size());
1305  if (internalPoolSize <= 0) {
1306  internalPoolSize = util::IOUtil::to_value<int>(
1307  client_properties::INTERNAL_EXECUTOR_POOL_SIZE_DEFAULT);
1308  }
1309 
1310  internal_executor_.reset(
1311  new hazelcast::util::hz_thread_pool(internalPoolSize));
1312 
1313  user_executor_.reset(new hazelcast::util::hz_thread_pool());
1314 }
1315 
1316 void
1317 ClientExecutionServiceImpl::shutdown()
1318 {
1319  shutdown_thread_pool(internal_executor_.get());
1320  shutdown_thread_pool(user_executor_.get());
1321 }
1322 
1323 util::hz_thread_pool&
1324 ClientExecutionServiceImpl::get_user_executor()
1325 {
1326  return *user_executor_;
1327 }
1328 
1329 void
1330 ClientExecutionServiceImpl::shutdown_thread_pool(
1331  hazelcast::util::hz_thread_pool* pool)
1332 {
1333  if (!pool) {
1334  return;
1335  }
1336  pool->close();
1337 }
1338 
// Namespace-scope definitions of two static constexpr members of
// ClientInvocation. They carry no initializer here, so the values come from
// the in-class declarations.
1339 constexpr int ClientInvocation::MAX_FAST_INVOCATION_COUNT;
1340 constexpr int ClientInvocation::UNASSIGNED_PARTITION;
1341 
1342 ClientInvocation::ClientInvocation(
1343  spi::ClientContext& client_context,
1344  std::shared_ptr<protocol::ClientMessage>&& message,
1345  const std::string& name,
1346  int partition,
1347  const std::shared_ptr<connection::Connection>& conn,
1348  boost::uuids::uuid uuid)
1349  : logger_(client_context.get_logger())
1350  , lifecycle_service_(client_context.get_lifecycle_service())
1351  , invocation_service_(client_context.get_invocation_service())
1352  , execution_service_(
1353  client_context.get_client_execution_service().shared_from_this())
1354  , call_id_sequence_(client_context.get_call_id_sequence())
1355  , uuid_(uuid)
1356  , partition_id_(partition)
1357  , start_time_(std::chrono::steady_clock::now())
1358  , retry_pause_(invocation_service_.get_invocation_retry_pause())
1359  , object_name_(name)
1360  , connection_(conn)
1361  , bound_to_single_connection_(conn != nullptr)
1362  , invoke_count_(0)
1363  , urgent_(false)
1364  , smart_routing_(invocation_service_.is_smart_routing())
1365 {
1366  message->set_partition_id(partition_id_);
1367  client_message_ =
1368  boost::make_shared<std::shared_ptr<protocol::ClientMessage>>(message);
1369  set_send_connection(nullptr);
1370 }
1371 
// Defaulted destructor, defined out of line in this translation unit.
1372 ClientInvocation::~ClientInvocation() = default;
1373 
1374 boost::future<protocol::ClientMessage>
1375 ClientInvocation::invoke()
1376 {
1377  assert(client_message_.load());
1378  // for back pressure
1379  call_id_sequence_->next();
1380  invoke_on_selection();
1381  if (!lifecycle_service_.is_running()) {
1382  return invocation_promise_.get_future().then(
1383  [](boost::future<protocol::ClientMessage> f) { return f.get(); });
1384  }
1385  auto id_seq = call_id_sequence_;
1386  return invocation_promise_.get_future().then(
1387  execution_service_->get_user_executor(),
1388  [=](boost::future<protocol::ClientMessage> f) {
1389  id_seq->complete();
1390  return f.get();
1391  });
1392 }
1393 
1394 boost::future<protocol::ClientMessage>
1395 ClientInvocation::invoke_urgent()
1396 {
1397  assert(client_message_.load());
1398  urgent_ = true;
1399  // for back pressure
1400  call_id_sequence_->force_next();
1401  invoke_on_selection();
1402  if (!lifecycle_service_.is_running()) {
1403  return invocation_promise_.get_future().then(
1404  [](boost::future<protocol::ClientMessage> f) { return f.get(); });
1405  }
1406  auto id_seq = call_id_sequence_;
1407  return invocation_promise_.get_future().then(
1408  execution_service_->get_user_executor(),
1409  [=](boost::future<protocol::ClientMessage> f) {
1410  id_seq->complete();
1411  return f.get();
1412  });
1413 }
1414 
1415 void
1416 ClientInvocation::invoke_on_selection()
1417 {
1418  try {
1419  invoke_count_++;
1420  if (!urgent_) {
1421  invocation_service_.check_invocation_allowed();
1422  }
1423 
1424  if (is_bind_to_single_connection()) {
1425  bool invoked = false;
1426  auto conn = connection_.lock();
1427  if (conn) {
1428  invoked = invocation_service_.invoke_on_connection(
1429  shared_from_this(), conn);
1430  }
1431  if (!invoked) {
1432  std::string message;
1433  if (conn) {
1434  message =
1435  (boost::format("Could not invoke on connection %1%") %
1436  *conn)
1437  .str();
1438  } else {
1439  message = "Could not invoke. Bound to a connection that is "
1440  "deleted already.";
1441  }
1442  notify_exception(std::make_exception_ptr(exception::io(
1443  "ClientInvocation::invoke_on_selection", message)));
1444  }
1445  return;
1446  }
1447 
1448  bool invoked = false;
1449  if (smart_routing_) {
1450  if (partition_id_ != -1) {
1451  invoked = invocation_service_.invoke_on_partition_owner(
1452  shared_from_this(), partition_id_);
1453  } else if (!uuid_.is_nil()) {
1454  invoked = invocation_service_.invoke_on_target(
1455  shared_from_this(), uuid_);
1456  } else {
1457  invoked = invocation_service_.invoke(shared_from_this());
1458  }
1459  if (!invoked) {
1460  invoked = invocation_service_.invoke(shared_from_this());
1461  }
1462  } else {
1463  invoked = invocation_service_.invoke(shared_from_this());
1464  }
1465  if (!invoked) {
1466  notify_exception(std::make_exception_ptr(
1467  exception::io("No connection found to invoke")));
1468  }
1469  } catch (exception::iexception&) {
1470  notify_exception(std::current_exception());
1471  } catch (std::exception&) {
1472  assert(false);
1473  }
1474 }
1475 
1476 bool
1477 ClientInvocation::is_bind_to_single_connection() const
1478 {
1479  return bound_to_single_connection_;
1480 }
1481 
1482 void
1483 ClientInvocation::run()
1484 {
1485  retry();
1486 }
1487 
1488 void
1489 ClientInvocation::retry()
1490 {
1491  // retry modifies the client message and should not reuse the client
1492  // message. It could be the case that it is in write queue of the
1493  // connection.
1494  client_message_ =
1495  boost::make_shared<std::shared_ptr<protocol::ClientMessage>>(
1496  copy_message());
1497 
1498  try {
1499  invoke_on_selection();
1500  } catch (exception::iexception& e) {
1501  set_exception(e, boost::current_exception());
1502  } catch (std::exception&) {
1503  assert(false);
1504  }
1505 }
1506 
1507 void
1508 ClientInvocation::set_exception(const std::exception& e,
1509  boost::exception_ptr exception_ptr)
1510 {
1511  invoked_or_exception_set_.store(true);
1512  try {
1513  auto send_conn = send_connection_.load();
1514  if (send_conn) {
1515  auto connection = send_conn->lock();
1516  if (connection) {
1517  auto call_id =
1518  client_message_.load()->get()->get_correlation_id();
1519  boost::asio::post(
1520  connection->get_socket().get_executor(),
1521  [=]() { connection->deregister_invocation(call_id); });
1522  }
1523  }
1524  invocation_promise_.set_exception(std::move(exception_ptr));
1525  } catch (boost::promise_already_satisfied& se) {
1526  if (!event_handler_) {
1527  HZ_LOG(logger_,
1528  finest,
1529  boost::str(boost::format(
1530  "Failed to set the exception for invocation. "
1531  "%1%, %2% Exception to be set: %3%") %
1532  se.what() % *this % e.what()));
1533  }
1534  }
1535 }
1536 
// Handles a failed attempt of this invocation.
//
// The invocation is first erased from the connection it was sent on. Then,
// for an exception::iexception:
//  - client not running: the promise fails with hazelcast_client_not_active,
//    thrown nested inside the handler of the original exception
//  - should_retry() says no: the promise fails with the original exception
//  - invocation timeout exceeded: the promise fails with operation_timeout
//  - otherwise: a retry is scheduled through execute()
1537 void
1538 ClientInvocation::notify_exception(std::exception_ptr exception)
1539 {
1540  erase_invocation();
1541  try {
// rethrow so that the handlers below can work with the concrete type
1542  std::rethrow_exception(exception);
1543  } catch (exception::iexception& iex) {
1544  log_exception(iex);
1545 
1546  if (!lifecycle_service_.is_running()) {
1547  try {
1548  std::throw_with_nested(boost::enable_current_exception(
1549  exception::hazelcast_client_not_active(
1550  iex.get_source(), "Client is shutting down")));
1551  } catch (exception::iexception& e) {
1552  set_exception(e, boost::current_exception());
1553  }
1554  return;
1555  }
1556 
1557  if (!should_retry(iex)) {
1558  set_exception(iex, boost::current_exception());
1559  return;
1560  }
1561 
// retryable, but only while the invocation timeout has not elapsed since
// start_time_
1562  auto timePassed = std::chrono::steady_clock::now() - start_time_;
1563  if (timePassed > invocation_service_.get_invocation_timeout()) {
1564  HZ_LOG(
1565  logger_,
1566  finest,
1567  boost::str(boost::format("Exception will not be retried because "
1568  "invocation timed out. %1%") %
1569  iex.what()));
1570 
1571  auto now = std::chrono::steady_clock::now();
1572 
1573  auto timeoutException =
1574  (exception::exception_builder<exception::operation_timeout>(
1575  "ClientInvocation::newoperation_timeout_exception")
1576  << *this
1577  << " timed out because exception occurred after client "
1578  "invocation timeout "
1579  << std::chrono::duration_cast<std::chrono::milliseconds>(
1580  invocation_service_.get_invocation_timeout())
1581  .count()
1582  << "msecs. Last exception:" << iex << " Current time :"
1583  << util::StringUtil::time_to_string(now) << ". "
1584  << "Start time: "
1585  << util::StringUtil::time_to_string(start_time_)
1586  << ". Total elapsed time: "
1587  << std::chrono::duration_cast<std::chrono::milliseconds>(
1588  now - start_time_)
1589  .count()
1590  << " ms. ")
1591  .build();
// throw and catch right away so that boost::current_exception() can capture
// the timeout exception
1592  try {
1593  BOOST_THROW_EXCEPTION(timeoutException);
1594  } catch (...) {
1595  set_exception(timeoutException, boost::current_exception());
1596  }
1597 
1598  return;
1599  }
1600 
// schedule the next attempt; if that throws, the invocation fails with the
// thrown exception
1601  try {
1602  execute();
1603  } catch (std::exception& e) {
1604  set_exception(e, boost::current_exception());
1605  }
1606  } catch (...) {
// reached for anything that is not an iexception; with assertions disabled
// this handler does nothing
1607  assert(false);
1608  }
1609 }
1610 
1611 void
1612 ClientInvocation::erase_invocation() const
1613 {
1614  if (!this->event_handler_) {
1615  auto sent_connection = get_send_connection();
1616  if (sent_connection) {
1617  auto this_invocation = shared_from_this();
1618  boost::asio::post(sent_connection->get_socket().get_executor(),
1619  [=]() {
1620  sent_connection->invocations.erase(
1621  this_invocation->get_client_message()
1622  ->get_correlation_id());
1623  });
1624  }
1625  }
1626 }
1627 
1628 bool
1629 ClientInvocation::should_retry(exception::iexception& exception)
1630 {
1631  auto errorCode = exception.get_error_code();
1632  if (is_bind_to_single_connection() &&
1633  (errorCode == protocol::IO ||
1634  errorCode == protocol::TARGET_DISCONNECTED)) {
1635  return false;
1636  }
1637 
1638  if (!uuid_.is_nil() && errorCode == protocol::TARGET_NOT_MEMBER) {
1639  // when invocation send to a specific member
1640  // if target is no longer a member, we should not retry
1641  // note that this exception could come from the server
1642  return false;
1643  }
1644 
1645  if (errorCode == protocol::IO ||
1646  errorCode == protocol::HAZELCAST_INSTANCE_NOT_ACTIVE ||
1647  exception.is_retryable()) {
1648  return true;
1649  }
1650  if (errorCode == protocol::TARGET_DISCONNECTED) {
1651  return client_message_.load()->get()->is_retryable() ||
1652  invocation_service_.is_redo_operation();
1653  }
1654  return false;
1655 }
1656 
1657 std::ostream&
1658 operator<<(std::ostream& os, const ClientInvocation& invocation)
1659 {
1660  std::ostringstream target;
1661  if (invocation.is_bind_to_single_connection()) {
1662  auto conn = invocation.connection_.lock();
1663  if (conn) {
1664  target << "connection " << *conn;
1665  }
1666  } else if (invocation.partition_id_ != -1) {
1667  target << "partition " << invocation.partition_id_;
1668  } else if (!invocation.uuid_.is_nil()) {
1669  target << "uuid " << boost::to_string(invocation.uuid_);
1670  } else {
1671  target << "random";
1672  }
1673  os << "ClientInvocation{"
1674  << "requestMessage = " << *invocation.client_message_.load()->get()
1675  << ", objectName = " << invocation.object_name_
1676  << ", target = " << target.str() << ", sendConnection = ";
1677  auto sendConnection = invocation.get_send_connection();
1678  if (sendConnection) {
1679  os << *sendConnection;
1680  } else {
1681  os << "nullptr";
1682  }
1683  os << ", backup_acks_expected_ = "
1684  << static_cast<int>(invocation.backup_acks_expected_)
1685  << ", backup_acks_received = " << invocation.backup_acks_received_;
1686 
1687  if (invocation.pending_response_) {
1688  os << ", pending_response: " << *invocation.pending_response_;
1689  }
1690 
1691  os << '}';
1692 
1693  return os;
1694 }
1695 
1696 std::shared_ptr<ClientInvocation>
1697 ClientInvocation::create(
1698  spi::ClientContext& client_context,
1699  std::shared_ptr<protocol::ClientMessage>&& client_message,
1700  const std::string& object_name,
1701  int partition_id)
1702 {
1703  return std::shared_ptr<ClientInvocation>(new ClientInvocation(
1704  client_context, std::move(client_message), object_name, partition_id));
1705 }
1706 
1707 std::shared_ptr<ClientInvocation>
1708 ClientInvocation::create(
1709  spi::ClientContext& client_context,
1710  std::shared_ptr<protocol::ClientMessage>&& client_message,
1711  const std::string& object_name,
1712  const std::shared_ptr<connection::Connection>& connection)
1713 {
1714  return std::shared_ptr<ClientInvocation>(
1715  new ClientInvocation(client_context,
1716  std::move(client_message),
1717  object_name,
1718  UNASSIGNED_PARTITION,
1719  connection));
1720 }
1721 
1722 std::shared_ptr<ClientInvocation>
1723 ClientInvocation::create(
1724  spi::ClientContext& client_context,
1725  std::shared_ptr<protocol::ClientMessage>&& client_message,
1726  const std::string& object_name,
1727  boost::uuids::uuid uuid)
1728 {
1729  return std::shared_ptr<ClientInvocation>(
1730  new ClientInvocation(client_context,
1731  std::move(client_message),
1732  object_name,
1733  UNASSIGNED_PARTITION,
1734  nullptr,
1735  uuid));
1736 }
1737 
1738 std::shared_ptr<ClientInvocation>
1739 ClientInvocation::create(spi::ClientContext& client_context,
1740  protocol::ClientMessage& client_message,
1741  const std::string& object_name,
1742  int partition_id)
1743 {
1744  return create(
1745  client_context,
1746  std::make_shared<protocol::ClientMessage>(std::move(client_message)),
1747  object_name,
1748  partition_id);
1749 }
1750 
1751 std::shared_ptr<ClientInvocation>
1752 ClientInvocation::create(
1753  spi::ClientContext& client_context,
1754  protocol::ClientMessage& client_message,
1755  const std::string& object_name,
1756  const std::shared_ptr<connection::Connection>& connection)
1757 {
1758  return create(
1759  client_context,
1760  std::make_shared<protocol::ClientMessage>(std::move(client_message)),
1761  object_name,
1762  connection);
1763 }
1764 
1765 std::shared_ptr<ClientInvocation>
1766 ClientInvocation::create(spi::ClientContext& client_context,
1767  protocol::ClientMessage& client_message,
1768  const std::string& object_name,
1769  boost::uuids::uuid uuid)
1770 {
1771  return create(
1772  client_context,
1773  std::make_shared<protocol::ClientMessage>(std::move(client_message)),
1774  object_name,
1775  uuid);
1776 }
1777 
1778 std::shared_ptr<connection::Connection>
1779 ClientInvocation::get_send_connection() const
1780 {
1781  return send_connection_.load()->lock();
1782 }
1783 
1784 void
1785 ClientInvocation::wait_invoked() const
1786 {
1787  // it could be either invoked or cancelled before invoked
1788  while (!invoked_or_exception_set_) {
1789  std::this_thread::sleep_for(retry_pause_);
1790  }
1791 }
1792 
1793 void
1794 ClientInvocation::set_send_connection(
1795  const std::shared_ptr<connection::Connection>& conn)
1796 {
1797  send_connection_.store(
1798  boost::make_shared<std::weak_ptr<connection::Connection>>(conn));
1799  invoked_or_exception_set_.store(true);
1800 }
1801 
1802 void
1803 ClientInvocation::notify(const std::shared_ptr<protocol::ClientMessage>& msg)
1804 {
1805  if (!msg) {
1806  BOOST_THROW_EXCEPTION(
1807  exception::illegal_argument("response can't be null"));
1808  }
1809 
1810  int8_t expected_backups = msg->get_number_of_backups();
1811 
1812  // if a regular response comes and there are backups, we need to wait for
1813  // the backups when the backups complete, the response will be send by the
1814  // last backup or backup-timeout-handle mechanism kicks on
1815  if (expected_backups > backup_acks_received_) {
1816  // so the invocation has backups and since not all backups have
1817  // completed, we need to wait (it could be that backups arrive earlier
1818  // than the response)
1819 
1820  pending_response_received_time_ = std::chrono::steady_clock::now();
1821 
1822  backup_acks_expected_ = expected_backups;
1823 
1824  // it is very important that the response is set after the
1825  // backupsAcksExpected is set, else the system can assume the invocation
1826  // is complete because there is a response and no backups need to
1827  // respond
1828  pending_response_ = msg;
1829 
1830  // we are done since not all backups have completed. Therefore we should
1831  // not notify the future
1832  return;
1833  }
1834 
1835  // we are going to notify the future that a response is available; this can
1836  // happen when:
1837  // - we had a regular operation (so no backups we need to wait for) that
1838  // completed
1839  // - we had a backup-aware operation that has completed, but also all its
1840  // backups have completed
1841  complete(msg);
1842 }
1843 
1844 void
1845 ClientInvocation::complete(const std::shared_ptr<protocol::ClientMessage>& msg)
1846 {
1847  try {
1848  // TODO: move msg content here?
1849  this->invocation_promise_.set_value(*msg);
1850  } catch (std::exception& e) {
1851  HZ_LOG(logger_,
1852  warning,
1853  boost::str(boost::format(
1854  "Failed to set the response for invocation. "
1855  "Dropping the response. %1%, %2% Response: %3%") %
1856  e.what() % *this % *msg));
1857  }
1858  this->erase_invocation();
1859 }
1860 
1861 std::shared_ptr<protocol::ClientMessage>
1862 ClientInvocation::get_client_message() const
1863 {
1864  return *client_message_.load();
1865 }
1866 
1867 const std::shared_ptr<EventHandler<protocol::ClientMessage>>&
1868 ClientInvocation::get_event_handler() const
1869 {
1870  return event_handler_;
1871 }
1872 
1873 void
1874 ClientInvocation::set_event_handler(
1875  const std::shared_ptr<EventHandler<protocol::ClientMessage>>& handler)
1876 {
1877  ClientInvocation::event_handler_ = handler;
1878 }
1879 
1880 void
1881 ClientInvocation::execute()
1882 {
1883  auto this_invocation = shared_from_this();
1884  auto command = [=]() { this_invocation->run(); };
1885 
1886  // first we force a new invocation slot because we are going to return our
1887  // old invocation slot immediately after It is important that we first
1888  // 'force' taking a new slot; otherwise it could be that a sneaky invocation
1889  // gets through that takes our slot!
1890  int64_t callId = call_id_sequence_->force_next();
1891  client_message_.load()->get()->set_correlation_id(callId);
1892 
1893  // we release the old slot
1894  call_id_sequence_->complete();
1895 
1896  if (invoke_count_ < MAX_FAST_INVOCATION_COUNT) {
1897  // fast retry for the first few invocations
1898  execution_service_->execute(command);
1899  } else {
1900  // progressive retry delay
1901  int64_t delayMillis = util::min<int64_t>(
1902  static_cast<int64_t>(1)
1903  << (invoke_count_ - MAX_FAST_INVOCATION_COUNT),
1904  std::chrono::duration_cast<std::chrono::milliseconds>(retry_pause_)
1905  .count());
1906  retry_timer_ = execution_service_->schedule(
1907  command, std::chrono::milliseconds(delayMillis));
1908  }
1909 }
1910 
1911 const std::string
1912 ClientInvocation::get_name() const
1913 {
1914  return "ClientInvocation";
1915 }
1916 
1917 std::shared_ptr<protocol::ClientMessage>
1918 ClientInvocation::copy_message()
1919 {
1920  return std::make_shared<protocol::ClientMessage>(**client_message_.load());
1921 }
1922 
1923 boost::promise<protocol::ClientMessage>&
1924 ClientInvocation::get_promise()
1925 {
1926  return invocation_promise_;
1927 }
1928 
1929 void
1930 ClientInvocation::log_exception(exception::iexception& e)
1931 {
1932  HZ_LOG(logger_,
1933  finest,
1934  boost::str(boost::format(
1935  "Invocation got an exception %1%, invoke count : %2%, "
1936  "exception : %3%") %
1937  *this % invoke_count_.load() % e));
1938 }
1939 
1940 void
1941 ClientInvocation::notify_backup()
1942 {
1943  ++backup_acks_received_;
1944 
1945  if (!pending_response_) {
1946  // no pendingResponse has been set, so we are done since the invocation
1947  // on the primary needs to complete first
1948  return;
1949  }
1950 
1951  // if a pendingResponse is set, then the backupsAcksExpected has been set
1952  // (so we can now safely read backupsAcksExpected)
1953  if (backup_acks_expected_ != backup_acks_received_) {
1954  // we managed to complete a backup, but we were not the one completing
1955  // the last backup, so we are done
1956  return;
1957  }
1958 
1959  // we are the lucky one since we just managed to complete the last backup
1960  // for this invocation and since the pendingResponse is set, we can set it
1961  // on the future
1962  complete_with_pending_response();
1963 }
1964 
1965 void
1966 ClientInvocation::detect_and_handle_backup_timeout(
1967  const std::chrono::milliseconds& backup_timeout)
1968 {
1969  // if the backups have completed, we are done; this also filters out all non
1970  // backup-aware operations since the backupsAcksExpected will always be
1971  // equal to the backupsAcksReceived
1972  if (backup_acks_expected_ == backup_acks_received_) {
1973  return;
1974  }
1975 
1976  // if no response has yet been received, we we are done; we are only going
1977  // to re-invoke an operation if the response of the primary has been
1978  // received, but the backups have not replied
1979  if (!pending_response_) {
1980  return;
1981  }
1982 
1983  // if this has not yet expired (so has not been in the system for a too long
1984  // period) we ignore it
1985  if (pending_response_received_time_ + backup_timeout >=
1986  std::chrono::steady_clock::now()) {
1987  return;
1988  }
1989 
1990  if (invocation_service_.fail_on_indeterminate_state()) {
1991  auto exception = boost::enable_current_exception(
1992  (exception::exception_builder<
1993  exception::indeterminate_operation_state>(
1994  "ClientInvocation::detect_and_handle_backup_timeout")
1995  << *this << " failed because backup acks missed.")
1996  .build());
1997  notify_exception(std::make_exception_ptr(exception));
1998  return;
1999  }
2000 
2001  // the backups have not yet completed, but we are going to release the
2002  // future anyway if a pendingResponse has been set
2003  complete_with_pending_response();
2004 }
2005 
2006 void
2007 ClientInvocation::complete_with_pending_response()
2008 {
2009  complete(pending_response_);
2010 }
2011 
2012 ClientContext&
2013 impl::ClientTransactionManagerServiceImpl::get_client() const
2014 {
2015  return client_;
2016 }
2017 
2018 ClientTransactionManagerServiceImpl::ClientTransactionManagerServiceImpl(
2019  ClientContext& client)
2020  : client_(client)
2021 {}
2022 
2023 std::shared_ptr<connection::Connection>
2024 ClientTransactionManagerServiceImpl::connect()
2025 {
2026  auto& invocationService = client_.get_invocation_service();
2027  auto startTime = std::chrono::steady_clock::now();
2028  auto invocationTimeout = invocationService.get_invocation_timeout();
2029  client_config& clientConfig = client_.get_client_config();
2030  bool smartRouting = clientConfig.get_network_config().is_smart_routing();
2031 
2032  while (client_.get_lifecycle_service().is_running()) {
2033  try {
2034  auto connection =
2035  client_.get_connection_manager().get_random_connection();
2036  if (!connection) {
2037  throw_exception(smartRouting);
2038  }
2039  return connection;
2040  } catch (exception::hazelcast_client_offline&) {
2041  throw;
2042  } catch (exception::iexception&) {
2043  if (std::chrono::steady_clock::now() - startTime >
2044  invocationTimeout) {
2045  std::rethrow_exception(new_operation_timeout_exception(
2046  std::current_exception(), invocationTimeout, startTime));
2047  }
2048  }
2049  std::this_thread::sleep_for(
2050  invocationService.get_invocation_retry_pause());
2051  }
2052  BOOST_THROW_EXCEPTION(exception::hazelcast_client_not_active(
2053  "ClientTransactionManagerServiceImpl::connect", "Client is shutdown"));
2054 }
2055 
2056 std::exception_ptr
2057 ClientTransactionManagerServiceImpl::new_operation_timeout_exception(
2058  std::exception_ptr cause,
2059  std::chrono::milliseconds invocation_timeout,
2060  std::chrono::steady_clock::time_point start_time)
2061 {
2062  std::ostringstream sb;
2063  auto now = std::chrono::steady_clock::now();
2064  sb << "Creating transaction context timed out because exception occurred "
2065  "after client invocation timeout "
2066  << std::chrono::duration_cast<std::chrono::milliseconds>(
2067  invocation_timeout)
2068  .count()
2069  << " ms. "
2070  << "Current time: "
2071  << util::StringUtil::time_to_string(std::chrono::steady_clock::now())
2072  << ". "
2073  << "Start time: " << util::StringUtil::time_to_string(start_time)
2074  << ". Total elapsed time: "
2075  << std::chrono::duration_cast<std::chrono::milliseconds>(now -
2076  start_time)
2077  .count()
2078  << " ms. ";
2079  try {
2080  std::rethrow_exception(cause);
2081  } catch (...) {
2082  try {
2083  std::throw_with_nested(boost::enable_current_exception(
2084  exception::operation_timeout("ClientTransactionManagerServiceImpl"
2085  "::newoperation_timeout_exception",
2086  sb.str())));
2087  } catch (...) {
2088  return std::current_exception();
2089  }
2090  }
2091  return nullptr;
2092 }
2093 
2094 void
2095 ClientTransactionManagerServiceImpl::throw_exception(bool smart_routing)
2096 {
2097  auto& client_config = client_.get_client_config();
2098  auto& connection_strategy_Config =
2099  client_config.get_connection_strategy_config();
2100  auto reconnect_mode = connection_strategy_Config.get_reconnect_mode();
2101  if (reconnect_mode ==
2102  config::client_connection_strategy_config::reconnect_mode::ASYNC) {
2103  BOOST_THROW_EXCEPTION(exception::hazelcast_client_offline(
2104  "ClientTransactionManagerServiceImpl::throw_exception", ""));
2105  }
2106  if (smart_routing) {
2107  auto members = client_.get_cluster().get_members();
2108  std::ostringstream msg;
2109  if (members.empty()) {
2110  msg << "No address was return by the LoadBalancer since there are "
2111  "no members in the cluster";
2112  } else {
2113  msg << "No address was return by the LoadBalancer. "
2114  "But the cluster contains the following members:{\n";
2115  for (auto const& m : members) {
2116  msg << '\t' << m << '\n';
2117  }
2118  msg << "}";
2119  }
2120  BOOST_THROW_EXCEPTION(exception::illegal_state(
2121  "ClientTransactionManagerServiceImpl::throw_exception", msg.str()));
2122  }
2123  BOOST_THROW_EXCEPTION(exception::illegal_state(
2124  "ClientTransactionManagerServiceImpl::throw_exception",
2125  "No active connection is found"));
2126 }
2127 
2128 ClientPartitionServiceImpl::ClientPartitionServiceImpl(ClientContext& client)
2129  : client_(client)
2130  , logger_(client.get_logger())
2131  , partition_count_(0)
2132  , partition_table_(
2133  boost::shared_ptr<partition_table>(new partition_table{ 0, -1 }))
2134 {}
2135 
2136 void
2137 ClientPartitionServiceImpl::handle_event(
2138  int32_t connection_id,
2139  int32_t version,
2140  const std::vector<std::pair<boost::uuids::uuid, std::vector<int>>>&
2141  partitions)
2142 {
2143  HZ_LOG(logger_,
2144  finest,
2145  boost::str(
2146  boost::format(
2147  "Handling new partition table with partitionStateVersion: %1%") %
2148  version));
2149 
2150  while (true) {
2151  auto current = partition_table_.load();
2152  if (!should_be_applied(connection_id, version, partitions, *current)) {
2153  return;
2154  }
2155  if (partition_table_.compare_exchange_strong(
2156  current,
2157  boost::shared_ptr<partition_table>(new partition_table{
2158  connection_id, version, convert_to_map(partitions) }))) {
2159  HZ_LOG(
2160  logger_,
2161  finest,
2162  boost::str(
2163  boost::format(
2164  "Applied partition table with partitionStateVersion : %1%") %
2165  version));
2166  return;
2167  }
2168  }
2169 }
2170 
2171 boost::uuids::uuid
2172 ClientPartitionServiceImpl::get_partition_owner(int32_t partition_id)
2173 {
2174  auto table_ptr = partition_table_.load();
2175  auto it = table_ptr->partitions.find(partition_id);
2176  if (it != table_ptr->partitions.end()) {
2177  return it->second;
2178  }
2179  return boost::uuids::nil_uuid();
2180 }
2181 
2182 int32_t
2183 ClientPartitionServiceImpl::get_partition_id(
2184  const serialization::pimpl::data& key)
2185 {
2186  int32_t pc = get_partition_count();
2187  if (pc <= 0) {
2188  return 0;
2189  }
2190  int hash = key.get_partition_hash();
2191  return util::HashUtil::hash_to_index(hash, pc);
2192 }
2193 
2194 int32_t
2195 ClientPartitionServiceImpl::get_partition_count()
2196 {
2197  return partition_count_.load();
2198 }
2199 
2200 std::shared_ptr<client::impl::Partition>
2201 ClientPartitionServiceImpl::get_partition(int partition_id)
2202 {
2203  return std::shared_ptr<client::impl::Partition>(
2204  new PartitionImpl(partition_id, client_, *this));
2205 }
2206 
2207 bool
2208 ClientPartitionServiceImpl::check_and_set_partition_count(
2209  int32_t new_partition_count)
2210 {
2211  int32_t expected = 0;
2212  if (partition_count_.compare_exchange_strong(expected,
2213  new_partition_count)) {
2214  return true;
2215  }
2216  return partition_count_.load() == new_partition_count;
2217 }
2218 
2219 bool
2220 ClientPartitionServiceImpl::should_be_applied(
2221  int32_t connection_id,
2222  int32_t version,
2223  const std::vector<std::pair<boost::uuids::uuid, std::vector<int>>>&
2224  partitions,
2225  const partition_table& current)
2226 {
2227  auto& lg = client_.get_logger();
2228  if (partitions.empty()) {
2229  if (logger_.enabled(logger::level::finest)) {
2230  log_failure(connection_id, version, current, "response is empty");
2231  }
2232  return false;
2233  }
2234  if (!current.connection_id || connection_id != current.connection_id) {
2235  HZ_LOG(
2236  lg, finest, ([&current, connection_id]() {
2237  auto frmt = boost::format(
2238  "Event coming from a new connection. Old connection id: %1%, "
2239  "new connection %2%");
2240 
2241  if (current.connection_id) {
2242  frmt = frmt % current.connection_id;
2243  } else {
2244  frmt = frmt % "none";
2245  }
2246 
2247  return boost::str(frmt % connection_id);
2248  })());
2249 
2250  return true;
2251  }
2252  if (version <= current.version) {
2253  if (lg.enabled(logger::level::finest)) {
2254  log_failure(
2255  connection_id, version, current, "response state version is old");
2256  }
2257  return false;
2258  }
2259  return true;
2260 }
2261 
2262 void
2263 ClientPartitionServiceImpl::log_failure(
2264  int32_t connection_id,
2265  int32_t version,
2266  const ClientPartitionServiceImpl::partition_table& current,
2267  const std::string& cause)
2268 {
2269  HZ_LOG(logger_, finest, [&]() {
2270  auto frmt = boost::format(
2271  " We will not apply the response, since %1% ."
2272  " Response is from connection with id %2%. "
2273  "Current connection id is %3%, response state version:%4%. "
2274  "Current state version: %5%");
2275  if (current.connection_id) {
2276  return boost::str(frmt % cause % connection_id %
2277  current.connection_id % version %
2278  current.version);
2279  } else {
2280  return boost::str(frmt % cause % connection_id % "nullptr" %
2281  version % current.version);
2282  }
2283  }());
2284 }
2285 
2286 void
2287 ClientPartitionServiceImpl::reset()
2288 {
2289  partition_table_.store(nullptr);
2290 }
2291 
2292 std::unordered_map<int32_t, boost::uuids::uuid>
2293 ClientPartitionServiceImpl::convert_to_map(
2294  const std::vector<std::pair<boost::uuids::uuid, std::vector<int>>>&
2295  partitions)
2296 {
2297  std::unordered_map<int32_t, boost::uuids::uuid> new_partitions;
2298  for (auto const& e : partitions) {
2299  for (auto pid : e.second) {
2300  new_partitions.insert({ pid, e.first });
2301  }
2302  }
2303  return new_partitions;
2304 }
2305 
2306 int
2307 ClientPartitionServiceImpl::PartitionImpl::get_partition_id() const
2308 {
2309  return partition_id_;
2310 }
2311 
2312 boost::optional<member>
2313 ClientPartitionServiceImpl::PartitionImpl::get_owner() const
2314 {
2315  auto owner = partition_service_.get_partition_owner(partition_id_);
2316  if (!owner.is_nil()) {
2317  return client_.get_client_cluster_service().get_member(owner);
2318  }
2319  return boost::none;
2320 }
2321 
2322 ClientPartitionServiceImpl::PartitionImpl::PartitionImpl(
2323  int partition_id,
2324  ClientContext& client,
2325  ClientPartitionServiceImpl& partition_service)
2326  : partition_id_(partition_id)
2327  , client_(client)
2328  , partition_service_(partition_service)
2329 {}
2330 
2331 namespace sequence {
2332 CallIdSequenceWithoutBackpressure::CallIdSequenceWithoutBackpressure()
2333  : head_(0)
2334 {}
2335 
2336 CallIdSequenceWithoutBackpressure::~CallIdSequenceWithoutBackpressure() =
2337  default;
2338 
2339 int32_t
2340 CallIdSequenceWithoutBackpressure::get_max_concurrent_invocations() const
2341 {
2342  return INT32_MAX;
2343 }
2344 
2345 int64_t
2346 CallIdSequenceWithoutBackpressure::next()
2347 {
2348  return force_next();
2349 }
2350 
2351 int64_t
2352 CallIdSequenceWithoutBackpressure::force_next()
2353 {
2354  return ++head_;
2355 }
2356 
2357 void
2358 CallIdSequenceWithoutBackpressure::complete()
2359 {
2360  // no-op
2361 }
2362 
2363 int64_t
2364 CallIdSequenceWithoutBackpressure::get_last_call_id()
2365 {
2366  return head_;
2367 }
2368 
2369 // TODO: see if we can utilize std::hardware_destructive_interference_size
2370 AbstractCallIdSequence::AbstractCallIdSequence(
2371  int32_t max_concurrent_invocations)
2372 {
2373  std::ostringstream out;
2374  out << "maxConcurrentInvocations should be a positive number. "
2375  "maxConcurrentInvocations="
2376  << max_concurrent_invocations;
2377  this->max_concurrent_invocations_ = util::Preconditions::check_positive(
2378  max_concurrent_invocations, out.str());
2379 
2380  for (size_t i = 0; i < longs_.size(); ++i) {
2381  longs_[i] = 0;
2382  }
2383 }
2384 
2385 AbstractCallIdSequence::~AbstractCallIdSequence() = default;
2386 
2387 int32_t
2388 AbstractCallIdSequence::get_max_concurrent_invocations() const
2389 {
2390  return max_concurrent_invocations_;
2391 }
2392 
2393 int64_t
2394 AbstractCallIdSequence::next()
2395 {
2396  if (!has_space()) {
2397  handle_no_space_left();
2398  }
2399  return force_next();
2400 }
2401 
2402 int64_t
2403 AbstractCallIdSequence::force_next()
2404 {
2405  return ++longs_[INDEX_HEAD];
2406 }
2407 
2408 void
2409 AbstractCallIdSequence::complete()
2410 {
2411  ++longs_[INDEX_TAIL];
2412  assert(longs_[INDEX_TAIL] <= longs_[INDEX_HEAD]);
2413 }
2414 
2415 int64_t
2416 AbstractCallIdSequence::get_last_call_id()
2417 {
2418  return longs_[INDEX_HEAD];
2419 }
2420 
2421 bool
2422 AbstractCallIdSequence::has_space()
2423 {
2424  return longs_[INDEX_HEAD] - longs_[INDEX_TAIL] <
2425  max_concurrent_invocations_;
2426 }
2427 
2428 int64_t
2429 AbstractCallIdSequence::get_tail()
2430 {
2431  return longs_[INDEX_TAIL];
2432 }
2433 
2434 const std::unique_ptr<util::concurrent::IdleStrategy>
2435  CallIdSequenceWithBackpressure::IDLER(
2436  new util::concurrent::BackoffIdleStrategy(
2437  0,
2438  0,
2439  std::chrono::duration_cast<std::chrono::nanoseconds>(
2440  std::chrono::microseconds(1000))
2441  .count(),
2442  std::chrono::duration_cast<std::chrono::nanoseconds>(
2443  std::chrono::microseconds(MAX_DELAY_MS * 1000))
2444  .count()));
2445 
2446 CallIdSequenceWithBackpressure::CallIdSequenceWithBackpressure(
2447  int32_t max_concurrent_invocations,
2448  int64_t backoff_timeout_ms)
2449  : AbstractCallIdSequence(max_concurrent_invocations)
2450 {
2451  std::ostringstream out;
2452  out << "backoffTimeoutMs should be a positive number. backoffTimeoutMs="
2453  << backoff_timeout_ms;
2454  util::Preconditions::check_positive(backoff_timeout_ms, out.str());
2455 
2456  backoff_timeout_nanos_ =
2457  std::chrono::duration_cast<std::chrono::nanoseconds>(
2458  std::chrono::milliseconds(backoff_timeout_ms))
2459  .count();
2460 }
2461 
2462 void
2463 CallIdSequenceWithBackpressure::handle_no_space_left()
2464 {
2465  auto start = std::chrono::steady_clock::now();
2466  for (int64_t idleCount = 0;; idleCount++) {
2467  int64_t elapsedNanos =
2468  std::chrono::duration_cast<std::chrono::nanoseconds>(
2469  std::chrono::steady_clock::now() - start)
2470  .count();
2471  if (elapsedNanos > backoff_timeout_nanos_) {
2472  throw(exception::exception_builder<exception::hazelcast_overload>(
2473  "CallIdSequenceWithBackpressure::handleNoSpaceLeft")
2474  << "Timed out trying to acquire another call ID."
2475  << " maxConcurrentInvocations = "
2476  << get_max_concurrent_invocations() << ", backoffTimeout = "
2477  << std::chrono::microseconds(backoff_timeout_nanos_ / 1000)
2478  .count()
2479  << " msecs, elapsed:"
2480  << std::chrono::microseconds(elapsedNanos / 1000).count()
2481  << " msecs")
2482  .build();
2483  }
2484  IDLER->idle(idleCount);
2485  if (has_space()) {
2486  return;
2487  }
2488  }
2489 }
2490 
2491 FailFastCallIdSequence::FailFastCallIdSequence(
2492  int32_t max_concurrent_invocations)
2493  : AbstractCallIdSequence(max_concurrent_invocations)
2494 {}
2495 
2496 void
2497 FailFastCallIdSequence::handle_no_space_left()
2498 {
2499  throw(exception::exception_builder<exception::hazelcast_overload>(
2500  "FailFastCallIdSequence::handleNoSpaceLeft")
2501  << "Maximum invocation count is reached. maxConcurrentInvocations = "
2502  << get_max_concurrent_invocations())
2503  .build();
2504 }
2505 
2506 std::unique_ptr<CallIdSequence>
2507 CallIdFactory::new_call_id_sequence(bool is_back_pressure_enabled,
2508  int32_t max_allowed_concurrent_invocations,
2509  int64_t backoff_timeout_ms)
2510 {
2511  if (!is_back_pressure_enabled) {
2512  return std::unique_ptr<CallIdSequence>(
2513  new CallIdSequenceWithoutBackpressure());
2514  } else if (backoff_timeout_ms <= 0) {
2515  return std::unique_ptr<CallIdSequence>(
2516  new FailFastCallIdSequence(max_allowed_concurrent_invocations));
2517  } else {
2518  return std::unique_ptr<CallIdSequence>(
2519  new CallIdSequenceWithBackpressure(max_allowed_concurrent_invocations,
2520  backoff_timeout_ms));
2521  }
2522 }
2523 } // namespace sequence
2524 
2525 namespace listener {
2526 listener_service_impl::listener_service_impl(ClientContext& client_context,
2527  int32_t event_thread_count)
2528  : client_context_(client_context)
2529  , serialization_service_(client_context.get_serialization_service())
2530  , logger_(client_context.get_logger())
2531  , client_connection_manager_(client_context.get_connection_manager())
2532  , number_of_event_threads_(event_thread_count)
2533  , smart_(client_context.get_client_config()
2534  .get_network_config()
2535  .is_smart_routing())
2536 {
2537  auto& invocationService = client_context.get_invocation_service();
2538  invocation_timeout_ = invocationService.get_invocation_timeout();
2539  invocation_retry_pause_ = invocationService.get_invocation_retry_pause();
2540 }
2541 
2542 bool
2543 listener_service_impl::registers_local_only() const
2544 {
2545  return smart_;
2546 }
2547 
2548 boost::future<boost::uuids::uuid>
2549 listener_service_impl::register_listener(
2550  std::shared_ptr<ListenerMessageCodec> listener_message_codec,
2551  std::shared_ptr<client::impl::BaseEventHandler> handler)
2552 {
2553  auto task = boost::packaged_task<boost::uuids::uuid()>([=]() {
2554  return register_listener_internal(listener_message_codec, handler);
2555  });
2556  auto f = task.get_future();
2557  boost::asio::post(registration_executor_->get_executor(), std::move(task));
2558  return f;
2559 }
2560 
2561 boost::future<bool>
2562 listener_service_impl::deregister_listener(boost::uuids::uuid registration_id)
2563 {
2564  util::Preconditions::check_not_nill(
2565  registration_id, "Nil userRegistrationId is not allowed!");
2566 
2567  boost::packaged_task<bool()> task(
2568  [=]() { return deregister_listener_internal(registration_id); });
2569  auto f = task.get_future();
2570  boost::asio::post(registration_executor_->get_executor(), std::move(task));
2571  return f;
2572 }
2573 
2574 void
2575 listener_service_impl::connection_added(
2576  const std::shared_ptr<connection::Connection> connection)
2577 {
2578  boost::asio::post(registration_executor_->get_executor(),
2579  [=]() { connection_added_internal(connection); });
2580 }
2581 
2582 void
2583 listener_service_impl::connection_removed(
2584  const std::shared_ptr<connection::Connection> connection)
2585 {
2586  boost::asio::post(registration_executor_->get_executor(),
2587  [=]() { connection_removed_internal(connection); });
2588 }
2589 
2590 void
2591 listener_service_impl::remove_event_handler(
2592  int64_t call_id,
2593  const std::shared_ptr<connection::Connection>& connection)
2594 {
2595  boost::asio::post(connection->get_socket().get_executor(),
2596  std::packaged_task<void()>(
2597  [=]() { connection->deregister_invocation(call_id); }));
2598 }
2599 
2600 void
2601 listener_service_impl::handle_client_message(
2602  const std::shared_ptr<ClientInvocation> invocation,
2603  const std::shared_ptr<protocol::ClientMessage> response)
2604 {
2605  try {
2606  auto partitionId = response->get_partition_id();
2607  if (partitionId == -1) {
2608  // execute on random thread on the thread pool
2609  boost::asio::post(event_executor_->get_executor(), [=]() {
2610  process_event_message(invocation, response);
2611  });
2612  return;
2613  }
2614 
2615  // process on certain thread which is same for the partition id
2616  boost::asio::post(
2617  event_strands_[partitionId % event_strands_.size()],
2618  [=]() { process_event_message(invocation, response); });
2619 
2620  } catch (const std::exception& e) {
2621  if (client_context_.get_lifecycle_service().is_running()) {
2622  HZ_LOG(
2623  logger_,
2624  warning,
2625  boost::str(boost::format("Delivery of event message to event "
2626  "handler failed. %1%, %2%, %3%") %
2627  e.what() % *response % *invocation));
2628  }
2629  }
2630 }
2631 
2632 void
2633 listener_service_impl::shutdown()
2634 {
2635  ClientExecutionServiceImpl::shutdown_thread_pool(event_executor_.get());
2636  ClientExecutionServiceImpl::shutdown_thread_pool(
2637  registration_executor_.get());
2638 }
2639 
2640 void
2641 listener_service_impl::start()
2642 {
2643  event_executor_.reset(
2644  new hazelcast::util::hz_thread_pool(number_of_event_threads_));
2645  registration_executor_.reset(new hazelcast::util::hz_thread_pool(1));
2646 
2647  for (int i = 0; i < number_of_event_threads_; ++i) {
2648  event_strands_.emplace_back(event_executor_->get_executor());
2649  }
2650 
2651  client_connection_manager_.add_connection_listener(shared_from_this());
2652 }
2653 
2654 boost::uuids::uuid
2655 listener_service_impl::register_listener_internal(
2656  std::shared_ptr<ListenerMessageCodec> listener_message_codec,
2657  std::shared_ptr<client::impl::BaseEventHandler> handler)
2658 {
2659  auto user_registration_id = client_context_.random_uuid();
2660 
2661  std::shared_ptr<listener_registration> registration(
2662  new listener_registration{ listener_message_codec, handler });
2663  registrations_.put(user_registration_id, registration);
2664  for (auto const& connection :
2665  client_connection_manager_.get_active_connections()) {
2666  try {
2667  invoke(registration, connection);
2668  } catch (exception::iexception& e) {
2669  if (connection->is_alive()) {
2670  deregister_listener_internal(user_registration_id);
2671  BOOST_THROW_EXCEPTION(
2672  (exception::exception_builder<exception::hazelcast_>(
2673  "ClientListenerService::RegisterListenerTask::call")
2674  << "Listener can not be added " << e)
2675  .build());
2676  }
2677  }
2678  }
2679  return user_registration_id;
2680 }
2681 
2682 bool
2683 listener_service_impl::deregister_listener_internal(
2684  boost::uuids::uuid user_registration_id)
2685 {
2686  auto listenerRegistration = registrations_.get(user_registration_id);
2687  if (!listenerRegistration) {
2688  return false;
2689  }
2690  bool successful = true;
2691 
2692  auto listener_registrations =
2693  listenerRegistration->registrations.entry_set();
2694  for (auto it = listener_registrations.begin();
2695  it != listener_registrations.end();) {
2696  const auto& registration = it->second;
2697  const auto& subscriber = it->first;
2698  try {
2699  const auto& listenerMessageCodec = listenerRegistration->codec;
2700  auto serverRegistrationId = registration->server_registration_id;
2701  auto request =
2702  listenerMessageCodec->encode_remove_request(serverRegistrationId);
2703  auto invocation = ClientInvocation::create(
2704  client_context_, request, "", subscriber);
2705  invocation->invoke().get();
2706 
2707  remove_event_handler(registration->call_id, subscriber);
2708 
2709  it = listener_registrations.erase(it);
2710  } catch (exception::iexception& e) {
2711  ++it;
2712  if (subscriber->is_alive()) {
2713  successful = false;
2714  std::ostringstream endpoint;
2715  if (subscriber->get_remote_address()) {
2716  endpoint << *subscriber->get_remote_address();
2717  } else {
2718  endpoint << "null";
2719  }
2720  HZ_LOG(logger_,
2721  warning,
2722  boost::str(
2723  boost::format(
2724  "ClientListenerService::deregisterListenerInternal "
2725  "Deregistration of listener with ID %1% "
2726  "has failed to address %2% %3%") %
2727  user_registration_id %
2728  subscriber->get_remote_address() % e));
2729  }
2730  }
2731  }
2732  if (successful) {
2733  registrations_.remove(user_registration_id);
2734  }
2735  return successful;
2736 }
2737 
2738 void
2739 listener_service_impl::connection_added_internal(
2740  const std::shared_ptr<connection::Connection>& connection)
2741 {
2742  for (const auto& listener_registration : registrations_.values()) {
2743  invoke_from_internal_thread(listener_registration, connection);
2744  }
2745 }
2746 
2747 void
2748 listener_service_impl::connection_removed_internal(
2749  const std::shared_ptr<connection::Connection>& connection)
2750 {
2751  for (auto& registry : registrations_.values()) {
2752  registry->registrations.remove(connection);
2753  }
2754 }
2755 
2756 void
2757 listener_service_impl::invoke_from_internal_thread(
2758  const std::shared_ptr<listener_registration>& listener_registration,
2759  const std::shared_ptr<connection::Connection>& connection)
2760 {
2761  try {
2762  invoke(listener_registration, connection);
2763  } catch (exception::iexception& e) {
2764  HZ_LOG(logger_,
2765  warning,
2766  boost::str(
2767  boost::format("Listener with pointer %1% can not be added to "
2768  "a new connection: %2%, reason: %3%") %
2769  listener_registration.get() % *connection % e));
2770  }
2771 }
2772 
2773 void
2774 listener_service_impl::invoke(
2775  const std::shared_ptr<listener_registration>& listener_registration,
2776  const std::shared_ptr<connection::Connection>& connection)
2777 {
2778  if (listener_registration->registrations.contains_key(connection)) {
2779  return;
2780  }
2781 
2782  const auto& codec = listener_registration->codec;
2783  auto request = codec->encode_add_request(registers_local_only());
2784  const auto& handler = listener_registration->handler;
2785  handler->before_listener_register();
2786 
2787  auto invocation = ClientInvocation::create(
2788  client_context_,
2789  std::make_shared<protocol::ClientMessage>(std::move(request)),
2790  "",
2791  connection);
2792  invocation->set_event_handler(handler);
2793  auto clientMessage = invocation->invoke_urgent().get();
2794 
2795  auto serverRegistrationId = codec->decode_add_response(clientMessage);
2796  handler->on_listener_register();
2797  int64_t correlationId =
2798  invocation->get_client_message()->get_correlation_id();
2799 
2800  (*listener_registration)
2801  .registrations.put(
2802  connection,
2803  std::shared_ptr<connection_registration>(
2804  new connection_registration{ serverRegistrationId, correlationId }));
2805 }
2806 
2807 void
2808 listener_service_impl::process_event_message(
2809  const std::shared_ptr<ClientInvocation> invocation,
2810  const std::shared_ptr<protocol::ClientMessage> response)
2811 {
2812  auto eventHandler = invocation->get_event_handler();
2813  if (!eventHandler) {
2814  if (client_context_.get_lifecycle_service().is_running()) {
2815  HZ_LOG(logger_,
2816  warning,
2817  boost::str(
2818  boost::format("No eventHandler for invocation. "
2819  "Ignoring this invocation response. %1%") %
2820  *invocation));
2821  }
2822 
2823  return;
2824  }
2825 
2826  try {
2827  eventHandler->handle(*response);
2828  } catch (std::exception& e) {
2829  if (client_context_.get_lifecycle_service().is_running()) {
2830  HZ_LOG(
2831  logger_,
2832  warning,
2833  boost::str(boost::format("Delivery of event message to event "
2834  "handler failed. %1%, %2%, %3%") %
2835  e.what() % *response % *invocation));
2836  }
2837  }
2838 }
2839 
2840 listener_service_impl::~listener_service_impl() = default;
2841 
2842 void
2843 cluster_view_listener::start()
2844 {
2845  client_context_.get_connection_manager().add_connection_listener(
2846  shared_from_this());
2847 }
2848 
2849 void
2850 cluster_view_listener::connection_added(
2851  const std::shared_ptr<connection::Connection> connection)
2852 {
2853  try_register(connection);
2854 }
2855 
2856 void
2857 cluster_view_listener::connection_removed(
2858  const std::shared_ptr<connection::Connection> connection)
2859 {
2860  try_reregister_to_random_connection(connection->get_connection_id());
2861 }
2862 
2863 cluster_view_listener::cluster_view_listener(ClientContext& client_context)
2864  : client_context_(client_context)
2865 {}
2866 
2867 void
2868 cluster_view_listener::try_register(
2869  std::shared_ptr<connection::Connection> connection)
2870 {
2871  int32_t expected_id = -1;
2872  if (!listener_added_connection_id_.compare_exchange_strong(
2873  expected_id, connection->get_connection_id())) {
2874  // already registering/registered to another connection
2875  return;
2876  }
2877 
2878  auto invocation = ClientInvocation::create(
2879  client_context_,
2880  std::make_shared<protocol::ClientMessage>(
2881  protocol::codec::client_addclusterviewlistener_encode()),
2882  "",
2883  connection);
2884 
2885  auto handler =
2886  std::make_shared<event_handler>(connection->get_connection_id(), *this);
2887  invocation->set_event_handler(handler);
2888  handler->before_listener_register();
2889 
2890  std::weak_ptr<cluster_view_listener> weak_self = shared_from_this();
2891  auto conn_id = connection->get_connection_id();
2892 
2893  invocation->invoke_urgent().then(
2894  [weak_self, handler, conn_id](boost::future<protocol::ClientMessage> f) {
2895  auto self = weak_self.lock();
2896  if (!self)
2897  return;
2898 
2899  if (f.has_value()) {
2900  handler->on_listener_register();
2901  return;
2902  }
2903 
2904  // completes with exception, listener needs to be reregistered
2905  self->try_reregister_to_random_connection(conn_id);
2906  });
2907 }
2908 
2909 void
2910 cluster_view_listener::try_reregister_to_random_connection(
2911  int32_t old_connection_id)
2912 {
2913  if (!listener_added_connection_id_.compare_exchange_strong(
2914  old_connection_id, -1)) {
2915  // somebody else already trying to reregister
2916  return;
2917  }
2918  auto new_connection =
2919  client_context_.get_connection_manager().get_random_connection();
2920  if (new_connection) {
2921  try_register(new_connection);
2922  }
2923 }
2924 
2925 cluster_view_listener::~cluster_view_listener() = default;
2926 
2927 void
2928 cluster_view_listener::event_handler::handle_membersview(
2929  int32_t version,
2930  const std::vector<member>& member_infos)
2931 {
2932  view_listener.client_context_.get_client_cluster_service().handle_event(
2933  version, member_infos);
2934 }
2935 
2936 void
2937 cluster_view_listener::event_handler::handle_partitionsview(
2938  int32_t version,
2939  const std::vector<std::pair<boost::uuids::uuid, std::vector<int>>>&
2940  partitions)
2941 {
2942  view_listener.client_context_.get_partition_service().handle_event(
2943  connection_id, version, partitions);
2944 }
2945 
2946 void
2947 cluster_view_listener::event_handler::before_listener_register()
2948 {
2949  view_listener.client_context_.get_client_cluster_service()
2950  .clear_member_list_version();
2951  auto& lg = view_listener.client_context_.get_logger();
2952  HZ_LOG(
2953  lg,
2954  finest,
2955  boost::str(boost::format(
2956  "Register attempt of cluster_view_listener::event_handler "
2957  "to connection with id %1%") %
2958  connection_id));
2959 }
2960 
2961 void
2962 cluster_view_listener::event_handler::on_listener_register()
2963 {
2964  auto& lg = view_listener.client_context_.get_logger();
2965  HZ_LOG(lg,
2966  finest,
2967  boost::str(
2968  boost::format("Registered cluster_view_listener::event_handler to "
2969  "connection with id %1%") %
2970  connection_id));
2971 }
2972 
2973 cluster_view_listener::event_handler::event_handler(
2974  int connectionId,
2975  cluster_view_listener& viewListener)
2976  : connection_id(connectionId)
2977  , view_listener(viewListener)
2978 {}
2979 } // namespace listener
2980 
2981 protocol::ClientMessage
2982 ClientInvocationServiceImpl::BackupListenerMessageCodec::encode_add_request(
2983  bool local_only) const
2984 {
2985  return protocol::codec::client_localbackuplistener_encode();
2986 }
2987 
2988 protocol::ClientMessage
2989 ClientInvocationServiceImpl::BackupListenerMessageCodec::encode_remove_request(
2990  boost::uuids::uuid real_registration_id) const
2991 {
2992  assert(0);
2993  return protocol::ClientMessage(0);
2994 }
2995 
2996 void
2997 ClientInvocationServiceImpl::noop_backup_event_handler::handle_backup(
2998  int64_t source_invocation_correlation_id)
2999 {
3000  assert(0);
3001 }
3002 
3003 namespace discovery {
3004 remote_address_provider::remote_address_provider(
3005  std::function<std::unordered_map<address, address>()> addr_map_method,
3006  bool use_public)
3007  : refresh_address_map_(std::move(addr_map_method))
3008  , use_public_(use_public)
3009 {}
3010 
3011 std::vector<address>
3012 remote_address_provider::load_addresses()
3013 {
3014  auto address_map = refresh_address_map_();
3015  std::lock_guard<std::mutex> guard(lock_);
3016  private_to_public_ = address_map;
3017  std::vector<address> addresses;
3018  addresses.reserve(address_map.size());
3019  for (const auto& addr_pair : address_map) {
3020  addresses.push_back(addr_pair.first);
3021  }
3022  return addresses;
3023 }
3024 
3025 boost::optional<address>
3026 remote_address_provider::translate(const address& addr)
3027 {
3028  // if it is inside cloud, return private address otherwise we need to
3029  // translate it.
3030  if (!use_public_) {
3031  return addr;
3032  }
3033 
3034  {
3035  std::lock_guard<std::mutex> guard(lock_);
3036  auto found = private_to_public_.find(addr);
3037  if (found != private_to_public_.end()) {
3038  return found->second;
3039  }
3040  }
3041 
3042  auto address_map = refresh_address_map_();
3043 
3044  std::lock_guard<std::mutex> guard(lock_);
3045  private_to_public_ = address_map;
3046 
3047  auto found = private_to_public_.find(addr);
3048  if (found != private_to_public_.end()) {
3049  return found->second;
3050  }
3051 
3052  return boost::none;
3053 }
3054 
3055 #ifdef HZ_BUILD_WITH_SSL
3056 cloud_discovery::cloud_discovery(config::cloud_config& config,
3057  std::string cloud_base_url,
3058  std::chrono::steady_clock::duration timeout)
3059  : cloud_config_(config)
3060  , cloud_base_url_(cloud_base_url)
3061  , timeout_(timeout)
3062 {}
3063 #else
3064 cloud_discovery::cloud_discovery(config::cloud_config& config,
3065  std::string cloud_base_url,
3066  std::chrono::steady_clock::duration timeout)
3067 {}
3068 #endif // HZ_BUILD_WITH_SSL
3069 
3070 std::unordered_map<address, address>
3071 cloud_discovery::get_addresses()
3072 {
3073 #ifdef HZ_BUILD_WITH_SSL
3074  try {
3075  util::SyncHttpsClient httpsConnection(cloud_base_url_,
3076  std::string(CLOUD_URL_PATH) +
3077  cloud_config_.discovery_token,
3078  timeout_,
3079  cloud_config_.discovery_token);
3080  auto& conn_stream = httpsConnection.connect_and_get_response();
3081  return parse_json_response(conn_stream);
3082  } catch (std::exception& e) {
3083  std::throw_with_nested(
3084  boost::enable_current_exception(exception::illegal_state(
3085  "cloud_discovery::get_addresses", e.what())));
3086  }
3087 #else
3088  util::Preconditions::check_ssl("cloud_discovery::get_addresses");
3089  return std::unordered_map<address, address>();
3090 #endif
3091 }
3092 
3093 std::unordered_map<address, address>
3094 cloud_discovery::parse_json_response(std::istream& conn_stream)
3095 {
3096  namespace pt = boost::property_tree;
3097 
3098  pt::ptree root;
3099  pt::read_json(conn_stream, root);
3100 
3101  std::unordered_map<address, address> addresses;
3102  for (const auto& item : root) {
3103  auto private_address =
3104  item.second.get<std::string>(PRIVATE_ADDRESS_PROPERTY);
3105  auto public_address =
3106  item.second.get<std::string>(PUBLIC_ADDRESS_PROPERTY);
3107 
3108  address public_addr = create_address(public_address, -1);
3109  // if it is not explicitly given, create the private address with public
3110  // addresses port
3111  auto private_addr =
3112  create_address(private_address, public_addr.get_port());
3113  addresses.emplace(std::move(private_addr), std::move(public_addr));
3114  }
3115 
3116  return addresses;
3117 }
3118 
3119 address
3120 cloud_discovery::create_address(const std::string& hostname, int default_port)
3121 {
3122  auto address_holder =
3123  util::AddressUtil::get_address_holder(hostname, default_port);
3124  auto scoped_hostname =
3125  util::AddressHelper::get_scoped_hostname(address_holder);
3126  return address(std::move(scoped_hostname), address_holder.get_port());
3127 }
3128 } // namespace discovery
3129 } // namespace impl
3130 } // namespace spi
3131 } // namespace client
3132 } // namespace hazelcast
3133 
3134 namespace std {
3135 bool
3136 less<hazelcast::client::spi::DefaultObjectNamespace>::operator()(
3137  const hazelcast::client::spi::DefaultObjectNamespace& lhs,
3138  const hazelcast::client::spi::DefaultObjectNamespace& rhs) const
3139 {
3140  int result = lhs.get_service_name().compare(rhs.get_service_name());
3141  if (result < 0) {
3142  return true;
3143  }
3144 
3145  if (result > 0) {
3146  return false;
3147  }
3148 
3149  return lhs.get_object_name().compare(rhs.get_object_name()) < 0;
3150 }
3151 
3152 std::size_t
3153 hash<hazelcast::client::spi::DefaultObjectNamespace>::operator()(
3154  const hazelcast::client::spi::DefaultObjectNamespace& k) const noexcept
3155 {
3156  return std::hash<std::string>()(k.get_service_name() + k.get_object_name());
3157 }
3158 } // namespace std
Hazelcast cluster interface.
Definition: cluster.h:37
const std::unordered_set< member > & get_members() const
Returns an immutable set of ordered members at the moment this MembershipListener is registered.
Definition: spi.cpp:61
cluster & get_cluster()
Returns the cluster of the event.
Definition: spi.cpp:67
lifecycle_state get_state() const
Definition: spi.cpp:84
lifecycle_event(lifecycle_state state)
Constructor.
Definition: spi.cpp:79