Hazelcast C++ Client
Hazelcast C++ Client Library
Loading...
Searching...
No Matches
network.cpp
1/*
2 * Copyright (c) 2008-2025, Hazelcast, Inc. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
#include <cstdlib>
#include <cstring>
#include <unordered_set>

#include "hazelcast/client/execution_callback.h"
#include "hazelcast/client/lifecycle_event.h"
#include "hazelcast/client/connection/AddressProvider.h"
#include "hazelcast/client/spi/impl/ClientInvocation.h"
#include "hazelcast/util/Util.h"
#include "hazelcast/client/protocol/AuthenticationStatus.h"
#include "hazelcast/client/exception/protocol_exceptions.h"
#include "hazelcast/client/connection/ClientConnectionManagerImpl.h"
#include "hazelcast/client/connection/ConnectionListener.h"
#include "hazelcast/client/connection/Connection.h"
#include "hazelcast/client/spi/ClientContext.h"
#include "hazelcast/client/spi/impl/ClientExecutionServiceImpl.h"
#include "hazelcast/client/serialization/serialization.h"
#include "hazelcast/client/protocol/UsernamePasswordCredentials.h"
#include "hazelcast/client/protocol/codec/codecs.h"
#include "hazelcast/client/client_config.h"
#include "hazelcast/client/socket_interceptor.h"
#include "hazelcast/client/config/client_network_config.h"
#include "hazelcast/client/client_properties.h"
#include "hazelcast/client/connection/HeartbeatManager.h"
#include "hazelcast/client/impl/hazelcast_client_instance_impl.h"
#include "hazelcast/client/spi/impl/listener/listener_service_impl.h"
#include "hazelcast/client/spi/impl/ClientPartitionServiceImpl.h"
#include "hazelcast/client/spi/impl/ClientInvocationServiceImpl.h"
#include "hazelcast/client/internal/socket/TcpSocket.h"
#include "hazelcast/client/internal/socket/SSLSocket.h"
#include "hazelcast/client/config/ssl_config.h"
#include "hazelcast/util/IOUtil.h"
#include "hazelcast/client/internal/socket/SocketFactory.h"
#include "hazelcast/client/connection/wait_strategy.h"
50
51namespace hazelcast {
52namespace client {
53namespace connection {
// Out-of-line definitions for static data members of
// ClientConnectionManagerImpl.
// NOTE(review): the constexpr members presumably need these namespace-scope
// definitions so they can be ODR-used when compiling as pre-C++17 (where
// static constexpr data members are not implicitly inline); confirm.
constexpr size_t ClientConnectionManagerImpl::EXECUTOR_CORE_POOL_SIZE;
// Endpoint qualifier of protocol type CLIENT with identifier "public".
// NOTE(review): presumably used to look up members' public addresses — confirm.
const endpoint_qualifier ClientConnectionManagerImpl::PUBLIC_ENDPOINT_QUALIFIER{
    CLIENT,
    "public"
};

constexpr int ClientConnectionManagerImpl::SQL_CONNECTION_RANDOM_ATTEMPTS;
61
/**
 * Creates the connection manager and caches connection-related settings from
 * the client configuration.
 *
 * Nothing is started here: the I/O context, I/O thread, executor, member
 * connection timer and heartbeat are set up by start().
 *
 * @param client           context giving access to configuration, logger and
 *                         the other client services.
 * @param address_provider source of candidate member addresses; ownership is
 *                         transferred to this object.
 */
ClientConnectionManagerImpl::ClientConnectionManagerImpl(
  spi::ClientContext& client,
  std::unique_ptr<AddressProvider> address_provider)
  : alive_(false)
  , logger_(client.get_logger())
  // Default is the maximum representable duration; the body replaces it when
  // a positive connection timeout is configured. The parentheses around `max`
  // prevent expansion by a function-like `max` macro (e.g. from <windows.h>).
  , connection_timeout_millis_((std::chrono::milliseconds::max)())
  , client_(client)
  , socket_interceptor_(client.get_client_config().get_socket_interceptor())
  , address_provider_(std::move(address_provider))
  , connection_id_gen_(0)
  , heartbeat_(client, *this)
  , async_start_(client.get_client_config()
                   .get_connection_strategy_config()
                   .is_async_start())
  , reconnect_mode_(client.get_client_config()
                      .get_connection_strategy_config()
                      .get_reconnect_mode())
  , smart_routing_enabled_(
      client.get_client_config().get_network_config().is_smart_routing())
  , client_uuid_(client.random_uuid())
  // Authentication waits at most the heartbeat timeout.
  // NOTE(review): only count() is carried over, so this assumes the heartbeat
  // timeout is expressed in milliseconds. It also reads heartbeat_, which
  // therefore must be declared before authentication_timeout_ — confirm.
  , authentication_timeout_(
      boost::chrono::milliseconds(heartbeat_.get_heartbeat_timeout().count()))
  , load_balancer_(client.get_client_config().get_load_balancer())
  , wait_strategy_(client.get_client_config()
                     .get_connection_strategy_config()
                     .get_retry_config(),
                   logger_)
  , cluster_id_(boost::uuids::nil_uuid())
  , client_state_(client_state::INITIAL)
  , connect_to_cluster_task_submitted_(false)
  , established_initial_cluster_connection(false)
  // Public addresses are only used together with the default address
  // provider. NOTE(review): reads the member address_provider_ (the parameter
  // has been moved from), so address_provider_ must be declared before
  // use_public_address_ in the class — confirm.
  , use_public_address_(
      address_provider_->is_default_provider() &&
      client.get_client_config().get_network_config().use_public_address())
{
    config::client_network_config& networkConfig =
      client.get_client_config().get_network_config();
    auto connTimeout = networkConfig.get_connection_timeout();
    // Non-positive values keep the "maximum" default chosen above.
    if (connTimeout.count() > 0) {
        connection_timeout_millis_ = std::chrono::milliseconds(connTimeout);
    }

    client_properties& clientProperties = client.get_client_properties();
    shuffle_member_list_ =
      clientProperties.get_boolean(clientProperties.get_shuffle_member_list());
}
108
109bool
110ClientConnectionManagerImpl::start()
111{
112 bool expected = false;
113 if (!alive_.compare_exchange_strong(expected, true)) {
114 return false;
115 }
116
117 io_context_.reset(new boost::asio::io_context);
118 io_resolver_.reset(
119 new boost::asio::ip::tcp::resolver(io_context_->get_executor()));
120 socket_factory_.reset(new internal::socket::SocketFactory(
121 client_, *io_context_, *io_resolver_));
122 auto guard = boost::asio::make_work_guard(*io_context_);
123 io_guard_ = std::unique_ptr<
124 boost::asio::executor_work_guard<boost::asio::io_context::executor_type>>(
125 new boost::asio::executor_work_guard<
126 boost::asio::io_context::executor_type>(std::move(guard)));
127
128 if (!socket_factory_->start()) {
129 return false;
130 }
131
132 socket_interceptor_ = client_.get_client_config().get_socket_interceptor();
133
134 io_thread_ = std::thread([=]() { io_context_->run(); });
135
136 executor_.reset(
137 new hazelcast::util::hz_thread_pool(EXECUTOR_CORE_POOL_SIZE));
138 connect_to_members_timer_ =
139 boost::asio::steady_timer(executor_->get_executor());
140
141 heartbeat_.start();
142
143 connect_to_cluster();
144 if (smart_routing_enabled_) {
145 schedule_connect_to_all_members();
146 }
147
148 load_balancer_.init_(client_.get_cluster());
149
150 return true;
151}
152
153void
154ClientConnectionManagerImpl::schedule_connect_to_all_members()
155{
156 if (!client_.get_lifecycle_service().is_running()) {
157 return;
158 }
159
160 connect_to_members_timer_->expires_after(
161 boost::asio::chrono::seconds(1));
162 connect_to_members_timer_->async_wait([=](boost::system::error_code ec) {
163 if (ec == boost::asio::error::operation_aborted) {
164 return;
165 }
166 connect_to_all_members();
167
168 schedule_connect_to_all_members();
169 });
170}
171
172void
173ClientConnectionManagerImpl::shutdown()
174{
175 bool expected = true;
176 if (!alive_.compare_exchange_strong(expected, false)) {
177 return;
178 }
179
180 if (connect_to_members_timer_) {
181 connect_to_members_timer_->cancel();
182 }
183
184 heartbeat_.shutdown();
185
186 // close connections
187 for (auto& connection : active_connections_.values()) {
188 // prevent any exceptions
189 util::IOUtil::close_resource(connection.get(),
190 "Hazelcast client is shutting down");
191 }
192
193 spi::impl::ClientExecutionServiceImpl::shutdown_thread_pool(
194 executor_.get());
195
196 // release the guard so that the io thread can stop gracefully
197 io_guard_.reset();
198 io_thread_.join();
199
200 connection_listeners_.clear();
201 active_connections_.clear();
202 active_connection_ids_.clear();
203}
204
205std::shared_ptr<Connection>
206ClientConnectionManagerImpl::get_or_connect(const member& m)
207{
208 const auto& uuid = m.get_uuid();
209 auto connection = active_connections_.get(uuid);
210 if (connection) {
211 return connection;
212 }
213
214 address addr = translate(m);
215 return connect(addr);
216}
217
218std::vector<std::shared_ptr<Connection>>
219ClientConnectionManagerImpl::get_active_connections()
220{
221 return active_connections_.values();
222}
223
224std::shared_ptr<Connection>
225ClientConnectionManagerImpl::get_connection(boost::uuids::uuid uuid)
226{
227 return active_connections_.get(uuid);
228}
229
/**
 * Authenticates a freshly opened connection against the cluster.
 *
 * Encodes an authentication request, sends it as an urgent invocation on
 * `connection`, waits at most authentication_timeout_ for the reply and
 * decodes it.
 *
 * @param connection connection to authenticate; it is closed before any of
 *                   the exceptions below propagate.
 * @return the decoded response when the status is AUTHENTICATED.
 * @throws exception::timeout when no response arrives within
 *         authentication_timeout_.
 * @throws exception::authentication for CREDENTIALS_FAILED,
 *         NOT_ALLOWED_IN_CLUSTER or an unsupported status code.
 * Other exception::iexception errors raised while waiting/decoding are
 * rethrown after closing the connection. NOTE(review): exceptions not derived
 * from exception::iexception (e.g. from f.get()) escape without closing the
 * connection here.
 */
