Hazelcast C++ Client
Hazelcast C++ Client Library
spi.cpp
1 /*
2  * Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /*
18  * Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
19  *
20  * Licensed under the Apache License, Version 2.0 (the "License");
21  * you may not use this file except in compliance with the License.
22  * You may obtain a copy of the License at
23  *
24  * http://www.apache.org/licenses/LICENSE-2.0
25  *
26  * Unless required by applicable law or agreed to in writing, software
27  * distributed under the License is distributed on an "AS IS" BASIS,
28  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
29  * See the License for the specific language governing permissions and
30  * limitations under the License.
31  */
32 #include <utility>
33 
34 #include <boost/uuid/uuid_hash.hpp>
35 #include <boost/functional/hash.hpp>
36 #include <boost/property_tree/ptree.hpp>
37 #include <boost/property_tree/json_parser.hpp>
38 
39 #include "hazelcast/client/hazelcast_client.h"
40 #include <hazelcast/client/protocol/codec/ErrorCodec.h>
41 #include <hazelcast/client/spi/impl/ListenerMessageCodec.h>
42 #include <hazelcast/client/spi/impl/ClientClusterServiceImpl.h>
43 #include <hazelcast/client/spi/impl/listener/cluster_view_listener.h>
44 #include <hazelcast/client/spi/impl/listener/listener_service_impl.h>
45 #include <hazelcast/client/spi/impl/discovery/remote_address_provider.h>
46 #include <hazelcast/client/spi/impl/discovery/cloud_discovery.h>
47 #include <hazelcast/util/AddressUtil.h>
48 #include "hazelcast/client/member_selectors.h"
49 #include "hazelcast/client/lifecycle_event.h"
50 #include "hazelcast/client/initial_membership_event.h"
51 #include "hazelcast/client/membership_event.h"
52 #include "hazelcast/client/lifecycle_listener.h"
53 #include "hazelcast/client/spi/ProxyManager.h"
54 #include "hazelcast/client/spi/ClientProxy.h"
55 #include "hazelcast/client/spi/ClientContext.h"
56 #include "hazelcast/client/spi/impl/ClientInvocation.h"
57 #include "hazelcast/client/spi/impl/ClientInvocationServiceImpl.h"
58 #include "hazelcast/client/impl/hazelcast_client_instance_impl.h"
59 #include "hazelcast/client/spi/impl/ClientPartitionServiceImpl.h"
60 #include "hazelcast/client/spi/impl/DefaultAddressProvider.h"
61 #include "hazelcast/client/spi/impl/sequence/CallIdSequenceWithBackpressure.h"
62 #include "hazelcast/client/spi/impl/sequence/CallIdSequenceWithoutBackpressure.h"
63 #include "hazelcast/client/spi/impl/sequence/FailFastCallIdSequence.h"
64 #include "hazelcast/client/spi/impl/sequence/CallIdFactory.h"
65 #include "hazelcast/client/connection/ClientConnectionManagerImpl.h"
66 #include "hazelcast/util/AddressHelper.h"
67 #include "hazelcast/util/HashUtil.h"
68 #include "hazelcast/util/concurrent/BackoffIdleStrategy.h"
69 #ifdef HZ_BUILD_WITH_SSL
70 #include <hazelcast/util/SyncHttpsClient.h>
71 #endif //HZ_BUILD_WITH_SSL
72 
73 namespace hazelcast {
74  namespace client {
75  const std::unordered_set<member> &initial_membership_event::get_members() const {
76  return members_;
77  }
78 
80  return cluster_;
81  }
82 
83  initial_membership_event::initial_membership_event(cluster &cluster, std::unordered_set<member> members) : cluster_(
84  cluster), members_(std::move(members)) {
85  }
86 
88  : state_(state) {
89  }
90 
92  return state_;
93  }
94 
95  namespace spi {
96  ProxyManager::ProxyManager(ClientContext &context) : client_(context) {
97  }
98 
99  void ProxyManager::init() {
100  }
101 
102  void ProxyManager::destroy() {
103  std::lock_guard<std::recursive_mutex> guard(lock_);
104  for (auto &p : proxies_) {
105  try {
106  auto proxy = p.second.get();
107  p.second.get()->on_shutdown();
108  } catch (std::exception &se) {
109  auto &lg = client_.get_logger();
110  HZ_LOG(lg, finest,
111  boost::str(boost::format("Proxy was not created, "
112  "hence onShutdown can be called. Exception: %1%")
113  % se.what())
114  );
115  }
116  }
117  proxies_.clear();
118  }
119 
120  boost::future<void> ProxyManager::initialize(const std::shared_ptr<ClientProxy> &client_proxy) {
121  auto clientMessage = protocol::codec::client_createproxy_encode(client_proxy->get_name(),
122  client_proxy->get_service_name());
123  return spi::impl::ClientInvocation::create(client_, clientMessage,
124  client_proxy->get_service_name())->invoke().then(
125  boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
126  f.get();
127  client_proxy->on_initialize();
128  });
129  }
130 
131  boost::future<void> ProxyManager::destroy_proxy(ClientProxy &proxy) {
132  DefaultObjectNamespace objectNamespace(proxy.get_service_name(), proxy.get_name());
133  std::shared_ptr<ClientProxy> registeredProxy;
134  {
135  std::lock_guard<std::recursive_mutex> guard(lock_);
136  auto it = proxies_.find(objectNamespace);
137  registeredProxy = it == proxies_.end() ? nullptr : it->second.get();
138  if (it != proxies_.end()) {
139  proxies_.erase(it);
140  }
141  }
142 
143  try {
144  if (registeredProxy) {
145  try {
146  proxy.destroy_locally();
147  return proxy.destroy_remotely();
148  } catch (exception::iexception &) {
149  proxy.destroy_remotely();
150  throw;
151  }
152  }
153  if (&proxy != registeredProxy.get()) {
154  // The given proxy is stale and was already destroyed, but the caller
155  // may have allocated local resources in the context of this stale proxy
156  // instance after it was destroyed, so we have to cleanup it locally one
157  // more time to make sure there are no leaking local resources.
158  proxy.destroy_locally();
159  }
160  } catch (...) {
161  if (&proxy != registeredProxy.get()) {
162  // The given proxy is stale and was already destroyed, but the caller
163  // may have allocated local resources in the context of this stale proxy
164  // instance after it was destroyed, so we have to cleanup it locally one
165  // more time to make sure there are no leaking local resources.
166  proxy.destroy_locally();
167  }
168  throw;
169  }
170  return boost::make_ready_future();
171  }
172 
173  ClientContext::ClientContext(const client::hazelcast_client &hazelcast_client) : hazelcast_client_(
174  *hazelcast_client.client_impl_) {
175  }
176 
177  ClientContext::ClientContext(client::impl::hazelcast_client_instance_impl &hazelcast_client)
178  : hazelcast_client_(hazelcast_client) {
179  }
180 
181  serialization::pimpl::SerializationService &ClientContext::get_serialization_service() {
182  return hazelcast_client_.serialization_service_;
183  }
184 
185  impl::ClientClusterServiceImpl & ClientContext::get_client_cluster_service() {
186  return hazelcast_client_.cluster_service_;
187  }
188 
189  impl::ClientInvocationServiceImpl &ClientContext::get_invocation_service() {
190  return *hazelcast_client_.invocation_service_;
191  }
192 
193  client_config &ClientContext::get_client_config() {
194  return hazelcast_client_.client_config_;
195  }
196 
197  impl::ClientPartitionServiceImpl & ClientContext::get_partition_service() {
198  return *hazelcast_client_.partition_service_;
199  }
200 
201  lifecycle_service &ClientContext::get_lifecycle_service() {
202  return hazelcast_client_.lifecycle_service_;
203  }
204 
205  spi::impl::listener::listener_service_impl &ClientContext::get_client_listener_service() {
206  return *hazelcast_client_.listener_service_;
207  }
208 
209  connection::ClientConnectionManagerImpl &ClientContext::get_connection_manager() {
210  return *hazelcast_client_.connection_manager_;
211  }
212 
213  internal::nearcache::NearCacheManager &ClientContext::get_near_cache_manager() {
214  return *hazelcast_client_.near_cache_manager_;
215  }
216 
217  client_properties &ClientContext::get_client_properties() {
218  return hazelcast_client_.client_properties_;
219  }
220 
221  cluster &ClientContext::get_cluster() {
222  return hazelcast_client_.cluster_;
223  }
224 
225  std::shared_ptr<impl::sequence::CallIdSequence> &ClientContext::get_call_id_sequence() const {
226  return hazelcast_client_.call_id_sequence_;
227  }
228 
229  const protocol::ClientExceptionFactory &ClientContext::get_client_exception_factory() const {
230  return hazelcast_client_.get_exception_factory();
231  }
232 
233  const std::string &ClientContext::get_name() const {
234  return hazelcast_client_.get_name();
235  }
236 
237  impl::ClientExecutionServiceImpl &ClientContext::get_client_execution_service() const {
238  return *hazelcast_client_.execution_service_;
239  }
240 
241  const std::shared_ptr<client::impl::ClientLockReferenceIdGenerator> &
242  ClientContext::get_lock_reference_id_generator() {
243  return hazelcast_client_.get_lock_reference_id_generator();
244  }
245 
246  std::shared_ptr<client::impl::hazelcast_client_instance_impl>
247  ClientContext::get_hazelcast_client_implementation() {
248  return hazelcast_client_.shared_from_this();
249  }
250 
251  spi::ProxyManager &ClientContext::get_proxy_manager() {
252  return hazelcast_client_.get_proxy_manager();
253  }
254 
255  logger &ClientContext::get_logger() {
256  return *hazelcast_client_.logger_;
257  }
258 
259  client::impl::statistics::Statistics &ClientContext::get_clientstatistics() {
260  return *hazelcast_client_.statistics_;
261  }
262 
263  spi::impl::listener::cluster_view_listener &ClientContext::get_cluster_view_listener() {
264  return *hazelcast_client_.cluster_listener_;
265  }
266 
267  boost::uuids::uuid ClientContext::random_uuid() {
268  return hazelcast_client_.random_uuid();
269  }
270 
271  cp::internal::session::proxy_session_manager &ClientContext::get_proxy_session_manager() {
272  return hazelcast_client_.proxy_session_manager_;
273  }
274 
275  lifecycle_service::lifecycle_service(ClientContext &client_context,
276  const std::vector<lifecycle_listener> &listeners) :
277  client_context_(client_context), listeners_(),
278  shutdown_completed_latch_(1) {
279  for (const auto &listener: listeners) {
280  add_listener(lifecycle_listener(listener));
281  }
282  }
283 
284  bool lifecycle_service::start() {
285  bool expected = false;
286  if (!active_.compare_exchange_strong(expected, true)) {
287  return false;
288  }
289 
290  fire_lifecycle_event(lifecycle_event::STARTED);
291 
292  client_context_.get_client_execution_service().start();
293 
294  client_context_.get_client_listener_service().start();
295 
296  client_context_.get_invocation_service().start();
297 
298  client_context_.get_client_cluster_service().start();
299 
300  client_context_.get_cluster_view_listener().start();
301 
302  if (!client_context_.get_connection_manager().start()) {
303  return false;
304  }
305 
306  auto &connectionStrategyConfig = client_context_.get_client_config().get_connection_strategy_config();
307  if (!connectionStrategyConfig.is_async_start()) {
308  // The client needs to open connections to all members before any services requiring internal listeners start
309  wait_for_initial_membership_event();
310  client_context_.get_connection_manager().connect_to_all_cluster_members();
311  }
312 
313  client_context_.get_invocation_service().add_backup_listener();
314 
315  client_context_.get_clientstatistics().start();
316 
317  return true;
318  }
319 
320  void lifecycle_service::shutdown() {
321  bool expected = true;
322  if (!active_.compare_exchange_strong(expected, false)) {
323  shutdown_completed_latch_.wait();
324  return;
325  }
326  try {
327  fire_lifecycle_event(lifecycle_event::SHUTTING_DOWN);
328  client_context_.get_proxy_session_manager().shutdown();
329  client_context_.get_clientstatistics().shutdown();
330  client_context_.get_proxy_manager().destroy();
331  client_context_.get_connection_manager().shutdown();
332  client_context_.get_client_cluster_service().shutdown();
333  client_context_.get_invocation_service().shutdown();
334  client_context_.get_client_listener_service().shutdown();
335  client_context_.get_near_cache_manager().destroy_all_near_caches();
336  fire_lifecycle_event(lifecycle_event::SHUTDOWN);
337  client_context_.get_client_execution_service().shutdown();
338  client_context_.get_serialization_service().dispose();
339  shutdown_completed_latch_.count_down();
340  } catch (std::exception &e) {
341  HZ_LOG(client_context_.get_logger(), info,
342  boost::str(boost::format("An exception occured during LifecycleService shutdown. %1%")
343  % e.what())
344  );
345  shutdown_completed_latch_.count_down();
346  }
347  }
348 
349  boost::uuids::uuid lifecycle_service::add_listener(lifecycle_listener &&lifecycle_listener) {
350  std::lock_guard<std::mutex> lg(listener_lock_);
351  const auto id = uuid_generator_();
352  listeners_.emplace(id, std::move(lifecycle_listener));
353  return id;
354  }
355 
356  bool lifecycle_service::remove_listener(const boost::uuids::uuid &registration_id) {
357  std::lock_guard<std::mutex> guard(listener_lock_);
358  return listeners_.erase(registration_id) == 1;
359  }
360 
361  void lifecycle_service::fire_lifecycle_event(const lifecycle_event &lifecycle_event) {
362  std::lock_guard<std::mutex> guard(listener_lock_);
363  logger &lg = client_context_.get_logger();
364 
365  std::function<void(lifecycle_listener &)> fire_one;
366 
367  switch (lifecycle_event.get_state()) {
368  case lifecycle_event::STARTING : {
369  // convert the date string from "2016-04-20" to 20160420
370  std::string date(HAZELCAST_STRINGIZE(HAZELCAST_GIT_COMMIT_DATE));
371  util::git_date_to_hazelcast_log_date(date);
372  std::string commitId(HAZELCAST_STRINGIZE(HAZELCAST_GIT_COMMIT_ID));
373  commitId.erase(std::remove(commitId.begin(), commitId.end(), '"'), commitId.end());
374 
375  HZ_LOG(lg, info,
376  (boost::format("(%1%:%2%) LifecycleService::LifecycleEvent Client (%3%) is STARTING") %
377  date % commitId % client_context_.get_connection_manager().get_client_uuid()).str());
378  char msg[100];
379  util::hz_snprintf(msg, 100, "(%s:%s) LifecycleService::LifecycleEvent STARTING", date.c_str(),
380  commitId.c_str());
381  HZ_LOG(lg, info, msg);
382 
383  fire_one = [](lifecycle_listener &listener) {
384  listener.starting_();
385  };
386  break;
387  }
388  case lifecycle_event::STARTED : {
389  HZ_LOG(lg, info, "LifecycleService::LifecycleEvent STARTED");
390 
391  fire_one = [](lifecycle_listener &listener) {
392  listener.started_();
393  };
394  break;
395  }
396  case lifecycle_event::SHUTTING_DOWN : {
397  HZ_LOG(lg, info, "LifecycleService::LifecycleEvent SHUTTING_DOWN");
398 
399  fire_one = [](lifecycle_listener &listener) {
400  listener.shutting_down_();
401  };
402  break;
403  }
404  case lifecycle_event::SHUTDOWN : {
405  HZ_LOG(lg, info, "LifecycleService::LifecycleEvent SHUTDOWN");
406 
407  fire_one = [](lifecycle_listener &listener) {
408  listener.shutdown_();
409  };
410  break;
411  }
412  case lifecycle_event::CLIENT_CONNECTED : {
413  HZ_LOG(lg, info, "LifecycleService::LifecycleEvent CLIENT_CONNECTED");
414 
415  fire_one = [](lifecycle_listener &listener) {
416  listener.connected_();
417  };
418  break;
419  }
420  case lifecycle_event::CLIENT_DISCONNECTED : {
421  HZ_LOG(lg, info, "LifecycleService::LifecycleEvent CLIENT_DISCONNECTED");
422 
423  fire_one = [](lifecycle_listener &listener) {
424  listener.disconnected_();
425  };
426  break;
427  }
428  }
429 
430  for (auto &item: listeners_) {
431  fire_one(item.second);
432  }
433  }
434 
435  bool lifecycle_service::is_running() {
436  return active_;
437  }
438 
439  lifecycle_service::~lifecycle_service() {
440  if (active_) {
441  shutdown();
442  }
443  }
444 
445  void lifecycle_service::wait_for_initial_membership_event() const {
446  client_context_.get_client_cluster_service().wait_initial_member_list_fetched();
447  }
448 
449  DefaultObjectNamespace::DefaultObjectNamespace(const std::string &service, const std::string &object)
450  : service_name_(service), object_name_(object) {
451 
452  }
453 
454  const std::string &DefaultObjectNamespace::get_service_name() const {
455  return service_name_;
456  }
457 
458  const std::string &DefaultObjectNamespace::get_object_name() const {
459  return object_name_;
460  }
461 
462  bool DefaultObjectNamespace::operator==(const DefaultObjectNamespace &rhs) const {
463  return service_name_ == rhs.service_name_ && object_name_ == rhs.object_name_;
464  }
465 
466  ClientProxy::ClientProxy(const std::string &name, const std::string &service_name, ClientContext &context)
467  : name_(name), service_name_(service_name), context_(context) {}
468 
469  ClientProxy::~ClientProxy() = default;
470 
471  const std::string &ClientProxy::get_name() const {
472  return name_;
473  }
474 
475  const std::string &ClientProxy::get_service_name() const {
476  return service_name_;
477  }
478 
479  ClientContext &ClientProxy::get_context() {
480  return context_;
481  }
482 
483  void ClientProxy::on_destroy() {
484  }
485 
486  boost::future<void> ClientProxy::destroy() {
487  return get_context().get_proxy_manager().destroy_proxy(*this);
488  }
489 
490  void ClientProxy::destroy_locally() {
491  if (pre_destroy()) {
492  try {
493  on_destroy();
494  post_destroy();
495  } catch (exception::iexception &) {
496  post_destroy();
497  throw;
498  }
499  }
500  }
501 
502  bool ClientProxy::pre_destroy() {
503  return true;
504  }
505 
506  void ClientProxy::post_destroy() {
507  }
508 
509  void ClientProxy::on_initialize() {
510  }
511 
512  void ClientProxy::on_shutdown() {
513  }
514 
515  serialization::pimpl::SerializationService &ClientProxy::get_serialization_service() {
516  return context_.get_serialization_service();
517  }
518 
519  boost::future<void> ClientProxy::destroy_remotely() {
520  auto clientMessage = protocol::codec::client_destroyproxy_encode(
521  get_name(), get_service_name());
522  return spi::impl::ClientInvocation::create(get_context(), std::make_shared<protocol::ClientMessage>(
523  std::move(clientMessage)), get_name())->invoke().then(
524  boost::launch::sync, [](boost::future<protocol::ClientMessage> f) { f.get(); });
525  }
526 
527  boost::future<boost::uuids::uuid>
528  ClientProxy::register_listener(std::shared_ptr<impl::ListenerMessageCodec> listener_message_codec,
529  std::shared_ptr<client::impl::BaseEventHandler> handler) {
530  handler->set_logger(&get_context().get_logger());
531  return get_context().get_client_listener_service().register_listener(listener_message_codec,
532  handler);
533  }
534 
535  boost::future<bool> ClientProxy::deregister_listener(boost::uuids::uuid registration_id) {
536  return get_context().get_client_listener_service().deregister_listener(registration_id);
537  }
538 
539  namespace impl {
540  boost::uuids::uuid
541  ListenerMessageCodec::decode_add_response(protocol::ClientMessage &msg) const {
542  return msg.get_first_uuid();
543  }
544 
545  bool ListenerMessageCodec::decode_remove_response(protocol::ClientMessage &msg) const {
546  return msg.get_first_fixed_sized_field<bool>();
547  }
548 
549  ClientInvocationServiceImpl::ClientInvocationServiceImpl(ClientContext &client)
550  : client_(client), logger_(client.get_logger()),
551  invocation_timeout_(std::chrono::seconds(client.get_client_properties().get_integer(
552  client.get_client_properties().get_invocation_timeout_seconds()))),
553  invocation_retry_pause_(std::chrono::milliseconds(client.get_client_properties().get_long(
554  client.get_client_properties().get_invocation_retry_pause_millis()))),
555  smart_routing_(client.get_client_config().get_network_config().is_smart_routing()),
556  backup_acks_enabled_(smart_routing_ && client.get_client_config().backup_acks_enabled()),
557  fail_on_indeterminate_operation_state_(client.get_client_properties().get_boolean(client.get_client_properties().fail_on_indeterminate_state())),
558  backup_timeout_(std::chrono::milliseconds(client.get_client_properties().get_integer(client.get_client_properties().backup_timeout_millis()))) {}
559 
560  void ClientInvocationServiceImpl::start() {
561  }
562 
563  void ClientInvocationServiceImpl::add_backup_listener() {
564  if (this->backup_acks_enabled_) {
565  auto &listener_service = this->client_.get_client_listener_service();
566  listener_service.register_listener(std::make_shared<BackupListenerMessageCodec>(),
567  std::make_shared<noop_backup_event_handler>()).get();
568  }
569  }
570 
571  void ClientInvocationServiceImpl::shutdown() {
572  is_shutdown_.store(true);
573  }
574 
575  std::chrono::milliseconds ClientInvocationServiceImpl::get_invocation_timeout() const {
576  return invocation_timeout_;
577  }
578 
579  std::chrono::milliseconds ClientInvocationServiceImpl::get_invocation_retry_pause() const {
580  return invocation_retry_pause_;
581  }
582 
583  bool ClientInvocationServiceImpl::is_redo_operation() {
584  return client_.get_client_config().is_redo_operation();
585  }
586 
587  void
588  ClientInvocationServiceImpl::handle_client_message(const std::shared_ptr<ClientInvocation> &invocation,
589  const std::shared_ptr<protocol::ClientMessage> &response) {
590  try {
591  if (protocol::codec::ErrorCodec::EXCEPTION_MESSAGE_TYPE == response->get_message_type()) {
592  auto error_holder = protocol::codec::ErrorCodec::decode(*response);
593  invocation->notify_exception(
594  client_.get_client_exception_factory().create_exception(error_holder));
595  } else {
596  invocation->notify(response);
597  }
598  } catch (std::exception &e) {
599  HZ_LOG(logger_, severe,
600  boost::str(boost::format("Failed to process response for %1%. %2%")
601  % *invocation % e.what())
602  );
603  }
604  }
605 
606  bool ClientInvocationServiceImpl::send(const std::shared_ptr<impl::ClientInvocation>& invocation,
607  const std::shared_ptr<connection::Connection>& connection) {
608  if (is_shutdown_) {
609  BOOST_THROW_EXCEPTION(
610  exception::hazelcast_client_not_active("ClientInvocationServiceImpl::send",
611  "Client is shut down"));
612  }
613 
614  if (backup_acks_enabled_) {
615  invocation->get_client_message()->add_flag(protocol::ClientMessage::BACKUP_AWARE_FLAG);
616  }
617 
618  write_to_connection(*connection, invocation);
619  invocation->set_send_connection(connection);
620  return true;
621  }
622 
623  void ClientInvocationServiceImpl::write_to_connection(connection::Connection &connection,
624  const std::shared_ptr<ClientInvocation> &client_invocation) {
625  auto clientMessage = client_invocation->get_client_message();
626  connection.write(client_invocation);
627  }
628 
629  void ClientInvocationServiceImpl::check_invocation_allowed() {
630  client_.get_connection_manager().check_invocation_allowed();
631  }
632 
633  bool ClientInvocationServiceImpl::invoke(std::shared_ptr<ClientInvocation> invocation) {
634  auto connection = client_.get_connection_manager().get_random_connection();
635  if (!connection) {
636  HZ_LOG(logger_, finest, "No connection found to invoke");
637  return false;
638  }
639  return send(invocation, connection);
640  }
641 
642  DefaultAddressProvider::DefaultAddressProvider(config::client_network_config &network_config)
643  : network_config_(network_config) {}
644 
645  std::vector<address> DefaultAddressProvider::load_addresses() {
646  std::vector<address> addresses = network_config_.get_addresses();
647  if (addresses.empty()) {
648  addresses.emplace_back("127.0.0.1", 5701);
649  }
650 
651  // TODO Implement AddressHelper to add alternative ports for the same address
652 
653  return addresses;
654  }
655 
656  boost::optional<address> DefaultAddressProvider::translate(const address &addr) {
657  return addr;
658  }
659 
660  const boost::shared_ptr<ClientClusterServiceImpl::member_list_snapshot> ClientClusterServiceImpl::EMPTY_SNAPSHOT(
661  new ClientClusterServiceImpl::member_list_snapshot{-1});
662 
663  constexpr boost::chrono::milliseconds ClientClusterServiceImpl::INITIAL_MEMBERS_TIMEOUT;
664 
665  ClientClusterServiceImpl::ClientClusterServiceImpl(hazelcast::client::spi::ClientContext &client)
666  : client_(client), member_list_snapshot_(EMPTY_SNAPSHOT),
667  labels_(client.get_client_config().get_labels()),
668  initial_list_fetched_latch_(1) {
669  }
670 
671  boost::uuids::uuid ClientClusterServiceImpl::add_membership_listener_without_init(
672  membership_listener &&listener) {
673  std::lock_guard<std::mutex> g(listeners_lock_);
674  auto id = client_.random_uuid();
675  listeners_.emplace(id, std::move(listener));
676  return id;
677  }
678 
679  boost::optional<member> ClientClusterServiceImpl::get_member(boost::uuids::uuid uuid) const {
680  assert(!uuid.is_nil());
681  auto members_view_ptr = member_list_snapshot_.load();
682  const auto it = members_view_ptr->members.find(uuid);
683  if (it == members_view_ptr->members.end()) {
684  return boost::none;
685  }
686  return {it->second};
687  }
688 
689  std::vector<member> ClientClusterServiceImpl::get_member_list() const {
690  auto members_view_ptr = member_list_snapshot_.load();
691  std::vector<member> result;
692  result.reserve(members_view_ptr->members.size());
693  for (const auto &e : members_view_ptr->members) {
694  result.emplace_back(e.second);
695  }
696  return result;
697  }
698 
699  void ClientClusterServiceImpl::start() {
700  for (auto &listener : client_.get_client_config().get_membership_listeners()) {
701  add_membership_listener(membership_listener(listener));
702  }
703  }
704 
705  void ClientClusterServiceImpl::fire_initial_membership_event(const initial_membership_event &event) {
706  std::lock_guard<std::mutex> g(listeners_lock_);
707 
708  for (auto &item : listeners_) {
709  membership_listener &listener = item.second;
710  if (listener.init_) {
711  listener.init_(event);
712  }
713  }
714  }
715 
716  void ClientClusterServiceImpl::shutdown() {
717  initial_list_fetched_latch_.try_count_down();
718  }
719 
720  boost::uuids::uuid
721  ClientClusterServiceImpl::add_membership_listener(membership_listener &&listener) {
722  std::lock_guard<std::mutex> cluster_view_g(cluster_view_lock_);
723 
724  auto id = add_membership_listener_without_init(std::move(listener));
725 
726  std::lock_guard<std::mutex> listeners_g(listeners_lock_);
727  auto added_listener = listeners_[id];
728 
729  if (added_listener.init_) {
730  auto &cluster = client_.get_cluster();
731  auto members_ptr = member_list_snapshot_.load();
732  if (!members_ptr->members.empty()) {
733  std::unordered_set<member> members;
734  for (const auto &e : members_ptr->members) {
735  members.insert(e.second);
736  }
737  added_listener.init_(initial_membership_event(cluster, members));
738  }
739  }
740 
741  return id;
742  }
743 
744  bool ClientClusterServiceImpl::remove_membership_listener(boost::uuids::uuid registration_id) {
745  std::lock_guard<std::mutex> g(listeners_lock_);
746  return listeners_.erase(registration_id) == 1;
747  }
748 
749  std::vector<member>
750  ClientClusterServiceImpl::get_members(const member_selector &selector) const {
751  std::vector<member> result;
752  for (auto &&member : get_member_list()) {
753  if (selector.select(member)) {
754  result.emplace_back(std::move(member));
755  }
756  }
757 
758  return result;
759  }
760 
761  local_endpoint ClientClusterServiceImpl::get_local_client() const {
762  connection::ClientConnectionManagerImpl &cm = client_.get_connection_manager();
763  auto connection = cm.get_random_connection();
764  auto inetSocketAddress = connection ? connection->get_local_socket_address() : boost::none;
765  auto uuid = cm.get_client_uuid();
766  return local_endpoint(uuid, std::move(inetSocketAddress), client_.get_name(), labels_);
767  }
768 
769  void ClientClusterServiceImpl::clear_member_list_version() {
770  std::lock_guard<std::mutex> g(cluster_view_lock_);
771  auto &lg = client_.get_logger();
772  HZ_LOG(lg, finest, "Resetting the member list version ");
773  auto cluster_view_snapshot = member_list_snapshot_.load();
774  //This check is necessary so that `clear_member_list_version` when handling auth response will not
775  //intervene with client failover logic
776  if (cluster_view_snapshot != EMPTY_SNAPSHOT) {
777  member_list_snapshot_.store(boost::shared_ptr<member_list_snapshot>(
778  new member_list_snapshot{0, cluster_view_snapshot->members}));
779  }
780  }
781 
782  std::vector<membership_event> ClientClusterServiceImpl::clear_member_list_and_return_events() {
783  std::lock_guard<std::mutex> g(cluster_view_lock_);
784 
785  auto &lg = client_.get_logger();
786  HZ_LOG(lg, finest, "Resetting the member list");
787 
788  auto previous_list = member_list_snapshot_.load()->members;
789 
790  member_list_snapshot_.store(
791  boost::shared_ptr<member_list_snapshot>(new member_list_snapshot{0}));
792 
793  return detect_membership_events(previous_list,
794  std::unordered_map<boost::uuids::uuid, member, boost::hash<boost::uuids::uuid>>());
795  }
796 
797  void ClientClusterServiceImpl::clear_member_list() {
798  auto events = clear_member_list_and_return_events();
799  fire_events(std::move(events));
800  }
801 
802  void
803  ClientClusterServiceImpl::handle_event(int32_t version, const std::vector<member> &member_infos) {
804  auto &lg = client_.get_logger();
805  HZ_LOG(lg, finest,
806  boost::str(boost::format("Handling new snapshot with membership version: %1%, "
807  "membersString %2%")
808  % version
809  % members_string(create_snapshot(version, member_infos)))
810  );
811  auto cluster_view_snapshot = member_list_snapshot_.load();
812  if (cluster_view_snapshot == EMPTY_SNAPSHOT) {
813  std::lock_guard<std::mutex> g(cluster_view_lock_);
814  cluster_view_snapshot = member_list_snapshot_.load();
815  if (cluster_view_snapshot == EMPTY_SNAPSHOT) {
816  //this means this is the first time client connected to cluster
817  apply_initial_state(version, member_infos);
818  initial_list_fetched_latch_.count_down();
819  return;
820  }
821  }
822 
823  std::vector<membership_event> events;
824  if (version >= cluster_view_snapshot->version) {
825  std::lock_guard<std::mutex> g(cluster_view_lock_);
826  cluster_view_snapshot = member_list_snapshot_.load();
827  if (version >= cluster_view_snapshot->version) {
828  auto prev_members = cluster_view_snapshot->members;
829  auto snapshot = boost::make_shared<member_list_snapshot>(create_snapshot(version, member_infos));
830  member_list_snapshot_.store(snapshot);
831  events = detect_membership_events(prev_members, snapshot->members);
832  }
833  }
834 
835  fire_events(std::move(events));
836  }
837 
838  ClientClusterServiceImpl::member_list_snapshot
839  ClientClusterServiceImpl::create_snapshot(int32_t version, const std::vector<member> &members) {
840  member_list_snapshot result;
841  result.version = version;
842  for (auto &m : members) {
843  result.members.insert({m.get_uuid(), m});
844  }
845 
846  return result;
847  }
848 
849  std::string
850  ClientClusterServiceImpl::members_string(const ClientClusterServiceImpl::member_list_snapshot& snapshot) {
851  std::stringstream out;
852  auto const &members = snapshot.members;
853  out << std::endl << std::endl << "Members [" << members.size() << "] {";
854  for (auto const &e : members) {
855  out << std::endl << "\t" << e.second;
856  }
857  out << std::endl << "}" << std::endl;
858  return out.str();
859  }
860 
861  void
862  ClientClusterServiceImpl::apply_initial_state(int32_t version, const std::vector<member> &member_infos) {
863  auto snapshot = boost::make_shared<member_list_snapshot>(create_snapshot(version, member_infos));
864  member_list_snapshot_.store(snapshot);
865  HZ_LOG(client_.get_logger(), info, members_string(*snapshot));
866  std::unordered_set<member> members;
867  for(auto const &e : snapshot->members) {
868  members.insert(e.second);
869  }
870  std::lock_guard<std::mutex> g(listeners_lock_);
871  for (auto &item : listeners_) {
872  membership_listener &listener = item.second;
873  if (listener.init_) {
874  listener.init_(initial_membership_event(client_.get_cluster(), members));
875  }
876  }
877  }
878 
879  std::vector<membership_event> ClientClusterServiceImpl::detect_membership_events(
880  std::unordered_map<boost::uuids::uuid, member, boost::hash<boost::uuids::uuid>> previous_members,
881  const std::unordered_map<boost::uuids::uuid, member, boost::hash<boost::uuids::uuid>>& current_members) {
882  std::vector<member> new_members;
883 
884  for (auto const & e : current_members) {
885  if (!previous_members.erase(e.first)) {
886  new_members.emplace_back(e.second);
887  }
888  }
889 
890  std::vector<membership_event> events;
891 
892  // removal events should be added before added events
893  for (auto const &e : previous_members) {
894  events.emplace_back(client_.get_cluster(), e.second, membership_event::membership_event_type::MEMBER_LEFT, current_members);
895  auto connection = client_.get_connection_manager().get_connection(e.second.get_uuid());
896  if (connection) {
897  connection->close("", std::make_exception_ptr(exception::target_disconnected(
898  "ClientClusterServiceImpl::detect_membership_events", (boost::format(
899  "The client has closed the connection to this member, after receiving a member left event from the cluster. %1%") %
900  *connection).str())));
901  }
902  }
903  for (auto const &member : new_members) {
904  events.emplace_back(client_.get_cluster(), member, membership_event::membership_event_type::MEMBER_JOINED, current_members);
905  }
906 
907  if (!events.empty()) {
908  auto snapshot = member_list_snapshot_.load();
909  if (!snapshot->members.empty()) {
910  HZ_LOG(client_.get_logger(), info, members_string(*snapshot));
911  }
912  }
913  return events;
914  }
915 
916  void ClientClusterServiceImpl::fire_events(std::vector<membership_event> events) {
917  std::lock_guard<std::mutex> g(listeners_lock_);
918 
919  for (auto const &event : events) {
920  for (auto &item : listeners_) {
921  membership_listener &listener = item.second;
922  if (event.get_event_type() == membership_event::membership_event_type::MEMBER_JOINED) {
923  listener.joined_(event);
924  } else {
925  listener.left_(event);
926  }
927  }
928  }
929  }
930 
931  void ClientClusterServiceImpl::wait_initial_member_list_fetched() const {
932  // safe to const cast here since latch operations are already thread safe ops.
933  if ((const_cast<boost::latch&>(initial_list_fetched_latch_)).wait_for(INITIAL_MEMBERS_TIMEOUT) == boost::cv_status::timeout) {
934  BOOST_THROW_EXCEPTION(exception::illegal_state(
935  "ClientClusterServiceImpl::wait_initial_member_list_fetched",
936  "Could not get initial member list from cluster!"));
937  }
938  }
939 
940  bool
941  ClientInvocationServiceImpl::invoke_on_connection(const std::shared_ptr<ClientInvocation> &invocation,
942  const std::shared_ptr<connection::Connection> &connection) {
943  return send(invocation, connection);
944  }
945 
946  bool ClientInvocationServiceImpl::invoke_on_partition_owner(
947  const std::shared_ptr<ClientInvocation> &invocation, int partition_id) {
948  auto partition_owner = client_.get_partition_service().get_partition_owner(partition_id);
949  if (partition_owner.is_nil()) {
950  HZ_LOG(logger_, finest,
951  boost::str(boost::format("Partition owner is not assigned yet for partition %1%")
952  % partition_id)
953  );
954  return false;
955  }
956  return invoke_on_target(invocation, partition_owner);
957  }
958 
959  bool ClientInvocationServiceImpl::invoke_on_target(const std::shared_ptr<ClientInvocation> &invocation,
960  boost::uuids::uuid uuid) {
961  assert (!uuid.is_nil());
962  auto connection = client_.get_connection_manager().get_connection(uuid);
963  if (!connection) {
964  HZ_LOG(logger_, finest,
965  boost::str(boost::format("Client is not connected to target : %1%")
966  % uuid)
967  );
968  return false;
969  }
970  return send(invocation, connection);
971  }
972 
973  bool ClientInvocationServiceImpl::is_smart_routing() const {
974  return smart_routing_;
975  }
976 
977  const std::chrono::milliseconds &ClientInvocationServiceImpl::get_backup_timeout() const {
978  return backup_timeout_;
979  }
980 
981  bool ClientInvocationServiceImpl::fail_on_indeterminate_state() const {
982  return fail_on_indeterminate_operation_state_;
983  }
984 
985  ClientExecutionServiceImpl::ClientExecutionServiceImpl(const std::string &name,
986  const client_properties &properties,
987  int32_t pool_size,
988  spi::lifecycle_service &service)
989  : lifecycle_service_(service), client_properties_(properties) {}
990 
991  void ClientExecutionServiceImpl::start() {
992  int internalPoolSize = client_properties_.get_integer(
993  client_properties_.get_internal_executor_pool_size());
994  if (internalPoolSize <= 0) {
995  internalPoolSize = util::IOUtil::to_value<int>(
996  client_properties::INTERNAL_EXECUTOR_POOL_SIZE_DEFAULT);
997  }
998 
999  internal_executor_.reset(new hazelcast::util::hz_thread_pool(internalPoolSize));
1000 
1001  user_executor_.reset(new hazelcast::util::hz_thread_pool());
1002  }
1003 
1004  void ClientExecutionServiceImpl::shutdown() {
1005  shutdown_thread_pool(internal_executor_.get());
1006  shutdown_thread_pool(user_executor_.get());
1007  }
1008 
1009  util::hz_thread_pool &ClientExecutionServiceImpl::get_user_executor() {
1010  return *user_executor_;
1011  }
1012 
1013  void ClientExecutionServiceImpl::shutdown_thread_pool(hazelcast::util::hz_thread_pool *pool) {
1014  if (!pool) {
1015  return;
1016  }
1017  pool->close();
1018  }
1019 
// Namespace scope definitions of the static constexpr members of ClientInvocation.
// Before C++17 such a definition is needed whenever the member is odr-used.
constexpr int ClientInvocation::MAX_FAST_INVOCATION_COUNT;
constexpr int ClientInvocation::UNASSIGNED_PARTITION;
1022 
1023  ClientInvocation::ClientInvocation(spi::ClientContext &client_context,
1024  std::shared_ptr<protocol::ClientMessage> &&message,
1025  const std::string &name,
1026  int partition,
1027  const std::shared_ptr<connection::Connection> &conn,
1028  boost::uuids::uuid uuid) :
1029  logger_(client_context.get_logger()),
1030  lifecycle_service_(client_context.get_lifecycle_service()),
1031  invocation_service_(client_context.get_invocation_service()),
1032  execution_service_(client_context.get_client_execution_service().shared_from_this()),
1033  call_id_sequence_(client_context.get_call_id_sequence()),
1034  uuid_(uuid),
1035  partition_id_(partition),
1036  start_time_(std::chrono::steady_clock::now()),
1037  retry_pause_(invocation_service_.get_invocation_retry_pause()),
1038  object_name_(name),
1039  connection_(conn), bound_to_single_connection_(conn != nullptr),
1040  invoke_count_(0), urgent_(false), smart_routing_(invocation_service_.is_smart_routing()) {
1041  message->set_partition_id(partition_id_);
1042  client_message_ = boost::make_shared<std::shared_ptr<protocol::ClientMessage>>(message);
1043  set_send_connection(nullptr);
1044  }
1045 
// The destructor is defaulted here, out of line, instead of inside the class definition.
ClientInvocation::~ClientInvocation() = default;
1047 
1048  boost::future<protocol::ClientMessage> ClientInvocation::invoke() {
1049  assert (client_message_.load());
1050  // for back pressure
1051  call_id_sequence_->next();
1052  invoke_on_selection();
1053  if (!lifecycle_service_.is_running()) {
1054  return invocation_promise_.get_future().then([](boost::future<protocol::ClientMessage> f) {
1055  return f.get();
1056  });
1057  }
1058  auto id_seq = call_id_sequence_;
1059  return invocation_promise_.get_future().then(execution_service_->get_user_executor(),
1060  [=](boost::future<protocol::ClientMessage> f) {
1061  id_seq->complete();
1062  return f.get();
1063  });
1064  }
1065 
1066  boost::future<protocol::ClientMessage> ClientInvocation::invoke_urgent() {
1067  assert(client_message_.load());
1068  urgent_ = true;
1069  // for back pressure
1070  call_id_sequence_->force_next();
1071  invoke_on_selection();
1072  if (!lifecycle_service_.is_running()) {
1073  return invocation_promise_.get_future().then([](boost::future<protocol::ClientMessage> f) {
1074  return f.get();
1075  });
1076  }
1077  auto id_seq = call_id_sequence_;
1078  return invocation_promise_.get_future().then(execution_service_->get_user_executor(),
1079  [=](boost::future<protocol::ClientMessage> f) {
1080  id_seq->complete();
1081  return f.get();
1082  });
1083  }
1084 
1085  void ClientInvocation::invoke_on_selection() {
1086  try {
1087  invoke_count_++;
1088  if (!urgent_) {
1089  invocation_service_.check_invocation_allowed();
1090  }
1091 
1092  if (is_bind_to_single_connection()) {
1093  bool invoked = false;
1094  auto conn = connection_.lock();
1095  if (conn) {
1096  invoked = invocation_service_.invoke_on_connection(shared_from_this(), conn);
1097  }
1098  if (!invoked) {
1099  std::string message;
1100  if (conn) {
1101  message = (boost::format("Could not invoke on connection %1%") % *conn).str();
1102  } else {
1103  message = "Could not invoke. Bound to a connection that is deleted already.";
1104  }
1105  notify_exception(std::make_exception_ptr(
1106  exception::io("ClientInvocation::invoke_on_selection", message)));
1107  }
1108  return;
1109  }
1110 
1111  bool invoked = false;
1112  if (smart_routing_) {
1113  if (partition_id_ != -1) {
1114  invoked = invocation_service_.invoke_on_partition_owner(shared_from_this(), partition_id_);
1115  } else if (!uuid_.is_nil()) {
1116  invoked = invocation_service_.invoke_on_target(shared_from_this(), uuid_);
1117  } else {
1118  invoked = invocation_service_.invoke(shared_from_this());
1119  }
1120  if (!invoked) {
1121  invoked = invocation_service_.invoke(shared_from_this());
1122  }
1123  } else {
1124  invoked = invocation_service_.invoke(shared_from_this());
1125  }
1126  if (!invoked) {
1127  notify_exception(std::make_exception_ptr(exception::io("No connection found to invoke")));
1128  }
1129  } catch (exception::iexception &) {
1130  notify_exception(std::current_exception());
1131  } catch (std::exception &) {
1132  assert(false);
1133  }
1134  }
1135 
1136  bool ClientInvocation::is_bind_to_single_connection() const {
1137  return bound_to_single_connection_;
1138  }
1139 
1140  void ClientInvocation::run() {
1141  retry();
1142  }
1143 
1144  void ClientInvocation::retry() {
1145  // retry modifies the client message and should not reuse the client message.
1146  // It could be the case that it is in write queue of the connection.
1147  client_message_ = boost::make_shared<std::shared_ptr<protocol::ClientMessage>>(copy_message());
1148 
1149  try {
1150  invoke_on_selection();
1151  } catch (exception::iexception &e) {
1152  set_exception(e, boost::current_exception());
1153  } catch (std::exception &) {
1154  assert(false);
1155  }
1156  }
1157 
1158  void ClientInvocation::set_exception(const std::exception &e, boost::exception_ptr exception_ptr) {
1159  invoked_or_exception_set_.store(true);
1160  try {
1161  auto send_conn = send_connection_.load();
1162  if (send_conn) {
1163  auto connection = send_conn->lock();
1164  if (connection) {
1165  auto call_id = client_message_.load()->get()->get_correlation_id();
1166  boost::asio::post(connection->get_socket().get_executor(), [=]() {
1167  connection->deregister_invocation(call_id);
1168  });
1169  }
1170  }
1171  invocation_promise_.set_exception(std::move(exception_ptr));
1172  } catch (boost::promise_already_satisfied &se) {
1173  if (!event_handler_) {
1174  HZ_LOG(logger_, finest,
1175  boost::str(boost::format("Failed to set the exception for invocation. "
1176  "%1%, %2% Exception to be set: %3%")
1177  % se.what() % *this % e.what())
1178  );
1179  }
1180  }
1181  }
1182 
1183  void ClientInvocation::notify_exception(std::exception_ptr exception) {
1184  erase_invocation();
1185  try {
1186  std::rethrow_exception(exception);
1187  } catch (exception::iexception &iex) {
1188  log_exception(iex);
1189 
1190  if (!lifecycle_service_.is_running()) {
1191  try {
1192  std::throw_with_nested(boost::enable_current_exception(
1193  exception::hazelcast_client_not_active(iex.get_source(),
1194  "Client is shutting down")));
1195  } catch (exception::iexception &e) {
1196  set_exception(e, boost::current_exception());
1197  }
1198  return;
1199  }
1200 
1201  if (!should_retry(iex)) {
1202  set_exception(iex, boost::current_exception());
1203  return;
1204  }
1205 
1206  auto timePassed = std::chrono::steady_clock::now() - start_time_;
1207  if (timePassed > invocation_service_.get_invocation_timeout()) {
1208  HZ_LOG(logger_, finest,
1209  boost::str(boost::format("Exception will not be retried because "
1210  "invocation timed out. %1%") % iex.what())
1211  );
1212 
1213  auto now = std::chrono::steady_clock::now();
1214 
1215  auto timeoutException = (exception::exception_builder<exception::operation_timeout>(
1216  "ClientInvocation::newoperation_timeout_exception") << *this
1217  << " timed out because exception occurred after client invocation timeout "
1218  << std::chrono::duration_cast<std::chrono::milliseconds>(invocation_service_.get_invocation_timeout()).count()
1219  << "msecs. Last exception:" << iex
1220  << " Current time :" << util::StringUtil::time_to_string(now) << ". "
1221  << "Start time: " << util::StringUtil::time_to_string(start_time_)
1222  << ". Total elapsed time: " <<
1223  std::chrono::duration_cast<std::chrono::milliseconds>(now - start_time_).count()
1224  << " ms. ").build();
1225  try {
1226  BOOST_THROW_EXCEPTION(timeoutException);
1227  } catch (...) {
1228  set_exception(timeoutException, boost::current_exception());
1229  }
1230 
1231  return;
1232  }
1233 
1234  try {
1235  execute();
1236  } catch (std::exception &e) {
1237  set_exception(e, boost::current_exception());
1238  }
1239  } catch (...) {
1240  assert(false);
1241  }
1242  }
1243 
1244  void ClientInvocation::erase_invocation() const {
1245  if (!this->event_handler_) {
1246  auto sent_connection = get_send_connection();
1247  if (sent_connection) {
1248  auto this_invocation = shared_from_this();
1249  boost::asio::post(sent_connection->get_socket().get_executor(), [=] () {
1250  sent_connection->invocations.erase(this_invocation->get_client_message()->get_correlation_id());
1251  });
1252  }
1253  }
1254  }
1255 
1256  bool ClientInvocation::should_retry(exception::iexception &exception) {
1257  auto errorCode = exception.get_error_code();
1258  if (is_bind_to_single_connection() && (errorCode == protocol::IO || errorCode == protocol::TARGET_DISCONNECTED)) {
1259  return false;
1260  }
1261 
1262  if (!uuid_.is_nil() && errorCode == protocol::TARGET_NOT_MEMBER) {
1263  //when invocation send to a specific member
1264  //if target is no longer a member, we should not retry
1265  //note that this exception could come from the server
1266  return false;
1267  }
1268 
1269  if (errorCode == protocol::IO || errorCode == protocol::HAZELCAST_INSTANCE_NOT_ACTIVE || exception.is_retryable()) {
1270  return true;
1271  }
1272  if (errorCode == protocol::TARGET_DISCONNECTED) {
1273  return client_message_.load()->get()->is_retryable() || invocation_service_.is_redo_operation();
1274  }
1275  return false;
1276  }
1277 
1278  std::ostream &operator<<(std::ostream &os, const ClientInvocation &invocation) {
1279  std::ostringstream target;
1280  if (invocation.is_bind_to_single_connection()) {
1281  auto conn = invocation.connection_.lock();
1282  if (conn) {
1283  target << "connection " << *conn;
1284  }
1285  } else if (invocation.partition_id_ != -1) {
1286  target << "partition " << invocation.partition_id_;
1287  } else if (!invocation.uuid_.is_nil()) {
1288  target << "uuid " << boost::to_string(invocation.uuid_);
1289  } else {
1290  target << "random";
1291  }
1292  os << "ClientInvocation{" << "requestMessage = " << *invocation.client_message_.load()->get()
1293  << ", objectName = "
1294  << invocation.object_name_ << ", target = " << target.str() << ", sendConnection = ";
1295  auto sendConnection = invocation.get_send_connection();
1296  if (sendConnection) {
1297  os << *sendConnection;
1298  } else {
1299  os << "nullptr";
1300  }
1301  os << ", backup_acks_expected_ = " << static_cast<int>(invocation.backup_acks_expected_)
1302  << ", backup_acks_received = " << invocation.backup_acks_received_;
1303 
1304  if (invocation.pending_response_) {
1305  os << ", pending_response: " << *invocation.pending_response_;
1306  }
1307 
1308  os << '}';
1309 
1310  return os;
1311  }
1312 
1313  std::shared_ptr<ClientInvocation> ClientInvocation::create(spi::ClientContext &client_context,
1314  std::shared_ptr<protocol::ClientMessage> &&client_message,
1315  const std::string &object_name,
1316  int partition_id) {
1317  return std::shared_ptr<ClientInvocation>(
1318  new ClientInvocation(client_context, std::move(client_message), object_name, partition_id));
1319  }
1320 
1321  std::shared_ptr<ClientInvocation> ClientInvocation::create(spi::ClientContext &client_context,
1322  std::shared_ptr<protocol::ClientMessage> &&client_message,
1323  const std::string &object_name,
1324  const std::shared_ptr<connection::Connection> &connection) {
1325  return std::shared_ptr<ClientInvocation>(
1326  new ClientInvocation(client_context, std::move(client_message), object_name, UNASSIGNED_PARTITION,
1327  connection));
1328  }
1329 
1330 
1331  std::shared_ptr<ClientInvocation> ClientInvocation::create(spi::ClientContext &client_context,
1332  std::shared_ptr<protocol::ClientMessage> &&client_message,
1333  const std::string &object_name,
1334  boost::uuids::uuid uuid) {
1335  return std::shared_ptr<ClientInvocation>(
1336  new ClientInvocation(client_context, std::move(client_message), object_name, UNASSIGNED_PARTITION,
1337  nullptr, uuid));
1338  }
1339 
1340  std::shared_ptr<ClientInvocation> ClientInvocation::create(spi::ClientContext &client_context,
1341  protocol::ClientMessage &client_message,
1342  const std::string &object_name,
1343  int partition_id) {
1344  return create(client_context, std::make_shared<protocol::ClientMessage>(std::move(client_message)),
1345  object_name, partition_id);
1346  }
1347 
1348  std::shared_ptr<ClientInvocation> ClientInvocation::create(spi::ClientContext &client_context,
1349  protocol::ClientMessage &client_message,
1350  const std::string &object_name,
1351  const std::shared_ptr<connection::Connection> &connection) {
1352  return create(client_context, std::make_shared<protocol::ClientMessage>(std::move(client_message)),
1353  object_name, connection);
1354  }
1355 
1356  std::shared_ptr<ClientInvocation> ClientInvocation::create(spi::ClientContext &client_context,
1357  protocol::ClientMessage &client_message,
1358  const std::string &object_name,
1359  boost::uuids::uuid uuid) {
1360  return create(client_context, std::make_shared<protocol::ClientMessage>(std::move(client_message)),
1361  object_name, uuid);
1362  }
1363 
1364  std::shared_ptr<connection::Connection> ClientInvocation::get_send_connection() const {
1365  return send_connection_.load()->lock();
1366  }
1367 
1368  void ClientInvocation::wait_invoked() const {
1369  //it could be either invoked or cancelled before invoked
1370  while (!invoked_or_exception_set_) {
1371  std::this_thread::sleep_for(retry_pause_);
1372  }
1373  }
1374 
1375  void
1376  ClientInvocation::set_send_connection(const std::shared_ptr<connection::Connection> &conn) {
1377  send_connection_.store(boost::make_shared<std::weak_ptr<connection::Connection>>(conn));
1378  invoked_or_exception_set_.store(true);
1379  }
1380 
1381  void ClientInvocation::notify(const std::shared_ptr<protocol::ClientMessage> &msg) {
1382  if (!msg) {
1383  BOOST_THROW_EXCEPTION(exception::illegal_argument("response can't be null"));
1384  }
1385 
1386  int8_t expected_backups = msg->get_number_of_backups();
1387 
1388  // if a regular response comes and there are backups, we need to wait for the backups
1389  // when the backups complete, the response will be send by the last backup or backup-timeout-handle mechanism kicks on
1390  if (expected_backups > backup_acks_received_) {
1391  // so the invocation has backups and since not all backups have completed, we need to wait
1392  // (it could be that backups arrive earlier than the response)
1393 
1394  pending_response_received_time_ = std::chrono::steady_clock::now();
1395 
1396  backup_acks_expected_ = expected_backups;
1397 
1398  // it is very important that the response is set after the backupsAcksExpected is set, else the system
1399  // can assume the invocation is complete because there is a response and no backups need to respond
1400  pending_response_ = msg;
1401 
1402  // we are done since not all backups have completed. Therefore we should not notify the future
1403  return;
1404  }
1405 
1406  // we are going to notify the future that a response is available; this can happen when:
1407  // - we had a regular operation (so no backups we need to wait for) that completed
1408  // - we had a backup-aware operation that has completed, but also all its backups have completed
1409  complete(msg);
1410  }
1411 
1412  void ClientInvocation::complete(const std::shared_ptr<protocol::ClientMessage> &msg) {
1413  try {
1414  // TODO: move msg content here?
1415  this->invocation_promise_.set_value(*msg);
1416  } catch (std::exception &e) {
1417  HZ_LOG(logger_, warning,
1418  boost::str(boost::format("Failed to set the response for invocation. "
1419  "Dropping the response. %1%, %2% Response: %3%")
1420  % e.what() % *this % *msg)
1421  );
1422  }
1423  this->erase_invocation();
1424  }
1425 
1426  std::shared_ptr<protocol::ClientMessage> ClientInvocation::get_client_message() const {
1427  return *client_message_.load();
1428  }
1429 
1430  const std::shared_ptr<EventHandler<protocol::ClientMessage> > &
1431  ClientInvocation::get_event_handler() const {
1432  return event_handler_;
1433  }
1434 
1435  void ClientInvocation::set_event_handler(
1436  const std::shared_ptr<EventHandler<protocol::ClientMessage> > &handler) {
1437  ClientInvocation::event_handler_ = handler;
1438  }
1439 
1440  void ClientInvocation::execute() {
1441  auto this_invocation = shared_from_this();
1442  auto command = [=]() {
1443  this_invocation->run();
1444  };
1445 
1446  // first we force a new invocation slot because we are going to return our old invocation slot immediately after
1447  // It is important that we first 'force' taking a new slot; otherwise it could be that a sneaky invocation gets
1448  // through that takes our slot!
1449  int64_t callId = call_id_sequence_->force_next();
1450  client_message_.load()->get()->set_correlation_id(callId);
1451 
1452  //we release the old slot
1453  call_id_sequence_->complete();
1454 
1455  if (invoke_count_ < MAX_FAST_INVOCATION_COUNT) {
1456  // fast retry for the first few invocations
1457  execution_service_->execute(command);
1458  } else {
1459  // progressive retry delay
1460  int64_t delayMillis = util::min<int64_t>(static_cast<int64_t>(1) << (invoke_count_ - MAX_FAST_INVOCATION_COUNT),
1461  std::chrono::duration_cast<std::chrono::milliseconds>(retry_pause_).count());
1462  retry_timer_ = execution_service_->schedule(command, std::chrono::milliseconds(delayMillis));
1463  }
1464  }
1465 
1466  const std::string ClientInvocation::get_name() const {
1467  return "ClientInvocation";
1468  }
1469 
1470  std::shared_ptr<protocol::ClientMessage> ClientInvocation::copy_message() {
1471  return std::make_shared<protocol::ClientMessage>(**client_message_.load());
1472  }
1473 
1474  boost::promise<protocol::ClientMessage> &ClientInvocation::get_promise() {
1475  return invocation_promise_;
1476  }
1477 
1478  void ClientInvocation::log_exception(exception::iexception &e) {
1479  HZ_LOG(logger_, finest,
1480  boost::str(boost::format("Invocation got an exception %1%, invoke count : %2%, "
1481  "exception : %3%")
1482  % *this % invoke_count_.load() % e)
1483  );
1484  }
1485 
1486  void ClientInvocation::notify_backup() {
1487  ++backup_acks_received_;
1488 
1489  if (!pending_response_) {
1490  // no pendingResponse has been set, so we are done since the invocation on the primary needs to complete first
1491  return;
1492  }
1493 
1494  // if a pendingResponse is set, then the backupsAcksExpected has been set (so we can now safely read backupsAcksExpected)
1495  if (backup_acks_expected_ != backup_acks_received_) {
1496  // we managed to complete a backup, but we were not the one completing the last backup, so we are done
1497  return;
1498  }
1499 
1500  // we are the lucky one since we just managed to complete the last backup for this invocation and since the
1501  // pendingResponse is set, we can set it on the future
1502  complete_with_pending_response();
1503  }
1504 
1505  void
1506  ClientInvocation::detect_and_handle_backup_timeout(const std::chrono::milliseconds &backup_timeout) {
1507  // if the backups have completed, we are done; this also filters out all non backup-aware operations
1508  // since the backupsAcksExpected will always be equal to the backupsAcksReceived
1509  if (backup_acks_expected_ == backup_acks_received_) {
1510  return;
1511  }
1512 
1513  // if no response has yet been received, we we are done; we are only going to re-invoke an operation
1514  // if the response of the primary has been received, but the backups have not replied
1515  if (!pending_response_) {
1516  return;
1517  }
1518 
1519  // if this has not yet expired (so has not been in the system for a too long period) we ignore it
1520  if (pending_response_received_time_ + backup_timeout >= std::chrono::steady_clock::now()) {
1521  return;
1522  }
1523 
1524  if (invocation_service_.fail_on_indeterminate_state()) {
1525  auto exception = boost::enable_current_exception((exception::exception_builder<exception::indeterminate_operation_state>(
1526  "ClientInvocation::detect_and_handle_backup_timeout") << *this
1527  << " failed because backup acks missed.").build());
1528  notify_exception(std::make_exception_ptr(exception));
1529  return;
1530  }
1531 
1532  // the backups have not yet completed, but we are going to release the future anyway if a pendingResponse has been set
1533  complete_with_pending_response();
1534  }
1535 
1536  void ClientInvocation::complete_with_pending_response() {
1537  complete(pending_response_);
1538  }
1539 
1540  ClientContext &impl::ClientTransactionManagerServiceImpl::get_client() const {
1541  return client_;
1542  }
1543 
1544  ClientTransactionManagerServiceImpl::ClientTransactionManagerServiceImpl(ClientContext &client)
1545  : client_(client) {}
1546 
1547  std::shared_ptr<connection::Connection> ClientTransactionManagerServiceImpl::connect() {
1548  auto &invocationService = client_.get_invocation_service();
1549  auto startTime = std::chrono::steady_clock::now();
1550  auto invocationTimeout = invocationService.get_invocation_timeout();
1551  client_config &clientConfig = client_.get_client_config();
1552  bool smartRouting = clientConfig.get_network_config().is_smart_routing();
1553 
1554  while (client_.get_lifecycle_service().is_running()) {
1555  try {
1556  auto connection = client_.get_connection_manager().get_random_connection();
1557  if (!connection) {
1558  throw_exception(smartRouting);
1559  }
1560  return connection;
1561  } catch (exception::hazelcast_client_offline &) {
1562  throw;
1563  } catch (exception::iexception &) {
1564  if (std::chrono::steady_clock::now() - startTime > invocationTimeout) {
1565  std::rethrow_exception(
1566  new_operation_timeout_exception(std::current_exception(), invocationTimeout,
1567  startTime));
1568  }
1569  }
1570  std::this_thread::sleep_for(invocationService.get_invocation_retry_pause());
1571  }
1572  BOOST_THROW_EXCEPTION(
1573  exception::hazelcast_client_not_active("ClientTransactionManagerServiceImpl::connect",
1574  "Client is shutdown"));
1575  }
1576 
1577  std::exception_ptr
1578  ClientTransactionManagerServiceImpl::new_operation_timeout_exception(std::exception_ptr cause,
1579  std::chrono::milliseconds invocation_timeout,
1580  std::chrono::steady_clock::time_point start_time) {
1581  std::ostringstream sb;
1582  auto now = std::chrono::steady_clock::now();
1583  sb
1584  << "Creating transaction context timed out because exception occurred after client invocation timeout "
1585  << std::chrono::duration_cast<std::chrono::milliseconds>(invocation_timeout).count() << " ms. " << "Current time: "
1586  << util::StringUtil::time_to_string(std::chrono::steady_clock::now()) << ". " << "Start time: "
1587  << util::StringUtil::time_to_string(start_time) << ". Total elapsed time: "
1588  << std::chrono::duration_cast<std::chrono::milliseconds>(now - start_time).count() << " ms. ";
1589  try {
1590  std::rethrow_exception(cause);
1591  } catch (...) {
1592  try {
1593  std::throw_with_nested(boost::enable_current_exception(exception::operation_timeout(
1594  "ClientTransactionManagerServiceImpl::newoperation_timeout_exception", sb.str())));
1595  } catch (...) {
1596  return std::current_exception();
1597  }
1598  }
1599  return nullptr;
1600  }
1601 
1602  void ClientTransactionManagerServiceImpl::throw_exception(bool smart_routing) {
1603  auto &client_config = client_.get_client_config();
1604  auto &connection_strategy_Config = client_config.get_connection_strategy_config();
1605  auto reconnect_mode = connection_strategy_Config.get_reconnect_mode();
1606  if (reconnect_mode == config::client_connection_strategy_config::reconnect_mode::ASYNC) {
1607  BOOST_THROW_EXCEPTION(exception::hazelcast_client_offline(
1608  "ClientTransactionManagerServiceImpl::throw_exception", ""));
1609  }
1610  if (smart_routing) {
1611  auto members = client_.get_cluster().get_members();
1612  std::ostringstream msg;
1613  if (members.empty()) {
1614  msg << "No address was return by the LoadBalancer since there are no members in the cluster";
1615  } else {
1616  msg << "No address was return by the LoadBalancer. "
1617  "But the cluster contains the following members:{\n";
1618  for (auto const &m : members) {
1619  msg << '\t' << m << '\n';
1620  }
1621  msg << "}";
1622  }
1623  BOOST_THROW_EXCEPTION(exception::illegal_state(
1624  "ClientTransactionManagerServiceImpl::throw_exception", msg.str()));
1625  }
1626  BOOST_THROW_EXCEPTION(exception::illegal_state(
1627  "ClientTransactionManagerServiceImpl::throw_exception",
1628  "No active connection is found"));
1629  }
1630 
1631  ClientPartitionServiceImpl::ClientPartitionServiceImpl(ClientContext &client)
1632  : client_(client), logger_(client.get_logger()), partition_count_(0),
1633  partition_table_(boost::shared_ptr<partition_table>(new partition_table{0, -1})) {
1634  }
1635 
1636  void ClientPartitionServiceImpl::handle_event(int32_t connection_id, int32_t version,
1637  const std::vector<std::pair<boost::uuids::uuid, std::vector<int>>> &partitions) {
1638  HZ_LOG(logger_, finest,
1639  boost::str(boost::format("Handling new partition table with partitionStateVersion: %1%") % version)
1640  );
1641 
1642  while (true) {
1643  auto current = partition_table_.load();
1644  if (!should_be_applied(connection_id, version, partitions, *current)) {
1645  return;
1646  }
1647  if (partition_table_.compare_exchange_strong(current, boost::shared_ptr<partition_table>(
1648  new partition_table{connection_id, version, convert_to_map(partitions)}))) {
1649  HZ_LOG(logger_, finest,
1650  boost::str(boost::format("Applied partition table with partitionStateVersion : %1%") % version)
1651  );
1652  return;
1653  }
1654 
1655  }
1656  }
1657 
1658  boost::uuids::uuid ClientPartitionServiceImpl::get_partition_owner(int32_t partition_id) {
1659  auto table_ptr = partition_table_.load();
1660  auto it = table_ptr->partitions.find(partition_id);
1661  if (it != table_ptr->partitions.end()) {
1662  return it->second;
1663  }
1664  return boost::uuids::nil_uuid();
1665  }
1666 
1667  int32_t ClientPartitionServiceImpl::get_partition_id(const serialization::pimpl::data &key) {
1668  int32_t pc = get_partition_count();
1669  if (pc <= 0) {
1670  return 0;
1671  }
1672  int hash = key.get_partition_hash();
1673  return util::HashUtil::hash_to_index(hash, pc);
1674  }
1675 
1676  int32_t ClientPartitionServiceImpl::get_partition_count() {
1677  return partition_count_.load();
1678  }
1679 
1680  std::shared_ptr<client::impl::Partition> ClientPartitionServiceImpl::get_partition(int partition_id) {
1681  return std::shared_ptr<client::impl::Partition>(new PartitionImpl(partition_id, client_, *this));
1682  }
1683 
1684  bool ClientPartitionServiceImpl::check_and_set_partition_count(int32_t new_partition_count) {
1685  int32_t expected = 0;
1686  if (partition_count_.compare_exchange_strong(expected, new_partition_count)) {
1687  return true;
1688  }
1689  return partition_count_.load() == new_partition_count;
1690  }
1691 
1692  bool
1693  ClientPartitionServiceImpl::should_be_applied(int32_t connection_id, int32_t version,
1694  const std::vector<std::pair<boost::uuids::uuid, std::vector<int>>> &partitions,
1695  const partition_table &current) {
1696  auto &lg = client_.get_logger();
1697  if (partitions.empty()) {
1698  if (logger_.enabled(logger::level::finest)) {
1699  log_failure(connection_id, version, current, "response is empty");
1700  }
1701  return false;
1702  }
1703  if (!current.connection_id || connection_id != current.connection_id) {
1704  HZ_LOG(lg, finest,
1705  ([&current, connection_id](){
1706  auto frmt = boost::format("Event coming from a new connection. Old connection id: %1%, "
1707  "new connection %2%");
1708 
1709  if (current.connection_id) {
1710  frmt = frmt % current.connection_id;
1711  } else {
1712  frmt = frmt % "none";
1713  }
1714 
1715  return boost::str(frmt % connection_id);
1716  })()
1717  );
1718 
1719  return true;
1720  }
1721  if (version <= current.version) {
1722  if (lg.enabled(logger::level::finest)) {
1723  log_failure(connection_id, version, current, "response state version is old");
1724  }
1725  return false;
1726  }
1727  return true;
1728  }
1729 
1730  void ClientPartitionServiceImpl::log_failure(int32_t connection_id, int32_t version,
1731  const ClientPartitionServiceImpl::partition_table &current,
1732  const std::string &cause) {
1733  HZ_LOG(logger_, finest,
1734  [&](){
1735  auto frmt = boost::format(" We will not apply the response, since %1% ."
1736  " Response is from connection with id %2%. "
1737  "Current connection id is %3%, response state version:%4%. "
1738  "Current state version: %5%");
1739  if (current.connection_id) {
1740  return boost::str(frmt % cause % connection_id % current.connection_id % version %
1741  current.version);
1742  }
1743  else {
1744  return boost::str(frmt % cause % connection_id % "nullptr" % version % current.version);
1745  }
1746  }()
1747  );
1748  }
1749 
1750  void ClientPartitionServiceImpl::reset() {
1751  partition_table_.store(nullptr);
1752  }
1753 
1754  std::unordered_map<int32_t, boost::uuids::uuid> ClientPartitionServiceImpl::convert_to_map(
1755  const std::vector<std::pair<boost::uuids::uuid, std::vector<int>>> &partitions) {
1756  std::unordered_map<int32_t, boost::uuids::uuid> new_partitions;
1757  for (auto const &e : partitions) {
1758  for (auto pid: e.second) {
1759  new_partitions.insert({pid, e.first});
1760  }
1761  }
1762  return new_partitions;
1763  }
1764 
1765  int ClientPartitionServiceImpl::PartitionImpl::get_partition_id() const {
1766  return partition_id_;
1767  }
1768 
1769  boost::optional<member> ClientPartitionServiceImpl::PartitionImpl::get_owner() const {
1770  auto owner = partition_service_.get_partition_owner(partition_id_);
1771  if (!owner.is_nil()) {
1772  return client_.get_client_cluster_service().get_member(owner);
1773  }
1774  return boost::none;
1775  }
1776 
1777  ClientPartitionServiceImpl::PartitionImpl::PartitionImpl(int partition_id, ClientContext &client,
1778  ClientPartitionServiceImpl &partition_service)
1779  : partition_id_(partition_id), client_(client), partition_service_(partition_service) {
1780  }
1781 
1782  namespace sequence {
1783  CallIdSequenceWithoutBackpressure::CallIdSequenceWithoutBackpressure() : head_(0) {}
1784 
1785  CallIdSequenceWithoutBackpressure::~CallIdSequenceWithoutBackpressure() = default;
1786 
1787  int32_t CallIdSequenceWithoutBackpressure::get_max_concurrent_invocations() const {
1788  return INT32_MAX;
1789  }
1790 
1791  int64_t CallIdSequenceWithoutBackpressure::next() {
1792  return force_next();
1793  }
1794 
1795  int64_t CallIdSequenceWithoutBackpressure::force_next() {
1796  return ++head_;
1797  }
1798 
1799  void CallIdSequenceWithoutBackpressure::complete() {
1800  // no-op
1801  }
1802 
1803  int64_t CallIdSequenceWithoutBackpressure::get_last_call_id() {
1804  return head_;
1805  }
1806 
1807  // TODO: see if we can utilize std::hardware_destructive_interference_size
1808  AbstractCallIdSequence::AbstractCallIdSequence(int32_t max_concurrent_invocations) {
1809  std::ostringstream out;
1810  out << "maxConcurrentInvocations should be a positive number. maxConcurrentInvocations="
1811  << max_concurrent_invocations;
1812  this->max_concurrent_invocations_ = util::Preconditions::check_positive(max_concurrent_invocations,
1813  out.str());
1814 
1815  for (size_t i = 0; i < longs_.size(); ++i) {
1816  longs_[i] = 0;
1817  }
1818  }
1819 
1820  AbstractCallIdSequence::~AbstractCallIdSequence() = default;
1821 
1822  int32_t AbstractCallIdSequence::get_max_concurrent_invocations() const {
1823  return max_concurrent_invocations_;
1824  }
1825 
1826  int64_t AbstractCallIdSequence::next() {
1827  if (!has_space()) {
1828  handle_no_space_left();
1829  }
1830  return force_next();
1831  }
1832 
1833  int64_t AbstractCallIdSequence::force_next() {
1834  return ++longs_[INDEX_HEAD];
1835  }
1836 
1837  void AbstractCallIdSequence::complete() {
1838  ++longs_[INDEX_TAIL];
1839  assert(longs_[INDEX_TAIL] <= longs_[INDEX_HEAD]);
1840  }
1841 
1842  int64_t AbstractCallIdSequence::get_last_call_id() {
1843  return longs_[INDEX_HEAD];
1844  }
1845 
1846  bool AbstractCallIdSequence::has_space() {
1847  return longs_[INDEX_HEAD] - longs_[INDEX_TAIL] < max_concurrent_invocations_;
1848  }
1849 
1850  int64_t AbstractCallIdSequence::get_tail() {
1851  return longs_[INDEX_TAIL];
1852  }
1853 
1854  const std::unique_ptr<util::concurrent::IdleStrategy> CallIdSequenceWithBackpressure::IDLER(
1855  new util::concurrent::BackoffIdleStrategy(
1856  0, 0, std::chrono::duration_cast<std::chrono::nanoseconds>(
1857  std::chrono::microseconds(1000)).count(),
1858  std::chrono::duration_cast<std::chrono::nanoseconds>(
1859  std::chrono::microseconds(MAX_DELAY_MS * 1000)).count()));
1860 
1861  CallIdSequenceWithBackpressure::CallIdSequenceWithBackpressure(int32_t max_concurrent_invocations,
1862  int64_t backoff_timeout_ms)
1863  : AbstractCallIdSequence(max_concurrent_invocations) {
1864  std::ostringstream out;
1865  out << "backoffTimeoutMs should be a positive number. backoffTimeoutMs=" << backoff_timeout_ms;
1866  util::Preconditions::check_positive(backoff_timeout_ms, out.str());
1867 
1868  backoff_timeout_nanos_ = std::chrono::duration_cast<std::chrono::nanoseconds>(
1869  std::chrono::milliseconds(backoff_timeout_ms)).count();
1870  }
1871 
1872  void CallIdSequenceWithBackpressure::handle_no_space_left() {
1873  auto start = std::chrono::steady_clock::now();
1874  for (int64_t idleCount = 0;; idleCount++) {
1875  int64_t elapsedNanos = std::chrono::duration_cast<std::chrono::nanoseconds>(
1876  std::chrono::steady_clock::now() - start).count();
1877  if (elapsedNanos > backoff_timeout_nanos_) {
1878  throw (exception::exception_builder<exception::hazelcast_overload>(
1879  "CallIdSequenceWithBackpressure::handleNoSpaceLeft")
1880  << "Timed out trying to acquire another call ID."
1881  << " maxConcurrentInvocations = " << get_max_concurrent_invocations()
1882  << ", backoffTimeout = "
1883  << std::chrono::microseconds(backoff_timeout_nanos_ / 1000).count()
1884  << " msecs, elapsed:"
1885  << std::chrono::microseconds(elapsedNanos / 1000).count() << " msecs").build();
1886  }
1887  IDLER->idle(idleCount);
1888  if (has_space()) {
1889  return;
1890  }
1891 
1892  }
1893  }
1894 
1895  FailFastCallIdSequence::FailFastCallIdSequence(int32_t max_concurrent_invocations)
1896  : AbstractCallIdSequence(max_concurrent_invocations) {}
1897 
1898  void FailFastCallIdSequence::handle_no_space_left() {
1899  throw (exception::exception_builder<exception::hazelcast_overload>(
1900  "FailFastCallIdSequence::handleNoSpaceLeft")
1901  << "Maximum invocation count is reached. maxConcurrentInvocations = "
1902  << get_max_concurrent_invocations()).build();
1903 
1904  }
1905 
1906  std::unique_ptr<CallIdSequence> CallIdFactory::new_call_id_sequence(bool is_back_pressure_enabled,
1907  int32_t max_allowed_concurrent_invocations,
1908  int64_t backoff_timeout_ms) {
1909  if (!is_back_pressure_enabled) {
1910  return std::unique_ptr<CallIdSequence>(new CallIdSequenceWithoutBackpressure());
1911  } else if (backoff_timeout_ms <= 0) {
1912  return std::unique_ptr<CallIdSequence>(
1913  new FailFastCallIdSequence(max_allowed_concurrent_invocations));
1914  } else {
1915  return std::unique_ptr<CallIdSequence>(
1916  new CallIdSequenceWithBackpressure(max_allowed_concurrent_invocations,
1917  backoff_timeout_ms));
1918  }
1919  }
1920  }
1921 
1922  namespace listener {
1923  listener_service_impl::listener_service_impl(ClientContext &client_context,
1924  int32_t event_thread_count)
1925  : client_context_(client_context),
1926  serialization_service_(client_context.get_serialization_service()),
1927  logger_(client_context.get_logger()),
1928  client_connection_manager_(client_context.get_connection_manager()),
1929  number_of_event_threads_(event_thread_count),
1930  smart_(client_context.get_client_config().get_network_config().is_smart_routing()) {
1931  auto &invocationService = client_context.get_invocation_service();
1932  invocation_timeout_ = invocationService.get_invocation_timeout();
1933  invocation_retry_pause_ = invocationService.get_invocation_retry_pause();
1934  }
1935 
1936  bool listener_service_impl::registers_local_only() const {
1937  return smart_;
1938  }
1939 
1940  boost::future<boost::uuids::uuid>
1941  listener_service_impl::register_listener(
1942  std::shared_ptr<ListenerMessageCodec> listener_message_codec,
1943  std::shared_ptr<client::impl::BaseEventHandler> handler) {
1944  auto task = boost::packaged_task<boost::uuids::uuid()>([=]() {
1945  return register_listener_internal(listener_message_codec, handler);
1946  });
1947  auto f = task.get_future();
1948  boost::asio::post(registration_executor_->get_executor(), std::move(task));
1949  return f;
1950  }
1951 
1952  boost::future<bool> listener_service_impl::deregister_listener(boost::uuids::uuid registration_id) {
1953  util::Preconditions::check_not_nill(registration_id, "Nil userRegistrationId is not allowed!");
1954 
1955  boost::packaged_task<bool()> task([=]() {
1956  return deregister_listener_internal(registration_id);
1957  });
1958  auto f = task.get_future();
1959  boost::asio::post(registration_executor_->get_executor(), std::move(task));
1960  return f;
1961  }
1962 
1963  void listener_service_impl::connection_added(
1964  const std::shared_ptr<connection::Connection> connection) {
1965  boost::asio::post(registration_executor_->get_executor(), [=]() { connection_added_internal(connection); });
1966  }
1967 
1968  void listener_service_impl::connection_removed(
1969  const std::shared_ptr<connection::Connection> connection) {
1970  boost::asio::post(registration_executor_->get_executor(), [=]() { connection_removed_internal(connection); });
1971  }
1972 
1973  void
1974  listener_service_impl::remove_event_handler(int64_t call_id,
1975  const std::shared_ptr<connection::Connection> &connection) {
1976  boost::asio::post(connection->get_socket().get_executor(),
1977  std::packaged_task<void()>([=]() {
1978  connection->deregister_invocation(call_id);
1979  }));
1980  }
1981 
1982  void listener_service_impl::handle_client_message(
1983  const std::shared_ptr<ClientInvocation> invocation,
1984  const std::shared_ptr<protocol::ClientMessage> response) {
1985  try {
1986  auto partitionId = response->get_partition_id();
1987  if (partitionId == -1) {
1988  // execute on random thread on the thread pool
1989  boost::asio::post(event_executor_->get_executor(), [=]() { process_event_message(invocation, response); });
1990  return;
1991  }
1992 
1993  // process on certain thread which is same for the partition id
1994  boost::asio::post(event_strands_[partitionId % event_strands_.size()],
1995  [=]() { process_event_message(invocation, response); });
1996 
1997  } catch (const std::exception &e) {
1998  if (client_context_.get_lifecycle_service().is_running()) {
1999  HZ_LOG(logger_, warning,
2000  boost::str(boost::format("Delivery of event message to event handler failed. %1%, %2%, %3%")
2001  % e.what() % *response % *invocation)
2002  );
2003  }
2004  }
2005  }
2006 
2007  void listener_service_impl::shutdown() {
2008  ClientExecutionServiceImpl::shutdown_thread_pool(event_executor_.get());
2009  ClientExecutionServiceImpl::shutdown_thread_pool(registration_executor_.get());
2010  }
2011 
2012  void listener_service_impl::start() {
2013  event_executor_.reset(new hazelcast::util::hz_thread_pool(number_of_event_threads_));
2014  registration_executor_.reset(new hazelcast::util::hz_thread_pool(1));
2015 
2016  for (int i = 0; i < number_of_event_threads_; ++i) {
2017  event_strands_.emplace_back(event_executor_->get_executor());
2018  }
2019 
2020  client_connection_manager_.add_connection_listener(shared_from_this());
2021  }
2022 
2023  boost::uuids::uuid listener_service_impl::register_listener_internal(
2024  std::shared_ptr<ListenerMessageCodec> listener_message_codec,
2025  std::shared_ptr<client::impl::BaseEventHandler> handler) {
2026  auto user_registration_id = client_context_.random_uuid();
2027 
2028  std::shared_ptr<listener_registration> registration(new listener_registration{listener_message_codec, handler});
2029  registrations_.put(user_registration_id, registration);
2030  for (auto const &connection : client_connection_manager_.get_active_connections()) {
2031  try {
2032  invoke(registration, connection);
2033  } catch (exception::iexception &e) {
2034  if (connection->is_alive()) {
2035  deregister_listener_internal(user_registration_id);
2036  BOOST_THROW_EXCEPTION((exception::exception_builder<exception::hazelcast_>(
2037  "ClientListenerService::RegisterListenerTask::call")
2038  << "Listener can not be added " << e).build());
2039  }
2040  }
2041  }
2042  return user_registration_id;
2043  }
2044 
2045  bool
2046  listener_service_impl::deregister_listener_internal(boost::uuids::uuid user_registration_id) {
2047  auto listenerRegistration = registrations_.get(user_registration_id);
2048  if (!listenerRegistration) {
2049  return false;
2050  }
2051  bool successful = true;
2052 
2053  auto listener_registrations = listenerRegistration->registrations.entry_set();
2054  for (auto it = listener_registrations.begin();it != listener_registrations.end();) {
2055  const auto &registration = it->second;
2056  const auto& subscriber = it->first;
2057  try {
2058  const auto &listenerMessageCodec = listenerRegistration->codec;
2059  auto serverRegistrationId = registration->server_registration_id;
2060  auto request = listenerMessageCodec->encode_remove_request(serverRegistrationId);
2061  auto invocation = ClientInvocation::create(client_context_,request, "",
2062  subscriber);
2063  invocation->invoke().get();
2064 
2065  remove_event_handler(registration->call_id, subscriber);
2066 
2067  it = listener_registrations.erase(it);
2068  } catch (exception::iexception &e) {
2069  ++it;
2070  if (subscriber->is_alive()) {
2071  successful = false;
2072  std::ostringstream endpoint;
2073  if (subscriber->get_remote_address()) {
2074  endpoint << *subscriber->get_remote_address();
2075  } else {
2076  endpoint << "null";
2077  }
2078  HZ_LOG(logger_, warning,
2079  boost::str(boost::format("ClientListenerService::deregisterListenerInternal "
2080  "Deregistration of listener with ID %1% "
2081  "has failed to address %2% %3%")
2082  % user_registration_id
2083  % subscriber->get_remote_address() % e)
2084  );
2085  }
2086  }
2087  }
2088  if (successful) {
2089  registrations_.remove(user_registration_id);
2090  }
2091  return successful;
2092  }
2093 
2094  void listener_service_impl::connection_added_internal(
2095  const std::shared_ptr<connection::Connection> &connection) {
2096  for (const auto &listener_registration : registrations_.values()) {
2097  invoke_from_internal_thread(listener_registration, connection);
2098  }
2099  }
2100 
2101  void listener_service_impl::connection_removed_internal(
2102  const std::shared_ptr<connection::Connection> &connection) {
2103  for (auto &registry : registrations_.values()) {
2104  registry->registrations.remove(connection);
2105  }
2106  }
2107 
2108  void
2109  listener_service_impl::invoke_from_internal_thread(
2110  const std::shared_ptr<listener_registration> &listener_registration,
2111  const std::shared_ptr<connection::Connection> &connection) {
2112  try {
2113  invoke(listener_registration, connection);
2114  } catch (exception::iexception &e) {
2115  HZ_LOG(logger_, warning,
2116  boost::str(boost::format("Listener with pointer %1% can not be added to "
2117  "a new connection: %2%, reason: %3%")
2118  % listener_registration.get() % *connection % e)
2119  );
2120  }
2121  }
2122 
2123  void
2124  listener_service_impl::invoke(const std::shared_ptr<listener_registration> &listener_registration,
2125  const std::shared_ptr<connection::Connection> &connection) {
2126  if (listener_registration->registrations.contains_key(connection)) {
2127  return;
2128  }
2129 
2130  const auto &codec = listener_registration->codec;
2131  auto request = codec->encode_add_request(registers_local_only());
2132  const auto &handler = listener_registration->handler;
2133  handler->before_listener_register();
2134 
2135  auto invocation = ClientInvocation::create(client_context_,
2136  std::make_shared<protocol::ClientMessage>(std::move(request)), "",
2137  connection);
2138  invocation->set_event_handler(handler);
2139  auto clientMessage = invocation->invoke_urgent().get();
2140 
2141  auto serverRegistrationId = codec->decode_add_response(clientMessage);
2142  handler->on_listener_register();
2143  int64_t correlationId = invocation->get_client_message()->get_correlation_id();
2144 
2145  (*listener_registration).registrations.put(connection, std::shared_ptr<connection_registration>(
2146  new connection_registration{serverRegistrationId, correlationId}));
2147  }
2148 
2149  void listener_service_impl::process_event_message(
2150  const std::shared_ptr<ClientInvocation> invocation,
2151  const std::shared_ptr<protocol::ClientMessage> response) {
2152  auto eventHandler = invocation->get_event_handler();
2153  if (!eventHandler) {
2154  if (client_context_.get_lifecycle_service().is_running()) {
2155  HZ_LOG(logger_, warning,
2156  boost::str(boost::format("No eventHandler for invocation. "
2157  "Ignoring this invocation response. %1%")
2158  % *invocation)
2159  );
2160  }
2161 
2162  return;
2163  }
2164 
2165  try {
2166  eventHandler->handle(*response);
2167  } catch (std::exception &e) {
2168  if (client_context_.get_lifecycle_service().is_running()) {
2169  HZ_LOG(logger_, warning,
2170  boost::str(boost::format("Delivery of event message to event handler failed. %1%, %2%, %3%")
2171  % e.what() % *response % *invocation)
2172  );
2173  }
2174  }
2175  }
2176 
2177  listener_service_impl::~listener_service_impl() = default;
2178 
2179  void cluster_view_listener::start() {
2180  client_context_.get_connection_manager().add_connection_listener(shared_from_this());
2181  }
2182 
2183  void cluster_view_listener::connection_added(const std::shared_ptr<connection::Connection> connection) {
2184  try_register(connection);
2185  }
2186 
2187  void cluster_view_listener::connection_removed(const std::shared_ptr<connection::Connection> connection) {
2188  try_reregister_to_random_connection(connection->get_connection_id());
2189  }
2190 
2191  cluster_view_listener::cluster_view_listener(ClientContext &client_context) : client_context_(
2192  client_context) {}
2193 
2194  void cluster_view_listener::try_register(std::shared_ptr<connection::Connection> connection) {
2195  int32_t expected_id = -1;
2196  if (!listener_added_connection_id_.compare_exchange_strong(expected_id,
2197  connection->get_connection_id())) {
2198  // already registering/registered to another connection
2199  return;
2200  }
2201 
2202  auto invocation = ClientInvocation::create(client_context_,
2203  std::make_shared<protocol::ClientMessage>(
2204  protocol::codec::client_addclusterviewlistener_encode()), "", connection);
2205 
2206  auto handler = std::make_shared<event_handler>(connection->get_connection_id(), *this);
2207  invocation->set_event_handler(handler);
2208  handler->before_listener_register();
2209 
2210  std::weak_ptr<cluster_view_listener> weak_self = shared_from_this();
2211  auto conn_id = connection->get_connection_id();
2212 
2213  invocation->invoke_urgent().then(
2214  [weak_self, handler, conn_id](boost::future<protocol::ClientMessage> f) {
2215  auto self = weak_self.lock();
2216  if (!self)
2217  return;
2218 
2219  if (f.has_value()) {
2220  handler->on_listener_register();
2221  return;
2222  }
2223 
2224  //completes with exception, listener needs to be reregistered
2225  self->try_reregister_to_random_connection(conn_id);
2226  });
2227 
2228  }
2229 
2230  void cluster_view_listener::try_reregister_to_random_connection(int32_t old_connection_id) {
2231  if (!listener_added_connection_id_.compare_exchange_strong(old_connection_id, -1)) {
2232  //somebody else already trying to reregister
2233  return;
2234  }
2235  auto new_connection = client_context_.get_connection_manager().get_random_connection();
2236  if (new_connection) {
2237  try_register(new_connection);
2238  }
2239  }
2240 
2241  cluster_view_listener::~cluster_view_listener() = default;
2242 
2243  void
2244  cluster_view_listener::event_handler::handle_membersview(int32_t version,
2245  const std::vector<member> &member_infos) {
2246  view_listener.client_context_.get_client_cluster_service().handle_event(version, member_infos);
2247  }
2248 
2249  void
2250  cluster_view_listener::event_handler::handle_partitionsview(int32_t version,
2251  const std::vector<std::pair<boost::uuids::uuid, std::vector<int>>> &partitions) {
2252  view_listener.client_context_.get_partition_service().handle_event(connection_id, version, partitions);
2253  }
2254 
2255  void cluster_view_listener::event_handler::before_listener_register() {
2256  view_listener.client_context_.get_client_cluster_service().clear_member_list_version();
2257  auto &lg = view_listener.client_context_.get_logger();
2258  HZ_LOG(lg, finest,
2259  boost::str(boost::format(
2260  "Register attempt of cluster_view_listener::event_handler to connection with id %1%") %
2261  connection_id));
2262  }
2263 
2264  void cluster_view_listener::event_handler::on_listener_register() {
2265  auto &lg = view_listener.client_context_.get_logger();
2266  HZ_LOG(lg, finest,
2267  boost::str(boost::format(
2268  "Registered cluster_view_listener::event_handler to connection with id %1%") %
2269  connection_id));
2270  }
2271 
2272  cluster_view_listener::event_handler::event_handler(int connectionId,
2273  cluster_view_listener &viewListener)
2274  : connection_id(connectionId), view_listener(viewListener) {}
2275  }
2276 
2277  protocol::ClientMessage
2278  ClientInvocationServiceImpl::BackupListenerMessageCodec::encode_add_request(bool local_only) const {
2279  return protocol::codec::client_localbackuplistener_encode();
2280  }
2281 
2282  protocol::ClientMessage ClientInvocationServiceImpl::BackupListenerMessageCodec::encode_remove_request(
2283  boost::uuids::uuid real_registration_id) const {
2284  assert(0);
2285  return protocol::ClientMessage(0);
2286  }
2287 
2288  void ClientInvocationServiceImpl::noop_backup_event_handler::handle_backup(
2289  int64_t source_invocation_correlation_id) {
2290  assert(0);
2291  }
2292 
2293  namespace discovery {
2294  remote_address_provider::remote_address_provider(
2295  std::function<std::unordered_map<address, address>()> addr_map_method,
2296  bool use_public) : refresh_address_map_(std::move(addr_map_method)),
2297  use_public_(use_public) {}
2298 
2299  std::vector<address> remote_address_provider::load_addresses() {
2300  auto address_map = refresh_address_map_();
2301  std::lock_guard<std::mutex> guard(lock_);
2302  private_to_public_ = address_map;
2303  std::vector<address> addresses;
2304  addresses.reserve(address_map.size());
2305  for (const auto &addr_pair : address_map) {
2306  addresses.push_back(addr_pair.first);
2307  }
2308  return addresses;
2309  }
2310 
2311  boost::optional<address> remote_address_provider::translate(const address &addr) {
2312  // if it is inside cloud, return private address otherwise we need to translate it.
2313  if (!use_public_) {
2314  return addr;
2315  }
2316 
2317  {
2318  std::lock_guard<std::mutex> guard(lock_);
2319  auto found = private_to_public_.find(addr);
2320  if (found != private_to_public_.end()) {
2321  return found->second;
2322  }
2323  }
2324 
2325  auto address_map = refresh_address_map_();
2326 
2327  std::lock_guard<std::mutex> guard(lock_);
2328  private_to_public_ = address_map;
2329 
2330  auto found = private_to_public_.find(addr);
2331  if (found != private_to_public_.end()) {
2332  return found->second;
2333  }
2334 
2335  return boost::none;
2336  }
2337 
2338 #ifdef HZ_BUILD_WITH_SSL
2339  cloud_discovery::cloud_discovery(config::cloud_config &config, std::string cloud_base_url,
2340  std::chrono::steady_clock::duration timeout)
2341  : cloud_config_(config), cloud_base_url_(cloud_base_url), timeout_(timeout) {}
2342 #else
2343  cloud_discovery::cloud_discovery(config::cloud_config &config, std::string cloud_base_url,
2344  std::chrono::steady_clock::duration timeout) {}
2345 #endif // HZ_BUILD_WITH_SSL
2346 
2347  std::unordered_map<address, address> cloud_discovery::get_addresses() {
2348 #ifdef HZ_BUILD_WITH_SSL
2349  try {
2350  util::SyncHttpsClient httpsConnection(cloud_base_url_, std::string(CLOUD_URL_PATH) +
2351  cloud_config_.discovery_token, timeout_);
2352  auto &conn_stream = httpsConnection.connect_and_get_response();
2353  return parse_json_response(conn_stream);
2354  } catch (std::exception &e) {
2355  std::throw_with_nested(boost::enable_current_exception(
2356  exception::illegal_state("cloud_discovery::get_addresses",
2357  e.what())));
2358  }
2359 #else
2360  util::Preconditions::check_ssl("cloud_discovery::get_addresses");
2361  return std::unordered_map<address, address>();
2362 #endif
2363  }
2364 
2365  std::unordered_map<address, address>
2366  cloud_discovery::parse_json_response(std::istream &conn_stream) {
2367  namespace pt = boost::property_tree;
2368 
2369  pt::ptree root;
2370  pt::read_json(conn_stream, root);
2371 
2372  std::unordered_map<address, address> addresses;
2373  for (const auto &item : root) {
2374  auto private_address = item.second.get<std::string>(PRIVATE_ADDRESS_PROPERTY);
2375  auto public_address = item.second.get<std::string>(PUBLIC_ADDRESS_PROPERTY);
2376 
2377  address public_addr = create_address(public_address, -1);
2378  //if it is not explicitly given, create the private address with public addresses port
2379  auto private_addr = create_address(private_address, public_addr.get_port());
2380  addresses.emplace(std::move(private_addr), std::move(public_addr));
2381  }
2382 
2383  return addresses;
2384  }
2385 
2386  address cloud_discovery::create_address(const std::string &hostname, int default_port) {
2387  auto address_holder = util::AddressUtil::get_address_holder(hostname, default_port);
2388  auto scoped_hostname = util::AddressHelper::get_scoped_hostname(address_holder);
2389  return address(std::move(scoped_hostname), address_holder.get_port());
2390  }
2391  }
2392  }
2393  }
2394  }
2395 }
2396 
2397 namespace std {
2398  bool less<hazelcast::client::spi::DefaultObjectNamespace>::operator()(
2399  const hazelcast::client::spi::DefaultObjectNamespace &lhs,
2400  const hazelcast::client::spi::DefaultObjectNamespace &rhs) const {
2401  int result = lhs.get_service_name().compare(rhs.get_service_name());
2402  if (result < 0) {
2403  return true;
2404  }
2405 
2406  if (result > 0) {
2407  return false;
2408  }
2409 
2410  return lhs.get_object_name().compare(rhs.get_object_name()) < 0;
2411  }
2412 
2413  std::size_t
2414  hash<hazelcast::client::spi::DefaultObjectNamespace>::operator()(
2415  const hazelcast::client::spi::DefaultObjectNamespace &k) const noexcept {
2416  return std::hash<std::string>()(k.get_service_name() + k.get_object_name());
2417  }
2418 }
2419 
2420 
Hazelcast cluster interface.
Definition: cluster.h:36
const std::unordered_set< member > & get_members() const
Returns an immutable set of ordered members at the moment this MembershipListener is registered.
Definition: spi.cpp:75
cluster & get_cluster()
Returns the cluster of the event.
Definition: spi.cpp:79
lifecycle_state get_state() const
Definition: spi.cpp:91
lifecycle_event(lifecycle_state state)
Constructor.
Definition: spi.cpp:87