19 #include <boost/uuid/uuid_hash.hpp>
20 #include <boost/functional/hash.hpp>
21 #include <boost/property_tree/ptree.hpp>
22 #include <boost/property_tree/json_parser.hpp>
24 #include "hazelcast/client/hazelcast_client.h"
25 #include <hazelcast/client/protocol/codec/ErrorCodec.h>
26 #include <hazelcast/client/spi/impl/ListenerMessageCodec.h>
27 #include <hazelcast/client/spi/impl/ClientClusterServiceImpl.h>
28 #include <hazelcast/client/spi/impl/listener/cluster_view_listener.h>
29 #include <hazelcast/client/spi/impl/listener/listener_service_impl.h>
30 #include <hazelcast/client/spi/impl/discovery/remote_address_provider.h>
31 #include <hazelcast/client/spi/impl/discovery/cloud_discovery.h>
32 #include <hazelcast/util/AddressUtil.h>
33 #include "hazelcast/client/member_selectors.h"
34 #include "hazelcast/client/lifecycle_event.h"
35 #include "hazelcast/client/initial_membership_event.h"
36 #include "hazelcast/client/membership_event.h"
37 #include "hazelcast/client/lifecycle_listener.h"
38 #include "hazelcast/client/spi/ProxyManager.h"
39 #include "hazelcast/client/spi/ClientProxy.h"
40 #include "hazelcast/client/spi/ClientContext.h"
41 #include "hazelcast/client/spi/impl/ClientInvocation.h"
42 #include "hazelcast/client/spi/impl/ClientInvocationServiceImpl.h"
43 #include "hazelcast/client/impl/hazelcast_client_instance_impl.h"
44 #include "hazelcast/client/spi/impl/ClientPartitionServiceImpl.h"
45 #include "hazelcast/client/spi/impl/DefaultAddressProvider.h"
46 #include "hazelcast/client/spi/impl/sequence/CallIdSequenceWithBackpressure.h"
47 #include "hazelcast/client/spi/impl/sequence/CallIdSequenceWithoutBackpressure.h"
48 #include "hazelcast/client/spi/impl/sequence/FailFastCallIdSequence.h"
49 #include "hazelcast/client/spi/impl/sequence/CallIdFactory.h"
50 #include "hazelcast/client/connection/ClientConnectionManagerImpl.h"
51 #include "hazelcast/util/AddressHelper.h"
52 #include "hazelcast/util/HashUtil.h"
53 #include "hazelcast/util/concurrent/BackoffIdleStrategy.h"
54 #ifdef HZ_BUILD_WITH_SSL
55 #include <hazelcast/util/SyncHttpsClient.h>
// Builds an initial membership event that refers to the given cluster and
// takes over the supplied member set by moving it into members_.
68 initial_membership_event::initial_membership_event(
cluster &
cluster, std::unordered_set<member> members) : cluster_(
69 cluster), members_(std::move(members)) {
// Constructs the proxy manager and keeps a reference to the client context in client_.
81 ProxyManager::ProxyManager(ClientContext &context) : client_(context) {
// Initialization entry point of the proxy manager (its body is not visible in this excerpt).
84 void ProxyManager::init() {
// Notifies every registered proxy that the client is shutting down.
// lock_ is held for the whole call. For each entry of proxies_ the stored
// value is resolved with get() and on_shutdown() is invoked on the result.
// A std::exception raised while doing so is caught and reported through the
// client logger rather than propagated.
87 void ProxyManager::destroy() {
88 std::lock_guard<std::recursive_mutex> guard(lock_);
89 for (
auto &p : proxies_) {
// NOTE(review): `proxy` is assigned here, yet the next line calls
// p.second.get() again instead of using it -- confirm the local is needed.
91 auto proxy = p.second.get();
92 p.second.get()->on_shutdown();
93 }
catch (std::exception &se) {
// NOTE(review): the message below says "hence onShutdown can be called";
// it presumably means "cannot" -- confirm before touching the runtime string.
94 auto &lg = client_.get_logger();
96 boost::str(boost::format(
"Proxy was not created, "
97 "hence onShutdown can be called. Exception: %1%")
// Asks the cluster to create the given proxy and then runs its
// on_initialize() hook.
// The create-proxy request is encoded from the proxy's name and service name
// and sent through a ClientInvocation; the continuation is attached to the
// response future with boost::launch::sync.
// client_proxy is captured by copy ([=]) in the continuation.
105 boost::future<void> ProxyManager::initialize(
const std::shared_ptr<ClientProxy> &client_proxy) {
106 auto clientMessage = protocol::codec::client_createproxy_encode(client_proxy->get_name(),
107 client_proxy->get_service_name());
108 return spi::impl::ClientInvocation::create(client_, clientMessage,
109 client_proxy->get_service_name())->invoke().then(
110 boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
112 client_proxy->on_initialize();
// Destroys the given proxy.
// The proxy registered under (service name, object name) is looked up in
// proxies_ while lock_ is held. When such a registered proxy exists, `proxy`
// is destroyed locally and the future of its remote destruction is returned;
// if an exception::iexception is raised on that path, destroy_remotely() is
// still invoked. The final visible statement returns an already-completed
// future.
// Several original statements between the visible lines are missing from
// this excerpt, so the exact branch structure cannot be confirmed from here.
116 boost::future<void> ProxyManager::destroy_proxy(ClientProxy &proxy) {
117 DefaultObjectNamespace objectNamespace(proxy.get_service_name(), proxy.get_name());
118 std::shared_ptr<ClientProxy> registeredProxy;
120 std::lock_guard<std::recursive_mutex> guard(lock_);
121 auto it = proxies_.find(objectNamespace);
// nullptr when nothing is registered under this namespace.
122 registeredProxy = it == proxies_.end() ? nullptr : it->second.get();
123 if (it != proxies_.end()) {
129 if (registeredProxy) {
131 proxy.destroy_locally();
132 return proxy.destroy_remotely();
133 }
catch (exception::iexception &) {
// The future returned by destroy_remotely() is not used on this line.
134 proxy.destroy_remotely();
// `proxy` may be a different instance than the registered one.
138 if (&proxy != registeredProxy.get()) {
143 proxy.destroy_locally();
146 if (&proxy != registeredProxy.get()) {
151 proxy.destroy_locally();
155 return boost::make_ready_future();
// Builds a context from the public client facade by binding to the
// implementation object it points to (client_impl_).
158 ClientContext::ClientContext(
const client::hazelcast_client &hazelcast_client) : hazelcast_client_(
159 *hazelcast_client.client_impl_) {
// Builds a context directly from a client implementation instance.
162 ClientContext::ClientContext(client::impl::hazelcast_client_instance_impl &hazelcast_client)
163 : hazelcast_client_(hazelcast_client) {
// Returns the client's serialization service.
166 serialization::pimpl::SerializationService &ClientContext::get_serialization_service() {
167 return hazelcast_client_.serialization_service_;
// Returns the client's cluster service.
170 impl::ClientClusterServiceImpl & ClientContext::get_client_cluster_service() {
171 return hazelcast_client_.cluster_service_;
// Returns the client's invocation service (dereferences invocation_service_).
174 impl::ClientInvocationServiceImpl &ClientContext::get_invocation_service() {
175 return *hazelcast_client_.invocation_service_;
// Returns the configuration the client holds.
178 client_config &ClientContext::get_client_config() {
179 return hazelcast_client_.client_config_;
// Returns the client's partition service (dereferences partition_service_).
182 impl::ClientPartitionServiceImpl & ClientContext::get_partition_service() {
183 return *hazelcast_client_.partition_service_;
// Returns the client's lifecycle service.
186 lifecycle_service &ClientContext::get_lifecycle_service() {
187 return hazelcast_client_.lifecycle_service_;
// Returns the client's listener service (dereferences listener_service_).
190 spi::impl::listener::listener_service_impl &ClientContext::get_client_listener_service() {
191 return *hazelcast_client_.listener_service_;
// Returns the client's connection manager (dereferences connection_manager_).
194 connection::ClientConnectionManagerImpl &ClientContext::get_connection_manager() {
195 return *hazelcast_client_.connection_manager_;
// Returns the client's near cache manager (dereferences near_cache_manager_).
198 internal::nearcache::NearCacheManager &ClientContext::get_near_cache_manager() {
199 return *hazelcast_client_.near_cache_manager_;
// Returns the client's properties object.
202 client_properties &ClientContext::get_client_properties() {
203 return hazelcast_client_.client_properties_;
// Returns the cluster object owned by the client.
206 cluster &ClientContext::get_cluster() {
207 return hazelcast_client_.cluster_;
// Returns a reference to the client's call id sequence pointer.
// Note that the method is const but hands out a non-const reference.
210 std::shared_ptr<impl::sequence::CallIdSequence> &ClientContext::get_call_id_sequence()
const {
211 return hazelcast_client_.call_id_sequence_;
// Returns the exception factory exposed by the client implementation.
214 const protocol::ClientExceptionFactory &ClientContext::get_client_exception_factory()
const {
215 return hazelcast_client_.get_exception_factory();
// Returns the name reported by the client implementation.
218 const std::string &ClientContext::get_name()
const {
219 return hazelcast_client_.get_name();
// Returns the client's execution service (dereferences execution_service_).
222 impl::ClientExecutionServiceImpl &ClientContext::get_client_execution_service()
const {
223 return *hazelcast_client_.execution_service_;
// Returns the lock reference id generator exposed by the client implementation.
226 const std::shared_ptr<client::impl::ClientLockReferenceIdGenerator> &
227 ClientContext::get_lock_reference_id_generator() {
228 return hazelcast_client_.get_lock_reference_id_generator();
// Returns a shared owner of the client implementation, obtained through
// shared_from_this() on the implementation object.
231 std::shared_ptr<client::impl::hazelcast_client_instance_impl>
232 ClientContext::get_hazelcast_client_implementation() {
233 return hazelcast_client_.shared_from_this();
// Returns the proxy manager exposed by the client implementation.
236 spi::ProxyManager &ClientContext::get_proxy_manager() {
237 return hazelcast_client_.get_proxy_manager();
// Returns the client's logger (dereferences logger_).
240 logger &ClientContext::get_logger() {
241 return *hazelcast_client_.logger_;
// Returns the client's statistics component (dereferences statistics_).
244 client::impl::statistics::Statistics &ClientContext::get_clientstatistics() {
245 return *hazelcast_client_.statistics_;
// Returns the client's cluster view listener (dereferences cluster_listener_).
248 spi::impl::listener::cluster_view_listener &ClientContext::get_cluster_view_listener() {
249 return *hazelcast_client_.cluster_listener_;
// Returns a uuid produced by the client implementation's random_uuid().
252 boost::uuids::uuid ClientContext::random_uuid() {
253 return hazelcast_client_.random_uuid();
// Returns the client's CP proxy session manager.
256 cp::internal::session::proxy_session_manager &ClientContext::get_proxy_session_manager() {
257 return hazelcast_client_.proxy_session_manager_;
// Constructs the lifecycle service.
// The shutdown-completed latch starts with a count of 1, and a copy of every
// supplied listener is registered through add_listener().
260 lifecycle_service::lifecycle_service(ClientContext &client_context,
261 const std::vector<lifecycle_listener> &listeners) :
262 client_context_(client_context), listeners_(),
263 shutdown_completed_latch_(1) {
264 for (
const auto &listener: listeners) {
265 add_listener(lifecycle_listener(listener));
// Starts the client's services.
// active_ is atomically flipped from false to true first; the branch taken
// when that exchange fails is not visible in this excerpt. After that the
// STARTED event is fired and the execution, listener, invocation and cluster
// services, the cluster view listener and the connection manager are started
// in that order. When the connection strategy is not async-start, the call
// waits for the initial membership event and connects to all cluster
// members. Finally the backup listener is added and statistics are started.
// The return statements are missing from this excerpt.
269 bool lifecycle_service::start() {
270 bool expected =
false;
271 if (!active_.compare_exchange_strong(expected,
true)) {
275 fire_lifecycle_event(lifecycle_event::STARTED);
277 client_context_.get_client_execution_service().start();
279 client_context_.get_client_listener_service().start();
281 client_context_.get_invocation_service().start();
283 client_context_.get_client_cluster_service().start();
285 client_context_.get_cluster_view_listener().start();
287 if (!client_context_.get_connection_manager().start()) {
291 auto &connectionStrategyConfig = client_context_.get_client_config().get_connection_strategy_config();
292 if (!connectionStrategyConfig.is_async_start()) {
294 wait_for_initial_membership_event();
295 client_context_.get_connection_manager().connect_to_all_cluster_members();
298 client_context_.get_invocation_service().add_backup_listener();
300 client_context_.get_clientstatistics().start();
// Shuts the client down.
// active_ is atomically flipped from true to false; when that exchange fails
// the call waits on shutdown_completed_latch_. Otherwise SHUTTING_DOWN is
// fired, the individual components are shut down in the order written below,
// SHUTDOWN is fired, and the latch is counted down.
// A std::exception raised during that sequence is logged at info level and
// the latch is still counted down.
305 void lifecycle_service::shutdown() {
306 bool expected =
true;
307 if (!active_.compare_exchange_strong(expected,
false)) {
308 shutdown_completed_latch_.wait();
312 fire_lifecycle_event(lifecycle_event::SHUTTING_DOWN);
313 client_context_.get_proxy_session_manager().shutdown();
314 client_context_.get_clientstatistics().shutdown();
315 client_context_.get_proxy_manager().destroy();
316 client_context_.get_connection_manager().shutdown();
317 client_context_.get_client_cluster_service().shutdown();
318 client_context_.get_invocation_service().shutdown();
319 client_context_.get_client_listener_service().shutdown();
320 client_context_.get_near_cache_manager().destroy_all_near_caches();
321 fire_lifecycle_event(lifecycle_event::SHUTDOWN);
322 client_context_.get_client_execution_service().shutdown();
323 client_context_.get_serialization_service().dispose();
324 shutdown_completed_latch_.count_down();
325 }
catch (std::exception &e) {
326 HZ_LOG(client_context_.get_logger(), info,
327 boost::str(boost::format(
"An exception occured during LifecycleService shutdown. %1%")
// Count down on the failure path as well, so the latch is released either way.
330 shutdown_completed_latch_.count_down();
// Registers a lifecycle listener under a freshly generated uuid.
// listener_lock_ is held while the listener is moved into listeners_.
// The return statement is missing from this excerpt (presumably it returns
// the generated id -- confirm against the full file).
334 boost::uuids::uuid lifecycle_service::add_listener(lifecycle_listener &&lifecycle_listener) {
335 std::lock_guard<std::mutex> lg(listener_lock_);
336 const auto id = uuid_generator_();
337 listeners_.emplace(
id, std::move(lifecycle_listener));
// Removes the lifecycle listener registered under registration_id.
// listener_lock_ is held while listeners_ is modified.
// Returns true when exactly one entry was erased, false otherwise.
341 bool lifecycle_service::remove_listener(
const boost::uuids::uuid &registration_id) {
342 std::lock_guard<std::mutex> guard(listener_lock_);
343 return listeners_.erase(registration_id) == 1;
// Logs the given lifecycle event and delivers it to every registered listener.
// A per-state callback (fire_one) is selected in the switch and then applied
// to each entry of listeners_. listener_lock_ is held for the whole call.
// NOTE(review): listeners are invoked while listener_lock_ (a std::mutex) is
// held; a listener that calls add_listener()/remove_listener() on this
// service from inside its callback would try to lock it again -- confirm
// this is acceptable.
346 void lifecycle_service::fire_lifecycle_event(
const lifecycle_event &lifecycle_event) {
347 std::lock_guard<std::mutex> guard(listener_lock_);
348 logger &lg = client_context_.get_logger();
350 std::function<void(lifecycle_listener &)> fire_one;
352 switch (lifecycle_event.get_state()) {
353 case lifecycle_event::STARTING : {
// Build-time git commit date and id are used in the STARTING log line;
// double quotes are stripped from the commit id.
355 std::string date(HAZELCAST_STRINGIZE(HAZELCAST_GIT_COMMIT_DATE));
356 util::git_date_to_hazelcast_log_date(date);
357 std::string commitId(HAZELCAST_STRINGIZE(HAZELCAST_GIT_COMMIT_ID));
358 commitId.erase(std::remove(commitId.begin(), commitId.end(),
'"'), commitId.end());
// Two message-building variants are visible below (boost::format and
// hz_snprintf); the lines selecting between them are missing from this
// excerpt.
361 (boost::format(
"(%1%:%2%) LifecycleService::LifecycleEvent Client (%3%) is STARTING") %
362 date % commitId % client_context_.get_connection_manager().get_client_uuid()).str());
364 util::hz_snprintf(msg, 100,
"(%s:%s) LifecycleService::LifecycleEvent STARTING", date.c_str(),
366 HZ_LOG(lg, info, msg);
368 fire_one = [](lifecycle_listener &listener) {
369 listener.starting_();
373 case lifecycle_event::STARTED : {
374 HZ_LOG(lg, info,
"LifecycleService::LifecycleEvent STARTED");
376 fire_one = [](lifecycle_listener &listener) {
381 case lifecycle_event::SHUTTING_DOWN : {
382 HZ_LOG(lg, info,
"LifecycleService::LifecycleEvent SHUTTING_DOWN");
384 fire_one = [](lifecycle_listener &listener) {
385 listener.shutting_down_();
389 case lifecycle_event::SHUTDOWN : {
390 HZ_LOG(lg, info,
"LifecycleService::LifecycleEvent SHUTDOWN");
392 fire_one = [](lifecycle_listener &listener) {
393 listener.shutdown_();
397 case lifecycle_event::CLIENT_CONNECTED : {
398 HZ_LOG(lg, info,
"LifecycleService::LifecycleEvent CLIENT_CONNECTED");
400 fire_one = [](lifecycle_listener &listener) {
401 listener.connected_();
405 case lifecycle_event::CLIENT_DISCONNECTED : {
406 HZ_LOG(lg, info,
"LifecycleService::LifecycleEvent CLIENT_DISCONNECTED");
408 fire_one = [](lifecycle_listener &listener) {
409 listener.disconnected_();
// Deliver the event to every registered listener.
415 for (
auto &item: listeners_) {
416 fire_one(item.second);
// Reports whether the service is running (the body is not visible in this excerpt).
420 bool lifecycle_service::is_running() {
// Destructor of the lifecycle service (the body is not visible in this excerpt).
424 lifecycle_service::~lifecycle_service() {
// Delegates to the cluster service's wait_initial_member_list_fetched().
430 void lifecycle_service::wait_for_initial_membership_event()
const {
431 client_context_.get_client_cluster_service().wait_initial_member_list_fetched();
// Stores copies of the service name and the object name.
434 DefaultObjectNamespace::DefaultObjectNamespace(
const std::string &service,
const std::string &
object)
435 : service_name_(service), object_name_(object) {
// Returns the stored service name.
439 const std::string &DefaultObjectNamespace::get_service_name()
const {
440 return service_name_;
// Accessor for the object name (the body is not visible in this excerpt).
443 const std::string &DefaultObjectNamespace::get_object_name()
const {
// Two namespaces are equal when both the service name and the object name match.
447 bool DefaultObjectNamespace::operator==(
const DefaultObjectNamespace &rhs)
const {
448 return service_name_ == rhs.service_name_ && object_name_ == rhs.object_name_;
// Stores the proxy name, its service name and a reference to the client context.
451 ClientProxy::ClientProxy(
const std::string &name,
const std::string &service_name, ClientContext &context)
452 : name_(name), service_name_(service_name), context_(context) {}
// Defaulted destructor, defined out of line.
454 ClientProxy::~ClientProxy() =
default;
// Accessor for the proxy name (the body is not visible in this excerpt).
456 const std::string &ClientProxy::get_name()
const {
// Returns the name of the service this proxy belongs to.
460 const std::string &ClientProxy::get_service_name()
const {
461 return service_name_;
// Accessor for the client context (the body is not visible in this excerpt).
464 ClientContext &ClientProxy::get_context() {
// Destroy hook of the proxy (the body is not visible in this excerpt).
468 void ClientProxy::on_destroy() {
// Destroys this proxy by delegating to the proxy manager's destroy_proxy().
// Returns the future produced by that call.
471 boost::future<void> ClientProxy::destroy() {
472 return get_context().get_proxy_manager().destroy_proxy(*
this);
// Local part of proxy destruction. Only the signature and an
// exception::iexception handler are visible in this excerpt; the statements
// it guards are missing.
475 void ClientProxy::destroy_locally() {
480 }
catch (exception::iexception &) {
// Pre-destroy hook returning a bool (the body is not visible in this excerpt).
487 bool ClientProxy::pre_destroy() {
// Post-destroy hook (the body is not visible in this excerpt).
491 void ClientProxy::post_destroy() {
// Initialization hook; ProxyManager::initialize() calls it after the
// create-proxy request completes. The body is not visible in this excerpt.
494 void ClientProxy::on_initialize() {
// Shutdown hook; ProxyManager::destroy() calls it for each registered proxy.
// The body is not visible in this excerpt.
497 void ClientProxy::on_shutdown() {
// Returns the serialization service of the owning client context.
500 serialization::pimpl::SerializationService &ClientProxy::get_serialization_service() {
501 return context_.get_serialization_service();
// Sends a destroy-proxy request for this proxy's name and service name.
// The returned future completes after the response future has been consumed;
// f.get() in the continuation rethrows any failure of the invocation.
504 boost::future<void> ClientProxy::destroy_remotely() {
505 auto clientMessage = protocol::codec::client_destroyproxy_encode(
506 get_name(), get_service_name());
507 return spi::impl::ClientInvocation::create(get_context(), std::make_shared<protocol::ClientMessage>(
508 std::move(clientMessage)), get_name())->invoke().then(
509 boost::launch::sync, [](boost::future<protocol::ClientMessage> f) { f.get(); });
// Registers a listener through the client's listener service.
// The handler is given the context's logger before registration. The
// remaining arguments of the register_listener() call are missing from this
// excerpt.
512 boost::future<boost::uuids::uuid>
513 ClientProxy::register_listener(std::shared_ptr<impl::ListenerMessageCodec> listener_message_codec,
514 std::shared_ptr<client::impl::BaseEventHandler> handler) {
515 handler->set_logger(&get_context().get_logger());
516 return get_context().get_client_listener_service().register_listener(listener_message_codec,
// Deregisters the listener with the given registration id through the
// client's listener service and returns that call's future.
520 boost::future<bool> ClientProxy::deregister_listener(boost::uuids::uuid registration_id) {
521 return get_context().get_client_listener_service().deregister_listener(registration_id);
// Decodes an add-listener response by reading the first uuid of the message.
// The return-type line of this definition is missing from this excerpt.
526 ListenerMessageCodec::decode_add_response(protocol::ClientMessage &msg)
const {
527 return msg.get_first_uuid();
// Decodes a remove-listener response by reading the first fixed-size field
// of the message as a bool.
530 bool ListenerMessageCodec::decode_remove_response(protocol::ClientMessage &msg)
const {
531 return msg.get_first_fixed_sized_field<
bool>();
// Constructs the invocation service from the client context.
// Timeouts and flags are read from the client properties and configuration:
//  - invocation_timeout_ is read as an integer number of seconds,
//  - invocation_retry_pause_ and backup_timeout_ are read in milliseconds,
//  - backup acks are enabled only when smart routing is on as well.
534 ClientInvocationServiceImpl::ClientInvocationServiceImpl(ClientContext &client)
535 : client_(client), logger_(client.get_logger()),
536 invocation_timeout_(std::chrono::seconds(client.get_client_properties().get_integer(
537 client.get_client_properties().get_invocation_timeout_seconds()))),
538 invocation_retry_pause_(std::chrono::milliseconds(client.get_client_properties().get_long(
539 client.get_client_properties().get_invocation_retry_pause_millis()))),
540 smart_routing_(client.get_client_config().get_network_config().is_smart_routing()),
541 backup_acks_enabled_(smart_routing_ && client.get_client_config().backup_acks_enabled()),
542 fail_on_indeterminate_operation_state_(client.get_client_properties().get_boolean(client.get_client_properties().fail_on_indeterminate_state())),
543 backup_timeout_(std::chrono::milliseconds(client.get_client_properties().get_integer(client.get_client_properties().backup_timeout_millis()))) {}
// Start hook of the invocation service (the body is not visible in this excerpt).
545 void ClientInvocationServiceImpl::start() {
// Registers the backup listener when backup acks are enabled.
// The call blocks on the registration future via get().
548 void ClientInvocationServiceImpl::add_backup_listener() {
549 if (this->backup_acks_enabled_) {
550 auto &listener_service = this->client_.get_client_listener_service();
551 listener_service.register_listener(std::make_shared<BackupListenerMessageCodec>(),
552 std::make_shared<noop_backup_event_handler>()).get();
// Marks the invocation service as shut down by setting is_shutdown_ to true.
556 void ClientInvocationServiceImpl::shutdown() {
557 is_shutdown_.store(
true);
// Returns the configured invocation timeout.
560 std::chrono::milliseconds ClientInvocationServiceImpl::get_invocation_timeout()
const {
561 return invocation_timeout_;
// Returns the configured pause between invocation retries.
564 std::chrono::milliseconds ClientInvocationServiceImpl::get_invocation_retry_pause()
const {
565 return invocation_retry_pause_;
// Returns the redo-operation flag from the client configuration.
568 bool ClientInvocationServiceImpl::is_redo_operation() {
569 return client_.get_client_config().is_redo_operation();
// Routes a response message to its invocation.
// A response whose message type is the error type is decoded into an error
// holder and delivered through notify_exception(); the other visible path
// delivers the response through notify(). A std::exception raised during
// processing is logged at severe level.
// The return-type line and the try/else lines are missing from this excerpt.
573 ClientInvocationServiceImpl::handle_client_message(
const std::shared_ptr<ClientInvocation> &invocation,
574 const std::shared_ptr<protocol::ClientMessage> &response) {
576 if (protocol::codec::ErrorCodec::EXCEPTION_MESSAGE_TYPE == response->get_message_type()) {
577 auto error_holder = protocol::codec::ErrorCodec::decode(*response);
578 invocation->notify_exception(
579 client_.get_client_exception_factory().create_exception(error_holder));
581 invocation->notify(response);
583 }
catch (std::exception &e) {
584 HZ_LOG(logger_, severe,
585 boost::str(boost::format(
"Failed to process response for %1%. %2%")
586 % *invocation % e.what())
// Sends an invocation over the given connection.
// A hazelcast_client_not_active exception is thrown with the message
// "Client is shut down"; the condition guarding that throw is missing from
// this excerpt. When backup acks are enabled the message is flagged as
// backup-aware. The invocation is then written to the connection and the
// connection is recorded on the invocation.
// The return statement is missing from this excerpt.
591 bool ClientInvocationServiceImpl::send(
const std::shared_ptr<impl::ClientInvocation>& invocation,
592 const std::shared_ptr<connection::Connection>& connection) {
594 BOOST_THROW_EXCEPTION(
595 exception::hazelcast_client_not_active(
"ClientInvocationServiceImpl::send",
596 "Client is shut down"));
599 if (backup_acks_enabled_) {
600 invocation->get_client_message()->add_flag(protocol::ClientMessage::BACKUP_AWARE_FLAG);
603 write_to_connection(*connection, invocation);
604 invocation->set_send_connection(connection);
// Writes the invocation to the given connection.
// NOTE(review): clientMessage is fetched but not used by the visible lines --
// confirm whether a missing line uses it.
608 void ClientInvocationServiceImpl::write_to_connection(connection::Connection &connection,
609 const std::shared_ptr<ClientInvocation> &client_invocation) {
610 auto clientMessage = client_invocation->get_client_message();
611 connection.write(client_invocation);
// Delegates the invocation-allowed check to the connection manager.
614 void ClientInvocationServiceImpl::check_invocation_allowed() {
615 client_.get_connection_manager().check_invocation_allowed();
// Sends the invocation over a connection picked by the connection manager's
// get_random_connection(). "No connection found to invoke" is logged at
// finest level; the condition guarding that log line and its early exit are
// missing from this excerpt.
618 bool ClientInvocationServiceImpl::invoke(std::shared_ptr<ClientInvocation> invocation) {
619 auto connection = client_.get_connection_manager().get_random_connection();
621 HZ_LOG(logger_, finest,
"No connection found to invoke");
624 return send(invocation, connection);
// Keeps a reference to the network configuration that supplies the addresses.
627 DefaultAddressProvider::DefaultAddressProvider(config::client_network_config &network_config)
628 : network_config_(network_config) {}
// Loads the addresses from the network configuration.
// When none are configured, 127.0.0.1:5701 is added as the fallback.
// The rest of the body is missing from this excerpt.
630 std::vector<address> DefaultAddressProvider::load_addresses() {
631 std::vector<address> addresses = network_config_.get_addresses();
632 if (addresses.empty()) {
633 addresses.emplace_back(
"127.0.0.1", 5701);
// Address translation hook of the default provider (the body is not visible
// in this excerpt).
641 boost::optional<address> DefaultAddressProvider::translate(
const address &addr) {
// Reports whether this is the default provider (the body is not visible in
// this excerpt).
645 bool DefaultAddressProvider::is_default_provider() {
// Out-of-line definitions of ClientClusterServiceImpl's static members.
// EMPTY_SNAPSHOT is a sentinel snapshot with version -1; the rest of this
// file compares against it by pointer to detect that no member list has been
// applied yet.
649 const boost::shared_ptr<ClientClusterServiceImpl::member_list_snapshot> ClientClusterServiceImpl::EMPTY_SNAPSHOT(
650 new ClientClusterServiceImpl::member_list_snapshot{-1});
652 constexpr boost::chrono::milliseconds ClientClusterServiceImpl::INITIAL_MEMBERS_TIMEOUT;
// Endpoint qualifiers used as address_map keys in create_snapshot().
653 const endpoint_qualifier ClientClusterServiceImpl::CLIENT{1,
""};
654 const endpoint_qualifier ClientClusterServiceImpl::MEMBER{0,
""};
// Constructs the cluster service.
// The member list starts as EMPTY_SNAPSHOT, labels are copied from the client
// configuration, and the initial-list latch starts with a count of 1.
656 ClientClusterServiceImpl::ClientClusterServiceImpl(hazelcast::client::spi::ClientContext &client)
657 : client_(client), member_list_snapshot_(EMPTY_SNAPSHOT),
658 labels_(client.get_client_config().get_labels()),
659 initial_list_fetched_latch_(1) {
// Stores the listener under a new random uuid without delivering an initial
// membership event. listeners_lock_ is held while listeners_ is modified.
// The return statement is missing from this excerpt.
662 boost::uuids::uuid ClientClusterServiceImpl::add_membership_listener_without_init(
663 membership_listener &&listener) {
664 std::lock_guard<std::mutex> g(listeners_lock_);
665 auto id = client_.random_uuid();
666 listeners_.emplace(
id, std::move(listener));
// Looks up a member by uuid in the current member list snapshot.
// The uuid must not be nil (asserted). The return statements for the found
// and not-found cases are missing from this excerpt.
670 boost::optional<member> ClientClusterServiceImpl::get_member(boost::uuids::uuid uuid)
const {
671 assert(!uuid.is_nil());
672 auto members_view_ptr = member_list_snapshot_.load();
673 const auto it = members_view_ptr->members.find(uuid);
674 if (it == members_view_ptr->members.end()) {
// Copies the members of the current snapshot into a vector.
// The vector is reserved to the snapshot's member count up front.
// The return statement is missing from this excerpt.
680 std::vector<member> ClientClusterServiceImpl::get_member_list()
const {
681 auto members_view_ptr = member_list_snapshot_.load();
682 std::vector<member> result;
683 result.reserve(members_view_ptr->members.size());
684 for (
const auto &e : members_view_ptr->members) {
685 result.emplace_back(e.second);
// Registers a copy of every membership listener found in the client
// configuration.
690 void ClientClusterServiceImpl::start() {
691 for (
auto &listener : client_.get_client_config().get_membership_listeners()) {
692 add_membership_listener(membership_listener(listener));
// Delivers the initial membership event to every listener that has an init_
// callback set. listeners_lock_ is held while the callbacks run.
696 void ClientClusterServiceImpl::fire_initial_membership_event(
const initial_membership_event &event) {
697 std::lock_guard<std::mutex> g(listeners_lock_);
699 for (
auto &item : listeners_) {
700 membership_listener &listener = item.second;
701 if (listener.init_) {
702 listener.init_(event);
// Calls try_count_down() on the initial-list latch during shutdown
// (presumably so that a thread still waiting on it is released -- confirm).
707 void ClientClusterServiceImpl::shutdown() {
708 initial_list_fetched_latch_.try_count_down();
// Registers a membership listener and, when it has an init_ callback and the
// current member list is not empty, immediately delivers an initial
// membership event built from the current snapshot.
// cluster_view_lock_ is held for the whole call; listeners_lock_ is taken
// after the listener has been stored.
// The return-type line and the return statement are missing from this
// excerpt.
712 ClientClusterServiceImpl::add_membership_listener(membership_listener &&listener) {
713 std::lock_guard<std::mutex> cluster_view_g(cluster_view_lock_);
715 auto id = add_membership_listener_without_init(std::move(listener));
717 std::lock_guard<std::mutex> listeners_g(listeners_lock_);
// Note: `auto` makes this a copy of the stored listener, not a reference.
718 auto added_listener = listeners_[id];
720 if (added_listener.init_) {
721 auto &cluster = client_.get_cluster();
722 auto members_ptr = member_list_snapshot_.load();
723 if (!members_ptr->members.empty()) {
724 std::unordered_set<member> members;
725 for (
const auto &e : members_ptr->members) {
726 members.insert(e.second);
728 added_listener.init_(initial_membership_event(cluster, members));
// Removes the membership listener with the given registration id.
// Returns true when exactly one entry was erased.
735 bool ClientClusterServiceImpl::remove_membership_listener(boost::uuids::uuid registration_id) {
736 std::lock_guard<std::mutex> g(listeners_lock_);
737 return listeners_.erase(registration_id) == 1;
// Collects the members of the current member list that the selector accepts.
// Accepted members are moved out of the temporary list into the result.
// The return-type line and the return statement are missing from this
// excerpt.
741 ClientClusterServiceImpl::get_members(
const member_selector &selector)
const {
742 std::vector<member> result;
743 for (
auto &&member : get_member_list()) {
744 if (selector.select(member)) {
745 result.emplace_back(std::move(member));
// Builds the local endpoint description of this client.
// The socket address comes from a connection picked by
// get_random_connection(), or is boost::none when there is no connection.
// The endpoint also carries the client uuid, the client name and the labels.
752 local_endpoint ClientClusterServiceImpl::get_local_client()
const {
753 connection::ClientConnectionManagerImpl &cm = client_.get_connection_manager();
754 auto connection = cm.get_random_connection();
755 auto inetSocketAddress = connection ? connection->get_local_socket_address() : boost::none;
756 auto uuid = cm.get_client_uuid();
757 return local_endpoint(uuid, std::move(inetSocketAddress), client_.get_name(), labels_);
// Resets the member list version to 0 while keeping the current members.
// Nothing is stored when the current snapshot is still EMPTY_SNAPSHOT.
// cluster_view_lock_ is held for the whole call.
760 void ClientClusterServiceImpl::clear_member_list_version() {
761 std::lock_guard<std::mutex> g(cluster_view_lock_);
762 auto &lg = client_.get_logger();
763 HZ_LOG(lg, finest,
"Resetting the member list version ");
764 auto cluster_view_snapshot = member_list_snapshot_.load();
767 if (cluster_view_snapshot != EMPTY_SNAPSHOT) {
768 member_list_snapshot_.store(boost::shared_ptr<member_list_snapshot>(
769 new member_list_snapshot{0, cluster_view_snapshot->members}));
// Replaces the member list with an empty version-0 snapshot and returns the
// membership events obtained by comparing the previous members against an
// empty member map. cluster_view_lock_ is held for the whole call.
773 std::vector<membership_event> ClientClusterServiceImpl::clear_member_list_and_return_events() {
774 std::lock_guard<std::mutex> g(cluster_view_lock_);
776 auto &lg = client_.get_logger();
777 HZ_LOG(lg, finest,
"Resetting the member list");
779 auto previous_list = member_list_snapshot_.load()->members;
781 member_list_snapshot_.store(
782 boost::shared_ptr<member_list_snapshot>(
new member_list_snapshot{0}));
784 return detect_membership_events(previous_list,
785 std::unordered_map<boost::uuids::uuid, member, boost::hash<boost::uuids::uuid>>());
// Clears the member list and fires the resulting membership events.
788 void ClientClusterServiceImpl::clear_member_list() {
789 auto events = clear_member_list_and_return_events();
790 fire_events(std::move(events));
// Handles a member list snapshot received from the cluster.
// While the current snapshot is still EMPTY_SNAPSHOT, the check is repeated
// under cluster_view_lock_ and the initial state is applied, after which the
// initial-list latch is counted down. Otherwise, when the incoming version is
// not older than the current one (checked again under the lock), the new
// snapshot is stored and membership events are computed against the previous
// members. The events are fired after the locked section.
// The return-type line, the log call opening and some statements between the
// visible lines are missing from this excerpt.
794 ClientClusterServiceImpl::handle_event(int32_t version,
const std::vector<member> &member_infos) {
795 auto &lg = client_.get_logger();
797 boost::str(boost::format(
"Handling new snapshot with membership version: %1%, "
800 % members_string(create_snapshot(version, member_infos)))
802 auto cluster_view_snapshot = member_list_snapshot_.load();
803 if (cluster_view_snapshot == EMPTY_SNAPSHOT) {
804 std::lock_guard<std::mutex> g(cluster_view_lock_);
// Re-load under the lock: another thread may have applied the initial state.
805 cluster_view_snapshot = member_list_snapshot_.load();
806 if (cluster_view_snapshot == EMPTY_SNAPSHOT) {
808 apply_initial_state(version, member_infos);
809 initial_list_fetched_latch_.count_down();
814 std::vector<membership_event> events;
815 if (version >= cluster_view_snapshot->version) {
816 std::lock_guard<std::mutex> g(cluster_view_lock_);
// Re-load under the lock so the version comparison uses the latest snapshot.
817 cluster_view_snapshot = member_list_snapshot_.load();
818 if (version >= cluster_view_snapshot->version) {
819 auto prev_members = cluster_view_snapshot->members;
820 auto snapshot = boost::make_shared<member_list_snapshot>(create_snapshot(version, member_infos));
821 member_list_snapshot_.store(snapshot);
822 events = detect_membership_events(prev_members, snapshot->members);
826 fire_events(std::move(events));
// Builds a member list snapshot with the given version from the member infos.
// A member whose address map is empty is inserted unchanged. For the others
// the address stored under the CLIENT qualifier is used when present, and the
// one stored under MEMBER otherwise (its presence is asserted); a new member
// is constructed with that address and inserted under its uuid.
// The else lines and the return statement are missing from this excerpt.
829 ClientClusterServiceImpl::member_list_snapshot
830 ClientClusterServiceImpl::create_snapshot(int32_t version,
const std::vector<member> &members) {
831 member_list_snapshot result;
832 result.version = version;
833 for (
auto &m : members) {
834 auto const &address_map = m.address_map();
835 if (address_map.empty()) {
836 result.members.insert({m.get_uuid(), m});
838 auto found = address_map.find(CLIENT);
839 address member_address;
840 if (found != address_map.end()) {
841 member_address = found->second;
843 found = address_map.find(MEMBER);
844 assert(found != address_map.end());
845 member_address = found->second;
847 member new_member(member_address, m.get_uuid(), m.is_lite_member(), m.get_attributes(), m.address_map());
848 result.members.emplace(new_member.get_uuid(), std::move(new_member));
// Formats a snapshot for logging: a "Members [N] {" header, one tab-indented
// line per member, and a closing "}".
// The return-type line and the return statement are missing from this
// excerpt.
856 ClientClusterServiceImpl::members_string(
const ClientClusterServiceImpl::member_list_snapshot& snapshot) {
857 std::stringstream out;
858 auto const &members = snapshot.members;
859 out << std::endl << std::endl <<
"Members [" << members.size() <<
"] {";
860 for (
auto const &e : members) {
861 out << std::endl <<
"\t" << e.second;
863 out << std::endl <<
"}" << std::endl;
// Stores the first member list snapshot, logs it at info level and delivers
// an initial membership event to every listener that has an init_ callback.
// listeners_lock_ is held while the callbacks run.
// The return-type line is missing from this excerpt.
868 ClientClusterServiceImpl::apply_initial_state(int32_t version,
const std::vector<member> &member_infos) {
869 auto snapshot = boost::make_shared<member_list_snapshot>(create_snapshot(version, member_infos));
870 member_list_snapshot_.store(snapshot);
871 HZ_LOG(client_.get_logger(), info, members_string(*snapshot));
872 std::unordered_set<member> members;
873 for(
auto const &e : snapshot->members) {
874 members.insert(e.second);
876 std::lock_guard<std::mutex> g(listeners_lock_);
877 for (
auto &item : listeners_) {
878 membership_listener &listener = item.second;
879 if (listener.init_) {
880 listener.init_(initial_membership_event(client_.get_cluster(), members));
// Computes membership events from the difference between two member maps.
// previous_members is taken by value and used as a work set: every current
// member is erased from it, and the ones that were not present are collected
// as new members. What remains in previous_members has left the cluster.
// For each member that left, a MEMBER_LEFT event is created and the
// connection to that member is closed with a target_disconnected error (the
// check guarding the close is missing from this excerpt). MEMBER_JOINED
// events follow for the new members. When any event was produced and the
// stored snapshot is not empty, the snapshot is logged at info level.
// The return statement is missing from this excerpt.
885 std::vector<membership_event> ClientClusterServiceImpl::detect_membership_events(
886 std::unordered_map<boost::uuids::uuid, member, boost::hash<boost::uuids::uuid>> previous_members,
887 const std::unordered_map<boost::uuids::uuid, member, boost::hash<boost::uuids::uuid>>& current_members) {
888 std::vector<member> new_members;
890 for (
auto const & e : current_members) {
// erase() returns 0 when the uuid was not among the previous members.
891 if (!previous_members.erase(e.first)) {
892 new_members.emplace_back(e.second);
896 std::vector<membership_event> events;
899 for (
auto const &e : previous_members) {
900 events.emplace_back(client_.get_cluster(), e.second, membership_event::membership_event_type::MEMBER_LEFT, current_members);
901 auto connection = client_.get_connection_manager().get_connection(e.second.get_uuid());
903 connection->close(
"", std::make_exception_ptr(exception::target_disconnected(
904 "ClientClusterServiceImpl::detect_membership_events", (boost::format(
905 "The client has closed the connection to this member, after receiving a member left event from the cluster. %1%") %
906 *connection).str())));
909 for (
auto const &member : new_members) {
910 events.emplace_back(client_.get_cluster(), member, membership_event::membership_event_type::MEMBER_JOINED, current_members);
913 if (!events.empty()) {
914 auto snapshot = member_list_snapshot_.load();
915 if (!snapshot->members.empty()) {
916 HZ_LOG(client_.get_logger(), info, members_string(*snapshot));
// Delivers each membership event to every registered listener: joined_ for
// MEMBER_JOINED events and left_ on the other visible path.
// listeners_lock_ is held while the callbacks run.
// NOTE(review): init_ is checked before being called elsewhere in this file,
// but joined_/left_ are invoked here without such a check -- confirm they
// are always set.
922 void ClientClusterServiceImpl::fire_events(std::vector<membership_event> events) {
923 std::lock_guard<std::mutex> g(listeners_lock_);
925 for (
auto const &event : events) {
926 for (
auto &item : listeners_) {
927 membership_listener &listener = item.second;
928 if (event.get_event_type() == membership_event::membership_event_type::MEMBER_JOINED) {
929 listener.joined_(event);
931 listener.left_(event);
// Waits up to INITIAL_MEMBERS_TIMEOUT for the initial member list.
// Throws exception::illegal_state when the wait times out.
// The const_cast is needed because wait_for() is called on the latch member
// from a const method.
937 void ClientClusterServiceImpl::wait_initial_member_list_fetched()
const {
939 if ((
const_cast<boost::latch&
>(initial_list_fetched_latch_)).wait_for(INITIAL_MEMBERS_TIMEOUT) == boost::cv_status::timeout) {
940 BOOST_THROW_EXCEPTION(exception::illegal_state(
941 "ClientClusterServiceImpl::wait_initial_member_list_fetched",
942 "Could not get initial member list from cluster!"));
// Sends the invocation over the given connection by delegating to send().
// The return-type line is missing from this excerpt.
947 ClientInvocationServiceImpl::invoke_on_connection(
const std::shared_ptr<ClientInvocation> &invocation,
948 const std::shared_ptr<connection::Connection> &connection) {
949 return send(invocation, connection);
// Sends the invocation to the member that owns the given partition.
// When the partition service reports a nil owner, that is logged at finest
// level; the rest of that branch is missing from this excerpt. Otherwise the
// call is forwarded to invoke_on_target().
952 bool ClientInvocationServiceImpl::invoke_on_partition_owner(
953 const std::shared_ptr<ClientInvocation> &invocation,
int partition_id) {
954 auto partition_owner = client_.get_partition_service().get_partition_owner(partition_id);
955 if (partition_owner.is_nil()) {
956 HZ_LOG(logger_, finest,
957 boost::str(boost::format(
"Partition owner is not assigned yet for partition %1%")
962 return invoke_on_target(invocation, partition_owner);
// Sends the invocation to the member with the given uuid.
// The uuid must not be nil (asserted). "Client is not connected to target" is
// logged at finest level; the condition guarding that log line and its early
// exit are missing from this excerpt.
965 bool ClientInvocationServiceImpl::invoke_on_target(
const std::shared_ptr<ClientInvocation> &invocation,
966 boost::uuids::uuid uuid) {
967 assert (!uuid.is_nil());
968 auto connection = client_.get_connection_manager().get_connection(uuid);
970 HZ_LOG(logger_, finest,
971 boost::str(boost::format(
"Client is not connected to target : %1%")
976 return send(invocation, connection);
// Returns whether smart routing is enabled, as read at construction time.
979 bool ClientInvocationServiceImpl::is_smart_routing()
const {
980 return smart_routing_;
// Returns the configured backup timeout.
983 const std::chrono::milliseconds &ClientInvocationServiceImpl::get_backup_timeout()
const {
984 return backup_timeout_;
// Returns the fail-on-indeterminate-operation-state flag read at construction.
987 bool ClientInvocationServiceImpl::fail_on_indeterminate_state()
const {
988 return fail_on_indeterminate_operation_state_;
// Stores references to the lifecycle service and the client properties.
// `name` is not used by the visible initializer list; one parameter line is
// missing from this excerpt.
991 ClientExecutionServiceImpl::ClientExecutionServiceImpl(
const std::string &name,
992 const client_properties &properties,
994 spi::lifecycle_service &service)
995 : lifecycle_service_(service), client_properties_(properties) {}
// Creates the internal and user thread pools.
// The internal pool size comes from the client properties; a non-positive
// value is replaced by INTERNAL_EXECUTOR_POOL_SIZE_DEFAULT. The user pool is
// created with hz_thread_pool's default constructor.
997 void ClientExecutionServiceImpl::start() {
998 int internalPoolSize = client_properties_.get_integer(
999 client_properties_.get_internal_executor_pool_size());
1000 if (internalPoolSize <= 0) {
1001 internalPoolSize = util::IOUtil::to_value<int>(
1002 client_properties::INTERNAL_EXECUTOR_POOL_SIZE_DEFAULT);
1005 internal_executor_.reset(
new hazelcast::util::hz_thread_pool(internalPoolSize));
1007 user_executor_.reset(
new hazelcast::util::hz_thread_pool());
// Shuts down the internal executor first, then the user executor.
1010 void ClientExecutionServiceImpl::shutdown() {
1011 shutdown_thread_pool(internal_executor_.get());
1012 shutdown_thread_pool(user_executor_.get());
// Returns the user executor pool (dereferences user_executor_, which is set
// in start()).
1015 util::hz_thread_pool &ClientExecutionServiceImpl::get_user_executor() {
1016 return *user_executor_;
// Shuts down the given thread pool (the body is not visible in this excerpt).
1019 void ClientExecutionServiceImpl::shutdown_thread_pool(hazelcast::util::hz_thread_pool *pool) {
// Out-of-line definitions of ClientInvocation's constexpr static members;
// no initializer is given here.
1026 constexpr
int ClientInvocation::MAX_FAST_INVOCATION_COUNT;
1027 constexpr
int ClientInvocation::UNASSIGNED_PARTITION;
// Builds an invocation from the services exposed by `client_context`.
// The start time is taken from steady_clock and the retry pause from the
// invocation service. The invocation counts as bound to a single connection
// exactly when `conn` is non-null.
1029 ClientInvocation::ClientInvocation(spi::ClientContext &client_context,
1030 std::shared_ptr<protocol::ClientMessage> &&message,
1031 const std::string &name,
1033 const std::shared_ptr<connection::Connection> &conn,
1034 boost::uuids::uuid uuid) :
1035 logger_(client_context.get_logger()),
1036 lifecycle_service_(client_context.get_lifecycle_service()),
1037 invocation_service_(client_context.get_invocation_service()),
1038 execution_service_(client_context.get_client_execution_service().shared_from_this()),
1039 call_id_sequence_(client_context.get_call_id_sequence()),
1041 partition_id_(partition),
1042 start_time_(std::chrono::steady_clock::now()),
1043 retry_pause_(invocation_service_.get_invocation_retry_pause()),
1045 connection_(conn), bound_to_single_connection_(conn != nullptr),
1046 invoke_count_(0), urgent_(false), smart_routing_(invocation_service_.is_smart_routing()) {
// Stamp the partition id on the request before publishing it in client_message_.
1047 message->set_partition_id(partition_id_);
1048 client_message_ = boost::make_shared<std::shared_ptr<protocol::ClientMessage>>(message);
// Start with an empty send connection. Note that set_send_connection() also
// stores true into invoked_or_exception_set_.
1049 set_send_connection(
nullptr);
1052 ClientInvocation::~ClientInvocation() =
default;
// Regular entry point: advances the call-id sequence with next()
// (invoke_urgent() uses force_next() instead), sends through
// invoke_on_selection() and returns a continuation of the promise's future.
1054 boost::future<protocol::ClientMessage> ClientInvocation::invoke() {
1055 assert (client_message_.load());
1057 call_id_sequence_->next();
1058 invoke_on_selection();
// When the client is no longer running the continuation is attached without
// an executor argument.
1059 if (!lifecycle_service_.is_running()) {
1060 return invocation_promise_.get_future().then([](boost::future<protocol::ClientMessage> f) {
// Otherwise the continuation is scheduled on the user executor. The sequence
// pointer is copied into a local so the by-value lambda capture can hold it.
1064 auto id_seq = call_id_sequence_;
1065 return invocation_promise_.get_future().then(execution_service_->get_user_executor(),
1066 [=](boost::future<protocol::ClientMessage> f) {
// Urgent variant of invoke(): identical flow, except that the call id is
// taken with force_next() rather than next().
1072 boost::future<protocol::ClientMessage> ClientInvocation::invoke_urgent() {
1073 assert(client_message_.load());
1076 call_id_sequence_->force_next();
1077 invoke_on_selection();
// Continuation without an executor when the client is not running.
1078 if (!lifecycle_service_.is_running()) {
1079 return invocation_promise_.get_future().then([](boost::future<protocol::ClientMessage> f) {
// Continuation on the user executor otherwise.
1083 auto id_seq = call_id_sequence_;
1084 return invocation_promise_.get_future().then(execution_service_->get_user_executor(),
1085 [=](boost::future<protocol::ClientMessage> f) {
// Picks the send path for this invocation and reports failures through
// notify_exception() instead of throwing to the caller.
1091 void ClientInvocation::invoke_on_selection() {
1095 invocation_service_.check_invocation_allowed();
// Path 1: the invocation is pinned to one connection. connection_ is locked
// first, which suggests it is held weakly.
1098 if (is_bind_to_single_connection()) {
1099 bool invoked =
false;
1100 auto conn = connection_.lock();
1102 invoked = invocation_service_.invoke_on_connection(shared_from_this(), conn);
// The failure message depends on whether the bound connection still exists.
1105 std::string message;
1107 message = (boost::format(
"Could not invoke on connection %1%") % *conn).str();
1109 message =
"Could not invoke. Bound to a connection that is deleted already.";
1111 notify_exception(std::make_exception_ptr(
1112 exception::io(
"ClientInvocation::invoke_on_selection", message)));
// Path 2: not pinned. With smart routing the partition owner is preferred,
// then the target member uuid; the plain invoke() is also used below.
1117 bool invoked =
false;
1118 if (smart_routing_) {
1119 if (partition_id_ != -1) {
1120 invoked = invocation_service_.invoke_on_partition_owner(shared_from_this(), partition_id_);
1121 }
else if (!uuid_.is_nil()) {
1122 invoked = invocation_service_.invoke_on_target(shared_from_this(), uuid_);
1124 invoked = invocation_service_.invoke(shared_from_this());
1127 invoked = invocation_service_.invoke(shared_from_this());
1130 invoked = invocation_service_.invoke(shared_from_this());
1133 notify_exception(std::make_exception_ptr(exception::io(
"No connection found to invoke")));
// Hazelcast exceptions thrown while sending are forwarded to notify_exception().
1135 }
catch (exception::iexception &) {
1136 notify_exception(std::current_exception());
1137 }
catch (std::exception &) {
1142 bool ClientInvocation::is_bind_to_single_connection()
const {
1143 return bound_to_single_connection_;
// Task body invoked by the command that execute() submits to the execution
// service.
1146 void ClientInvocation::run() {
// Re-sends the invocation using a fresh copy of the request message.
1150 void ClientInvocation::retry() {
// Replace the published message with a copy before sending again.
1153 client_message_ = boost::make_shared<std::shared_ptr<protocol::ClientMessage>>(copy_message());
1156 invoke_on_selection();
// A Hazelcast exception raised here is stored on the promise via set_exception().
1157 }
catch (exception::iexception &e) {
1158 set_exception(e, boost::current_exception());
1159 }
catch (std::exception &) {
// Fails the invocation promise with `exception_ptr`. `e` is only used for the
// log text.
1164 void ClientInvocation::set_exception(
const std::exception &e, boost::exception_ptr exception_ptr) {
// Unblocks wait_invoked(), which polls this flag.
1165 invoked_or_exception_set_.store(
true);
1167 auto send_conn = send_connection_.load();
1169 auto connection = send_conn->lock();
// Deregistration is posted to the connection's socket executor rather than
// performed on the calling thread.
1171 auto call_id = client_message_.load()->get()->get_correlation_id();
1172 boost::asio::post(connection->get_socket().get_executor(), [=]() {
1173 connection->deregister_invocation(call_id);
1177 invocation_promise_.set_exception(std::move(exception_ptr));
// The promise may already hold a result. That case is logged at finest level,
// and only when no event handler is attached.
1178 }
catch (boost::promise_already_satisfied &se) {
1179 if (!event_handler_) {
1180 HZ_LOG(logger_, finest,
1181 boost::str(boost::format(
"Failed to set the exception for invocation. "
1182 "%1%, %2% Exception to be set: %3%")
1183 % se.what() % *
this % e.what())
// Central failure handler: rethrows `exception` to classify it, then either
// fails the promise through set_exception() or lets the invocation be retried.
1189 void ClientInvocation::notify_exception(std::exception_ptr exception) {
1192 std::rethrow_exception(exception);
1193 }
catch (exception::iexception &iex) {
// Client is shutting down: wrap the cause in hazelcast_client_not_active
// (nested exception) and fail the promise with it.
1196 if (!lifecycle_service_.is_running()) {
1198 std::throw_with_nested(boost::enable_current_exception(
1199 exception::hazelcast_client_not_active(iex.get_source(),
1200 "Client is shutting down")));
1201 }
catch (exception::iexception &e) {
1202 set_exception(e, boost::current_exception());
// Exceptions that should_retry() rejects fail the promise as they are.
1207 if (!should_retry(iex)) {
1208 set_exception(iex, boost::current_exception());
// A retryable exception that arrives after the invocation timeout has elapsed
// is turned into an operation_timeout carrying timing details.
1212 auto timePassed = std::chrono::steady_clock::now() - start_time_;
1213 if (timePassed > invocation_service_.get_invocation_timeout()) {
1214 HZ_LOG(logger_, finest,
1215 boost::str(boost::format(
"Exception will not be retried because "
1216 "invocation timed out. %1%") % iex.what())
1219 auto now = std::chrono::steady_clock::now();
1221 auto timeoutException = (exception::exception_builder<exception::operation_timeout>(
1222 "ClientInvocation::newoperation_timeout_exception") << *
this
1223 <<
" timed out because exception occurred after client invocation timeout "
1224 << std::chrono::duration_cast<std::chrono::milliseconds>(invocation_service_.get_invocation_timeout()).count()
1225 <<
"msecs. Last exception:" << iex
1226 <<
" Current time :" << util::StringUtil::time_to_string(now) <<
". "
1227 <<
"Start time: " << util::StringUtil::time_to_string(start_time_)
1228 <<
". Total elapsed time: " <<
1229 std::chrono::duration_cast<std::chrono::milliseconds>(now - start_time_).count()
1230 <<
" ms. ").build();
// Throwing and catching gives boost::current_exception() something to capture.
1232 BOOST_THROW_EXCEPTION(timeoutException);
1234 set_exception(timeoutException, boost::current_exception());
// Any other std::exception fails the promise directly.
1242 }
catch (std::exception &e) {
1243 set_exception(e, boost::current_exception());
1250 void ClientInvocation::erase_invocation()
const {
1251 if (!this->event_handler_) {
1252 auto sent_connection = get_send_connection();
1253 if (sent_connection) {
1254 auto this_invocation = shared_from_this();
1255 boost::asio::post(sent_connection->get_socket().get_executor(), [=] () {
1256 sent_connection->invocations.erase(this_invocation->get_client_message()->get_correlation_id());
// Classifies `exception` by its protocol error code to decide whether the
// invocation may be retried.
1262 bool ClientInvocation::should_retry(exception::iexception &exception) {
1263 auto errorCode = exception.get_error_code();
// Case: pinned to one connection and that connection failed (IO / disconnected).
1264 if (is_bind_to_single_connection() && (errorCode == protocol::IO || errorCode == protocol::TARGET_DISCONNECTED)) {
// Case: aimed at a specific member uuid that is not a cluster member.
1268 if (!uuid_.is_nil() && errorCode == protocol::TARGET_NOT_MEMBER) {
// Case: IO error, inactive instance, or an exception flagged retryable.
1275 if (errorCode == protocol::IO || errorCode == protocol::HAZELCAST_INSTANCE_NOT_ACTIVE || exception.is_retryable()) {
// Case: target disconnected. Retry only when the request message is retryable
// or the redo-operation setting is on.
1278 if (errorCode == protocol::TARGET_DISCONNECTED) {
1279 return client_message_.load()->get()->is_retryable() || invocation_service_.is_redo_operation();
// Streams a diagnostic description of `invocation`: request message, object
// name, target, send connection, backup-ack counters and any pending response.
1284 std::ostream &operator<<(std::ostream &os,
const ClientInvocation &invocation) {
// The target text is chosen in this order: bound connection, partition id,
// member uuid.
1285 std::ostringstream target;
1286 if (invocation.is_bind_to_single_connection()) {
1287 auto conn = invocation.connection_.lock();
1289 target <<
"connection " << *conn;
1291 }
else if (invocation.partition_id_ != -1) {
1292 target <<
"partition " << invocation.partition_id_;
1293 }
else if (!invocation.uuid_.is_nil()) {
1294 target <<
"uuid " << boost::to_string(invocation.uuid_);
1298 os <<
"ClientInvocation{" <<
"requestMessage = " << *invocation.client_message_.load()->get()
1299 <<
", objectName = "
1300 << invocation.object_name_ <<
", target = " << target.str() <<
", sendConnection = ";
1301 auto sendConnection = invocation.get_send_connection();
1302 if (sendConnection) {
1303 os << *sendConnection;
// The cast to int makes the expected ack count print as a number.
1307 os <<
", backup_acks_expected_ = " <<
static_cast<int>(invocation.backup_acks_expected_)
1308 <<
", backup_acks_received = " << invocation.backup_acks_received_;
1310 if (invocation.pending_response_) {
1311 os <<
", pending_response: " << *invocation.pending_response_;
// Factory for an invocation addressed by partition id. Ownership of the
// message is moved into the new object.
// NOTE(review): `new` is used instead of std::make_shared, presumably because
// the constructor is not publicly accessible — confirm in the header.
1319 std::shared_ptr<ClientInvocation> ClientInvocation::create(spi::ClientContext &client_context,
1320 std::shared_ptr<protocol::ClientMessage> &&client_message,
1321 const std::string &object_name,
1323 return std::shared_ptr<ClientInvocation>(
1324 new ClientInvocation(client_context, std::move(client_message), object_name, partition_id));
// Factory for an invocation bound to a specific connection. The partition
// argument passed to the constructor is UNASSIGNED_PARTITION.
1327 std::shared_ptr<ClientInvocation> ClientInvocation::create(spi::ClientContext &client_context,
1328 std::shared_ptr<protocol::ClientMessage> &&client_message,
1329 const std::string &object_name,
1330 const std::shared_ptr<connection::Connection> &connection) {
1331 return std::shared_ptr<ClientInvocation>(
1332 new ClientInvocation(client_context, std::move(client_message), object_name, UNASSIGNED_PARTITION,
// Factory for an invocation addressed to a member uuid. The partition
// argument passed to the constructor is UNASSIGNED_PARTITION.
1337 std::shared_ptr<ClientInvocation> ClientInvocation::create(spi::ClientContext &client_context,
1338 std::shared_ptr<protocol::ClientMessage> &&client_message,
1339 const std::string &object_name,
1340 boost::uuids::uuid uuid) {
1341 return std::shared_ptr<ClientInvocation>(
1342 new ClientInvocation(client_context, std::move(client_message), object_name, UNASSIGNED_PARTITION,
// Convenience overload taking the message by reference: the message is moved
// into a new shared_ptr (the caller's object is left moved-from) and the
// shared_ptr overload is called with the partition id.
1346 std::shared_ptr<ClientInvocation> ClientInvocation::create(spi::ClientContext &client_context,
1347 protocol::ClientMessage &client_message,
1348 const std::string &object_name,
1350 return create(client_context, std::make_shared<protocol::ClientMessage>(std::move(client_message)),
1351 object_name, partition_id);
1354 std::shared_ptr<ClientInvocation> ClientInvocation::create(spi::ClientContext &client_context,
1355 protocol::ClientMessage &client_message,
1356 const std::string &object_name,
1357 const std::shared_ptr<connection::Connection> &connection) {
1358 return create(client_context, std::make_shared<protocol::ClientMessage>(std::move(client_message)),
1359 object_name, connection);
// Convenience overload for a uuid-addressed invocation: the message is moved
// into a new shared_ptr and the call is forwarded to another create() overload.
1362 std::shared_ptr<ClientInvocation> ClientInvocation::create(spi::ClientContext &client_context,
1363 protocol::ClientMessage &client_message,
1364 const std::string &object_name,
1365 boost::uuids::uuid uuid) {
1366 return create(client_context, std::make_shared<protocol::ClientMessage>(std::move(client_message)),
1370 std::shared_ptr<connection::Connection> ClientInvocation::get_send_connection()
const {
1371 return send_connection_.load()->lock();
1374 void ClientInvocation::wait_invoked()
const {
1376 while (!invoked_or_exception_set_) {
1377 std::this_thread::sleep_for(retry_pause_);
// Records `conn` as the connection the request was sent on. It is stored as a
// weak reference inside a freshly allocated holder. The call also marks the
// invocation as invoked, which releases wait_invoked().
1382 ClientInvocation::set_send_connection(
const std::shared_ptr<connection::Connection> &conn) {
1383 send_connection_.store(boost::make_shared<std::weak_ptr<connection::Connection>>(conn));
1384 invoked_or_exception_set_.store(
true);
// Handles a response message for this invocation, taking into account the
// number of backup acknowledgements the response announces.
1387 void ClientInvocation::notify(
const std::shared_ptr<protocol::ClientMessage> &msg) {
1389 BOOST_THROW_EXCEPTION(exception::illegal_argument(
"response can't be null"));
1392 int8_t expected_backups = msg->get_number_of_backups();
// More backups expected than acknowledged so far: remember when the response
// arrived, how many acks are expected, and the response itself.
1396 if (expected_backups > backup_acks_received_) {
1400 pending_response_received_time_ = std::chrono::steady_clock::now();
1402 backup_acks_expected_ = expected_backups;
1406 pending_response_ = msg;
// Fulfils the invocation promise with a copy of `msg` and then removes the
// invocation from its connection.
1418 void ClientInvocation::complete(
const std::shared_ptr<protocol::ClientMessage> &msg) {
1421 this->invocation_promise_.set_value(*msg);
// A failure to set the value is logged at warning level; the response is dropped.
1422 }
catch (std::exception &e) {
1423 HZ_LOG(logger_, warning,
1424 boost::str(boost::format(
"Failed to set the response for invocation. "
1425 "Dropping the response. %1%, %2% Response: %3%")
1426 % e.what() % *
this % *msg)
1429 this->erase_invocation();
1432 std::shared_ptr<protocol::ClientMessage> ClientInvocation::get_client_message()
const {
1433 return *client_message_.load();
1436 const std::shared_ptr<EventHandler<protocol::ClientMessage> > &
1437 ClientInvocation::get_event_handler()
const {
1438 return event_handler_;
1441 void ClientInvocation::set_event_handler(
1442 const std::shared_ptr<EventHandler<protocol::ClientMessage> > &handler) {
1443 ClientInvocation::event_handler_ = handler;
// Schedules run() for this invocation, either immediately or after a delay.
1446 void ClientInvocation::execute() {
// The command holds a shared_ptr so the invocation outlives the scheduling.
1447 auto this_invocation = shared_from_this();
1448 auto command = [=]() {
1449 this_invocation->run();
// A new call id is forced and written into the request as correlation id.
1455 int64_t callId = call_id_sequence_->force_next();
1456 client_message_.load()->get()->set_correlation_id(callId);
1459 call_id_sequence_->complete();
// The first MAX_FAST_INVOCATION_COUNT attempts run without delay.
1461 if (invoke_count_ < MAX_FAST_INVOCATION_COUNT) {
1463 execution_service_->execute(command);
// Later attempts back off: delay = min(2^(invoke_count_ - MAX_FAST_INVOCATION_COUNT),
// retry_pause_ in milliseconds).
1466 int64_t delayMillis = util::min<int64_t>(
static_cast<int64_t
>(1) << (invoke_count_ - MAX_FAST_INVOCATION_COUNT),
1467 std::chrono::duration_cast<std::chrono::milliseconds>(retry_pause_).count());
1468 retry_timer_ = execution_service_->schedule(command, std::chrono::milliseconds(delayMillis));
1472 const std::string ClientInvocation::get_name()
const {
1473 return "ClientInvocation";
1476 std::shared_ptr<protocol::ClientMessage> ClientInvocation::copy_message() {
1477 return std::make_shared<protocol::ClientMessage>(**client_message_.load());
1480 boost::promise<protocol::ClientMessage> &ClientInvocation::get_promise() {
1481 return invocation_promise_;
// Logs, at finest level, the invocation together with its current invoke
// count and the exception `e`.
1484 void ClientInvocation::log_exception(exception::iexception &e) {
1485 HZ_LOG(logger_, finest,
1486 boost::str(boost::format(
"Invocation got an exception %1%, invoke count : %2%, "
1488 % *
this % invoke_count_.load() % e)
// Records one more backup acknowledgement, then checks whether the response
// stored by notify() can be released.
1492 void ClientInvocation::notify_backup() {
1493 ++backup_acks_received_;
// Check 1: no response has been stored yet.
1495 if (!pending_response_) {
// Check 2: not all expected acks have arrived.
1501 if (backup_acks_expected_ != backup_acks_received_) {
1508 complete_with_pending_response();
// Checks whether a stored response has waited longer than `backup_timeout`
// for its backup acknowledgements and, if so, resolves the invocation.
1512 ClientInvocation::detect_and_handle_backup_timeout(
const std::chrono::milliseconds &backup_timeout) {
// Check: every expected ack has been received.
1515 if (backup_acks_expected_ == backup_acks_received_) {
// Check: no response is pending.
1521 if (!pending_response_) {
// Check: the response has not yet waited for the full backup timeout.
1526 if (pending_response_received_time_ + backup_timeout >= std::chrono::steady_clock::now()) {
// When configured to fail on indeterminate state, report
// indeterminate_operation_state through notify_exception().
1530 if (invocation_service_.fail_on_indeterminate_state()) {
1531 auto exception = boost::enable_current_exception((exception::exception_builder<exception::indeterminate_operation_state>(
1532 "ClientInvocation::detect_and_handle_backup_timeout") << *
this
1533 <<
" failed because backup acks missed.").build());
1534 notify_exception(std::make_exception_ptr(exception));
// Complete with the response that was stored by notify().
1539 complete_with_pending_response();
1542 void ClientInvocation::complete_with_pending_response() {
1543 complete(pending_response_);
// Accessor returning a ClientContext reference (this class stores one in
// client_, see the constructor).
1546 ClientContext &impl::ClientTransactionManagerServiceImpl::get_client()
const {
1550 ClientTransactionManagerServiceImpl::ClientTransactionManagerServiceImpl(ClientContext &client)
1551 : client_(client) {}
// Obtains a connection for a transaction, retrying while the client is
// running and the invocation timeout has not elapsed.
1553 std::shared_ptr<connection::Connection> ClientTransactionManagerServiceImpl::connect() {
1554 auto &invocationService = client_.get_invocation_service();
1555 auto startTime = std::chrono::steady_clock::now();
1556 auto invocationTimeout = invocationService.get_invocation_timeout();
1557 client_config &clientConfig = client_.get_client_config();
1558 bool smartRouting = clientConfig.get_network_config().is_smart_routing();
1560 while (client_.get_lifecycle_service().is_running()) {
// Each attempt asks the connection manager for a random connection.
1562 auto connection = client_.get_connection_manager().get_random_connection();
1564 throw_exception(smartRouting);
// hazelcast_client_offline has its own handler, listed before the generic one.
1567 }
catch (exception::hazelcast_client_offline &) {
// Other Hazelcast exceptions: once the timeout has elapsed they are rethrown
// wrapped by new_operation_timeout_exception().
1569 }
catch (exception::iexception &) {
1570 if (std::chrono::steady_clock::now() - startTime > invocationTimeout) {
1571 std::rethrow_exception(
1572 new_operation_timeout_exception(std::current_exception(), invocationTimeout,
// Pause between attempts.
1576 std::this_thread::sleep_for(invocationService.get_invocation_retry_pause());
// Reached once the lifecycle service reports the client is no longer running.
1578 BOOST_THROW_EXCEPTION(
1579 exception::hazelcast_client_not_active(
"ClientTransactionManagerServiceImpl::connect",
1580 "Client is shutdown"));
// Builds an operation_timeout exception that nests `cause` and describes the
// timeout, the start time and the elapsed time. The result is returned as a
// std::exception_ptr.
1584 ClientTransactionManagerServiceImpl::new_operation_timeout_exception(std::exception_ptr cause,
1585 std::chrono::milliseconds invocation_timeout,
1586 std::chrono::steady_clock::time_point start_time) {
1587 std::ostringstream sb;
1588 auto now = std::chrono::steady_clock::now();
1590 <<
"Creating transaction context timed out because exception occurred after client invocation timeout "
1591 << std::chrono::duration_cast<std::chrono::milliseconds>(invocation_timeout).count() <<
" ms. " <<
"Current time: "
// NOTE(review): the "Current time" text samples the clock again instead of
// reusing `now`, so it can differ slightly from the elapsed-time figure.
1592 << util::StringUtil::time_to_string(std::chrono::steady_clock::now()) <<
". " <<
"Start time: "
1593 << util::StringUtil::time_to_string(start_time) <<
". Total elapsed time: "
1594 << std::chrono::duration_cast<std::chrono::milliseconds>(now - start_time).count() <<
" ms. ";
// Rethrowing the cause makes it the current exception, so that
// throw_with_nested() can attach it to the timeout exception.
1596 std::rethrow_exception(cause);
1599 std::throw_with_nested(boost::enable_current_exception(exception::operation_timeout(
1600 "ClientTransactionManagerServiceImpl::newoperation_timeout_exception", sb.str())));
1602 return std::current_exception();
// Throws the exception that explains why connect() could not get a
// connection. The exception type depends on the reconnect mode and on
// whether smart routing is enabled.
1608 void ClientTransactionManagerServiceImpl::throw_exception(
bool smart_routing) {
1609 auto &client_config = client_.get_client_config();
1610 auto &connection_strategy_Config = client_config.get_connection_strategy_config();
1611 auto reconnect_mode = connection_strategy_Config.get_reconnect_mode();
// ASYNC reconnect mode: report the client as offline.
1612 if (reconnect_mode == config::client_connection_strategy_config::reconnect_mode::ASYNC) {
1613 BOOST_THROW_EXCEPTION(exception::hazelcast_client_offline(
1614 "ClientTransactionManagerServiceImpl::throw_exception",
""));
// Smart routing: illegal_state whose message lists the known cluster members
// (or states that there are none).
1616 if (smart_routing) {
1617 auto members = client_.get_cluster().get_members();
1618 std::ostringstream msg;
1619 if (members.empty()) {
1620 msg <<
"No address was return by the LoadBalancer since there are no members in the cluster";
1622 msg <<
"No address was return by the LoadBalancer. "
1623 "But the cluster contains the following members:{\n";
1624 for (
auto const &m : members) {
1625 msg <<
'\t' << m <<
'\n';
1629 BOOST_THROW_EXCEPTION(exception::illegal_state(
1630 "ClientTransactionManagerServiceImpl::throw_exception", msg.str()));
// Non-smart routing: generic "no active connection" illegal_state.
1632 BOOST_THROW_EXCEPTION(exception::illegal_state(
1633 "ClientTransactionManagerServiceImpl::throw_exception",
1634 "No active connection is found"));
1637 ClientPartitionServiceImpl::ClientPartitionServiceImpl(ClientContext &client)
1638 : client_(client), logger_(client.get_logger()), partition_count_(0),
1639 partition_table_(boost::shared_ptr<partition_table>(new partition_table{0, -1})) {
// Handles a partition-table event: validates it against the current table
// with should_be_applied() and tries to install a new table built from
// `partitions`.
1642 void ClientPartitionServiceImpl::handle_event(int32_t connection_id, int32_t version,
1643 const std::vector<std::pair<boost::uuids::uuid, std::vector<int>>> &partitions) {
1644 HZ_LOG(logger_, finest,
1645 boost::str(boost::format(
"Handling new partition table with partitionStateVersion: %1%") % version)
// NOTE(review): `current` is dereferenced without a null check, while reset()
// stores nullptr into partition_table_ — confirm the two cannot interleave.
1649 auto current = partition_table_.load();
1650 if (!should_be_applied(connection_id, version, partitions, *current)) {
// The new table is installed with compare-and-exchange against the loaded
// snapshot, so a concurrent update is not overwritten.
1653 if (partition_table_.compare_exchange_strong(current, boost::shared_ptr<partition_table>(
1654 new partition_table{connection_id, version, convert_to_map(partitions)}))) {
1655 HZ_LOG(logger_, finest,
1656 boost::str(boost::format(
"Applied partition table with partitionStateVersion : %1%") % version)
// Looks up the owner uuid of `partition_id` in the current partition table.
// The nil uuid is returned when the lookup does not produce an owner.
// NOTE(review): table_ptr is dereferenced without a null check, while reset()
// stores nullptr into partition_table_ — confirm callers cannot hit that window.
1664 boost::uuids::uuid ClientPartitionServiceImpl::get_partition_owner(int32_t partition_id) {
1665 auto table_ptr = partition_table_.load();
1666 auto it = table_ptr->partitions.find(partition_id);
1667 if (it != table_ptr->partitions.end()) {
1670 return boost::uuids::nil_uuid();
// Maps a serialized key to a partition id: the key's partition hash is
// converted to an index in [0, partition count) by HashUtil::hash_to_index.
1673 int32_t ClientPartitionServiceImpl::get_partition_id(
const serialization::pimpl::data &key) {
1674 int32_t pc = get_partition_count();
1678 int hash = key.get_partition_hash();
1679 return util::HashUtil::hash_to_index(hash, pc);
1682 int32_t ClientPartitionServiceImpl::get_partition_count() {
1683 return partition_count_.load();
1686 std::shared_ptr<client::impl::Partition> ClientPartitionServiceImpl::get_partition(
int partition_id) {
1687 return std::shared_ptr<client::impl::Partition>(
new PartitionImpl(partition_id, client_, *
this));
// Sets the partition count the first time (while it is still 0). On later
// calls it reports whether `new_partition_count` matches the stored value.
1690 bool ClientPartitionServiceImpl::check_and_set_partition_count(int32_t new_partition_count) {
1691 int32_t expected = 0;
// Atomic 0 -> new_partition_count transition; only the first caller wins.
1692 if (partition_count_.compare_exchange_strong(expected, new_partition_count)) {
1695 return partition_count_.load() == new_partition_count;
// Decides whether an incoming partition table may replace `current`.
// NOTE(review): the `¤t` spellings in this block read like a mis-encoded
// `&current` (the body uses `current.`) — confirm against the repository copy.
1699 ClientPartitionServiceImpl::should_be_applied(int32_t connection_id, int32_t version,
1700 const std::vector<std::pair<boost::uuids::uuid, std::vector<int>>> &partitions,
1701 const partition_table ¤t) {
1702 auto &lg = client_.get_logger();
// Case: the response carries no partitions.
1703 if (partitions.empty()) {
1704 if (logger_.enabled(logger::level::finest)) {
1705 log_failure(connection_id, version, current,
"response is empty");
// Case: no connection id recorded yet (0), or the event comes from a
// different connection than the current table.
1709 if (!current.connection_id || connection_id != current.connection_id) {
// The log text is produced by a lambda; the old connection id is printed as
// "none" when it is 0.
1711 ([¤t, connection_id](){
1712 auto frmt = boost::format(
"Event coming from a new connection. Old connection id: %1%, "
1713 "new connection %2%");
1715 if (current.connection_id) {
1716 frmt = frmt % current.connection_id;
1718 frmt = frmt %
"none";
1721 return boost::str(frmt % connection_id);
// Case: same connection, but the version is not newer than the current one.
1727 if (version <= current.version) {
1728 if (lg.enabled(logger::level::finest)) {
1729 log_failure(connection_id, version, current,
"response state version is old");
// Logs, at finest level, why a partition-table response was not applied.
// `cause` is inserted into the message as the reason.
// NOTE(review): `¤t` in the signature reads like a mis-encoded `&current`
// (the body uses `current.`) — confirm against the repository copy.
1736 void ClientPartitionServiceImpl::log_failure(int32_t connection_id, int32_t version,
1737 const ClientPartitionServiceImpl::partition_table ¤t,
1738 const std::string &cause) {
1739 HZ_LOG(logger_, finest,
1741 auto frmt = boost::format(
" We will not apply the response, since %1% ."
1742 " Response is from connection with id %2%. "
1743 "Current connection id is %3%, response state version:%4%. "
1744 "Current state version: %5%");
// A current connection id of 0 is printed as the text "nullptr".
1745 if (current.connection_id) {
1746 return boost::str(frmt % cause % connection_id % current.connection_id % version %
1750 return boost::str(frmt % cause % connection_id %
"nullptr" % version % current.version);
1756 void ClientPartitionServiceImpl::reset() {
1757 partition_table_.store(
nullptr);
1760 std::unordered_map<int32_t, boost::uuids::uuid> ClientPartitionServiceImpl::convert_to_map(
1761 const std::vector<std::pair<boost::uuids::uuid, std::vector<int>>> &partitions) {
1762 std::unordered_map<int32_t, boost::uuids::uuid> new_partitions;
1763 for (
auto const &e : partitions) {
1764 for (
auto pid: e.second) {
1765 new_partitions.insert({pid, e.first});
1768 return new_partitions;
1771 int ClientPartitionServiceImpl::PartitionImpl::get_partition_id()
const {
1772 return partition_id_;
// Resolves the member that owns this partition: the owner uuid comes from the
// partition service and, when it is not nil, the member is looked up in the
// client cluster service.
1775 boost::optional<member> ClientPartitionServiceImpl::PartitionImpl::get_owner()
const {
1776 auto owner = partition_service_.get_partition_owner(partition_id_);
1777 if (!owner.is_nil()) {
1778 return client_.get_client_cluster_service().get_member(owner);
1783 ClientPartitionServiceImpl::PartitionImpl::PartitionImpl(
int partition_id, ClientContext &client,
1784 ClientPartitionServiceImpl &partition_service)
1785 : partition_id_(partition_id), client_(client), partition_service_(partition_service) {
1788 namespace sequence {
1789 CallIdSequenceWithoutBackpressure::CallIdSequenceWithoutBackpressure() : head_(0) {}
1791 CallIdSequenceWithoutBackpressure::~CallIdSequenceWithoutBackpressure() =
default;
// Concurrency-limit accessor of the sequence variant that applies no
// back-pressure.
1793 int32_t CallIdSequenceWithoutBackpressure::get_max_concurrent_invocations()
const {
1797 int64_t CallIdSequenceWithoutBackpressure::next() {
1798 return force_next();
// Produces the next call id; next() in this class delegates here.
1801 int64_t CallIdSequenceWithoutBackpressure::force_next() {
// Completion hook of the sequence without back-pressure (counterpart of
// AbstractCallIdSequence::complete()).
1805 void CallIdSequenceWithoutBackpressure::complete() {
// Accessor for the most recently issued call id of this sequence.
// NOTE(review): presumably reads head_ — confirm.
1809 int64_t CallIdSequenceWithoutBackpressure::get_last_call_id() {
// Validates `max_concurrent_invocations` with Preconditions::check_positive
// (the error text is prepared first), stores the checked value and then
// iterates over the longs_ counters.
1814 AbstractCallIdSequence::AbstractCallIdSequence(int32_t max_concurrent_invocations) {
1815 std::ostringstream out;
1816 out <<
"maxConcurrentInvocations should be a positive number. maxConcurrentInvocations="
1817 << max_concurrent_invocations;
1818 this->max_concurrent_invocations_ = util::Preconditions::check_positive(max_concurrent_invocations,
1821 for (
size_t i = 0; i < longs_.size(); ++i) {
1826 AbstractCallIdSequence::~AbstractCallIdSequence() =
default;
1828 int32_t AbstractCallIdSequence::get_max_concurrent_invocations()
const {
1829 return max_concurrent_invocations_;
// Issues the next call id through force_next(). handle_no_space_left() is the
// hook each derived sequence implements for the case where no capacity is
// available (see FailFastCallIdSequence and CallIdSequenceWithBackpressure).
1832 int64_t AbstractCallIdSequence::next() {
1834 handle_no_space_left();
1836 return force_next();
1839 int64_t AbstractCallIdSequence::force_next() {
1840 return ++longs_[INDEX_HEAD];
1843 void AbstractCallIdSequence::complete() {
1844 ++longs_[INDEX_TAIL];
1845 assert(longs_[INDEX_TAIL] <= longs_[INDEX_HEAD]);
1848 int64_t AbstractCallIdSequence::get_last_call_id() {
1849 return longs_[INDEX_HEAD];
1852 bool AbstractCallIdSequence::has_space() {
1853 return longs_[INDEX_HEAD] - longs_[INDEX_TAIL] < max_concurrent_invocations_;
1856 int64_t AbstractCallIdSequence::get_tail() {
1857 return longs_[INDEX_TAIL];
1860 const std::unique_ptr<util::concurrent::IdleStrategy> CallIdSequenceWithBackpressure::IDLER(
1861 new util::concurrent::BackoffIdleStrategy(
1862 0, 0, std::chrono::duration_cast<std::chrono::nanoseconds>(
1863 std::chrono::microseconds(1000)).count(),
1864 std::chrono::duration_cast<std::chrono::nanoseconds>(
1865 std::chrono::microseconds(MAX_DELAY_MS * 1000)).count()));
1867 CallIdSequenceWithBackpressure::CallIdSequenceWithBackpressure(int32_t max_concurrent_invocations,
1868 int64_t backoff_timeout_ms)
1869 : AbstractCallIdSequence(max_concurrent_invocations) {
1870 std::ostringstream out;
1871 out <<
"backoffTimeoutMs should be a positive number. backoffTimeoutMs=" << backoff_timeout_ms;
1872 util::Preconditions::check_positive(backoff_timeout_ms, out.str());
1874 backoff_timeout_nanos_ = std::chrono::duration_cast<std::chrono::nanoseconds>(
1875 std::chrono::milliseconds(backoff_timeout_ms)).count();
// Waits for a free call id using the shared IDLER strategy. Throws
// hazelcast_overload once the elapsed time exceeds backoff_timeout_nanos_.
1878 void CallIdSequenceWithBackpressure::handle_no_space_left() {
1879 auto start = std::chrono::steady_clock::now();
1880 for (int64_t idleCount = 0;; idleCount++) {
1881 int64_t elapsedNanos = std::chrono::duration_cast<std::chrono::nanoseconds>(
1882 std::chrono::steady_clock::now() - start).count();
1883 if (elapsedNanos > backoff_timeout_nanos_) {
// NOTE(review): both figures below are nanoseconds / 1000, i.e. microseconds,
// yet the message labels them "msecs" — confirm the intended unit.
1884 throw (exception::exception_builder<exception::hazelcast_overload>(
1885 "CallIdSequenceWithBackpressure::handleNoSpaceLeft")
1886 <<
"Timed out trying to acquire another call ID."
1887 <<
" maxConcurrentInvocations = " << get_max_concurrent_invocations()
1888 <<
", backoffTimeout = "
1889 << std::chrono::microseconds(backoff_timeout_nanos_ / 1000).count()
1890 <<
" msecs, elapsed:"
1891 << std::chrono::microseconds(elapsedNanos / 1000).count() <<
" msecs").build();
// The idle strategy receives the iteration count of this wait loop.
1893 IDLER->idle(idleCount);
1901 FailFastCallIdSequence::FailFastCallIdSequence(int32_t max_concurrent_invocations)
1902 : AbstractCallIdSequence(max_concurrent_invocations) {}
1904 void FailFastCallIdSequence::handle_no_space_left() {
1905 throw (exception::exception_builder<exception::hazelcast_overload>(
1906 "FailFastCallIdSequence::handleNoSpaceLeft")
1907 <<
"Maximum invocation count is reached. maxConcurrentInvocations = "
1908 << get_max_concurrent_invocations()).build();
1912 std::unique_ptr<CallIdSequence> CallIdFactory::new_call_id_sequence(
bool is_back_pressure_enabled,
1913 int32_t max_allowed_concurrent_invocations,
1914 int64_t backoff_timeout_ms) {
1915 if (!is_back_pressure_enabled) {
1916 return std::unique_ptr<CallIdSequence>(
new CallIdSequenceWithoutBackpressure());
1917 }
else if (backoff_timeout_ms <= 0) {
1918 return std::unique_ptr<CallIdSequence>(
1919 new FailFastCallIdSequence(max_allowed_concurrent_invocations));
1921 return std::unique_ptr<CallIdSequence>(
1922 new CallIdSequenceWithBackpressure(max_allowed_concurrent_invocations,
1923 backoff_timeout_ms));
1928 namespace listener {
1929 listener_service_impl::listener_service_impl(ClientContext &client_context,
1930 int32_t event_thread_count)
1931 : client_context_(client_context),
1932 serialization_service_(client_context.get_serialization_service()),
1933 logger_(client_context.get_logger()),
1934 client_connection_manager_(client_context.get_connection_manager()),
1935 number_of_event_threads_(event_thread_count),
1936 smart_(client_context.get_client_config().get_network_config().is_smart_routing()) {
1937 auto &invocationService = client_context.get_invocation_service();
1938 invocation_timeout_ = invocationService.get_invocation_timeout();
1939 invocation_retry_pause_ = invocationService.get_invocation_retry_pause();
// Boolean query about local-only registration.
// NOTE(review): presumably derived from the smart_ routing flag — confirm.
1942 bool listener_service_impl::registers_local_only()
const {
// Asynchronous registration: wraps register_listener_internal() in a
// packaged_task and posts it to the registration executor.
1946 boost::future<boost::uuids::uuid>
1947 listener_service_impl::register_listener(
1948 std::shared_ptr<ListenerMessageCodec> listener_message_codec,
1949 std::shared_ptr<client::impl::BaseEventHandler> handler) {
1950 auto task = boost::packaged_task<boost::uuids::uuid()>([=]() {
1951 return register_listener_internal(listener_message_codec, handler);
// The future is taken before the task is moved into the executor.
1953 auto f = task.get_future();
1954 boost::asio::post(registration_executor_->get_executor(), std::move(task));
// Asynchronous deregistration: checks the id with
// Preconditions::check_not_nill, then posts deregister_listener_internal() to
// the registration executor as a packaged_task.
1958 boost::future<bool> listener_service_impl::deregister_listener(boost::uuids::uuid registration_id) {
1959 util::Preconditions::check_not_nill(registration_id,
"Nil userRegistrationId is not allowed!");
1961 boost::packaged_task<bool()> task([=]() {
1962 return deregister_listener_internal(registration_id);
// The future is taken before the task is moved into the executor.
1964 auto f = task.get_future();
1965 boost::asio::post(registration_executor_->get_executor(), std::move(task));
1969 void listener_service_impl::connection_added(
1970 const std::shared_ptr<connection::Connection> connection) {
1971 boost::asio::post(registration_executor_->get_executor(), [=]() { connection_added_internal(connection); });
1974 void listener_service_impl::connection_removed(
1975 const std::shared_ptr<connection::Connection> connection) {
1976 boost::asio::post(registration_executor_->get_executor(), [=]() { connection_removed_internal(connection); });
// Deregisters the invocation with `call_id` from `connection`. The
// deregistration is posted to the connection's socket executor as a
// packaged_task instead of being run inline.
1980 listener_service_impl::remove_event_handler(int64_t call_id,
1981 const std::shared_ptr<connection::Connection> &connection) {
1982 boost::asio::post(connection->get_socket().get_executor(),
1983 std::packaged_task<
void()>([=]() {
1984 connection->deregister_invocation(call_id);
// Dispatches an event message to process_event_message() on the event
// executor, choosing the execution context from the message's partition id.
1988 void listener_service_impl::handle_client_message(
1989 const std::shared_ptr<ClientInvocation> invocation,
1990 const std::shared_ptr<protocol::ClientMessage> response) {
1992 auto partitionId = response->get_partition_id();
// Partition id -1: posted to the pool's executor directly.
1993 if (partitionId == -1) {
1995 boost::asio::post(event_executor_->get_executor(), [=]() { process_event_message(invocation, response); });
// Otherwise: posted to the strand selected by partition id modulo the number
// of strands.
2000 boost::asio::post(event_strands_[partitionId % event_strands_.size()],
2001 [=]() { process_event_message(invocation, response); });
// Failures are logged at warning level, and only while the client is running.
2003 }
catch (
const std::exception &e) {
2004 if (client_context_.get_lifecycle_service().is_running()) {
2005 HZ_LOG(logger_, warning,
2006 boost::str(boost::format(
"Delivery of event message to event handler failed. %1%, %2%, %3%")
2007 % e.what() % *response % *invocation)
2013 void listener_service_impl::shutdown() {
2014 ClientExecutionServiceImpl::shutdown_thread_pool(event_executor_.get());
2015 ClientExecutionServiceImpl::shutdown_thread_pool(registration_executor_.get());
2018 void listener_service_impl::start() {
2019 event_executor_.reset(
new hazelcast::util::hz_thread_pool(number_of_event_threads_));
2020 registration_executor_.reset(
new hazelcast::util::hz_thread_pool(1));
2022 for (
int i = 0; i < number_of_event_threads_; ++i) {
2023 event_strands_.emplace_back(event_executor_->get_executor());
2026 client_connection_manager_.add_connection_listener(shared_from_this());
// Creates a registration under a fresh random uuid, stores it in
// registrations_ and invokes it on every active connection. The uuid is
// returned to the caller.
2029 boost::uuids::uuid listener_service_impl::register_listener_internal(
2030 std::shared_ptr<ListenerMessageCodec> listener_message_codec,
2031 std::shared_ptr<client::impl::BaseEventHandler> handler) {
2032 auto user_registration_id = client_context_.random_uuid();
2034 std::shared_ptr<listener_registration> registration(
new listener_registration{listener_message_codec, handler});
2035 registrations_.put(user_registration_id, registration);
2036 for (
auto const &connection : client_connection_manager_.get_active_connections()) {
2038 invoke(registration, connection);
// A failure on a connection that is still alive rolls the registration back
// and is reported to the caller as a hazelcast_ exception.
2039 }
catch (exception::iexception &e) {
2040 if (connection->is_alive()) {
2041 deregister_listener_internal(user_registration_id);
2042 BOOST_THROW_EXCEPTION((exception::exception_builder<exception::hazelcast_>(
2043 "ClientListenerService::RegisterListenerTask::call")
2044 <<
"Listener can not be added " << e).build());
2048 return user_registration_id;
// Removes the listener stored under user_registration_id.
// For every (connection, connection_registration) entry of the listener a
// remove request is encoded with the server registration id and invoked; after
// a successful invocation the event handler for that call id is removed and
// the entry is erased from the local copy of the entry set.
// An iexception raised for a connection that is still alive is logged at
// warning level together with the remote endpoint.
// Finally the listener is removed from registrations_.
//
// Fixes relative to the previous text of this block:
//   * `registration` is bound with a proper `&` (the reference declarator had
//     been corrupted into a registered-trademark character),
//   * the warning now prints the `endpoint` string that is built for exactly
//     this purpose instead of streaming the optional address object.
2052 listener_service_impl::deregister_listener_internal(boost::uuids::uuid user_registration_id) {
2053 auto listenerRegistration = registrations_.get(user_registration_id);
// Unknown registration id: nothing to deregister.
2054 if (!listenerRegistration) {
2057 bool successful =
true;
// entry_set() is stored in a local variable, so erasing below only affects
// that local container.
2059 auto listener_registrations = listenerRegistration->registrations.entry_set();
2060 for (
auto it = listener_registrations.begin();it != listener_registrations.end();) {
2061 const auto &registration = it->second;
2062 const auto& subscriber = it->first;
2064 const auto &listenerMessageCodec = listenerRegistration->codec;
2065 auto serverRegistrationId = registration->server_registration_id;
2066 auto request = listenerMessageCodec->encode_remove_request(serverRegistrationId);
2067 auto invocation = ClientInvocation::create(client_context_,request,
"",
// Blocks until the remove request has completed (or throws).
2069 invocation->invoke().get();
2071 remove_event_handler(registration->call_id, subscriber);
2073 it = listener_registrations.erase(it);
2074 }
catch (exception::iexception &e) {
2076 if (subscriber->is_alive()) {
// Human readable endpoint for the log entry below.
2078 std::ostringstream endpoint;
2079 if (subscriber->get_remote_address()) {
2080 endpoint << *subscriber->get_remote_address();
2084 HZ_LOG(logger_, warning,
2085 boost::str(boost::format(
"ClientListenerService::deregisterListenerInternal "
2086 "Deregistration of listener with ID %1% "
2087 "has failed to address %2% %3%")
2088 % user_registration_id
2089 % endpoint.str() % e)
2095 registrations_.remove(user_registration_id);
// Called for a newly added connection: every registration currently held in
// registrations_ is invoked on that connection through
// invoke_from_internal_thread().
2100 void listener_service_impl::connection_added_internal(
2101 const std::shared_ptr<connection::Connection> &connection) {
2102 for (
const auto &listener_registration : registrations_.values()) {
2103 invoke_from_internal_thread(listener_registration, connection);
// Called for a removed connection: drops the per-connection registration entry
// for that connection from every listener registration in registrations_.
// Only local bookkeeping is touched here; no request is sent in this function.
// (The loop variable is bound with a proper `&`; the reference declarator had
// been corrupted into a registered-trademark character.)
2107 void listener_service_impl::connection_removed_internal(
2108 const std::shared_ptr<connection::Connection> &connection) {
2109 for (
auto &registry : registrations_.values()) {
2110 registry->registrations.remove(connection);
// Adds the given listener registration to the given connection by calling
// invoke(). An iexception thrown by invoke() is caught here and reported as a
// warning that names the registration pointer, the connection and the cause.
2115 listener_service_impl::invoke_from_internal_thread(
2116 const std::shared_ptr<listener_registration> &listener_registration,
2117 const std::shared_ptr<connection::Connection> &connection) {
2119 invoke(listener_registration, connection);
2120 }
catch (exception::iexception &e) {
2121 HZ_LOG(logger_, warning,
2122 boost::str(boost::format(
"Listener with pointer %1% can not be added to "
2123 "a new connection: %2%, reason: %3%")
2124 % listener_registration.get() % *connection % e)
// Sends the "add listener" request for one listener registration over one
// connection and records the outcome in the registration.
// Steps visible below:
//   1. encode the add request (local-only flag from registers_local_only()),
//   2. notify the handler via before_listener_register(),
//   3. create the invocation, attach the handler as its event handler and
//      wait for the response of invoke_urgent(),
//   4. decode the server registration id, notify the handler via
//      on_listener_register(),
//   5. store {server registration id, correlation id} under the connection.
2130 listener_service_impl::invoke(
const std::shared_ptr<listener_registration> &listener_registration,
2131 const std::shared_ptr<connection::Connection> &connection) {
// Guard for a connection that already holds a registration of this listener.
// NOTE(review): presumably returns early in that case - confirm.
2132 if (listener_registration->registrations.contains_key(connection)) {
2136 const auto &codec = listener_registration->codec;
2137 auto request = codec->encode_add_request(registers_local_only());
2138 const auto &handler = listener_registration->handler;
2139 handler->before_listener_register();
2141 auto invocation = ClientInvocation::create(client_context_,
2142 std::make_shared<protocol::ClientMessage>(std::move(request)),
"",
2144 invocation->set_event_handler(handler);
// .get() blocks the calling thread until the response is available.
2145 auto clientMessage = invocation->invoke_urgent().get();
2147 auto serverRegistrationId = codec->decode_add_response(clientMessage);
2148 handler->on_listener_register();
2149 int64_t correlationId = invocation->get_client_message()->get_correlation_id();
2151 (*listener_registration).registrations.put(connection, std::shared_ptr<connection_registration>(
2152 new connection_registration{serverRegistrationId, correlationId}));
// Hands an event message to the event handler attached to the invocation.
//   invocation - source of the event handler (get_event_handler())
//   response   - the event message passed to handler->handle()
// A missing handler and a std::exception thrown by the handler are both
// reported as warnings, and only while the lifecycle service is running.
2155 void listener_service_impl::process_event_message(
2156 const std::shared_ptr<ClientInvocation> invocation,
2157 const std::shared_ptr<protocol::ClientMessage> response) {
2158 auto eventHandler = invocation->get_event_handler();
2159 if (!eventHandler) {
2160 if (client_context_.get_lifecycle_service().is_running()) {
2161 HZ_LOG(logger_, warning,
2162 boost::str(boost::format(
"No eventHandler for invocation. "
2163 "Ignoring this invocation response. %1%")
2172 eventHandler->handle(*response);
2173 }
catch (std::exception &e) {
2174 if (client_context_.get_lifecycle_service().is_running()) {
2175 HZ_LOG(logger_, warning,
2176 boost::str(boost::format(
"Delivery of event message to event handler failed. %1%, %2%, %3%")
2177 % e.what() % *response % *invocation)
// Out-of-line definition of the compiler-generated (defaulted) destructor.
2183 listener_service_impl::~listener_service_impl() =
default;
// Registers this listener with the connection manager so that it is notified
// about added and removed connections.
2185 void cluster_view_listener::start() {
2186 client_context_.get_connection_manager().add_connection_listener(shared_from_this());
// Connection-added callback: attempts to register the cluster view listener
// on the new connection via try_register().
2189 void cluster_view_listener::connection_added(
const std::shared_ptr<connection::Connection> connection) {
2190 try_register(connection);
// Connection-removed callback: passes the id of the removed connection to
// try_reregister_to_random_connection().
2193 void cluster_view_listener::connection_removed(
const std::shared_ptr<connection::Connection> connection) {
2194 try_reregister_to_random_connection(connection->get_connection_id());
// Constructor taking the client context; the member initializer list starts
// with client_context_.
2197 cluster_view_listener::cluster_view_listener(ClientContext &client_context) : client_context_(
// Tries to register the cluster view listener on the given connection.
// listener_added_connection_id_ holds the id of the connection the listener is
// registered on, with -1 used as the "none" value in the exchanges below.
2200 void cluster_view_listener::try_register(std::shared_ptr<connection::Connection> connection) {
2201 int32_t expected_id = -1;
// Atomically claim the slot: succeeds only if the stored id was -1, in which
// case it becomes this connection's id.
// NOTE(review): presumably returns when the exchange fails - confirm.
2202 if (!listener_added_connection_id_.compare_exchange_strong(expected_id,
2203 connection->get_connection_id())) {
2208 auto invocation = ClientInvocation::create(client_context_,
2209 std::make_shared<protocol::ClientMessage>(
2210 protocol::codec::client_addclusterviewlistener_encode()),
"", connection);
2212 auto handler = std::make_shared<event_handler>(connection->get_connection_id(), *
this);
2213 invocation->set_event_handler(handler);
2214 handler->before_listener_register();
// The continuation captures a weak_ptr, so it does not itself keep this
// listener alive; it locks the pointer when it runs.
2216 std::weak_ptr<cluster_view_listener> weak_self = shared_from_this();
2217 auto conn_id = connection->get_connection_id();
2219 invocation->invoke_urgent().then(
2220 [weak_self, handler, conn_id](boost::future<protocol::ClientMessage> f) {
2221 auto self = weak_self.lock();
// A future holding a value means the add-listener request succeeded.
2225 if (f.has_value()) {
2226 handler->on_listener_register();
2231 self->try_reregister_to_random_connection(conn_id);
// Releases the listener slot held by old_connection_id and tries to register
// on another connection.
// The exchange resets listener_added_connection_id_ to -1 only if it currently
// equals old_connection_id. A connection obtained from get_random_connection()
// is then passed to try_register() when it is non-null.
// NOTE(review): presumably returns when the exchange fails - confirm.
2236 void cluster_view_listener::try_reregister_to_random_connection(int32_t old_connection_id) {
2237 if (!listener_added_connection_id_.compare_exchange_strong(old_connection_id, -1)) {
2241 auto new_connection = client_context_.get_connection_manager().get_random_connection();
2242 if (new_connection) {
2243 try_register(new_connection);
// Out-of-line definition of the compiler-generated (defaulted) destructor.
2247 cluster_view_listener::~cluster_view_listener() =
default;
// Members-view event: forwards the version and the member list to the client
// cluster service's handle_event().
2250 cluster_view_listener::event_handler::handle_membersview(int32_t version,
2251 const std::vector<member> &member_infos) {
2252 view_listener.client_context_.get_client_cluster_service().handle_event(version, member_infos);
// Partitions-view event: forwards this handler's connection id, the version
// and the (uuid, partition id list) pairs to the partition service's
// handle_event().
2256 cluster_view_listener::event_handler::handle_partitionsview(int32_t version,
2257 const std::vector<std::pair<boost::uuids::uuid, std::vector<int>>> &partitions) {
2258 view_listener.client_context_.get_partition_service().handle_event(connection_id, version, partitions);
// Hook run before the listener is registered: clears the member list version
// kept by the client cluster service, then formats a log message about the
// register attempt for this connection id.
2261 void cluster_view_listener::event_handler::before_listener_register() {
2262 view_listener.client_context_.get_client_cluster_service().clear_member_list_version();
2263 auto &lg = view_listener.client_context_.get_logger();
2265 boost::str(boost::format(
2266 "Register attempt of cluster_view_listener::event_handler to connection with id %1%") %
// Hook run after the listener has been registered: formats a log message
// stating the connection id the handler was registered to.
2270 void cluster_view_listener::event_handler::on_listener_register() {
2271 auto &lg = view_listener.client_context_.get_logger();
2273 boost::str(boost::format(
2274 "Registered cluster_view_listener::event_handler to connection with id %1%") %
// Stores the id of the connection this handler belongs to and a reference to
// the owning cluster_view_listener.
2278 cluster_view_listener::event_handler::event_handler(
int connectionId,
2279 cluster_view_listener &viewListener)
2280 : connection_id(connectionId), view_listener(viewListener) {}
// Builds the add request for the backup listener. The local_only argument is
// not used: the request always comes from client_localbackuplistener_encode().
2283 protocol::ClientMessage
2284 ClientInvocationServiceImpl::BackupListenerMessageCodec::encode_add_request(
bool local_only)
const {
2285 return protocol::codec::client_localbackuplistener_encode();
// Returns a ClientMessage constructed with the argument 0; the
// real_registration_id parameter is not used by the visible return statement.
2288 protocol::ClientMessage ClientInvocationServiceImpl::BackupListenerMessageCodec::encode_remove_request(
2289 boost::uuids::uuid real_registration_id)
const {
2291 return protocol::ClientMessage(0);
// Backup event callback of noop_backup_event_handler.
// NOTE(review): the class name suggests this intentionally does nothing with
// source_invocation_correlation_id - confirm against the full body.
2294 void ClientInvocationServiceImpl::noop_backup_event_handler::handle_backup(
2295 int64_t source_invocation_correlation_id) {
2299 namespace discovery {
// Stores the callback that produces the address map (moved into
// refresh_address_map_) and the use_public flag.
2300 remote_address_provider::remote_address_provider(
2301 std::function<std::unordered_map<address, address>()> addr_map_method,
2302 bool use_public) : refresh_address_map_(std::move(addr_map_method)),
2303 use_public_(use_public) {}
// Refreshes the address map and collects its keys.
// The refresh callback runs before lock_ is taken; the lock is then held
// while private_to_public_ is replaced and the keys of the fresh map are
// copied into the `addresses` vector.
2305 std::vector<address> remote_address_provider::load_addresses() {
2306 auto address_map = refresh_address_map_();
2307 std::lock_guard<std::mutex> guard(lock_);
2308 private_to_public_ = address_map;
2309 std::vector<address> addresses;
2310 addresses.reserve(address_map.size());
2311 for (
const auto &addr_pair : address_map) {
2312 addresses.push_back(addr_pair.first);
// Maps an address through private_to_public_.
// First attempt: look addr up in the cached map under lock_ and return the
// mapped address on a hit.
// Second attempt: call the refresh callback, store the fresh map under lock_
// and look addr up again, returning the mapped address on a hit.
2317 boost::optional<address> remote_address_provider::translate(
const address &addr) {
2324 std::lock_guard<std::mutex> guard(lock_);
2325 auto found = private_to_public_.find(addr);
2326 if (found != private_to_public_.end()) {
2327 return found->second;
// Cache miss: fetch a new map and retry once.
2331 auto address_map = refresh_address_map_();
2333 std::lock_guard<std::mutex> guard(lock_);
2334 private_to_public_ = address_map;
2336 auto found = private_to_public_.find(addr);
2337 if (found != private_to_public_.end()) {
2338 return found->second;
// Two definitions of the cloud_discovery constructor with the same signature.
// The one directly under HZ_BUILD_WITH_SSL stores the cloud config, the base
// URL and the timeout; the other one has an empty initializer list and body.
// NOTE(review): the second definition is presumably the non-SSL alternative of
// the conditional - confirm.
2344 #ifdef HZ_BUILD_WITH_SSL
2345 cloud_discovery::cloud_discovery(config::cloud_config &config, std::string cloud_base_url,
2346 std::chrono::steady_clock::duration timeout)
2347 : cloud_config_(config), cloud_base_url_(cloud_base_url), timeout_(timeout) {}
2349 cloud_discovery::cloud_discovery(config::cloud_config &config, std::string cloud_base_url,
2350 std::chrono::steady_clock::duration timeout) {}
// Returns the private-to-public address map obtained from the cloud endpoint.
// Under HZ_BUILD_WITH_SSL a SyncHttpsClient is created for cloud_base_url_ with
// the path CLOUD_URL_PATH + discovery token and the configured timeout; the
// response stream is parsed by parse_json_response(). A std::exception is
// rethrown nested inside an illegal_state exception.
// The remaining statements call util::Preconditions::check_ssl() and return an
// empty map.
// NOTE(review): presumably these form the non-SSL branch - confirm.
2353 std::unordered_map<address, address> cloud_discovery::get_addresses() {
2354 #ifdef HZ_BUILD_WITH_SSL
2356 util::SyncHttpsClient httpsConnection(cloud_base_url_, std::string(CLOUD_URL_PATH) +
2357 cloud_config_.discovery_token, timeout_);
2358 auto &conn_stream = httpsConnection.connect_and_get_response();
2359 return parse_json_response(conn_stream);
2360 }
catch (std::exception &e) {
2361 std::throw_with_nested(boost::enable_current_exception(
2362 exception::illegal_state(
"cloud_discovery::get_addresses",
2366 util::Preconditions::check_ssl(
"cloud_discovery::get_addresses");
2367 return std::unordered_map<address, address>();
// Parses the JSON document read from conn_stream into a map from private
// address to public address.
// For every item of the parsed tree the string properties named by
// PRIVATE_ADDRESS_PROPERTY and PUBLIC_ADDRESS_PROPERTY are read.
2371 std::unordered_map<address, address>
2372 cloud_discovery::parse_json_response(std::istream &conn_stream) {
2373 namespace pt = boost::property_tree;
2376 pt::read_json(conn_stream, root);
2378 std::unordered_map<address, address> addresses;
2379 for (
const auto &item : root) {
2380 auto private_address = item.second.get<std::string>(PRIVATE_ADDRESS_PROPERTY);
2381 auto public_address = item.second.get<std::string>(PUBLIC_ADDRESS_PROPERTY);
// The public address is created with -1 as its default port.
2383 address public_addr = create_address(public_address, -1);
// The private address uses the public address's port as its default port.
2385 auto private_addr = create_address(private_address, public_addr.get_port());
2386 addresses.emplace(std::move(private_addr), std::move(public_addr));
// Builds an address from a host string.
//   hostname     - text handed to AddressUtil::get_address_holder()
//   default_port - port handed to get_address_holder() alongside the hostname
// The resulting holder supplies the port, and its scoped hostname (from
// AddressHelper::get_scoped_hostname) supplies the host part.
2392 address cloud_discovery::create_address(
const std::string &hostname,
int default_port) {
2393 auto address_holder = util::AddressUtil::get_address_holder(hostname, default_port);
2394 auto scoped_hostname = util::AddressHelper::get_scoped_hostname(address_holder);
2395 return address(std::move(scoped_hostname), address_holder.get_port());
// Ordering for DefaultObjectNamespace: the service names are compared first
// (result kept in `result`); the final return compares the object names and
// yields true when lhs's object name sorts before rhs's.
2404 bool less<hazelcast::client::spi::DefaultObjectNamespace>::operator()(
2405 const hazelcast::client::spi::DefaultObjectNamespace &lhs,
2406 const hazelcast::client::spi::DefaultObjectNamespace &rhs)
const {
2407 int result = lhs.get_service_name().compare(rhs.get_service_name());
2416 return lhs.get_object_name().compare(rhs.get_object_name()) < 0;
// Hash for DefaultObjectNamespace: std::hash of the service name concatenated
// with the object name. Because the two strings are joined without a
// separator, different (service, object) pairs whose concatenations are equal
// produce the same hash value.
2420 hash<hazelcast::client::spi::DefaultObjectNamespace>::operator()(
2421 const hazelcast::client::spi::DefaultObjectNamespace &k)
const noexcept {
2422 return std::hash<std::string>()(k.get_service_name() + k.get_object_name());
Hazelcast cluster interface.
const std::unordered_set< member > & get_members() const
Returns an immutable set of ordered members at the moment this MembershipListener is registered.
cluster & get_cluster()
Returns the cluster of the event.
lifecycle_state get_state() const
lifecycle_state
State enum.
lifecycle_event(lifecycle_state state)
Constructor.