18#include <unordered_set>
20#include "hazelcast/client/execution_callback.h"
21#include "hazelcast/client/lifecycle_event.h"
22#include "hazelcast/client/connection/AddressProvider.h"
23#include "hazelcast/client/spi/impl/ClientInvocation.h"
24#include "hazelcast/util/Util.h"
25#include "hazelcast/client/protocol/AuthenticationStatus.h"
26#include "hazelcast/client/exception/protocol_exceptions.h"
27#include "hazelcast/client/connection/ClientConnectionManagerImpl.h"
28#include "hazelcast/client/connection/ConnectionListener.h"
29#include "hazelcast/client/connection/Connection.h"
30#include "hazelcast/client/spi/ClientContext.h"
31#include "hazelcast/client/spi/impl/ClientExecutionServiceImpl.h"
32#include "hazelcast/client/serialization/serialization.h"
33#include "hazelcast/client/protocol/UsernamePasswordCredentials.h"
34#include "hazelcast/client/protocol/codec/codecs.h"
35#include "hazelcast/client/client_config.h"
36#include "hazelcast/client/socket_interceptor.h"
37#include "hazelcast/client/config/client_network_config.h"
38#include "hazelcast/client/client_properties.h"
39#include "hazelcast/client/connection/HeartbeatManager.h"
40#include "hazelcast/client/impl/hazelcast_client_instance_impl.h"
41#include "hazelcast/client/spi/impl/listener/listener_service_impl.h"
42#include "hazelcast/client/spi/impl/ClientPartitionServiceImpl.h"
43#include "hazelcast/client/spi/impl/ClientInvocationServiceImpl.h"
44#include "hazelcast/client/internal/socket/TcpSocket.h"
45#include "hazelcast/client/internal/socket/SSLSocket.h"
46#include "hazelcast/client/config/ssl_config.h"
47#include "hazelcast/util/IOUtil.h"
48#include "hazelcast/client/internal/socket/SocketFactory.h"
49#include "hazelcast/client/connection/wait_strategy.h"
// Out-of-class definitions of the static data members of
// ClientConnectionManagerImpl (needed before C++17 whenever a constexpr
// static member is odr-used).
54constexpr size_t ClientConnectionManagerImpl::EXECUTOR_CORE_POOL_SIZE;
55const endpoint_qualifier ClientConnectionManagerImpl::PUBLIC_ENDPOINT_QUALIFIER{
60constexpr int ClientConnectionManagerImpl::SQL_CONNECTION_RANDOM_ATTEMPTS;
// Constructs the connection manager from the client context and the supplied
// address provider. Most members are initialised straight from the client
// configuration (connection strategy, network config, load balancer, socket
// interceptor); the constructor body then refines the connection timeout and
// reads the shuffle-member-list property.
62ClientConnectionManagerImpl::ClientConnectionManagerImpl(
63 spi::ClientContext& client,
64 std::unique_ptr<AddressProvider> address_provider)
66 , logger_(client.get_logger())
// Starts at the largest representable duration; replaced in the body when the
// configured connection timeout is positive. The extra parentheses around
// `max` presumably guard against a function-like `max` macro — confirm.
67 , connection_timeout_millis_((std::chrono::milliseconds::max)())
69 , socket_interceptor_(client.get_client_config().get_socket_interceptor())
70 , address_provider_(std::move(address_provider))
71 , connection_id_gen_(0)
72 , heartbeat_(client, *this)
73 , async_start_(client.get_client_config()
74 .get_connection_strategy_config()
76 , reconnect_mode_(client.get_client_config()
77 .get_connection_strategy_config()
78 .get_reconnect_mode())
79 , smart_routing_enabled_(
80 client.get_client_config().get_network_config().is_smart_routing())
81 , client_uuid_(client.random_uuid())
// The authentication wait uses the heartbeat timeout value (millisecond count).
82 , authentication_timeout_(
83 boost::chrono::milliseconds(heartbeat_.get_heartbeat_timeout().count()))
84 , load_balancer_(client.get_client_config().get_load_balancer())
85 , wait_strategy_(client.get_client_config()
86 .get_connection_strategy_config()
// No cluster is known yet: nil cluster id and INITIAL state.
89 , cluster_id_(boost::uuids::nil_uuid())
90 , client_state_(client_state::INITIAL)
91 , connect_to_cluster_task_submitted_(false)
92 , established_initial_cluster_connection(false)
// Public addresses are used only when the address provider is the default one
// AND the network config asks for public addresses.
93 , use_public_address_(
94 address_provider_->is_default_provider() &&
95 client.get_client_config().get_network_config().use_public_address())
97 config::client_network_config& networkConfig =
98 client.get_client_config().get_network_config();
99 auto connTimeout = networkConfig.get_connection_timeout();
// A non-positive configured timeout leaves the "max" default in place.
100 if (connTimeout.count() > 0) {
101 connection_timeout_millis_ = std::chrono::milliseconds(connTimeout);
104 client_properties& clientProperties = client.get_client_properties();
105 shuffle_member_list_ =
106 clientProperties.get_boolean(clientProperties.get_shuffle_member_list());
// Starts the connection manager: flips alive_ from false to true, creates the
// asio io_context, resolver and socket factory, launches the IO thread and the
// executor thread pool, connects to the cluster and, with smart routing,
// schedules the periodic connect-to-all-members task. Finally initialises the
// load balancer with the cluster.
110ClientConnectionManagerImpl::start()
112 bool expected =
false;
// Entered when alive_ was not false, i.e. the manager was already started.
113 if (!alive_.compare_exchange_strong(expected,
true)) {
117 io_context_.reset(
new boost::asio::io_context);
119 new boost::asio::ip::tcp::resolver(io_context_->get_executor()));
120 socket_factory_.reset(
new internal::socket::SocketFactory(
121 client_, *io_context_, *io_resolver_));
// A work guard keeps io_context::run() from returning while there is no
// pending work; it is moved into io_guard_ so it lives as long as the manager.
122 auto guard = boost::asio::make_work_guard(*io_context_);
123 io_guard_ = std::unique_ptr<
124 boost::asio::executor_work_guard<boost::asio::io_context::executor_type>>(
125 new boost::asio::executor_work_guard<
126 boost::asio::io_context::executor_type>(std::move(guard)));
// Entered when the socket factory fails to start.
128 if (!socket_factory_->start()) {
132 socket_interceptor_ = client_.get_client_config().get_socket_interceptor();
// Dedicated thread that drives all asio I/O handlers.
134 io_thread_ = std::thread([=]() { io_context_->run(); });
137 new hazelcast::util::hz_thread_pool(EXECUTOR_CORE_POOL_SIZE));
// The member-connect timer runs on the executor pool, not on the IO thread.
138 connect_to_members_timer_ =
139 boost::asio::steady_timer(executor_->get_executor());
143 connect_to_cluster();
// Only smart-routing clients keep a connection to every member.
144 if (smart_routing_enabled_) {
145 schedule_connect_to_all_members();
148 load_balancer_.init_(client_.get_cluster());
// Arms connect_to_members_timer_ to fire one second from now. When it fires,
// the handler connects to all members and re-arms the timer by calling this
// function again, giving a periodic task.
154ClientConnectionManagerImpl::schedule_connect_to_all_members()
// Guard for a client whose lifecycle service is no longer running.
156 if (!client_.get_lifecycle_service().is_running()) {
160 connect_to_members_timer_->expires_after(
161 boost::asio::chrono::seconds(1));
162 connect_to_members_timer_->async_wait([=](boost::system::error_code ec) {
// operation_aborted is what a cancelled timer reports to its handler.
163 if (ec == boost::asio::error::operation_aborted) {
166 connect_to_all_members();
168 schedule_connect_to_all_members();
// Shuts the connection manager down: flips alive_ from true to false, cancels
// the member-connect timer, stops the heartbeat, closes every active
// connection, shuts the executor thread pool down and clears the listener and
// connection containers.
173ClientConnectionManagerImpl::shutdown()
175 bool expected =
true;
// Entered when alive_ was not true, i.e. the manager is not running.
176 if (!alive_.compare_exchange_strong(expected,
false)) {
180 if (connect_to_members_timer_) {
181 connect_to_members_timer_->cancel();
184 heartbeat_.shutdown();
// Close every currently active connection with a shutdown reason.
187 for (
auto& connection : active_connections_.values()) {
189 util::IOUtil::close_resource(connection.get(),
190 "Hazelcast client is shutting down");
193 spi::impl::ClientExecutionServiceImpl::shutdown_thread_pool(
200 connection_listeners_.clear();
201 active_connections_.clear();
202 active_connection_ids_.clear();
// Looks up the active connection registered under the member's uuid; when a
// new connection is needed, translates the member to an address and connects
// to it.
// @param m cluster member to reach
// @return shared pointer to the connection returned by connect()
//         (NOTE(review): an existing connection is presumably returned first —
//         confirm against the full body).
205std::shared_ptr<Connection>
206ClientConnectionManagerImpl::get_or_connect(
const member& m)
208 const auto& uuid = m.get_uuid();
209 auto connection = active_connections_.get(uuid);
214 address addr = translate(m);
215 return connect(addr);
// Returns the values currently stored in active_connections_ as a vector of
// shared connection pointers.
218std::vector<std::shared_ptr<Connection>>
219ClientConnectionManagerImpl::get_active_connections()
221 return active_connections_.values();
// Returns whatever active_connections_ holds for the given member uuid.
// Callers in this file test the result for null, so a missing entry is
// expected to come back as an empty pointer.
224std::shared_ptr<Connection>
225ClientConnectionManagerImpl::get_connection(boost::uuids::uuid uuid)
227 return active_connections_.get(uuid);
// Sends an authentication request over the given connection, waits up to
// authentication_timeout_ for the reply, decodes it into an auth_response and
// then acts on the returned status code. On any failure status the connection
// is closed and an exception::authentication is thrown.
// @param connection connection to authenticate (closed on failure)
// @throws exception::timeout when no response arrives in time
// @throws exception::authentication for rejected or unknown status codes
230ClientConnectionManagerImpl::auth_response
231ClientConnectionManagerImpl::authenticate_on_cluster(
232 std::shared_ptr<Connection>& connection)
235 encode_authentication_request(client_.get_serialization_service());
// The invocation is bound to this specific connection and sent as urgent.
236 auto clientInvocation =
237 spi::impl::ClientInvocation::create(client_, request,
"", connection);
238 auto f = clientInvocation->invoke_urgent();
240 struct auth_response result;
242 if (f.wait_for(authentication_timeout_) !=
243 boost::future_status::ready) {
244 BOOST_THROW_EXCEPTION(exception::timeout(
245 "ClientConnectionManagerImpl::authenticate",
246 (boost::format(
"Authentication response is "
247 "not received for %1% msecs for %2%") %
248 authentication_timeout_.count() % *clientInvocation)
251 auto response = f.get();
// Advance past the response header; the returned pointer is kept to read the
// initial frame's length further down.
252 auto* initial_frame =
253 reinterpret_cast<protocol::ClientMessage::frame_header_type*
>(
254 response.rd_ptr(protocol::ClientMessage::RESPONSE_HEADER_LEN));
// Fixed-size fields are read in wire order: byte, uuid, byte, int32, uuid.
// The first byte is the status inspected below.
255 result = { response.get<
byte>(),
256 response.get<boost::uuids::uuid>(),
257 response.get<
byte>(),
258 response.get<int32_t>(),
259 response.get<boost::uuids::uuid>() };
// Skip what is left of the initial frame: its length minus the header and the
// five fixed-size fields just consumed (two bytes, two uuids each preceded by
// one extra byte, one int32).
261 response.rd_ptr(
static_cast<int32_t
>(initial_frame->frame_len) -
262 protocol::ClientMessage::RESPONSE_HEADER_LEN -
263 2 * protocol::ClientMessage::UINT8_SIZE -
264 2 * (util::Bits::UUID_SIZE_IN_BYTES +
265 protocol::ClientMessage::UINT8_SIZE) -
266 protocol::ClientMessage::INT32_SIZE);
// Variable-size fields follow the initial frame.
268 result.server_address = response.get_nullable<address>();
269 result.server_version = response.get<std::string>();
270 }
catch (exception::iexception&) {
// Any failure while waiting or decoding closes the connection, recording the
// in-flight exception as the close cause.
271 connection->close(
"Failed to authenticate connection",
272 std::current_exception());
276 auto authentication_status = (protocol::authentication_status)result.status;
277 switch (authentication_status) {
278 case protocol::AUTHENTICATED: {
281 case protocol::CREDENTIALS_FAILED: {
282 auto e = std::make_exception_ptr(exception::authentication(
283 "ClientConnectionManagerImpl::authenticate_on_cluster",
284 "Authentication failed. The configured cluster name on the "
285 "client (see client_config::set_cluster_name()) does not match "
286 "the one configured in the cluster or the credentials set in the "
287 "Client security config could not be authenticated"));
288 connection->close(
"Failed to authenticate connection", e);
289 std::rethrow_exception(e);
291 case protocol::NOT_ALLOWED_IN_CLUSTER: {
292 auto e = std::make_exception_ptr(exception::authentication(
293 "ClientConnectionManagerImpl::authenticate_on_cluster",
294 "Client is not allowed in the cluster"));
295 connection->close(
"Failed to authenticate connection", e);
296 std::rethrow_exception(e);
// Remaining path: a status code this client does not recognise.
299 auto e = std::make_exception_ptr(exception::authentication(
300 "ClientConnectionManagerImpl::authenticate_on_cluster",
302 "Authentication status code not supported. status: %1%") %
303 authentication_status)
305 connection->close(
"Failed to authenticate connection", e);
306 std::rethrow_exception(e);
// Stream-insertion operator for ClientConnectionManagerImpl::client_state:
// writes the enumerator's name as text and returns the stream.
312operator<<(std::ostream& os, ClientConnectionManagerImpl::client_state state)
314 using client_state = ClientConnectionManagerImpl::client_state;
317 case client_state::INITIAL:
318 return os <<
"INITIAL";
319 case client_state::CONNECTED_TO_CLUSTER:
320 return os <<
"CONNECTED_TO_CLUSTER";
321 case client_state::INITIALIZED_ON_CLUSTER:
322 return os <<
"INITIALIZED_ON_CLUSTER";
323 case client_state::DISCONNECTED_FROM_CLUSTER:
324 return os <<
"DISCONNECTED_FROM_CLUSTER";
// Builds the authentication request message for this client. The encoder used
// depends on the configured credentials: client_authentication_encode for the
// first path and for username/password credentials,
// client_authenticationcustom_encode for token credentials.
// @param ss serialization service; only its version is read here
// @return the encoded request, or a default-constructed ClientMessage when no
//         earlier return applies
330protocol::ClientMessage
331ClientConnectionManagerImpl::encode_authentication_request(
332 serialization::pimpl::SerializationService& ss)
334 byte serializationVersion = ss.get_version();
335 client_config& clientConfig = client_.get_client_config();
336 auto cluster_name = clientConfig.get_cluster_name();
338 auto credential = clientConfig.get_credentials();
// NOTE(review): this first encode is presumably the "no credentials
// configured" path — confirm against the guarding condition.
340 return protocol::codec::client_authentication_encode(
345 protocol::ClientTypes::CPP,
346 serializationVersion,
// Dispatch on the concrete credential type.
354 switch (credential->type()) {
355 case security::credentials::credential_type::username_password: {
357 std::static_pointer_cast<security::username_password_credentials>(
359 return protocol::codec::client_authentication_encode(
364 protocol::ClientTypes::CPP,
365 serializationVersion,
372 case security::credentials::credential_type::token: {
374 std::static_pointer_cast<security::token_credentials>(credential);
375 return protocol::codec::client_authenticationcustom_encode(
379 protocol::ClientTypes::CPP,
380 serializationVersion,
// Fallback: an empty message.
389 return protocol::ClientMessage();
// Notifies every registered connection listener that `connection` was added.
// Iterates over a snapshot produced by connection_listeners_.to_array().
393ClientConnectionManagerImpl::fire_connection_added_event(
394 const std::shared_ptr<Connection>& connection)
396 for (
const std::shared_ptr<ConnectionListener>& connectionListener :
397 connection_listeners_.to_array()) {
398 connectionListener->connection_added(connection);
// Notifies every registered connection listener that `connection` was removed.
// Iterates over a snapshot produced by connection_listeners_.to_array().
403ClientConnectionManagerImpl::fire_connection_removed_event(
404 const std::shared_ptr<Connection>& connection)
406 for (
const auto& listener : connection_listeners_.to_array()) {
407 listener->connection_removed(connection);
// Shuts the client instance down through its lifecycle service. The instance
// is passed as a weak pointer and locked first, so an already-destroyed client
// is tolerated. Exceptions of type exception::iexception raised by the
// shutdown are logged, not propagated.
412ClientConnectionManagerImpl::shutdown_with_external_thread(
413 std::weak_ptr<client::impl::hazelcast_client_instance_impl> client_impl)
416 std::shared_ptr<client::impl::hazelcast_client_instance_impl>
417 clientInstance = client_impl.lock();
// Nothing to do when the client is gone or its lifecycle is not running.
418 if (!clientInstance ||
419 !clientInstance->get_lifecycle_service().is_running()) {
424 clientInstance->get_lifecycle_service().shutdown();
425 }
catch (exception::iexception& e) {
426 HZ_LOG(*clientInstance->get_logger(),
428 boost::str(boost::format(
"Exception during client shutdown "
429 "%1%.clientShutdown-:%2%") %
430 clientInstance->get_name() % e));
// Posts a task to the executor that runs do_connect_to_cluster(). The atomic
// flag connect_to_cluster_task_submitted_ is claimed with a compare-exchange
// first and cleared again by the task under client_state_mutex_. If there are
// still no active connections after connecting, the task resubmits itself; if
// connecting throws, the failure is logged and the client is shut down.
436ClientConnectionManagerImpl::submit_connect_to_cluster_task()
438 bool expected =
false;
439 if (!connect_to_cluster_task_submitted_.compare_exchange_strong(expected,
// A weak pointer is captured so the queued task does not keep the client
// instance alive.
444 std::weak_ptr<client::impl::hazelcast_client_instance_impl> c =
445 client_.get_hazelcast_client_implementation();
446 boost::asio::post(executor_->get_executor(), [=]() {
448 do_connect_to_cluster();
450 std::lock_guard<std::recursive_mutex> guard(client_state_mutex_);
451 connect_to_cluster_task_submitted_ = false;
// The connection obtained above may already have been lost again.
452 if (active_connections_.empty()) {
456 boost::str(boost::format(
"No connection to cluster: %1%") %
459 submit_connect_to_cluster_task();
462 } catch (std::exception& e) {
465 boost::str(boost::format(
"Could not connect to any cluster, "
466 "shutting down the client: %1%") %
469 shutdown_with_external_thread(c);
// Walks the current member list and, for every member that has no connection
// and is not already being connected, posts a task to the executor that opens
// a connection to it. connecting_members_ tracks in-flight attempts so a
// member is not connected twice concurrently.
475ClientConnectionManagerImpl::connect_to_all_members()
// Guard: client not running, or not connected to any member yet.
477 if (!client_.get_lifecycle_service().is_running() ||
478 active_connections_.empty()) {
483 client_.get_client_cluster_service().get_member_list()) {
486 std::lock_guard<std::recursive_mutex> guard{ client_state_mutex_ };
488 if (client_state_ == client_state::DISCONNECTED_FROM_CLUSTER) {
// `.second` of get_or_put_if_absent is used as "this call registered the
// member", i.e. no other attempt is in flight for it.
498 if (client_.get_lifecycle_service().is_running() &&
499 !get_connection(m.get_uuid()) &&
500 connecting_members_.get_or_put_if_absent(m,
nullptr).second) {
// The member is copied so the posted task owns its own instance.
503 member member_to_connect = m;
505 executor_->get_executor(), [member_to_connect,
this]() {
507 if (!client_.get_lifecycle_service().is_running()) {
// Re-check inside the task: a connection may have appeared meanwhile.
510 if (!get_connection(member_to_connect.get_uuid())) {
511 get_or_connect(member_to_connect);
// The in-flight marker is removed on both the success and the failure path.
513 connecting_members_.remove(member_to_connect);
514 } catch (std::exception&) {
515 connecting_members_.remove(member_to_connect);
// Tries to establish a first connection to the cluster. Each attempt goes
// through the known member list (optionally shuffled) and then through the
// possible member addresses, skipping addresses already tried in the same
// attempt. Attempts repeat for as long as wait_strategy_.sleep() returns true.
// @throws exception::illegal_state listing every tried address once the wait
//         strategy gives up
523ClientConnectionManagerImpl::do_connect_to_cluster()
// Accumulates across all attempts; only used for the final error message.
525 std::unordered_set<address> tried_addresses;
526 wait_strategy_.reset();
// Per-attempt set, used to avoid trying the same address twice in one pass.
529 std::unordered_set<address> tried_addresses_per_attempt;
531 client_.get_client_cluster_service().get_member_list();
532 if (shuffle_member_list_) {
533 shuffle(member_list);
// First pass: members already known to the cluster service.
537 for (
const auto& m : member_list) {
538 check_client_active();
539 tried_addresses_per_attempt.insert(m.get_address());
540 auto connection = try_connect(m);
// Second pass: addresses from get_possible_member_addresses().
547 for (
const address& server_address : get_possible_member_addresses()) {
548 check_client_active();
// insert().second is false when the address was already tried in this attempt.
549 if (!tried_addresses_per_attempt.insert(server_address).second) {
554 auto connection = try_connect<address>(server_address);
559 tried_addresses.insert(tried_addresses_per_attempt.begin(),
560 tried_addresses_per_attempt.end());
564 check_client_active();
565 }
while (wait_strategy_.sleep());
// Reached only when no attempt produced a connection.
567 std::ostringstream out;
568 out <<
"Unable to connect to any address! The following addresses were "
570 for (
const auto& address : tried_addresses) {
571 out << address <<
" , ";
574 BOOST_THROW_EXCEPTION(exception::illegal_state(
575 "ConnectionManager::do_connect_to_cluster", out.str()));
// Builds the list of candidate addresses for connecting: the addresses of the
// currently known members, followed by the addresses loaded from the address
// provider. Each group is shuffled separately when shuffle_member_list_ is set.
579ClientConnectionManagerImpl::get_possible_member_addresses()
581 std::vector<address> addresses;
583 client_.get_client_cluster_service().get_member_list()) {
// NOTE(review): if get_address() returns a const reference, std::move has no
// effect here and the address is still copied — confirm.
584 addresses.emplace_back(std::move(member.get_address()));
587 if (shuffle_member_list_) {
591 std::vector<address> provided_addresses =
592 address_provider_->load_addresses();
594 if (shuffle_member_list_) {
595 shuffle(provided_addresses);
// Provider addresses go after the member addresses (inserted at end()).
599 addresses.end(), provided_addresses.begin(), provided_addresses.end());
// Connects to the cluster either asynchronously, by submitting the
// connect-to-cluster task, or synchronously on the calling thread.
// NOTE(review): the selecting condition is presumably async_start_ — confirm.
605ClientConnectionManagerImpl::connect_to_cluster()
608 submit_connect_to_cluster_task();
610 do_connect_to_cluster();
// Liveness query for the connection manager.
// NOTE(review): presumably reports the alive_ flag toggled by start() and
// shutdown() — confirm.
615ClientConnectionManagerImpl::is_alive()
// Handles a closed connection: removes it from the active-connection maps and
// notifies listeners. When the last active connection goes away, the client
// state becomes DISCONNECTED_FROM_CLUSTER and a reconnection is triggered; a
// CLIENT_DISCONNECTED lifecycle event is fired only if the client had reached
// INITIALIZED_ON_CLUSTER.
// @param connection the connection that was closed
621ClientConnectionManagerImpl::on_connection_close(
622 const std::shared_ptr<Connection>& connection)
624 auto endpoint = connection->get_remote_address();
625 auto member_uuid = connection->get_remote_uuid();
627 auto socket_remote_address = connection->get_socket().get_remote_endpoint();
632 boost::str(boost::format(
633 "Destroying %1% , but it has end-point set to null "
634 "-> not removing it from a connection map") %
640 std::lock_guard<std::recursive_mutex> guard(client_state_mutex_);
// remove(key, value) is used so an entry is only dropped when it maps to this
// very connection.
642 if (active_connections_.remove(member_uuid, connection)) {
643 active_connection_ids_.remove(connection->get_connection_id());
649 "Removed connection to endpoint: %1%, connection: %2%") %
650 *endpoint % *connection));
// That was the last connection to the cluster.
652 if (active_connections_.empty()) {
653 if (client_state_ == client_state::INITIALIZED_ON_CLUSTER) {
654 fire_life_cycle_event(
655 lifecycle_event::lifecycle_state::CLIENT_DISCONNECTED);
658 client_state_ = client_state::DISCONNECTED_FROM_CLUSTER;
659 trigger_cluster_reconnection();
662 fire_connection_removed_event(connection);
// Otherwise: the connection was not registered under this member uuid.
667 boost::str(boost::format(
668 "Destroying a connection, but there is no mapping "
669 "%1% -> %2% in the connection map.") %
670 endpoint % *connection));
// Registers a listener that will be told about added and removed connections.
// @param connection_listener listener to append to connection_listeners_
676ClientConnectionManagerImpl::add_connection_listener(
677 const std::shared_ptr<ConnectionListener>& connection_listener)
679 connection_listeners_.add(connection_listener);
// Destructor of the connection manager.
// NOTE(review): confirm it releases everything shutdown() releases (timer,
// executor, IO thread) when shutdown() was not called explicitly.
682ClientConnectionManagerImpl::~ClientConnectionManagerImpl()
// Verifies that the client's lifecycle service is still running.
// @throws exception::hazelcast_client_not_active when it is not
688ClientConnectionManagerImpl::check_client_active()
690 if (!client_.get_lifecycle_service().is_running()) {
691 BOOST_THROW_EXCEPTION(exception::hazelcast_client_not_active(
692 "ClientConnectionManagerImpl::check_client_active",
693 "Client is shutdown"));
// Sends the client state to the cluster identified by target_cluster_id and,
// on success, moves the client to INITIALIZED_ON_CLUSTER and fires
// CLIENT_CONNECTED. The cluster id is re-checked under client_state_mutex_
// before and after sending, because the client may have switched clusters in
// the meantime. On failure the operation is re-posted to the executor as long
// as the cluster id is still the targeted one.
// @param target_cluster_id id of the cluster the state is meant for
698ClientConnectionManagerImpl::initialize_client_on_cluster(
699 boost::uuids::uuid target_cluster_id)
701 if (!client_.get_lifecycle_service().is_running()) {
707 std::lock_guard<std::recursive_mutex> guard(client_state_mutex_);
// The client already moved on to a different cluster: do not send.
709 if (target_cluster_id != cluster_id_) {
711 hazelcast::logger::level::warning,
712 (boost::format(
"Won't send client state to cluster: %1%"
713 " Because switched to a new cluster: %2%") %
714 target_cluster_id % cluster_id_)
721 client_.get_hazelcast_client_implementation()->send_state_to_cluster();
724 std::lock_guard<std::recursive_mutex> guard(client_state_mutex_);
// Second check: the cluster may have changed while the state was being sent.
726 if (target_cluster_id == cluster_id_) {
729 (boost::format(
"Client state is sent to cluster: %1%") %
733 client_state_ = client_state::INITIALIZED_ON_CLUSTER;
734 fire_life_cycle_event(lifecycle_event::CLIENT_CONNECTED);
735 }
else if (logger_.enabled(hazelcast::logger::level::fine)) {
// NOTE(review): guarded by the `fine` level but logged at `warning` — confirm
// this mismatch is intended.
736 logger_.log(hazelcast::logger::level::warning,
737 (boost::format(
"Cannot set client state to %1%"
738 " because current cluster id: %2%"
739 " is different than expected cluster"
741 client_state::INITIALIZED_ON_CLUSTER %
742 cluster_id_ % target_cluster_id)
746 }
catch (
const std::exception& e) {
747 auto cluster_name = client_.get_client_config().get_cluster_name();
750 hazelcast::logger::level::warning,
751 (boost::format(
"Failure during sending state to the cluster. %1%") %
756 std::lock_guard<std::recursive_mutex> guard(client_state_mutex_);
// Retry only while the targeted cluster is still the current one.
758 if (cluster_id_ == target_cluster_id) {
// NOTE(review): same `fine` guard around a `warning` log as above — confirm.
759 if (logger_.enabled(hazelcast::logger::level::fine)) {
761 hazelcast::logger::level::warning,
763 "Retrying sending to the cluster: %1%, name: %2%") %
764 target_cluster_id % cluster_name)
// shared_from_this() keeps the manager alive until the retry task has run.
768 auto self = shared_from_this();
771 executor_->get_executor(), [self, target_cluster_id]() {
772 self->initialize_client_on_cluster(target_cluster_id);
// Registers a freshly authenticated connection. Under client_state_mutex_ it
// validates the partition count, copies server version / address / member uuid
// from the response onto the connection, rejects duplicates, detects a cluster
// id change, stores the connection in both connection maps and updates the
// client state when this is the first active connection. Afterwards it logs
// the result, notifies listeners, and re-checks that the connection and the
// client are still alive.
// @param connection the authenticated connection
// @param response decoded authentication response for that connection
// @return the already-registered connection when one exists for the same
//         member uuid; NOTE(review): otherwise presumably `connection` —
//         confirm
779std::shared_ptr<Connection>
780ClientConnectionManagerImpl::on_authenticated(
781 const std::shared_ptr<Connection>& connection,
782 auth_response& response)
785 std::lock_guard<std::recursive_mutex> guard(client_state_mutex_);
786 check_partition_count(response.partition_count);
787 connection->set_connected_server_version(response.server_version);
788 connection->set_remote_address(response.server_address);
789 connection->set_remote_uuid(response.member_uuid);
// A connection to this member already exists: hand that one back.
791 auto existing_connection =
792 active_connections_.get(response.member_uuid);
793 if (existing_connection) {
796 "Duplicate connection to same member with uuid : %1%") %
797 boost::uuids::to_string(response.member_uuid))
799 return existing_connection;
802 auto new_cluster_id = response.cluster_id;
803 boost::uuids::uuid current_cluster_id = cluster_id_;
807 boost::str(boost::format(
808 "Checking the cluster: %1%, current cluster: %2%") %
809 new_cluster_id % current_cluster_id));
// A nil current id means no cluster was joined before, so that is not a change.
811 auto cluster_id_changed = !current_cluster_id.is_nil() &&
812 !(new_cluster_id == current_cluster_id);
813 if (cluster_id_changed) {
819 "Switching from current cluster: %1% to new cluster: %2%") %
820 current_cluster_id % new_cluster_id));
821 client_.get_hazelcast_client_implementation()->on_cluster_restart();
// Emptiness is sampled before inserting so "first connection" can be detected.
824 auto connections_empty = active_connections_.empty();
825 active_connection_ids_.put(connection->get_connection_id(), connection);
826 active_connections_.put(response.member_uuid, connection);
827 if (connections_empty) {
831 cluster_id_ = new_cluster_id;
// Reconnection: the client state is sent to the cluster asynchronously.
833 if (established_initial_cluster_connection) {
846 client_state_ = client_state::CONNECTED_TO_CLUSTER;
847 auto self = shared_from_this();
849 executor_->get_executor(), [self, new_cluster_id]() {
850 self->initialize_client_on_cluster(new_cluster_id);
// Very first connection: the client goes straight to INITIALIZED_ON_CLUSTER.
853 established_initial_cluster_connection =
true;
854 client_state_ = client_state::INITIALIZED_ON_CLUSTER;
856 fire_life_cycle_event(
857 lifecycle_event::lifecycle_state::CLIENT_CONNECTED);
// Two log variants follow: with and without a local socket address.
861 auto local_address = connection->get_local_socket_address();
868 "Authenticated with server %1%:%2%, server version: %3%, "
869 "local address: %4%. %5%") %
870 response.server_address % response.member_uuid %
871 response.server_version % *local_address % *connection));
878 "Authenticated with server %1%:%2%, server version: %3%, "
879 "no local address: (connection disconnected ?). %4%") %
880 response.server_address % response.member_uuid %
881 response.server_version % *connection));
884 fire_connection_added_event(connection);
// The connection may have died while it was being registered; run the close
// handling so it does not linger in the maps.
892 if (!connection->is_alive()) {
893 on_connection_close(connection);
// The client may have been shut down concurrently.
899 if (!client_.get_lifecycle_service().is_running()) {
900 connection->close(
"Client is shutdown");
// Forwards the given lifecycle state to the client's lifecycle service.
// @param state lifecycle state to publish
907ClientConnectionManagerImpl::fire_life_cycle_event(
908 lifecycle_event::lifecycle_state state)
910 client_.get_lifecycle_service().fire_lifecycle_event(state);
// Checks the partition count reported by a member against the partition
// service via check_and_set_partition_count().
// @param new_partition_count partition count reported by the member
// @throws exception::client_not_allowed_in_cluster when the check fails
914ClientConnectionManagerImpl::check_partition_count(int32_t new_partition_count)
// The partition service is downcast to its concrete implementation to reach
// check_and_set_partition_count().
916 auto& partition_service =
917 static_cast<spi::impl::ClientPartitionServiceImpl&
>(
918 client_.get_partition_service());
919 if (!partition_service.check_and_set_partition_count(new_partition_count)) {
920 BOOST_THROW_EXCEPTION(exception::client_not_allowed_in_cluster(
921 "ClientConnectionManagerImpl::check_partition_count",
923 "Client can not work with this cluster because it has a different "
925 "Expected partition count: %1%, Member partition count: %2%") %
926 partition_service.get_partition_count() % new_partition_count)
// Reacts to the loss of the cluster connection. With reconnect mode OFF the
// client is shut down; otherwise, while the lifecycle service is running, a
// connect-to-cluster task is submitted.
932ClientConnectionManagerImpl::trigger_cluster_reconnection()
934 if (reconnect_mode_ ==
935 config::client_connection_strategy_config::reconnect_mode::OFF) {
937 logger_, info,
"RECONNECT MODE is off. Shutting down the client.");
938 shutdown_with_external_thread(
939 client_.get_hazelcast_client_implementation());
943 if (client_.get_lifecycle_service().is_running()) {
944 submit_connect_to_cluster_task();
// Picks a connection for an invocation that has no specific target. With smart
// routing the load balancer chooses a member and that member's connection is
// looked up; the fallback is the first entry of the active connection list.
948std::shared_ptr<Connection>
949ClientConnectionManagerImpl::get_random_connection()
951 if (smart_routing_enabled_) {
952 auto member = load_balancer_.next_(client_.get_cluster());
956 auto connection = get_connection(member->get_uuid());
// Fallback: any active connection.
962 auto connections = active_connections_.values();
963 if (connections.empty()) {
967 return connections[0];
// Const accessor for the client's uuid.
// NOTE(review): presumably returns client_uuid_, which the constructor fills
// from client.random_uuid() — confirm.
971ClientConnectionManagerImpl::get_client_uuid()
const
// Decides whether an invocation may be sent right now. It is allowed when the
// client is INITIALIZED_ON_CLUSTER and has at least one active connection.
// Otherwise an exception describing the reason is thrown.
// @throws exception::hazelcast_client_offline in the async-start and
//         async-reconnect cases
// @throws exception::io in the remaining cases
977ClientConnectionManagerImpl::check_invocation_allowed()
// The state is copied once so all checks below see the same value.
979 client_state state = client_state_;
980 if (state == client_state::INITIALIZED_ON_CLUSTER &&
981 active_connections_.size() > 0) {
// Client has never been connected yet.
985 if (state == client_state::INITIAL) {
987 BOOST_THROW_EXCEPTION(exception::hazelcast_client_offline(
988 "ClientConnectionManagerImpl::check_invocation_allowed",
989 "No connection found to cluster and async start is configured."));
991 BOOST_THROW_EXCEPTION(exception::io(
992 "No connection found to cluster since the client is starting."));
994 }
else if (reconnect_mode_ == config::client_connection_strategy_config::
995 reconnect_mode::ASYNC) {
996 BOOST_THROW_EXCEPTION(exception::hazelcast_client_offline(
997 "ClientConnectionManagerImpl::check_invocation_allowed",
998 "No connection found to cluster and reconnect mode is async."));
1000 BOOST_THROW_EXCEPTION(
1001 exception::io(
"ClientConnectionManagerImpl::check_invocation_allowed",
1002 "No connection found to cluster."));
// Reports, under client_state_mutex_, whether the client state is currently
// INITIALIZED_ON_CLUSTER.
1007ClientConnectionManagerImpl::client_initialized_on_cluster()
const
1009 std::lock_guard<std::recursive_mutex> guard{ client_state_mutex_ };
1011 return client_state_ == client_state::INITIALIZED_ON_CLUSTER;
// Synchronously calls get_or_connect() for every member in the member list.
// Exceptions thrown for an individual member are caught so the loop continues
// with the next member.
1015ClientConnectionManagerImpl::connect_to_all_cluster_members()
// Guard for clients that do not use smart routing.
1017 if (!smart_routing_enabled_) {
1021 for (
const auto& member :
1022 client_.get_client_cluster_service().get_member_list()) {
1025 get_or_connect(member);
1026 }
catch (std::exception&) {
// Delivers a backup acknowledgement for the invocation identified by call_id.
// The connection id is extracted from the 64-bit call id, the connection is
// looked up by that id, and the notification is posted onto the connection's
// socket executor, where the invocation is found and notified.
// @param call_id correlation id of the invocation that received a backup ack
1033ClientConnectionManagerImpl::notify_backup(int64_t call_id)
1035 struct correlation_id
1037 int32_t connnection_id;
1043 correlation_id composed_id;
// The 64-bit id is written through one union member and read back through the
// struct member. NOTE(review): which half of the value ends up in
// connnection_id depends on member order and byte order, and reading the
// inactive union member is not sanctioned by the C++ standard — confirm this
// holds on all supported targets.
1045 c_id_union.id = call_id;
1046 auto connection_id = c_id_union.composed_id.connnection_id;
1047 auto connection = active_connection_ids_.get(connection_id);
// The invocation map is accessed on the connection's own executor.
1051 boost::asio::post(connection->get_socket().get_executor(), [=]() {
1052 auto invocation_it = connection->invocations.find(call_id);
1053 if (invocation_it != connection->invocations.end()) {
1054 invocation_it->second->notify_backup();
// Opens a new connection to the given address: creates the Connection object
// with a freshly incremented connection id, connects its socket, runs the
// configured socket interceptor, authenticates against the cluster and hands
// the result to on_authenticated().
// @param addr address to connect to
// @return the connection returned by on_authenticated()
1059std::shared_ptr<Connection>
1060ClientConnectionManagerImpl::connect(
const address& addr)
1064 boost::str(boost::format(
"Trying to connect to %1%.") % addr));
1066 auto connection = std::make_shared<Connection>(addr,
1068 ++connection_id_gen_,
1071 connection_timeout_millis_);
1072 connection->connect();
// Give the user-supplied interceptor access to the connected socket before
// authentication starts.
1075 socket_interceptor_.connect_(connection->get_socket());
1077 auto result = authenticate_on_cluster(connection);
1079 return on_authenticated(connection, result);
// Maps a cluster member to the address the client should connect to. When
// public addresses are in use, the member's public endpoint is preferred and
// its regular address is the fallback. Otherwise the address provider
// translates the member's address; a failed translation is reported as
// exception::hazelcast_ and logged at warning level.
// @param m member whose address is needed
1083ClientConnectionManagerImpl::translate(
const member& m)
1085 if (use_public_address_) {
1086 auto public_addr_it = m.address_map().find(PUBLIC_ENDPOINT_QUALIFIER);
1087 if (public_addr_it != m.address_map().end()) {
1088 return public_addr_it->second;
// No public endpoint published for this member.
1090 return m.get_address();
1094 boost::optional<address> addr =
1095 address_provider_->translate(m.get_address());
1098 throw exception::hazelcast_(boost::str(
1099 boost::format(
"Address Provider could not translate %1%") % m));
1103 }
catch (
const exception::hazelcast_&) {
1105 logger::level::warning,
1106 boost::str(boost::format(
"Address Provider could not translate %1%") %
// Chooses a connection for an SQL request. With smart routing it makes up to
// SQL_CONNECTION_RANDOM_ATTEMPTS attempts to obtain a member from
// member_of_large_same_version_group and use that member's active connection.
// After that it scans the active connections, returning the first one whose
// member is known and is not a lite member; if none qualifies, the first
// connection seen is returned (which may be empty).
// @param member_of_large_same_version_group supplies a candidate member
// @param get_cluster_member resolves a member uuid to a member, if known
1113std::shared_ptr<connection::Connection>
1114ClientConnectionManagerImpl::connection_for_sql(
1115 std::function<boost::optional<member>()> member_of_large_same_version_group,
1116 std::function<boost::optional<member>(boost::uuids::uuid)> get_cluster_member)
1118 if (smart_routing_enabled_) {
1123 for (
int i = 0; i < SQL_CONNECTION_RANDOM_ATTEMPTS; i++) {
1124 auto member = member_of_large_same_version_group();
1128 auto connection = active_connections_.get(member->get_uuid());
// Fallback scan over all active connections.
1137 std::shared_ptr<connection::Connection> first_connection;
1138 for (
const auto& connection_entry : active_connections_.entry_set()) {
// Remember the first connection as a last resort.
1139 if (!first_connection) {
1140 first_connection = connection_entry.second;
1142 const auto& member_id = connection_entry.first;
1143 auto member = get_cluster_member(member_id);
// Unknown members and lite members are passed over.
1144 if (!member || member->is_lite_member()) {
1147 return connection_entry.second;
1151 return first_connection;
// Creates the read handler for a connection: allocates a raw char buffer of
// buffer_size bytes, wraps it in byte_buffer, binds the message builder to the
// connection and stamps the last read time with the current steady-clock time.
// NOTE(review): the buffer comes from `new char[]`; confirm the destructor
// releases it with `delete[]`.
// @param connection connection whose incoming data is handled
// @param buffer_size size of the read buffer in bytes
1154ReadHandler::ReadHandler(Connection& connection,
size_t buffer_size)
1155 : buffer(new char[buffer_size])
1156 , byte_buffer(buffer, buffer_size)
1157 , builder_(connection)
1158 , last_read_time_(
std::chrono::steady_clock::now().time_since_epoch())
// Destructor of the read handler.
// NOTE(review): presumably frees the char buffer allocated in the constructor
// — confirm it uses `delete[]`.
1161ReadHandler::~ReadHandler()
// Processes the bytes currently held in byte_buffer: records the read time,
// feeds the buffer to the message builder for as long as data remains and the
// builder keeps accepting it, then either compacts the buffer (bytes left
// over) or clears it (everything consumed).
1167ReadHandler::handle()
1169 last_read_time_ = std::chrono::steady_clock::now().time_since_epoch();
// A position of zero means nothing was written into the buffer.
1171 if (byte_buffer.position() == 0)
1179 while (byte_buffer.has_remaining() && builder_.on_data(byte_buffer)) {
// Keep unconsumed bytes for the next read; otherwise reset the buffer.
1182 if (byte_buffer.has_remaining()) {
1183 byte_buffer.compact();
1185 byte_buffer.clear();
// Returns the time of the last read as a steady_clock time_point, rebuilt from
// the duration stored in last_read_time_.
1189std::chrono::steady_clock::time_point
1190ReadHandler::get_last_read_time()
const
1192 return std::chrono::steady_clock::time_point{ last_read_time_ };
// Tells whether this address provider is the default one. The connection
// manager's constructor uses the result when deciding whether public
// addresses may be used.
// NOTE(review): confirm the value returned by this base implementation.
1196AddressProvider::is_default_provider()
// Creates a connection object for the given address. Initialises the read
// handler with a 16 KiB buffer (16 << 10), records the start time, caches the
// invocation service and logger from the client context, and asks the socket
// factory for a socket with the given connect timeout. The socket is only
// created here; connecting happens in connect().
1201Connection::Connection(
1202 const address& address,
1203 spi::ClientContext& client_context,
1205 internal::socket::SocketFactory& socket_factory,
1206 ClientConnectionManagerImpl& client_connection_manager,
1207 std::chrono::milliseconds& connect_timeout_in_millis)
1208 : read_handler(*this, 16 << 10)
// Start time uses the system (wall) clock; write time below uses the steady
// clock.
1209 , start_time_(
std::chrono::system_clock::now())
1210 , closed_time_duration_()
1211 , client_context_(client_context)
1212 , invocation_service_(client_context.get_invocation_service())
1213 , connection_id_(connection_id)
// The remote uuid is nil until authentication assigns it.
1214 , remote_uuid_(boost::uuids::nil_uuid())
1215 , logger_(client_context.get_logger())
1217 , last_write_time_(
std::chrono::steady_clock::now().time_since_epoch())
// The parameter is unused in this constructor; the cast silences the
// unused-parameter warning.
1219 (void)client_connection_manager;
1220 socket_ = socket_factory.create(address, connect_timeout_in_millis);
// Compiler-generated destructor; no custom teardown is performed here.
1223Connection::~Connection() =
default;
// Connects the underlying socket, creates the backup timer on the socket's
// execution context and starts the periodic backup-timeout cleanup, using the
// backup timeout configured in the invocation service.
1226Connection::connect()
1228 socket_->connect(shared_from_this());
1229 backup_timer_.reset(
1230 new boost::asio::steady_timer(socket_->get_executor().context()));
// The invocation service is downcast to its concrete implementation to read
// the backup timeout.
1231 auto backupTimeout =
1232 static_cast<spi::impl::ClientInvocationServiceImpl&
>(invocation_service_)
1233 .get_backup_timeout();
// A shared pointer to this connection is handed to the periodic task so the
// connection outlives pending timer callbacks.
1234 auto this_connection = shared_from_this();
1235 schedule_periodic_backup_cleanup(backupTimeout, this_connection);
// Arms backup_timer_ to expire after backup_timeout. When it fires, every
// pending invocation on the connection is asked to detect and handle a backup
// timeout, and the timer is re-armed by calling this function again.
// @param backup_timeout interval between checks and the timeout passed on to
//        each invocation
// @param this_connection shared pointer that keeps the connection alive while
//        the handler is pending
1239Connection::schedule_periodic_backup_cleanup(
1240 std::chrono::milliseconds backup_timeout,
1241 std::shared_ptr<Connection> this_connection)
1247 backup_timer_->expires_after(backup_timeout);
// The handler is wrapped by the socket's executor, so it runs where the
// invocation map is otherwise accessed.
1248 backup_timer_->async_wait(
1249 socket_->get_executor().wrap([=](boost::system::error_code ec) {
1253 for (
const auto& it : this_connection->invocations) {
1254 it.second->detect_and_handle_backup_timeout(backup_timeout);
1257 schedule_periodic_backup_cleanup(backup_timeout, this_connection);
// Convenience overload: closes the connection with a reason and no cause
// (a null exception pointer).
// @param reason human-readable reason for closing
1268Connection::close(
const std::string& reason)
1270 close(reason,
nullptr);
// Closes the connection once. The alive_ flag is flipped from true to false
// with a compare-exchange, so only the first caller proceeds. It then records
// the close time, cancels the backup timer, stores cause and reason, notifies
// the connection manager, and finally fails every pending invocation with
// exception::target_disconnected on the socket's executor.
// @param reason human-readable reason for closing
// @param cause exception that led to the close; may be null
1274Connection::close(
const std::string& reason, std::exception_ptr cause)
1276 bool expected =
true;
// Entered when alive_ was not true, i.e. the connection is already closed.
1277 if (!alive_.compare_exchange_strong(expected,
false)) {
// Close time is taken from the steady clock, in milliseconds since its epoch.
1281 closed_time_duration_.store(
1282 std::chrono::duration_cast<std::chrono::milliseconds>(
1283 std::chrono::steady_clock::now().time_since_epoch()));
1285 if (backup_timer_) {
1286 backup_timer_->cancel();
1289 close_cause_ = cause;
1290 close_reason_ = reason;
1296 }
catch (exception::iexception& e) {
1298 client_context_.get_logger(),
1300 boost::str(boost::format(
"Exception while closing connection %1%") %
1304 auto thisConnection = shared_from_this();
1305 client_context_.get_connection_manager().on_connection_close(
// Pending invocations are failed on the socket executor; the captured shared
// pointer keeps the connection alive until that task has run.
1308 boost::asio::post(socket_->get_executor(), [=]() {
1309 for (auto& invocationEntry : thisConnection->invocations) {
1310 invocationEntry.second->notify_exception(std::make_exception_ptr(
1311 boost::enable_current_exception(exception::target_disconnected(
1312 "Connection::close", thisConnection->get_close_reason()))));
// Hands `client_invocation` to the socket for an asynchronous write, passing
// along a shared_ptr to this connection.
1319 const std::shared_ptr<spi::impl::ClientInvocation>& client_invocation)
1321 socket_->async_write(shared_from_this(), client_invocation);
1324const boost::optional<address>&
1325Connection::get_remote_address()
const
1327 return remote_address_;
1331Connection::set_remote_address(boost::optional<address> endpoint)
1333 this->remote_address_ = std::move(endpoint);
// Dispatches an incoming message to the component that should handle it,
// based on the message's correlation id and header flags.
1337Connection::handle_client_message(
1338 const std::shared_ptr<protocol::ClientMessage>& message)
1340 auto correlationId = message->get_correlation_id();
1341 auto invocationIterator = invocations.find(correlationId);
// No invocation is registered under this correlation id: the message is
// reported as being dropped.
1342 if (invocationIterator == invocations.end()) {
1345 boost::str(boost::format(
"No invocation for callId: %1%. "
1346 "Dropping this message: %2%") %
1347 correlationId % *message));
1350 auto invocation = invocationIterator->second;
1351 auto flags = message->get_header_flags();
// Backup event: skip the event header, read the int64 correlation id carried
// in the payload and notify the connection manager about that backup.
1352 if (message->is_flag_set(flags,
1353 protocol::ClientMessage::BACKUP_EVENT_FLAG)) {
1354 message->rd_ptr(protocol::ClientMessage::EVENT_HEADER_LEN);
1355 correlationId = message->get<int64_t>();
1356 client_context_.get_connection_manager().notify_backup(correlationId);
1357 }
// Any other event goes to the listener service.
else if (message->is_flag_set(flags,
1358 protocol::ClientMessage::IS_EVENT_FLAG)) {
1359 client_context_.get_client_listener_service().handle_client_message(
1360 invocation, message);
// Everything else is passed to the invocation service.
1362 invocation_service_.handle_client_message(invocation, message);
// Returns the identifier this connection was constructed with.
1367Connection::get_connection_id()
const
1369 return connection_id_;
// NOTE(review): presumably reports the alive_ flag that close() clears -
// confirm against the function body.
1373Connection::is_alive()
const
// Returns the reason text recorded by close().
1379Connection::get_close_reason()
const
1381 return close_reason_;
// Logs that this connection was closed. The message starts with the
// connection's own description followed by " closed. Reason: ".
1385Connection::log_close()
1387 std::ostringstream message;
1388 message << *
this <<
" closed. Reason: ";
// The reason shown is, in priority order: the recorded reason text, the
// source and message of the causing exception, or a fixed fallback text.
1389 if (!close_reason_.empty()) {
1390 message << close_reason_;
1391 }
else if (close_cause_) {
// Rethrowing the stored exception_ptr is how its details are inspected.
1393 std::rethrow_exception(close_cause_);
1394 }
catch (exception::iexception& ie) {
1395 message << ie.get_source() <<
"[" + ie.get_message() <<
"]";
1398 message <<
"Socket explicitly closed";
// While the client is running, a close with no cause is logged at info
// level; a close with a cause logs the message together with the exception.
1401 if (client_context_.get_lifecycle_service().is_running()) {
1402 if (!close_cause_) {
1403 HZ_LOG(logger_, info, message.str());
1406 std::rethrow_exception(close_cause_);
1407 }
catch (exception::iexception& ie) {
1411 boost::str(boost::format(
"%1% %2%") % message.str() % ie));
// When the client is not running, the message plus a lambda-built suffix
// derived from the cause is logged at finest level.
1416 logger_, finest, message.str() + [
this]() -> std::string {
1419 std::rethrow_exception(close_cause_);
1420 } catch (exception::iexception& ie) {
1430Connection::operator==(
const Connection& rhs)
const
1432 return connection_id_ == rhs.connection_id_;
1436Connection::operator!=(
const Connection& rhs)
const
1438 return !(rhs == *
this);
// Returns the server version string stored by
// set_connected_server_version().
1442Connection::get_connected_server_version_string()
const
1444 return connected_server_version_string_;
1448Connection::set_connected_server_version(
const std::string& connected_server)
1450 Connection::connected_server_version_string_ = connected_server;
1453boost::optional<address>
1454Connection::get_local_socket_address()
const
1456 return socket_->local_socket_address();
1459std::chrono::steady_clock::time_point
1460Connection::last_read_time()
const
1462 return read_handler.get_last_read_time();
// Closes the underlying socket on the socket's own executor. The posted task
// captures a shared_ptr to this connection, so the connection stays alive
// until the close has run.
1466Connection::inner_close()
1472 auto thisConnection = shared_from_this();
1473 boost::asio::post(socket_->get_executor(),
1474 [=]() { thisConnection->socket_->close(); });
// Streams a human-readable description of `connection`: alive flag,
// connection id, remote endpoint (when set), last read/write times, the
// closed time and the connected server version, terminated by '}'.
1478operator<<(std::ostream& os,
const Connection& connection)
1481 <<
"alive=" << connection.is_alive()
1482 <<
", connection id=" << connection.get_connection_id()
1483 <<
", remote endpoint=";
// The remote address is optional; it is dereferenced only when present.
1484 if (connection.get_remote_address()) {
1485 os << *connection.get_remote_address();
1489 os <<
", last_read_time="
1490 << util::StringUtil::time_to_string(connection.last_read_time())
1491 <<
", last_write_time="
1492 << util::StringUtil::time_to_string(connection.last_write_time())
// closed_time_duration_ is stored as a duration, so it is converted back to
// a steady_clock time_point before being formatted.
1494 << util::StringUtil::time_to_string(
1495 std::chrono::steady_clock::time_point(
1496 std::chrono::duration_cast<std::chrono::steady_clock::duration>(
1497 connection.closed_time_duration_.load())))
1498 <<
", connected server version="
1499 << connection.connected_server_version_string_ <<
'}';
1505Connection::operator<(
const Connection& rhs)
const
1507 return connection_id_ < rhs.connection_id_;
// Returns a system_clock time point.
// NOTE(review): presumably the start_time_ captured in the constructor -
// confirm against the function body.
1510std::chrono::system_clock::time_point
1511Connection::get_start_time()
const
// NOTE(review): presumably exposes the socket created in the constructor -
// confirm against the function body.
1517Connection::get_socket()
1523Connection::deregister_invocation(int64_t call_id)
1525 invocations.erase(call_id);
// Returns the stored uuid of the remote member; this is the nil uuid until
// set_remote_uuid() has been called.
1529Connection::get_remote_uuid()
const
1531 return remote_uuid_;
1535Connection::set_remote_uuid(boost::uuids::uuid remote_uuid)
1537 remote_uuid_ = remote_uuid;
1541Connection::last_write_time(std::chrono::steady_clock::time_point tp)
1543 last_write_time_ = tp.time_since_epoch();
1546std::chrono::steady_clock::time_point
1547Connection::last_write_time()
const
1549 return std::chrono::steady_clock::time_point{ last_write_time_ };
// Reads the heartbeat timeout and heartbeat interval from the client
// properties and stores them as millisecond durations.
// NOTE(review): the visible fallback arms use the PROP_HEARTBEAT_*_DEFAULT
// values; confirm the condition that selects them (presumably a
// non-positive configured value).
1552HeartbeatManager::HeartbeatManager(
1553 spi::ClientContext& client,
1554 ClientConnectionManagerImpl& connection_manager)
1556 , client_connection_manager_(connection_manager)
1557 , logger_(client.get_logger())
1559 client_properties& clientProperties = client.get_client_properties();
1560 auto timeout_millis =
1561 clientProperties.get_long(clientProperties.get_heartbeat_timeout());
1562 heartbeat_timeout_ = std::chrono::milliseconds(
// The default is parsed into an int64_t before use.
1565 : util::IOUtil::to_value<int64_t>(
1566 client_properties::PROP_HEARTBEAT_TIMEOUT_DEFAULT));
1568 auto interval_millis =
1569 clientProperties.get_long(clientProperties.get_heartbeat_interval());
1570 heartbeat_interval_ = std::chrono::milliseconds(
1573 : util::IOUtil::to_value<int64_t>(
1574 client_properties::PROP_HEARTBEAT_INTERVAL_DEFAULT));
// Schedules a repeating task on the client execution service and keeps the
// returned handle in timer_. Each run calls check_connection() on every
// active connection of the connection manager.
1578HeartbeatManager::start()
1580 spi::impl::ClientExecutionServiceImpl& clientExecutionService =
1581 client_.get_client_execution_service();
1583 timer_ = clientExecutionService.schedule_with_repetition(
// Connections are only checked while the connection manager is alive
// (presumably the run returns early otherwise - TODO confirm).
1585 if (!client_connection_manager_.is_alive()) {
1589 for (
auto& connection :
1590 client_connection_manager_.get_active_connections()) {
1591 check_connection(connection);
// heartbeat_interval_ is passed for both trailing arguments (presumably the
// initial delay and the repetition period - TODO confirm).
1594 heartbeat_interval_,
1595 heartbeat_interval_);
// Performs one heartbeat check on `connection`: detects a read timeout and,
// when the connection has been write-idle, sends a ping.
1599HeartbeatManager::check_connection(
1600 const std::shared_ptr<Connection>& connection)
// Connections that are no longer alive are presumably skipped - TODO confirm.
1602 if (!connection->is_alive()) {
1606 auto now = std::chrono::steady_clock::now();
// Nothing has been read for longer than the heartbeat timeout: report the
// failure and stop the heartbeat for this connection.
1607 if (now - connection->last_read_time() > heartbeat_timeout_) {
1611 boost::format(
"Heartbeat failed over the connection: %1%") %
1613 on_heartbeat_stopped(connection,
"Heartbeat timed out");
// Nothing has been written for longer than the heartbeat interval: send an
// urgent ping over this specific connection.
1617 if (now - connection->last_write_time() > heartbeat_interval_) {
1618 auto request = protocol::codec::client_ping_encode();
1619 std::shared_ptr<spi::impl::ClientInvocation> clientInvocation =
1620 spi::impl::ClientInvocation::create(client_, request,
"", connection);
1621 clientInvocation->invoke_urgent();
// Handles a connection whose heartbeat has stopped. A target_disconnected
// exception describing the connection is built and wrapped in an
// exception_ptr.
// NOTE(review): presumably that pointer and `reason` are passed to
// connection->close() - confirm against the full body.
1626HeartbeatManager::on_heartbeat_stopped(
1627 const std::shared_ptr<Connection>& connection,
1628 const std::string& reason)
1632 std::make_exception_ptr(
1633 (exception::exception_builder<exception::target_disconnected>(
1634 "HeartbeatManager::onHeartbeatStopped")
1635 <<
"Heartbeat timed out to connection " << *connection)
// NOTE(review): presumably cancels the repeating task held in timer_ (see
// start()) - confirm against the function body.
1640HeartbeatManager::shutdown()
1647std::chrono::milliseconds
1648HeartbeatManager::get_heartbeat_timeout()
const
1650 return heartbeat_timeout_;
// Restarts the retry state for a new cluster connect attempt: the attempt
// start time becomes "now" and the current backoff goes back to the initial
// backoff, capped at the maximum backoff.
1654wait_strategy::reset()
1657 cluster_connect_attempt_begin_ = std::chrono::steady_clock::now();
1658 current_backoff_millis_ =
// (std::min) is parenthesised so a function-like `min` macro cannot expand
// here.
1659 (std::min)(max_backoff_millis_, initial_backoff_millis_);
// Copies the backoff parameters (initial and maximum backoff, multiplier,
// jitter, cluster connect timeout) out of `retry_config` and precomputes the
// text used for the timeout in log messages.
1662wait_strategy::wait_strategy(
1663 const config::connection_retry_config& retry_config,
1665 : initial_backoff_millis_(retry_config.get_initial_backoff_duration())
1666 , max_backoff_millis_(retry_config.get_max_backoff_duration())
1667 , multiplier_(retry_config.get_multiplier())
1668 , jitter_(retry_config.get_jitter())
1670 , cluster_connect_timeout_millis_(retry_config.get_cluster_connect_timeout())
// milliseconds::max() is treated as "no timeout" and rendered as INFINITE;
// any other value is rendered as "<count> msecs".
1672 if (cluster_connect_timeout_millis_ == std::chrono::milliseconds::max()) {
1673 cluster_connect_timeout_text_ =
"INFINITE";
1675 cluster_connect_timeout_text_ =
1676 (boost::format(
"%1% msecs") % cluster_connect_timeout_millis_.count())
// Blocks the calling thread for the next backoff period of the cluster
// connect retry loop, unless the cluster connect timeout has already been
// exceeded.
1682wait_strategy::sleep()
1685 using namespace std::chrono;
1686 auto current_time = steady_clock::now();
1687 auto time_passed = duration_cast<milliseconds>(
1688 current_time - cluster_connect_attempt_begin_);
// The time since the attempt began exceeds the configured timeout: this is
// reported (presumably the function then returns without sleeping - TODO
// confirm).
1689 if (time_passed > cluster_connect_timeout_millis_) {
1693 (boost::format(
"Unable to get live cluster connection, cluster "
1694 "connect timeout (%1%) is reached. Attempt %2%.") %
1695 cluster_connect_timeout_text_ % attempt_)
// Sleep time = backoff + backoff * jitter_ * (2.0 * r - 1.0), where r is
// drawn from random_ (presumably uniform in [0, 1), giving a symmetric
// +/- jitter - TODO confirm the distribution).
1701 auto actual_sleep_time =
1702 current_backoff_millis_ + milliseconds(
static_cast<milliseconds::rep
>(
1703 current_backoff_millis_.count() * jitter_ *
1704 (2.0 * random_(random_generator_) - 1.0)));
// Never sleep past the remaining cluster connect timeout budget.
1706 actual_sleep_time = (std::min)(
1707 actual_sleep_time, cluster_connect_timeout_millis_ - time_passed);
1713 "Unable to get live cluster connection, retry in %1% ms, attempt: %2% "
1714 ", cluster connect timeout: %3% , max backoff millis: %4%") %
1715 actual_sleep_time.count() % attempt_ % cluster_connect_timeout_text_ %
1716 max_backoff_millis_.count())
1719 std::this_thread::sleep_for(actual_sleep_time);
// Grow the backoff by multiplier_ for the next call, capped at the maximum
// backoff.
1721 current_backoff_millis_ =
1722 (std::min)(milliseconds(
static_cast<milliseconds::rep
>(
1723 current_backoff_millis_.count() * multiplier_)),
1724 max_backoff_millis_);
// Stores references to the client context and the shared resolver.
// NOTE(review): presumably `io` is stored as io_ as well - confirm against
// the full initializer list.
1731SocketFactory::SocketFactory(spi::ClientContext& client_context,
1732 boost::asio::io_context& io,
1733 boost::asio::ip::tcp::resolver& resolver)
1734 : client_context_(client_context)
1736 , io_resolver_(resolver)
// Prepares the factory for creating sockets. In SSL builds with SSL enabled
// in the network config, this sets up ssl_context_ (verification mode, CA
// files, cipher list).
1740SocketFactory::start()
1742#ifdef HZ_BUILD_WITH_SSL
1744 client_context_.get_client_config().get_network_config().get_ssl_config();
1745 if (sslConfig.is_enabled()) {
// A context supplied through the config is used as is; otherwise one is
// created for the configured protocol with peer verification against the
// default verify paths plus the configured verify files.
1746 if (sslConfig.ssl_context_) {
1747 ssl_context_ = sslConfig.ssl_context_;
1749 ssl_context_ = std::make_shared<boost::asio::ssl::context>(
1750 (boost::asio::ssl::context_base::method)sslConfig.get_protocol());
1752 ssl_context_->set_verify_mode(boost::asio::ssl::verify_peer);
1753 ssl_context_->set_default_verify_paths();
1755 const std::vector<std::string>& verifyFiles =
1756 sslConfig.get_verify_files();
1757 bool success =
true;
1758 logger& lg = client_context_.get_logger();
// Each file is loaded with the error_code overload, so a bad file is
// reported through `ec` instead of an exception.
1759 for (
const auto& f : verifyFiles) {
1760 boost::system::error_code ec;
1761 ssl_context_->load_verify_file(f, ec);
1767 boost::format(
"SocketFactory::start: Failed to load CA "
1768 "verify file at %1% %2%") %
// The SSL context is discarded and the failure reported when verify files
// could not be loaded.
1775 ssl_context_.reset();
1778 "SocketFactory::start: Failed to load one or more "
1779 "configured CA verify files (PEM files). Please "
1780 "correct the files and retry.");
// An optional cipher list is applied directly on the native OpenSSL handle;
// SSL_CTX_set_cipher_list returns 0 when no cipher could be selected.
1786 const std::string& cipherList = sslConfig.get_cipher_list();
1787 if (!cipherList.empty()) {
1788 if (!SSL_CTX_set_cipher_list(ssl_context_->native_handle(),
1789 cipherList.c_str())) {
1790 logger& lg = client_context_.get_logger();
1793 std::string(
"SocketFactory::start: Could not load any "
1794 "of the ciphers in the config provided "
// Cast to void to mark client_context_ as intentionally unused on this path
// (presumably the non-SSL build - TODO confirm).
1802 (void)client_context_;
// Creates an unconnected socket for `address`, using the socket options from
// the client network config and the given connect timeout. Ownership of the
// socket goes to the caller.
1807std::unique_ptr<hazelcast::client::socket>
1808SocketFactory::create(
const address& address,
1809 std::chrono::milliseconds& connect_timeout_in_millis)
1811#ifdef HZ_BUILD_WITH_SSL
// An SSL socket is produced only when start() left an SSL context in place.
1812 if (ssl_context_.get()) {
1813 return std::unique_ptr<hazelcast::client::socket>(
1814 new internal::socket::SSLSocket(io_,
1817 client_context_.get_client_config()
1818 .get_network_config()
1819 .get_socket_options(),
1820 connect_timeout_in_millis,
// Otherwise a plain TCP socket is returned.
1825 return std::unique_ptr<hazelcast::client::socket>(
1826 new internal::socket::TcpSocket(io_,
1828 client_context_.get_client_config()
1829 .get_network_config()
1830 .get_socket_options(),
1831 connect_timeout_in_millis,
1835#ifdef HZ_BUILD_WITH_SSL
// Constructs an SSL socket by delegating to BaseSocket, instantiated with a
// boost::asio SSL stream over a TCP socket.
1837SSLSocket::SSLSocket(boost::asio::io_context& io_service,
1838 boost::asio::ssl::context& ssl_context,
1839 const client::address& address,
1840 client::config::socket_options& socket_options,
1841 std::chrono::milliseconds& connect_timeout_in_millis,
1842 boost::asio::ip::tcp::resolver& resolver)
1843 : BaseSocket<boost::asio::ssl::stream<boost::asio::ip::tcp::socket>>(
1848 connect_timeout_in_millis,
// Returns name, bit strength, protocol version and description for every
// cipher OpenSSL reports for this socket's SSL handle.
1852std::vector<SSLSocket::CipherInfo>
1853SSLSocket::get_ciphers()
1855 STACK_OF(SSL_CIPHER)* ciphers = SSL_get_ciphers(socket_.native_handle());
1856 std::vector<CipherInfo> supportedCiphers;
1857 for (
int i = 0; i < sk_SSL_CIPHER_num(ciphers); ++i) {
1858 struct SSLSocket::CipherInfo info;
1859 const SSL_CIPHER* cipher = sk_SSL_CIPHER_value(ciphers, i);
1860 info.name = SSL_CIPHER_get_name(cipher);
// The second argument is an optional out-pointer; 0 means it is not
// requested.
1861 info.number_of_bits = SSL_CIPHER_get_bits(cipher, 0);
1862 info.version = SSL_CIPHER_get_version(cipher);
// NOTE(review): descBuf must be at least 256 bytes to match the length
// passed here - confirm against its declaration.
1864 info.description = SSL_CIPHER_description(cipher, descBuf, 256);
1865 supportedCiphers.push_back(info);
1867 return supportedCiphers;
1871SSLSocket::post_connect()
1873 socket_.handshake(boost::asio::ssl::stream_base::client);
// Streams a CipherInfo as labelled fields: name, Bits, Version and
// Description.
1877operator<<(std::ostream& out,
const SSLSocket::CipherInfo& info)
1881 << info.name <<
", Bits:" << info.number_of_bits
1882 <<
", Version:" << info.version <<
", Description:" << info.description
// Constructs a plain TCP socket by delegating to BaseSocket, instantiated
// with boost::asio::ip::tcp::socket.
1890TcpSocket::TcpSocket(boost::asio::io_context& io,
1891 const address& address,
1892 client::config::socket_options& socket_options,
1893 std::chrono::milliseconds& connect_timeout_in_millis,
1894 boost::asio::ip::tcp::resolver& resolver)
1895 : BaseSocket<boost::asio::ip::tcp::socket>(resolver,
1899 connect_timeout_in_millis)
1909hash<std::shared_ptr<hazelcast::client::connection::Connection>>::operator()(
1910 const std::shared_ptr<hazelcast::client::connection::Connection>& conn)
1916 return std::abs(conn->get_connection_id());