ClientConnectionManagerImpl::auth_response
ClientConnectionManagerImpl::authenticate_on_cluster(
  std::shared_ptr<Connection>& connection)
{
    auto request =
      encode_authentication_request(client_.get_serialization_service());
    auto clientInvocation =
      spi::impl::ClientInvocation::create(client_, request, "", connection);
    auto f = clientInvocation->invoke_urgent();

    struct auth_response result;
    try {
        // Bounded wait so an unresponsive member cannot stall connection
        // setup indefinitely.
        if (f.wait_for(authentication_timeout_) !=
            boost::future_status::ready) {
            BOOST_THROW_EXCEPTION(exception::timeout(
              "ClientConnectionManagerImpl::authenticate",
              (boost::format("Authentication response is "
                             "not received for %1% msecs for %2%") %
               authentication_timeout_.count() % *clientInvocation)
                .str()));
        }
        auto response = f.get();
        // Pointer to the header of the initial (fixed-size) frame; its length
        // is used below to skip what is left of that frame.
        // NOTE(review): assumes rd_ptr(n) returns the current read position
        // and then advances by n — confirm in ClientMessage.
        auto* initial_frame =
          reinterpret_cast<protocol::ClientMessage::frame_header_type*>(
            response.rd_ptr(protocol::ClientMessage::RESPONSE_HEADER_LEN));
        // Elements of a braced-init-list are evaluated left to right, so these
        // sequential reads consume the fields in wire order: status byte,
        // member uuid, a byte, partition count, cluster id.
        // NOTE(review): the meaning of the third (byte) field follows the
        // declaration of auth_response — presumably serialization version.
        result = { response.get<byte>(),
                   response.get<boost::uuids::uuid>(),
                   response.get<byte>(),
                   response.get<int32_t>(),
                   response.get<boost::uuids::uuid>() };
        // skip first frame
        // Skip the remainder of the initial frame: its length minus the
        // header and the fields just read (two bytes, two uuids of
        // UUID_SIZE_IN_BYTES plus one UINT8 each — presumably a null flag —
        // and one int32).
        response.rd_ptr(static_cast<int32_t>(initial_frame->frame_len) -
                        protocol::ClientMessage::RESPONSE_HEADER_LEN -
                        2 * protocol::ClientMessage::UINT8_SIZE -
                        2 * (util::Bits::UUID_SIZE_IN_BYTES +
                             protocol::ClientMessage::UINT8_SIZE) -
                        protocol::ClientMessage::INT32_SIZE);

        // Variable-size fields that follow the initial frame.
        result.server_address = response.get_nullable<address>();
        result.server_version = response.get<std::string>();
    } catch (exception::iexception&) {
        connection->close("Failed to authenticate connection",
                          std::current_exception());
        throw;
    }

    // Every non-AUTHENTICATED status closes the connection with the same
    // exception object that is then thrown to the caller.
    auto authentication_status = (protocol::authentication_status)result.status;
    switch (authentication_status) {
        case protocol::AUTHENTICATED: {
            return result;
        }
        case protocol::CREDENTIALS_FAILED: {
            auto e = std::make_exception_ptr(exception::authentication(
              "ClientConnectionManagerImpl::authenticate_on_cluster",
              "Authentication failed. The configured cluster name on the "
              "client (see client_config::set_cluster_name()) does not match "
              "the one configured in the cluster or the credentials set in the "
              "Client security config could not be authenticated"));
            connection->close("Failed to authenticate connection", e);
            std::rethrow_exception(e);
        }
        case protocol::NOT_ALLOWED_IN_CLUSTER: {
            auto e = std::make_exception_ptr(exception::authentication(
              "ClientConnectionManagerImpl::authenticate_on_cluster",
              "Client is not allowed in the cluster"));
            connection->close("Failed to authenticate connection", e);
            std::rethrow_exception(e);
        }
        default: {
            auto e = std::make_exception_ptr(exception::authentication(
              "ClientConnectionManagerImpl::authenticate_on_cluster",
              (boost::format(
                 "Authentication status code not supported. status: %1%") %
               authentication_status)
                .str()));
            connection->close("Failed to authenticate connection", e);
            std::rethrow_exception(e);
        }
    }
}
310
311std::ostream&
312operator<<(std::ostream& os, ClientConnectionManagerImpl::client_state state)
313{
314 using client_state = ClientConnectionManagerImpl::client_state;
315
316 switch (state) {
317 case client_state::INITIAL:
318 return os << "INITIAL";
319 case client_state::CONNECTED_TO_CLUSTER:
320 return os << "CONNECTED_TO_CLUSTER";
321 case client_state::INITIALIZED_ON_CLUSTER:
322 return os << "INITIALIZED_ON_CLUSTER";
323 case client_state::DISCONNECTED_FROM_CLUSTER:
324 return os << "DISCONNECTED_FROM_CLUSTER";
325 }
326
327 return os;
328}
329
/**
 * Builds the client authentication request.
 *
 * - No credentials configured: plain authentication request with null
 *   username and password.
 * - Username/password credentials: same codec, with name and password.
 * - Token credentials: custom-authentication codec carrying the token.
 *
 * Every variant sends the cluster name, client_uuid_, client type CPP, the
 * serialization version of `ss`, HAZELCAST_VERSION, the client name, labels_
 * and ALL_MEMBERS_ROUTING. NOTE(review): the meaning of the trailing `false`
 * argument is not visible here — check the codec signature.
 *
 * @param ss serialization service whose version is advertised.
 * @return the encoded request. A default-constructed ClientMessage is
 *         returned only for an unhandled credential type, after assert(0)
 *         (which fires when NDEBUG is not defined).
 */
