18 #include <unordered_set>
20 #include "hazelcast/client/execution_callback.h"
21 #include "hazelcast/client/lifecycle_event.h"
22 #include "hazelcast/client/connection/AddressProvider.h"
23 #include "hazelcast/client/spi/impl/ClientInvocation.h"
24 #include "hazelcast/util/Util.h"
25 #include "hazelcast/client/protocol/AuthenticationStatus.h"
26 #include "hazelcast/client/exception/protocol_exceptions.h"
27 #include "hazelcast/client/connection/ClientConnectionManagerImpl.h"
28 #include "hazelcast/client/connection/ConnectionListener.h"
29 #include "hazelcast/client/connection/Connection.h"
30 #include "hazelcast/client/spi/ClientContext.h"
31 #include "hazelcast/client/spi/impl/ClientExecutionServiceImpl.h"
32 #include "hazelcast/client/serialization/serialization.h"
33 #include "hazelcast/client/protocol/UsernamePasswordCredentials.h"
34 #include "hazelcast/client/protocol/codec/codecs.h"
35 #include "hazelcast/client/client_config.h"
36 #include "hazelcast/client/socket_interceptor.h"
37 #include "hazelcast/client/config/client_network_config.h"
38 #include "hazelcast/client/client_properties.h"
39 #include "hazelcast/client/connection/HeartbeatManager.h"
40 #include "hazelcast/client/impl/hazelcast_client_instance_impl.h"
41 #include "hazelcast/client/spi/impl/listener/listener_service_impl.h"
42 #include "hazelcast/client/spi/impl/ClientPartitionServiceImpl.h"
43 #include "hazelcast/client/internal/socket/TcpSocket.h"
44 #include "hazelcast/client/internal/socket/SSLSocket.h"
45 #include "hazelcast/client/config/ssl_config.h"
46 #include "hazelcast/util/IOUtil.h"
47 #include "hazelcast/client/internal/socket/SocketFactory.h"
48 #include "hazelcast/client/connection/wait_strategy.h"
52 namespace connection {
// Namespace-scope definition of the static constexpr member; presumably kept
// so the constant can be ODR-used (e.g. bound to a reference) in pre-C++17 builds.
53 constexpr
size_t ClientConnectionManagerImpl::EXECUTOR_CORE_POOL_SIZE;
// Endpoint qualifier that translate() uses to look up a member's public address
// in member::address_map().
// NOTE(review): the initializer lines of this definition are not present in this view.
54 const endpoint_qualifier ClientConnectionManagerImpl::PUBLIC_ENDPOINT_QUALIFIER{
// Namespace-scope definition of the static constexpr member that bounds how many
// members connection_for_sql() asks for before falling back to scanning connections.
59 constexpr
int ClientConnectionManagerImpl::SQL_CONNECTION_RANDOM_ATTEMPTS;
// Builds the connection manager from the client's configuration.
// - connection_timeout_millis_ starts at milliseconds::max() and is replaced by the
//   network config's connection timeout when that timeout is positive.
// - authentication_timeout_ is derived from the heartbeat manager's timeout.
// - use_public_address_ is only enabled when the default address provider is used
//   and the network config asks for public addresses.
// - shuffle_member_list_ is read from the client properties.
// @param client            client context supplying config, logger and services
// @param address_provider  source of candidate member addresses (ownership taken)
// NOTE(review): authentication_timeout_ reads heartbeat_, and use_public_address_
// reads address_provider_, during member initialization. That is only valid if
// those members are declared earlier in the class than the ones using them.
// NOTE(review): several initializer/body lines are not present in this view.
61 ClientConnectionManagerImpl::ClientConnectionManagerImpl(
62 spi::ClientContext& client,
63 std::unique_ptr<AddressProvider> address_provider)
65 , logger_(client.get_logger())
66 , connection_timeout_millis_((std::chrono::milliseconds::max)())
68 , socket_interceptor_(client.get_client_config().get_socket_interceptor())
69 , address_provider_(std::move(address_provider))
70 , connection_id_gen_(0)
71 , heartbeat_(client, *this)
72 , async_start_(client.get_client_config()
73 .get_connection_strategy_config()
75 , reconnect_mode_(client.get_client_config()
76 .get_connection_strategy_config()
77 .get_reconnect_mode())
78 , smart_routing_enabled_(
79 client.get_client_config().get_network_config().is_smart_routing())
80 , client_uuid_(client.random_uuid())
81 , authentication_timeout_(
82 boost::chrono::milliseconds(heartbeat_.get_heartbeat_timeout().count()))
83 , load_balancer_(client.get_client_config().get_load_balancer())
84 , wait_strategy_(client.get_client_config()
85 .get_connection_strategy_config()
88 , cluster_id_(boost::uuids::nil_uuid())
89 , connect_to_cluster_task_submitted_(false)
90 , use_public_address_(
91 address_provider_->is_default_provider() &&
92 client.get_client_config().get_network_config().use_public_address())
// A non-positive configured timeout keeps the "no timeout" (max) default.
94 config::client_network_config& networkConfig =
95 client.get_client_config().get_network_config();
96 auto connTimeout = networkConfig.get_connection_timeout();
97 if (connTimeout.count() > 0) {
98 connection_timeout_millis_ = std::chrono::milliseconds(connTimeout);
101 client_properties& clientProperties = client.get_client_properties();
102 shuffle_member_list_ =
103 clientProperties.get_boolean(clientProperties.get_shuffle_member_list());
// Starts the connection manager.
// Only the call that atomically flips alive_ from false to true gets past the
// guard; the early-exit branch itself is not visible here.
// The steps are:
// 1. Create the io_context, resolver and socket factory, plus a work guard that
//    keeps the io_context from returning while idle.
// 2. Run the io_context on io_thread_.
// 3. Create the executor thread pool (EXECUTOR_CORE_POOL_SIZE threads).
// 4. Connect to the cluster.
// 5. When smart routing is enabled, start the periodic connect-to-all-members task.
// 6. Initialize the load balancer.
// NOTE(review): the return type, the failure handling for socket_factory_->start()
// and several other lines are not present in this view.
// NOTE(review): the io_thread_ lambda uses `[=]`, which implicitly captures `this`
// (deprecated since C++20); `[this]` would state the intent explicitly.
107 ClientConnectionManagerImpl::start()
109 bool expected =
false;
110 if (!alive_.compare_exchange_strong(expected,
true)) {
114 io_context_.reset(
new boost::asio::io_context);
116 new boost::asio::ip::tcp::resolver(io_context_->get_executor()));
117 socket_factory_.reset(
new internal::socket::SocketFactory(
118 client_, *io_context_, *io_resolver_));
119 io_guard_.reset(
new boost::asio::io_context::work(*io_context_));
121 if (!socket_factory_->start()) {
125 socket_interceptor_ = client_.get_client_config().get_socket_interceptor();
127 io_thread_ = std::thread([=]() { io_context_->run(); });
130 new hazelcast::util::hz_thread_pool(EXECUTOR_CORE_POOL_SIZE));
131 connect_to_members_timer_ =
132 boost::asio::steady_timer(executor_->get_executor());
136 connect_to_cluster();
137 if (smart_routing_enabled_) {
138 schedule_connect_to_all_members();
141 load_balancer_.init_(client_.get_cluster());
// Arms connect_to_members_timer_ for one second. When it fires, calls
// connect_to_all_members() and then re-arms itself.
// It does not proceed when the client's lifecycle service is no longer running.
// A wait that completes with operation_aborted (e.g. after shutdown() cancels the
// timer) is handled separately; those lines are not visible here.
// NOTE(review): basic_waitable_timer::expires_from_now is deprecated in recent
// Boost.Asio in favour of expires_after.
147 ClientConnectionManagerImpl::schedule_connect_to_all_members()
149 if (!client_.get_lifecycle_service().is_running()) {
153 connect_to_members_timer_->expires_from_now(
154 boost::asio::chrono::seconds(1));
155 connect_to_members_timer_->async_wait([=](boost::system::error_code ec) {
156 if (ec == boost::asio::error::operation_aborted) {
159 connect_to_all_members();
161 schedule_connect_to_all_members();
// Stops the connection manager.
// Only the call that atomically flips alive_ from true to false gets past the guard.
// The steps are:
// 1. Cancel the connect-to-members timer and stop heartbeats.
// 2. Close every active connection.
// 3. Shut down the thread pool (its arguments are not visible here).
// 4. Clear the listener list and both connection registries.
// NOTE(review): the loop iterates the result of active_connections_.values(). That
// result is presumably a copy, which matters because Connection::close() calls back
// into on_connection_close(), and that call mutates active_connections_.
166 ClientConnectionManagerImpl::shutdown()
168 bool expected =
true;
169 if (!alive_.compare_exchange_strong(expected,
false)) {
173 if (connect_to_members_timer_) {
174 connect_to_members_timer_->cancel();
177 heartbeat_.shutdown();
180 for (
auto& connection : active_connections_.values()) {
182 util::IOUtil::close_resource(connection.get(),
183 "Hazelcast client is shutting down");
186 spi::impl::ClientExecutionServiceImpl::shutdown_thread_pool(
193 connection_listeners_.clear();
194 active_connections_.clear();
195 active_connection_ids_.clear();
// Returns a connection to member m.
// It first looks up an active connection by the member's uuid; the lines that
// presumably return it when found are not visible here. Otherwise it translates
// the member's address and opens a new authenticated connection via connect().
// @param m  member to connect to
// @return   the connection (possibly a pre-existing one returned by on_authenticated())
198 std::shared_ptr<Connection>
199 ClientConnectionManagerImpl::get_or_connect(
const member& m)
201 const auto& uuid = m.get_uuid();
202 auto connection = active_connections_.get(uuid);
207 address addr = translate(m);
208 return connect(addr);
211 std::vector<std::shared_ptr<Connection>>
212 ClientConnectionManagerImpl::get_active_connections()
214 return active_connections_.values();
217 std::shared_ptr<Connection>
218 ClientConnectionManagerImpl::get_connection(boost::uuids::uuid uuid)
220 return active_connections_.get(uuid);
// Authenticates a freshly opened connection against the cluster.
// It sends an urgent authentication request and waits at most
// authentication_timeout_ for the reply, throwing exception::timeout if none
// arrives. The reply is decoded into an auth_response.
// If an iexception is caught, the connection is closed with that exception.
// Status handling:
// - CREDENTIALS_FAILED, NOT_ALLOWED_IN_CLUSTER and unsupported status codes
//   close the connection and throw exception::authentication.
// - The AUTHENTICATED branch, the `try` line and the return are not visible here.
// @param connection  connection the request is sent over
// @return            decoded auth_response on successful authentication
224 ClientConnectionManagerImpl::authenticate_on_cluster(
223 ClientConnectionManagerImpl::auth_response
225 std::shared_ptr<Connection>& connection)
228 encode_authentication_request(client_.get_serialization_service());
229 auto clientInvocation =
230 spi::impl::ClientInvocation::create(client_, request,
"", connection);
231 auto f = clientInvocation->invoke_urgent();
233 struct auth_response result;
235 if (f.wait_for(authentication_timeout_) !=
236 boost::future_status::ready) {
237 BOOST_THROW_EXCEPTION(exception::timeout(
238 "ClientConnectionManagerImpl::authenticate",
239 (boost::format(
"Authentication response is "
240 "not received for %1% msecs for %2%") %
241 authentication_timeout_.count() % *clientInvocation)
244 auto response = f.get();
245 auto* initial_frame =
246 reinterpret_cast<protocol::ClientMessage::frame_header_type*
>(
247 response.rd_ptr(protocol::ClientMessage::RESPONSE_HEADER_LEN));
// Fixed-size fields of the initial frame, read in wire order: byte, uuid, byte,
// int32, uuid. They are assigned to auth_response in its member declaration order
// (the struct is declared elsewhere).
248 result = { response.get<
byte>(),
249 response.get<boost::uuids::uuid>(),
250 response.get<
byte>(),
251 response.get<int32_t>(),
252 response.get<boost::uuids::uuid>() };
// Skip whatever fixed-size bytes remain in the initial frame after the fields read
// above. The subtracted sizes are the header, two bytes, two uuids each with a
// one-byte prefix, and one int32. Presumably this tolerates fields appended by
// newer servers.
254 response.rd_ptr(
static_cast<int32_t
>(initial_frame->frame_len) -
255 protocol::ClientMessage::RESPONSE_HEADER_LEN -
256 2 * protocol::ClientMessage::UINT8_SIZE -
257 2 * (
sizeof(boost::uuids::uuid) +
258 protocol::ClientMessage::UINT8_SIZE) -
259 protocol::ClientMessage::INT32_SIZE);
261 result.server_address = response.get_nullable<address>();
262 result.server_version = response.get<std::string>();
263 }
catch (exception::iexception&) {
264 connection->close(
"Failed to authenticate connection",
265 std::current_exception());
// NOTE(review): C-style cast; static_cast<protocol::authentication_status> is the
// explicit equivalent.
269 auto authentication_status = (protocol::authentication_status)result.status;
270 switch (authentication_status) {
271 case protocol::AUTHENTICATED: {
274 case protocol::CREDENTIALS_FAILED: {
275 auto e = std::make_exception_ptr(exception::authentication(
276 "ClientConnectionManagerImpl::authenticate_on_cluster",
277 "Authentication failed. The configured cluster name on the "
278 "client (see client_config::set_cluster_name()) does not match "
279 "the one configured in the cluster or the credentials set in the "
280 "Client security config could not be authenticated"));
281 connection->close(
"Failed to authenticate connection", e);
282 std::rethrow_exception(e);
284 case protocol::NOT_ALLOWED_IN_CLUSTER: {
285 auto e = std::make_exception_ptr(exception::authentication(
286 "ClientConnectionManagerImpl::authenticate_on_cluster",
287 "Client is not allowed in the cluster"));
288 connection->close(
"Failed to authenticate connection", e);
289 std::rethrow_exception(e);
292 auto e = std::make_exception_ptr(exception::authentication(
293 "ClientConnectionManagerImpl::authenticate_on_cluster",
295 "Authentication status code not supported. status: %1%") %
296 authentication_status)
298 connection->close(
"Failed to authenticate connection", e);
299 std::rethrow_exception(e);
// Builds the authentication request for this client.
// It reads the serialization version from ss, and the cluster name and credentials
// from the client config.
// Encoding paths:
// - The first client_authentication_encode call is presumably the path taken when
//   no credentials are configured; its guarding condition is not visible here.
// - username_password credentials are encoded with client_authentication_encode.
// - token credentials are encoded with client_authenticationcustom_encode.
// - If no branch returns, an empty ClientMessage is returned.
// @param ss  serialization service whose version is sent to the server
// @return    encoded authentication request message
304 protocol::ClientMessage
305 ClientConnectionManagerImpl::encode_authentication_request(
306 serialization::pimpl::SerializationService& ss)
308 byte serializationVersion = ss.get_version();
309 client_config& clientConfig = client_.get_client_config();
310 auto cluster_name = clientConfig.get_cluster_name();
312 auto credential = clientConfig.get_credentials();
314 return protocol::codec::client_authentication_encode(
319 protocol::ClientTypes::CPP,
320 serializationVersion,
326 switch (credential->type()) {
327 case security::credentials::credential_type::username_password: {
329 std::static_pointer_cast<security::username_password_credentials>(
331 return protocol::codec::client_authentication_encode(
336 protocol::ClientTypes::CPP,
337 serializationVersion,
342 case security::credentials::credential_type::token: {
344 std::static_pointer_cast<security::token_credentials>(credential);
345 return protocol::codec::client_authenticationcustom_encode(
349 protocol::ClientTypes::CPP,
350 serializationVersion,
357 return protocol::ClientMessage();
// Notifies every registered ConnectionListener that connection was added.
// It iterates the result of connection_listeners_.to_array() rather than the live
// collection, presumably a snapshot taken at call time.
361 ClientConnectionManagerImpl::fire_connection_added_event(
362 const std::shared_ptr<Connection>& connection)
364 for (
const std::shared_ptr<ConnectionListener>& connectionListener :
365 connection_listeners_.to_array()) {
366 connectionListener->connection_added(connection);
// Notifies every registered ConnectionListener that connection was removed.
// It iterates the result of connection_listeners_.to_array(), presumably a snapshot.
371 ClientConnectionManagerImpl::fire_connection_removed_event(
372 const std::shared_ptr<Connection>& connection)
374 for (
const auto& listener : connection_listeners_.to_array()) {
375 listener->connection_removed(connection);
// Shuts the client down through its lifecycle service.
// It takes a weak_ptr, so a client that has already been destroyed is detected
// (lock() yields null) instead of being kept alive. If the client is gone or no
// longer running it does not proceed; the exit statement is not visible here.
// An iexception thrown by shutdown() is logged, not propagated.
// NOTE(review): as the name suggests, this presumably runs on a separate thread;
// the lines that would start that thread are not visible in this view.
380 ClientConnectionManagerImpl::shutdown_with_external_thread(
381 std::weak_ptr<client::impl::hazelcast_client_instance_impl> client_impl)
384 std::shared_ptr<client::impl::hazelcast_client_instance_impl>
385 clientInstance = client_impl.lock();
386 if (!clientInstance ||
387 !clientInstance->get_lifecycle_service().is_running()) {
392 clientInstance->get_lifecycle_service().shutdown();
393 } catch (exception::iexception& e) {
394 HZ_LOG(*clientInstance->get_logger(),
396 boost::str(boost::format(
"Exception during client shutdown "
397 "%1%.clientShutdown-:%2%") %
398 clientInstance->get_name() % e));
// Schedules the background connect-to-cluster task, at most one at a time.
// connect_to_cluster_task_submitted_ is flipped from false to true atomically; the
// early exit when it was already set is not visible here.
// The task posted to the executor:
// 1. Runs do_connect_to_cluster().
// 2. Under client_state_mutex_, clears the submitted flag.
// 3. If there are still no active connections, logs and resubmits itself.
// If a std::exception escapes, it is logged and the client is shut down via
// shutdown_with_external_thread(). The task holds only a weak_ptr to the client
// implementation.
// NOTE(review): `[=]` also captures `this` implicitly (deprecated since C++20).
404 ClientConnectionManagerImpl::submit_connect_to_cluster_task()
406 bool expected =
false;
407 if (!connect_to_cluster_task_submitted_.compare_exchange_strong(expected,
412 std::weak_ptr<client::impl::hazelcast_client_instance_impl> c =
413 client_.get_hazelcast_client_implementation();
414 boost::asio::post(executor_->get_executor(), [=]() {
416 do_connect_to_cluster();
418 std::lock_guard<std::recursive_mutex> guard(client_state_mutex_);
419 connect_to_cluster_task_submitted_ = false;
420 if (active_connections_.empty()) {
424 boost::str(boost::format(
"No connection to cluster: %1%") %
427 submit_connect_to_cluster_task();
430 } catch (std::exception& e) {
433 boost::str(boost::format(
"Could not connect to any cluster, "
434 "shutting down the client: %1%") %
437 shutdown_with_external_thread(c);
// Opens connections to cluster members that are not yet connected.
// It only proceeds when the client is running and already has at least one active
// connection.
// A member gets a connection task only when all of these hold:
// - the client is running;
// - the member has no connection yet;
// - connecting_members_.get_or_put_if_absent(...).second is true, presumably
//   meaning no other task is already connecting to it.
// The task is posted to the executor. It connects via get_or_connect() and then
// removes the member from connecting_members_ on success and on std::exception
// alike. The remaining catch-handler lines are not visible here.
// The member is copied into the task so it outlives the loop iteration.
443 ClientConnectionManagerImpl::connect_to_all_members()
445 if (!client_.get_lifecycle_service().is_running() ||
446 active_connections_.empty()) {
451 client_.get_client_cluster_service().get_member_list()) {
452 if (client_.get_lifecycle_service().is_running() &&
453 !get_connection(m.get_uuid()) &&
454 connecting_members_.get_or_put_if_absent(m,
nullptr).second) {
457 member member_to_connect = m;
459 executor_->get_executor(), [member_to_connect,
this]() {
461 if (!client_.get_lifecycle_service().is_running()) {
464 if (!get_connection(member_to_connect.get_uuid())) {
465 get_or_connect(member_to_connect);
467 connecting_members_.remove(member_to_connect);
468 } catch (std::exception&) {
469 connecting_members_.remove(member_to_connect);
// Tries to connect to the cluster until a connection is made or wait_strategy_ gives up.
// Each attempt:
// 1. Tries the known members, shuffled when shuffle_member_list_ is set.
// 2. Tries every possible member address not already tried in this attempt.
// check_client_active() aborts with hazelcast_client_not_active if the client stops
// running. Attempts repeat while wait_strategy_.sleep() returns true.
// The success returns are on lines not visible here. If every attempt fails, it
// throws exception::illegal_state listing every address tried.
// NOTE(review): the separator is appended after every address, so the message ends
// with a trailing " , ". The loop variable named `address` shadows the `address` type.
477 ClientConnectionManagerImpl::do_connect_to_cluster()
479 std::unordered_set<address> tried_addresses;
480 wait_strategy_.reset();
483 std::unordered_set<address> tried_addresses_per_attempt;
485 client_.get_client_cluster_service().get_member_list();
486 if (shuffle_member_list_) {
487 shuffle(member_list);
491 for (
const auto& m : member_list) {
492 check_client_active();
493 tried_addresses_per_attempt.insert(m.get_address());
494 auto connection = try_connect(m);
501 for (
const address& server_address : get_possible_member_addresses()) {
502 check_client_active();
503 if (!tried_addresses_per_attempt.insert(server_address).second) {
508 auto connection = try_connect<address>(server_address);
513 tried_addresses.insert(tried_addresses_per_attempt.begin(),
514 tried_addresses_per_attempt.end());
518 check_client_active();
519 }
while (wait_strategy_.sleep());
521 std::ostringstream out;
522 out <<
"Unable to connect to any address! The following addresses were "
524 for (
const auto& address : tried_addresses) {
525 out << address <<
" , ";
528 BOOST_THROW_EXCEPTION(exception::illegal_state(
529 "ConnectionManager::do_connect_to_cluster", out.str()));
// Collects candidate addresses to dial.
// The addresses of current cluster members come first, followed by the addresses
// loaded from address_provider_. When shuffle_member_list_ is set each group is
// shuffled separately; the member-address shuffle call is not visible here.
// NOTE(review): std::move(member.get_address()) only moves if the accessor returns
// a non-const value or reference. If it returns const address&, this is a copy and
// the std::move is misleading.
533 ClientConnectionManagerImpl::get_possible_member_addresses()
535 std::vector<address> addresses;
537 client_.get_client_cluster_service().get_member_list()) {
538 addresses.emplace_back(std::move(member.get_address()));
541 if (shuffle_member_list_) {
545 std::vector<address> provided_addresses =
546 address_provider_->load_addresses();
548 if (shuffle_member_list_) {
549 shuffle(provided_addresses);
553 addresses.end(), provided_addresses.begin(), provided_addresses.end());
// Either submits the background connect task or connects synchronously. The
// selecting condition (presumably async_start_) is not visible in this view.
559 ClientConnectionManagerImpl::connect_to_cluster()
562 submit_connect_to_cluster_task();
564 do_connect_to_cluster();
// Reports whether the manager is running. It presumably returns alive_, which
// start() sets and shutdown() clears; the body is not visible in this view.
569 ClientConnectionManagerImpl::is_alive()
// Cleans up after a connection has closed.
// A connection without a remote endpoint is only logged, not removed from any map.
// Otherwise, under client_state_mutex_:
// 1. The connection is removed from active_connections_, presumably only if it is
//    still the one mapped to its member uuid, and from active_connection_ids_.
// 2. If that leaves no active connections, CLIENT_DISCONNECTED is fired and cluster
//    reconnection is triggered.
// 3. Listeners are then notified of the removal.
// If no mapping existed, that fact is logged.
// NOTE(review): the last log message formats `endpoint` (a boost::optional) while
// the earlier one formats `*endpoint`; confirm which output is intended.
575 ClientConnectionManagerImpl::on_connection_close(
576 const std::shared_ptr<Connection>& connection)
578 auto endpoint = connection->get_remote_address();
579 auto member_uuid = connection->get_remote_uuid();
581 auto socket_remote_address = connection->get_socket().get_remote_endpoint();
586 boost::str(boost::format(
587 "Destroying %1% , but it has end-point set to null "
588 "-> not removing it from a connection map") %
593 std::lock_guard<std::recursive_mutex> guard(client_state_mutex_);
594 if (active_connections_.remove(member_uuid, connection)) {
595 active_connection_ids_.remove(connection->get_connection_id());
600 boost::str(boost::format(
601 "Removed connection to endpoint: %1%, connection: %2%") %
602 *endpoint % *connection));
604 if (active_connections_.empty()) {
605 fire_life_cycle_event(
606 lifecycle_event::lifecycle_state::CLIENT_DISCONNECTED);
608 trigger_cluster_reconnection();
611 fire_connection_removed_event(connection);
615 boost::str(boost::format(
616 "Destroying a connection, but there is no mapping "
617 "%1% -> %2% in the connection map.") %
618 endpoint % *connection));
// Registers connection_listener to receive connection_added / connection_removed
// notifications.
623 ClientConnectionManagerImpl::add_connection_listener(
624 const std::shared_ptr<ConnectionListener>& connection_listener)
626 connection_listeners_.add(connection_listener);
// Destructor; its body is not visible in this view.
629 ClientConnectionManagerImpl::~ClientConnectionManagerImpl()
// Returns the client's logger, fetched from client_ on every call.
635 ClientConnectionManagerImpl::get_logger()
637 return client_.get_logger();
// Throws exception::hazelcast_client_not_active when the client's lifecycle service
// is no longer running. Used to abort connection attempts early.
641 ClientConnectionManagerImpl::check_client_active()
643 if (!client_.get_lifecycle_service().is_running()) {
644 BOOST_THROW_EXCEPTION(exception::hazelcast_client_not_active(
645 "ClientConnectionManagerImpl::check_client_active",
646 "Client is shutdown"));
// Registers a successfully authenticated connection.
// Under client_state_mutex_:
// 1. Checks the partition count; check_partition_count() throws
//    client_not_allowed_in_cluster on a mismatch.
// 2. Records the server version, address and member uuid on the connection.
// 3. If a connection to that member already exists, returns that existing
//    connection; the duplicate-handling lines before the return are partly hidden.
// 4. Logs the cluster-id check. If cluster_id_ was set and differs from the new id,
//    tells the client via on_cluster_restart().
// 5. Stores the connection in both registries.
// 6. For the first connection, records the cluster id and fires CLIENT_CONNECTED.
// 7. Logs the result, with or without the local address, and fires connection-added.
// Afterwards, a connection that is no longer alive is passed to
// on_connection_close(), and the connection is closed if the client has stopped
// running. The final return statement is not visible here.
// @param connection  the newly authenticated connection
// @param response    decoded authentication response
// @return            the registered connection, or a pre-existing one for the same member
650 std::shared_ptr<Connection>
651 ClientConnectionManagerImpl::on_authenticated(
652 const std::shared_ptr<Connection>& connection,
653 auth_response& response)
656 std::lock_guard<std::recursive_mutex> guard(client_state_mutex_);
657 check_partition_count(response.partition_count);
658 connection->set_connected_server_version(response.server_version);
659 connection->set_remote_address(response.server_address);
660 connection->set_remote_uuid(response.member_uuid);
662 auto existing_connection =
663 active_connections_.get(response.member_uuid);
664 if (existing_connection) {
667 "Duplicate connection to same member with uuid : %1%") %
668 boost::uuids::to_string(response.member_uuid))
670 return existing_connection;
673 auto new_cluster_id = response.cluster_id;
674 boost::uuids::uuid current_cluster_id = cluster_id_;
678 boost::str(boost::format(
679 "Checking the cluster: %1%, current cluster: %2%") %
680 new_cluster_id % current_cluster_id));
682 auto cluster_id_changed = !current_cluster_id.is_nil() &&
683 !(new_cluster_id == current_cluster_id);
684 if (cluster_id_changed) {
690 "Switching from current cluster: %1% to new cluster: %2%") %
691 current_cluster_id % new_cluster_id));
692 client_.get_hazelcast_client_implementation()->on_cluster_restart();
695 auto connections_empty = active_connections_.empty();
696 active_connection_ids_.put(connection->get_connection_id(), connection);
697 active_connections_.put(response.member_uuid, connection);
698 if (connections_empty) {
699 cluster_id_ = new_cluster_id;
700 fire_life_cycle_event(
701 lifecycle_event::lifecycle_state::CLIENT_CONNECTED);
704 auto local_address = connection->get_local_socket_address();
711 "Authenticated with server %1%:%2%, server version: %3%, "
712 "local address: %4%. %5%") %
713 response.server_address % response.member_uuid %
714 response.server_version % *local_address % *connection));
721 "Authenticated with server %1%:%2%, server version: %3%, "
722 "no local address: (connection disconnected ?). %4%") %
723 response.server_address % response.member_uuid %
724 response.server_version % *connection));
727 fire_connection_added_event(connection);
735 if (!connection->is_alive()) {
736 on_connection_close(connection);
742 if (!client_.get_lifecycle_service().is_running()) {
743 connection->close(
"Client is shutdown");
// Forwards state to the client's lifecycle service, which notifies its listeners.
750 ClientConnectionManagerImpl::fire_life_cycle_event(
751 lifecycle_event::lifecycle_state state)
753 client_.get_lifecycle_service().fire_lifecycle_event(state);
// Verifies that new_partition_count is compatible with the partition count already
// known to the partition service. check_and_set_partition_count presumably records
// the count on first use. Throws exception::client_not_allowed_in_cluster on a
// mismatch.
// NOTE(review): the static_cast to the concrete ClientPartitionServiceImpl assumes
// get_partition_service() always returns that implementation.
757 ClientConnectionManagerImpl::check_partition_count(int32_t new_partition_count)
759 auto& partition_service =
760 static_cast<spi::impl::ClientPartitionServiceImpl&
>(
761 client_.get_partition_service());
762 if (!partition_service.check_and_set_partition_count(new_partition_count)) {
763 BOOST_THROW_EXCEPTION(exception::client_not_allowed_in_cluster(
764 "ClientConnectionManagerImpl::check_partition_count",
766 "Client can not work with this cluster because it has a different "
768 "Expected partition count: %1%, Member partition count: %2%") %
769 partition_service.get_partition_count() % new_partition_count)
// Reacts to losing all cluster connections.
// - Reconnect mode OFF: logs and shuts the client down via
//   shutdown_with_external_thread().
// - Otherwise: if the client is still running, submits the background
//   connect-to-cluster task.
775 ClientConnectionManagerImpl::trigger_cluster_reconnection()
777 if (reconnect_mode_ ==
778 config::client_connection_strategy_config::reconnect_mode::OFF) {
780 logger_, info,
"RECONNECT MODE is off. Shutting down the client.");
781 shutdown_with_external_thread(
782 client_.get_hazelcast_client_implementation());
786 if (client_.get_lifecycle_service().is_running()) {
787 submit_connect_to_cluster_task();
// Picks a connection for an invocation.
// With smart routing, it asks the load balancer for a member and uses that member's
// connection; the null-member and found-connection lines are not visible here.
// Otherwise, or as a fallback, it returns the first element of
// active_connections_.values(). The empty case is handled on hidden lines,
// presumably returning an empty pointer.
791 std::shared_ptr<Connection>
792 ClientConnectionManagerImpl::get_random_connection()
794 if (smart_routing_enabled_) {
795 auto member = load_balancer_.next_(client_.get_cluster());
799 auto connection = get_connection(member->get_uuid());
805 auto connections = active_connections_.values();
806 if (connections.empty()) {
810 return connections[0];
// Returns this client's uuid, presumably client_uuid_, which is generated in the
// constructor. The body is not visible in this view.
814 ClientConnectionManagerImpl::get_client_uuid()
const
// Validates that an invocation may be sent now.
// Invocations are allowed when at least one connection is active; that return is
// not visible here. Otherwise it throws:
// - hazelcast_client_offline when async start is configured (the guarding condition,
//   presumably async_start_, is hidden);
// - hazelcast_client_offline when the reconnect mode is ASYNC;
// - exception::io in every other case.
820 ClientConnectionManagerImpl::check_invocation_allowed()
822 if (active_connections_.size() > 0) {
827 BOOST_THROW_EXCEPTION(exception::hazelcast_client_offline(
828 "ClientConnectionManagerImpl::check_invocation_allowed",
829 "No connection found to cluster and async start is configured."));
830 }
else if (reconnect_mode_ == config::client_connection_strategy_config::
831 reconnect_mode::ASYNC) {
832 BOOST_THROW_EXCEPTION(exception::hazelcast_client_offline(
833 "ClientConnectionManagerImpl::check_invocation_allowed",
834 "No connection found to cluster and reconnect mode is async."));
836 BOOST_THROW_EXCEPTION(
837 exception::io(
"ClientConnectionManagerImpl::check_invocation_allowed",
838 "No connection found to cluster."));
// Synchronously connects to every member in the current member list, but only when
// smart routing is enabled.
// The try/catch sits inside the loop, so a std::exception for one member does not
// stop the remaining attempts. The handler body is not visible here.
// NOTE(review): the loop variable `member` shadows the `member` type.
843 ClientConnectionManagerImpl::connect_to_all_cluster_members()
845 if (!smart_routing_enabled_) {
849 for (
const auto& member :
850 client_.get_client_cluster_service().get_member_list()) {
852 get_or_connect(member);
853 }
catch (std::exception&) {
// Delivers a backup acknowledgement for call_id.
// 1. The connection id is extracted from the 64-bit call id by overlaying it with a
//    correlation_id struct inside a union. The union/struct declaration lines are
//    partly not visible.
// 2. The connection is looked up in active_connection_ids_; the null check is hidden.
// 3. On that connection's socket executor, the pending invocation (if still
//    registered) is notified.
// NOTE(review): reading a union member other than the one last written is undefined
// behavior in ISO C++, although compilers commonly allow it as an extension. Copying
// call_id into a correlation_id with std::memcpy would be portable. Which half holds
// the connection id also depends on field order and endianness.
// NOTE(review): the field name `connnection_id` has a typo (three n's).
860 ClientConnectionManagerImpl::notify_backup(int64_t call_id)
862 struct correlation_id
864 int32_t connnection_id;
870 correlation_id composed_id;
872 c_id_union.id = call_id;
873 auto connection_id = c_id_union.composed_id.connnection_id;
874 auto connection = active_connection_ids_.get(connection_id);
878 boost::asio::post(connection->get_socket().get_executor(), [=]() {
879 auto invocation_it = connection->invocations.find(call_id);
880 if (invocation_it != connection->invocations.end()) {
881 invocation_it->second->notify_backup();
// Opens a new connection to addr.
// 1. Logs the attempt.
// 2. Creates a Connection with the next id from connection_id_gen_ and
//    connection_timeout_millis_.
// 3. Connects its socket and runs the configured socket interceptor (the guard line
//    is not visible).
// 4. Authenticates and registers the connection.
// @param addr  address to dial
// @return      result of on_authenticated(), which may be a pre-existing connection
//              to the same member
886 std::shared_ptr<Connection>
887 ClientConnectionManagerImpl::connect(
const address& addr)
891 boost::str(boost::format(
"Trying to connect to %1%.") % addr));
893 auto connection = std::make_shared<Connection>(addr,
895 ++connection_id_gen_,
898 connection_timeout_millis_);
899 connection->connect();
902 socket_interceptor_.connect_(connection->get_socket());
904 auto result = authenticate_on_cluster(connection);
906 return on_authenticated(connection, result);
// Resolves the address to dial for member m.
// - With use_public_address_: prefers the member's PUBLIC_ENDPOINT_QUALIFIER entry in
//   address_map() and falls back to the member's own address.
// - Otherwise: asks address_provider_ to translate the member address. A failed
//   translation raises exception::hazelcast_, which appears to be caught in this same
//   function and logged as a warning. The remaining lines are not visible.
// NOTE(review): throwing and immediately catching in one function uses exceptions for
// control flow; branching directly on the empty optional would be simpler.
910 ClientConnectionManagerImpl::translate(
const member& m)
912 if (use_public_address_) {
913 auto public_addr_it = m.address_map().find(PUBLIC_ENDPOINT_QUALIFIER);
914 if (public_addr_it != m.address_map().end()) {
915 return public_addr_it->second;
917 return m.get_address();
921 boost::optional<address> addr =
922 address_provider_->translate(m.get_address());
925 throw exception::hazelcast_(boost::str(
926 boost::format(
"Address Provider could not translate %1%") % m));
930 }
catch (
const exception::hazelcast_&) {
932 logger::level::warning,
933 boost::str(boost::format(
"Address Provider could not translate %1%") %
// Chooses a connection for SQL execution.
// - With smart routing: makes up to SQL_CONNECTION_RANDOM_ATTEMPTS attempts to find
//   an active connection to a member returned by member_of_large_same_version_group().
//   The checks between these lines are not visible.
// - Fallback: scans the active connections and returns the first one whose member is
//   known and not a lite member (others are presumably skipped). If none qualifies,
//   it returns the first connection seen, which is empty if there are none.
// @param member_of_large_same_version_group  yields a candidate member, or none
// @param get_cluster_member                  looks up a member by uuid, or none
940 std::shared_ptr<connection::Connection>
941 ClientConnectionManagerImpl::connection_for_sql(
942 std::function<boost::optional<member>()> member_of_large_same_version_group,
943 std::function<boost::optional<member>(boost::uuids::uuid)> get_cluster_member)
945 if (smart_routing_enabled_) {
950 for (
int i = 0; i < SQL_CONNECTION_RANDOM_ATTEMPTS; i++) {
951 auto member = member_of_large_same_version_group();
955 auto connection = active_connections_.get(member->get_uuid());
964 std::shared_ptr<connection::Connection> first_connection;
965 for (
const auto& connection_entry : active_connections_.entry_set()) {
966 if (!first_connection) {
967 first_connection = connection_entry.second;
969 const auto& member_id = connection_entry.first;
970 auto member = get_cluster_member(member_id);
971 if (!member || member->is_lite_member()) {
974 return connection_entry.second;
978 return first_connection;
// Allocates a read buffer of buffer_size bytes, wraps it in byte_buffer, binds the
// message builder to connection, and stamps last_read_time_ with the current
// steady_clock time (as a duration since epoch).
// NOTE(review): `buffer` is a raw owning new[], presumably released in
// ~ReadHandler() (body not visible). std::unique_ptr<char[]> or std::vector<char>
// would make ownership explicit. byte_buffer(buffer, ...) also relies on `buffer`
// being declared before `byte_buffer`.
981 ReadHandler::ReadHandler(Connection& connection,
size_t buffer_size)
982 : buffer(new char[buffer_size])
983 , byte_buffer(buffer, buffer_size)
984 , builder_(connection)
985 , last_read_time_(std::chrono::steady_clock::now().time_since_epoch())
// Destructor; presumably frees the raw read buffer. The body is not visible in this view.
988 ReadHandler::~ReadHandler()
// Processes bytes just read into byte_buffer.
// 1. Updates last_read_time_.
// 2. Feeds the buffer to builder_ while data remains and on_data() keeps returning true.
// 3. Compacts any leftover (partial) bytes to the front, or clears the buffer when
//    fully consumed.
// The lines between, including what happens when position() == 0, are not visible.
994 ReadHandler::handle()
996 last_read_time_ = std::chrono::steady_clock::now().time_since_epoch();
998 if (byte_buffer.position() == 0)
1006 while (byte_buffer.has_remaining() && builder_.on_data(byte_buffer)) {
1009 if (byte_buffer.has_remaining()) {
1010 byte_buffer.compact();
1012 byte_buffer.clear();
1016 std::chrono::steady_clock::time_point
1017 ReadHandler::get_last_read_time()
const
1019 return std::chrono::steady_clock::time_point{ last_read_time_ };
// Whether this is the default address provider. ClientConnectionManagerImpl's
// constructor uses it to decide use_public_address_. The body is not visible in this view.
1023 AddressProvider::is_default_provider()
// Creates a connection object.
// It initializes:
// - a 16 KiB read handler (16 << 10 bytes);
// - the start time (system_clock);
// - a nil remote uuid;
// - last_write_time_ set to now;
// - a socket from socket_factory for address with the given connect timeout.
// The socket is connected later by connect(). Some parameter/initializer lines,
// including the connection_id parameter, are not visible here.
// NOTE(review): connect_timeout_in_millis is taken by non-const reference but only
// read; a value or const& would suffice. The parameter `address` also shadows the
// `address` type.
1028 Connection::Connection(
1029 const address& address,
1030 spi::ClientContext& client_context,
1032 internal::socket::SocketFactory& socket_factory,
1033 ClientConnectionManagerImpl& client_connection_manager,
1034 std::chrono::milliseconds& connect_timeout_in_millis)
1035 : read_handler(*this, 16 << 10)
1036 , start_time_(std::chrono::system_clock::now())
1037 , closed_time_duration_()
1038 , client_context_(client_context)
1039 , invocation_service_(client_context.get_invocation_service())
1040 , connection_id_(connection_id)
1041 , remote_uuid_(boost::uuids::nil_uuid())
1042 , logger_(client_context.get_logger())
1044 , last_write_time_(std::chrono::steady_clock::now().time_since_epoch())
1046 socket_ = socket_factory.create(address, connect_timeout_in_millis);
// Defaulted destructor; members release their own resources.
1049 Connection::~Connection() =
default;
// Connects the socket and starts the periodic backup-timeout check.
// 1. Connects the socket, passing shared_from_this(), so this object must already be
//    owned by a shared_ptr.
// 2. Creates backup_timer_ on the socket executor's execution context.
// 3. Starts the periodic check using the invocation service's backup timeout.
1052 Connection::connect()
1054 socket_->connect(shared_from_this());
1055 backup_timer_.reset(
1056 new boost::asio::steady_timer(socket_->get_executor().context()));
1057 auto backupTimeout =
1058 static_cast<spi::impl::ClientInvocationServiceImpl&
>(invocation_service_)
1059 .get_backup_timeout();
1060 auto this_connection = shared_from_this();
1061 schedule_periodic_backup_cleanup(backupTimeout, this_connection);
// Runs the backup-timeout check every backup_timeout, then re-arms.
// Each tick asks every pending invocation on this connection to detect and handle
// its own backup timeout.
// - The handler is wrapped by the socket's executor so it runs in that context.
// - It captures this_connection, keeping the connection alive while a wait is pending.
// - close() cancels backup_timer_.
// The early-exit checks, before arming and on an error code, are on hidden lines.
// NOTE(review): expires_from_now (and wrap, if get_executor() returns an
// io_context::strand) are deprecated in recent Boost.Asio; expires_after and
// bind_executor are the replacements.
1065 Connection::schedule_periodic_backup_cleanup(
1066 std::chrono::milliseconds backup_timeout,
1067 std::shared_ptr<Connection> this_connection)
1073 backup_timer_->expires_from_now(backup_timeout);
1074 backup_timer_->async_wait(
1075 socket_->get_executor().wrap([=](boost::system::error_code ec) {
1079 for (
const auto& it : this_connection->invocations) {
1080 it.second->detect_and_handle_backup_timeout(backup_timeout);
1083 schedule_periodic_backup_cleanup(backup_timeout, this_connection);
// Closes the connection with reason and no underlying cause.
1094 Connection::close(
const std::string& reason)
1096 close(reason,
nullptr);
// Closes the connection exactly once. alive_ is flipped from true to false, and later
// calls do not get past the guard.
// 1. Records the close time (steady_clock milliseconds since epoch).
// 2. Cancels the backup timer, ignoring errors.
// 3. Stores the reason and cause.
// 4. Runs hidden statements (presumably logging and inner_close()) inside a try that
//    logs any iexception.
// 5. Notifies the connection manager via on_connection_close().
// 6. On the socket executor, fails every pending invocation with
//    exception::target_disconnected carrying the close reason.
// @param reason  human-readable close reason
// @param cause   optional underlying exception (may be null)
1100 Connection::close(
const std::string& reason, std::exception_ptr cause)
1102 bool expected =
true;
1103 if (!alive_.compare_exchange_strong(expected,
false)) {
1107 closed_time_duration_.store(
1108 std::chrono::duration_cast<std::chrono::milliseconds>(
1109 std::chrono::steady_clock::now().time_since_epoch()));
1111 if (backup_timer_) {
1112 boost::system::error_code ignored;
1113 backup_timer_->cancel(ignored);
1116 close_cause_ = cause;
1117 close_reason_ = reason;
1123 }
catch (exception::iexception& e) {
1125 client_context_.get_logger(),
1127 boost::str(boost::format(
"Exception while closing connection %1%") %
1131 auto thisConnection = shared_from_this();
1132 client_context_.get_connection_manager().on_connection_close(
1135 boost::asio::post(socket_->get_executor(), [=]() {
1136 for (auto& invocationEntry : thisConnection->invocations) {
1137 invocationEntry.second->notify_exception(std::make_exception_ptr(
1138 boost::enable_current_exception(exception::target_disconnected(
1139 "Connection::close", thisConnection->get_close_reason()))));
// NOTE(review): the signature line of this function is not visible in this view
// (presumably Connection::write).
// Queues client_invocation for an asynchronous write on the socket, passing a
// shared_ptr to this connection.
1146 const std::shared_ptr<spi::impl::ClientInvocation>& client_invocation)
1148 socket_->async_write(shared_from_this(), client_invocation);
1151 const boost::optional<address>&
1152 Connection::get_remote_address()
const
1154 return remote_address_;
// Stores the remote member address; the argument is taken by value and moved in.
1158 Connection::set_remote_address(boost::optional<address> endpoint)
1160 this->remote_address_ = std::move(endpoint);
// Dispatches a message received on this connection.
// - No registered invocation for the correlation id: the message is logged and dropped.
// - Backup event: after the event header it carries the correlation id of the original
//   call, which is forwarded to the connection manager's notify_backup().
// - Other event: handed to the listener service.
// - Anything else: a response, handed to the invocation service.
1164 Connection::handle_client_message(
1165 const std::shared_ptr<protocol::ClientMessage>& message)
1167 auto correlationId = message->get_correlation_id();
1168 auto invocationIterator = invocations.find(correlationId);
1169 if (invocationIterator == invocations.end()) {
1172 boost::str(boost::format(
"No invocation for callId: %1%. "
1173 "Dropping this message: %2%") %
1174 correlationId % *message));
1177 auto invocation = invocationIterator->second;
1178 auto flags = message->get_header_flags();
1179 if (message->is_flag_set(flags,
1180 protocol::ClientMessage::BACKUP_EVENT_FLAG)) {
1181 message->rd_ptr(protocol::ClientMessage::EVENT_HEADER_LEN);
1182 correlationId = message->get<int64_t>();
1183 client_context_.get_connection_manager().notify_backup(correlationId);
1184 }
else if (message->is_flag_set(flags,
1185 protocol::ClientMessage::IS_EVENT_FLAG)) {
1186 client_context_.get_client_listener_service().handle_client_message(
1187 invocation, message);
1189 invocation_service_.handle_client_message(invocation, message);
// Returns the id assigned to this connection at construction.
1194 Connection::get_connection_id()
const
1196 return connection_id_;
// Whether the connection is still open. It presumably returns alive_, which close()
// clears; the body is not visible in this view.
1200 Connection::is_alive()
const
// Returns the reason recorded by close(); empty while the connection is open.
1206 Connection::get_close_reason()
const
1208 return close_reason_;
// Logs why this connection closed.
// The message is the connection's description followed by the first available of:
// - close_reason_, if set;
// - the source and message of close_cause_, recovered by rethrowing it;
// - "Socket explicitly closed".
// While the client is running the message is logged at info level (the level used
// when a cause exists is on hidden lines). Otherwise it is logged at finest level,
// with cause text appended by an immediately invoked lambda.
// NOTE(review): only iexception-derived causes are caught in the visible handlers;
// any other exception type would escape unless a hidden handler catches it.
1212 Connection::log_close()
1214 std::ostringstream message;
1215 message << *
this <<
" closed. Reason: ";
1216 if (!close_reason_.empty()) {
1217 message << close_reason_;
1218 }
else if (close_cause_) {
1220 std::rethrow_exception(close_cause_);
1221 }
catch (exception::iexception& ie) {
1222 message << ie.get_source() <<
"[" + ie.get_message() <<
"]";
1225 message <<
"Socket explicitly closed";
1228 if (client_context_.get_lifecycle_service().is_running()) {
1229 if (!close_cause_) {
1230 HZ_LOG(logger_, info, message.str());
1233 std::rethrow_exception(close_cause_);
1234 }
catch (exception::iexception& ie) {
1238 boost::str(boost::format(
"%1% %2%") % message.str() % ie));
1243 logger_, finest, message.str() + [
this]() -> std::string {
1246 std::rethrow_exception(close_cause_);
1247 } catch (exception::iexception& ie) {
// Two connections are equal when their connection ids match.
1257 Connection::operator==(
const Connection& rhs)
const
1259 return connection_id_ == rhs.connection_id_;
// Negation of operator==, i.e. the connection ids differ.
1263 Connection::operator!=(
const Connection& rhs)
const
1265 return !(rhs == *
this);
// Returns the server version string recorded by set_connected_server_version().
1269 Connection::get_connected_server_version_string()
const
1271 return connected_server_version_string_;
// Records the server version reported during authentication.
// NOTE(review): the qualified name Connection::connected_server_version_string_ is
// equivalent to the plain member name here.
1275 Connection::set_connected_server_version(
const std::string& connected_server)
1277 Connection::connected_server_version_string_ = connected_server;
1280 boost::optional<address>
1281 Connection::get_local_socket_address()
const
1283 return socket_->local_socket_address();
1286 std::chrono::steady_clock::time_point
1287 Connection::last_read_time()
const
1289 return read_handler.get_last_read_time();
// Closes the underlying socket asynchronously on the socket's executor. The captured
// shared_ptr keeps this connection alive until the close runs. Any guard lines before
// this are not visible.
1293 Connection::inner_close()
1299 auto thisConnection = shared_from_this();
1300 boost::asio::post(socket_->get_executor(),
1301 [=]() { thisConnection->socket_->close(); });
// Writes a one-line diagnostic description of connection, terminated by '}'. Fields:
// - alive flag and connection id;
// - remote endpoint, when known;
// - last read and last write times;
// - closed time (closed_time_duration_ converted back to a steady_clock time point);
// - connected server version.
// The opening text, the "no endpoint" branch and the closed-time label are on hidden
// lines.
// NOTE(review): closed_time_duration_ is value-initialized (zero) until close(), so an
// open connection prints the clock's epoch as its closed time.
1305 operator<<(std::ostream& os,
const Connection& connection)
1308 <<
"alive=" << connection.is_alive()
1309 <<
", connection id=" << connection.get_connection_id()
1310 <<
", remote endpoint=";
1311 if (connection.get_remote_address()) {
1312 os << *connection.get_remote_address();
1316 os <<
", last_read_time="
1317 << util::StringUtil::time_to_string(connection.last_read_time())
1318 <<
", last_write_time="
1319 << util::StringUtil::time_to_string(connection.last_write_time())
1321 << util::StringUtil::time_to_string(
1322 std::chrono::steady_clock::time_point(
1323 std::chrono::duration_cast<std::chrono::steady_clock::duration>(
1324 connection.closed_time_duration_.load())))
1325 <<
", connected server version="
1326 << connection.connected_server_version_string_ <<
'}';
// Orders connections by connection id, consistent with operator==.
1332 Connection::operator<(
const Connection& rhs)
const
1334 return connection_id_ < rhs.connection_id_;
// Returns the connection's start time as a std::chrono::system_clock time point.
1337 std::chrono::system_clock::time_point
1338 Connection::get_start_time()
const
// Accessor for the connection's underlying socket (presumably returns socket_;
// the exact return type is given by the class declaration — confirm there).
1344 Connection::get_socket()
// Removes the invocation registered under call_id from this connection's
// invocations container (erase by key).
1350 Connection::deregister_invocation(int64_t call_id)
1352 invocations.erase(call_id);
// Returns the remote member uuid stored by set_remote_uuid().
1356 Connection::get_remote_uuid()
const
1358 return remote_uuid_;
// Stores the uuid of the remote side of this connection (read by get_remote_uuid()).
1362 Connection::set_remote_uuid(boost::uuids::uuid remote_uuid)
1364 remote_uuid_ = remote_uuid;
// Records tp as the last write time. Only tp's duration since the steady_clock
// epoch is stored in last_write_time_; the getter overload rebuilds the time point.
1368 Connection::last_write_time(std::chrono::steady_clock::time_point tp)
1370 last_write_time_ = tp.time_since_epoch();
// Rebuilds the last write time point from the duration stored in
// last_write_time_. HeartbeatManager::check_connection uses it to decide
// whether a ping is due.
1373 std::chrono::steady_clock::time_point
1374 Connection::last_write_time()
const
1376 return std::chrono::steady_clock::time_point{ last_write_time_ };
// Creates the heartbeat manager for a client context and its connection manager.
// heartbeat_timeout_ and heartbeat_interval_ are read in milliseconds from the
// client properties via get_long(). Each falls back to its
// client_properties::PROP_HEARTBEAT_*_DEFAULT value through a conditional
// expression (presumably when the configured value is not positive — confirm).
1379 HeartbeatManager::HeartbeatManager(
1380 spi::ClientContext& client,
1381 ClientConnectionManagerImpl& connection_manager)
1383 , client_connection_manager_(connection_manager)
1384 , logger_(client.get_logger())
1386 client_properties& clientProperties = client.get_client_properties();
1387 auto timeout_millis =
1388 clientProperties.get_long(clientProperties.get_heartbeat_timeout());
1389 heartbeat_timeout_ = std::chrono::milliseconds(
1392 : util::IOUtil::to_value<int64_t>(
1393 client_properties::PROP_HEARTBEAT_TIMEOUT_DEFAULT));
1395 auto interval_millis =
1396 clientProperties.get_long(clientProperties.get_heartbeat_interval());
1397 heartbeat_interval_ = std::chrono::milliseconds(
1400 : util::IOUtil::to_value<int64_t>(
1401 client_properties::PROP_HEARTBEAT_INTERVAL_DEFAULT));
// Starts periodic heartbeat checks by scheduling a repeating task on the client
// execution service. heartbeat_interval_ is passed as both timing arguments.
// Each run calls check_connection() for every active connection. The
// is_alive() guard presumably ends the run early once the connection manager
// is no longer alive. The returned timer is kept in timer_ so that shutdown()
// can cancel it.
1405 HeartbeatManager::start()
1407 spi::impl::ClientExecutionServiceImpl& clientExecutionService =
1408 client_.get_client_execution_service();
1410 timer_ = clientExecutionService.schedule_with_repetition(
1412 if (!client_connection_manager_.is_alive()) {
1416 for (
auto& connection :
1417 client_connection_manager_.get_active_connections()) {
1418 check_connection(connection);
1421 heartbeat_interval_,
1422 heartbeat_interval_);
// Heartbeat check for a single connection:
//  - a connection that is no longer alive is handled first (presumably skipped);
//  - if nothing was read for longer than heartbeat_timeout_, a
//    "Heartbeat failed over the connection" message is formatted and
//    on_heartbeat_stopped() is called with "Heartbeat timed out";
//  - if nothing was written for longer than heartbeat_interval_, a client ping
//    is sent over this connection as an urgent invocation, so the connection
//    keeps producing traffic.
1426 HeartbeatManager::check_connection(
1427 const std::shared_ptr<Connection>& connection)
1429 if (!connection->is_alive()) {
1433 auto now = std::chrono::steady_clock::now();
1434 if (now - connection->last_read_time() > heartbeat_timeout_) {
1438 boost::format(
"Heartbeat failed over the connection: %1%") %
1440 on_heartbeat_stopped(connection,
"Heartbeat timed out");
1444 if (now - connection->last_write_time() > heartbeat_interval_) {
1445 auto request = protocol::codec::client_ping_encode();
1446 std::shared_ptr<spi::impl::ClientInvocation> clientInvocation =
1447 spi::impl::ClientInvocation::create(client_, request,
"", connection);
1448 clientInvocation->invoke_urgent();
// Reacts to a heartbeat failure on `connection`. It builds a
// target_disconnected exception wrapped in an exception_ptr, presumably used as
// the cause when the connection is closed with `reason`.
// NOTE(review): the exception text is always "Heartbeat timed out to
// connection ...", regardless of `reason` — confirm this is intended.
1453 HeartbeatManager::on_heartbeat_stopped(
1454 const std::shared_ptr<Connection>& connection,
1455 const std::string& reason)
1459 std::make_exception_ptr(
1460 (exception::exception_builder<exception::target_disconnected>(
1461 "HeartbeatManager::onHeartbeatStopped")
1462 <<
"Heartbeat timed out to connection " << *connection)
// Cancels the repeating heartbeat timer created by start(). The error_code
// overload of cancel() is used so cancellation errors are ignored instead of thrown.
// NOTE(review): timer_ is dereferenced here; if shutdown() can run before
// start() has assigned timer_, confirm that a null check guards this call.
1467 HeartbeatManager::shutdown()
1470 boost::system::error_code ignored;
1471 timer_->cancel(ignored);
// Returns the heartbeat timeout resolved in the constructor.
1475 std::chrono::milliseconds
1476 HeartbeatManager::get_heartbeat_timeout()
const
1478 return heartbeat_timeout_;
// Starts a new cluster-connect retry cycle. It records the current time as the
// attempt start and resets the backoff to initial_backoff_millis_, capped at
// max_backoff_millis_.
// (std::min) is parenthesized so a function-like min() macro (e.g. from
// <windows.h>) cannot expand here.
1482 wait_strategy::reset()
1485 cluster_connect_attempt_begin_ = std::chrono::steady_clock::now();
1486 current_backoff_millis_ =
1487 (std::min)(max_backoff_millis_, initial_backoff_millis_);
// Copies the initial/max backoff, multiplier, jitter and cluster-connect
// timeout from the retry config. It then prepares cluster_connect_timeout_text_
// for log messages: "INFINITE" when the timeout equals milliseconds::max(),
// otherwise "<n> msecs".
1490 wait_strategy::wait_strategy(
1491 const config::connection_retry_config& retry_config,
1493 : initial_backoff_millis_(retry_config.get_initial_backoff_duration())
1494 , max_backoff_millis_(retry_config.get_max_backoff_duration())
1495 , multiplier_(retry_config.get_multiplier())
1496 , jitter_(retry_config.get_jitter())
1498 , cluster_connect_timeout_millis_(retry_config.get_cluster_connect_timeout())
1500 if (cluster_connect_timeout_millis_ == std::chrono::milliseconds::max()) {
1501 cluster_connect_timeout_text_ =
"INFINITE";
1503 cluster_connect_timeout_text_ =
1504 (boost::format(
"%1% msecs") % cluster_connect_timeout_millis_.count())
// Waits before the next cluster-connect attempt.
// If more than cluster_connect_timeout_millis_ has passed since the attempt
// began, an "Unable to get live cluster connection ... timeout ... is reached"
// message is formatted (presumably logged, and the function then gives up).
// Otherwise the sleep time is the current backoff perturbed by jitter:
//   backoff + backoff * jitter_ * (2 * r - 1), with r drawn from random_.
//   If r is uniform in [0, 1), this is the backoff +/- up to jitter_ * backoff.
// The sleep is clamped so it never exceeds the time left before the timeout.
// After sleeping, the backoff grows by multiplier_, capped at max_backoff_millis_.
// NOTE(review): both static_casts convert a double to milliseconds::rep.
// The conversion is undefined if the value exceeds rep's range, and the later
// (std::min) cap does not prevent that — confirm configured values keep it small.
1510 wait_strategy::sleep()
1513 using namespace std::chrono;
1514 auto current_time = steady_clock::now();
1515 auto time_passed = duration_cast<milliseconds>(
1516 current_time - cluster_connect_attempt_begin_);
1517 if (time_passed > cluster_connect_timeout_millis_) {
1521 (boost::format(
"Unable to get live cluster connection, cluster "
1522 "connect timeout (%1%) is reached. Attempt %2%.") %
1523 cluster_connect_timeout_text_ % attempt_)
1529 auto actual_sleep_time =
1530 current_backoff_millis_ + milliseconds(
static_cast<milliseconds::rep
>(
1531 current_backoff_millis_.count() * jitter_ *
1532 (2.0 * random_(random_generator_) - 1.0)));
1534 actual_sleep_time = (std::min)(
1535 actual_sleep_time, cluster_connect_timeout_millis_ - time_passed);
1541 "Unable to get live cluster connection, retry in %1% ms, attempt: %2% "
1542 ", cluster connect timeout: %3% , max backoff millis: %4%") %
1543 actual_sleep_time.count() % attempt_ % cluster_connect_timeout_text_ %
1544 max_backoff_millis_.count())
1547 std::this_thread::sleep_for(actual_sleep_time);
1549 current_backoff_millis_ =
1550 (std::min)(milliseconds(
static_cast<milliseconds::rep
>(
1551 current_backoff_millis_.count() * multiplier_)),
1552 max_backoff_millis_);
1557 namespace internal {
// Stores the client context and the resolver (as io_resolver_) for later use by
// start() and create(). The io_context is presumably kept as io_, which
// create() passes to every socket.
1559 SocketFactory::SocketFactory(spi::ClientContext& client_context,
1560 boost::asio::io_context& io,
1561 boost::asio::ip::tcp::resolver& resolver)
1562 : client_context_(client_context)
1564 , io_resolver_(resolver)
// Prepares the TLS context when SSL support is compiled in (HZ_BUILD_WITH_SSL)
// and SSL is enabled in the network config:
//  - a context supplied through the SSL config is reused; otherwise a new
//    boost::asio::ssl::context is created for the configured protocol;
//  - peer verification (verify_peer) and the default verify paths are set;
//  - each configured CA verify file is loaded, and a message names any file
//    that fails;
//  - if any file failed, ssl_context_ is reset and an error asking to correct
//    the PEM files is raised (presumably thrown);
//  - a non-empty cipher list is applied with SSL_CTX_set_cipher_list, and a
//    message is reported when none of the configured ciphers can be loaded.
// In builds without SSL, client_context_ is only cast to void (presumably to
// silence an unused-member warning).
1568 SocketFactory::start()
1570 #ifdef HZ_BUILD_WITH_SSL
1572 client_context_.get_client_config().get_network_config().get_ssl_config();
1573 if (sslConfig.is_enabled()) {
1574 if (sslConfig.ssl_context_) {
1575 ssl_context_ = sslConfig.ssl_context_;
1577 ssl_context_ = std::make_shared<boost::asio::ssl::context>(
1578 (boost::asio::ssl::context_base::method)sslConfig.get_protocol());
1580 ssl_context_->set_verify_mode(boost::asio::ssl::verify_peer);
1581 ssl_context_->set_default_verify_paths();
1583 const std::vector<std::string>& verifyFiles =
1584 sslConfig.get_verify_files();
1585 bool success =
true;
1586 logger& lg = client_context_.get_logger();
1587 for (
const auto& f : verifyFiles) {
1588 boost::system::error_code ec;
1589 ssl_context_->load_verify_file(f, ec);
1595 boost::format(
"SocketFactory::start: Failed to load CA "
1596 "verify file at %1% %2%") %
1603 ssl_context_.reset();
1606 "SocketFactory::start: Failed to load one or more "
1607 "configured CA verify files (PEM files). Please "
1608 "correct the files and retry.");
1614 const std::string& cipherList = sslConfig.get_cipher_list();
1615 if (!cipherList.empty()) {
1616 if (!SSL_CTX_set_cipher_list(ssl_context_->native_handle(),
1617 cipherList.c_str())) {
1618 logger& lg = client_context_.get_logger();
1621 std::string(
"SocketFactory::start: Could not load any "
1622 "of the ciphers in the config provided "
1630 (void)client_context_;
// Creates the socket for `address`. In SSL builds it returns an SSLSocket when
// start() left a TLS context in ssl_context_; otherwise it returns a plain
// TcpSocket. Both receive io_, the network config's socket options and
// connect_timeout_in_millis. The caller owns the result through a unique_ptr.
1635 std::unique_ptr<hazelcast::client::socket>
1636 SocketFactory::create(
const address& address,
1637 std::chrono::milliseconds& connect_timeout_in_millis)
1639 #ifdef HZ_BUILD_WITH_SSL
1640 if (ssl_context_.get()) {
1641 return std::unique_ptr<hazelcast::client::socket>(
1642 new internal::socket::SSLSocket(io_,
1645 client_context_.get_client_config()
1646 .get_network_config()
1647 .get_socket_options(),
1648 connect_timeout_in_millis,
1653 return std::unique_ptr<hazelcast::client::socket>(
1654 new internal::socket::TcpSocket(io_,
1656 client_context_.get_client_config()
1657 .get_network_config()
1658 .get_socket_options(),
1659 connect_timeout_in_millis,
1663 #ifdef HZ_BUILD_WITH_SSL
// TLS socket built on BaseSocket over a boost::asio::ssl::stream that wraps a
// TCP socket. Its construction arguments, including connect_timeout_in_millis,
// are forwarded to the BaseSocket constructor.
1665 SSLSocket::SSLSocket(boost::asio::io_context& io_service,
1666 boost::asio::ssl::context& ssl_context,
1667 const client::address& address,
1668 client::config::socket_options& socket_options,
1669 std::chrono::milliseconds& connect_timeout_in_millis,
1670 boost::asio::ip::tcp::resolver& resolver)
1671 : BaseSocket<boost::asio::ssl::stream<boost::asio::ip::tcp::socket>>(
1676 connect_timeout_in_millis,
// Lists the ciphers available on this socket's SSL handle (SSL_get_ciphers).
// For each cipher it records the name, secret bit count, protocol version and
// OpenSSL description text. The description is written into descBuf, which is
// passed with a size of 256 bytes.
1680 std::vector<SSLSocket::CipherInfo>
1681 SSLSocket::get_ciphers()
1683 STACK_OF(SSL_CIPHER)* ciphers = SSL_get_ciphers(socket_.native_handle());
1684 std::vector<CipherInfo> supportedCiphers;
1685 for (
int i = 0; i < sk_SSL_CIPHER_num(ciphers); ++i) {
1686 struct SSLSocket::CipherInfo info;
1687 const SSL_CIPHER* cipher = sk_SSL_CIPHER_value(ciphers, i);
1688 info.name = SSL_CIPHER_get_name(cipher);
1689 info.number_of_bits = SSL_CIPHER_get_bits(cipher, 0);
1690 info.version = SSL_CIPHER_get_version(cipher);
1692 info.description = SSL_CIPHER_description(cipher, descBuf, 256);
1693 supportedCiphers.push_back(info);
1695 return supportedCiphers;
// Performs the client-side TLS handshake, presumably invoked once the TCP
// connection is established. This Boost.Asio overload has no error_code
// argument: it blocks and throws boost::system::system_error on failure.
1699 SSLSocket::post_connect()
1701 socket_.handshake(boost::asio::ssl::stream_base::client);
// Streams a CipherInfo: its name, followed by the Bits, Version and
// Description fields.
1705 operator<<(std::ostream& out,
const SSLSocket::CipherInfo& info)
1709 << info.name <<
", Bits:" << info.number_of_bits
1710 <<
", Version:" << info.version <<
", Description:" << info.description
// Plain TCP socket. Its construction arguments are forwarded to
// BaseSocket<boost::asio::ip::tcp::socket>, with the resolver first and
// connect_timeout_in_millis last.
1718 TcpSocket::TcpSocket(boost::asio::io_context& io,
1719 const address& address,
1720 client::config::socket_options& socket_options,
1721 std::chrono::milliseconds& connect_timeout_in_millis,
1722 boost::asio::ip::tcp::resolver& resolver)
1723 : BaseSocket<boost::asio::ip::tcp::socket>(resolver,
1727 connect_timeout_in_millis)
// Hash for shared_ptr<Connection>: hashes by the absolute value of the
// connection id rather than by pointer value.
// NOTE(review): if get_connection_id() returns a signed integer, std::abs of
// its minimum value is undefined; confirm that ids stay non-negative.
1737 hash<std::shared_ptr<hazelcast::client::connection::Connection>>::operator()(
1738 const std::shared_ptr<hazelcast::client::connection::Connection>& conn)
1744 return std::abs(conn->get_connection_id());