19 #include <boost/uuid/uuid_hash.hpp>
20 #include <boost/functional/hash.hpp>
21 #include <boost/property_tree/ptree.hpp>
22 #include <boost/property_tree/json_parser.hpp>
24 #include "hazelcast/client/hazelcast_client.h"
25 #include <hazelcast/client/protocol/codec/ErrorCodec.h>
26 #include <hazelcast/client/spi/impl/ListenerMessageCodec.h>
27 #include <hazelcast/client/spi/impl/ClientClusterServiceImpl.h>
28 #include <hazelcast/client/spi/impl/listener/cluster_view_listener.h>
29 #include <hazelcast/client/spi/impl/listener/listener_service_impl.h>
30 #include <hazelcast/client/spi/impl/discovery/remote_address_provider.h>
31 #include <hazelcast/client/spi/impl/discovery/cloud_discovery.h>
32 #include <hazelcast/util/AddressUtil.h>
33 #include "hazelcast/client/member_selectors.h"
34 #include "hazelcast/client/lifecycle_event.h"
35 #include "hazelcast/client/initial_membership_event.h"
36 #include "hazelcast/client/membership_event.h"
37 #include "hazelcast/client/lifecycle_listener.h"
38 #include "hazelcast/client/spi/ProxyManager.h"
39 #include "hazelcast/client/spi/ClientProxy.h"
40 #include "hazelcast/client/spi/ClientContext.h"
41 #include "hazelcast/client/spi/impl/ClientInvocation.h"
42 #include "hazelcast/client/spi/impl/ClientInvocationServiceImpl.h"
43 #include "hazelcast/client/impl/hazelcast_client_instance_impl.h"
44 #include "hazelcast/client/spi/impl/ClientPartitionServiceImpl.h"
45 #include "hazelcast/client/spi/impl/DefaultAddressProvider.h"
46 #include "hazelcast/client/spi/impl/sequence/CallIdSequenceWithBackpressure.h"
47 #include "hazelcast/client/spi/impl/sequence/CallIdSequenceWithoutBackpressure.h"
48 #include "hazelcast/client/spi/impl/sequence/FailFastCallIdSequence.h"
49 #include "hazelcast/client/spi/impl/sequence/CallIdFactory.h"
50 #include "hazelcast/client/connection/ClientConnectionManagerImpl.h"
51 #include "hazelcast/util/AddressHelper.h"
52 #include "hazelcast/util/HashUtil.h"
53 #include "hazelcast/util/concurrent/BackoffIdleStrategy.h"
54 #ifdef HZ_BUILD_WITH_SSL
55 #include <hazelcast/util/SyncHttpsClient.h>
60 const std::unordered_set<member>&
// Constructor fragment. The member set arrives by value and is moved into
// members_; the remaining parameters and initializers are outside this excerpt.
72 initial_membership_event::initial_membership_event(
74 std::unordered_set<member> members)
76 , members_(std::move(members))
90 ProxyManager::ProxyManager(ClientContext& context)
// Walks proxies_ while holding lock_ and calls on_shutdown() on each entry.
99 ProxyManager::destroy()
101 std::lock_guard<std::recursive_mutex> guard(lock_);
102 for (
auto& p : proxies_) {
// NOTE(review): `proxy` is not referenced in the visible lines; the call below
// goes through p.second.get() a second time instead.
104 auto proxy = p.second.get();
105 p.second.get()->on_shutdown();
106 }
// A std::exception is caught here; the handler fetches the client logger and
// formats the message below (the logging call itself is outside this excerpt).
catch (std::exception& se) {
107 auto& lg = client_.get_logger();
111 boost::str(boost::format(
// NOTE(review): the wording `can be called` reads as if it was meant to be
// `cannot be called` -- confirm before touching the runtime string.
112 "Proxy was not created, "
113 "hence onShutdown can be called. Exception: %1%") %
// Encodes a create-proxy request from the proxy name and service name, wraps
// it in a ClientInvocation, and chains a continuation launched with
// boost::launch::sync in which client_proxy->on_initialize() is called.
121 ProxyManager::initialize(
const std::shared_ptr<ClientProxy>& client_proxy)
123 auto clientMessage = protocol::codec::client_createproxy_encode(
124 client_proxy->get_name(), client_proxy->get_service_name());
125 return spi::impl::ClientInvocation::create(
126 client_, clientMessage, client_proxy->get_service_name())
// The [=] capture copies the client_proxy shared_ptr into the closure.
128 .then(boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
130 client_proxy->on_initialize();
// Destroys a proxy: looks up the entry registered for its namespace under
// lock_, then calls destroy_locally() / destroy_remotely() on `proxy`.
// Several branches of this function fall outside the excerpt, so the exact
// pairing of the conditions below with their bodies should be checked against
// the full file.
135 ProxyManager::destroy_proxy(ClientProxy& proxy)
137 DefaultObjectNamespace objectNamespace(proxy.get_service_name(),
139 std::shared_ptr<ClientProxy> registeredProxy;
141 std::lock_guard<std::recursive_mutex> guard(lock_);
142 auto it = proxies_.find(objectNamespace);
// registeredProxy stays null when nothing is registered for this namespace.
143 registeredProxy = it == proxies_.end() ? nullptr : it->second.get();
144 if (it != proxies_.end()) {
150 if (registeredProxy) {
152 proxy.destroy_locally();
153 return proxy.destroy_remotely();
154 }
// When an iexception is caught, destroy_remotely() is still issued.
catch (exception::iexception&) {
155 proxy.destroy_remotely();
// Pointer identity check: is the argument the same object that was registered?
159 if (&proxy != registeredProxy.get()) {
165 proxy.destroy_locally();
168 if (&proxy != registeredProxy.get()) {
174 proxy.destroy_locally();
// Fallback result: an already-completed future.
178 return boost::make_ready_future();
// Builds a context from the public client facade by dereferencing its
// client_impl_ member and binding hazelcast_client_ to the result.
181 ClientContext::ClientContext(
const client::hazelcast_client& hazelcast_client)
182 : hazelcast_client_(*hazelcast_client.client_impl_)
// Builds a context directly from an implementation instance.
185 ClientContext::ClientContext(
186 client::impl::hazelcast_client_instance_impl& hazelcast_client)
187 : hazelcast_client_(hazelcast_client)
// ClientContext accessors. Each one forwards to hazelcast_client_: either a
// data member returned directly, a member that is dereferenced with `*` first,
// or a getter on the implementation object.
190 serialization::pimpl::SerializationService&
191 ClientContext::get_serialization_service()
193 return hazelcast_client_.serialization_service_;
196 impl::ClientClusterServiceImpl&
197 ClientContext::get_client_cluster_service()
199 return hazelcast_client_.cluster_service_;
202 impl::ClientInvocationServiceImpl&
203 ClientContext::get_invocation_service()
205 return *hazelcast_client_.invocation_service_;
209 ClientContext::get_client_config()
211 return hazelcast_client_.client_config_;
214 impl::ClientPartitionServiceImpl&
215 ClientContext::get_partition_service()
217 return *hazelcast_client_.partition_service_;
221 ClientContext::get_lifecycle_service()
223 return hazelcast_client_.lifecycle_service_;
226 spi::impl::listener::listener_service_impl&
227 ClientContext::get_client_listener_service()
229 return *hazelcast_client_.listener_service_;
232 connection::ClientConnectionManagerImpl&
233 ClientContext::get_connection_manager()
235 return *hazelcast_client_.connection_manager_;
238 internal::nearcache::NearCacheManager&
239 ClientContext::get_near_cache_manager()
241 return *hazelcast_client_.near_cache_manager_;
245 ClientContext::get_client_properties()
247 return hazelcast_client_.client_properties_;
251 ClientContext::get_cluster()
253 return hazelcast_client_.cluster_;
// NOTE(review): const member function that hands out a non-const reference to
// the shared_ptr held by the implementation -- confirm this is intended.
256 std::shared_ptr<impl::sequence::CallIdSequence>&
257 ClientContext::get_call_id_sequence()
const
259 return hazelcast_client_.call_id_sequence_;
262 const protocol::ClientExceptionFactory&
263 ClientContext::get_client_exception_factory()
const
265 return hazelcast_client_.get_exception_factory();
269 ClientContext::get_name()
const
271 return hazelcast_client_.get_name();
274 impl::ClientExecutionServiceImpl&
275 ClientContext::get_client_execution_service()
const
277 return *hazelcast_client_.execution_service_;
280 const std::shared_ptr<client::impl::ClientLockReferenceIdGenerator>&
281 ClientContext::get_lock_reference_id_generator()
283 return hazelcast_client_.get_lock_reference_id_generator();
// Returns an owning pointer obtained through shared_from_this() on the
// implementation object.
286 std::shared_ptr<client::impl::hazelcast_client_instance_impl>
287 ClientContext::get_hazelcast_client_implementation()
289 return hazelcast_client_.shared_from_this();
293 ClientContext::get_proxy_manager()
295 return hazelcast_client_.get_proxy_manager();
299 ClientContext::get_logger()
301 return *hazelcast_client_.logger_;
304 client::impl::statistics::Statistics&
305 ClientContext::get_clientstatistics()
307 return *hazelcast_client_.statistics_;
310 spi::impl::listener::cluster_view_listener&
311 ClientContext::get_cluster_view_listener()
313 return *hazelcast_client_.cluster_listener_;
317 ClientContext::random_uuid()
319 return hazelcast_client_.random_uuid();
322 cp::internal::session::proxy_session_manager&
323 ClientContext::get_proxy_session_manager()
325 return hazelcast_client_.proxy_session_manager_;
// Stores the context reference, initializes shutdown_completed_latch_ with a
// count of 1, and registers every listener from `listeners`.
328 lifecycle_service::lifecycle_service(
329 ClientContext& client_context,
330 const std::vector<lifecycle_listener>& listeners)
331 : client_context_(client_context)
333 , shutdown_completed_latch_(1)
335 for (
const auto& listener : listeners) {
// add_listener takes an rvalue, so each configured listener is copied into a
// temporary first.
336 add_listener(lifecycle_listener(listener));
// Starts the client services in the order listed below. The early-exit bodies
// of the two guards are outside this excerpt.
341 lifecycle_service::start()
// Flips active_ from false to true atomically; the guarded branch runs when
// the flag was already true.
343 bool expected =
false;
344 if (!active_.compare_exchange_strong(expected,
true)) {
348 fire_lifecycle_event(lifecycle_event::STARTED);
350 client_context_.get_client_execution_service().start();
352 client_context_.get_client_listener_service().start();
354 client_context_.get_invocation_service().start();
356 client_context_.get_client_cluster_service().start();
358 client_context_.get_cluster_view_listener().start();
// The connection manager reports failure through a false return value.
360 if (!client_context_.get_connection_manager().start()) {
364 auto& connectionStrategyConfig =
365 client_context_.get_client_config().get_connection_strategy_config();
// Only when async start is disabled: wait for the first member list, then
// connect to all cluster members.
366 if (!connectionStrategyConfig.is_async_start()) {
369 wait_for_initial_membership_event();
370 client_context_.get_connection_manager()
371 .connect_to_all_cluster_members();
374 client_context_.get_invocation_service().add_backup_listener();
376 client_context_.get_clientstatistics().start();
// Shuts the client services down in the order listed below and releases
// shutdown_completed_latch_ when finished.
382 lifecycle_service::shutdown()
// Flips active_ from true to false atomically. When the flag was already
// false, the visible branch waits on the completion latch.
384 bool expected =
true;
385 if (!active_.compare_exchange_strong(expected,
false)) {
386 shutdown_completed_latch_.wait();
390 fire_lifecycle_event(lifecycle_event::SHUTTING_DOWN);
391 client_context_.get_proxy_session_manager().shutdown();
392 client_context_.get_clientstatistics().shutdown();
393 client_context_.get_proxy_manager().destroy();
394 client_context_.get_connection_manager().shutdown();
395 client_context_.get_client_cluster_service().shutdown();
396 client_context_.get_invocation_service().shutdown();
397 client_context_.get_client_listener_service().shutdown();
398 client_context_.get_near_cache_manager().destroy_all_near_caches();
399 fire_lifecycle_event(lifecycle_event::SHUTDOWN);
400 client_context_.get_client_execution_service().shutdown();
401 client_context_.get_serialization_service().dispose();
402 shutdown_completed_latch_.count_down();
403 }
// The handler also counts the latch down, so a waiter in the branch above is
// released on the failure path too.
catch (std::exception& e) {
405 client_context_.get_logger(),
// NOTE(review): `occured` in the message below is a misspelling of `occurred`;
// it is a runtime string, so it is left untouched here.
409 "An exception occured during LifecycleService shutdown. %1%") %
411 shutdown_completed_latch_.count_down();
// Registers a listener under a freshly generated uuid while holding
// listener_lock_; the listener is moved into listeners_.
416 lifecycle_service::add_listener(lifecycle_listener&& lifecycle_listener)
418 std::lock_guard<std::mutex> lg(listener_lock_);
419 const auto id = uuid_generator_();
420 listeners_.emplace(
id, std::move(lifecycle_listener));
// Erases the listener stored under registration_id while holding
// listener_lock_. Returns true only when exactly one entry was erased.
425 lifecycle_service::remove_listener(
const boost::uuids::uuid& registration_id)
427 std::lock_guard<std::mutex> guard(listener_lock_);
428 return listeners_.erase(registration_id) == 1;
// Logs the lifecycle state at info level, picks the matching listener callback
// via the switch, and then invokes it on every registered listener.
// NOTE(review): listener_lock_ is a std::mutex taken on the first line, and
// the callbacks at the bottom appear to run while it is still held. A callback
// that calls add_listener/remove_listener on the same thread would then try to
// lock it again -- confirm against the full function.
432 lifecycle_service::fire_lifecycle_event(
const lifecycle_event& lifecycle_event)
434 std::lock_guard<std::mutex> guard(listener_lock_);
435 logger& lg = client_context_.get_logger();
// Filled in by the switch below with the per-state callback invocation.
437 std::function<void(lifecycle_listener&)> fire_one;
439 switch (lifecycle_event.get_state()) {
440 case lifecycle_event::STARTING: {
// Build metadata for the STARTING banner: the stringized commit date is
// converted by git_date_to_hazelcast_log_date, and double-quote characters
// are stripped from the stringized commit id.
442 std::string date(HAZELCAST_STRINGIZE(HAZELCAST_GIT_COMMIT_DATE));
443 util::git_date_to_hazelcast_log_date(date);
444 std::string commitId(HAZELCAST_STRINGIZE(HAZELCAST_GIT_COMMIT_ID));
445 commitId.erase(std::remove(commitId.begin(), commitId.end(),
'"'),
// Two banner formats are visible; the lines selecting between them are
// outside this excerpt.
450 (boost::format(
"(%1%:%2%) LifecycleService::LifecycleEvent "
451 "Client (%3%) is STARTING") %
453 client_context_.get_connection_manager().get_client_uuid())
459 "(%s:%s) LifecycleService::LifecycleEvent STARTING",
462 HZ_LOG(lg, info, msg);
464 fire_one = [](lifecycle_listener& listener) {
465 listener.starting_();
469 case lifecycle_event::STARTED: {
470 HZ_LOG(lg, info,
"LifecycleService::LifecycleEvent STARTED");
472 fire_one = [](lifecycle_listener& listener) {
477 case lifecycle_event::SHUTTING_DOWN: {
478 HZ_LOG(lg, info,
"LifecycleService::LifecycleEvent SHUTTING_DOWN");
480 fire_one = [](lifecycle_listener& listener) {
481 listener.shutting_down_();
485 case lifecycle_event::SHUTDOWN: {
486 HZ_LOG(lg, info,
"LifecycleService::LifecycleEvent SHUTDOWN");
488 fire_one = [](lifecycle_listener& listener) {
489 listener.shutdown_();
493 case lifecycle_event::CLIENT_CONNECTED: {
495 lg, info,
"LifecycleService::LifecycleEvent CLIENT_CONNECTED");
497 fire_one = [](lifecycle_listener& listener) {
498 listener.connected_();
502 case lifecycle_event::CLIENT_DISCONNECTED: {
504 lg, info,
"LifecycleService::LifecycleEvent CLIENT_DISCONNECTED");
506 fire_one = [](lifecycle_listener& listener) {
507 listener.disconnected_();
// Dispatch the selected callback to every registered listener.
513 for (
auto& item : listeners_) {
514 fire_one(item.second);
519 lifecycle_service::is_running()
524 lifecycle_service::~lifecycle_service()
// Delegates to the cluster service, which waits until the initial member list
// has been fetched.
532 lifecycle_service::wait_for_initial_membership_event()
const
534 client_context_.get_client_cluster_service()
535 .wait_initial_member_list_fetched();
// Value type pairing a service name with an object name; both strings are
// copied in at construction.
538 DefaultObjectNamespace::DefaultObjectNamespace(
const std::string& service,
539 const std::string&
object)
540 : service_name_(service)
541 , object_name_(object)
545 DefaultObjectNamespace::get_service_name()
const
547 return service_name_;
551 DefaultObjectNamespace::get_object_name()
const
// Two namespaces are equal when both the service name and the object name
// match.
557 DefaultObjectNamespace::operator==(
const DefaultObjectNamespace& rhs)
const
559 return service_name_ == rhs.service_name_ &&
560 object_name_ == rhs.object_name_;
// ClientProxy basics. Most bodies in this run fall outside the excerpt; only
// the visible statements are described.
563 ClientProxy::ClientProxy(
const std::string& name,
564 const std::string& service_name,
565 ClientContext& context)
567 , service_name_(service_name)
571 ClientProxy::~ClientProxy() =
default;
574 ClientProxy::get_name()
const
580 ClientProxy::get_service_name()
const
582 return service_name_;
586 ClientProxy::get_context()
592 ClientProxy::on_destroy()
// destroy() hands this proxy to the proxy manager, which performs the actual
// teardown through destroy_proxy.
596 ClientProxy::destroy()
598 return get_context().get_proxy_manager().destroy_proxy(*
this);
// An iexception handler is visible inside destroy_locally; what it does is
// outside this excerpt.
602 ClientProxy::destroy_locally()
608 }
catch (exception::iexception&) {
616 ClientProxy::pre_destroy()
622 ClientProxy::post_destroy()
626 ClientProxy::on_initialize()
630 ClientProxy::on_shutdown()
633 serialization::pimpl::SerializationService&
634 ClientProxy::get_serialization_service()
636 return context_.get_serialization_service();
// Encodes a destroy-proxy request for this proxy name and service name, moves
// it into a shared ClientMessage for the invocation, and chains a continuation
// launched with boost::launch::sync that consumes the response via f.get().
640 ClientProxy::destroy_remotely()
642 auto clientMessage = protocol::codec::client_destroyproxy_encode(
643 get_name(), get_service_name());
644 return spi::impl::ClientInvocation::create(
646 std::make_shared<protocol::ClientMessage>(
647 std::move(clientMessage)),
650 .then(boost::launch::sync,
651 [](boost::future<protocol::ClientMessage> f) { f.get(); });
// Points the handler at the context logger, then registers the codec/handler
// pair with the client listener service and returns its future.
654 boost::future<boost::uuids::uuid>
655 ClientProxy::register_listener(
656 std::shared_ptr<impl::ListenerMessageCodec> listener_message_codec,
657 std::shared_ptr<client::impl::BaseEventHandler> handler)
659 handler->set_logger(&get_context().get_logger());
660 return get_context().get_client_listener_service().register_listener(
661 listener_message_codec, handler);
// Forwards to deregister_listener on the client listener service.
665 ClientProxy::deregister_listener(boost::uuids::uuid registration_id)
667 return get_context().get_client_listener_service().deregister_listener(
// Decodes an add-listener response: the registration id is read as the first
// uuid of the message.
673 ListenerMessageCodec::decode_add_response(protocol::ClientMessage& msg)
const
675 return msg.get_first_uuid();
// Decodes a remove-listener response: the result is read as the first
// fixed-size bool field of the message.
679 ListenerMessageCodec::decode_remove_response(protocol::ClientMessage& msg)
const
681 return msg.get_first_fixed_sized_field<
bool>();
// Reads the invocation settings from client properties and client config.
// Some initializer names are outside this excerpt.
684 ClientInvocationServiceImpl::ClientInvocationServiceImpl(ClientContext& client)
686 , logger_(client.get_logger())
// The timeout property is read as an integer number of seconds.
687 , invocation_timeout_(
688 std::chrono::seconds(client.get_client_properties().get_integer(
689 client.get_client_properties().get_invocation_timeout_seconds())))
// The retry pause property is read as a long number of milliseconds.
690 , invocation_retry_pause_(
691 std::chrono::milliseconds(client.get_client_properties().get_long(
692 client.get_client_properties().get_invocation_retry_pause_millis())))
694 client.get_client_config().get_network_config().is_smart_routing())
// Backup acks are enabled only when smart routing is on and the config flag is
// set. NOTE(review): this reads smart_routing_, so it relies on that member
// being declared (and thus initialized) earlier in the class -- confirm.
695 , backup_acks_enabled_(smart_routing_ &&
696 client.get_client_config().backup_acks_enabled())
697 , fail_on_indeterminate_operation_state_(
698 client.get_client_properties().get_boolean(
699 client.get_client_properties().fail_on_indeterminate_state()))
701 std::chrono::milliseconds(client.get_client_properties().get_integer(
702 client.get_client_properties().backup_timeout_millis())))
706 ClientInvocationServiceImpl::start()
// When backup acks are enabled, registers a BackupListenerMessageCodec with a
// noop_backup_event_handler on the client listener service. What happens to
// the returned value is outside this excerpt.
710 ClientInvocationServiceImpl::add_backup_listener()
712 if (this->backup_acks_enabled_) {
713 auto& listener_service = this->client_.get_client_listener_service();
715 .register_listener(std::make_shared<BackupListenerMessageCodec>(),
716 std::make_shared<noop_backup_event_handler>())
// Marks the service as shut down by storing true into is_shutdown_.
722 ClientInvocationServiceImpl::shutdown()
724 is_shutdown_.store(
true);
// Returns the configured invocation timeout, by value, in milliseconds.
727 std::chrono::milliseconds
728 ClientInvocationServiceImpl::get_invocation_timeout()
const
730 return invocation_timeout_;
// Returns the configured pause between invocation retries, by value.
733 std::chrono::milliseconds
734 ClientInvocationServiceImpl::get_invocation_retry_pause()
const
736 return invocation_retry_pause_;
// Reads the redo-operation flag from the client config on every call.
740 ClientInvocationServiceImpl::is_redo_operation()
742 return client_.get_client_config().is_redo_operation();
// Routes a response to its invocation. Messages whose type equals
// ErrorCodec::EXCEPTION_MESSAGE_TYPE are decoded and delivered through
// notify_exception(); the visible notify(response) call handles the other
// path.
746 ClientInvocationServiceImpl::handle_client_message(
747 const std::shared_ptr<ClientInvocation>& invocation,
748 const std::shared_ptr<protocol::ClientMessage>& response)
751 if (protocol::codec::ErrorCodec::EXCEPTION_MESSAGE_TYPE ==
752 response->get_message_type()) {
753 auto error_holder = protocol::codec::ErrorCodec::decode(*response);
754 invocation->notify_exception(
755 client_.get_client_exception_factory().create_exception(
758 invocation->notify(response);
760 }
// A std::exception raised while processing is caught and formatted together
// with the invocation; the logging call itself is outside this excerpt.
catch (std::exception& e) {
764 boost::str(boost::format(
"Failed to process response for %1%. %2%") %
765 *invocation % e.what()));
// Sends an invocation over the given connection.
770 ClientInvocationServiceImpl::send(
771 const std::shared_ptr<impl::ClientInvocation>& invocation,
772 const std::shared_ptr<connection::Connection>& connection)
// Throws hazelcast_client_not_active; the guarding condition is outside this
// excerpt.
775 BOOST_THROW_EXCEPTION(exception::hazelcast_client_not_active(
776 "ClientInvocationServiceImpl::send",
"Client is shut down"));
// With backup acks enabled the request is flagged as backup aware.
779 if (backup_acks_enabled_) {
780 invocation->get_client_message()->add_flag(
781 protocol::ClientMessage::BACKUP_AWARE_FLAG);
// The message is written first; the send connection is recorded on the
// invocation afterwards.
784 write_to_connection(*connection, invocation);
785 invocation->set_send_connection(connection);
// Hands the invocation to the connection for writing.
790 ClientInvocationServiceImpl::write_to_connection(
791 connection::Connection& connection,
792 const std::shared_ptr<ClientInvocation>& client_invocation)
// NOTE(review): clientMessage is not used in the visible lines; the write
// below takes the invocation itself.
794 auto clientMessage = client_invocation->get_client_message();
795 connection.write(client_invocation);
// Delegates the check to the connection manager.
799 ClientInvocationServiceImpl::check_invocation_allowed()
801 client_.get_connection_manager().check_invocation_allowed();
// Sends the invocation over a connection picked by
// get_random_connection(). The visible log line covers the case where no
// connection was found; its guard and outcome are outside this excerpt.
805 ClientInvocationServiceImpl::invoke(
806 std::shared_ptr<ClientInvocation> invocation)
808 auto connection = client_.get_connection_manager().get_random_connection();
810 HZ_LOG(logger_, finest,
"No connection found to invoke");
813 return send(invocation, connection);
// Address provider backed by the client network config.
816 DefaultAddressProvider::DefaultAddressProvider(
817 config::client_network_config& network_config)
818 : network_config_(network_config)
// Copies the configured addresses; when none are configured, falls back to
// 127.0.0.1 on port 5701.
822 DefaultAddressProvider::load_addresses()
824 std::vector<address> addresses = network_config_.get_addresses();
825 if (addresses.empty()) {
826 addresses.emplace_back(
"127.0.0.1", 5701);
835 boost::optional<address>
836 DefaultAddressProvider::translate(
const address& addr)
842 DefaultAddressProvider::is_default_provider()
// Shared sentinel snapshot whose first field is -1. Other functions in this
// file compare snapshot pointers against it with == and != to detect that no
// member list has been applied yet.
847 const boost::shared_ptr<ClientClusterServiceImpl::member_list_snapshot>
848 ClientClusterServiceImpl::EMPTY_SNAPSHOT(
849 new ClientClusterServiceImpl::member_list_snapshot{ -1 });
// Out-of-class definition for the constexpr static declared in the header.
851 constexpr boost::chrono::milliseconds
852 ClientClusterServiceImpl::INITIAL_MEMBERS_TIMEOUT;
// Endpoint qualifiers used as address_map keys in create_snapshot. The meaning
// of the numeric first field is not visible here -- presumably a protocol type
// id; TODO confirm.
853 const endpoint_qualifier ClientClusterServiceImpl::CLIENT{ 1,
"" };
854 const endpoint_qualifier ClientClusterServiceImpl::MEMBER{ 0,
"" };
// Starts with the EMPTY_SNAPSHOT sentinel as the member list, copies the
// labels from the client config, and arms initial_list_fetched_latch_ with a
// count of 1.
856 ClientClusterServiceImpl::ClientClusterServiceImpl(
857 hazelcast::client::spi::ClientContext& client)
859 , member_list_snapshot_(EMPTY_SNAPSHOT)
860 , labels_(client.get_client_config().get_labels())
861 , initial_list_fetched_latch_(1)
// Stores the listener under a new random uuid while holding listeners_lock_.
// No init callback is invoked in the visible lines.
865 ClientClusterServiceImpl::add_membership_listener_without_init(
866 membership_listener&& listener)
868 std::lock_guard<std::mutex> g(listeners_lock_);
869 auto id = client_.random_uuid();
870 listeners_.emplace(
id, std::move(listener));
// Looks a member up by uuid in the current snapshot. The uuid must not be nil
// (checked with assert only). The not-found result is outside this excerpt.
874 boost::optional<member>
875 ClientClusterServiceImpl::get_member(boost::uuids::uuid uuid)
const
877 assert(!uuid.is_nil());
// Load the snapshot pointer once and use that copy for the whole lookup.
878 auto members_view_ptr = member_list_snapshot_.load();
879 const auto it = members_view_ptr->members.find(uuid);
880 if (it == members_view_ptr->members.end()) {
883 return { it->second };
// Copies every member of the current snapshot into a vector, reserving the
// exact size up front.
887 ClientClusterServiceImpl::get_member_list()
const
889 auto members_view_ptr = member_list_snapshot_.load();
890 std::vector<member> result;
891 result.reserve(members_view_ptr->members.size());
892 for (
const auto& e : members_view_ptr->members) {
893 result.emplace_back(e.second);
// Registers each membership listener from the client config; each one is
// copied into a temporary because add_membership_listener takes an rvalue.
899 ClientClusterServiceImpl::start()
901 for (
auto& listener :
902 client_.get_client_config().get_membership_listeners()) {
903 add_membership_listener(membership_listener(listener));
// Delivers the initial membership event to every listener that has an init_
// callback set, while holding listeners_lock_.
908 ClientClusterServiceImpl::fire_initial_membership_event(
909 const initial_membership_event& event)
911 std::lock_guard<std::mutex> g(listeners_lock_);
913 for (
auto& item : listeners_) {
914 membership_listener& listener = item.second;
915 if (listener.init_) {
916 listener.init_(event);
// Releases any thread waiting on the initial member list latch.
// try_count_down is used rather than count_down -- presumably because the
// latch may already be at zero; TODO confirm.
922 ClientClusterServiceImpl::shutdown()
924 initial_list_fetched_latch_.try_count_down();
// Registers a listener and, when it has an init_ callback and the current
// snapshot is non-empty, immediately delivers an initial membership event to
// it. Lock order in the visible lines: cluster_view_lock_ first, then
// listeners_lock_.
928 ClientClusterServiceImpl::add_membership_listener(
929 membership_listener&& listener)
931 std::lock_guard<std::mutex> cluster_view_g(cluster_view_lock_);
933 auto id = add_membership_listener_without_init(std::move(listener));
935 std::lock_guard<std::mutex> listeners_g(listeners_lock_);
// NOTE(review): `auto` makes a copy of the stored listener, and operator[]
// would default-insert if the id were missing. A find() plus a reference may
// be what was intended -- confirm.
936 auto added_listener = listeners_[id];
938 if (added_listener.init_) {
939 auto& cluster = client_.get_cluster();
940 auto members_ptr = member_list_snapshot_.load();
941 if (!members_ptr->members.empty()) {
942 std::unordered_set<member> members;
943 for (
const auto& e : members_ptr->members) {
944 members.insert(e.second);
946 added_listener.init_(initial_membership_event(cluster, members));
// Erases the listener stored under registration_id while holding
// listeners_lock_. Returns true only when exactly one entry was erased.
954 ClientClusterServiceImpl::remove_membership_listener(
955 boost::uuids::uuid registration_id)
957 std::lock_guard<std::mutex> g(listeners_lock_);
958 return listeners_.erase(registration_id) == 1;
// Returns the members accepted by `selector`. The loop iterates over the
// temporary vector returned by get_member_list(), so accepted elements are
// moved into the result.
962 ClientClusterServiceImpl::get_members(
const member_selector& selector)
const
964 std::vector<member> result;
965 for (
auto&& member : get_member_list()) {
966 if (selector.select(member)) {
967 result.emplace_back(std::move(member));
// Builds a local_endpoint from the client uuid, the local socket address of a
// randomly picked connection, the client name, and the labels.
975 ClientClusterServiceImpl::get_local_client()
const
977 connection::ClientConnectionManagerImpl& cm =
978 client_.get_connection_manager();
979 auto connection = cm.get_random_connection();
// With no connection available the address is boost::none.
980 auto inetSocketAddress =
981 connection ? connection->get_local_socket_address() : boost::none;
982 auto uuid = cm.get_client_uuid();
983 return local_endpoint(
984 uuid, std::move(inetSocketAddress), client_.get_name(), labels_);
// Under cluster_view_lock_, replaces the current snapshot with a new one that
// keeps the same members but has 0 as its first field (presumably the version,
// going by the log text). Nothing is stored while the snapshot is still the
// EMPTY_SNAPSHOT sentinel.
988 ClientClusterServiceImpl::clear_member_list_version()
990 std::lock_guard<std::mutex> g(cluster_view_lock_);
991 auto& lg = client_.get_logger();
992 HZ_LOG(lg, finest,
"Resetting the member list version ");
993 auto cluster_view_snapshot = member_list_snapshot_.load();
996 if (cluster_view_snapshot != EMPTY_SNAPSHOT) {
997 member_list_snapshot_.store(boost::shared_ptr<member_list_snapshot>(
998 new member_list_snapshot{ 0, cluster_view_snapshot->members }));
// Under cluster_view_lock_, copies the current member map, stores a fresh
// snapshot built from { 0 }, and returns the events produced by
// detect_membership_events. The visible second argument is an empty map; the
// first argument is outside this excerpt, presumably the copied previous list.
1002 std::vector<membership_event>
1003 ClientClusterServiceImpl::clear_member_list_and_return_events()
1005 std::lock_guard<std::mutex> g(cluster_view_lock_);
1007 auto& lg = client_.get_logger();
1008 HZ_LOG(lg, finest,
"Resetting the member list");
1010 auto previous_list = member_list_snapshot_.load()->members;
1012 member_list_snapshot_.store(
1013 boost::shared_ptr<member_list_snapshot>(
new member_list_snapshot{ 0 }));
1015 return detect_membership_events(
1017 std::unordered_map<boost::uuids::uuid,
1019 boost::hash<boost::uuids::uuid>>());
// Clears the member list and then fires the resulting events. The clear step
// takes and releases cluster_view_lock_ inside the callee before fire_events
// runs.
1023 ClientClusterServiceImpl::clear_member_list()
1025 auto events = clear_member_list_and_return_events();
1026 fire_events(std::move(events));
// Applies a member list update received from the cluster. Both branches check
// the snapshot, take cluster_view_lock_, reload, and check again before
// acting.
1030 ClientClusterServiceImpl::handle_event(int32_t version,
1031 const std::vector<member>& member_infos)
1033 auto& lg = client_.get_logger();
// A snapshot is built here only to render the log text. Whether this is
// guarded by a log-level check is outside this excerpt.
1038 boost::format(
"Handling new snapshot with membership version: %1%, "
1039 "membersString %2%") %
1040 version % members_string(create_snapshot(version, member_infos))));
1041 auto cluster_view_snapshot = member_list_snapshot_.load();
// First update ever: apply the initial state and release the latch that
// wait_initial_member_list_fetched() blocks on.
1042 if (cluster_view_snapshot == EMPTY_SNAPSHOT) {
1043 std::lock_guard<std::mutex> g(cluster_view_lock_);
1044 cluster_view_snapshot = member_list_snapshot_.load();
1045 if (cluster_view_snapshot == EMPTY_SNAPSHOT) {
1047 apply_initial_state(version, member_infos);
1048 initial_list_fetched_latch_.count_down();
1053 std::vector<membership_event> events;
// Later updates are applied only when the incoming version is not older than
// the stored one; events are derived from the difference in members.
1054 if (version >= cluster_view_snapshot->version) {
1055 std::lock_guard<std::mutex> g(cluster_view_lock_);
1056 cluster_view_snapshot = member_list_snapshot_.load();
1057 if (version >= cluster_view_snapshot->version) {
1058 auto prev_members = cluster_view_snapshot->members;
1059 auto snapshot = boost::make_shared<member_list_snapshot>(
1060 create_snapshot(version, member_infos));
1061 member_list_snapshot_.store(snapshot);
1062 events = detect_membership_events(prev_members, snapshot->members);
1066 fire_events(std::move(events));
// Builds a member_list_snapshot with the given version, keyed by member uuid.
1069 ClientClusterServiceImpl::member_list_snapshot
1070 ClientClusterServiceImpl::create_snapshot(int32_t version,
1071 const std::vector<member>& members)
1073 member_list_snapshot result;
1074 result.version = version;
1075 for (
auto& m : members) {
1076 auto const& address_map = m.address_map();
// Members without an address map are inserted as they are.
1077 if (address_map.empty()) {
1078 result.members.insert({ m.get_uuid(), m });
// Otherwise an address is chosen from the map: the CLIENT entry when it is
// present, else the MEMBER entry, whose presence is checked by assert only.
1080 auto found = address_map.find(CLIENT);
1081 address member_address;
1082 if (found != address_map.end()) {
1083 member_address = found->second;
1085 found = address_map.find(MEMBER);
1086 assert(found != address_map.end());
1087 member_address = found->second;
// A new member object is built around the chosen address; its remaining
// constructor arguments are outside this excerpt.
1089 member new_member(member_address,
1095 result.members.emplace(new_member.get_uuid(),
1096 std::move(new_member));
// Renders a snapshot as a multi-line block: a header with the member count,
// one tab-indented line per member, and a closing brace.
1104 ClientClusterServiceImpl::members_string(
1105 const ClientClusterServiceImpl::member_list_snapshot& snapshot)
1107 std::stringstream out;
1108 auto const& members = snapshot.members;
1109 out << std::endl << std::endl <<
"Members [" << members.size() <<
"] {";
1110 for (
auto const& e : members) {
1111 out << std::endl <<
"\t" << e.second;
1113 out << std::endl <<
"}" << std::endl;
// Stores the first snapshot, logs it at info level, and builds an initial
// membership event for each listener that has an init_ callback. In this file
// it is called from handle_event while cluster_view_lock_ is held;
// listeners_lock_ is taken here.
1118 ClientClusterServiceImpl::apply_initial_state(
1120 const std::vector<member>& member_infos)
1122 auto snapshot = boost::make_shared<member_list_snapshot>(
1123 create_snapshot(version, member_infos));
1124 member_list_snapshot_.store(snapshot);
1125 HZ_LOG(client_.get_logger(), info, members_string(*snapshot));
// Flatten the uuid-keyed map into the set carried by the event.
1126 std::unordered_set<member> members;
1127 for (
auto const& e : snapshot->members) {
1128 members.insert(e.second);
1130 std::lock_guard<std::mutex> g(listeners_lock_);
1131 for (
auto& item : listeners_) {
1132 membership_listener& listener = item.second;
1133 if (listener.init_) {
1135 initial_membership_event(client_.get_cluster(), members));
// Compares two member maps and produces MEMBER_LEFT and MEMBER_JOINED events.
// previous_members is taken by value because the function erases from it.
1140 std::vector<membership_event>
1141 ClientClusterServiceImpl::detect_membership_events(
1142 std::unordered_map<boost::uuids::uuid,
1144 boost::hash<boost::uuids::uuid>> previous_members,
1145 const std::unordered_map<boost::uuids::uuid,
1147 boost::hash<boost::uuids::uuid>>& current_members)
1149 std::vector<member> new_members;
// erase() returns 0 for a uuid that was not in the previous map, which marks
// the member as new. Matching uuids are removed, so what remains in
// previous_members afterwards are the members that left.
1151 for (
auto const& e : current_members) {
1152 if (!previous_members.erase(e.first)) {
1153 new_members.emplace_back(e.second);
1157 std::vector<membership_event> events;
1160 for (
auto const& e : previous_members) {
1161 events.emplace_back(
1162 client_.get_cluster(),
1164 membership_event::membership_event_type::MEMBER_LEFT,
// The connection to the departed member is looked up and a
// target_disconnected exception pointer is built with the message below; the
// call that uses them is outside this excerpt.
1167 client_.get_connection_manager().get_connection(e.second.get_uuid());
1171 std::make_exception_ptr(exception::target_disconnected(
1172 "ClientClusterServiceImpl::detect_membership_events",
1174 "The client has closed the connection to this member, after "
1175 "receiving a member left event from the cluster. %1%") %
1180 for (
auto const& member : new_members) {
1181 events.emplace_back(
1182 client_.get_cluster(),
1184 membership_event::membership_event_type::MEMBER_JOINED,
// When anything changed and the stored snapshot is non-empty, log the member
// list at info level.
1188 if (!events.empty()) {
1189 auto snapshot = member_list_snapshot_.load();
1190 if (!snapshot->members.empty()) {
1191 HZ_LOG(client_.get_logger(), info, members_string(*snapshot));
// Delivers each event to every listener while holding listeners_lock_:
// MEMBER_JOINED events go to joined_, and the visible left_ call handles the
// other branch.
1198 ClientClusterServiceImpl::fire_events(std::vector<membership_event> events)
1200 std::lock_guard<std::mutex> g(listeners_lock_);
1202 for (
auto const& event : events) {
1203 for (
auto& item : listeners_) {
1204 membership_listener& listener = item.second;
1205 if (event.get_event_type() ==
1206 membership_event::membership_event_type::MEMBER_JOINED) {
1207 listener.joined_(event);
1209 listener.left_(event);
// Waits up to INITIAL_MEMBERS_TIMEOUT on the initial member list latch and
// throws illegal_state when the wait times out.
1216 ClientClusterServiceImpl::wait_initial_member_list_fetched()
const
// The const_cast is needed because wait_for is called on a member from a
// const member function.
1220 if ((
const_cast<boost::latch&
>(initial_list_fetched_latch_))
1221 .wait_for(INITIAL_MEMBERS_TIMEOUT) == boost::cv_status::timeout) {
1222 BOOST_THROW_EXCEPTION(exception::illegal_state(
1223 "ClientClusterServiceImpl::wait_initial_member_list_fetched",
1224 "Could not get initial member list from cluster!"));
// Sends the invocation over the connection supplied by the caller.
1229 ClientInvocationServiceImpl::invoke_on_connection(
1230 const std::shared_ptr<ClientInvocation>& invocation,
1231 const std::shared_ptr<connection::Connection>& connection)
1233 return send(invocation, connection);
// Resolves the owner of partition_id through the partition service and
// invokes on that member. A nil owner uuid is handled separately using the
// message below; the outcome of that branch is outside this excerpt.
1237 ClientInvocationServiceImpl::invoke_on_partition_owner(
1238 const std::shared_ptr<ClientInvocation>& invocation,
1241 auto partition_owner =
1242 client_.get_partition_service().get_partition_owner(partition_id);
1243 if (partition_owner.is_nil()) {
1248 "Partition owner is not assigned yet for partition %1%") %
1252 return invoke_on_target(invocation, partition_owner);
// Sends the invocation over the connection registered for the member `uuid`,
// which must not be nil (checked with assert only). The visible message covers
// the case where no such connection exists; that branch is outside this
// excerpt.
1256 ClientInvocationServiceImpl::invoke_on_target(
1257 const std::shared_ptr<ClientInvocation>& invocation,
1258 boost::uuids::uuid uuid)
1260 assert(!uuid.is_nil());
1261 auto connection = client_.get_connection_manager().get_connection(uuid);
1266 boost::str(boost::format(
"Client is not connected to target : %1%") %
1270 return send(invocation, connection);
// Returns the smart routing flag captured at construction.
1274 ClientInvocationServiceImpl::is_smart_routing()
const
1276 return smart_routing_;
// Returns the backup timeout by const reference, unlike the other duration
// getters of this class, which return by value.
1279 const std::chrono::milliseconds&
1280 ClientInvocationServiceImpl::get_backup_timeout()
const
1282 return backup_timeout_;
// Returns the fail-on-indeterminate-state flag captured at construction.
1286 ClientInvocationServiceImpl::fail_on_indeterminate_state()
const
1288 return fail_on_indeterminate_operation_state_;
// Stores the lifecycle service and the client properties. No executor is
// created in the visible lines; the pools are built in start().
1291 ClientExecutionServiceImpl::ClientExecutionServiceImpl(
1292 const std::string& name,
1293 const client_properties& properties,
1295 spi::lifecycle_service& service)
1296 : lifecycle_service_(service)
1297 , client_properties_(properties)
// Creates the internal and user thread pools.
1301 ClientExecutionServiceImpl::start()
1303 int internalPoolSize = client_properties_.get_integer(
1304 client_properties_.get_internal_executor_pool_size());
// A non-positive configured size falls back to the default value, parsed with
// IOUtil::to_value<int>.
1305 if (internalPoolSize <= 0) {
1306 internalPoolSize = util::IOUtil::to_value<int>(
1307 client_properties::INTERNAL_EXECUTOR_POOL_SIZE_DEFAULT);
1310 internal_executor_.reset(
1311 new hazelcast::util::hz_thread_pool(internalPoolSize));
// The user executor is default-constructed, with no explicit size.
1313 user_executor_.reset(
new hazelcast::util::hz_thread_pool());
// Shuts down the internal pool first, then the user pool.
1317 ClientExecutionServiceImpl::shutdown()
1319 shutdown_thread_pool(internal_executor_.get());
1320 shutdown_thread_pool(user_executor_.get());
// Dereferences user_executor_, which is assigned in start(). No null check is
// made here.
1323 util::hz_thread_pool&
1324 ClientExecutionServiceImpl::get_user_executor()
1326 return *user_executor_;
1330 ClientExecutionServiceImpl::shutdown_thread_pool(
1331 hazelcast::util::hz_thread_pool* pool)
// Out-of-class definitions for the constexpr static members declared in the
// header.
1339 constexpr
int ClientInvocation::MAX_FAST_INVOCATION_COUNT;
1340 constexpr
int ClientInvocation::UNASSIGNED_PARTITION;
// Builds an invocation for `message`.
// Services (logger, lifecycle, invocation, execution, call-id sequence) are
// taken from `client_context`. The invocation counts as bound to a single
// connection exactly when `conn` is non-null.
1342 ClientInvocation::ClientInvocation(
1343 spi::ClientContext& client_context,
1344 std::shared_ptr<protocol::ClientMessage>&& message,
1345 const std::string& name,
1347 const std::shared_ptr<connection::Connection>& conn,
1348 boost::uuids::uuid uuid)
1349 : logger_(client_context.get_logger())
1350 , lifecycle_service_(client_context.get_lifecycle_service())
1351 , invocation_service_(client_context.get_invocation_service())
1352 , execution_service_(
1353 client_context.get_client_execution_service().shared_from_this())
1354 , call_id_sequence_(client_context.get_call_id_sequence())
1356 , partition_id_(partition)
// start_time_ is the reference point for the invocation-timeout check in
// notify_exception().
1357 , start_time_(std::chrono::steady_clock::now())
// Reads invocation_service_, so this relies on invocation_service_ being
// declared (and therefore initialised) before retry_pause_ -- TODO confirm.
1358 , retry_pause_(invocation_service_.get_invocation_retry_pause())
1359 , object_name_(name)
1361 , bound_to_single_connection_(conn != nullptr)
1364 , smart_routing_(invocation_service_.is_smart_routing())
// The partition id is stamped onto the outgoing message as well.
1366 message->set_partition_id(partition_id_);
1368 boost::make_shared<std::shared_ptr<protocol::ClientMessage>>(message);
// Starts with no send connection. Note that set_send_connection() also sets
// invoked_or_exception_set_ to true.
1369 set_send_connection(
nullptr);
// Defaulted destructor, defined out of line.
1372 ClientInvocation::~ClientInvocation() =
default;
// Starts the invocation and returns a future for its response.
// A call id is reserved with next(); the back-pressure-aware sequences reach
// handle_no_space_left() from next(), which throws hazelcast_overload. The
// request is then dispatched through invoke_on_selection().
1374 boost::future<protocol::ClientMessage>
1375 ClientInvocation::invoke()
1377 assert(client_message_.load());
1379 call_id_sequence_->next();
1380 invoke_on_selection();
// When the client is no longer running, the continuation is attached without
// the user executor and simply forwards the result or exception.
1381 if (!lifecycle_service_.is_running()) {
1382 return invocation_promise_.get_future().then(
1383 [](boost::future<protocol::ClientMessage> f) {
return f.get(); });
// Otherwise the continuation runs on the user executor. The call-id sequence
// is copied so the continuation can use it -- presumably to release the call
// id; TODO confirm.
1385 auto id_seq = call_id_sequence_;
1386 return invocation_promise_.get_future().then(
1387 execution_service_->get_user_executor(),
1388 [=](boost::future<protocol::ClientMessage> f) {
// Same flow as invoke(), except the call id is taken with force_next()
// instead of next().
1394 boost::future<protocol::ClientMessage>
1395 ClientInvocation::invoke_urgent()
1397 assert(client_message_.load());
1400 call_id_sequence_->force_next();
1401 invoke_on_selection();
// Client not running: forward the result without the user executor.
1402 if (!lifecycle_service_.is_running()) {
1403 return invocation_promise_.get_future().then(
1404 [](boost::future<protocol::ClientMessage> f) {
return f.get(); });
// Client running: the continuation is scheduled on the user executor.
1406 auto id_seq = call_id_sequence_;
1407 return invocation_promise_.get_future().then(
1408 execution_service_->get_user_executor(),
1409 [=](boost::future<protocol::ClientMessage> f) {
// Chooses where to send the request and sends it.
//  - Bound to a single connection: use that connection only.
//  - Smart routing: prefer the partition owner (partition_id_ != -1), then the
//    target member (uuid_ not nil), then a generic invoke().
//  - Otherwise: generic invoke().
// Failures are routed to notify_exception() rather than thrown to the caller.
1416 ClientInvocation::invoke_on_selection()
1421 invocation_service_.check_invocation_allowed();
1424 if (is_bind_to_single_connection()) {
1425 bool invoked =
false;
// connection_ is held weakly; lock() yields null once the connection is gone.
1426 auto conn = connection_.lock();
1428 invoked = invocation_service_.invoke_on_connection(
1429 shared_from_this(), conn);
1432 std::string message;
1435 (boost::format(
"Could not invoke on connection %1%") %
1439 message =
"Could not invoke. Bound to a connection that is "
1442 notify_exception(std::make_exception_ptr(exception::io(
1443 "ClientInvocation::invoke_on_selection", message)));
1448 bool invoked =
false;
1449 if (smart_routing_) {
1450 if (partition_id_ != -1) {
1451 invoked = invocation_service_.invoke_on_partition_owner(
1452 shared_from_this(), partition_id_);
1453 }
else if (!uuid_.is_nil()) {
1454 invoked = invocation_service_.invoke_on_target(
1455 shared_from_this(), uuid_);
1457 invoked = invocation_service_.invoke(shared_from_this());
// Presumably a fallback when the targeted attempt above did not send --
// TODO confirm the guarding condition.
1460 invoked = invocation_service_.invoke(shared_from_this());
1463 invoked = invocation_service_.invoke(shared_from_this());
1466 notify_exception(std::make_exception_ptr(
1467 exception::io(
"No connection found to invoke")));
1469 }
catch (exception::iexception&) {
1470 notify_exception(std::current_exception());
1471 }
catch (std::exception&) {
// True when the invocation was constructed with a non-null connection
// (the constructor sets the flag from `conn != nullptr`).
1477 ClientInvocation::is_bind_to_single_connection()
const
1479 return bound_to_single_connection_;
// Entry point of the command that execute() submits or schedules
// (`this_invocation->run()`).
1483 ClientInvocation::run()
// Re-dispatches the invocation through invoke_on_selection().
// An iexception raised on the way is recorded on the promise via
// set_exception() instead of propagating.
1489 ClientInvocation::retry()
// A fresh shared message holder is created here -- presumably from
// copy_message(); TODO confirm.
1495 boost::make_shared<std::shared_ptr<protocol::ClientMessage>>(
1499 invoke_on_selection();
1500 }
catch (exception::iexception& e) {
1501 set_exception(e, boost::current_exception());
1502 }
catch (std::exception&) {
// Completes the invocation promise with `exception_ptr`.
// `e` is used only for its what() text in the diagnostic message that is
// built when the promise has already been satisfied.
1508 ClientInvocation::set_exception(
const std::exception& e,
1509 boost::exception_ptr exception_ptr)
// Lets wait_invoked() stop spinning.
1511 invoked_or_exception_set_.store(
true);
1513 auto send_conn = send_connection_.load();
1515 auto connection = send_conn->lock();
1518 client_message_.load()->get()->get_correlation_id();
// Deregistration is handed to the connection's socket executor instead of
// being performed on the calling thread.
1520 connection->get_socket().get_executor(),
1521 [=]() { connection->deregister_invocation(call_id); });
1524 invocation_promise_.set_exception(std::move(exception_ptr));
1525 }
catch (boost::promise_already_satisfied& se) {
// The promise was completed earlier. The diagnostic below is only built for
// invocations without an event handler.
1526 if (!event_handler_) {
1529 boost::str(boost::format(
1530 "Failed to set the exception for invocation. "
1531 "%1%, %2% Exception to be set: %3%") %
1532 se.what() % *
this % e.what()));
// Handles a failure of this invocation.
// The exception is rethrown locally so it can be classified by type:
//  - client not running: the promise fails with hazelcast_client_not_active,
//    nesting the original exception;
//  - should_retry() says no: the promise fails with the original exception;
//  - invocation timeout exceeded: the promise fails with operation_timeout;
//  - non-iexception std::exception: the promise fails with it directly.
// The retry path itself is not shown in the lines below -- presumably it goes
// through execute(); TODO confirm.
1538 ClientInvocation::notify_exception(std::exception_ptr exception)
1542 std::rethrow_exception(exception);
1543 }
catch (exception::iexception& iex) {
1546 if (!lifecycle_service_.is_running()) {
// throw_with_nested + catch yields a boost::current_exception() that carries
// the original exception as the nested cause.
1548 std::throw_with_nested(boost::enable_current_exception(
1549 exception::hazelcast_client_not_active(
1550 iex.get_source(),
"Client is shutting down")));
1551 }
catch (exception::iexception& e) {
1552 set_exception(e, boost::current_exception());
1557 if (!should_retry(iex)) {
1558 set_exception(iex, boost::current_exception());
// Elapsed time is measured from construction (start_time_).
1562 auto timePassed = std::chrono::steady_clock::now() - start_time_;
1563 if (timePassed > invocation_service_.get_invocation_timeout()) {
1567 boost::str(boost::format(
"Exception will not be retried because "
1568 "invocation timed out. %1%") %
1571 auto now = std::chrono::steady_clock::now();
1573 auto timeoutException =
1574 (exception::exception_builder<exception::operation_timeout>(
1575 "ClientInvocation::newoperation_timeout_exception")
1577 <<
" timed out because exception occurred after client "
1578 "invocation timeout "
1579 << std::chrono::duration_cast<std::chrono::milliseconds>(
1580 invocation_service_.get_invocation_timeout())
1582 <<
"msecs. Last exception:" << iex <<
" Current time :"
1583 << util::StringUtil::time_to_string(now) <<
". "
1585 << util::StringUtil::time_to_string(start_time_)
1586 <<
". Total elapsed time: "
1587 << std::chrono::duration_cast<std::chrono::milliseconds>(
// Thrown and caught immediately so that boost::current_exception() refers to
// the timeout exception when it is stored on the promise.
1593 BOOST_THROW_EXCEPTION(timeoutException);
1595 set_exception(timeoutException, boost::current_exception());
1603 }
catch (std::exception& e) {
1604 set_exception(e, boost::current_exception());
// Removes this invocation from the `invocations` container of the connection
// it was sent on, keyed by the message correlation id.
// Nothing happens for invocations that have an event handler, or when the
// send connection is no longer available.
1612 ClientInvocation::erase_invocation()
const
1614 if (!this->event_handler_) {
1615 auto sent_connection = get_send_connection();
1616 if (sent_connection) {
1617 auto this_invocation = shared_from_this();
// The erase is posted to the connection's socket executor rather than done
// inline -- presumably so `invocations` is only touched from that executor;
// TODO confirm.
1618 boost::asio::post(sent_connection->get_socket().get_executor(),
1620 sent_connection->invocations.erase(
1621 this_invocation->get_client_message()
1622 ->get_correlation_id());
// Decides whether the invocation may be retried after `exception`.
// The checks run in order; the result of the first three branches is not
// restated here -- presumably "no retry", "no retry", "retry"; TODO confirm.
1629 ClientInvocation::should_retry(exception::iexception& exception)
1631 auto errorCode = exception.get_error_code();
// Case 1: bound to one connection and that connection failed.
1632 if (is_bind_to_single_connection() &&
1633 (errorCode == protocol::IO ||
1634 errorCode == protocol::TARGET_DISCONNECTED)) {
// Case 2: addressed to a specific member that is no longer a member.
1638 if (!uuid_.is_nil() && errorCode == protocol::TARGET_NOT_MEMBER) {
// Case 3: I/O error, instance not active, or the exception marks itself
// retryable.
1645 if (errorCode == protocol::IO ||
1646 errorCode == protocol::HAZELCAST_INSTANCE_NOT_ACTIVE ||
1647 exception.is_retryable()) {
// Case 4: target disconnected -- retry only if the request is retryable or
// redo-operation is enabled on the invocation service.
1650 if (errorCode == protocol::TARGET_DISCONNECTED) {
1651 return client_message_.load()->get()->is_retryable() ||
1652 invocation_service_.is_redo_operation();
// Streams a human-readable description of `invocation`: request message,
// object name, target (connection / partition / uuid), send connection,
// backup-ack counters and, when present, the pending response.
1658 operator<<(std::ostream& os,
const ClientInvocation& invocation)
// The target text is built separately so exactly one description is emitted.
1660 std::ostringstream target;
1661 if (invocation.is_bind_to_single_connection()) {
1662 auto conn = invocation.connection_.lock();
1664 target <<
"connection " << *conn;
1666 }
else if (invocation.partition_id_ != -1) {
1667 target <<
"partition " << invocation.partition_id_;
1668 }
else if (!invocation.uuid_.is_nil()) {
1669 target <<
"uuid " << boost::to_string(invocation.uuid_);
1673 os <<
"ClientInvocation{"
1674 <<
"requestMessage = " << *invocation.client_message_.load()->get()
1675 <<
", objectName = " << invocation.object_name_
1676 <<
", target = " << target.str() <<
", sendConnection = ";
1677 auto sendConnection = invocation.get_send_connection();
1678 if (sendConnection) {
1679 os << *sendConnection;
// Cast to int so the counter is printed as a number rather than as a
// character.
1683 os <<
", backup_acks_expected_ = "
1684 <<
static_cast<int>(invocation.backup_acks_expected_)
1685 <<
", backup_acks_received = " << invocation.backup_acks_received_;
1687 if (invocation.pending_response_) {
1688 os <<
", pending_response: " << *invocation.pending_response_;
// Factory: invocation addressed by partition id. Takes ownership of
// `client_message` (moved into the invocation).
// `new` is used directly instead of make_shared -- presumably because the
// constructor is not publicly accessible; TODO confirm.
1696 std::shared_ptr<ClientInvocation>
1697 ClientInvocation::create(
1698 spi::ClientContext& client_context,
1699 std::shared_ptr<protocol::ClientMessage>&& client_message,
1700 const std::string& object_name,
1703 return std::shared_ptr<ClientInvocation>(
new ClientInvocation(
1704 client_context, std::move(client_message), object_name, partition_id));
// Factory: invocation bound to a specific `connection`. The partition is left
// as UNASSIGNED_PARTITION.
1707 std::shared_ptr<ClientInvocation>
1708 ClientInvocation::create(
1709 spi::ClientContext& client_context,
1710 std::shared_ptr<protocol::ClientMessage>&& client_message,
1711 const std::string& object_name,
1712 const std::shared_ptr<connection::Connection>& connection)
1714 return std::shared_ptr<ClientInvocation>(
1715 new ClientInvocation(client_context,
1716 std::move(client_message),
1718 UNASSIGNED_PARTITION,
// Factory: invocation addressed to the member `uuid`. The partition is left
// as UNASSIGNED_PARTITION.
1722 std::shared_ptr<ClientInvocation>
1723 ClientInvocation::create(
1724 spi::ClientContext& client_context,
1725 std::shared_ptr<protocol::ClientMessage>&& client_message,
1726 const std::string& object_name,
1727 boost::uuids::uuid uuid)
1729 return std::shared_ptr<ClientInvocation>(
1730 new ClientInvocation(client_context,
1731 std::move(client_message),
1733 UNASSIGNED_PARTITION,
// Convenience overload taking the message by lvalue reference.
// NOTE: `client_message` is moved from (std::move below), so the caller's
// object is left in a moved-from state after this call.
1738 std::shared_ptr<ClientInvocation>
1739 ClientInvocation::create(spi::ClientContext& client_context,
1740 protocol::ClientMessage& client_message,
1741 const std::string& object_name,
1746 std::make_shared<protocol::ClientMessage>(std::move(client_message)),
// Convenience overload (connection-bound) taking the message by lvalue
// reference.
// NOTE: `client_message` is moved from, leaving the caller's object in a
// moved-from state.
1751 std::shared_ptr<ClientInvocation>
1752 ClientInvocation::create(
1753 spi::ClientContext& client_context,
1754 protocol::ClientMessage& client_message,
1755 const std::string& object_name,
1756 const std::shared_ptr<connection::Connection>& connection)
1760 std::make_shared<protocol::ClientMessage>(std::move(client_message)),
// Convenience overload (member-targeted) taking the message by lvalue
// reference.
// NOTE: `client_message` is moved from, leaving the caller's object in a
// moved-from state.
1765 std::shared_ptr<ClientInvocation>
1766 ClientInvocation::create(spi::ClientContext& client_context,
1767 protocol::ClientMessage& client_message,
1768 const std::string& object_name,
1769 boost::uuids::uuid uuid)
1773 std::make_shared<protocol::ClientMessage>(std::move(client_message)),
// Returns the connection the request was last sent on, or a null shared_ptr
// when none is set or the connection has been destroyed. The connection is
// stored as a weak_ptr and locked on each call.
1778 std::shared_ptr<connection::Connection>
1779 ClientInvocation::get_send_connection()
const
1781 return send_connection_.load()->lock();
// Blocks the calling thread until invoked_or_exception_set_ becomes true,
// polling with a sleep of retry_pause_ between checks.
1785 ClientInvocation::wait_invoked()
const
1788 while (!invoked_or_exception_set_) {
1789 std::this_thread::sleep_for(retry_pause_);
// Records `conn` (held weakly) as the connection the request was sent on and
// marks the invocation as invoked, which releases wait_invoked().
// The flag is set even when `conn` is null; the constructor calls this with
// nullptr.
1794 ClientInvocation::set_send_connection(
1795 const std::shared_ptr<connection::Connection>& conn)
1797 send_connection_.store(
1798 boost::make_shared<std::weak_ptr<connection::Connection>>(conn));
1799 invoked_or_exception_set_.store(
true);
// Handles the response `msg` for this invocation.
// If the response announces more backups than have been acknowledged so far,
// the response is parked in pending_response_ together with its arrival time
// and the expected backup count. Otherwise the invocation is presumably
// completed right away -- TODO confirm.
// Throws illegal_argument for a null response (message below).
1803 ClientInvocation::notify(
const std::shared_ptr<protocol::ClientMessage>& msg)
1806 BOOST_THROW_EXCEPTION(
1807 exception::illegal_argument(
"response can't be null"));
1810 int8_t expected_backups = msg->get_number_of_backups();
1815 if (expected_backups > backup_acks_received_) {
// The arrival time is what detect_and_handle_backup_timeout() measures from.
1820 pending_response_received_time_ = std::chrono::steady_clock::now();
1822 backup_acks_expected_ = expected_backups;
1828 pending_response_ = msg;
// Fulfils the invocation promise with a copy of `*msg` and then removes the
// invocation from its connection (erase_invocation()).
// If setting the value throws, the response is dropped and a diagnostic is
// built from the message below.
1845 ClientInvocation::complete(
const std::shared_ptr<protocol::ClientMessage>& msg)
1849 this->invocation_promise_.set_value(*msg);
1850 }
catch (std::exception& e) {
1853 boost::str(boost::format(
1854 "Failed to set the response for invocation. "
1855 "Dropping the response. %1%, %2% Response: %3%") %
1856 e.what() % *
this % *msg));
1858 this->erase_invocation();
// Returns the current request message. client_message_ holds a pointer to a
// shared_ptr; this loads it and returns a copy of the inner shared_ptr.
1861 std::shared_ptr<protocol::ClientMessage>
1862 ClientInvocation::get_client_message()
const
1864 return *client_message_.load();
// Returns the event handler attached to this invocation by reference; it is
// null when none was set.
1867 const std::shared_ptr<EventHandler<protocol::ClientMessage>>&
1868 ClientInvocation::get_event_handler()
const
1870 return event_handler_;
// Attaches `handler` to this invocation. Several paths (set_exception(),
// erase_invocation()) behave differently once a handler is present.
1874 ClientInvocation::set_event_handler(
1875 const std::shared_ptr<EventHandler<protocol::ClientMessage>>& handler)
1877 ClientInvocation::event_handler_ = handler;
// Schedules another run of this invocation.
// The first MAX_FAST_INVOCATION_COUNT attempts are submitted immediately;
// later ones are delayed by min(2^(invoke_count_ - MAX_FAST_INVOCATION_COUNT),
// retry_pause_) milliseconds.
1881 ClientInvocation::execute()
// The command holds a shared_ptr so the invocation stays alive until it runs.
1883 auto this_invocation = shared_from_this();
1884 auto command = [=]() { this_invocation->run(); };
// A new call id is forced and stamped on the message, and the sequence is
// then marked complete() -- presumably to release the previous call id;
// TODO confirm.
1890 int64_t callId = call_id_sequence_->force_next();
1891 client_message_.load()->get()->set_correlation_id(callId);
1894 call_id_sequence_->complete();
1896 if (invoke_count_ < MAX_FAST_INVOCATION_COUNT) {
1898 execution_service_->execute(command);
// NOTE(review): the shift amount grows with invoke_count_; a difference of 63
// or more overflows int64_t. Confirm the number of retries is bounded well
// below that (e.g. by the invocation timeout).
1901 int64_t delayMillis = util::min<int64_t>(
1902 static_cast<int64_t
>(1)
1903 << (invoke_count_ - MAX_FAST_INVOCATION_COUNT),
1904 std::chrono::duration_cast<std::chrono::milliseconds>(retry_pause_)
1906 retry_timer_ = execution_service_->schedule(
1907 command, std::chrono::milliseconds(delayMillis));
// Returns the fixed name "ClientInvocation".
1912 ClientInvocation::get_name()
const
1914 return "ClientInvocation";
// Returns a newly allocated copy of the current request message
// (copy-constructs a ClientMessage from the stored one).
1917 std::shared_ptr<protocol::ClientMessage>
1918 ClientInvocation::copy_message()
1920 return std::make_shared<protocol::ClientMessage>(**client_message_.load());
// Gives direct, mutable access to the promise behind the future returned by
// invoke() / invoke_urgent().
1923 boost::promise<protocol::ClientMessage>&
1924 ClientInvocation::get_promise()
1926 return invocation_promise_;
// Builds a diagnostic message containing this invocation, the current invoke
// count and the exception `e`.
1930 ClientInvocation::log_exception(exception::iexception& e)
1934 boost::str(boost::format(
1935 "Invocation got an exception %1%, invoke count : %2%, "
1936 "exception : %3%") %
1937 *
this % invoke_count_.load() % e));
// Records one backup acknowledgement.
// The parked response is released through complete_with_pending_response().
// The two checks below are presumably early exits (no response parked yet,
// or acks still outstanding) -- TODO confirm.
1941 ClientInvocation::notify_backup()
1943 ++backup_acks_received_;
1945 if (!pending_response_) {
1953 if (backup_acks_expected_ != backup_acks_received_) {
1962 complete_with_pending_response();
// Resolves an invocation whose backup acknowledgements did not all arrive
// within `backup_timeout` of the response being received.
// Depending on fail_on_indeterminate_state() the invocation either fails with
// indeterminate_operation_state or is completed with the parked response.
// The first three checks are presumably early exits (all acks in, no response
// yet, timeout not reached) -- TODO confirm.
1966 ClientInvocation::detect_and_handle_backup_timeout(
1967 const std::chrono::milliseconds& backup_timeout)
1972 if (backup_acks_expected_ == backup_acks_received_) {
1979 if (!pending_response_) {
// Deadline = response arrival time + backup_timeout.
1985 if (pending_response_received_time_ + backup_timeout >=
1986 std::chrono::steady_clock::now()) {
1990 if (invocation_service_.fail_on_indeterminate_state()) {
1991 auto exception = boost::enable_current_exception(
1992 (exception::exception_builder<
1993 exception::indeterminate_operation_state>(
1994 "ClientInvocation::detect_and_handle_backup_timeout")
1995 << *
this <<
" failed because backup acks missed.")
1997 notify_exception(std::make_exception_ptr(exception));
2003 complete_with_pending_response();
// Completes the invocation with the response parked by notify().
2007 ClientInvocation::complete_with_pending_response()
2009 complete(pending_response_);
// Accessor for the client context -- presumably returns client_; TODO confirm.
2013 impl::ClientTransactionManagerServiceImpl::get_client()
const
// Constructs the service for the given client context -- presumably stored in
// client_, which the other members read; TODO confirm.
2018 ClientTransactionManagerServiceImpl::ClientTransactionManagerServiceImpl(
2019 ClientContext& client)
// Obtains a connection for a transaction, retrying while the client is
// running.
// Each iteration asks the connection manager for a random connection;
// throw_exception() raises the appropriate error when none is available
// (presumably only in that case -- TODO confirm).
// An iexception is retried after the invocation retry pause until the
// invocation timeout has elapsed, at which point an operation-timeout
// exception is thrown. Once the client stops running,
// hazelcast_client_not_active is thrown.
2023 std::shared_ptr<connection::Connection>
2024 ClientTransactionManagerServiceImpl::connect()
2026 auto& invocationService = client_.get_invocation_service();
2027 auto startTime = std::chrono::steady_clock::now();
2028 auto invocationTimeout = invocationService.get_invocation_timeout();
2029 client_config& clientConfig = client_.get_client_config();
2030 bool smartRouting = clientConfig.get_network_config().is_smart_routing();
2032 while (client_.get_lifecycle_service().is_running()) {
2035 client_.get_connection_manager().get_random_connection();
2037 throw_exception(smartRouting);
2040 }
catch (exception::hazelcast_client_offline&) {
2042 }
catch (exception::iexception&) {
// Give up once the total time spent exceeds the invocation timeout.
2043 if (std::chrono::steady_clock::now() - startTime >
2044 invocationTimeout) {
2045 std::rethrow_exception(new_operation_timeout_exception(
2046 std::current_exception(), invocationTimeout, startTime));
2049 std::this_thread::sleep_for(
2050 invocationService.get_invocation_retry_pause());
2052 BOOST_THROW_EXCEPTION(exception::hazelcast_client_not_active(
2053 "ClientTransactionManagerServiceImpl::connect",
"Client is shutdown"));
// Builds an operation_timeout exception that nests `cause` and returns it as
// a std::exception_ptr.
// The nesting is produced by rethrowing `cause`, throwing the timeout with
// std::throw_with_nested from the handler, and capturing the result with
// std::current_exception().
2057 ClientTransactionManagerServiceImpl::new_operation_timeout_exception(
2058 std::exception_ptr cause,
2059 std::chrono::milliseconds invocation_timeout,
2060 std::chrono::steady_clock::time_point start_time)
2062 std::ostringstream sb;
2063 auto now = std::chrono::steady_clock::now();
2064 sb <<
"Creating transaction context timed out because exception occurred "
2065 "after client invocation timeout "
2066 << std::chrono::duration_cast<std::chrono::milliseconds>(
// NOTE(review): the clock is read again here although `now` was captured
// above, so the printed current time and the elapsed time can differ
// slightly.
2071 << util::StringUtil::time_to_string(std::chrono::steady_clock::now())
2073 <<
"Start time: " << util::StringUtil::time_to_string(start_time)
2074 <<
". Total elapsed time: "
2075 << std::chrono::duration_cast<std::chrono::milliseconds>(now -
2080 std::rethrow_exception(cause);
2083 std::throw_with_nested(boost::enable_current_exception(
2084 exception::operation_timeout(
"ClientTransactionManagerServiceImpl"
2085 "::newoperation_timeout_exception",
2088 return std::current_exception();
// Throws the exception that describes why no connection could be obtained:
//  - reconnect mode ASYNC: hazelcast_client_offline;
//  - smart routing: illegal_state, listing the known cluster members (or
//    stating that there are none);
//  - otherwise: illegal_state("No active connection is found").
2095 ClientTransactionManagerServiceImpl::throw_exception(
bool smart_routing)
2097 auto& client_config = client_.get_client_config();
2098 auto& connection_strategy_Config =
2099 client_config.get_connection_strategy_config();
2100 auto reconnect_mode = connection_strategy_Config.get_reconnect_mode();
2101 if (reconnect_mode ==
2102 config::client_connection_strategy_config::reconnect_mode::ASYNC) {
2103 BOOST_THROW_EXCEPTION(exception::hazelcast_client_offline(
2104 "ClientTransactionManagerServiceImpl::throw_exception",
""));
2106 if (smart_routing) {
2107 auto members = client_.get_cluster().get_members();
2108 std::ostringstream msg;
2109 if (members.empty()) {
2110 msg <<
"No address was return by the LoadBalancer since there are "
2111 "no members in the cluster";
2113 msg <<
"No address was return by the LoadBalancer. "
2114 "But the cluster contains the following members:{\n";
// One member per line, tab-indented.
2115 for (
auto const& m : members) {
2116 msg <<
'\t' << m <<
'\n';
2120 BOOST_THROW_EXCEPTION(exception::illegal_state(
2121 "ClientTransactionManagerServiceImpl::throw_exception", msg.str()));
2123 BOOST_THROW_EXCEPTION(exception::illegal_state(
2124 "ClientTransactionManagerServiceImpl::throw_exception",
2125 "No active connection is found"));
// Starts with a partition count of 0 and an empty partition table whose
// connection id is 0 and whose version is -1.
2128 ClientPartitionServiceImpl::ClientPartitionServiceImpl(ClientContext& client)
2130 , logger_(client.get_logger())
2131 , partition_count_(0)
2133 boost::shared_ptr<partition_table>(new partition_table{ 0, -1 }))
// Handles a partition-table event received on connection `connection_id`.
// The new table replaces the current one only if should_be_applied() accepts
// it and the compare-exchange against the table that was inspected succeeds.
2137 ClientPartitionServiceImpl::handle_event(
2138 int32_t connection_id,
2140 const std::vector<std::pair<boost::uuids::uuid, std::vector<int>>>&
2147 "Handling new partition table with partitionStateVersion: %1%") %
// NOTE(review): `*current` assumes partition_table_ never holds a null
// pointer -- confirm every store keeps that invariant.
2151 auto current = partition_table_.load();
2152 if (!should_be_applied(connection_id, version, partitions, *current)) {
// compare_exchange guards against a concurrent update of the table between
// the load above and this store.
2155 if (partition_table_.compare_exchange_strong(
2157 boost::shared_ptr<partition_table>(
new partition_table{
2158 connection_id, version, convert_to_map(partitions) }))) {
2164 "Applied partition table with partitionStateVersion : %1%") %
2172 ClientPartitionServiceImpl::get_partition_owner(int32_t partition_id)
2174 auto table_ptr = partition_table_.load();
2175 auto it = table_ptr->partitions.find(partition_id);
2176 if (it != table_ptr->partitions.end()) {
2179 return boost::uuids::nil_uuid();
// Maps `key` to a partition id: the key's partition hash is reduced to an
// index in [0, partition count) by HashUtil::hash_to_index.
// A partition count that is still 0 is presumably handled before the hash is
// used -- TODO confirm.
2183 ClientPartitionServiceImpl::get_partition_id(
2184 const serialization::pimpl::data& key)
2186 int32_t pc = get_partition_count();
2190 int hash = key.get_partition_hash();
2191 return util::HashUtil::hash_to_index(hash, pc);
// Returns the current partition count (atomic load); 0 until
// check_and_set_partition_count() has set it.
2195 ClientPartitionServiceImpl::get_partition_count()
2197 return partition_count_.load();
// Creates a new PartitionImpl view for `partition_id`. The view refers back
// to this service and the client context; nothing is cached.
2200 std::shared_ptr<client::impl::Partition>
2201 ClientPartitionServiceImpl::get_partition(
int partition_id)
2203 return std::shared_ptr<client::impl::Partition>(
2204 new PartitionImpl(partition_id, client_, *
this));
// Sets the partition count if it has not been set yet (compare-exchange from
// 0). If a count is already stored, reports whether it equals
// `new_partition_count`.
2208 ClientPartitionServiceImpl::check_and_set_partition_count(
2209 int32_t new_partition_count)
2211 int32_t expected = 0;
// Succeeds only for the first caller; presumably reported as success --
// TODO confirm.
2212 if (partition_count_.compare_exchange_strong(expected,
2213 new_partition_count)) {
2216 return partition_count_.load() == new_partition_count;
2220 ClientPartitionServiceImpl::should_be_applied(
2221 int32_t connection_id,
2223 const std::vector<std::pair<boost::uuids::uuid, std::vector<int>>>&
2225 const partition_table& current)
2227 auto& lg = client_.get_logger();
2228 if (partitions.empty()) {
2229 if (logger_.enabled(logger::level::finest)) {
2230 log_failure(connection_id, version, current,
"response is empty");
2234 if (!current.connection_id || connection_id != current.connection_id) {
2236 lg, finest, ([¤t, connection_id]() {
2237 auto frmt = boost::format(
2238 "Event coming from a new connection. Old connection id: %1%, "
2239 "new connection %2%");
2241 if (current.connection_id) {
2242 frmt = frmt % current.connection_id;
2244 frmt = frmt %
"none";
2247 return boost::str(frmt % connection_id);
2252 if (version <= current.version) {
2253 if (lg.enabled(logger::level::finest)) {
2255 connection_id, version, current,
"response state version is old");
// Traces, at finest level, why a received partition table is not applied.
// `cause` is the human-readable reason. When `current` has no connection
// recorded (connection id 0), the text "nullptr" is printed in its place.
2263 ClientPartitionServiceImpl::log_failure(
2264 int32_t connection_id,
2266 const ClientPartitionServiceImpl::partition_table& current,
2267 const std::string& cause)
// The message is built inside a lambda, so formatting is presumably only
// paid for when finest logging is enabled -- depends on how HZ_LOG evaluates
// its argument; TODO confirm.
2269 HZ_LOG(logger_, finest, [&]() {
2270 auto frmt = boost::format(
2271 " We will not apply the response, since %1% ."
2272 " Response is from connection with id %2%. "
2273 "Current connection id is %3%, response state version:%4%. "
2274 "Current state version: %5%");
2275 if (current.connection_id) {
2276 return boost::str(frmt % cause % connection_id %
2277 current.connection_id % version %
2280 return boost::str(frmt % cause % connection_id %
"nullptr" %
2281 version % current.version);
2287 ClientPartitionServiceImpl::reset()
2289 partition_table_.store(
nullptr);
// Flattens the (owner uuid -> list of partition ids) pairs into a
// partition id -> owner uuid map.
// insert() does not overwrite, so if a partition id appears more than once
// the first owner encountered is kept.
2292 std::unordered_map<int32_t, boost::uuids::uuid>
2293 ClientPartitionServiceImpl::convert_to_map(
2294 const std::vector<std::pair<boost::uuids::uuid, std::vector<int>>>&
2297 std::unordered_map<int32_t, boost::uuids::uuid> new_partitions;
2298 for (
auto const& e : partitions) {
2299 for (
auto pid : e.second) {
2300 new_partitions.insert({ pid, e.first });
2303 return new_partitions;
// Returns the partition id this view was created for.
2307 ClientPartitionServiceImpl::PartitionImpl::get_partition_id()
const
2309 return partition_id_;
// Returns the member that currently owns this partition.
// The owner uuid is looked up in the partition service and resolved to a
// member through the cluster service. When the owner is unknown (nil uuid)
// the result is presumably an empty optional -- TODO confirm.
2312 boost::optional<member>
2313 ClientPartitionServiceImpl::PartitionImpl::get_owner()
const
2315 auto owner = partition_service_.get_partition_owner(partition_id_);
2316 if (!owner.is_nil()) {
2317 return client_.get_client_cluster_service().get_member(owner);
// Creates a partition view for `partition_id` that refers back to the owning
// partition service (and presumably keeps `client` as client_, which
// get_owner() reads -- TODO confirm).
2322 ClientPartitionServiceImpl::PartitionImpl::PartitionImpl(
2324 ClientContext& client,
2325 ClientPartitionServiceImpl& partition_service)
2326 : partition_id_(partition_id)
2328 , partition_service_(partition_service)
2331 namespace sequence {
// Constructs the call-id sequence that CallIdFactory selects when back
// pressure is disabled.
2332 CallIdSequenceWithoutBackpressure::CallIdSequenceWithoutBackpressure()
// Destructor defined out of line -- presumably defaulted; TODO confirm.
2336 CallIdSequenceWithoutBackpressure::~CallIdSequenceWithoutBackpressure() =
// Reports the concurrency limit of this sequence. This variant applies no
// back pressure, so the value is presumably an "unlimited" sentinel --
// TODO confirm.
2340 CallIdSequenceWithoutBackpressure::get_max_concurrent_invocations()
const
// Without back pressure there is nothing to wait for, so next() simply
// delegates to force_next().
2346 CallIdSequenceWithoutBackpressure::next()
2348 return force_next();
// Produces the next call id; next() delegates here.
2352 CallIdSequenceWithoutBackpressure::force_next()
// Counterpart of next()/force_next(). With no back pressure there is
// presumably nothing to release -- TODO confirm.
2358 CallIdSequenceWithoutBackpressure::complete()
// Reports the most recently issued call id.
2364 CallIdSequenceWithoutBackpressure::get_last_call_id()
// Validates and stores the concurrency limit.
// `max_concurrent_invocations` is passed through Preconditions::check_positive
// with the message built below.
2370 AbstractCallIdSequence::AbstractCallIdSequence(
2371 int32_t max_concurrent_invocations)
2373 std::ostringstream out;
2374 out <<
"maxConcurrentInvocations should be a positive number. "
2375 "maxConcurrentInvocations="
2376 << max_concurrent_invocations;
2377 this->max_concurrent_invocations_ = util::Preconditions::check_positive(
2378 max_concurrent_invocations, out.str());
// Walks every slot of longs_ -- presumably to zero the head/tail counters;
// TODO confirm.
2380 for (
size_t i = 0; i < longs_.size(); ++i) {
// Defaulted destructor, defined out of line.
2385 AbstractCallIdSequence::~AbstractCallIdSequence() =
default;
// Returns the limit validated in the constructor.
2388 AbstractCallIdSequence::get_max_concurrent_invocations()
const
2390 return max_concurrent_invocations_;
// Issues the next call id, applying back pressure first.
// handle_no_space_left() is the subclass hook (it waits in
// CallIdSequenceWithBackpressure and throws in FailFastCallIdSequence).
// Presumably it is only reached when has_space() is false -- TODO confirm.
2394 AbstractCallIdSequence::next()
2397 handle_no_space_left();
2399 return force_next();
// Issues the next call id unconditionally by pre-incrementing the head
// counter. The concurrency limit is not checked.
2403 AbstractCallIdSequence::force_next()
2405 return ++longs_[INDEX_HEAD];
// Marks one invocation as finished by advancing the tail counter.
// The tail must never overtake the head (asserted in debug builds).
2409 AbstractCallIdSequence::complete()
2411 ++longs_[INDEX_TAIL];
2412 assert(longs_[INDEX_TAIL] <= longs_[INDEX_HEAD]);
// Returns the head counter, i.e. the value most recently produced by
// force_next().
2416 AbstractCallIdSequence::get_last_call_id()
2418 return longs_[INDEX_HEAD];
// True while the number of in-flight invocations (head - tail) is below the
// configured maximum.
2422 AbstractCallIdSequence::has_space()
2424 return longs_[INDEX_HEAD] - longs_[INDEX_TAIL] <
2425 max_concurrent_invocations_;
// Returns the tail counter, i.e. the number of complete() calls so far.
2429 AbstractCallIdSequence::get_tail()
2431 return longs_[INDEX_TAIL];
// Shared idle strategy used by handle_no_space_left() while waiting for
// space. The two durations visible here are 1000 microseconds and
// MAX_DELAY_MS milliseconds, both converted to nanoseconds -- presumably the
// minimum and maximum park periods of the backoff; TODO confirm against the
// BackoffIdleStrategy constructor.
2434 const std::unique_ptr<util::concurrent::IdleStrategy>
2435 CallIdSequenceWithBackpressure::IDLER(
2436 new util::concurrent::BackoffIdleStrategy(
2439 std::chrono::duration_cast<std::chrono::nanoseconds>(
2440 std::chrono::microseconds(1000))
2442 std::chrono::duration_cast<std::chrono::nanoseconds>(
2443 std::chrono::microseconds(MAX_DELAY_MS * 1000))
// Creates a sequence that waits up to `backoff_timeout_ms` for space before
// failing.
// `backoff_timeout_ms` is checked with Preconditions::check_positive and
// stored in nanoseconds; `max_concurrent_invocations` is validated by the
// base class.
2446 CallIdSequenceWithBackpressure::CallIdSequenceWithBackpressure(
2447 int32_t max_concurrent_invocations,
2448 int64_t backoff_timeout_ms)
2449 : AbstractCallIdSequence(max_concurrent_invocations)
2451 std::ostringstream out;
2452 out <<
"backoffTimeoutMs should be a positive number. backoffTimeoutMs="
2453 << backoff_timeout_ms;
2454 util::Preconditions::check_positive(backoff_timeout_ms, out.str());
2456 backoff_timeout_nanos_ =
2457 std::chrono::duration_cast<std::chrono::nanoseconds>(
2458 std::chrono::milliseconds(backoff_timeout_ms))
// Waits for space in the sequence, idling via IDLER with an increasing idle
// count, and throws hazelcast_overload once the elapsed time exceeds
// backoff_timeout_nanos_.
// How the loop exits on success is presumably a has_space() check after each
// idle -- TODO confirm.
2463 CallIdSequenceWithBackpressure::handle_no_space_left()
2465 auto start = std::chrono::steady_clock::now();
2466 for (int64_t idleCount = 0;; idleCount++) {
2467 int64_t elapsedNanos =
2468 std::chrono::duration_cast<std::chrono::nanoseconds>(
2469 std::chrono::steady_clock::now() - start)
2471 if (elapsedNanos > backoff_timeout_nanos_) {
// NOTE(review): both durations below are nanoseconds / 1000, i.e.
// microseconds, but the message labels them "msecs". Confirm the intended
// unit.
2472 throw(exception::exception_builder<exception::hazelcast_overload>(
2473 "CallIdSequenceWithBackpressure::handleNoSpaceLeft")
2474 <<
"Timed out trying to acquire another call ID."
2475 <<
" maxConcurrentInvocations = "
2476 << get_max_concurrent_invocations() <<
", backoffTimeout = "
2477 << std::chrono::microseconds(backoff_timeout_nanos_ / 1000)
2479 <<
" msecs, elapsed:"
2480 << std::chrono::microseconds(elapsedNanos / 1000).count()
2484 IDLER->idle(idleCount);
// Forwards the limit to AbstractCallIdSequence, which validates it.
2491 FailFastCallIdSequence::FailFastCallIdSequence(
2492 int32_t max_concurrent_invocations)
2493 : AbstractCallIdSequence(max_concurrent_invocations)
// Fail-fast policy: no waiting. Throws hazelcast_overload immediately,
// reporting the configured maximum.
2497 FailFastCallIdSequence::handle_no_space_left()
2499 throw(exception::exception_builder<exception::hazelcast_overload>(
2500 "FailFastCallIdSequence::handleNoSpaceLeft")
2501 <<
"Maximum invocation count is reached. maxConcurrentInvocations = "
2502 << get_max_concurrent_invocations())
// Selects the call-id sequence implementation:
//  - back pressure disabled              -> CallIdSequenceWithoutBackpressure
//  - enabled, backoff_timeout_ms <= 0    -> FailFastCallIdSequence
//  - enabled, backoff_timeout_ms > 0     -> CallIdSequenceWithBackpressure
2506 std::unique_ptr<CallIdSequence>
2507 CallIdFactory::new_call_id_sequence(
bool is_back_pressure_enabled,
2508 int32_t max_allowed_concurrent_invocations,
2509 int64_t backoff_timeout_ms)
2511 if (!is_back_pressure_enabled) {
2512 return std::unique_ptr<CallIdSequence>(
2513 new CallIdSequenceWithoutBackpressure());
2514 }
else if (backoff_timeout_ms <= 0) {
2515 return std::unique_ptr<CallIdSequence>(
2516 new FailFastCallIdSequence(max_allowed_concurrent_invocations));
2518 return std::unique_ptr<CallIdSequence>(
2519 new CallIdSequenceWithBackpressure(max_allowed_concurrent_invocations,
2520 backoff_timeout_ms));
2525 namespace listener {
// Captures the services the listener service depends on and caches the
// smart-routing flag, the invocation timeout and the retry pause.
// The executors are created later, in start().
2526 listener_service_impl::listener_service_impl(ClientContext& client_context,
2527 int32_t event_thread_count)
2528 : client_context_(client_context)
2529 , serialization_service_(client_context.get_serialization_service())
2530 , logger_(client_context.get_logger())
2531 , client_connection_manager_(client_context.get_connection_manager())
2532 , number_of_event_threads_(event_thread_count)
2533 , smart_(client_context.get_client_config()
2534 .get_network_config()
2535 .is_smart_routing())
2537 auto& invocationService = client_context.get_invocation_service();
2538 invocation_timeout_ = invocationService.get_invocation_timeout();
2539 invocation_retry_pause_ = invocationService.get_invocation_retry_pause();
// Reports whether listeners register for local events only -- presumably
// derived from smart_; TODO confirm.
2543 listener_service_impl::registers_local_only()
const
// Registers a listener asynchronously.
// The work (register_listener_internal) is wrapped in a packaged_task and
// posted to the registration executor; the task's future carries the
// registration id.
2548 boost::future<boost::uuids::uuid>
2549 listener_service_impl::register_listener(
2550 std::shared_ptr<ListenerMessageCodec> listener_message_codec,
2551 std::shared_ptr<client::impl::BaseEventHandler> handler)
2553 auto task = boost::packaged_task<boost::uuids::uuid()>([=]() {
2554 return register_listener_internal(listener_message_codec, handler);
// The future must be obtained before the task is moved into the executor.
2556 auto f = task.get_future();
2557 boost::asio::post(registration_executor_->get_executor(), std::move(task));
// Deregisters a listener asynchronously.
// `registration_id` is checked by Preconditions::check_not_nill with the
// message below. The work (deregister_listener_internal) is posted to the
// registration executor.
2562 listener_service_impl::deregister_listener(boost::uuids::uuid registration_id)
2564 util::Preconditions::check_not_nill(
2565 registration_id,
"Nil userRegistrationId is not allowed!");
2567 boost::packaged_task<bool()> task(
2568 [=]() {
return deregister_listener_internal(registration_id); });
2569 auto f = task.get_future();
2570 boost::asio::post(registration_executor_->get_executor(), std::move(task));
// Connection-listener callback: defers the handling of a new connection to
// the registration executor. `connection` is captured by value, which keeps
// it alive until the task runs.
2575 listener_service_impl::connection_added(
2576 const std::shared_ptr<connection::Connection> connection)
2578 boost::asio::post(registration_executor_->get_executor(),
2579 [=]() { connection_added_internal(connection); });
// Connection-listener callback: defers the handling of a removed connection
// to the registration executor.
2583 listener_service_impl::connection_removed(
2584 const std::shared_ptr<connection::Connection> connection)
2586 boost::asio::post(registration_executor_->get_executor(),
2587 [=]() { connection_removed_internal(connection); });
// Removes the event-handler invocation identified by `call_id` from
// `connection`. The deregistration is posted to the connection's socket
// executor rather than performed on the calling thread.
2591 listener_service_impl::remove_event_handler(
2593 const std::shared_ptr<connection::Connection>& connection)
2595 boost::asio::post(connection->get_socket().get_executor(),
2596 std::packaged_task<
void()>(
2597 [=]() { connection->deregister_invocation(call_id); }));
// Dispatches an event message to process_event_message() on the event
// executor.
//  - partition id == -1: posted to the pool directly.
//  - otherwise: posted to the strand selected by
//    partitionId % event_strands_.size(), so events of one partition go
//    through the same strand.
// A failure to dispatch is reported with the message below, and only while
// the client is still running.
2601 listener_service_impl::handle_client_message(
2602 const std::shared_ptr<ClientInvocation> invocation,
2603 const std::shared_ptr<protocol::ClientMessage> response)
2606 auto partitionId = response->get_partition_id();
2607 if (partitionId == -1) {
2609 boost::asio::post(event_executor_->get_executor(), [=]() {
2610 process_event_message(invocation, response);
2617 event_strands_[partitionId % event_strands_.size()],
2618 [=]() { process_event_message(invocation, response); });
2620 }
catch (
const std::exception& e) {
2621 if (client_context_.get_lifecycle_service().is_running()) {
2625 boost::str(boost::format(
"Delivery of event message to event "
2626 "handler failed. %1%, %2%, %3%") %
2627 e.what() % *response % *invocation));
// Stops the event executor and then the registration executor, using the
// shared ClientExecutionServiceImpl::shutdown_thread_pool() helper.
2633 listener_service_impl::shutdown()
2635 ClientExecutionServiceImpl::shutdown_thread_pool(event_executor_.get());
2636 ClientExecutionServiceImpl::shutdown_thread_pool(
2637 registration_executor_.get());
// Creates the executors and subscribes to connection events.
//  - event_executor_: number_of_event_threads_ threads, plus one strand per
//    thread.
//  - registration_executor_: a single thread, so registration tasks run one
//    at a time.
2641 listener_service_impl::start()
2643 event_executor_.reset(
2644 new hazelcast::util::hz_thread_pool(number_of_event_threads_));
2645 registration_executor_.reset(
new hazelcast::util::hz_thread_pool(1));
2647 for (
int i = 0; i < number_of_event_threads_; ++i) {
2648 event_strands_.emplace_back(event_executor_->get_executor());
// From here on connection_added() / connection_removed() are invoked by the
// connection manager.
2651 client_connection_manager_.add_connection_listener(shared_from_this());
// Performs a listener registration (posted to the registration executor by
// register_listener()).
// A random registration id is generated, the registration is stored, and the
// listener is invoked on every currently active connection. If an invocation
// fails on a connection that is still alive, the registration is rolled back
// and an exception wrapping the failure is thrown.
// Returns the user-facing registration id.
2655 listener_service_impl::register_listener_internal(
2656 std::shared_ptr<ListenerMessageCodec> listener_message_codec,
2657 std::shared_ptr<client::impl::BaseEventHandler> handler)
2659 auto user_registration_id = client_context_.random_uuid();
2661 std::shared_ptr<listener_registration> registration(
2662 new listener_registration{ listener_message_codec, handler });
2663 registrations_.put(user_registration_id, registration);
2664 for (
auto const& connection :
2665 client_connection_manager_.get_active_connections()) {
2667 invoke(registration, connection);
2668 }
catch (exception::iexception& e) {
// A failure on a dead connection is tolerated; only a live connection makes
// the whole registration fail.
2669 if (connection->is_alive()) {
2670 deregister_listener_internal(user_registration_id);
2671 BOOST_THROW_EXCEPTION(
2672 (exception::exception_builder<exception::hazelcast_>(
2673 "ClientListenerService::RegisterListenerTask::call")
2674 <<
"Listener can not be added " << e)
2679 return user_registration_id;
// Performs a listener deregistration (posted to the registration executor by
// deregister_listener()).
// For every connection the listener is registered on, a remove request is
// sent and awaited, and the event handler is removed from that connection.
// A failure on a connection that is still alive is reported with the message
// below.
2683 listener_service_impl::deregister_listener_internal(
2684 boost::uuids::uuid user_registration_id)
2686 auto listenerRegistration = registrations_.get(user_registration_id);
// Unknown registration id: nothing to remove.
2687 if (!listenerRegistration) {
2690 bool successful =
true;
// entry_set() is iterated through a local variable; entries are erased from
// it as they are processed.
2692 auto listener_registrations =
2693 listenerRegistration->registrations.entry_set();
2694 for (
auto it = listener_registrations.begin();
2695 it != listener_registrations.end();) {
2696 const auto& registration = it->second;
2697 const auto& subscriber = it->first;
2699 const auto& listenerMessageCodec = listenerRegistration->codec;
2700 auto serverRegistrationId = registration->server_registration_id;
2702 listenerMessageCodec->encode_remove_request(serverRegistrationId);
2703 auto invocation = ClientInvocation::create(
2704 client_context_, request,
"", subscriber);
// Blocks until the member has answered the remove request.
2705 invocation->invoke().get();
2707 remove_event_handler(registration->call_id, subscriber);
2709 it = listener_registrations.erase(it);
2710 }
catch (exception::iexception& e) {
2712 if (subscriber->is_alive()) {
// NOTE(review): `endpoint` is filled in here, but the format arguments below
// pass subscriber->get_remote_address() instead -- confirm whether
// endpoint.str() was intended.
2714 std::ostringstream endpoint;
2715 if (subscriber->get_remote_address()) {
2716 endpoint << *subscriber->get_remote_address();
2724 "ClientListenerService::deregisterListenerInternal "
2725 "Deregistration of listener with ID %1% "
2726 "has failed to address %2% %3%") %
2727 user_registration_id %
2728 subscriber->get_remote_address() % e));
2733 registrations_.remove(user_registration_id);
// Handles a newly added connection: every registration currently held in
// registrations_ is passed to invoke_from_internal_thread together with the
// new connection.
2739 listener_service_impl::connection_added_internal(
2740 const std::shared_ptr<connection::Connection>& connection)
2742 for (
const auto& listener_registration : registrations_.values()) {
2743 invoke_from_internal_thread(listener_registration, connection);
// Handles a removed connection: for every registration in registrations_, the
// entry keyed by this connection is removed from its `registrations` map.
// No request is sent to the server in the visible code.
2748 listener_service_impl::connection_removed_internal(
2749 const std::shared_ptr<connection::Connection>& connection)
2751 for (
auto& registry : registrations_.values()) {
2752 registry->registrations.remove(connection);
// Wrapper around invoke() for a single registration and connection.
// An exception::iexception thrown by invoke() is caught and formatted into
// the "Listener with pointer ... can not be added to a new connection"
// message. The statement consuming that message is not visible in this
// listing (presumably a log call; confirm).
2757 listener_service_impl::invoke_from_internal_thread(
2758 const std::shared_ptr<listener_registration>& listener_registration,
2759 const std::shared_ptr<connection::Connection>& connection)
2762 invoke(listener_registration, connection);
2763 }
catch (exception::iexception& e) {
2767 boost::format(
"Listener with pointer %1% can not be added to "
2768 "a new connection: %2%, reason: %3%") %
2769 listener_registration.get() % *connection % e));
// Registers one listener on one connection.
//
// Steps visible in this listing:
//   1. Check whether the registration already has an entry for this
//      connection (the branch body is not visible; presumably it returns).
//   2. Encode the add request using registers_local_only() and call
//      handler->before_listener_register().
//   3. Create an invocation from the request, attach the handler as its event
//      handler, send it with invoke_urgent() and block on the result.
//   4. Decode the server registration id from the response and call
//      handler->on_listener_register().
//   5. Build a connection_registration from the server registration id and
//      the invocation's correlation id (the container operation that stores
//      it is only partially visible).
2774 listener_service_impl::invoke(
2775 const std::shared_ptr<listener_registration>& listener_registration,
2776 const std::shared_ptr<connection::Connection>& connection)
2778 if (listener_registration->registrations.contains_key(connection)) {
2782 const auto& codec = listener_registration->codec;
2783 auto request = codec->encode_add_request(registers_local_only());
2784 const auto& handler = listener_registration->handler;
2785 handler->before_listener_register();
2787 auto invocation = ClientInvocation::create(
2789 std::make_shared<protocol::ClientMessage>(std::move(request)),
// The handler is attached before the request is sent.
2792 invocation->set_event_handler(handler);
2793 auto clientMessage = invocation->invoke_urgent().get();
2795 auto serverRegistrationId = codec->decode_add_response(clientMessage);
2796 handler->on_listener_register();
2797 int64_t correlationId =
2798 invocation->get_client_message()->get_correlation_id();
2800 (*listener_registration)
2803 std::shared_ptr<connection_registration>(
2804 new connection_registration{ serverRegistrationId, correlationId }));
// Delivers an event message to the event handler attached to the invocation.
//
// If the invocation has no event handler, a "No eventHandler for invocation"
// message is formatted, but only while the client lifecycle service is
// running. Otherwise the handler's handle() is called with the response. A
// std::exception thrown by the handler is caught and, again only while the
// lifecycle service is running, formatted into the "Delivery of event message
// to event handler failed" message. The statements consuming both messages
// are not visible in this listing.
2808 listener_service_impl::process_event_message(
2809 const std::shared_ptr<ClientInvocation> invocation,
2810 const std::shared_ptr<protocol::ClientMessage> response)
2812 auto eventHandler = invocation->get_event_handler();
2813 if (!eventHandler) {
2814 if (client_context_.get_lifecycle_service().is_running()) {
2818 boost::format(
"No eventHandler for invocation. "
2819 "Ignoring this invocation response. %1%") %
2827 eventHandler->handle(*response);
2828 }
catch (std::exception& e) {
2829 if (client_context_.get_lifecycle_service().is_running()) {
2833 boost::str(boost::format(
"Delivery of event message to event "
2834 "handler failed. %1%, %2%, %3%") %
2835 e.what() % *response % *invocation));
// Destructor, explicitly defaulted in this translation unit.
2840 listener_service_impl::~listener_service_impl() =
default;
// Registers this cluster_view_listener as a connection listener on the
// client's connection manager. Uses shared_from_this(), so the object must
// already be owned by a std::shared_ptr.
2843 cluster_view_listener::start()
2845 client_context_.get_connection_manager().add_connection_listener(
2846 shared_from_this());
// Connection-listener callback: passes the newly added connection to
// try_register().
2850 cluster_view_listener::connection_added(
2851 const std::shared_ptr<connection::Connection> connection)
2853 try_register(connection);
// Connection-listener callback: passes the removed connection's id to
// try_reregister_to_random_connection().
2857 cluster_view_listener::connection_removed(
2858 const std::shared_ptr<connection::Connection> connection)
2860 try_reregister_to_random_connection(connection->get_connection_id());
// Constructs the listener and initialises client_context_ from the given
// client context. Any further member initialisers are not visible in this
// listing.
2863 cluster_view_listener::cluster_view_listener(ClientContext& client_context)
2864 : client_context_(client_context)
// Tries to register the cluster view listener on the given connection.
//
// listener_added_connection_id_ is switched from -1 to this connection's id
// with compare_exchange_strong. The body of the failure branch is not visible
// in this listing (presumably an early return; confirm).
//
// An add-cluster-view-listener request is then created with an event_handler
// bound to this connection id, the handler's before_listener_register() is
// called, and the request is sent with invoke_urgent(). The continuation
// calls on_listener_register() when the future holds a value; the visible
// code otherwise calls try_reregister_to_random_connection() with the same
// connection id.
2868 cluster_view_listener::try_register(
2869 std::shared_ptr<connection::Connection> connection)
2871 int32_t expected_id = -1;
2872 if (!listener_added_connection_id_.compare_exchange_strong(
2873 expected_id, connection->get_connection_id())) {
2878 auto invocation = ClientInvocation::create(
2880 std::make_shared<protocol::ClientMessage>(
2881 protocol::codec::client_addclusterviewlistener_encode()),
2886 std::make_shared<event_handler>(connection->get_connection_id(), *
this);
2887 invocation->set_event_handler(handler);
2888 handler->before_listener_register();
// The continuation captures a weak_ptr rather than a shared_ptr to this
// listener.
2890 std::weak_ptr<cluster_view_listener> weak_self = shared_from_this();
2891 auto conn_id = connection->get_connection_id();
2893 invocation->invoke_urgent().then(
2894 [weak_self, handler, conn_id](boost::future<protocol::ClientMessage> f) {
2895 auto self = weak_self.lock();
2899 if (f.has_value()) {
2900 handler->on_listener_register();
2905 self->try_reregister_to_random_connection(conn_id);
// Releases the listener slot held by old_connection_id and tries to register
// on another connection.
//
// listener_added_connection_id_ is switched from old_connection_id back to -1
// with compare_exchange_strong. The body of the failure branch is not visible
// in this listing (presumably an early return; confirm). A random connection
// is then requested from the connection manager and, if one is returned, it
// is passed to try_register().
2910 cluster_view_listener::try_reregister_to_random_connection(
2911 int32_t old_connection_id)
2913 if (!listener_added_connection_id_.compare_exchange_strong(
2914 old_connection_id, -1)) {
2918 auto new_connection =
2919 client_context_.get_connection_manager().get_random_connection();
2920 if (new_connection) {
2921 try_register(new_connection);
// Destructor, explicitly defaulted in this translation unit.
2925 cluster_view_listener::~cluster_view_listener() =
default;
// Members-view event callback: forwards the version and the member list to
// the client cluster service's handle_event().
2928 cluster_view_listener::event_handler::handle_membersview(
2930 const std::vector<member>& member_infos)
2932 view_listener.client_context_.get_client_cluster_service().handle_event(
2933 version, member_infos);
// Partitions-view event callback: forwards this handler's connection id, the
// version and the partition table to the partition service's handle_event().
// Each pair in the vector holds a boost::uuids::uuid and a std::vector<int>.
2937 cluster_view_listener::event_handler::handle_partitionsview(
2939 const std::vector<std::pair<boost::uuids::uuid, std::vector<int>>>&
2942 view_listener.client_context_.get_partition_service().handle_event(
2943 connection_id, version, partitions);
// Called before the add-listener request is sent: clears the member list
// version held by the client cluster service and formats a "Register attempt
// ..." message for the connection id. The statement that passes the message
// to the logger is not visible in this listing.
2947 cluster_view_listener::event_handler::before_listener_register()
2949 view_listener.client_context_.get_client_cluster_service()
2950 .clear_member_list_version();
2951 auto& lg = view_listener.client_context_.get_logger();
2955 boost::str(boost::format(
2956 "Register attempt of cluster_view_listener::event_handler "
2957 "to connection with id %1%") %
// Called after the listener has been registered: fetches the client logger
// and formats a "Registered ..." message for the connection id. The statement
// that passes the message to the logger is not visible in this listing.
2962 cluster_view_listener::event_handler::on_listener_register()
2964 auto& lg = view_listener.client_context_.get_logger();
2968 boost::format(
"Registered cluster_view_listener::event_handler to "
2969 "connection with id %1%") %
// Constructs an event handler from a connection id and the owning
// cluster_view_listener. view_listener is initialised from a reference
// parameter.
// NOTE(review): if view_listener is a reference member, the listener must
// outlive this handler; confirm against the class declaration.
2973 cluster_view_listener::event_handler::event_handler(
2975 cluster_view_listener& viewListener)
2976 : connection_id(connectionId)
2977 , view_listener(viewListener)
// Builds the add request for the backup listener by returning the result of
// client_localbackuplistener_encode(). The local_only argument is not used in
// the visible code.
2981 protocol::ClientMessage
2982 ClientInvocationServiceImpl::BackupListenerMessageCodec::encode_add_request(
2983 bool local_only)
const
2985 return protocol::codec::client_localbackuplistener_encode();
// Returns a ClientMessage constructed with 0. real_registration_id is not
// used in the visible lines.
// NOTE(review): presumably the backup listener is never removed explicitly,
// so this message is only a placeholder; confirm with callers.
2988 protocol::ClientMessage
2989 ClientInvocationServiceImpl::BackupListenerMessageCodec::encode_remove_request(
2990 boost::uuids::uuid real_registration_id)
const
2993 return protocol::ClientMessage(0);
// Backup event callback of noop_backup_event_handler; receives the
// correlation id of the source invocation. The body is not visible in this
// listing; the class name suggests it does nothing (confirm).
2997 ClientInvocationServiceImpl::noop_backup_event_handler::handle_backup(
2998 int64_t source_invocation_correlation_id)
3003 namespace discovery {
// Constructs the provider.
// addr_map_method is a callable returning an
// std::unordered_map<address, address>; it is moved into
// refresh_address_map_. use_public is stored in use_public_.
3004 remote_address_provider::remote_address_provider(
3005 std::function<std::unordered_map<address, address>()> addr_map_method,
3007 : refresh_address_map_(std::move(addr_map_method))
3008 , use_public_(use_public)
3011 std::vector<address>
3012 remote_address_provider::load_addresses()
3014 auto address_map = refresh_address_map_();
3015 std::lock_guard<std::mutex> guard(lock_);
3016 private_to_public_ = address_map;
3017 std::vector<address> addresses;
3018 addresses.reserve(address_map.size());
3019 for (
const auto& addr_pair : address_map) {
3020 addresses.push_back(addr_pair.first);
// Translates addr through the cached map, refreshing the cache on a miss.
//
// Visible flow: addr is looked up in private_to_public_ under lock_ and the
// mapped address is returned on a hit. Otherwise the map is re-fetched with
// refresh_address_map_(), the cache is replaced under lock_, and the lookup
// is repeated.
// The lines before the first lookup and after the last one are not visible
// in this listing (presumably the use_public_ check and the not-found result;
// confirm).
3025 boost::optional<address>
3026 remote_address_provider::translate(
const address& addr)
3035 std::lock_guard<std::mutex> guard(lock_);
3036 auto found = private_to_public_.find(addr);
3037 if (found != private_to_public_.end()) {
3038 return found->second;
// Cache miss: fetch a fresh map and retry the lookup once.
3042 auto address_map = refresh_address_map_();
3044 std::lock_guard<std::mutex> guard(lock_);
3045 private_to_public_ = address_map;
3047 auto found = private_to_public_.find(addr);
3048 if (found != private_to_public_.end()) {
3049 return found->second;
// Two definitions of the cloud_discovery constructor with the same signature
// (cloud config, base URL, timeout). The first is guarded by
// HZ_BUILD_WITH_SSL and initialises cloud_config_ and cloud_base_url_; its
// remaining initialisers are not visible. The second appears to be the
// non-SSL variant, but the separating preprocessor line and its initialisers
// are not visible in this listing.
3055 #ifdef HZ_BUILD_WITH_SSL
3056 cloud_discovery::cloud_discovery(config::cloud_config& config,
3057 std::string cloud_base_url,
3058 std::chrono::steady_clock::duration timeout)
3059 : cloud_config_(config)
3060 , cloud_base_url_(cloud_base_url)
3064 cloud_discovery::cloud_discovery(config::cloud_config& config,
3065 std::string cloud_base_url,
3066 std::chrono::steady_clock::duration timeout)
// Fetches the address map from the cloud discovery endpoint.
//
// Under HZ_BUILD_WITH_SSL: a util::SyncHttpsClient is created for
// cloud_base_url_ with the path CLOUD_URL_PATH + discovery token, the
// response stream is obtained with connect_and_get_response() and parsed by
// parse_json_response(). Any std::exception is rethrown via
// std::throw_with_nested as an exception::illegal_state carrying e.what().
//
// The trailing lines call util::Preconditions::check_ssl() and return an
// empty map; they appear to belong to the non-SSL branch, whose preprocessor
// lines are not visible in this listing.
3070 std::unordered_map<address, address>
3071 cloud_discovery::get_addresses()
3073 #ifdef HZ_BUILD_WITH_SSL
3075 util::SyncHttpsClient httpsConnection(cloud_base_url_,
3076 std::string(CLOUD_URL_PATH) +
3077 cloud_config_.discovery_token,
3079 cloud_config_.discovery_token);
3080 auto& conn_stream = httpsConnection.connect_and_get_response();
3081 return parse_json_response(conn_stream);
3082 }
catch (std::exception& e) {
// The original exception stays nested inside the illegal_state.
3083 std::throw_with_nested(
3084 boost::enable_current_exception(exception::illegal_state(
3085 "cloud_discovery::get_addresses", e.what())));
3088 util::Preconditions::check_ssl(
"cloud_discovery::get_addresses");
3089 return std::unordered_map<address, address>();
// Parses the JSON discovery response into an address map.
//
// The stream is read into a boost::property_tree (the declaration of `root`
// is not visible in this listing). For every child of the root, the
// PRIVATE_ADDRESS_PROPERTY and PUBLIC_ADDRESS_PROPERTY values are read as
// strings. The public address is created with default port -1; the private
// address is created with the public address's port as its default port.
// Each pair is stored with the private address as the key.
3093 std::unordered_map<address, address>
3094 cloud_discovery::parse_json_response(std::istream& conn_stream)
3096 namespace pt = boost::property_tree;
3099 pt::read_json(conn_stream, root);
3101 std::unordered_map<address, address> addresses;
3102 for (
const auto& item : root) {
3103 auto private_address =
3104 item.second.get<std::string>(PRIVATE_ADDRESS_PROPERTY);
3105 auto public_address =
3106 item.second.get<std::string>(PUBLIC_ADDRESS_PROPERTY);
3108 address public_addr = create_address(public_address, -1);
// The private address takes the public address's port as its default port.
3112 create_address(private_address, public_addr.get_port());
3113 addresses.emplace(std::move(private_addr), std::move(public_addr));
// Builds an address from a host string.
// The string and default_port are passed to
// util::AddressUtil::get_address_holder; the holder's scoped host name (from
// util::AddressHelper::get_scoped_hostname) and the holder's port are used to
// construct the returned address.
3120 cloud_discovery::create_address(
const std::string& hostname,
int default_port)
3122 auto address_holder =
3123 util::AddressUtil::get_address_holder(hostname, default_port);
3124 auto scoped_hostname =
3125 util::AddressHelper::get_scoped_hostname(address_holder);
3126 return address(std::move(scoped_hostname), address_holder.get_port());
// Ordering for DefaultObjectNamespace.
// The service names are compared first with std::string::compare. The lines
// that branch on `result` are not visible in this listing (presumably an
// early return when the service names differ; confirm). The final visible
// statement orders by object name.
3136 less<hazelcast::client::spi::DefaultObjectNamespace>::operator()(
3137 const hazelcast::client::spi::DefaultObjectNamespace& lhs,
3138 const hazelcast::client::spi::DefaultObjectNamespace& rhs)
const
3140 int result = lhs.get_service_name().compare(rhs.get_service_name());
3149 return lhs.get_object_name().compare(rhs.get_object_name()) < 0;
3153 hash<hazelcast::client::spi::DefaultObjectNamespace>::operator()(
3154 const hazelcast::client::spi::DefaultObjectNamespace& k)
const noexcept
3156 return std::hash<std::string>()(k.get_service_name() + k.get_object_name());
Hazelcast cluster interface.
const std::unordered_set< member > & get_members() const
Returns an immutable set of ordered members at the moment this MembershipListener is registered.
cluster & get_cluster()
Returns the cluster of the event.
lifecycle_state get_state() const
lifecycle_state
State enum.
lifecycle_event(lifecycle_state state)
Constructor.