18#include <unordered_set>
20#include "hazelcast/client/execution_callback.h"
21#include "hazelcast/client/lifecycle_event.h"
22#include "hazelcast/client/connection/AddressProvider.h"
23#include "hazelcast/client/spi/impl/ClientInvocation.h"
24#include "hazelcast/util/Util.h"
25#include "hazelcast/client/protocol/AuthenticationStatus.h"
26#include "hazelcast/client/exception/protocol_exceptions.h"
27#include "hazelcast/client/connection/ClientConnectionManagerImpl.h"
28#include "hazelcast/client/connection/ConnectionListener.h"
29#include "hazelcast/client/connection/Connection.h"
30#include "hazelcast/client/spi/ClientContext.h"
31#include "hazelcast/client/spi/impl/ClientExecutionServiceImpl.h"
32#include "hazelcast/client/serialization/serialization.h"
33#include "hazelcast/client/protocol/UsernamePasswordCredentials.h"
34#include "hazelcast/client/protocol/codec/codecs.h"
35#include "hazelcast/client/client_config.h"
36#include "hazelcast/client/socket_interceptor.h"
37#include "hazelcast/client/config/client_network_config.h"
38#include "hazelcast/client/client_properties.h"
39#include "hazelcast/client/connection/HeartbeatManager.h"
40#include "hazelcast/client/impl/hazelcast_client_instance_impl.h"
41#include "hazelcast/client/spi/impl/listener/listener_service_impl.h"
42#include "hazelcast/client/spi/impl/ClientPartitionServiceImpl.h"
43#include "hazelcast/client/spi/impl/ClientInvocationServiceImpl.h"
44#include "hazelcast/client/internal/socket/TcpSocket.h"
45#include "hazelcast/client/internal/socket/SSLSocket.h"
46#include "hazelcast/client/config/ssl_config.h"
47#include "hazelcast/util/IOUtil.h"
48#include "hazelcast/client/internal/socket/SocketFactory.h"
49#include "hazelcast/client/connection/wait_strategy.h"
// Out-of-class definitions for static data members of
// ClientConnectionManagerImpl.
// EXECUTOR_CORE_POOL_SIZE sizes the hz_thread_pool created in start().
54constexpr size_t ClientConnectionManagerImpl::EXECUTOR_CORE_POOL_SIZE;
// Key that translate() looks up in a member's address map when public
// addresses are in use.
55const endpoint_qualifier ClientConnectionManagerImpl::PUBLIC_ENDPOINT_QUALIFIER{
// Upper bound of the member-picking loop in connection_for_sql().
60constexpr int ClientConnectionManagerImpl::SQL_CONNECTION_RANDOM_ATTEMPTS;
// Builds the connection manager from the client context and takes ownership
// of the supplied address provider. All settings are read from the client
// configuration / client properties; no connection is opened here.
62ClientConnectionManagerImpl::ClientConnectionManagerImpl(
63 spi::ClientContext& client,
64 std::unique_ptr<AddressProvider> address_provider)
66 , logger_(client.get_logger())
// Starts at the maximum representable duration; replaced in the body when the
// network config carries a positive connection timeout.
67 , connection_timeout_millis_((std::chrono::milliseconds::max)())
69 , socket_interceptor_(client.get_client_config().get_socket_interceptor())
70 , address_provider_(std::move(address_provider))
71 , connection_id_gen_(0)
72 , heartbeat_(client, *this)
73 , async_start_(client.get_client_config()
74 .get_connection_strategy_config()
76 , reconnect_mode_(client.get_client_config()
77 .get_connection_strategy_config()
78 .get_reconnect_mode())
79 , smart_routing_enabled_(
80 client.get_client_config().get_network_config().is_smart_routing())
81 , client_uuid_(client.random_uuid())
// The authentication wait uses the heartbeat timeout value.
82 , authentication_timeout_(
83 boost::chrono::milliseconds(heartbeat_.get_heartbeat_timeout().count()))
84 , load_balancer_(client.get_client_config().get_load_balancer())
85 , wait_strategy_(client.get_client_config()
86 .get_connection_strategy_config()
89 , cluster_id_(boost::uuids::nil_uuid())
90 , client_state_(client_state::INITIAL)
91 , connect_to_cluster_task_submitted_(false)
92 , established_initial_cluster_connection(false)
// Public addresses are only used with the default address provider AND when
// the network config asks for them.
93 , use_public_address_(
94 address_provider_->is_default_provider() &&
95 client.get_client_config().get_network_config().use_public_address())
97 config::client_network_config& networkConfig =
98 client.get_client_config().get_network_config();
99 auto connTimeout = networkConfig.get_connection_timeout();
// A non-positive configured timeout keeps the "max" default set above.
100 if (connTimeout.count() > 0) {
101 connection_timeout_millis_ = std::chrono::milliseconds(connTimeout);
104 client_properties& clientProperties = client.get_client_properties();
105 shuffle_member_list_ =
106 clientProperties.get_boolean(clientProperties.get_shuffle_member_list());
// Starts the connection manager: flips alive_ from false to true, creates the
// asio io_context / resolver / socket factory, launches the IO thread and the
// executor pool, then connects to the cluster.
110ClientConnectionManagerImpl::start()
// The compare-exchange makes a second start() on an already-started manager
// take the early-exit branch (its body is not visible in this excerpt).
112 bool expected =
false;
113 if (!alive_.compare_exchange_strong(expected,
true)) {
117 io_context_.reset(
new boost::asio::io_context);
119 new boost::asio::ip::tcp::resolver(io_context_->get_executor()));
120 socket_factory_.reset(
new internal::socket::SocketFactory(
121 client_, *io_context_, *io_resolver_));
// The work guard keeps io_context_->run() from returning while idle.
122 io_guard_.reset(
new boost::asio::io_context::work(*io_context_));
124 if (!socket_factory_->start()) {
128 socket_interceptor_ = client_.get_client_config().get_socket_interceptor();
// Dedicated thread that drives the io_context.
130 io_thread_ = std::thread([=]() { io_context_->run(); });
133 new hazelcast::util::hz_thread_pool(EXECUTOR_CORE_POOL_SIZE));
134 connect_to_members_timer_ =
135 boost::asio::steady_timer(executor_->get_executor());
139 connect_to_cluster();
// The periodic "connect to every member" task is only scheduled when smart
// routing is enabled.
140 if (smart_routing_enabled_) {
141 schedule_connect_to_all_members();
144 load_balancer_.init_(client_.get_cluster());
// Arms connect_to_members_timer_ to fire in one second; the handler calls
// connect_to_all_members() and then re-arms itself by calling this function
// again, producing a repeating task.
150ClientConnectionManagerImpl::schedule_connect_to_all_members()
// Checked before arming the timer; the branch body is not visible here
// (presumably an early return once the client is no longer running).
152 if (!client_.get_lifecycle_service().is_running()) {
156 connect_to_members_timer_->expires_from_now(
157 boost::asio::chrono::seconds(1));
158 connect_to_members_timer_->async_wait([=](boost::system::error_code ec) {
// operation_aborted is what a cancelled timer reports (shutdown() cancels it).
159 if (ec == boost::asio::error::operation_aborted) {
162 connect_to_all_members();
164 schedule_connect_to_all_members();
// Shuts the manager down: flips alive_ from true to false, cancels the
// periodic member-connect timer, stops the heartbeat, closes every active
// connection, shuts the executor pool down and clears the bookkeeping maps.
169ClientConnectionManagerImpl::shutdown()
// Only the caller that wins the true -> false transition proceeds.
171 bool expected =
true;
172 if (!alive_.compare_exchange_strong(expected,
false)) {
176 if (connect_to_members_timer_) {
177 connect_to_members_timer_->cancel();
180 heartbeat_.shutdown();
// values() hands back the connections to close; the map itself is cleared
// further below.
183 for (
auto& connection : active_connections_.values()) {
185 util::IOUtil::close_resource(connection.get(),
186 "Hazelcast client is shutting down");
189 spi::impl::ClientExecutionServiceImpl::shutdown_thread_pool(
196 connection_listeners_.clear();
197 active_connections_.clear();
198 active_connection_ids_.clear();
// Looks up the active connection for member `m` by uuid and, on the path
// visible below, translates the member's address and opens a new connection.
// NOTE(review): the check that returns the existing connection is not visible
// in this excerpt - confirm.
201std::shared_ptr<Connection>
202ClientConnectionManagerImpl::get_or_connect(
const member& m)
204 const auto& uuid = m.get_uuid();
205 auto connection = active_connections_.get(uuid);
210 address addr = translate(m);
211 return connect(addr);
// Returns the values currently held in active_connections_.
214std::vector<std::shared_ptr<Connection>>
215ClientConnectionManagerImpl::get_active_connections()
217 return active_connections_.values();
// Returns whatever active_connections_ holds for the given member uuid.
// Callers in this file test the result for null, so a missing entry is
// expected to yield an empty pointer.
220std::shared_ptr<Connection>
221ClientConnectionManagerImpl::get_connection(boost::uuids::uuid uuid)
223 return active_connections_.get(uuid);
// Sends the authentication request over `connection`, waits up to
// authentication_timeout_ for the reply and decodes it into an auth_response.
// Throws exception::timeout when no reply arrives in time and
// exception::authentication (after closing the connection) for every status
// handled below other than AUTHENTICATED.
226ClientConnectionManagerImpl::auth_response
227ClientConnectionManagerImpl::authenticate_on_cluster(
228 std::shared_ptr<Connection>& connection)
231 encode_authentication_request(client_.get_serialization_service());
232 auto clientInvocation =
233 spi::impl::ClientInvocation::create(client_, request,
"", connection);
234 auto f = clientInvocation->invoke_urgent();
236 struct auth_response result;
// Bounded wait: anything other than "ready" is reported as a timeout.
238 if (f.wait_for(authentication_timeout_) !=
239 boost::future_status::ready) {
240 BOOST_THROW_EXCEPTION(exception::timeout(
241 "ClientConnectionManagerImpl::authenticate",
242 (boost::format(
"Authentication response is "
243 "not received for %1% msecs for %2%") %
244 authentication_timeout_.count() % *clientInvocation)
247 auto response = f.get();
// Keep a pointer to the initial frame header so its frame_len can be used
// below to skip whatever remains of that frame.
248 auto* initial_frame =
249 reinterpret_cast<protocol::ClientMessage::frame_header_type*
>(
250 response.rd_ptr(protocol::ClientMessage::RESPONSE_HEADER_LEN));
// Fixed-size fields, read in wire order: byte, uuid, byte, int32, uuid.
// NOTE(review): the mapping onto auth_response member names is declared
// elsewhere - confirm against the struct definition.
251 result = { response.get<
byte>(),
252 response.get<boost::uuids::uuid>(),
253 response.get<
byte>(),
254 response.get<int32_t>(),
255 response.get<boost::uuids::uuid>() };
// Advance past the rest of the initial frame: frame length minus the response
// header and minus the sizes of the five fields consumed above (each uuid is
// accounted with one extra UINT8).
257 response.rd_ptr(
static_cast<int32_t
>(initial_frame->frame_len) -
258 protocol::ClientMessage::RESPONSE_HEADER_LEN -
259 2 * protocol::ClientMessage::UINT8_SIZE -
260 2 * (
sizeof(boost::uuids::uuid) +
261 protocol::ClientMessage::UINT8_SIZE) -
262 protocol::ClientMessage::INT32_SIZE);
264 result.server_address = response.get_nullable<address>();
265 result.server_version = response.get<std::string>();
266 }
// Any iexception raised while waiting/decoding closes the connection with
// the in-flight exception as the cause.
catch (exception::iexception&) {
267 connection->close(
"Failed to authenticate connection",
268 std::current_exception());
272 auto authentication_status = (protocol::authentication_status)result.status;
273 switch (authentication_status) {
274 case protocol::AUTHENTICATED: {
// Wrong cluster name or rejected credentials.
277 case protocol::CREDENTIALS_FAILED: {
278 auto e = std::make_exception_ptr(exception::authentication(
279 "ClientConnectionManagerImpl::authenticate_on_cluster",
280 "Authentication failed. The configured cluster name on the "
281 "client (see client_config::set_cluster_name()) does not match "
282 "the one configured in the cluster or the credentials set in the "
283 "Client security config could not be authenticated"));
284 connection->close(
"Failed to authenticate connection", e);
285 std::rethrow_exception(e);
287 case protocol::NOT_ALLOWED_IN_CLUSTER: {
288 auto e = std::make_exception_ptr(exception::authentication(
289 "ClientConnectionManagerImpl::authenticate_on_cluster",
290 "Client is not allowed in the cluster"));
291 connection->close(
"Failed to authenticate connection", e);
292 std::rethrow_exception(e);
// Remaining path: a status code this client does not recognise.
295 auto e = std::make_exception_ptr(exception::authentication(
296 "ClientConnectionManagerImpl::authenticate_on_cluster",
298 "Authentication status code not supported. status: %1%") %
299 authentication_status)
301 connection->close(
"Failed to authenticate connection", e);
302 std::rethrow_exception(e);
// Streams the textual name of a client_state value (used when the state is
// formatted into log messages).
308operator<<(std::ostream& os, ClientConnectionManagerImpl::client_state state)
310 using client_state = ClientConnectionManagerImpl::client_state;
313 case client_state::INITIAL:
314 return os <<
"INITIAL";
315 case client_state::CONNECTED_TO_CLUSTER:
316 return os <<
"CONNECTED_TO_CLUSTER";
317 case client_state::INITIALIZED_ON_CLUSTER:
318 return os <<
"INITIALIZED_ON_CLUSTER";
319 case client_state::DISCONNECTED_FROM_CLUSTER:
320 return os <<
"DISCONNECTED_FROM_CLUSTER";
// Builds the authentication request message. The codec used depends on the
// configured credentials: username/password credentials (and the first return
// below) use client_authentication_encode, token credentials use
// client_authenticationcustom_encode.
326protocol::ClientMessage
327ClientConnectionManagerImpl::encode_authentication_request(
328 serialization::pimpl::SerializationService& ss)
330 byte serializationVersion = ss.get_version();
331 client_config& clientConfig = client_.get_client_config();
332 auto cluster_name = clientConfig.get_cluster_name();
334 auto credential = clientConfig.get_credentials();
// NOTE(review): the condition guarding this first return is not visible in
// this excerpt - presumably the "no credentials configured" case; confirm.
336 return protocol::codec::client_authentication_encode(
341 protocol::ClientTypes::CPP,
342 serializationVersion,
348 switch (credential->type()) {
349 case security::credentials::credential_type::username_password: {
351 std::static_pointer_cast<security::username_password_credentials>(
353 return protocol::codec::client_authentication_encode(
358 protocol::ClientTypes::CPP,
359 serializationVersion,
364 case security::credentials::credential_type::token: {
366 std::static_pointer_cast<security::token_credentials>(credential);
367 return protocol::codec::client_authenticationcustom_encode(
371 protocol::ClientTypes::CPP,
372 serializationVersion,
// Fallback when no case above returned: a default-constructed message.
379 return protocol::ClientMessage();
// Calls connection_added(connection) on every registered connection listener.
// Iterates over the array returned by to_array(), not the listener container
// itself.
383ClientConnectionManagerImpl::fire_connection_added_event(
384 const std::shared_ptr<Connection>& connection)
386 for (
const std::shared_ptr<ConnectionListener>& connectionListener :
387 connection_listeners_.to_array()) {
388 connectionListener->connection_added(connection);
// Calls connection_removed(connection) on every registered connection
// listener, iterating over the array returned by to_array().
393ClientConnectionManagerImpl::fire_connection_removed_event(
394 const std::shared_ptr<Connection>& connection)
396 for (
const auto& listener : connection_listeners_.to_array()) {
397 listener->connection_removed(connection);
// Shuts the client down through its lifecycle service, given only a weak
// reference to the client instance. Does nothing useful if the instance is
// already gone or no longer running; iexceptions thrown by the shutdown are
// logged rather than propagated.
// NOTE(review): the function name suggests the work runs on a separate
// thread, but the thread creation is not visible in this excerpt - confirm.
402ClientConnectionManagerImpl::shutdown_with_external_thread(
403 std::weak_ptr<client::impl::hazelcast_client_instance_impl> client_impl)
// lock() yields an empty pointer when the client instance was destroyed.
406 std::shared_ptr<client::impl::hazelcast_client_instance_impl>
407 clientInstance = client_impl.lock();
408 if (!clientInstance ||
409 !clientInstance->get_lifecycle_service().is_running()) {
414 clientInstance->get_lifecycle_service().shutdown();
415 }
catch (exception::iexception& e) {
416 HZ_LOG(*clientInstance->get_logger(),
418 boost::str(boost::format(
"Exception during client shutdown "
419 "%1%.clientShutdown-:%2%") %
420 clientInstance->get_name() % e));
// Posts a "connect to cluster" task to the executor. The atomic flag
// connect_to_cluster_task_submitted_ is compare-exchanged first so that the
// task is not queued again while one is already pending.
426ClientConnectionManagerImpl::submit_connect_to_cluster_task()
428 bool expected =
false;
429 if (!connect_to_cluster_task_submitted_.compare_exchange_strong(expected,
// Only a weak reference to the client is carried into the task; it is what
// shutdown_with_external_thread() receives on failure.
434 std::weak_ptr<client::impl::hazelcast_client_instance_impl> c =
435 client_.get_hazelcast_client_implementation();
436 boost::asio::post(executor_->get_executor(), [=]() {
438 do_connect_to_cluster();
// The flag is reset under the state mutex so a new task can be submitted.
440 std::lock_guard<std::recursive_mutex> guard(client_state_mutex_);
441 connect_to_cluster_task_submitted_ = false;
// If every connection was lost again in the meantime, log it and resubmit.
442 if (active_connections_.empty()) {
446 boost::str(boost::format(
"No connection to cluster: %1%") %
449 submit_connect_to_cluster_task();
// A failed connect attempt is logged and then shuts the client down.
452 } catch (std::exception& e) {
455 boost::str(boost::format(
"Could not connect to any cluster, "
456 "shutting down the client: %1%") %
459 shutdown_with_external_thread(c);
// For every member in the cluster member list that has no active connection
// and is not already being connected, posts a task to the executor that opens
// the connection.
465ClientConnectionManagerImpl::connect_to_all_members()
// Checked up front: client not running, or no active connection at all
// (the branch body is not visible in this excerpt).
467 if (!client_.get_lifecycle_service().is_running() ||
468 active_connections_.empty()) {
473 client_.get_client_cluster_service().get_member_list()) {
476 std::lock_guard<std::recursive_mutex> guard{ client_state_mutex_ };
478 if (client_state_ == client_state::DISCONNECTED_FROM_CLUSTER) {
// get_or_put_if_absent(...).second gates the branch, so a member already
// present in connecting_members_ is presumably skipped - TODO confirm the
// meaning of .second against the map type.
488 if (client_.get_lifecycle_service().is_running() &&
489 !get_connection(m.get_uuid()) &&
490 connecting_members_.get_or_put_if_absent(m,
nullptr).second) {
// Copy the member so the posted task does not refer to the loop variable.
493 member member_to_connect = m;
495 executor_->get_executor(), [member_to_connect,
this]() {
497 if (!client_.get_lifecycle_service().is_running()) {
500 if (!get_connection(member_to_connect.get_uuid())) {
501 get_or_connect(member_to_connect);
// The member is removed from connecting_members_ on success and on failure.
503 connecting_members_.remove(member_to_connect);
504 } catch (std::exception&) {
505 connecting_members_.remove(member_to_connect);
// Tries to establish a connection to the cluster. Each attempt first tries
// the known cluster members, then the addresses returned by
// get_possible_member_addresses(); attempts repeat for as long as
// wait_strategy_.sleep() returns true. If the loop ends, throws
// exception::illegal_state listing every address that was tried.
513ClientConnectionManagerImpl::do_connect_to_cluster()
// Accumulates addresses across all attempts, for the final error message.
515 std::unordered_set<address> tried_addresses;
516 wait_strategy_.reset();
519 std::unordered_set<address> tried_addresses_per_attempt;
521 client_.get_client_cluster_service().get_member_list();
522 if (shuffle_member_list_) {
523 shuffle(member_list);
// Pass 1: members already known from the cluster service.
527 for (
const auto& m : member_list) {
// check_client_active() throws once the client is no longer running.
528 check_client_active();
529 tried_addresses_per_attempt.insert(m.get_address());
530 auto connection = try_connect(m);
// Pass 2: candidate addresses; insert(...).second is false for an address
// already tried in this attempt.
537 for (
const address& server_address : get_possible_member_addresses()) {
538 check_client_active();
539 if (!tried_addresses_per_attempt.insert(server_address).second) {
544 auto connection = try_connect<address>(server_address);
549 tried_addresses.insert(tried_addresses_per_attempt.begin(),
550 tried_addresses_per_attempt.end());
554 check_client_active();
555 }
while (wait_strategy_.sleep());
557 std::ostringstream out;
558 out <<
"Unable to connect to any address! The following addresses were "
560 for (
const auto& address : tried_addresses) {
561 out << address <<
" , ";
564 BOOST_THROW_EXCEPTION(exception::illegal_state(
565 "ConnectionManager::do_connect_to_cluster", out.str()));
// Builds the list of candidate addresses to connect to: the addresses of the
// currently known cluster members followed by the addresses loaded from the
// address provider. Shuffling is applied when shuffle_member_list_ is set.
569ClientConnectionManagerImpl::get_possible_member_addresses()
571 std::vector<address> addresses;
573 client_.get_client_cluster_service().get_member_list()) {
574 addresses.emplace_back(std::move(member.get_address()));
577 if (shuffle_member_list_) {
581 std::vector<address> provided_addresses =
582 address_provider_->load_addresses();
584 if (shuffle_member_list_) {
585 shuffle(provided_addresses);
// Provider addresses are appended after the member addresses.
589 addresses.end(), provided_addresses.begin(), provided_addresses.end());
// Connects to the cluster either by submitting the connect task to the
// executor or by calling do_connect_to_cluster() directly.
// NOTE(review): the condition choosing between the two calls is not visible
// in this excerpt - presumably async_start_; confirm.
595ClientConnectionManagerImpl::connect_to_cluster()
598 submit_connect_to_cluster_task();
600 do_connect_to_cluster();
// NOTE(review): body not visible in this excerpt - presumably reports the
// alive_ flag toggled by start()/shutdown(); confirm.
605ClientConnectionManagerImpl::is_alive()
// Bookkeeping for a closed connection: removes it from the active-connection
// maps, notifies connection listeners and, when it was the last active
// connection, marks the client as disconnected and triggers reconnection.
611ClientConnectionManagerImpl::on_connection_close(
612 const std::shared_ptr<Connection>& connection)
614 auto endpoint = connection->get_remote_address();
615 auto member_uuid = connection->get_remote_uuid();
617 auto socket_remote_address = connection->get_socket().get_remote_endpoint();
// Logged for a connection with no remote end-point set; such a connection is
// not removed from the map (see the message text).
622 boost::str(boost::format(
623 "Destroying %1% , but it has end-point set to null "
624 "-> not removing it from a connection map") %
630 std::lock_guard<std::recursive_mutex> guard(client_state_mutex_);
// remove(key, value) gates the branch: the work below only happens when the
// map entry for this member uuid was this very connection.
632 if (active_connections_.remove(member_uuid, connection)) {
633 active_connection_ids_.remove(connection->get_connection_id());
639 "Removed connection to endpoint: %1%, connection: %2%") %
640 *endpoint % *connection));
// Last connection gone: CLIENT_DISCONNECTED is fired only if the client had
// reached INITIALIZED_ON_CLUSTER.
642 if (active_connections_.empty()) {
643 if (client_state_ == client_state::INITIALIZED_ON_CLUSTER) {
644 fire_life_cycle_event(
645 lifecycle_event::lifecycle_state::CLIENT_DISCONNECTED);
648 client_state_ = client_state::DISCONNECTED_FROM_CLUSTER;
649 trigger_cluster_reconnection();
652 fire_connection_removed_event(connection);
657 boost::str(boost::format(
658 "Destroying a connection, but there is no mapping "
659 "%1% -> %2% in the connection map.") %
660 endpoint % *connection));
// Registers a listener that will receive connection added/removed events.
666ClientConnectionManagerImpl::add_connection_listener(
667 const std::shared_ptr<ConnectionListener>& connection_listener)
669 connection_listeners_.add(connection_listener);
// Destructor. NOTE(review): body not visible in this excerpt - presumably
// calls shutdown(); confirm.
672ClientConnectionManagerImpl::~ClientConnectionManagerImpl()
// Throws exception::hazelcast_client_not_active when the client's lifecycle
// service reports that it is no longer running; otherwise does nothing.
678ClientConnectionManagerImpl::check_client_active()
680 if (!client_.get_lifecycle_service().is_running()) {
681 BOOST_THROW_EXCEPTION(exception::hazelcast_client_not_active(
682 "ClientConnectionManagerImpl::check_client_active",
683 "Client is shutdown"));
// Sends the client state to the cluster identified by `target_cluster_id` and,
// if the client is still attached to that same cluster afterwards, moves to
// INITIALIZED_ON_CLUSTER and fires CLIENT_CONNECTED. On failure the operation
// is re-posted to the executor as long as the cluster id is unchanged.
688ClientConnectionManagerImpl::initialize_client_on_cluster(
689 boost::uuids::uuid target_cluster_id)
691 if (!client_.get_lifecycle_service().is_running()) {
697 std::lock_guard<std::recursive_mutex> guard(client_state_mutex_);
// The client switched to another cluster since this task was queued.
699 if (target_cluster_id != cluster_id_) {
701 hazelcast::logger::level::warning,
702 (boost::format(
"Won't send client state to cluster: %1%"
703 " Because switched to a new cluster: %2%") %
704 target_cluster_id % cluster_id_)
711 client_.get_hazelcast_client_implementation()->send_state_to_cluster();
// The cluster id is compared again under the mutex after the state was sent.
714 std::lock_guard<std::recursive_mutex> guard(client_state_mutex_);
716 if (target_cluster_id == cluster_id_) {
719 (boost::format(
"Client state is sent to cluster: %1%") %
723 client_state_ = client_state::INITIALIZED_ON_CLUSTER;
724 fire_life_cycle_event(lifecycle_event::CLIENT_CONNECTED);
725 }
// NOTE(review): the guard tests level::fine but the message is logged at
// level::warning - confirm this is intended.
else if (logger_.enabled(hazelcast::logger::level::fine)) {
726 logger_.log(hazelcast::logger::level::warning,
727 (boost::format(
"Cannot set client state to %1%"
728 " because current cluster id: %2%"
729 " is different than expected cluster"
731 client_state::INITIALIZED_ON_CLUSTER %
732 cluster_id_ % target_cluster_id)
736 }
catch (
const std::exception& e) {
737 auto cluster_name = client_.get_client_config().get_cluster_name();
740 hazelcast::logger::level::warning,
741 (boost::format(
"Failure during sending state to the cluster. %1%") %
746 std::lock_guard<std::recursive_mutex> guard(client_state_mutex_);
// Retry only while still attached to the same cluster.
748 if (cluster_id_ == target_cluster_id) {
749 if (logger_.enabled(hazelcast::logger::level::fine)) {
751 hazelcast::logger::level::warning,
753 "Retrying sending to the cluster: %1%, name: %2%") %
754 target_cluster_id % cluster_name)
// shared_from_this() keeps this manager alive until the posted retry runs.
758 auto self = shared_from_this();
761 executor_->get_executor(), [self, target_cluster_id]() {
762 self->initialize_client_on_cluster(target_cluster_id);
// Registers a freshly authenticated connection. Validates the partition
// count, stores the server details on the connection, detects a duplicate
// connection to the same member (returning the existing one), detects a
// cluster change, records the connection in the active maps, updates the
// client state when it is the first connection, and fires the
// connection-added event.
769std::shared_ptr<Connection>
770ClientConnectionManagerImpl::on_authenticated(
771 const std::shared_ptr<Connection>& connection,
772 auth_response& response)
775 std::lock_guard<std::recursive_mutex> guard(client_state_mutex_);
// Throws when the member's partition count is not accepted by the partition
// service (see check_partition_count).
776 check_partition_count(response.partition_count);
777 connection->set_connected_server_version(response.server_version);
778 connection->set_remote_address(response.server_address);
779 connection->set_remote_uuid(response.member_uuid);
781 auto existing_connection =
782 active_connections_.get(response.member_uuid);
// A connection to this member already exists: the existing one is returned.
783 if (existing_connection) {
786 "Duplicate connection to same member with uuid : %1%") %
787 boost::uuids::to_string(response.member_uuid))
789 return existing_connection;
792 auto new_cluster_id = response.cluster_id;
793 boost::uuids::uuid current_cluster_id = cluster_id_;
797 boost::str(boost::format(
798 "Checking the cluster: %1%, current cluster: %2%") %
799 new_cluster_id % current_cluster_id));
// A nil current id means no cluster was joined yet, which is not a "change".
801 auto cluster_id_changed = !current_cluster_id.is_nil() &&
802 !(new_cluster_id == current_cluster_id);
803 if (cluster_id_changed) {
809 "Switching from current cluster: %1% to new cluster: %2%") %
810 current_cluster_id % new_cluster_id));
811 client_.get_hazelcast_client_implementation()->on_cluster_restart();
// Capture emptiness BEFORE inserting, to know whether this is the first
// active connection.
814 auto connections_empty = active_connections_.empty();
815 active_connection_ids_.put(connection->get_connection_id(), connection);
816 active_connections_.put(response.member_uuid, connection);
817 if (connections_empty) {
821 cluster_id_ = new_cluster_id;
// When an initial cluster connection was established before: go to
// CONNECTED_TO_CLUSTER and initialize asynchronously on the executor.
823 if (established_initial_cluster_connection) {
836 client_state_ = client_state::CONNECTED_TO_CLUSTER;
837 auto self = shared_from_this();
839 executor_->get_executor(), [self, new_cluster_id]() {
840 self->initialize_client_on_cluster(new_cluster_id);
// Presumably the else branch (first ever connection): the client is marked
// initialized right away and CLIENT_CONNECTED is fired - TODO confirm, the
// `else` line is not visible in this excerpt.
843 established_initial_cluster_connection =
true;
844 client_state_ = client_state::INITIALIZED_ON_CLUSTER;
846 fire_life_cycle_event(
847 lifecycle_event::lifecycle_state::CLIENT_CONNECTED);
// Two log variants follow: with and without a known local socket address.
851 auto local_address = connection->get_local_socket_address();
858 "Authenticated with server %1%:%2%, server version: %3%, "
859 "local address: %4%. %5%") %
860 response.server_address % response.member_uuid %
861 response.server_version % *local_address % *connection));
868 "Authenticated with server %1%:%2%, server version: %3%, "
869 "no local address: (connection disconnected ?). %4%") %
870 response.server_address % response.member_uuid %
871 response.server_version % *connection));
874 fire_connection_added_event(connection);
// The connection may have died while it was being registered; run the close
// bookkeeping for it in that case.
882 if (!connection->is_alive()) {
883 on_connection_close(connection);
// The client may have been shut down concurrently; close the new connection.
889 if (!client_.get_lifecycle_service().is_running()) {
890 connection->close(
"Client is shutdown");
// Forwards the given lifecycle state to the client's lifecycle service.
897ClientConnectionManagerImpl::fire_life_cycle_event(
898 lifecycle_event::lifecycle_state state)
900 client_.get_lifecycle_service().fire_lifecycle_event(state);
// Hands the partition count reported by a member to the partition service and
// throws exception::client_not_allowed_in_cluster when
// check_and_set_partition_count() rejects it.
904ClientConnectionManagerImpl::check_partition_count(int32_t new_partition_count)
// The generic partition service is downcast to the concrete implementation,
// which provides check_and_set_partition_count().
906 auto& partition_service =
907 static_cast<spi::impl::ClientPartitionServiceImpl&
>(
908 client_.get_partition_service());
909 if (!partition_service.check_and_set_partition_count(new_partition_count)) {
910 BOOST_THROW_EXCEPTION(exception::client_not_allowed_in_cluster(
911 "ClientConnectionManagerImpl::check_partition_count",
913 "Client can not work with this cluster because it has a different "
915 "Expected partition count: %1%, Member partition count: %2%") %
916 partition_service.get_partition_count() % new_partition_count)
// Reacts to the loss of the cluster connection. With reconnect mode OFF the
// client is shut down; otherwise, if the client is still running, a
// connect-to-cluster task is submitted.
922ClientConnectionManagerImpl::trigger_cluster_reconnection()
924 if (reconnect_mode_ ==
925 config::client_connection_strategy_config::reconnect_mode::OFF) {
927 logger_, info,
"RECONNECT MODE is off. Shutting down the client.");
928 shutdown_with_external_thread(
929 client_.get_hazelcast_client_implementation());
933 if (client_.get_lifecycle_service().is_running()) {
934 submit_connect_to_cluster_task();
// Picks a connection for an invocation with no specific target. With smart
// routing the load balancer chooses a member and that member's connection is
// looked up; the code after that takes the first of the active connections.
// NOTE(review): the returns inside the smart-routing and "no connections"
// branches are not visible in this excerpt.
938std::shared_ptr<Connection>
939ClientConnectionManagerImpl::get_random_connection()
941 if (smart_routing_enabled_) {
942 auto member = load_balancer_.next_(client_.get_cluster());
946 auto connection = get_connection(member->get_uuid());
952 auto connections = active_connections_.values();
953 if (connections.empty()) {
957 return connections[0];
// NOTE(review): body not visible in this excerpt - presumably returns the
// client_uuid_ generated in the constructor; confirm.
961ClientConnectionManagerImpl::get_client_uuid()
const
// Decides whether an invocation may be sent right now and throws when it may
// not. The exception type tells the caller why: hazelcast_client_offline for
// the async-start / async-reconnect cases, exception::io otherwise.
967ClientConnectionManagerImpl::check_invocation_allowed()
// Take one snapshot of the state so both checks below see the same value.
969 client_state state = client_state_;
// Condition for the allowed case: initialized on the cluster and at least one
// active connection (the branch body is not visible here).
970 if (state == client_state::INITIALIZED_ON_CLUSTER &&
971 active_connections_.size() > 0) {
// Client has not connected yet.
975 if (state == client_state::INITIAL) {
977 BOOST_THROW_EXCEPTION(exception::hazelcast_client_offline(
978 "ClientConnectionManagerImpl::check_invocation_allowed",
979 "No connection found to cluster and async start is configured."));
981 BOOST_THROW_EXCEPTION(exception::io(
982 "No connection found to cluster since the client is starting."));
984 }
// Client was connected before and lost the cluster.
else if (reconnect_mode_ == config::client_connection_strategy_config::
985 reconnect_mode::ASYNC) {
986 BOOST_THROW_EXCEPTION(exception::hazelcast_client_offline(
987 "ClientConnectionManagerImpl::check_invocation_allowed",
988 "No connection found to cluster and reconnect mode is async."));
990 BOOST_THROW_EXCEPTION(
991 exception::io(
"ClientConnectionManagerImpl::check_invocation_allowed",
992 "No connection found to cluster."));
// Returns true when the client state is INITIALIZED_ON_CLUSTER; the state is
// read while holding client_state_mutex_.
997ClientConnectionManagerImpl::client_initialized_on_cluster()
const
999 std::lock_guard<std::recursive_mutex> guard{ client_state_mutex_ };
1001 return client_state_ == client_state::INITIALIZED_ON_CLUSTER;
// Calls get_or_connect() for every member in the cluster member list on the
// calling thread. std::exception thrown for an individual member is caught
// (the handler body is not visible in this excerpt).
1005ClientConnectionManagerImpl::connect_to_all_cluster_members()
// Checked first: smart routing disabled (branch body not visible here).
1007 if (!smart_routing_enabled_) {
1011 for (
const auto& member :
1012 client_.get_client_cluster_service().get_member_list()) {
1015 get_or_connect(member);
1016 }
catch (std::exception&) {
// Routes a backup acknowledgement to the invocation identified by `call_id`.
// The 64-bit call id is decomposed through a union to recover the id of the
// connection that sent the invocation; the notification is then posted to
// that connection's socket executor.
1023ClientConnectionManagerImpl::notify_backup(int64_t call_id)
// Layout used to split the call id.
// NOTE(review): the remaining struct/union members are not visible in this
// excerpt, and the field name "connnection_id" (three n's) is spelled as in
// the code.
1025 struct correlation_id
1027 int32_t connnection_id;
1033 correlation_id composed_id;
1035 c_id_union.id = call_id;
1036 auto connection_id = c_id_union.composed_id.connnection_id;
1037 auto connection = active_connection_ids_.get(connection_id);
// The invocation map is consulted from a task posted to the connection's own
// executor rather than from the calling thread.
1041 boost::asio::post(connection->get_socket().get_executor(), [=]() {
1042 auto invocation_it = connection->invocations.find(call_id);
1043 if (invocation_it != connection->invocations.end()) {
1044 invocation_it->second->notify_backup();
// Opens a new connection to `addr`: creates the Connection with a fresh id,
// connects the socket, runs the socket interceptor, authenticates and finally
// registers the connection through on_authenticated(), whose result is
// returned.
1049std::shared_ptr<Connection>
1050ClientConnectionManagerImpl::connect(
const address& addr)
1054 boost::str(boost::format(
"Trying to connect to %1%.") % addr));
1056 auto connection = std::make_shared<Connection>(addr,
// Pre-increment: every new connection gets the next id from the generator.
1058 ++connection_id_gen_,
1061 connection_timeout_millis_);
1062 connection->connect();
1065 socket_interceptor_.connect_(connection->get_socket());
1067 auto result = authenticate_on_cluster(connection);
1069 return on_authenticated(connection, result);
// Resolves the address to use for connecting to member `m`. With public
// addresses enabled, the member's PUBLIC_ENDPOINT_QUALIFIER entry is used when
// present, falling back to the member's own address. Otherwise the address
// provider translates the member address; a failed translation raises
// exception::hazelcast_, which is caught and logged as a warning.
1073ClientConnectionManagerImpl::translate(
const member& m)
1075 if (use_public_address_) {
1076 auto public_addr_it = m.address_map().find(PUBLIC_ENDPOINT_QUALIFIER);
1077 if (public_addr_it != m.address_map().end()) {
1078 return public_addr_it->second;
1080 return m.get_address();
1084 boost::optional<address> addr =
1085 address_provider_->translate(m.get_address());
1088 throw exception::hazelcast_(boost::str(
1089 boost::format(
"Address Provider could not translate %1%") % m));
1093 }
catch (
const exception::hazelcast_&) {
1095 logger::level::warning,
1096 boost::str(boost::format(
"Address Provider could not translate %1%") %
// Chooses a connection for an SQL request.
// member_of_large_same_version_group: callback returning a candidate member
//   (or none).
// get_cluster_member: callback resolving a member uuid to a member (or none).
// With smart routing, up to SQL_CONNECTION_RANDOM_ATTEMPTS candidates are
// requested and their connections looked up. Afterwards the active
// connections are scanned and the first one whose member is known and not a
// lite member is returned; if none qualifies, the first connection seen is
// returned (empty when there are no active connections).
1103std::shared_ptr<connection::Connection>
1104ClientConnectionManagerImpl::connection_for_sql(
1105 std::function<boost::optional<member>()> member_of_large_same_version_group,
1106 std::function<boost::optional<member>(boost::uuids::uuid)> get_cluster_member)
1108 if (smart_routing_enabled_) {
1113 for (
int i = 0; i < SQL_CONNECTION_RANDOM_ATTEMPTS; i++) {
1114 auto member = member_of_large_same_version_group();
1118 auto connection = active_connections_.get(member->get_uuid());
// Remember the first connection as the last-resort answer.
1127 std::shared_ptr<connection::Connection> first_connection;
1128 for (
const auto& connection_entry : active_connections_.entry_set()) {
1129 if (!first_connection) {
1130 first_connection = connection_entry.second;
1132 const auto& member_id = connection_entry.first;
1133 auto member = get_cluster_member(member_id);
// Test for unknown members and lite members (branch body not visible here;
// presumably `continue`).
1134 if (!member || member->is_lite_member()) {
1137 return connection_entry.second;
1141 return first_connection;
// Allocates a read buffer of `buffer_size` bytes, wraps it in byte_buffer,
// binds the message builder to `connection` and records "now" as the last
// read time.
1144ReadHandler::ReadHandler(Connection& connection,
size_t buffer_size)
// Raw array allocation. NOTE(review): the matching release is expected in
// ~ReadHandler, whose body is not visible in this excerpt - confirm.
1145 : buffer(new char[buffer_size])
1146 , byte_buffer(buffer, buffer_size)
1147 , builder_(connection)
1148 , last_read_time_(
std::chrono::steady_clock::now().time_since_epoch())
// Destructor. NOTE(review): body not visible in this excerpt - presumably
// releases the buffer allocated with new[] in the constructor; confirm.
1151ReadHandler::~ReadHandler()
// Processes the bytes currently in byte_buffer: stamps the last read time and
// feeds the buffer to the message builder for as long as data remains and
// on_data() returns true. Unconsumed bytes are compacted, otherwise the
// buffer is cleared.
1157ReadHandler::handle()
1159 last_read_time_ = std::chrono::steady_clock::now().time_since_epoch();
// Position 0 check (branch body not visible here; presumably an early return
// because nothing was read).
1161 if (byte_buffer.position() == 0)
1169 while (byte_buffer.has_remaining() && builder_.on_data(byte_buffer)) {
// Leftover bytes are kept via compact(); otherwise the buffer is reset.
1172 if (byte_buffer.has_remaining()) {
1173 byte_buffer.compact();
1175 byte_buffer.clear();
// Returns the last read time as a steady_clock time_point built from the
// stored duration-since-epoch value.
1179std::chrono::steady_clock::time_point
1180ReadHandler::get_last_read_time()
const
1182 return std::chrono::steady_clock::time_point{ last_read_time_ };
// NOTE(review): body not visible in this excerpt. The result is consulted by
// the ClientConnectionManagerImpl constructor when deciding whether public
// addresses may be used.
1186AddressProvider::is_default_provider()
// Creates a connection object for `address`. Initialises the read handler
// with a 16 KiB buffer (16 << 10), records the start time, and creates the
// socket through the socket factory using the supplied connect timeout. The
// socket is only created here; connecting happens in connect().
1191Connection::Connection(
1192 const address& address,
1193 spi::ClientContext& client_context,
1195 internal::socket::SocketFactory& socket_factory,
1196 ClientConnectionManagerImpl& client_connection_manager,
1197 std::chrono::milliseconds& connect_timeout_in_millis)
1198 : read_handler(*this, 16 << 10)
1199 , start_time_(
std::chrono::system_clock::now())
1200 , closed_time_duration_()
1201 , client_context_(client_context)
1202 , invocation_service_(client_context.get_invocation_service())
1203 , connection_id_(connection_id)
// The remote uuid starts as nil; set_remote_uuid() is called on it after
// authentication (see on_authenticated).
1204 , remote_uuid_(boost::uuids::nil_uuid())
1205 , logger_(client_context.get_logger())
1207 , last_write_time_(
std::chrono::steady_clock::now().time_since_epoch())
// The manager parameter is intentionally unused; the cast silences the
// unused-parameter warning.
1209 (void)client_connection_manager;
1210 socket_ = socket_factory.create(address, connect_timeout_in_millis);
// Defaulted destructor, defined out of line.
1213Connection::~Connection() =
default;
// Connects the socket and starts the periodic backup-timeout check for this
// connection's invocations.
1216Connection::connect()
1218 socket_->connect(shared_from_this());
// The timer is created on the socket's execution context.
1219 backup_timer_.reset(
1220 new boost::asio::steady_timer(socket_->get_executor().context()));
1221 auto backupTimeout =
1222 static_cast<spi::impl::ClientInvocationServiceImpl&
>(invocation_service_)
1223 .get_backup_timeout();
// A shared_ptr to this connection is handed to the timer chain.
1224 auto this_connection = shared_from_this();
1225 schedule_periodic_backup_cleanup(backupTimeout, this_connection);
// Arms backup_timer_ to fire after `backup_timeout`. When it fires, every
// invocation registered on the connection is asked to detect and handle a
// backup timeout, and the timer is re-armed with the same arguments.
1229Connection::schedule_periodic_backup_cleanup(
1230 std::chrono::milliseconds backup_timeout,
1231 std::shared_ptr<Connection> this_connection)
1237 backup_timer_->expires_from_now(backup_timeout);
// The handler is wrapped by the socket's executor.
// NOTE(review): presumably this serialises it with the other handlers that
// touch `invocations` - confirm.
1238 backup_timer_->async_wait(
1239 socket_->get_executor().wrap([=](boost::system::error_code ec) {
1243 for (
const auto& it : this_connection->invocations) {
1244 it.second->detect_and_handle_backup_timeout(backup_timeout);
1247 schedule_periodic_backup_cleanup(backup_timeout, this_connection);
// Convenience overload: closes the connection with a reason and no cause.
1258Connection::close(
const std::string& reason)
1260 close(reason,
nullptr);
// Closes the connection. Only the caller that flips alive_ from true to false
// proceeds. Records the close time, cancels the backup timer, stores the
// reason/cause, informs the connection manager and fails every invocation
// still registered on this connection with exception::target_disconnected.
1264Connection::close(
const std::string& reason, std::exception_ptr cause)
1266 bool expected =
true;
1267 if (!alive_.compare_exchange_strong(expected,
false)) {
1271 closed_time_duration_.store(
1272 std::chrono::duration_cast<std::chrono::milliseconds>(
1273 std::chrono::steady_clock::now().time_since_epoch()));
1275 if (backup_timer_) {
// Error-code overload of cancel(); the error code is deliberately ignored.
1276 boost::system::error_code ignored;
1277 backup_timer_->cancel(ignored);
1280 close_cause_ = cause;
1281 close_reason_ = reason;
1287 }
// An iexception raised by the guarded statement(s) above (not visible in this
// excerpt) is caught and logged.
catch (exception::iexception& e) {
1289 client_context_.get_logger(),
1291 boost::str(boost::format(
"Exception while closing connection %1%") %
1295 auto thisConnection = shared_from_this();
1296 client_context_.get_connection_manager().on_connection_close(
// Pending invocations are failed from a task posted to the socket executor;
// the shared_ptr captured by the lambda keeps the connection alive until it
// runs.
1299 boost::asio::post(socket_->get_executor(), [=]() {
1300 for (auto& invocationEntry : thisConnection->invocations) {
1301 invocationEntry.second->notify_exception(std::make_exception_ptr(
1302 boost::enable_current_exception(exception::target_disconnected(
1303 "Connection::close", thisConnection->get_close_reason()))));
// NOTE(review): the signature line of this member is not visible in this
// excerpt. The visible body hands the invocation to the socket's async_write
// together with a shared_ptr to this connection.
1310 const std::shared_ptr<spi::impl::ClientInvocation>& client_invocation)
1312 socket_->async_write(shared_from_this(), client_invocation);
// Returns a reference to the stored remote address. The type is optional, so
// it may hold no value.
1315const boost::optional<address>&
1316Connection::get_remote_address()
const
1318 return remote_address_;
// Stores the remote address; the by-value parameter is moved into the member.
1322Connection::set_remote_address(boost::optional<address> endpoint)
1324 this->remote_address_ = std::move(endpoint);
// Dispatches a message received on this connection. The invocation is looked
// up by the message's correlation id; unknown ids are logged. Known messages
// are routed by header flag: backup events to the connection manager's
// notify_backup(), other events to the listener service, and everything else
// to the invocation service.
1328Connection::handle_client_message(
1329 const std::shared_ptr<protocol::ClientMessage>& message)
1331 auto correlationId = message->get_correlation_id();
1332 auto invocationIterator = invocations.find(correlationId);
1333 if (invocationIterator == invocations.end()) {
1336 boost::str(boost::format(
"No invocation for callId: %1%. "
1337 "Dropping this message: %2%") %
1338 correlationId % *message));
1341 auto invocation = invocationIterator->second;
1342 auto flags = message->get_header_flags();
1343 if (message->is_flag_set(flags,
1344 protocol::ClientMessage::BACKUP_EVENT_FLAG)) {
// Skip the event header, then read the int64 that is passed to
// notify_backup() as the call id.
1345 message->rd_ptr(protocol::ClientMessage::EVENT_HEADER_LEN);
1346 correlationId = message->get<int64_t>();
1347 client_context_.get_connection_manager().notify_backup(correlationId);
1348 }
else if (message->is_flag_set(flags,
1349 protocol::ClientMessage::IS_EVENT_FLAG)) {
1350 client_context_.get_client_listener_service().handle_client_message(
1351 invocation, message);
1353 invocation_service_.handle_client_message(invocation, message);
// Returns the id assigned to this connection at construction.
1358Connection::get_connection_id()
const
1360 return connection_id_;
// NOTE(review): body not visible in this excerpt - presumably reports the
// alive_ flag that close() flips to false; confirm.
1364Connection::is_alive()
const
// Returns the reason string recorded by close().
1370Connection::get_close_reason()
const
1372 return close_reason_;
// Logs why the connection was closed. The message starts with the connection
// description followed by the close reason; without a reason, the cause's
// source and message are used, and "Socket explicitly closed" is the
// remaining alternative. How it is logged depends on whether the client is
// still running and whether a cause exists.
1376Connection::log_close()
1378 std::ostringstream message;
1379 message << *
this <<
" closed. Reason: ";
1380 if (!close_reason_.empty()) {
1381 message << close_reason_;
1382 }
else if (close_cause_) {
// Rethrowing the stored exception_ptr is how its typed details are accessed.
1384 std::rethrow_exception(close_cause_);
1385 }
catch (exception::iexception& ie) {
1386 message << ie.get_source() <<
"[" + ie.get_message() <<
"]";
1389 message <<
"Socket explicitly closed";
1392 if (client_context_.get_lifecycle_service().is_running()) {
// Client running, no cause: logged at info.
1393 if (!close_cause_) {
1394 HZ_LOG(logger_, info, message.str());
// Client running, with a cause: the exception is appended to the message
// (the log level is not visible in this excerpt).
1397 std::rethrow_exception(close_cause_);
1398 }
catch (exception::iexception& ie) {
1402 boost::str(boost::format(
"%1% %2%") % message.str() % ie));
// Remaining case: logged at finest, with a suffix produced by the lambda.
1407 logger_, finest, message.str() + [
this]() -> std::string {
1410 std::rethrow_exception(close_cause_);
1411 } catch (exception::iexception& ie) {
// Two connections compare equal when their connection ids are equal.
1421Connection::operator==(
const Connection& rhs)
const
1423 return connection_id_ == rhs.connection_id_;
// Negation of operator==, evaluated as !(rhs == *this).
1427Connection::operator!=(
const Connection& rhs)
const
1429 return !(rhs == *
this);
// Returns the server version string stored in
// connected_server_version_string_.
1433Connection::get_connected_server_version_string()
const
1435 return connected_server_version_string_;
// Stores connected_server in connected_server_version_string_.
1439Connection::set_connected_server_version(
const std::string& connected_server)
1441 Connection::connected_server_version_string_ = connected_server;
// Returns the local address reported by the underlying socket as an
// optional address. socket_ is dereferenced unconditionally here.
1444boost::optional<address>
1445Connection::get_local_socket_address()
const
1447 return socket_->local_socket_address();
// Returns the steady_clock time of the last read, as tracked by read_handler.
1450std::chrono::steady_clock::time_point
1451Connection::last_read_time()
const
1453 return read_handler.get_last_read_time();
// Closes the underlying socket asynchronously: the close call is posted to
// the socket's executor instead of being run inline.
1457Connection::inner_close()
// The lambda copies this shared_ptr, so the Connection stays alive until
// the posted handler has run.
1463 auto thisConnection = shared_from_this();
1464 boost::asio::post(socket_->get_executor(),
1465 [=]() { thisConnection->socket_->close(); });
// Streams a human-readable description of a Connection: alive flag,
// connection id, remote endpoint, last read/write times, a time derived
// from closed_time_duration_, and the connected server version. The output
// ends with '}'.
1469operator<<(std::ostream& os,
const Connection& connection)
1472 <<
"alive=" << connection.is_alive()
1473 <<
", connection id=" << connection.get_connection_id()
1474 <<
", remote endpoint=";
// The remote address is only dereferenced when it holds a value.
1475 if (connection.get_remote_address()) {
1476 os << *connection.get_remote_address();
1480 os <<
", last_read_time="
1481 << util::StringUtil::time_to_string(connection.last_read_time())
1482 <<
", last_write_time="
1483 << util::StringUtil::time_to_string(connection.last_write_time())
// closed_time_duration_ is read with .load() and converted to a
// steady_clock time_point so it can go through the same formatter.
1485 << util::StringUtil::time_to_string(
1486 std::chrono::steady_clock::time_point(
1487 std::chrono::duration_cast<std::chrono::steady_clock::duration>(
1488 connection.closed_time_duration_.load())))
1489 <<
", connected server version="
1490 << connection.connected_server_version_string_ <<
'}';
// Orders connections by connection_id_.
1496Connection::operator<(
const Connection& rhs)
const
1498 return connection_id_ < rhs.connection_id_;
// Const accessor returning a system_clock time_point.
// NOTE(review): presumably the time at which this connection was
// created/started - confirm against the body.
1501std::chrono::system_clock::time_point
1502Connection::get_start_time()
const
// Accessor for this connection's socket.
// NOTE(review): presumably exposes socket_ - confirm the return type and
// whether callers may hold on to the result.
1508Connection::get_socket()
// Removes the invocation registered under call_id from the invocations
// container.
1514Connection::deregister_invocation(int64_t call_id)
1516 invocations.erase(call_id);
// Returns the UUID stored in remote_uuid_.
1520Connection::get_remote_uuid()
const
1522 return remote_uuid_;
// Stores remote_uuid in remote_uuid_.
1526Connection::set_remote_uuid(boost::uuids::uuid remote_uuid)
1528 remote_uuid_ = remote_uuid;
// Records the last write time. The value is kept as a duration since the
// steady_clock epoch rather than as a time_point.
1532Connection::last_write_time(std::chrono::steady_clock::time_point tp)
1534 last_write_time_ = tp.time_since_epoch();
// Rebuilds a steady_clock time_point from the duration stored in
// last_write_time_.
1537std::chrono::steady_clock::time_point
1538Connection::last_write_time()
const
1540 return std::chrono::steady_clock::time_point{ last_write_time_ };
// Captures the connection manager and logger, then reads the heartbeat
// timeout and heartbeat interval (both in milliseconds) from the client
// properties.
1543HeartbeatManager::HeartbeatManager(
1544 spi::ClientContext& client,
1545 ClientConnectionManagerImpl& connection_manager)
1547 , client_connection_manager_(connection_manager)
1548 , logger_(client.get_logger())
1550 client_properties& clientProperties = client.get_client_properties();
1551 auto timeout_millis =
1552 clientProperties.get_long(clientProperties.get_heartbeat_timeout());
// The timeout is chosen by a conditional expression whose fallback is
// PROP_HEARTBEAT_TIMEOUT_DEFAULT parsed as int64_t.
// NOTE(review): presumably the configured value is used when it is
// positive - confirm the condition.
1553 heartbeat_timeout_ = std::chrono::milliseconds(
1556 : util::IOUtil::to_value<int64_t>(
1557 client_properties::PROP_HEARTBEAT_TIMEOUT_DEFAULT));
1559 auto interval_millis =
1560 clientProperties.get_long(clientProperties.get_heartbeat_interval());
// Same scheme for the interval, falling back to
// PROP_HEARTBEAT_INTERVAL_DEFAULT.
1561 heartbeat_interval_ = std::chrono::milliseconds(
1564 : util::IOUtil::to_value<int64_t>(
1565 client_properties::PROP_HEARTBEAT_INTERVAL_DEFAULT));
// Schedules the repeating heartbeat task on the client execution service
// and keeps the returned handle in timer_ (cancelled in shutdown()).
1569HeartbeatManager::start()
1571 spi::impl::ClientExecutionServiceImpl& clientExecutionService =
1572 client_.get_client_execution_service();
1574 timer_ = clientExecutionService.schedule_with_repetition(
// The task first tests whether the connection manager is alive, then runs
// check_connection() on every active connection.
// NOTE(review): presumably the task returns early when the manager is not
// alive - confirm.
1576 if (!client_connection_manager_.is_alive()) {
1580 for (
auto& connection :
1581 client_connection_manager_.get_active_connections()) {
1582 check_connection(connection);
// heartbeat_interval_ is passed for both trailing duration arguments.
// NOTE(review): presumably initial delay and period - confirm against
// schedule_with_repetition.
1585 heartbeat_interval_,
1586 heartbeat_interval_);
// Heartbeat check for a single connection:
//  - tests connection->is_alive() first;
//  - if nothing has been read for longer than heartbeat_timeout_, reports
//    the connection through on_heartbeat_stopped("Heartbeat timed out");
//  - if nothing has been written for longer than heartbeat_interval_,
//    sends a client ping over that connection.
1590HeartbeatManager::check_connection(
1591 const std::shared_ptr<Connection>& connection)
1593 if (!connection->is_alive()) {
1597 auto now = std::chrono::steady_clock::now();
1598 if (now - connection->last_read_time() > heartbeat_timeout_) {
1602 boost::format(
"Heartbeat failed over the connection: %1%") %
1604 on_heartbeat_stopped(connection,
"Heartbeat timed out");
1608 if (now - connection->last_write_time() > heartbeat_interval_) {
// The ping invocation is bound to this connection and sent with
// invoke_urgent().
1609 auto request = protocol::codec::client_ping_encode();
1610 std::shared_ptr<spi::impl::ClientInvocation> clientInvocation =
1611 spi::impl::ClientInvocation::create(client_, request,
"", connection);
1612 clientInvocation->invoke_urgent();
// Builds a target_disconnected exception (source
// "HeartbeatManager::onHeartbeatStopped") whose message names the
// connection, and wraps it in an exception_ptr.
// NOTE(review): presumably the exception_ptr is handed, together with
// `reason`, to the connection's close routine - confirm at the call site.
1617HeartbeatManager::on_heartbeat_stopped(
1618 const std::shared_ptr<Connection>& connection,
1619 const std::string& reason)
1623 std::make_exception_ptr(
1624 (exception::exception_builder<exception::target_disconnected>(
1625 "HeartbeatManager::onHeartbeatStopped")
1626 <<
"Heartbeat timed out to connection " << *connection)
// Cancels the repeating heartbeat timer. The error code produced by
// cancel() is received in `ignored` and deliberately not inspected.
// NOTE(review): timer_ is dereferenced here - confirm it is guarded for the
// case where start() was never called.
1631HeartbeatManager::shutdown()
1634 boost::system::error_code ignored;
1635 timer_->cancel(ignored);
// Returns the heartbeat timeout computed in the constructor.
1639std::chrono::milliseconds
1640HeartbeatManager::get_heartbeat_timeout()
const
1642 return heartbeat_timeout_;
// Restarts the connect-attempt clock and resets the current back-off to the
// initial back-off, capped at max_backoff_millis_.
1646wait_strategy::reset()
1649 cluster_connect_attempt_begin_ = std::chrono::steady_clock::now();
// (std::min) is parenthesized so that a function-like `min` macro cannot
// expand here.
1650 current_backoff_millis_ =
1651 (std::min)(max_backoff_millis_, initial_backoff_millis_);
// Copies the back-off settings (initial/max back-off, multiplier, jitter,
// cluster connect timeout) from retry_config and precomputes
// cluster_connect_timeout_text_, the timeout text that sleep() places in
// its messages.
1654wait_strategy::wait_strategy(
1655 const config::connection_retry_config& retry_config,
1657 : initial_backoff_millis_(retry_config.get_initial_backoff_duration())
1658 , max_backoff_millis_(retry_config.get_max_backoff_duration())
1659 , multiplier_(retry_config.get_multiplier())
1660 , jitter_(retry_config.get_jitter())
1662 , cluster_connect_timeout_millis_(retry_config.get_cluster_connect_timeout())
// milliseconds::max() is the sentinel for "no timeout" and is rendered as
// "INFINITE"; any other value is rendered as "<count> msecs".
1664 if (cluster_connect_timeout_millis_ == std::chrono::milliseconds::max()) {
1665 cluster_connect_timeout_text_ =
"INFINITE";
1667 cluster_connect_timeout_text_ =
1668 (boost::format(
"%1% msecs") % cluster_connect_timeout_millis_.count())
// One back-off step between cluster connection attempts.
// Measures the time elapsed since cluster_connect_attempt_begin_. When it
// exceeds the cluster connect timeout, the "cluster connect timeout ... is
// reached" message is produced. Otherwise the thread sleeps for the current
// back-off (with jitter) and the back-off is then grown for the next call.
// NOTE(review): presumably returns false on timeout and true after
// sleeping - confirm.
1674wait_strategy::sleep()
1677 using namespace std::chrono;
1678 auto current_time = steady_clock::now();
1679 auto time_passed = duration_cast<milliseconds>(
1680 current_time - cluster_connect_attempt_begin_);
1681 if (time_passed > cluster_connect_timeout_millis_) {
1685 (boost::format(
"Unable to get live cluster connection, cluster "
1686 "connect timeout (%1%) is reached. Attempt %2%.") %
1687 cluster_connect_timeout_text_ % attempt_)
// Jitter: the random draw r is mapped through (2.0 * r - 1.0) and scaled by
// jitter_ * current back-off, so the sleep time can move in either
// direction around the current back-off.
// NOTE(review): assumes random_ yields values in [0, 1) - confirm.
1693 auto actual_sleep_time =
1694 current_backoff_millis_ + milliseconds(
static_cast<milliseconds::rep
>(
1695 current_backoff_millis_.count() * jitter_ *
1696 (2.0 * random_(random_generator_) - 1.0)));
// Never sleep past the remaining cluster-connect-timeout budget.
1698 actual_sleep_time = (std::min)(
1699 actual_sleep_time, cluster_connect_timeout_millis_ - time_passed);
1705 "Unable to get live cluster connection, retry in %1% ms, attempt: %2% "
1706 ", cluster connect timeout: %3% , max backoff millis: %4%") %
1707 actual_sleep_time.count() % attempt_ % cluster_connect_timeout_text_ %
1708 max_backoff_millis_.count())
1711 std::this_thread::sleep_for(actual_sleep_time);
// Grow the back-off by multiplier_, capped at max_backoff_millis_.
1713 current_backoff_millis_ =
1714 (std::min)(milliseconds(
static_cast<milliseconds::rep
>(
1715 current_backoff_millis_.count() * multiplier_)),
1716 max_backoff_millis_);
// Binds the factory to the client context and to the resolver it is given.
// NOTE(review): `io` presumably initializes io_, which create() passes to
// the sockets - confirm in the initializer list.
1723SocketFactory::SocketFactory(spi::ClientContext& client_context,
1724 boost::asio::io_context& io,
1725 boost::asio::ip::tcp::resolver& resolver)
1726 : client_context_(client_context)
1728 , io_resolver_(resolver)
// Prepares ssl_context_ when the client is built with HZ_BUILD_WITH_SSL and
// SSL is enabled in the network configuration. A context supplied through
// the configuration is used as-is. Otherwise one is created for the
// configured protocol, with peer verification, the default verify paths and
// the configured CA verify files. A non-empty configured cipher list is
// applied with SSL_CTX_set_cipher_list.
1732SocketFactory::start()
1734#ifdef HZ_BUILD_WITH_SSL
1736 client_context_.get_client_config().get_network_config().get_ssl_config();
1737 if (sslConfig.is_enabled()) {
1738 if (sslConfig.ssl_context_) {
1739 ssl_context_ = sslConfig.ssl_context_;
// NOTE(review): C-style cast from the configured protocol value to the asio
// method enum; a static_cast would state the intent more explicitly.
1741 ssl_context_ = std::make_shared<boost::asio::ssl::context>(
1742 (boost::asio::ssl::context_base::method)sslConfig.get_protocol());
1744 ssl_context_->set_verify_mode(boost::asio::ssl::verify_peer);
1745 ssl_context_->set_default_verify_paths();
1747 const std::vector<std::string>& verifyFiles =
1748 sslConfig.get_verify_files();
1749 bool success =
true;
1750 logger& lg = client_context_.get_logger();
// Every configured CA file is loaded with the error_code overload, so a
// failing file is reported through `ec` instead of an exception.
1751 for (
const auto& f : verifyFiles) {
1752 boost::system::error_code ec;
1753 ssl_context_->load_verify_file(f, ec);
1759 boost::format(
"SocketFactory::start: Failed to load CA "
1760 "verify file at %1% %2%") %
// NOTE(review): presumably reached only when at least one verify file
// failed to load (success == false) - confirm. The context is dropped
// before the failure message below is used.
1767 ssl_context_.reset();
1770 "SocketFactory::start: Failed to load one or more "
1771 "configured CA verify files (PEM files). Please "
1772 "correct the files and retry.");
1778 const std::string& cipherList = sslConfig.get_cipher_list();
1779 if (!cipherList.empty()) {
// SSL_CTX_set_cipher_list returns 0 when none of the listed ciphers could
// be selected.
1780 if (!SSL_CTX_set_cipher_list(ssl_context_->native_handle(),
1781 cipherList.c_str())) {
1782 logger& lg = client_context_.get_logger();
1785 std::string(
"SocketFactory::start: Could not load any "
1786 "of the ciphers in the config provided "
// NOTE(review): presumably the non-SSL build branch, where this cast only
// marks client_context_ as used - confirm.
1794 (void)client_context_;
// Creates the socket used to reach `address`. In SSL builds an SSLSocket is
// returned when start() left a non-null ssl_context_; otherwise a plain
// TcpSocket is returned. Both receive the socket options from the network
// configuration and the connect timeout.
1799std::unique_ptr<hazelcast::client::socket>
1800SocketFactory::create(
const address& address,
1801 std::chrono::milliseconds& connect_timeout_in_millis)
1803#ifdef HZ_BUILD_WITH_SSL
1804 if (ssl_context_.get()) {
1805 return std::unique_ptr<hazelcast::client::socket>(
1806 new internal::socket::SSLSocket(io_,
1809 client_context_.get_client_config()
1810 .get_network_config()
1811 .get_socket_options(),
1812 connect_timeout_in_millis,
1817 return std::unique_ptr<hazelcast::client::socket>(
1818 new internal::socket::TcpSocket(io_,
1820 client_context_.get_client_config()
1821 .get_network_config()
1822 .get_socket_options(),
1823 connect_timeout_in_millis,
1827#ifdef HZ_BUILD_WITH_SSL
// Constructs an SSL socket by forwarding its arguments to BaseSocket
// specialized for an SSL stream layered over a TCP socket.
1829SSLSocket::SSLSocket(boost::asio::io_context& io_service,
1830 boost::asio::ssl::context& ssl_context,
1831 const client::address& address,
1832 client::config::socket_options& socket_options,
1833 std::chrono::milliseconds& connect_timeout_in_millis,
1834 boost::asio::ip::tcp::resolver& resolver)
1835 : BaseSocket<boost::asio::ssl::stream<boost::asio::ip::tcp::socket>>(
1840 connect_timeout_in_millis,
// Lists the ciphers available on this socket's SSL handle. Each entry
// carries the cipher name, its number of bits, the protocol version and
// OpenSSL's textual description.
1844std::vector<SSLSocket::CipherInfo>
1845SSLSocket::get_ciphers()
1847 STACK_OF(SSL_CIPHER)* ciphers = SSL_get_ciphers(socket_.native_handle());
1848 std::vector<CipherInfo> supportedCiphers;
1849 for (
int i = 0; i < sk_SSL_CIPHER_num(ciphers); ++i) {
1850 struct SSLSocket::CipherInfo info;
1851 const SSL_CIPHER* cipher = sk_SSL_CIPHER_value(ciphers, i);
1852 info.name = SSL_CIPHER_get_name(cipher);
// The second argument is an optional out-pointer; 0 means it is not
// requested.
1853 info.number_of_bits = SSL_CIPHER_get_bits(cipher, 0);
1854 info.version = SSL_CIPHER_get_version(cipher);
// NOTE(review): 256 is passed as the length of descBuf - confirm it matches
// the buffer's declared size.
1856 info.description = SSL_CIPHER_description(cipher, descBuf, 256);
1857 supportedCiphers.push_back(info);
1859 return supportedCiphers;
// Performs the client-side SSL handshake on the stream, using the
// synchronous handshake call.
1863SSLSocket::post_connect()
1865 socket_.handshake(boost::asio::ssl::stream_base::client);
// Streams the fields of a CipherInfo: name, bits, version and description.
1869operator<<(std::ostream& out,
const SSLSocket::CipherInfo& info)
1873 << info.name <<
", Bits:" << info.number_of_bits
1874 <<
", Version:" << info.version <<
", Description:" << info.description
// Constructs a plain TCP socket by forwarding its arguments to BaseSocket
// specialized for boost::asio::ip::tcp::socket.
1882TcpSocket::TcpSocket(boost::asio::io_context& io,
1883 const address& address,
1884 client::config::socket_options& socket_options,
1885 std::chrono::milliseconds& connect_timeout_in_millis,
1886 boost::asio::ip::tcp::resolver& resolver)
1887 : BaseSocket<boost::asio::ip::tcp::socket>(resolver,
1891 connect_timeout_in_millis)
// Hashes a shared Connection pointer by the absolute value of its
// connection id.
// NOTE(review): confirm a null check precedes the dereference of conn, and
// that the id can never be the minimum value of its signed type (std::abs
// is undefined for that value).
1901hash<std::shared_ptr<hazelcast::client::connection::Connection>>::operator()(
1902 const std::shared_ptr<hazelcast::client::connection::Connection>& conn)
1908 return std::abs(conn->get_connection_id());