protocol::ClientMessage
ClientConnectionManagerImpl::encode_authentication_request(
  serialization::pimpl::SerializationService& ss)
{
    byte serializationVersion = ss.get_version();
    client_config& clientConfig = client_.get_client_config();
    auto cluster_name = clientConfig.get_cluster_name();

    auto credential = clientConfig.get_credentials();
    if (!credential) {
        return protocol::codec::client_authentication_encode(
          cluster_name,
          nullptr,
          nullptr,
          client_uuid_,
          protocol::ClientTypes::CPP,
          serializationVersion,
          HAZELCAST_VERSION,
          client_.get_name(),
          labels_,
          ALL_MEMBERS_ROUTING, // default routing to ALL_MEMBERS
          false);
    }

    switch (credential->type()) {
        case security::credentials::credential_type::username_password: {
            auto cr =
              std::static_pointer_cast<security::username_password_credentials>(
                credential);
            return protocol::codec::client_authentication_encode(
              cluster_name,
              &cr->name(),
              &cr->password(),
              client_uuid_,
              protocol::ClientTypes::CPP,
              serializationVersion,
              HAZELCAST_VERSION,
              client_.get_name(),
              labels_,
              ALL_MEMBERS_ROUTING, // default routing to ALL_MEMBERS
              false);
        }
        case security::credentials::credential_type::token: {
            auto cr =
              std::static_pointer_cast<security::token_credentials>(credential);
            return protocol::codec::client_authenticationcustom_encode(
              cluster_name,
              cr->token(),
              client_uuid_,
              protocol::ClientTypes::CPP,
              serializationVersion,
              HAZELCAST_VERSION,
              client_.get_name(),
              labels_,
              ALL_MEMBERS_ROUTING, // default routing to ALL_MEMBERS
              false);
        }
    }
    // Unreachable for the credential types handled above.
    assert(0);
    return protocol::ClientMessage();
}
391
392void
393ClientConnectionManagerImpl::fire_connection_added_event(
394 const std::shared_ptr<Connection>& connection)
395{
396 for (const std::shared_ptr<ConnectionListener>& connectionListener :
397 connection_listeners_.to_array()) {
398 connectionListener->connection_added(connection);
399 }
400}
401
402void
403ClientConnectionManagerImpl::fire_connection_removed_event(
404 const std::shared_ptr<Connection>& connection)
405{
406 for (const auto& listener : connection_listeners_.to_array()) {
407 listener->connection_removed(connection);
408 }
409}
410
411void
412ClientConnectionManagerImpl::shutdown_with_external_thread(
413 std::weak_ptr<client::impl::hazelcast_client_instance_impl> client_impl)
414{
415 std::thread([=] {
416 std::shared_ptr<client::impl::hazelcast_client_instance_impl>
417 clientInstance = client_impl.lock();
418 if (!clientInstance ||
419 !clientInstance->get_lifecycle_service().is_running()) {
420 return;
421 }
422
423 try {
424 clientInstance->get_lifecycle_service().shutdown();
425 } catch (exception::iexception& e) {
426 HZ_LOG(*clientInstance->get_logger(),
427 severe,
428 boost::str(boost::format("Exception during client shutdown "
429 "%1%.clientShutdown-:%2%") %
430 clientInstance->get_name() % e));
431 }
432 }).detach();
433}
434
435void
436ClientConnectionManagerImpl::submit_connect_to_cluster_task()
437{
438 bool expected = false;
439 if (!connect_to_cluster_task_submitted_.compare_exchange_strong(expected,
440 true)) {
441 return;
442 }
443
444 std::weak_ptr<client::impl::hazelcast_client_instance_impl> c =
445 client_.get_hazelcast_client_implementation();
446 boost::asio::post(executor_->get_executor(), [=]() {
447 try {
448 do_connect_to_cluster();
449
450 std::lock_guard<std::recursive_mutex> guard(client_state_mutex_);
451 connect_to_cluster_task_submitted_ = false;
452 if (active_connections_.empty()) {
453 HZ_LOG(
454 logger_,
455 finest,
456 boost::str(boost::format("No connection to cluster: %1%") %
457 cluster_id_));
458
459 submit_connect_to_cluster_task();
460 }
461
462 } catch (std::exception& e) {
463 HZ_LOG(logger_,
464 warning,
465 boost::str(boost::format("Could not connect to any cluster, "
466 "shutting down the client: %1%") %
467 e.what()));
468
469 shutdown_with_external_thread(c);
470 }
471 });
472}
473
474void
475ClientConnectionManagerImpl::connect_to_all_members()
476{
477 if (!client_.get_lifecycle_service().is_running() ||
478 active_connections_.empty()) {
479 return;
480 }
481
482 for (const auto& m :
483 client_.get_client_cluster_service().get_member_list()) {
484
485 {
486 std::lock_guard<std::recursive_mutex> guard{ client_state_mutex_ };
487
488 if (client_state_ == client_state::DISCONNECTED_FROM_CLUSTER) {
489 // Best effort check to prevent this task from attempting to
490 // open a new connection when the client is either switching
491 // clusters or is not connected to any of the cluster members.
492 // In such occasions, only `do_connect_to_cluster`
493 // method should open new connections.
494 return;
495 }
496 }
497
498 if (client_.get_lifecycle_service().is_running() &&
499 !get_connection(m.get_uuid()) &&
500 connecting_members_.get_or_put_if_absent(m, nullptr).second) {
501 // submit a task for this address only if there is no other pending
502 // connection attempt for it
503 member member_to_connect = m;
504 boost::asio::post(
505 executor_->get_executor(), [member_to_connect, this]() {
506 try {
507 if (!client_.get_lifecycle_service().is_running()) {
508 return;
509 }
510 if (!get_connection(member_to_connect.get_uuid())) {
511 get_or_connect(member_to_connect);
512 }
513 connecting_members_.remove(member_to_connect);
514 } catch (std::exception&) {
515 connecting_members_.remove(member_to_connect);
516 }
517 });
518 }
519 }
520}
521
522void
523ClientConnectionManagerImpl::do_connect_to_cluster()
524{
525 std::unordered_set<address> tried_addresses;
526 wait_strategy_.reset();
527
528 do {
529 std::unordered_set<address> tried_addresses_per_attempt;
530 auto member_list =
531 client_.get_client_cluster_service().get_member_list();
532 if (shuffle_member_list_) {
533 shuffle(member_list);
534 }
535
536 // try to connect to a member in the member list first
537 for (const auto& m : member_list) {
538 check_client_active();
539 tried_addresses_per_attempt.insert(m.get_address());
540 auto connection = try_connect(m);
541 if (connection) {
542 return;
543 }
544 }
545 // try to connect to a member given via config(explicit config/discovery
546 // mechanisms)
547 for (const address& server_address : get_possible_member_addresses()) {
548 check_client_active();
549 if (!tried_addresses_per_attempt.insert(server_address).second) {
550 // if we can not add it means that it is already tried to be
551 // connected with the member list
552 continue;
553 }
554 auto connection = try_connect<address>(server_address);
555 if (connection) {
556 return;
557 }
558 }
559 tried_addresses.insert(tried_addresses_per_attempt.begin(),
560 tried_addresses_per_attempt.end());
561 // If the address provider loads no addresses, then the above loop is
562 // not entered and the lifecycle check is missing, hence we need to
563 // repeat the same check at this point.
564 check_client_active();
565 } while (wait_strategy_.sleep());
566
567 std::ostringstream out;
568 out << "Unable to connect to any address! The following addresses were "
569 "tried: { ";
570 for (const auto& address : tried_addresses) {
571 out << address << " , ";
572 }
573 out << "}";
574 BOOST_THROW_EXCEPTION(exception::illegal_state(
575 "ConnectionManager::do_connect_to_cluster", out.str()));
576}
577
578std::vector<address>
579ClientConnectionManagerImpl::get_possible_member_addresses()
580{
581 std::vector<address> addresses;
582 for (auto&& member :
583 client_.get_client_cluster_service().get_member_list()) {
584 addresses.emplace_back(std::move(member.get_address()));
585 }
586
587 if (shuffle_member_list_) {
588 shuffle(addresses);
589 }
590
591 std::vector<address> provided_addresses =
592 address_provider_->load_addresses();
593
594 if (shuffle_member_list_) {
595 shuffle(provided_addresses);
596 }
597
598 addresses.insert(
599 addresses.end(), provided_addresses.begin(), provided_addresses.end());
600
601 return addresses;
602}
603
604void
605ClientConnectionManagerImpl::connect_to_cluster()
606{
607 if (async_start_) {
608 submit_connect_to_cluster_task();
609 } else {
610 do_connect_to_cluster();
611 }
612}
613
614bool
615ClientConnectionManagerImpl::is_alive()
616{
617 return alive_;
618}
619
620void
621ClientConnectionManagerImpl::on_connection_close(
622 const std::shared_ptr<Connection>& connection)
623{
624 auto endpoint = connection->get_remote_address();
625 auto member_uuid = connection->get_remote_uuid();
626
627 auto socket_remote_address = connection->get_socket().get_remote_endpoint();
628
629 if (!endpoint) {
630 HZ_LOG(logger_,
631 finest,
632 boost::str(boost::format(
633 "Destroying %1% , but it has end-point set to null "
634 "-> not removing it from a connection map") %
635 *connection));
636 return;
637 }
638
639 {
640 std::lock_guard<std::recursive_mutex> guard(client_state_mutex_);
641
642 if (active_connections_.remove(member_uuid, connection)) {
643 active_connection_ids_.remove(connection->get_connection_id());
644
645 HZ_LOG(logger_,
646 info,
647 boost::str(
648 boost::format(
649 "Removed connection to endpoint: %1%, connection: %2%") %
650 *endpoint % *connection));
651
652 if (active_connections_.empty()) {
653 if (client_state_ == client_state::INITIALIZED_ON_CLUSTER) {
654 fire_life_cycle_event(
655 lifecycle_event::lifecycle_state::CLIENT_DISCONNECTED);
656 }
657
658 client_state_ = client_state::DISCONNECTED_FROM_CLUSTER;
659 trigger_cluster_reconnection();
660 }
661
662 fire_connection_removed_event(connection);
663 } else {
664 HZ_LOG(
665 logger_,
666 finest,
667 boost::str(boost::format(
668 "Destroying a connection, but there is no mapping "
669 "%1% -> %2% in the connection map.") %
670 endpoint % *connection));
671 }
672 }
673}
674
675void
676ClientConnectionManagerImpl::add_connection_listener(
677 const std::shared_ptr<ConnectionListener>& connection_listener)
678{
679 connection_listeners_.add(connection_listener);
680}
681
682ClientConnectionManagerImpl::~ClientConnectionManagerImpl()
683{
684 shutdown();
685}
686
687void
688ClientConnectionManagerImpl::check_client_active()
689{
690 if (!client_.get_lifecycle_service().is_running()) {
691 BOOST_THROW_EXCEPTION(exception::hazelcast_client_not_active(
692 "ClientConnectionManagerImpl::check_client_active",
693 "Client is shutdown"));
694 }
695}
696
/**
 * Sends the client state to the cluster identified by `target_cluster_id`.
 * If the client is still on that cluster afterwards, marks it
 * INITIALIZED_ON_CLUSTER and fires CLIENT_CONNECTED.
 *
 * cluster_id_ is compared with `target_cluster_id` under client_state_mutex_
 * both before and after sending. The send itself runs without the lock.
 * If the cluster changed in between, nothing is updated.
 *
 * On a std::exception, a warning is logged. If the client is still on the
 * same cluster, this method re-posts itself to the executor.
 * NOTE(review): the retry has no delay or attempt limit.
 *
 * @param target_cluster_id cluster id the state is meant for.
 */
