34 #include <boost/uuid/uuid_hash.hpp>
35 #include <boost/functional/hash.hpp>
36 #include <boost/property_tree/ptree.hpp>
37 #include <boost/property_tree/json_parser.hpp>
39 #include "hazelcast/client/hazelcast_client.h"
40 #include <hazelcast/client/protocol/codec/ErrorCodec.h>
41 #include <hazelcast/client/spi/impl/ListenerMessageCodec.h>
42 #include <hazelcast/client/spi/impl/ClientClusterServiceImpl.h>
43 #include <hazelcast/client/spi/impl/listener/cluster_view_listener.h>
44 #include <hazelcast/client/spi/impl/listener/listener_service_impl.h>
45 #include <hazelcast/client/spi/impl/discovery/remote_address_provider.h>
46 #include <hazelcast/client/spi/impl/discovery/cloud_discovery.h>
47 #include <hazelcast/util/AddressUtil.h>
48 #include "hazelcast/client/member_selectors.h"
49 #include "hazelcast/client/lifecycle_event.h"
50 #include "hazelcast/client/initial_membership_event.h"
51 #include "hazelcast/client/membership_event.h"
52 #include "hazelcast/client/lifecycle_listener.h"
53 #include "hazelcast/client/spi/ProxyManager.h"
54 #include "hazelcast/client/spi/ClientProxy.h"
55 #include "hazelcast/client/spi/ClientContext.h"
56 #include "hazelcast/client/spi/impl/ClientInvocation.h"
57 #include "hazelcast/client/spi/impl/ClientInvocationServiceImpl.h"
58 #include "hazelcast/client/impl/hazelcast_client_instance_impl.h"
59 #include "hazelcast/client/spi/impl/ClientPartitionServiceImpl.h"
60 #include "hazelcast/client/spi/impl/DefaultAddressProvider.h"
61 #include "hazelcast/client/spi/impl/sequence/CallIdSequenceWithBackpressure.h"
62 #include "hazelcast/client/spi/impl/sequence/CallIdSequenceWithoutBackpressure.h"
63 #include "hazelcast/client/spi/impl/sequence/FailFastCallIdSequence.h"
64 #include "hazelcast/client/spi/impl/sequence/CallIdFactory.h"
65 #include "hazelcast/client/connection/ClientConnectionManagerImpl.h"
66 #include "hazelcast/util/AddressHelper.h"
67 #include "hazelcast/util/HashUtil.h"
68 #include "hazelcast/util/concurrent/BackoffIdleStrategy.h"
69 #ifdef HZ_BUILD_WITH_SSL
70 #include <hazelcast/util/SyncHttpsClient.h>
// Builds the event from the cluster reference and the member set; the set is
// taken by value and moved into members_.
83 initial_membership_event::initial_membership_event(
cluster &
cluster, std::unordered_set<member> members) : cluster_(
84 cluster), members_(std::move(members)) {
// Keeps a reference to the client context in client_.
96 ProxyManager::ProxyManager(ClientContext &context) : client_(context) {
99 void ProxyManager::init() {
// Notifies every entry in proxies_ of shutdown while holding lock_: the value
// obtained from p.second.get() has on_shutdown() called on it. A std::exception
// is caught and turned into a formatted diagnostic for the client logger.
102 void ProxyManager::destroy() {
103 std::lock_guard<std::recursive_mutex> guard(lock_);
104 for (
auto &p : proxies_) {
// NOTE(review): `proxy` is not used by the call on the next line, which
// invokes p.second.get() a second time.
106 auto proxy = p.second.get();
107 p.second.get()->on_shutdown();
108 }
// NOTE(review): the message below says onShutdown "can be called"; "cannot"
// looks intended — confirm before touching the runtime string.
catch (std::exception &se) {
109 auto &lg = client_.get_logger();
111 boost::str(boost::format(
"Proxy was not created, "
112 "hence onShutdown can be called. Exception: %1%")
// Encodes a create-proxy request from the proxy's name and service name,
// invokes it, and returns a future whose continuation (boost::launch::sync,
// capturing by copy) calls client_proxy->on_initialize().
// NOTE(review): the invocation is created with get_service_name() as its last
// argument, while ClientProxy::destroy_remotely passes get_name() in the same
// position — confirm which is intended.
120 boost::future<void> ProxyManager::initialize(
const std::shared_ptr<ClientProxy> &client_proxy) {
121 auto clientMessage = protocol::codec::client_createproxy_encode(client_proxy->get_name(),
122 client_proxy->get_service_name());
123 return spi::impl::ClientInvocation::create(client_, clientMessage,
124 client_proxy->get_service_name())->invoke().then(
125 boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
127 client_proxy->on_initialize();
// Destroys `proxy`: looks up the proxy registered under its
// (service name, object name) namespace while holding lock_, then runs the
// local and remote destroy steps on `proxy`. The last visible return yields an
// already-ready future.
131 boost::future<void> ProxyManager::destroy_proxy(ClientProxy &proxy) {
132 DefaultObjectNamespace objectNamespace(proxy.get_service_name(), proxy.get_name());
133 std::shared_ptr<ClientProxy> registeredProxy;
135 std::lock_guard<std::recursive_mutex> guard(lock_);
136 auto it = proxies_.find(objectNamespace);
// nullptr when nothing is registered for this namespace.
137 registeredProxy = it == proxies_.end() ? nullptr : it->second.get();
138 if (it != proxies_.end()) {
144 if (registeredProxy) {
146 proxy.destroy_locally();
147 return proxy.destroy_remotely();
148 }
// The iexception handler still issues the remote destroy.
catch (exception::iexception &) {
149 proxy.destroy_remotely();
// Identity check: is the caller's proxy a different object from the registered one?
153 if (&proxy != registeredProxy.get()) {
158 proxy.destroy_locally();
161 if (&proxy != registeredProxy.get()) {
166 proxy.destroy_locally();
170 return boost::make_ready_future();
// Binds the context to the implementation object behind the public
// hazelcast_client handle (dereferences its client_impl_).
173 ClientContext::ClientContext(
const client::hazelcast_client &hazelcast_client) : hazelcast_client_(
174 *hazelcast_client.client_impl_) {
// Binds the context directly to the given client implementation instance.
177 ClientContext::ClientContext(client::impl::hazelcast_client_instance_impl &hazelcast_client)
178 : hazelcast_client_(hazelcast_client) {
// Returns the client's serialization_service_ member by reference.
181 serialization::pimpl::SerializationService &ClientContext::get_serialization_service() {
182 return hazelcast_client_.serialization_service_;
// Returns the client's cluster_service_ member by reference.
185 impl::ClientClusterServiceImpl & ClientContext::get_client_cluster_service() {
186 return hazelcast_client_.cluster_service_;
// Returns the invocation service by dereferencing the client's invocation_service_.
189 impl::ClientInvocationServiceImpl &ClientContext::get_invocation_service() {
190 return *hazelcast_client_.invocation_service_;
// Returns the client's client_config_ member by reference.
193 client_config &ClientContext::get_client_config() {
194 return hazelcast_client_.client_config_;
// Returns the partition service by dereferencing the client's partition_service_.
197 impl::ClientPartitionServiceImpl & ClientContext::get_partition_service() {
198 return *hazelcast_client_.partition_service_;
// Returns the client's lifecycle_service_ member by reference.
201 lifecycle_service &ClientContext::get_lifecycle_service() {
202 return hazelcast_client_.lifecycle_service_;
// Returns the listener service by dereferencing the client's listener_service_.
205 spi::impl::listener::listener_service_impl &ClientContext::get_client_listener_service() {
206 return *hazelcast_client_.listener_service_;
// Returns the connection manager by dereferencing the client's connection_manager_.
209 connection::ClientConnectionManagerImpl &ClientContext::get_connection_manager() {
210 return *hazelcast_client_.connection_manager_;
// Returns the near-cache manager by dereferencing the client's near_cache_manager_.
213 internal::nearcache::NearCacheManager &ClientContext::get_near_cache_manager() {
214 return *hazelcast_client_.near_cache_manager_;
// Returns the client's client_properties_ member by reference.
217 client_properties &ClientContext::get_client_properties() {
218 return hazelcast_client_.client_properties_;
// Returns the client's cluster_ member by reference.
221 cluster &ClientContext::get_cluster() {
222 return hazelcast_client_.cluster_;
// Returns a reference to the client's call_id_sequence_ shared pointer
// (the pointer itself, not the pointee).
225 std::shared_ptr<impl::sequence::CallIdSequence> &ClientContext::get_call_id_sequence()
const {
226 return hazelcast_client_.call_id_sequence_;
// Forwards to the client's get_exception_factory().
229 const protocol::ClientExceptionFactory &ClientContext::get_client_exception_factory()
const {
230 return hazelcast_client_.get_exception_factory();
// Forwards to the client's get_name().
233 const std::string &ClientContext::get_name()
const {
234 return hazelcast_client_.get_name();
// Returns the execution service by dereferencing the client's execution_service_.
237 impl::ClientExecutionServiceImpl &ClientContext::get_client_execution_service()
const {
238 return *hazelcast_client_.execution_service_;
// Forwards to the client's get_lock_reference_id_generator().
241 const std::shared_ptr<client::impl::ClientLockReferenceIdGenerator> &
242 ClientContext::get_lock_reference_id_generator() {
243 return hazelcast_client_.get_lock_reference_id_generator();
// Returns a shared_ptr to the client implementation via shared_from_this().
246 std::shared_ptr<client::impl::hazelcast_client_instance_impl>
247 ClientContext::get_hazelcast_client_implementation() {
248 return hazelcast_client_.shared_from_this();
// Forwards to the client's get_proxy_manager().
251 spi::ProxyManager &ClientContext::get_proxy_manager() {
252 return hazelcast_client_.get_proxy_manager();
// Returns the logger by dereferencing the client's logger_.
255 logger &ClientContext::get_logger() {
256 return *hazelcast_client_.logger_;
// Returns the statistics object by dereferencing the client's statistics_.
259 client::impl::statistics::Statistics &ClientContext::get_clientstatistics() {
260 return *hazelcast_client_.statistics_;
// Returns the cluster view listener by dereferencing the client's cluster_listener_.
263 spi::impl::listener::cluster_view_listener &ClientContext::get_cluster_view_listener() {
264 return *hazelcast_client_.cluster_listener_;
// Forwards to the client's random_uuid().
267 boost::uuids::uuid ClientContext::random_uuid() {
268 return hazelcast_client_.random_uuid();
// Returns the client's proxy_session_manager_ member by reference.
271 cp::internal::session::proxy_session_manager &ClientContext::get_proxy_session_manager() {
272 return hazelcast_client_.proxy_session_manager_;
// Binds the service to client_context, starts with empty listeners_ and a
// shutdown latch of count 1, then registers a copy of each supplied listener
// through add_listener().
275 lifecycle_service::lifecycle_service(ClientContext &client_context,
276 const std::vector<lifecycle_listener> &listeners) :
277 client_context_(client_context), listeners_(),
278 shutdown_completed_latch_(1) {
279 for (
const auto &listener: listeners) {
280 add_listener(lifecycle_listener(listener));
// Starts the client's services in the order written below. active_ is flipped
// false -> true with a compare-exchange; a call that finds it already true
// takes the branch right after the exchange instead.
284 bool lifecycle_service::start() {
285 bool expected =
false;
286 if (!active_.compare_exchange_strong(expected,
true)) {
290 fire_lifecycle_event(lifecycle_event::STARTED);
292 client_context_.get_client_execution_service().start();
294 client_context_.get_client_listener_service().start();
296 client_context_.get_invocation_service().start();
298 client_context_.get_client_cluster_service().start();
300 client_context_.get_cluster_view_listener().start();
// A false result from the connection manager's start() is handled specially.
302 if (!client_context_.get_connection_manager().start()) {
306 auto &connectionStrategyConfig = client_context_.get_client_config().get_connection_strategy_config();
// Non-async start: wait for the initial member list, then connect to all members.
307 if (!connectionStrategyConfig.is_async_start()) {
309 wait_for_initial_membership_event();
310 client_context_.get_connection_manager().connect_to_all_cluster_members();
313 client_context_.get_invocation_service().add_backup_listener();
315 client_context_.get_clientstatistics().start();
// Shuts the client down. active_ is flipped true -> false with a
// compare-exchange; a caller that finds it already false waits on
// shutdown_completed_latch_. The winning caller fires SHUTTING_DOWN, stops the
// services in the order written below, fires SHUTDOWN, and counts the latch
// down. The std::exception handler logs and also counts the latch down.
320 void lifecycle_service::shutdown() {
321 bool expected =
true;
322 if (!active_.compare_exchange_strong(expected,
false)) {
323 shutdown_completed_latch_.wait();
327 fire_lifecycle_event(lifecycle_event::SHUTTING_DOWN);
328 client_context_.get_proxy_session_manager().shutdown();
329 client_context_.get_clientstatistics().shutdown();
330 client_context_.get_proxy_manager().destroy();
331 client_context_.get_connection_manager().shutdown();
332 client_context_.get_client_cluster_service().shutdown();
333 client_context_.get_invocation_service().shutdown();
334 client_context_.get_client_listener_service().shutdown();
335 client_context_.get_near_cache_manager().destroy_all_near_caches();
336 fire_lifecycle_event(lifecycle_event::SHUTDOWN);
337 client_context_.get_client_execution_service().shutdown();
338 client_context_.get_serialization_service().dispose();
339 shutdown_completed_latch_.count_down();
340 }
catch (std::exception &e) {
341 HZ_LOG(client_context_.get_logger(), info,
342 boost::str(boost::format(
"An exception occured during LifecycleService shutdown. %1%")
// Release waiters even when shutdown failed part-way.
345 shutdown_completed_latch_.count_down();
// Registers a lifecycle listener: while holding listener_lock_, generates a
// uuid with uuid_generator_ and moves the listener into listeners_ under it.
349 boost::uuids::uuid lifecycle_service::add_listener(lifecycle_listener &&lifecycle_listener) {
350 std::lock_guard<std::mutex> lg(listener_lock_);
351 const auto id = uuid_generator_();
352 listeners_.emplace(
id, std::move(lifecycle_listener));
// Unregisters the lifecycle listener stored under registration_id.
// Returns true when exactly one entry was erased from listeners_.
356 bool lifecycle_service::remove_listener(
const boost::uuids::uuid &registration_id) {
// listeners_ is only touched while listener_lock_ is held.
357 std::lock_guard<std::mutex> guard(listener_lock_);
358 return listeners_.erase(registration_id) == 1;
// Dispatches a lifecycle event. While holding listener_lock_, each state logs
// its own info line and selects a callback (fire_one) for the matching
// listener hook; fire_one is then applied to every entry in listeners_.
361 void lifecycle_service::fire_lifecycle_event(
const lifecycle_event &lifecycle_event) {
362 std::lock_guard<std::mutex> guard(listener_lock_);
363 logger &lg = client_context_.get_logger();
// Per-listener action chosen by the switch below.
365 std::function<void(lifecycle_listener &)> fire_one;
367 switch (lifecycle_event.get_state()) {
368 case lifecycle_event::STARTING : {
// Build date and commit id come from the stringized build macros.
370 std::string date(HAZELCAST_STRINGIZE(HAZELCAST_GIT_COMMIT_DATE));
371 util::git_date_to_hazelcast_log_date(date);
372 std::string commitId(HAZELCAST_STRINGIZE(HAZELCAST_GIT_COMMIT_ID));
// Strip the double-quote characters from the stringized commit id.
373 commitId.erase(std::remove(commitId.begin(), commitId.end(),
'"'), commitId.end());
376 (boost::format(
"(%1%:%2%) LifecycleService::LifecycleEvent Client (%3%) is STARTING") %
377 date % commitId % client_context_.get_connection_manager().get_client_uuid()).str());
379 util::hz_snprintf(msg, 100,
"(%s:%s) LifecycleService::LifecycleEvent STARTING", date.c_str(),
381 HZ_LOG(lg, info, msg);
383 fire_one = [](lifecycle_listener &listener) {
384 listener.starting_();
388 case lifecycle_event::STARTED : {
389 HZ_LOG(lg, info,
"LifecycleService::LifecycleEvent STARTED");
391 fire_one = [](lifecycle_listener &listener) {
396 case lifecycle_event::SHUTTING_DOWN : {
397 HZ_LOG(lg, info,
"LifecycleService::LifecycleEvent SHUTTING_DOWN");
399 fire_one = [](lifecycle_listener &listener) {
400 listener.shutting_down_();
404 case lifecycle_event::SHUTDOWN : {
405 HZ_LOG(lg, info,
"LifecycleService::LifecycleEvent SHUTDOWN");
407 fire_one = [](lifecycle_listener &listener) {
408 listener.shutdown_();
412 case lifecycle_event::CLIENT_CONNECTED : {
413 HZ_LOG(lg, info,
"LifecycleService::LifecycleEvent CLIENT_CONNECTED");
415 fire_one = [](lifecycle_listener &listener) {
416 listener.connected_();
420 case lifecycle_event::CLIENT_DISCONNECTED : {
421 HZ_LOG(lg, info,
"LifecycleService::LifecycleEvent CLIENT_DISCONNECTED");
423 fire_one = [](lifecycle_listener &listener) {
424 listener.disconnected_();
// Deliver the selected callback to every registered listener.
430 for (
auto &item: listeners_) {
431 fire_one(item.second);
435 bool lifecycle_service::is_running() {
439 lifecycle_service::~lifecycle_service() {
// Delegates to the cluster service's wait_initial_member_list_fetched().
445 void lifecycle_service::wait_for_initial_membership_event()
const {
446 client_context_.get_client_cluster_service().wait_initial_member_list_fetched();
// Copies the service name and object name into service_name_ and object_name_.
449 DefaultObjectNamespace::DefaultObjectNamespace(
const std::string &service,
const std::string &
object)
450 : service_name_(service), object_name_(object) {
// Returns service_name_ by const reference.
454 const std::string &DefaultObjectNamespace::get_service_name()
const {
455 return service_name_;
458 const std::string &DefaultObjectNamespace::get_object_name()
const {
// Two namespaces are equal when both the service name and the object name match.
462 bool DefaultObjectNamespace::operator==(
const DefaultObjectNamespace &rhs)
const {
463 return service_name_ == rhs.service_name_ && object_name_ == rhs.object_name_;
// Stores the object name, the service name and a reference to the client context.
466 ClientProxy::ClientProxy(
const std::string &name,
const std::string &service_name, ClientContext &context)
467 : name_(name), service_name_(service_name), context_(context) {}
// Defaulted destructor, defined out of line in this translation unit.
469 ClientProxy::~ClientProxy() =
default;
471 const std::string &ClientProxy::get_name()
const {
// Returns service_name_ by const reference.
475 const std::string &ClientProxy::get_service_name()
const {
476 return service_name_;
479 ClientContext &ClientProxy::get_context() {
483 void ClientProxy::on_destroy() {
// Destroys this proxy by handing *this to the proxy manager's destroy_proxy().
486 boost::future<void> ClientProxy::destroy() {
487 return get_context().get_proxy_manager().destroy_proxy(*
this);
490 void ClientProxy::destroy_locally() {
495 }
catch (exception::iexception &) {
502 bool ClientProxy::pre_destroy() {
506 void ClientProxy::post_destroy() {
509 void ClientProxy::on_initialize() {
512 void ClientProxy::on_shutdown() {
// Forwards to the context's get_serialization_service().
515 serialization::pimpl::SerializationService &ClientProxy::get_serialization_service() {
516 return context_.get_serialization_service();
// Encodes a destroy-proxy request from this proxy's name and service name,
// invokes it, and returns a future whose synchronous continuation calls
// f.get() on the response future.
519 boost::future<void> ClientProxy::destroy_remotely() {
520 auto clientMessage = protocol::codec::client_destroyproxy_encode(
521 get_name(), get_service_name());
// The encoded message is moved into a shared_ptr for the invocation.
522 return spi::impl::ClientInvocation::create(get_context(), std::make_shared<protocol::ClientMessage>(
523 std::move(clientMessage)), get_name())->invoke().then(
524 boost::launch::sync, [](boost::future<protocol::ClientMessage> f) { f.get(); });
// Points the handler at the context's logger, then delegates registration to
// the client listener service.
527 boost::future<boost::uuids::uuid>
528 ClientProxy::register_listener(std::shared_ptr<impl::ListenerMessageCodec> listener_message_codec,
529 std::shared_ptr<client::impl::BaseEventHandler> handler) {
530 handler->set_logger(&get_context().get_logger());
531 return get_context().get_client_listener_service().register_listener(listener_message_codec,
// Delegates to the client listener service's deregister_listener().
535 boost::future<bool> ClientProxy::deregister_listener(boost::uuids::uuid registration_id) {
536 return get_context().get_client_listener_service().deregister_listener(registration_id);
// Decodes an add-listener response by returning msg.get_first_uuid().
541 ListenerMessageCodec::decode_add_response(protocol::ClientMessage &msg)
const {
542 return msg.get_first_uuid();
// Decodes a remove-listener response by reading the first fixed-size field as bool.
545 bool ListenerMessageCodec::decode_remove_response(protocol::ClientMessage &msg)
const {
546 return msg.get_first_fixed_sized_field<
bool>();
// Captures the client context and its logger, and reads the invocation settings
// from the client properties and config:
//  - invocation timeout: integer property wrapped in std::chrono::seconds
//  - retry pause and backup timeout: properties wrapped in std::chrono::milliseconds
//  - backup acks: enabled only when smart routing is on AND the config enables them
549 ClientInvocationServiceImpl::ClientInvocationServiceImpl(ClientContext &client)
550 : client_(client), logger_(client.get_logger()),
551 invocation_timeout_(std::chrono::seconds(client.get_client_properties().get_integer(
552 client.get_client_properties().get_invocation_timeout_seconds()))),
553 invocation_retry_pause_(std::chrono::milliseconds(client.get_client_properties().get_long(
554 client.get_client_properties().get_invocation_retry_pause_millis()))),
555 smart_routing_(client.get_client_config().get_network_config().is_smart_routing()),
556 backup_acks_enabled_(smart_routing_ && client.get_client_config().backup_acks_enabled()),
557 fail_on_indeterminate_operation_state_(client.get_client_properties().get_boolean(client.get_client_properties().fail_on_indeterminate_state())),
558 backup_timeout_(std::chrono::milliseconds(client.get_client_properties().get_integer(client.get_client_properties().backup_timeout_millis()))) {}
560 void ClientInvocationServiceImpl::start() {
// When backup acks are enabled, registers a listener built from
// BackupListenerMessageCodec and noop_backup_event_handler, and blocks on the
// returned future with get().
563 void ClientInvocationServiceImpl::add_backup_listener() {
564 if (this->backup_acks_enabled_) {
565 auto &listener_service = this->client_.get_client_listener_service();
566 listener_service.register_listener(std::make_shared<BackupListenerMessageCodec>(),
567 std::make_shared<noop_backup_event_handler>()).get();
// Marks the service as shut down by storing true into is_shutdown_.
571 void ClientInvocationServiceImpl::shutdown() {
572 is_shutdown_.store(
true);
// Returns invocation_timeout_ as milliseconds.
575 std::chrono::milliseconds ClientInvocationServiceImpl::get_invocation_timeout()
const {
576 return invocation_timeout_;
// Returns invocation_retry_pause_ as milliseconds.
579 std::chrono::milliseconds ClientInvocationServiceImpl::get_invocation_retry_pause()
const {
580 return invocation_retry_pause_;
// Reads the redo-operation flag from the client config.
583 bool ClientInvocationServiceImpl::is_redo_operation() {
584 return client_.get_client_config().is_redo_operation();
// Routes a response to its invocation. A response whose message type is
// ErrorCodec::EXCEPTION_MESSAGE_TYPE is decoded into an error holder and
// delivered via notify_exception(); other responses go through notify()
// (presumably the else branch — confirm). A std::exception is caught and
// logged at severe level with the invocation and e.what().
588 ClientInvocationServiceImpl::handle_client_message(
const std::shared_ptr<ClientInvocation> &invocation,
589 const std::shared_ptr<protocol::ClientMessage> &response) {
591 if (protocol::codec::ErrorCodec::EXCEPTION_MESSAGE_TYPE == response->get_message_type()) {
592 auto error_holder = protocol::codec::ErrorCodec::decode(*response);
593 invocation->notify_exception(
594 client_.get_client_exception_factory().create_exception(error_holder));
596 invocation->notify(response);
598 }
catch (std::exception &e) {
599 HZ_LOG(logger_, severe,
600 boost::str(boost::format(
"Failed to process response for %1%. %2%")
601 % *invocation % e.what())
// Sends an invocation over the given connection. Can throw
// hazelcast_client_not_active ("Client is shut down"); the guarding condition
// presumably checks is_shutdown_ — confirm. With backup acks enabled the
// message is flagged BACKUP_AWARE_FLAG before it is written.
606 bool ClientInvocationServiceImpl::send(
const std::shared_ptr<impl::ClientInvocation>& invocation,
607 const std::shared_ptr<connection::Connection>& connection) {
609 BOOST_THROW_EXCEPTION(
610 exception::hazelcast_client_not_active(
"ClientInvocationServiceImpl::send",
611 "Client is shut down"));
614 if (backup_acks_enabled_) {
615 invocation->get_client_message()->add_flag(protocol::ClientMessage::BACKUP_AWARE_FLAG);
// Write first, then record which connection carried the invocation.
618 write_to_connection(*connection, invocation);
619 invocation->set_send_connection(connection);
// Hands the invocation to connection.write().
623 void ClientInvocationServiceImpl::write_to_connection(connection::Connection &connection,
624 const std::shared_ptr<ClientInvocation> &client_invocation) {
// NOTE(review): clientMessage is not used by the write below; confirm that
// get_client_message() has no needed side effect before removing it.
625 auto clientMessage = client_invocation->get_client_message();
626 connection.write(client_invocation);
// Delegates to the connection manager's check_invocation_allowed().
629 void ClientInvocationServiceImpl::check_invocation_allowed() {
630 client_.get_connection_manager().check_invocation_allowed();
// Invokes on a connection picked by get_random_connection(); the
// "No connection found to invoke" line is logged at finest level.
633 bool ClientInvocationServiceImpl::invoke(std::shared_ptr<ClientInvocation> invocation) {
634 auto connection = client_.get_connection_manager().get_random_connection();
636 HZ_LOG(logger_, finest,
"No connection found to invoke");
639 return send(invocation, connection);
// Keeps a reference to the client network configuration.
642 DefaultAddressProvider::DefaultAddressProvider(config::client_network_config &network_config)
643 : network_config_(network_config) {}
// Takes the addresses from the network config; when none are configured,
// 127.0.0.1:5701 is added as the only entry.
645 std::vector<address> DefaultAddressProvider::load_addresses() {
646 std::vector<address> addresses = network_config_.get_addresses();
647 if (addresses.empty()) {
648 addresses.emplace_back(
"127.0.0.1", 5701);
656 boost::optional<address> DefaultAddressProvider::translate(
const address &addr) {
// Sentinel snapshot meaning "no member list yet"; other code compares against
// this pointer. The -1 initializes the first field (the version, going by the
// {version, members} initializers used elsewhere in this file).
660 const boost::shared_ptr<ClientClusterServiceImpl::member_list_snapshot> ClientClusterServiceImpl::EMPTY_SNAPSHOT(
661 new ClientClusterServiceImpl::member_list_snapshot{-1});
// Out-of-class definition of the constexpr static member; no initializer is
// given here.
663 constexpr boost::chrono::milliseconds ClientClusterServiceImpl::INITIAL_MEMBERS_TIMEOUT;
// Starts from EMPTY_SNAPSHOT, copies the labels from the client config and
// arms the initial-member-list latch with a count of 1.
665 ClientClusterServiceImpl::ClientClusterServiceImpl(hazelcast::client::spi::ClientContext &client)
666 : client_(client), member_list_snapshot_(EMPTY_SNAPSHOT),
667 labels_(client.get_client_config().get_labels()),
668 initial_list_fetched_latch_(1) {
// Stores the listener in listeners_ under a fresh random uuid while holding
// listeners_lock_; the listener's init_ callback is not invoked here.
671 boost::uuids::uuid ClientClusterServiceImpl::add_membership_listener_without_init(
672 membership_listener &&listener) {
673 std::lock_guard<std::mutex> g(listeners_lock_);
674 auto id = client_.random_uuid();
675 listeners_.emplace(
id, std::move(listener));
// Looks up a member by uuid in the currently loaded snapshot.
// Precondition (assert): uuid is not nil.
679 boost::optional<member> ClientClusterServiceImpl::get_member(boost::uuids::uuid uuid)
const {
680 assert(!uuid.is_nil());
// Work on one loaded snapshot pointer for the whole lookup.
681 auto members_view_ptr = member_list_snapshot_.load();
682 const auto it = members_view_ptr->members.find(uuid);
683 if (it == members_view_ptr->members.end()) {
// Copies the members of the currently loaded snapshot into a vector,
// reserving the exact size up front.
689 std::vector<member> ClientClusterServiceImpl::get_member_list()
const {
690 auto members_view_ptr = member_list_snapshot_.load();
691 std::vector<member> result;
692 result.reserve(members_view_ptr->members.size());
693 for (
const auto &e : members_view_ptr->members) {
694 result.emplace_back(e.second);
// Registers a copy of every membership listener found in the client config.
699 void ClientClusterServiceImpl::start() {
700 for (
auto &listener : client_.get_client_config().get_membership_listeners()) {
701 add_membership_listener(membership_listener(listener));
// Delivers the initial membership event to every listener that has an init_
// callback set, while holding listeners_lock_.
705 void ClientClusterServiceImpl::fire_initial_membership_event(
const initial_membership_event &event) {
706 std::lock_guard<std::mutex> g(listeners_lock_);
708 for (
auto &item : listeners_) {
709 membership_listener &listener = item.second;
710 if (listener.init_) {
711 listener.init_(event);
// Calls try_count_down() on the initial-member-list latch.
716 void ClientClusterServiceImpl::shutdown() {
717 initial_list_fetched_latch_.try_count_down();
// Registers a membership listener and, when it has an init_ callback and the
// current snapshot is non-empty, immediately calls init_ with an
// initial_membership_event built from the current members.
// cluster_view_lock_ is taken first; listeners_lock_ is taken after the
// listener has been stored.
721 ClientClusterServiceImpl::add_membership_listener(membership_listener &&listener) {
722 std::lock_guard<std::mutex> cluster_view_g(cluster_view_lock_);
724 auto id = add_membership_listener_without_init(std::move(listener));
726 std::lock_guard<std::mutex> listeners_g(listeners_lock_);
// NOTE(review): `auto` makes this a copy of the stored listener, not a reference.
727 auto added_listener = listeners_[id];
729 if (added_listener.init_) {
730 auto &cluster = client_.get_cluster();
731 auto members_ptr = member_list_snapshot_.load();
732 if (!members_ptr->members.empty()) {
733 std::unordered_set<member> members;
734 for (
const auto &e : members_ptr->members) {
735 members.insert(e.second);
737 added_listener.init_(initial_membership_event(cluster, members));
// Erases the listener stored under registration_id while holding
// listeners_lock_; returns true when exactly one entry was removed.
744 bool ClientClusterServiceImpl::remove_membership_listener(boost::uuids::uuid registration_id) {
745 std::lock_guard<std::mutex> g(listeners_lock_);
746 return listeners_.erase(registration_id) == 1;
// Collects the members from get_member_list() that the selector accepts;
// accepted members are moved into the result.
750 ClientClusterServiceImpl::get_members(
const member_selector &selector)
const {
751 std::vector<member> result;
752 for (
auto &&member : get_member_list()) {
753 if (selector.select(member)) {
754 result.emplace_back(std::move(member));
// Describes this client as a local_endpoint: client uuid, the local socket
// address of a random connection (boost::none when there is no connection),
// the client name and labels_.
761 local_endpoint ClientClusterServiceImpl::get_local_client()
const {
762 connection::ClientConnectionManagerImpl &cm = client_.get_connection_manager();
763 auto connection = cm.get_random_connection();
764 auto inetSocketAddress = connection ? connection->get_local_socket_address() : boost::none;
765 auto uuid = cm.get_client_uuid();
766 return local_endpoint(uuid, std::move(inetSocketAddress), client_.get_name(), labels_);
// Resets the member list version while keeping the members: under
// cluster_view_lock_, a snapshot other than EMPTY_SNAPSHOT is replaced by a
// new one with version 0 and the same members.
769 void ClientClusterServiceImpl::clear_member_list_version() {
770 std::lock_guard<std::mutex> g(cluster_view_lock_);
771 auto &lg = client_.get_logger();
772 HZ_LOG(lg, finest,
"Resetting the member list version ");
773 auto cluster_view_snapshot = member_list_snapshot_.load();
// The sentinel is left untouched.
776 if (cluster_view_snapshot != EMPTY_SNAPSHOT) {
777 member_list_snapshot_.store(boost::shared_ptr<member_list_snapshot>(
778 new member_list_snapshot{0, cluster_view_snapshot->members}));
// Under cluster_view_lock_, replaces the snapshot with an empty one at
// version 0 and returns the membership events obtained by diffing the
// previous members against an empty member map.
782 std::vector<membership_event> ClientClusterServiceImpl::clear_member_list_and_return_events() {
783 std::lock_guard<std::mutex> g(cluster_view_lock_);
785 auto &lg = client_.get_logger();
786 HZ_LOG(lg, finest,
"Resetting the member list");
// Copy the members before the snapshot is replaced.
788 auto previous_list = member_list_snapshot_.load()->members;
790 member_list_snapshot_.store(
791 boost::shared_ptr<member_list_snapshot>(
new member_list_snapshot{0, {}}));
793 return detect_membership_events(previous_list, {});
// Clears the member list and fires the resulting events to the listeners.
796 void ClientClusterServiceImpl::clear_member_list() {
797 auto events = clear_member_list_and_return_events();
798 fire_events(std::move(events));
// Applies a member-list update received from the cluster.
//  - While the snapshot is still EMPTY_SNAPSHOT (checked, then re-checked
//    under cluster_view_lock_), the initial state is applied and the
//    initial-list latch is counted down.
//  - Otherwise an update whose version is >= the current snapshot's version
//    (again checked, then re-checked under the lock) replaces the snapshot,
//    and the join/leave events are computed from the old and new members.
// The computed events are passed to fire_events().
802 ClientClusterServiceImpl::handle_event(int32_t version,
const std::vector<member> &member_infos) {
803 auto &lg = client_.get_logger();
805 boost::str(boost::format(
"Handling new snapshot with membership version: %1%, "
808 % members_string(create_snapshot(version, member_infos)))
810 auto cluster_view_snapshot = member_list_snapshot_.load();
811 if (cluster_view_snapshot == EMPTY_SNAPSHOT) {
812 std::lock_guard<std::mutex> g(cluster_view_lock_);
// Reload under the lock: another thread may have applied the initial state.
813 cluster_view_snapshot = member_list_snapshot_.load();
814 if (cluster_view_snapshot == EMPTY_SNAPSHOT) {
816 apply_initial_state(version, member_infos);
817 initial_list_fetched_latch_.count_down();
822 std::vector<membership_event> events;
823 if (version >= cluster_view_snapshot->version) {
824 std::lock_guard<std::mutex> g(cluster_view_lock_);
// Reload under the lock so the version comparison uses the latest snapshot.
825 cluster_view_snapshot = member_list_snapshot_.load();
826 if (version >= cluster_view_snapshot->version) {
827 auto prev_members = cluster_view_snapshot->members;
828 auto snapshot = boost::make_shared<member_list_snapshot>(create_snapshot(version, member_infos));
829 member_list_snapshot_.store(snapshot);
830 events = detect_membership_events(prev_members, snapshot->members);
834 fire_events(std::move(events));
// Builds a snapshot carrying the given version and a members map keyed by
// each member's uuid.
837 ClientClusterServiceImpl::member_list_snapshot
838 ClientClusterServiceImpl::create_snapshot(int32_t version,
const std::vector<member> &members) {
839 member_list_snapshot result;
840 result.version = version;
841 for (
auto &m : members) {
842 result.members.insert({m.get_uuid(), m});
// Formats the snapshot's members for logging: two blank lines, a
// "Members [N] {" header, one tab-indented line per member, and a closing "}".
849 ClientClusterServiceImpl::members_string(
const ClientClusterServiceImpl::member_list_snapshot& snapshot) {
850 std::stringstream out;
851 auto const &members = snapshot.members;
852 out << std::endl << std::endl <<
"Members [" << members.size() <<
"] {";
853 for (
auto const &e : members) {
854 out << std::endl <<
"\t" << e.second;
856 out << std::endl <<
"}" << std::endl;
// Installs the first member list: stores the new snapshot, logs it at info
// level, then (holding listeners_lock_) calls init_ on every listener that has
// one, passing an initial_membership_event with the snapshot's members.
861 ClientClusterServiceImpl::apply_initial_state(int32_t version,
const std::vector<member> &member_infos) {
862 auto snapshot = boost::make_shared<member_list_snapshot>(create_snapshot(version, member_infos));
863 member_list_snapshot_.store(snapshot);
864 HZ_LOG(client_.get_logger(), info, members_string(*snapshot));
// Flatten the map's values into the set carried by the event.
865 std::unordered_set<member> members;
866 for(
auto const &e : snapshot->members) {
867 members.insert(e.second);
869 std::lock_guard<std::mutex> g(listeners_lock_);
870 for (
auto &item : listeners_) {
871 membership_listener &listener = item.second;
872 if (listener.init_) {
873 listener.init_(initial_membership_event(client_.get_cluster(), members));
// Diffs two member maps into membership events. previous_members is taken by
// value because it is consumed: every current member is erased from it, so
// what remains afterwards are the members that left, and current members that
// were not present before are collected as new.
// A MEMBER_LEFT event is emitted per departed member and the connection to
// that member is closed with a target_disconnected cause; a MEMBER_JOINED
// event is emitted per new member.
878 std::vector<membership_event> ClientClusterServiceImpl::detect_membership_events(
879 std::unordered_map<boost::uuids::uuid, member, boost::hash<boost::uuids::uuid>> previous_members,
880 const std::unordered_map<boost::uuids::uuid, member, boost::hash<boost::uuids::uuid>>& current_members) {
881 std::vector<member> new_members;
883 for (
auto const & e : current_members) {
// erase() returns 0 when the uuid was not in the previous map, i.e. a new member.
884 if (!previous_members.erase(e.first)) {
885 new_members.emplace_back(e.second);
889 std::vector<membership_event> events;
// Whatever is left in previous_members is no longer in the cluster.
892 for (
auto const &e : previous_members) {
893 events.emplace_back(client_.get_cluster(), e.second, membership_event::membership_event_type::MEMBER_LEFT, current_members);
894 auto connection = client_.get_connection_manager().get_connection(e.second.get_uuid());
896 connection->close(
"", std::make_exception_ptr(exception::target_disconnected(
897 "ClientClusterServiceImpl::detect_membership_events", (boost::format(
898 "The client has closed the connection to this member, after receiving a member left event from the cluster. %1%") %
899 *connection).str())));
902 for (
auto const &member : new_members) {
903 events.emplace_back(client_.get_cluster(), member, membership_event::membership_event_type::MEMBER_JOINED, current_members);
// When anything changed, log the currently stored member list if it is non-empty.
906 if (!events.empty()) {
907 auto snapshot = member_list_snapshot_.load();
908 if (!snapshot->members.empty()) {
909 HZ_LOG(client_.get_logger(), info, members_string(*snapshot));
// Delivers each event to every registered listener while holding
// listeners_lock_: MEMBER_JOINED events go to joined_, the remaining events to
// left_ (presumably the else branch — confirm).
915 void ClientClusterServiceImpl::fire_events(std::vector<membership_event> events) {
916 std::lock_guard<std::mutex> g(listeners_lock_);
918 for (
auto const &event : events) {
919 for (
auto &item : listeners_) {
920 membership_listener &listener = item.second;
921 if (event.get_event_type() == membership_event::membership_event_type::MEMBER_JOINED) {
922 listener.joined_(event);
924 listener.left_(event);
// Waits up to INITIAL_MEMBERS_TIMEOUT on the initial-member-list latch and
// throws exception::illegal_state when the wait times out.
930 void ClientClusterServiceImpl::wait_initial_member_list_fetched()
const {
// The const_cast lets this const method call wait_for() on the latch member.
932 if ((
const_cast<boost::latch&
>(initial_list_fetched_latch_)).wait_for(INITIAL_MEMBERS_TIMEOUT) == boost::cv_status::timeout) {
933 BOOST_THROW_EXCEPTION(exception::illegal_state(
934 "ClientClusterServiceImpl::wait_initial_member_list_fetched",
935 "Could not get initial member list from cluster!"));
// Sends the invocation over the explicitly supplied connection.
940 ClientInvocationServiceImpl::invoke_on_connection(
const std::shared_ptr<ClientInvocation> &invocation,
941 const std::shared_ptr<connection::Connection> &connection) {
942 return send(invocation, connection);
// Resolves the owner uuid of partition_id via the partition service. A nil
// owner is logged at finest level; the last visible statement forwards to
// invoke_on_target() with the resolved owner.
945 bool ClientInvocationServiceImpl::invoke_on_partition_owner(
946 const std::shared_ptr<ClientInvocation> &invocation,
int partition_id) {
947 auto partition_owner = client_.get_partition_service().get_partition_owner(partition_id);
948 if (partition_owner.is_nil()) {
949 HZ_LOG(logger_, finest,
950 boost::str(boost::format(
"Partition owner is not assigned yet for partition %1%")
955 return invoke_on_target(invocation, partition_owner);
// Sends the invocation over the connection the connection manager returns for
// the target uuid. Precondition (assert): uuid is not nil. The
// "Client is not connected to target" line is logged at finest level.
958 bool ClientInvocationServiceImpl::invoke_on_target(
const std::shared_ptr<ClientInvocation> &invocation,
959 boost::uuids::uuid uuid) {
960 assert (!uuid.is_nil());
961 auto connection = client_.get_connection_manager().get_connection(uuid);
963 HZ_LOG(logger_, finest,
964 boost::str(boost::format(
"Client is not connected to target : %1%")
969 return send(invocation, connection);
// Returns the smart_routing_ flag captured at construction.
972 bool ClientInvocationServiceImpl::is_smart_routing()
const {
973 return smart_routing_;
// Returns backup_timeout_ by const reference.
976 const std::chrono::milliseconds &ClientInvocationServiceImpl::get_backup_timeout()
const {
977 return backup_timeout_;
// Returns the fail_on_indeterminate_operation_state_ flag.
980 bool ClientInvocationServiceImpl::fail_on_indeterminate_state()
const {
981 return fail_on_indeterminate_operation_state_;
// Stores references to the lifecycle service and the client properties; the
// thread pools are created later, in start().
984 ClientExecutionServiceImpl::ClientExecutionServiceImpl(
const std::string &name,
985 const client_properties &properties,
987 spi::lifecycle_service &service)
988 : lifecycle_service_(service), client_properties_(properties) {}
// Creates the executors. The internal pool size is read from the client
// properties; a non-positive value is replaced by
// INTERNAL_EXECUTOR_POOL_SIZE_DEFAULT converted to int.
990 void ClientExecutionServiceImpl::start() {
991 int internalPoolSize = client_properties_.get_integer(
992 client_properties_.get_internal_executor_pool_size());
993 if (internalPoolSize <= 0) {
994 internalPoolSize = util::IOUtil::to_value<int>(
995 client_properties::INTERNAL_EXECUTOR_POOL_SIZE_DEFAULT);
998 internal_executor_.reset(
new hazelcast::util::hz_thread_pool(internalPoolSize));
// The user executor is built with the pool's default constructor.
1000 user_executor_.reset(
new hazelcast::util::hz_thread_pool());
// Shuts down the internal executor first, then the user executor.
1003 void ClientExecutionServiceImpl::shutdown() {
1004 shutdown_thread_pool(internal_executor_.get());
1005 shutdown_thread_pool(user_executor_.get());
// Returns the user executor by dereferencing user_executor_ (set in start()).
1008 util::hz_thread_pool &ClientExecutionServiceImpl::get_user_executor() {
1009 return *user_executor_;
1012 void ClientExecutionServiceImpl::shutdown_thread_pool(hazelcast::util::hz_thread_pool *pool) {
// Out-of-class definitions of the two constexpr static int members; no
// initializers are given here.
1019 constexpr
int ClientInvocation::MAX_FAST_INVOCATION_COUNT;
1020 constexpr
int ClientInvocation::UNASSIGNED_PARTITION;
1022 ClientInvocation::ClientInvocation(spi::ClientContext &client_context,
1023 std::shared_ptr<protocol::ClientMessage> &&message,
1024 const std::string &name,
1026 const std::shared_ptr<connection::Connection> &conn,
1027 boost::uuids::uuid uuid) :
1028 logger_(client_context.get_logger()),
1029 lifecycle_service_(client_context.get_lifecycle_service()),
1030 invocation_service_(client_context.get_invocation_service()),
1031 execution_service_(client_context.get_client_execution_service().shared_from_this()),
1032 call_id_sequence_(client_context.get_call_id_sequence()),
1034 partition_id_(partition),
1035 start_time_(std::chrono::steady_clock::now()),
1036 retry_pause_(invocation_service_.get_invocation_retry_pause()),
1038 connection_(conn), bound_to_single_connection_(conn != nullptr),
1039 invoke_count_(0), urgent_(false), smart_routing_(invocation_service_.is_smart_routing()) {
1040 message->set_partition_id(partition_id_);
1041 client_message_ = boost::make_shared<std::shared_ptr<protocol::ClientMessage>>(message);
1042 set_send_connection(
nullptr);
1045 ClientInvocation::~ClientInvocation() =
default;
1047 boost::future<protocol::ClientMessage> ClientInvocation::invoke() {
1048 assert (client_message_.load());
1050 call_id_sequence_->next();
1051 invoke_on_selection();
1052 if (!lifecycle_service_.is_running()) {
1053 return invocation_promise_.get_future().then([](boost::future<protocol::ClientMessage> f) {
1057 auto id_seq = call_id_sequence_;
1058 return invocation_promise_.get_future().then(execution_service_->get_user_executor(),
1059 [=](boost::future<protocol::ClientMessage> f) {
1065 boost::future<protocol::ClientMessage> ClientInvocation::invoke_urgent() {
1066 assert(client_message_.load());
1069 call_id_sequence_->force_next();
1070 invoke_on_selection();
1071 if (!lifecycle_service_.is_running()) {
1072 return invocation_promise_.get_future().then([](boost::future<protocol::ClientMessage> f) {
1076 auto id_seq = call_id_sequence_;
1077 return invocation_promise_.get_future().then(execution_service_->get_user_executor(),
1078 [=](boost::future<protocol::ClientMessage> f) {
1084 void ClientInvocation::invoke_on_selection() {
1088 invocation_service_.check_invocation_allowed();
1091 if (is_bind_to_single_connection()) {
1092 bool invoked =
false;
1093 auto conn = connection_.lock();
1095 invoked = invocation_service_.invoke_on_connection(shared_from_this(), conn);
1098 std::string message;
1100 message = (boost::format(
"Could not invoke on connection %1%") % *conn).str();
1102 message =
"Could not invoke. Bound to a connection that is deleted already.";
1104 notify_exception(std::make_exception_ptr(
1105 exception::io(
"ClientInvocation::invoke_on_selection", message)));
1110 bool invoked =
false;
1111 if (smart_routing_) {
1112 if (partition_id_ != -1) {
1113 invoked = invocation_service_.invoke_on_partition_owner(shared_from_this(), partition_id_);
1114 }
else if (!uuid_.is_nil()) {
1115 invoked = invocation_service_.invoke_on_target(shared_from_this(), uuid_);
1117 invoked = invocation_service_.invoke(shared_from_this());
1120 invoked = invocation_service_.invoke(shared_from_this());
1123 invoked = invocation_service_.invoke(shared_from_this());
1126 notify_exception(std::make_exception_ptr(exception::io(
"No connection found to invoke")));
1128 }
catch (exception::iexception &) {
1129 notify_exception(std::current_exception());
1130 }
catch (std::exception &) {
1135 bool ClientInvocation::is_bind_to_single_connection()
const {
1136 return bound_to_single_connection_;
1139 void ClientInvocation::run() {
1143 void ClientInvocation::retry() {
1146 client_message_ = boost::make_shared<std::shared_ptr<protocol::ClientMessage>>(copy_message());
1149 invoke_on_selection();
1150 }
catch (exception::iexception &e) {
1151 set_exception(e, boost::current_exception());
1152 }
catch (std::exception &) {
1157 void ClientInvocation::set_exception(
const std::exception &e, boost::exception_ptr exception_ptr) {
1158 invoked_or_exception_set_.store(
true);
1160 auto send_conn = send_connection_.load();
1162 auto connection = send_conn->lock();
1164 auto call_id = client_message_.load()->get()->get_correlation_id();
1165 boost::asio::post(connection->get_socket().get_executor(), [=]() {
1166 connection->deregister_invocation(call_id);
1170 invocation_promise_.set_exception(std::move(exception_ptr));
1171 }
catch (boost::promise_already_satisfied &se) {
1172 if (!event_handler_) {
1173 HZ_LOG(logger_, finest,
1174 boost::str(boost::format(
"Failed to set the exception for invocation. "
1175 "%1%, %2% Exception to be set: %3%")
1176 % se.what() % *
this % e.what())
1182 void ClientInvocation::notify_exception(std::exception_ptr exception) {
1185 std::rethrow_exception(exception);
1186 }
catch (exception::iexception &iex) {
1189 if (!lifecycle_service_.is_running()) {
1191 std::throw_with_nested(boost::enable_current_exception(
1192 exception::hazelcast_client_not_active(iex.get_source(),
1193 "Client is shutting down")));
1194 }
catch (exception::iexception &e) {
1195 set_exception(e, boost::current_exception());
1200 if (!should_retry(iex)) {
1201 set_exception(iex, boost::current_exception());
1205 auto timePassed = std::chrono::steady_clock::now() - start_time_;
1206 if (timePassed > invocation_service_.get_invocation_timeout()) {
1207 HZ_LOG(logger_, finest,
1208 boost::str(boost::format(
"Exception will not be retried because "
1209 "invocation timed out. %1%") % iex.what())
1212 auto now = std::chrono::steady_clock::now();
1214 auto timeoutException = (exception::exception_builder<exception::operation_timeout>(
1215 "ClientInvocation::newoperation_timeout_exception") << *
this
1216 <<
" timed out because exception occurred after client invocation timeout "
1217 << std::chrono::duration_cast<std::chrono::milliseconds>(invocation_service_.get_invocation_timeout()).count()
1218 <<
"msecs. Last exception:" << iex
1219 <<
" Current time :" << util::StringUtil::time_to_string(now) <<
". "
1220 <<
"Start time: " << util::StringUtil::time_to_string(start_time_)
1221 <<
". Total elapsed time: " <<
1222 std::chrono::duration_cast<std::chrono::milliseconds>(now - start_time_).count()
1223 <<
" ms. ").build();
1225 BOOST_THROW_EXCEPTION(timeoutException);
1227 set_exception(timeoutException, boost::current_exception());
1235 }
catch (std::exception &e) {
1236 set_exception(e, boost::current_exception());
1243 void ClientInvocation::erase_invocation()
const {
1244 if (!this->event_handler_) {
1245 auto sent_connection = get_send_connection();
1246 if (sent_connection) {
1247 auto this_invocation = shared_from_this();
1248 boost::asio::post(sent_connection->get_socket().get_executor(), [=] () {
1249 sent_connection->invocations.erase(this_invocation->get_client_message()->get_correlation_id());
1255 bool ClientInvocation::should_retry(exception::iexception &exception) {
1256 auto errorCode = exception.get_error_code();
1257 if (is_bind_to_single_connection() && (errorCode == protocol::IO || errorCode == protocol::TARGET_DISCONNECTED)) {
1261 if (!uuid_.is_nil() && errorCode == protocol::TARGET_NOT_MEMBER) {
1268 if (errorCode == protocol::IO || errorCode == protocol::HAZELCAST_INSTANCE_NOT_ACTIVE || exception.is_retryable()) {
1271 if (errorCode == protocol::TARGET_DISCONNECTED) {
1272 return client_message_.load()->get()->is_retryable() || invocation_service_.is_redo_operation();
1277 std::ostream &operator<<(std::ostream &os,
const ClientInvocation &invocation) {
1278 std::ostringstream target;
1279 if (invocation.is_bind_to_single_connection()) {
1280 auto conn = invocation.connection_.lock();
1282 target <<
"connection " << *conn;
1284 }
else if (invocation.partition_id_ != -1) {
1285 target <<
"partition " << invocation.partition_id_;
1286 }
else if (!invocation.uuid_.is_nil()) {
1287 target <<
"uuid " << boost::to_string(invocation.uuid_);
1291 os <<
"ClientInvocation{" <<
"requestMessage = " << *invocation.client_message_.load()->get()
1292 <<
", objectName = "
1293 << invocation.object_name_ <<
", target = " << target.str() <<
", sendConnection = ";
1294 auto sendConnection = invocation.get_send_connection();
1295 if (sendConnection) {
1296 os << *sendConnection;
1300 os <<
", backup_acks_expected_ = " <<
static_cast<int>(invocation.backup_acks_expected_)
1301 <<
", backup_acks_received = " << invocation.backup_acks_received_;
1303 if (invocation.pending_response_) {
1304 os <<
", pending_response: " << *invocation.pending_response_;
1312 std::shared_ptr<ClientInvocation> ClientInvocation::create(spi::ClientContext &client_context,
1313 std::shared_ptr<protocol::ClientMessage> &&client_message,
1314 const std::string &object_name,
1316 return std::shared_ptr<ClientInvocation>(
1317 new ClientInvocation(client_context, std::move(client_message), object_name, partition_id));
1320 std::shared_ptr<ClientInvocation> ClientInvocation::create(spi::ClientContext &client_context,
1321 std::shared_ptr<protocol::ClientMessage> &&client_message,
1322 const std::string &object_name,
1323 const std::shared_ptr<connection::Connection> &connection) {
1324 return std::shared_ptr<ClientInvocation>(
1325 new ClientInvocation(client_context, std::move(client_message), object_name, UNASSIGNED_PARTITION,
1330 std::shared_ptr<ClientInvocation> ClientInvocation::create(spi::ClientContext &client_context,
1331 std::shared_ptr<protocol::ClientMessage> &&client_message,
1332 const std::string &object_name,
1333 boost::uuids::uuid uuid) {
1334 return std::shared_ptr<ClientInvocation>(
1335 new ClientInvocation(client_context, std::move(client_message), object_name, UNASSIGNED_PARTITION,
1339 std::shared_ptr<ClientInvocation> ClientInvocation::create(spi::ClientContext &client_context,
1340 protocol::ClientMessage &client_message,
1341 const std::string &object_name,
1343 return create(client_context, std::make_shared<protocol::ClientMessage>(std::move(client_message)),
1344 object_name, partition_id);
1347 std::shared_ptr<ClientInvocation> ClientInvocation::create(spi::ClientContext &client_context,
1348 protocol::ClientMessage &client_message,
1349 const std::string &object_name,
1350 const std::shared_ptr<connection::Connection> &connection) {
1351 return create(client_context, std::make_shared<protocol::ClientMessage>(std::move(client_message)),
1352 object_name, connection);
1355 std::shared_ptr<ClientInvocation> ClientInvocation::create(spi::ClientContext &client_context,
1356 protocol::ClientMessage &client_message,
1357 const std::string &object_name,
1358 boost::uuids::uuid uuid) {
1359 return create(client_context, std::make_shared<protocol::ClientMessage>(std::move(client_message)),
1363 std::shared_ptr<connection::Connection> ClientInvocation::get_send_connection()
const {
1364 return send_connection_.load()->lock();
1367 void ClientInvocation::wait_invoked()
const {
1369 while (!invoked_or_exception_set_) {
1370 std::this_thread::sleep_for(retry_pause_);
1375 ClientInvocation::set_send_connection(
const std::shared_ptr<connection::Connection> &conn) {
1376 send_connection_.store(boost::make_shared<std::weak_ptr<connection::Connection>>(conn));
1377 invoked_or_exception_set_.store(
true);
1380 void ClientInvocation::notify(
const std::shared_ptr<protocol::ClientMessage> &msg) {
1382 BOOST_THROW_EXCEPTION(exception::illegal_argument(
"response can't be null"));
1385 int8_t expected_backups = msg->get_number_of_backups();
1389 if (expected_backups > backup_acks_received_) {
1393 pending_response_received_time_ = std::chrono::steady_clock::now();
1395 backup_acks_expected_ = expected_backups;
1399 pending_response_ = msg;
1411 void ClientInvocation::complete(
const std::shared_ptr<protocol::ClientMessage> &msg) {
1414 this->invocation_promise_.set_value(*msg);
1415 }
catch (std::exception &e) {
1416 HZ_LOG(logger_, warning,
1417 boost::str(boost::format(
"Failed to set the response for invocation. "
1418 "Dropping the response. %1%, %2% Response: %3%")
1419 % e.what() % *
this % *msg)
1422 this->erase_invocation();
1425 std::shared_ptr<protocol::ClientMessage> ClientInvocation::get_client_message()
const {
1426 return *client_message_.load();
1429 const std::shared_ptr<EventHandler<protocol::ClientMessage> > &
1430 ClientInvocation::get_event_handler()
const {
1431 return event_handler_;
1434 void ClientInvocation::set_event_handler(
1435 const std::shared_ptr<EventHandler<protocol::ClientMessage> > &handler) {
1436 ClientInvocation::event_handler_ = handler;
1439 void ClientInvocation::execute() {
1440 auto this_invocation = shared_from_this();
1441 auto command = [=]() {
1442 this_invocation->run();
1448 int64_t callId = call_id_sequence_->force_next();
1449 client_message_.load()->get()->set_correlation_id(callId);
1452 call_id_sequence_->complete();
1454 if (invoke_count_ < MAX_FAST_INVOCATION_COUNT) {
1456 execution_service_->execute(command);
1459 int64_t delayMillis = util::min<int64_t>(
static_cast<int64_t
>(1) << (invoke_count_ - MAX_FAST_INVOCATION_COUNT),
1460 std::chrono::duration_cast<std::chrono::milliseconds>(retry_pause_).count());
1461 retry_timer_ = execution_service_->schedule(command, std::chrono::milliseconds(delayMillis));
1465 const std::string ClientInvocation::get_name()
const {
1466 return "ClientInvocation";
1469 std::shared_ptr<protocol::ClientMessage> ClientInvocation::copy_message() {
1470 return std::make_shared<protocol::ClientMessage>(**client_message_.load());
1473 boost::promise<protocol::ClientMessage> &ClientInvocation::get_promise() {
1474 return invocation_promise_;
1477 void ClientInvocation::log_exception(exception::iexception &e) {
1478 HZ_LOG(logger_, finest,
1479 boost::str(boost::format(
"Invocation got an exception %1%, invoke count : %2%, "
1481 % *
this % invoke_count_.load() % e)
1485 void ClientInvocation::notify_backup() {
1486 ++backup_acks_received_;
1488 if (!pending_response_) {
1494 if (backup_acks_expected_ != backup_acks_received_) {
1501 complete_with_pending_response();
1505 ClientInvocation::detect_and_handle_backup_timeout(
const std::chrono::milliseconds &backup_timeout) {
1508 if (backup_acks_expected_ == backup_acks_received_) {
1514 if (!pending_response_) {
1519 if (pending_response_received_time_ + backup_timeout >= std::chrono::steady_clock::now()) {
1523 if (invocation_service_.fail_on_indeterminate_state()) {
1524 auto exception = boost::enable_current_exception((exception::exception_builder<exception::indeterminate_operation_state>(
1525 "ClientInvocation::detect_and_handle_backup_timeout") << *
this
1526 <<
" failed because backup acks missed.").build());
1527 notify_exception(std::make_exception_ptr(exception));
1532 complete_with_pending_response();
1535 void ClientInvocation::complete_with_pending_response() {
1536 complete(pending_response_);
1539 ClientContext &impl::ClientTransactionManagerServiceImpl::get_client()
const {
1543 ClientTransactionManagerServiceImpl::ClientTransactionManagerServiceImpl(ClientContext &client)
1544 : client_(client) {}
1546 std::shared_ptr<connection::Connection> ClientTransactionManagerServiceImpl::connect() {
1547 auto &invocationService = client_.get_invocation_service();
1548 auto startTime = std::chrono::steady_clock::now();
1549 auto invocationTimeout = invocationService.get_invocation_timeout();
1550 client_config &clientConfig = client_.get_client_config();
1551 bool smartRouting = clientConfig.get_network_config().is_smart_routing();
1553 while (client_.get_lifecycle_service().is_running()) {
1555 auto connection = client_.get_connection_manager().get_random_connection();
1557 throw_exception(smartRouting);
1560 }
catch (exception::hazelcast_client_offline &) {
1562 }
catch (exception::iexception &) {
1563 if (std::chrono::steady_clock::now() - startTime > invocationTimeout) {
1564 std::rethrow_exception(
1565 new_operation_timeout_exception(std::current_exception(), invocationTimeout,
1569 std::this_thread::sleep_for(invocationService.get_invocation_retry_pause());
1571 BOOST_THROW_EXCEPTION(
1572 exception::hazelcast_client_not_active(
"ClientTransactionManagerServiceImpl::connect",
1573 "Client is shutdown"));
1577 ClientTransactionManagerServiceImpl::new_operation_timeout_exception(std::exception_ptr cause,
1578 std::chrono::milliseconds invocation_timeout,
1579 std::chrono::steady_clock::time_point start_time) {
1580 std::ostringstream sb;
1581 auto now = std::chrono::steady_clock::now();
1583 <<
"Creating transaction context timed out because exception occurred after client invocation timeout "
1584 << std::chrono::duration_cast<std::chrono::milliseconds>(invocation_timeout).count() <<
" ms. " <<
"Current time: "
1585 << util::StringUtil::time_to_string(std::chrono::steady_clock::now()) <<
". " <<
"Start time: "
1586 << util::StringUtil::time_to_string(start_time) <<
". Total elapsed time: "
1587 << std::chrono::duration_cast<std::chrono::milliseconds>(now - start_time).count() <<
" ms. ";
1589 std::rethrow_exception(cause);
1592 std::throw_with_nested(boost::enable_current_exception(exception::operation_timeout(
1593 "ClientTransactionManagerServiceImpl::newoperation_timeout_exception", sb.str())));
1595 return std::current_exception();
1601 void ClientTransactionManagerServiceImpl::throw_exception(
bool smart_routing) {
1602 auto &client_config = client_.get_client_config();
1603 auto &connection_strategy_Config = client_config.get_connection_strategy_config();
1604 auto reconnect_mode = connection_strategy_Config.get_reconnect_mode();
1605 if (reconnect_mode == config::client_connection_strategy_config::reconnect_mode::ASYNC) {
1606 BOOST_THROW_EXCEPTION(exception::hazelcast_client_offline(
1607 "ClientTransactionManagerServiceImpl::throw_exception",
""));
1609 if (smart_routing) {
1610 auto members = client_.get_cluster().get_members();
1611 std::ostringstream msg;
1612 if (members.empty()) {
1613 msg <<
"No address was return by the LoadBalancer since there are no members in the cluster";
1615 msg <<
"No address was return by the LoadBalancer. "
1616 "But the cluster contains the following members:{\n";
1617 for (
auto const &m : members) {
1618 msg <<
'\t' << m <<
'\n';
1622 BOOST_THROW_EXCEPTION(exception::illegal_state(
1623 "ClientTransactionManagerServiceImpl::throw_exception", msg.str()));
1625 BOOST_THROW_EXCEPTION(exception::illegal_state(
1626 "ClientTransactionManagerServiceImpl::throw_exception",
1627 "No active connection is found"));
1630 ClientPartitionServiceImpl::ClientPartitionServiceImpl(ClientContext &client)
1631 : client_(client), logger_(client.get_logger()), partition_count_(0),
1632 partition_table_(boost::shared_ptr<partition_table>(new partition_table{0, -1})) {
1635 void ClientPartitionServiceImpl::handle_event(int32_t connection_id, int32_t version,
1636 const std::vector<std::pair<boost::uuids::uuid, std::vector<int>>> &partitions) {
1637 HZ_LOG(logger_, finest,
1638 boost::str(boost::format(
"Handling new partition table with partitionStateVersion: %1%") % version)
1642 auto current = partition_table_.load();
1643 if (!should_be_applied(connection_id, version, partitions, *current)) {
1646 if (partition_table_.compare_exchange_strong(current, boost::shared_ptr<partition_table>(
1647 new partition_table{connection_id, version, convert_to_map(partitions)}))) {
1648 HZ_LOG(logger_, finest,
1649 boost::str(boost::format(
"Applied partition table with partitionStateVersion : %1%") % version)
1657 boost::uuids::uuid ClientPartitionServiceImpl::get_partition_owner(int32_t partition_id) {
1658 auto table_ptr = partition_table_.load();
1659 auto it = table_ptr->partitions.find(partition_id);
1660 if (it != table_ptr->partitions.end()) {
1663 return boost::uuids::nil_uuid();
1666 int32_t ClientPartitionServiceImpl::get_partition_id(
const serialization::pimpl::data &key) {
1667 int32_t pc = get_partition_count();
1671 int hash = key.get_partition_hash();
1672 return util::HashUtil::hash_to_index(hash, pc);
1675 int32_t ClientPartitionServiceImpl::get_partition_count() {
1676 return partition_count_.load();
1679 std::shared_ptr<client::impl::Partition> ClientPartitionServiceImpl::get_partition(
int partition_id) {
1680 return std::shared_ptr<client::impl::Partition>(
new PartitionImpl(partition_id, client_, *
this));
1683 bool ClientPartitionServiceImpl::check_and_set_partition_count(int32_t new_partition_count) {
1684 int32_t expected = 0;
1685 if (partition_count_.compare_exchange_strong(expected, new_partition_count)) {
1688 return partition_count_.load() == new_partition_count;
1692 ClientPartitionServiceImpl::should_be_applied(int32_t connection_id, int32_t version,
1693 const std::vector<std::pair<boost::uuids::uuid, std::vector<int>>> &partitions,
1694 const partition_table ¤t) {
1695 auto &lg = client_.get_logger();
1696 if (partitions.empty()) {
1697 if (logger_.enabled(logger::level::finest)) {
1698 log_failure(connection_id, version, current,
"response is empty");
1702 if (!current.connection_id || connection_id != current.connection_id) {
1704 ([¤t, connection_id](){
1705 auto frmt = boost::format(
"Event coming from a new connection. Old connection id: %1%, "
1706 "new connection %2%");
1708 if (current.connection_id) {
1709 frmt = frmt % current.connection_id;
1711 frmt = frmt %
"none";
1714 return boost::str(frmt % connection_id);
1720 if (version <= current.version) {
1721 if (lg.enabled(logger::level::finest)) {
1722 log_failure(connection_id, version, current,
"response state version is old");
1729 void ClientPartitionServiceImpl::log_failure(int32_t connection_id, int32_t version,
1730 const ClientPartitionServiceImpl::partition_table ¤t,
1731 const std::string &cause) {
1732 HZ_LOG(logger_, finest,
1734 auto frmt = boost::format(
" We will not apply the response, since %1% ."
1735 " Response is from connection with id %2%. "
1736 "Current connection id is %3%, response state version:%4%. "
1737 "Current state version: %5%");
1738 if (current.connection_id) {
1739 return boost::str(frmt % cause % connection_id % current.connection_id % version %
1743 return boost::str(frmt % cause % connection_id %
"nullptr" % version % current.version);
1749 void ClientPartitionServiceImpl::reset() {
1750 partition_table_.store(
nullptr);
1753 std::unordered_map<int32_t, boost::uuids::uuid> ClientPartitionServiceImpl::convert_to_map(
1754 const std::vector<std::pair<boost::uuids::uuid, std::vector<int>>> &partitions) {
1755 std::unordered_map<int32_t, boost::uuids::uuid> new_partitions;
1756 for (
auto const &e : partitions) {
1757 for (
auto pid: e.second) {
1758 new_partitions.insert({pid, e.first});
1761 return new_partitions;
1764 int ClientPartitionServiceImpl::PartitionImpl::get_partition_id()
const {
1765 return partition_id_;
1768 boost::optional<member> ClientPartitionServiceImpl::PartitionImpl::get_owner()
const {
1769 auto owner = partition_service_.get_partition_owner(partition_id_);
1770 if (!owner.is_nil()) {
1771 return client_.get_client_cluster_service().get_member(owner);
1776 ClientPartitionServiceImpl::PartitionImpl::PartitionImpl(
int partition_id, ClientContext &client,
1777 ClientPartitionServiceImpl &partition_service)
1778 : partition_id_(partition_id), client_(client), partition_service_(partition_service) {
1781 namespace sequence {
1782 CallIdSequenceWithoutBackpressure::CallIdSequenceWithoutBackpressure() : head_(0) {}
1784 CallIdSequenceWithoutBackpressure::~CallIdSequenceWithoutBackpressure() =
default;
1786 int32_t CallIdSequenceWithoutBackpressure::get_max_concurrent_invocations()
const {
1790 int64_t CallIdSequenceWithoutBackpressure::next() {
1791 return force_next();
1794 int64_t CallIdSequenceWithoutBackpressure::force_next() {
1798 void CallIdSequenceWithoutBackpressure::complete() {
1802 int64_t CallIdSequenceWithoutBackpressure::get_last_call_id() {
1807 AbstractCallIdSequence::AbstractCallIdSequence(int32_t max_concurrent_invocations) {
1808 std::ostringstream out;
1809 out <<
"maxConcurrentInvocations should be a positive number. maxConcurrentInvocations="
1810 << max_concurrent_invocations;
1811 this->max_concurrent_invocations_ = util::Preconditions::check_positive(max_concurrent_invocations,
1814 for (
size_t i = 0; i < longs_.size(); ++i) {
1819 AbstractCallIdSequence::~AbstractCallIdSequence() =
default;
1821 int32_t AbstractCallIdSequence::get_max_concurrent_invocations()
const {
1822 return max_concurrent_invocations_;
1825 int64_t AbstractCallIdSequence::next() {
1827 handle_no_space_left();
1829 return force_next();
1832 int64_t AbstractCallIdSequence::force_next() {
1833 return ++longs_[INDEX_HEAD];
1836 void AbstractCallIdSequence::complete() {
1837 ++longs_[INDEX_TAIL];
1838 assert(longs_[INDEX_TAIL] <= longs_[INDEX_HEAD]);
1841 int64_t AbstractCallIdSequence::get_last_call_id() {
1842 return longs_[INDEX_HEAD];
1845 bool AbstractCallIdSequence::has_space() {
1846 return longs_[INDEX_HEAD] - longs_[INDEX_TAIL] < max_concurrent_invocations_;
1849 int64_t AbstractCallIdSequence::get_tail() {
1850 return longs_[INDEX_TAIL];
1853 const std::unique_ptr<util::concurrent::IdleStrategy> CallIdSequenceWithBackpressure::IDLER(
1854 new util::concurrent::BackoffIdleStrategy(
1855 0, 0, std::chrono::duration_cast<std::chrono::nanoseconds>(
1856 std::chrono::microseconds(1000)).count(),
1857 std::chrono::duration_cast<std::chrono::nanoseconds>(
1858 std::chrono::microseconds(MAX_DELAY_MS * 1000)).count()));
1860 CallIdSequenceWithBackpressure::CallIdSequenceWithBackpressure(int32_t max_concurrent_invocations,
1861 int64_t backoff_timeout_ms)
1862 : AbstractCallIdSequence(max_concurrent_invocations) {
1863 std::ostringstream out;
1864 out <<
"backoffTimeoutMs should be a positive number. backoffTimeoutMs=" << backoff_timeout_ms;
1865 util::Preconditions::check_positive(backoff_timeout_ms, out.str());
1867 backoff_timeout_nanos_ = std::chrono::duration_cast<std::chrono::nanoseconds>(
1868 std::chrono::milliseconds(backoff_timeout_ms)).count();
1871 void CallIdSequenceWithBackpressure::handle_no_space_left() {
1872 auto start = std::chrono::steady_clock::now();
1873 for (int64_t idleCount = 0;; idleCount++) {
1874 int64_t elapsedNanos = std::chrono::duration_cast<std::chrono::nanoseconds>(
1875 std::chrono::steady_clock::now() - start).count();
1876 if (elapsedNanos > backoff_timeout_nanos_) {
1877 throw (exception::exception_builder<exception::hazelcast_overload>(
1878 "CallIdSequenceWithBackpressure::handleNoSpaceLeft")
1879 <<
"Timed out trying to acquire another call ID."
1880 <<
" maxConcurrentInvocations = " << get_max_concurrent_invocations()
1881 <<
", backoffTimeout = "
1882 << std::chrono::microseconds(backoff_timeout_nanos_ / 1000).count()
1883 <<
" msecs, elapsed:"
1884 << std::chrono::microseconds(elapsedNanos / 1000).count() <<
" msecs").build();
1886 IDLER->idle(idleCount);
1894 FailFastCallIdSequence::FailFastCallIdSequence(int32_t max_concurrent_invocations)
1895 : AbstractCallIdSequence(max_concurrent_invocations) {}
1897 void FailFastCallIdSequence::handle_no_space_left() {
1898 throw (exception::exception_builder<exception::hazelcast_overload>(
1899 "FailFastCallIdSequence::handleNoSpaceLeft")
1900 <<
"Maximum invocation count is reached. maxConcurrentInvocations = "
1901 << get_max_concurrent_invocations()).build();
1905 std::unique_ptr<CallIdSequence> CallIdFactory::new_call_id_sequence(
bool is_back_pressure_enabled,
1906 int32_t max_allowed_concurrent_invocations,
1907 int64_t backoff_timeout_ms) {
1908 if (!is_back_pressure_enabled) {
1909 return std::unique_ptr<CallIdSequence>(
new CallIdSequenceWithoutBackpressure());
1910 }
else if (backoff_timeout_ms <= 0) {
1911 return std::unique_ptr<CallIdSequence>(
1912 new FailFastCallIdSequence(max_allowed_concurrent_invocations));
1914 return std::unique_ptr<CallIdSequence>(
1915 new CallIdSequenceWithBackpressure(max_allowed_concurrent_invocations,
1916 backoff_timeout_ms));
1921 namespace listener {
1922 listener_service_impl::listener_service_impl(ClientContext &client_context,
1923 int32_t event_thread_count)
1924 : client_context_(client_context),
1925 serialization_service_(client_context.get_serialization_service()),
1926 logger_(client_context.get_logger()),
1927 client_connection_manager_(client_context.get_connection_manager()),
1928 number_of_event_threads_(event_thread_count),
1929 smart_(client_context.get_client_config().get_network_config().is_smart_routing()) {
1930 auto &invocationService = client_context.get_invocation_service();
1931 invocation_timeout_ = invocationService.get_invocation_timeout();
1932 invocation_retry_pause_ = invocationService.get_invocation_retry_pause();
1935 bool listener_service_impl::registers_local_only()
const {
1939 boost::future<boost::uuids::uuid>
1940 listener_service_impl::register_listener(
1941 std::shared_ptr<ListenerMessageCodec> listener_message_codec,
1942 std::shared_ptr<client::impl::BaseEventHandler> handler) {
1943 auto task = boost::packaged_task<boost::uuids::uuid()>([=]() {
1944 return register_listener_internal(listener_message_codec, handler);
1946 auto f = task.get_future();
1947 boost::asio::post(registration_executor_->get_executor(), std::move(task));
1951 boost::future<bool> listener_service_impl::deregister_listener(boost::uuids::uuid registration_id) {
1952 util::Preconditions::check_not_nill(registration_id,
"Nil userRegistrationId is not allowed!");
1954 boost::packaged_task<bool()> task([=]() {
1955 return deregister_listener_internal(registration_id);
1957 auto f = task.get_future();
1958 boost::asio::post(registration_executor_->get_executor(), std::move(task));
1962 void listener_service_impl::connection_added(
1963 const std::shared_ptr<connection::Connection> connection) {
1964 boost::asio::post(registration_executor_->get_executor(), [=]() { connection_added_internal(connection); });
1967 void listener_service_impl::connection_removed(
1968 const std::shared_ptr<connection::Connection> connection) {
1969 boost::asio::post(registration_executor_->get_executor(), [=]() { connection_removed_internal(connection); });
1973 listener_service_impl::remove_event_handler(int64_t call_id,
1974 const std::shared_ptr<connection::Connection> &connection) {
1975 boost::asio::post(connection->get_socket().get_executor(),
1976 std::packaged_task<
void()>([=]() {
1977 connection->deregister_invocation(call_id);
1981 void listener_service_impl::handle_client_message(
1982 const std::shared_ptr<ClientInvocation> invocation,
1983 const std::shared_ptr<protocol::ClientMessage> response) {
1985 auto partitionId = response->get_partition_id();
1986 if (partitionId == -1) {
1988 boost::asio::post(event_executor_->get_executor(), [=]() { process_event_message(invocation, response); });
1993 boost::asio::post(event_strands_[partitionId % event_strands_.size()],
1994 [=]() { process_event_message(invocation, response); });
1996 }
catch (
const std::exception &e) {
1997 if (client_context_.get_lifecycle_service().is_running()) {
1998 HZ_LOG(logger_, warning,
1999 boost::str(boost::format(
"Delivery of event message to event handler failed. %1%, %2%, %3%")
2000 % e.what() % *response % *invocation)
2006 void listener_service_impl::shutdown() {
2007 ClientExecutionServiceImpl::shutdown_thread_pool(event_executor_.get());
2008 ClientExecutionServiceImpl::shutdown_thread_pool(registration_executor_.get());
2011 void listener_service_impl::start() {
2012 event_executor_.reset(
new hazelcast::util::hz_thread_pool(number_of_event_threads_));
2013 registration_executor_.reset(
new hazelcast::util::hz_thread_pool(1));
2015 for (
int i = 0; i < number_of_event_threads_; ++i) {
2016 event_strands_.emplace_back(event_executor_->get_executor());
2019 client_connection_manager_.add_connection_listener(shared_from_this());
2022 boost::uuids::uuid listener_service_impl::register_listener_internal(
2023 std::shared_ptr<ListenerMessageCodec> listener_message_codec,
2024 std::shared_ptr<client::impl::BaseEventHandler> handler) {
2025 auto user_registration_id = client_context_.random_uuid();
2027 std::shared_ptr<listener_registration> registration(
new listener_registration{listener_message_codec, handler});
2028 registrations_.put(user_registration_id, registration);
2029 for (
auto const &connection : client_connection_manager_.get_active_connections()) {
2031 invoke(registration, connection);
2032 }
catch (exception::iexception &e) {
2033 if (connection->is_alive()) {
2034 deregister_listener_internal(user_registration_id);
2035 BOOST_THROW_EXCEPTION((exception::exception_builder<exception::hazelcast_>(
2036 "ClientListenerService::RegisterListenerTask::call")
2037 <<
"Listener can not be added " << e).build());
2041 return user_registration_id;
// Removes the listener stored under user_registration_id.
// Looks the registration up in registrations_, then walks its per-connection
// entries: for each one it encodes the codec's remove request for the entry's
// server registration id, invokes it and waits for the result, removes the
// entry's event handler and erases the entry from the local copy of the set.
// An exception::iexception on a subscriber that is still alive is logged as a
// warning.
// NOTE(review): this listing has lost lines — the return type, the body of the
// "registration not found" guard, the opening `try {`, the last argument of
// ClientInvocation::create, parts of the catch block, and the closing braces /
// final return. Restore them from the upstream file before compiling.
// NOTE(review): `®istration` below looks like a mangled `&registration`.
2045 listener_service_impl::deregister_listener_internal(boost::uuids::uuid user_registration_id) {
2046 auto listenerRegistration = registrations_.get(user_registration_id);
2047 if (!listenerRegistration) {
2050 bool successful =
true;
// entry_set() result is held in a local, so the erase below acts on this local copy
2052 auto listener_registrations = listenerRegistration->registrations.entry_set();
// no increment in the loop header: the iterator is advanced by erase() on success
2053 for (
auto it = listener_registrations.begin();it != listener_registrations.end();) {
2054 const auto ®istration = it->second;
2055 const auto& subscriber = it->first;
2057 const auto &listenerMessageCodec = listenerRegistration->codec;
2058 auto serverRegistrationId = registration->server_registration_id;
2059 auto request = listenerMessageCodec->encode_remove_request(serverRegistrationId);
2060 auto invocation = ClientInvocation::create(client_context_,request,
"",
// blocks until the remove request has completed (or throws)
2062 invocation->invoke().get();
2064 remove_event_handler(registration->call_id, subscriber);
2066 it = listener_registrations.erase(it);
2067 }
catch (exception::iexception &e) {
2069 if (subscriber->is_alive()) {
// NOTE(review): `endpoint` is filled here, but the visible format arguments
// below pass subscriber->get_remote_address() directly — confirm which is intended.
2071 std::ostringstream endpoint;
2072 if (subscriber->get_remote_address()) {
2073 endpoint << *subscriber->get_remote_address();
2077 HZ_LOG(logger_, warning,
2078 boost::str(boost::format(
"ClientListenerService::deregisterListenerInternal "
2079 "Deregistration of listener with ID %1% "
2080 "has failed to address %2% %3%")
2081 % user_registration_id
2082 % subscriber->get_remote_address() % e)
// NOTE(review): presumably guarded by `successful`; the guard line is not visible here.
2088 registrations_.remove(user_registration_id);
2093 void listener_service_impl::connection_added_internal(
2094 const std::shared_ptr<connection::Connection> &connection) {
2095 for (
const auto &listener_registration : registrations_.values()) {
2096 invoke_from_internal_thread(listener_registration, connection);
2100 void listener_service_impl::connection_removed_internal(
2101 const std::shared_ptr<connection::Connection> &connection) {
2102 for (
auto ®istry : registrations_.values()) {
2103 registry->registrations.remove(connection);
2108 listener_service_impl::invoke_from_internal_thread(
2109 const std::shared_ptr<listener_registration> &listener_registration,
2110 const std::shared_ptr<connection::Connection> &connection) {
2112 invoke(listener_registration, connection);
2113 }
catch (exception::iexception &e) {
2114 HZ_LOG(logger_, warning,
2115 boost::str(boost::format(
"Listener with pointer %1% can not be added to "
2116 "a new connection: %2%, reason: %3%")
2117 % listener_registration.get() % *connection % e)
// Registers one listener registration on one connection.
// Visible steps: check whether the registration already has an entry for
// `connection`; encode the add request via the codec (passing
// registers_local_only()); call handler->before_listener_register(); create an
// invocation for the request, attach the handler and wait for invoke_urgent();
// decode the server registration id from the response; call
// handler->on_listener_register(); store a connection_registration holding the
// server registration id and the request's correlation id under `connection`.
// NOTE(review): the return type, the body of the contains_key guard, the last
// argument of ClientInvocation::create and the closing brace are missing from
// this listing — restore them from the upstream file.
2123 listener_service_impl::invoke(
const std::shared_ptr<listener_registration> &listener_registration,
2124 const std::shared_ptr<connection::Connection> &connection) {
2125 if (listener_registration->registrations.contains_key(connection)) {
2129 const auto &codec = listener_registration->codec;
2130 auto request = codec->encode_add_request(registers_local_only());
2131 const auto &handler = listener_registration->handler;
2132 handler->before_listener_register();
2134 auto invocation = ClientInvocation::create(client_context_,
2135 std::make_shared<protocol::ClientMessage>(std::move(request)),
"",
// the handler is attached before the request is sent
2137 invocation->set_event_handler(handler);
// blocks until the add request has been answered (or throws)
2138 auto clientMessage = invocation->invoke_urgent().get();
2140 auto serverRegistrationId = codec->decode_add_response(clientMessage);
2141 handler->on_listener_register();
2142 int64_t correlationId = invocation->get_client_message()->get_correlation_id();
2144 (*listener_registration).registrations.put(connection, std::shared_ptr<connection_registration>(
2145 new connection_registration{serverRegistrationId, correlationId}));
// Delivers one event message to the event handler attached to the invocation.
// If the invocation has no handler, a warning is logged while the lifecycle
// service is running. Otherwise the handler's handle() is called with the
// message; a std::exception from the handler is logged as a warning (again only
// while the lifecycle service is running).
// NOTE(review): the argument of the first format string, the end of the
// no-handler branch, the opening `try {` and the closing braces are missing
// from this listing — restore them from the upstream file.
2148 void listener_service_impl::process_event_message(
2149 const std::shared_ptr<ClientInvocation> invocation,
2150 const std::shared_ptr<protocol::ClientMessage> response) {
2151 auto eventHandler = invocation->get_event_handler();
2152 if (!eventHandler) {
2153 if (client_context_.get_lifecycle_service().is_running()) {
2154 HZ_LOG(logger_, warning,
2155 boost::str(boost::format(
"No eventHandler for invocation. "
2156 "Ignoring this invocation response. %1%")
2165 eventHandler->handle(*response);
2166 }
catch (std::exception &e) {
2167 if (client_context_.get_lifecycle_service().is_running()) {
2168 HZ_LOG(logger_, warning,
2169 boost::str(boost::format(
"Delivery of event message to event handler failed. %1%, %2%, %3%")
2170 % e.what() % *response % *invocation)
2176 listener_service_impl::~listener_service_impl() =
default;
2178 void cluster_view_listener::start() {
2179 client_context_.get_connection_manager().add_connection_listener(shared_from_this());
2182 void cluster_view_listener::connection_added(
const std::shared_ptr<connection::Connection> connection) {
2183 try_register(connection);
2186 void cluster_view_listener::connection_removed(
const std::shared_ptr<connection::Connection> connection) {
2187 try_reregister_to_random_connection(connection->get_connection_id());
// Constructor taking the client context; the initializer list starts with client_context_.
// NOTE(review): the rest of the initializer list and the body are cut off in this
// listing — restore them from the upstream file.
2190 cluster_view_listener::cluster_view_listener(ClientContext &client_context) : client_context_(
// Tries to make `connection` the one carrying the cluster view listener.
// Visible steps: compare-and-swap listener_added_connection_id_ from -1 to the
// connection's id; build an add-cluster-view-listener invocation bound to the
// connection; attach a new event_handler and call its before_listener_register();
// send the request with invoke_urgent() and, in the continuation, call
// on_listener_register() when the future holds a value, or
// try_reregister_to_random_connection(conn_id).
// NOTE(review): the body of the failed-CAS branch, the check on `self` after
// weak_self.lock(), the branch structure inside the continuation and all closing
// braces are missing from this listing — restore them from the upstream file.
2193 void cluster_view_listener::try_register(std::shared_ptr<connection::Connection> connection) {
2194 int32_t expected_id = -1;
2195 if (!listener_added_connection_id_.compare_exchange_strong(expected_id,
2196 connection->get_connection_id())) {
2201 auto invocation = ClientInvocation::create(client_context_,
2202 std::make_shared<protocol::ClientMessage>(
2203 protocol::codec::client_addclusterviewlistener_encode()),
"", connection);
2205 auto handler = std::make_shared<event_handler>(connection->get_connection_id(), *
this);
2206 invocation->set_event_handler(handler);
2207 handler->before_listener_register();
// the continuation captures a weak_ptr to this listener rather than a shared_ptr
2209 std::weak_ptr<cluster_view_listener> weak_self = shared_from_this();
2210 auto conn_id = connection->get_connection_id();
2212 invocation->invoke_urgent().then(
2213 [weak_self, handler, conn_id](boost::future<protocol::ClientMessage> f) {
2214 auto self = weak_self.lock();
2218 if (f.has_value()) {
2219 handler->on_listener_register();
2224 self->try_reregister_to_random_connection(conn_id);
2229 void cluster_view_listener::try_reregister_to_random_connection(int32_t old_connection_id) {
2230 if (!listener_added_connection_id_.compare_exchange_strong(old_connection_id, -1)) {
2234 auto new_connection = client_context_.get_connection_manager().get_random_connection();
2235 if (new_connection) {
2236 try_register(new_connection);
2240 cluster_view_listener::~cluster_view_listener() =
default;
2243 cluster_view_listener::event_handler::handle_membersview(int32_t version,
2244 const std::vector<member> &member_infos) {
2245 view_listener.client_context_.get_client_cluster_service().handle_event(version, member_infos);
2249 cluster_view_listener::event_handler::handle_partitionsview(int32_t version,
2250 const std::vector<std::pair<boost::uuids::uuid, std::vector<int>>> &partitions) {
2251 view_listener.client_context_.get_partition_service().handle_event(connection_id, version, partitions);
// Called before the listener is registered: clears the member list version on
// the client cluster service, then formats a "Register attempt ..." message.
// NOTE(review): the logging call that consumes the formatted string, the format
// argument and the closing brace are missing from this listing — restore them
// from the upstream file.
2254 void cluster_view_listener::event_handler::before_listener_register() {
2255 view_listener.client_context_.get_client_cluster_service().clear_member_list_version();
2256 auto &lg = view_listener.client_context_.get_logger();
2258 boost::str(boost::format(
2259 "Register attempt of cluster_view_listener::event_handler to connection with id %1%") %
// Called once the listener is registered: formats a "Registered ..." message
// for the client logger.
// NOTE(review): the logging call that consumes the formatted string, the format
// argument and the closing brace are missing from this listing — restore them
// from the upstream file.
2263 void cluster_view_listener::event_handler::on_listener_register() {
2264 auto &lg = view_listener.client_context_.get_logger();
2266 boost::str(boost::format(
2267 "Registered cluster_view_listener::event_handler to connection with id %1%") %
2271 cluster_view_listener::event_handler::event_handler(
int connectionId,
2272 cluster_view_listener &viewListener)
2273 : connection_id(connectionId), view_listener(viewListener) {}
2276 protocol::ClientMessage
2277 ClientInvocationServiceImpl::BackupListenerMessageCodec::encode_add_request(
bool local_only)
const {
2278 return protocol::codec::client_localbackuplistener_encode();
// Returns a ClientMessage constructed with 0; real_registration_id is not used
// in the visible code.
// NOTE(review): one line between the signature and the return statement, and the
// closing brace, are missing from this listing — confirm against the upstream file.
2281 protocol::ClientMessage ClientInvocationServiceImpl::BackupListenerMessageCodec::encode_remove_request(
2282 boost::uuids::uuid real_registration_id)
const {
2284 return protocol::ClientMessage(0);
// Backup event callback of the no-op handler.
// NOTE(review): the body and closing brace are not visible in this listing;
// presumably empty given the class name — confirm against the upstream file.
2287 void ClientInvocationServiceImpl::noop_backup_event_handler::handle_backup(
2288 int64_t source_invocation_correlation_id) {
2292 namespace discovery {
2293 remote_address_provider::remote_address_provider(
2294 std::function<std::unordered_map<address, address>()> addr_map_method,
2295 bool use_public) : refresh_address_map_(std::move(addr_map_method)),
2296 use_public_(use_public) {}
2298 std::vector<address> remote_address_provider::load_addresses() {
2299 auto address_map = refresh_address_map_();
2300 std::lock_guard<std::mutex> guard(lock_);
2301 private_to_public_ = address_map;
2302 std::vector<address> addresses;
2303 addresses.reserve(address_map.size());
2304 for (
const auto &addr_pair : address_map) {
2305 addresses.push_back(addr_pair.first);
// Maps `addr` to the value stored for it in private_to_public_.
// Visible steps: look the address up in the cached map under lock_ and return
// the mapped value when present; otherwise call refresh_address_map_(), replace
// the cached map under lock_, and look the address up again.
// NOTE(review): several lines are missing from this listing — whatever precedes
// the first lock, the scope braces around both lock_guard sections, the result
// for an address that is still not found, and the closing brace. Restore them
// from the upstream file.
2310 boost::optional<address> remote_address_provider::translate(
const address &addr) {
2317 std::lock_guard<std::mutex> guard(lock_);
2318 auto found = private_to_public_.find(addr);
2319 if (found != private_to_public_.end()) {
2320 return found->second;
// cache miss: refresh the map (this call appears before the second lock_guard)
2324 auto address_map = refresh_address_map_();
2326 std::lock_guard<std::mutex> guard(lock_);
2327 private_to_public_ = address_map;
2329 auto found = private_to_public_.find(addr);
2330 if (found != private_to_public_.end()) {
2331 return found->second;
2337 #ifdef HZ_BUILD_WITH_SSL
2338 cloud_discovery::cloud_discovery(config::cloud_config &config, std::string cloud_base_url,
2339 std::chrono::steady_clock::duration timeout)
2340 : cloud_config_(config), cloud_base_url_(cloud_base_url), timeout_(timeout) {}
2342 cloud_discovery::cloud_discovery(config::cloud_config &config, std::string cloud_base_url,
2343 std::chrono::steady_clock::duration timeout) {}
// Fetches the address map from the cloud discovery endpoint.
// With HZ_BUILD_WITH_SSL: opens a SyncHttpsClient on cloud_base_url_ with the path
// CLOUD_URL_PATH + discovery token and timeout_, reads the response stream and
// parses it with parse_json_response(); a std::exception is rethrown nested
// inside an exception::illegal_state.
// The util::Preconditions::check_ssl call is presumably the non-SSL branch.
// NOTE(review): the opening `try {`, the remaining illegal_state arguments, the
// `#else` / `#endif` lines, the non-SSL return statement and the closing braces
// are missing from this listing — restore them from the upstream file.
2346 std::unordered_map<address, address> cloud_discovery::get_addresses() {
2347 #ifdef HZ_BUILD_WITH_SSL
2349 util::SyncHttpsClient httpsConnection(cloud_base_url_, std::string(CLOUD_URL_PATH) +
2350 cloud_config_.discovery_token, timeout_);
2351 auto &conn_stream = httpsConnection.connect_and_get_response();
2352 return parse_json_response(conn_stream);
2353 }
catch (std::exception &e) {
2354 std::throw_with_nested(boost::enable_current_exception(
2355 exception::illegal_state(
"cloud_discovery::get_addresses",
2359 util::Preconditions::check_ssl(
"cloud_discovery::get_addresses");
2364 std::unordered_map<address, address>
2365 cloud_discovery::parse_json_response(std::istream &conn_stream) {
2366 namespace pt = boost::property_tree;
2369 pt::read_json(conn_stream, root);
2371 std::unordered_map<address, address> addresses;
2372 for (
const auto &item : root) {
2373 auto private_address = item.second.get<std::string>(PRIVATE_ADDRESS_PROPERTY);
2374 auto public_address = item.second.get<std::string>(PUBLIC_ADDRESS_PROPERTY);
2376 address public_addr = create_address(public_address, -1);
2378 auto private_addr = create_address(private_address, public_addr.get_port());
2379 addresses.emplace(std::move(private_addr), std::move(public_addr));
2385 address cloud_discovery::create_address(
const std::string &hostname,
int default_port) {
2386 auto address_holder = util::AddressUtil::get_address_holder(hostname, default_port);
2387 auto scoped_hostname = util::AddressHelper::get_scoped_hostname(address_holder);
2388 return address(std::move(scoped_hostname), address_holder.get_port());
2392 ClientPartitionServiceImpl::partition_table::partition_table(int32_t connectionId, int32_t version,
2393 const std::unordered_map<int32_t, boost::uuids::uuid> &partitions)
2394 : connection_id(connectionId), version(version), partitions(partitions) {}
// Ordering for DefaultObjectNamespace: compares the service names first and, in
// the visible tail, falls back to comparing the object names.
// NOTE(review): the lines that act on `result` (between the service-name compare
// and the final return) and the closing brace are missing from this listing —
// restore them from the upstream file.
2401 bool less<hazelcast::client::spi::DefaultObjectNamespace>::operator()(
2402 const hazelcast::client::spi::DefaultObjectNamespace &lhs,
2403 const hazelcast::client::spi::DefaultObjectNamespace &rhs)
const {
2404 int result = lhs.get_service_name().compare(rhs.get_service_name());
2413 return lhs.get_object_name().compare(rhs.get_object_name()) < 0;
2417 hash<hazelcast::client::spi::DefaultObjectNamespace>::operator()(
2418 const hazelcast::client::spi::DefaultObjectNamespace &k)
const noexcept {
2419 return std::hash<std::string>()(k.get_service_name() + k.get_object_name());
Hazelcast cluster interface.
const std::unordered_set< member > & get_members() const
Returns an immutable set of ordered members at the moment this MembershipListener is registered.
cluster & get_cluster()
Returns the cluster of the event.
lifecycle_state get_state() const
lifecycle_state
State enum.
lifecycle_event(lifecycle_state state)
Constructor.