19#include <boost/uuid/uuid_hash.hpp>
20#include <boost/functional/hash.hpp>
21#include <boost/property_tree/ptree.hpp>
22#include <boost/property_tree/json_parser.hpp>
24#include "hazelcast/client/hazelcast_client.h"
25#include <hazelcast/client/protocol/codec/ErrorCodec.h>
26#include <hazelcast/client/spi/impl/ListenerMessageCodec.h>
27#include <hazelcast/client/spi/impl/ClientClusterServiceImpl.h>
28#include <hazelcast/client/spi/impl/listener/cluster_view_listener.h>
29#include <hazelcast/client/spi/impl/listener/listener_service_impl.h>
30#include <hazelcast/client/spi/impl/discovery/remote_address_provider.h>
31#include <hazelcast/client/spi/impl/discovery/cloud_discovery.h>
32#include <hazelcast/util/AddressUtil.h>
33#include "hazelcast/client/member_selectors.h"
34#include "hazelcast/client/lifecycle_event.h"
35#include "hazelcast/client/initial_membership_event.h"
36#include "hazelcast/client/membership_event.h"
37#include "hazelcast/client/lifecycle_listener.h"
38#include "hazelcast/client/spi/ProxyManager.h"
39#include "hazelcast/client/spi/ClientProxy.h"
40#include "hazelcast/client/spi/ClientContext.h"
41#include "hazelcast/client/spi/impl/ClientInvocation.h"
42#include "hazelcast/client/spi/impl/ClientInvocationServiceImpl.h"
43#include "hazelcast/client/impl/hazelcast_client_instance_impl.h"
44#include "hazelcast/client/impl/statistics/Statistics.h"
45#include "hazelcast/client/spi/impl/ClientPartitionServiceImpl.h"
46#include "hazelcast/client/spi/impl/DefaultAddressProvider.h"
47#include "hazelcast/client/spi/impl/sequence/CallIdSequenceWithBackpressure.h"
48#include "hazelcast/client/spi/impl/sequence/CallIdSequenceWithoutBackpressure.h"
49#include "hazelcast/client/spi/impl/sequence/FailFastCallIdSequence.h"
50#include "hazelcast/client/spi/impl/sequence/CallIdFactory.h"
51#include "hazelcast/client/connection/ClientConnectionManagerImpl.h"
52#include "hazelcast/util/AddressHelper.h"
53#include "hazelcast/util/HashUtil.h"
54#include "hazelcast/util/concurrent/BackoffIdleStrategy.h"
55#ifdef HZ_BUILD_WITH_SSL
56#include <hazelcast/util/SyncHttpsClient.h>
61const std::unordered_set<member>&
73initial_membership_event::initial_membership_event(
75 std::unordered_set<member> members)
77 , members_(
std::move(members))
91ProxyManager::ProxyManager(ClientContext& context)
100ProxyManager::destroy()
102 std::lock_guard<std::recursive_mutex> guard(lock_);
103 for (
auto& p : proxies_) {
105 auto proxy = p.second.get();
106 p.second.get()->on_shutdown();
107 }
catch (std::exception& se) {
108 auto& lg = client_.get_logger();
112 boost::str(boost::format(
113 "Proxy was not created, "
114 "hence onShutdown can be called. Exception: %1%") %
122ProxyManager::initialize(
const std::shared_ptr<ClientProxy>& client_proxy)
124 auto clientMessage = protocol::codec::client_createproxy_encode(
125 client_proxy->get_name(), client_proxy->get_service_name());
126 return spi::impl::ClientInvocation::create(
127 client_, clientMessage, client_proxy->get_service_name())
129 .then(boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
131 client_proxy->on_initialize();
136ProxyManager::destroy_proxy(ClientProxy& proxy)
138 DefaultObjectNamespace objectNamespace(proxy.get_service_name(),
140 std::shared_ptr<ClientProxy> registeredProxy;
142 std::lock_guard<std::recursive_mutex> guard(lock_);
143 auto it = proxies_.find(objectNamespace);
144 registeredProxy = it == proxies_.end() ? nullptr : it->second.get();
145 if (it != proxies_.end()) {
151 if (registeredProxy) {
153 proxy.destroy_locally();
154 return proxy.destroy_remotely();
156 proxy.destroy_remotely();
160 if (&proxy != registeredProxy.get()) {
166 proxy.destroy_locally();
169 if (&proxy != registeredProxy.get()) {
175 proxy.destroy_locally();
179 return boost::make_ready_future();
182ClientContext::ClientContext(
const client::hazelcast_client&
hazelcast_client)
186ClientContext::ClientContext(
191serialization::pimpl::SerializationService&
192ClientContext::get_serialization_service()
194 return hazelcast_client_.serialization_service_;
197impl::ClientClusterServiceImpl&
198ClientContext::get_client_cluster_service()
200 return hazelcast_client_.cluster_service_;
203impl::ClientInvocationServiceImpl&
204ClientContext::get_invocation_service()
206 return *hazelcast_client_.invocation_service_;
210ClientContext::get_client_config()
212 return hazelcast_client_.client_config_;
215impl::ClientPartitionServiceImpl&
216ClientContext::get_partition_service()
218 return *hazelcast_client_.partition_service_;
222ClientContext::get_lifecycle_service()
224 return hazelcast_client_.lifecycle_service_;
227spi::impl::listener::listener_service_impl&
228ClientContext::get_client_listener_service()
230 return *hazelcast_client_.listener_service_;
233connection::ClientConnectionManagerImpl&
234ClientContext::get_connection_manager()
236 return *hazelcast_client_.connection_manager_;
239internal::nearcache::NearCacheManager&
240ClientContext::get_near_cache_manager()
242 return *hazelcast_client_.near_cache_manager_;
246ClientContext::get_client_properties()
248 return hazelcast_client_.client_properties_;
252ClientContext::get_cluster()
254 return hazelcast_client_.cluster_;
257std::shared_ptr<impl::sequence::CallIdSequence>&
258ClientContext::get_call_id_sequence()
const
260 return hazelcast_client_.call_id_sequence_;
263const protocol::ClientExceptionFactory&
264ClientContext::get_client_exception_factory()
const
266 return hazelcast_client_.get_exception_factory();
270ClientContext::get_name()
const
272 return hazelcast_client_.get_name();
275impl::ClientExecutionServiceImpl&
276ClientContext::get_client_execution_service()
const
278 return *hazelcast_client_.execution_service_;
281const std::shared_ptr<client::impl::ClientLockReferenceIdGenerator>&
282ClientContext::get_lock_reference_id_generator()
284 return hazelcast_client_.get_lock_reference_id_generator();
287std::shared_ptr<client::impl::hazelcast_client_instance_impl>
288ClientContext::get_hazelcast_client_implementation()
290 return hazelcast_client_.shared_from_this();
294ClientContext::get_proxy_manager()
296 return hazelcast_client_.get_proxy_manager();
300ClientContext::get_logger()
302 return *hazelcast_client_.logger_;
305client::impl::statistics::Statistics&
306ClientContext::get_clientstatistics()
308 return *hazelcast_client_.statistics_;
311spi::impl::listener::cluster_view_listener&
312ClientContext::get_cluster_view_listener()
314 return *hazelcast_client_.cluster_listener_;
318ClientContext::random_uuid()
320 return hazelcast_client_.random_uuid();
323cp::internal::session::proxy_session_manager&
324ClientContext::get_proxy_session_manager()
326 return hazelcast_client_.proxy_session_manager_;
329serialization::pimpl::default_schema_service&
330ClientContext::get_schema_service()
332 return hazelcast_client_.schema_service_;
335lifecycle_service::lifecycle_service(
336 ClientContext& client_context,
337 const std::vector<lifecycle_listener>& listeners)
338 : client_context_(client_context)
340 , shutdown_completed_latch_(1)
342 for (
const auto& listener : listeners) {
343 add_listener(lifecycle_listener(listener));
348lifecycle_service::start()
350 bool expected =
false;
351 if (!active_.compare_exchange_strong(expected,
true)) {
355 fire_lifecycle_event(lifecycle_event::STARTED);
357 client_context_.get_client_execution_service().start();
359 client_context_.get_client_listener_service().start();
361 client_context_.get_invocation_service().start();
363 client_context_.get_client_cluster_service().start();
365 client_context_.get_cluster_view_listener().start();
367 if (!client_context_.get_connection_manager().start()) {
371 auto& connectionStrategyConfig =
372 client_context_.get_client_config().get_connection_strategy_config();
373 if (!connectionStrategyConfig.is_async_start()) {
376 wait_for_initial_membership_event();
377 client_context_.get_connection_manager()
378 .connect_to_all_cluster_members();
381 client_context_.get_invocation_service().add_backup_listener();
383 client_context_.get_clientstatistics().start();
389lifecycle_service::shutdown()
391 bool expected =
true;
392 if (!active_.compare_exchange_strong(expected,
false)) {
393 shutdown_completed_latch_.wait();
397 fire_lifecycle_event(lifecycle_event::SHUTTING_DOWN);
398 client_context_.get_proxy_session_manager().shutdown();
399 client_context_.get_clientstatistics().shutdown();
400 client_context_.get_proxy_manager().destroy();
401 client_context_.get_connection_manager().shutdown();
402 client_context_.get_client_cluster_service().shutdown();
403 client_context_.get_invocation_service().shutdown();
404 client_context_.get_client_listener_service().shutdown();
405 client_context_.get_near_cache_manager().destroy_all_near_caches();
406 fire_lifecycle_event(lifecycle_event::SHUTDOWN);
407 client_context_.get_client_execution_service().shutdown();
408 client_context_.get_serialization_service().dispose();
409 shutdown_completed_latch_.count_down();
410 }
catch (std::exception& e) {
412 client_context_.get_logger(),
416 "An exception occured during LifecycleService shutdown. %1%") %
418 shutdown_completed_latch_.count_down();
425 std::lock_guard<std::mutex> lg(listener_lock_);
426 const auto id = uuid_generator_();
432lifecycle_service::remove_listener(
const boost::uuids::uuid& registration_id)
434 std::lock_guard<std::mutex> guard(listener_lock_);
435 return listeners_.erase(registration_id) == 1;
441 std::lock_guard<std::mutex> guard(listener_lock_);
442 logger& lg = client_context_.get_logger();
447 case lifecycle_event::STARTING: {
449 std::string date(HAZELCAST_STRINGIZE(HAZELCAST_GIT_COMMIT_DATE));
450 util::git_date_to_hazelcast_log_date(date);
451 std::string commitId(HAZELCAST_STRINGIZE(HAZELCAST_GIT_COMMIT_ID));
452 commitId.erase(std::remove(commitId.begin(), commitId.end(),
'"'),
457 (boost::format(
"(%1%:%2%) LifecycleService::LifecycleEvent "
458 "Client (%3%) is STARTING") %
460 client_context_.get_connection_manager().get_client_uuid())
466 "(%s:%s) LifecycleService::LifecycleEvent STARTING",
469 HZ_LOG(lg, info, msg);
472 listener.starting_();
476 case lifecycle_event::STARTED: {
477 HZ_LOG(lg, info,
"LifecycleService::LifecycleEvent STARTED");
484 case lifecycle_event::SHUTTING_DOWN: {
485 HZ_LOG(lg, info,
"LifecycleService::LifecycleEvent SHUTTING_DOWN");
488 listener.shutting_down_();
492 case lifecycle_event::SHUTDOWN: {
493 HZ_LOG(lg, info,
"LifecycleService::LifecycleEvent SHUTDOWN");
496 listener.shutdown_();
500 case lifecycle_event::CLIENT_CONNECTED: {
502 lg, info,
"LifecycleService::LifecycleEvent CLIENT_CONNECTED");
505 listener.connected_();
509 case lifecycle_event::CLIENT_DISCONNECTED: {
511 lg, info,
"LifecycleService::LifecycleEvent CLIENT_DISCONNECTED");
514 listener.disconnected_();
520 for (
auto& item : listeners_) {
521 fire_one(item.second);
526lifecycle_service::is_running()
531lifecycle_service::~lifecycle_service()
539lifecycle_service::wait_for_initial_membership_event()
const
541 client_context_.get_client_cluster_service()
542 .wait_initial_member_list_fetched();
545DefaultObjectNamespace::DefaultObjectNamespace(
const std::string& service,
546 const std::string&
object)
547 : service_name_(service)
548 , object_name_(object)
552DefaultObjectNamespace::get_service_name()
const
554 return service_name_;
558DefaultObjectNamespace::get_object_name()
const
564DefaultObjectNamespace::operator==(
const DefaultObjectNamespace& rhs)
const
566 return service_name_ == rhs.service_name_ &&
567 object_name_ == rhs.object_name_;
570ClientProxy::ClientProxy(
const std::string& name,
571 const std::string& service_name,
572 ClientContext& context)
574 , service_name_(service_name)
578ClientProxy::~ClientProxy() =
default;
581ClientProxy::get_name()
const
587ClientProxy::get_service_name()
const
589 return service_name_;
593ClientProxy::get_context()
599ClientProxy::on_destroy()
603ClientProxy::destroy()
605 return get_context().get_proxy_manager().destroy_proxy(*
this);
609ClientProxy::destroy_locally()
623ClientProxy::pre_destroy()
629ClientProxy::post_destroy()
633ClientProxy::on_initialize()
637ClientProxy::on_shutdown()
640serialization::pimpl::SerializationService&
641ClientProxy::get_serialization_service()
643 return context_.get_serialization_service();
647ClientProxy::destroy_remotely()
649 auto clientMessage = protocol::codec::client_destroyproxy_encode(
650 get_name(), get_service_name());
651 return spi::impl::ClientInvocation::create(
653 std::make_shared<protocol::ClientMessage>(
654 std::move(clientMessage)),
657 .then(boost::launch::sync,
658 [](boost::future<protocol::ClientMessage> f) { f.get(); });
661boost::future<boost::uuids::uuid>
662ClientProxy::register_listener(
663 std::shared_ptr<impl::ListenerMessageCodec> listener_message_codec,
664 std::shared_ptr<client::impl::BaseEventHandler> handler)
666 return get_context().get_client_listener_service().register_listener(
667 listener_message_codec, handler);
671ClientProxy::deregister_listener(boost::uuids::uuid registration_id)
673 return get_context().get_client_listener_service().deregister_listener(
679ListenerMessageCodec::decode_add_response(protocol::ClientMessage& msg)
const
681 return msg.get_first_uuid();
685ListenerMessageCodec::decode_remove_response(protocol::ClientMessage& msg)
const
687 return msg.get_first_fixed_sized_field<
bool>();
690ClientInvocationServiceImpl::ClientInvocationServiceImpl(ClientContext& client)
692 , logger_(client.get_logger())
693 , invocation_timeout_(
694 std::chrono::seconds(client.get_client_properties().get_integer(
695 client.get_client_properties().get_invocation_timeout_seconds())))
696 , invocation_retry_pause_(
697 std::chrono::milliseconds(client.get_client_properties().get_long(
698 client.get_client_properties().get_invocation_retry_pause_millis())))
700 client.get_client_config().get_network_config().is_smart_routing())
701 , backup_acks_enabled_(smart_routing_ &&
702 client.get_client_config().backup_acks_enabled())
703 , fail_on_indeterminate_operation_state_(
704 client.get_client_properties().get_boolean(
705 client.get_client_properties().fail_on_indeterminate_state()))
707 std::chrono::milliseconds(client.get_client_properties().get_integer(
708 client.get_client_properties().backup_timeout_millis())))
712ClientInvocationServiceImpl::start()
716ClientInvocationServiceImpl::add_backup_listener()
718 if (this->backup_acks_enabled_) {
719 auto& listener_service = this->client_.get_client_listener_service();
721 .register_listener(std::make_shared<BackupListenerMessageCodec>(),
722 std::make_shared<noop_backup_event_handler>(logger_))
728ClientInvocationServiceImpl::shutdown()
730 is_shutdown_.store(
true);
733std::chrono::milliseconds
734ClientInvocationServiceImpl::get_invocation_timeout()
const
736 return invocation_timeout_;
739std::chrono::milliseconds
740ClientInvocationServiceImpl::get_invocation_retry_pause()
const
742 return invocation_retry_pause_;
746ClientInvocationServiceImpl::is_redo_operation()
748 return client_.get_client_config().is_redo_operation();
752ClientInvocationServiceImpl::handle_client_message(
753 const std::shared_ptr<ClientInvocation>& invocation,
754 const std::shared_ptr<protocol::ClientMessage>& response)
757 if (protocol::codec::ErrorCodec::EXCEPTION_MESSAGE_TYPE ==
758 response->get_message_type()) {
759 auto error_holder = protocol::codec::ErrorCodec::decode(*response);
760 invocation->notify_exception(
761 client_.get_client_exception_factory().create_exception(
764 invocation->notify(response);
766 }
catch (std::exception& e) {
770 boost::str(boost::format(
"Failed to process response for %1%. %2%") %
771 *invocation % e.what()));
776ClientInvocationServiceImpl::send(
777 const std::shared_ptr<impl::ClientInvocation>& invocation,
778 const std::shared_ptr<connection::Connection>& connection)
781 BOOST_THROW_EXCEPTION(exception::hazelcast_client_not_active(
782 "ClientInvocationServiceImpl::send",
"Client is shut down"));
785 if (backup_acks_enabled_) {
786 invocation->get_client_message()->add_flag(
787 protocol::ClientMessage::BACKUP_AWARE_FLAG);
790 write_to_connection(*connection, invocation);
791 invocation->set_send_connection(connection);
796ClientInvocationServiceImpl::write_to_connection(
797 connection::Connection& connection,
798 const std::shared_ptr<ClientInvocation>& client_invocation)
800 auto clientMessage = client_invocation->get_client_message();
801 connection.write(client_invocation);
805ClientInvocationServiceImpl::check_invocation_allowed()
807 client_.get_connection_manager().check_invocation_allowed();
811ClientInvocationServiceImpl::check_urgent_invocation_allowed(
812 const ClientInvocation& invocation)
814 if (client_.get_connection_manager().client_initialized_on_cluster()) {
821 if (!client_.get_hazelcast_client_implementation()
822 ->should_check_urgent_invocations()) {
839 if (invocation.get_client_message()
840 ->contains_serialized_data_in_request()) {
842 "ClientInvocationServiceImpl::check_urgent_invocation_allowed",
849ClientInvocationServiceImpl::invoke(
850 std::shared_ptr<ClientInvocation> invocation)
852 auto connection = client_.get_connection_manager().get_random_connection();
854 HZ_LOG(logger_, finest,
"No connection found to invoke");
857 return send(invocation, connection);
860DefaultAddressProvider::DefaultAddressProvider(
862 : network_config_(network_config)
866DefaultAddressProvider::load_addresses()
868 std::vector<address> addresses = network_config_.get_addresses();
869 if (addresses.empty()) {
870 addresses.emplace_back(
"127.0.0.1", 5701);
879boost::optional<address>
880DefaultAddressProvider::translate(
const address& addr)
886DefaultAddressProvider::is_default_provider()
891const boost::shared_ptr<ClientClusterServiceImpl::member_list_snapshot>
892 ClientClusterServiceImpl::EMPTY_SNAPSHOT(
893 new ClientClusterServiceImpl::member_list_snapshot{ -1 });
895constexpr boost::chrono::milliseconds
896 ClientClusterServiceImpl::INITIAL_MEMBERS_TIMEOUT;
900ClientClusterServiceImpl::ClientClusterServiceImpl(
901 hazelcast::client::spi::ClientContext& client)
903 , member_list_snapshot_(EMPTY_SNAPSHOT)
904 , labels_(client.get_client_config().get_labels())
905 , initial_list_fetched_latch_(1)
909ClientClusterServiceImpl::add_membership_listener_without_init(
912 std::lock_guard<std::mutex> g(listeners_lock_);
913 auto id = client_.random_uuid();
914 listeners_.emplace(
id, std::move(listener));
918boost::optional<member>
919ClientClusterServiceImpl::get_member(boost::uuids::uuid uuid)
const
921 assert(!uuid.is_nil());
922 auto members_view_ptr = member_list_snapshot_.load();
923 const auto it = members_view_ptr->members.find(uuid);
924 if (it == members_view_ptr->members.end()) {
927 return { it->second };
931ClientClusterServiceImpl::get_member_list()
const
933 auto members_view_ptr = member_list_snapshot_.load();
934 std::vector<member> result;
935 result.reserve(members_view_ptr->members.size());
936 for (
const auto& e : members_view_ptr->members) {
937 result.emplace_back(e.second);
943ClientClusterServiceImpl::start()
945 for (
auto& listener :
946 client_.get_client_config().get_membership_listeners()) {
952ClientClusterServiceImpl::fire_initial_membership_event(
955 std::lock_guard<std::mutex> g(listeners_lock_);
957 for (
auto& item : listeners_) {
959 if (listener.init_) {
960 listener.init_(event);
966ClientClusterServiceImpl::shutdown()
968 initial_list_fetched_latch_.try_count_down();
972ClientClusterServiceImpl::add_membership_listener(
975 std::lock_guard<std::mutex> cluster_view_g(cluster_view_lock_);
977 auto id = add_membership_listener_without_init(std::move(listener));
979 std::lock_guard<std::mutex> listeners_g(listeners_lock_);
980 auto added_listener = listeners_[id];
982 if (added_listener.init_) {
983 auto&
cluster = client_.get_cluster();
984 auto members_ptr = member_list_snapshot_.load();
985 if (!members_ptr->members.empty()) {
986 std::unordered_set<member> members;
987 for (
const auto& e : members_ptr->members) {
988 members.insert(e.second);
998ClientClusterServiceImpl::remove_membership_listener(
999 boost::uuids::uuid registration_id)
1001 std::lock_guard<std::mutex> g(listeners_lock_);
1002 return listeners_.erase(registration_id) == 1;
1006ClientClusterServiceImpl::get_members(
const member_selector& selector)
const
1008 std::vector<member> result;
1009 for (
auto&&
member : get_member_list()) {
1010 if (selector.select(
member)) {
1011 result.emplace_back(std::move(
member));
1019ClientClusterServiceImpl::get_local_client()
const
1021 connection::ClientConnectionManagerImpl& cm =
1022 client_.get_connection_manager();
1023 auto connection = cm.get_random_connection();
1024 auto inetSocketAddress =
1025 connection ? connection->get_local_socket_address() : boost::none;
1026 auto uuid = cm.get_client_uuid();
1028 uuid, std::move(inetSocketAddress), client_.get_name(), labels_);
1032ClientClusterServiceImpl::clear_member_list_version()
1034 std::lock_guard<std::mutex> g(cluster_view_lock_);
1035 auto& lg = client_.get_logger();
1036 HZ_LOG(lg, finest,
"Resetting the member list version ");
1037 auto cluster_view_snapshot = member_list_snapshot_.load();
1040 if (cluster_view_snapshot != EMPTY_SNAPSHOT) {
1041 member_list_snapshot_.store(boost::shared_ptr<member_list_snapshot>(
1042 new member_list_snapshot{ 0, cluster_view_snapshot->members }));
1046std::vector<membership_event>
1047ClientClusterServiceImpl::clear_member_list_and_return_events()
1049 std::lock_guard<std::mutex> g(cluster_view_lock_);
1051 auto& lg = client_.get_logger();
1052 HZ_LOG(lg, finest,
"Resetting the member list");
1054 auto previous_list = member_list_snapshot_.load()->members;
1056 member_list_snapshot_.store(
1057 boost::shared_ptr<member_list_snapshot>(
new member_list_snapshot{ 0 }));
1059 return detect_membership_events(
1061 std::unordered_map<boost::uuids::uuid,
1063 boost::hash<boost::uuids::uuid>>());
1067ClientClusterServiceImpl::clear_member_list()
1069 auto events = clear_member_list_and_return_events();
1070 fire_events(std::move(events));
1074ClientClusterServiceImpl::handle_event(int32_t version,
1075 const std::vector<member>& member_infos)
1077 auto& lg = client_.get_logger();
1082 boost::format(
"Handling new snapshot with membership version: %1%, "
1083 "membersString %2%") %
1084 version % members_string(create_snapshot(version, member_infos))));
1085 auto cluster_view_snapshot = member_list_snapshot_.load();
1086 if (cluster_view_snapshot == EMPTY_SNAPSHOT) {
1087 std::lock_guard<std::mutex> g(cluster_view_lock_);
1088 cluster_view_snapshot = member_list_snapshot_.load();
1089 if (cluster_view_snapshot == EMPTY_SNAPSHOT) {
1091 apply_initial_state(version, member_infos);
1092 initial_list_fetched_latch_.count_down();
1097 std::vector<membership_event> events;
1098 if (version >= cluster_view_snapshot->version) {
1099 std::lock_guard<std::mutex> g(cluster_view_lock_);
1100 cluster_view_snapshot = member_list_snapshot_.load();
1101 if (version >= cluster_view_snapshot->version) {
1102 auto prev_members = cluster_view_snapshot->members;
1103 auto snapshot = boost::make_shared<member_list_snapshot>(
1104 create_snapshot(version, member_infos));
1105 member_list_snapshot_.store(snapshot);
1106 events = detect_membership_events(prev_members, snapshot->members);
1110 fire_events(std::move(events));
1113ClientClusterServiceImpl::member_list_snapshot
1114ClientClusterServiceImpl::create_snapshot(int32_t version,
1115 const std::vector<member>& members)
1117 member_list_snapshot result;
1118 result.version = version;
1119 for (
auto& m : members) {
1120 auto const& address_map = m.address_map();
1121 if (address_map.empty()) {
1122 result.members.insert({ m.get_uuid(), m });
1124 auto found = address_map.find(CLIENT);
1126 if (found != address_map.end()) {
1127 member_address = found->second;
1129 found = address_map.find(MEMBER);
1130 assert(found != address_map.end());
1131 member_address = found->second;
1133 member new_member(member_address,
1139 result.members.emplace(new_member.get_uuid(),
1140 std::move(new_member));
1148ClientClusterServiceImpl::members_string(
1149 const ClientClusterServiceImpl::member_list_snapshot& snapshot)
1151 std::stringstream out;
1152 auto const& members = snapshot.members;
1153 out << std::endl << std::endl <<
"Members [" << members.size() <<
"] {";
1154 for (
auto const& e : members) {
1155 out << std::endl <<
"\t" << e.second;
1157 out << std::endl <<
"}" << std::endl;
1162ClientClusterServiceImpl::apply_initial_state(
1164 const std::vector<member>& member_infos)
1166 auto snapshot = boost::make_shared<member_list_snapshot>(
1167 create_snapshot(version, member_infos));
1168 member_list_snapshot_.store(snapshot);
1169 HZ_LOG(client_.get_logger(), info, members_string(*snapshot));
1170 std::unordered_set<member> members;
1171 for (
auto const& e : snapshot->members) {
1172 members.insert(e.second);
1174 std::lock_guard<std::mutex> g(listeners_lock_);
1175 for (
auto& item : listeners_) {
1177 if (listener.init_) {
1184std::vector<membership_event>
1185ClientClusterServiceImpl::detect_membership_events(
1186 std::unordered_map<boost::uuids::uuid,
1188 boost::hash<boost::uuids::uuid>> previous_members,
1189 const std::unordered_map<boost::uuids::uuid,
1191 boost::hash<boost::uuids::uuid>>& current_members)
1193 std::vector<member> new_members;
1195 for (
auto const& e : current_members) {
1196 if (!previous_members.erase(e.first)) {
1197 new_members.emplace_back(e.second);
1201 std::vector<membership_event> events;
1204 for (
auto const& e : previous_members) {
1205 events.emplace_back(
1206 client_.get_cluster(),
1208 membership_event::membership_event_type::MEMBER_LEFT,
1211 client_.get_connection_manager().get_connection(e.second.get_uuid());
1215 std::make_exception_ptr(exception::target_disconnected(
1216 "ClientClusterServiceImpl::detect_membership_events",
1218 "The client has closed the connection to this member, after "
1219 "receiving a member left event from the cluster. %1%") %
1224 for (
auto const&
member : new_members) {
1225 events.emplace_back(
1226 client_.get_cluster(),
1228 membership_event::membership_event_type::MEMBER_JOINED,
1232 if (!events.empty()) {
1233 auto snapshot = member_list_snapshot_.load();
1234 if (!snapshot->members.empty()) {
1235 HZ_LOG(client_.get_logger(), info, members_string(*snapshot));
1242ClientClusterServiceImpl::fire_events(std::vector<membership_event> events)
1244 std::lock_guard<std::mutex> g(listeners_lock_);
1246 for (
auto const& event : events) {
1247 for (
auto& item : listeners_) {
1249 if (event.get_event_type() ==
1250 membership_event::membership_event_type::MEMBER_JOINED) {
1251 listener.joined_(event);
1253 listener.left_(event);
1260ClientClusterServiceImpl::wait_initial_member_list_fetched()
const
1264 if ((
const_cast<boost::latch&
>(initial_list_fetched_latch_))
1265 .wait_for(INITIAL_MEMBERS_TIMEOUT) == boost::cv_status::timeout) {
1266 BOOST_THROW_EXCEPTION(exception::illegal_state(
1267 "ClientClusterServiceImpl::wait_initial_member_list_fetched",
1268 "Could not get initial member list from cluster!"));
1273ClientInvocationServiceImpl::invoke_on_connection(
1274 const std::shared_ptr<ClientInvocation>& invocation,
1275 const std::shared_ptr<connection::Connection>& connection)
1277 return send(invocation, connection);
1281ClientInvocationServiceImpl::invoke_on_partition_owner(
1282 const std::shared_ptr<ClientInvocation>& invocation,
1285 auto partition_owner =
1286 client_.get_partition_service().get_partition_owner(partition_id);
1287 if (partition_owner.is_nil()) {
1292 "Partition owner is not assigned yet for partition %1%") %
1296 return invoke_on_target(invocation, partition_owner);
1300ClientInvocationServiceImpl::invoke_on_target(
1301 const std::shared_ptr<ClientInvocation>& invocation,
1302 boost::uuids::uuid uuid)
1304 assert(!uuid.is_nil());
1305 auto connection = client_.get_connection_manager().get_connection(uuid);
1310 boost::str(boost::format(
"Client is not connected to target : %1%") %
1314 return send(invocation, connection);
1318ClientInvocationServiceImpl::is_smart_routing()
const
1320 return smart_routing_;
1323const std::chrono::milliseconds&
1324ClientInvocationServiceImpl::get_backup_timeout()
const
1326 return backup_timeout_;
1330ClientInvocationServiceImpl::fail_on_indeterminate_state()
const
1332 return fail_on_indeterminate_operation_state_;
1335ClientExecutionServiceImpl::ClientExecutionServiceImpl(
1336 const std::string& name,
1338 int32_t user_pool_size,
1339 spi::lifecycle_service& service)
1340 : lifecycle_service_(service)
1341 , client_properties_(properties)
1342 , user_pool_size_(user_pool_size)
1348ClientExecutionServiceImpl::start()
1350 int internalPoolSize = client_properties_.get_integer(
1351 client_properties_.get_internal_executor_pool_size());
1352 if (internalPoolSize <= 0) {
1353 internalPoolSize = util::IOUtil::to_value<int>(
1354 client_properties::INTERNAL_EXECUTOR_POOL_SIZE_DEFAULT);
1357 internal_executor_.reset(
1358 new hazelcast::util::hz_thread_pool(internalPoolSize));
1360 if (user_pool_size_ <= 0) {
1361 user_executor_.reset(
new hazelcast::util::hz_thread_pool());
1363 user_executor_.reset(
1364 new hazelcast::util::hz_thread_pool(user_pool_size_));
1367 schema_replication_executor_.reset(
new hazelcast::util::hz_thread_pool());
1371ClientExecutionServiceImpl::shutdown()
1373 shutdown_thread_pool(internal_executor_.get());
1374 shutdown_thread_pool(user_executor_.get());
1375 shutdown_thread_pool(schema_replication_executor_.get());
1378util::hz_thread_pool&
1379ClientExecutionServiceImpl::get_user_executor()
1381 return *user_executor_;
1384util::hz_thread_pool&
1385ClientExecutionServiceImpl::get_schema_replication_executor()
1387 return *schema_replication_executor_;
1391ClientExecutionServiceImpl::shutdown_thread_pool(
1392 hazelcast::util::hz_thread_pool* pool)
1400constexpr int ClientInvocation::MAX_FAST_INVOCATION_COUNT;
1401constexpr int ClientInvocation::UNASSIGNED_PARTITION;
1403ClientInvocation::ClientInvocation(
1404 spi::ClientContext& client_context,
1405 std::shared_ptr<protocol::ClientMessage>&& message,
1406 const std::string& name,
1408 const std::shared_ptr<connection::Connection>& conn,
1409 boost::uuids::uuid uuid)
1410 : logger_(client_context.get_logger())
1411 , lifecycle_service_(client_context.get_lifecycle_service())
1412 , invocation_service_(client_context.get_invocation_service())
1413 , execution_service_(
1414 client_context.get_client_execution_service().shared_from_this())
1415 , schema_service_(client_context.get_schema_service())
1416 , call_id_sequence_(client_context.get_call_id_sequence())
1418 , partition_id_(partition)
1419 , start_time_(std::chrono::steady_clock::now())
1420 , retry_pause_(invocation_service_.get_invocation_retry_pause())
1421 , object_name_(name)
1423 , bound_to_single_connection_(conn != nullptr)
1426 , smart_routing_(invocation_service_.is_smart_routing())
1428 message->set_partition_id(partition_id_);
1430 boost::make_shared<std::shared_ptr<protocol::ClientMessage>>(message);
1431 set_send_connection(
nullptr);
1434ClientInvocation::~ClientInvocation() =
default;
1436boost::future<protocol::ClientMessage>
1437ClientInvocation::invoke()
1439 assert(client_message_.load());
1441 auto actual_work = [
this]() {
1443 call_id_sequence_->next();
1444 invoke_on_selection();
1445 if (!lifecycle_service_.is_running()) {
1446 return invocation_promise_.get_future().then(
1447 [](boost::future<protocol::ClientMessage> f) {
return f.get(); });
1449 auto id_seq = call_id_sequence_;
1450 return invocation_promise_.get_future().then(
1451 execution_service_->get_user_executor(),
1452 [=](boost::future<protocol::ClientMessage> f) {
1458 const auto& schemas =
1459 (*(client_message_.load()))->schemas_will_be_replicated();
1461 if (!schemas.empty()) {
1462 auto self = shared_from_this();
1464 return replicate_schemas(schemas)
1465 .then(boost::launch::sync,
1466 [actual_work, self](boost::future<void> replication) {
1469 return actual_work();
1474 return actual_work();
1477boost::future<protocol::ClientMessage>
1478ClientInvocation::invoke_urgent()
1480 assert(client_message_.load());
1484 call_id_sequence_->force_next();
1485 invoke_on_selection();
1486 if (!lifecycle_service_.is_running()) {
1487 return invocation_promise_.get_future().then(
1488 [](boost::future<protocol::ClientMessage> f) {
return f.get(); });
1490 auto id_seq = call_id_sequence_;
1491 return invocation_promise_.get_future().then(
1492 execution_service_->get_user_executor(),
1493 [=](boost::future<protocol::ClientMessage> f) {
1500ClientInvocation::replicate_schemas(
1501 std::vector<serialization::pimpl::schema> schemas)
1503 std::weak_ptr<ClientInvocation> self = shared_from_this();
1505 return boost::async(
1506 execution_service_->get_schema_replication_executor(), [self, schemas]() {
1507 auto invocation = self.lock();
1512 for (const serialization::pimpl::schema& s : schemas) {
1513 invocation->schema_service_.replicate_schema_in_cluster(s);
1519ClientInvocation::invoke_on_selection()
1524 invocation_service_.check_urgent_invocation_allowed(*
this);
1526 invocation_service_.check_invocation_allowed();
1529 if (is_bind_to_single_connection()) {
1530 bool invoked =
false;
1531 auto conn = connection_.lock();
1533 invoked = invocation_service_.invoke_on_connection(
1534 shared_from_this(), conn);
1537 std::string message;
1540 (boost::format(
"Could not invoke on connection %1%") %
1544 message =
"Could not invoke. Bound to a connection that is "
1547 notify_exception(std::make_exception_ptr(exception::io(
1548 "ClientInvocation::invoke_on_selection", message)));
1553 bool invoked =
false;
1554 if (smart_routing_) {
1555 if (partition_id_ != -1) {
1556 invoked = invocation_service_.invoke_on_partition_owner(
1557 shared_from_this(), partition_id_);
1558 }
else if (!uuid_.is_nil()) {
1559 invoked = invocation_service_.invoke_on_target(
1560 shared_from_this(), uuid_);
1562 invoked = invocation_service_.invoke(shared_from_this());
1565 invoked = invocation_service_.invoke(shared_from_this());
1568 invoked = invocation_service_.invoke(shared_from_this());
1571 notify_exception(std::make_exception_ptr(
1572 exception::io(
"No connection found to invoke")));
1574 }
catch (exception::iexception&) {
1575 notify_exception(std::current_exception());
1576 }
catch (std::exception&) {
1582ClientInvocation::is_bind_to_single_connection()
const
1584 return bound_to_single_connection_;
1588ClientInvocation::run()
1594ClientInvocation::retry()
1600 boost::make_shared<std::shared_ptr<protocol::ClientMessage>>(
1604 invoke_on_selection();
1605 }
catch (exception::iexception& e) {
1606 set_exception(e, boost::current_exception());
1607 }
catch (std::exception&) {
1613ClientInvocation::set_exception(
const std::exception& e,
1614 boost::exception_ptr exception_ptr)
1616 invoked_or_exception_set_.store(
true);
1618 auto send_conn = send_connection_.load();
1620 auto connection = send_conn->lock();
1623 client_message_.load()->get()->get_correlation_id();
1625 connection->get_socket().get_executor(),
1626 [=]() { connection->deregister_invocation(call_id); });
1629 invocation_promise_.set_exception(std::move(exception_ptr));
1630 }
catch (boost::promise_already_satisfied& se) {
1631 if (!event_handler_) {
1634 boost::str(boost::format(
1635 "Failed to set the exception for invocation. "
1636 "%1%, %2% Exception to be set: %3%") %
1637 se.what() % *
this % e.what()));
1643ClientInvocation::notify_exception(std::exception_ptr exception)
1647 std::rethrow_exception(exception);
1648 }
catch (exception::iexception& iex) {
1651 if (!lifecycle_service_.is_running()) {
1653 std::throw_with_nested(boost::enable_current_exception(
1654 exception::hazelcast_client_not_active(
1655 iex.get_source(),
"Client is shutting down")));
1656 }
catch (exception::iexception& e) {
1657 set_exception(e, boost::current_exception());
1662 if (!should_retry(iex)) {
1663 set_exception(iex, boost::current_exception());
1667 auto timePassed = std::chrono::steady_clock::now() - start_time_;
1668 if (timePassed > invocation_service_.get_invocation_timeout()) {
1672 boost::str(boost::format(
"Exception will not be retried because "
1673 "invocation timed out. %1%") %
1676 auto now = std::chrono::steady_clock::now();
1678 auto timeoutException =
1679 (exception::exception_builder<exception::operation_timeout>(
1680 "ClientInvocation::newoperation_timeout_exception")
1682 <<
" timed out because exception occurred after client "
1683 "invocation timeout "
1684 << std::chrono::duration_cast<std::chrono::milliseconds>(
1685 invocation_service_.get_invocation_timeout())
1687 <<
"msecs. Last exception:" << iex <<
" Current time :"
1688 << util::StringUtil::time_to_string(now) <<
". "
1690 << util::StringUtil::time_to_string(start_time_)
1691 <<
". Total elapsed time: "
1692 << std::chrono::duration_cast<std::chrono::milliseconds>(
1698 BOOST_THROW_EXCEPTION(timeoutException);
1700 set_exception(timeoutException, boost::current_exception());
1708 }
catch (std::exception& e) {
1709 set_exception(e, boost::current_exception());
1717ClientInvocation::erase_invocation()
const
1719 if (!this->event_handler_) {
1720 auto sent_connection = get_send_connection();
1721 if (sent_connection) {
1722 auto this_invocation = shared_from_this();
1723 boost::asio::post(sent_connection->get_socket().get_executor(),
1725 sent_connection->invocations.erase(
1726 this_invocation->get_client_message()
1727 ->get_correlation_id());
1734ClientInvocation::should_retry(exception::iexception& exception)
1736 auto errorCode = exception.get_error_code();
1737 if (is_bind_to_single_connection() &&
1738 (errorCode == protocol::IO ||
1739 errorCode == protocol::TARGET_DISCONNECTED)) {
1743 if (!uuid_.is_nil() && errorCode == protocol::TARGET_NOT_MEMBER) {
1750 if (errorCode == protocol::IO ||
1751 errorCode == protocol::HAZELCAST_INSTANCE_NOT_ACTIVE ||
1752 exception.is_retryable()) {
1755 if (errorCode == protocol::TARGET_DISCONNECTED) {
1756 return client_message_.load()->get()->is_retryable() ||
1757 invocation_service_.is_redo_operation();
1763operator<<(std::ostream& os,
const ClientInvocation& invocation)
1765 std::ostringstream target;
1766 if (invocation.is_bind_to_single_connection()) {
1767 auto conn = invocation.connection_.lock();
1769 target <<
"connection " << *conn;
1771 }
else if (invocation.partition_id_ != -1) {
1772 target <<
"partition " << invocation.partition_id_;
1773 }
else if (!invocation.uuid_.is_nil()) {
1774 target <<
"uuid " << boost::to_string(invocation.uuid_);
1778 os <<
"ClientInvocation{"
1779 <<
"requestMessage = " << *invocation.client_message_.load()->get()
1780 <<
", objectName = " << invocation.object_name_
1781 <<
", target = " << target.str() <<
", sendConnection = ";
1782 auto sendConnection = invocation.get_send_connection();
1783 if (sendConnection) {
1784 os << *sendConnection;
1788 os <<
", backup_acks_expected_ = "
1789 <<
static_cast<int>(invocation.backup_acks_expected_)
1790 <<
", backup_acks_received = " << invocation.backup_acks_received_;
1792 if (invocation.pending_response_) {
1793 os <<
", pending_response: " << *invocation.pending_response_;
1801std::shared_ptr<ClientInvocation>
1802ClientInvocation::create(
1803 spi::ClientContext& client_context,
1804 std::shared_ptr<protocol::ClientMessage>&& client_message,
1805 const std::string& object_name,
1808 return std::shared_ptr<ClientInvocation>(
new ClientInvocation(
1809 client_context, std::move(client_message), object_name, partition_id));
1812std::shared_ptr<ClientInvocation>
1813ClientInvocation::create(
1814 spi::ClientContext& client_context,
1815 std::shared_ptr<protocol::ClientMessage>&& client_message,
1816 const std::string& object_name,
1817 const std::shared_ptr<connection::Connection>& connection)
1819 return std::shared_ptr<ClientInvocation>(
1820 new ClientInvocation(client_context,
1821 std::move(client_message),
1823 UNASSIGNED_PARTITION,
1827std::shared_ptr<ClientInvocation>
1828ClientInvocation::create(
1829 spi::ClientContext& client_context,
1830 std::shared_ptr<protocol::ClientMessage>&& client_message,
1831 const std::string& object_name,
1832 boost::uuids::uuid uuid)
1834 return std::shared_ptr<ClientInvocation>(
1835 new ClientInvocation(client_context,
1836 std::move(client_message),
1838 UNASSIGNED_PARTITION,
1843std::shared_ptr<ClientInvocation>
1844ClientInvocation::create(spi::ClientContext& client_context,
1845 protocol::ClientMessage& client_message,
1846 const std::string& object_name,
1851 std::make_shared<protocol::ClientMessage>(std::move(client_message)),
1856std::shared_ptr<ClientInvocation>
1857ClientInvocation::create(
1858 spi::ClientContext& client_context,
1859 protocol::ClientMessage& client_message,
1860 const std::string& object_name,
1861 const std::shared_ptr<connection::Connection>& connection)
1865 std::make_shared<protocol::ClientMessage>(std::move(client_message)),
1870std::shared_ptr<ClientInvocation>
1871ClientInvocation::create(spi::ClientContext& client_context,
1872 protocol::ClientMessage& client_message,
1873 const std::string& object_name,
1874 boost::uuids::uuid uuid)
1878 std::make_shared<protocol::ClientMessage>(std::move(client_message)),
1883std::shared_ptr<connection::Connection>
1884ClientInvocation::get_send_connection()
const
1886 return send_connection_.load()->lock();
1890ClientInvocation::wait_invoked()
const
1893 while (!invoked_or_exception_set_) {
1894 std::this_thread::sleep_for(retry_pause_);
1899ClientInvocation::set_send_connection(
1900 const std::shared_ptr<connection::Connection>& conn)
1902 send_connection_.store(
1903 boost::make_shared<std::weak_ptr<connection::Connection>>(conn));
1904 invoked_or_exception_set_.store(
true);
1908ClientInvocation::notify(
const std::shared_ptr<protocol::ClientMessage>& msg)
1911 BOOST_THROW_EXCEPTION(
1912 exception::illegal_argument(
"response can't be null"));
1915 int8_t expected_backups = msg->get_number_of_backups();
1920 if (expected_backups > backup_acks_received_) {
1925 pending_response_received_time_ = std::chrono::steady_clock::now();
1927 backup_acks_expected_ = expected_backups;
1933 pending_response_ = msg;
1950ClientInvocation::complete(
const std::shared_ptr<protocol::ClientMessage>& msg)
1954 this->invocation_promise_.set_value(*msg);
1955 }
catch (std::exception& e) {
1958 boost::str(boost::format(
1959 "Failed to set the response for invocation. "
1960 "Dropping the response. %1%, %2% Response: %3%") %
1961 e.what() % *
this % *msg));
1963 this->erase_invocation();
1966std::shared_ptr<protocol::ClientMessage>
1967ClientInvocation::get_client_message()
const
1969 return *client_message_.load();
1972const std::shared_ptr<EventHandler<protocol::ClientMessage>>&
1973ClientInvocation::get_event_handler()
const
1975 return event_handler_;
1979ClientInvocation::set_event_handler(
1980 const std::shared_ptr<EventHandler<protocol::ClientMessage>>& handler)
1982 ClientInvocation::event_handler_ = handler;
1986ClientInvocation::execute()
1988 auto this_invocation = shared_from_this();
1989 auto command = [=]() { this_invocation->run(); };
1995 int64_t callId = call_id_sequence_->force_next();
1996 client_message_.load()->get()->set_correlation_id(callId);
1999 call_id_sequence_->complete();
2001 if (invoke_count_ < MAX_FAST_INVOCATION_COUNT) {
2003 execution_service_->execute(command);
2006 int64_t delayMillis = util::min<int64_t>(
2007 static_cast<int64_t
>(1)
2008 << (invoke_count_ - MAX_FAST_INVOCATION_COUNT),
2009 std::chrono::duration_cast<std::chrono::milliseconds>(retry_pause_)
2011 retry_timer_ = execution_service_->schedule(
2012 command, std::chrono::milliseconds(delayMillis));
2017ClientInvocation::get_name()
const
2019 return "ClientInvocation";
2022std::shared_ptr<protocol::ClientMessage>
2023ClientInvocation::copy_message()
2025 return std::make_shared<protocol::ClientMessage>(**client_message_.load());
2028boost::promise<protocol::ClientMessage>&
2029ClientInvocation::get_promise()
2031 return invocation_promise_;
2035ClientInvocation::log_exception(exception::iexception& e)
2039 boost::str(boost::format(
2040 "Invocation got an exception %1%, invoke count : %2%, "
2041 "exception : %3%") %
2042 *
this % invoke_count_.load() % e));
2046ClientInvocation::notify_backup()
2048 ++backup_acks_received_;
2050 if (!pending_response_) {
2058 if (backup_acks_expected_ != backup_acks_received_) {
2067 complete_with_pending_response();
2071ClientInvocation::detect_and_handle_backup_timeout(
2072 const std::chrono::milliseconds& backup_timeout)
2077 if (backup_acks_expected_ == backup_acks_received_) {
2084 if (!pending_response_) {
2090 if (pending_response_received_time_ + backup_timeout >=
2091 std::chrono::steady_clock::now()) {
2095 if (invocation_service_.fail_on_indeterminate_state()) {
2096 auto exception = boost::enable_current_exception(
2097 (exception::exception_builder<
2098 exception::indeterminate_operation_state>(
2099 "ClientInvocation::detect_and_handle_backup_timeout")
2100 << *
this <<
" failed because backup acks missed.")
2102 notify_exception(std::make_exception_ptr(exception));
2108 complete_with_pending_response();
2112ClientInvocation::complete_with_pending_response()
2114 complete(pending_response_);
2118impl::ClientTransactionManagerServiceImpl::get_client()
const
2123ClientTransactionManagerServiceImpl::ClientTransactionManagerServiceImpl(
2124 ClientContext& client)
2128std::shared_ptr<connection::Connection>
2129ClientTransactionManagerServiceImpl::connect()
2131 auto& invocationService = client_.get_invocation_service();
2132 auto startTime = std::chrono::steady_clock::now();
2133 auto invocationTimeout = invocationService.get_invocation_timeout();
2137 while (client_.get_lifecycle_service().is_running()) {
2140 client_.get_connection_manager().get_random_connection();
2142 throw_exception(smartRouting);
2145 }
catch (exception::hazelcast_client_offline&) {
2148 if (std::chrono::steady_clock::now() - startTime >
2149 invocationTimeout) {
2150 std::rethrow_exception(new_operation_timeout_exception(
2151 std::current_exception(), invocationTimeout, startTime));
2154 std::this_thread::sleep_for(
2155 invocationService.get_invocation_retry_pause());
2157 BOOST_THROW_EXCEPTION(exception::hazelcast_client_not_active(
2158 "ClientTransactionManagerServiceImpl::connect",
"Client is shutdown"));
2162ClientTransactionManagerServiceImpl::new_operation_timeout_exception(
2163 std::exception_ptr cause,
2164 std::chrono::milliseconds invocation_timeout,
2165 std::chrono::steady_clock::time_point start_time)
2167 std::ostringstream sb;
2168 auto now = std::chrono::steady_clock::now();
2169 sb <<
"Creating transaction context timed out because exception occurred "
2170 "after client invocation timeout "
2171 << std::chrono::duration_cast<std::chrono::milliseconds>(
2176 << util::StringUtil::time_to_string(std::chrono::steady_clock::now())
2178 <<
"Start time: " << util::StringUtil::time_to_string(start_time)
2179 <<
". Total elapsed time: "
2180 << std::chrono::duration_cast<std::chrono::milliseconds>(now -
2185 std::rethrow_exception(cause);
2188 std::throw_with_nested(boost::enable_current_exception(
2189 exception::operation_timeout(
"ClientTransactionManagerServiceImpl"
2190 "::newoperation_timeout_exception",
2193 return std::current_exception();
2200ClientTransactionManagerServiceImpl::throw_exception(
bool smart_routing)
2203 auto& connection_strategy_Config =
2206 if (reconnect_mode ==
2208 BOOST_THROW_EXCEPTION(exception::hazelcast_client_offline(
2209 "ClientTransactionManagerServiceImpl::throw_exception",
""));
2211 if (smart_routing) {
2212 auto members = client_.get_cluster().get_members();
2213 std::ostringstream msg;
2214 if (members.empty()) {
2215 msg <<
"No address was return by the LoadBalancer since there are "
2216 "no members in the cluster";
2218 msg <<
"No address was return by the LoadBalancer. "
2219 "But the cluster contains the following members:{\n";
2220 for (
auto const& m : members) {
2221 msg <<
'\t' << m <<
'\n';
2225 BOOST_THROW_EXCEPTION(exception::illegal_state(
2226 "ClientTransactionManagerServiceImpl::throw_exception", msg.str()));
2228 BOOST_THROW_EXCEPTION(exception::illegal_state(
2229 "ClientTransactionManagerServiceImpl::throw_exception",
2230 "No active connection is found"));
2233ClientPartitionServiceImpl::ClientPartitionServiceImpl(ClientContext& client)
2235 , logger_(client.get_logger())
2236 , partition_count_(0)
2238 boost::shared_ptr<partition_table>(new partition_table{ 0, -1 }))
2242ClientPartitionServiceImpl::handle_event(
2243 int32_t connection_id,
2245 const std::vector<std::pair<boost::uuids::uuid, std::vector<int>>>&
2252 "Handling new partition table with partitionStateVersion: %1%") %
2256 auto current = partition_table_.load();
2257 if (!should_be_applied(connection_id, version, partitions, *current)) {
2260 if (partition_table_.compare_exchange_strong(
2262 boost::shared_ptr<partition_table>(
new partition_table{
2263 connection_id, version, convert_to_map(partitions) }))) {
2269 "Applied partition table with partitionStateVersion : %1%") %
2277ClientPartitionServiceImpl::get_partition_owner(int32_t partition_id)
2279 auto table_ptr = partition_table_.load();
2280 auto it = table_ptr->partitions.find(partition_id);
2281 if (it != table_ptr->partitions.end()) {
2284 return boost::uuids::nil_uuid();
2288ClientPartitionServiceImpl::get_partition_id(
2289 const serialization::pimpl::data& key)
2291 int32_t pc = get_partition_count();
2295 int hash = key.get_partition_hash();
2296 return util::HashUtil::hash_to_index(hash, pc);
2300ClientPartitionServiceImpl::get_partition_count()
2302 return partition_count_.load();
2305std::shared_ptr<client::impl::Partition>
2306ClientPartitionServiceImpl::get_partition(
int partition_id)
2308 return std::shared_ptr<client::impl::Partition>(
2309 new PartitionImpl(partition_id, client_, *
this));
2313ClientPartitionServiceImpl::check_and_set_partition_count(
2314 int32_t new_partition_count)
2316 int32_t expected = 0;
2317 if (partition_count_.compare_exchange_strong(expected,
2318 new_partition_count)) {
2321 return partition_count_.load() == new_partition_count;
2325ClientPartitionServiceImpl::should_be_applied(
2326 int32_t connection_id,
2328 const std::vector<std::pair<boost::uuids::uuid, std::vector<int>>>&
2330 const partition_table& current)
2332 auto& lg = client_.get_logger();
2333 if (partitions.empty()) {
2334 if (logger_.enabled(logger::level::finest)) {
2335 log_failure(connection_id, version, current,
"response is empty");
2339 if (!current.connection_id || connection_id != current.connection_id) {
2341 lg, finest, ([¤t, connection_id]() {
2342 auto frmt = boost::format(
2343 "Event coming from a new connection. Old connection id: %1%, "
2344 "new connection %2%");
2346 if (current.connection_id) {
2347 frmt = frmt % current.connection_id;
2349 frmt = frmt %
"none";
2352 return boost::str(frmt % connection_id);
2357 if (version <= current.version) {
2358 if (lg.enabled(logger::level::finest)) {
2360 connection_id, version, current,
"response state version is old");
2368ClientPartitionServiceImpl::log_failure(
2369 int32_t connection_id,
2371 const ClientPartitionServiceImpl::partition_table& current,
2372 const std::string& cause)
2374 HZ_LOG(logger_, finest, [&]() {
2375 auto frmt = boost::format(
2376 " We will not apply the response, since %1% ."
2377 " Response is from connection with id %2%. "
2378 "Current connection id is %3%, response state version:%4%. "
2379 "Current state version: %5%");
2380 if (current.connection_id) {
2381 return boost::str(frmt % cause % connection_id %
2382 current.connection_id % version %
2385 return boost::str(frmt % cause % connection_id %
"nullptr" %
2386 version % current.version);
2392ClientPartitionServiceImpl::reset()
2394 partition_table_.store(
nullptr);
2397std::unordered_map<int32_t, boost::uuids::uuid>
2398ClientPartitionServiceImpl::convert_to_map(
2399 const std::vector<std::pair<boost::uuids::uuid, std::vector<int>>>&
2402 std::unordered_map<int32_t, boost::uuids::uuid> new_partitions;
2403 for (
auto const& e : partitions) {
2404 for (
auto pid : e.second) {
2405 new_partitions.insert({ pid, e.first });
2408 return new_partitions;
2412ClientPartitionServiceImpl::PartitionImpl::get_partition_id()
const
2414 return partition_id_;
2417boost::optional<member>
2418ClientPartitionServiceImpl::PartitionImpl::get_owner()
const
2420 auto owner = partition_service_.get_partition_owner(partition_id_);
2421 if (!owner.is_nil()) {
2422 return client_.get_client_cluster_service().get_member(owner);
2427ClientPartitionServiceImpl::PartitionImpl::PartitionImpl(
2429 ClientContext& client,
2430 ClientPartitionServiceImpl& partition_service)
2431 : partition_id_(partition_id)
2433 , partition_service_(partition_service)
2437CallIdSequenceWithoutBackpressure::CallIdSequenceWithoutBackpressure()
2441CallIdSequenceWithoutBackpressure::~CallIdSequenceWithoutBackpressure() =
2445CallIdSequenceWithoutBackpressure::get_max_concurrent_invocations()
const
2451CallIdSequenceWithoutBackpressure::next()
2453 return force_next();
2457CallIdSequenceWithoutBackpressure::force_next()
2463CallIdSequenceWithoutBackpressure::complete()
2469CallIdSequenceWithoutBackpressure::get_last_call_id()
2475AbstractCallIdSequence::AbstractCallIdSequence(
2476 int32_t max_concurrent_invocations)
2478 std::ostringstream out;
2479 out <<
"maxConcurrentInvocations should be a positive number. "
2480 "maxConcurrentInvocations="
2481 << max_concurrent_invocations;
2482 this->max_concurrent_invocations_ = util::Preconditions::check_positive(
2483 max_concurrent_invocations, out.str());
2485 for (
size_t i = 0; i < longs_.size(); ++i) {
2490AbstractCallIdSequence::~AbstractCallIdSequence() =
default;
2493AbstractCallIdSequence::get_max_concurrent_invocations()
const
2495 return max_concurrent_invocations_;
2499AbstractCallIdSequence::next()
2502 handle_no_space_left();
2504 return force_next();
2508AbstractCallIdSequence::force_next()
2510 return ++longs_[INDEX_HEAD];
2514AbstractCallIdSequence::complete()
2516 ++longs_[INDEX_TAIL];
2517 assert(longs_[INDEX_TAIL] <= longs_[INDEX_HEAD]);
2521AbstractCallIdSequence::get_last_call_id()
2523 return longs_[INDEX_HEAD];
2527AbstractCallIdSequence::has_space()
2529 return longs_[INDEX_HEAD] - longs_[INDEX_TAIL] <
2530 max_concurrent_invocations_;
2534AbstractCallIdSequence::get_tail()
2536 return longs_[INDEX_TAIL];
2539const std::unique_ptr<util::concurrent::IdleStrategy>
2540 CallIdSequenceWithBackpressure::IDLER(
2541 new util::concurrent::BackoffIdleStrategy(
2544 std::chrono::duration_cast<std::chrono::nanoseconds>(
2545 std::chrono::microseconds(1000))
2547 std::chrono::duration_cast<std::chrono::nanoseconds>(
2548 std::chrono::microseconds(MAX_DELAY_MS * 1000))
2551CallIdSequenceWithBackpressure::CallIdSequenceWithBackpressure(
2552 int32_t max_concurrent_invocations,
2553 int64_t backoff_timeout_ms)
2554 : AbstractCallIdSequence(max_concurrent_invocations)
2556 std::ostringstream out;
2557 out <<
"backoffTimeoutMs should be a positive number. backoffTimeoutMs="
2558 << backoff_timeout_ms;
2559 util::Preconditions::check_positive(backoff_timeout_ms, out.str());
2561 backoff_timeout_nanos_ =
2562 std::chrono::duration_cast<std::chrono::nanoseconds>(
2563 std::chrono::milliseconds(backoff_timeout_ms))
2568CallIdSequenceWithBackpressure::handle_no_space_left()
2570 auto start = std::chrono::steady_clock::now();
2571 for (int64_t idleCount = 0;; idleCount++) {
2572 int64_t elapsedNanos =
2573 std::chrono::duration_cast<std::chrono::nanoseconds>(
2574 std::chrono::steady_clock::now() - start)
2576 if (elapsedNanos > backoff_timeout_nanos_) {
2578 "CallIdSequenceWithBackpressure::handleNoSpaceLeft")
2579 <<
"Timed out trying to acquire another call ID."
2580 <<
" maxConcurrentInvocations = "
2581 << get_max_concurrent_invocations() <<
", backoffTimeout = "
2582 << std::chrono::microseconds(backoff_timeout_nanos_ / 1000)
2584 <<
" msecs, elapsed:"
2585 << std::chrono::microseconds(elapsedNanos / 1000).count()
2589 IDLER->idle(idleCount);
2596FailFastCallIdSequence::FailFastCallIdSequence(
2597 int32_t max_concurrent_invocations)
2598 : AbstractCallIdSequence(max_concurrent_invocations)
2602FailFastCallIdSequence::handle_no_space_left()
2605 "FailFastCallIdSequence::handleNoSpaceLeft")
2606 <<
"Maximum invocation count is reached. maxConcurrentInvocations = "
2607 << get_max_concurrent_invocations())
2611std::unique_ptr<CallIdSequence>
2612CallIdFactory::new_call_id_sequence(
bool is_back_pressure_enabled,
2613 int32_t max_allowed_concurrent_invocations,
2614 int64_t backoff_timeout_ms)
2616 if (!is_back_pressure_enabled) {
2617 return std::unique_ptr<CallIdSequence>(
2618 new CallIdSequenceWithoutBackpressure());
2619 }
else if (backoff_timeout_ms <= 0) {
2620 return std::unique_ptr<CallIdSequence>(
2621 new FailFastCallIdSequence(max_allowed_concurrent_invocations));
2623 return std::unique_ptr<CallIdSequence>(
2624 new CallIdSequenceWithBackpressure(max_allowed_concurrent_invocations,
2625 backoff_timeout_ms));
2631listener_service_impl::listener_service_impl(ClientContext& client_context,
2632 int32_t event_thread_count)
2633 : client_context_(client_context)
2634 , serialization_service_(client_context.get_serialization_service())
2635 , logger_(client_context.get_logger())
2636 , client_connection_manager_(client_context.get_connection_manager())
2637 , number_of_event_threads_(event_thread_count)
2638 , smart_(client_context.get_client_config()
2639 .get_network_config()
2640 .is_smart_routing())
2642 auto& invocationService = client_context.get_invocation_service();
2643 invocation_timeout_ = invocationService.get_invocation_timeout();
2644 invocation_retry_pause_ = invocationService.get_invocation_retry_pause();
2648listener_service_impl::registers_local_only()
const
2653boost::future<boost::uuids::uuid>
2654listener_service_impl::register_listener(
2655 std::shared_ptr<ListenerMessageCodec> listener_message_codec,
2656 std::shared_ptr<client::impl::BaseEventHandler> handler)
2658 auto task = boost::packaged_task<boost::uuids::uuid()>([=]() {
2659 return register_listener_internal(listener_message_codec, handler);
2661 auto f = task.get_future();
2662 boost::asio::post(registration_executor_->get_executor(), std::move(task));
2667listener_service_impl::deregister_listener(boost::uuids::uuid registration_id)
2669 util::Preconditions::check_not_nill(
2670 registration_id,
"Nil userRegistrationId is not allowed!");
2672 boost::packaged_task<bool()> task(
2673 [=]() {
return deregister_listener_internal(registration_id); });
2674 auto f = task.get_future();
2675 boost::asio::post(registration_executor_->get_executor(), std::move(task));
2680listener_service_impl::connection_added(
2681 const std::shared_ptr<connection::Connection> connection)
2683 boost::asio::post(registration_executor_->get_executor(),
2684 [=]() { connection_added_internal(connection); });
2688listener_service_impl::connection_removed(
2689 const std::shared_ptr<connection::Connection> connection)
2691 boost::asio::post(registration_executor_->get_executor(),
2692 [=]() { connection_removed_internal(connection); });
2696listener_service_impl::remove_event_handler(
2698 const std::shared_ptr<connection::Connection>& connection)
2700 boost::asio::post(connection->get_socket().get_executor(),
2701 std::packaged_task<
void()>(
2702 [=]() { connection->deregister_invocation(call_id); }));
2706listener_service_impl::handle_client_message(
2707 const std::shared_ptr<ClientInvocation> invocation,
2708 const std::shared_ptr<protocol::ClientMessage> response)
2711 auto partitionId = response->get_partition_id();
2712 if (partitionId == -1) {
2714 boost::asio::post(event_executor_->get_executor(), [=]() {
2715 process_event_message(invocation, response);
2722 event_strands_[partitionId % event_strands_.size()],
2723 [=]() { process_event_message(invocation, response); });
2725 }
catch (
const std::exception& e) {
2726 if (client_context_.get_lifecycle_service().is_running()) {
2730 boost::str(boost::format(
"Delivery of event message to event "
2731 "handler failed. %1%, %2%, %3%") %
2732 e.what() % *response % *invocation));
2738listener_service_impl::shutdown()
2740 ClientExecutionServiceImpl::shutdown_thread_pool(event_executor_.get());
2741 ClientExecutionServiceImpl::shutdown_thread_pool(
2742 registration_executor_.get());
2746listener_service_impl::start()
2748 event_executor_.reset(
2749 new hazelcast::util::hz_thread_pool(number_of_event_threads_));
2750 registration_executor_.reset(
new hazelcast::util::hz_thread_pool(1));
2752 for (
int i = 0; i < number_of_event_threads_; ++i) {
2753 event_strands_.emplace_back(event_executor_->get_executor());
2756 client_connection_manager_.add_connection_listener(shared_from_this());
2760listener_service_impl::register_listener_internal(
2761 std::shared_ptr<ListenerMessageCodec> listener_message_codec,
2762 std::shared_ptr<client::impl::BaseEventHandler> handler)
2764 auto user_registration_id = client_context_.random_uuid();
2766 std::shared_ptr<listener_registration> registration(
2767 new listener_registration{ listener_message_codec, handler });
2768 registrations_.put(user_registration_id, registration);
2769 for (
auto const& connection :
2770 client_connection_manager_.get_active_connections()) {
2772 invoke(registration, connection);
2774 if (connection->is_alive()) {
2775 deregister_listener_internal(user_registration_id);
2776 BOOST_THROW_EXCEPTION(
2778 "ClientListenerService::RegisterListenerTask::call")
2779 <<
"Listener can not be added " << e)
2784 return user_registration_id;
2788listener_service_impl::deregister_listener_internal(
2789 boost::uuids::uuid user_registration_id)
2791 auto listenerRegistration = registrations_.get(user_registration_id);
2792 if (!listenerRegistration) {
2795 bool successful =
true;
2797 auto listener_registrations =
2798 listenerRegistration->registrations.entry_set();
2799 for (
auto it = listener_registrations.begin();
2800 it != listener_registrations.end();) {
2801 const auto& registration = it->second;
2802 const auto& subscriber = it->first;
2804 const auto& listenerMessageCodec = listenerRegistration->codec;
2805 auto serverRegistrationId = registration->server_registration_id;
2807 listenerMessageCodec->encode_remove_request(serverRegistrationId);
2808 auto invocation = ClientInvocation::create(
2809 client_context_, request,
"", subscriber);
2810 invocation->invoke().get();
2812 remove_event_handler(registration->call_id, subscriber);
2814 it = listener_registrations.erase(it);
2817 if (subscriber->is_alive()) {
2820 if (subscriber->get_remote_address()) {
2821 endpoint << *subscriber->get_remote_address();
2829 "ClientListenerService::deregisterListenerInternal "
2830 "Deregistration of listener with ID %1% "
2831 "has failed to address %2% %3%") %
2832 user_registration_id %
2833 subscriber->get_remote_address() % e));
2838 registrations_.remove(user_registration_id);
2844listener_service_impl::connection_added_internal(
2845 const std::shared_ptr<connection::Connection>& connection)
2847 for (
const auto& listener_registration : registrations_.values()) {
2848 invoke_from_internal_thread(listener_registration, connection);
2853listener_service_impl::connection_removed_internal(
2854 const std::shared_ptr<connection::Connection>& connection)
2856 for (
auto& registry : registrations_.values()) {
2857 registry->registrations.remove(connection);
2862listener_service_impl::invoke_from_internal_thread(
2863 const std::shared_ptr<listener_registration>& listener_registration,
2864 const std::shared_ptr<connection::Connection>& connection)
2867 invoke(listener_registration, connection);
2872 boost::format(
"Listener with pointer %1% can not be added to "
2873 "a new connection: %2%, reason: %3%") %
2874 listener_registration.get() % *connection % e));
2879listener_service_impl::invoke(
2880 const std::shared_ptr<listener_registration>& listener_registration,
2881 const std::shared_ptr<connection::Connection>& connection)
2883 if (listener_registration->registrations.contains_key(connection)) {
2887 const auto& codec = listener_registration->codec;
2888 auto request = codec->encode_add_request(registers_local_only());
2889 const auto& handler = listener_registration->handler;
2890 handler->before_listener_register();
2892 auto invocation = ClientInvocation::create(
2894 std::make_shared<protocol::ClientMessage>(std::move(request)),
2897 invocation->set_event_handler(handler);
2898 auto clientMessage = invocation->invoke_urgent().get();
2900 auto serverRegistrationId = codec->decode_add_response(clientMessage);
2901 handler->on_listener_register();
2902 int64_t correlationId =
2903 invocation->get_client_message()->get_correlation_id();
2905 (*listener_registration)
2908 std::shared_ptr<connection_registration>(
2909 new connection_registration{ serverRegistrationId, correlationId }));
2913listener_service_impl::process_event_message(
2914 const std::shared_ptr<ClientInvocation> invocation,
2915 const std::shared_ptr<protocol::ClientMessage> response)
2917 auto eventHandler = invocation->get_event_handler();
2918 if (!eventHandler) {
2919 if (client_context_.get_lifecycle_service().is_running()) {
2923 boost::format(
"No eventHandler for invocation. "
2924 "Ignoring this invocation response. %1%") %
2932 eventHandler->handle(*response);
2933 }
catch (std::exception& e) {
2934 if (client_context_.get_lifecycle_service().is_running()) {
2938 boost::str(boost::format(
"Delivery of event message to event "
2939 "handler failed. %1%, %2%, %3%") %
2940 e.what() % *response % *invocation));
2945listener_service_impl::~listener_service_impl() =
default;
2948cluster_view_listener::start()
2950 client_context_.get_connection_manager().add_connection_listener(
2951 shared_from_this());
2955cluster_view_listener::connection_added(
2956 const std::shared_ptr<connection::Connection> connection)
2958 try_register(connection);
2962cluster_view_listener::connection_removed(
2963 const std::shared_ptr<connection::Connection> connection)
2965 try_reregister_to_random_connection(connection->get_connection_id());
2968cluster_view_listener::cluster_view_listener(ClientContext& client_context)
2969 : client_context_(client_context)
2973cluster_view_listener::try_register(
2974 std::shared_ptr<connection::Connection> connection)
2976 int32_t expected_id = -1;
2977 if (!listener_added_connection_id_.compare_exchange_strong(
2978 expected_id, connection->get_connection_id())) {
2983 auto invocation = ClientInvocation::create(
2985 std::make_shared<protocol::ClientMessage>(
2986 protocol::codec::client_addclusterviewlistener_encode()),
2990 auto handler = std::make_shared<event_handler>(
2991 connection->get_connection_id(), *
this, client_context_.get_logger());
2992 invocation->set_event_handler(handler);
2993 handler->before_listener_register();
2995 std::weak_ptr<cluster_view_listener> weak_self = shared_from_this();
2996 auto conn_id = connection->get_connection_id();
2998 invocation->invoke_urgent().then(
2999 boost::launch::sync,
3000 [weak_self, handler, conn_id](boost::future<protocol::ClientMessage> f) {
3001 auto self = weak_self.lock();
3005 if (f.has_value()) {
3006 handler->on_listener_register();
3012 }
catch (exception::hazelcast_client_not_active& e) {
3020 self->try_reregister_to_random_connection(conn_id);
3025cluster_view_listener::try_reregister_to_random_connection(
3026 int32_t old_connection_id)
3028 if (!listener_added_connection_id_.compare_exchange_strong(
3029 old_connection_id, -1)) {
3033 auto new_connection =
3034 client_context_.get_connection_manager().get_random_connection();
3035 if (new_connection) {
3036 try_register(new_connection);
3040cluster_view_listener::~cluster_view_listener() =
default;
3043cluster_view_listener::event_handler::handle_membersview(
3045 const std::vector<member>& member_infos)
3047 view_listener.client_context_.get_client_cluster_service().handle_event(
3048 version, member_infos);
3052cluster_view_listener::event_handler::handle_partitionsview(
3054 const std::vector<std::pair<boost::uuids::uuid, std::vector<int>>>&
3057 view_listener.client_context_.get_partition_service().handle_event(
3058 connection_id, version, partitions);
3062cluster_view_listener::event_handler::before_listener_register()
3064 view_listener.client_context_.get_client_cluster_service()
3065 .clear_member_list_version();
3066 auto& lg = view_listener.client_context_.get_logger();
3070 boost::str(boost::format(
3071 "Register attempt of cluster_view_listener::event_handler "
3072 "to connection with id %1%") %
3077cluster_view_listener::event_handler::on_listener_register()
3079 auto& lg = view_listener.client_context_.get_logger();
3083 boost::format(
"Registered cluster_view_listener::event_handler to "
3084 "connection with id %1%") %
3088cluster_view_listener::event_handler::event_handler(
3090 cluster_view_listener& viewListener,
3092 : protocol::codec::client_addclusterviewlistener_handler(logger)
3093 , connection_id(connectionId)
3094 , view_listener(viewListener)
3098protocol::ClientMessage
3099ClientInvocationServiceImpl::BackupListenerMessageCodec::encode_add_request(
3102 return protocol::codec::client_localbackuplistener_encode();
3105protocol::ClientMessage
3106ClientInvocationServiceImpl::BackupListenerMessageCodec::encode_remove_request(
3107 boost::uuids::uuid )
const
3110 return protocol::ClientMessage(0);
3114ClientInvocationServiceImpl::noop_backup_event_handler::handle_backup(
3119ClientInvocationServiceImpl::noop_backup_event_handler::
3120 noop_backup_event_handler(
logger& l)
3121 : client_localbackuplistener_handler(l)
3125namespace discovery {
3126remote_address_provider::remote_address_provider(
3127 std::function<std::unordered_map<address, address>()> addr_map_method,
3129 : refresh_address_map_(std::move(addr_map_method))
3130 , use_public_(use_public)
3134remote_address_provider::load_addresses()
3136 auto address_map = refresh_address_map_();
3137 std::lock_guard<std::mutex> guard(lock_);
3138 private_to_public_ = address_map;
3139 std::vector<address> addresses;
3140 addresses.reserve(address_map.size());
3141 for (
const auto& addr_pair : address_map) {
3142 addresses.push_back(addr_pair.first);
3147boost::optional<address>
3148remote_address_provider::translate(
const address& addr)
3157 std::lock_guard<std::mutex> guard(lock_);
3158 auto found = private_to_public_.find(addr);
3159 if (found != private_to_public_.end()) {
3160 return found->second;
3164 auto address_map = refresh_address_map_();
3166 std::lock_guard<std::mutex> guard(lock_);
3167 private_to_public_ = address_map;
3169 auto found = private_to_public_.find(addr);
3170 if (found != private_to_public_.end()) {
3171 return found->second;
3177#ifdef HZ_BUILD_WITH_SSL
3179 std::string cloud_base_url,
3180 std::chrono::steady_clock::duration timeout)
3181 : cloud_config_(config)
3182 , cloud_base_url_(cloud_base_url)
3188 std::chrono::steady_clock::duration )
3192std::unordered_map<address, address>
3193cloud_discovery::get_addresses()
3195#ifdef HZ_BUILD_WITH_SSL
3197 util::SyncHttpsClient httpsConnection(cloud_base_url_,
3198 std::string(CLOUD_URL_PATH) +
3199 cloud_config_.discovery_token,
3201 cloud_config_.discovery_token);
3202 auto& conn_stream = httpsConnection.connect_and_get_response();
3203 return parse_json_response(conn_stream);
3204 }
catch (std::exception& e) {
3205 std::throw_with_nested(
3206 boost::enable_current_exception(exception::illegal_state(
3207 "cloud_discovery::get_addresses", e.what())));
3210 util::Preconditions::check_ssl(
"cloud_discovery::get_addresses");
3211 return std::unordered_map<address, address>();
3215std::unordered_map<address, address>
3216cloud_discovery::parse_json_response(std::istream& conn_stream)
3218 namespace pt = boost::property_tree;
3221 pt::read_json(conn_stream, root);
3223 std::unordered_map<address, address> addresses;
3224 for (
const auto& item : root) {
3225 auto private_address =
3226 item.second.get<std::string>(PRIVATE_ADDRESS_PROPERTY);
3227 auto public_address =
3228 item.second.get<std::string>(PUBLIC_ADDRESS_PROPERTY);
3230 address public_addr = create_address(public_address, -1);
3234 create_address(private_address, public_addr.get_port());
3235 addresses.emplace(std::move(private_addr), std::move(public_addr));
3242cloud_discovery::create_address(
const std::string& hostname,
int default_port)
3244 auto address_holder =
3245 util::AddressUtil::get_address_holder(hostname, default_port);
3246 auto scoped_hostname =
3247 util::AddressHelper::get_scoped_hostname(address_holder);
3248 return address(std::move(scoped_hostname), address_holder.get_port());
3258less<hazelcast::client::spi::DefaultObjectNamespace>::operator()(
3259 const hazelcast::client::spi::DefaultObjectNamespace& lhs,
3260 const hazelcast::client::spi::DefaultObjectNamespace& rhs)
const
3262 int result = lhs.get_service_name().compare(rhs.get_service_name());
3271 return lhs.get_object_name().compare(rhs.get_object_name()) < 0;
3275hash<hazelcast::client::spi::DefaultObjectNamespace>::operator()(
3276 const hazelcast::client::spi::DefaultObjectNamespace& k)
const noexcept
3278 return std::hash<std::string>()(k.get_service_name() + k.get_object_name());
Represents an address of a client or member in the cluster.
hazelcast_client configuration class.
config::client_network_config & get_network_config()
Gets {}.
Client Properties is an internal class.
Hazelcast cluster interface.
reconnect_mode get_reconnect_mode() const
@ ASYNC
Reconnect to cluster without blocking invocations.
Contains configuration parameters for client network related behaviour.
bool is_smart_routing() const
See client_network_config::setSmartRouting(boolean) for details.
Endpoint represents a peer in the cluster.
Base class for all exception originated from Hazelcast methods.
A event that is sent when a MembershipListener is registered.
const std::unordered_set< member > & get_members() const
Returns an immutable set of ordered members at the moment this MembershipListener is registered.
cluster & get_cluster()
Returns the cluster of the event.
Event to be fired when lifecycle states are changed.
lifecycle_state get_state() const
lifecycle_state
State enum.
lifecycle_event(lifecycle_state state)
Constructor.
Listener object for listening lifecycle events of hazelcast instance.
The Client interface allows to get information about a connected client's socket address,...
Cluster membership listener.
hazelcast.cloud configuration to let the client connect the cluster via hazelcast....