Hazelcast C++ Client
Hazelcast C++ Client Library
spi.cpp
1 /*
2  * Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /*
18  * Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
19  *
20  * Licensed under the Apache License, Version 2.0 (the "License");
21  * you may not use this file except in compliance with the License.
22  * You may obtain a copy of the License at
23  *
24  * http://www.apache.org/licenses/LICENSE-2.0
25  *
26  * Unless required by applicable law or agreed to in writing, software
27  * distributed under the License is distributed on an "AS IS" BASIS,
28  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
29  * See the License for the specific language governing permissions and
30  * limitations under the License.
31  */
32 #include <utility>
33 
34 #include <boost/uuid/uuid_hash.hpp>
35 #include <boost/functional/hash.hpp>
36 #include <boost/property_tree/ptree.hpp>
37 #include <boost/property_tree/json_parser.hpp>
38 
39 #include "hazelcast/client/hazelcast_client.h"
40 #include <hazelcast/client/protocol/codec/ErrorCodec.h>
41 #include <hazelcast/client/spi/impl/ListenerMessageCodec.h>
42 #include <hazelcast/client/spi/impl/ClientClusterServiceImpl.h>
43 #include <hazelcast/client/spi/impl/listener/cluster_view_listener.h>
44 #include <hazelcast/client/spi/impl/listener/listener_service_impl.h>
45 #include <hazelcast/client/spi/impl/discovery/remote_address_provider.h>
46 #include <hazelcast/client/spi/impl/discovery/cloud_discovery.h>
47 #include <hazelcast/util/AddressUtil.h>
48 #include "hazelcast/client/member_selectors.h"
49 #include "hazelcast/client/lifecycle_event.h"
50 #include "hazelcast/client/initial_membership_event.h"
51 #include "hazelcast/client/membership_event.h"
52 #include "hazelcast/client/lifecycle_listener.h"
53 #include "hazelcast/client/spi/ProxyManager.h"
54 #include "hazelcast/client/spi/ClientProxy.h"
55 #include "hazelcast/client/spi/ClientContext.h"
56 #include "hazelcast/client/spi/impl/ClientInvocation.h"
57 #include "hazelcast/client/spi/impl/ClientInvocationServiceImpl.h"
58 #include "hazelcast/client/impl/hazelcast_client_instance_impl.h"
59 #include "hazelcast/client/spi/impl/ClientPartitionServiceImpl.h"
60 #include "hazelcast/client/spi/impl/DefaultAddressProvider.h"
61 #include "hazelcast/client/spi/impl/sequence/CallIdSequenceWithBackpressure.h"
62 #include "hazelcast/client/spi/impl/sequence/CallIdSequenceWithoutBackpressure.h"
63 #include "hazelcast/client/spi/impl/sequence/FailFastCallIdSequence.h"
64 #include "hazelcast/client/spi/impl/sequence/CallIdFactory.h"
65 #include "hazelcast/client/connection/ClientConnectionManagerImpl.h"
66 #include "hazelcast/util/AddressHelper.h"
67 #include "hazelcast/util/HashUtil.h"
68 #include "hazelcast/util/concurrent/BackoffIdleStrategy.h"
69 #ifdef HZ_BUILD_WITH_SSL
70 #include <hazelcast/util/SyncHttpsClient.h>
71 #endif //HZ_BUILD_WITH_SSL
72 
73 namespace hazelcast {
74  namespace client {
75  const std::unordered_set<member> &initial_membership_event::get_members() const {
76  return members_;
77  }
78 
80  return cluster_;
81  }
82 
83  initial_membership_event::initial_membership_event(cluster &cluster, std::unordered_set<member> members) : cluster_(
84  cluster), members_(std::move(members)) {
85  }
86 
88  : state_(state) {
89  }
90 
92  return state_;
93  }
94 
95  namespace spi {
96  ProxyManager::ProxyManager(ClientContext &context) : client_(context) {
97  }
98 
99  void ProxyManager::init() {
100  }
101 
102  void ProxyManager::destroy() {
103  std::lock_guard<std::recursive_mutex> guard(lock_);
104  for (auto &p : proxies_) {
105  try {
106  auto proxy = p.second.get();
107  p.second.get()->on_shutdown();
108  } catch (std::exception &se) {
109  auto &lg = client_.get_logger();
110  HZ_LOG(lg, finest,
111  boost::str(boost::format("Proxy was not created, "
112  "hence onShutdown can be called. Exception: %1%")
113  % se.what())
114  );
115  }
116  }
117  proxies_.clear();
118  }
119 
120  boost::future<void> ProxyManager::initialize(const std::shared_ptr<ClientProxy> &client_proxy) {
121  auto clientMessage = protocol::codec::client_createproxy_encode(client_proxy->get_name(),
122  client_proxy->get_service_name());
123  return spi::impl::ClientInvocation::create(client_, clientMessage,
124  client_proxy->get_service_name())->invoke().then(
125  boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
126  f.get();
127  client_proxy->on_initialize();
128  });
129  }
130 
131  boost::future<void> ProxyManager::destroy_proxy(ClientProxy &proxy) {
132  DefaultObjectNamespace objectNamespace(proxy.get_service_name(), proxy.get_name());
133  std::shared_ptr<ClientProxy> registeredProxy;
134  {
135  std::lock_guard<std::recursive_mutex> guard(lock_);
136  auto it = proxies_.find(objectNamespace);
137  registeredProxy = it == proxies_.end() ? nullptr : it->second.get();
138  if (it != proxies_.end()) {
139  proxies_.erase(it);
140  }
141  }
142 
143  try {
144  if (registeredProxy) {
145  try {
146  proxy.destroy_locally();
147  return proxy.destroy_remotely();
148  } catch (exception::iexception &) {
149  proxy.destroy_remotely();
150  throw;
151  }
152  }
153  if (&proxy != registeredProxy.get()) {
154  // The given proxy is stale and was already destroyed, but the caller
155  // may have allocated local resources in the context of this stale proxy
156  // instance after it was destroyed, so we have to cleanup it locally one
157  // more time to make sure there are no leaking local resources.
158  proxy.destroy_locally();
159  }
160  } catch (...) {
161  if (&proxy != registeredProxy.get()) {
162  // The given proxy is stale and was already destroyed, but the caller
163  // may have allocated local resources in the context of this stale proxy
164  // instance after it was destroyed, so we have to cleanup it locally one
165  // more time to make sure there are no leaking local resources.
166  proxy.destroy_locally();
167  }
168  throw;
169  }
170  return boost::make_ready_future();
171  }
172 
173  ClientContext::ClientContext(const client::hazelcast_client &hazelcast_client) : hazelcast_client_(
174  *hazelcast_client.client_impl_) {
175  }
176 
177  ClientContext::ClientContext(client::impl::hazelcast_client_instance_impl &hazelcast_client)
178  : hazelcast_client_(hazelcast_client) {
179  }
180 
181  serialization::pimpl::SerializationService &ClientContext::get_serialization_service() {
182  return hazelcast_client_.serialization_service_;
183  }
184 
185  impl::ClientClusterServiceImpl & ClientContext::get_client_cluster_service() {
186  return hazelcast_client_.cluster_service_;
187  }
188 
189  impl::ClientInvocationServiceImpl &ClientContext::get_invocation_service() {
190  return *hazelcast_client_.invocation_service_;
191  }
192 
193  client_config &ClientContext::get_client_config() {
194  return hazelcast_client_.client_config_;
195  }
196 
197  impl::ClientPartitionServiceImpl & ClientContext::get_partition_service() {
198  return *hazelcast_client_.partition_service_;
199  }
200 
201  lifecycle_service &ClientContext::get_lifecycle_service() {
202  return hazelcast_client_.lifecycle_service_;
203  }
204 
205  spi::impl::listener::listener_service_impl &ClientContext::get_client_listener_service() {
206  return *hazelcast_client_.listener_service_;
207  }
208 
209  connection::ClientConnectionManagerImpl &ClientContext::get_connection_manager() {
210  return *hazelcast_client_.connection_manager_;
211  }
212 
213  internal::nearcache::NearCacheManager &ClientContext::get_near_cache_manager() {
214  return *hazelcast_client_.near_cache_manager_;
215  }
216 
217  client_properties &ClientContext::get_client_properties() {
218  return hazelcast_client_.client_properties_;
219  }
220 
221  cluster &ClientContext::get_cluster() {
222  return hazelcast_client_.cluster_;
223  }
224 
225  std::shared_ptr<impl::sequence::CallIdSequence> &ClientContext::get_call_id_sequence() const {
226  return hazelcast_client_.call_id_sequence_;
227  }
228 
229  const protocol::ClientExceptionFactory &ClientContext::get_client_exception_factory() const {
230  return hazelcast_client_.get_exception_factory();
231  }
232 
233  const std::string &ClientContext::get_name() const {
234  return hazelcast_client_.get_name();
235  }
236 
237  impl::ClientExecutionServiceImpl &ClientContext::get_client_execution_service() const {
238  return *hazelcast_client_.execution_service_;
239  }
240 
241  const std::shared_ptr<client::impl::ClientLockReferenceIdGenerator> &
242  ClientContext::get_lock_reference_id_generator() {
243  return hazelcast_client_.get_lock_reference_id_generator();
244  }
245 
246  std::shared_ptr<client::impl::hazelcast_client_instance_impl>
247  ClientContext::get_hazelcast_client_implementation() {
248  return hazelcast_client_.shared_from_this();
249  }
250 
251  spi::ProxyManager &ClientContext::get_proxy_manager() {
252  return hazelcast_client_.get_proxy_manager();
253  }
254 
255  logger &ClientContext::get_logger() {
256  return *hazelcast_client_.logger_;
257  }
258 
259  client::impl::statistics::Statistics &ClientContext::get_clientstatistics() {
260  return *hazelcast_client_.statistics_;
261  }
262 
263  spi::impl::listener::cluster_view_listener &ClientContext::get_cluster_view_listener() {
264  return *hazelcast_client_.cluster_listener_;
265  }
266 
267  boost::uuids::uuid ClientContext::random_uuid() {
268  return hazelcast_client_.random_uuid();
269  }
270 
271  cp::internal::session::proxy_session_manager &ClientContext::get_proxy_session_manager() {
272  return hazelcast_client_.proxy_session_manager_;
273  }
274 
275  lifecycle_service::lifecycle_service(ClientContext &client_context,
276  const std::vector<lifecycle_listener> &listeners) :
277  client_context_(client_context), listeners_(),
278  shutdown_completed_latch_(1) {
279  for (const auto &listener: listeners) {
280  add_listener(lifecycle_listener(listener));
281  }
282  }
283 
284  bool lifecycle_service::start() {
285  bool expected = false;
286  if (!active_.compare_exchange_strong(expected, true)) {
287  return false;
288  }
289 
290  fire_lifecycle_event(lifecycle_event::STARTED);
291 
292  client_context_.get_client_execution_service().start();
293 
294  client_context_.get_client_listener_service().start();
295 
296  client_context_.get_invocation_service().start();
297 
298  client_context_.get_client_cluster_service().start();
299 
300  client_context_.get_cluster_view_listener().start();
301 
302  if (!client_context_.get_connection_manager().start()) {
303  return false;
304  }
305 
306  auto &connectionStrategyConfig = client_context_.get_client_config().get_connection_strategy_config();
307  if (!connectionStrategyConfig.is_async_start()) {
308  // The client needs to open connections to all members before any services requiring internal listeners start
309  wait_for_initial_membership_event();
310  client_context_.get_connection_manager().connect_to_all_cluster_members();
311  }
312 
313  client_context_.get_invocation_service().add_backup_listener();
314 
315  client_context_.get_clientstatistics().start();
316 
317  return true;
318  }
319 
320  void lifecycle_service::shutdown() {
321  bool expected = true;
322  if (!active_.compare_exchange_strong(expected, false)) {
323  shutdown_completed_latch_.wait();
324  return;
325  }
326  try {
327  fire_lifecycle_event(lifecycle_event::SHUTTING_DOWN);
328  client_context_.get_proxy_session_manager().shutdown();
329  client_context_.get_clientstatistics().shutdown();
330  client_context_.get_proxy_manager().destroy();
331  client_context_.get_connection_manager().shutdown();
332  client_context_.get_client_cluster_service().shutdown();
333  client_context_.get_invocation_service().shutdown();
334  client_context_.get_client_listener_service().shutdown();
335  client_context_.get_near_cache_manager().destroy_all_near_caches();
336  fire_lifecycle_event(lifecycle_event::SHUTDOWN);
337  client_context_.get_client_execution_service().shutdown();
338  client_context_.get_serialization_service().dispose();
339  shutdown_completed_latch_.count_down();
340  } catch (std::exception &e) {
341  HZ_LOG(client_context_.get_logger(), info,
342  boost::str(boost::format("An exception occured during LifecycleService shutdown. %1%")
343  % e.what())
344  );
345  shutdown_completed_latch_.count_down();
346  }
347  }
348 
349  boost::uuids::uuid lifecycle_service::add_listener(lifecycle_listener &&lifecycle_listener) {
350  std::lock_guard<std::mutex> lg(listener_lock_);
351  const auto id = uuid_generator_();
352  listeners_.emplace(id, std::move(lifecycle_listener));
353  return id;
354  }
355 
356  bool lifecycle_service::remove_listener(const boost::uuids::uuid &registration_id) {
357  std::lock_guard<std::mutex> guard(listener_lock_);
358  return listeners_.erase(registration_id) == 1;
359  }
360 
361  void lifecycle_service::fire_lifecycle_event(const lifecycle_event &lifecycle_event) {
362  std::lock_guard<std::mutex> guard(listener_lock_);
363  logger &lg = client_context_.get_logger();
364 
365  std::function<void(lifecycle_listener &)> fire_one;
366 
367  switch (lifecycle_event.get_state()) {
368  case lifecycle_event::STARTING : {
369  // convert the date string from "2016-04-20" to 20160420
370  std::string date(HAZELCAST_STRINGIZE(HAZELCAST_GIT_COMMIT_DATE));
371  util::git_date_to_hazelcast_log_date(date);
372  std::string commitId(HAZELCAST_STRINGIZE(HAZELCAST_GIT_COMMIT_ID));
373  commitId.erase(std::remove(commitId.begin(), commitId.end(), '"'), commitId.end());
374 
375  HZ_LOG(lg, info,
376  (boost::format("(%1%:%2%) LifecycleService::LifecycleEvent Client (%3%) is STARTING") %
377  date % commitId % client_context_.get_connection_manager().get_client_uuid()).str());
378  char msg[100];
379  util::hz_snprintf(msg, 100, "(%s:%s) LifecycleService::LifecycleEvent STARTING", date.c_str(),
380  commitId.c_str());
381  HZ_LOG(lg, info, msg);
382 
383  fire_one = [](lifecycle_listener &listener) {
384  listener.starting_();
385  };
386  break;
387  }
388  case lifecycle_event::STARTED : {
389  HZ_LOG(lg, info, "LifecycleService::LifecycleEvent STARTED");
390 
391  fire_one = [](lifecycle_listener &listener) {
392  listener.started_();
393  };
394  break;
395  }
396  case lifecycle_event::SHUTTING_DOWN : {
397  HZ_LOG(lg, info, "LifecycleService::LifecycleEvent SHUTTING_DOWN");
398 
399  fire_one = [](lifecycle_listener &listener) {
400  listener.shutting_down_();
401  };
402  break;
403  }
404  case lifecycle_event::SHUTDOWN : {
405  HZ_LOG(lg, info, "LifecycleService::LifecycleEvent SHUTDOWN");
406 
407  fire_one = [](lifecycle_listener &listener) {
408  listener.shutdown_();
409  };
410  break;
411  }
412  case lifecycle_event::CLIENT_CONNECTED : {
413  HZ_LOG(lg, info, "LifecycleService::LifecycleEvent CLIENT_CONNECTED");
414 
415  fire_one = [](lifecycle_listener &listener) {
416  listener.connected_();
417  };
418  break;
419  }
420  case lifecycle_event::CLIENT_DISCONNECTED : {
421  HZ_LOG(lg, info, "LifecycleService::LifecycleEvent CLIENT_DISCONNECTED");
422 
423  fire_one = [](lifecycle_listener &listener) {
424  listener.disconnected_();
425  };
426  break;
427  }
428  }
429 
430  for (auto &item: listeners_) {
431  fire_one(item.second);
432  }
433  }
434 
435  bool lifecycle_service::is_running() {
436  return active_;
437  }
438 
439  lifecycle_service::~lifecycle_service() {
440  if (active_) {
441  shutdown();
442  }
443  }
444 
445  void lifecycle_service::wait_for_initial_membership_event() const {
446  client_context_.get_client_cluster_service().wait_initial_member_list_fetched();
447  }
448 
449  DefaultObjectNamespace::DefaultObjectNamespace(const std::string &service, const std::string &object)
450  : service_name_(service), object_name_(object) {
451 
452  }
453 
454  const std::string &DefaultObjectNamespace::get_service_name() const {
455  return service_name_;
456  }
457 
458  const std::string &DefaultObjectNamespace::get_object_name() const {
459  return object_name_;
460  }
461 
462  bool DefaultObjectNamespace::operator==(const DefaultObjectNamespace &rhs) const {
463  return service_name_ == rhs.service_name_ && object_name_ == rhs.object_name_;
464  }
465 
466  ClientProxy::ClientProxy(const std::string &name, const std::string &service_name, ClientContext &context)
467  : name_(name), service_name_(service_name), context_(context) {}
468 
469  ClientProxy::~ClientProxy() = default;
470 
471  const std::string &ClientProxy::get_name() const {
472  return name_;
473  }
474 
475  const std::string &ClientProxy::get_service_name() const {
476  return service_name_;
477  }
478 
479  ClientContext &ClientProxy::get_context() {
480  return context_;
481  }
482 
483  void ClientProxy::on_destroy() {
484  }
485 
486  boost::future<void> ClientProxy::destroy() {
487  return get_context().get_proxy_manager().destroy_proxy(*this);
488  }
489 
490  void ClientProxy::destroy_locally() {
491  if (pre_destroy()) {
492  try {
493  on_destroy();
494  post_destroy();
495  } catch (exception::iexception &) {
496  post_destroy();
497  throw;
498  }
499  }
500  }
501 
502  bool ClientProxy::pre_destroy() {
503  return true;
504  }
505 
506  void ClientProxy::post_destroy() {
507  }
508 
509  void ClientProxy::on_initialize() {
510  }
511 
512  void ClientProxy::on_shutdown() {
513  }
514 
515  serialization::pimpl::SerializationService &ClientProxy::get_serialization_service() {
516  return context_.get_serialization_service();
517  }
518 
519  boost::future<void> ClientProxy::destroy_remotely() {
520  auto clientMessage = protocol::codec::client_destroyproxy_encode(
521  get_name(), get_service_name());
522  return spi::impl::ClientInvocation::create(get_context(), std::make_shared<protocol::ClientMessage>(
523  std::move(clientMessage)), get_name())->invoke().then(
524  boost::launch::sync, [](boost::future<protocol::ClientMessage> f) { f.get(); });
525  }
526 
527  boost::future<boost::uuids::uuid>
528  ClientProxy::register_listener(std::shared_ptr<impl::ListenerMessageCodec> listener_message_codec,
529  std::shared_ptr<client::impl::BaseEventHandler> handler) {
530  handler->set_logger(&get_context().get_logger());
531  return get_context().get_client_listener_service().register_listener(listener_message_codec,
532  handler);
533  }
534 
535  boost::future<bool> ClientProxy::deregister_listener(boost::uuids::uuid registration_id) {
536  return get_context().get_client_listener_service().deregister_listener(registration_id);
537  }
538 
539  namespace impl {
540  boost::uuids::uuid
541  ListenerMessageCodec::decode_add_response(protocol::ClientMessage &msg) const {
542  return msg.get_first_uuid();
543  }
544 
545  bool ListenerMessageCodec::decode_remove_response(protocol::ClientMessage &msg) const {
546  return msg.get_first_fixed_sized_field<bool>();
547  }
548 
549  ClientInvocationServiceImpl::ClientInvocationServiceImpl(ClientContext &client)
550  : client_(client), logger_(client.get_logger()),
551  invocation_timeout_(std::chrono::seconds(client.get_client_properties().get_integer(
552  client.get_client_properties().get_invocation_timeout_seconds()))),
553  invocation_retry_pause_(std::chrono::milliseconds(client.get_client_properties().get_long(
554  client.get_client_properties().get_invocation_retry_pause_millis()))),
555  smart_routing_(client.get_client_config().get_network_config().is_smart_routing()),
556  backup_acks_enabled_(smart_routing_ && client.get_client_config().backup_acks_enabled()),
557  fail_on_indeterminate_operation_state_(client.get_client_properties().get_boolean(client.get_client_properties().fail_on_indeterminate_state())),
558  backup_timeout_(std::chrono::milliseconds(client.get_client_properties().get_integer(client.get_client_properties().backup_timeout_millis()))) {}
559 
560  void ClientInvocationServiceImpl::start() {
561  }
562 
563  void ClientInvocationServiceImpl::add_backup_listener() {
564  if (this->backup_acks_enabled_) {
565  auto &listener_service = this->client_.get_client_listener_service();
566  listener_service.register_listener(std::make_shared<BackupListenerMessageCodec>(),
567  std::make_shared<noop_backup_event_handler>()).get();
568  }
569  }
570 
571  void ClientInvocationServiceImpl::shutdown() {
572  is_shutdown_.store(true);
573  }
574 
575  std::chrono::milliseconds ClientInvocationServiceImpl::get_invocation_timeout() const {
576  return invocation_timeout_;
577  }
578 
579  std::chrono::milliseconds ClientInvocationServiceImpl::get_invocation_retry_pause() const {
580  return invocation_retry_pause_;
581  }
582 
583  bool ClientInvocationServiceImpl::is_redo_operation() {
584  return client_.get_client_config().is_redo_operation();
585  }
586 
587  void
588  ClientInvocationServiceImpl::handle_client_message(const std::shared_ptr<ClientInvocation> &invocation,
589  const std::shared_ptr<protocol::ClientMessage> &response) {
590  try {
591  if (protocol::codec::ErrorCodec::EXCEPTION_MESSAGE_TYPE == response->get_message_type()) {
592  auto error_holder = protocol::codec::ErrorCodec::decode(*response);
593  invocation->notify_exception(
594  client_.get_client_exception_factory().create_exception(error_holder));
595  } else {
596  invocation->notify(response);
597  }
598  } catch (std::exception &e) {
599  HZ_LOG(logger_, severe,
600  boost::str(boost::format("Failed to process response for %1%. %2%")
601  % *invocation % e.what())
602  );
603  }
604  }
605 
606  bool ClientInvocationServiceImpl::send(const std::shared_ptr<impl::ClientInvocation>& invocation,
607  const std::shared_ptr<connection::Connection>& connection) {
608  if (is_shutdown_) {
609  BOOST_THROW_EXCEPTION(
610  exception::hazelcast_client_not_active("ClientInvocationServiceImpl::send",
611  "Client is shut down"));
612  }
613 
614  if (backup_acks_enabled_) {
615  invocation->get_client_message()->add_flag(protocol::ClientMessage::BACKUP_AWARE_FLAG);
616  }
617 
618  write_to_connection(*connection, invocation);
619  invocation->set_send_connection(connection);
620  return true;
621  }
622 
623  void ClientInvocationServiceImpl::write_to_connection(connection::Connection &connection,
624  const std::shared_ptr<ClientInvocation> &client_invocation) {
625  auto clientMessage = client_invocation->get_client_message();
626  connection.write(client_invocation);
627  }
628 
629  void ClientInvocationServiceImpl::check_invocation_allowed() {
630  client_.get_connection_manager().check_invocation_allowed();
631  }
632 
633  bool ClientInvocationServiceImpl::invoke(std::shared_ptr<ClientInvocation> invocation) {
634  auto connection = client_.get_connection_manager().get_random_connection();
635  if (!connection) {
636  HZ_LOG(logger_, finest, "No connection found to invoke");
637  return false;
638  }
639  return send(invocation, connection);
640  }
641 
642  DefaultAddressProvider::DefaultAddressProvider(config::client_network_config &network_config)
643  : network_config_(network_config) {}
644 
645  std::vector<address> DefaultAddressProvider::load_addresses() {
646  std::vector<address> addresses = network_config_.get_addresses();
647  if (addresses.empty()) {
648  addresses.emplace_back("127.0.0.1", 5701);
649  }
650 
651  // TODO Implement AddressHelper to add alternative ports for the same address
652 
653  return addresses;
654  }
655 
656  boost::optional<address> DefaultAddressProvider::translate(const address &addr) {
657  return addr;
658  }
659 
660  const boost::shared_ptr<ClientClusterServiceImpl::member_list_snapshot> ClientClusterServiceImpl::EMPTY_SNAPSHOT(
661  new ClientClusterServiceImpl::member_list_snapshot{-1});
662 
663  constexpr boost::chrono::milliseconds ClientClusterServiceImpl::INITIAL_MEMBERS_TIMEOUT;
664 
665  ClientClusterServiceImpl::ClientClusterServiceImpl(hazelcast::client::spi::ClientContext &client)
666  : client_(client), member_list_snapshot_(EMPTY_SNAPSHOT),
667  labels_(client.get_client_config().get_labels()),
668  initial_list_fetched_latch_(1) {
669  }
670 
671  boost::uuids::uuid ClientClusterServiceImpl::add_membership_listener_without_init(
672  membership_listener &&listener) {
673  std::lock_guard<std::mutex> g(listeners_lock_);
674  auto id = client_.random_uuid();
675  listeners_.emplace(id, std::move(listener));
676  return id;
677  }
678 
679  boost::optional<member> ClientClusterServiceImpl::get_member(boost::uuids::uuid uuid) const {
680  assert(!uuid.is_nil());
681  auto members_view_ptr = member_list_snapshot_.load();
682  const auto it = members_view_ptr->members.find(uuid);
683  if (it == members_view_ptr->members.end()) {
684  return boost::none;
685  }
686  return {it->second};
687  }
688 
689  std::vector<member> ClientClusterServiceImpl::get_member_list() const {
690  auto members_view_ptr = member_list_snapshot_.load();
691  std::vector<member> result;
692  result.reserve(members_view_ptr->members.size());
693  for (const auto &e : members_view_ptr->members) {
694  result.emplace_back(e.second);
695  }
696  return result;
697  }
698 
699  void ClientClusterServiceImpl::start() {
700  for (auto &listener : client_.get_client_config().get_membership_listeners()) {
701  add_membership_listener(membership_listener(listener));
702  }
703  }
704 
705  void ClientClusterServiceImpl::fire_initial_membership_event(const initial_membership_event &event) {
706  std::lock_guard<std::mutex> g(listeners_lock_);
707 
708  for (auto &item : listeners_) {
709  membership_listener &listener = item.second;
710  if (listener.init_) {
711  listener.init_(event);
712  }
713  }
714  }
715 
716  void ClientClusterServiceImpl::shutdown() {
717  initial_list_fetched_latch_.try_count_down();
718  }
719 
720  boost::uuids::uuid
721  ClientClusterServiceImpl::add_membership_listener(membership_listener &&listener) {
722  std::lock_guard<std::mutex> cluster_view_g(cluster_view_lock_);
723 
724  auto id = add_membership_listener_without_init(std::move(listener));
725 
726  std::lock_guard<std::mutex> listeners_g(listeners_lock_);
727  auto added_listener = listeners_[id];
728 
729  if (added_listener.init_) {
730  auto &cluster = client_.get_cluster();
731  auto members_ptr = member_list_snapshot_.load();
732  if (!members_ptr->members.empty()) {
733  std::unordered_set<member> members;
734  for (const auto &e : members_ptr->members) {
735  members.insert(e.second);
736  }
737  added_listener.init_(initial_membership_event(cluster, members));
738  }
739  }
740 
741  return id;
742  }
743 
744  bool ClientClusterServiceImpl::remove_membership_listener(boost::uuids::uuid registration_id) {
745  std::lock_guard<std::mutex> g(listeners_lock_);
746  return listeners_.erase(registration_id) == 1;
747  }
748 
749  std::vector<member>
750  ClientClusterServiceImpl::get_members(const member_selector &selector) const {
751  std::vector<member> result;
752  for (auto &&member : get_member_list()) {
753  if (selector.select(member)) {
754  result.emplace_back(std::move(member));
755  }
756  }
757 
758  return result;
759  }
760 
761  local_endpoint ClientClusterServiceImpl::get_local_client() const {
762  connection::ClientConnectionManagerImpl &cm = client_.get_connection_manager();
763  auto connection = cm.get_random_connection();
764  auto inetSocketAddress = connection ? connection->get_local_socket_address() : boost::none;
765  auto uuid = cm.get_client_uuid();
766  return local_endpoint(uuid, std::move(inetSocketAddress), client_.get_name(), labels_);
767  }
768 
769  void ClientClusterServiceImpl::clear_member_list_version() {
770  std::lock_guard<std::mutex> g(cluster_view_lock_);
771  auto &lg = client_.get_logger();
772  HZ_LOG(lg, finest, "Resetting the member list version ");
773  auto cluster_view_snapshot = member_list_snapshot_.load();
774  //This check is necessary so that `clear_member_list_version` when handling auth response will not
775  //intervene with client failover logic
776  if (cluster_view_snapshot != EMPTY_SNAPSHOT) {
777  member_list_snapshot_.store(boost::shared_ptr<member_list_snapshot>(
778  new member_list_snapshot{0, cluster_view_snapshot->members}));
779  }
780  }
781 
782  std::vector<membership_event> ClientClusterServiceImpl::clear_member_list_and_return_events() {
783  std::lock_guard<std::mutex> g(cluster_view_lock_);
784 
785  auto &lg = client_.get_logger();
786  HZ_LOG(lg, finest, "Resetting the member list");
787 
788  auto previous_list = member_list_snapshot_.load()->members;
789 
790  member_list_snapshot_.store(
791  boost::shared_ptr<member_list_snapshot>(new member_list_snapshot{0, {}}));
792 
793  return detect_membership_events(previous_list, {});
794  }
795 
796  void ClientClusterServiceImpl::clear_member_list() {
797  auto events = clear_member_list_and_return_events();
798  fire_events(std::move(events));
799  }
800 
801  void
802  ClientClusterServiceImpl::handle_event(int32_t version, const std::vector<member> &member_infos) {
803  auto &lg = client_.get_logger();
804  HZ_LOG(lg, finest,
805  boost::str(boost::format("Handling new snapshot with membership version: %1%, "
806  "membersString %2%")
807  % version
808  % members_string(create_snapshot(version, member_infos)))
809  );
810  auto cluster_view_snapshot = member_list_snapshot_.load();
811  if (cluster_view_snapshot == EMPTY_SNAPSHOT) {
812  std::lock_guard<std::mutex> g(cluster_view_lock_);
813  cluster_view_snapshot = member_list_snapshot_.load();
814  if (cluster_view_snapshot == EMPTY_SNAPSHOT) {
815  //this means this is the first time client connected to cluster
816  apply_initial_state(version, member_infos);
817  initial_list_fetched_latch_.count_down();
818  return;
819  }
820  }
821 
822  std::vector<membership_event> events;
823  if (version >= cluster_view_snapshot->version) {
824  std::lock_guard<std::mutex> g(cluster_view_lock_);
825  cluster_view_snapshot = member_list_snapshot_.load();
826  if (version >= cluster_view_snapshot->version) {
827  auto prev_members = cluster_view_snapshot->members;
828  auto snapshot = boost::make_shared<member_list_snapshot>(create_snapshot(version, member_infos));
829  member_list_snapshot_.store(snapshot);
830  events = detect_membership_events(prev_members, snapshot->members);
831  }
832  }
833 
834  fire_events(std::move(events));
835  }
836 
837  ClientClusterServiceImpl::member_list_snapshot
838  ClientClusterServiceImpl::create_snapshot(int32_t version, const std::vector<member> &members) {
839  member_list_snapshot result;
840  result.version = version;
841  for (auto &m : members) {
842  result.members.insert({m.get_uuid(), m});
843  }
844 
845  return result;
846  }
847 
848  std::string
849  ClientClusterServiceImpl::members_string(const ClientClusterServiceImpl::member_list_snapshot& snapshot) {
850  std::stringstream out;
851  auto const &members = snapshot.members;
852  out << std::endl << std::endl << "Members [" << members.size() << "] {";
853  for (auto const &e : members) {
854  out << std::endl << "\t" << e.second;
855  }
856  out << std::endl << "}" << std::endl;
857  return out.str();
858  }
859 
860  void
861  ClientClusterServiceImpl::apply_initial_state(int32_t version, const std::vector<member> &member_infos) {
862  auto snapshot = boost::make_shared<member_list_snapshot>(create_snapshot(version, member_infos));
863  member_list_snapshot_.store(snapshot);
864  HZ_LOG(client_.get_logger(), info, members_string(*snapshot));
865  std::unordered_set<member> members;
866  for(auto const &e : snapshot->members) {
867  members.insert(e.second);
868  }
869  std::lock_guard<std::mutex> g(listeners_lock_);
870  for (auto &item : listeners_) {
871  membership_listener &listener = item.second;
872  if (listener.init_) {
873  listener.init_(initial_membership_event(client_.get_cluster(), members));
874  }
875  }
876  }
877 
878  std::vector<membership_event> ClientClusterServiceImpl::detect_membership_events(
879  std::unordered_map<boost::uuids::uuid, member, boost::hash<boost::uuids::uuid>> previous_members,
880  const std::unordered_map<boost::uuids::uuid, member, boost::hash<boost::uuids::uuid>>& current_members) {
881  std::vector<member> new_members;
882 
883  for (auto const & e : current_members) {
884  if (!previous_members.erase(e.first)) {
885  new_members.emplace_back(e.second);
886  }
887  }
888 
889  std::vector<membership_event> events;
890 
891  // removal events should be added before added events
892  for (auto const &e : previous_members) {
893  events.emplace_back(client_.get_cluster(), e.second, membership_event::membership_event_type::MEMBER_LEFT, current_members);
894  auto connection = client_.get_connection_manager().get_connection(e.second.get_uuid());
895  if (connection) {
896  connection->close("", std::make_exception_ptr(exception::target_disconnected(
897  "ClientClusterServiceImpl::detect_membership_events", (boost::format(
898  "The client has closed the connection to this member, after receiving a member left event from the cluster. %1%") %
899  *connection).str())));
900  }
901  }
902  for (auto const &member : new_members) {
903  events.emplace_back(client_.get_cluster(), member, membership_event::membership_event_type::MEMBER_JOINED, current_members);
904  }
905 
906  if (!events.empty()) {
907  auto snapshot = member_list_snapshot_.load();
908  if (!snapshot->members.empty()) {
909  HZ_LOG(client_.get_logger(), info, members_string(*snapshot));
910  }
911  }
912  return events;
913  }
914 
915  void ClientClusterServiceImpl::fire_events(std::vector<membership_event> events) {
916  std::lock_guard<std::mutex> g(listeners_lock_);
917 
918  for (auto const &event : events) {
919  for (auto &item : listeners_) {
920  membership_listener &listener = item.second;
921  if (event.get_event_type() == membership_event::membership_event_type::MEMBER_JOINED) {
922  listener.joined_(event);
923  } else {
924  listener.left_(event);
925  }
926  }
927  }
928  }
929 
930  void ClientClusterServiceImpl::wait_initial_member_list_fetched() const {
931  // safe to const cast here since latch operations are already thread safe ops.
932  if ((const_cast<boost::latch&>(initial_list_fetched_latch_)).wait_for(INITIAL_MEMBERS_TIMEOUT) == boost::cv_status::timeout) {
933  BOOST_THROW_EXCEPTION(exception::illegal_state(
934  "ClientClusterServiceImpl::wait_initial_member_list_fetched",
935  "Could not get initial member list from cluster!"));
936  }
937  }
938 
939  bool
940  ClientInvocationServiceImpl::invoke_on_connection(const std::shared_ptr<ClientInvocation> &invocation,
941  const std::shared_ptr<connection::Connection> &connection) {
942  return send(invocation, connection);
943  }
944 
945  bool ClientInvocationServiceImpl::invoke_on_partition_owner(
946  const std::shared_ptr<ClientInvocation> &invocation, int partition_id) {
947  auto partition_owner = client_.get_partition_service().get_partition_owner(partition_id);
948  if (partition_owner.is_nil()) {
949  HZ_LOG(logger_, finest,
950  boost::str(boost::format("Partition owner is not assigned yet for partition %1%")
951  % partition_id)
952  );
953  return false;
954  }
955  return invoke_on_target(invocation, partition_owner);
956  }
957 
958  bool ClientInvocationServiceImpl::invoke_on_target(const std::shared_ptr<ClientInvocation> &invocation,
959  boost::uuids::uuid uuid) {
960  assert (!uuid.is_nil());
961  auto connection = client_.get_connection_manager().get_connection(uuid);
962  if (!connection) {
963  HZ_LOG(logger_, finest,
964  boost::str(boost::format("Client is not connected to target : %1%")
965  % uuid)
966  );
967  return false;
968  }
969  return send(invocation, connection);
970  }
971 
972  bool ClientInvocationServiceImpl::is_smart_routing() const {
973  return smart_routing_;
974  }
975 
976  const std::chrono::milliseconds &ClientInvocationServiceImpl::get_backup_timeout() const {
977  return backup_timeout_;
978  }
979 
980  bool ClientInvocationServiceImpl::fail_on_indeterminate_state() const {
981  return fail_on_indeterminate_operation_state_;
982  }
983 
984  ClientExecutionServiceImpl::ClientExecutionServiceImpl(const std::string &name,
985  const client_properties &properties,
986  int32_t pool_size,
987  spi::lifecycle_service &service)
988  : lifecycle_service_(service), client_properties_(properties) {}
989 
990  void ClientExecutionServiceImpl::start() {
991  int internalPoolSize = client_properties_.get_integer(
992  client_properties_.get_internal_executor_pool_size());
993  if (internalPoolSize <= 0) {
994  internalPoolSize = util::IOUtil::to_value<int>(
995  client_properties::INTERNAL_EXECUTOR_POOL_SIZE_DEFAULT);
996  }
997 
998  internal_executor_.reset(new hazelcast::util::hz_thread_pool(internalPoolSize));
999 
1000  user_executor_.reset(new hazelcast::util::hz_thread_pool());
1001  }
1002 
1003  void ClientExecutionServiceImpl::shutdown() {
1004  shutdown_thread_pool(internal_executor_.get());
1005  shutdown_thread_pool(user_executor_.get());
1006  }
1007 
1008  util::hz_thread_pool &ClientExecutionServiceImpl::get_user_executor() {
1009  return *user_executor_;
1010  }
1011 
1012  void ClientExecutionServiceImpl::shutdown_thread_pool(hazelcast::util::hz_thread_pool *pool) {
1013  if (!pool) {
1014  return;
1015  }
1016  pool->close();
1017  }
1018 
1019  constexpr int ClientInvocation::MAX_FAST_INVOCATION_COUNT;
1020  constexpr int ClientInvocation::UNASSIGNED_PARTITION;
1021 
1022  ClientInvocation::ClientInvocation(spi::ClientContext &client_context,
1023  std::shared_ptr<protocol::ClientMessage> &&message,
1024  const std::string &name,
1025  int partition,
1026  const std::shared_ptr<connection::Connection> &conn,
1027  boost::uuids::uuid uuid) :
1028  logger_(client_context.get_logger()),
1029  lifecycle_service_(client_context.get_lifecycle_service()),
1030  invocation_service_(client_context.get_invocation_service()),
1031  execution_service_(client_context.get_client_execution_service().shared_from_this()),
1032  call_id_sequence_(client_context.get_call_id_sequence()),
1033  uuid_(uuid),
1034  partition_id_(partition),
1035  start_time_(std::chrono::steady_clock::now()),
1036  retry_pause_(invocation_service_.get_invocation_retry_pause()),
1037  object_name_(name),
1038  connection_(conn), bound_to_single_connection_(conn != nullptr),
1039  invoke_count_(0), urgent_(false), smart_routing_(invocation_service_.is_smart_routing()) {
1040  message->set_partition_id(partition_id_);
1041  client_message_ = boost::make_shared<std::shared_ptr<protocol::ClientMessage>>(message);
1042  set_send_connection(nullptr);
1043  }
1044 
1045  ClientInvocation::~ClientInvocation() = default;
1046 
1047  boost::future<protocol::ClientMessage> ClientInvocation::invoke() {
1048  assert (client_message_.load());
1049  // for back pressure
1050  call_id_sequence_->next();
1051  invoke_on_selection();
1052  if (!lifecycle_service_.is_running()) {
1053  return invocation_promise_.get_future().then([](boost::future<protocol::ClientMessage> f) {
1054  return f.get();
1055  });
1056  }
1057  auto id_seq = call_id_sequence_;
1058  return invocation_promise_.get_future().then(execution_service_->get_user_executor(),
1059  [=](boost::future<protocol::ClientMessage> f) {
1060  id_seq->complete();
1061  return f.get();
1062  });
1063  }
1064 
1065  boost::future<protocol::ClientMessage> ClientInvocation::invoke_urgent() {
1066  assert(client_message_.load());
1067  urgent_ = true;
1068  // for back pressure
1069  call_id_sequence_->force_next();
1070  invoke_on_selection();
1071  if (!lifecycle_service_.is_running()) {
1072  return invocation_promise_.get_future().then([](boost::future<protocol::ClientMessage> f) {
1073  return f.get();
1074  });
1075  }
1076  auto id_seq = call_id_sequence_;
1077  return invocation_promise_.get_future().then(execution_service_->get_user_executor(),
1078  [=](boost::future<protocol::ClientMessage> f) {
1079  id_seq->complete();
1080  return f.get();
1081  });
1082  }
1083 
1084  void ClientInvocation::invoke_on_selection() {
1085  try {
1086  invoke_count_++;
1087  if (!urgent_) {
1088  invocation_service_.check_invocation_allowed();
1089  }
1090 
1091  if (is_bind_to_single_connection()) {
1092  bool invoked = false;
1093  auto conn = connection_.lock();
1094  if (conn) {
1095  invoked = invocation_service_.invoke_on_connection(shared_from_this(), conn);
1096  }
1097  if (!invoked) {
1098  std::string message;
1099  if (conn) {
1100  message = (boost::format("Could not invoke on connection %1%") % *conn).str();
1101  } else {
1102  message = "Could not invoke. Bound to a connection that is deleted already.";
1103  }
1104  notify_exception(std::make_exception_ptr(
1105  exception::io("ClientInvocation::invoke_on_selection", message)));
1106  }
1107  return;
1108  }
1109 
1110  bool invoked = false;
1111  if (smart_routing_) {
1112  if (partition_id_ != -1) {
1113  invoked = invocation_service_.invoke_on_partition_owner(shared_from_this(), partition_id_);
1114  } else if (!uuid_.is_nil()) {
1115  invoked = invocation_service_.invoke_on_target(shared_from_this(), uuid_);
1116  } else {
1117  invoked = invocation_service_.invoke(shared_from_this());
1118  }
1119  if (!invoked) {
1120  invoked = invocation_service_.invoke(shared_from_this());
1121  }
1122  } else {
1123  invoked = invocation_service_.invoke(shared_from_this());
1124  }
1125  if (!invoked) {
1126  notify_exception(std::make_exception_ptr(exception::io("No connection found to invoke")));
1127  }
1128  } catch (exception::iexception &) {
1129  notify_exception(std::current_exception());
1130  } catch (std::exception &) {
1131  assert(false);
1132  }
1133  }
1134 
1135  bool ClientInvocation::is_bind_to_single_connection() const {
1136  return bound_to_single_connection_;
1137  }
1138 
1139  void ClientInvocation::run() {
1140  retry();
1141  }
1142 
1143  void ClientInvocation::retry() {
1144  // retry modifies the client message and should not reuse the client message.
1145  // It could be the case that it is in write queue of the connection.
1146  client_message_ = boost::make_shared<std::shared_ptr<protocol::ClientMessage>>(copy_message());
1147 
1148  try {
1149  invoke_on_selection();
1150  } catch (exception::iexception &e) {
1151  set_exception(e, boost::current_exception());
1152  } catch (std::exception &) {
1153  assert(false);
1154  }
1155  }
1156 
1157  void ClientInvocation::set_exception(const std::exception &e, boost::exception_ptr exception_ptr) {
1158  invoked_or_exception_set_.store(true);
1159  try {
1160  auto send_conn = send_connection_.load();
1161  if (send_conn) {
1162  auto connection = send_conn->lock();
1163  if (connection) {
1164  auto call_id = client_message_.load()->get()->get_correlation_id();
1165  boost::asio::post(connection->get_socket().get_executor(), [=]() {
1166  connection->deregister_invocation(call_id);
1167  });
1168  }
1169  }
1170  invocation_promise_.set_exception(std::move(exception_ptr));
1171  } catch (boost::promise_already_satisfied &se) {
1172  if (!event_handler_) {
1173  HZ_LOG(logger_, finest,
1174  boost::str(boost::format("Failed to set the exception for invocation. "
1175  "%1%, %2% Exception to be set: %3%")
1176  % se.what() % *this % e.what())
1177  );
1178  }
1179  }
1180  }
1181 
1182  void ClientInvocation::notify_exception(std::exception_ptr exception) {
1183  erase_invocation();
1184  try {
1185  std::rethrow_exception(exception);
1186  } catch (exception::iexception &iex) {
1187  log_exception(iex);
1188 
1189  if (!lifecycle_service_.is_running()) {
1190  try {
1191  std::throw_with_nested(boost::enable_current_exception(
1192  exception::hazelcast_client_not_active(iex.get_source(),
1193  "Client is shutting down")));
1194  } catch (exception::iexception &e) {
1195  set_exception(e, boost::current_exception());
1196  }
1197  return;
1198  }
1199 
1200  if (!should_retry(iex)) {
1201  set_exception(iex, boost::current_exception());
1202  return;
1203  }
1204 
1205  auto timePassed = std::chrono::steady_clock::now() - start_time_;
1206  if (timePassed > invocation_service_.get_invocation_timeout()) {
1207  HZ_LOG(logger_, finest,
1208  boost::str(boost::format("Exception will not be retried because "
1209  "invocation timed out. %1%") % iex.what())
1210  );
1211 
1212  auto now = std::chrono::steady_clock::now();
1213 
1214  auto timeoutException = (exception::exception_builder<exception::operation_timeout>(
1215  "ClientInvocation::newoperation_timeout_exception") << *this
1216  << " timed out because exception occurred after client invocation timeout "
1217  << std::chrono::duration_cast<std::chrono::milliseconds>(invocation_service_.get_invocation_timeout()).count()
1218  << "msecs. Last exception:" << iex
1219  << " Current time :" << util::StringUtil::time_to_string(now) << ". "
1220  << "Start time: " << util::StringUtil::time_to_string(start_time_)
1221  << ". Total elapsed time: " <<
1222  std::chrono::duration_cast<std::chrono::milliseconds>(now - start_time_).count()
1223  << " ms. ").build();
1224  try {
1225  BOOST_THROW_EXCEPTION(timeoutException);
1226  } catch (...) {
1227  set_exception(timeoutException, boost::current_exception());
1228  }
1229 
1230  return;
1231  }
1232 
1233  try {
1234  execute();
1235  } catch (std::exception &e) {
1236  set_exception(e, boost::current_exception());
1237  }
1238  } catch (...) {
1239  assert(false);
1240  }
1241  }
1242 
1243  void ClientInvocation::erase_invocation() const {
1244  if (!this->event_handler_) {
1245  auto sent_connection = get_send_connection();
1246  if (sent_connection) {
1247  auto this_invocation = shared_from_this();
1248  boost::asio::post(sent_connection->get_socket().get_executor(), [=] () {
1249  sent_connection->invocations.erase(this_invocation->get_client_message()->get_correlation_id());
1250  });
1251  }
1252  }
1253  }
1254 
1255  bool ClientInvocation::should_retry(exception::iexception &exception) {
1256  auto errorCode = exception.get_error_code();
1257  if (is_bind_to_single_connection() && (errorCode == protocol::IO || errorCode == protocol::TARGET_DISCONNECTED)) {
1258  return false;
1259  }
1260 
1261  if (!uuid_.is_nil() && errorCode == protocol::TARGET_NOT_MEMBER) {
1262  //when invocation send to a specific member
1263  //if target is no longer a member, we should not retry
1264  //note that this exception could come from the server
1265  return false;
1266  }
1267 
1268  if (errorCode == protocol::IO || errorCode == protocol::HAZELCAST_INSTANCE_NOT_ACTIVE || exception.is_retryable()) {
1269  return true;
1270  }
1271  if (errorCode == protocol::TARGET_DISCONNECTED) {
1272  return client_message_.load()->get()->is_retryable() || invocation_service_.is_redo_operation();
1273  }
1274  return false;
1275  }
1276 
1277  std::ostream &operator<<(std::ostream &os, const ClientInvocation &invocation) {
1278  std::ostringstream target;
1279  if (invocation.is_bind_to_single_connection()) {
1280  auto conn = invocation.connection_.lock();
1281  if (conn) {
1282  target << "connection " << *conn;
1283  }
1284  } else if (invocation.partition_id_ != -1) {
1285  target << "partition " << invocation.partition_id_;
1286  } else if (!invocation.uuid_.is_nil()) {
1287  target << "uuid " << boost::to_string(invocation.uuid_);
1288  } else {
1289  target << "random";
1290  }
1291  os << "ClientInvocation{" << "requestMessage = " << *invocation.client_message_.load()->get()
1292  << ", objectName = "
1293  << invocation.object_name_ << ", target = " << target.str() << ", sendConnection = ";
1294  auto sendConnection = invocation.get_send_connection();
1295  if (sendConnection) {
1296  os << *sendConnection;
1297  } else {
1298  os << "nullptr";
1299  }
1300  os << ", backup_acks_expected_ = " << static_cast<int>(invocation.backup_acks_expected_)
1301  << ", backup_acks_received = " << invocation.backup_acks_received_;
1302 
1303  if (invocation.pending_response_) {
1304  os << ", pending_response: " << *invocation.pending_response_;
1305  }
1306 
1307  os << '}';
1308 
1309  return os;
1310  }
1311 
1312  std::shared_ptr<ClientInvocation> ClientInvocation::create(spi::ClientContext &client_context,
1313  std::shared_ptr<protocol::ClientMessage> &&client_message,
1314  const std::string &object_name,
1315  int partition_id) {
1316  return std::shared_ptr<ClientInvocation>(
1317  new ClientInvocation(client_context, std::move(client_message), object_name, partition_id));
1318  }
1319 
1320  std::shared_ptr<ClientInvocation> ClientInvocation::create(spi::ClientContext &client_context,
1321  std::shared_ptr<protocol::ClientMessage> &&client_message,
1322  const std::string &object_name,
1323  const std::shared_ptr<connection::Connection> &connection) {
1324  return std::shared_ptr<ClientInvocation>(
1325  new ClientInvocation(client_context, std::move(client_message), object_name, UNASSIGNED_PARTITION,
1326  connection));
1327  }
1328 
1329 
1330  std::shared_ptr<ClientInvocation> ClientInvocation::create(spi::ClientContext &client_context,
1331  std::shared_ptr<protocol::ClientMessage> &&client_message,
1332  const std::string &object_name,
1333  boost::uuids::uuid uuid) {
1334  return std::shared_ptr<ClientInvocation>(
1335  new ClientInvocation(client_context, std::move(client_message), object_name, UNASSIGNED_PARTITION,
1336  nullptr, uuid));
1337  }
1338 
1339  std::shared_ptr<ClientInvocation> ClientInvocation::create(spi::ClientContext &client_context,
1340  protocol::ClientMessage &client_message,
1341  const std::string &object_name,
1342  int partition_id) {
1343  return create(client_context, std::make_shared<protocol::ClientMessage>(std::move(client_message)),
1344  object_name, partition_id);
1345  }
1346 
1347  std::shared_ptr<ClientInvocation> ClientInvocation::create(spi::ClientContext &client_context,
1348  protocol::ClientMessage &client_message,
1349  const std::string &object_name,
1350  const std::shared_ptr<connection::Connection> &connection) {
1351  return create(client_context, std::make_shared<protocol::ClientMessage>(std::move(client_message)),
1352  object_name, connection);
1353  }
1354 
1355  std::shared_ptr<ClientInvocation> ClientInvocation::create(spi::ClientContext &client_context,
1356  protocol::ClientMessage &client_message,
1357  const std::string &object_name,
1358  boost::uuids::uuid uuid) {
1359  return create(client_context, std::make_shared<protocol::ClientMessage>(std::move(client_message)),
1360  object_name, uuid);
1361  }
1362 
1363  std::shared_ptr<connection::Connection> ClientInvocation::get_send_connection() const {
1364  return send_connection_.load()->lock();
1365  }
1366 
1367  void ClientInvocation::wait_invoked() const {
1368  //it could be either invoked or cancelled before invoked
1369  while (!invoked_or_exception_set_) {
1370  std::this_thread::sleep_for(retry_pause_);
1371  }
1372  }
1373 
1374  void
1375  ClientInvocation::set_send_connection(const std::shared_ptr<connection::Connection> &conn) {
1376  send_connection_.store(boost::make_shared<std::weak_ptr<connection::Connection>>(conn));
1377  invoked_or_exception_set_.store(true);
1378  }
1379 
1380  void ClientInvocation::notify(const std::shared_ptr<protocol::ClientMessage> &msg) {
1381  if (!msg) {
1382  BOOST_THROW_EXCEPTION(exception::illegal_argument("response can't be null"));
1383  }
1384 
1385  int8_t expected_backups = msg->get_number_of_backups();
1386 
1387  // if a regular response comes and there are backups, we need to wait for the backups
1388  // when the backups complete, the response will be send by the last backup or backup-timeout-handle mechanism kicks on
1389  if (expected_backups > backup_acks_received_) {
1390  // so the invocation has backups and since not all backups have completed, we need to wait
1391  // (it could be that backups arrive earlier than the response)
1392 
1393  pending_response_received_time_ = std::chrono::steady_clock::now();
1394 
1395  backup_acks_expected_ = expected_backups;
1396 
1397  // it is very important that the response is set after the backupsAcksExpected is set, else the system
1398  // can assume the invocation is complete because there is a response and no backups need to respond
1399  pending_response_ = msg;
1400 
1401  // we are done since not all backups have completed. Therefore we should not notify the future
1402  return;
1403  }
1404 
1405  // we are going to notify the future that a response is available; this can happen when:
1406  // - we had a regular operation (so no backups we need to wait for) that completed
1407  // - we had a backup-aware operation that has completed, but also all its backups have completed
1408  complete(msg);
1409  }
1410 
1411  void ClientInvocation::complete(const std::shared_ptr<protocol::ClientMessage> &msg) {
1412  try {
1413  // TODO: move msg content here?
1414  this->invocation_promise_.set_value(*msg);
1415  } catch (std::exception &e) {
1416  HZ_LOG(logger_, warning,
1417  boost::str(boost::format("Failed to set the response for invocation. "
1418  "Dropping the response. %1%, %2% Response: %3%")
1419  % e.what() % *this % *msg)
1420  );
1421  }
1422  this->erase_invocation();
1423  }
1424 
1425  std::shared_ptr<protocol::ClientMessage> ClientInvocation::get_client_message() const {
1426  return *client_message_.load();
1427  }
1428 
1429  const std::shared_ptr<EventHandler<protocol::ClientMessage> > &
1430  ClientInvocation::get_event_handler() const {
1431  return event_handler_;
1432  }
1433 
1434  void ClientInvocation::set_event_handler(
1435  const std::shared_ptr<EventHandler<protocol::ClientMessage> > &handler) {
1436  ClientInvocation::event_handler_ = handler;
1437  }
1438 
1439  void ClientInvocation::execute() {
1440  auto this_invocation = shared_from_this();
1441  auto command = [=]() {
1442  this_invocation->run();
1443  };
1444 
1445  // first we force a new invocation slot because we are going to return our old invocation slot immediately after
1446  // It is important that we first 'force' taking a new slot; otherwise it could be that a sneaky invocation gets
1447  // through that takes our slot!
1448  int64_t callId = call_id_sequence_->force_next();
1449  client_message_.load()->get()->set_correlation_id(callId);
1450 
1451  //we release the old slot
1452  call_id_sequence_->complete();
1453 
1454  if (invoke_count_ < MAX_FAST_INVOCATION_COUNT) {
1455  // fast retry for the first few invocations
1456  execution_service_->execute(command);
1457  } else {
1458  // progressive retry delay
1459  int64_t delayMillis = util::min<int64_t>(static_cast<int64_t>(1) << (invoke_count_ - MAX_FAST_INVOCATION_COUNT),
1460  std::chrono::duration_cast<std::chrono::milliseconds>(retry_pause_).count());
1461  retry_timer_ = execution_service_->schedule(command, std::chrono::milliseconds(delayMillis));
1462  }
1463  }
1464 
1465  const std::string ClientInvocation::get_name() const {
1466  return "ClientInvocation";
1467  }
1468 
1469  std::shared_ptr<protocol::ClientMessage> ClientInvocation::copy_message() {
1470  return std::make_shared<protocol::ClientMessage>(**client_message_.load());
1471  }
1472 
1473  boost::promise<protocol::ClientMessage> &ClientInvocation::get_promise() {
1474  return invocation_promise_;
1475  }
1476 
1477  void ClientInvocation::log_exception(exception::iexception &e) {
1478  HZ_LOG(logger_, finest,
1479  boost::str(boost::format("Invocation got an exception %1%, invoke count : %2%, "
1480  "exception : %3%")
1481  % *this % invoke_count_.load() % e)
1482  );
1483  }
1484 
1485  void ClientInvocation::notify_backup() {
1486  ++backup_acks_received_;
1487 
1488  if (!pending_response_) {
1489  // no pendingResponse has been set, so we are done since the invocation on the primary needs to complete first
1490  return;
1491  }
1492 
1493  // if a pendingResponse is set, then the backupsAcksExpected has been set (so we can now safely read backupsAcksExpected)
1494  if (backup_acks_expected_ != backup_acks_received_) {
1495  // we managed to complete a backup, but we were not the one completing the last backup, so we are done
1496  return;
1497  }
1498 
1499  // we are the lucky one since we just managed to complete the last backup for this invocation and since the
1500  // pendingResponse is set, we can set it on the future
1501  complete_with_pending_response();
1502  }
1503 
1504  void
1505  ClientInvocation::detect_and_handle_backup_timeout(const std::chrono::milliseconds &backup_timeout) {
1506  // if the backups have completed, we are done; this also filters out all non backup-aware operations
1507  // since the backupsAcksExpected will always be equal to the backupsAcksReceived
1508  if (backup_acks_expected_ == backup_acks_received_) {
1509  return;
1510  }
1511 
1512  // if no response has yet been received, we we are done; we are only going to re-invoke an operation
1513  // if the response of the primary has been received, but the backups have not replied
1514  if (!pending_response_) {
1515  return;
1516  }
1517 
1518  // if this has not yet expired (so has not been in the system for a too long period) we ignore it
1519  if (pending_response_received_time_ + backup_timeout >= std::chrono::steady_clock::now()) {
1520  return;
1521  }
1522 
1523  if (invocation_service_.fail_on_indeterminate_state()) {
1524  auto exception = boost::enable_current_exception((exception::exception_builder<exception::indeterminate_operation_state>(
1525  "ClientInvocation::detect_and_handle_backup_timeout") << *this
1526  << " failed because backup acks missed.").build());
1527  notify_exception(std::make_exception_ptr(exception));
1528  return;
1529  }
1530 
1531  // the backups have not yet completed, but we are going to release the future anyway if a pendingResponse has been set
1532  complete_with_pending_response();
1533  }
1534 
1535  void ClientInvocation::complete_with_pending_response() {
1536  complete(pending_response_);
1537  }
1538 
1539  ClientContext &impl::ClientTransactionManagerServiceImpl::get_client() const {
1540  return client_;
1541  }
1542 
1543  ClientTransactionManagerServiceImpl::ClientTransactionManagerServiceImpl(ClientContext &client)
1544  : client_(client) {}
1545 
1546  std::shared_ptr<connection::Connection> ClientTransactionManagerServiceImpl::connect() {
1547  auto &invocationService = client_.get_invocation_service();
1548  auto startTime = std::chrono::steady_clock::now();
1549  auto invocationTimeout = invocationService.get_invocation_timeout();
1550  client_config &clientConfig = client_.get_client_config();
1551  bool smartRouting = clientConfig.get_network_config().is_smart_routing();
1552 
1553  while (client_.get_lifecycle_service().is_running()) {
1554  try {
1555  auto connection = client_.get_connection_manager().get_random_connection();
1556  if (!connection) {
1557  throw_exception(smartRouting);
1558  }
1559  return connection;
1560  } catch (exception::hazelcast_client_offline &) {
1561  throw;
1562  } catch (exception::iexception &) {
1563  if (std::chrono::steady_clock::now() - startTime > invocationTimeout) {
1564  std::rethrow_exception(
1565  new_operation_timeout_exception(std::current_exception(), invocationTimeout,
1566  startTime));
1567  }
1568  }
1569  std::this_thread::sleep_for(invocationService.get_invocation_retry_pause());
1570  }
1571  BOOST_THROW_EXCEPTION(
1572  exception::hazelcast_client_not_active("ClientTransactionManagerServiceImpl::connect",
1573  "Client is shutdown"));
1574  }
1575 
1576  std::exception_ptr
1577  ClientTransactionManagerServiceImpl::new_operation_timeout_exception(std::exception_ptr cause,
1578  std::chrono::milliseconds invocation_timeout,
1579  std::chrono::steady_clock::time_point start_time) {
1580  std::ostringstream sb;
1581  auto now = std::chrono::steady_clock::now();
1582  sb
1583  << "Creating transaction context timed out because exception occurred after client invocation timeout "
1584  << std::chrono::duration_cast<std::chrono::milliseconds>(invocation_timeout).count() << " ms. " << "Current time: "
1585  << util::StringUtil::time_to_string(std::chrono::steady_clock::now()) << ". " << "Start time: "
1586  << util::StringUtil::time_to_string(start_time) << ". Total elapsed time: "
1587  << std::chrono::duration_cast<std::chrono::milliseconds>(now - start_time).count() << " ms. ";
1588  try {
1589  std::rethrow_exception(cause);
1590  } catch (...) {
1591  try {
1592  std::throw_with_nested(boost::enable_current_exception(exception::operation_timeout(
1593  "ClientTransactionManagerServiceImpl::newoperation_timeout_exception", sb.str())));
1594  } catch (...) {
1595  return std::current_exception();
1596  }
1597  }
1598  return nullptr;
1599  }
1600 
1601  void ClientTransactionManagerServiceImpl::throw_exception(bool smart_routing) {
1602  auto &client_config = client_.get_client_config();
1603  auto &connection_strategy_Config = client_config.get_connection_strategy_config();
1604  auto reconnect_mode = connection_strategy_Config.get_reconnect_mode();
1605  if (reconnect_mode == config::client_connection_strategy_config::reconnect_mode::ASYNC) {
1606  BOOST_THROW_EXCEPTION(exception::hazelcast_client_offline(
1607  "ClientTransactionManagerServiceImpl::throw_exception", ""));
1608  }
1609  if (smart_routing) {
1610  auto members = client_.get_cluster().get_members();
1611  std::ostringstream msg;
1612  if (members.empty()) {
1613  msg << "No address was return by the LoadBalancer since there are no members in the cluster";
1614  } else {
1615  msg << "No address was return by the LoadBalancer. "
1616  "But the cluster contains the following members:{\n";
1617  for (auto const &m : members) {
1618  msg << '\t' << m << '\n';
1619  }
1620  msg << "}";
1621  }
1622  BOOST_THROW_EXCEPTION(exception::illegal_state(
1623  "ClientTransactionManagerServiceImpl::throw_exception", msg.str()));
1624  }
1625  BOOST_THROW_EXCEPTION(exception::illegal_state(
1626  "ClientTransactionManagerServiceImpl::throw_exception",
1627  "No active connection is found"));
1628  }
1629 
1630  ClientPartitionServiceImpl::ClientPartitionServiceImpl(ClientContext &client)
1631  : client_(client), logger_(client.get_logger()), partition_count_(0),
1632  partition_table_(boost::shared_ptr<partition_table>(new partition_table{0, -1})) {
1633  }
1634 
1635  void ClientPartitionServiceImpl::handle_event(int32_t connection_id, int32_t version,
1636  const std::vector<std::pair<boost::uuids::uuid, std::vector<int>>> &partitions) {
1637  HZ_LOG(logger_, finest,
1638  boost::str(boost::format("Handling new partition table with partitionStateVersion: %1%") % version)
1639  );
1640 
1641  while (true) {
1642  auto current = partition_table_.load();
1643  if (!should_be_applied(connection_id, version, partitions, *current)) {
1644  return;
1645  }
1646  if (partition_table_.compare_exchange_strong(current, boost::shared_ptr<partition_table>(
1647  new partition_table{connection_id, version, convert_to_map(partitions)}))) {
1648  HZ_LOG(logger_, finest,
1649  boost::str(boost::format("Applied partition table with partitionStateVersion : %1%") % version)
1650  );
1651  return;
1652  }
1653 
1654  }
1655  }
1656 
1657  boost::uuids::uuid ClientPartitionServiceImpl::get_partition_owner(int32_t partition_id) {
1658  auto table_ptr = partition_table_.load();
1659  auto it = table_ptr->partitions.find(partition_id);
1660  if (it != table_ptr->partitions.end()) {
1661  return it->second;
1662  }
1663  return boost::uuids::nil_uuid();
1664  }
1665 
1666  int32_t ClientPartitionServiceImpl::get_partition_id(const serialization::pimpl::data &key) {
1667  int32_t pc = get_partition_count();
1668  if (pc <= 0) {
1669  return 0;
1670  }
1671  int hash = key.get_partition_hash();
1672  return util::HashUtil::hash_to_index(hash, pc);
1673  }
1674 
1675  int32_t ClientPartitionServiceImpl::get_partition_count() {
1676  return partition_count_.load();
1677  }
1678 
1679  std::shared_ptr<client::impl::Partition> ClientPartitionServiceImpl::get_partition(int partition_id) {
1680  return std::shared_ptr<client::impl::Partition>(new PartitionImpl(partition_id, client_, *this));
1681  }
1682 
1683  bool ClientPartitionServiceImpl::check_and_set_partition_count(int32_t new_partition_count) {
1684  int32_t expected = 0;
1685  if (partition_count_.compare_exchange_strong(expected, new_partition_count)) {
1686  return true;
1687  }
1688  return partition_count_.load() == new_partition_count;
1689  }
1690 
1691  bool
1692  ClientPartitionServiceImpl::should_be_applied(int32_t connection_id, int32_t version,
1693  const std::vector<std::pair<boost::uuids::uuid, std::vector<int>>> &partitions,
1694  const partition_table &current) {
1695  auto &lg = client_.get_logger();
1696  if (partitions.empty()) {
1697  if (logger_.enabled(logger::level::finest)) {
1698  log_failure(connection_id, version, current, "response is empty");
1699  }
1700  return false;
1701  }
1702  if (!current.connection_id || connection_id != current.connection_id) {
1703  HZ_LOG(lg, finest,
1704  ([&current, connection_id](){
1705  auto frmt = boost::format("Event coming from a new connection. Old connection id: %1%, "
1706  "new connection %2%");
1707 
1708  if (current.connection_id) {
1709  frmt = frmt % current.connection_id;
1710  } else {
1711  frmt = frmt % "none";
1712  }
1713 
1714  return boost::str(frmt % connection_id);
1715  })()
1716  );
1717 
1718  return true;
1719  }
1720  if (version <= current.version) {
1721  if (lg.enabled(logger::level::finest)) {
1722  log_failure(connection_id, version, current, "response state version is old");
1723  }
1724  return false;
1725  }
1726  return true;
1727  }
1728 
1729  void ClientPartitionServiceImpl::log_failure(int32_t connection_id, int32_t version,
1730  const ClientPartitionServiceImpl::partition_table &current,
1731  const std::string &cause) {
1732  HZ_LOG(logger_, finest,
1733  [&](){
1734  auto frmt = boost::format(" We will not apply the response, since %1% ."
1735  " Response is from connection with id %2%. "
1736  "Current connection id is %3%, response state version:%4%. "
1737  "Current state version: %5%");
1738  if (current.connection_id) {
1739  return boost::str(frmt % cause % connection_id % current.connection_id % version %
1740  current.version);
1741  }
1742  else {
1743  return boost::str(frmt % cause % connection_id % "nullptr" % version % current.version);
1744  }
1745  }()
1746  );
1747  }
1748 
1749  void ClientPartitionServiceImpl::reset() {
1750  partition_table_.store(nullptr);
1751  }
1752 
1753  std::unordered_map<int32_t, boost::uuids::uuid> ClientPartitionServiceImpl::convert_to_map(
1754  const std::vector<std::pair<boost::uuids::uuid, std::vector<int>>> &partitions) {
1755  std::unordered_map<int32_t, boost::uuids::uuid> new_partitions;
1756  for (auto const &e : partitions) {
1757  for (auto pid: e.second) {
1758  new_partitions.insert({pid, e.first});
1759  }
1760  }
1761  return new_partitions;
1762  }
1763 
1764  int ClientPartitionServiceImpl::PartitionImpl::get_partition_id() const {
1765  return partition_id_;
1766  }
1767 
1768  boost::optional<member> ClientPartitionServiceImpl::PartitionImpl::get_owner() const {
1769  auto owner = partition_service_.get_partition_owner(partition_id_);
1770  if (!owner.is_nil()) {
1771  return client_.get_client_cluster_service().get_member(owner);
1772  }
1773  return boost::none;
1774  }
1775 
1776  ClientPartitionServiceImpl::PartitionImpl::PartitionImpl(int partition_id, ClientContext &client,
1777  ClientPartitionServiceImpl &partition_service)
1778  : partition_id_(partition_id), client_(client), partition_service_(partition_service) {
1779  }
1780 
1781  namespace sequence {
1782  CallIdSequenceWithoutBackpressure::CallIdSequenceWithoutBackpressure() : head_(0) {}
1783 
1784  CallIdSequenceWithoutBackpressure::~CallIdSequenceWithoutBackpressure() = default;
1785 
1786  int32_t CallIdSequenceWithoutBackpressure::get_max_concurrent_invocations() const {
1787  return INT32_MAX;
1788  }
1789 
1790  int64_t CallIdSequenceWithoutBackpressure::next() {
1791  return force_next();
1792  }
1793 
1794  int64_t CallIdSequenceWithoutBackpressure::force_next() {
1795  return ++head_;
1796  }
1797 
1798  void CallIdSequenceWithoutBackpressure::complete() {
1799  // no-op
1800  }
1801 
1802  int64_t CallIdSequenceWithoutBackpressure::get_last_call_id() {
1803  return head_;
1804  }
1805 
1806  // TODO: see if we can utilize std::hardware_destructive_interference_size
1807  AbstractCallIdSequence::AbstractCallIdSequence(int32_t max_concurrent_invocations) {
1808  std::ostringstream out;
1809  out << "maxConcurrentInvocations should be a positive number. maxConcurrentInvocations="
1810  << max_concurrent_invocations;
1811  this->max_concurrent_invocations_ = util::Preconditions::check_positive(max_concurrent_invocations,
1812  out.str());
1813 
1814  for (size_t i = 0; i < longs_.size(); ++i) {
1815  longs_[i] = 0;
1816  }
1817  }
1818 
1819  AbstractCallIdSequence::~AbstractCallIdSequence() = default;
1820 
1821  int32_t AbstractCallIdSequence::get_max_concurrent_invocations() const {
1822  return max_concurrent_invocations_;
1823  }
1824 
1825  int64_t AbstractCallIdSequence::next() {
1826  if (!has_space()) {
1827  handle_no_space_left();
1828  }
1829  return force_next();
1830  }
1831 
1832  int64_t AbstractCallIdSequence::force_next() {
1833  return ++longs_[INDEX_HEAD];
1834  }
1835 
1836  void AbstractCallIdSequence::complete() {
1837  ++longs_[INDEX_TAIL];
1838  assert(longs_[INDEX_TAIL] <= longs_[INDEX_HEAD]);
1839  }
1840 
1841  int64_t AbstractCallIdSequence::get_last_call_id() {
1842  return longs_[INDEX_HEAD];
1843  }
1844 
1845  bool AbstractCallIdSequence::has_space() {
1846  return longs_[INDEX_HEAD] - longs_[INDEX_TAIL] < max_concurrent_invocations_;
1847  }
1848 
1849  int64_t AbstractCallIdSequence::get_tail() {
1850  return longs_[INDEX_TAIL];
1851  }
1852 
1853  const std::unique_ptr<util::concurrent::IdleStrategy> CallIdSequenceWithBackpressure::IDLER(
1854  new util::concurrent::BackoffIdleStrategy(
1855  0, 0, std::chrono::duration_cast<std::chrono::nanoseconds>(
1856  std::chrono::microseconds(1000)).count(),
1857  std::chrono::duration_cast<std::chrono::nanoseconds>(
1858  std::chrono::microseconds(MAX_DELAY_MS * 1000)).count()));
1859 
1860  CallIdSequenceWithBackpressure::CallIdSequenceWithBackpressure(int32_t max_concurrent_invocations,
1861  int64_t backoff_timeout_ms)
1862  : AbstractCallIdSequence(max_concurrent_invocations) {
1863  std::ostringstream out;
1864  out << "backoffTimeoutMs should be a positive number. backoffTimeoutMs=" << backoff_timeout_ms;
1865  util::Preconditions::check_positive(backoff_timeout_ms, out.str());
1866 
1867  backoff_timeout_nanos_ = std::chrono::duration_cast<std::chrono::nanoseconds>(
1868  std::chrono::milliseconds(backoff_timeout_ms)).count();
1869  }
1870 
1871  void CallIdSequenceWithBackpressure::handle_no_space_left() {
1872  auto start = std::chrono::steady_clock::now();
1873  for (int64_t idleCount = 0;; idleCount++) {
1874  int64_t elapsedNanos = std::chrono::duration_cast<std::chrono::nanoseconds>(
1875  std::chrono::steady_clock::now() - start).count();
1876  if (elapsedNanos > backoff_timeout_nanos_) {
1877  throw (exception::exception_builder<exception::hazelcast_overload>(
1878  "CallIdSequenceWithBackpressure::handleNoSpaceLeft")
1879  << "Timed out trying to acquire another call ID."
1880  << " maxConcurrentInvocations = " << get_max_concurrent_invocations()
1881  << ", backoffTimeout = "
1882  << std::chrono::microseconds(backoff_timeout_nanos_ / 1000).count()
1883  << " msecs, elapsed:"
1884  << std::chrono::microseconds(elapsedNanos / 1000).count() << " msecs").build();
1885  }
1886  IDLER->idle(idleCount);
1887  if (has_space()) {
1888  return;
1889  }
1890 
1891  }
1892  }
1893 
1894  FailFastCallIdSequence::FailFastCallIdSequence(int32_t max_concurrent_invocations)
1895  : AbstractCallIdSequence(max_concurrent_invocations) {}
1896 
1897  void FailFastCallIdSequence::handle_no_space_left() {
1898  throw (exception::exception_builder<exception::hazelcast_overload>(
1899  "FailFastCallIdSequence::handleNoSpaceLeft")
1900  << "Maximum invocation count is reached. maxConcurrentInvocations = "
1901  << get_max_concurrent_invocations()).build();
1902 
1903  }
1904 
1905  std::unique_ptr<CallIdSequence> CallIdFactory::new_call_id_sequence(bool is_back_pressure_enabled,
1906  int32_t max_allowed_concurrent_invocations,
1907  int64_t backoff_timeout_ms) {
1908  if (!is_back_pressure_enabled) {
1909  return std::unique_ptr<CallIdSequence>(new CallIdSequenceWithoutBackpressure());
1910  } else if (backoff_timeout_ms <= 0) {
1911  return std::unique_ptr<CallIdSequence>(
1912  new FailFastCallIdSequence(max_allowed_concurrent_invocations));
1913  } else {
1914  return std::unique_ptr<CallIdSequence>(
1915  new CallIdSequenceWithBackpressure(max_allowed_concurrent_invocations,
1916  backoff_timeout_ms));
1917  }
1918  }
1919  }
1920 
1921  namespace listener {
1922  listener_service_impl::listener_service_impl(ClientContext &client_context,
1923  int32_t event_thread_count)
1924  : client_context_(client_context),
1925  serialization_service_(client_context.get_serialization_service()),
1926  logger_(client_context.get_logger()),
1927  client_connection_manager_(client_context.get_connection_manager()),
1928  number_of_event_threads_(event_thread_count),
1929  smart_(client_context.get_client_config().get_network_config().is_smart_routing()) {
1930  auto &invocationService = client_context.get_invocation_service();
1931  invocation_timeout_ = invocationService.get_invocation_timeout();
1932  invocation_retry_pause_ = invocationService.get_invocation_retry_pause();
1933  }
1934 
1935  bool listener_service_impl::registers_local_only() const {
1936  return smart_;
1937  }
1938 
1939  boost::future<boost::uuids::uuid>
1940  listener_service_impl::register_listener(
1941  std::shared_ptr<ListenerMessageCodec> listener_message_codec,
1942  std::shared_ptr<client::impl::BaseEventHandler> handler) {
1943  auto task = boost::packaged_task<boost::uuids::uuid()>([=]() {
1944  return register_listener_internal(listener_message_codec, handler);
1945  });
1946  auto f = task.get_future();
1947  boost::asio::post(registration_executor_->get_executor(), std::move(task));
1948  return f;
1949  }
1950 
1951  boost::future<bool> listener_service_impl::deregister_listener(boost::uuids::uuid registration_id) {
1952  util::Preconditions::check_not_nill(registration_id, "Nil userRegistrationId is not allowed!");
1953 
1954  boost::packaged_task<bool()> task([=]() {
1955  return deregister_listener_internal(registration_id);
1956  });
1957  auto f = task.get_future();
1958  boost::asio::post(registration_executor_->get_executor(), std::move(task));
1959  return f;
1960  }
1961 
1962  void listener_service_impl::connection_added(
1963  const std::shared_ptr<connection::Connection> connection) {
1964  boost::asio::post(registration_executor_->get_executor(), [=]() { connection_added_internal(connection); });
1965  }
1966 
1967  void listener_service_impl::connection_removed(
1968  const std::shared_ptr<connection::Connection> connection) {
1969  boost::asio::post(registration_executor_->get_executor(), [=]() { connection_removed_internal(connection); });
1970  }
1971 
1972  void
1973  listener_service_impl::remove_event_handler(int64_t call_id,
1974  const std::shared_ptr<connection::Connection> &connection) {
1975  boost::asio::post(connection->get_socket().get_executor(),
1976  std::packaged_task<void()>([=]() {
1977  connection->deregister_invocation(call_id);
1978  }));
1979  }
1980 
1981  void listener_service_impl::handle_client_message(
1982  const std::shared_ptr<ClientInvocation> invocation,
1983  const std::shared_ptr<protocol::ClientMessage> response) {
1984  try {
1985  auto partitionId = response->get_partition_id();
1986  if (partitionId == -1) {
1987  // execute on random thread on the thread pool
1988  boost::asio::post(event_executor_->get_executor(), [=]() { process_event_message(invocation, response); });
1989  return;
1990  }
1991 
1992  // process on certain thread which is same for the partition id
1993  boost::asio::post(event_strands_[partitionId % event_strands_.size()],
1994  [=]() { process_event_message(invocation, response); });
1995 
1996  } catch (const std::exception &e) {
1997  if (client_context_.get_lifecycle_service().is_running()) {
1998  HZ_LOG(logger_, warning,
1999  boost::str(boost::format("Delivery of event message to event handler failed. %1%, %2%, %3%")
2000  % e.what() % *response % *invocation)
2001  );
2002  }
2003  }
2004  }
2005 
2006  void listener_service_impl::shutdown() {
2007  ClientExecutionServiceImpl::shutdown_thread_pool(event_executor_.get());
2008  ClientExecutionServiceImpl::shutdown_thread_pool(registration_executor_.get());
2009  }
2010 
2011  void listener_service_impl::start() {
2012  event_executor_.reset(new hazelcast::util::hz_thread_pool(number_of_event_threads_));
2013  registration_executor_.reset(new hazelcast::util::hz_thread_pool(1));
2014 
2015  for (int i = 0; i < number_of_event_threads_; ++i) {
2016  event_strands_.emplace_back(event_executor_->get_executor());
2017  }
2018 
2019  client_connection_manager_.add_connection_listener(shared_from_this());
2020  }
2021 
2022  boost::uuids::uuid listener_service_impl::register_listener_internal(
2023  std::shared_ptr<ListenerMessageCodec> listener_message_codec,
2024  std::shared_ptr<client::impl::BaseEventHandler> handler) {
2025  auto user_registration_id = client_context_.random_uuid();
2026 
2027  std::shared_ptr<listener_registration> registration(new listener_registration{listener_message_codec, handler});
2028  registrations_.put(user_registration_id, registration);
2029  for (auto const &connection : client_connection_manager_.get_active_connections()) {
2030  try {
2031  invoke(registration, connection);
2032  } catch (exception::iexception &e) {
2033  if (connection->is_alive()) {
2034  deregister_listener_internal(user_registration_id);
2035  BOOST_THROW_EXCEPTION((exception::exception_builder<exception::hazelcast_>(
2036  "ClientListenerService::RegisterListenerTask::call")
2037  << "Listener can not be added " << e).build());
2038  }
2039  }
2040  }
2041  return user_registration_id;
2042  }
2043 
2044  bool
2045  listener_service_impl::deregister_listener_internal(boost::uuids::uuid user_registration_id) {
2046  auto listenerRegistration = registrations_.get(user_registration_id);
2047  if (!listenerRegistration) {
2048  return false;
2049  }
2050  bool successful = true;
2051 
2052  auto listener_registrations = listenerRegistration->registrations.entry_set();
2053  for (auto it = listener_registrations.begin();it != listener_registrations.end();) {
2054  const auto &registration = it->second;
2055  const auto& subscriber = it->first;
2056  try {
2057  const auto &listenerMessageCodec = listenerRegistration->codec;
2058  auto serverRegistrationId = registration->server_registration_id;
2059  auto request = listenerMessageCodec->encode_remove_request(serverRegistrationId);
2060  auto invocation = ClientInvocation::create(client_context_,request, "",
2061  subscriber);
2062  invocation->invoke().get();
2063 
2064  remove_event_handler(registration->call_id, subscriber);
2065 
2066  it = listener_registrations.erase(it);
2067  } catch (exception::iexception &e) {
2068  ++it;
2069  if (subscriber->is_alive()) {
2070  successful = false;
2071  std::ostringstream endpoint;
2072  if (subscriber->get_remote_address()) {
2073  endpoint << *subscriber->get_remote_address();
2074  } else {
2075  endpoint << "null";
2076  }
2077  HZ_LOG(logger_, warning,
2078  boost::str(boost::format("ClientListenerService::deregisterListenerInternal "
2079  "Deregistration of listener with ID %1% "
2080  "has failed to address %2% %3%")
2081  % user_registration_id
2082  % subscriber->get_remote_address() % e)
2083  );
2084  }
2085  }
2086  }
2087  if (successful) {
2088  registrations_.remove(user_registration_id);
2089  }
2090  return successful;
2091  }
2092 
2093  void listener_service_impl::connection_added_internal(
2094  const std::shared_ptr<connection::Connection> &connection) {
2095  for (const auto &listener_registration : registrations_.values()) {
2096  invoke_from_internal_thread(listener_registration, connection);
2097  }
2098  }
2099 
2100  void listener_service_impl::connection_removed_internal(
2101  const std::shared_ptr<connection::Connection> &connection) {
2102  for (auto &registry : registrations_.values()) {
2103  registry->registrations.remove(connection);
2104  }
2105  }
2106 
2107  void
2108  listener_service_impl::invoke_from_internal_thread(
2109  const std::shared_ptr<listener_registration> &listener_registration,
2110  const std::shared_ptr<connection::Connection> &connection) {
2111  try {
2112  invoke(listener_registration, connection);
2113  } catch (exception::iexception &e) {
2114  HZ_LOG(logger_, warning,
2115  boost::str(boost::format("Listener with pointer %1% can not be added to "
2116  "a new connection: %2%, reason: %3%")
2117  % listener_registration.get() % *connection % e)
2118  );
2119  }
2120  }
2121 
2122  void
2123  listener_service_impl::invoke(const std::shared_ptr<listener_registration> &listener_registration,
2124  const std::shared_ptr<connection::Connection> &connection) {
2125  if (listener_registration->registrations.contains_key(connection)) {
2126  return;
2127  }
2128 
2129  const auto &codec = listener_registration->codec;
2130  auto request = codec->encode_add_request(registers_local_only());
2131  const auto &handler = listener_registration->handler;
2132  handler->before_listener_register();
2133 
2134  auto invocation = ClientInvocation::create(client_context_,
2135  std::make_shared<protocol::ClientMessage>(std::move(request)), "",
2136  connection);
2137  invocation->set_event_handler(handler);
2138  auto clientMessage = invocation->invoke_urgent().get();
2139 
2140  auto serverRegistrationId = codec->decode_add_response(clientMessage);
2141  handler->on_listener_register();
2142  int64_t correlationId = invocation->get_client_message()->get_correlation_id();
2143 
2144  (*listener_registration).registrations.put(connection, std::shared_ptr<connection_registration>(
2145  new connection_registration{serverRegistrationId, correlationId}));
2146  }
2147 
2148  void listener_service_impl::process_event_message(
2149  const std::shared_ptr<ClientInvocation> invocation,
2150  const std::shared_ptr<protocol::ClientMessage> response) {
2151  auto eventHandler = invocation->get_event_handler();
2152  if (!eventHandler) {
2153  if (client_context_.get_lifecycle_service().is_running()) {
2154  HZ_LOG(logger_, warning,
2155  boost::str(boost::format("No eventHandler for invocation. "
2156  "Ignoring this invocation response. %1%")
2157  % *invocation)
2158  );
2159  }
2160 
2161  return;
2162  }
2163 
2164  try {
2165  eventHandler->handle(*response);
2166  } catch (std::exception &e) {
2167  if (client_context_.get_lifecycle_service().is_running()) {
2168  HZ_LOG(logger_, warning,
2169  boost::str(boost::format("Delivery of event message to event handler failed. %1%, %2%, %3%")
2170  % e.what() % *response % *invocation)
2171  );
2172  }
2173  }
2174  }
2175 
2176  listener_service_impl::~listener_service_impl() = default;
2177 
2178  void cluster_view_listener::start() {
2179  client_context_.get_connection_manager().add_connection_listener(shared_from_this());
2180  }
2181 
2182  void cluster_view_listener::connection_added(const std::shared_ptr<connection::Connection> connection) {
2183  try_register(connection);
2184  }
2185 
2186  void cluster_view_listener::connection_removed(const std::shared_ptr<connection::Connection> connection) {
2187  try_reregister_to_random_connection(connection->get_connection_id());
2188  }
2189 
2190  cluster_view_listener::cluster_view_listener(ClientContext &client_context) : client_context_(
2191  client_context) {}
2192 
2193  void cluster_view_listener::try_register(std::shared_ptr<connection::Connection> connection) {
2194  int32_t expected_id = -1;
2195  if (!listener_added_connection_id_.compare_exchange_strong(expected_id,
2196  connection->get_connection_id())) {
2197  // already registering/registered to another connection
2198  return;
2199  }
2200 
2201  auto invocation = ClientInvocation::create(client_context_,
2202  std::make_shared<protocol::ClientMessage>(
2203  protocol::codec::client_addclusterviewlistener_encode()), "", connection);
2204 
2205  auto handler = std::make_shared<event_handler>(connection->get_connection_id(), *this);
2206  invocation->set_event_handler(handler);
2207  handler->before_listener_register();
2208 
2209  std::weak_ptr<cluster_view_listener> weak_self = shared_from_this();
2210  auto conn_id = connection->get_connection_id();
2211 
2212  invocation->invoke_urgent().then(
2213  [weak_self, handler, conn_id](boost::future<protocol::ClientMessage> f) {
2214  auto self = weak_self.lock();
2215  if (!self)
2216  return;
2217 
2218  if (f.has_value()) {
2219  handler->on_listener_register();
2220  return;
2221  }
2222 
2223  //completes with exception, listener needs to be reregistered
2224  self->try_reregister_to_random_connection(conn_id);
2225  });
2226 
2227  }
2228 
2229  void cluster_view_listener::try_reregister_to_random_connection(int32_t old_connection_id) {
2230  if (!listener_added_connection_id_.compare_exchange_strong(old_connection_id, -1)) {
2231  //somebody else already trying to reregister
2232  return;
2233  }
2234  auto new_connection = client_context_.get_connection_manager().get_random_connection();
2235  if (new_connection) {
2236  try_register(new_connection);
2237  }
2238  }
2239 
2240  cluster_view_listener::~cluster_view_listener() = default;
2241 
2242  void
2243  cluster_view_listener::event_handler::handle_membersview(int32_t version,
2244  const std::vector<member> &member_infos) {
2245  view_listener.client_context_.get_client_cluster_service().handle_event(version, member_infos);
2246  }
2247 
2248  void
2249  cluster_view_listener::event_handler::handle_partitionsview(int32_t version,
2250  const std::vector<std::pair<boost::uuids::uuid, std::vector<int>>> &partitions) {
2251  view_listener.client_context_.get_partition_service().handle_event(connection_id, version, partitions);
2252  }
2253 
2254  void cluster_view_listener::event_handler::before_listener_register() {
2255  view_listener.client_context_.get_client_cluster_service().clear_member_list_version();
2256  auto &lg = view_listener.client_context_.get_logger();
2257  HZ_LOG(lg, finest,
2258  boost::str(boost::format(
2259  "Register attempt of cluster_view_listener::event_handler to connection with id %1%") %
2260  connection_id));
2261  }
2262 
2263  void cluster_view_listener::event_handler::on_listener_register() {
2264  auto &lg = view_listener.client_context_.get_logger();
2265  HZ_LOG(lg, finest,
2266  boost::str(boost::format(
2267  "Registered cluster_view_listener::event_handler to connection with id %1%") %
2268  connection_id));
2269  }
2270 
2271  cluster_view_listener::event_handler::event_handler(int connectionId,
2272  cluster_view_listener &viewListener)
2273  : connection_id(connectionId), view_listener(viewListener) {}
2274  }
2275 
2276  protocol::ClientMessage
2277  ClientInvocationServiceImpl::BackupListenerMessageCodec::encode_add_request(bool local_only) const {
2278  return protocol::codec::client_localbackuplistener_encode();
2279  }
2280 
2281  protocol::ClientMessage ClientInvocationServiceImpl::BackupListenerMessageCodec::encode_remove_request(
2282  boost::uuids::uuid real_registration_id) const {
2283  assert(0);
2284  return protocol::ClientMessage(0);
2285  }
2286 
2287  void ClientInvocationServiceImpl::noop_backup_event_handler::handle_backup(
2288  int64_t source_invocation_correlation_id) {
2289  assert(0);
2290  }
2291 
2292  namespace discovery {
2293  remote_address_provider::remote_address_provider(
2294  std::function<std::unordered_map<address, address>()> addr_map_method,
2295  bool use_public) : refresh_address_map_(std::move(addr_map_method)),
2296  use_public_(use_public) {}
2297 
2298  std::vector<address> remote_address_provider::load_addresses() {
2299  auto address_map = refresh_address_map_();
2300  std::lock_guard<std::mutex> guard(lock_);
2301  private_to_public_ = address_map;
2302  std::vector<address> addresses;
2303  addresses.reserve(address_map.size());
2304  for (const auto &addr_pair : address_map) {
2305  addresses.push_back(addr_pair.first);
2306  }
2307  return addresses;
2308  }
2309 
2310  boost::optional<address> remote_address_provider::translate(const address &addr) {
2311  // if it is inside cloud, return private address otherwise we need to translate it.
2312  if (!use_public_) {
2313  return addr;
2314  }
2315 
2316  {
2317  std::lock_guard<std::mutex> guard(lock_);
2318  auto found = private_to_public_.find(addr);
2319  if (found != private_to_public_.end()) {
2320  return found->second;
2321  }
2322  }
2323 
2324  auto address_map = refresh_address_map_();
2325 
2326  std::lock_guard<std::mutex> guard(lock_);
2327  private_to_public_ = address_map;
2328 
2329  auto found = private_to_public_.find(addr);
2330  if (found != private_to_public_.end()) {
2331  return found->second;
2332  }
2333 
2334  return boost::none;
2335  }
2336 
2337 #ifdef HZ_BUILD_WITH_SSL
2338  cloud_discovery::cloud_discovery(config::cloud_config &config, std::string cloud_base_url,
2339  std::chrono::steady_clock::duration timeout)
2340  : cloud_config_(config), cloud_base_url_(cloud_base_url), timeout_(timeout) {}
2341 #else
2342  cloud_discovery::cloud_discovery(config::cloud_config &config, std::string cloud_base_url,
2343  std::chrono::steady_clock::duration timeout) {}
2344 #endif // HZ_BUILD_WITH_SSL
2345 
2346  std::unordered_map<address, address> cloud_discovery::get_addresses() {
2347 #ifdef HZ_BUILD_WITH_SSL
2348  try {
2349  util::SyncHttpsClient httpsConnection(cloud_base_url_, std::string(CLOUD_URL_PATH) +
2350  cloud_config_.discovery_token, timeout_);
2351  auto &conn_stream = httpsConnection.connect_and_get_response();
2352  return parse_json_response(conn_stream);
2353  } catch (std::exception &e) {
2354  std::throw_with_nested(boost::enable_current_exception(
2355  exception::illegal_state("cloud_discovery::get_addresses",
2356  e.what())));
2357  }
2358 #else
2359  util::Preconditions::check_ssl("cloud_discovery::get_addresses");
2360  return {};
2361 #endif
2362  }
2363 
2364  std::unordered_map<address, address>
2365  cloud_discovery::parse_json_response(std::istream &conn_stream) {
2366  namespace pt = boost::property_tree;
2367 
2368  pt::ptree root;
2369  pt::read_json(conn_stream, root);
2370 
2371  std::unordered_map<address, address> addresses;
2372  for (const auto &item : root) {
2373  auto private_address = item.second.get<std::string>(PRIVATE_ADDRESS_PROPERTY);
2374  auto public_address = item.second.get<std::string>(PUBLIC_ADDRESS_PROPERTY);
2375 
2376  address public_addr = create_address(public_address, -1);
2377  //if it is not explicitly given, create the private address with public addresses port
2378  auto private_addr = create_address(private_address, public_addr.get_port());
2379  addresses.emplace(std::move(private_addr), std::move(public_addr));
2380  }
2381 
2382  return addresses;
2383  }
2384 
2385  address cloud_discovery::create_address(const std::string &hostname, int default_port) {
2386  auto address_holder = util::AddressUtil::get_address_holder(hostname, default_port);
2387  auto scoped_hostname = util::AddressHelper::get_scoped_hostname(address_holder);
2388  return address(std::move(scoped_hostname), address_holder.get_port());
2389  }
2390  }
2391 
2392  ClientPartitionServiceImpl::partition_table::partition_table(int32_t connectionId, int32_t version,
2393  const std::unordered_map<int32_t, boost::uuids::uuid> &partitions)
2394  : connection_id(connectionId), version(version), partitions(partitions) {}
2395  }
2396  }
2397  }
2398 }
2399 
2400 namespace std {
2401  bool less<hazelcast::client::spi::DefaultObjectNamespace>::operator()(
2402  const hazelcast::client::spi::DefaultObjectNamespace &lhs,
2403  const hazelcast::client::spi::DefaultObjectNamespace &rhs) const {
2404  int result = lhs.get_service_name().compare(rhs.get_service_name());
2405  if (result < 0) {
2406  return true;
2407  }
2408 
2409  if (result > 0) {
2410  return false;
2411  }
2412 
2413  return lhs.get_object_name().compare(rhs.get_object_name()) < 0;
2414  }
2415 
2416  std::size_t
2417  hash<hazelcast::client::spi::DefaultObjectNamespace>::operator()(
2418  const hazelcast::client::spi::DefaultObjectNamespace &k) const noexcept {
2419  return std::hash<std::string>()(k.get_service_name() + k.get_object_name());
2420  }
2421 }
2422 
2423 
Hazelcast cluster interface.
Definition: cluster.h:36
const std::unordered_set< member > & get_members() const
Returns an immutable set of ordered members at the moment this MembershipListener is registered.
Definition: spi.cpp:75
cluster & get_cluster()
Returns the cluster of the event.
Definition: spi.cpp:79
lifecycle_state get_state() const
Definition: spi.cpp:91
lifecycle_event(lifecycle_state state)
Constructor.
Definition: spi.cpp:87