34 #include <boost/uuid/uuid_hash.hpp>
35 #include <boost/functional/hash.hpp>
36 #include <boost/property_tree/ptree.hpp>
37 #include <boost/property_tree/json_parser.hpp>
39 #include "hazelcast/client/hazelcast_client.h"
40 #include <hazelcast/client/protocol/codec/ErrorCodec.h>
41 #include <hazelcast/client/spi/impl/ListenerMessageCodec.h>
42 #include <hazelcast/client/spi/impl/ClientClusterServiceImpl.h>
43 #include <hazelcast/client/spi/impl/listener/cluster_view_listener.h>
44 #include <hazelcast/client/spi/impl/listener/listener_service_impl.h>
45 #include <hazelcast/client/spi/impl/discovery/remote_address_provider.h>
46 #include <hazelcast/client/spi/impl/discovery/cloud_discovery.h>
47 #include <hazelcast/util/AddressUtil.h>
48 #include "hazelcast/client/member_selectors.h"
49 #include "hazelcast/client/lifecycle_event.h"
50 #include "hazelcast/client/initial_membership_event.h"
51 #include "hazelcast/client/membership_event.h"
52 #include "hazelcast/client/lifecycle_listener.h"
53 #include "hazelcast/client/spi/ProxyManager.h"
54 #include "hazelcast/client/spi/ClientProxy.h"
55 #include "hazelcast/client/spi/ClientContext.h"
56 #include "hazelcast/client/spi/impl/ClientInvocation.h"
57 #include "hazelcast/client/spi/impl/ClientInvocationServiceImpl.h"
58 #include "hazelcast/client/impl/hazelcast_client_instance_impl.h"
59 #include "hazelcast/client/spi/impl/ClientPartitionServiceImpl.h"
60 #include "hazelcast/client/spi/impl/DefaultAddressProvider.h"
61 #include "hazelcast/client/spi/impl/sequence/CallIdSequenceWithBackpressure.h"
62 #include "hazelcast/client/spi/impl/sequence/CallIdSequenceWithoutBackpressure.h"
63 #include "hazelcast/client/spi/impl/sequence/FailFastCallIdSequence.h"
64 #include "hazelcast/client/spi/impl/sequence/CallIdFactory.h"
65 #include "hazelcast/client/connection/ClientConnectionManagerImpl.h"
66 #include "hazelcast/util/AddressHelper.h"
67 #include "hazelcast/util/HashUtil.h"
68 #include "hazelcast/util/concurrent/BackoffIdleStrategy.h"
69 #ifdef HZ_BUILD_WITH_SSL
70 #include <hazelcast/util/SyncHttpsClient.h>
83 initial_membership_event::initial_membership_event(
cluster &
cluster, std::unordered_set<member> members) : cluster_(
84 cluster), members_(std::move(members)) {
96 ProxyManager::ProxyManager(ClientContext &context) : client_(context) {
99 void ProxyManager::init() {
102 void ProxyManager::destroy() {
103 std::lock_guard<std::recursive_mutex> guard(lock_);
104 for (
auto &p : proxies_) {
106 auto proxy = p.second.get();
107 p.second.get()->on_shutdown();
108 }
catch (std::exception &se) {
109 auto &lg = client_.get_logger();
111 boost::str(boost::format(
"Proxy was not created, "
112 "hence onShutdown can be called. Exception: %1%")
120 boost::future<void> ProxyManager::initialize(
const std::shared_ptr<ClientProxy> &client_proxy) {
121 auto clientMessage = protocol::codec::client_createproxy_encode(client_proxy->get_name(),
122 client_proxy->get_service_name());
123 return spi::impl::ClientInvocation::create(client_, clientMessage,
124 client_proxy->get_service_name())->invoke().then(
125 boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
127 client_proxy->on_initialize();
131 boost::future<void> ProxyManager::destroy_proxy(ClientProxy &proxy) {
132 DefaultObjectNamespace objectNamespace(proxy.get_service_name(), proxy.get_name());
133 std::shared_ptr<ClientProxy> registeredProxy;
135 std::lock_guard<std::recursive_mutex> guard(lock_);
136 auto it = proxies_.find(objectNamespace);
137 registeredProxy = it == proxies_.end() ? nullptr : it->second.get();
138 if (it != proxies_.end()) {
144 if (registeredProxy) {
146 proxy.destroy_locally();
147 return proxy.destroy_remotely();
148 }
catch (exception::iexception &) {
149 proxy.destroy_remotely();
153 if (&proxy != registeredProxy.get()) {
158 proxy.destroy_locally();
161 if (&proxy != registeredProxy.get()) {
166 proxy.destroy_locally();
170 return boost::make_ready_future();
173 ClientContext::ClientContext(
const client::hazelcast_client &hazelcast_client) : hazelcast_client_(
174 *hazelcast_client.client_impl_) {
177 ClientContext::ClientContext(client::impl::hazelcast_client_instance_impl &hazelcast_client)
178 : hazelcast_client_(hazelcast_client) {
181 serialization::pimpl::SerializationService &ClientContext::get_serialization_service() {
182 return hazelcast_client_.serialization_service_;
185 impl::ClientClusterServiceImpl & ClientContext::get_client_cluster_service() {
186 return hazelcast_client_.cluster_service_;
189 impl::ClientInvocationServiceImpl &ClientContext::get_invocation_service() {
190 return *hazelcast_client_.invocation_service_;
193 client_config &ClientContext::get_client_config() {
194 return hazelcast_client_.client_config_;
197 impl::ClientPartitionServiceImpl & ClientContext::get_partition_service() {
198 return *hazelcast_client_.partition_service_;
201 lifecycle_service &ClientContext::get_lifecycle_service() {
202 return hazelcast_client_.lifecycle_service_;
205 spi::impl::listener::listener_service_impl &ClientContext::get_client_listener_service() {
206 return *hazelcast_client_.listener_service_;
209 connection::ClientConnectionManagerImpl &ClientContext::get_connection_manager() {
210 return *hazelcast_client_.connection_manager_;
213 internal::nearcache::NearCacheManager &ClientContext::get_near_cache_manager() {
214 return *hazelcast_client_.near_cache_manager_;
217 client_properties &ClientContext::get_client_properties() {
218 return hazelcast_client_.client_properties_;
221 cluster &ClientContext::get_cluster() {
222 return hazelcast_client_.cluster_;
225 std::shared_ptr<impl::sequence::CallIdSequence> &ClientContext::get_call_id_sequence()
const {
226 return hazelcast_client_.call_id_sequence_;
229 const protocol::ClientExceptionFactory &ClientContext::get_client_exception_factory()
const {
230 return hazelcast_client_.get_exception_factory();
233 const std::string &ClientContext::get_name()
const {
234 return hazelcast_client_.get_name();
237 impl::ClientExecutionServiceImpl &ClientContext::get_client_execution_service()
const {
238 return *hazelcast_client_.execution_service_;
241 const std::shared_ptr<client::impl::ClientLockReferenceIdGenerator> &
242 ClientContext::get_lock_reference_id_generator() {
243 return hazelcast_client_.get_lock_reference_id_generator();
246 std::shared_ptr<client::impl::hazelcast_client_instance_impl>
247 ClientContext::get_hazelcast_client_implementation() {
248 return hazelcast_client_.shared_from_this();
251 spi::ProxyManager &ClientContext::get_proxy_manager() {
252 return hazelcast_client_.get_proxy_manager();
255 logger &ClientContext::get_logger() {
256 return *hazelcast_client_.logger_;
259 client::impl::statistics::Statistics &ClientContext::get_clientstatistics() {
260 return *hazelcast_client_.statistics_;
263 spi::impl::listener::cluster_view_listener &ClientContext::get_cluster_view_listener() {
264 return *hazelcast_client_.cluster_listener_;
267 boost::uuids::uuid ClientContext::random_uuid() {
268 return hazelcast_client_.random_uuid();
271 cp::internal::session::proxy_session_manager &ClientContext::get_proxy_session_manager() {
272 return hazelcast_client_.proxy_session_manager_;
275 lifecycle_service::lifecycle_service(ClientContext &client_context,
276 const std::vector<lifecycle_listener> &listeners) :
277 client_context_(client_context), listeners_(),
278 shutdown_completed_latch_(1) {
279 for (
const auto &listener: listeners) {
280 add_listener(lifecycle_listener(listener));
284 bool lifecycle_service::start() {
285 bool expected =
false;
286 if (!active_.compare_exchange_strong(expected,
true)) {
290 fire_lifecycle_event(lifecycle_event::STARTED);
292 client_context_.get_client_execution_service().start();
294 client_context_.get_client_listener_service().start();
296 client_context_.get_invocation_service().start();
298 client_context_.get_client_cluster_service().start();
300 client_context_.get_cluster_view_listener().start();
302 if (!client_context_.get_connection_manager().start()) {
306 auto &connectionStrategyConfig = client_context_.get_client_config().get_connection_strategy_config();
307 if (!connectionStrategyConfig.is_async_start()) {
309 wait_for_initial_membership_event();
310 client_context_.get_connection_manager().connect_to_all_cluster_members();
313 client_context_.get_invocation_service().add_backup_listener();
315 client_context_.get_clientstatistics().start();
320 void lifecycle_service::shutdown() {
321 bool expected =
true;
322 if (!active_.compare_exchange_strong(expected,
false)) {
323 shutdown_completed_latch_.wait();
327 fire_lifecycle_event(lifecycle_event::SHUTTING_DOWN);
328 client_context_.get_proxy_session_manager().shutdown();
329 client_context_.get_clientstatistics().shutdown();
330 client_context_.get_proxy_manager().destroy();
331 client_context_.get_connection_manager().shutdown();
332 client_context_.get_client_cluster_service().shutdown();
333 client_context_.get_invocation_service().shutdown();
334 client_context_.get_client_listener_service().shutdown();
335 client_context_.get_near_cache_manager().destroy_all_near_caches();
336 fire_lifecycle_event(lifecycle_event::SHUTDOWN);
337 client_context_.get_client_execution_service().shutdown();
338 client_context_.get_serialization_service().dispose();
339 shutdown_completed_latch_.count_down();
340 }
catch (std::exception &e) {
341 HZ_LOG(client_context_.get_logger(), info,
342 boost::str(boost::format(
"An exception occured during LifecycleService shutdown. %1%")
345 shutdown_completed_latch_.count_down();
349 boost::uuids::uuid lifecycle_service::add_listener(lifecycle_listener &&lifecycle_listener) {
350 std::lock_guard<std::mutex> lg(listener_lock_);
351 const auto id = uuid_generator_();
352 listeners_.emplace(
id, std::move(lifecycle_listener));
356 bool lifecycle_service::remove_listener(
const boost::uuids::uuid ®istration_id) {
357 std::lock_guard<std::mutex> guard(listener_lock_);
358 return listeners_.erase(registration_id) == 1;
361 void lifecycle_service::fire_lifecycle_event(
const lifecycle_event &lifecycle_event) {
362 std::lock_guard<std::mutex> guard(listener_lock_);
363 logger &lg = client_context_.get_logger();
365 std::function<void(lifecycle_listener &)> fire_one;
367 switch (lifecycle_event.get_state()) {
368 case lifecycle_event::STARTING : {
370 std::string date(HAZELCAST_STRINGIZE(HAZELCAST_GIT_COMMIT_DATE));
371 util::git_date_to_hazelcast_log_date(date);
372 std::string commitId(HAZELCAST_STRINGIZE(HAZELCAST_GIT_COMMIT_ID));
373 commitId.erase(std::remove(commitId.begin(), commitId.end(),
'"'), commitId.end());
376 (boost::format(
"(%1%:%2%) LifecycleService::LifecycleEvent Client (%3%) is STARTING") %
377 date % commitId % client_context_.get_connection_manager().get_client_uuid()).str());
379 util::hz_snprintf(msg, 100,
"(%s:%s) LifecycleService::LifecycleEvent STARTING", date.c_str(),
381 HZ_LOG(lg, info, msg);
383 fire_one = [](lifecycle_listener &listener) {
384 listener.starting_();
388 case lifecycle_event::STARTED : {
389 HZ_LOG(lg, info,
"LifecycleService::LifecycleEvent STARTED");
391 fire_one = [](lifecycle_listener &listener) {
396 case lifecycle_event::SHUTTING_DOWN : {
397 HZ_LOG(lg, info,
"LifecycleService::LifecycleEvent SHUTTING_DOWN");
399 fire_one = [](lifecycle_listener &listener) {
400 listener.shutting_down_();
404 case lifecycle_event::SHUTDOWN : {
405 HZ_LOG(lg, info,
"LifecycleService::LifecycleEvent SHUTDOWN");
407 fire_one = [](lifecycle_listener &listener) {
408 listener.shutdown_();
412 case lifecycle_event::CLIENT_CONNECTED : {
413 HZ_LOG(lg, info,
"LifecycleService::LifecycleEvent CLIENT_CONNECTED");
415 fire_one = [](lifecycle_listener &listener) {
416 listener.connected_();
420 case lifecycle_event::CLIENT_DISCONNECTED : {
421 HZ_LOG(lg, info,
"LifecycleService::LifecycleEvent CLIENT_DISCONNECTED");
423 fire_one = [](lifecycle_listener &listener) {
424 listener.disconnected_();
430 for (
auto &item: listeners_) {
431 fire_one(item.second);
435 bool lifecycle_service::is_running() {
439 lifecycle_service::~lifecycle_service() {
445 void lifecycle_service::wait_for_initial_membership_event()
const {
446 client_context_.get_client_cluster_service().wait_initial_member_list_fetched();
449 DefaultObjectNamespace::DefaultObjectNamespace(
const std::string &service,
const std::string &
object)
450 : service_name_(service), object_name_(object) {
454 const std::string &DefaultObjectNamespace::get_service_name()
const {
455 return service_name_;
458 const std::string &DefaultObjectNamespace::get_object_name()
const {
462 bool DefaultObjectNamespace::operator==(
const DefaultObjectNamespace &rhs)
const {
463 return service_name_ == rhs.service_name_ && object_name_ == rhs.object_name_;
466 ClientProxy::ClientProxy(
const std::string &name,
const std::string &service_name, ClientContext &context)
467 : name_(name), service_name_(service_name), context_(context) {}
469 ClientProxy::~ClientProxy() =
default;
471 const std::string &ClientProxy::get_name()
const {
475 const std::string &ClientProxy::get_service_name()
const {
476 return service_name_;
479 ClientContext &ClientProxy::get_context() {
483 void ClientProxy::on_destroy() {
486 boost::future<void> ClientProxy::destroy() {
487 return get_context().get_proxy_manager().destroy_proxy(*
this);
490 void ClientProxy::destroy_locally() {
495 }
catch (exception::iexception &) {
502 bool ClientProxy::pre_destroy() {
506 void ClientProxy::post_destroy() {
509 void ClientProxy::on_initialize() {
512 void ClientProxy::on_shutdown() {
515 serialization::pimpl::SerializationService &ClientProxy::get_serialization_service() {
516 return context_.get_serialization_service();
519 boost::future<void> ClientProxy::destroy_remotely() {
520 auto clientMessage = protocol::codec::client_destroyproxy_encode(
521 get_name(), get_service_name());
522 return spi::impl::ClientInvocation::create(get_context(), std::make_shared<protocol::ClientMessage>(
523 std::move(clientMessage)), get_name())->invoke().then(
524 boost::launch::sync, [](boost::future<protocol::ClientMessage> f) { f.get(); });
527 boost::future<boost::uuids::uuid>
528 ClientProxy::register_listener(std::shared_ptr<impl::ListenerMessageCodec> listener_message_codec,
529 std::shared_ptr<client::impl::BaseEventHandler> handler) {
530 handler->set_logger(&get_context().get_logger());
531 return get_context().get_client_listener_service().register_listener(listener_message_codec,
535 boost::future<bool> ClientProxy::deregister_listener(boost::uuids::uuid registration_id) {
536 return get_context().get_client_listener_service().deregister_listener(registration_id);
541 ListenerMessageCodec::decode_add_response(protocol::ClientMessage &msg)
const {
542 return msg.get_first_uuid();
545 bool ListenerMessageCodec::decode_remove_response(protocol::ClientMessage &msg)
const {
546 return msg.get_first_fixed_sized_field<
bool>();
549 ClientInvocationServiceImpl::ClientInvocationServiceImpl(ClientContext &client)
550 : client_(client), logger_(client.get_logger()),
551 invocation_timeout_(std::chrono::seconds(client.get_client_properties().get_integer(
552 client.get_client_properties().get_invocation_timeout_seconds()))),
553 invocation_retry_pause_(std::chrono::milliseconds(client.get_client_properties().get_long(
554 client.get_client_properties().get_invocation_retry_pause_millis()))),
555 smart_routing_(client.get_client_config().get_network_config().is_smart_routing()),
556 backup_acks_enabled_(smart_routing_ && client.get_client_config().backup_acks_enabled()),
557 fail_on_indeterminate_operation_state_(client.get_client_properties().get_boolean(client.get_client_properties().fail_on_indeterminate_state())),
558 backup_timeout_(std::chrono::milliseconds(client.get_client_properties().get_integer(client.get_client_properties().backup_timeout_millis()))) {}
560 void ClientInvocationServiceImpl::start() {
563 void ClientInvocationServiceImpl::add_backup_listener() {
564 if (this->backup_acks_enabled_) {
565 auto &listener_service = this->client_.get_client_listener_service();
566 listener_service.register_listener(std::make_shared<BackupListenerMessageCodec>(),
567 std::make_shared<noop_backup_event_handler>()).get();
571 void ClientInvocationServiceImpl::shutdown() {
572 is_shutdown_.store(
true);
575 std::chrono::milliseconds ClientInvocationServiceImpl::get_invocation_timeout()
const {
576 return invocation_timeout_;
579 std::chrono::milliseconds ClientInvocationServiceImpl::get_invocation_retry_pause()
const {
580 return invocation_retry_pause_;
583 bool ClientInvocationServiceImpl::is_redo_operation() {
584 return client_.get_client_config().is_redo_operation();
588 ClientInvocationServiceImpl::handle_client_message(
const std::shared_ptr<ClientInvocation> &invocation,
589 const std::shared_ptr<protocol::ClientMessage> &response) {
591 if (protocol::codec::ErrorCodec::EXCEPTION_MESSAGE_TYPE == response->get_message_type()) {
592 auto error_holder = protocol::codec::ErrorCodec::decode(*response);
593 invocation->notify_exception(
594 client_.get_client_exception_factory().create_exception(error_holder));
596 invocation->notify(response);
598 }
catch (std::exception &e) {
599 HZ_LOG(logger_, severe,
600 boost::str(boost::format(
"Failed to process response for %1%. %2%")
601 % *invocation % e.what())
606 bool ClientInvocationServiceImpl::send(
const std::shared_ptr<impl::ClientInvocation>& invocation,
607 const std::shared_ptr<connection::Connection>& connection) {
609 BOOST_THROW_EXCEPTION(
610 exception::hazelcast_client_not_active(
"ClientInvocationServiceImpl::send",
611 "Client is shut down"));
614 if (backup_acks_enabled_) {
615 invocation->get_client_message()->add_flag(protocol::ClientMessage::BACKUP_AWARE_FLAG);
618 write_to_connection(*connection, invocation);
619 invocation->set_send_connection(connection);
623 void ClientInvocationServiceImpl::write_to_connection(connection::Connection &connection,
624 const std::shared_ptr<ClientInvocation> &client_invocation) {
625 auto clientMessage = client_invocation->get_client_message();
626 connection.write(client_invocation);
629 void ClientInvocationServiceImpl::check_invocation_allowed() {
630 client_.get_connection_manager().check_invocation_allowed();
633 bool ClientInvocationServiceImpl::invoke(std::shared_ptr<ClientInvocation> invocation) {
634 auto connection = client_.get_connection_manager().get_random_connection();
636 HZ_LOG(logger_, finest,
"No connection found to invoke");
639 return send(invocation, connection);
642 DefaultAddressProvider::DefaultAddressProvider(config::client_network_config &network_config)
643 : network_config_(network_config) {}
645 std::vector<address> DefaultAddressProvider::load_addresses() {
646 std::vector<address> addresses = network_config_.get_addresses();
647 if (addresses.empty()) {
648 addresses.emplace_back(
"127.0.0.1", 5701);
656 boost::optional<address> DefaultAddressProvider::translate(
const address &addr) {
660 const boost::shared_ptr<ClientClusterServiceImpl::member_list_snapshot> ClientClusterServiceImpl::EMPTY_SNAPSHOT(
661 new ClientClusterServiceImpl::member_list_snapshot{-1});
663 constexpr boost::chrono::milliseconds ClientClusterServiceImpl::INITIAL_MEMBERS_TIMEOUT;
665 ClientClusterServiceImpl::ClientClusterServiceImpl(hazelcast::client::spi::ClientContext &client)
666 : client_(client), member_list_snapshot_(EMPTY_SNAPSHOT),
667 labels_(client.get_client_config().get_labels()),
668 initial_list_fetched_latch_(1) {
671 boost::uuids::uuid ClientClusterServiceImpl::add_membership_listener_without_init(
672 membership_listener &&listener) {
673 std::lock_guard<std::mutex> g(listeners_lock_);
674 auto id = client_.random_uuid();
675 listeners_.emplace(
id, std::move(listener));
679 boost::optional<member> ClientClusterServiceImpl::get_member(boost::uuids::uuid uuid)
const {
680 assert(!uuid.is_nil());
681 auto members_view_ptr = member_list_snapshot_.load();
682 const auto it = members_view_ptr->members.find(uuid);
683 if (it == members_view_ptr->members.end()) {
689 std::vector<member> ClientClusterServiceImpl::get_member_list()
const {
690 auto members_view_ptr = member_list_snapshot_.load();
691 std::vector<member> result;
692 result.reserve(members_view_ptr->members.size());
693 for (
const auto &e : members_view_ptr->members) {
694 result.emplace_back(e.second);
699 void ClientClusterServiceImpl::start() {
700 for (
auto &listener : client_.get_client_config().get_membership_listeners()) {
701 add_membership_listener(membership_listener(listener));
705 void ClientClusterServiceImpl::fire_initial_membership_event(
const initial_membership_event &event) {
706 std::lock_guard<std::mutex> g(listeners_lock_);
708 for (
auto &item : listeners_) {
709 membership_listener &listener = item.second;
710 if (listener.init_) {
711 listener.init_(event);
716 void ClientClusterServiceImpl::shutdown() {
717 initial_list_fetched_latch_.try_count_down();
721 ClientClusterServiceImpl::add_membership_listener(membership_listener &&listener) {
722 std::lock_guard<std::mutex> cluster_view_g(cluster_view_lock_);
724 auto id = add_membership_listener_without_init(std::move(listener));
726 std::lock_guard<std::mutex> listeners_g(listeners_lock_);
727 auto added_listener = listeners_[id];
729 if (added_listener.init_) {
730 auto &cluster = client_.get_cluster();
731 auto members_ptr = member_list_snapshot_.load();
732 if (!members_ptr->members.empty()) {
733 std::unordered_set<member> members;
734 for (
const auto &e : members_ptr->members) {
735 members.insert(e.second);
737 added_listener.init_(initial_membership_event(cluster, members));
744 bool ClientClusterServiceImpl::remove_membership_listener(boost::uuids::uuid registration_id) {
745 std::lock_guard<std::mutex> g(listeners_lock_);
746 return listeners_.erase(registration_id) == 1;
750 ClientClusterServiceImpl::get_members(
const member_selector &selector)
const {
751 std::vector<member> result;
752 for (
auto &&member : get_member_list()) {
753 if (selector.select(member)) {
754 result.emplace_back(std::move(member));
761 local_endpoint ClientClusterServiceImpl::get_local_client()
const {
762 connection::ClientConnectionManagerImpl &cm = client_.get_connection_manager();
763 auto connection = cm.get_random_connection();
764 auto inetSocketAddress = connection ? connection->get_local_socket_address() : boost::none;
765 auto uuid = cm.get_client_uuid();
766 return local_endpoint(uuid, std::move(inetSocketAddress), client_.get_name(), labels_);
769 void ClientClusterServiceImpl::clear_member_list_version() {
770 std::lock_guard<std::mutex> g(cluster_view_lock_);
771 auto &lg = client_.get_logger();
772 HZ_LOG(lg, finest,
"Resetting the member list version ");
773 auto cluster_view_snapshot = member_list_snapshot_.load();
776 if (cluster_view_snapshot != EMPTY_SNAPSHOT) {
777 member_list_snapshot_.store(boost::shared_ptr<member_list_snapshot>(
778 new member_list_snapshot{0, cluster_view_snapshot->members}));
782 std::vector<membership_event> ClientClusterServiceImpl::clear_member_list_and_return_events() {
783 std::lock_guard<std::mutex> g(cluster_view_lock_);
785 auto &lg = client_.get_logger();
786 HZ_LOG(lg, finest,
"Resetting the member list");
788 auto previous_list = member_list_snapshot_.load()->members;
790 member_list_snapshot_.store(
791 boost::shared_ptr<member_list_snapshot>(
new member_list_snapshot{0}));
793 return detect_membership_events(previous_list,
794 std::unordered_map<boost::uuids::uuid, member, boost::hash<boost::uuids::uuid>>());
797 void ClientClusterServiceImpl::clear_member_list() {
798 auto events = clear_member_list_and_return_events();
799 fire_events(std::move(events));
803 ClientClusterServiceImpl::handle_event(int32_t version,
const std::vector<member> &member_infos) {
804 auto &lg = client_.get_logger();
806 boost::str(boost::format(
"Handling new snapshot with membership version: %1%, "
809 % members_string(create_snapshot(version, member_infos)))
811 auto cluster_view_snapshot = member_list_snapshot_.load();
812 if (cluster_view_snapshot == EMPTY_SNAPSHOT) {
813 std::lock_guard<std::mutex> g(cluster_view_lock_);
814 cluster_view_snapshot = member_list_snapshot_.load();
815 if (cluster_view_snapshot == EMPTY_SNAPSHOT) {
817 apply_initial_state(version, member_infos);
818 initial_list_fetched_latch_.count_down();
823 std::vector<membership_event> events;
824 if (version >= cluster_view_snapshot->version) {
825 std::lock_guard<std::mutex> g(cluster_view_lock_);
826 cluster_view_snapshot = member_list_snapshot_.load();
827 if (version >= cluster_view_snapshot->version) {
828 auto prev_members = cluster_view_snapshot->members;
829 auto snapshot = boost::make_shared<member_list_snapshot>(create_snapshot(version, member_infos));
830 member_list_snapshot_.store(snapshot);
831 events = detect_membership_events(prev_members, snapshot->members);
835 fire_events(std::move(events));
838 ClientClusterServiceImpl::member_list_snapshot
839 ClientClusterServiceImpl::create_snapshot(int32_t version,
const std::vector<member> &members) {
840 member_list_snapshot result;
841 result.version = version;
842 for (
auto &m : members) {
843 result.members.insert({m.get_uuid(), m});
850 ClientClusterServiceImpl::members_string(
const ClientClusterServiceImpl::member_list_snapshot& snapshot) {
851 std::stringstream out;
852 auto const &members = snapshot.members;
853 out << std::endl << std::endl <<
"Members [" << members.size() <<
"] {";
854 for (
auto const &e : members) {
855 out << std::endl <<
"\t" << e.second;
857 out << std::endl <<
"}" << std::endl;
862 ClientClusterServiceImpl::apply_initial_state(int32_t version,
const std::vector<member> &member_infos) {
863 auto snapshot = boost::make_shared<member_list_snapshot>(create_snapshot(version, member_infos));
864 member_list_snapshot_.store(snapshot);
865 HZ_LOG(client_.get_logger(), info, members_string(*snapshot));
866 std::unordered_set<member> members;
867 for(
auto const &e : snapshot->members) {
868 members.insert(e.second);
870 std::lock_guard<std::mutex> g(listeners_lock_);
871 for (
auto &item : listeners_) {
872 membership_listener &listener = item.second;
873 if (listener.init_) {
874 listener.init_(initial_membership_event(client_.get_cluster(), members));
879 std::vector<membership_event> ClientClusterServiceImpl::detect_membership_events(
880 std::unordered_map<boost::uuids::uuid, member, boost::hash<boost::uuids::uuid>> previous_members,
881 const std::unordered_map<boost::uuids::uuid, member, boost::hash<boost::uuids::uuid>>& current_members) {
882 std::vector<member> new_members;
884 for (
auto const & e : current_members) {
885 if (!previous_members.erase(e.first)) {
886 new_members.emplace_back(e.second);
890 std::vector<membership_event> events;
893 for (
auto const &e : previous_members) {
894 events.emplace_back(client_.get_cluster(), e.second, membership_event::membership_event_type::MEMBER_LEFT, current_members);
895 auto connection = client_.get_connection_manager().get_connection(e.second.get_uuid());
897 connection->close(
"", std::make_exception_ptr(exception::target_disconnected(
898 "ClientClusterServiceImpl::detect_membership_events", (boost::format(
899 "The client has closed the connection to this member, after receiving a member left event from the cluster. %1%") %
900 *connection).str())));
903 for (
auto const &member : new_members) {
904 events.emplace_back(client_.get_cluster(), member, membership_event::membership_event_type::MEMBER_JOINED, current_members);
907 if (!events.empty()) {
908 auto snapshot = member_list_snapshot_.load();
909 if (!snapshot->members.empty()) {
910 HZ_LOG(client_.get_logger(), info, members_string(*snapshot));
916 void ClientClusterServiceImpl::fire_events(std::vector<membership_event> events) {
917 std::lock_guard<std::mutex> g(listeners_lock_);
919 for (
auto const &event : events) {
920 for (
auto &item : listeners_) {
921 membership_listener &listener = item.second;
922 if (event.get_event_type() == membership_event::membership_event_type::MEMBER_JOINED) {
923 listener.joined_(event);
925 listener.left_(event);
931 void ClientClusterServiceImpl::wait_initial_member_list_fetched()
const {
933 if ((
const_cast<boost::latch&
>(initial_list_fetched_latch_)).wait_for(INITIAL_MEMBERS_TIMEOUT) == boost::cv_status::timeout) {
934 BOOST_THROW_EXCEPTION(exception::illegal_state(
935 "ClientClusterServiceImpl::wait_initial_member_list_fetched",
936 "Could not get initial member list from cluster!"));
941 ClientInvocationServiceImpl::invoke_on_connection(
const std::shared_ptr<ClientInvocation> &invocation,
942 const std::shared_ptr<connection::Connection> &connection) {
943 return send(invocation, connection);
946 bool ClientInvocationServiceImpl::invoke_on_partition_owner(
947 const std::shared_ptr<ClientInvocation> &invocation,
int partition_id) {
948 auto partition_owner = client_.get_partition_service().get_partition_owner(partition_id);
949 if (partition_owner.is_nil()) {
950 HZ_LOG(logger_, finest,
951 boost::str(boost::format(
"Partition owner is not assigned yet for partition %1%")
956 return invoke_on_target(invocation, partition_owner);
959 bool ClientInvocationServiceImpl::invoke_on_target(
const std::shared_ptr<ClientInvocation> &invocation,
960 boost::uuids::uuid uuid) {
961 assert (!uuid.is_nil());
962 auto connection = client_.get_connection_manager().get_connection(uuid);
964 HZ_LOG(logger_, finest,
965 boost::str(boost::format(
"Client is not connected to target : %1%")
970 return send(invocation, connection);
973 bool ClientInvocationServiceImpl::is_smart_routing()
const {
974 return smart_routing_;
977 const std::chrono::milliseconds &ClientInvocationServiceImpl::get_backup_timeout()
const {
978 return backup_timeout_;
981 bool ClientInvocationServiceImpl::fail_on_indeterminate_state()
const {
982 return fail_on_indeterminate_operation_state_;
985 ClientExecutionServiceImpl::ClientExecutionServiceImpl(
const std::string &name,
986 const client_properties &properties,
988 spi::lifecycle_service &service)
989 : lifecycle_service_(service), client_properties_(properties) {}
991 void ClientExecutionServiceImpl::start() {
992 int internalPoolSize = client_properties_.get_integer(
993 client_properties_.get_internal_executor_pool_size());
994 if (internalPoolSize <= 0) {
995 internalPoolSize = util::IOUtil::to_value<int>(
996 client_properties::INTERNAL_EXECUTOR_POOL_SIZE_DEFAULT);
999 internal_executor_.reset(
new hazelcast::util::hz_thread_pool(internalPoolSize));
1001 user_executor_.reset(
new hazelcast::util::hz_thread_pool());
1004 void ClientExecutionServiceImpl::shutdown() {
1005 shutdown_thread_pool(internal_executor_.get());
1006 shutdown_thread_pool(user_executor_.get());
1009 util::hz_thread_pool &ClientExecutionServiceImpl::get_user_executor() {
1010 return *user_executor_;
1013 void ClientExecutionServiceImpl::shutdown_thread_pool(hazelcast::util::hz_thread_pool *pool) {
1020 constexpr
int ClientInvocation::MAX_FAST_INVOCATION_COUNT;
1021 constexpr
int ClientInvocation::UNASSIGNED_PARTITION;
1023 ClientInvocation::ClientInvocation(spi::ClientContext &client_context,
1024 std::shared_ptr<protocol::ClientMessage> &&message,
1025 const std::string &name,
1027 const std::shared_ptr<connection::Connection> &conn,
1028 boost::uuids::uuid uuid) :
1029 logger_(client_context.get_logger()),
1030 lifecycle_service_(client_context.get_lifecycle_service()),
1031 invocation_service_(client_context.get_invocation_service()),
1032 execution_service_(client_context.get_client_execution_service().shared_from_this()),
1033 call_id_sequence_(client_context.get_call_id_sequence()),
1035 partition_id_(partition),
1036 start_time_(std::chrono::steady_clock::now()),
1037 retry_pause_(invocation_service_.get_invocation_retry_pause()),
1039 connection_(conn), bound_to_single_connection_(conn != nullptr),
1040 invoke_count_(0), urgent_(false), smart_routing_(invocation_service_.is_smart_routing()) {
1041 message->set_partition_id(partition_id_);
1042 client_message_ = boost::make_shared<std::shared_ptr<protocol::ClientMessage>>(message);
1043 set_send_connection(
nullptr);
1046 ClientInvocation::~ClientInvocation() =
default;
1048 boost::future<protocol::ClientMessage> ClientInvocation::invoke() {
1049 assert (client_message_.load());
1051 call_id_sequence_->next();
1052 invoke_on_selection();
1053 if (!lifecycle_service_.is_running()) {
1054 return invocation_promise_.get_future().then([](boost::future<protocol::ClientMessage> f) {
1058 auto id_seq = call_id_sequence_;
1059 return invocation_promise_.get_future().then(execution_service_->get_user_executor(),
1060 [=](boost::future<protocol::ClientMessage> f) {
1066 boost::future<protocol::ClientMessage> ClientInvocation::invoke_urgent() {
1067 assert(client_message_.load());
1070 call_id_sequence_->force_next();
1071 invoke_on_selection();
1072 if (!lifecycle_service_.is_running()) {
1073 return invocation_promise_.get_future().then([](boost::future<protocol::ClientMessage> f) {
1077 auto id_seq = call_id_sequence_;
1078 return invocation_promise_.get_future().then(execution_service_->get_user_executor(),
1079 [=](boost::future<protocol::ClientMessage> f) {
1085 void ClientInvocation::invoke_on_selection() {
1089 invocation_service_.check_invocation_allowed();
1092 if (is_bind_to_single_connection()) {
1093 bool invoked =
false;
1094 auto conn = connection_.lock();
1096 invoked = invocation_service_.invoke_on_connection(shared_from_this(), conn);
1099 std::string message;
1101 message = (boost::format(
"Could not invoke on connection %1%") % *conn).str();
1103 message =
"Could not invoke. Bound to a connection that is deleted already.";
1105 notify_exception(std::make_exception_ptr(
1106 exception::io(
"ClientInvocation::invoke_on_selection", message)));
1111 bool invoked =
false;
1112 if (smart_routing_) {
1113 if (partition_id_ != -1) {
1114 invoked = invocation_service_.invoke_on_partition_owner(shared_from_this(), partition_id_);
1115 }
else if (!uuid_.is_nil()) {
1116 invoked = invocation_service_.invoke_on_target(shared_from_this(), uuid_);
1118 invoked = invocation_service_.invoke(shared_from_this());
1121 invoked = invocation_service_.invoke(shared_from_this());
1124 invoked = invocation_service_.invoke(shared_from_this());
1127 notify_exception(std::make_exception_ptr(exception::io(
"No connection found to invoke")));
1129 }
catch (exception::iexception &) {
1130 notify_exception(std::current_exception());
1131 }
catch (std::exception &) {
1136 bool ClientInvocation::is_bind_to_single_connection()
const {
1137 return bound_to_single_connection_;
1140 void ClientInvocation::run() {
1144 void ClientInvocation::retry() {
1147 client_message_ = boost::make_shared<std::shared_ptr<protocol::ClientMessage>>(copy_message());
1150 invoke_on_selection();
1151 }
catch (exception::iexception &e) {
1152 set_exception(e, boost::current_exception());
1153 }
catch (std::exception &) {
1158 void ClientInvocation::set_exception(
const std::exception &e, boost::exception_ptr exception_ptr) {
1159 invoked_or_exception_set_.store(
true);
1161 auto send_conn = send_connection_.load();
1163 auto connection = send_conn->lock();
1165 auto call_id = client_message_.load()->get()->get_correlation_id();
1166 boost::asio::post(connection->get_socket().get_executor(), [=]() {
1167 connection->deregister_invocation(call_id);
1171 invocation_promise_.set_exception(std::move(exception_ptr));
1172 }
catch (boost::promise_already_satisfied &se) {
1173 if (!event_handler_) {
1174 HZ_LOG(logger_, finest,
1175 boost::str(boost::format(
"Failed to set the exception for invocation. "
1176 "%1%, %2% Exception to be set: %3%")
1177 % se.what() % *
this % e.what())
1183 void ClientInvocation::notify_exception(std::exception_ptr exception) {
1186 std::rethrow_exception(exception);
1187 }
catch (exception::iexception &iex) {
1190 if (!lifecycle_service_.is_running()) {
1192 std::throw_with_nested(boost::enable_current_exception(
1193 exception::hazelcast_client_not_active(iex.get_source(),
1194 "Client is shutting down")));
1195 }
catch (exception::iexception &e) {
1196 set_exception(e, boost::current_exception());
1201 if (!should_retry(iex)) {
1202 set_exception(iex, boost::current_exception());
1206 auto timePassed = std::chrono::steady_clock::now() - start_time_;
1207 if (timePassed > invocation_service_.get_invocation_timeout()) {
1208 HZ_LOG(logger_, finest,
1209 boost::str(boost::format(
"Exception will not be retried because "
1210 "invocation timed out. %1%") % iex.what())
1213 auto now = std::chrono::steady_clock::now();
1215 auto timeoutException = (exception::exception_builder<exception::operation_timeout>(
1216 "ClientInvocation::newoperation_timeout_exception") << *
this
1217 <<
" timed out because exception occurred after client invocation timeout "
1218 << std::chrono::duration_cast<std::chrono::milliseconds>(invocation_service_.get_invocation_timeout()).count()
1219 <<
"msecs. Last exception:" << iex
1220 <<
" Current time :" << util::StringUtil::time_to_string(now) <<
". "
1221 <<
"Start time: " << util::StringUtil::time_to_string(start_time_)
1222 <<
". Total elapsed time: " <<
1223 std::chrono::duration_cast<std::chrono::milliseconds>(now - start_time_).count()
1224 <<
" ms. ").build();
1226 BOOST_THROW_EXCEPTION(timeoutException);
1228 set_exception(timeoutException, boost::current_exception());
1236 }
catch (std::exception &e) {
1237 set_exception(e, boost::current_exception());
1244 void ClientInvocation::erase_invocation()
const {
1245 if (!this->event_handler_) {
1246 auto sent_connection = get_send_connection();
1247 if (sent_connection) {
1248 auto this_invocation = shared_from_this();
1249 boost::asio::post(sent_connection->get_socket().get_executor(), [=] () {
1250 sent_connection->invocations.erase(this_invocation->get_client_message()->get_correlation_id());
1256 bool ClientInvocation::should_retry(exception::iexception &exception) {
1257 auto errorCode = exception.get_error_code();
1258 if (is_bind_to_single_connection() && (errorCode == protocol::IO || errorCode == protocol::TARGET_DISCONNECTED)) {
1262 if (!uuid_.is_nil() && errorCode == protocol::TARGET_NOT_MEMBER) {
1269 if (errorCode == protocol::IO || errorCode == protocol::HAZELCAST_INSTANCE_NOT_ACTIVE || exception.is_retryable()) {
1272 if (errorCode == protocol::TARGET_DISCONNECTED) {
1273 return client_message_.load()->get()->is_retryable() || invocation_service_.is_redo_operation();
1278 std::ostream &operator<<(std::ostream &os,
const ClientInvocation &invocation) {
1279 std::ostringstream target;
1280 if (invocation.is_bind_to_single_connection()) {
1281 auto conn = invocation.connection_.lock();
1283 target <<
"connection " << *conn;
1285 }
else if (invocation.partition_id_ != -1) {
1286 target <<
"partition " << invocation.partition_id_;
1287 }
else if (!invocation.uuid_.is_nil()) {
1288 target <<
"uuid " << boost::to_string(invocation.uuid_);
1292 os <<
"ClientInvocation{" <<
"requestMessage = " << *invocation.client_message_.load()->get()
1293 <<
", objectName = "
1294 << invocation.object_name_ <<
", target = " << target.str() <<
", sendConnection = ";
1295 auto sendConnection = invocation.get_send_connection();
1296 if (sendConnection) {
1297 os << *sendConnection;
1301 os <<
", backup_acks_expected_ = " <<
static_cast<int>(invocation.backup_acks_expected_)
1302 <<
", backup_acks_received = " << invocation.backup_acks_received_;
1304 if (invocation.pending_response_) {
1305 os <<
", pending_response: " << *invocation.pending_response_;
1313 std::shared_ptr<ClientInvocation> ClientInvocation::create(spi::ClientContext &client_context,
1314 std::shared_ptr<protocol::ClientMessage> &&client_message,
1315 const std::string &object_name,
1317 return std::shared_ptr<ClientInvocation>(
1318 new ClientInvocation(client_context, std::move(client_message), object_name, partition_id));
1321 std::shared_ptr<ClientInvocation> ClientInvocation::create(spi::ClientContext &client_context,
1322 std::shared_ptr<protocol::ClientMessage> &&client_message,
1323 const std::string &object_name,
1324 const std::shared_ptr<connection::Connection> &connection) {
1325 return std::shared_ptr<ClientInvocation>(
1326 new ClientInvocation(client_context, std::move(client_message), object_name, UNASSIGNED_PARTITION,
1331 std::shared_ptr<ClientInvocation> ClientInvocation::create(spi::ClientContext &client_context,
1332 std::shared_ptr<protocol::ClientMessage> &&client_message,
1333 const std::string &object_name,
1334 boost::uuids::uuid uuid) {
1335 return std::shared_ptr<ClientInvocation>(
1336 new ClientInvocation(client_context, std::move(client_message), object_name, UNASSIGNED_PARTITION,
1340 std::shared_ptr<ClientInvocation> ClientInvocation::create(spi::ClientContext &client_context,
1341 protocol::ClientMessage &client_message,
1342 const std::string &object_name,
1344 return create(client_context, std::make_shared<protocol::ClientMessage>(std::move(client_message)),
1345 object_name, partition_id);
1348 std::shared_ptr<ClientInvocation> ClientInvocation::create(spi::ClientContext &client_context,
1349 protocol::ClientMessage &client_message,
1350 const std::string &object_name,
1351 const std::shared_ptr<connection::Connection> &connection) {
1352 return create(client_context, std::make_shared<protocol::ClientMessage>(std::move(client_message)),
1353 object_name, connection);
1356 std::shared_ptr<ClientInvocation> ClientInvocation::create(spi::ClientContext &client_context,
1357 protocol::ClientMessage &client_message,
1358 const std::string &object_name,
1359 boost::uuids::uuid uuid) {
1360 return create(client_context, std::make_shared<protocol::ClientMessage>(std::move(client_message)),
1364 std::shared_ptr<connection::Connection> ClientInvocation::get_send_connection()
const {
1365 return send_connection_.load()->lock();
1368 void ClientInvocation::wait_invoked()
const {
1370 while (!invoked_or_exception_set_) {
1371 std::this_thread::sleep_for(retry_pause_);
1376 ClientInvocation::set_send_connection(
const std::shared_ptr<connection::Connection> &conn) {
1377 send_connection_.store(boost::make_shared<std::weak_ptr<connection::Connection>>(conn));
1378 invoked_or_exception_set_.store(
true);
1381 void ClientInvocation::notify(
const std::shared_ptr<protocol::ClientMessage> &msg) {
1383 BOOST_THROW_EXCEPTION(exception::illegal_argument(
"response can't be null"));
1386 int8_t expected_backups = msg->get_number_of_backups();
1390 if (expected_backups > backup_acks_received_) {
1394 pending_response_received_time_ = std::chrono::steady_clock::now();
1396 backup_acks_expected_ = expected_backups;
1400 pending_response_ = msg;
1412 void ClientInvocation::complete(
const std::shared_ptr<protocol::ClientMessage> &msg) {
1415 this->invocation_promise_.set_value(*msg);
1416 }
catch (std::exception &e) {
1417 HZ_LOG(logger_, warning,
1418 boost::str(boost::format(
"Failed to set the response for invocation. "
1419 "Dropping the response. %1%, %2% Response: %3%")
1420 % e.what() % *
this % *msg)
1423 this->erase_invocation();
1426 std::shared_ptr<protocol::ClientMessage> ClientInvocation::get_client_message()
const {
1427 return *client_message_.load();
1430 const std::shared_ptr<EventHandler<protocol::ClientMessage> > &
1431 ClientInvocation::get_event_handler()
const {
1432 return event_handler_;
1435 void ClientInvocation::set_event_handler(
1436 const std::shared_ptr<EventHandler<protocol::ClientMessage> > &handler) {
1437 ClientInvocation::event_handler_ = handler;
1440 void ClientInvocation::execute() {
1441 auto this_invocation = shared_from_this();
1442 auto command = [=]() {
1443 this_invocation->run();
1449 int64_t callId = call_id_sequence_->force_next();
1450 client_message_.load()->get()->set_correlation_id(callId);
1453 call_id_sequence_->complete();
1455 if (invoke_count_ < MAX_FAST_INVOCATION_COUNT) {
1457 execution_service_->execute(command);
1460 int64_t delayMillis = util::min<int64_t>(
static_cast<int64_t
>(1) << (invoke_count_ - MAX_FAST_INVOCATION_COUNT),
1461 std::chrono::duration_cast<std::chrono::milliseconds>(retry_pause_).count());
1462 retry_timer_ = execution_service_->schedule(command, std::chrono::milliseconds(delayMillis));
1466 const std::string ClientInvocation::get_name()
const {
1467 return "ClientInvocation";
1470 std::shared_ptr<protocol::ClientMessage> ClientInvocation::copy_message() {
1471 return std::make_shared<protocol::ClientMessage>(**client_message_.load());
1474 boost::promise<protocol::ClientMessage> &ClientInvocation::get_promise() {
1475 return invocation_promise_;
1478 void ClientInvocation::log_exception(exception::iexception &e) {
1479 HZ_LOG(logger_, finest,
1480 boost::str(boost::format(
"Invocation got an exception %1%, invoke count : %2%, "
1482 % *
this % invoke_count_.load() % e)
1486 void ClientInvocation::notify_backup() {
1487 ++backup_acks_received_;
1489 if (!pending_response_) {
1495 if (backup_acks_expected_ != backup_acks_received_) {
1502 complete_with_pending_response();
1506 ClientInvocation::detect_and_handle_backup_timeout(
const std::chrono::milliseconds &backup_timeout) {
1509 if (backup_acks_expected_ == backup_acks_received_) {
1515 if (!pending_response_) {
1520 if (pending_response_received_time_ + backup_timeout >= std::chrono::steady_clock::now()) {
1524 if (invocation_service_.fail_on_indeterminate_state()) {
1525 auto exception = boost::enable_current_exception((exception::exception_builder<exception::indeterminate_operation_state>(
1526 "ClientInvocation::detect_and_handle_backup_timeout") << *
this
1527 <<
" failed because backup acks missed.").build());
1528 notify_exception(std::make_exception_ptr(exception));
1533 complete_with_pending_response();
1536 void ClientInvocation::complete_with_pending_response() {
1537 complete(pending_response_);
1540 ClientContext &impl::ClientTransactionManagerServiceImpl::get_client()
const {
1544 ClientTransactionManagerServiceImpl::ClientTransactionManagerServiceImpl(ClientContext &client)
1545 : client_(client) {}
1547 std::shared_ptr<connection::Connection> ClientTransactionManagerServiceImpl::connect() {
1548 auto &invocationService = client_.get_invocation_service();
1549 auto startTime = std::chrono::steady_clock::now();
1550 auto invocationTimeout = invocationService.get_invocation_timeout();
1551 client_config &clientConfig = client_.get_client_config();
1552 bool smartRouting = clientConfig.get_network_config().is_smart_routing();
1554 while (client_.get_lifecycle_service().is_running()) {
1556 auto connection = client_.get_connection_manager().get_random_connection();
1558 throw_exception(smartRouting);
1561 }
catch (exception::hazelcast_client_offline &) {
1563 }
catch (exception::iexception &) {
1564 if (std::chrono::steady_clock::now() - startTime > invocationTimeout) {
1565 std::rethrow_exception(
1566 new_operation_timeout_exception(std::current_exception(), invocationTimeout,
1570 std::this_thread::sleep_for(invocationService.get_invocation_retry_pause());
1572 BOOST_THROW_EXCEPTION(
1573 exception::hazelcast_client_not_active(
"ClientTransactionManagerServiceImpl::connect",
1574 "Client is shutdown"));
1578 ClientTransactionManagerServiceImpl::new_operation_timeout_exception(std::exception_ptr cause,
1579 std::chrono::milliseconds invocation_timeout,
1580 std::chrono::steady_clock::time_point start_time) {
1581 std::ostringstream sb;
1582 auto now = std::chrono::steady_clock::now();
1584 <<
"Creating transaction context timed out because exception occurred after client invocation timeout "
1585 << std::chrono::duration_cast<std::chrono::milliseconds>(invocation_timeout).count() <<
" ms. " <<
"Current time: "
1586 << util::StringUtil::time_to_string(std::chrono::steady_clock::now()) <<
". " <<
"Start time: "
1587 << util::StringUtil::time_to_string(start_time) <<
". Total elapsed time: "
1588 << std::chrono::duration_cast<std::chrono::milliseconds>(now - start_time).count() <<
" ms. ";
1590 std::rethrow_exception(cause);
1593 std::throw_with_nested(boost::enable_current_exception(exception::operation_timeout(
1594 "ClientTransactionManagerServiceImpl::newoperation_timeout_exception", sb.str())));
1596 return std::current_exception();
1602 void ClientTransactionManagerServiceImpl::throw_exception(
bool smart_routing) {
1603 auto &client_config = client_.get_client_config();
1604 auto &connection_strategy_Config = client_config.get_connection_strategy_config();
1605 auto reconnect_mode = connection_strategy_Config.get_reconnect_mode();
1606 if (reconnect_mode == config::client_connection_strategy_config::reconnect_mode::ASYNC) {
1607 BOOST_THROW_EXCEPTION(exception::hazelcast_client_offline(
1608 "ClientTransactionManagerServiceImpl::throw_exception",
""));
1610 if (smart_routing) {
1611 auto members = client_.get_cluster().get_members();
1612 std::ostringstream msg;
1613 if (members.empty()) {
1614 msg <<
"No address was return by the LoadBalancer since there are no members in the cluster";
1616 msg <<
"No address was return by the LoadBalancer. "
1617 "But the cluster contains the following members:{\n";
1618 for (
auto const &m : members) {
1619 msg <<
'\t' << m <<
'\n';
1623 BOOST_THROW_EXCEPTION(exception::illegal_state(
1624 "ClientTransactionManagerServiceImpl::throw_exception", msg.str()));
1626 BOOST_THROW_EXCEPTION(exception::illegal_state(
1627 "ClientTransactionManagerServiceImpl::throw_exception",
1628 "No active connection is found"));
1631 ClientPartitionServiceImpl::ClientPartitionServiceImpl(ClientContext &client)
1632 : client_(client), logger_(client.get_logger()), partition_count_(0),
1633 partition_table_(boost::shared_ptr<partition_table>(new partition_table{0, -1})) {
1636 void ClientPartitionServiceImpl::handle_event(int32_t connection_id, int32_t version,
1637 const std::vector<std::pair<boost::uuids::uuid, std::vector<int>>> &partitions) {
1638 HZ_LOG(logger_, finest,
1639 boost::str(boost::format(
"Handling new partition table with partitionStateVersion: %1%") % version)
1643 auto current = partition_table_.load();
1644 if (!should_be_applied(connection_id, version, partitions, *current)) {
1647 if (partition_table_.compare_exchange_strong(current, boost::shared_ptr<partition_table>(
1648 new partition_table{connection_id, version, convert_to_map(partitions)}))) {
1649 HZ_LOG(logger_, finest,
1650 boost::str(boost::format(
"Applied partition table with partitionStateVersion : %1%") % version)
1658 boost::uuids::uuid ClientPartitionServiceImpl::get_partition_owner(int32_t partition_id) {
1659 auto table_ptr = partition_table_.load();
1660 auto it = table_ptr->partitions.find(partition_id);
1661 if (it != table_ptr->partitions.end()) {
1664 return boost::uuids::nil_uuid();
1667 int32_t ClientPartitionServiceImpl::get_partition_id(
const serialization::pimpl::data &key) {
1668 int32_t pc = get_partition_count();
1672 int hash = key.get_partition_hash();
1673 return util::HashUtil::hash_to_index(hash, pc);
1676 int32_t ClientPartitionServiceImpl::get_partition_count() {
1677 return partition_count_.load();
1680 std::shared_ptr<client::impl::Partition> ClientPartitionServiceImpl::get_partition(
int partition_id) {
1681 return std::shared_ptr<client::impl::Partition>(
new PartitionImpl(partition_id, client_, *
this));
1684 bool ClientPartitionServiceImpl::check_and_set_partition_count(int32_t new_partition_count) {
1685 int32_t expected = 0;
1686 if (partition_count_.compare_exchange_strong(expected, new_partition_count)) {
1689 return partition_count_.load() == new_partition_count;
1693 ClientPartitionServiceImpl::should_be_applied(int32_t connection_id, int32_t version,
1694 const std::vector<std::pair<boost::uuids::uuid, std::vector<int>>> &partitions,
1695 const partition_table ¤t) {
1696 auto &lg = client_.get_logger();
1697 if (partitions.empty()) {
1698 if (logger_.enabled(logger::level::finest)) {
1699 log_failure(connection_id, version, current,
"response is empty");
1703 if (!current.connection_id || connection_id != current.connection_id) {
1705 ([¤t, connection_id](){
1706 auto frmt = boost::format(
"Event coming from a new connection. Old connection id: %1%, "
1707 "new connection %2%");
1709 if (current.connection_id) {
1710 frmt = frmt % current.connection_id;
1712 frmt = frmt %
"none";
1715 return boost::str(frmt % connection_id);
1721 if (version <= current.version) {
1722 if (lg.enabled(logger::level::finest)) {
1723 log_failure(connection_id, version, current,
"response state version is old");
1730 void ClientPartitionServiceImpl::log_failure(int32_t connection_id, int32_t version,
1731 const ClientPartitionServiceImpl::partition_table ¤t,
1732 const std::string &cause) {
1733 HZ_LOG(logger_, finest,
1735 auto frmt = boost::format(
" We will not apply the response, since %1% ."
1736 " Response is from connection with id %2%. "
1737 "Current connection id is %3%, response state version:%4%. "
1738 "Current state version: %5%");
1739 if (current.connection_id) {
1740 return boost::str(frmt % cause % connection_id % current.connection_id % version %
1744 return boost::str(frmt % cause % connection_id %
"nullptr" % version % current.version);
1750 void ClientPartitionServiceImpl::reset() {
1751 partition_table_.store(
nullptr);
1754 std::unordered_map<int32_t, boost::uuids::uuid> ClientPartitionServiceImpl::convert_to_map(
1755 const std::vector<std::pair<boost::uuids::uuid, std::vector<int>>> &partitions) {
1756 std::unordered_map<int32_t, boost::uuids::uuid> new_partitions;
1757 for (
auto const &e : partitions) {
1758 for (
auto pid: e.second) {
1759 new_partitions.insert({pid, e.first});
1762 return new_partitions;
1765 int ClientPartitionServiceImpl::PartitionImpl::get_partition_id()
const {
1766 return partition_id_;
1769 boost::optional<member> ClientPartitionServiceImpl::PartitionImpl::get_owner()
const {
1770 auto owner = partition_service_.get_partition_owner(partition_id_);
1771 if (!owner.is_nil()) {
1772 return client_.get_client_cluster_service().get_member(owner);
1777 ClientPartitionServiceImpl::PartitionImpl::PartitionImpl(
int partition_id, ClientContext &client,
1778 ClientPartitionServiceImpl &partition_service)
1779 : partition_id_(partition_id), client_(client), partition_service_(partition_service) {
1782 namespace sequence {
1783 CallIdSequenceWithoutBackpressure::CallIdSequenceWithoutBackpressure() : head_(0) {}
1785 CallIdSequenceWithoutBackpressure::~CallIdSequenceWithoutBackpressure() =
default;
1787 int32_t CallIdSequenceWithoutBackpressure::get_max_concurrent_invocations()
const {
1791 int64_t CallIdSequenceWithoutBackpressure::next() {
1792 return force_next();
1795 int64_t CallIdSequenceWithoutBackpressure::force_next() {
1799 void CallIdSequenceWithoutBackpressure::complete() {
1803 int64_t CallIdSequenceWithoutBackpressure::get_last_call_id() {
1808 AbstractCallIdSequence::AbstractCallIdSequence(int32_t max_concurrent_invocations) {
1809 std::ostringstream out;
1810 out <<
"maxConcurrentInvocations should be a positive number. maxConcurrentInvocations="
1811 << max_concurrent_invocations;
1812 this->max_concurrent_invocations_ = util::Preconditions::check_positive(max_concurrent_invocations,
1815 for (
size_t i = 0; i < longs_.size(); ++i) {
1820 AbstractCallIdSequence::~AbstractCallIdSequence() =
default;
1822 int32_t AbstractCallIdSequence::get_max_concurrent_invocations()
const {
1823 return max_concurrent_invocations_;
1826 int64_t AbstractCallIdSequence::next() {
1828 handle_no_space_left();
1830 return force_next();
1833 int64_t AbstractCallIdSequence::force_next() {
1834 return ++longs_[INDEX_HEAD];
1837 void AbstractCallIdSequence::complete() {
1838 ++longs_[INDEX_TAIL];
1839 assert(longs_[INDEX_TAIL] <= longs_[INDEX_HEAD]);
1842 int64_t AbstractCallIdSequence::get_last_call_id() {
1843 return longs_[INDEX_HEAD];
1846 bool AbstractCallIdSequence::has_space() {
1847 return longs_[INDEX_HEAD] - longs_[INDEX_TAIL] < max_concurrent_invocations_;
1850 int64_t AbstractCallIdSequence::get_tail() {
1851 return longs_[INDEX_TAIL];
1854 const std::unique_ptr<util::concurrent::IdleStrategy> CallIdSequenceWithBackpressure::IDLER(
1855 new util::concurrent::BackoffIdleStrategy(
1856 0, 0, std::chrono::duration_cast<std::chrono::nanoseconds>(
1857 std::chrono::microseconds(1000)).count(),
1858 std::chrono::duration_cast<std::chrono::nanoseconds>(
1859 std::chrono::microseconds(MAX_DELAY_MS * 1000)).count()));
1861 CallIdSequenceWithBackpressure::CallIdSequenceWithBackpressure(int32_t max_concurrent_invocations,
1862 int64_t backoff_timeout_ms)
1863 : AbstractCallIdSequence(max_concurrent_invocations) {
1864 std::ostringstream out;
1865 out <<
"backoffTimeoutMs should be a positive number. backoffTimeoutMs=" << backoff_timeout_ms;
1866 util::Preconditions::check_positive(backoff_timeout_ms, out.str());
1868 backoff_timeout_nanos_ = std::chrono::duration_cast<std::chrono::nanoseconds>(
1869 std::chrono::milliseconds(backoff_timeout_ms)).count();
1872 void CallIdSequenceWithBackpressure::handle_no_space_left() {
1873 auto start = std::chrono::steady_clock::now();
1874 for (int64_t idleCount = 0;; idleCount++) {
1875 int64_t elapsedNanos = std::chrono::duration_cast<std::chrono::nanoseconds>(
1876 std::chrono::steady_clock::now() - start).count();
1877 if (elapsedNanos > backoff_timeout_nanos_) {
1878 throw (exception::exception_builder<exception::hazelcast_overload>(
1879 "CallIdSequenceWithBackpressure::handleNoSpaceLeft")
1880 <<
"Timed out trying to acquire another call ID."
1881 <<
" maxConcurrentInvocations = " << get_max_concurrent_invocations()
1882 <<
", backoffTimeout = "
1883 << std::chrono::microseconds(backoff_timeout_nanos_ / 1000).count()
1884 <<
" msecs, elapsed:"
1885 << std::chrono::microseconds(elapsedNanos / 1000).count() <<
" msecs").build();
1887 IDLER->idle(idleCount);
1895 FailFastCallIdSequence::FailFastCallIdSequence(int32_t max_concurrent_invocations)
1896 : AbstractCallIdSequence(max_concurrent_invocations) {}
1898 void FailFastCallIdSequence::handle_no_space_left() {
1899 throw (exception::exception_builder<exception::hazelcast_overload>(
1900 "FailFastCallIdSequence::handleNoSpaceLeft")
1901 <<
"Maximum invocation count is reached. maxConcurrentInvocations = "
1902 << get_max_concurrent_invocations()).build();
1906 std::unique_ptr<CallIdSequence> CallIdFactory::new_call_id_sequence(
bool is_back_pressure_enabled,
1907 int32_t max_allowed_concurrent_invocations,
1908 int64_t backoff_timeout_ms) {
1909 if (!is_back_pressure_enabled) {
1910 return std::unique_ptr<CallIdSequence>(
new CallIdSequenceWithoutBackpressure());
1911 }
else if (backoff_timeout_ms <= 0) {
1912 return std::unique_ptr<CallIdSequence>(
1913 new FailFastCallIdSequence(max_allowed_concurrent_invocations));
1915 return std::unique_ptr<CallIdSequence>(
1916 new CallIdSequenceWithBackpressure(max_allowed_concurrent_invocations,
1917 backoff_timeout_ms));
1922 namespace listener {
1923 listener_service_impl::listener_service_impl(ClientContext &client_context,
1924 int32_t event_thread_count)
1925 : client_context_(client_context),
1926 serialization_service_(client_context.get_serialization_service()),
1927 logger_(client_context.get_logger()),
1928 client_connection_manager_(client_context.get_connection_manager()),
1929 number_of_event_threads_(event_thread_count),
1930 smart_(client_context.get_client_config().get_network_config().is_smart_routing()) {
1931 auto &invocationService = client_context.get_invocation_service();
1932 invocation_timeout_ = invocationService.get_invocation_timeout();
1933 invocation_retry_pause_ = invocationService.get_invocation_retry_pause();
1936 bool listener_service_impl::registers_local_only()
const {
1940 boost::future<boost::uuids::uuid>
1941 listener_service_impl::register_listener(
1942 std::shared_ptr<ListenerMessageCodec> listener_message_codec,
1943 std::shared_ptr<client::impl::BaseEventHandler> handler) {
1944 auto task = boost::packaged_task<boost::uuids::uuid()>([=]() {
1945 return register_listener_internal(listener_message_codec, handler);
1947 auto f = task.get_future();
1948 boost::asio::post(registration_executor_->get_executor(), std::move(task));
1952 boost::future<bool> listener_service_impl::deregister_listener(boost::uuids::uuid registration_id) {
1953 util::Preconditions::check_not_nill(registration_id,
"Nil userRegistrationId is not allowed!");
1955 boost::packaged_task<bool()> task([=]() {
1956 return deregister_listener_internal(registration_id);
1958 auto f = task.get_future();
1959 boost::asio::post(registration_executor_->get_executor(), std::move(task));
1963 void listener_service_impl::connection_added(
1964 const std::shared_ptr<connection::Connection> connection) {
1965 boost::asio::post(registration_executor_->get_executor(), [=]() { connection_added_internal(connection); });
1968 void listener_service_impl::connection_removed(
1969 const std::shared_ptr<connection::Connection> connection) {
1970 boost::asio::post(registration_executor_->get_executor(), [=]() { connection_removed_internal(connection); });
1974 listener_service_impl::remove_event_handler(int64_t call_id,
1975 const std::shared_ptr<connection::Connection> &connection) {
1976 boost::asio::post(connection->get_socket().get_executor(),
1977 std::packaged_task<
void()>([=]() {
1978 connection->deregister_invocation(call_id);
1982 void listener_service_impl::handle_client_message(
1983 const std::shared_ptr<ClientInvocation> invocation,
1984 const std::shared_ptr<protocol::ClientMessage> response) {
1986 auto partitionId = response->get_partition_id();
1987 if (partitionId == -1) {
1989 boost::asio::post(event_executor_->get_executor(), [=]() { process_event_message(invocation, response); });
1994 boost::asio::post(event_strands_[partitionId % event_strands_.size()],
1995 [=]() { process_event_message(invocation, response); });
1997 }
catch (
const std::exception &e) {
1998 if (client_context_.get_lifecycle_service().is_running()) {
1999 HZ_LOG(logger_, warning,
2000 boost::str(boost::format(
"Delivery of event message to event handler failed. %1%, %2%, %3%")
2001 % e.what() % *response % *invocation)
2007 void listener_service_impl::shutdown() {
2008 ClientExecutionServiceImpl::shutdown_thread_pool(event_executor_.get());
2009 ClientExecutionServiceImpl::shutdown_thread_pool(registration_executor_.get());
2012 void listener_service_impl::start() {
2013 event_executor_.reset(
new hazelcast::util::hz_thread_pool(number_of_event_threads_));
2014 registration_executor_.reset(
new hazelcast::util::hz_thread_pool(1));
2016 for (
int i = 0; i < number_of_event_threads_; ++i) {
2017 event_strands_.emplace_back(event_executor_->get_executor());
2020 client_connection_manager_.add_connection_listener(shared_from_this());
2023 boost::uuids::uuid listener_service_impl::register_listener_internal(
2024 std::shared_ptr<ListenerMessageCodec> listener_message_codec,
2025 std::shared_ptr<client::impl::BaseEventHandler> handler) {
2026 auto user_registration_id = client_context_.random_uuid();
2028 std::shared_ptr<listener_registration> registration(
new listener_registration{listener_message_codec, handler});
2029 registrations_.put(user_registration_id, registration);
2030 for (
auto const &connection : client_connection_manager_.get_active_connections()) {
2032 invoke(registration, connection);
2033 }
catch (exception::iexception &e) {
2034 if (connection->is_alive()) {
2035 deregister_listener_internal(user_registration_id);
2036 BOOST_THROW_EXCEPTION((exception::exception_builder<exception::hazelcast_>(
2037 "ClientListenerService::RegisterListenerTask::call")
2038 <<
"Listener can not be added " << e).build());
2042 return user_registration_id;
2046 listener_service_impl::deregister_listener_internal(boost::uuids::uuid user_registration_id) {
2047 auto listenerRegistration = registrations_.get(user_registration_id);
2048 if (!listenerRegistration) {
2051 bool successful =
true;
2053 auto listener_registrations = listenerRegistration->registrations.entry_set();
2054 for (
auto it = listener_registrations.begin();it != listener_registrations.end();) {
2055 const auto ®istration = it->second;
2056 const auto& subscriber = it->first;
2058 const auto &listenerMessageCodec = listenerRegistration->codec;
2059 auto serverRegistrationId = registration->server_registration_id;
2060 auto request = listenerMessageCodec->encode_remove_request(serverRegistrationId);
2061 auto invocation = ClientInvocation::create(client_context_,request,
"",
2063 invocation->invoke().get();
2065 remove_event_handler(registration->call_id, subscriber);
2067 it = listener_registrations.erase(it);
2068 }
catch (exception::iexception &e) {
2070 if (subscriber->is_alive()) {
2072 std::ostringstream endpoint;
2073 if (subscriber->get_remote_address()) {
2074 endpoint << *subscriber->get_remote_address();
2078 HZ_LOG(logger_, warning,
2079 boost::str(boost::format(
"ClientListenerService::deregisterListenerInternal "
2080 "Deregistration of listener with ID %1% "
2081 "has failed to address %2% %3%")
2082 % user_registration_id
2083 % subscriber->get_remote_address() % e)
2089 registrations_.remove(user_registration_id);
2094 void listener_service_impl::connection_added_internal(
2095 const std::shared_ptr<connection::Connection> &connection) {
2096 for (
const auto &listener_registration : registrations_.values()) {
2097 invoke_from_internal_thread(listener_registration, connection);
2101 void listener_service_impl::connection_removed_internal(
2102 const std::shared_ptr<connection::Connection> &connection) {
2103 for (
auto ®istry : registrations_.values()) {
2104 registry->registrations.remove(connection);
2109 listener_service_impl::invoke_from_internal_thread(
2110 const std::shared_ptr<listener_registration> &listener_registration,
2111 const std::shared_ptr<connection::Connection> &connection) {
2113 invoke(listener_registration, connection);
2114 }
catch (exception::iexception &e) {
2115 HZ_LOG(logger_, warning,
2116 boost::str(boost::format(
"Listener with pointer %1% can not be added to "
2117 "a new connection: %2%, reason: %3%")
2118 % listener_registration.get() % *connection % e)
2124 listener_service_impl::invoke(
const std::shared_ptr<listener_registration> &listener_registration,
2125 const std::shared_ptr<connection::Connection> &connection) {
2126 if (listener_registration->registrations.contains_key(connection)) {
2130 const auto &codec = listener_registration->codec;
2131 auto request = codec->encode_add_request(registers_local_only());
2132 const auto &handler = listener_registration->handler;
2133 handler->before_listener_register();
2135 auto invocation = ClientInvocation::create(client_context_,
2136 std::make_shared<protocol::ClientMessage>(std::move(request)),
"",
2138 invocation->set_event_handler(handler);
2139 auto clientMessage = invocation->invoke_urgent().get();
2141 auto serverRegistrationId = codec->decode_add_response(clientMessage);
2142 handler->on_listener_register();
2143 int64_t correlationId = invocation->get_client_message()->get_correlation_id();
2145 (*listener_registration).registrations.put(connection, std::shared_ptr<connection_registration>(
2146 new connection_registration{serverRegistrationId, correlationId}));
2149 void listener_service_impl::process_event_message(
2150 const std::shared_ptr<ClientInvocation> invocation,
2151 const std::shared_ptr<protocol::ClientMessage> response) {
2152 auto eventHandler = invocation->get_event_handler();
2153 if (!eventHandler) {
2154 if (client_context_.get_lifecycle_service().is_running()) {
2155 HZ_LOG(logger_, warning,
2156 boost::str(boost::format(
"No eventHandler for invocation. "
2157 "Ignoring this invocation response. %1%")
2166 eventHandler->handle(*response);
2167 }
catch (std::exception &e) {
2168 if (client_context_.get_lifecycle_service().is_running()) {
2169 HZ_LOG(logger_, warning,
2170 boost::str(boost::format(
"Delivery of event message to event handler failed. %1%, %2%, %3%")
2171 % e.what() % *response % *invocation)
2177 listener_service_impl::~listener_service_impl() =
default;
2179 void cluster_view_listener::start() {
2180 client_context_.get_connection_manager().add_connection_listener(shared_from_this());
2183 void cluster_view_listener::connection_added(
const std::shared_ptr<connection::Connection> connection) {
2184 try_register(connection);
2187 void cluster_view_listener::connection_removed(
const std::shared_ptr<connection::Connection> connection) {
2188 try_reregister_to_random_connection(connection->get_connection_id());
2191 cluster_view_listener::cluster_view_listener(ClientContext &client_context) : client_context_(
2194 void cluster_view_listener::try_register(std::shared_ptr<connection::Connection> connection) {
2195 int32_t expected_id = -1;
2196 if (!listener_added_connection_id_.compare_exchange_strong(expected_id,
2197 connection->get_connection_id())) {
2202 auto invocation = ClientInvocation::create(client_context_,
2203 std::make_shared<protocol::ClientMessage>(
2204 protocol::codec::client_addclusterviewlistener_encode()),
"", connection);
2206 auto handler = std::make_shared<event_handler>(connection->get_connection_id(), *
this);
2207 invocation->set_event_handler(handler);
2208 handler->before_listener_register();
2210 std::weak_ptr<cluster_view_listener> weak_self = shared_from_this();
2211 auto conn_id = connection->get_connection_id();
2213 invocation->invoke_urgent().then(
2214 [weak_self, handler, conn_id](boost::future<protocol::ClientMessage> f) {
2215 auto self = weak_self.lock();
2219 if (f.has_value()) {
2220 handler->on_listener_register();
2225 self->try_reregister_to_random_connection(conn_id);
2230 void cluster_view_listener::try_reregister_to_random_connection(int32_t old_connection_id) {
2231 if (!listener_added_connection_id_.compare_exchange_strong(old_connection_id, -1)) {
2235 auto new_connection = client_context_.get_connection_manager().get_random_connection();
2236 if (new_connection) {
2237 try_register(new_connection);
2241 cluster_view_listener::~cluster_view_listener() =
default;
2244 cluster_view_listener::event_handler::handle_membersview(int32_t version,
2245 const std::vector<member> &member_infos) {
2246 view_listener.client_context_.get_client_cluster_service().handle_event(version, member_infos);
2250 cluster_view_listener::event_handler::handle_partitionsview(int32_t version,
2251 const std::vector<std::pair<boost::uuids::uuid, std::vector<int>>> &partitions) {
2252 view_listener.client_context_.get_partition_service().handle_event(connection_id, version, partitions);
2255 void cluster_view_listener::event_handler::before_listener_register() {
2256 view_listener.client_context_.get_client_cluster_service().clear_member_list_version();
2257 auto &lg = view_listener.client_context_.get_logger();
2259 boost::str(boost::format(
2260 "Register attempt of cluster_view_listener::event_handler to connection with id %1%") %
2264 void cluster_view_listener::event_handler::on_listener_register() {
2265 auto &lg = view_listener.client_context_.get_logger();
2267 boost::str(boost::format(
2268 "Registered cluster_view_listener::event_handler to connection with id %1%") %
2272 cluster_view_listener::event_handler::event_handler(
int connectionId,
2273 cluster_view_listener &viewListener)
2274 : connection_id(connectionId), view_listener(viewListener) {}
2277 protocol::ClientMessage
2278 ClientInvocationServiceImpl::BackupListenerMessageCodec::encode_add_request(
bool local_only)
const {
2279 return protocol::codec::client_localbackuplistener_encode();
2282 protocol::ClientMessage ClientInvocationServiceImpl::BackupListenerMessageCodec::encode_remove_request(
2283 boost::uuids::uuid real_registration_id)
const {
2285 return protocol::ClientMessage(0);
2288 void ClientInvocationServiceImpl::noop_backup_event_handler::handle_backup(
2289 int64_t source_invocation_correlation_id) {
2293 namespace discovery {
2294 remote_address_provider::remote_address_provider(
2295 std::function<std::unordered_map<address, address>()> addr_map_method,
2296 bool use_public) : refresh_address_map_(std::move(addr_map_method)),
2297 use_public_(use_public) {}
2299 std::vector<address> remote_address_provider::load_addresses() {
2300 auto address_map = refresh_address_map_();
2301 std::lock_guard<std::mutex> guard(lock_);
2302 private_to_public_ = address_map;
2303 std::vector<address> addresses;
2304 addresses.reserve(address_map.size());
2305 for (
const auto &addr_pair : address_map) {
2306 addresses.push_back(addr_pair.first);
2311 boost::optional<address> remote_address_provider::translate(
const address &addr) {
2318 std::lock_guard<std::mutex> guard(lock_);
2319 auto found = private_to_public_.find(addr);
2320 if (found != private_to_public_.end()) {
2321 return found->second;
2325 auto address_map = refresh_address_map_();
2327 std::lock_guard<std::mutex> guard(lock_);
2328 private_to_public_ = address_map;
2330 auto found = private_to_public_.find(addr);
2331 if (found != private_to_public_.end()) {
2332 return found->second;
2338 #ifdef HZ_BUILD_WITH_SSL
2339 cloud_discovery::cloud_discovery(config::cloud_config &config, std::string cloud_base_url,
2340 std::chrono::steady_clock::duration timeout)
2341 : cloud_config_(config), cloud_base_url_(cloud_base_url), timeout_(timeout) {}
2343 cloud_discovery::cloud_discovery(config::cloud_config &config, std::string cloud_base_url,
2344 std::chrono::steady_clock::duration timeout) {}
2347 std::unordered_map<address, address> cloud_discovery::get_addresses() {
2348 #ifdef HZ_BUILD_WITH_SSL
2350 util::SyncHttpsClient httpsConnection(cloud_base_url_, std::string(CLOUD_URL_PATH) +
2351 cloud_config_.discovery_token, timeout_);
2352 auto &conn_stream = httpsConnection.connect_and_get_response();
2353 return parse_json_response(conn_stream);
2354 }
catch (std::exception &e) {
2355 std::throw_with_nested(boost::enable_current_exception(
2356 exception::illegal_state(
"cloud_discovery::get_addresses",
2360 util::Preconditions::check_ssl(
"cloud_discovery::get_addresses");
2361 return std::unordered_map<address, address>();
2365 std::unordered_map<address, address>
2366 cloud_discovery::parse_json_response(std::istream &conn_stream) {
2367 namespace pt = boost::property_tree;
2370 pt::read_json(conn_stream, root);
2372 std::unordered_map<address, address> addresses;
2373 for (
const auto &item : root) {
2374 auto private_address = item.second.get<std::string>(PRIVATE_ADDRESS_PROPERTY);
2375 auto public_address = item.second.get<std::string>(PUBLIC_ADDRESS_PROPERTY);
2377 address public_addr = create_address(public_address, -1);
2379 auto private_addr = create_address(private_address, public_addr.get_port());
2380 addresses.emplace(std::move(private_addr), std::move(public_addr));
2386 address cloud_discovery::create_address(
const std::string &hostname,
int default_port) {
2387 auto address_holder = util::AddressUtil::get_address_holder(hostname, default_port);
2388 auto scoped_hostname = util::AddressHelper::get_scoped_hostname(address_holder);
2389 return address(std::move(scoped_hostname), address_holder.get_port());
2398 bool less<hazelcast::client::spi::DefaultObjectNamespace>::operator()(
2399 const hazelcast::client::spi::DefaultObjectNamespace &lhs,
2400 const hazelcast::client::spi::DefaultObjectNamespace &rhs)
const {
2401 int result = lhs.get_service_name().compare(rhs.get_service_name());
2410 return lhs.get_object_name().compare(rhs.get_object_name()) < 0;
2414 hash<hazelcast::client::spi::DefaultObjectNamespace>::operator()(
2415 const hazelcast::client::spi::DefaultObjectNamespace &k)
const noexcept {
2416 return std::hash<std::string>()(k.get_service_name() + k.get_object_name());
Hazelcast cluster interface.
const std::unordered_set< member > & get_members() const
Returns an immutable set of ordered members at the moment this MembershipListener is registered.
cluster & get_cluster()
Returns the cluster of the event.
lifecycle_state get_state() const
lifecycle_state
State enum.
lifecycle_event(lifecycle_state state)
Constructor.