void
ClientConnectionManagerImpl::initialize_client_on_cluster(
  boost::uuids::uuid target_cluster_id)
{
    if (!client_.get_lifecycle_service().is_running()) {
        return;
    }

    try {
        {
            std::lock_guard<std::recursive_mutex> guard(client_state_mutex_);

            // A newer cluster connection superseded this request.
            // NOTE(review): logger_.log is called directly rather than via
            // HZ_LOG; whether the configured level still filters it depends
            // on the logger implementation — confirm.
            if (target_cluster_id != cluster_id_) {
                logger_.log(
                  hazelcast::logger::level::warning,
                  (boost::format("Won't send client state to cluster: %1%"
                                 " Because switched to a new cluster: %2%") %
                   target_cluster_id % cluster_id_)
                    .str());

                return;
            }
        }

        client_.get_hazelcast_client_implementation()->send_state_to_cluster();

        {
            std::lock_guard<std::recursive_mutex> guard(client_state_mutex_);

            if (target_cluster_id == cluster_id_) {
                HZ_LOG(logger_,
                       fine,
                       (boost::format("Client state is sent to cluster: %1%") %
                        target_cluster_id)
                         .str());

                client_state_ = client_state::INITIALIZED_ON_CLUSTER;
                fire_life_cycle_event(lifecycle_event::CLIENT_CONNECTED);
            } else if (logger_.enabled(hazelcast::logger::level::fine)) {
                // NOTE(review): gated on FINE being enabled but emitted at
                // WARNING level — confirm this mismatch is intended.
                logger_.log(hazelcast::logger::level::warning,
                            (boost::format("Cannot set client state to %1%"
                                           " because current cluster id: %2%"
                                           " is different than expected cluster"
                                           " id: %3%") %
                             client_state::INITIALIZED_ON_CLUSTER %
                             cluster_id_ % target_cluster_id)
                              .str());
            }
        }
    } catch (const std::exception& e) {
        auto cluster_name = client_.get_client_config().get_cluster_name();

        logger_.log(
          hazelcast::logger::level::warning,
          (boost::format("Failure during sending state to the cluster. %1%") %
           e.what())
            .str());

        {
            std::lock_guard<std::recursive_mutex> guard(client_state_mutex_);

            if (cluster_id_ == target_cluster_id) {
                // NOTE(review): same FINE-gate / WARNING-level mismatch.
                if (logger_.enabled(hazelcast::logger::level::fine)) {
                    logger_.log(
                      hazelcast::logger::level::warning,
                      (boost::format(
                         "Retrying sending to the cluster: %1%, name: %2%") %
                       target_cluster_id % cluster_name)
                        .str());
                }

                // The posted retry holds a shared_ptr to this manager so it
                // stays alive until the task runs.
                auto self = shared_from_this();

                boost::asio::post(
                  executor_->get_executor(), [self, target_cluster_id]() {
                      self->initialize_client_on_cluster(target_cluster_id);
                  });
            }
        }
    }
}
778
/**
 * Registers a connection that has just passed authentication and updates the
 * cluster/client state accordingly.
 *
 * Under client_state_mutex_:
 *  - validates the partition count (throws on mismatch),
 *  - records server version, address and member uuid on the connection,
 *  - if a connection to the same member is already active, closes the new
 *    one and returns the existing one,
 *  - calls on_cluster_restart() when a non-nil current cluster id differs
 *    from the responding cluster's id,
 *  - registers the connection. If it is the first active connection, it also
 *    adopts the new cluster id. On reconnects it then schedules
 *    initialize_client_on_cluster(); on the very first connection it marks
 *    the client INITIALIZED_ON_CLUSTER and fires CLIENT_CONNECTED.
 *  - logs and notifies connection-added listeners.
 *
 * NOTE(review): if check_partition_count() throws, the connection is not
 * closed here; the caller must handle it.
 *
 * @param connection newly authenticated connection.
 * @param response   decoded authentication response for it.
 * @return the already-active connection on a duplicate; nullptr if the
 *         connection turned out to be closed already; otherwise `connection`
 *         (closed, but still returned, if the client is shutting down).
 */
