Hazelcast C++ Client
Hazelcast C++ Client Library
network.cpp
1 /*
2  * Copyright (c) 2008-2022, Hazelcast, Inc. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
#include <cstdlib>
#include <cstring>
#include <unordered_set>
19 
20 #include "hazelcast/client/execution_callback.h"
21 #include "hazelcast/client/lifecycle_event.h"
22 #include "hazelcast/client/connection/AddressProvider.h"
23 #include "hazelcast/client/spi/impl/ClientInvocation.h"
24 #include "hazelcast/util/Util.h"
25 #include "hazelcast/client/protocol/AuthenticationStatus.h"
26 #include "hazelcast/client/exception/protocol_exceptions.h"
27 #include "hazelcast/client/connection/ClientConnectionManagerImpl.h"
28 #include "hazelcast/client/connection/ConnectionListener.h"
29 #include "hazelcast/client/connection/Connection.h"
30 #include "hazelcast/client/spi/ClientContext.h"
31 #include "hazelcast/client/spi/impl/ClientExecutionServiceImpl.h"
32 #include "hazelcast/client/serialization/serialization.h"
33 #include "hazelcast/client/protocol/UsernamePasswordCredentials.h"
34 #include "hazelcast/client/protocol/codec/codecs.h"
35 #include "hazelcast/client/client_config.h"
36 #include "hazelcast/client/socket_interceptor.h"
37 #include "hazelcast/client/config/client_network_config.h"
38 #include "hazelcast/client/client_properties.h"
39 #include "hazelcast/client/connection/HeartbeatManager.h"
40 #include "hazelcast/client/impl/hazelcast_client_instance_impl.h"
41 #include "hazelcast/client/spi/impl/listener/listener_service_impl.h"
42 #include "hazelcast/client/spi/impl/ClientPartitionServiceImpl.h"
43 #include "hazelcast/client/internal/socket/TcpSocket.h"
44 #include "hazelcast/client/internal/socket/SSLSocket.h"
45 #include "hazelcast/client/config/ssl_config.h"
46 #include "hazelcast/util/IOUtil.h"
47 #include "hazelcast/client/internal/socket/SocketFactory.h"
48 #include "hazelcast/client/connection/wait_strategy.h"
49 
50 namespace hazelcast {
51 namespace client {
52 namespace connection {
53 constexpr size_t ClientConnectionManagerImpl::EXECUTOR_CORE_POOL_SIZE;
54 const endpoint_qualifier ClientConnectionManagerImpl::PUBLIC_ENDPOINT_QUALIFIER{
55  CLIENT,
56  "public"
57 };
58 
59 constexpr int ClientConnectionManagerImpl::SQL_CONNECTION_RANDOM_ATTEMPTS;
60 
61 ClientConnectionManagerImpl::ClientConnectionManagerImpl(
62  spi::ClientContext& client,
63  std::unique_ptr<AddressProvider> address_provider)
64  : alive_(false)
65  , logger_(client.get_logger())
66  , connection_timeout_millis_((std::chrono::milliseconds::max)())
67  , client_(client)
68  , socket_interceptor_(client.get_client_config().get_socket_interceptor())
69  , address_provider_(std::move(address_provider))
70  , connection_id_gen_(0)
71  , heartbeat_(client, *this)
72  , async_start_(client.get_client_config()
73  .get_connection_strategy_config()
74  .is_async_start())
75  , reconnect_mode_(client.get_client_config()
76  .get_connection_strategy_config()
77  .get_reconnect_mode())
78  , smart_routing_enabled_(
79  client.get_client_config().get_network_config().is_smart_routing())
80  , client_uuid_(client.random_uuid())
81  , authentication_timeout_(
82  boost::chrono::milliseconds(heartbeat_.get_heartbeat_timeout().count()))
83  , load_balancer_(client.get_client_config().get_load_balancer())
84  , wait_strategy_(client.get_client_config()
85  .get_connection_strategy_config()
86  .get_retry_config(),
87  logger_)
88  , cluster_id_(boost::uuids::nil_uuid())
89  , connect_to_cluster_task_submitted_(false)
90  , use_public_address_(
91  address_provider_->is_default_provider() &&
92  client.get_client_config().get_network_config().use_public_address())
93 {
94  config::client_network_config& networkConfig =
95  client.get_client_config().get_network_config();
96  auto connTimeout = networkConfig.get_connection_timeout();
97  if (connTimeout.count() > 0) {
98  connection_timeout_millis_ = std::chrono::milliseconds(connTimeout);
99  }
100 
101  client_properties& clientProperties = client.get_client_properties();
102  shuffle_member_list_ =
103  clientProperties.get_boolean(clientProperties.get_shuffle_member_list());
104 }
105 
106 bool
107 ClientConnectionManagerImpl::start()
108 {
109  bool expected = false;
110  if (!alive_.compare_exchange_strong(expected, true)) {
111  return false;
112  }
113 
114  io_context_.reset(new boost::asio::io_context);
115  io_resolver_.reset(
116  new boost::asio::ip::tcp::resolver(io_context_->get_executor()));
117  socket_factory_.reset(new internal::socket::SocketFactory(
118  client_, *io_context_, *io_resolver_));
119  io_guard_.reset(new boost::asio::io_context::work(*io_context_));
120 
121  if (!socket_factory_->start()) {
122  return false;
123  }
124 
125  socket_interceptor_ = client_.get_client_config().get_socket_interceptor();
126 
127  io_thread_ = std::thread([=]() { io_context_->run(); });
128 
129  executor_.reset(
130  new hazelcast::util::hz_thread_pool(EXECUTOR_CORE_POOL_SIZE));
131  connect_to_members_timer_ =
132  boost::asio::steady_timer(executor_->get_executor());
133 
134  heartbeat_.start();
135 
136  connect_to_cluster();
137  if (smart_routing_enabled_) {
138  schedule_connect_to_all_members();
139  }
140 
141  load_balancer_.init_(client_.get_cluster());
142 
143  return true;
144 }
145 
146 void
147 ClientConnectionManagerImpl::schedule_connect_to_all_members()
148 {
149  if (!client_.get_lifecycle_service().is_running()) {
150  return;
151  }
152 
153  connect_to_members_timer_->expires_from_now(
154  boost::asio::chrono::seconds(1));
155  connect_to_members_timer_->async_wait([=](boost::system::error_code ec) {
156  if (ec == boost::asio::error::operation_aborted) {
157  return;
158  }
159  connect_to_all_members();
160 
161  schedule_connect_to_all_members();
162  });
163 }
164 
165 void
166 ClientConnectionManagerImpl::shutdown()
167 {
168  bool expected = true;
169  if (!alive_.compare_exchange_strong(expected, false)) {
170  return;
171  }
172 
173  if (connect_to_members_timer_) {
174  connect_to_members_timer_->cancel();
175  }
176 
177  heartbeat_.shutdown();
178 
179  // close connections
180  for (auto& connection : active_connections_.values()) {
181  // prevent any exceptions
182  util::IOUtil::close_resource(connection.get(),
183  "Hazelcast client is shutting down");
184  }
185 
186  spi::impl::ClientExecutionServiceImpl::shutdown_thread_pool(
187  executor_.get());
188 
189  // release the guard so that the io thread can stop gracefully
190  io_guard_.reset();
191  io_thread_.join();
192 
193  connection_listeners_.clear();
194  active_connections_.clear();
195  active_connection_ids_.clear();
196 }
197 
198 std::shared_ptr<Connection>
199 ClientConnectionManagerImpl::get_or_connect(const member& m)
200 {
201  const auto& uuid = m.get_uuid();
202  auto connection = active_connections_.get(uuid);
203  if (connection) {
204  return connection;
205  }
206 
207  address addr = translate(m);
208  return connect(addr);
209 }
210 
211 std::vector<std::shared_ptr<Connection>>
212 ClientConnectionManagerImpl::get_active_connections()
213 {
214  return active_connections_.values();
215 }
216 
217 std::shared_ptr<Connection>
218 ClientConnectionManagerImpl::get_connection(boost::uuids::uuid uuid)
219 {
220  return active_connections_.get(uuid);
221 }
222 
223 ClientConnectionManagerImpl::auth_response
224 ClientConnectionManagerImpl::authenticate_on_cluster(
225  std::shared_ptr<Connection>& connection)
226 {
227  auto request =
228  encode_authentication_request(client_.get_serialization_service());
229  auto clientInvocation =
230  spi::impl::ClientInvocation::create(client_, request, "", connection);
231  auto f = clientInvocation->invoke_urgent();
232 
233  struct auth_response result;
234  try {
235  if (f.wait_for(authentication_timeout_) !=
236  boost::future_status::ready) {
237  BOOST_THROW_EXCEPTION(exception::timeout(
238  "ClientConnectionManagerImpl::authenticate",
239  (boost::format("Authentication response is "
240  "not received for %1% msecs for %2%") %
241  authentication_timeout_.count() % *clientInvocation)
242  .str()));
243  }
244  auto response = f.get();
245  auto* initial_frame =
246  reinterpret_cast<protocol::ClientMessage::frame_header_type*>(
247  response.rd_ptr(protocol::ClientMessage::RESPONSE_HEADER_LEN));
248  result = { response.get<byte>(),
249  response.get<boost::uuids::uuid>(),
250  response.get<byte>(),
251  response.get<int32_t>(),
252  response.get<boost::uuids::uuid>() };
253  // skip first frame
254  response.rd_ptr(static_cast<int32_t>(initial_frame->frame_len) -
255  protocol::ClientMessage::RESPONSE_HEADER_LEN -
256  2 * protocol::ClientMessage::UINT8_SIZE -
257  2 * (sizeof(boost::uuids::uuid) +
258  protocol::ClientMessage::UINT8_SIZE) -
259  protocol::ClientMessage::INT32_SIZE);
260 
261  result.server_address = response.get_nullable<address>();
262  result.server_version = response.get<std::string>();
263  } catch (exception::iexception&) {
264  connection->close("Failed to authenticate connection",
265  std::current_exception());
266  throw;
267  }
268 
269  auto authentication_status = (protocol::authentication_status)result.status;
270  switch (authentication_status) {
271  case protocol::AUTHENTICATED: {
272  return result;
273  }
274  case protocol::CREDENTIALS_FAILED: {
275  auto e = std::make_exception_ptr(exception::authentication(
276  "ClientConnectionManagerImpl::authenticate_on_cluster",
277  "Authentication failed. The configured cluster name on the "
278  "client (see client_config::set_cluster_name()) does not match "
279  "the one configured in the cluster or the credentials set in the "
280  "Client security config could not be authenticated"));
281  connection->close("Failed to authenticate connection", e);
282  std::rethrow_exception(e);
283  }
284  case protocol::NOT_ALLOWED_IN_CLUSTER: {
285  auto e = std::make_exception_ptr(exception::authentication(
286  "ClientConnectionManagerImpl::authenticate_on_cluster",
287  "Client is not allowed in the cluster"));
288  connection->close("Failed to authenticate connection", e);
289  std::rethrow_exception(e);
290  }
291  default: {
292  auto e = std::make_exception_ptr(exception::authentication(
293  "ClientConnectionManagerImpl::authenticate_on_cluster",
294  (boost::format(
295  "Authentication status code not supported. status: %1%") %
296  authentication_status)
297  .str()));
298  connection->close("Failed to authenticate connection", e);
299  std::rethrow_exception(e);
300  }
301  }
302 }
303 
304 protocol::ClientMessage
305 ClientConnectionManagerImpl::encode_authentication_request(
306  serialization::pimpl::SerializationService& ss)
307 {
308  byte serializationVersion = ss.get_version();
309  client_config& clientConfig = client_.get_client_config();
310  auto cluster_name = clientConfig.get_cluster_name();
311 
312  auto credential = clientConfig.get_credentials();
313  if (!credential) {
314  return protocol::codec::client_authentication_encode(
315  cluster_name,
316  nullptr,
317  nullptr,
318  client_uuid_,
319  protocol::ClientTypes::CPP,
320  serializationVersion,
321  HAZELCAST_VERSION,
322  client_.get_name(),
323  labels_);
324  }
325 
326  switch (credential->type()) {
327  case security::credentials::credential_type::username_password: {
328  auto cr =
329  std::static_pointer_cast<security::username_password_credentials>(
330  credential);
331  return protocol::codec::client_authentication_encode(
332  cluster_name,
333  &cr->name(),
334  &cr->password(),
335  client_uuid_,
336  protocol::ClientTypes::CPP,
337  serializationVersion,
338  HAZELCAST_VERSION,
339  client_.get_name(),
340  labels_);
341  }
342  case security::credentials::credential_type::token: {
343  auto cr =
344  std::static_pointer_cast<security::token_credentials>(credential);
345  return protocol::codec::client_authenticationcustom_encode(
346  cluster_name,
347  cr->token(),
348  client_uuid_,
349  protocol::ClientTypes::CPP,
350  serializationVersion,
351  HAZELCAST_VERSION,
352  client_.get_name(),
353  labels_);
354  }
355  }
356  assert(0);
357  return protocol::ClientMessage();
358 }
359 
360 void
361 ClientConnectionManagerImpl::fire_connection_added_event(
362  const std::shared_ptr<Connection>& connection)
363 {
364  for (const std::shared_ptr<ConnectionListener>& connectionListener :
365  connection_listeners_.to_array()) {
366  connectionListener->connection_added(connection);
367  }
368 }
369 
370 void
371 ClientConnectionManagerImpl::fire_connection_removed_event(
372  const std::shared_ptr<Connection>& connection)
373 {
374  for (const auto& listener : connection_listeners_.to_array()) {
375  listener->connection_removed(connection);
376  }
377 }
378 
379 void
380 ClientConnectionManagerImpl::shutdown_with_external_thread(
381  std::weak_ptr<client::impl::hazelcast_client_instance_impl> client_impl)
382 {
383  std::thread([=] {
384  std::shared_ptr<client::impl::hazelcast_client_instance_impl>
385  clientInstance = client_impl.lock();
386  if (!clientInstance ||
387  !clientInstance->get_lifecycle_service().is_running()) {
388  return;
389  }
390 
391  try {
392  clientInstance->get_lifecycle_service().shutdown();
393  } catch (exception::iexception& e) {
394  HZ_LOG(*clientInstance->get_logger(),
395  severe,
396  boost::str(boost::format("Exception during client shutdown "
397  "%1%.clientShutdown-:%2%") %
398  clientInstance->get_name() % e));
399  }
400  }).detach();
401 }
402 
403 void
404 ClientConnectionManagerImpl::submit_connect_to_cluster_task()
405 {
406  bool expected = false;
407  if (!connect_to_cluster_task_submitted_.compare_exchange_strong(expected,
408  true)) {
409  return;
410  }
411 
412  std::weak_ptr<client::impl::hazelcast_client_instance_impl> c =
413  client_.get_hazelcast_client_implementation();
414  boost::asio::post(executor_->get_executor(), [=]() {
415  try {
416  do_connect_to_cluster();
417 
418  std::lock_guard<std::recursive_mutex> guard(client_state_mutex_);
419  connect_to_cluster_task_submitted_ = false;
420  if (active_connections_.empty()) {
421  HZ_LOG(
422  logger_,
423  finest,
424  boost::str(boost::format("No connection to cluster: %1%") %
425  cluster_id_));
426 
427  submit_connect_to_cluster_task();
428  }
429 
430  } catch (std::exception& e) {
431  HZ_LOG(logger_,
432  warning,
433  boost::str(boost::format("Could not connect to any cluster, "
434  "shutting down the client: %1%") %
435  e.what()));
436 
437  shutdown_with_external_thread(c);
438  }
439  });
440 }
441 
442 void
443 ClientConnectionManagerImpl::connect_to_all_members()
444 {
445  if (!client_.get_lifecycle_service().is_running() ||
446  active_connections_.empty()) {
447  return;
448  }
449 
450  for (const auto& m :
451  client_.get_client_cluster_service().get_member_list()) {
452  if (client_.get_lifecycle_service().is_running() &&
453  !get_connection(m.get_uuid()) &&
454  connecting_members_.get_or_put_if_absent(m, nullptr).second) {
455  // submit a task for this address only if there is no other pending
456  // connection attempt for it
457  member member_to_connect = m;
458  boost::asio::post(
459  executor_->get_executor(), [member_to_connect, this]() {
460  try {
461  if (!client_.get_lifecycle_service().is_running()) {
462  return;
463  }
464  if (!get_connection(member_to_connect.get_uuid())) {
465  get_or_connect(member_to_connect);
466  }
467  connecting_members_.remove(member_to_connect);
468  } catch (std::exception&) {
469  connecting_members_.remove(member_to_connect);
470  }
471  });
472  }
473  }
474 }
475 
476 bool
477 ClientConnectionManagerImpl::do_connect_to_cluster()
478 {
479  std::unordered_set<address> tried_addresses;
480  wait_strategy_.reset();
481 
482  do {
483  std::unordered_set<address> tried_addresses_per_attempt;
484  auto member_list =
485  client_.get_client_cluster_service().get_member_list();
486  if (shuffle_member_list_) {
487  shuffle(member_list);
488  }
489 
490  // try to connect to a member in the member list first
491  for (const auto& m : member_list) {
492  check_client_active();
493  tried_addresses_per_attempt.insert(m.get_address());
494  auto connection = try_connect(m);
495  if (connection) {
496  return true;
497  }
498  }
499  // try to connect to a member given via config(explicit config/discovery
500  // mechanisms)
501  for (const address& server_address : get_possible_member_addresses()) {
502  check_client_active();
503  if (!tried_addresses_per_attempt.insert(server_address).second) {
504  // if we can not add it means that it is already tried to be
505  // connected with the member list
506  continue;
507  }
508  auto connection = try_connect<address>(server_address);
509  if (connection) {
510  return true;
511  }
512  }
513  tried_addresses.insert(tried_addresses_per_attempt.begin(),
514  tried_addresses_per_attempt.end());
515  // If the address provider loads no addresses, then the above loop is
516  // not entered and the lifecycle check is missing, hence we need to
517  // repeat the same check at this point.
518  check_client_active();
519  } while (wait_strategy_.sleep());
520 
521  std::ostringstream out;
522  out << "Unable to connect to any address! The following addresses were "
523  "tried: { ";
524  for (const auto& address : tried_addresses) {
525  out << address << " , ";
526  }
527  out << "}";
528  BOOST_THROW_EXCEPTION(exception::illegal_state(
529  "ConnectionManager::do_connect_to_cluster", out.str()));
530 }
531 
532 std::vector<address>
533 ClientConnectionManagerImpl::get_possible_member_addresses()
534 {
535  std::vector<address> addresses;
536  for (auto&& member :
537  client_.get_client_cluster_service().get_member_list()) {
538  addresses.emplace_back(std::move(member.get_address()));
539  }
540 
541  if (shuffle_member_list_) {
542  shuffle(addresses);
543  }
544 
545  std::vector<address> provided_addresses =
546  address_provider_->load_addresses();
547 
548  if (shuffle_member_list_) {
549  shuffle(provided_addresses);
550  }
551 
552  addresses.insert(
553  addresses.end(), provided_addresses.begin(), provided_addresses.end());
554 
555  return addresses;
556 }
557 
558 void
559 ClientConnectionManagerImpl::connect_to_cluster()
560 {
561  if (async_start_) {
562  submit_connect_to_cluster_task();
563  } else {
564  do_connect_to_cluster();
565  }
566 }
567 
568 bool
569 ClientConnectionManagerImpl::is_alive()
570 {
571  return alive_;
572 }
573 
574 void
575 ClientConnectionManagerImpl::on_connection_close(
576  const std::shared_ptr<Connection>& connection)
577 {
578  auto endpoint = connection->get_remote_address();
579  auto member_uuid = connection->get_remote_uuid();
580 
581  auto socket_remote_address = connection->get_socket().get_remote_endpoint();
582 
583  if (!endpoint) {
584  HZ_LOG(logger_,
585  finest,
586  boost::str(boost::format(
587  "Destroying %1% , but it has end-point set to null "
588  "-> not removing it from a connection map") %
589  *connection));
590  return;
591  }
592 
593  std::lock_guard<std::recursive_mutex> guard(client_state_mutex_);
594  if (active_connections_.remove(member_uuid, connection)) {
595  active_connection_ids_.remove(connection->get_connection_id());
596 
597  HZ_LOG(
598  logger_,
599  info,
600  boost::str(boost::format(
601  "Removed connection to endpoint: %1%, connection: %2%") %
602  *endpoint % *connection));
603 
604  if (active_connections_.empty()) {
605  fire_life_cycle_event(
606  lifecycle_event::lifecycle_state::CLIENT_DISCONNECTED);
607 
608  trigger_cluster_reconnection();
609  }
610 
611  fire_connection_removed_event(connection);
612  } else {
613  HZ_LOG(logger_,
614  finest,
615  boost::str(boost::format(
616  "Destroying a connection, but there is no mapping "
617  "%1% -> %2% in the connection map.") %
618  endpoint % *connection));
619  }
620 }
621 
622 void
623 ClientConnectionManagerImpl::add_connection_listener(
624  const std::shared_ptr<ConnectionListener>& connection_listener)
625 {
626  connection_listeners_.add(connection_listener);
627 }
628 
629 ClientConnectionManagerImpl::~ClientConnectionManagerImpl()
630 {
631  shutdown();
632 }
633 
634 logger&
635 ClientConnectionManagerImpl::get_logger()
636 {
637  return client_.get_logger();
638 }
639 
640 void
641 ClientConnectionManagerImpl::check_client_active()
642 {
643  if (!client_.get_lifecycle_service().is_running()) {
644  BOOST_THROW_EXCEPTION(exception::hazelcast_client_not_active(
645  "ClientConnectionManagerImpl::check_client_active",
646  "Client is shutdown"));
647  }
648 }
649 
650 std::shared_ptr<Connection>
651 ClientConnectionManagerImpl::on_authenticated(
652  const std::shared_ptr<Connection>& connection,
653  auth_response& response)
654 {
655  {
656  std::lock_guard<std::recursive_mutex> guard(client_state_mutex_);
657  check_partition_count(response.partition_count);
658  connection->set_connected_server_version(response.server_version);
659  connection->set_remote_address(response.server_address);
660  connection->set_remote_uuid(response.member_uuid);
661 
662  auto existing_connection =
663  active_connections_.get(response.member_uuid);
664  if (existing_connection) {
665  connection->close(
666  (boost::format(
667  "Duplicate connection to same member with uuid : %1%") %
668  boost::uuids::to_string(response.member_uuid))
669  .str());
670  return existing_connection;
671  }
672 
673  auto new_cluster_id = response.cluster_id;
674  boost::uuids::uuid current_cluster_id = cluster_id_;
675 
676  HZ_LOG(logger_,
677  finest,
678  boost::str(boost::format(
679  "Checking the cluster: %1%, current cluster: %2%") %
680  new_cluster_id % current_cluster_id));
681 
682  auto cluster_id_changed = !current_cluster_id.is_nil() &&
683  !(new_cluster_id == current_cluster_id);
684  if (cluster_id_changed) {
685  HZ_LOG(
686  logger_,
687  warning,
688  boost::str(
689  boost::format(
690  "Switching from current cluster: %1% to new cluster: %2%") %
691  current_cluster_id % new_cluster_id));
692  client_.get_hazelcast_client_implementation()->on_cluster_restart();
693  }
694 
695  auto connections_empty = active_connections_.empty();
696  active_connection_ids_.put(connection->get_connection_id(), connection);
697  active_connections_.put(response.member_uuid, connection);
698  if (connections_empty) {
699  cluster_id_ = new_cluster_id;
700  fire_life_cycle_event(
701  lifecycle_event::lifecycle_state::CLIENT_CONNECTED);
702  }
703 
704  auto local_address = connection->get_local_socket_address();
705  if (local_address) {
706  HZ_LOG(
707  logger_,
708  info,
709  boost::str(
710  boost::format(
711  "Authenticated with server %1%:%2%, server version: %3%, "
712  "local address: %4%. %5%") %
713  response.server_address % response.member_uuid %
714  response.server_version % *local_address % *connection));
715  } else {
716  HZ_LOG(
717  logger_,
718  info,
719  boost::str(
720  boost::format(
721  "Authenticated with server %1%:%2%, server version: %3%, "
722  "no local address: (connection disconnected ?). %4%") %
723  response.server_address % response.member_uuid %
724  response.server_version % *connection));
725  }
726 
727  fire_connection_added_event(connection);
728  }
729 
730  // It could happen that this connection is already closed and
731  // on_connection_close() is called even before the synchronized block
732  // above is executed. In this case, now we have a closed but registered
733  // connection. We do a final check here to remove this connection
734  // if needed.
735  if (!connection->is_alive()) {
736  on_connection_close(connection);
737  return nullptr;
738  }
739 
740  // If the client is shutdown in parallel, we need to close this new
741  // connection.
742  if (!client_.get_lifecycle_service().is_running()) {
743  connection->close("Client is shutdown");
744  }
745 
746  return connection;
747 }
748 
749 void
750 ClientConnectionManagerImpl::fire_life_cycle_event(
751  lifecycle_event::lifecycle_state state)
752 {
753  client_.get_lifecycle_service().fire_lifecycle_event(state);
754 }
755 
756 void
757 ClientConnectionManagerImpl::check_partition_count(int32_t new_partition_count)
758 {
759  auto& partition_service =
760  static_cast<spi::impl::ClientPartitionServiceImpl&>(
761  client_.get_partition_service());
762  if (!partition_service.check_and_set_partition_count(new_partition_count)) {
763  BOOST_THROW_EXCEPTION(exception::client_not_allowed_in_cluster(
764  "ClientConnectionManagerImpl::check_partition_count",
765  (boost::format(
766  "Client can not work with this cluster because it has a different "
767  "partition count. "
768  "Expected partition count: %1%, Member partition count: %2%") %
769  partition_service.get_partition_count() % new_partition_count)
770  .str()));
771  }
772 }
773 
774 void
775 ClientConnectionManagerImpl::trigger_cluster_reconnection()
776 {
777  if (reconnect_mode_ ==
778  config::client_connection_strategy_config::reconnect_mode::OFF) {
779  HZ_LOG(
780  logger_, info, "RECONNECT MODE is off. Shutting down the client.");
781  shutdown_with_external_thread(
782  client_.get_hazelcast_client_implementation());
783  return;
784  }
785 
786  if (client_.get_lifecycle_service().is_running()) {
787  submit_connect_to_cluster_task();
788  }
789 }
790 
791 std::shared_ptr<Connection>
792 ClientConnectionManagerImpl::get_random_connection()
793 {
794  if (smart_routing_enabled_) {
795  auto member = load_balancer_.next_(client_.get_cluster());
796  if (!member) {
797  return nullptr;
798  }
799  auto connection = get_connection(member->get_uuid());
800  if (connection) {
801  return connection;
802  }
803  }
804 
805  auto connections = active_connections_.values();
806  if (connections.empty()) {
807  return nullptr;
808  }
809 
810  return connections[0];
811 }
812 
813 boost::uuids::uuid
814 ClientConnectionManagerImpl::get_client_uuid() const
815 {
816  return client_uuid_;
817 }
818 
819 void
820 ClientConnectionManagerImpl::check_invocation_allowed()
821 {
822  if (active_connections_.size() > 0) {
823  return;
824  }
825 
826  if (async_start_) {
827  BOOST_THROW_EXCEPTION(exception::hazelcast_client_offline(
828  "ClientConnectionManagerImpl::check_invocation_allowed",
829  "No connection found to cluster and async start is configured."));
830  } else if (reconnect_mode_ == config::client_connection_strategy_config::
831  reconnect_mode::ASYNC) {
832  BOOST_THROW_EXCEPTION(exception::hazelcast_client_offline(
833  "ClientConnectionManagerImpl::check_invocation_allowed",
834  "No connection found to cluster and reconnect mode is async."));
835  } else {
836  BOOST_THROW_EXCEPTION(
837  exception::io("ClientConnectionManagerImpl::check_invocation_allowed",
838  "No connection found to cluster."));
839  }
840 }
841 
842 void
843 ClientConnectionManagerImpl::connect_to_all_cluster_members()
844 {
845  if (!smart_routing_enabled_) {
846  return;
847  }
848 
849  for (const auto& member :
850  client_.get_client_cluster_service().get_member_list()) {
851  try {
852  get_or_connect(member);
853  } catch (std::exception&) {
854  // ignore
855  }
856  }
857 }
858 
859 void
860 ClientConnectionManagerImpl::notify_backup(int64_t call_id)
861 {
862  struct correlation_id
863  {
864  int32_t connnection_id;
865  int32_t call_id;
866  };
867  union
868  {
869  int64_t id;
870  correlation_id composed_id;
871  } c_id_union;
872  c_id_union.id = call_id;
873  auto connection_id = c_id_union.composed_id.connnection_id;
874  auto connection = active_connection_ids_.get(connection_id);
875  if (!connection) {
876  return;
877  }
878  boost::asio::post(connection->get_socket().get_executor(), [=]() {
879  auto invocation_it = connection->invocations.find(call_id);
880  if (invocation_it != connection->invocations.end()) {
881  invocation_it->second->notify_backup();
882  }
883  });
884 }
885 
886 std::shared_ptr<Connection>
887 ClientConnectionManagerImpl::connect(const address& addr)
888 {
889  HZ_LOG(logger_,
890  info,
891  boost::str(boost::format("Trying to connect to %1%.") % addr));
892 
893  auto connection = std::make_shared<Connection>(addr,
894  client_,
895  ++connection_id_gen_,
896  *socket_factory_,
897  *this,
898  connection_timeout_millis_);
899  connection->connect();
900 
901  // call the interceptor from user thread
902  socket_interceptor_.connect_(connection->get_socket());
903 
904  auto result = authenticate_on_cluster(connection);
905 
906  return on_authenticated(connection, result);
907 }
908 
909 address
910 ClientConnectionManagerImpl::translate(const member& m)
911 {
912  if (use_public_address_) {
913  auto public_addr_it = m.address_map().find(PUBLIC_ENDPOINT_QUALIFIER);
914  if (public_addr_it != m.address_map().end()) {
915  return public_addr_it->second;
916  }
917  return m.get_address();
918  }
919 
920  try {
921  boost::optional<address> addr =
922  address_provider_->translate(m.get_address());
923 
924  if (!addr) {
925  throw exception::hazelcast_(boost::str(
926  boost::format("Address Provider could not translate %1%") % m));
927  }
928 
929  return *addr;
930  } catch (const exception::hazelcast_&) {
931  logger_.log(
932  logger::level::warning,
933  boost::str(boost::format("Address Provider could not translate %1%") %
934  m));
935 
936  throw;
937  }
938 }
939 
940 std::shared_ptr<connection::Connection>
941 ClientConnectionManagerImpl::connection_for_sql(
942  std::function<boost::optional<member>()> member_of_large_same_version_group,
943  std::function<boost::optional<member>(boost::uuids::uuid)> get_cluster_member)
944 {
945  if (smart_routing_enabled_) {
946  // There might be a race - the chosen member might be just connected or
947  // disconnected - try a couple of times, the
948  // memberOfLargerSameVersionGroup returns a random connection, we might
949  // be lucky...
950  for (int i = 0; i < SQL_CONNECTION_RANDOM_ATTEMPTS; i++) {
951  auto member = member_of_large_same_version_group();
952  if (!member) {
953  break;
954  }
955  auto connection = active_connections_.get(member->get_uuid());
956  if (connection) {
957  return connection;
958  }
959  }
960  }
961 
962  // Otherwise iterate over connections and return the first one that's not to
963  // a lite member
964  std::shared_ptr<connection::Connection> first_connection;
965  for (const auto& connection_entry : active_connections_.entry_set()) {
966  if (!first_connection) {
967  first_connection = connection_entry.second;
968  }
969  const auto& member_id = connection_entry.first;
970  auto member = get_cluster_member(member_id);
971  if (!member || member->is_lite_member()) {
972  continue;
973  }
974  return connection_entry.second;
975  }
976 
977  // Failed to get a connection to a data member
978  return first_connection;
979 }
980 
981 ReadHandler::ReadHandler(Connection& connection, size_t buffer_size)
982  : buffer(new char[buffer_size])
983  , byte_buffer(buffer, buffer_size)
984  , builder_(connection)
985  , last_read_time_(std::chrono::steady_clock::now().time_since_epoch())
986 {}
987 
988 ReadHandler::~ReadHandler()
989 {
990  delete[] buffer;
991 }
992 
993 void
994 ReadHandler::handle()
995 {
996  last_read_time_ = std::chrono::steady_clock::now().time_since_epoch();
997 
998  if (byte_buffer.position() == 0)
999  return;
1000 
1001  byte_buffer.flip();
1002 
1003  // it is important to check the on_data return value since there may be left
1004  // data less than a message header size, and this may cause an infinite
1005  // loop.
1006  while (byte_buffer.has_remaining() && builder_.on_data(byte_buffer)) {
1007  }
1008 
1009  if (byte_buffer.has_remaining()) {
1010  byte_buffer.compact();
1011  } else {
1012  byte_buffer.clear();
1013  }
1014 }
1015 
1016 std::chrono::steady_clock::time_point
1017 ReadHandler::get_last_read_time() const
1018 {
1019  return std::chrono::steady_clock::time_point{ last_read_time_ };
1020 }
1021 
1022 bool
1023 AddressProvider::is_default_provider()
1024 {
1025  return false;
1026 }
1027 
1028 Connection::Connection(
1029  const address& address,
1030  spi::ClientContext& client_context,
1031  int connection_id, // NOLINT(cppcoreguidelines-pro-type-member-init)
1032  internal::socket::SocketFactory& socket_factory,
1033  ClientConnectionManagerImpl& client_connection_manager,
1034  std::chrono::milliseconds& connect_timeout_in_millis)
1035  : read_handler(*this, 16 << 10)
1036  , start_time_(std::chrono::system_clock::now())
1037  , closed_time_duration_()
1038  , client_context_(client_context)
1039  , invocation_service_(client_context.get_invocation_service())
1040  , connection_id_(connection_id)
1041  , remote_uuid_(boost::uuids::nil_uuid())
1042  , logger_(client_context.get_logger())
1043  , alive_(true)
1044  , last_write_time_(std::chrono::steady_clock::now().time_since_epoch())
1045 {
1046  socket_ = socket_factory.create(address, connect_timeout_in_millis);
1047 }
1048 
1049 Connection::~Connection() = default;
1050 
1051 void
1052 Connection::connect()
1053 {
1054  socket_->connect(shared_from_this());
1055  backup_timer_.reset(
1056  new boost::asio::steady_timer(socket_->get_executor().context()));
1057  auto backupTimeout =
1058  static_cast<spi::impl::ClientInvocationServiceImpl&>(invocation_service_)
1059  .get_backup_timeout();
1060  auto this_connection = shared_from_this();
1061  schedule_periodic_backup_cleanup(backupTimeout, this_connection);
1062 }
1063 
1064 void
1065 Connection::schedule_periodic_backup_cleanup(
1066  std::chrono::milliseconds backup_timeout,
1067  std::shared_ptr<Connection> this_connection)
1068 {
1069  if (!alive_) {
1070  return;
1071  }
1072 
1073  backup_timer_->expires_from_now(backup_timeout);
1074  backup_timer_->async_wait(
1075  socket_->get_executor().wrap([=](boost::system::error_code ec) {
1076  if (ec) {
1077  return;
1078  }
1079  for (const auto& it : this_connection->invocations) {
1080  it.second->detect_and_handle_backup_timeout(backup_timeout);
1081  }
1082 
1083  schedule_periodic_backup_cleanup(backup_timeout, this_connection);
1084  }));
1085 }
1086 
1087 void
1088 Connection::close()
1089 {
1090  close("");
1091 }
1092 
1093 void
1094 Connection::close(const std::string& reason)
1095 {
1096  close(reason, nullptr);
1097 }
1098 
1099 void
1100 Connection::close(const std::string& reason, std::exception_ptr cause)
1101 {
1102  bool expected = true;
1103  if (!alive_.compare_exchange_strong(expected, false)) {
1104  return;
1105  }
1106 
1107  closed_time_duration_.store(
1108  std::chrono::duration_cast<std::chrono::milliseconds>(
1109  std::chrono::steady_clock::now().time_since_epoch()));
1110 
1111  if (backup_timer_) {
1112  boost::system::error_code ignored;
1113  backup_timer_->cancel(ignored);
1114  }
1115 
1116  close_cause_ = cause;
1117  close_reason_ = reason;
1118 
1119  log_close();
1120 
1121  try {
1122  inner_close();
1123  } catch (exception::iexception& e) {
1124  HZ_LOG(
1125  client_context_.get_logger(),
1126  warning,
1127  boost::str(boost::format("Exception while closing connection %1%") %
1128  e.get_message()));
1129  }
1130 
1131  auto thisConnection = shared_from_this();
1132  client_context_.get_connection_manager().on_connection_close(
1133  thisConnection);
1134 
1135  boost::asio::post(socket_->get_executor(), [=]() {
1136  for (auto& invocationEntry : thisConnection->invocations) {
1137  invocationEntry.second->notify_exception(std::make_exception_ptr(
1138  boost::enable_current_exception(exception::target_disconnected(
1139  "Connection::close", thisConnection->get_close_reason()))));
1140  }
1141  });
1142 }
1143 
1144 void
1145 Connection::write(
1146  const std::shared_ptr<spi::impl::ClientInvocation>& client_invocation)
1147 {
1148  socket_->async_write(shared_from_this(), client_invocation);
1149 }
1150 
1151 const boost::optional<address>&
1152 Connection::get_remote_address() const
1153 {
1154  return remote_address_;
1155 }
1156 
1157 void
1158 Connection::set_remote_address(boost::optional<address> endpoint)
1159 {
1160  this->remote_address_ = std::move(endpoint);
1161 }
1162 
1163 void
1164 Connection::handle_client_message(
1165  const std::shared_ptr<protocol::ClientMessage>& message)
1166 {
1167  auto correlationId = message->get_correlation_id();
1168  auto invocationIterator = invocations.find(correlationId);
1169  if (invocationIterator == invocations.end()) {
1170  HZ_LOG(logger_,
1171  warning,
1172  boost::str(boost::format("No invocation for callId: %1%. "
1173  "Dropping this message: %2%") %
1174  correlationId % *message));
1175  return;
1176  }
1177  auto invocation = invocationIterator->second;
1178  auto flags = message->get_header_flags();
1179  if (message->is_flag_set(flags,
1180  protocol::ClientMessage::BACKUP_EVENT_FLAG)) {
1181  message->rd_ptr(protocol::ClientMessage::EVENT_HEADER_LEN);
1182  correlationId = message->get<int64_t>();
1183  client_context_.get_connection_manager().notify_backup(correlationId);
1184  } else if (message->is_flag_set(flags,
1185  protocol::ClientMessage::IS_EVENT_FLAG)) {
1186  client_context_.get_client_listener_service().handle_client_message(
1187  invocation, message);
1188  } else {
1189  invocation_service_.handle_client_message(invocation, message);
1190  }
1191 }
1192 
1193 int32_t
1194 Connection::get_connection_id() const
1195 {
1196  return connection_id_;
1197 }
1198 
1199 bool
1200 Connection::is_alive() const
1201 {
1202  return alive_;
1203 }
1204 
1205 const std::string&
1206 Connection::get_close_reason() const
1207 {
1208  return close_reason_;
1209 }
1210 
1211 void
1212 Connection::log_close()
1213 {
1214  std::ostringstream message;
1215  message << *this << " closed. Reason: ";
1216  if (!close_reason_.empty()) {
1217  message << close_reason_;
1218  } else if (close_cause_) {
1219  try {
1220  std::rethrow_exception(close_cause_);
1221  } catch (exception::iexception& ie) {
1222  message << ie.get_source() << "[" + ie.get_message() << "]";
1223  }
1224  } else {
1225  message << "Socket explicitly closed";
1226  }
1227 
1228  if (client_context_.get_lifecycle_service().is_running()) {
1229  if (!close_cause_) {
1230  HZ_LOG(logger_, info, message.str());
1231  } else {
1232  try {
1233  std::rethrow_exception(close_cause_);
1234  } catch (exception::iexception& ie) {
1235  HZ_LOG(
1236  logger_,
1237  warning,
1238  boost::str(boost::format("%1% %2%") % message.str() % ie));
1239  }
1240  }
1241  } else {
1242  HZ_LOG(
1243  logger_, finest, message.str() + [this]() -> std::string {
1244  if (close_cause_) {
1245  try {
1246  std::rethrow_exception(close_cause_);
1247  } catch (exception::iexception& ie) {
1248  return ie.what();
1249  }
1250  }
1251  return "";
1252  }());
1253  }
1254 }
1255 
1256 bool
1257 Connection::operator==(const Connection& rhs) const
1258 {
1259  return connection_id_ == rhs.connection_id_;
1260 }
1261 
1262 bool
1263 Connection::operator!=(const Connection& rhs) const
1264 {
1265  return !(rhs == *this);
1266 }
1267 
1268 const std::string&
1269 Connection::get_connected_server_version_string() const
1270 {
1271  return connected_server_version_string_;
1272 }
1273 
1274 void
1275 Connection::set_connected_server_version(const std::string& connected_server)
1276 {
1277  Connection::connected_server_version_string_ = connected_server;
1278 }
1279 
1280 boost::optional<address>
1281 Connection::get_local_socket_address() const
1282 {
1283  return socket_->local_socket_address();
1284 }
1285 
1286 std::chrono::steady_clock::time_point
1287 Connection::last_read_time() const
1288 {
1289  return read_handler.get_last_read_time();
1290 }
1291 
1292 void
1293 Connection::inner_close()
1294 {
1295  if (!socket_) {
1296  return;
1297  }
1298 
1299  auto thisConnection = shared_from_this();
1300  boost::asio::post(socket_->get_executor(),
1301  [=]() { thisConnection->socket_->close(); });
1302 }
1303 
1304 std::ostream&
1305 operator<<(std::ostream& os, const Connection& connection)
1306 {
1307  os << "Connection{"
1308  << "alive=" << connection.is_alive()
1309  << ", connection id=" << connection.get_connection_id()
1310  << ", remote endpoint=";
1311  if (connection.get_remote_address()) {
1312  os << *connection.get_remote_address();
1313  } else {
1314  os << "null";
1315  }
1316  os << ", last_read_time="
1317  << util::StringUtil::time_to_string(connection.last_read_time())
1318  << ", last_write_time="
1319  << util::StringUtil::time_to_string(connection.last_write_time())
1320  << ", closedTime="
1321  << util::StringUtil::time_to_string(
1322  std::chrono::steady_clock::time_point(
1323  std::chrono::duration_cast<std::chrono::steady_clock::duration>(
1324  connection.closed_time_duration_.load())))
1325  << ", connected server version="
1326  << connection.connected_server_version_string_ << '}';
1327 
1328  return os;
1329 }
1330 
1331 bool
1332 Connection::operator<(const Connection& rhs) const
1333 {
1334  return connection_id_ < rhs.connection_id_;
1335 }
1336 
1337 std::chrono::system_clock::time_point
1338 Connection::get_start_time() const
1339 {
1340  return start_time_;
1341 }
1342 
1343 socket&
1344 Connection::get_socket()
1345 {
1346  return *socket_;
1347 }
1348 
1349 void
1350 Connection::deregister_invocation(int64_t call_id)
1351 {
1352  invocations.erase(call_id);
1353 }
1354 
1355 boost::uuids::uuid
1356 Connection::get_remote_uuid() const
1357 {
1358  return remote_uuid_;
1359 }
1360 
1361 void
1362 Connection::set_remote_uuid(boost::uuids::uuid remote_uuid)
1363 {
1364  remote_uuid_ = remote_uuid;
1365 }
1366 
1367 void
1368 Connection::last_write_time(std::chrono::steady_clock::time_point tp)
1369 {
1370  last_write_time_ = tp.time_since_epoch();
1371 }
1372 
1373 std::chrono::steady_clock::time_point
1374 Connection::last_write_time() const
1375 {
1376  return std::chrono::steady_clock::time_point{ last_write_time_ };
1377 }
1378 
1379 HeartbeatManager::HeartbeatManager(
1380  spi::ClientContext& client,
1381  ClientConnectionManagerImpl& connection_manager)
1382  : client_(client)
1383  , client_connection_manager_(connection_manager)
1384  , logger_(client.get_logger())
1385 {
1386  client_properties& clientProperties = client.get_client_properties();
1387  auto timeout_millis =
1388  clientProperties.get_long(clientProperties.get_heartbeat_timeout());
1389  heartbeat_timeout_ = std::chrono::milliseconds(
1390  timeout_millis > 0
1391  ? timeout_millis
1392  : util::IOUtil::to_value<int64_t>(
1393  client_properties::PROP_HEARTBEAT_TIMEOUT_DEFAULT));
1394 
1395  auto interval_millis =
1396  clientProperties.get_long(clientProperties.get_heartbeat_interval());
1397  heartbeat_interval_ = std::chrono::milliseconds(
1398  interval_millis > 0
1399  ? interval_millis
1400  : util::IOUtil::to_value<int64_t>(
1401  client_properties::PROP_HEARTBEAT_INTERVAL_DEFAULT));
1402 }
1403 
1404 void
1405 HeartbeatManager::start()
1406 {
1407  spi::impl::ClientExecutionServiceImpl& clientExecutionService =
1408  client_.get_client_execution_service();
1409 
1410  timer_ = clientExecutionService.schedule_with_repetition(
1411  [=]() {
1412  if (!client_connection_manager_.is_alive()) {
1413  return;
1414  }
1415 
1416  for (auto& connection :
1417  client_connection_manager_.get_active_connections()) {
1418  check_connection(connection);
1419  }
1420  },
1421  heartbeat_interval_,
1422  heartbeat_interval_);
1423 }
1424 
1425 void
1426 HeartbeatManager::check_connection(
1427  const std::shared_ptr<Connection>& connection)
1428 {
1429  if (!connection->is_alive()) {
1430  return;
1431  }
1432 
1433  auto now = std::chrono::steady_clock::now();
1434  if (now - connection->last_read_time() > heartbeat_timeout_) {
1435  HZ_LOG(logger_,
1436  warning,
1437  boost::str(
1438  boost::format("Heartbeat failed over the connection: %1%") %
1439  *connection));
1440  on_heartbeat_stopped(connection, "Heartbeat timed out");
1441  return;
1442  }
1443 
1444  if (now - connection->last_write_time() > heartbeat_interval_) {
1445  auto request = protocol::codec::client_ping_encode();
1446  std::shared_ptr<spi::impl::ClientInvocation> clientInvocation =
1447  spi::impl::ClientInvocation::create(client_, request, "", connection);
1448  clientInvocation->invoke_urgent();
1449  }
1450 }
1451 
1452 void
1453 HeartbeatManager::on_heartbeat_stopped(
1454  const std::shared_ptr<Connection>& connection,
1455  const std::string& reason)
1456 {
1457  connection->close(
1458  reason,
1459  std::make_exception_ptr(
1460  (exception::exception_builder<exception::target_disconnected>(
1461  "HeartbeatManager::onHeartbeatStopped")
1462  << "Heartbeat timed out to connection " << *connection)
1463  .build()));
1464 }
1465 
1466 void
1467 HeartbeatManager::shutdown()
1468 {
1469  if (timer_) {
1470  boost::system::error_code ignored;
1471  timer_->cancel(ignored);
1472  }
1473 }
1474 
1475 std::chrono::milliseconds
1476 HeartbeatManager::get_heartbeat_timeout() const
1477 {
1478  return heartbeat_timeout_;
1479 }
1480 
1481 void
1482 wait_strategy::reset()
1483 {
1484  attempt_ = 0;
1485  cluster_connect_attempt_begin_ = std::chrono::steady_clock::now();
1486  current_backoff_millis_ =
1487  (std::min)(max_backoff_millis_, initial_backoff_millis_);
1488 }
1489 
1490 wait_strategy::wait_strategy(
1491  const config::connection_retry_config& retry_config,
1492  logger& log)
1493  : initial_backoff_millis_(retry_config.get_initial_backoff_duration())
1494  , max_backoff_millis_(retry_config.get_max_backoff_duration())
1495  , multiplier_(retry_config.get_multiplier())
1496  , jitter_(retry_config.get_jitter())
1497  , logger_(log)
1498  , cluster_connect_timeout_millis_(retry_config.get_cluster_connect_timeout())
1499 {
1500  if (cluster_connect_timeout_millis_ == std::chrono::milliseconds::max()) {
1501  cluster_connect_timeout_text_ = "INFINITE";
1502  } else {
1503  cluster_connect_timeout_text_ =
1504  (boost::format("%1% msecs") % cluster_connect_timeout_millis_.count())
1505  .str();
1506  }
1507 }
1508 
1509 bool
1510 wait_strategy::sleep()
1511 {
1512  attempt_++;
1513  using namespace std::chrono;
1514  auto current_time = steady_clock::now();
1515  auto time_passed = duration_cast<milliseconds>(
1516  current_time - cluster_connect_attempt_begin_);
1517  if (time_passed > cluster_connect_timeout_millis_) {
1518  HZ_LOG(
1519  logger_,
1520  warning,
1521  (boost::format("Unable to get live cluster connection, cluster "
1522  "connect timeout (%1%) is reached. Attempt %2%.") %
1523  cluster_connect_timeout_text_ % attempt_)
1524  .str());
1525  return false;
1526  }
1527 
1528  // sleep time: current_backoff_millis_(1 +- (jitter * [0, 1]))
1529  auto actual_sleep_time =
1530  current_backoff_millis_ + milliseconds(static_cast<milliseconds::rep>(
1531  current_backoff_millis_.count() * jitter_ *
1532  (2.0 * random_(random_generator_) - 1.0)));
1533 
1534  actual_sleep_time = (std::min)(
1535  actual_sleep_time, cluster_connect_timeout_millis_ - time_passed);
1536 
1537  HZ_LOG(
1538  logger_,
1539  warning,
1540  (boost::format(
1541  "Unable to get live cluster connection, retry in %1% ms, attempt: %2% "
1542  ", cluster connect timeout: %3% , max backoff millis: %4%") %
1543  actual_sleep_time.count() % attempt_ % cluster_connect_timeout_text_ %
1544  max_backoff_millis_.count())
1545  .str());
1546 
1547  std::this_thread::sleep_for(actual_sleep_time);
1548 
1549  current_backoff_millis_ =
1550  (std::min)(milliseconds(static_cast<milliseconds::rep>(
1551  current_backoff_millis_.count() * multiplier_)),
1552  max_backoff_millis_);
1553  return true;
1554 }
1555 } // namespace connection
1556 
1557 namespace internal {
1558 namespace socket {
1559 SocketFactory::SocketFactory(spi::ClientContext& client_context,
1560  boost::asio::io_context& io,
1561  boost::asio::ip::tcp::resolver& resolver)
1562  : client_context_(client_context)
1563  , io_(io)
1564  , io_resolver_(resolver)
1565 {}
1566 
1567 bool
1568 SocketFactory::start()
1569 {
1570 #ifdef HZ_BUILD_WITH_SSL
1571  auto& sslConfig =
1572  client_context_.get_client_config().get_network_config().get_ssl_config();
1573  if (sslConfig.is_enabled()) {
1574  if (sslConfig.ssl_context_) {
1575  ssl_context_ = sslConfig.ssl_context_;
1576  } else {
1577  ssl_context_ = std::make_shared<boost::asio::ssl::context>(
1578  (boost::asio::ssl::context_base::method)sslConfig.get_protocol());
1579 
1580  ssl_context_->set_verify_mode(boost::asio::ssl::verify_peer);
1581  ssl_context_->set_default_verify_paths();
1582 
1583  const std::vector<std::string>& verifyFiles =
1584  sslConfig.get_verify_files();
1585  bool success = true;
1586  logger& lg = client_context_.get_logger();
1587  for (const auto& f : verifyFiles) {
1588  boost::system::error_code ec;
1589  ssl_context_->load_verify_file(f, ec);
1590  if (ec) {
1591  HZ_LOG(
1592  lg,
1593  warning,
1594  boost::str(
1595  boost::format("SocketFactory::start: Failed to load CA "
1596  "verify file at %1% %2%") %
1597  f % ec.message()));
1598  success = false;
1599  }
1600  }
1601 
1602  if (!success) {
1603  ssl_context_.reset();
1604  HZ_LOG(lg,
1605  warning,
1606  "SocketFactory::start: Failed to load one or more "
1607  "configured CA verify files (PEM files). Please "
1608  "correct the files and retry.");
1609  return false;
1610  }
1611  }
1612 
1613  // set cipher list if the list is set
1614  const std::string& cipherList = sslConfig.get_cipher_list();
1615  if (!cipherList.empty()) {
1616  if (!SSL_CTX_set_cipher_list(ssl_context_->native_handle(),
1617  cipherList.c_str())) {
1618  logger& lg = client_context_.get_logger();
1619  HZ_LOG(lg,
1620  warning,
1621  std::string("SocketFactory::start: Could not load any "
1622  "of the ciphers in the config provided "
1623  "ciphers:") +
1624  cipherList);
1625  return false;
1626  }
1627  }
1628  }
1629 #else
1630  (void)client_context_;
1631 #endif
1632  return true;
1633 }
1634 
1635 std::unique_ptr<hazelcast::client::socket>
1636 SocketFactory::create(const address& address,
1637  std::chrono::milliseconds& connect_timeout_in_millis)
1638 {
1639 #ifdef HZ_BUILD_WITH_SSL
1640  if (ssl_context_.get()) {
1641  return std::unique_ptr<hazelcast::client::socket>(
1642  new internal::socket::SSLSocket(io_,
1643  *ssl_context_,
1644  address,
1645  client_context_.get_client_config()
1646  .get_network_config()
1647  .get_socket_options(),
1648  connect_timeout_in_millis,
1649  io_resolver_));
1650  }
1651 #endif
1652 
1653  return std::unique_ptr<hazelcast::client::socket>(
1654  new internal::socket::TcpSocket(io_,
1655  address,
1656  client_context_.get_client_config()
1657  .get_network_config()
1658  .get_socket_options(),
1659  connect_timeout_in_millis,
1660  io_resolver_));
1661 }
1662 
1663 #ifdef HZ_BUILD_WITH_SSL
1664 
1665 SSLSocket::SSLSocket(boost::asio::io_context& io_service,
1666  boost::asio::ssl::context& ssl_context,
1667  const client::address& address,
1668  client::config::socket_options& socket_options,
1669  std::chrono::milliseconds& connect_timeout_in_millis,
1670  boost::asio::ip::tcp::resolver& resolver)
1671  : BaseSocket<boost::asio::ssl::stream<boost::asio::ip::tcp::socket>>(
1672  resolver,
1673  address,
1674  socket_options,
1675  io_service,
1676  connect_timeout_in_millis,
1677  ssl_context)
1678 {}
1679 
1680 std::vector<SSLSocket::CipherInfo>
1681 SSLSocket::get_ciphers()
1682 {
1683  STACK_OF(SSL_CIPHER)* ciphers = SSL_get_ciphers(socket_.native_handle());
1684  std::vector<CipherInfo> supportedCiphers;
1685  for (int i = 0; i < sk_SSL_CIPHER_num(ciphers); ++i) {
1686  struct SSLSocket::CipherInfo info;
1687  const SSL_CIPHER* cipher = sk_SSL_CIPHER_value(ciphers, i);
1688  info.name = SSL_CIPHER_get_name(cipher);
1689  info.number_of_bits = SSL_CIPHER_get_bits(cipher, 0);
1690  info.version = SSL_CIPHER_get_version(cipher);
1691  char descBuf[256];
1692  info.description = SSL_CIPHER_description(cipher, descBuf, 256);
1693  supportedCiphers.push_back(info);
1694  }
1695  return supportedCiphers;
1696 }
1697 
1698 void
1699 SSLSocket::post_connect()
1700 {
1701  socket_.handshake(boost::asio::ssl::stream_base::client);
1702 }
1703 
1704 std::ostream&
1705 operator<<(std::ostream& out, const SSLSocket::CipherInfo& info)
1706 {
1707  out << "Cipher{"
1708  "Name: "
1709  << info.name << ", Bits:" << info.number_of_bits
1710  << ", Version:" << info.version << ", Description:" << info.description
1711  << "}";
1712 
1713  return out;
1714 }
1715 
1716 #endif // HZ_BUILD_WITH_SSL
1717 
1718 TcpSocket::TcpSocket(boost::asio::io_context& io,
1719  const address& address,
1720  client::config::socket_options& socket_options,
1721  std::chrono::milliseconds& connect_timeout_in_millis,
1722  boost::asio::ip::tcp::resolver& resolver)
1723  : BaseSocket<boost::asio::ip::tcp::socket>(resolver,
1724  address,
1725  socket_options,
1726  io,
1727  connect_timeout_in_millis)
1728 {}
1729 
1730 } // namespace socket
1731 } // namespace internal
1732 } // namespace client
1733 } // namespace hazelcast
1734 
1735 namespace std {
1736 std::size_t
1737 hash<std::shared_ptr<hazelcast::client::connection::Connection>>::operator()(
1738  const std::shared_ptr<hazelcast::client::connection::Connection>& conn)
1739  const noexcept
1740 {
1741  if (!conn) {
1742  return 0;
1743  }
1744  return std::abs(conn->get_connection_id());
1745 }
1746 } // namespace std