Hazelcast C++ Client
Hazelcast C++ Client Library
Loading...
Searching...
No Matches
spi.cpp
1/*
2 * Copyright (c) 2008-2025, Hazelcast, Inc. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include <utility>
18
19#include <boost/uuid/uuid_hash.hpp>
20#include <boost/functional/hash.hpp>
21#include <boost/property_tree/ptree.hpp>
22#include <boost/property_tree/json_parser.hpp>
23
24#include "hazelcast/client/hazelcast_client.h"
25#include <hazelcast/client/protocol/codec/ErrorCodec.h>
26#include <hazelcast/client/spi/impl/ListenerMessageCodec.h>
27#include <hazelcast/client/spi/impl/ClientClusterServiceImpl.h>
28#include <hazelcast/client/spi/impl/listener/cluster_view_listener.h>
29#include <hazelcast/client/spi/impl/listener/listener_service_impl.h>
30#include <hazelcast/client/spi/impl/discovery/remote_address_provider.h>
31#include <hazelcast/client/spi/impl/discovery/cloud_discovery.h>
32#include <hazelcast/util/AddressUtil.h>
33#include "hazelcast/client/member_selectors.h"
34#include "hazelcast/client/lifecycle_event.h"
35#include "hazelcast/client/initial_membership_event.h"
36#include "hazelcast/client/membership_event.h"
37#include "hazelcast/client/lifecycle_listener.h"
38#include "hazelcast/client/spi/ProxyManager.h"
39#include "hazelcast/client/spi/ClientProxy.h"
40#include "hazelcast/client/spi/ClientContext.h"
41#include "hazelcast/client/spi/impl/ClientInvocation.h"
42#include "hazelcast/client/spi/impl/ClientInvocationServiceImpl.h"
43#include "hazelcast/client/impl/hazelcast_client_instance_impl.h"
44#include "hazelcast/client/impl/statistics/Statistics.h"
45#include "hazelcast/client/spi/impl/ClientPartitionServiceImpl.h"
46#include "hazelcast/client/spi/impl/DefaultAddressProvider.h"
47#include "hazelcast/client/spi/impl/sequence/CallIdSequenceWithBackpressure.h"
48#include "hazelcast/client/spi/impl/sequence/CallIdSequenceWithoutBackpressure.h"
49#include "hazelcast/client/spi/impl/sequence/FailFastCallIdSequence.h"
50#include "hazelcast/client/spi/impl/sequence/CallIdFactory.h"
51#include "hazelcast/client/connection/ClientConnectionManagerImpl.h"
52#include "hazelcast/util/AddressHelper.h"
53#include "hazelcast/util/HashUtil.h"
54#include "hazelcast/util/concurrent/BackoffIdleStrategy.h"
55#ifdef HZ_BUILD_WITH_SSL
56#include <hazelcast/util/SyncHttpsClient.h>
57#endif // HZ_BUILD_WITH_SSL
58
59namespace hazelcast {
60namespace client {
61const std::unordered_set<member>&
63{
64 return members_;
65}
66
69{
70 return cluster_;
71}
72
73initial_membership_event::initial_membership_event(
75 std::unordered_set<member> members)
76 : cluster_(cluster)
77 , members_(std::move(members))
78{}
79
83
86{
87 return state_;
88}
89
90namespace spi {
91ProxyManager::ProxyManager(ClientContext& context)
92 : client_(context)
93{}
94
95void
96ProxyManager::init()
97{}
98
99void
100ProxyManager::destroy()
101{
102 std::lock_guard<std::recursive_mutex> guard(lock_);
103 for (auto& p : proxies_) {
104 try {
105 auto proxy = p.second.get();
106 p.second.get()->on_shutdown();
107 } catch (std::exception& se) {
108 auto& lg = client_.get_logger();
109 HZ_LOG(
110 lg,
111 finest,
112 boost::str(boost::format(
113 "Proxy was not created, "
114 "hence onShutdown can be called. Exception: %1%") %
115 se.what()));
116 }
117 }
118 proxies_.clear();
119}
120
121boost::future<void>
122ProxyManager::initialize(const std::shared_ptr<ClientProxy>& client_proxy)
123{
124 auto clientMessage = protocol::codec::client_createproxy_encode(
125 client_proxy->get_name(), client_proxy->get_service_name());
126 return spi::impl::ClientInvocation::create(
127 client_, clientMessage, client_proxy->get_service_name())
128 ->invoke()
129 .then(boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
130 f.get();
131 client_proxy->on_initialize();
132 });
133}
134
135boost::future<void>
136ProxyManager::destroy_proxy(ClientProxy& proxy)
137{
138 DefaultObjectNamespace objectNamespace(proxy.get_service_name(),
139 proxy.get_name());
140 std::shared_ptr<ClientProxy> registeredProxy;
141 {
142 std::lock_guard<std::recursive_mutex> guard(lock_);
143 auto it = proxies_.find(objectNamespace);
144 registeredProxy = it == proxies_.end() ? nullptr : it->second.get();
145 if (it != proxies_.end()) {
146 proxies_.erase(it);
147 }
148 }
149
150 try {
151 if (registeredProxy) {
152 try {
153 proxy.destroy_locally();
154 return proxy.destroy_remotely();
155 } catch (exception::iexception&) {
156 proxy.destroy_remotely();
157 throw;
158 }
159 }
160 if (&proxy != registeredProxy.get()) {
161 // The given proxy is stale and was already destroyed, but the
162 // caller may have allocated local resources in the context of this
163 // stale proxy instance after it was destroyed, so we have to
164 // cleanup it locally one more time to make sure there are no
165 // leaking local resources.
166 proxy.destroy_locally();
167 }
168 } catch (...) {
169 if (&proxy != registeredProxy.get()) {
170 // The given proxy is stale and was already destroyed, but the
171 // caller may have allocated local resources in the context of this
172 // stale proxy instance after it was destroyed, so we have to
173 // cleanup it locally one more time to make sure there are no
174 // leaking local resources.
175 proxy.destroy_locally();
176 }
177 throw;
178 }
179 return boost::make_ready_future();
180}
181
182ClientContext::ClientContext(const client::hazelcast_client& hazelcast_client)
183 : hazelcast_client_(*hazelcast_client.client_impl_)
184{}
185
186ClientContext::ClientContext(
187 client::impl::hazelcast_client_instance_impl& hazelcast_client)
188 : hazelcast_client_(hazelcast_client)
189{}
190
191serialization::pimpl::SerializationService&
192ClientContext::get_serialization_service()
193{
194 return hazelcast_client_.serialization_service_;
195}
196
197impl::ClientClusterServiceImpl&
198ClientContext::get_client_cluster_service()
199{
200 return hazelcast_client_.cluster_service_;
201}
202
203impl::ClientInvocationServiceImpl&
204ClientContext::get_invocation_service()
205{
206 return *hazelcast_client_.invocation_service_;
207}
208
210ClientContext::get_client_config()
211{
212 return hazelcast_client_.client_config_;
213}
214
215impl::ClientPartitionServiceImpl&
216ClientContext::get_partition_service()
217{
218 return *hazelcast_client_.partition_service_;
219}
220
221lifecycle_service&
222ClientContext::get_lifecycle_service()
223{
224 return hazelcast_client_.lifecycle_service_;
225}
226
227spi::impl::listener::listener_service_impl&
228ClientContext::get_client_listener_service()
229{
230 return *hazelcast_client_.listener_service_;
231}
232
233connection::ClientConnectionManagerImpl&
234ClientContext::get_connection_manager()
235{
236 return *hazelcast_client_.connection_manager_;
237}
238
239internal::nearcache::NearCacheManager&
240ClientContext::get_near_cache_manager()
241{
242 return *hazelcast_client_.near_cache_manager_;
243}
244
246ClientContext::get_client_properties()
247{
248 return hazelcast_client_.client_properties_;
249}
250
251cluster&
252ClientContext::get_cluster()
253{
254 return hazelcast_client_.cluster_;
255}
256
257std::shared_ptr<impl::sequence::CallIdSequence>&
258ClientContext::get_call_id_sequence() const
259{
260 return hazelcast_client_.call_id_sequence_;
261}
262
263const protocol::ClientExceptionFactory&
264ClientContext::get_client_exception_factory() const
265{
266 return hazelcast_client_.get_exception_factory();
267}
268
269const std::string&
270ClientContext::get_name() const
271{
272 return hazelcast_client_.get_name();
273}
274
275impl::ClientExecutionServiceImpl&
276ClientContext::get_client_execution_service() const
277{
278 return *hazelcast_client_.execution_service_;
279}
280
281const std::shared_ptr<client::impl::ClientLockReferenceIdGenerator>&
282ClientContext::get_lock_reference_id_generator()
283{
284 return hazelcast_client_.get_lock_reference_id_generator();
285}
286
287std::shared_ptr<client::impl::hazelcast_client_instance_impl>
288ClientContext::get_hazelcast_client_implementation()
289{
290 return hazelcast_client_.shared_from_this();
291}
292
293spi::ProxyManager&
294ClientContext::get_proxy_manager()
295{
296 return hazelcast_client_.get_proxy_manager();
297}
298
299logger&
300ClientContext::get_logger()
301{
302 return *hazelcast_client_.logger_;
303}
304
305client::impl::statistics::Statistics&
306ClientContext::get_clientstatistics()
307{
308 return *hazelcast_client_.statistics_;
309}
310
311spi::impl::listener::cluster_view_listener&
312ClientContext::get_cluster_view_listener()
313{
314 return *hazelcast_client_.cluster_listener_;
315}
316
317boost::uuids::uuid
318ClientContext::random_uuid()
319{
320 return hazelcast_client_.random_uuid();
321}
322
323cp::internal::session::proxy_session_manager&
324ClientContext::get_proxy_session_manager()
325{
326 return hazelcast_client_.proxy_session_manager_;
327}
328
329serialization::pimpl::default_schema_service&
330ClientContext::get_schema_service()
331{
332 return hazelcast_client_.schema_service_;
333}
334
335lifecycle_service::lifecycle_service(
336 ClientContext& client_context,
337 const std::vector<lifecycle_listener>& listeners)
338 : client_context_(client_context)
339 , listeners_()
340 , shutdown_completed_latch_(1)
341{
342 for (const auto& listener : listeners) {
343 add_listener(lifecycle_listener(listener));
344 }
345}
346
347bool
348lifecycle_service::start()
349{
350 bool expected = false;
351 if (!active_.compare_exchange_strong(expected, true)) {
352 return false;
353 }
354
355 fire_lifecycle_event(lifecycle_event::STARTED);
356
357 client_context_.get_client_execution_service().start();
358
359 client_context_.get_client_listener_service().start();
360
361 client_context_.get_invocation_service().start();
362
363 client_context_.get_client_cluster_service().start();
364
365 client_context_.get_cluster_view_listener().start();
366
367 if (!client_context_.get_connection_manager().start()) {
368 return false;
369 }
370
371 auto& connectionStrategyConfig =
372 client_context_.get_client_config().get_connection_strategy_config();
373 if (!connectionStrategyConfig.is_async_start()) {
374 // The client needs to open connections to all members before any
375 // services requiring internal listeners start
376 wait_for_initial_membership_event();
377 client_context_.get_connection_manager()
378 .connect_to_all_cluster_members();
379 }
380
381 client_context_.get_invocation_service().add_backup_listener();
382
383 client_context_.get_clientstatistics().start();
384
385 return true;
386}
387
388void
389lifecycle_service::shutdown()
390{
391 bool expected = true;
392 if (!active_.compare_exchange_strong(expected, false)) {
393 shutdown_completed_latch_.wait();
394 return;
395 }
396 try {
397 fire_lifecycle_event(lifecycle_event::SHUTTING_DOWN);
398 client_context_.get_proxy_session_manager().shutdown();
399 client_context_.get_clientstatistics().shutdown();
400 client_context_.get_proxy_manager().destroy();
401 client_context_.get_connection_manager().shutdown();
402 client_context_.get_client_cluster_service().shutdown();
403 client_context_.get_invocation_service().shutdown();
404 client_context_.get_client_listener_service().shutdown();
405 client_context_.get_near_cache_manager().destroy_all_near_caches();
406 fire_lifecycle_event(lifecycle_event::SHUTDOWN);
407 client_context_.get_client_execution_service().shutdown();
408 client_context_.get_serialization_service().dispose();
409 shutdown_completed_latch_.count_down();
410 } catch (std::exception& e) {
411 HZ_LOG(
412 client_context_.get_logger(),
413 info,
414 boost::str(
415 boost::format(
416 "An exception occured during LifecycleService shutdown. %1%") %
417 e.what()));
418 shutdown_completed_latch_.count_down();
419 }
420}
421
422boost::uuids::uuid
423lifecycle_service::add_listener(lifecycle_listener&& lifecycle_listener)
424{
425 std::lock_guard<std::mutex> lg(listener_lock_);
426 const auto id = uuid_generator_();
427 listeners_.emplace(id, std::move(lifecycle_listener));
428 return id;
429}
430
431bool
432lifecycle_service::remove_listener(const boost::uuids::uuid& registration_id)
433{
434 std::lock_guard<std::mutex> guard(listener_lock_);
435 return listeners_.erase(registration_id) == 1;
436}
437
438void
439lifecycle_service::fire_lifecycle_event(const lifecycle_event& lifecycle_event)
440{
441 std::lock_guard<std::mutex> guard(listener_lock_);
442 logger& lg = client_context_.get_logger();
443
444 std::function<void(lifecycle_listener&)> fire_one;
445
446 switch (lifecycle_event.get_state()) {
447 case lifecycle_event::STARTING: {
448 // convert the date string from "2016-04-20" to 20160420
449 std::string date(HAZELCAST_STRINGIZE(HAZELCAST_GIT_COMMIT_DATE));
450 util::git_date_to_hazelcast_log_date(date);
451 std::string commitId(HAZELCAST_STRINGIZE(HAZELCAST_GIT_COMMIT_ID));
452 commitId.erase(std::remove(commitId.begin(), commitId.end(), '"'),
453 commitId.end());
454
455 HZ_LOG(lg,
456 info,
457 (boost::format("(%1%:%2%) LifecycleService::LifecycleEvent "
458 "Client (%3%) is STARTING") %
459 date % commitId %
460 client_context_.get_connection_manager().get_client_uuid())
461 .str());
462 char msg[100];
463 util::hz_snprintf(
464 msg,
465 100,
466 "(%s:%s) LifecycleService::LifecycleEvent STARTING",
467 date.c_str(),
468 commitId.c_str());
469 HZ_LOG(lg, info, msg);
470
471 fire_one = [](lifecycle_listener& listener) {
472 listener.starting_();
473 };
474 break;
475 }
476 case lifecycle_event::STARTED: {
477 HZ_LOG(lg, info, "LifecycleService::LifecycleEvent STARTED");
478
479 fire_one = [](lifecycle_listener& listener) {
480 listener.started_();
481 };
482 break;
483 }
484 case lifecycle_event::SHUTTING_DOWN: {
485 HZ_LOG(lg, info, "LifecycleService::LifecycleEvent SHUTTING_DOWN");
486
487 fire_one = [](lifecycle_listener& listener) {
488 listener.shutting_down_();
489 };
490 break;
491 }
492 case lifecycle_event::SHUTDOWN: {
493 HZ_LOG(lg, info, "LifecycleService::LifecycleEvent SHUTDOWN");
494
495 fire_one = [](lifecycle_listener& listener) {
496 listener.shutdown_();
497 };
498 break;
499 }
500 case lifecycle_event::CLIENT_CONNECTED: {
501 HZ_LOG(
502 lg, info, "LifecycleService::LifecycleEvent CLIENT_CONNECTED");
503
504 fire_one = [](lifecycle_listener& listener) {
505 listener.connected_();
506 };
507 break;
508 }
509 case lifecycle_event::CLIENT_DISCONNECTED: {
510 HZ_LOG(
511 lg, info, "LifecycleService::LifecycleEvent CLIENT_DISCONNECTED");
512
513 fire_one = [](lifecycle_listener& listener) {
514 listener.disconnected_();
515 };
516 break;
517 }
518 }
519
520 for (auto& item : listeners_) {
521 fire_one(item.second);
522 }
523}
524
525bool
526lifecycle_service::is_running()
527{
528 return active_;
529}
530
531lifecycle_service::~lifecycle_service()
532{
533 if (active_) {
534 shutdown();
535 }
536}
537
538void
539lifecycle_service::wait_for_initial_membership_event() const
540{
541 client_context_.get_client_cluster_service()
542 .wait_initial_member_list_fetched();
543}
544
545DefaultObjectNamespace::DefaultObjectNamespace(const std::string& service,
546 const std::string& object)
547 : service_name_(service)
548 , object_name_(object)
549{}
550
551const std::string&
552DefaultObjectNamespace::get_service_name() const
553{
554 return service_name_;
555}
556
557const std::string&
558DefaultObjectNamespace::get_object_name() const
559{
560 return object_name_;
561}
562
563bool
564DefaultObjectNamespace::operator==(const DefaultObjectNamespace& rhs) const
565{
566 return service_name_ == rhs.service_name_ &&
567 object_name_ == rhs.object_name_;
568}
569
570ClientProxy::ClientProxy(const std::string& name,
571 const std::string& service_name,
572 ClientContext& context)
573 : name_(name)
574 , service_name_(service_name)
575 , context_(context)
576{}
577
578ClientProxy::~ClientProxy() = default;
579
580const std::string&
581ClientProxy::get_name() const
582{
583 return name_;
584}
585
586const std::string&
587ClientProxy::get_service_name() const
588{
589 return service_name_;
590}
591
592ClientContext&
593ClientProxy::get_context()
594{
595 return context_;
596}
597
598void
599ClientProxy::on_destroy()
600{}
601
602boost::future<void>
603ClientProxy::destroy()
604{
605 return get_context().get_proxy_manager().destroy_proxy(*this);
606}
607
608void
609ClientProxy::destroy_locally()
610{
611 if (pre_destroy()) {
612 try {
613 on_destroy();
614 post_destroy();
615 } catch (exception::iexception&) {
616 post_destroy();
617 throw;
618 }
619 }
620}
621
622bool
623ClientProxy::pre_destroy()
624{
625 return true;
626}
627
628void
629ClientProxy::post_destroy()
630{}
631
632void
633ClientProxy::on_initialize()
634{}
635
636void
637ClientProxy::on_shutdown()
638{}
639
640serialization::pimpl::SerializationService&
641ClientProxy::get_serialization_service()
642{
643 return context_.get_serialization_service();
644}
645
646boost::future<void>
647ClientProxy::destroy_remotely()
648{
649 auto clientMessage = protocol::codec::client_destroyproxy_encode(
650 get_name(), get_service_name());
651 return spi::impl::ClientInvocation::create(
652 get_context(),
653 std::make_shared<protocol::ClientMessage>(
654 std::move(clientMessage)),
655 get_name())
656 ->invoke()
657 .then(boost::launch::sync,
658 [](boost::future<protocol::ClientMessage> f) { f.get(); });
659}
660
661boost::future<boost::uuids::uuid>
662ClientProxy::register_listener(
663 std::shared_ptr<impl::ListenerMessageCodec> listener_message_codec,
664 std::shared_ptr<client::impl::BaseEventHandler> handler)
665{
666 return get_context().get_client_listener_service().register_listener(
667 listener_message_codec, handler);
668}
669
670boost::future<bool>
671ClientProxy::deregister_listener(boost::uuids::uuid registration_id)
672{
673 return get_context().get_client_listener_service().deregister_listener(
674 registration_id);
675}
676
677namespace impl {
678boost::uuids::uuid
679ListenerMessageCodec::decode_add_response(protocol::ClientMessage& msg) const
680{
681 return msg.get_first_uuid();
682}
683
684bool
685ListenerMessageCodec::decode_remove_response(protocol::ClientMessage& msg) const
686{
687 return msg.get_first_fixed_sized_field<bool>();
688}
689
690ClientInvocationServiceImpl::ClientInvocationServiceImpl(ClientContext& client)
691 : client_(client)
692 , logger_(client.get_logger())
693 , invocation_timeout_(
694 std::chrono::seconds(client.get_client_properties().get_integer(
695 client.get_client_properties().get_invocation_timeout_seconds())))
696 , invocation_retry_pause_(
697 std::chrono::milliseconds(client.get_client_properties().get_long(
698 client.get_client_properties().get_invocation_retry_pause_millis())))
699 , smart_routing_(
700 client.get_client_config().get_network_config().is_smart_routing())
701 , backup_acks_enabled_(smart_routing_ &&
702 client.get_client_config().backup_acks_enabled())
703 , fail_on_indeterminate_operation_state_(
704 client.get_client_properties().get_boolean(
705 client.get_client_properties().fail_on_indeterminate_state()))
706 , backup_timeout_(
707 std::chrono::milliseconds(client.get_client_properties().get_integer(
708 client.get_client_properties().backup_timeout_millis())))
709{}
710
711void
712ClientInvocationServiceImpl::start()
713{}
714
715void
716ClientInvocationServiceImpl::add_backup_listener()
717{
718 if (this->backup_acks_enabled_) {
719 auto& listener_service = this->client_.get_client_listener_service();
720 listener_service
721 .register_listener(std::make_shared<BackupListenerMessageCodec>(),
722 std::make_shared<noop_backup_event_handler>(logger_))
723 .get();
724 }
725}
726
727void
728ClientInvocationServiceImpl::shutdown()
729{
730 is_shutdown_.store(true);
731}
732
733std::chrono::milliseconds
734ClientInvocationServiceImpl::get_invocation_timeout() const
735{
736 return invocation_timeout_;
737}
738
739std::chrono::milliseconds
740ClientInvocationServiceImpl::get_invocation_retry_pause() const
741{
742 return invocation_retry_pause_;
743}
744
745bool
746ClientInvocationServiceImpl::is_redo_operation()
747{
748 return client_.get_client_config().is_redo_operation();
749}
750
751void
752ClientInvocationServiceImpl::handle_client_message(
753 const std::shared_ptr<ClientInvocation>& invocation,
754 const std::shared_ptr<protocol::ClientMessage>& response)
755{
756 try {
757 if (protocol::codec::ErrorCodec::EXCEPTION_MESSAGE_TYPE ==
758 response->get_message_type()) {
759 auto error_holder = protocol::codec::ErrorCodec::decode(*response);
760 invocation->notify_exception(
761 client_.get_client_exception_factory().create_exception(
762 error_holder));
763 } else {
764 invocation->notify(response);
765 }
766 } catch (std::exception& e) {
767 HZ_LOG(
768 logger_,
769 severe,
770 boost::str(boost::format("Failed to process response for %1%. %2%") %
771 *invocation % e.what()));
772 }
773}
774
775bool
776ClientInvocationServiceImpl::send(
777 const std::shared_ptr<impl::ClientInvocation>& invocation,
778 const std::shared_ptr<connection::Connection>& connection)
779{
780 if (is_shutdown_) {
781 BOOST_THROW_EXCEPTION(exception::hazelcast_client_not_active(
782 "ClientInvocationServiceImpl::send", "Client is shut down"));
783 }
784
785 if (backup_acks_enabled_) {
786 invocation->get_client_message()->add_flag(
787 protocol::ClientMessage::BACKUP_AWARE_FLAG);
788 }
789
790 write_to_connection(*connection, invocation);
791 invocation->set_send_connection(connection);
792 return true;
793}
794
795void
796ClientInvocationServiceImpl::write_to_connection(
797 connection::Connection& connection,
798 const std::shared_ptr<ClientInvocation>& client_invocation)
799{
800 auto clientMessage = client_invocation->get_client_message();
801 connection.write(client_invocation);
802}
803
804void
805ClientInvocationServiceImpl::check_invocation_allowed()
806{
807 client_.get_connection_manager().check_invocation_allowed();
808}
809
810void
811ClientInvocationServiceImpl::check_urgent_invocation_allowed(
812 const ClientInvocation& invocation)
813{
814 if (client_.get_connection_manager().client_initialized_on_cluster()) {
815 // If the client is initialized on the cluster, that means we
816 // have sent all the schemas to the cluster, even if we are
817 // reconnected to it
818 return;
819 }
820
821 if (!client_.get_hazelcast_client_implementation()
822 ->should_check_urgent_invocations()) {
823 // If there were no Compact schemas to begin with, we don't need
824 // to perform the check below. If the client didn't send a Compact
825 // schema up until this point, the retries or listener registrations
826 // could not send a schema, because if they were, we wouldn't hit
827 // this line.
828 return;
829 }
830
831 // We are not yet initialized on cluster, so the Compact schemas might
832 // not be sent yet. This message contains some serialized classes,
833 // and it is possible that it can also contain Compact serialized data.
834 // In that case, allowing this invocation to go through now could
835 // violate the invariant that the schema must come to cluster before
836 // the data. We will retry this invocation and wait until the client
837 // is initialized on the cluster, which means schemas are replicated
838 // in the cluster.
839 if (invocation.get_client_message()
840 ->contains_serialized_data_in_request()) {
842 "ClientInvocationServiceImpl::check_urgent_invocation_allowed",
843 invocation
844 };
845 }
846}
847
848bool
849ClientInvocationServiceImpl::invoke(
850 std::shared_ptr<ClientInvocation> invocation)
851{
852 auto connection = client_.get_connection_manager().get_random_connection();
853 if (!connection) {
854 HZ_LOG(logger_, finest, "No connection found to invoke");
855 return false;
856 }
857 return send(invocation, connection);
858}
859
860DefaultAddressProvider::DefaultAddressProvider(
861 config::client_network_config& network_config)
862 : network_config_(network_config)
863{}
864
865std::vector<address>
866DefaultAddressProvider::load_addresses()
867{
868 std::vector<address> addresses = network_config_.get_addresses();
869 if (addresses.empty()) {
870 addresses.emplace_back("127.0.0.1", 5701);
871 }
872
873 // TODO Implement AddressHelper to add alternative ports for the same
874 // address
875
876 return addresses;
877}
878
879boost::optional<address>
880DefaultAddressProvider::translate(const address& addr)
881{
882 return addr;
883}
884
885bool
886DefaultAddressProvider::is_default_provider()
887{
888 return true;
889}
890
891const boost::shared_ptr<ClientClusterServiceImpl::member_list_snapshot>
892 ClientClusterServiceImpl::EMPTY_SNAPSHOT(
893 new ClientClusterServiceImpl::member_list_snapshot{ -1 });
894
895constexpr boost::chrono::milliseconds
896 ClientClusterServiceImpl::INITIAL_MEMBERS_TIMEOUT;
897const endpoint_qualifier ClientClusterServiceImpl::CLIENT{ 1, "" };
898const endpoint_qualifier ClientClusterServiceImpl::MEMBER{ 0, "" };
899
900ClientClusterServiceImpl::ClientClusterServiceImpl(
901 hazelcast::client::spi::ClientContext& client)
902 : client_(client)
903 , member_list_snapshot_(EMPTY_SNAPSHOT)
904 , labels_(client.get_client_config().get_labels())
905 , initial_list_fetched_latch_(1)
906{}
907
908boost::uuids::uuid
909ClientClusterServiceImpl::add_membership_listener_without_init(
910 membership_listener&& listener)
911{
912 std::lock_guard<std::mutex> g(listeners_lock_);
913 auto id = client_.random_uuid();
914 listeners_.emplace(id, std::move(listener));
915 return id;
916}
917
918boost::optional<member>
919ClientClusterServiceImpl::get_member(boost::uuids::uuid uuid) const
920{
921 assert(!uuid.is_nil());
922 auto members_view_ptr = member_list_snapshot_.load();
923 const auto it = members_view_ptr->members.find(uuid);
924 if (it == members_view_ptr->members.end()) {
925 return boost::none;
926 }
927 return { it->second };
928}
929
930std::vector<member>
931ClientClusterServiceImpl::get_member_list() const
932{
933 auto members_view_ptr = member_list_snapshot_.load();
934 std::vector<member> result;
935 result.reserve(members_view_ptr->members.size());
936 for (const auto& e : members_view_ptr->members) {
937 result.emplace_back(e.second);
938 }
939 return result;
940}
941
942void
943ClientClusterServiceImpl::start()
944{
945 for (auto& listener :
946 client_.get_client_config().get_membership_listeners()) {
947 add_membership_listener(membership_listener(listener));
948 }
949}
950
951void
952ClientClusterServiceImpl::fire_initial_membership_event(
953 const initial_membership_event& event)
954{
955 std::lock_guard<std::mutex> g(listeners_lock_);
956
957 for (auto& item : listeners_) {
958 membership_listener& listener = item.second;
959 if (listener.init_) {
960 listener.init_(event);
961 }
962 }
963}
964
965void
966ClientClusterServiceImpl::shutdown()
967{
968 initial_list_fetched_latch_.try_count_down();
969}
970
971boost::uuids::uuid
972ClientClusterServiceImpl::add_membership_listener(
973 membership_listener&& listener)
974{
975 std::lock_guard<std::mutex> cluster_view_g(cluster_view_lock_);
976
977 auto id = add_membership_listener_without_init(std::move(listener));
978
979 std::lock_guard<std::mutex> listeners_g(listeners_lock_);
980 auto added_listener = listeners_[id];
981
982 if (added_listener.init_) {
983 auto& cluster = client_.get_cluster();
984 auto members_ptr = member_list_snapshot_.load();
985 if (!members_ptr->members.empty()) {
986 std::unordered_set<member> members;
987 for (const auto& e : members_ptr->members) {
988 members.insert(e.second);
989 }
990 added_listener.init_(initial_membership_event(cluster, members));
991 }
992 }
993
994 return id;
995}
996
997bool
998ClientClusterServiceImpl::remove_membership_listener(
999 boost::uuids::uuid registration_id)
1000{
1001 std::lock_guard<std::mutex> g(listeners_lock_);
1002 return listeners_.erase(registration_id) == 1;
1003}
1004
1005std::vector<member>
1006ClientClusterServiceImpl::get_members(const member_selector& selector) const
1007{
1008 std::vector<member> result;
1009 for (auto&& member : get_member_list()) {
1010 if (selector.select(member)) {
1011 result.emplace_back(std::move(member));
1012 }
1013 }
1014
1015 return result;
1016}
1017
1019ClientClusterServiceImpl::get_local_client() const
1020{
1021 connection::ClientConnectionManagerImpl& cm =
1022 client_.get_connection_manager();
1023 auto connection = cm.get_random_connection();
1024 auto inetSocketAddress =
1025 connection ? connection->get_local_socket_address() : boost::none;
1026 auto uuid = cm.get_client_uuid();
1027 return local_endpoint(
1028 uuid, std::move(inetSocketAddress), client_.get_name(), labels_);
1029}
1030
1031void
1032ClientClusterServiceImpl::clear_member_list_version()
1033{
1034 std::lock_guard<std::mutex> g(cluster_view_lock_);
1035 auto& lg = client_.get_logger();
1036 HZ_LOG(lg, finest, "Resetting the member list version ");
1037 auto cluster_view_snapshot = member_list_snapshot_.load();
1038 // This check is necessary so that `clear_member_list_version` when handling
1039 // auth response will not intervene with client failover logic
1040 if (cluster_view_snapshot != EMPTY_SNAPSHOT) {
1041 member_list_snapshot_.store(boost::shared_ptr<member_list_snapshot>(
1042 new member_list_snapshot{ 0, cluster_view_snapshot->members }));
1043 }
1044}
1045
1046std::vector<membership_event>
1047ClientClusterServiceImpl::clear_member_list_and_return_events()
1048{
1049 std::lock_guard<std::mutex> g(cluster_view_lock_);
1050
1051 auto& lg = client_.get_logger();
1052 HZ_LOG(lg, finest, "Resetting the member list");
1053
1054 auto previous_list = member_list_snapshot_.load()->members;
1055
1056 member_list_snapshot_.store(
1057 boost::shared_ptr<member_list_snapshot>(new member_list_snapshot{ 0 }));
1058
1059 return detect_membership_events(
1060 previous_list,
1061 std::unordered_map<boost::uuids::uuid,
1062 member,
1063 boost::hash<boost::uuids::uuid>>());
1064}
1065
1066void
1067ClientClusterServiceImpl::clear_member_list()
1068{
1069 auto events = clear_member_list_and_return_events();
1070 fire_events(std::move(events));
1071}
1072
1073void
1074ClientClusterServiceImpl::handle_event(int32_t version,
1075 const std::vector<member>& member_infos)
1076{
1077 auto& lg = client_.get_logger();
1078 HZ_LOG(
1079 lg,
1080 finest,
1081 boost::str(
1082 boost::format("Handling new snapshot with membership version: %1%, "
1083 "membersString %2%") %
1084 version % members_string(create_snapshot(version, member_infos))));
1085 auto cluster_view_snapshot = member_list_snapshot_.load();
1086 if (cluster_view_snapshot == EMPTY_SNAPSHOT) {
1087 std::lock_guard<std::mutex> g(cluster_view_lock_);
1088 cluster_view_snapshot = member_list_snapshot_.load();
1089 if (cluster_view_snapshot == EMPTY_SNAPSHOT) {
1090 // this means this is the first time client connected to cluster
1091 apply_initial_state(version, member_infos);
1092 initial_list_fetched_latch_.count_down();
1093 return;
1094 }
1095 }
1096
1097 std::vector<membership_event> events;
1098 if (version >= cluster_view_snapshot->version) {
1099 std::lock_guard<std::mutex> g(cluster_view_lock_);
1100 cluster_view_snapshot = member_list_snapshot_.load();
1101 if (version >= cluster_view_snapshot->version) {
1102 auto prev_members = cluster_view_snapshot->members;
1103 auto snapshot = boost::make_shared<member_list_snapshot>(
1104 create_snapshot(version, member_infos));
1105 member_list_snapshot_.store(snapshot);
1106 events = detect_membership_events(prev_members, snapshot->members);
1107 }
1108 }
1109
1110 fire_events(std::move(events));
1111}
1112
1113ClientClusterServiceImpl::member_list_snapshot
1114ClientClusterServiceImpl::create_snapshot(int32_t version,
1115 const std::vector<member>& members)
1116{
1117 member_list_snapshot result;
1118 result.version = version;
1119 for (auto& m : members) {
1120 auto const& address_map = m.address_map();
1121 if (address_map.empty()) {
1122 result.members.insert({ m.get_uuid(), m });
1123 } else {
1124 auto found = address_map.find(CLIENT);
1125 address member_address;
1126 if (found != address_map.end()) {
1127 member_address = found->second;
1128 } else {
1129 found = address_map.find(MEMBER);
1130 assert(found != address_map.end());
1131 member_address = found->second;
1132 }
1133 member new_member(member_address,
1134 m.get_uuid(),
1135 m.is_lite_member(),
1136 m.get_attributes(),
1137 m.address_map(),
1138 m.get_version());
1139 result.members.emplace(new_member.get_uuid(),
1140 std::move(new_member));
1141 }
1142 }
1143
1144 return result;
1145}
1146
1147std::string
1148ClientClusterServiceImpl::members_string(
1149 const ClientClusterServiceImpl::member_list_snapshot& snapshot)
1150{
1151 std::stringstream out;
1152 auto const& members = snapshot.members;
1153 out << std::endl << std::endl << "Members [" << members.size() << "] {";
1154 for (auto const& e : members) {
1155 out << std::endl << "\t" << e.second;
1156 }
1157 out << std::endl << "}" << std::endl;
1158 return out.str();
1159}
1160
1161void
1162ClientClusterServiceImpl::apply_initial_state(
1163 int32_t version,
1164 const std::vector<member>& member_infos)
1165{
1166 auto snapshot = boost::make_shared<member_list_snapshot>(
1167 create_snapshot(version, member_infos));
1168 member_list_snapshot_.store(snapshot);
1169 HZ_LOG(client_.get_logger(), info, members_string(*snapshot));
1170 std::unordered_set<member> members;
1171 for (auto const& e : snapshot->members) {
1172 members.insert(e.second);
1173 }
1174 std::lock_guard<std::mutex> g(listeners_lock_);
1175 for (auto& item : listeners_) {
1176 membership_listener& listener = item.second;
1177 if (listener.init_) {
1178 listener.init_(
1179 initial_membership_event(client_.get_cluster(), members));
1180 }
1181 }
1182}
1183
1184std::vector<membership_event>
1185ClientClusterServiceImpl::detect_membership_events(
1186 std::unordered_map<boost::uuids::uuid,
1187 member,
1188 boost::hash<boost::uuids::uuid>> previous_members,
1189 const std::unordered_map<boost::uuids::uuid,
1190 member,
1191 boost::hash<boost::uuids::uuid>>& current_members)
1192{
1193 std::vector<member> new_members;
1194
1195 for (auto const& e : current_members) {
1196 if (!previous_members.erase(e.first)) {
1197 new_members.emplace_back(e.second);
1198 }
1199 }
1200
1201 std::vector<membership_event> events;
1202
1203 // removal events should be added before added events
1204 for (auto const& e : previous_members) {
1205 events.emplace_back(
1206 client_.get_cluster(),
1207 e.second,
1208 membership_event::membership_event_type::MEMBER_LEFT,
1209 current_members);
1210 auto connection =
1211 client_.get_connection_manager().get_connection(e.second.get_uuid());
1212 if (connection) {
1213 connection->close(
1214 "",
1215 std::make_exception_ptr(exception::target_disconnected(
1216 "ClientClusterServiceImpl::detect_membership_events",
1217 (boost::format(
1218 "The client has closed the connection to this member, after "
1219 "receiving a member left event from the cluster. %1%") %
1220 *connection)
1221 .str())));
1222 }
1223 }
1224 for (auto const& member : new_members) {
1225 events.emplace_back(
1226 client_.get_cluster(),
1227 member,
1228 membership_event::membership_event_type::MEMBER_JOINED,
1229 current_members);
1230 }
1231
1232 if (!events.empty()) {
1233 auto snapshot = member_list_snapshot_.load();
1234 if (!snapshot->members.empty()) {
1235 HZ_LOG(client_.get_logger(), info, members_string(*snapshot));
1236 }
1237 }
1238 return events;
1239}
1240
1241void
1242ClientClusterServiceImpl::fire_events(std::vector<membership_event> events)
1243{
1244 std::lock_guard<std::mutex> g(listeners_lock_);
1245
1246 for (auto const& event : events) {
1247 for (auto& item : listeners_) {
1248 membership_listener& listener = item.second;
1249 if (event.get_event_type() ==
1250 membership_event::membership_event_type::MEMBER_JOINED) {
1251 listener.joined_(event);
1252 } else {
1253 listener.left_(event);
1254 }
1255 }
1256 }
1257}
1258
1259void
1260ClientClusterServiceImpl::wait_initial_member_list_fetched() const
1261{
1262 // safe to const cast here since latch operations are already thread safe
1263 // ops.
1264 if ((const_cast<boost::latch&>(initial_list_fetched_latch_))
1265 .wait_for(INITIAL_MEMBERS_TIMEOUT) == boost::cv_status::timeout) {
1266 BOOST_THROW_EXCEPTION(exception::illegal_state(
1267 "ClientClusterServiceImpl::wait_initial_member_list_fetched",
1268 "Could not get initial member list from cluster!"));
1269 }
1270}
1271
1272bool
1273ClientInvocationServiceImpl::invoke_on_connection(
1274 const std::shared_ptr<ClientInvocation>& invocation,
1275 const std::shared_ptr<connection::Connection>& connection)
1276{
1277 return send(invocation, connection);
1278}
1279
1280bool
1281ClientInvocationServiceImpl::invoke_on_partition_owner(
1282 const std::shared_ptr<ClientInvocation>& invocation,
1283 int partition_id)
1284{
1285 auto partition_owner =
1286 client_.get_partition_service().get_partition_owner(partition_id);
1287 if (partition_owner.is_nil()) {
1288 HZ_LOG(logger_,
1289 finest,
1290 boost::str(
1291 boost::format(
1292 "Partition owner is not assigned yet for partition %1%") %
1293 partition_id));
1294 return false;
1295 }
1296 return invoke_on_target(invocation, partition_owner);
1297}
1298
1299bool
1300ClientInvocationServiceImpl::invoke_on_target(
1301 const std::shared_ptr<ClientInvocation>& invocation,
1302 boost::uuids::uuid uuid)
1303{
1304 assert(!uuid.is_nil());
1305 auto connection = client_.get_connection_manager().get_connection(uuid);
1306 if (!connection) {
1307 HZ_LOG(
1308 logger_,
1309 finest,
1310 boost::str(boost::format("Client is not connected to target : %1%") %
1311 uuid));
1312 return false;
1313 }
1314 return send(invocation, connection);
1315}
1316
1317bool
1318ClientInvocationServiceImpl::is_smart_routing() const
1319{
1320 return smart_routing_;
1321}
1322
1323const std::chrono::milliseconds&
1324ClientInvocationServiceImpl::get_backup_timeout() const
1325{
1326 return backup_timeout_;
1327}
1328
1329bool
1330ClientInvocationServiceImpl::fail_on_indeterminate_state() const
1331{
1332 return fail_on_indeterminate_operation_state_;
1333}
1334
1335ClientExecutionServiceImpl::ClientExecutionServiceImpl(
1336 const std::string& name,
1337 const client_properties& properties,
1338 int32_t user_pool_size,
1339 spi::lifecycle_service& service)
1340 : lifecycle_service_(service)
1341 , client_properties_(properties)
1342 , user_pool_size_(user_pool_size)
1343{
1344 (void)name;
1345}
1346
1347void
1348ClientExecutionServiceImpl::start()
1349{
1350 int internalPoolSize = client_properties_.get_integer(
1351 client_properties_.get_internal_executor_pool_size());
1352 if (internalPoolSize <= 0) {
1353 internalPoolSize = util::IOUtil::to_value<int>(
1354 client_properties::INTERNAL_EXECUTOR_POOL_SIZE_DEFAULT);
1355 }
1356
1357 internal_executor_.reset(
1358 new hazelcast::util::hz_thread_pool(internalPoolSize));
1359
1360 if (user_pool_size_ <= 0) {
1361 user_executor_.reset(new hazelcast::util::hz_thread_pool());
1362 } else {
1363 user_executor_.reset(
1364 new hazelcast::util::hz_thread_pool(user_pool_size_));
1365 }
1366
1367 schema_replication_executor_.reset(new hazelcast::util::hz_thread_pool());
1368}
1369
1370void
1371ClientExecutionServiceImpl::shutdown()
1372{
1373 shutdown_thread_pool(internal_executor_.get());
1374 shutdown_thread_pool(user_executor_.get());
1375 shutdown_thread_pool(schema_replication_executor_.get());
1376}
1377
1378util::hz_thread_pool&
1379ClientExecutionServiceImpl::get_user_executor()
1380{
1381 return *user_executor_;
1382}
1383
1384util::hz_thread_pool&
1385ClientExecutionServiceImpl::get_schema_replication_executor()
1386{
1387 return *schema_replication_executor_;
1388}
1389
1390void
1391ClientExecutionServiceImpl::shutdown_thread_pool(
1392 hazelcast::util::hz_thread_pool* pool)
1393{
1394 if (!pool) {
1395 return;
1396 }
1397 pool->close();
1398}
1399
1400constexpr int ClientInvocation::MAX_FAST_INVOCATION_COUNT;
1401constexpr int ClientInvocation::UNASSIGNED_PARTITION;
1402
1403ClientInvocation::ClientInvocation(
1404 spi::ClientContext& client_context,
1405 std::shared_ptr<protocol::ClientMessage>&& message,
1406 const std::string& name,
1407 int partition,
1408 const std::shared_ptr<connection::Connection>& conn,
1409 boost::uuids::uuid uuid)
1410 : logger_(client_context.get_logger())
1411 , lifecycle_service_(client_context.get_lifecycle_service())
1412 , invocation_service_(client_context.get_invocation_service())
1413 , execution_service_(
1414 client_context.get_client_execution_service().shared_from_this())
1415 , schema_service_(client_context.get_schema_service())
1416 , call_id_sequence_(client_context.get_call_id_sequence())
1417 , uuid_(uuid)
1418 , partition_id_(partition)
1419 , start_time_(std::chrono::steady_clock::now())
1420 , retry_pause_(invocation_service_.get_invocation_retry_pause())
1421 , object_name_(name)
1422 , connection_(conn)
1423 , bound_to_single_connection_(conn != nullptr)
1424 , invoke_count_(0)
1425 , urgent_(false)
1426 , smart_routing_(invocation_service_.is_smart_routing())
1427{
1428 message->set_partition_id(partition_id_);
1429 client_message_ =
1430 boost::make_shared<std::shared_ptr<protocol::ClientMessage>>(message);
1431 set_send_connection(nullptr);
1432}
1433
1434ClientInvocation::~ClientInvocation() = default;
1435
1436boost::future<protocol::ClientMessage>
1437ClientInvocation::invoke()
1438{
1439 assert(client_message_.load());
1440
1441 auto actual_work = [this]() {
1442 // for back pressure
1443 call_id_sequence_->next();
1444 invoke_on_selection();
1445 if (!lifecycle_service_.is_running()) {
1446 return invocation_promise_.get_future().then(
1447 [](boost::future<protocol::ClientMessage> f) { return f.get(); });
1448 }
1449 auto id_seq = call_id_sequence_;
1450 return invocation_promise_.get_future().then(
1451 execution_service_->get_user_executor(),
1452 [=](boost::future<protocol::ClientMessage> f) {
1453 id_seq->complete();
1454 return f.get();
1455 });
1456 };
1457
1458 const auto& schemas =
1459 (*(client_message_.load()))->schemas_will_be_replicated();
1460
1461 if (!schemas.empty()) {
1462 auto self = shared_from_this();
1463
1464 return replicate_schemas(schemas)
1465 .then(boost::launch::sync,
1466 [actual_work, self](boost::future<void> replication) {
1467 replication.get();
1468
1469 return actual_work();
1470 })
1471 .unwrap();
1472 }
1473
1474 return actual_work();
1475}
1476
1477boost::future<protocol::ClientMessage>
1478ClientInvocation::invoke_urgent()
1479{
1480 assert(client_message_.load());
1481 urgent_ = true;
1482
1483 // for back pressure
1484 call_id_sequence_->force_next();
1485 invoke_on_selection();
1486 if (!lifecycle_service_.is_running()) {
1487 return invocation_promise_.get_future().then(
1488 [](boost::future<protocol::ClientMessage> f) { return f.get(); });
1489 }
1490 auto id_seq = call_id_sequence_;
1491 return invocation_promise_.get_future().then(
1492 execution_service_->get_user_executor(),
1493 [=](boost::future<protocol::ClientMessage> f) {
1494 id_seq->complete();
1495 return f.get();
1496 });
1497}
1498
1499boost::future<void>
1500ClientInvocation::replicate_schemas(
1501 std::vector<serialization::pimpl::schema> schemas)
1502{
1503 std::weak_ptr<ClientInvocation> self = shared_from_this();
1504
1505 return boost::async(
1506 execution_service_->get_schema_replication_executor(), [self, schemas]() {
1507 auto invocation = self.lock();
1508
1509 if (!invocation)
1510 return;
1511
1512 for (const serialization::pimpl::schema& s : schemas) {
1513 invocation->schema_service_.replicate_schema_in_cluster(s);
1514 }
1515 });
1516}
1517
1518void
1519ClientInvocation::invoke_on_selection()
1520{
1521 try {
1522 invoke_count_++;
1523 if (urgent_) {
1524 invocation_service_.check_urgent_invocation_allowed(*this);
1525 } else {
1526 invocation_service_.check_invocation_allowed();
1527 }
1528
1529 if (is_bind_to_single_connection()) {
1530 bool invoked = false;
1531 auto conn = connection_.lock();
1532 if (conn) {
1533 invoked = invocation_service_.invoke_on_connection(
1534 shared_from_this(), conn);
1535 }
1536 if (!invoked) {
1537 std::string message;
1538 if (conn) {
1539 message =
1540 (boost::format("Could not invoke on connection %1%") %
1541 *conn)
1542 .str();
1543 } else {
1544 message = "Could not invoke. Bound to a connection that is "
1545 "deleted already.";
1546 }
1547 notify_exception(std::make_exception_ptr(exception::io(
1548 "ClientInvocation::invoke_on_selection", message)));
1549 }
1550 return;
1551 }
1552
1553 bool invoked = false;
1554 if (smart_routing_) {
1555 if (partition_id_ != -1) {
1556 invoked = invocation_service_.invoke_on_partition_owner(
1557 shared_from_this(), partition_id_);
1558 } else if (!uuid_.is_nil()) {
1559 invoked = invocation_service_.invoke_on_target(
1560 shared_from_this(), uuid_);
1561 } else {
1562 invoked = invocation_service_.invoke(shared_from_this());
1563 }
1564 if (!invoked) {
1565 invoked = invocation_service_.invoke(shared_from_this());
1566 }
1567 } else {
1568 invoked = invocation_service_.invoke(shared_from_this());
1569 }
1570 if (!invoked) {
1571 notify_exception(std::make_exception_ptr(
1572 exception::io("No connection found to invoke")));
1573 }
1574 } catch (exception::iexception&) {
1575 notify_exception(std::current_exception());
1576 } catch (std::exception&) {
1577 assert(false);
1578 }
1579}
1580
1581bool
1582ClientInvocation::is_bind_to_single_connection() const
1583{
1584 return bound_to_single_connection_;
1585}
1586
1587void
1588ClientInvocation::run()
1589{
1590 retry();
1591}
1592
1593void
1594ClientInvocation::retry()
1595{
1596 // retry modifies the client message and should not reuse the client
1597 // message. It could be the case that it is in write queue of the
1598 // connection.
1599 client_message_ =
1600 boost::make_shared<std::shared_ptr<protocol::ClientMessage>>(
1601 copy_message());
1602
1603 try {
1604 invoke_on_selection();
1605 } catch (exception::iexception& e) {
1606 set_exception(e, boost::current_exception());
1607 } catch (std::exception&) {
1608 assert(false);
1609 }
1610}
1611
1612void
1613ClientInvocation::set_exception(const std::exception& e,
1614 boost::exception_ptr exception_ptr)
1615{
1616 invoked_or_exception_set_.store(true);
1617 try {
1618 auto send_conn = send_connection_.load();
1619 if (send_conn) {
1620 auto connection = send_conn->lock();
1621 if (connection) {
1622 auto call_id =
1623 client_message_.load()->get()->get_correlation_id();
1624 boost::asio::post(
1625 connection->get_socket().get_executor(),
1626 [=]() { connection->deregister_invocation(call_id); });
1627 }
1628 }
1629 invocation_promise_.set_exception(std::move(exception_ptr));
1630 } catch (boost::promise_already_satisfied& se) {
1631 if (!event_handler_) {
1632 HZ_LOG(logger_,
1633 finest,
1634 boost::str(boost::format(
1635 "Failed to set the exception for invocation. "
1636 "%1%, %2% Exception to be set: %3%") %
1637 se.what() % *this % e.what()));
1638 }
1639 }
1640}
1641
1642void
1643ClientInvocation::notify_exception(std::exception_ptr exception)
1644{
1645 erase_invocation();
1646 try {
1647 std::rethrow_exception(exception);
1648 } catch (exception::iexception& iex) {
1649 log_exception(iex);
1650
1651 if (!lifecycle_service_.is_running()) {
1652 try {
1653 std::throw_with_nested(boost::enable_current_exception(
1654 exception::hazelcast_client_not_active(
1655 iex.get_source(), "Client is shutting down")));
1656 } catch (exception::iexception& e) {
1657 set_exception(e, boost::current_exception());
1658 }
1659 return;
1660 }
1661
1662 if (!should_retry(iex)) {
1663 set_exception(iex, boost::current_exception());
1664 return;
1665 }
1666
1667 auto timePassed = std::chrono::steady_clock::now() - start_time_;
1668 if (timePassed > invocation_service_.get_invocation_timeout()) {
1669 HZ_LOG(
1670 logger_,
1671 finest,
1672 boost::str(boost::format("Exception will not be retried because "
1673 "invocation timed out. %1%") %
1674 iex.what()));
1675
1676 auto now = std::chrono::steady_clock::now();
1677
1678 auto timeoutException =
1679 (exception::exception_builder<exception::operation_timeout>(
1680 "ClientInvocation::newoperation_timeout_exception")
1681 << *this
1682 << " timed out because exception occurred after client "
1683 "invocation timeout "
1684 << std::chrono::duration_cast<std::chrono::milliseconds>(
1685 invocation_service_.get_invocation_timeout())
1686 .count()
1687 << "msecs. Last exception:" << iex << " Current time :"
1688 << util::StringUtil::time_to_string(now) << ". "
1689 << "Start time: "
1690 << util::StringUtil::time_to_string(start_time_)
1691 << ". Total elapsed time: "
1692 << std::chrono::duration_cast<std::chrono::milliseconds>(
1693 now - start_time_)
1694 .count()
1695 << " ms. ")
1696 .build();
1697 try {
1698 BOOST_THROW_EXCEPTION(timeoutException);
1699 } catch (...) {
1700 set_exception(timeoutException, boost::current_exception());
1701 }
1702
1703 return;
1704 }
1705
1706 try {
1707 execute();
1708 } catch (std::exception& e) {
1709 set_exception(e, boost::current_exception());
1710 }
1711 } catch (...) {
1712 assert(false);
1713 }
1714}
1715
1716void
1717ClientInvocation::erase_invocation() const
1718{
1719 if (!this->event_handler_) {
1720 auto sent_connection = get_send_connection();
1721 if (sent_connection) {
1722 auto this_invocation = shared_from_this();
1723 boost::asio::post(sent_connection->get_socket().get_executor(),
1724 [=]() {
1725 sent_connection->invocations.erase(
1726 this_invocation->get_client_message()
1727 ->get_correlation_id());
1728 });
1729 }
1730 }
1731}
1732
1733bool
1734ClientInvocation::should_retry(exception::iexception& exception)
1735{
1736 auto errorCode = exception.get_error_code();
1737 if (is_bind_to_single_connection() &&
1738 (errorCode == protocol::IO ||
1739 errorCode == protocol::TARGET_DISCONNECTED)) {
1740 return false;
1741 }
1742
1743 if (!uuid_.is_nil() && errorCode == protocol::TARGET_NOT_MEMBER) {
1744 // when invocation send to a specific member
1745 // if target is no longer a member, we should not retry
1746 // note that this exception could come from the server
1747 return false;
1748 }
1749
1750 if (errorCode == protocol::IO ||
1751 errorCode == protocol::HAZELCAST_INSTANCE_NOT_ACTIVE ||
1752 exception.is_retryable()) {
1753 return true;
1754 }
1755 if (errorCode == protocol::TARGET_DISCONNECTED) {
1756 return client_message_.load()->get()->is_retryable() ||
1757 invocation_service_.is_redo_operation();
1758 }
1759 return false;
1760}
1761
1762std::ostream&
1763operator<<(std::ostream& os, const ClientInvocation& invocation)
1764{
1765 std::ostringstream target;
1766 if (invocation.is_bind_to_single_connection()) {
1767 auto conn = invocation.connection_.lock();
1768 if (conn) {
1769 target << "connection " << *conn;
1770 }
1771 } else if (invocation.partition_id_ != -1) {
1772 target << "partition " << invocation.partition_id_;
1773 } else if (!invocation.uuid_.is_nil()) {
1774 target << "uuid " << boost::to_string(invocation.uuid_);
1775 } else {
1776 target << "random";
1777 }
1778 os << "ClientInvocation{"
1779 << "requestMessage = " << *invocation.client_message_.load()->get()
1780 << ", objectName = " << invocation.object_name_
1781 << ", target = " << target.str() << ", sendConnection = ";
1782 auto sendConnection = invocation.get_send_connection();
1783 if (sendConnection) {
1784 os << *sendConnection;
1785 } else {
1786 os << "nullptr";
1787 }
1788 os << ", backup_acks_expected_ = "
1789 << static_cast<int>(invocation.backup_acks_expected_)
1790 << ", backup_acks_received = " << invocation.backup_acks_received_;
1791
1792 if (invocation.pending_response_) {
1793 os << ", pending_response: " << *invocation.pending_response_;
1794 }
1795
1796 os << '}';
1797
1798 return os;
1799}
1800
1801std::shared_ptr<ClientInvocation>
1802ClientInvocation::create(
1803 spi::ClientContext& client_context,
1804 std::shared_ptr<protocol::ClientMessage>&& client_message,
1805 const std::string& object_name,
1806 int partition_id)
1807{
1808 return std::shared_ptr<ClientInvocation>(new ClientInvocation(
1809 client_context, std::move(client_message), object_name, partition_id));
1810}
1811
1812std::shared_ptr<ClientInvocation>
1813ClientInvocation::create(
1814 spi::ClientContext& client_context,
1815 std::shared_ptr<protocol::ClientMessage>&& client_message,
1816 const std::string& object_name,
1817 const std::shared_ptr<connection::Connection>& connection)
1818{
1819 return std::shared_ptr<ClientInvocation>(
1820 new ClientInvocation(client_context,
1821 std::move(client_message),
1822 object_name,
1823 UNASSIGNED_PARTITION,
1824 connection));
1825}
1826
1827std::shared_ptr<ClientInvocation>
1828ClientInvocation::create(
1829 spi::ClientContext& client_context,
1830 std::shared_ptr<protocol::ClientMessage>&& client_message,
1831 const std::string& object_name,
1832 boost::uuids::uuid uuid)
1833{
1834 return std::shared_ptr<ClientInvocation>(
1835 new ClientInvocation(client_context,
1836 std::move(client_message),
1837 object_name,
1838 UNASSIGNED_PARTITION,
1839 nullptr,
1840 uuid));
1841}
1842
1843std::shared_ptr<ClientInvocation>
1844ClientInvocation::create(spi::ClientContext& client_context,
1845 protocol::ClientMessage& client_message,
1846 const std::string& object_name,
1847 int partition_id)
1848{
1849 return create(
1850 client_context,
1851 std::make_shared<protocol::ClientMessage>(std::move(client_message)),
1852 object_name,
1853 partition_id);
1854}
1855
1856std::shared_ptr<ClientInvocation>
1857ClientInvocation::create(
1858 spi::ClientContext& client_context,
1859 protocol::ClientMessage& client_message,
1860 const std::string& object_name,
1861 const std::shared_ptr<connection::Connection>& connection)
1862{
1863 return create(
1864 client_context,
1865 std::make_shared<protocol::ClientMessage>(std::move(client_message)),
1866 object_name,
1867 connection);
1868}
1869
1870std::shared_ptr<ClientInvocation>
1871ClientInvocation::create(spi::ClientContext& client_context,
1872 protocol::ClientMessage& client_message,
1873 const std::string& object_name,
1874 boost::uuids::uuid uuid)
1875{
1876 return create(
1877 client_context,
1878 std::make_shared<protocol::ClientMessage>(std::move(client_message)),
1879 object_name,
1880 uuid);
1881}
1882
1883std::shared_ptr<connection::Connection>
1884ClientInvocation::get_send_connection() const
1885{
1886 return send_connection_.load()->lock();
1887}
1888
1889void
1890ClientInvocation::wait_invoked() const
1891{
1892 // it could be either invoked or cancelled before invoked
1893 while (!invoked_or_exception_set_) {
1894 std::this_thread::sleep_for(retry_pause_);
1895 }
1896}
1897
1898void
1899ClientInvocation::set_send_connection(
1900 const std::shared_ptr<connection::Connection>& conn)
1901{
1902 send_connection_.store(
1903 boost::make_shared<std::weak_ptr<connection::Connection>>(conn));
1904 invoked_or_exception_set_.store(true);
1905}
1906
1907void
1908ClientInvocation::notify(const std::shared_ptr<protocol::ClientMessage>& msg)
1909{
1910 if (!msg) {
1911 BOOST_THROW_EXCEPTION(
1912 exception::illegal_argument("response can't be null"));
1913 }
1914
1915 int8_t expected_backups = msg->get_number_of_backups();
1916
1917 // if a regular response comes and there are backups, we need to wait for
1918 // the backups when the backups complete, the response will be send by the
1919 // last backup or backup-timeout-handle mechanism kicks on
1920 if (expected_backups > backup_acks_received_) {
1921 // so the invocation has backups and since not all backups have
1922 // completed, we need to wait (it could be that backups arrive earlier
1923 // than the response)
1924
1925 pending_response_received_time_ = std::chrono::steady_clock::now();
1926
1927 backup_acks_expected_ = expected_backups;
1928
1929 // it is very important that the response is set after the
1930 // backupsAcksExpected is set, else the system can assume the invocation
1931 // is complete because there is a response and no backups need to
1932 // respond
1933 pending_response_ = msg;
1934
1935 // we are done since not all backups have completed. Therefore we should
1936 // not notify the future
1937 return;
1938 }
1939
1940 // we are going to notify the future that a response is available; this can
1941 // happen when:
1942 // - we had a regular operation (so no backups we need to wait for) that
1943 // completed
1944 // - we had a backup-aware operation that has completed, but also all its
1945 // backups have completed
1946 complete(msg);
1947}
1948
1949void
1950ClientInvocation::complete(const std::shared_ptr<protocol::ClientMessage>& msg)
1951{
1952 try {
1953 // TODO: move msg content here?
1954 this->invocation_promise_.set_value(*msg);
1955 } catch (std::exception& e) {
1956 HZ_LOG(logger_,
1957 warning,
1958 boost::str(boost::format(
1959 "Failed to set the response for invocation. "
1960 "Dropping the response. %1%, %2% Response: %3%") %
1961 e.what() % *this % *msg));
1962 }
1963 this->erase_invocation();
1964}
1965
1966std::shared_ptr<protocol::ClientMessage>
1967ClientInvocation::get_client_message() const
1968{
1969 return *client_message_.load();
1970}
1971
1972const std::shared_ptr<EventHandler<protocol::ClientMessage>>&
1973ClientInvocation::get_event_handler() const
1974{
1975 return event_handler_;
1976}
1977
1978void
1979ClientInvocation::set_event_handler(
1980 const std::shared_ptr<EventHandler<protocol::ClientMessage>>& handler)
1981{
1982 ClientInvocation::event_handler_ = handler;
1983}
1984
1985void
1986ClientInvocation::execute()
1987{
1988 auto this_invocation = shared_from_this();
1989 auto command = [=]() { this_invocation->run(); };
1990
1991 // first we force a new invocation slot because we are going to return our
1992 // old invocation slot immediately after It is important that we first
1993 // 'force' taking a new slot; otherwise it could be that a sneaky invocation
1994 // gets through that takes our slot!
1995 int64_t callId = call_id_sequence_->force_next();
1996 client_message_.load()->get()->set_correlation_id(callId);
1997
1998 // we release the old slot
1999 call_id_sequence_->complete();
2000
2001 if (invoke_count_ < MAX_FAST_INVOCATION_COUNT) {
2002 // fast retry for the first few invocations
2003 execution_service_->execute(command);
2004 } else {
2005 // progressive retry delay
2006 int64_t delayMillis = util::min<int64_t>(
2007 static_cast<int64_t>(1)
2008 << (invoke_count_ - MAX_FAST_INVOCATION_COUNT),
2009 std::chrono::duration_cast<std::chrono::milliseconds>(retry_pause_)
2010 .count());
2011 retry_timer_ = execution_service_->schedule(
2012 command, std::chrono::milliseconds(delayMillis));
2013 }
2014}
2015
2016const std::string
2017ClientInvocation::get_name() const
2018{
2019 return "ClientInvocation";
2020}
2021
2022std::shared_ptr<protocol::ClientMessage>
2023ClientInvocation::copy_message()
2024{
2025 return std::make_shared<protocol::ClientMessage>(**client_message_.load());
2026}
2027
2028boost::promise<protocol::ClientMessage>&
2029ClientInvocation::get_promise()
2030{
2031 return invocation_promise_;
2032}
2033
2034void
2035ClientInvocation::log_exception(exception::iexception& e)
2036{
2037 HZ_LOG(logger_,
2038 finest,
2039 boost::str(boost::format(
2040 "Invocation got an exception %1%, invoke count : %2%, "
2041 "exception : %3%") %
2042 *this % invoke_count_.load() % e));
2043}
2044
2045void
2046ClientInvocation::notify_backup()
2047{
2048 ++backup_acks_received_;
2049
2050 if (!pending_response_) {
2051 // no pendingResponse has been set, so we are done since the invocation
2052 // on the primary needs to complete first
2053 return;
2054 }
2055
2056 // if a pendingResponse is set, then the backupsAcksExpected has been set
2057 // (so we can now safely read backupsAcksExpected)
2058 if (backup_acks_expected_ != backup_acks_received_) {
2059 // we managed to complete a backup, but we were not the one completing
2060 // the last backup, so we are done
2061 return;
2062 }
2063
2064 // we are the lucky one since we just managed to complete the last backup
2065 // for this invocation and since the pendingResponse is set, we can set it
2066 // on the future
2067 complete_with_pending_response();
2068}
2069
2070void
2071ClientInvocation::detect_and_handle_backup_timeout(
2072 const std::chrono::milliseconds& backup_timeout)
2073{
2074 // if the backups have completed, we are done; this also filters out all non
2075 // backup-aware operations since the backupsAcksExpected will always be
2076 // equal to the backupsAcksReceived
2077 if (backup_acks_expected_ == backup_acks_received_) {
2078 return;
2079 }
2080
2081 // if no response has yet been received, we we are done; we are only going
2082 // to re-invoke an operation if the response of the primary has been
2083 // received, but the backups have not replied
2084 if (!pending_response_) {
2085 return;
2086 }
2087
2088 // if this has not yet expired (so has not been in the system for a too long
2089 // period) we ignore it
2090 if (pending_response_received_time_ + backup_timeout >=
2091 std::chrono::steady_clock::now()) {
2092 return;
2093 }
2094
2095 if (invocation_service_.fail_on_indeterminate_state()) {
2096 auto exception = boost::enable_current_exception(
2097 (exception::exception_builder<
2098 exception::indeterminate_operation_state>(
2099 "ClientInvocation::detect_and_handle_backup_timeout")
2100 << *this << " failed because backup acks missed.")
2101 .build());
2102 notify_exception(std::make_exception_ptr(exception));
2103 return;
2104 }
2105
2106 // the backups have not yet completed, but we are going to release the
2107 // future anyway if a pendingResponse has been set
2108 complete_with_pending_response();
2109}
2110
2111void
2112ClientInvocation::complete_with_pending_response()
2113{
2114 complete(pending_response_);
2115}
2116
2117ClientContext&
2118impl::ClientTransactionManagerServiceImpl::get_client() const
2119{
2120 return client_;
2121}
2122
2123ClientTransactionManagerServiceImpl::ClientTransactionManagerServiceImpl(
2124 ClientContext& client)
2125 : client_(client)
2126{}
2127
2128std::shared_ptr<connection::Connection>
2129ClientTransactionManagerServiceImpl::connect()
2130{
2131 auto& invocationService = client_.get_invocation_service();
2132 auto startTime = std::chrono::steady_clock::now();
2133 auto invocationTimeout = invocationService.get_invocation_timeout();
2134 client_config& clientConfig = client_.get_client_config();
2135 bool smartRouting = clientConfig.get_network_config().is_smart_routing();
2136
2137 while (client_.get_lifecycle_service().is_running()) {
2138 try {
2139 auto connection =
2140 client_.get_connection_manager().get_random_connection();
2141 if (!connection) {
2142 throw_exception(smartRouting);
2143 }
2144 return connection;
2145 } catch (exception::hazelcast_client_offline&) {
2146 throw;
2147 } catch (exception::iexception&) {
2148 if (std::chrono::steady_clock::now() - startTime >
2149 invocationTimeout) {
2150 std::rethrow_exception(new_operation_timeout_exception(
2151 std::current_exception(), invocationTimeout, startTime));
2152 }
2153 }
2154 std::this_thread::sleep_for(
2155 invocationService.get_invocation_retry_pause());
2156 }
2157 BOOST_THROW_EXCEPTION(exception::hazelcast_client_not_active(
2158 "ClientTransactionManagerServiceImpl::connect", "Client is shutdown"));
2159}
2160
2161std::exception_ptr
2162ClientTransactionManagerServiceImpl::new_operation_timeout_exception(
2163 std::exception_ptr cause,
2164 std::chrono::milliseconds invocation_timeout,
2165 std::chrono::steady_clock::time_point start_time)
2166{
2167 std::ostringstream sb;
2168 auto now = std::chrono::steady_clock::now();
2169 sb << "Creating transaction context timed out because exception occurred "
2170 "after client invocation timeout "
2171 << std::chrono::duration_cast<std::chrono::milliseconds>(
2172 invocation_timeout)
2173 .count()
2174 << " ms. "
2175 << "Current time: "
2176 << util::StringUtil::time_to_string(std::chrono::steady_clock::now())
2177 << ". "
2178 << "Start time: " << util::StringUtil::time_to_string(start_time)
2179 << ". Total elapsed time: "
2180 << std::chrono::duration_cast<std::chrono::milliseconds>(now -
2181 start_time)
2182 .count()
2183 << " ms. ";
2184 try {
2185 std::rethrow_exception(cause);
2186 } catch (...) {
2187 try {
2188 std::throw_with_nested(boost::enable_current_exception(
2189 exception::operation_timeout("ClientTransactionManagerServiceImpl"
2190 "::newoperation_timeout_exception",
2191 sb.str())));
2192 } catch (...) {
2193 return std::current_exception();
2194 }
2195 }
2196 return nullptr;
2197}
2198
2199void
2200ClientTransactionManagerServiceImpl::throw_exception(bool smart_routing)
2201{
2202 auto& client_config = client_.get_client_config();
2203 auto& connection_strategy_Config =
2204 client_config.get_connection_strategy_config();
2205 auto reconnect_mode = connection_strategy_Config.get_reconnect_mode();
2206 if (reconnect_mode ==
2208 BOOST_THROW_EXCEPTION(exception::hazelcast_client_offline(
2209 "ClientTransactionManagerServiceImpl::throw_exception", ""));
2210 }
2211 if (smart_routing) {
2212 auto members = client_.get_cluster().get_members();
2213 std::ostringstream msg;
2214 if (members.empty()) {
2215 msg << "No address was return by the LoadBalancer since there are "
2216 "no members in the cluster";
2217 } else {
2218 msg << "No address was return by the LoadBalancer. "
2219 "But the cluster contains the following members:{\n";
2220 for (auto const& m : members) {
2221 msg << '\t' << m << '\n';
2222 }
2223 msg << "}";
2224 }
2225 BOOST_THROW_EXCEPTION(exception::illegal_state(
2226 "ClientTransactionManagerServiceImpl::throw_exception", msg.str()));
2227 }
2228 BOOST_THROW_EXCEPTION(exception::illegal_state(
2229 "ClientTransactionManagerServiceImpl::throw_exception",
2230 "No active connection is found"));
2231}
2232
2233ClientPartitionServiceImpl::ClientPartitionServiceImpl(ClientContext& client)
2234 : client_(client)
2235 , logger_(client.get_logger())
2236 , partition_count_(0)
2237 , partition_table_(
2238 boost::shared_ptr<partition_table>(new partition_table{ 0, -1 }))
2239{}
2240
2241void
2242ClientPartitionServiceImpl::handle_event(
2243 int32_t connection_id,
2244 int32_t version,
2245 const std::vector<std::pair<boost::uuids::uuid, std::vector<int>>>&
2246 partitions)
2247{
2248 HZ_LOG(logger_,
2249 finest,
2250 boost::str(
2251 boost::format(
2252 "Handling new partition table with partitionStateVersion: %1%") %
2253 version));
2254
2255 while (true) {
2256 auto current = partition_table_.load();
2257 if (!should_be_applied(connection_id, version, partitions, *current)) {
2258 return;
2259 }
2260 if (partition_table_.compare_exchange_strong(
2261 current,
2262 boost::shared_ptr<partition_table>(new partition_table{
2263 connection_id, version, convert_to_map(partitions) }))) {
2264 HZ_LOG(
2265 logger_,
2266 finest,
2267 boost::str(
2268 boost::format(
2269 "Applied partition table with partitionStateVersion : %1%") %
2270 version));
2271 return;
2272 }
2273 }
2274}
2275
2276boost::uuids::uuid
2277ClientPartitionServiceImpl::get_partition_owner(int32_t partition_id)
2278{
2279 auto table_ptr = partition_table_.load();
2280 auto it = table_ptr->partitions.find(partition_id);
2281 if (it != table_ptr->partitions.end()) {
2282 return it->second;
2283 }
2284 return boost::uuids::nil_uuid();
2285}
2286
2287int32_t
2288ClientPartitionServiceImpl::get_partition_id(
2289 const serialization::pimpl::data& key)
2290{
2291 int32_t pc = get_partition_count();
2292 if (pc <= 0) {
2293 return 0;
2294 }
2295 int hash = key.get_partition_hash();
2296 return util::HashUtil::hash_to_index(hash, pc);
2297}
2298
2299int32_t
2300ClientPartitionServiceImpl::get_partition_count()
2301{
2302 return partition_count_.load();
2303}
2304
2305std::shared_ptr<client::impl::Partition>
2306ClientPartitionServiceImpl::get_partition(int partition_id)
2307{
2308 return std::shared_ptr<client::impl::Partition>(
2309 new PartitionImpl(partition_id, client_, *this));
2310}
2311
2312bool
2313ClientPartitionServiceImpl::check_and_set_partition_count(
2314 int32_t new_partition_count)
2315{
2316 int32_t expected = 0;
2317 if (partition_count_.compare_exchange_strong(expected,
2318 new_partition_count)) {
2319 return true;
2320 }
2321 return partition_count_.load() == new_partition_count;
2322}
2323
2324bool
2325ClientPartitionServiceImpl::should_be_applied(
2326 int32_t connection_id,
2327 int32_t version,
2328 const std::vector<std::pair<boost::uuids::uuid, std::vector<int>>>&
2329 partitions,
2330 const partition_table& current)
2331{
2332 auto& lg = client_.get_logger();
2333 if (partitions.empty()) {
2334 if (logger_.enabled(logger::level::finest)) {
2335 log_failure(connection_id, version, current, "response is empty");
2336 }
2337 return false;
2338 }
2339 if (!current.connection_id || connection_id != current.connection_id) {
2340 HZ_LOG(
2341 lg, finest, ([&current, connection_id]() {
2342 auto frmt = boost::format(
2343 "Event coming from a new connection. Old connection id: %1%, "
2344 "new connection %2%");
2345
2346 if (current.connection_id) {
2347 frmt = frmt % current.connection_id;
2348 } else {
2349 frmt = frmt % "none";
2350 }
2351
2352 return boost::str(frmt % connection_id);
2353 })());
2354
2355 return true;
2356 }
2357 if (version <= current.version) {
2358 if (lg.enabled(logger::level::finest)) {
2359 log_failure(
2360 connection_id, version, current, "response state version is old");
2361 }
2362 return false;
2363 }
2364 return true;
2365}
2366
2367void
2368ClientPartitionServiceImpl::log_failure(
2369 int32_t connection_id,
2370 int32_t version,
2371 const ClientPartitionServiceImpl::partition_table& current,
2372 const std::string& cause)
2373{
2374 HZ_LOG(logger_, finest, [&]() {
2375 auto frmt = boost::format(
2376 " We will not apply the response, since %1% ."
2377 " Response is from connection with id %2%. "
2378 "Current connection id is %3%, response state version:%4%. "
2379 "Current state version: %5%");
2380 if (current.connection_id) {
2381 return boost::str(frmt % cause % connection_id %
2382 current.connection_id % version %
2383 current.version);
2384 } else {
2385 return boost::str(frmt % cause % connection_id % "nullptr" %
2386 version % current.version);
2387 }
2388 }());
2389}
2390
2391void
2392ClientPartitionServiceImpl::reset()
2393{
2394 partition_table_.store(nullptr);
2395}
2396
2397std::unordered_map<int32_t, boost::uuids::uuid>
2398ClientPartitionServiceImpl::convert_to_map(
2399 const std::vector<std::pair<boost::uuids::uuid, std::vector<int>>>&
2400 partitions)
2401{
2402 std::unordered_map<int32_t, boost::uuids::uuid> new_partitions;
2403 for (auto const& e : partitions) {
2404 for (auto pid : e.second) {
2405 new_partitions.insert({ pid, e.first });
2406 }
2407 }
2408 return new_partitions;
2409}
2410
2411int
2412ClientPartitionServiceImpl::PartitionImpl::get_partition_id() const
2413{
2414 return partition_id_;
2415}
2416
2417boost::optional<member>
2418ClientPartitionServiceImpl::PartitionImpl::get_owner() const
2419{
2420 auto owner = partition_service_.get_partition_owner(partition_id_);
2421 if (!owner.is_nil()) {
2422 return client_.get_client_cluster_service().get_member(owner);
2423 }
2424 return boost::none;
2425}
2426
2427ClientPartitionServiceImpl::PartitionImpl::PartitionImpl(
2428 int partition_id,
2429 ClientContext& client,
2430 ClientPartitionServiceImpl& partition_service)
2431 : partition_id_(partition_id)
2432 , client_(client)
2433 , partition_service_(partition_service)
2434{}
2435
2436namespace sequence {
2437CallIdSequenceWithoutBackpressure::CallIdSequenceWithoutBackpressure()
2438 : head_(0)
2439{}
2440
2441CallIdSequenceWithoutBackpressure::~CallIdSequenceWithoutBackpressure() =
2442 default;
2443
2444int32_t
2445CallIdSequenceWithoutBackpressure::get_max_concurrent_invocations() const
2446{
2447 return INT32_MAX;
2448}
2449
2450int64_t
2451CallIdSequenceWithoutBackpressure::next()
2452{
2453 return force_next();
2454}
2455
2456int64_t
2457CallIdSequenceWithoutBackpressure::force_next()
2458{
2459 return ++head_;
2460}
2461
2462void
2463CallIdSequenceWithoutBackpressure::complete()
2464{
2465 // no-op
2466}
2467
2468int64_t
2469CallIdSequenceWithoutBackpressure::get_last_call_id()
2470{
2471 return head_;
2472}
2473
2474// TODO: see if we can utilize std::hardware_destructive_interference_size
2475AbstractCallIdSequence::AbstractCallIdSequence(
2476 int32_t max_concurrent_invocations)
2477{
2478 std::ostringstream out;
2479 out << "maxConcurrentInvocations should be a positive number. "
2480 "maxConcurrentInvocations="
2481 << max_concurrent_invocations;
2482 this->max_concurrent_invocations_ = util::Preconditions::check_positive(
2483 max_concurrent_invocations, out.str());
2484
2485 for (size_t i = 0; i < longs_.size(); ++i) {
2486 longs_[i] = 0;
2487 }
2488}
2489
2490AbstractCallIdSequence::~AbstractCallIdSequence() = default;
2491
2492int32_t
2493AbstractCallIdSequence::get_max_concurrent_invocations() const
2494{
2495 return max_concurrent_invocations_;
2496}
2497
2498int64_t
2499AbstractCallIdSequence::next()
2500{
2501 if (!has_space()) {
2502 handle_no_space_left();
2503 }
2504 return force_next();
2505}
2506
2507int64_t
2508AbstractCallIdSequence::force_next()
2509{
2510 return ++longs_[INDEX_HEAD];
2511}
2512
2513void
2514AbstractCallIdSequence::complete()
2515{
2516 ++longs_[INDEX_TAIL];
2517 assert(longs_[INDEX_TAIL] <= longs_[INDEX_HEAD]);
2518}
2519
2520int64_t
2521AbstractCallIdSequence::get_last_call_id()
2522{
2523 return longs_[INDEX_HEAD];
2524}
2525
2526bool
2527AbstractCallIdSequence::has_space()
2528{
2529 return longs_[INDEX_HEAD] - longs_[INDEX_TAIL] <
2530 max_concurrent_invocations_;
2531}
2532
2533int64_t
2534AbstractCallIdSequence::get_tail()
2535{
2536 return longs_[INDEX_TAIL];
2537}
2538
2539const std::unique_ptr<util::concurrent::IdleStrategy>
2540 CallIdSequenceWithBackpressure::IDLER(
2541 new util::concurrent::BackoffIdleStrategy(
2542 0,
2543 0,
2544 std::chrono::duration_cast<std::chrono::nanoseconds>(
2545 std::chrono::microseconds(1000))
2546 .count(),
2547 std::chrono::duration_cast<std::chrono::nanoseconds>(
2548 std::chrono::microseconds(MAX_DELAY_MS * 1000))
2549 .count()));
2550
2551CallIdSequenceWithBackpressure::CallIdSequenceWithBackpressure(
2552 int32_t max_concurrent_invocations,
2553 int64_t backoff_timeout_ms)
2554 : AbstractCallIdSequence(max_concurrent_invocations)
2555{
2556 std::ostringstream out;
2557 out << "backoffTimeoutMs should be a positive number. backoffTimeoutMs="
2558 << backoff_timeout_ms;
2559 util::Preconditions::check_positive(backoff_timeout_ms, out.str());
2560
2561 backoff_timeout_nanos_ =
2562 std::chrono::duration_cast<std::chrono::nanoseconds>(
2563 std::chrono::milliseconds(backoff_timeout_ms))
2564 .count();
2565}
2566
2567void
2568CallIdSequenceWithBackpressure::handle_no_space_left()
2569{
2570 auto start = std::chrono::steady_clock::now();
2571 for (int64_t idleCount = 0;; idleCount++) {
2572 int64_t elapsedNanos =
2573 std::chrono::duration_cast<std::chrono::nanoseconds>(
2574 std::chrono::steady_clock::now() - start)
2575 .count();
2576 if (elapsedNanos > backoff_timeout_nanos_) {
2578 "CallIdSequenceWithBackpressure::handleNoSpaceLeft")
2579 << "Timed out trying to acquire another call ID."
2580 << " maxConcurrentInvocations = "
2581 << get_max_concurrent_invocations() << ", backoffTimeout = "
2582 << std::chrono::microseconds(backoff_timeout_nanos_ / 1000)
2583 .count()
2584 << " msecs, elapsed:"
2585 << std::chrono::microseconds(elapsedNanos / 1000).count()
2586 << " msecs")
2587 .build();
2588 }
2589 IDLER->idle(idleCount);
2590 if (has_space()) {
2591 return;
2592 }
2593 }
2594}
2595
2596FailFastCallIdSequence::FailFastCallIdSequence(
2597 int32_t max_concurrent_invocations)
2598 : AbstractCallIdSequence(max_concurrent_invocations)
2599{}
2600
2601void
2602FailFastCallIdSequence::handle_no_space_left()
2603{
2605 "FailFastCallIdSequence::handleNoSpaceLeft")
2606 << "Maximum invocation count is reached. maxConcurrentInvocations = "
2607 << get_max_concurrent_invocations())
2608 .build();
2609}
2610
2611std::unique_ptr<CallIdSequence>
2612CallIdFactory::new_call_id_sequence(bool is_back_pressure_enabled,
2613 int32_t max_allowed_concurrent_invocations,
2614 int64_t backoff_timeout_ms)
2615{
2616 if (!is_back_pressure_enabled) {
2617 return std::unique_ptr<CallIdSequence>(
2618 new CallIdSequenceWithoutBackpressure());
2619 } else if (backoff_timeout_ms <= 0) {
2620 return std::unique_ptr<CallIdSequence>(
2621 new FailFastCallIdSequence(max_allowed_concurrent_invocations));
2622 } else {
2623 return std::unique_ptr<CallIdSequence>(
2624 new CallIdSequenceWithBackpressure(max_allowed_concurrent_invocations,
2625 backoff_timeout_ms));
2626 }
2627}
2628} // namespace sequence
2629
2630namespace listener {
2631listener_service_impl::listener_service_impl(ClientContext& client_context,
2632 int32_t event_thread_count)
2633 : client_context_(client_context)
2634 , serialization_service_(client_context.get_serialization_service())
2635 , logger_(client_context.get_logger())
2636 , client_connection_manager_(client_context.get_connection_manager())
2637 , number_of_event_threads_(event_thread_count)
2638 , smart_(client_context.get_client_config()
2639 .get_network_config()
2640 .is_smart_routing())
2641{
2642 auto& invocationService = client_context.get_invocation_service();
2643 invocation_timeout_ = invocationService.get_invocation_timeout();
2644 invocation_retry_pause_ = invocationService.get_invocation_retry_pause();
2645}
2646
2647bool
2648listener_service_impl::registers_local_only() const
2649{
2650 return smart_;
2651}
2652
2653boost::future<boost::uuids::uuid>
2654listener_service_impl::register_listener(
2655 std::shared_ptr<ListenerMessageCodec> listener_message_codec,
2656 std::shared_ptr<client::impl::BaseEventHandler> handler)
2657{
2658 auto task = boost::packaged_task<boost::uuids::uuid()>([=]() {
2659 return register_listener_internal(listener_message_codec, handler);
2660 });
2661 auto f = task.get_future();
2662 boost::asio::post(registration_executor_->get_executor(), std::move(task));
2663 return f;
2664}
2665
2666boost::future<bool>
2667listener_service_impl::deregister_listener(boost::uuids::uuid registration_id)
2668{
2669 util::Preconditions::check_not_nill(
2670 registration_id, "Nil userRegistrationId is not allowed!");
2671
2672 boost::packaged_task<bool()> task(
2673 [=]() { return deregister_listener_internal(registration_id); });
2674 auto f = task.get_future();
2675 boost::asio::post(registration_executor_->get_executor(), std::move(task));
2676 return f;
2677}
2678
2679void
2680listener_service_impl::connection_added(
2681 const std::shared_ptr<connection::Connection> connection)
2682{
2683 boost::asio::post(registration_executor_->get_executor(),
2684 [=]() { connection_added_internal(connection); });
2685}
2686
2687void
2688listener_service_impl::connection_removed(
2689 const std::shared_ptr<connection::Connection> connection)
2690{
2691 boost::asio::post(registration_executor_->get_executor(),
2692 [=]() { connection_removed_internal(connection); });
2693}
2694
2695void
2696listener_service_impl::remove_event_handler(
2697 int64_t call_id,
2698 const std::shared_ptr<connection::Connection>& connection)
2699{
2700 boost::asio::post(connection->get_socket().get_executor(),
2701 std::packaged_task<void()>(
2702 [=]() { connection->deregister_invocation(call_id); }));
2703}
2704
2705void
2706listener_service_impl::handle_client_message(
2707 const std::shared_ptr<ClientInvocation> invocation,
2708 const std::shared_ptr<protocol::ClientMessage> response)
2709{
2710 try {
2711 auto partitionId = response->get_partition_id();
2712 if (partitionId == -1) {
2713 // execute on random thread on the thread pool
2714 boost::asio::post(event_executor_->get_executor(), [=]() {
2715 process_event_message(invocation, response);
2716 });
2717 return;
2718 }
2719
2720 // process on certain thread which is same for the partition id
2721 boost::asio::post(
2722 event_strands_[partitionId % event_strands_.size()],
2723 [=]() { process_event_message(invocation, response); });
2724
2725 } catch (const std::exception& e) {
2726 if (client_context_.get_lifecycle_service().is_running()) {
2727 HZ_LOG(
2728 logger_,
2729 warning,
2730 boost::str(boost::format("Delivery of event message to event "
2731 "handler failed. %1%, %2%, %3%") %
2732 e.what() % *response % *invocation));
2733 }
2734 }
2735}
2736
2737void
2738listener_service_impl::shutdown()
2739{
2740 ClientExecutionServiceImpl::shutdown_thread_pool(event_executor_.get());
2741 ClientExecutionServiceImpl::shutdown_thread_pool(
2742 registration_executor_.get());
2743}
2744
2745void
2746listener_service_impl::start()
2747{
2748 event_executor_.reset(
2749 new hazelcast::util::hz_thread_pool(number_of_event_threads_));
2750 registration_executor_.reset(new hazelcast::util::hz_thread_pool(1));
2751
2752 for (int i = 0; i < number_of_event_threads_; ++i) {
2753 event_strands_.emplace_back(event_executor_->get_executor());
2754 }
2755
2756 client_connection_manager_.add_connection_listener(shared_from_this());
2757}
2758
2759boost::uuids::uuid
2760listener_service_impl::register_listener_internal(
2761 std::shared_ptr<ListenerMessageCodec> listener_message_codec,
2762 std::shared_ptr<client::impl::BaseEventHandler> handler)
2763{
2764 auto user_registration_id = client_context_.random_uuid();
2765
2766 std::shared_ptr<listener_registration> registration(
2767 new listener_registration{ listener_message_codec, handler });
2768 registrations_.put(user_registration_id, registration);
2769 for (auto const& connection :
2770 client_connection_manager_.get_active_connections()) {
2771 try {
2772 invoke(registration, connection);
2773 } catch (exception::iexception& e) {
2774 if (connection->is_alive()) {
2775 deregister_listener_internal(user_registration_id);
2776 BOOST_THROW_EXCEPTION(
2778 "ClientListenerService::RegisterListenerTask::call")
2779 << "Listener can not be added " << e)
2780 .build());
2781 }
2782 }
2783 }
2784 return user_registration_id;
2785}
2786
2787bool
2788listener_service_impl::deregister_listener_internal(
2789 boost::uuids::uuid user_registration_id)
2790{
2791 auto listenerRegistration = registrations_.get(user_registration_id);
2792 if (!listenerRegistration) {
2793 return false;
2794 }
2795 bool successful = true;
2796
2797 auto listener_registrations =
2798 listenerRegistration->registrations.entry_set();
2799 for (auto it = listener_registrations.begin();
2800 it != listener_registrations.end();) {
2801 const auto& registration = it->second;
2802 const auto& subscriber = it->first;
2803 try {
2804 const auto& listenerMessageCodec = listenerRegistration->codec;
2805 auto serverRegistrationId = registration->server_registration_id;
2806 auto request =
2807 listenerMessageCodec->encode_remove_request(serverRegistrationId);
2808 auto invocation = ClientInvocation::create(
2809 client_context_, request, "", subscriber);
2810 invocation->invoke().get();
2811
2812 remove_event_handler(registration->call_id, subscriber);
2813
2814 it = listener_registrations.erase(it);
2815 } catch (exception::iexception& e) {
2816 ++it;
2817 if (subscriber->is_alive()) {
2818 successful = false;
2819 std::ostringstream endpoint;
2820 if (subscriber->get_remote_address()) {
2821 endpoint << *subscriber->get_remote_address();
2822 } else {
2823 endpoint << "null";
2824 }
2825 HZ_LOG(logger_,
2826 warning,
2827 boost::str(
2828 boost::format(
2829 "ClientListenerService::deregisterListenerInternal "
2830 "Deregistration of listener with ID %1% "
2831 "has failed to address %2% %3%") %
2832 user_registration_id %
2833 subscriber->get_remote_address() % e));
2834 }
2835 }
2836 }
2837 if (successful) {
2838 registrations_.remove(user_registration_id);
2839 }
2840 return successful;
2841}
2842
2843void
2844listener_service_impl::connection_added_internal(
2845 const std::shared_ptr<connection::Connection>& connection)
2846{
2847 for (const auto& listener_registration : registrations_.values()) {
2848 invoke_from_internal_thread(listener_registration, connection);
2849 }
2850}
2851
2852void
2853listener_service_impl::connection_removed_internal(
2854 const std::shared_ptr<connection::Connection>& connection)
2855{
2856 for (auto& registry : registrations_.values()) {
2857 registry->registrations.remove(connection);
2858 }
2859}
2860
2861void
2862listener_service_impl::invoke_from_internal_thread(
2863 const std::shared_ptr<listener_registration>& listener_registration,
2864 const std::shared_ptr<connection::Connection>& connection)
2865{
2866 try {
2867 invoke(listener_registration, connection);
2868 } catch (exception::iexception& e) {
2869 HZ_LOG(logger_,
2870 warning,
2871 boost::str(
2872 boost::format("Listener with pointer %1% can not be added to "
2873 "a new connection: %2%, reason: %3%") %
2874 listener_registration.get() % *connection % e));
2875 }
2876}
2877
2878void
2879listener_service_impl::invoke(
2880 const std::shared_ptr<listener_registration>& listener_registration,
2881 const std::shared_ptr<connection::Connection>& connection)
2882{
2883 if (listener_registration->registrations.contains_key(connection)) {
2884 return;
2885 }
2886
2887 const auto& codec = listener_registration->codec;
2888 auto request = codec->encode_add_request(registers_local_only());
2889 const auto& handler = listener_registration->handler;
2890 handler->before_listener_register();
2891
2892 auto invocation = ClientInvocation::create(
2893 client_context_,
2894 std::make_shared<protocol::ClientMessage>(std::move(request)),
2895 "",
2896 connection);
2897 invocation->set_event_handler(handler);
2898 auto clientMessage = invocation->invoke_urgent().get();
2899
2900 auto serverRegistrationId = codec->decode_add_response(clientMessage);
2901 handler->on_listener_register();
2902 int64_t correlationId =
2903 invocation->get_client_message()->get_correlation_id();
2904
2905 (*listener_registration)
2906 .registrations.put(
2907 connection,
2908 std::shared_ptr<connection_registration>(
2909 new connection_registration{ serverRegistrationId, correlationId }));
2910}
2911
2912void
2913listener_service_impl::process_event_message(
2914 const std::shared_ptr<ClientInvocation> invocation,
2915 const std::shared_ptr<protocol::ClientMessage> response)
2916{
2917 auto eventHandler = invocation->get_event_handler();
2918 if (!eventHandler) {
2919 if (client_context_.get_lifecycle_service().is_running()) {
2920 HZ_LOG(logger_,
2921 warning,
2922 boost::str(
2923 boost::format("No eventHandler for invocation. "
2924 "Ignoring this invocation response. %1%") %
2925 *invocation));
2926 }
2927
2928 return;
2929 }
2930
2931 try {
2932 eventHandler->handle(*response);
2933 } catch (std::exception& e) {
2934 if (client_context_.get_lifecycle_service().is_running()) {
2935 HZ_LOG(
2936 logger_,
2937 warning,
2938 boost::str(boost::format("Delivery of event message to event "
2939 "handler failed. %1%, %2%, %3%") %
2940 e.what() % *response % *invocation));
2941 }
2942 }
2943}
2944
2945listener_service_impl::~listener_service_impl() = default;
2946
2947void
2948cluster_view_listener::start()
2949{
2950 client_context_.get_connection_manager().add_connection_listener(
2951 shared_from_this());
2952}
2953
2954void
2955cluster_view_listener::connection_added(
2956 const std::shared_ptr<connection::Connection> connection)
2957{
2958 try_register(connection);
2959}
2960
2961void
2962cluster_view_listener::connection_removed(
2963 const std::shared_ptr<connection::Connection> connection)
2964{
2965 try_reregister_to_random_connection(connection->get_connection_id());
2966}
2967
2968cluster_view_listener::cluster_view_listener(ClientContext& client_context)
2969 : client_context_(client_context)
2970{}
2971
2972void
2973cluster_view_listener::try_register(
2974 std::shared_ptr<connection::Connection> connection)
2975{
2976 int32_t expected_id = -1;
2977 if (!listener_added_connection_id_.compare_exchange_strong(
2978 expected_id, connection->get_connection_id())) {
2979 // already registering/registered to another connection
2980 return;
2981 }
2982
2983 auto invocation = ClientInvocation::create(
2984 client_context_,
2985 std::make_shared<protocol::ClientMessage>(
2986 protocol::codec::client_addclusterviewlistener_encode()),
2987 "",
2988 connection);
2989
2990 auto handler = std::make_shared<event_handler>(
2991 connection->get_connection_id(), *this, client_context_.get_logger());
2992 invocation->set_event_handler(handler);
2993 handler->before_listener_register();
2994
2995 std::weak_ptr<cluster_view_listener> weak_self = shared_from_this();
2996 auto conn_id = connection->get_connection_id();
2997
2998 invocation->invoke_urgent().then(
2999 boost::launch::sync,
3000 [weak_self, handler, conn_id](boost::future<protocol::ClientMessage> f) {
3001 auto self = weak_self.lock();
3002 if (!self)
3003 return;
3004
3005 if (f.has_value()) {
3006 handler->on_listener_register();
3007 return;
3008 }
3009
3010 try {
3011 f.get();
3012 } catch (exception::hazelcast_client_not_active& e) {
3017 return;
3018 }
3019 // completes with exception, listener needs to be reregistered
3020 self->try_reregister_to_random_connection(conn_id);
3021 });
3022}
3023
3024void
3025cluster_view_listener::try_reregister_to_random_connection(
3026 int32_t old_connection_id)
3027{
3028 if (!listener_added_connection_id_.compare_exchange_strong(
3029 old_connection_id, -1)) {
3030 // somebody else already trying to reregister
3031 return;
3032 }
3033 auto new_connection =
3034 client_context_.get_connection_manager().get_random_connection();
3035 if (new_connection) {
3036 try_register(new_connection);
3037 }
3038}
3039
3040cluster_view_listener::~cluster_view_listener() = default;
3041
3042void
3043cluster_view_listener::event_handler::handle_membersview(
3044 int32_t version,
3045 const std::vector<member>& member_infos)
3046{
3047 view_listener.client_context_.get_client_cluster_service().handle_event(
3048 version, member_infos);
3049}
3050
3051void
3052cluster_view_listener::event_handler::handle_partitionsview(
3053 int32_t version,
3054 const std::vector<std::pair<boost::uuids::uuid, std::vector<int>>>&
3055 partitions)
3056{
3057 view_listener.client_context_.get_partition_service().handle_event(
3058 connection_id, version, partitions);
3059}
3060
3061void
3062cluster_view_listener::event_handler::before_listener_register()
3063{
3064 view_listener.client_context_.get_client_cluster_service()
3065 .clear_member_list_version();
3066 auto& lg = view_listener.client_context_.get_logger();
3067 HZ_LOG(
3068 lg,
3069 finest,
3070 boost::str(boost::format(
3071 "Register attempt of cluster_view_listener::event_handler "
3072 "to connection with id %1%") %
3073 connection_id));
3074}
3075
3076void
3077cluster_view_listener::event_handler::on_listener_register()
3078{
3079 auto& lg = view_listener.client_context_.get_logger();
3080 HZ_LOG(lg,
3081 finest,
3082 boost::str(
3083 boost::format("Registered cluster_view_listener::event_handler to "
3084 "connection with id %1%") %
3085 connection_id));
3086}
3087
3088cluster_view_listener::event_handler::event_handler(
3089 int connectionId,
3090 cluster_view_listener& viewListener,
3091 logger &logger)
3092 : protocol::codec::client_addclusterviewlistener_handler(logger)
3093 , connection_id(connectionId)
3094 , view_listener(viewListener)
3095{}
3096} // namespace listener
3097
3098protocol::ClientMessage
3099ClientInvocationServiceImpl::BackupListenerMessageCodec::encode_add_request(
3100 bool /* local_only */) const
3101{
3102 return protocol::codec::client_localbackuplistener_encode();
3103}
3104
3105protocol::ClientMessage
3106ClientInvocationServiceImpl::BackupListenerMessageCodec::encode_remove_request(
3107 boost::uuids::uuid /* real_registration_id */) const
3108{
3109 assert(0);
3110 return protocol::ClientMessage(0);
3111}
3112
3113void
3114ClientInvocationServiceImpl::noop_backup_event_handler::handle_backup(
3115 int64_t /* source_invocation_correlation_id */)
3116{
3117 assert(0);
3118}
3119ClientInvocationServiceImpl::noop_backup_event_handler::
3120 noop_backup_event_handler(logger& l)
3121 : client_localbackuplistener_handler(l)
3122{
3123}
3124
3125namespace discovery {
3126remote_address_provider::remote_address_provider(
3127 std::function<std::unordered_map<address, address>()> addr_map_method,
3128 bool use_public)
3129 : refresh_address_map_(std::move(addr_map_method))
3130 , use_public_(use_public)
3131{}
3132
3133std::vector<address>
3134remote_address_provider::load_addresses()
3135{
3136 auto address_map = refresh_address_map_();
3137 std::lock_guard<std::mutex> guard(lock_);
3138 private_to_public_ = address_map;
3139 std::vector<address> addresses;
3140 addresses.reserve(address_map.size());
3141 for (const auto& addr_pair : address_map) {
3142 addresses.push_back(addr_pair.first);
3143 }
3144 return addresses;
3145}
3146
3147boost::optional<address>
3148remote_address_provider::translate(const address& addr)
3149{
3150 // if it is inside cloud, return private address otherwise we need to
3151 // translate it.
3152 if (!use_public_) {
3153 return addr;
3154 }
3155
3156 {
3157 std::lock_guard<std::mutex> guard(lock_);
3158 auto found = private_to_public_.find(addr);
3159 if (found != private_to_public_.end()) {
3160 return found->second;
3161 }
3162 }
3163
3164 auto address_map = refresh_address_map_();
3165
3166 std::lock_guard<std::mutex> guard(lock_);
3167 private_to_public_ = address_map;
3168
3169 auto found = private_to_public_.find(addr);
3170 if (found != private_to_public_.end()) {
3171 return found->second;
3172 }
3173
3174 return boost::none;
3175}
3176
3177#ifdef HZ_BUILD_WITH_SSL
3178cloud_discovery::cloud_discovery(config::cloud_config& config,
3179 std::string cloud_base_url,
3180 std::chrono::steady_clock::duration timeout)
3181 : cloud_config_(config)
3182 , cloud_base_url_(cloud_base_url)
3183 , timeout_(timeout)
3184{}
3185#else
3186cloud_discovery::cloud_discovery(config::cloud_config& /* config */,
3187 std::string /* cloud_base_url */,
3188 std::chrono::steady_clock::duration /* timeout */)
3189{}
3190#endif // HZ_BUILD_WITH_SSL
3191
3192std::unordered_map<address, address>
3193cloud_discovery::get_addresses()
3194{
3195#ifdef HZ_BUILD_WITH_SSL
3196 try {
3197 util::SyncHttpsClient httpsConnection(cloud_base_url_,
3198 std::string(CLOUD_URL_PATH) +
3199 cloud_config_.discovery_token,
3200 timeout_,
3201 cloud_config_.discovery_token);
3202 auto& conn_stream = httpsConnection.connect_and_get_response();
3203 return parse_json_response(conn_stream);
3204 } catch (std::exception& e) {
3205 std::throw_with_nested(
3206 boost::enable_current_exception(exception::illegal_state(
3207 "cloud_discovery::get_addresses", e.what())));
3208 }
3209#else
3210 util::Preconditions::check_ssl("cloud_discovery::get_addresses");
3211 return std::unordered_map<address, address>();
3212#endif
3213}
3214
3215std::unordered_map<address, address>
3216cloud_discovery::parse_json_response(std::istream& conn_stream)
3217{
3218 namespace pt = boost::property_tree;
3219
3220 pt::ptree root;
3221 pt::read_json(conn_stream, root);
3222
3223 std::unordered_map<address, address> addresses;
3224 for (const auto& item : root) {
3225 auto private_address =
3226 item.second.get<std::string>(PRIVATE_ADDRESS_PROPERTY);
3227 auto public_address =
3228 item.second.get<std::string>(PUBLIC_ADDRESS_PROPERTY);
3229
3230 address public_addr = create_address(public_address, -1);
3231 // if it is not explicitly given, create the private address with public
3232 // addresses port
3233 auto private_addr =
3234 create_address(private_address, public_addr.get_port());
3235 addresses.emplace(std::move(private_addr), std::move(public_addr));
3236 }
3237
3238 return addresses;
3239}
3240
3241address
3242cloud_discovery::create_address(const std::string& hostname, int default_port)
3243{
3244 auto address_holder =
3245 util::AddressUtil::get_address_holder(hostname, default_port);
3246 auto scoped_hostname =
3247 util::AddressHelper::get_scoped_hostname(address_holder);
3248 return address(std::move(scoped_hostname), address_holder.get_port());
3249}
3250} // namespace discovery
3251} // namespace impl
3252} // namespace spi
3253} // namespace client
3254} // namespace hazelcast
3255
3256namespace std {
3257bool
3258less<hazelcast::client::spi::DefaultObjectNamespace>::operator()(
3259 const hazelcast::client::spi::DefaultObjectNamespace& lhs,
3260 const hazelcast::client::spi::DefaultObjectNamespace& rhs) const
3261{
3262 int result = lhs.get_service_name().compare(rhs.get_service_name());
3263 if (result < 0) {
3264 return true;
3265 }
3266
3267 if (result > 0) {
3268 return false;
3269 }
3270
3271 return lhs.get_object_name().compare(rhs.get_object_name()) < 0;
3272}
3273
3274std::size_t
3275hash<hazelcast::client::spi::DefaultObjectNamespace>::operator()(
3276 const hazelcast::client::spi::DefaultObjectNamespace& k) const noexcept
3277{
3278 return std::hash<std::string>()(k.get_service_name() + k.get_object_name());
3279}
3280} // namespace std
Represents an address of a client or member in the cluster.
Definition address.h:37
hazelcast_client configuration class.
config::client_network_config & get_network_config()
Gets {}.
Definition config.cpp:1203
Client Properties is an internal class.
Hazelcast cluster interface.
Definition cluster.h:37
Contains configuration parameters for client network related behaviour.
bool is_smart_routing() const
See client_network_config::setSmartRouting(boolean) for details.
Definition config.cpp:364
Endpoint represents a peer in the cluster.
Definition endpoint.h:35
Base class for all exception originated from Hazelcast methods.
Definition iexception.h:49
A event that is sent when a MembershipListener is registered.
const std::unordered_set< member > & get_members() const
Returns an immutable set of ordered members at the moment this MembershipListener is registered.
Definition spi.cpp:62
cluster & get_cluster()
Returns the cluster of the event.
Definition spi.cpp:68
Event to be fired when lifecycle states are changed.
lifecycle_state get_state() const
Definition spi.cpp:85
lifecycle_event(lifecycle_state state)
Constructor.
Definition spi.cpp:80
Listener object for listening lifecycle events of hazelcast instance.
The Client interface allows to get information about a connected client's socket address,...
hz_cluster member class.
Definition member.h:62
STL namespace.
hazelcast.cloud configuration to let the client connect the cluster via hazelcast....