std::shared_ptr<Connection>
ClientConnectionManagerImpl::on_authenticated(
  const std::shared_ptr<Connection>& connection,
  auth_response& response)
{
    {
        std::lock_guard<std::recursive_mutex> guard(client_state_mutex_);
        check_partition_count(response.partition_count);
        connection->set_connected_server_version(response.server_version);
        connection->set_remote_address(response.server_address);
        connection->set_remote_uuid(response.member_uuid);

        // Keep only one connection per member.
        auto existing_connection =
          active_connections_.get(response.member_uuid);
        if (existing_connection) {
            connection->close(
              (boost::format(
                 "Duplicate connection to same member with uuid : %1%") %
               boost::uuids::to_string(response.member_uuid))
                .str());
            return existing_connection;
        }

        auto new_cluster_id = response.cluster_id;
        boost::uuids::uuid current_cluster_id = cluster_id_;

        HZ_LOG(logger_,
               finest,
               boost::str(boost::format(
                            "Checking the cluster: %1%, current cluster: %2%") %
                          new_cluster_id % current_cluster_id));

        // A nil current id means no cluster has been joined yet, which is
        // not a switch.
        auto cluster_id_changed = !current_cluster_id.is_nil() &&
                                  !(new_cluster_id == current_cluster_id);
        if (cluster_id_changed) {
            HZ_LOG(
              logger_,
              warning,
              boost::str(
                boost::format(
                  "Switching from current cluster: %1% to new cluster: %2%") %
                current_cluster_id % new_cluster_id));
            client_.get_hazelcast_client_implementation()->on_cluster_restart();
        }

        // Emptiness is sampled before registering, to detect the first
        // connection to a (possibly new) cluster.
        auto connections_empty = active_connections_.empty();
        active_connection_ids_.put(connection->get_connection_id(), connection);
        active_connections_.put(response.member_uuid, connection);
        if (connections_empty) {
            // The first connection that opens a connection to the new cluster
            // should set `clusterId`. This one will initiate
            // `initializeClientOnCluster` if necessary.
            cluster_id_ = new_cluster_id;

            if (established_initial_cluster_connection) {
                // In split brain, the client might connect to the one half
                // of the cluster, and then later might reconnect to the
                // other half, after the half it was connected to is
                // completely dead. Since the cluster id is preserved in
                // split brain scenarios, it is impossible to distinguish
                // reconnection to the same cluster vs reconnection to the
                // other half of the split brain. However, in the latter,
                // we might need to send some state to the other half of
                // the split brain (like Compact schemas or user code
                // deployment classes). That forces us to send the client
                // state to the cluster after the first cluster connection,
                // regardless the cluster id is changed or not.
                client_state_ = client_state::CONNECTED_TO_CLUSTER;
                // The posted task holds a shared_ptr to this manager.
                auto self = shared_from_this();
                boost::asio::post(
                  executor_->get_executor(), [self, new_cluster_id]() {
                      self->initialize_client_on_cluster(new_cluster_id);
                  });
            } else {
                established_initial_cluster_connection = true;
                client_state_ = client_state::INITIALIZED_ON_CLUSTER;

                fire_life_cycle_event(
                  lifecycle_event::lifecycle_state::CLIENT_CONNECTED);
            }
        }

        auto local_address = connection->get_local_socket_address();
        if (local_address) {
            HZ_LOG(
              logger_,
              info,
              boost::str(
                boost::format(
                  "Authenticated with server %1%:%2%, server version: %3%, "
                  "local address: %4%. %5%") %
                response.server_address % response.member_uuid %
                response.server_version % *local_address % *connection));
        } else {
            HZ_LOG(
              logger_,
              info,
              boost::str(
                boost::format(
                  "Authenticated with server %1%:%2%, server version: %3%, "
                  "no local address: (connection disconnected ?). %4%") %
                response.server_address % response.member_uuid %
                response.server_version % *connection));
        }

        fire_connection_added_event(connection);
    }

    // It could happen that this connection is already closed and
    // on_connection_close() is called even before the synchronized block
    // above is executed. In this case, now we have a closed but registered
    // connection. We do a final check here to remove this connection
    // if needed.
    if (!connection->is_alive()) {
        on_connection_close(connection);
        return nullptr;
    }

    // If the client is shutdown in parallel, we need to close this new
    // connection.
    if (!client_.get_lifecycle_service().is_running()) {
        connection->close("Client is shutdown");
    }

    return connection;
}
905
906void
907ClientConnectionManagerImpl::fire_life_cycle_event(
908 lifecycle_event::lifecycle_state state)
909{
910 client_.get_lifecycle_service().fire_lifecycle_event(state);
911}
912
913void
914ClientConnectionManagerImpl::check_partition_count(int32_t new_partition_count)
915{
916 auto& partition_service =
917 static_cast<spi::impl::ClientPartitionServiceImpl&>(
918 client_.get_partition_service());
919 if (!partition_service.check_and_set_partition_count(new_partition_count)) {
920 BOOST_THROW_EXCEPTION(exception::client_not_allowed_in_cluster(
921 "ClientConnectionManagerImpl::check_partition_count",
922 (boost::format(
923 "Client can not work with this cluster because it has a different "
924 "partition count. "
925 "Expected partition count: %1%, Member partition count: %2%") %
926 partition_service.get_partition_count() % new_partition_count)
927 .str()));
928 }
929}
930
931void
932ClientConnectionManagerImpl::trigger_cluster_reconnection()
933{
934 if (reconnect_mode_ ==
935 config::client_connection_strategy_config::reconnect_mode::OFF) {
936 HZ_LOG(
937 logger_, info, "RECONNECT MODE is off. Shutting down the client.");
938 shutdown_with_external_thread(
939 client_.get_hazelcast_client_implementation());
940 return;
941 }
942
943 if (client_.get_lifecycle_service().is_running()) {
944 submit_connect_to_cluster_task();
945 }
946}
947
948std::shared_ptr<Connection>
949ClientConnectionManagerImpl::get_random_connection()
950{
951 if (smart_routing_enabled_) {
952 auto member = load_balancer_.next_(client_.get_cluster());
953 if (!member) {
954 return nullptr;
955 }
956 auto connection = get_connection(member->get_uuid());
957 if (connection) {
958 return connection;
959 }
960 }
961
962 auto connections = active_connections_.values();
963 if (connections.empty()) {
964 return nullptr;
965 }
966
967 return connections[0];
968}
969
970boost::uuids::uuid
971ClientConnectionManagerImpl::get_client_uuid() const
972{
973 return client_uuid_;
974}
975
976void
977ClientConnectionManagerImpl::check_invocation_allowed()
978{
979 client_state state = client_state_;
980 if (state == client_state::INITIALIZED_ON_CLUSTER &&
981 active_connections_.size() > 0) {
982 return;
983 }
984
985 if (state == client_state::INITIAL) {
986 if (async_start_) {
987 BOOST_THROW_EXCEPTION(exception::hazelcast_client_offline(
988 "ClientConnectionManagerImpl::check_invocation_allowed",
989 "No connection found to cluster and async start is configured."));
990 } else {
991 BOOST_THROW_EXCEPTION(exception::io(
992 "No connection found to cluster since the client is starting."));
993 }
994 } else if (reconnect_mode_ == config::client_connection_strategy_config::
995 reconnect_mode::ASYNC) {
996 BOOST_THROW_EXCEPTION(exception::hazelcast_client_offline(
997 "ClientConnectionManagerImpl::check_invocation_allowed",
998 "No connection found to cluster and reconnect mode is async."));
999 } else {
1000 BOOST_THROW_EXCEPTION(
1001 exception::io("ClientConnectionManagerImpl::check_invocation_allowed",
1002 "No connection found to cluster."));
1003 }
1004}
1005
1006bool
1007ClientConnectionManagerImpl::client_initialized_on_cluster() const
1008{
1009 std::lock_guard<std::recursive_mutex> guard{ client_state_mutex_ };
1010
1011 return client_state_ == client_state::INITIALIZED_ON_CLUSTER;
1012}
1013
1014void
1015ClientConnectionManagerImpl::connect_to_all_cluster_members()
1016{
1017 if (!smart_routing_enabled_) {
1018 return;
1019 }
1020
1021 for (const auto& member :
1022 client_.get_client_cluster_service().get_member_list()) {
1023
1024 try {
1025 get_or_connect(member);
1026 } catch (std::exception&) {
1027 // ignore
1028 }
1029 }
1030}
1031
1032void
1033ClientConnectionManagerImpl::notify_backup(int64_t call_id)
1034{
1035 struct correlation_id
1036 {
1037 int32_t connnection_id;
1038 int32_t call_id;
1039 };
1040 union
1041 {
1042 int64_t id;
1043 correlation_id composed_id;
1044 } c_id_union;
1045 c_id_union.id = call_id;
1046 auto connection_id = c_id_union.composed_id.connnection_id;
1047 auto connection = active_connection_ids_.get(connection_id);
1048 if (!connection) {
1049 return;
1050 }
1051 boost::asio::post(connection->get_socket().get_executor(), [=]() {
1052 auto invocation_it = connection->invocations.find(call_id);
1053 if (invocation_it != connection->invocations.end()) {
1054 invocation_it->second->notify_backup();
1055 }
1056 });
1057}
1058
1059std::shared_ptr<Connection>
1060ClientConnectionManagerImpl::connect(const address& addr)
1061{
1062 HZ_LOG(logger_,
1063 info,
1064 boost::str(boost::format("Trying to connect to %1%.") % addr));
1065
1066 auto connection = std::make_shared<Connection>(addr,
1067 client_,
1068 ++connection_id_gen_,
1069 *socket_factory_,
1070 *this,
1071 connection_timeout_millis_);
1072 connection->connect();
1073
1074 // call the interceptor from user thread
1075 socket_interceptor_.connect_(connection->get_socket());
1076
1077 auto result = authenticate_on_cluster(connection);
1078
1079 return on_authenticated(connection, result);
1080}
1081
1082address
1083ClientConnectionManagerImpl::translate(const member& m)
1084{
1085 if (use_public_address_) {
1086 auto public_addr_it = m.address_map().find(PUBLIC_ENDPOINT_QUALIFIER);
1087 if (public_addr_it != m.address_map().end()) {
1088 return public_addr_it->second;
1089 }
1090 return m.get_address();
1091 }
1092
1093 try {
1094 boost::optional<address> addr =
1095 address_provider_->translate(m.get_address());
1096
1097 if (!addr) {
1098 throw exception::hazelcast_(boost::str(
1099 boost::format("Address Provider could not translate %1%") % m));
1100 }
1101
1102 return *addr;
1103 } catch (const exception::hazelcast_&) {
1104 logger_.log(
1105 logger::level::warning,
1106 boost::str(boost::format("Address Provider could not translate %1%") %
1107 m));
1108
1109 throw;
1110 }
1111}
1112
1113std::shared_ptr<connection::Connection>
1114ClientConnectionManagerImpl::connection_for_sql(
1115 std::function<boost::optional<member>()> member_of_large_same_version_group,
1116 std::function<boost::optional<member>(boost::uuids::uuid)> get_cluster_member)
1117{
1118 if (smart_routing_enabled_) {
1119 // There might be a race - the chosen member might be just connected or
1120 // disconnected - try a couple of times, the
1121 // memberOfLargerSameVersionGroup returns a random connection, we might
1122 // be lucky...
1123 for (int i = 0; i < SQL_CONNECTION_RANDOM_ATTEMPTS; i++) {
1124 auto member = member_of_large_same_version_group();
1125 if (!member) {
1126 break;
1127 }
1128 auto connection = active_connections_.get(member->get_uuid());
1129 if (connection) {
1130 return connection;
1131 }
1132 }
1133 }
1134
1135 // Otherwise iterate over connections and return the first one that's not to
1136 // a lite member
1137 std::shared_ptr<connection::Connection> first_connection;
1138 for (const auto& connection_entry : active_connections_.entry_set()) {
1139 if (!first_connection) {
1140 first_connection = connection_entry.second;
1141 }
1142 const auto& member_id = connection_entry.first;
1143 auto member = get_cluster_member(member_id);
1144 if (!member || member->is_lite_member()) {
1145 continue;
1146 }
1147 return connection_entry.second;
1148 }
1149
1150 // Failed to get a connection to a data member
1151 return first_connection;
1152}
1153
1154ReadHandler::ReadHandler(Connection& connection, size_t buffer_size)
1155 : buffer(new char[buffer_size])
1156 , byte_buffer(buffer, buffer_size)
1157 , builder_(connection)
1158 , last_read_time_(std::chrono::steady_clock::now().time_since_epoch())
1159{}
1160
1161ReadHandler::~ReadHandler()
1162{
1163 delete[] buffer;
1164}
1165
1166void
1167ReadHandler::handle()
1168{
1169 last_read_time_ = std::chrono::steady_clock::now().time_since_epoch();
1170
1171 if (byte_buffer.position() == 0)
1172 return;
1173
1174 byte_buffer.flip();
1175
1176 // it is important to check the on_data return value since there may be left
1177 // data less than a message header size, and this may cause an infinite
1178 // loop.
1179 while (byte_buffer.has_remaining() && builder_.on_data(byte_buffer)) {
1180 }
1181
1182 if (byte_buffer.has_remaining()) {
1183 byte_buffer.compact();
1184 } else {
1185 byte_buffer.clear();
1186 }
1187}
1188
1189std::chrono::steady_clock::time_point
1190ReadHandler::get_last_read_time() const
1191{
1192 return std::chrono::steady_clock::time_point{ last_read_time_ };
1193}
1194
1195bool
1196AddressProvider::is_default_provider()
1197{
1198 return false;
1199}
1200
1201Connection::Connection(
1202 const address& address,
1203 spi::ClientContext& client_context,
1204 int connection_id, // NOLINT(cppcoreguidelines-pro-type-member-init)
1205 internal::socket::SocketFactory& socket_factory,
1206 ClientConnectionManagerImpl& client_connection_manager,
1207 std::chrono::milliseconds& connect_timeout_in_millis)
1208 : read_handler(*this, 16 << 10)
1209 , start_time_(std::chrono::system_clock::now())
1210 , closed_time_duration_()
1211 , client_context_(client_context)
1212 , invocation_service_(client_context.get_invocation_service())
1213 , connection_id_(connection_id)
1214 , remote_uuid_(boost::uuids::nil_uuid())
1215 , logger_(client_context.get_logger())
1216 , alive_(true)
1217 , last_write_time_(std::chrono::steady_clock::now().time_since_epoch())
1218{
1219 (void)client_connection_manager;
1220 socket_ = socket_factory.create(address, connect_timeout_in_millis);
1221}
1222
1223Connection::~Connection() = default;
1224
1225void
1226Connection::connect()
1227{
1228 socket_->connect(shared_from_this());
1229 backup_timer_.reset(
1230 new boost::asio::steady_timer(socket_->get_executor().context()));
1231 auto backupTimeout =
1232 static_cast<spi::impl::ClientInvocationServiceImpl&>(invocation_service_)
1233 .get_backup_timeout();
1234 auto this_connection = shared_from_this();
1235 schedule_periodic_backup_cleanup(backupTimeout, this_connection);
1236}
1237
1238void
1239Connection::schedule_periodic_backup_cleanup(
1240 std::chrono::milliseconds backup_timeout,
1241 std::shared_ptr<Connection> this_connection)
1242{
1243 if (!alive_) {
1244 return;
1245 }
1246
1247 backup_timer_->expires_after(backup_timeout);
1248 backup_timer_->async_wait(
1249 socket_->get_executor().wrap([=](boost::system::error_code ec) {
1250 if (ec) {
1251 return;
1252 }
1253 for (const auto& it : this_connection->invocations) {
1254 it.second->detect_and_handle_backup_timeout(backup_timeout);
1255 }
1256
1257 schedule_periodic_backup_cleanup(backup_timeout, this_connection);
1258 }));
1259}
1260
1261void
1262Connection::close()
1263{
1264 close("");
1265}
1266
1267void
1268Connection::close(const std::string& reason)
1269{
1270 close(reason, nullptr);
1271}
1272
1273void
1274Connection::close(const std::string& reason, std::exception_ptr cause)
1275{
1276 bool expected = true;
1277 if (!alive_.compare_exchange_strong(expected, false)) {
1278 return;
1279 }
1280
1281 closed_time_duration_.store(
1282 std::chrono::duration_cast<std::chrono::milliseconds>(
1283 std::chrono::steady_clock::now().time_since_epoch()));
1284
1285 if (backup_timer_) {
1286 backup_timer_->cancel();
1287 }
1288
1289 close_cause_ = cause;
1290 close_reason_ = reason;
1291
1292 log_close();
1293
1294 try {
1295 inner_close();
1296 } catch (exception::iexception& e) {
1297 HZ_LOG(
1298 client_context_.get_logger(),
1299 warning,
1300 boost::str(boost::format("Exception while closing connection %1%") %
1301 e.get_message()));
1302 }
1303
1304 auto thisConnection = shared_from_this();
1305 client_context_.get_connection_manager().on_connection_close(
1306 thisConnection);
1307
1308 boost::asio::post(socket_->get_executor(), [=]() {
1309 for (auto& invocationEntry : thisConnection->invocations) {
1310 invocationEntry.second->notify_exception(std::make_exception_ptr(
1311 boost::enable_current_exception(exception::target_disconnected(
1312 "Connection::close", thisConnection->get_close_reason()))));
1313 }
1314 });
1315}
1316
1317void
1318Connection::write(
1319 const std::shared_ptr<spi::impl::ClientInvocation>& client_invocation)
1320{
1321 socket_->async_write(shared_from_this(), client_invocation);
1322}
1323
1324const boost::optional<address>&
1325Connection::get_remote_address() const
1326{
1327 return remote_address_;
1328}
1329
1330void
1331Connection::set_remote_address(boost::optional<address> endpoint)
1332{
1333 this->remote_address_ = std::move(endpoint);
1334}
1335
1336void
1337Connection::handle_client_message(
1338 const std::shared_ptr<protocol::ClientMessage>& message)
1339{
1340 auto correlationId = message->get_correlation_id();
1341 auto invocationIterator = invocations.find(correlationId);
1342 if (invocationIterator == invocations.end()) {
1343 HZ_LOG(logger_,
1344 warning,
1345 boost::str(boost::format("No invocation for callId: %1%. "
1346 "Dropping this message: %2%") %
1347 correlationId % *message));
1348 return;
1349 }
1350 auto invocation = invocationIterator->second;
1351 auto flags = message->get_header_flags();
1352 if (message->is_flag_set(flags,
1353 protocol::ClientMessage::BACKUP_EVENT_FLAG)) {
1354 message->rd_ptr(protocol::ClientMessage::EVENT_HEADER_LEN);
1355 correlationId = message->get<int64_t>();
1356 client_context_.get_connection_manager().notify_backup(correlationId);
1357 } else if (message->is_flag_set(flags,
1358 protocol::ClientMessage::IS_EVENT_FLAG)) {
1359 client_context_.get_client_listener_service().handle_client_message(
1360 invocation, message);
1361 } else {
1362 invocation_service_.handle_client_message(invocation, message);
1363 }
1364}
1365
1366int32_t
1367Connection::get_connection_id() const
1368{
1369 return connection_id_;
1370}
1371
1372bool
1373Connection::is_alive() const
1374{
1375 return alive_;
1376}
1377
1378const std::string&
1379Connection::get_close_reason() const
1380{
1381 return close_reason_;
1382}
1383
1384void
1385Connection::log_close()
1386{
1387 std::ostringstream message;
1388 message << *this << " closed. Reason: ";
1389 if (!close_reason_.empty()) {
1390 message << close_reason_;
1391 } else if (close_cause_) {
1392 try {
1393 std::rethrow_exception(close_cause_);
1394 } catch (exception::iexception& ie) {
1395 message << ie.get_source() << "[" + ie.get_message() << "]";
1396 }
1397 } else {
1398 message << "Socket explicitly closed";
1399 }
1400
1401 if (client_context_.get_lifecycle_service().is_running()) {
1402 if (!close_cause_) {
1403 HZ_LOG(logger_, info, message.str());
1404 } else {
1405 try {
1406 std::rethrow_exception(close_cause_);
1407 } catch (exception::iexception& ie) {
1408 HZ_LOG(
1409 logger_,
1410 warning,
1411 boost::str(boost::format("%1% %2%") % message.str() % ie));
1412 }
1413 }
1414 } else {
1415 HZ_LOG(
1416 logger_, finest, message.str() + [this]() -> std::string {
1417 if (close_cause_) {
1418 try {
1419 std::rethrow_exception(close_cause_);
1420 } catch (exception::iexception& ie) {
1421 return ie.what();
1422 }
1423 }
1424 return "";
1425 }());
1426 }
1427}
1428
1429bool
1430Connection::operator==(const Connection& rhs) const
1431{
1432 return connection_id_ == rhs.connection_id_;
1433}
1434
1435bool
1436Connection::operator!=(const Connection& rhs) const
1437{
1438 return !(rhs == *this);
1439}
1440
1441const std::string&
1442Connection::get_connected_server_version_string() const
1443{
1444 return connected_server_version_string_;
1445}
1446
1447void
1448Connection::set_connected_server_version(const std::string& connected_server)
1449{
1450 Connection::connected_server_version_string_ = connected_server;
1451}
1452
1453boost::optional<address>
1454Connection::get_local_socket_address() const
1455{
1456 return socket_->local_socket_address();
1457}
1458
1459std::chrono::steady_clock::time_point
1460Connection::last_read_time() const
1461{
1462 return read_handler.get_last_read_time();
1463}
1464
1465void
1466Connection::inner_close()
1467{
1468 if (!socket_) {
1469 return;
1470 }
1471
1472 auto thisConnection = shared_from_this();
1473 boost::asio::post(socket_->get_executor(),
1474 [=]() { thisConnection->socket_->close(); });
1475}
1476
1477std::ostream&
1478operator<<(std::ostream& os, const Connection& connection)
1479{
1480 os << "Connection{"
1481 << "alive=" << connection.is_alive()
1482 << ", connection id=" << connection.get_connection_id()
1483 << ", remote endpoint=";
1484 if (connection.get_remote_address()) {
1485 os << *connection.get_remote_address();
1486 } else {
1487 os << "null";
1488 }
1489 os << ", last_read_time="
1490 << util::StringUtil::time_to_string(connection.last_read_time())
1491 << ", last_write_time="
1492 << util::StringUtil::time_to_string(connection.last_write_time())
1493 << ", closedTime="
1494 << util::StringUtil::time_to_string(
1495 std::chrono::steady_clock::time_point(
1496 std::chrono::duration_cast<std::chrono::steady_clock::duration>(
1497 connection.closed_time_duration_.load())))
1498 << ", connected server version="
1499 << connection.connected_server_version_string_ << '}';
1500
1501 return os;
1502}
1503
1504bool
1505Connection::operator<(const Connection& rhs) const
1506{
1507 return connection_id_ < rhs.connection_id_;
1508}
1509
1510std::chrono::system_clock::time_point
1511Connection::get_start_time() const
1512{
1513 return start_time_;
1514}
1515
1516socket&
1517Connection::get_socket()
1518{
1519 return *socket_;
1520}
1521
1522void
1523Connection::deregister_invocation(int64_t call_id)
1524{
1525 invocations.erase(call_id);
1526}
1527
1528boost::uuids::uuid
1529Connection::get_remote_uuid() const
1530{
1531 return remote_uuid_;
1532}
1533
1534void
1535Connection::set_remote_uuid(boost::uuids::uuid remote_uuid)
1536{
1537 remote_uuid_ = remote_uuid;
1538}
1539
1540void
1541Connection::last_write_time(std::chrono::steady_clock::time_point tp)
1542{
1543 last_write_time_ = tp.time_since_epoch();
1544}
1545
1546std::chrono::steady_clock::time_point
1547Connection::last_write_time() const
1548{
1549 return std::chrono::steady_clock::time_point{ last_write_time_ };
1550}
1551
1552HeartbeatManager::HeartbeatManager(
1553 spi::ClientContext& client,
1554 ClientConnectionManagerImpl& connection_manager)
1555 : client_(client)
1556 , client_connection_manager_(connection_manager)
1557 , logger_(client.get_logger())
1558{
1559 client_properties& clientProperties = client.get_client_properties();
1560 auto timeout_millis =
1561 clientProperties.get_long(clientProperties.get_heartbeat_timeout());
1562 heartbeat_timeout_ = std::chrono::milliseconds(
1563 timeout_millis > 0
1564 ? timeout_millis
1565 : util::IOUtil::to_value<int64_t>(
1566 client_properties::PROP_HEARTBEAT_TIMEOUT_DEFAULT));
1567
1568 auto interval_millis =
1569 clientProperties.get_long(clientProperties.get_heartbeat_interval());
1570 heartbeat_interval_ = std::chrono::milliseconds(
1571 interval_millis > 0
1572 ? interval_millis
1573 : util::IOUtil::to_value<int64_t>(
1574 client_properties::PROP_HEARTBEAT_INTERVAL_DEFAULT));
1575}
1576
1577void
1578HeartbeatManager::start()
1579{
1580 spi::impl::ClientExecutionServiceImpl& clientExecutionService =
1581 client_.get_client_execution_service();
1582
1583 timer_ = clientExecutionService.schedule_with_repetition(
1584 [=]() {
1585 if (!client_connection_manager_.is_alive()) {
1586 return;
1587 }
1588
1589 for (auto& connection :
1590 client_connection_manager_.get_active_connections()) {
1591 check_connection(connection);
1592 }
1593 },
1594 heartbeat_interval_,
1595 heartbeat_interval_);
1596}
1597
1598void
1599HeartbeatManager::check_connection(
1600 const std::shared_ptr<Connection>& connection)
1601{
1602 if (!connection->is_alive()) {
1603 return;
1604 }
1605
1606 auto now = std::chrono::steady_clock::now();
1607 if (now - connection->last_read_time() > heartbeat_timeout_) {
1608 HZ_LOG(logger_,
1609 warning,
1610 boost::str(
1611 boost::format("Heartbeat failed over the connection: %1%") %
1612 *connection));
1613 on_heartbeat_stopped(connection, "Heartbeat timed out");
1614 return;
1615 }
1616
1617 if (now - connection->last_write_time() > heartbeat_interval_) {
1618 auto request = protocol::codec::client_ping_encode();
1619 std::shared_ptr<spi::impl::ClientInvocation> clientInvocation =
1620 spi::impl::ClientInvocation::create(client_, request, "", connection);
1621 clientInvocation->invoke_urgent();
1622 }
1623}
1624
1625void
1626HeartbeatManager::on_heartbeat_stopped(
1627 const std::shared_ptr<Connection>& connection,
1628 const std::string& reason)
1629{
1630 connection->close(
1631 reason,
1632 std::make_exception_ptr(
1633 (exception::exception_builder<exception::target_disconnected>(
1634 "HeartbeatManager::onHeartbeatStopped")
1635 << "Heartbeat timed out to connection " << *connection)
1636 .build()));
1637}
1638
1639void
1640HeartbeatManager::shutdown()
1641{
1642 if (timer_) {
1643 timer_->cancel();
1644 }
1645}
1646
1647std::chrono::milliseconds
1648HeartbeatManager::get_heartbeat_timeout() const
1649{
1650 return heartbeat_timeout_;
1651}
1652
1653void
1654wait_strategy::reset()
1655{
1656 attempt_ = 0;
1657 cluster_connect_attempt_begin_ = std::chrono::steady_clock::now();
1658 current_backoff_millis_ =
1659 (std::min)(max_backoff_millis_, initial_backoff_millis_);
1660}
1661
1662wait_strategy::wait_strategy(
1663 const config::connection_retry_config& retry_config,
1664 logger& log)
1665 : initial_backoff_millis_(retry_config.get_initial_backoff_duration())
1666 , max_backoff_millis_(retry_config.get_max_backoff_duration())
1667 , multiplier_(retry_config.get_multiplier())
1668 , jitter_(retry_config.get_jitter())
1669 , logger_(log)
1670 , cluster_connect_timeout_millis_(retry_config.get_cluster_connect_timeout())
1671{
1672 if (cluster_connect_timeout_millis_ == std::chrono::milliseconds::max()) {
1673 cluster_connect_timeout_text_ = "INFINITE";
1674 } else {
1675 cluster_connect_timeout_text_ =
1676 (boost::format("%1% msecs") % cluster_connect_timeout_millis_.count())
1677 .str();
1678 }
1679}
1680
1681bool
1682wait_strategy::sleep()
1683{
1684 attempt_++;
1685 using namespace std::chrono;
1686 auto current_time = steady_clock::now();
1687 auto time_passed = duration_cast<milliseconds>(
1688 current_time - cluster_connect_attempt_begin_);
1689 if (time_passed > cluster_connect_timeout_millis_) {
1690 HZ_LOG(
1691 logger_,
1692 warning,
1693 (boost::format("Unable to get live cluster connection, cluster "
1694 "connect timeout (%1%) is reached. Attempt %2%.") %
1695 cluster_connect_timeout_text_ % attempt_)
1696 .str());
1697 return false;
1698 }
1699
1700 // sleep time: current_backoff_millis_(1 +- (jitter * [0, 1]))
1701 auto actual_sleep_time =
1702 current_backoff_millis_ + milliseconds(static_cast<milliseconds::rep>(
1703 current_backoff_millis_.count() * jitter_ *
1704 (2.0 * random_(random_generator_) - 1.0)));
1705
1706 actual_sleep_time = (std::min)(
1707 actual_sleep_time, cluster_connect_timeout_millis_ - time_passed);
1708
1709 HZ_LOG(
1710 logger_,
1711 warning,
1712 (boost::format(
1713 "Unable to get live cluster connection, retry in %1% ms, attempt: %2% "
1714 ", cluster connect timeout: %3% , max backoff millis: %4%") %
1715 actual_sleep_time.count() % attempt_ % cluster_connect_timeout_text_ %
1716 max_backoff_millis_.count())
1717 .str());
1718
1719 std::this_thread::sleep_for(actual_sleep_time);
1720
1721 current_backoff_millis_ =
1722 (std::min)(milliseconds(static_cast<milliseconds::rep>(
1723 current_backoff_millis_.count() * multiplier_)),
1724 max_backoff_millis_);
1725 return true;
1726}
1727} // namespace connection
1728
1729namespace internal {
1730namespace socket {
1731SocketFactory::SocketFactory(spi::ClientContext& client_context,
1732 boost::asio::io_context& io,
1733 boost::asio::ip::tcp::resolver& resolver)
1734 : client_context_(client_context)
1735 , io_(io)
1736 , io_resolver_(resolver)
1737{}
1738
1739bool
1740SocketFactory::start()
1741{
1742#ifdef HZ_BUILD_WITH_SSL
1743 auto& sslConfig =
1744 client_context_.get_client_config().get_network_config().get_ssl_config();
1745 if (sslConfig.is_enabled()) {
1746 if (sslConfig.ssl_context_) {
1747 ssl_context_ = sslConfig.ssl_context_;
1748 } else {
1749 ssl_context_ = std::make_shared<boost::asio::ssl::context>(
1750 (boost::asio::ssl::context_base::method)sslConfig.get_protocol());
1751
1752 ssl_context_->set_verify_mode(boost::asio::ssl::verify_peer);
1753 ssl_context_->set_default_verify_paths();
1754
1755 const std::vector<std::string>& verifyFiles =
1756 sslConfig.get_verify_files();
1757 bool success = true;
1758 logger& lg = client_context_.get_logger();
1759 for (const auto& f : verifyFiles) {
1760 boost::system::error_code ec;
1761 ssl_context_->load_verify_file(f, ec);
1762 if (ec) {
1763 HZ_LOG(
1764 lg,
1765 warning,
1766 boost::str(
1767 boost::format("SocketFactory::start: Failed to load CA "
1768 "verify file at %1% %2%") %
1769 f % ec.message()));
1770 success = false;
1771 }
1772 }
1773
1774 if (!success) {
1775 ssl_context_.reset();
1776 HZ_LOG(lg,
1777 warning,
1778 "SocketFactory::start: Failed to load one or more "
1779 "configured CA verify files (PEM files). Please "
1780 "correct the files and retry.");
1781 return false;
1782 }
1783 }
1784
1785 // set cipher list if the list is set
1786 const std::string& cipherList = sslConfig.get_cipher_list();
1787 if (!cipherList.empty()) {
1788 if (!SSL_CTX_set_cipher_list(ssl_context_->native_handle(),
1789 cipherList.c_str())) {
1790 logger& lg = client_context_.get_logger();
1791 HZ_LOG(lg,
1792 warning,
1793 std::string("SocketFactory::start: Could not load any "
1794 "of the ciphers in the config provided "
1795 "ciphers:") +
1796 cipherList);
1797 return false;
1798 }
1799 }
1800 }
1801#else
1802 (void)client_context_;
1803#endif
1804 return true;
1805}
1806
1807std::unique_ptr<hazelcast::client::socket>
1808SocketFactory::create(const address& address,
1809 std::chrono::milliseconds& connect_timeout_in_millis)
1810{
1811#ifdef HZ_BUILD_WITH_SSL
1812 if (ssl_context_.get()) {
1813 return std::unique_ptr<hazelcast::client::socket>(
1814 new internal::socket::SSLSocket(io_,
1815 *ssl_context_,
1816 address,
1817 client_context_.get_client_config()
1818 .get_network_config()
1819 .get_socket_options(),
1820 connect_timeout_in_millis,
1821 io_resolver_));
1822 }
1823#endif
1824
1825 return std::unique_ptr<hazelcast::client::socket>(
1826 new internal::socket::TcpSocket(io_,
1827 address,
1828 client_context_.get_client_config()
1829 .get_network_config()
1830 .get_socket_options(),
1831 connect_timeout_in_millis,
1832 io_resolver_));
1833}
1834
1835#ifdef HZ_BUILD_WITH_SSL
1836
1837SSLSocket::SSLSocket(boost::asio::io_context& io_service,
1838 boost::asio::ssl::context& ssl_context,
1839 const client::address& address,
1840 client::config::socket_options& socket_options,
1841 std::chrono::milliseconds& connect_timeout_in_millis,
1842 boost::asio::ip::tcp::resolver& resolver)
1843 : BaseSocket<boost::asio::ssl::stream<boost::asio::ip::tcp::socket>>(
1844 resolver,
1845 address,
1846 socket_options,
1847 io_service,
1848 connect_timeout_in_millis,
1849 ssl_context)
1850{}
1851
1852std::vector<SSLSocket::CipherInfo>
1853SSLSocket::get_ciphers()
1854{
1855 STACK_OF(SSL_CIPHER)* ciphers = SSL_get_ciphers(socket_.native_handle());
1856 std::vector<CipherInfo> supportedCiphers;
1857 for (int i = 0; i < sk_SSL_CIPHER_num(ciphers); ++i) {
1858 struct SSLSocket::CipherInfo info;
1859 const SSL_CIPHER* cipher = sk_SSL_CIPHER_value(ciphers, i);
1860 info.name = SSL_CIPHER_get_name(cipher);
1861 info.number_of_bits = SSL_CIPHER_get_bits(cipher, 0);
1862 info.version = SSL_CIPHER_get_version(cipher);
1863 char descBuf[256];
1864 info.description = SSL_CIPHER_description(cipher, descBuf, 256);
1865 supportedCiphers.push_back(info);
1866 }
1867 return supportedCiphers;
1868}
1869
1870void
1871SSLSocket::post_connect()
1872{
1873 socket_.handshake(boost::asio::ssl::stream_base::client);
1874}
1875
1876std::ostream&
1877operator<<(std::ostream& out, const SSLSocket::CipherInfo& info)
1878{
1879 out << "Cipher{"
1880 "Name: "
1881 << info.name << ", Bits:" << info.number_of_bits
1882 << ", Version:" << info.version << ", Description:" << info.description
1883 << "}";
1884
1885 return out;
1886}
1887
1888#endif // HZ_BUILD_WITH_SSL
1889
1890TcpSocket::TcpSocket(boost::asio::io_context& io,
1891 const address& address,
1892 client::config::socket_options& socket_options,
1893 std::chrono::milliseconds& connect_timeout_in_millis,
1894 boost::asio::ip::tcp::resolver& resolver)
1895 : BaseSocket<boost::asio::ip::tcp::socket>(resolver,
1896 address,
1897 socket_options,
1898 io,
1899 connect_timeout_in_millis)
1900{}
1901
1902} // namespace socket
1903} // namespace internal
1904} // namespace client
1905} // namespace hazelcast
1906
1907namespace std {
1908std::size_t
1909hash<std::shared_ptr<hazelcast::client::connection::Connection>>::operator()(
1910 const std::shared_ptr<hazelcast::client::connection::Connection>& conn)
1911 const noexcept
1912{
1913 if (!conn) {
1914 return 0;
1915 }
1916 return std::abs(conn->get_connection_id());
1917}
1918} // namespace std
STL namespace.