Hazelcast C++ Client
Hazelcast C++ Client Library
network.cpp
1 /*
2  * Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /*
18  * Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
19  *
20  * Licensed under the Apache License, Version 2.0 (the "License");
21  * you may not use this file except in compliance with the License.
22  * You may obtain a copy of the License at
23  *
24  * http://www.apache.org/licenses/LICENSE-2.0
25  *
26  * Unless required by applicable law or agreed to in writing, software
27  * distributed under the License is distributed on an "AS IS" BASIS,
28  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
29  * See the License for the specific language governing permissions and
30  * limitations under the License.
31  */
32 
33 #include <cstdlib>
34 #include <unordered_set>
35 
36 #include "hazelcast/client/execution_callback.h"
37 #include "hazelcast/client/lifecycle_event.h"
38 #include "hazelcast/client/connection/AddressProvider.h"
39 #include "hazelcast/client/spi/impl/ClientInvocation.h"
40 #include "hazelcast/util/Util.h"
41 #include "hazelcast/client/protocol/AuthenticationStatus.h"
42 #include "hazelcast/client/exception/protocol_exceptions.h"
43 #include "hazelcast/client/connection/ClientConnectionManagerImpl.h"
44 #include "hazelcast/client/connection/ConnectionListener.h"
45 #include "hazelcast/client/connection/Connection.h"
46 #include "hazelcast/client/spi/ClientContext.h"
47 #include "hazelcast/client/spi/impl/ClientExecutionServiceImpl.h"
48 #include "hazelcast/client/serialization/serialization.h"
49 #include "hazelcast/client/protocol/UsernamePasswordCredentials.h"
50 #include "hazelcast/client/protocol/codec/codecs.h"
51 #include "hazelcast/client/client_config.h"
52 #include "hazelcast/client/socket_interceptor.h"
53 #include "hazelcast/client/config/client_network_config.h"
54 #include "hazelcast/client/client_properties.h"
55 #include "hazelcast/client/connection/HeartbeatManager.h"
56 #include "hazelcast/client/impl/hazelcast_client_instance_impl.h"
57 #include "hazelcast/client/spi/impl/listener/listener_service_impl.h"
58 #include "hazelcast/client/spi/impl/ClientPartitionServiceImpl.h"
59 #include "hazelcast/client/internal/socket/TcpSocket.h"
60 #include "hazelcast/client/internal/socket/SSLSocket.h"
61 #include "hazelcast/client/config/ssl_config.h"
62 #include "hazelcast/util/IOUtil.h"
63 #include "hazelcast/client/internal/socket/SocketFactory.h"
64 #include "hazelcast/client/connection/wait_strategy.h"
65 
66 namespace hazelcast {
67  namespace client {
68  namespace connection {
69  constexpr size_t ClientConnectionManagerImpl::EXECUTOR_CORE_POOL_SIZE;
70 
71  ClientConnectionManagerImpl::ClientConnectionManagerImpl(spi::ClientContext &client,
72  std::unique_ptr<AddressProvider> address_provider)
73  : alive_(false), logger_(client.get_logger()),
74  connection_timeout_millis_((std::chrono::milliseconds::max) ()),
75  client_(client), socket_interceptor_(client.get_client_config().get_socket_interceptor()),
76  address_provider_(std::move(address_provider)), connection_id_gen_(0),
77  heartbeat_(client, *this),
78  async_start_(client.get_client_config().get_connection_strategy_config().is_async_start()),
79  reconnect_mode_(client.get_client_config().get_connection_strategy_config().get_reconnect_mode()),
80  smart_routing_enabled_(client.get_client_config().get_network_config().is_smart_routing()),
81  client_uuid_(client.random_uuid()),
82  authentication_timeout_(boost::chrono::milliseconds(heartbeat_.get_heartbeat_timeout().count())),
83  load_balancer_(client.get_client_config().get_load_balancer()),
84  wait_strategy_(client.get_client_config().get_connection_strategy_config().get_retry_config(),
85  logger_), cluster_id_(boost::uuids::nil_uuid()),
86  connect_to_cluster_task_submitted_(false) {
87 
88  config::client_network_config &networkConfig = client.get_client_config().get_network_config();
89  auto connTimeout = networkConfig.get_connection_timeout();
90  if (connTimeout.count() > 0) {
91  connection_timeout_millis_ = std::chrono::milliseconds(connTimeout);
92  }
93 
94  client_properties &clientProperties = client.get_client_properties();
95  shuffle_member_list_ = clientProperties.get_boolean(clientProperties.get_shuffle_member_list());
96  }
97 
98  bool ClientConnectionManagerImpl::start() {
99  bool expected = false;
100  if (!alive_.compare_exchange_strong(expected, true)) {
101  return false;
102  }
103 
104  io_context_.reset(new boost::asio::io_context);
105  io_resolver_.reset(new boost::asio::ip::tcp::resolver(io_context_->get_executor()));
106  socket_factory_.reset(new internal::socket::SocketFactory(client_, *io_context_, *io_resolver_));
107  io_guard_.reset(new boost::asio::io_context::work(*io_context_));
108 
109  if (!socket_factory_->start()) {
110  return false;
111  }
112 
113  socket_interceptor_ = client_.get_client_config().get_socket_interceptor();
114 
115  io_thread_ = std::thread([=]() { io_context_->run(); });
116 
117  executor_.reset(new hazelcast::util::hz_thread_pool(EXECUTOR_CORE_POOL_SIZE));
118  connect_to_members_timer_ = boost::asio::steady_timer(executor_->get_executor());
119 
120  heartbeat_.start();
121 
122  connect_to_cluster();
123  if (smart_routing_enabled_) {
124  schedule_connect_to_all_members();
125  }
126 
127  load_balancer_.init_(client_.get_cluster());
128 
129  return true;
130  }
131 
132  void ClientConnectionManagerImpl::schedule_connect_to_all_members() {
133  connect_to_members_timer_->expires_from_now(boost::asio::chrono::seconds(1));
134  connect_to_members_timer_->async_wait([=](boost::system::error_code ec) {
135  if (ec == boost::asio::error::operation_aborted) {
136  return;
137  }
138  connect_to_all_members();
139 
140  if (!client_.get_lifecycle_service().is_running()) {
141  return;
142  }
143 
144  schedule_connect_to_all_members();
145  });
146  }
147 
148  void ClientConnectionManagerImpl::shutdown() {
149  bool expected = true;
150  if (!alive_.compare_exchange_strong(expected, false)) {
151  return;
152  }
153 
154  if (connect_to_members_timer_) {
155  connect_to_members_timer_->cancel();
156  }
157 
158  heartbeat_.shutdown();
159 
160  // close connections
161  for (auto &connection : active_connections_.values()) {
162  // prevent any exceptions
163  util::IOUtil::close_resource(connection.get(), "Hazelcast client is shutting down");
164  }
165 
166  spi::impl::ClientExecutionServiceImpl::shutdown_thread_pool(executor_.get());
167 
168  // release the guard so that the io thread can stop gracefully
169  io_guard_.reset();
170  io_thread_.join();
171 
172  connection_listeners_.clear();
173  active_connections_.clear();
174  active_connection_ids_.clear();
175  }
176 
177  std::shared_ptr<Connection>
178  ClientConnectionManagerImpl::get_or_connect(const address &address) {
179  auto connection = get_connection(address);
180  if (connection) {
181  return connection;
182  }
183 
184  return connect(address);
185  }
186 
187  std::shared_ptr<Connection>
188  ClientConnectionManagerImpl::get_or_connect(const member &m) {
189  const auto &uuid = m.get_uuid();
190  auto connection = active_connections_.get(uuid);
191  if (connection) {
192  return connection;
193  }
194 
195  return connect(m.get_address());
196  }
197 
198  std::vector<std::shared_ptr<Connection> > ClientConnectionManagerImpl::get_active_connections() {
199  return active_connections_.values();
200  }
201 
202  std::shared_ptr<Connection>
203  ClientConnectionManagerImpl::get_connection(const address &address) {
204  for (const auto &connection : active_connections_.values()) {
205  auto remote_address = connection->get_remote_address();
206  if (remote_address && *remote_address == address) {
207  return connection;
208  }
209  }
210  return nullptr;
211  }
212 
213  std::shared_ptr<Connection> ClientConnectionManagerImpl::get_connection(boost::uuids::uuid uuid) {
214  return active_connections_.get(uuid);
215  }
216 
217  ClientConnectionManagerImpl::auth_response
218  ClientConnectionManagerImpl::authenticate_on_cluster(std::shared_ptr<Connection> &connection) {
219  auto request = encode_authentication_request(client_.get_serialization_service());
220  auto clientInvocation = spi::impl::ClientInvocation::create(client_, request, "", connection);
221  auto f = clientInvocation->invoke_urgent();
222 
223  struct auth_response result;
224  try {
225  if (f.wait_for(authentication_timeout_) != boost::future_status::ready) {
226  BOOST_THROW_EXCEPTION(exception::timeout(
227  "ClientConnectionManagerImpl::authenticate", (boost::format("Authentication response is "
228  "not received for %1% msecs for %2%") %
229  authentication_timeout_.count() %
230  *clientInvocation).str()));
231  }
232  auto response = f.get();
233  auto *initial_frame = reinterpret_cast<protocol::ClientMessage::frame_header_t *>(response.rd_ptr(
234  protocol::ClientMessage::RESPONSE_HEADER_LEN));
235  result = {
236  response.get<byte>(), response.get<boost::uuids::uuid>(),
237  response.get<byte>(), response.get<int32_t>(),
238  response.get<boost::uuids::uuid>()
239  };
240  // skip first frame
241  response.rd_ptr(static_cast<int32_t>(initial_frame->frame_len) -
242  protocol::ClientMessage::RESPONSE_HEADER_LEN -
243  2 * protocol::ClientMessage::UINT8_SIZE -
244  2 * (sizeof(boost::uuids::uuid) + protocol::ClientMessage::UINT8_SIZE) -
245  protocol::ClientMessage::INT32_SIZE);
246 
247  result.server_address = response.get_nullable<address>();
248  result.server_version = response.get<std::string>();
249  } catch (exception::iexception &) {
250  connection->close("Failed to authenticate connection", std::current_exception());
251  throw;
252  }
253 
254  auto authentication_status = (protocol::authentication_status) result.status;
255  switch (authentication_status) {
256  case protocol::AUTHENTICATED: {
257  return result;
258  }
259  case protocol::CREDENTIALS_FAILED: {
260  auto e = std::make_exception_ptr(
261  exception::authentication("ClientConnectionManagerImpl::authenticate_on_cluster",
262  "Authentication failed. The configured cluster name on the client (see client_config::set_cluster_name()) does not match the one configured in the cluster or the credentials set in the Client security config could not be authenticated"));
263  connection->close("Failed to authenticate connection", e);
264  std::rethrow_exception(e);
265  }
266  case protocol::NOT_ALLOWED_IN_CLUSTER: {
267  auto e = std::make_exception_ptr(
268  exception::authentication("ClientConnectionManagerImpl::authenticate_on_cluster",
269  "Client is not allowed in the cluster"));
270  connection->close("Failed to authenticate connection", e);
271  std::rethrow_exception(e);
272  }
273  default: {
274  auto e = std::make_exception_ptr(exception::authentication(
275  "ClientConnectionManagerImpl::authenticate_on_cluster",
276  (boost::format("Authentication status code not supported. status: %1%") %
277  authentication_status).str()));
278  connection->close("Failed to authenticate connection", e);
279  std::rethrow_exception(e);
280  }
281  }
282  }
283 
284  protocol::ClientMessage
285  ClientConnectionManagerImpl::encode_authentication_request(serialization::pimpl::SerializationService &ss) {
286  byte serializationVersion = ss.get_version();
287  client_config &clientConfig = client_.get_client_config();
288  auto cluster_name = clientConfig.get_cluster_name();
289 
290  auto credential = clientConfig.get_credentials();
291  if (!credential) {
292  return protocol::codec::client_authentication_encode(cluster_name, nullptr, nullptr,
293  client_uuid_, protocol::ClientTypes::CPP,
294  serializationVersion, HAZELCAST_VERSION,
295  client_.get_name(), labels_);
296  }
297 
298  switch(credential->type()) {
299  case security::credentials::credential_type::username_password:
300  {
301  auto cr = std::static_pointer_cast<security::username_password_credentials>(credential);
302  return protocol::codec::client_authentication_encode(cluster_name, &cr->name(), &cr->password(),
303  client_uuid_, protocol::ClientTypes::CPP,
304  serializationVersion, HAZELCAST_VERSION,
305  client_.get_name(), labels_);
306  }
307  case security::credentials::credential_type::token:
308  {
309  auto cr = std::static_pointer_cast<security::token_credentials>(credential);
310  return protocol::codec::client_authenticationcustom_encode(cluster_name, cr->token(),
311  client_uuid_, protocol::ClientTypes::CPP,
312  serializationVersion, HAZELCAST_VERSION,
313  client_.get_name(), labels_);
314  }
315  }
316  assert(0);
317  return protocol::ClientMessage();
318  }
319 
320  void
321  ClientConnectionManagerImpl::fire_connection_added_event(const std::shared_ptr<Connection> &connection) {
322  for (const std::shared_ptr<ConnectionListener> &connectionListener : connection_listeners_.to_array()) {
323  connectionListener->connection_added(connection);
324  }
325  }
326 
327  void
328  ClientConnectionManagerImpl::fire_connection_removed_event(const std::shared_ptr<Connection> &connection) {
329  for (const auto &listener : connection_listeners_.to_array()) {
330  listener->connection_removed(connection);
331  }
332  }
333 
334  void
335  ClientConnectionManagerImpl::shutdown_with_external_thread(
336  std::weak_ptr<client::impl::hazelcast_client_instance_impl> client_impl) {
337  std::thread([=] {
338  std::shared_ptr<client::impl::hazelcast_client_instance_impl> clientInstance = client_impl.lock();
339  if (!clientInstance || !clientInstance->get_lifecycle_service().is_running()) {
340  return;
341  }
342 
343  try {
344  clientInstance->get_lifecycle_service().shutdown();
345  } catch (exception::iexception &e) {
346  HZ_LOG(*clientInstance->get_logger(), severe,
347  boost::str(boost::format("Exception during client shutdown "
348  "%1%.clientShutdown-:%2%")
349  % clientInstance->get_name()
350  % e)
351  );
352  }
353  }).detach();
354  }
355 
356  void ClientConnectionManagerImpl::submit_connect_to_cluster_task() {
357  bool expected = false;
358  if (!connect_to_cluster_task_submitted_.compare_exchange_strong(expected, true)) {
359  return;
360  }
361 
362  std::weak_ptr<client::impl::hazelcast_client_instance_impl> c = client_.get_hazelcast_client_implementation();
363  boost::asio::post(executor_->get_executor(), [=]() {
364  try {
365  do_connect_to_cluster();
366 
367  std::lock_guard<std::recursive_mutex> guard(client_state_mutex_);
368  connect_to_cluster_task_submitted_ = false;
369  if (active_connections_.empty()) {
370  HZ_LOG(logger_, finest,
371  boost::str(boost::format("No connection to cluster: %1%")
372  % cluster_id_)
373  );
374 
375  submit_connect_to_cluster_task();
376  }
377 
378  } catch (std::exception &e) {
379  HZ_LOG(logger_, warning,
380  boost::str(boost::format("Could not connect to any cluster, "
381  "shutting down the client: %1%")
382  % e.what())
383  );
384 
385  shutdown_with_external_thread(c);
386  }
387  });
388  }
389 
390  void ClientConnectionManagerImpl::connect_to_all_members() {
391  if (!client_.get_lifecycle_service().is_running() || active_connections_.empty()) {
392  return;
393  }
394 
395  for (const auto &member : client_.get_client_cluster_service().get_member_list()) {
396  const auto& member_addr = member.get_address();
397 
398  if (client_.get_lifecycle_service().is_running() && !get_connection(member_addr)
399  && connecting_addresses_.get_or_put_if_absent(member_addr, nullptr).second) {
400  // submit a task for this address only if there is no other pending connection attempt for it
401  address addr = member_addr;
402  boost::asio::post(executor_->get_executor(), [=]() {
403  try {
404  if (!client_.get_lifecycle_service().is_running()) {
405  return;
406  }
407  if (!get_connection(member.get_uuid())) {
408  get_or_connect(addr);
409  }
410  connecting_addresses_.remove(addr);
411  } catch (std::exception &) {
412  connecting_addresses_.remove(addr);
413  }
414  });
415  }
416  }
417  }
418 
419  bool ClientConnectionManagerImpl::do_connect_to_cluster() {
420  std::unordered_set<address> tried_addresses;
421  wait_strategy_.reset();
422 
423  do {
424  std::unordered_set<address> tried_addresses_per_attempt;
425  auto member_list = client_.get_client_cluster_service().get_member_list();
426  if (shuffle_member_list_) {
427  shuffle(member_list);
428  }
429 
430  //try to connect to a member in the member list first
431  for (const auto &m : member_list) {
432  check_client_active();
433  tried_addresses_per_attempt.insert(m.get_address());
434  auto connection = try_connect(m);
435  if (connection) {
436  return true;
437  }
438  }
439  //try to connect to a member given via config(explicit config/discovery mechanisms)
440  for (const address &server_address : get_possible_member_addresses()) {
441  check_client_active();
442  if (!tried_addresses_per_attempt.insert(server_address).second) {
443  //if we can not add it means that it is already tried to be connected with the member list
444  continue;
445  }
446  auto connection = try_connect<address>(server_address);
447  if (connection) {
448  return true;
449  }
450  }
451  tried_addresses.insert(tried_addresses_per_attempt.begin(), tried_addresses_per_attempt.end());
452  // If the address provider loads no addresses, then the above loop is not entered
453  // and the lifecycle check is missing, hence we need to repeat the same check at this point.
454  check_client_active();
455  } while (wait_strategy_.sleep());
456 
457  std::ostringstream out;
458  out << "Unable to connect to any address! The following addresses were tried: { ";
459  for (const auto &address : tried_addresses) {
460  out << address << " , ";
461  }
462  out << "}";
463  BOOST_THROW_EXCEPTION(
464  exception::illegal_state("ConnectionManager::do_connect_to_cluster", out.str()));
465  }
466 
467  std::vector<address> ClientConnectionManagerImpl::get_possible_member_addresses() {
468  std::vector<address> addresses;
469  for (auto &&member : client_.get_client_cluster_service().get_member_list()) {
470  addresses.emplace_back(std::move(member.get_address()));
471  }
472 
473  if (shuffle_member_list_) {
474  shuffle(addresses);
475  }
476 
477  std::vector<address> provided_addresses = address_provider_->load_addresses();
478 
479  if (shuffle_member_list_) {
480  shuffle(provided_addresses);
481  }
482 
483  addresses.insert(addresses.end(), provided_addresses.begin(), provided_addresses.end());
484 
485  return addresses;
486  }
487 
488  void ClientConnectionManagerImpl::connect_to_cluster() {
489  if (async_start_) {
490  submit_connect_to_cluster_task();
491  } else {
492  do_connect_to_cluster();
493  }
494  }
495 
496  bool ClientConnectionManagerImpl::is_alive() {
497  return alive_;
498  }
499 
500  void ClientConnectionManagerImpl::on_connection_close(const std::shared_ptr<Connection> &connection) {
501  auto endpoint = connection->get_remote_address();
502  auto member_uuid = connection->get_remote_uuid();
503 
504  auto socket_remote_address = connection->get_socket().get_remote_endpoint();
505 
506  if (!endpoint) {
507  HZ_LOG(logger_, finest,
508  boost::str(boost::format("Destroying %1% , but it has end-point set to null "
509  "-> not removing it from a connection map")
510  % *connection)
511  );
512  return;
513  }
514 
515  std::lock_guard<std::recursive_mutex> guard(client_state_mutex_);
516  if (active_connections_.remove(member_uuid, connection)) {
517  active_connection_ids_.remove(connection->get_connection_id());
518 
519  HZ_LOG(logger_, info,
520  boost::str(boost::format("Removed connection to endpoint: %1%, connection: %2%")
521  % *endpoint % *connection)
522  );
523 
524  if (active_connections_.empty()) {
525  fire_life_cycle_event(lifecycle_event::lifecycle_state::CLIENT_DISCONNECTED);
526 
527  trigger_cluster_reconnection();
528  }
529 
530  fire_connection_removed_event(connection);
531  } else {
532  HZ_LOG(logger_, finest,
533  boost::str(boost::format("Destroying a connection, but there is no mapping "
534  "%1% -> %2% in the connection map.")
535  % endpoint % *connection)
536  );
537  }
538  }
539 
540  void
541  ClientConnectionManagerImpl::add_connection_listener(
542  const std::shared_ptr<ConnectionListener> &connection_listener) {
543  connection_listeners_.add(connection_listener);
544  }
545 
546  ClientConnectionManagerImpl::~ClientConnectionManagerImpl() {
547  shutdown();
548  }
549 
550  logger &ClientConnectionManagerImpl::get_logger() {
551  return client_.get_logger();
552  }
553 
554  void ClientConnectionManagerImpl::check_client_active() {
555  if (!client_.get_lifecycle_service().is_running()) {
556  BOOST_THROW_EXCEPTION(exception::hazelcast_client_not_active(
557  "ClientConnectionManagerImpl::check_client_active", "Client is shutdown"));
558  }
559  }
560 
561  std::shared_ptr<Connection>
562  ClientConnectionManagerImpl::on_authenticated(const std::shared_ptr<Connection> &connection,
563  auth_response &response) {
564  {
565  std::lock_guard<std::recursive_mutex> guard(client_state_mutex_);
566  check_partition_count(response.partition_count);
567  connection->set_connected_server_version(response.server_version);
568  connection->set_remote_address(response.server_address);
569  connection->set_remote_uuid(response.member_uuid);
570 
571  auto existing_connection = active_connections_.get(response.member_uuid);
572  if (existing_connection) {
573  connection->close((boost::format("Duplicate connection to same member with uuid : %1%") %
574  boost::uuids::to_string(response.member_uuid)).str());
575  return existing_connection;
576  }
577 
578  auto new_cluster_id = response.cluster_id;
579  boost::uuids::uuid current_cluster_id = cluster_id_;
580 
581  HZ_LOG(logger_, finest,
582  boost::str(boost::format("Checking the cluster: %1%, current cluster: %2%")
583  % new_cluster_id % current_cluster_id)
584  );
585 
586  auto cluster_id_changed = !current_cluster_id.is_nil() && !(new_cluster_id == current_cluster_id);
587  if (cluster_id_changed) {
588  HZ_LOG(logger_, warning,
589  boost::str(boost::format("Switching from current cluster: %1% to new cluster: %2%")
590  % current_cluster_id % new_cluster_id)
591  );
592  client_.get_hazelcast_client_implementation()->on_cluster_restart();
593  }
594 
595  auto connections_empty = active_connections_.empty();
596  active_connection_ids_.put(connection->get_connection_id(), connection);
597  active_connections_.put(response.member_uuid, connection);
598  if (connections_empty) {
599  cluster_id_ = new_cluster_id;
600  fire_life_cycle_event(lifecycle_event::lifecycle_state::CLIENT_CONNECTED);
601  }
602 
603  auto local_address = connection->get_local_socket_address();
604  if (local_address) {
605  HZ_LOG(logger_, info,
606  boost::str(boost::format("Authenticated with server %1%:%2%, server version: %3%, "
607  "local address: %4%. %5%")
608  % response.server_address % response.member_uuid
609  % response.server_version % *local_address % *connection)
610  );
611  } else {
612  HZ_LOG(logger_, info,
613  boost::str(boost::format("Authenticated with server %1%:%2%, server version: %3%, "
614  "no local address: (connection disconnected ?). %5%")
615  % response.server_address % response.member_uuid
616  % response.server_version % *connection)
617  );
618  }
619 
620  fire_connection_added_event(connection);
621  }
622 
623  // It could happen that this connection is already closed and
624  // on_connection_close() is called even before the synchronized block
625  // above is executed. In this case, now we have a closed but registered
626  // connection. We do a final check here to remove this connection
627  // if needed.
628  if (!connection->is_alive()) {
629  on_connection_close(connection);
630  return nullptr;
631  }
632 
633  // If the client is shutdown in parallel, we need to close this new connection.
634  if (!client_.get_lifecycle_service().is_running()) {
635  connection->close("Client is shutdown");
636  }
637 
638  return connection;
639  }
640 
641  void ClientConnectionManagerImpl::fire_life_cycle_event(lifecycle_event::lifecycle_state state) {
642  client_.get_lifecycle_service().fire_lifecycle_event(state);
643  }
644 
645  void ClientConnectionManagerImpl::check_partition_count(int32_t new_partition_count) {
646  auto &partition_service = static_cast<spi::impl::ClientPartitionServiceImpl &>(client_.get_partition_service());
647  if (!partition_service.check_and_set_partition_count(new_partition_count)) {
648  BOOST_THROW_EXCEPTION(exception::client_not_allowed_in_cluster("ClientConnectionManagerImpl::check_partition_count",
649  (boost::format("Client can not work with this cluster because it has a different partition count. "
650  "Expected partition count: %1%, Member partition count: %2%")
651  %partition_service.get_partition_count() %new_partition_count).str()));
652  }
653  }
654 
655  void ClientConnectionManagerImpl::trigger_cluster_reconnection() {
656  if (reconnect_mode_ == config::client_connection_strategy_config::reconnect_mode::OFF) {
657  HZ_LOG(logger_, info, "RECONNECT MODE is off. Shutting down the client.");
658  shutdown_with_external_thread(client_.get_hazelcast_client_implementation());
659  return;
660  }
661 
662  if (client_.get_lifecycle_service().is_running()) {
663  submit_connect_to_cluster_task();
664  }
665  }
666 
667  std::shared_ptr<Connection> ClientConnectionManagerImpl::get_random_connection() {
668  if (smart_routing_enabled_) {
669  auto member = load_balancer_.next_(client_.get_cluster());
670  if (!member) {
671  return nullptr;
672  }
673  auto connection = get_connection(member->get_uuid());
674  if (connection) {
675  return connection;
676  }
677  }
678 
679  auto connections = active_connections_.values();
680  if (connections.empty()) {
681  return nullptr;
682  }
683 
684  return connections[0];
685  }
686 
687  boost::uuids::uuid ClientConnectionManagerImpl::get_client_uuid() const {
688  return client_uuid_;
689  }
690 
691  void ClientConnectionManagerImpl::check_invocation_allowed() {
692  if (active_connections_.size() > 0) {
693  return;
694  }
695 
696  if (async_start_) {
697  BOOST_THROW_EXCEPTION(exception::hazelcast_client_offline(
698  "ClientConnectionManagerImpl::check_invocation_allowed",
699  "No connection found to cluster and async start is configured."));
700  } else if (reconnect_mode_ == config::client_connection_strategy_config::reconnect_mode::ASYNC) {
701  BOOST_THROW_EXCEPTION(exception::hazelcast_client_offline(
702  "ClientConnectionManagerImpl::check_invocation_allowed",
703  "No connection found to cluster and reconnect mode is async."));
704  } else {
705  BOOST_THROW_EXCEPTION(
706  exception::io("ClientConnectionManagerImpl::check_invocation_allowed",
707  "No connection found to cluster."));
708  }
709  }
710 
711  void ClientConnectionManagerImpl::connect_to_all_cluster_members() {
712  if (!smart_routing_enabled_) {
713  return;
714  }
715 
716  for (const auto &member : client_.get_client_cluster_service().get_member_list()) {
717  try {
718  get_or_connect(member.get_address());
719  } catch (std::exception &) {
720  // ignore
721  }
722  }
723  }
724 
725  void ClientConnectionManagerImpl::notify_backup(int64_t call_id) {
726  struct correlation_id {
727  int32_t connnection_id;
728  int32_t call_id;
729  };
730  union {
731  int64_t id;
732  correlation_id composed_id;
733  } c_id_union;
734  c_id_union.id = call_id;
735  auto connection_id = c_id_union.composed_id.connnection_id;
736  auto connection = active_connection_ids_.get(connection_id);
737  if (!connection) {
738  return;
739  }
740  boost::asio::post(connection->get_socket().get_executor(), [=]() {
741  auto invocation_it = connection->invocations.find(call_id);
742  if (invocation_it != connection->invocations.end()) {
743  invocation_it->second->notify_backup();
744  }
745  });
746  }
747 
748  std::shared_ptr<Connection> ClientConnectionManagerImpl::connect(const address &addr) {
749  auto target = address_provider_->translate(addr);
750  if (!target) {
751  BOOST_THROW_EXCEPTION(exception::null_pointer("ClientConnectionManagerImpl::connect",
752  (boost::format("Address Provider could not translate "
753  "address %2%") % target).str()));
754  }
755  HZ_LOG(logger_, info, boost::str(
756  boost::format("Trying to connect to %1%. Translated address:%2%.") % addr % target));
757 
758  auto connection = std::make_shared<Connection>(*target, client_, ++connection_id_gen_,
759  *socket_factory_, *this, connection_timeout_millis_);
760  connection->connect();
761 
762  // call the interceptor from user thread
763  socket_interceptor_.connect_(connection->get_socket());
764 
765  auto result = authenticate_on_cluster(connection);
766 
767  return on_authenticated(connection, result);
768  }
769 
770  ReadHandler::ReadHandler(Connection &connection, size_t buffer_size)
771  : buffer(new char[buffer_size]), byte_buffer(buffer, buffer_size), builder_(connection),
772  last_read_time_duration_(std::chrono::duration_cast<std::chrono::milliseconds>(
773  std::chrono::steady_clock::now().time_since_epoch())) {
774  }
775 
776  ReadHandler::~ReadHandler() {
777  delete[] buffer;
778  }
779 
780  void ReadHandler::handle() {
781  last_read_time_duration_ = std::chrono::duration_cast<std::chrono::milliseconds>(
782  std::chrono::steady_clock::now().time_since_epoch());
783 
784  if (byte_buffer.position() == 0)
785  return;
786 
787  byte_buffer.flip();
788 
789  // it is important to check the ondata return value since there may be left data less than a message
790  // header size, and this may cause an infinite loop.
791  while (byte_buffer.has_remaining() && builder_.on_data(byte_buffer)) {
792  }
793 
794  if (byte_buffer.has_remaining()) {
795  byte_buffer.compact();
796  } else {
797  byte_buffer.clear();
798  }
799  }
800 
801  std::chrono::steady_clock::time_point ReadHandler::get_last_read_time() const {
802  return std::chrono::steady_clock::time_point(
803  std::chrono::duration_cast<std::chrono::steady_clock::duration>(last_read_time_duration_.load()));
804  }
805 
806  Connection::Connection(const address &address, spi::ClientContext &client_context, int connection_id, // NOLINT(cppcoreguidelines-pro-type-member-init)
807  internal::socket::SocketFactory &socket_factory,
808  ClientConnectionManagerImpl &client_connection_manager,
809  std::chrono::milliseconds &connect_timeout_in_millis)
810  : read_handler(*this, 16 << 10),
811  start_time_(std::chrono::system_clock::now()),
812  closed_time_duration_(),
813  client_context_(client_context),
814  invocation_service_(client_context.get_invocation_service()),
815  connection_id_(connection_id),
816  remote_uuid_(boost::uuids::nil_uuid()), logger_(client_context.get_logger()), alive_(true) {
817  socket_ = socket_factory.create(address, connect_timeout_in_millis);
818  }
819 
// Defaulted destructor, defined out of line in this translation unit.
Connection::~Connection() = default;
821 
822  void Connection::connect() {
823  socket_->connect(shared_from_this());
824  backup_timer_.reset(new boost::asio::steady_timer(socket_->get_executor().context()));
825  auto backupTimeout = static_cast<spi::impl::ClientInvocationServiceImpl &>(invocation_service_).get_backup_timeout();
826  auto this_connection = shared_from_this();
827  schedule_periodic_backup_cleanup(backupTimeout, this_connection);
828  }
829 
830  void Connection::schedule_periodic_backup_cleanup(std::chrono::milliseconds backup_timeout,
831  std::shared_ptr<Connection> this_connection) {
832  backup_timer_->expires_from_now(backup_timeout);
833  backup_timer_->async_wait(socket_->get_executor().wrap([=](boost::system::error_code ec) {
834  if (ec) {
835  return;
836  }
837  for (const auto &it : this_connection->invocations) {
838  it.second->detect_and_handle_backup_timeout(backup_timeout);
839  }
840 
841  schedule_periodic_backup_cleanup(backup_timeout, this_connection);
842  }));
843  }
844 
845  void Connection::close() {
846  close("");
847  }
848 
849  void Connection::close(const std::string &reason) {
850  close(reason, nullptr);
851  }
852 
853  void Connection::close(const std::string &reason, std::exception_ptr cause) {
854  bool expected = true;
855  if (!alive_.compare_exchange_strong(expected, false)) {
856  return;
857  }
858 
859  closed_time_duration_.store(std::chrono::duration_cast<std::chrono::milliseconds>(
860  std::chrono::steady_clock::now().time_since_epoch()));
861 
862  if (backup_timer_) {
863  boost::system::error_code ignored;
864  backup_timer_->cancel(ignored);
865  }
866 
867  close_cause_ = cause;
868  close_reason_ = reason;
869 
870  log_close();
871 
872  try {
873  inner_close();
874  } catch (exception::iexception &e) {
875  HZ_LOG(client_context_.get_logger(), warning,
876  boost::str(boost::format("Exception while closing connection %1%")
877  % e.get_message())
878  );
879  }
880 
881  auto thisConnection = shared_from_this();
882  client_context_.get_connection_manager().on_connection_close(thisConnection);
883 
884  boost::asio::post(socket_->get_executor(), [=]() {
885  for (auto &invocationEntry : thisConnection->invocations) {
886  invocationEntry.second->notify_exception(
887  std::make_exception_ptr(boost::enable_current_exception(
888  exception::target_disconnected("Connection::close",
889  thisConnection->get_close_reason()))));
890  }
891  });
892  }
893 
894  void Connection::write(const std::shared_ptr<spi::impl::ClientInvocation> &client_invocation) {
895  socket_->async_write(shared_from_this(), client_invocation);
896  }
897 
898  const boost::optional<address> &Connection::get_remote_address() const {
899  return remote_address_;
900  }
901 
902  void Connection::set_remote_address(boost::optional<address> endpoint) {
903  this->remote_address_ = std::move(endpoint);
904  }
905 
906  void Connection::handle_client_message(const std::shared_ptr<protocol::ClientMessage> &message) {
907  auto correlationId = message->get_correlation_id();
908  auto invocationIterator = invocations.find(correlationId);
909  if (invocationIterator == invocations.end()) {
910  HZ_LOG(logger_, warning,
911  boost::str(boost::format("No invocation for callId: %1%. "
912  "Dropping this message: %2%")
913  % correlationId % *message)
914  );
915  return;
916  }
917  auto invocation = invocationIterator->second;
918  auto flags = message->get_header_flags();
919  if (message->is_flag_set(flags, protocol::ClientMessage::BACKUP_EVENT_FLAG)) {
920  message->rd_ptr(protocol::ClientMessage::EVENT_HEADER_LEN);
921  correlationId = message->get<int64_t>();
922  client_context_.get_connection_manager().notify_backup(correlationId);
923  } else if (message->is_flag_set(flags, protocol::ClientMessage::IS_EVENT_FLAG)) {
924  client_context_.get_client_listener_service().handle_client_message(invocation, message);
925  } else {
926  invocation_service_.handle_client_message(invocation, message);
927  }
928  }
929 
930  int32_t Connection::get_connection_id() const {
931  return connection_id_;
932  }
933 
934  bool Connection::is_alive() const {
935  return alive_;
936  }
937 
938  const std::string &Connection::get_close_reason() const {
939  return close_reason_;
940  }
941 
942  void Connection::log_close() {
943  std::ostringstream message;
944  message << *this << " closed. Reason: ";
945  if (!close_reason_.empty()) {
946  message << close_reason_;
947  } else if (close_cause_) {
948  try {
949  std::rethrow_exception(close_cause_);
950  } catch (exception::iexception &ie) {
951  message << ie.get_source() << "[" + ie.get_message() << "]";
952  }
953  } else {
954  message << "Socket explicitly closed";
955  }
956 
957  if (client_context_.get_lifecycle_service().is_running()) {
958  if (!close_cause_) {
959  HZ_LOG(logger_, info, message.str());
960  } else {
961  try {
962  std::rethrow_exception(close_cause_);
963  } catch (exception::iexception &ie) {
964  HZ_LOG(logger_, warning,
965  boost::str(boost::format("%1%%2%") % message.str() % ie)
966  );
967  }
968  }
969  } else {
970  HZ_LOG(logger_, finest,
971  message.str() +
972  [this]() -> std::string {
973  if (close_cause_) {
974  try {
975  std::rethrow_exception(close_cause_);
976  } catch (exception::iexception &ie) {
977  return ie.what();
978  }
979  }
980  return "";
981  }()
982  );
983  }
984  }
985 
986  bool Connection::operator==(const Connection &rhs) const {
987  return connection_id_ == rhs.connection_id_;
988  }
989 
990  bool Connection::operator!=(const Connection &rhs) const {
991  return !(rhs == *this);
992  }
993 
994  const std::string &Connection::get_connected_server_version_string() const {
995  return connected_server_version_string_;
996  }
997 
998  void Connection::set_connected_server_version(const std::string &connected_server) {
999  Connection::connected_server_version_string_ = connected_server;
1000  }
1001 
1002  boost::optional<address> Connection::get_local_socket_address() const {
1003  return socket_->local_socket_address();
1004  }
1005 
1006  std::chrono::steady_clock::time_point Connection::last_read_time() const {
1007  return read_handler.get_last_read_time();
1008  }
1009 
1010  void Connection::inner_close() {
1011  if (!socket_) {
1012  return;
1013  }
1014 
1015  auto thisConnection = shared_from_this();
1016  boost::asio::post(socket_->get_executor(), [=] () { thisConnection->socket_->close(); });
1017  }
1018 
1019  std::ostream &operator<<(std::ostream &os, const Connection &connection) {
1020  os << "ClientConnection{"
1021  << "alive=" << connection.is_alive()
1022  << ", connectionId=" << connection.get_connection_id()
1023  << ", remoteEndpoint=";
1024  if (connection.get_remote_address()) {
1025  os << *connection.get_remote_address();
1026  } else {
1027  os << "null";
1028  }
1029  os << ", lastReadTime=" << util::StringUtil::time_to_string(connection.last_read_time())
1030  << ", closedTime=" << util::StringUtil::time_to_string(std::chrono::steady_clock::time_point(
1031  std::chrono::duration_cast<std::chrono::steady_clock::duration>(connection.closed_time_duration_.load())))
1032  << ", connected server version=" << connection.connected_server_version_string_
1033  << '}';
1034 
1035  return os;
1036  }
1037 
1038  bool Connection::operator<(const Connection &rhs) const {
1039  return connection_id_ < rhs.connection_id_;
1040  }
1041 
1042  std::chrono::system_clock::time_point Connection::get_start_time() const {
1043  return start_time_;
1044  }
1045 
1046  socket &Connection::get_socket() {
1047  return *socket_;
1048  }
1049 
1050  void Connection::deregister_invocation(int64_t call_id) {
1051  invocations.erase(call_id);
1052  }
1053 
1054  boost::uuids::uuid Connection::get_remote_uuid() const {
1055  return remote_uuid_;
1056  }
1057 
1058  void Connection::set_remote_uuid(boost::uuids::uuid remote_uuid) {
1059  remote_uuid_ = remote_uuid;
1060  }
1061 
1062  HeartbeatManager::HeartbeatManager(spi::ClientContext &client,
1063  ClientConnectionManagerImpl &connection_manager)
1064  : client_(client), client_connection_manager_(connection_manager), logger_(client.get_logger()) {
1065  client_properties &clientProperties = client.get_client_properties();
1066  auto timeout_millis = clientProperties.get_long(clientProperties.get_heartbeat_timeout());
1067  heartbeat_timeout_ = std::chrono::milliseconds(
1068  timeout_millis > 0 ? timeout_millis : util::IOUtil::to_value<int64_t>(
1069  client_properties::PROP_HEARTBEAT_TIMEOUT_DEFAULT));
1070 
1071  auto interval_millis = clientProperties.get_long(clientProperties.get_heartbeat_interval());
1072  heartbeat_interval_ = std::chrono::milliseconds(interval_millis > 0 ? interval_millis
1073  : util::IOUtil::to_value<int64_t>(client_properties::PROP_HEARTBEAT_INTERVAL_DEFAULT));
1074  }
1075 
1076  void HeartbeatManager::start() {
1077  spi::impl::ClientExecutionServiceImpl &clientExecutionService = client_.get_client_execution_service();
1078 
1079  timer_ = clientExecutionService.schedule_with_repetition([=]() {
1080  if (!client_connection_manager_.is_alive()) {
1081  return;
1082  }
1083 
1084  for (auto &connection : client_connection_manager_.get_active_connections()) {
1085  check_connection(connection);
1086  }
1087  }, heartbeat_interval_, heartbeat_interval_);
1088  }
1089 
1090  void HeartbeatManager::check_connection(const std::shared_ptr<Connection> &connection) {
1091  if (!connection->is_alive()) {
1092  return;
1093  }
1094 
1095  auto now = std::chrono::steady_clock::now();
1096  if (now - connection->last_read_time() > heartbeat_timeout_) {
1097  HZ_LOG(logger_, warning,
1098  boost::str(boost::format("Heartbeat failed over the connection: %1%") % *connection)
1099  );
1100  on_heartbeat_stopped(connection, "Heartbeat timed out");
1101  return;
1102  }
1103 
1104  if (now - connection->last_read_time() > heartbeat_interval_) {
1105  auto request = protocol::codec::client_ping_encode();
1106  std::shared_ptr<spi::impl::ClientInvocation> clientInvocation = spi::impl::ClientInvocation::create(
1107  client_, request, "", connection);
1108  clientInvocation->invoke_urgent();
1109  }
1110  }
1111 
1112  void
1113  HeartbeatManager::on_heartbeat_stopped(const std::shared_ptr<Connection> &connection,
1114  const std::string &reason) {
1115  connection->close(reason, std::make_exception_ptr(
1116  (exception::exception_builder<exception::target_disconnected>(
1117  "HeartbeatManager::onHeartbeatStopped") << "Heartbeat timed out to connection "
1118  << *connection).build()));
1119  }
1120 
1121  void HeartbeatManager::shutdown() {
1122  if (timer_) {
1123  boost::system::error_code ignored;
1124  timer_->cancel(ignored);
1125  }
1126  }
1127 
1128  std::chrono::milliseconds HeartbeatManager::get_heartbeat_timeout() const {
1129  return heartbeat_timeout_;
1130  }
1131 
1132  void wait_strategy::reset() {
1133  attempt_ = 0;
1134  cluster_connect_attempt_begin_ = std::chrono::steady_clock::now();
1135  current_backoff_millis_ = (std::min)(max_backoff_millis_, initial_backoff_millis_);
1136  }
1137 
1138  wait_strategy::wait_strategy(const config::connection_retry_config &retry_config, logger &log)
1139  : initial_backoff_millis_(retry_config.get_initial_backoff_duration()),
1140  max_backoff_millis_(retry_config.get_max_backoff_duration()),
1141  multiplier_(retry_config.get_multiplier()), jitter_(retry_config.get_jitter()), logger_(log),
1142  cluster_connect_timeout_millis_(retry_config.get_cluster_connect_timeout()) {
1143  if (cluster_connect_timeout_millis_ == std::chrono::milliseconds::max()) {
1144  cluster_connect_timeout_text_ = "INFINITE";
1145  } else {
1146  cluster_connect_timeout_text_ = (boost::format("%1% msecs")
1147  %cluster_connect_timeout_millis_.count()).str();
1148  }
1149  }
1150 
1151  bool wait_strategy::sleep() {
1152  attempt_++;
1153  using namespace std::chrono;
1154  auto current_time = steady_clock::now();
1155  auto time_passed = duration_cast<milliseconds>(current_time - cluster_connect_attempt_begin_);
1156  if (time_passed > cluster_connect_timeout_millis_) {
1157  HZ_LOG(logger_, warning, (boost::format(
1158  "Unable to get live cluster connection, cluster connect timeout (%1%) is reached. Attempt %2%.")
1159  %cluster_connect_timeout_text_ %attempt_).str());
1160  return false;
1161  }
1162 
1163  //sleep time: current_backoff_millis_(1 +- (jitter * [0, 1]))
1164  auto actual_sleep_time = current_backoff_millis_ + milliseconds(
1165  static_cast<milliseconds::rep>(current_backoff_millis_.count() * jitter_ *
1166  (2.0 * random_(random_generator_) - 1.0)));
1167 
1168  actual_sleep_time = (std::min)(actual_sleep_time, cluster_connect_timeout_millis_ - time_passed);
1169 
1170  HZ_LOG(logger_, warning, (boost::format(
1171  "Unable to get live cluster connection, retry in %1% ms, attempt: %2% , cluster connect timeout: %3% , max backoff millis: %4%") %
1172  actual_sleep_time.count() % attempt_ %
1173  cluster_connect_timeout_text_ %
1174  max_backoff_millis_.count()).str());
1175 
1176  std::this_thread::sleep_for(actual_sleep_time);
1177 
1178  current_backoff_millis_ = (std::min)(
1179  milliseconds(static_cast<milliseconds::rep>(current_backoff_millis_.count() * multiplier_)),
1180  max_backoff_millis_);
1181  return true;
1182  }
1183  }
1184 
1185  namespace internal {
1186  namespace socket {
1187  SocketFactory::SocketFactory(spi::ClientContext &client_context, boost::asio::io_context &io,
1188  boost::asio::ip::tcp::resolver &resolver)
1189  : client_context_(client_context), io_(io), io_resolver_(resolver) {
1190  }
1191 
1192  bool SocketFactory::start() {
1193 #ifdef HZ_BUILD_WITH_SSL
1194  auto &sslConfig = client_context_.get_client_config().get_network_config().get_ssl_config();
1195  if (sslConfig.is_enabled()) {
1196  if (sslConfig.ssl_context_) {
1197  ssl_context_ = sslConfig.ssl_context_;
1198  } else {
1199  ssl_context_ = std::make_shared<boost::asio::ssl::context>(
1200  (boost::asio::ssl::context_base::method) sslConfig.get_protocol());
1201 
1202  ssl_context_->set_verify_mode(boost::asio::ssl::verify_peer);
1203  ssl_context_->set_default_verify_paths();
1204 
1205  const std::vector<std::string> &verifyFiles = sslConfig.get_verify_files();
1206  bool success = true;
1207  logger &lg = client_context_.get_logger();
1208  for (const auto &f : verifyFiles) {
1209  boost::system::error_code ec;
1210  ssl_context_->load_verify_file(f, ec);
1211  if (ec) {
1212  HZ_LOG(lg, warning,
1213  boost::str(boost::format("SocketFactory::start: Failed to load CA "
1214  "verify file at %1% %2%")
1215  % f % ec.message())
1216  );
1217  success = false;
1218  }
1219  }
1220 
1221  if (!success) {
1222  ssl_context_.reset();
1223  HZ_LOG(lg, warning,
1224  "SocketFactory::start: Failed to load one or more "
1225  "configured CA verify files (PEM files). Please "
1226  "correct the files and retry."
1227  );
1228  return false;
1229  }
1230  }
1231 
1232  // set cipher list if the list is set
1233  const std::string &cipherList = sslConfig.get_cipher_list();
1234  if (!cipherList.empty()) {
1235  if (!SSL_CTX_set_cipher_list(ssl_context_->native_handle(), cipherList.c_str())) {
1236  logger &lg = client_context_.get_logger();
1237  HZ_LOG(lg, warning,
1238  std::string("SocketFactory::start: Could not load any "
1239  "of the ciphers in the config provided "
1240  "ciphers:") + cipherList
1241  );
1242  return false;
1243  }
1244  }
1245 
1246  }
1247 #else
1248  (void) client_context_;
1249 #endif
1250  return true;
1251  }
1252 
1253  std::unique_ptr<hazelcast::client::socket> SocketFactory::create(const address &address,
1254  std::chrono::milliseconds &connect_timeout_in_millis) {
1255 #ifdef HZ_BUILD_WITH_SSL
1256  if (ssl_context_.get()) {
1257  return std::unique_ptr<hazelcast::client::socket>(
1258  new internal::socket::SSLSocket(io_, *ssl_context_, address,
1259  client_context_.get_client_config().get_network_config().get_socket_options(),
1260  connect_timeout_in_millis, io_resolver_));
1261  }
1262 #endif
1263 
1264  return std::unique_ptr<hazelcast::client::socket>(new internal::socket::TcpSocket(io_, address,
1265  client_context_.get_client_config().get_network_config().get_socket_options(),
1266  connect_timeout_in_millis,
1267  io_resolver_));
1268  }
1269 
1270 #ifdef HZ_BUILD_WITH_SSL
1271 
1272  SSLSocket::SSLSocket(boost::asio::io_context &io_service, boost::asio::ssl::context &ssl_context,
1273  const client::address &address, client::config::socket_options &socket_options,
1274  std::chrono::milliseconds &connect_timeout_in_millis,
1275  boost::asio::ip::tcp::resolver &resolver)
1276  : BaseSocket<boost::asio::ssl::stream<boost::asio::ip::tcp::socket>>(resolver, address,
1277  socket_options, io_service,connect_timeout_in_millis, ssl_context) {
1278  }
1279 
1280  std::vector<SSLSocket::CipherInfo> SSLSocket::get_ciphers() {
1281  STACK_OF(SSL_CIPHER) *ciphers = SSL_get_ciphers(socket_.native_handle());
1282  std::vector<CipherInfo> supportedCiphers;
1283  for (int i = 0; i < sk_SSL_CIPHER_num(ciphers); ++i) {
1284  struct SSLSocket::CipherInfo info;
1285  const SSL_CIPHER *cipher = sk_SSL_CIPHER_value(ciphers, i);
1286  info.name = SSL_CIPHER_get_name(cipher);
1287  info.number_of_bits = SSL_CIPHER_get_bits(cipher, 0);
1288  info.version = SSL_CIPHER_get_version(cipher);
1289  char descBuf[256];
1290  info.description = SSL_CIPHER_description(cipher, descBuf, 256);
1291  supportedCiphers.push_back(info);
1292  }
1293  return supportedCiphers;
1294  }
1295 
1296  void SSLSocket::post_connect() {
1297  socket_.handshake(boost::asio::ssl::stream_base::client);
1298  }
1299 
1300  std::ostream &operator<<(std::ostream &out, const SSLSocket::CipherInfo &info) {
1301  out << "Cipher{"
1302  "Name: " << info.name <<
1303  ", Bits:" << info.number_of_bits <<
1304  ", Version:" << info.version <<
1305  ", Description:" << info.description << "}";
1306 
1307  return out;
1308  }
1309 
1310 #endif // HZ_BUILD_WITH_SSL
1311 
1312  TcpSocket::TcpSocket(boost::asio::io_context &io, const address &address,
1313  client::config::socket_options &socket_options,
1314  std::chrono::milliseconds &connect_timeout_in_millis,
1315  boost::asio::ip::tcp::resolver &resolver)
1316  : BaseSocket<boost::asio::ip::tcp::socket>(resolver, address, socket_options, io,
1317  connect_timeout_in_millis) {
1318  }
1319 
1320  }
1321  }
1322  }
1323 }
1324 
1325 namespace std {
1326  std::size_t hash<std::shared_ptr<hazelcast::client::connection::Connection>>::operator()(
1327  const std::shared_ptr<hazelcast::client::connection::Connection> &conn) const noexcept {
1328  if (!conn) {
1329  return 0;
1330  }
1331  return std::abs(conn->get_connection_id());
1332  }
1333 }
1334