Hazelcast C++ Client
Hazelcast C++ Client Library
Loading...
Searching...
No Matches
network.cpp
1/*
2 * Copyright (c) 2008-2025, Hazelcast, Inc. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
#include <cstdlib>
#include <cstring>
#include <unordered_set>

#include "hazelcast/client/execution_callback.h"
#include "hazelcast/client/lifecycle_event.h"
#include "hazelcast/client/connection/AddressProvider.h"
#include "hazelcast/client/spi/impl/ClientInvocation.h"
#include "hazelcast/util/Util.h"
#include "hazelcast/client/protocol/AuthenticationStatus.h"
#include "hazelcast/client/exception/protocol_exceptions.h"
#include "hazelcast/client/connection/ClientConnectionManagerImpl.h"
#include "hazelcast/client/connection/ConnectionListener.h"
#include "hazelcast/client/connection/Connection.h"
#include "hazelcast/client/spi/ClientContext.h"
#include "hazelcast/client/spi/impl/ClientExecutionServiceImpl.h"
#include "hazelcast/client/serialization/serialization.h"
#include "hazelcast/client/protocol/UsernamePasswordCredentials.h"
#include "hazelcast/client/protocol/codec/codecs.h"
#include "hazelcast/client/client_config.h"
#include "hazelcast/client/socket_interceptor.h"
#include "hazelcast/client/config/client_network_config.h"
#include "hazelcast/client/client_properties.h"
#include "hazelcast/client/connection/HeartbeatManager.h"
#include "hazelcast/client/impl/hazelcast_client_instance_impl.h"
#include "hazelcast/client/spi/impl/listener/listener_service_impl.h"
#include "hazelcast/client/spi/impl/ClientPartitionServiceImpl.h"
#include "hazelcast/client/spi/impl/ClientInvocationServiceImpl.h"
#include "hazelcast/client/internal/socket/TcpSocket.h"
#include "hazelcast/client/internal/socket/SSLSocket.h"
#include "hazelcast/client/config/ssl_config.h"
#include "hazelcast/util/IOUtil.h"
#include "hazelcast/client/internal/socket/SocketFactory.h"
#include "hazelcast/client/connection/wait_strategy.h"
50
51namespace hazelcast {
52namespace client {
53namespace connection {
54constexpr size_t ClientConnectionManagerImpl::EXECUTOR_CORE_POOL_SIZE;
55const endpoint_qualifier ClientConnectionManagerImpl::PUBLIC_ENDPOINT_QUALIFIER{
56 CLIENT,
57 "public"
58};
59
60constexpr int ClientConnectionManagerImpl::SQL_CONNECTION_RANDOM_ATTEMPTS;
61
62ClientConnectionManagerImpl::ClientConnectionManagerImpl(
63 spi::ClientContext& client,
64 std::unique_ptr<AddressProvider> address_provider)
65 : alive_(false)
66 , logger_(client.get_logger())
67 , connection_timeout_millis_((std::chrono::milliseconds::max)())
68 , client_(client)
69 , socket_interceptor_(client.get_client_config().get_socket_interceptor())
70 , address_provider_(std::move(address_provider))
71 , connection_id_gen_(0)
72 , heartbeat_(client, *this)
73 , async_start_(client.get_client_config()
74 .get_connection_strategy_config()
75 .is_async_start())
76 , reconnect_mode_(client.get_client_config()
77 .get_connection_strategy_config()
78 .get_reconnect_mode())
79 , smart_routing_enabled_(
80 client.get_client_config().get_network_config().is_smart_routing())
81 , client_uuid_(client.random_uuid())
82 , authentication_timeout_(
83 boost::chrono::milliseconds(heartbeat_.get_heartbeat_timeout().count()))
84 , load_balancer_(client.get_client_config().get_load_balancer())
85 , wait_strategy_(client.get_client_config()
86 .get_connection_strategy_config()
87 .get_retry_config(),
88 logger_)
89 , cluster_id_(boost::uuids::nil_uuid())
90 , client_state_(client_state::INITIAL)
91 , connect_to_cluster_task_submitted_(false)
92 , established_initial_cluster_connection(false)
93 , use_public_address_(
94 address_provider_->is_default_provider() &&
95 client.get_client_config().get_network_config().use_public_address())
96{
97 config::client_network_config& networkConfig =
98 client.get_client_config().get_network_config();
99 auto connTimeout = networkConfig.get_connection_timeout();
100 if (connTimeout.count() > 0) {
101 connection_timeout_millis_ = std::chrono::milliseconds(connTimeout);
102 }
103
104 client_properties& clientProperties = client.get_client_properties();
105 shuffle_member_list_ =
106 clientProperties.get_boolean(clientProperties.get_shuffle_member_list());
107}
108
109bool
110ClientConnectionManagerImpl::start()
111{
112 bool expected = false;
113 if (!alive_.compare_exchange_strong(expected, true)) {
114 return false;
115 }
116
117 io_context_.reset(new boost::asio::io_context);
118 io_resolver_.reset(
119 new boost::asio::ip::tcp::resolver(io_context_->get_executor()));
120 socket_factory_.reset(new internal::socket::SocketFactory(
121 client_, *io_context_, *io_resolver_));
122 io_guard_.reset(new boost::asio::io_context::work(*io_context_));
123
124 if (!socket_factory_->start()) {
125 return false;
126 }
127
128 socket_interceptor_ = client_.get_client_config().get_socket_interceptor();
129
130 io_thread_ = std::thread([=]() { io_context_->run(); });
131
132 executor_.reset(
133 new hazelcast::util::hz_thread_pool(EXECUTOR_CORE_POOL_SIZE));
134 connect_to_members_timer_ =
135 boost::asio::steady_timer(executor_->get_executor());
136
137 heartbeat_.start();
138
139 connect_to_cluster();
140 if (smart_routing_enabled_) {
141 schedule_connect_to_all_members();
142 }
143
144 load_balancer_.init_(client_.get_cluster());
145
146 return true;
147}
148
149void
150ClientConnectionManagerImpl::schedule_connect_to_all_members()
151{
152 if (!client_.get_lifecycle_service().is_running()) {
153 return;
154 }
155
156 connect_to_members_timer_->expires_from_now(
157 boost::asio::chrono::seconds(1));
158 connect_to_members_timer_->async_wait([=](boost::system::error_code ec) {
159 if (ec == boost::asio::error::operation_aborted) {
160 return;
161 }
162 connect_to_all_members();
163
164 schedule_connect_to_all_members();
165 });
166}
167
168void
169ClientConnectionManagerImpl::shutdown()
170{
171 bool expected = true;
172 if (!alive_.compare_exchange_strong(expected, false)) {
173 return;
174 }
175
176 if (connect_to_members_timer_) {
177 connect_to_members_timer_->cancel();
178 }
179
180 heartbeat_.shutdown();
181
182 // close connections
183 for (auto& connection : active_connections_.values()) {
184 // prevent any exceptions
185 util::IOUtil::close_resource(connection.get(),
186 "Hazelcast client is shutting down");
187 }
188
189 spi::impl::ClientExecutionServiceImpl::shutdown_thread_pool(
190 executor_.get());
191
192 // release the guard so that the io thread can stop gracefully
193 io_guard_.reset();
194 io_thread_.join();
195
196 connection_listeners_.clear();
197 active_connections_.clear();
198 active_connection_ids_.clear();
199}
200
201std::shared_ptr<Connection>
202ClientConnectionManagerImpl::get_or_connect(const member& m)
203{
204 const auto& uuid = m.get_uuid();
205 auto connection = active_connections_.get(uuid);
206 if (connection) {
207 return connection;
208 }
209
210 address addr = translate(m);
211 return connect(addr);
212}
213
214std::vector<std::shared_ptr<Connection>>
215ClientConnectionManagerImpl::get_active_connections()
216{
217 return active_connections_.values();
218}
219
220std::shared_ptr<Connection>
221ClientConnectionManagerImpl::get_connection(boost::uuids::uuid uuid)
222{
223 return active_connections_.get(uuid);
224}
225
226ClientConnectionManagerImpl::auth_response
227ClientConnectionManagerImpl::authenticate_on_cluster(
228 std::shared_ptr<Connection>& connection)
229{
230 auto request =
231 encode_authentication_request(client_.get_serialization_service());
232 auto clientInvocation =
233 spi::impl::ClientInvocation::create(client_, request, "", connection);
234 auto f = clientInvocation->invoke_urgent();
235
236 struct auth_response result;
237 try {
238 if (f.wait_for(authentication_timeout_) !=
239 boost::future_status::ready) {
240 BOOST_THROW_EXCEPTION(exception::timeout(
241 "ClientConnectionManagerImpl::authenticate",
242 (boost::format("Authentication response is "
243 "not received for %1% msecs for %2%") %
244 authentication_timeout_.count() % *clientInvocation)
245 .str()));
246 }
247 auto response = f.get();
248 auto* initial_frame =
249 reinterpret_cast<protocol::ClientMessage::frame_header_type*>(
250 response.rd_ptr(protocol::ClientMessage::RESPONSE_HEADER_LEN));
251 result = { response.get<byte>(),
252 response.get<boost::uuids::uuid>(),
253 response.get<byte>(),
254 response.get<int32_t>(),
255 response.get<boost::uuids::uuid>() };
256 // skip first frame
257 response.rd_ptr(static_cast<int32_t>(initial_frame->frame_len) -
258 protocol::ClientMessage::RESPONSE_HEADER_LEN -
259 2 * protocol::ClientMessage::UINT8_SIZE -
260 2 * (sizeof(boost::uuids::uuid) +
261 protocol::ClientMessage::UINT8_SIZE) -
262 protocol::ClientMessage::INT32_SIZE);
263
264 result.server_address = response.get_nullable<address>();
265 result.server_version = response.get<std::string>();
266 } catch (exception::iexception&) {
267 connection->close("Failed to authenticate connection",
268 std::current_exception());
269 throw;
270 }
271
272 auto authentication_status = (protocol::authentication_status)result.status;
273 switch (authentication_status) {
274 case protocol::AUTHENTICATED: {
275 return result;
276 }
277 case protocol::CREDENTIALS_FAILED: {
278 auto e = std::make_exception_ptr(exception::authentication(
279 "ClientConnectionManagerImpl::authenticate_on_cluster",
280 "Authentication failed. The configured cluster name on the "
281 "client (see client_config::set_cluster_name()) does not match "
282 "the one configured in the cluster or the credentials set in the "
283 "Client security config could not be authenticated"));
284 connection->close("Failed to authenticate connection", e);
285 std::rethrow_exception(e);
286 }
287 case protocol::NOT_ALLOWED_IN_CLUSTER: {
288 auto e = std::make_exception_ptr(exception::authentication(
289 "ClientConnectionManagerImpl::authenticate_on_cluster",
290 "Client is not allowed in the cluster"));
291 connection->close("Failed to authenticate connection", e);
292 std::rethrow_exception(e);
293 }
294 default: {
295 auto e = std::make_exception_ptr(exception::authentication(
296 "ClientConnectionManagerImpl::authenticate_on_cluster",
297 (boost::format(
298 "Authentication status code not supported. status: %1%") %
299 authentication_status)
300 .str()));
301 connection->close("Failed to authenticate connection", e);
302 std::rethrow_exception(e);
303 }
304 }
305}
306
307std::ostream&
308operator<<(std::ostream& os, ClientConnectionManagerImpl::client_state state)
309{
310 using client_state = ClientConnectionManagerImpl::client_state;
311
312 switch (state) {
313 case client_state::INITIAL:
314 return os << "INITIAL";
315 case client_state::CONNECTED_TO_CLUSTER:
316 return os << "CONNECTED_TO_CLUSTER";
317 case client_state::INITIALIZED_ON_CLUSTER:
318 return os << "INITIALIZED_ON_CLUSTER";
319 case client_state::DISCONNECTED_FROM_CLUSTER:
320 return os << "DISCONNECTED_FROM_CLUSTER";
321 }
322
323 return os;
324}
325
326protocol::ClientMessage
327ClientConnectionManagerImpl::encode_authentication_request(
328 serialization::pimpl::SerializationService& ss)
329{
330 byte serializationVersion = ss.get_version();
331 client_config& clientConfig = client_.get_client_config();
332 auto cluster_name = clientConfig.get_cluster_name();
333
334 auto credential = clientConfig.get_credentials();
335 if (!credential) {
336 return protocol::codec::client_authentication_encode(
337 cluster_name,
338 nullptr,
339 nullptr,
340 client_uuid_,
341 protocol::ClientTypes::CPP,
342 serializationVersion,
343 HAZELCAST_VERSION,
344 client_.get_name(),
345 labels_);
346 }
347
348 switch (credential->type()) {
349 case security::credentials::credential_type::username_password: {
350 auto cr =
351 std::static_pointer_cast<security::username_password_credentials>(
352 credential);
353 return protocol::codec::client_authentication_encode(
354 cluster_name,
355 &cr->name(),
356 &cr->password(),
357 client_uuid_,
358 protocol::ClientTypes::CPP,
359 serializationVersion,
360 HAZELCAST_VERSION,
361 client_.get_name(),
362 labels_);
363 }
364 case security::credentials::credential_type::token: {
365 auto cr =
366 std::static_pointer_cast<security::token_credentials>(credential);
367 return protocol::codec::client_authenticationcustom_encode(
368 cluster_name,
369 cr->token(),
370 client_uuid_,
371 protocol::ClientTypes::CPP,
372 serializationVersion,
373 HAZELCAST_VERSION,
374 client_.get_name(),
375 labels_);
376 }
377 }
378 assert(0);
379 return protocol::ClientMessage();
380}
381
382void
383ClientConnectionManagerImpl::fire_connection_added_event(
384 const std::shared_ptr<Connection>& connection)
385{
386 for (const std::shared_ptr<ConnectionListener>& connectionListener :
387 connection_listeners_.to_array()) {
388 connectionListener->connection_added(connection);
389 }
390}
391
392void
393ClientConnectionManagerImpl::fire_connection_removed_event(
394 const std::shared_ptr<Connection>& connection)
395{
396 for (const auto& listener : connection_listeners_.to_array()) {
397 listener->connection_removed(connection);
398 }
399}
400
401void
402ClientConnectionManagerImpl::shutdown_with_external_thread(
403 std::weak_ptr<client::impl::hazelcast_client_instance_impl> client_impl)
404{
405 std::thread([=] {
406 std::shared_ptr<client::impl::hazelcast_client_instance_impl>
407 clientInstance = client_impl.lock();
408 if (!clientInstance ||
409 !clientInstance->get_lifecycle_service().is_running()) {
410 return;
411 }
412
413 try {
414 clientInstance->get_lifecycle_service().shutdown();
415 } catch (exception::iexception& e) {
416 HZ_LOG(*clientInstance->get_logger(),
417 severe,
418 boost::str(boost::format("Exception during client shutdown "
419 "%1%.clientShutdown-:%2%") %
420 clientInstance->get_name() % e));
421 }
422 }).detach();
423}
424
425void
426ClientConnectionManagerImpl::submit_connect_to_cluster_task()
427{
428 bool expected = false;
429 if (!connect_to_cluster_task_submitted_.compare_exchange_strong(expected,
430 true)) {
431 return;
432 }
433
434 std::weak_ptr<client::impl::hazelcast_client_instance_impl> c =
435 client_.get_hazelcast_client_implementation();
436 boost::asio::post(executor_->get_executor(), [=]() {
437 try {
438 do_connect_to_cluster();
439
440 std::lock_guard<std::recursive_mutex> guard(client_state_mutex_);
441 connect_to_cluster_task_submitted_ = false;
442 if (active_connections_.empty()) {
443 HZ_LOG(
444 logger_,
445 finest,
446 boost::str(boost::format("No connection to cluster: %1%") %
447 cluster_id_));
448
449 submit_connect_to_cluster_task();
450 }
451
452 } catch (std::exception& e) {
453 HZ_LOG(logger_,
454 warning,
455 boost::str(boost::format("Could not connect to any cluster, "
456 "shutting down the client: %1%") %
457 e.what()));
458
459 shutdown_with_external_thread(c);
460 }
461 });
462}
463
464void
465ClientConnectionManagerImpl::connect_to_all_members()
466{
467 if (!client_.get_lifecycle_service().is_running() ||
468 active_connections_.empty()) {
469 return;
470 }
471
472 for (const auto& m :
473 client_.get_client_cluster_service().get_member_list()) {
474
475 {
476 std::lock_guard<std::recursive_mutex> guard{ client_state_mutex_ };
477
478 if (client_state_ == client_state::DISCONNECTED_FROM_CLUSTER) {
479 // Best effort check to prevent this task from attempting to
480 // open a new connection when the client is either switching
481 // clusters or is not connected to any of the cluster members.
482 // In such occasions, only `do_connect_to_cluster`
483 // method should open new connections.
484 return;
485 }
486 }
487
488 if (client_.get_lifecycle_service().is_running() &&
489 !get_connection(m.get_uuid()) &&
490 connecting_members_.get_or_put_if_absent(m, nullptr).second) {
491 // submit a task for this address only if there is no other pending
492 // connection attempt for it
493 member member_to_connect = m;
494 boost::asio::post(
495 executor_->get_executor(), [member_to_connect, this]() {
496 try {
497 if (!client_.get_lifecycle_service().is_running()) {
498 return;
499 }
500 if (!get_connection(member_to_connect.get_uuid())) {
501 get_or_connect(member_to_connect);
502 }
503 connecting_members_.remove(member_to_connect);
504 } catch (std::exception&) {
505 connecting_members_.remove(member_to_connect);
506 }
507 });
508 }
509 }
510}
511
512void
513ClientConnectionManagerImpl::do_connect_to_cluster()
514{
515 std::unordered_set<address> tried_addresses;
516 wait_strategy_.reset();
517
518 do {
519 std::unordered_set<address> tried_addresses_per_attempt;
520 auto member_list =
521 client_.get_client_cluster_service().get_member_list();
522 if (shuffle_member_list_) {
523 shuffle(member_list);
524 }
525
526 // try to connect to a member in the member list first
527 for (const auto& m : member_list) {
528 check_client_active();
529 tried_addresses_per_attempt.insert(m.get_address());
530 auto connection = try_connect(m);
531 if (connection) {
532 return;
533 }
534 }
535 // try to connect to a member given via config(explicit config/discovery
536 // mechanisms)
537 for (const address& server_address : get_possible_member_addresses()) {
538 check_client_active();
539 if (!tried_addresses_per_attempt.insert(server_address).second) {
540 // if we can not add it means that it is already tried to be
541 // connected with the member list
542 continue;
543 }
544 auto connection = try_connect<address>(server_address);
545 if (connection) {
546 return;
547 }
548 }
549 tried_addresses.insert(tried_addresses_per_attempt.begin(),
550 tried_addresses_per_attempt.end());
551 // If the address provider loads no addresses, then the above loop is
552 // not entered and the lifecycle check is missing, hence we need to
553 // repeat the same check at this point.
554 check_client_active();
555 } while (wait_strategy_.sleep());
556
557 std::ostringstream out;
558 out << "Unable to connect to any address! The following addresses were "
559 "tried: { ";
560 for (const auto& address : tried_addresses) {
561 out << address << " , ";
562 }
563 out << "}";
564 BOOST_THROW_EXCEPTION(exception::illegal_state(
565 "ConnectionManager::do_connect_to_cluster", out.str()));
566}
567
568std::vector<address>
569ClientConnectionManagerImpl::get_possible_member_addresses()
570{
571 std::vector<address> addresses;
572 for (auto&& member :
573 client_.get_client_cluster_service().get_member_list()) {
574 addresses.emplace_back(std::move(member.get_address()));
575 }
576
577 if (shuffle_member_list_) {
578 shuffle(addresses);
579 }
580
581 std::vector<address> provided_addresses =
582 address_provider_->load_addresses();
583
584 if (shuffle_member_list_) {
585 shuffle(provided_addresses);
586 }
587
588 addresses.insert(
589 addresses.end(), provided_addresses.begin(), provided_addresses.end());
590
591 return addresses;
592}
593
594void
595ClientConnectionManagerImpl::connect_to_cluster()
596{
597 if (async_start_) {
598 submit_connect_to_cluster_task();
599 } else {
600 do_connect_to_cluster();
601 }
602}
603
604bool
605ClientConnectionManagerImpl::is_alive()
606{
607 return alive_;
608}
609
610void
611ClientConnectionManagerImpl::on_connection_close(
612 const std::shared_ptr<Connection>& connection)
613{
614 auto endpoint = connection->get_remote_address();
615 auto member_uuid = connection->get_remote_uuid();
616
617 auto socket_remote_address = connection->get_socket().get_remote_endpoint();
618
619 if (!endpoint) {
620 HZ_LOG(logger_,
621 finest,
622 boost::str(boost::format(
623 "Destroying %1% , but it has end-point set to null "
624 "-> not removing it from a connection map") %
625 *connection));
626 return;
627 }
628
629 {
630 std::lock_guard<std::recursive_mutex> guard(client_state_mutex_);
631
632 if (active_connections_.remove(member_uuid, connection)) {
633 active_connection_ids_.remove(connection->get_connection_id());
634
635 HZ_LOG(logger_,
636 info,
637 boost::str(
638 boost::format(
639 "Removed connection to endpoint: %1%, connection: %2%") %
640 *endpoint % *connection));
641
642 if (active_connections_.empty()) {
643 if (client_state_ == client_state::INITIALIZED_ON_CLUSTER) {
644 fire_life_cycle_event(
645 lifecycle_event::lifecycle_state::CLIENT_DISCONNECTED);
646 }
647
648 client_state_ = client_state::DISCONNECTED_FROM_CLUSTER;
649 trigger_cluster_reconnection();
650 }
651
652 fire_connection_removed_event(connection);
653 } else {
654 HZ_LOG(
655 logger_,
656 finest,
657 boost::str(boost::format(
658 "Destroying a connection, but there is no mapping "
659 "%1% -> %2% in the connection map.") %
660 endpoint % *connection));
661 }
662 }
663}
664
665void
666ClientConnectionManagerImpl::add_connection_listener(
667 const std::shared_ptr<ConnectionListener>& connection_listener)
668{
669 connection_listeners_.add(connection_listener);
670}
671
672ClientConnectionManagerImpl::~ClientConnectionManagerImpl()
673{
674 shutdown();
675}
676
677void
678ClientConnectionManagerImpl::check_client_active()
679{
680 if (!client_.get_lifecycle_service().is_running()) {
681 BOOST_THROW_EXCEPTION(exception::hazelcast_client_not_active(
682 "ClientConnectionManagerImpl::check_client_active",
683 "Client is shutdown"));
684 }
685}
686
687void
688ClientConnectionManagerImpl::initialize_client_on_cluster(
689 boost::uuids::uuid target_cluster_id)
690{
691 if (!client_.get_lifecycle_service().is_running()) {
692 return;
693 }
694
695 try {
696 {
697 std::lock_guard<std::recursive_mutex> guard(client_state_mutex_);
698
699 if (target_cluster_id != cluster_id_) {
700 logger_.log(
701 hazelcast::logger::level::warning,
702 (boost::format("Won't send client state to cluster: %1%"
703 " Because switched to a new cluster: %2%") %
704 target_cluster_id % cluster_id_)
705 .str());
706
707 return;
708 }
709 }
710
711 client_.get_hazelcast_client_implementation()->send_state_to_cluster();
712
713 {
714 std::lock_guard<std::recursive_mutex> guard(client_state_mutex_);
715
716 if (target_cluster_id == cluster_id_) {
717 HZ_LOG(logger_,
718 fine,
719 (boost::format("Client state is sent to cluster: %1%") %
720 target_cluster_id)
721 .str());
722
723 client_state_ = client_state::INITIALIZED_ON_CLUSTER;
724 fire_life_cycle_event(lifecycle_event::CLIENT_CONNECTED);
725 } else if (logger_.enabled(hazelcast::logger::level::fine)) {
726 logger_.log(hazelcast::logger::level::warning,
727 (boost::format("Cannot set client state to %1%"
728 " because current cluster id: %2%"
729 " is different than expected cluster"
730 " id: %3%") %
731 client_state::INITIALIZED_ON_CLUSTER %
732 cluster_id_ % target_cluster_id)
733 .str());
734 }
735 }
736 } catch (const std::exception& e) {
737 auto cluster_name = client_.get_client_config().get_cluster_name();
738
739 logger_.log(
740 hazelcast::logger::level::warning,
741 (boost::format("Failure during sending state to the cluster. %1%") %
742 e.what())
743 .str());
744
745 {
746 std::lock_guard<std::recursive_mutex> guard(client_state_mutex_);
747
748 if (cluster_id_ == target_cluster_id) {
749 if (logger_.enabled(hazelcast::logger::level::fine)) {
750 logger_.log(
751 hazelcast::logger::level::warning,
752 (boost::format(
753 "Retrying sending to the cluster: %1%, name: %2%") %
754 target_cluster_id % cluster_name)
755 .str());
756 }
757
758 auto self = shared_from_this();
759
760 boost::asio::post(
761 executor_->get_executor(), [self, target_cluster_id]() {
762 self->initialize_client_on_cluster(target_cluster_id);
763 });
764 }
765 }
766 }
767}
768
769std::shared_ptr<Connection>
770ClientConnectionManagerImpl::on_authenticated(
771 const std::shared_ptr<Connection>& connection,
772 auth_response& response)
773{
774 {
775 std::lock_guard<std::recursive_mutex> guard(client_state_mutex_);
776 check_partition_count(response.partition_count);
777 connection->set_connected_server_version(response.server_version);
778 connection->set_remote_address(response.server_address);
779 connection->set_remote_uuid(response.member_uuid);
780
781 auto existing_connection =
782 active_connections_.get(response.member_uuid);
783 if (existing_connection) {
784 connection->close(
785 (boost::format(
786 "Duplicate connection to same member with uuid : %1%") %
787 boost::uuids::to_string(response.member_uuid))
788 .str());
789 return existing_connection;
790 }
791
792 auto new_cluster_id = response.cluster_id;
793 boost::uuids::uuid current_cluster_id = cluster_id_;
794
795 HZ_LOG(logger_,
796 finest,
797 boost::str(boost::format(
798 "Checking the cluster: %1%, current cluster: %2%") %
799 new_cluster_id % current_cluster_id));
800
801 auto cluster_id_changed = !current_cluster_id.is_nil() &&
802 !(new_cluster_id == current_cluster_id);
803 if (cluster_id_changed) {
804 HZ_LOG(
805 logger_,
806 warning,
807 boost::str(
808 boost::format(
809 "Switching from current cluster: %1% to new cluster: %2%") %
810 current_cluster_id % new_cluster_id));
811 client_.get_hazelcast_client_implementation()->on_cluster_restart();
812 }
813
814 auto connections_empty = active_connections_.empty();
815 active_connection_ids_.put(connection->get_connection_id(), connection);
816 active_connections_.put(response.member_uuid, connection);
817 if (connections_empty) {
818 // The first connection that opens a connection to the new cluster
819 // should set `clusterId`. This one will initiate
820 // `initializeClientOnCluster` if necessary.
821 cluster_id_ = new_cluster_id;
822
823 if (established_initial_cluster_connection) {
824 // In split brain, the client might connect to the one half
825 // of the cluster, and then later might reconnect to the
826 // other half, after the half it was connected to is
827 // completely dead. Since the cluster id is preserved in
828 // split brain scenarios, it is impossible to distinguish
829 // reconnection to the same cluster vs reconnection to the
830 // other half of the split brain. However, in the latter,
831 // we might need to send some state to the other half of
832 // the split brain (like Compact schemas or user code
833 // deployment classes). That forces us to send the client
834 // state to the cluster after the first cluster connection,
835 // regardless the cluster id is changed or not.
836 client_state_ = client_state::CONNECTED_TO_CLUSTER;
837 auto self = shared_from_this();
838 boost::asio::post(
839 executor_->get_executor(), [self, new_cluster_id]() {
840 self->initialize_client_on_cluster(new_cluster_id);
841 });
842 } else {
843 established_initial_cluster_connection = true;
844 client_state_ = client_state::INITIALIZED_ON_CLUSTER;
845
846 fire_life_cycle_event(
847 lifecycle_event::lifecycle_state::CLIENT_CONNECTED);
848 }
849 }
850
851 auto local_address = connection->get_local_socket_address();
852 if (local_address) {
853 HZ_LOG(
854 logger_,
855 info,
856 boost::str(
857 boost::format(
858 "Authenticated with server %1%:%2%, server version: %3%, "
859 "local address: %4%. %5%") %
860 response.server_address % response.member_uuid %
861 response.server_version % *local_address % *connection));
862 } else {
863 HZ_LOG(
864 logger_,
865 info,
866 boost::str(
867 boost::format(
868 "Authenticated with server %1%:%2%, server version: %3%, "
869 "no local address: (connection disconnected ?). %4%") %
870 response.server_address % response.member_uuid %
871 response.server_version % *connection));
872 }
873
874 fire_connection_added_event(connection);
875 }
876
877 // It could happen that this connection is already closed and
878 // on_connection_close() is called even before the synchronized block
879 // above is executed. In this case, now we have a closed but registered
880 // connection. We do a final check here to remove this connection
881 // if needed.
882 if (!connection->is_alive()) {
883 on_connection_close(connection);
884 return nullptr;
885 }
886
887 // If the client is shutdown in parallel, we need to close this new
888 // connection.
889 if (!client_.get_lifecycle_service().is_running()) {
890 connection->close("Client is shutdown");
891 }
892
893 return connection;
894}
895
896void
897ClientConnectionManagerImpl::fire_life_cycle_event(
898 lifecycle_event::lifecycle_state state)
899{
900 client_.get_lifecycle_service().fire_lifecycle_event(state);
901}
902
903void
904ClientConnectionManagerImpl::check_partition_count(int32_t new_partition_count)
905{
906 auto& partition_service =
907 static_cast<spi::impl::ClientPartitionServiceImpl&>(
908 client_.get_partition_service());
909 if (!partition_service.check_and_set_partition_count(new_partition_count)) {
910 BOOST_THROW_EXCEPTION(exception::client_not_allowed_in_cluster(
911 "ClientConnectionManagerImpl::check_partition_count",
912 (boost::format(
913 "Client can not work with this cluster because it has a different "
914 "partition count. "
915 "Expected partition count: %1%, Member partition count: %2%") %
916 partition_service.get_partition_count() % new_partition_count)
917 .str()));
918 }
919}
920
921void
922ClientConnectionManagerImpl::trigger_cluster_reconnection()
923{
924 if (reconnect_mode_ ==
925 config::client_connection_strategy_config::reconnect_mode::OFF) {
926 HZ_LOG(
927 logger_, info, "RECONNECT MODE is off. Shutting down the client.");
928 shutdown_with_external_thread(
929 client_.get_hazelcast_client_implementation());
930 return;
931 }
932
933 if (client_.get_lifecycle_service().is_running()) {
934 submit_connect_to_cluster_task();
935 }
936}
937
938std::shared_ptr<Connection>
939ClientConnectionManagerImpl::get_random_connection()
940{
941 if (smart_routing_enabled_) {
942 auto member = load_balancer_.next_(client_.get_cluster());
943 if (!member) {
944 return nullptr;
945 }
946 auto connection = get_connection(member->get_uuid());
947 if (connection) {
948 return connection;
949 }
950 }
951
952 auto connections = active_connections_.values();
953 if (connections.empty()) {
954 return nullptr;
955 }
956
957 return connections[0];
958}
959
960boost::uuids::uuid
961ClientConnectionManagerImpl::get_client_uuid() const
962{
963 return client_uuid_;
964}
965
966void
967ClientConnectionManagerImpl::check_invocation_allowed()
968{
969 client_state state = client_state_;
970 if (state == client_state::INITIALIZED_ON_CLUSTER &&
971 active_connections_.size() > 0) {
972 return;
973 }
974
975 if (state == client_state::INITIAL) {
976 if (async_start_) {
977 BOOST_THROW_EXCEPTION(exception::hazelcast_client_offline(
978 "ClientConnectionManagerImpl::check_invocation_allowed",
979 "No connection found to cluster and async start is configured."));
980 } else {
981 BOOST_THROW_EXCEPTION(exception::io(
982 "No connection found to cluster since the client is starting."));
983 }
984 } else if (reconnect_mode_ == config::client_connection_strategy_config::
985 reconnect_mode::ASYNC) {
986 BOOST_THROW_EXCEPTION(exception::hazelcast_client_offline(
987 "ClientConnectionManagerImpl::check_invocation_allowed",
988 "No connection found to cluster and reconnect mode is async."));
989 } else {
990 BOOST_THROW_EXCEPTION(
991 exception::io("ClientConnectionManagerImpl::check_invocation_allowed",
992 "No connection found to cluster."));
993 }
994}
995
996bool
997ClientConnectionManagerImpl::client_initialized_on_cluster() const
998{
999 std::lock_guard<std::recursive_mutex> guard{ client_state_mutex_ };
1000
1001 return client_state_ == client_state::INITIALIZED_ON_CLUSTER;
1002}
1003
1004void
1005ClientConnectionManagerImpl::connect_to_all_cluster_members()
1006{
1007 if (!smart_routing_enabled_) {
1008 return;
1009 }
1010
1011 for (const auto& member :
1012 client_.get_client_cluster_service().get_member_list()) {
1013
1014 try {
1015 get_or_connect(member);
1016 } catch (std::exception&) {
1017 // ignore
1018 }
1019 }
1020}
1021
1022void
1023ClientConnectionManagerImpl::notify_backup(int64_t call_id)
1024{
1025 struct correlation_id
1026 {
1027 int32_t connnection_id;
1028 int32_t call_id;
1029 };
1030 union
1031 {
1032 int64_t id;
1033 correlation_id composed_id;
1034 } c_id_union;
1035 c_id_union.id = call_id;
1036 auto connection_id = c_id_union.composed_id.connnection_id;
1037 auto connection = active_connection_ids_.get(connection_id);
1038 if (!connection) {
1039 return;
1040 }
1041 boost::asio::post(connection->get_socket().get_executor(), [=]() {
1042 auto invocation_it = connection->invocations.find(call_id);
1043 if (invocation_it != connection->invocations.end()) {
1044 invocation_it->second->notify_backup();
1045 }
1046 });
1047}
1048
1049std::shared_ptr<Connection>
1050ClientConnectionManagerImpl::connect(const address& addr)
1051{
1052 HZ_LOG(logger_,
1053 info,
1054 boost::str(boost::format("Trying to connect to %1%.") % addr));
1055
1056 auto connection = std::make_shared<Connection>(addr,
1057 client_,
1058 ++connection_id_gen_,
1059 *socket_factory_,
1060 *this,
1061 connection_timeout_millis_);
1062 connection->connect();
1063
1064 // call the interceptor from user thread
1065 socket_interceptor_.connect_(connection->get_socket());
1066
1067 auto result = authenticate_on_cluster(connection);
1068
1069 return on_authenticated(connection, result);
1070}
1071
1072address
1073ClientConnectionManagerImpl::translate(const member& m)
1074{
1075 if (use_public_address_) {
1076 auto public_addr_it = m.address_map().find(PUBLIC_ENDPOINT_QUALIFIER);
1077 if (public_addr_it != m.address_map().end()) {
1078 return public_addr_it->second;
1079 }
1080 return m.get_address();
1081 }
1082
1083 try {
1084 boost::optional<address> addr =
1085 address_provider_->translate(m.get_address());
1086
1087 if (!addr) {
1088 throw exception::hazelcast_(boost::str(
1089 boost::format("Address Provider could not translate %1%") % m));
1090 }
1091
1092 return *addr;
1093 } catch (const exception::hazelcast_&) {
1094 logger_.log(
1095 logger::level::warning,
1096 boost::str(boost::format("Address Provider could not translate %1%") %
1097 m));
1098
1099 throw;
1100 }
1101}
1102
1103std::shared_ptr<connection::Connection>
1104ClientConnectionManagerImpl::connection_for_sql(
1105 std::function<boost::optional<member>()> member_of_large_same_version_group,
1106 std::function<boost::optional<member>(boost::uuids::uuid)> get_cluster_member)
1107{
1108 if (smart_routing_enabled_) {
1109 // There might be a race - the chosen member might be just connected or
1110 // disconnected - try a couple of times, the
1111 // memberOfLargerSameVersionGroup returns a random connection, we might
1112 // be lucky...
1113 for (int i = 0; i < SQL_CONNECTION_RANDOM_ATTEMPTS; i++) {
1114 auto member = member_of_large_same_version_group();
1115 if (!member) {
1116 break;
1117 }
1118 auto connection = active_connections_.get(member->get_uuid());
1119 if (connection) {
1120 return connection;
1121 }
1122 }
1123 }
1124
1125 // Otherwise iterate over connections and return the first one that's not to
1126 // a lite member
1127 std::shared_ptr<connection::Connection> first_connection;
1128 for (const auto& connection_entry : active_connections_.entry_set()) {
1129 if (!first_connection) {
1130 first_connection = connection_entry.second;
1131 }
1132 const auto& member_id = connection_entry.first;
1133 auto member = get_cluster_member(member_id);
1134 if (!member || member->is_lite_member()) {
1135 continue;
1136 }
1137 return connection_entry.second;
1138 }
1139
1140 // Failed to get a connection to a data member
1141 return first_connection;
1142}
1143
1144ReadHandler::ReadHandler(Connection& connection, size_t buffer_size)
1145 : buffer(new char[buffer_size])
1146 , byte_buffer(buffer, buffer_size)
1147 , builder_(connection)
1148 , last_read_time_(std::chrono::steady_clock::now().time_since_epoch())
1149{}
1150
1151ReadHandler::~ReadHandler()
1152{
1153 delete[] buffer;
1154}
1155
1156void
1157ReadHandler::handle()
1158{
1159 last_read_time_ = std::chrono::steady_clock::now().time_since_epoch();
1160
1161 if (byte_buffer.position() == 0)
1162 return;
1163
1164 byte_buffer.flip();
1165
1166 // it is important to check the on_data return value since there may be left
1167 // data less than a message header size, and this may cause an infinite
1168 // loop.
1169 while (byte_buffer.has_remaining() && builder_.on_data(byte_buffer)) {
1170 }
1171
1172 if (byte_buffer.has_remaining()) {
1173 byte_buffer.compact();
1174 } else {
1175 byte_buffer.clear();
1176 }
1177}
1178
1179std::chrono::steady_clock::time_point
1180ReadHandler::get_last_read_time() const
1181{
1182 return std::chrono::steady_clock::time_point{ last_read_time_ };
1183}
1184
1185bool
1186AddressProvider::is_default_provider()
1187{
1188 return false;
1189}
1190
1191Connection::Connection(
1192 const address& address,
1193 spi::ClientContext& client_context,
1194 int connection_id, // NOLINT(cppcoreguidelines-pro-type-member-init)
1195 internal::socket::SocketFactory& socket_factory,
1196 ClientConnectionManagerImpl& client_connection_manager,
1197 std::chrono::milliseconds& connect_timeout_in_millis)
1198 : read_handler(*this, 16 << 10)
1199 , start_time_(std::chrono::system_clock::now())
1200 , closed_time_duration_()
1201 , client_context_(client_context)
1202 , invocation_service_(client_context.get_invocation_service())
1203 , connection_id_(connection_id)
1204 , remote_uuid_(boost::uuids::nil_uuid())
1205 , logger_(client_context.get_logger())
1206 , alive_(true)
1207 , last_write_time_(std::chrono::steady_clock::now().time_since_epoch())
1208{
1209 (void)client_connection_manager;
1210 socket_ = socket_factory.create(address, connect_timeout_in_millis);
1211}
1212
1213Connection::~Connection() = default;
1214
1215void
1216Connection::connect()
1217{
1218 socket_->connect(shared_from_this());
1219 backup_timer_.reset(
1220 new boost::asio::steady_timer(socket_->get_executor().context()));
1221 auto backupTimeout =
1222 static_cast<spi::impl::ClientInvocationServiceImpl&>(invocation_service_)
1223 .get_backup_timeout();
1224 auto this_connection = shared_from_this();
1225 schedule_periodic_backup_cleanup(backupTimeout, this_connection);
1226}
1227
1228void
1229Connection::schedule_periodic_backup_cleanup(
1230 std::chrono::milliseconds backup_timeout,
1231 std::shared_ptr<Connection> this_connection)
1232{
1233 if (!alive_) {
1234 return;
1235 }
1236
1237 backup_timer_->expires_from_now(backup_timeout);
1238 backup_timer_->async_wait(
1239 socket_->get_executor().wrap([=](boost::system::error_code ec) {
1240 if (ec) {
1241 return;
1242 }
1243 for (const auto& it : this_connection->invocations) {
1244 it.second->detect_and_handle_backup_timeout(backup_timeout);
1245 }
1246
1247 schedule_periodic_backup_cleanup(backup_timeout, this_connection);
1248 }));
1249}
1250
1251void
1252Connection::close()
1253{
1254 close("");
1255}
1256
1257void
1258Connection::close(const std::string& reason)
1259{
1260 close(reason, nullptr);
1261}
1262
1263void
1264Connection::close(const std::string& reason, std::exception_ptr cause)
1265{
1266 bool expected = true;
1267 if (!alive_.compare_exchange_strong(expected, false)) {
1268 return;
1269 }
1270
1271 closed_time_duration_.store(
1272 std::chrono::duration_cast<std::chrono::milliseconds>(
1273 std::chrono::steady_clock::now().time_since_epoch()));
1274
1275 if (backup_timer_) {
1276 boost::system::error_code ignored;
1277 backup_timer_->cancel(ignored);
1278 }
1279
1280 close_cause_ = cause;
1281 close_reason_ = reason;
1282
1283 log_close();
1284
1285 try {
1286 inner_close();
1287 } catch (exception::iexception& e) {
1288 HZ_LOG(
1289 client_context_.get_logger(),
1290 warning,
1291 boost::str(boost::format("Exception while closing connection %1%") %
1292 e.get_message()));
1293 }
1294
1295 auto thisConnection = shared_from_this();
1296 client_context_.get_connection_manager().on_connection_close(
1297 thisConnection);
1298
1299 boost::asio::post(socket_->get_executor(), [=]() {
1300 for (auto& invocationEntry : thisConnection->invocations) {
1301 invocationEntry.second->notify_exception(std::make_exception_ptr(
1302 boost::enable_current_exception(exception::target_disconnected(
1303 "Connection::close", thisConnection->get_close_reason()))));
1304 }
1305 });
1306}
1307
1308void
1309Connection::write(
1310 const std::shared_ptr<spi::impl::ClientInvocation>& client_invocation)
1311{
1312 socket_->async_write(shared_from_this(), client_invocation);
1313}
1314
1315const boost::optional<address>&
1316Connection::get_remote_address() const
1317{
1318 return remote_address_;
1319}
1320
1321void
1322Connection::set_remote_address(boost::optional<address> endpoint)
1323{
1324 this->remote_address_ = std::move(endpoint);
1325}
1326
1327void
1328Connection::handle_client_message(
1329 const std::shared_ptr<protocol::ClientMessage>& message)
1330{
1331 auto correlationId = message->get_correlation_id();
1332 auto invocationIterator = invocations.find(correlationId);
1333 if (invocationIterator == invocations.end()) {
1334 HZ_LOG(logger_,
1335 warning,
1336 boost::str(boost::format("No invocation for callId: %1%. "
1337 "Dropping this message: %2%") %
1338 correlationId % *message));
1339 return;
1340 }
1341 auto invocation = invocationIterator->second;
1342 auto flags = message->get_header_flags();
1343 if (message->is_flag_set(flags,
1344 protocol::ClientMessage::BACKUP_EVENT_FLAG)) {
1345 message->rd_ptr(protocol::ClientMessage::EVENT_HEADER_LEN);
1346 correlationId = message->get<int64_t>();
1347 client_context_.get_connection_manager().notify_backup(correlationId);
1348 } else if (message->is_flag_set(flags,
1349 protocol::ClientMessage::IS_EVENT_FLAG)) {
1350 client_context_.get_client_listener_service().handle_client_message(
1351 invocation, message);
1352 } else {
1353 invocation_service_.handle_client_message(invocation, message);
1354 }
1355}
1356
1357int32_t
1358Connection::get_connection_id() const
1359{
1360 return connection_id_;
1361}
1362
1363bool
1364Connection::is_alive() const
1365{
1366 return alive_;
1367}
1368
1369const std::string&
1370Connection::get_close_reason() const
1371{
1372 return close_reason_;
1373}
1374
1375void
1376Connection::log_close()
1377{
1378 std::ostringstream message;
1379 message << *this << " closed. Reason: ";
1380 if (!close_reason_.empty()) {
1381 message << close_reason_;
1382 } else if (close_cause_) {
1383 try {
1384 std::rethrow_exception(close_cause_);
1385 } catch (exception::iexception& ie) {
1386 message << ie.get_source() << "[" + ie.get_message() << "]";
1387 }
1388 } else {
1389 message << "Socket explicitly closed";
1390 }
1391
1392 if (client_context_.get_lifecycle_service().is_running()) {
1393 if (!close_cause_) {
1394 HZ_LOG(logger_, info, message.str());
1395 } else {
1396 try {
1397 std::rethrow_exception(close_cause_);
1398 } catch (exception::iexception& ie) {
1399 HZ_LOG(
1400 logger_,
1401 warning,
1402 boost::str(boost::format("%1% %2%") % message.str() % ie));
1403 }
1404 }
1405 } else {
1406 HZ_LOG(
1407 logger_, finest, message.str() + [this]() -> std::string {
1408 if (close_cause_) {
1409 try {
1410 std::rethrow_exception(close_cause_);
1411 } catch (exception::iexception& ie) {
1412 return ie.what();
1413 }
1414 }
1415 return "";
1416 }());
1417 }
1418}
1419
1420bool
1421Connection::operator==(const Connection& rhs) const
1422{
1423 return connection_id_ == rhs.connection_id_;
1424}
1425
1426bool
1427Connection::operator!=(const Connection& rhs) const
1428{
1429 return !(rhs == *this);
1430}
1431
1432const std::string&
1433Connection::get_connected_server_version_string() const
1434{
1435 return connected_server_version_string_;
1436}
1437
1438void
1439Connection::set_connected_server_version(const std::string& connected_server)
1440{
1441 Connection::connected_server_version_string_ = connected_server;
1442}
1443
1444boost::optional<address>
1445Connection::get_local_socket_address() const
1446{
1447 return socket_->local_socket_address();
1448}
1449
1450std::chrono::steady_clock::time_point
1451Connection::last_read_time() const
1452{
1453 return read_handler.get_last_read_time();
1454}
1455
1456void
1457Connection::inner_close()
1458{
1459 if (!socket_) {
1460 return;
1461 }
1462
1463 auto thisConnection = shared_from_this();
1464 boost::asio::post(socket_->get_executor(),
1465 [=]() { thisConnection->socket_->close(); });
1466}
1467
1468std::ostream&
1469operator<<(std::ostream& os, const Connection& connection)
1470{
1471 os << "Connection{"
1472 << "alive=" << connection.is_alive()
1473 << ", connection id=" << connection.get_connection_id()
1474 << ", remote endpoint=";
1475 if (connection.get_remote_address()) {
1476 os << *connection.get_remote_address();
1477 } else {
1478 os << "null";
1479 }
1480 os << ", last_read_time="
1481 << util::StringUtil::time_to_string(connection.last_read_time())
1482 << ", last_write_time="
1483 << util::StringUtil::time_to_string(connection.last_write_time())
1484 << ", closedTime="
1485 << util::StringUtil::time_to_string(
1486 std::chrono::steady_clock::time_point(
1487 std::chrono::duration_cast<std::chrono::steady_clock::duration>(
1488 connection.closed_time_duration_.load())))
1489 << ", connected server version="
1490 << connection.connected_server_version_string_ << '}';
1491
1492 return os;
1493}
1494
1495bool
1496Connection::operator<(const Connection& rhs) const
1497{
1498 return connection_id_ < rhs.connection_id_;
1499}
1500
1501std::chrono::system_clock::time_point
1502Connection::get_start_time() const
1503{
1504 return start_time_;
1505}
1506
1507socket&
1508Connection::get_socket()
1509{
1510 return *socket_;
1511}
1512
1513void
1514Connection::deregister_invocation(int64_t call_id)
1515{
1516 invocations.erase(call_id);
1517}
1518
1519boost::uuids::uuid
1520Connection::get_remote_uuid() const
1521{
1522 return remote_uuid_;
1523}
1524
1525void
1526Connection::set_remote_uuid(boost::uuids::uuid remote_uuid)
1527{
1528 remote_uuid_ = remote_uuid;
1529}
1530
1531void
1532Connection::last_write_time(std::chrono::steady_clock::time_point tp)
1533{
1534 last_write_time_ = tp.time_since_epoch();
1535}
1536
1537std::chrono::steady_clock::time_point
1538Connection::last_write_time() const
1539{
1540 return std::chrono::steady_clock::time_point{ last_write_time_ };
1541}
1542
1543HeartbeatManager::HeartbeatManager(
1544 spi::ClientContext& client,
1545 ClientConnectionManagerImpl& connection_manager)
1546 : client_(client)
1547 , client_connection_manager_(connection_manager)
1548 , logger_(client.get_logger())
1549{
1550 client_properties& clientProperties = client.get_client_properties();
1551 auto timeout_millis =
1552 clientProperties.get_long(clientProperties.get_heartbeat_timeout());
1553 heartbeat_timeout_ = std::chrono::milliseconds(
1554 timeout_millis > 0
1555 ? timeout_millis
1556 : util::IOUtil::to_value<int64_t>(
1557 client_properties::PROP_HEARTBEAT_TIMEOUT_DEFAULT));
1558
1559 auto interval_millis =
1560 clientProperties.get_long(clientProperties.get_heartbeat_interval());
1561 heartbeat_interval_ = std::chrono::milliseconds(
1562 interval_millis > 0
1563 ? interval_millis
1564 : util::IOUtil::to_value<int64_t>(
1565 client_properties::PROP_HEARTBEAT_INTERVAL_DEFAULT));
1566}
1567
1568void
1569HeartbeatManager::start()
1570{
1571 spi::impl::ClientExecutionServiceImpl& clientExecutionService =
1572 client_.get_client_execution_service();
1573
1574 timer_ = clientExecutionService.schedule_with_repetition(
1575 [=]() {
1576 if (!client_connection_manager_.is_alive()) {
1577 return;
1578 }
1579
1580 for (auto& connection :
1581 client_connection_manager_.get_active_connections()) {
1582 check_connection(connection);
1583 }
1584 },
1585 heartbeat_interval_,
1586 heartbeat_interval_);
1587}
1588
1589void
1590HeartbeatManager::check_connection(
1591 const std::shared_ptr<Connection>& connection)
1592{
1593 if (!connection->is_alive()) {
1594 return;
1595 }
1596
1597 auto now = std::chrono::steady_clock::now();
1598 if (now - connection->last_read_time() > heartbeat_timeout_) {
1599 HZ_LOG(logger_,
1600 warning,
1601 boost::str(
1602 boost::format("Heartbeat failed over the connection: %1%") %
1603 *connection));
1604 on_heartbeat_stopped(connection, "Heartbeat timed out");
1605 return;
1606 }
1607
1608 if (now - connection->last_write_time() > heartbeat_interval_) {
1609 auto request = protocol::codec::client_ping_encode();
1610 std::shared_ptr<spi::impl::ClientInvocation> clientInvocation =
1611 spi::impl::ClientInvocation::create(client_, request, "", connection);
1612 clientInvocation->invoke_urgent();
1613 }
1614}
1615
1616void
1617HeartbeatManager::on_heartbeat_stopped(
1618 const std::shared_ptr<Connection>& connection,
1619 const std::string& reason)
1620{
1621 connection->close(
1622 reason,
1623 std::make_exception_ptr(
1624 (exception::exception_builder<exception::target_disconnected>(
1625 "HeartbeatManager::onHeartbeatStopped")
1626 << "Heartbeat timed out to connection " << *connection)
1627 .build()));
1628}
1629
1630void
1631HeartbeatManager::shutdown()
1632{
1633 if (timer_) {
1634 boost::system::error_code ignored;
1635 timer_->cancel(ignored);
1636 }
1637}
1638
1639std::chrono::milliseconds
1640HeartbeatManager::get_heartbeat_timeout() const
1641{
1642 return heartbeat_timeout_;
1643}
1644
1645void
1646wait_strategy::reset()
1647{
1648 attempt_ = 0;
1649 cluster_connect_attempt_begin_ = std::chrono::steady_clock::now();
1650 current_backoff_millis_ =
1651 (std::min)(max_backoff_millis_, initial_backoff_millis_);
1652}
1653
1654wait_strategy::wait_strategy(
1655 const config::connection_retry_config& retry_config,
1656 logger& log)
1657 : initial_backoff_millis_(retry_config.get_initial_backoff_duration())
1658 , max_backoff_millis_(retry_config.get_max_backoff_duration())
1659 , multiplier_(retry_config.get_multiplier())
1660 , jitter_(retry_config.get_jitter())
1661 , logger_(log)
1662 , cluster_connect_timeout_millis_(retry_config.get_cluster_connect_timeout())
1663{
1664 if (cluster_connect_timeout_millis_ == std::chrono::milliseconds::max()) {
1665 cluster_connect_timeout_text_ = "INFINITE";
1666 } else {
1667 cluster_connect_timeout_text_ =
1668 (boost::format("%1% msecs") % cluster_connect_timeout_millis_.count())
1669 .str();
1670 }
1671}
1672
1673bool
1674wait_strategy::sleep()
1675{
1676 attempt_++;
1677 using namespace std::chrono;
1678 auto current_time = steady_clock::now();
1679 auto time_passed = duration_cast<milliseconds>(
1680 current_time - cluster_connect_attempt_begin_);
1681 if (time_passed > cluster_connect_timeout_millis_) {
1682 HZ_LOG(
1683 logger_,
1684 warning,
1685 (boost::format("Unable to get live cluster connection, cluster "
1686 "connect timeout (%1%) is reached. Attempt %2%.") %
1687 cluster_connect_timeout_text_ % attempt_)
1688 .str());
1689 return false;
1690 }
1691
1692 // sleep time: current_backoff_millis_(1 +- (jitter * [0, 1]))
1693 auto actual_sleep_time =
1694 current_backoff_millis_ + milliseconds(static_cast<milliseconds::rep>(
1695 current_backoff_millis_.count() * jitter_ *
1696 (2.0 * random_(random_generator_) - 1.0)));
1697
1698 actual_sleep_time = (std::min)(
1699 actual_sleep_time, cluster_connect_timeout_millis_ - time_passed);
1700
1701 HZ_LOG(
1702 logger_,
1703 warning,
1704 (boost::format(
1705 "Unable to get live cluster connection, retry in %1% ms, attempt: %2% "
1706 ", cluster connect timeout: %3% , max backoff millis: %4%") %
1707 actual_sleep_time.count() % attempt_ % cluster_connect_timeout_text_ %
1708 max_backoff_millis_.count())
1709 .str());
1710
1711 std::this_thread::sleep_for(actual_sleep_time);
1712
1713 current_backoff_millis_ =
1714 (std::min)(milliseconds(static_cast<milliseconds::rep>(
1715 current_backoff_millis_.count() * multiplier_)),
1716 max_backoff_millis_);
1717 return true;
1718}
1719} // namespace connection
1720
1721namespace internal {
1722namespace socket {
1723SocketFactory::SocketFactory(spi::ClientContext& client_context,
1724 boost::asio::io_context& io,
1725 boost::asio::ip::tcp::resolver& resolver)
1726 : client_context_(client_context)
1727 , io_(io)
1728 , io_resolver_(resolver)
1729{}
1730
1731bool
1732SocketFactory::start()
1733{
1734#ifdef HZ_BUILD_WITH_SSL
1735 auto& sslConfig =
1736 client_context_.get_client_config().get_network_config().get_ssl_config();
1737 if (sslConfig.is_enabled()) {
1738 if (sslConfig.ssl_context_) {
1739 ssl_context_ = sslConfig.ssl_context_;
1740 } else {
1741 ssl_context_ = std::make_shared<boost::asio::ssl::context>(
1742 (boost::asio::ssl::context_base::method)sslConfig.get_protocol());
1743
1744 ssl_context_->set_verify_mode(boost::asio::ssl::verify_peer);
1745 ssl_context_->set_default_verify_paths();
1746
1747 const std::vector<std::string>& verifyFiles =
1748 sslConfig.get_verify_files();
1749 bool success = true;
1750 logger& lg = client_context_.get_logger();
1751 for (const auto& f : verifyFiles) {
1752 boost::system::error_code ec;
1753 ssl_context_->load_verify_file(f, ec);
1754 if (ec) {
1755 HZ_LOG(
1756 lg,
1757 warning,
1758 boost::str(
1759 boost::format("SocketFactory::start: Failed to load CA "
1760 "verify file at %1% %2%") %
1761 f % ec.message()));
1762 success = false;
1763 }
1764 }
1765
1766 if (!success) {
1767 ssl_context_.reset();
1768 HZ_LOG(lg,
1769 warning,
1770 "SocketFactory::start: Failed to load one or more "
1771 "configured CA verify files (PEM files). Please "
1772 "correct the files and retry.");
1773 return false;
1774 }
1775 }
1776
1777 // set cipher list if the list is set
1778 const std::string& cipherList = sslConfig.get_cipher_list();
1779 if (!cipherList.empty()) {
1780 if (!SSL_CTX_set_cipher_list(ssl_context_->native_handle(),
1781 cipherList.c_str())) {
1782 logger& lg = client_context_.get_logger();
1783 HZ_LOG(lg,
1784 warning,
1785 std::string("SocketFactory::start: Could not load any "
1786 "of the ciphers in the config provided "
1787 "ciphers:") +
1788 cipherList);
1789 return false;
1790 }
1791 }
1792 }
1793#else
1794 (void)client_context_;
1795#endif
1796 return true;
1797}
1798
1799std::unique_ptr<hazelcast::client::socket>
1800SocketFactory::create(const address& address,
1801 std::chrono::milliseconds& connect_timeout_in_millis)
1802{
1803#ifdef HZ_BUILD_WITH_SSL
1804 if (ssl_context_.get()) {
1805 return std::unique_ptr<hazelcast::client::socket>(
1806 new internal::socket::SSLSocket(io_,
1807 *ssl_context_,
1808 address,
1809 client_context_.get_client_config()
1810 .get_network_config()
1811 .get_socket_options(),
1812 connect_timeout_in_millis,
1813 io_resolver_));
1814 }
1815#endif
1816
1817 return std::unique_ptr<hazelcast::client::socket>(
1818 new internal::socket::TcpSocket(io_,
1819 address,
1820 client_context_.get_client_config()
1821 .get_network_config()
1822 .get_socket_options(),
1823 connect_timeout_in_millis,
1824 io_resolver_));
1825}
1826
1827#ifdef HZ_BUILD_WITH_SSL
1828
1829SSLSocket::SSLSocket(boost::asio::io_context& io_service,
1830 boost::asio::ssl::context& ssl_context,
1831 const client::address& address,
1832 client::config::socket_options& socket_options,
1833 std::chrono::milliseconds& connect_timeout_in_millis,
1834 boost::asio::ip::tcp::resolver& resolver)
1835 : BaseSocket<boost::asio::ssl::stream<boost::asio::ip::tcp::socket>>(
1836 resolver,
1837 address,
1838 socket_options,
1839 io_service,
1840 connect_timeout_in_millis,
1841 ssl_context)
1842{}
1843
1844std::vector<SSLSocket::CipherInfo>
1845SSLSocket::get_ciphers()
1846{
1847 STACK_OF(SSL_CIPHER)* ciphers = SSL_get_ciphers(socket_.native_handle());
1848 std::vector<CipherInfo> supportedCiphers;
1849 for (int i = 0; i < sk_SSL_CIPHER_num(ciphers); ++i) {
1850 struct SSLSocket::CipherInfo info;
1851 const SSL_CIPHER* cipher = sk_SSL_CIPHER_value(ciphers, i);
1852 info.name = SSL_CIPHER_get_name(cipher);
1853 info.number_of_bits = SSL_CIPHER_get_bits(cipher, 0);
1854 info.version = SSL_CIPHER_get_version(cipher);
1855 char descBuf[256];
1856 info.description = SSL_CIPHER_description(cipher, descBuf, 256);
1857 supportedCiphers.push_back(info);
1858 }
1859 return supportedCiphers;
1860}
1861
1862void
1863SSLSocket::post_connect()
1864{
1865 socket_.handshake(boost::asio::ssl::stream_base::client);
1866}
1867
1868std::ostream&
1869operator<<(std::ostream& out, const SSLSocket::CipherInfo& info)
1870{
1871 out << "Cipher{"
1872 "Name: "
1873 << info.name << ", Bits:" << info.number_of_bits
1874 << ", Version:" << info.version << ", Description:" << info.description
1875 << "}";
1876
1877 return out;
1878}
1879
1880#endif // HZ_BUILD_WITH_SSL
1881
1882TcpSocket::TcpSocket(boost::asio::io_context& io,
1883 const address& address,
1884 client::config::socket_options& socket_options,
1885 std::chrono::milliseconds& connect_timeout_in_millis,
1886 boost::asio::ip::tcp::resolver& resolver)
1887 : BaseSocket<boost::asio::ip::tcp::socket>(resolver,
1888 address,
1889 socket_options,
1890 io,
1891 connect_timeout_in_millis)
1892{}
1893
1894} // namespace socket
1895} // namespace internal
1896} // namespace client
1897} // namespace hazelcast
1898
1899namespace std {
1900std::size_t
1901hash<std::shared_ptr<hazelcast::client::connection::Connection>>::operator()(
1902 const std::shared_ptr<hazelcast::client::connection::Connection>& conn)
1903 const noexcept
1904{
1905 if (!conn) {
1906 return 0;
1907 }
1908 return std::abs(conn->get_connection_id());
1909}
1910} // namespace std
STL namespace.