34 #include <unordered_set>
36 #include "hazelcast/client/execution_callback.h"
37 #include "hazelcast/client/lifecycle_event.h"
38 #include "hazelcast/client/connection/AddressProvider.h"
39 #include "hazelcast/client/spi/impl/ClientInvocation.h"
40 #include "hazelcast/util/Util.h"
41 #include "hazelcast/client/protocol/AuthenticationStatus.h"
42 #include "hazelcast/client/exception/protocol_exceptions.h"
43 #include "hazelcast/client/connection/ClientConnectionManagerImpl.h"
44 #include "hazelcast/client/connection/ConnectionListener.h"
45 #include "hazelcast/client/connection/Connection.h"
46 #include "hazelcast/client/spi/ClientContext.h"
47 #include "hazelcast/client/spi/impl/ClientExecutionServiceImpl.h"
48 #include "hazelcast/client/serialization/serialization.h"
49 #include "hazelcast/client/protocol/UsernamePasswordCredentials.h"
50 #include "hazelcast/client/protocol/codec/codecs.h"
51 #include "hazelcast/client/client_config.h"
52 #include "hazelcast/client/socket_interceptor.h"
53 #include "hazelcast/client/config/client_network_config.h"
54 #include "hazelcast/client/client_properties.h"
55 #include "hazelcast/client/connection/HeartbeatManager.h"
56 #include "hazelcast/client/impl/hazelcast_client_instance_impl.h"
57 #include "hazelcast/client/spi/impl/listener/listener_service_impl.h"
58 #include "hazelcast/client/spi/impl/ClientPartitionServiceImpl.h"
59 #include "hazelcast/client/internal/socket/TcpSocket.h"
60 #include "hazelcast/client/internal/socket/SSLSocket.h"
61 #include "hazelcast/client/config/ssl_config.h"
62 #include "hazelcast/util/IOUtil.h"
63 #include "hazelcast/client/internal/socket/SocketFactory.h"
64 #include "hazelcast/client/connection/wait_strategy.h"
68 namespace connection {
// Out-of-class definition of the static constexpr data member
// EXECUTOR_CORE_POOL_SIZE (the size passed to hz_thread_pool in start()).
// Before C++17 this definition is required whenever the member is ODR-used;
// since C++17 a static constexpr data member is implicitly inline.
69 constexpr
size_t ClientConnectionManagerImpl::EXECUTOR_CORE_POOL_SIZE;
// Constructs the connection manager for one client instance.
// - Takes ownership of `address_provider`, which is later used to translate
//   addresses (connect()) and load candidate addresses.
// - Captures async-start, reconnect mode, smart routing, the retry/wait strategy
//   and the load balancer from the client configuration.
// - connection_timeout_millis_ defaults to milliseconds::max() and is replaced by
//   the network config's connection timeout only when that value is positive.
// - shuffle_member_list_ is read from the client properties.
// NOTE(review): some original lines (e.g. the closing braces after the `if` and at
// the end of the constructor) are not present in this file — confirm upstream.
71 ClientConnectionManagerImpl::ClientConnectionManagerImpl(spi::ClientContext &client,
72 std::unique_ptr<AddressProvider> address_provider)
73 : alive_(false), logger_(client.get_logger()),
// `(std::chrono::milliseconds::max) ()` is parenthesised, presumably to defeat a
// function-like `max` macro (e.g. from windows.h).
74 connection_timeout_millis_((std::chrono::milliseconds::max) ()),
75 client_(client), socket_interceptor_(client.get_client_config().get_socket_interceptor()),
76 address_provider_(std::move(address_provider)), connection_id_gen_(0),
77 heartbeat_(client, *this),
78 async_start_(client.get_client_config().get_connection_strategy_config().is_async_start()),
79 reconnect_mode_(client.get_client_config().get_connection_strategy_config().get_reconnect_mode()),
80 smart_routing_enabled_(client.get_client_config().get_network_config().is_smart_routing()),
81 client_uuid_(client.random_uuid()),
// The authentication timeout reuses the heartbeat timeout. This reads heartbeat_,
// so heartbeat_ must be declared before authentication_timeout_ in the class
// (members initialise in declaration order, not list order) — verify in the header.
82 authentication_timeout_(boost::chrono::milliseconds(heartbeat_.get_heartbeat_timeout().count())),
83 load_balancer_(client.get_client_config().get_load_balancer()),
84 wait_strategy_(client.get_client_config().get_connection_strategy_config().get_retry_config(),
85 logger_), cluster_id_(boost::uuids::nil_uuid()),
86 connect_to_cluster_task_submitted_(false) {
88 config::client_network_config &networkConfig = client.get_client_config().get_network_config();
89 auto connTimeout = networkConfig.get_connection_timeout();
// A non-positive configured timeout keeps the milliseconds::max() default.
90 if (connTimeout.count() > 0) {
91 connection_timeout_millis_ = std::chrono::milliseconds(connTimeout);
94 client_properties &clientProperties = client.get_client_properties();
95 shuffle_member_list_ = clientProperties.get_boolean(clientProperties.get_shuffle_member_list());
// Starts the connection manager. The compare-exchange on alive_ (false -> true)
// lets only the first caller proceed. The body of the early-exit branch is not
// present in this file.
// Visible steps:
//  1. create the io_context, a TCP resolver, the socket factory, and an
//     io_context::work guard so io_context_->run() does not return while idle;
//  2. start the socket factory (the failure-branch body is not present here);
//  3. re-read the socket interceptor from the client config;
//  4. run the io_context on io_thread_;
//  5. create executor_ (hz_thread_pool sized with EXECUTOR_CORE_POOL_SIZE) and
//     bind connect_to_members_timer_ to it;
//  6. connect to the cluster, schedule the periodic connect-to-all-members task
//     when smart routing is enabled, and initialise the load balancer.
// NOTE(review): the embedded original line numbers skip 101-103, 108, 110-112,
// 114, 116, 119-121, 125-126 and 128 onwards. Return statements, closing braces and
// possibly other statements of this function are missing — confirm upstream.
98 bool ClientConnectionManagerImpl::start() {
99 bool expected =
false;
100 if (!alive_.compare_exchange_strong(expected,
true)) {
104 io_context_.reset(
new boost::asio::io_context);
105 io_resolver_.reset(
new boost::asio::ip::tcp::resolver(io_context_->get_executor()));
106 socket_factory_.reset(
new internal::socket::SocketFactory(client_, *io_context_, *io_resolver_));
// Keeps io_context_->run() from returning when no asynchronous work is queued.
107 io_guard_.reset(
new boost::asio::io_context::work(*io_context_));
109 if (!socket_factory_->start()) {
113 socket_interceptor_ = client_.get_client_config().get_socket_interceptor();
// `[=]` implicitly captures `this` (deprecated in C++20); the lambda reads
// io_context_ through it.
115 io_thread_ = std::thread([=]() { io_context_->run(); });
117 executor_.reset(
new hazelcast::util::hz_thread_pool(EXECUTOR_CORE_POOL_SIZE));
118 connect_to_members_timer_ = boost::asio::steady_timer(executor_->get_executor());
122 connect_to_cluster();
123 if (smart_routing_enabled_) {
124 schedule_connect_to_all_members();
127 load_balancer_.init_(client_.get_cluster());
// Arms connect_to_members_timer_ for one second. When it fires, the handler
// attempts connections to all known members and then re-arms the timer, which
// makes this a periodic task.
// The handler checks for operation_aborted (e.g. the cancel() in shutdown()) and
// checks that the lifecycle service is still running before rescheduling. The
// bodies of those two `if`s are not present in this file (presumably `return;`).
// NOTE(review): expires_from_now is deprecated in recent Boost.Asio in favour of
// expires_after; `[=]` implicitly captures `this`.
132 void ClientConnectionManagerImpl::schedule_connect_to_all_members() {
133 connect_to_members_timer_->expires_from_now(boost::asio::chrono::seconds(1));
134 connect_to_members_timer_->async_wait([=](boost::system::error_code ec) {
135 if (ec == boost::asio::error::operation_aborted) {
138 connect_to_all_members();
140 if (!client_.get_lifecycle_service().is_running()) {
144 schedule_connect_to_all_members();
// Shuts the manager down once. The compare-exchange on alive_ (true -> false)
// lets only the first caller proceed; the early-exit body is not present here.
// In order, it:
//  - cancels the connect-to-members timer, if one was created;
//  - stops the heartbeat;
//  - closes every active connection with reason "Hazelcast client is shutting down";
//  - shuts down the executor_ thread pool;
//  - clears the listener and connection bookkeeping.
// NOTE(review): original lines 167-171 are not present here. They may hold further
// teardown (e.g. of the io_context / io thread) — confirm upstream.
148 void ClientConnectionManagerImpl::shutdown() {
149 bool expected =
true;
150 if (!alive_.compare_exchange_strong(expected,
false)) {
154 if (connect_to_members_timer_) {
// A pending async_wait then completes with operation_aborted.
155 connect_to_members_timer_->cancel();
158 heartbeat_.shutdown();
161 for (
auto &connection : active_connections_.values()) {
163 util::IOUtil::close_resource(connection.get(),
"Hazelcast client is shutting down");
166 spi::impl::ClientExecutionServiceImpl::shutdown_thread_pool(executor_.get());
172 connection_listeners_.clear();
173 active_connections_.clear();
174 active_connection_ids_.clear();
// Returns an existing active connection whose remote address equals `address`,
// otherwise opens a new one via connect(). The early return for an existing
// connection (original lines 180-183) is not present in this file.
177 std::shared_ptr<Connection>
178 ClientConnectionManagerImpl::get_or_connect(
const address &address) {
179 auto connection = get_connection(address);
184 return connect(address);
// Member overload: looks the connection up by member UUID (not by address). If
// none is found, it falls back to connect(m.get_address()). The early-return
// lines (original 191-194) are not present in this file.
187 std::shared_ptr<Connection>
188 ClientConnectionManagerImpl::get_or_connect(
const member &m) {
189 const auto &uuid = m.get_uuid();
190 auto connection = active_connections_.get(uuid);
195 return connect(m.get_address());
// Returns the active connections as a std::vector built from the values of the
// member-UUID-keyed connection map.
198 std::vector<std::shared_ptr<Connection> > ClientConnectionManagerImpl::get_active_connections() {
199 return active_connections_.values();
// Finds an active connection by remote address. It does a linear scan over all
// active connections, so the cost grows with the number of connections.
// Connections whose remote address is not yet set are skipped.
// The return statements (original lines 207-210) are not present here; presumably
// they return the match, else nullptr.
202 std::shared_ptr<Connection>
203 ClientConnectionManagerImpl::get_connection(
const address &address) {
204 for (
const auto &connection : active_connections_.values()) {
205 auto remote_address = connection->get_remote_address();
206 if (remote_address && *remote_address == address) {
// Looks up the active connection registered under the given member UUID.
213 std::shared_ptr<Connection> ClientConnectionManagerImpl::get_connection(boost::uuids::uuid uuid) {
214 return active_connections_.get(uuid);
// Authenticates a freshly connected `connection` against the cluster and returns
// the decoded authentication response.
// - Sends the request built by encode_authentication_request() as an urgent
//   invocation on this connection, then waits at most authentication_timeout_.
// - Throws exception::timeout if no response arrives in time.
// - If an exception::iexception is raised while decoding, closes the connection
//   with that exception as the cause. Any rethrow is not present in this file.
// - For CREDENTIALS_FAILED, NOT_ALLOWED_IN_CLUSTER and any other status, closes the
//   connection and throws exception::authentication.
// NOTE(review): many original lines (e.g. 222, 224, 231, 235, 239-240, 246,
// 251-253, 257-258, 265, 272-273, 280-283) are not present in this file. These
// include the `try`, the initialiser that receives the decoded fields, and the
// `default:` label — confirm upstream before editing.
217 ClientConnectionManagerImpl::auth_response
218 ClientConnectionManagerImpl::authenticate_on_cluster(std::shared_ptr<Connection> &connection) {
219 auto request = encode_authentication_request(client_.get_serialization_service());
220 auto clientInvocation = spi::impl::ClientInvocation::create(client_, request,
"", connection);
221 auto f = clientInvocation->invoke_urgent();
223 struct auth_response result;
225 if (f.wait_for(authentication_timeout_) != boost::future_status::ready) {
226 BOOST_THROW_EXCEPTION(exception::timeout(
227 "ClientConnectionManagerImpl::authenticate", (boost::format(
"Authentication response is "
228 "not received for %1% msecs for %2%") %
229 authentication_timeout_.count() %
230 *clientInvocation).str()));
232 auto response = f.get();
// The initial frame header is read in place to learn the frame length. That way the
// fixed-size fields can be decoded and any trailing bytes of the frame skipped.
233 auto *initial_frame =
reinterpret_cast<protocol::ClientMessage::frame_header_t *
>(response.rd_ptr(
234 protocol::ClientMessage::RESPONSE_HEADER_LEN));
// Fixed-size fields of the initial frame, in order: byte, UUID, byte, int32, UUID.
// The code that stores them into `result` is not present here. They are presumably
// status, member UUID, serialization version, partition count and cluster id —
// confirm against the codec.
236 response.get<
byte>(), response.get<boost::uuids::uuid>(),
237 response.get<
byte>(), response.get<int32_t>(),
238 response.get<boost::uuids::uuid>()
// Skips whatever remains of the initial frame after the fields above. Each UUID is
// counted as sizeof(uuid) plus one UINT8 (presumably a null flag). The reader ends
// up positioned at the next frame.
241 response.rd_ptr(
static_cast<int32_t
>(initial_frame->frame_len) -
242 protocol::ClientMessage::RESPONSE_HEADER_LEN -
243 2 * protocol::ClientMessage::UINT8_SIZE -
244 2 * (
sizeof(boost::uuids::uuid) + protocol::ClientMessage::UINT8_SIZE) -
245 protocol::ClientMessage::INT32_SIZE);
247 result.server_address = response.get_nullable<address>();
248 result.server_version = response.get<std::string>();
249 }
catch (exception::iexception &) {
250 connection->close(
"Failed to authenticate connection", std::current_exception());
// NOTE(review): C-style cast; static_cast<protocol::authentication_status>(...)
// would be the idiomatic form.
254 auto authentication_status = (protocol::authentication_status) result.status;
255 switch (authentication_status) {
// Success path: the case body (original lines 257-258) is not present in this file.
256 case protocol::AUTHENTICATED: {
259 case protocol::CREDENTIALS_FAILED: {
260 auto e = std::make_exception_ptr(
261 exception::authentication(
"ClientConnectionManagerImpl::authenticate_on_cluster",
262 "Authentication failed. The configured cluster name on the client (see client_config::set_cluster_name()) does not match the one configured in the cluster or the credentials set in the Client security config could not be authenticated"));
263 connection->close(
"Failed to authenticate connection", e);
264 std::rethrow_exception(e);
266 case protocol::NOT_ALLOWED_IN_CLUSTER: {
267 auto e = std::make_exception_ptr(
268 exception::authentication(
"ClientConnectionManagerImpl::authenticate_on_cluster",
269 "Client is not allowed in the cluster"));
270 connection->close(
"Failed to authenticate connection", e);
271 std::rethrow_exception(e);
// Any other status code (the `default:` label itself is not present here).
274 auto e = std::make_exception_ptr(exception::authentication(
275 "ClientConnectionManagerImpl::authenticate_on_cluster",
276 (boost::format(
"Authentication status code not supported. status: %1%") %
277 authentication_status).str()));
278 connection->close(
"Failed to authenticate connection", e);
279 std::rethrow_exception(e);
// Builds the authentication request for this client. Every variant carries the
// cluster name, client_uuid_, client type CPP, the serialization version of `ss`,
// HAZELCAST_VERSION, the client name and labels_.
// - The first encode passes nullptr for both username and password. The guard
//   selecting it (original line 291) is not present here; presumably it is taken
//   when no credentials are configured.
// - username_password credentials send the name and password.
// - token credentials use the custom-authentication codec with the token.
// - Any credential type not handled by the switch returns an empty ClientMessage.
// NOTE(review): the case-body braces (original lines 300, 306, 308, 314-316) are
// not present in this file.
284 protocol::ClientMessage
285 ClientConnectionManagerImpl::encode_authentication_request(serialization::pimpl::SerializationService &ss) {
286 byte serializationVersion = ss.get_version();
287 client_config &clientConfig = client_.get_client_config();
288 auto cluster_name = clientConfig.get_cluster_name();
290 auto credential = clientConfig.get_credentials();
292 return protocol::codec::client_authentication_encode(cluster_name,
nullptr,
nullptr,
293 client_uuid_, protocol::ClientTypes::CPP,
294 serializationVersion, HAZELCAST_VERSION,
295 client_.get_name(), labels_);
298 switch(credential->type()) {
299 case security::credentials::credential_type::username_password:
301 auto cr = std::static_pointer_cast<security::username_password_credentials>(credential);
302 return protocol::codec::client_authentication_encode(cluster_name, &cr->name(), &cr->password(),
303 client_uuid_, protocol::ClientTypes::CPP,
304 serializationVersion, HAZELCAST_VERSION,
305 client_.get_name(), labels_);
307 case security::credentials::credential_type::token:
309 auto cr = std::static_pointer_cast<security::token_credentials>(credential);
310 return protocol::codec::client_authenticationcustom_encode(cluster_name, cr->token(),
311 client_uuid_, protocol::ClientTypes::CPP,
312 serializationVersion, HAZELCAST_VERSION,
313 client_.get_name(), labels_);
317 return protocol::ClientMessage();
// Notifies every registered ConnectionListener that `connection` was added.
// It iterates over connection_listeners_.to_array(), presumably a snapshot copy.
// NOTE(review): the return-type line (original line 320) is not present here.
321 ClientConnectionManagerImpl::fire_connection_added_event(
const std::shared_ptr<Connection> &connection) {
322 for (
const std::shared_ptr<ConnectionListener> &connectionListener : connection_listeners_.to_array()) {
323 connectionListener->connection_added(connection);
// Notifies every registered ConnectionListener that `connection` was removed.
// It iterates over connection_listeners_.to_array(), presumably a snapshot copy.
// NOTE(review): the return-type line (original line 327) is not present here.
328 ClientConnectionManagerImpl::fire_connection_removed_event(
const std::shared_ptr<Connection> &connection) {
329 for (
const auto &listener : connection_listeners_.to_array()) {
330 listener->connection_removed(connection);
// Shuts the client instance down through its lifecycle service.
// - Takes a weak_ptr, so it does nothing useful if the client is already destroyed.
// - Does nothing useful if the lifecycle service is no longer running. The body of
//   that early-exit `if` is not present here.
// - An exception::iexception thrown during shutdown is logged at severe level.
// NOTE(review): original line 337 is not present. Presumably it wraps this body in a
// separately started thread, as the function name suggests — confirm upstream. The
// return-type line (original 334) and the remaining log arguments are also missing.
335 ClientConnectionManagerImpl::shutdown_with_external_thread(
336 std::weak_ptr<client::impl::hazelcast_client_instance_impl> client_impl) {
338 std::shared_ptr<client::impl::hazelcast_client_instance_impl> clientInstance = client_impl.lock();
339 if (!clientInstance || !clientInstance->get_lifecycle_service().is_running()) {
344 clientInstance->get_lifecycle_service().shutdown();
345 } catch (exception::iexception &e) {
346 HZ_LOG(*clientInstance->get_logger(), severe,
347 boost::str(boost::format(
"Exception during client shutdown "
348 "%1%.clientShutdown-:%2%")
349 % clientInstance->get_name()
// Submits at most one cluster-(re)connection task at a time. The compare-exchange
// on connect_to_cluster_task_submitted_ (false -> true) gates submission.
// The task runs do_connect_to_cluster() on executor_. Afterwards, while holding
// client_state_mutex_, it clears the flag. If there is still no active connection,
// it logs at finest level and resubmits itself.
// If a std::exception escapes, it logs a warning and shuts the client down via
// shutdown_with_external_thread(c). The client implementation is captured as a
// weak_ptr, so the queued task does not keep the client alive.
// NOTE(review): the `try` (original line 364), some log arguments and closing braces
// are not present in this file. `[=]` implicitly captures `this`.
356 void ClientConnectionManagerImpl::submit_connect_to_cluster_task() {
357 bool expected =
false;
358 if (!connect_to_cluster_task_submitted_.compare_exchange_strong(expected,
true)) {
362 std::weak_ptr<client::impl::hazelcast_client_instance_impl> c = client_.get_hazelcast_client_implementation();
363 boost::asio::post(executor_->get_executor(), [=]() {
365 do_connect_to_cluster();
367 std::lock_guard<std::recursive_mutex> guard(client_state_mutex_);
368 connect_to_cluster_task_submitted_ = false;
369 if (active_connections_.empty()) {
370 HZ_LOG(logger_, finest,
371 boost::str(boost::format(
"No connection to cluster: %1%")
375 submit_connect_to_cluster_task();
378 } catch (std::exception &e) {
379 HZ_LOG(logger_, warning,
380 boost::str(boost::format(
"Could not connect to any cluster, "
381 "shutting down the client: %1%")
385 shutdown_with_external_thread(c);
// Best-effort attempt to open a connection to every known cluster member.
// It does nothing when the client is not running or has no active connection; the
// early-exit body is not present here.
// For each member without a connection, connecting_addresses_.get_or_put_if_absent
// is used to claim the address. `.second` is presumably true only for the caller
// that inserted it, so at most one attempt per address is in flight.
// The attempt is posted to executor_. It connects only if there is still no
// connection for the member's UUID, and it removes the claim on both the success
// and std::exception paths. Connection failures are deliberately swallowed.
// NOTE(review): the `try` and the body of the "not running" check inside the task
// are not present here. Confirm whether that path also releases `addr` from
// connecting_addresses_.
390 void ClientConnectionManagerImpl::connect_to_all_members() {
391 if (!client_.get_lifecycle_service().is_running() || active_connections_.empty()) {
395 for (
const auto &member : client_.get_client_cluster_service().get_member_list()) {
396 const auto& member_addr = member.get_address();
398 if (client_.get_lifecycle_service().is_running() && !get_connection(member_addr)
399 && connecting_addresses_.get_or_put_if_absent(member_addr,
nullptr).second) {
// Copied so the posted lambda owns its own address value.
401 address addr = member_addr;
402 boost::asio::post(executor_->get_executor(), [=]() {
404 if (!client_.get_lifecycle_service().is_running()) {
407 if (!get_connection(member.get_uuid())) {
408 get_or_connect(addr);
410 connecting_addresses_.remove(addr);
411 } catch (std::exception &) {
412 connecting_addresses_.remove(addr);
// Tries to establish a first connection to the cluster, retrying under wait_strategy_.
// Each attempt:
//  - tries every known member (shuffled if shuffle_member_list_);
//  - then tries every address from get_possible_member_addresses() not already
//    tried in the same attempt;
//  - calls check_client_active() before each try, so it throws
//    hazelcast_client_not_active once the client stops.
// Addresses tried in each attempt are accumulated into tried_addresses. When
// wait_strategy_.sleep() returns false, it throws exception::illegal_state listing
// every address that was tried.
// NOTE(review): the `do {` line, the success returns after try_connect (presumably
// `return true;` when a connection is obtained) and the `continue` for duplicates
// are not present in this file.
419 bool ClientConnectionManagerImpl::do_connect_to_cluster() {
420 std::unordered_set<address> tried_addresses;
421 wait_strategy_.reset();
424 std::unordered_set<address> tried_addresses_per_attempt;
425 auto member_list = client_.get_client_cluster_service().get_member_list();
426 if (shuffle_member_list_) {
427 shuffle(member_list);
431 for (
const auto &m : member_list) {
432 check_client_active();
433 tried_addresses_per_attempt.insert(m.get_address());
434 auto connection = try_connect(m);
440 for (
const address &server_address : get_possible_member_addresses()) {
441 check_client_active();
// insert().second is false when this address was already tried in this attempt
// (e.g. a member address above); such addresses are skipped.
442 if (!tried_addresses_per_attempt.insert(server_address).second) {
446 auto connection = try_connect<address>(server_address);
451 tried_addresses.insert(tried_addresses_per_attempt.begin(), tried_addresses_per_attempt.end());
454 check_client_active();
455 }
while (wait_strategy_.sleep());
457 std::ostringstream out;
458 out <<
"Unable to connect to any address! The following addresses were tried: { ";
// NOTE(review): the loop variable `address` shadows the type name `address`, and
// a trailing separator is written after the last element.
459 for (
const auto &address : tried_addresses) {
460 out << address <<
" , ";
463 BOOST_THROW_EXCEPTION(
464 exception::illegal_state(
"ConnectionManager::do_connect_to_cluster", out.str()));
// Collects the candidate addresses used when connecting to the cluster. The known
// members' addresses come first, followed by the addresses loaded from
// address_provider_. Each group is shuffled separately when shuffle_member_list_
// is set. Duplicates across the two groups are not removed here.
// NOTE(review): the body of the first `if (shuffle_member_list_)` (original line
// 474) and the return statement are not present in this file.
467 std::vector<address> ClientConnectionManagerImpl::get_possible_member_addresses() {
468 std::vector<address> addresses;
469 for (
auto &&member : client_.get_client_cluster_service().get_member_list()) {
// NOTE(review): if get_address() returns a const reference, std::move has no
// effect here and the address is copied.
470 addresses.emplace_back(std::move(member.get_address()));
473 if (shuffle_member_list_) {
477 std::vector<address> provided_addresses = address_provider_->load_addresses();
479 if (shuffle_member_list_) {
480 shuffle(provided_addresses);
483 addresses.insert(addresses.end(), provided_addresses.begin(), provided_addresses.end());
// Initial connection to the cluster. It either submits the background connect task
// or connects synchronously on the calling thread.
// NOTE(review): the selecting condition (original line 489, presumably
// `if (async_start_)`) and the `else` are not present in this file.
488 void ClientConnectionManagerImpl::connect_to_cluster() {
490 submit_connect_to_cluster_task();
492 do_connect_to_cluster();
// Reports whether the manager is started. The body is not present in this file;
// presumably it returns alive_, which start() sets and shutdown() clears.
496 bool ClientConnectionManagerImpl::is_alive() {
// Called when `connection` closes. It unregisters the connection and reacts to the
// cluster becoming unreachable.
// - If the connection never had its remote address set, it only logs at finest
//   level (the selecting `if`, original line 506, is not present here).
// - Otherwise, while holding client_state_mutex_, it removes the (member UUID ->
//   connection) entry only if it still maps to this exact connection, and removes
//   the connection-id entry.
// - When that was the last active connection, it fires CLIENT_DISCONNECTED and
//   triggers cluster reconnection.
// - It then notifies the connection listeners.
// - If the mapping was absent (already replaced or removed), it just logs at
//   finest level.
// NOTE(review): L535 streams the boost::optional `endpoint` itself, while L521
// dereferences it; streaming an optional relies on boost/optional/optional_io.hpp.
500 void ClientConnectionManagerImpl::on_connection_close(
const std::shared_ptr<Connection> &connection) {
501 auto endpoint = connection->get_remote_address();
502 auto member_uuid = connection->get_remote_uuid();
504 auto socket_remote_address = connection->get_socket().get_remote_endpoint();
507 HZ_LOG(logger_, finest,
508 boost::str(boost::format(
"Destroying %1% , but it has end-point set to null "
509 "-> not removing it from a connection map")
515 std::lock_guard<std::recursive_mutex> guard(client_state_mutex_);
// Removes only if the UUID still maps to this connection (a newer one may have replaced it).
516 if (active_connections_.remove(member_uuid, connection)) {
517 active_connection_ids_.remove(connection->get_connection_id());
519 HZ_LOG(logger_, info,
520 boost::str(boost::format(
"Removed connection to endpoint: %1%, connection: %2%")
521 % *endpoint % *connection)
524 if (active_connections_.empty()) {
525 fire_life_cycle_event(lifecycle_event::lifecycle_state::CLIENT_DISCONNECTED);
527 trigger_cluster_reconnection();
530 fire_connection_removed_event(connection);
532 HZ_LOG(logger_, finest,
533 boost::str(boost::format(
"Destroying a connection, but there is no mapping "
534 "%1% -> %2% in the connection map.")
535 % endpoint % *connection)
// Registers a listener that is notified when connections are added or removed.
// NOTE(review): the return-type line (original line 540) is not present here.
541 ClientConnectionManagerImpl::add_connection_listener(
542 const std::shared_ptr<ConnectionListener> &connection_listener) {
543 connection_listeners_.add(connection_listener);
// Destructor. Its body (original lines 547-548) is not present in this file.
546 ClientConnectionManagerImpl::~ClientConnectionManagerImpl() {
// Returns the owning client's logger.
550 logger &ClientConnectionManagerImpl::get_logger() {
551 return client_.get_logger();
// Throws exception::hazelcast_client_not_active ("Client is shutdown") when the
// client's lifecycle service is no longer running.
554 void ClientConnectionManagerImpl::check_client_active() {
555 if (!client_.get_lifecycle_service().is_running()) {
556 BOOST_THROW_EXCEPTION(exception::hazelcast_client_not_active(
557 "ClientConnectionManagerImpl::check_client_active",
"Client is shutdown"));
// Registers a successfully authenticated connection and returns the connection the
// caller should use. All of this runs while holding client_state_mutex_.
// - Validates the partition count; check_partition_count throws
//   client_not_allowed_in_cluster on a mismatch.
// - Records the server version, remote address and member UUID on the connection.
// - If a connection to the same member UUID already exists, closes the new one
//   and returns the existing one.
// - If a cluster id was already known and the new one differs, logs a warning
//   and calls on_cluster_restart() on the client implementation.
// - Registers the connection by id and by member UUID. For the first connection,
//   stores the cluster id and fires CLIENT_CONNECTED.
// - Logs the authentication, then notifies the connection listeners.
// - Afterwards it handles a connection that died meanwhile (on_connection_close)
//   and a client that is no longer running (closes the connection).
// Fix: the "no local address" log message referenced %5% while only four arguments
// are fed. boost::format then throws boost::io::too_few_args from str() instead of
// logging. The placeholder is now %4%, matching the fourth argument (*connection).
// NOTE(review): the `if (local_address)` / `else` lines (original 604, 610-611) and
// several closing braces/returns are not present in this file. As written,
// `*local_address` would be dereferenced unconditionally — confirm upstream.
561 std::shared_ptr<Connection>
562 ClientConnectionManagerImpl::on_authenticated(
const std::shared_ptr<Connection> &connection,
563 auth_response &response) {
565 std::lock_guard<std::recursive_mutex> guard(client_state_mutex_);
566 check_partition_count(response.partition_count);
567 connection->set_connected_server_version(response.server_version);
568 connection->set_remote_address(response.server_address);
569 connection->set_remote_uuid(response.member_uuid);
571 auto existing_connection = active_connections_.get(response.member_uuid);
// Keep the established connection and drop the duplicate.
572 if (existing_connection) {
573 connection->close((boost::format(
"Duplicate connection to same member with uuid : %1%") %
574 boost::uuids::to_string(response.member_uuid)).str());
575 return existing_connection;
578 auto new_cluster_id = response.cluster_id;
579 boost::uuids::uuid current_cluster_id = cluster_id_;
581 HZ_LOG(logger_, finest,
582 boost::str(boost::format(
"Checking the cluster: %1%, current cluster: %2%")
583 % new_cluster_id % current_cluster_id)
// A nil current id means no cluster has been seen yet, which is not a change.
586 auto cluster_id_changed = !current_cluster_id.is_nil() && !(new_cluster_id == current_cluster_id);
587 if (cluster_id_changed) {
588 HZ_LOG(logger_, warning,
589 boost::str(boost::format(
"Switching from current cluster: %1% to new cluster: %2%")
590 % current_cluster_id % new_cluster_id)
592 client_.get_hazelcast_client_implementation()->on_cluster_restart();
// Sampled before insertion so the first connection can be detected.
595 auto connections_empty = active_connections_.empty();
596 active_connection_ids_.put(connection->get_connection_id(), connection);
597 active_connections_.put(response.member_uuid, connection);
598 if (connections_empty) {
599 cluster_id_ = new_cluster_id;
600 fire_life_cycle_event(lifecycle_event::lifecycle_state::CLIENT_CONNECTED);
603 auto local_address = connection->get_local_socket_address();
605 HZ_LOG(logger_, info,
606 boost::str(boost::format(
"Authenticated with server %1%:%2%, server version: %3%, "
607 "local address: %4%. %5%")
608 % response.server_address % response.member_uuid
609 % response.server_version % *local_address % *connection)
612 HZ_LOG(logger_, info,
613 boost::str(boost::format(
"Authenticated with server %1%:%2%, server version: %3%, "
614 "no local address: (connection disconnected ?). %4%")
615 % response.server_address % response.member_uuid
616 % response.server_version % *connection)
620 fire_connection_added_event(connection);
// The connection may have closed while it was being registered.
628 if (!connection->is_alive()) {
629 on_connection_close(connection);
634 if (!client_.get_lifecycle_service().is_running()) {
635 connection->close(
"Client is shutdown");
// Forwards `state` to the client's lifecycle service, which fires the event.
641 void ClientConnectionManagerImpl::fire_life_cycle_event(lifecycle_event::lifecycle_state state) {
642 client_.get_lifecycle_service().fire_lifecycle_event(state);
// Checks the member's partition count against the client's partition service. It
// throws exception::client_not_allowed_in_cluster when
// check_and_set_partition_count() rejects the value; the message includes the
// expected and received counts.
// NOTE(review): the static_cast is an unchecked downcast and assumes the partition
// service is a ClientPartitionServiceImpl.
645 void ClientConnectionManagerImpl::check_partition_count(int32_t new_partition_count) {
646 auto &partition_service =
static_cast<spi::impl::ClientPartitionServiceImpl &
>(client_.get_partition_service());
647 if (!partition_service.check_and_set_partition_count(new_partition_count)) {
648 BOOST_THROW_EXCEPTION(exception::client_not_allowed_in_cluster(
"ClientConnectionManagerImpl::check_partition_count",
649 (boost::format(
"Client can not work with this cluster because it has a different partition count. "
650 "Expected partition count: %1%, Member partition count: %2%")
651 %partition_service.get_partition_count() %new_partition_count).str()));
// Reacts to the loss of all connections.
// - With reconnect mode OFF, logs and shuts the client down via
//   shutdown_with_external_thread(). The `return` after that (original lines
//   659-661) is not present here.
// - Otherwise, if the client is still running, submits the background
//   reconnection task.
655 void ClientConnectionManagerImpl::trigger_cluster_reconnection() {
656 if (reconnect_mode_ == config::client_connection_strategy_config::reconnect_mode::OFF) {
657 HZ_LOG(logger_, info,
"RECONNECT MODE is off. Shutting down the client.");
658 shutdown_with_external_thread(client_.get_hazelcast_client_implementation());
662 if (client_.get_lifecycle_service().is_running()) {
663 submit_connect_to_cluster_task();
// Picks a connection for an invocation that has no specific target.
// - With smart routing, asks the load balancer for the next member and uses that
//   member's connection.
// - Otherwise, or as a fallback, returns the first active connection. This is the
//   first one, not a random one.
// NOTE(review): the null checks and returns around the load-balancer path
// (original lines 670-672, 674-678) and the empty-case return (681-683) are not
// present here; as written, `member` is dereferenced unconditionally.
667 std::shared_ptr<Connection> ClientConnectionManagerImpl::get_random_connection() {
668 if (smart_routing_enabled_) {
669 auto member = load_balancer_.next_(client_.get_cluster());
673 auto connection = get_connection(member->get_uuid());
679 auto connections = active_connections_.values();
680 if (connections.empty()) {
684 return connections[0];
// Returns this client's UUID. The body is not present in this file; presumably it
// returns client_uuid_, which is set from client.random_uuid() in the constructor.
687 boost::uuids::uuid ClientConnectionManagerImpl::get_client_uuid()
const {
// Throws when an invocation cannot be sent because there is no active connection.
// - With at least one active connection, invocations are allowed; the `return` is
//   not present here.
// - The first hazelcast_client_offline branch is for async start. Its condition
//   (original line 696, presumably on async_start_) is not present here.
// - With reconnect mode ASYNC, throws hazelcast_client_offline.
// - Otherwise throws exception::io.
691 void ClientConnectionManagerImpl::check_invocation_allowed() {
692 if (active_connections_.size() > 0) {
697 BOOST_THROW_EXCEPTION(exception::hazelcast_client_offline(
698 "ClientConnectionManagerImpl::check_invocation_allowed",
699 "No connection found to cluster and async start is configured."));
700 }
else if (reconnect_mode_ == config::client_connection_strategy_config::reconnect_mode::ASYNC) {
701 BOOST_THROW_EXCEPTION(exception::hazelcast_client_offline(
702 "ClientConnectionManagerImpl::check_invocation_allowed",
703 "No connection found to cluster and reconnect mode is async."));
705 BOOST_THROW_EXCEPTION(
706 exception::io(
"ClientConnectionManagerImpl::check_invocation_allowed",
707 "No connection found to cluster."));
// Synchronously tries to connect to every known member. This only applies when
// smart routing is enabled; the early return is not present here.
// Failures (std::exception) are swallowed per member, so this is best-effort. The
// `try` and the catch body are not present in this file.
711 void ClientConnectionManagerImpl::connect_to_all_cluster_members() {
712 if (!smart_routing_enabled_) {
716 for (
const auto &member : client_.get_client_cluster_service().get_member_list()) {
718 get_or_connect(member.get_address());
719 }
catch (std::exception &) {
// Routes a backup acknowledgement to the invocation identified by `call_id`.
// The 64-bit call id is reinterpreted through a union. The struct's first member
// overlays the first 4 bytes of the int64 and is read as the connection id. The
// rest of the struct and the union declaration (original lines 728-731, 733) are
// not present here.
// The connection is found by id. Lookup of its pending invocation and
// notify_backup() are posted to the connection's socket executor, the same
// executor Connection::close uses when failing pending invocations.
// NOTE(review): reading a union member other than the one last written is
// undefined behaviour in ISO C++ (GCC/Clang allow it as an extension); shifting
// and masking the int64, or std::memcpy, would be portable. Which 32 bits hold the
// connection id also depends on platform endianness. The null check on
// `connection` (original lines 737-739) is not present in this file.
725 void ClientConnectionManagerImpl::notify_backup(int64_t call_id) {
726 struct correlation_id {
727 int32_t connnection_id;
732 correlation_id composed_id;
734 c_id_union.id = call_id;
735 auto connection_id = c_id_union.composed_id.connnection_id;
736 auto connection = active_connection_ids_.get(connection_id);
740 boost::asio::post(connection->get_socket().get_executor(), [=]() {
741 auto invocation_it = connection->invocations.find(call_id);
742 if (invocation_it != connection->invocations.end()) {
743 invocation_it->second->notify_backup();
// Opens, intercepts and authenticates a new connection to `addr`.
// - Translates `addr` through address_provider_->translate(). If that yields no
//   address, throws exception::null_pointer.
// - Logs the attempt, creates the Connection with a fresh id from
//   connection_id_gen_, and connects its socket.
// - Passes the socket to the configured socket interceptor, then authenticates.
// - Returns the result of on_authenticated(), which may be an already existing
//   connection to the same member.
// Fix: the null_pointer message previously used "%2%" while feeding a single
// argument. That makes boost::format throw boost::io::too_few_args instead of the
// intended null_pointer exception. It also printed the empty translation result
// rather than the address that failed. It now formats `addr` with "%1%".
// NOTE(review): the guard line that should precede the throw (original line 750,
// presumably `if (!target) {`) and the closing braces are not present in this file
// — confirm upstream.
748 std::shared_ptr<Connection> ClientConnectionManagerImpl::connect(
const address &addr) {
749 auto target = address_provider_->translate(addr);
751 BOOST_THROW_EXCEPTION(exception::null_pointer(
"ClientConnectionManagerImpl::connect",
752 (boost::format(
"Address Provider could not translate "
753 "address %1%") % addr).str()));
755 HZ_LOG(logger_, info, boost::str(
756 boost::format(
"Trying to connect to %1%. Translated address:%2%.") % addr % target));
758 auto connection = std::make_shared<Connection>(*target, client_, ++connection_id_gen_,
759 *socket_factory_, *
this, connection_timeout_millis_);
// Connection::connect() calls shared_from_this(), so the object must be owned by a
// shared_ptr before connect() is called (hence make_shared above).
760 connection->connect();
763 socket_interceptor_.connect_(connection->get_socket());
765 auto result = authenticate_on_cluster(connection);
767 return on_authenticated(connection, result);
// Creates the read handler for `connection` with a `buffer_size`-byte receive
// buffer. byte_buffer wraps that buffer and builder_ assembles messages for the
// connection. The last-read time is initialised to now.
// The last-read time is stored as milliseconds since the steady_clock epoch
// (read back with .load(), presumably a std::atomic duration).
// NOTE(review): `buffer` is a raw owning new[] allocation; its release is not
// visible here (the destructor body is missing). byte_buffer(buffer, ...) also
// relies on `buffer` being declared before `byte_buffer` in the class.
770 ReadHandler::ReadHandler(Connection &connection,
size_t buffer_size)
771 : buffer(new char[buffer_size]), byte_buffer(buffer, buffer_size), builder_(connection),
772 last_read_time_duration_(std::chrono::duration_cast<std::chrono::milliseconds>(
773 std::chrono::steady_clock::now().time_since_epoch())) {
// Destructor. The body (original lines 777-778) is not present in this file.
// Presumably it releases the `buffer` allocated with new[] in the constructor —
// confirm.
776 ReadHandler::~ReadHandler() {
// Processes newly received bytes. It records the read time, then feeds byte_buffer
// to builder_ while data remains and builder_.on_data() reports progress. Leftover
// bytes (e.g. a partial message) are compacted to the front of the buffer.
// NOTE(review): the statement controlled by the unbraced `if` below (original line
// 785) and lines 786-790 are not present in this file. As written, that `if` now
// governs the `while` loop. The `else` of the compact branch is also missing.
// Confirm upstream.
780 void ReadHandler::handle() {
781 last_read_time_duration_ = std::chrono::duration_cast<std::chrono::milliseconds>(
782 std::chrono::steady_clock::now().time_since_epoch());
784 if (byte_buffer.position() == 0)
// Stops when on_data() makes no progress, so leftover bytes smaller than a
// message do not cause an endless loop.
791 while (byte_buffer.has_remaining() && builder_.on_data(byte_buffer)) {
794 if (byte_buffer.has_remaining()) {
795 byte_buffer.compact();
// Returns the last read time as a steady_clock::time_point, rebuilt from the
// stored millisecond count. Precision is therefore milliseconds.
801 std::chrono::steady_clock::time_point ReadHandler::get_last_read_time()
const {
802 return std::chrono::steady_clock::time_point(
803 std::chrono::duration_cast<std::chrono::steady_clock::duration>(last_read_time_duration_.load()));
// Creates a connection object for `address`. It does not connect yet; connect()
// does that.
// - The read handler gets a 16 KiB (16 << 10 bytes) buffer.
// - The start time is recorded from system_clock.
// - The remote UUID starts as nil and the connection starts alive.
// - The socket is created by `socket_factory` with the given connect timeout.
// NOTE(review): `client_connection_manager` is not used in the visible lines, and
// `connect_timeout_in_millis` is taken by non-const reference although it is only
// read here.
806 Connection::Connection(
const address &address, spi::ClientContext &client_context,
int connection_id,
807 internal::socket::SocketFactory &socket_factory,
808 ClientConnectionManagerImpl &client_connection_manager,
809 std::chrono::milliseconds &connect_timeout_in_millis)
810 : read_handler(*this, 16 << 10),
811 start_time_(std::chrono::system_clock::now()),
812 closed_time_duration_(),
813 client_context_(client_context),
814 invocation_service_(client_context.get_invocation_service()),
815 connection_id_(connection_id),
816 remote_uuid_(boost::uuids::nil_uuid()), logger_(client_context.get_logger()), alive_(true) {
817 socket_ = socket_factory.create(address, connect_timeout_in_millis);
// Defaulted destructor; members release their own resources.
820 Connection::~Connection() =
default;
// Connects the socket and starts the periodic backup-timeout check.
// It calls shared_from_this(), so this Connection must already be owned by a
// std::shared_ptr. The backup timer is created on the socket executor's context.
// The interval is the invocation service's backup timeout.
822 void Connection::connect() {
823 socket_->connect(shared_from_this());
824 backup_timer_.reset(
new boost::asio::steady_timer(socket_->get_executor().context()));
825 auto backupTimeout =
static_cast<spi::impl::ClientInvocationServiceImpl &
>(invocation_service_).get_backup_timeout();
826 auto this_connection = shared_from_this();
827 schedule_periodic_backup_cleanup(backupTimeout, this_connection);
// Arms backup_timer_ for `backup_timeout`. When it fires, it asks every pending
// invocation to detect and handle a backup timeout, then re-arms itself.
// The captured `this_connection` shared_ptr keeps the Connection alive for as long
// as the chain keeps rescheduling. The handler is wrapped with the socket
// executor's wrap(), presumably to serialise it with other socket work.
// NOTE(review): the error/termination check in the handler (original lines
// 834-836) is not present here. expires_from_now and wrap are deprecated in recent
// Boost.Asio (expires_after / bind_executor).
830 void Connection::schedule_periodic_backup_cleanup(std::chrono::milliseconds backup_timeout,
831 std::shared_ptr<Connection> this_connection) {
832 backup_timer_->expires_from_now(backup_timeout);
833 backup_timer_->async_wait(socket_->get_executor().wrap([=](boost::system::error_code ec) {
837 for (
const auto &it : this_connection->invocations) {
838 it.second->detect_and_handle_backup_timeout(backup_timeout);
841 schedule_periodic_backup_cleanup(backup_timeout, this_connection);
// Closes the connection without an explicit reason. The body (original lines
// 846-847) is not present in this file; presumably it delegates to the overloads
// below.
845 void Connection::close() {
// Closes the connection with `reason` and no cause exception.
849 void Connection::close(
const std::string &reason) {
850 close(reason,
nullptr);
// Closes the connection once. The compare-exchange on alive_ (true -> false) lets
// only the first caller proceed; the early-exit body is not present here.
// In order, it:
//  - records the close time as milliseconds since the steady_clock epoch;
//  - cancels the backup timer, ignoring any error;
//  - stores `reason` and `cause` for get_close_reason()/log_close();
//  - informs the connection manager via on_connection_close();
//  - posts a task to the socket executor that fails every pending invocation with
//    exception::target_disconnected carrying the close reason.
// An exception::iexception from the intermediate close steps is logged as a
// warning.
// NOTE(review): original lines 861-862, 865-866, 869-873 and 877-880 are not
// present in this file. They include the `try` and presumably the calls that
// actually close/log. backup_timer_ is created in connect(); whether close() can
// run before connect() is not visible here.
853 void Connection::close(
const std::string &reason, std::exception_ptr cause) {
854 bool expected =
true;
855 if (!alive_.compare_exchange_strong(expected,
false)) {
859 closed_time_duration_.store(std::chrono::duration_cast<std::chrono::milliseconds>(
860 std::chrono::steady_clock::now().time_since_epoch()));
863 boost::system::error_code ignored;
864 backup_timer_->cancel(ignored);
867 close_cause_ = cause;
868 close_reason_ = reason;
874 }
catch (exception::iexception &e) {
875 HZ_LOG(client_context_.get_logger(), warning,
876 boost::str(boost::format(
"Exception while closing connection %1%")
881 auto thisConnection = shared_from_this();
882 client_context_.get_connection_manager().on_connection_close(thisConnection);
// Pending invocations are failed on the socket executor; the captured shared_ptr
// keeps this Connection alive until the task runs.
884 boost::asio::post(socket_->get_executor(), [=]() {
885 for (auto &invocationEntry : thisConnection->invocations) {
886 invocationEntry.second->notify_exception(
887 std::make_exception_ptr(boost::enable_current_exception(
888 exception::target_disconnected(
"Connection::close",
889 thisConnection->get_close_reason()))));
// Hands `client_invocation` to the socket for an asynchronous write. A shared_ptr
// to this connection is passed along with it.
894 void Connection::write(
const std::shared_ptr<spi::impl::ClientInvocation> &client_invocation) {
895 socket_->async_write(shared_from_this(), client_invocation);
// Returns the member address reported during authentication. It is empty until
// set_remote_address() is called.
898 const boost::optional<address> &Connection::get_remote_address()
const {
899 return remote_address_;
// Stores the remote member address. The argument is taken by value and moved in.
902 void Connection::set_remote_address(boost::optional<address> endpoint) {
903 this->remote_address_ = std::move(endpoint);
// Dispatches an incoming message to the invocation registered under its
// correlation id.
// - Unknown correlation id: logs a warning and drops the message (the `return` is
//   not present here).
// - BACKUP_EVENT_FLAG: skips the event header, reads the original call id
//   (int64), and forwards it to the connection manager's notify_backup().
// - IS_EVENT_FLAG: hands the message to the client listener service.
// - Otherwise: hands it to the invocation service as a response.
906 void Connection::handle_client_message(
const std::shared_ptr<protocol::ClientMessage> &message) {
907 auto correlationId = message->get_correlation_id();
908 auto invocationIterator = invocations.find(correlationId);
909 if (invocationIterator == invocations.end()) {
910 HZ_LOG(logger_, warning,
911 boost::str(boost::format(
"No invocation for callId: %1%. "
912 "Dropping this message: %2%")
913 % correlationId % *message)
917 auto invocation = invocationIterator->second;
918 auto flags = message->get_header_flags();
919 if (message->is_flag_set(flags, protocol::ClientMessage::BACKUP_EVENT_FLAG)) {
920 message->rd_ptr(protocol::ClientMessage::EVENT_HEADER_LEN);
// Reuses correlationId to hold the call id of the invocation whose backup completed.
921 correlationId = message->get<int64_t>();
922 client_context_.get_connection_manager().notify_backup(correlationId);
923 }
else if (message->is_flag_set(flags, protocol::ClientMessage::IS_EVENT_FLAG)) {
924 client_context_.get_client_listener_service().handle_client_message(invocation, message);
926 invocation_service_.handle_client_message(invocation, message);
// Returns the connection id assigned by the connection manager at construction.
930 int32_t Connection::get_connection_id()
const {
931 return connection_id_;
// Reports whether the connection is still open. The body is not present in this
// file; presumably it returns alive_, which close() clears.
934 bool Connection::is_alive()
const {
// Returns the reason stored by close(); empty if none was given.
938 const std::string &Connection::get_close_reason()
const {
939 return close_reason_;
// Logs that the connection was closed. The message is "<connection> closed.
// Reason: " followed by one of:
//  - the explicit close reason, if any;
//  - otherwise the cause exception's source and message;
//  - otherwise "Socket explicitly closed".
// While the client is running, it logs at info level, or at warning level with the
// cause appended. Otherwise it logs at finest level through a lambda.
// NOTE(review): many lines are not present in this file: the `try` keywords, the
// `else` branches, the conditions selecting info vs. warning, and most of the
// finest-level lambda. Only exception::iexception is caught when rethrowing
// close_cause_; whether other exception types are handled is not visible.
942 void Connection::log_close() {
943 std::ostringstream message;
944 message << *
this <<
" closed. Reason: ";
945 if (!close_reason_.empty()) {
946 message << close_reason_;
947 }
else if (close_cause_) {
949 std::rethrow_exception(close_cause_);
950 }
catch (exception::iexception &ie) {
951 message << ie.get_source() <<
"[" + ie.get_message() <<
"]";
954 message <<
"Socket explicitly closed";
957 if (client_context_.get_lifecycle_service().is_running()) {
959 HZ_LOG(logger_, info, message.str());
962 std::rethrow_exception(close_cause_);
963 }
catch (exception::iexception &ie) {
964 HZ_LOG(logger_, warning,
965 boost::str(boost::format(
"%1%%2%") % message.str() % ie)
970 HZ_LOG(logger_, finest,
972 [
this]() -> std::string {
975 std::rethrow_exception(close_cause_);
976 } catch (exception::iexception &ie) {
// Two connections are equal when their connection ids are equal.
986 bool Connection::operator==(
const Connection &rhs)
const {
987 return connection_id_ == rhs.connection_id_;
// Negation of operator== (connection-id comparison).
990 bool Connection::operator!=(
const Connection &rhs)
const {
991 return !(rhs == *
this);
// Returns the server version string recorded after authentication.
994 const std::string &Connection::get_connected_server_version_string()
const {
995 return connected_server_version_string_;
// Stores the server version string reported during authentication.
998 void Connection::set_connected_server_version(
const std::string &connected_server) {
999 Connection::connected_server_version_string_ = connected_server;
// Returns the socket's local address as reported by the socket. It may be empty
// (on_authenticated has a "no local address" log path for that case).
1002 boost::optional<address> Connection::get_local_socket_address()
const {
1003 return socket_->local_socket_address();
// Returns when data was last read on this connection, as tracked by the read
// handler.
1006 std::chrono::steady_clock::time_point Connection::last_read_time()
const {
1007 return read_handler.get_last_read_time();
// Closes the underlying socket on the socket's executor. The captured shared_ptr
// keeps this Connection alive until the posted close runs.
// NOTE(review): original lines 1011-1014 (presumably a guard such as a null check
// on socket_) are not present in this file.
1010 void Connection::inner_close() {
1015 auto thisConnection = shared_from_this();
1016 boost::asio::post(socket_->get_executor(), [=] () { thisConnection->socket_->close(); });
// Streams a human-readable description of the connection: alive flag, connection
// id, remote endpoint (when set), last read time, close time (rebuilt from the
// stored millisecond count) and connected server version.
// It reads private members, so it is presumably declared a friend of Connection.
// NOTE(review): original lines 1026-1028 and 1033-1036 (e.g. closing braces and
// the trailing part of the output) are not present in this file.
1019 std::ostream &operator<<(std::ostream &os,
const Connection &connection) {
1020 os <<
"ClientConnection{"
1021 <<
"alive=" << connection.is_alive()
1022 <<
", connectionId=" << connection.get_connection_id()
1023 <<
", remoteEndpoint=";
1024 if (connection.get_remote_address()) {
1025 os << *connection.get_remote_address();
1029 os <<
", lastReadTime=" << util::StringUtil::time_to_string(connection.last_read_time())
1030 <<
", closedTime=" << util::StringUtil::time_to_string(std::chrono::steady_clock::time_point(
1031 std::chrono::duration_cast<std::chrono::steady_clock::duration>(connection.closed_time_duration_.load())))
1032 <<
", connected server version=" << connection.connected_server_version_string_
// Orders connections by connection id, consistent with operator==.
1038 bool Connection::operator<(
const Connection &rhs)
const {
1039 return connection_id_ < rhs.connection_id_;
// Returns when the connection object was created. The body is not present in this
// file; presumably it returns start_time_, which the constructor initialises with
// system_clock::now().
1042 std::chrono::system_clock::time_point Connection::get_start_time()
const {
// Returns the underlying socket. The body is not present in this file; presumably
// it returns *socket_.
1046 socket &Connection::get_socket() {
// Removes the pending invocation registered under `call_id`, if any.
1050 void Connection::deregister_invocation(int64_t call_id) {
1051 invocations.erase(call_id);
// Returns the member UUID recorded after authentication. It is nil until
// set_remote_uuid() is called.
1054 boost::uuids::uuid Connection::get_remote_uuid()
const {
1055 return remote_uuid_;
1058 void Connection::set_remote_uuid(boost::uuids::uuid remote_uuid) {
1059 remote_uuid_ = remote_uuid;
1062 HeartbeatManager::HeartbeatManager(spi::ClientContext &client,
1063 ClientConnectionManagerImpl &connection_manager)
1064 : client_(client), client_connection_manager_(connection_manager), logger_(client.get_logger()) {
1065 client_properties &clientProperties = client.get_client_properties();
1066 auto timeout_millis = clientProperties.get_long(clientProperties.get_heartbeat_timeout());
1067 heartbeat_timeout_ = std::chrono::milliseconds(
1068 timeout_millis > 0 ? timeout_millis : util::IOUtil::to_value<int64_t>(
1069 client_properties::PROP_HEARTBEAT_TIMEOUT_DEFAULT));
1071 auto interval_millis = clientProperties.get_long(clientProperties.get_heartbeat_interval());
1072 heartbeat_interval_ = std::chrono::milliseconds(interval_millis > 0 ? interval_millis
1073 : util::IOUtil::to_value<int64_t>(client_properties::PROP_HEARTBEAT_INTERVAL_DEFAULT));
// Schedules a repeating task on the client execution service. The task runs with
// heartbeat_interval_ as both the initial delay and the period. It first checks
// client_connection_manager_.is_alive(), then calls check_connection() on every
// active connection. The returned timer is kept in timer_, which shutdown() cancels.
1076 void HeartbeatManager::start() {
1077 spi::impl::ClientExecutionServiceImpl &clientExecutionService = client_.get_client_execution_service();
// NOTE(review): `[=]` captures `this` implicitly (deprecated in C++20); `[this]`
// states the capture explicitly. The task relies on this HeartbeatManager outliving timer_.
1079 timer_ = clientExecutionService.schedule_with_repetition([=]() {
1080 if (!client_connection_manager_.is_alive()) {
1084 for (
auto &connection : client_connection_manager_.get_active_connections()) {
1085 check_connection(connection);
1087 }, heartbeat_interval_, heartbeat_interval_);
// Heartbeat check for a single connection. It first checks connection->is_alive().
// Then:
//  - if nothing has been read for longer than heartbeat_timeout_, it logs a warning
//    and closes the connection through on_heartbeat_stopped();
//  - if nothing has been read for longer than heartbeat_interval_, it sends a
//    client ping request over this connection with invoke_urgent().
1090 void HeartbeatManager::check_connection(
const std::shared_ptr<Connection> &connection) {
1091 if (!connection->is_alive()) {
// A single "now" is used for both the timeout and the interval comparisons.
1095 auto now = std::chrono::steady_clock::now();
1096 if (now - connection->last_read_time() > heartbeat_timeout_) {
1097 HZ_LOG(logger_, warning,
1098 boost::str(boost::format(
"Heartbeat failed over the connection: %1%") % *connection)
1100 on_heartbeat_stopped(connection,
"Heartbeat timed out");
// Idle for longer than one interval: ping explicitly, bound to this connection.
1104 if (now - connection->last_read_time() > heartbeat_interval_) {
1105 auto request = protocol::codec::client_ping_encode();
1106 std::shared_ptr<spi::impl::ClientInvocation> clientInvocation = spi::impl::ClientInvocation::create(
1107 client_, request,
"", connection);
1108 clientInvocation->invoke_urgent();
// Closes the connection with the given reason. The close carries an
// exception_ptr to an exception::target_disconnected whose message names the connection.
1113 HeartbeatManager::on_heartbeat_stopped(
const std::shared_ptr<Connection> &connection,
1114 const std::string &reason) {
1115 connection->close(reason, std::make_exception_ptr(
1116 (exception::exception_builder<exception::target_disconnected>(
1117 "HeartbeatManager::onHeartbeatStopped") <<
"Heartbeat timed out to connection "
1118 << *connection).build()));
// Stops the repeating heartbeat task by cancelling timer_. The error_code overload
// of cancel() is used, so any cancellation error is discarded rather than thrown.
1121 void HeartbeatManager::shutdown() {
1123 boost::system::error_code ignored;
1124 timer_->cancel(ignored);
1128 std::chrono::milliseconds HeartbeatManager::get_heartbeat_timeout()
const {
1129 return heartbeat_timeout_;
// Starts a new cluster-connect window: it records the current steady_clock time as
// the window start and sets the backoff back to initial_backoff_millis_,
// capped at max_backoff_millis_.
1132 void wait_strategy::reset() {
1134 cluster_connect_attempt_begin_ = std::chrono::steady_clock::now();
// (std::min) is parenthesized so a function-like `min` macro (e.g. from
// <windows.h>) cannot expand here.
1135 current_backoff_millis_ = (std::min)(max_backoff_millis_, initial_backoff_millis_);
// Copies the backoff, multiplier, jitter and cluster-connect timeout settings from
// retry_config. It also builds cluster_connect_timeout_text_ for log messages:
// "INFINITE" when the timeout equals std::chrono::milliseconds::max(),
// otherwise "<n> msecs".
1138 wait_strategy::wait_strategy(
const config::connection_retry_config &retry_config, logger &log)
1139 : initial_backoff_millis_(retry_config.get_initial_backoff_duration()),
1140 max_backoff_millis_(retry_config.get_max_backoff_duration()),
1141 multiplier_(retry_config.get_multiplier()), jitter_(retry_config.get_jitter()), logger_(log),
1142 cluster_connect_timeout_millis_(retry_config.get_cluster_connect_timeout()) {
1143 if (cluster_connect_timeout_millis_ == std::chrono::milliseconds::max()) {
1144 cluster_connect_timeout_text_ =
"INFINITE";
1146 cluster_connect_timeout_text_ = (boost::format(
"%1% msecs")
1147 %cluster_connect_timeout_millis_.count()).str();
// Backoff step between cluster connection attempts.
// It measures the time since the window began (see reset()). If that exceeds
// cluster_connect_timeout_millis_, it logs a "timeout is reached" warning.
// Otherwise it logs the planned retry, sleeps the calling thread for the jittered
// backoff, and then grows the backoff for the next attempt.
1151 bool wait_strategy::sleep() {
1153 using namespace std::chrono;
1154 auto current_time = steady_clock::now();
1155 auto time_passed = duration_cast<milliseconds>(current_time - cluster_connect_attempt_begin_);
1156 if (time_passed > cluster_connect_timeout_millis_) {
1157 HZ_LOG(logger_, warning, (boost::format(
1158 "Unable to get live cluster connection, cluster connect timeout (%1%) is reached. Attempt %2%.")
1159 %cluster_connect_timeout_text_ %attempt_).str());
// Jitter: add current_backoff * jitter_ * (2r - 1) to the backoff. If random_
// yields r in [0, 1), this is a symmetric +/- jitter_ fraction (TODO confirm
// random_'s distribution range).
1164 auto actual_sleep_time = current_backoff_millis_ + milliseconds(
1165 static_cast<milliseconds::rep
>(current_backoff_millis_.count() * jitter_ *
1166 (2.0 * random_(random_generator_) - 1.0)));
// Never sleep past the remaining cluster-connect time budget.
1168 actual_sleep_time = (std::min)(actual_sleep_time, cluster_connect_timeout_millis_ - time_passed);
1170 HZ_LOG(logger_, warning, (boost::format(
1171 "Unable to get live cluster connection, retry in %1% ms, attempt: %2% , cluster connect timeout: %3% , max backoff millis: %4%") %
1172 actual_sleep_time.count() % attempt_ %
1173 cluster_connect_timeout_text_ %
1174 max_backoff_millis_.count()).str());
1176 std::this_thread::sleep_for(actual_sleep_time);
// Geometric growth by multiplier_, capped at max_backoff_millis_.
1178 current_backoff_millis_ = (std::min)(
1179 milliseconds(
static_cast<milliseconds::rep
>(current_backoff_millis_.count() * multiplier_)),
1180 max_backoff_millis_);
1185 namespace internal {
1187 SocketFactory::SocketFactory(spi::ClientContext &client_context, boost::asio::io_context &io,
1188 boost::asio::ip::tcp::resolver &resolver)
1189 : client_context_(client_context), io_(io), io_resolver_(resolver) {
// Prepares the factory before any socket is created.
// In builds with HZ_BUILD_WITH_SSL, when SSL is enabled in the network config:
//  - it reuses a user-supplied ssl_context_ from the SSL config if one is set, or
//    creates a boost::asio::ssl::context from the configured protocol;
//  - it sets peer verification and the default verify paths;
//  - it loads every configured CA verify file, resetting ssl_context_ if any
//    file fails to load;
//  - it applies the configured cipher list, if one is given.
// The (void) client_context_ cast is presumably there to silence an unused-member
// warning in builds without SSL.
1192 bool SocketFactory::start() {
1193 #ifdef HZ_BUILD_WITH_SSL
1194 auto &sslConfig = client_context_.get_client_config().get_network_config().get_ssl_config();
1195 if (sslConfig.is_enabled()) {
1196 if (sslConfig.ssl_context_) {
1197 ssl_context_ = sslConfig.ssl_context_;
// NOTE(review): the C-style cast to context_base::method could be a static_cast.
1199 ssl_context_ = std::make_shared<boost::asio::ssl::context>(
1200 (boost::asio::ssl::context_base::method) sslConfig.get_protocol());
1202 ssl_context_->set_verify_mode(boost::asio::ssl::verify_peer);
1203 ssl_context_->set_default_verify_paths();
// Each CA file is loaded with the error_code overload so that one failure
// does not throw; failures are logged and tracked instead.
1205 const std::vector<std::string> &verifyFiles = sslConfig.get_verify_files();
1206 bool success =
true;
1207 logger &lg = client_context_.get_logger();
1208 for (
const auto &f : verifyFiles) {
1209 boost::system::error_code ec;
1210 ssl_context_->load_verify_file(f, ec);
1213 boost::str(boost::format(
"SocketFactory::start: Failed to load CA "
1214 "verify file at %1% %2%")
1222 ssl_context_.reset();
1224 "SocketFactory::start: Failed to load one or more "
1225 "configured CA verify files (PEM files). Please "
1226 "correct the files and retry."
// An empty cipher list leaves the OpenSSL defaults in place; a non-empty list
// that OpenSSL rejects is reported through the client logger.
1233 const std::string &cipherList = sslConfig.get_cipher_list();
1234 if (!cipherList.empty()) {
1235 if (!SSL_CTX_set_cipher_list(ssl_context_->native_handle(), cipherList.c_str())) {
1236 logger &lg = client_context_.get_logger();
1238 std::string(
"SocketFactory::start: Could not load any "
1239 "of the ciphers in the config provided "
1240 "ciphers:") + cipherList
1248 (void) client_context_;
// Creates an unconnected socket for the given address. If start() set up an SSL
// context (SSL builds only), an SSLSocket is returned; otherwise a plain TcpSocket.
// Both use the configured socket options, the caller's connect timeout and the
// shared resolver. Ownership passes to the caller via std::unique_ptr.
1253 std::unique_ptr<hazelcast::client::socket> SocketFactory::create(
const address &address,
1254 std::chrono::milliseconds &connect_timeout_in_millis) {
1255 #ifdef HZ_BUILD_WITH_SSL
1256 if (ssl_context_.get()) {
1257 return std::unique_ptr<hazelcast::client::socket>(
1258 new internal::socket::SSLSocket(io_, *ssl_context_, address,
1259 client_context_.get_client_config().get_network_config().get_socket_options(),
1260 connect_timeout_in_millis, io_resolver_));
1264 return std::unique_ptr<hazelcast::client::socket>(
new internal::socket::TcpSocket(io_, address,
1265 client_context_.get_client_config().get_network_config().get_socket_options(),
1266 connect_timeout_in_millis,
1270 #ifdef HZ_BUILD_WITH_SSL
1272 SSLSocket::SSLSocket(boost::asio::io_context &io_service, boost::asio::ssl::context &ssl_context,
1273 const client::address &address, client::config::socket_options &socket_options,
1274 std::chrono::milliseconds &connect_timeout_in_millis,
1275 boost::asio::ip::tcp::resolver &resolver)
1276 : BaseSocket<boost::asio::ssl::stream<boost::asio::ip::tcp::socket>>(resolver, address,
1277 socket_options, io_service,connect_timeout_in_millis, ssl_context) {
// Lists the ciphers available on this SSL stream, as reported by OpenSSL for its
// native handle. For each one it records the name, secret bit count, protocol
// version and OpenSSL's one-line description.
1280 std::vector<SSLSocket::CipherInfo> SSLSocket::get_ciphers() {
1281 STACK_OF(SSL_CIPHER) *ciphers = SSL_get_ciphers(socket_.native_handle());
1282 std::vector<CipherInfo> supportedCiphers;
1283 for (
int i = 0; i < sk_SSL_CIPHER_num(ciphers); ++i) {
1284 struct SSLSocket::CipherInfo info;
1285 const SSL_CIPHER *cipher = sk_SSL_CIPHER_value(ciphers, i);
1286 info.name = SSL_CIPHER_get_name(cipher);
// Passing 0 (null) for alg_bits means only the secret bit count is returned.
1287 info.number_of_bits = SSL_CIPHER_get_bits(cipher, 0);
1288 info.version = SSL_CIPHER_get_version(cipher);
// SSL_CIPHER_description writes into the caller's 256-byte buffer and returns it.
1290 info.description = SSL_CIPHER_description(cipher, descBuf, 256);
1291 supportedCiphers.push_back(info);
1293 return supportedCiphers;
1296 void SSLSocket::post_connect() {
1297 socket_.handshake(boost::asio::ssl::stream_base::client);
// Streams a CipherInfo as "Name: ..., Bits:..., Version:..., Description:..." and
// closes it with "}".
1300 std::ostream &operator<<(std::ostream &out,
const SSLSocket::CipherInfo &info) {
1302 "Name: " << info.name <<
1303 ", Bits:" << info.number_of_bits <<
1304 ", Version:" << info.version <<
1305 ", Description:" << info.description <<
"}";
1312 TcpSocket::TcpSocket(boost::asio::io_context &io,
const address &address,
1313 client::config::socket_options &socket_options,
1314 std::chrono::milliseconds &connect_timeout_in_millis,
1315 boost::asio::ip::tcp::resolver &resolver)
1316 : BaseSocket<boost::asio::ip::tcp::socket>(resolver, address, socket_options, io,
1317 connect_timeout_in_millis) {
// std::hash specialization for Connection shared_ptrs. It hashes by the absolute
// value of the connection id, which makes it agree with Connection::operator==.
// NOTE(review): if get_connection_id() returns a signed integer, std::abs of its
// minimum value is undefined behaviour. Confirm that ids are never negative.
1326 std::size_t hash<std::shared_ptr<hazelcast::client::connection::Connection>>::operator()(
1327 const std::shared_ptr<hazelcast::client::connection::Connection> &conn)
const noexcept {
1331 return std::abs(conn->get_connection_id());