Hazelcast C++ Client
Hazelcast C++ Client Library
Loading...
Searching...
No Matches
client_impl.cpp
1/*
2 * Copyright (c) 2008-2025, Hazelcast, Inc. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include <utility>
18#include <vector>
19#include <functional>
20
21#include <boost/format.hpp>
22
23#include "hazelcast/util/Util.h"
24#include "hazelcast/util/IOUtil.h"
25#include "hazelcast/client/hazelcast_client.h"
26#include "hazelcast/client/transaction_context.h"
27#include "hazelcast/client/cluster.h"
28#include "hazelcast/client/spi/lifecycle_service.h"
29#include "hazelcast/client/lifecycle_listener.h"
30#include "hazelcast/client/exception/protocol_exceptions.h"
31#include "hazelcast/client/aws/aws_client.h"
32#include "hazelcast/client/spi/impl/discovery/cloud_discovery.h"
33#include "hazelcast/client/impl/statistics/Statistics.h"
34#include "hazelcast/client/impl/hazelcast_client_instance_impl.h"
35#include "hazelcast/client/impl/ClientLockReferenceIdGenerator.h"
36#include "hazelcast/client/spi/impl/ClientInvocationServiceImpl.h"
37#include "hazelcast/client/spi/impl/ClientPartitionServiceImpl.h"
38#include "hazelcast/client/spi/impl/ClientExecutionServiceImpl.h"
39#include "hazelcast/client/spi/impl/sequence/CallIdFactory.h"
40#include "hazelcast/client/spi/impl/DefaultAddressProvider.h"
41#include "hazelcast/client/spi/impl/discovery/remote_address_provider.h"
42#include "hazelcast/client/spi/impl/listener/cluster_view_listener.h"
43#include "hazelcast/client/spi/impl/listener/listener_service_impl.h"
44#include "hazelcast/client/load_balancer.h"
45#include "hazelcast/client/connection/ClientConnectionManagerImpl.h"
46#include "hazelcast/client/proxy/flake_id_generator_impl.h"
47#include "hazelcast/client/map/NearCachedClientMapProxy.h"
48#include "hazelcast/logger.h"
49#include "hazelcast/client/member_selectors.h"
50#include "hazelcast/client/client_properties.h"
51#include "hazelcast/client/big_decimal.h"
52#include "hazelcast/client/local_time.h"
53#include "hazelcast/client/local_date.h"
54#include "hazelcast/client/local_date_time.h"
55#include "hazelcast/client/offset_date_time.h"
56#ifndef HAZELCAST_VERSION
57#define HAZELCAST_VERSION "NOT_FOUND"
58#endif
59
60namespace hazelcast {
61namespace client {
62hazelcast_client::hazelcast_client()
63 : client_impl_(
64 new impl::hazelcast_client_instance_impl(*this, client_config()))
65{
66 client_impl_->start();
67}
68
69hazelcast_client::hazelcast_client(client_config config)
70 : client_impl_(
71 new impl::hazelcast_client_instance_impl(*this, std::move(config)))
72{
73 client_impl_->start();
74}
75
76const std::string&
78{
79 return client_impl_->get_name();
80}
81
84{
85 return client_impl_->get_client_config();
86}
87
90{
91 return client_impl_->new_transaction_context();
92}
93
96{
97 return client_impl_->new_transaction_context(options);
98}
99
100cluster&
102{
103 return client_impl_->get_cluster();
104}
105
106boost::uuids::uuid
109{
110 return client_impl_->add_lifecycle_listener(std::move(lifecycle_listener));
111}
112
113bool
115 const boost::uuids::uuid& registration_id)
116{
117 return client_impl_->remove_lifecycle_listener(registration_id);
118}
119
120boost::future<void>
122{
123 return boost::async([=]() { client_impl_->shutdown(); });
124}
125
126spi::lifecycle_service&
128{
129 return client_impl_->get_lifecycle_service();
130}
131
134{
135 return client_impl_->get_local_endpoint();
136}
137
138hazelcast_client::~hazelcast_client() = default;
139
142{
143 return client_impl_->get_cp_subsystem();
144}
145
148{
149 return client_impl_->get_sql();
150}
151
152const boost::string_view
153version()
154{
155 return HAZELCAST_VERSION;
156}
157
158namespace impl {
159std::atomic<int32_t> hazelcast_client_instance_impl::CLIENT_ID(0);
160
161hazelcast_client_instance_impl::hazelcast_client_instance_impl(
163 client_config config)
164 : client_config_(std::move(config))
165 , client_properties_(client_config_.get_properties())
166 , client_context_(*this)
167 , schema_service_{ client_context_ }
168 , serialization_service_(client_config_.get_serialization_config(),
169 schema_service_)
170 , cluster_service_(client_context_)
171 , transaction_manager_(client_context_)
172 , cluster_(cluster_service_)
173 , lifecycle_service_(client_context_,
174 client_config_.get_lifecycle_listeners())
175 , proxy_manager_(client_context_)
176 , id_(++CLIENT_ID)
177 , random_generator_(std::random_device{}())
178 , uuid_generator_{ random_generator_ }
179 , cp_subsystem_(client_context_)
180 , sql_service_(client_context_)
181 , proxy_session_manager_(client_context_)
182{
183 auto& name = client_config_.get_instance_name();
184 if (name) {
185 instance_name_ = *name;
186 } else {
187 std::ostringstream out;
188 out << "hz.client_" << id_;
189 instance_name_ = out.str();
190 }
191
192 auto logger_config = client_config_.get_logger_config();
193 logger_ = std::make_shared<logger>(instance_name_,
194 client_config_.get_cluster_name(),
195 logger_config.level(),
196 logger_config.handler());
197
198 execution_service_ = init_execution_service();
199
200 initalize_near_cache_manager();
201
202 int32_t maxAllowedConcurrentInvocations = client_properties_.get_integer(
203 client_properties_.get_max_concurrent_invocations());
204 int64_t backofftimeoutMs = client_properties_.get_long(
205 client_properties_.get_backpressure_backoff_timeout_millis());
206 bool isBackPressureEnabled = maxAllowedConcurrentInvocations != INT32_MAX;
207 call_id_sequence_ =
208 spi::impl::sequence::CallIdFactory::new_call_id_sequence(
209 isBackPressureEnabled,
210 maxAllowedConcurrentInvocations,
211 backofftimeoutMs);
212
213 auto address_provider = create_address_provider();
214
215 connection_manager_ =
216 std::make_shared<connection::ClientConnectionManagerImpl>(
217 client_context_, std::move(address_provider));
218
219 cluster_listener_.reset(
220 new spi::impl::listener::cluster_view_listener(client_context_));
221
222 partition_service_.reset(
223 new spi::impl::ClientPartitionServiceImpl(client_context_));
224
225 invocation_service_.reset(
226 new spi::impl::ClientInvocationServiceImpl(client_context_));
227
228 listener_service_ = init_listener_service();
229
230 proxy_manager_.init();
231
232 lock_reference_id_generator_.reset(
233 new impl::ClientLockReferenceIdGenerator());
234
235 statistics_.reset(new statistics::Statistics(client_context_));
236}
237
238hazelcast_client_instance_impl::~hazelcast_client_instance_impl()
239{
240 shutdown();
241}
242
243void
244hazelcast_client_instance_impl::start()
245{
246 lifecycle_service_.fire_lifecycle_event(lifecycle_event::STARTING);
247
248 try {
249 if (!lifecycle_service_.start()) {
250 lifecycle_service_.shutdown();
251 BOOST_THROW_EXCEPTION(exception::illegal_state(
252 "hazelcast_client", "hazelcast_client could not be started!"));
253 }
254 } catch (std::exception&) {
255 lifecycle_service_.shutdown();
256 throw;
257 }
258}
259
261hazelcast_client_instance_impl::get_client_config()
262{
263 return client_config_;
264}
265
266cluster&
267hazelcast_client_instance_impl::get_cluster()
268{
269 return cluster_;
270}
271
272boost::uuids::uuid
273hazelcast_client_instance_impl::add_lifecycle_listener(
275{
276 return lifecycle_service_.add_listener(std::move(lifecycle_listener));
277}
278
279bool
280hazelcast_client_instance_impl::remove_lifecycle_listener(
281 const boost::uuids::uuid& registration_id)
282{
283 return lifecycle_service_.remove_listener(registration_id);
284}
285
286void
287hazelcast_client_instance_impl::shutdown()
288{
289 lifecycle_service_.shutdown();
290}
291
293hazelcast_client_instance_impl::new_transaction_context()
294{
295 transaction_options defaultOptions;
296 return new_transaction_context(defaultOptions);
297}
298
300hazelcast_client_instance_impl::new_transaction_context(
301 const transaction_options& options)
302{
303 return transaction_context(transaction_manager_, options);
304}
305
306internal::nearcache::NearCacheManager&
307hazelcast_client_instance_impl::get_near_cache_manager()
308{
309 return *near_cache_manager_;
310}
311
312serialization::pimpl::SerializationService&
313hazelcast_client_instance_impl::get_serialization_service()
314{
315 return serialization_service_;
316}
317
318const protocol::ClientExceptionFactory&
319hazelcast_client_instance_impl::get_exception_factory() const
320{
321 return exception_factory_;
322}
323
324std::shared_ptr<spi::impl::listener::listener_service_impl>
325hazelcast_client_instance_impl::init_listener_service()
326{
327 auto eventThreadCount = client_properties_.get_integer(
328 client_properties_.get_event_thread_count());
329 return std::make_shared<spi::impl::listener::listener_service_impl>(
330 client_context_, eventThreadCount);
331}
332
333std::shared_ptr<spi::impl::ClientExecutionServiceImpl>
334hazelcast_client_instance_impl::init_execution_service()
335{
336 return std::make_shared<spi::impl::ClientExecutionServiceImpl>(
337 instance_name_,
338 client_properties_,
339 client_config_.get_executor_pool_size(),
340 lifecycle_service_);
341}
342
343void
344hazelcast_client_instance_impl::on_cluster_restart()
345{
346 HZ_LOG(*logger_,
347 info,
348 "Clearing local state of the client, because of a cluster restart");
349
350 near_cache_manager_->clear_all_near_caches();
351 // clear the member list
352 cluster_service_.clear_member_list();
353}
354
355std::unique_ptr<connection::AddressProvider>
356hazelcast_client_instance_impl::create_address_provider()
357{
358 config::client_network_config& networkConfig =
359 get_client_config().get_network_config();
360 config::client_aws_config& awsConfig = networkConfig.get_aws_config();
361 config::cloud_config& cloudConfig = networkConfig.get_cloud_config();
362
363 auto addresses = networkConfig.get_addresses();
364 bool addressListProvided = !addresses.empty();
365 bool awsDiscoveryEnabled = awsConfig.is_enabled();
366 bool cloud_enabled = cloudConfig.enabled;
367
368 check_discovery_configuration_consistency(
369 addressListProvided, awsDiscoveryEnabled, cloud_enabled);
370
371 auto connect_timeout = networkConfig.get_connection_timeout();
372 if (cloud_enabled) {
373 auto cloud_provider =
374 std::make_shared<spi::impl::discovery::cloud_discovery>(
375 cloudConfig,
376 client_properties_.get_string(client_properties_.cloud_base_url()),
377 connect_timeout);
378 return std::unique_ptr<connection::AddressProvider>(
379 new spi::impl::discovery::remote_address_provider(
380 [=]() { return cloud_provider->get_addresses(); }, true));
381 }
382
383 if (awsDiscoveryEnabled) {
384 auto aws_addr_provider = std::make_shared<aws::aws_client>(
385 connect_timeout, awsConfig, client_properties_, *logger_);
386 return std::unique_ptr<connection::AddressProvider>(
387 new spi::impl::discovery::remote_address_provider(
388 [=]() { return aws_addr_provider->get_addresses(); },
389 !awsConfig.is_inside_aws()));
390 }
391
392 return std::unique_ptr<connection::AddressProvider>(
393 new spi::impl::DefaultAddressProvider(networkConfig));
394}
395
396const std::string&
397hazelcast_client_instance_impl::get_name() const
398{
399 return instance_name_;
400}
401
402spi::lifecycle_service&
403hazelcast_client_instance_impl::get_lifecycle_service()
404{
405 return lifecycle_service_;
406}
407
408const std::shared_ptr<ClientLockReferenceIdGenerator>&
409hazelcast_client_instance_impl::get_lock_reference_id_generator() const
410{
411 return lock_reference_id_generator_;
412}
413
414spi::ProxyManager&
415hazelcast_client_instance_impl::get_proxy_manager()
416{
417 return proxy_manager_;
418}
419
420void
421hazelcast_client_instance_impl::initalize_near_cache_manager()
422{
423 near_cache_manager_.reset(new internal::nearcache::NearCacheManager(
424 execution_service_, serialization_service_, *logger_));
425}
426
428hazelcast_client_instance_impl::get_local_endpoint() const
429{
430 return cluster_service_.get_local_client();
431}
432
433template<>
434boost::shared_future<std::shared_ptr<imap>>
435hazelcast_client_instance_impl::get_distributed_object(const std::string& name)
436{
437 auto nearCacheConfig = client_config_.get_near_cache_config(name);
438 if (nearCacheConfig) {
439 return proxy_manager_
440 .get_or_create_proxy<
441 map::NearCachedClientMapProxy<serialization::pimpl::data,
442 serialization::pimpl::data>>(
443 imap::SERVICE_NAME, name)
444 .then(
445 boost::launch::sync,
446 [=](
447 boost::shared_future<std::shared_ptr<
448 map::NearCachedClientMapProxy<serialization::pimpl::data,
449 serialization::pimpl::data>>> f) {
450 return std::static_pointer_cast<imap>(f.get());
451 });
452 } else {
453 return proxy_manager_.get_or_create_proxy<imap>(imap::SERVICE_NAME,
454 name);
455 }
456}
457
458const std::shared_ptr<logger>&
459hazelcast_client_instance_impl::get_logger() const
460{
461 return logger_;
462}
463
464boost::uuids::uuid
465hazelcast_client_instance_impl::random_uuid()
466{
467 std::lock_guard<std::mutex> g(uuid_generator_lock_);
468 return uuid_generator_();
469}
470
471cp::cp_subsystem&
472hazelcast_client_instance_impl::get_cp_subsystem()
473{
474 return cp_subsystem_;
475}
476
478hazelcast_client_instance_impl::get_sql()
479{
480 return sql_service_;
481}
482
483void
484hazelcast_client_instance_impl::send_state_to_cluster()
485{
486 schema_service_.replicate_all_schemas();
487}
488
489bool
490hazelcast_client_instance_impl::should_check_urgent_invocations() const
491{
492 return schema_service_.has_any_schemas();
493}
494
495void
496hazelcast_client_instance_impl::check_discovery_configuration_consistency(
497 bool address_list_provided,
498 bool aws_enabled,
499 bool cloud_enabled)
500{
501 int count = 0;
502 if (address_list_provided)
503 count++;
504 if (aws_enabled)
505 count++;
506 if (cloud_enabled)
507 count++;
508 if (count > 1) {
509 BOOST_THROW_EXCEPTION(exception::illegal_state(
510 "hazelcast_client_instance_impl::check_discovery_configuration_"
511 "consistency",
512 (boost::format(
513 "Only one discovery method can be enabled at a time. cluster "
514 "members given explicitly : %1%, aws discovery: %2%, "
515 "hazelcast.cloud enabled : %3%") %
516 address_list_provided % aws_enabled % cloud_enabled)
517 .str()));
518 }
519}
520
521BaseEventHandler::~BaseEventHandler() = default;
522
523BaseEventHandler::BaseEventHandler(logger &logger)
524 : logger_(logger)
525{}
526
527logger &
528BaseEventHandler::get_logger() const
529{
530 return logger_;
531}
532} // namespace impl
533
534constexpr int address::ID;
535
536address::address()
537 : host_("localhost")
538 , type_(IPV4)
539 , scope_id_(0)
540{}
541
542address::address(std::string url, int port)
543 : host_(std::move(url))
544 , port_(port)
545 , type_(IPV4)
546 , scope_id_(0)
547{}
548
549address::address(std::string hostname, int port, unsigned long scope_id)
550 : host_(std::move(hostname))
551 , port_(port)
552 , type_(IPV6)
553 , scope_id_(scope_id)
554{}
555
556bool
557address::operator==(const address& rhs) const
558{
559 return rhs.port_ == port_ && rhs.type_ == type_ &&
560 0 == rhs.host_.compare(host_);
561}
562
563bool
564address::operator!=(const address& rhs) const
565{
566 return !(*this == rhs);
567}
568
569int
571{
572 return port_;
573}
574
575const std::string&
577{
578 return host_;
579}
580
581bool
582address::operator<(const address& rhs) const
583{
584 if (host_ < rhs.host_) {
585 return true;
586 }
587 if (rhs.host_ < host_) {
588 return false;
589 }
590 if (port_ < rhs.port_) {
591 return true;
592 }
593 if (rhs.port_ < port_) {
594 return false;
595 }
596 return type_ < rhs.type_;
597}
598
599bool
601{
602 return type_ == IPV4;
603}
604
605unsigned long
606address::get_scope_id() const
607{
608 return scope_id_;
609}
610
611std::string
612address::to_string() const
613{
614 std::ostringstream out;
615 out << "Address[" << get_host() << ":" << get_port() << "]";
616 return out.str();
617}
618
619std::ostream&
620operator<<(std::ostream& stream, const address& address)
621{
622 return stream << address.to_string();
623}
624
625bool
626operator==(const big_decimal& lhs, const big_decimal& rhs)
627{
628 return lhs.unscaled == rhs.unscaled && lhs.scale == rhs.scale;
629}
630
631bool
632operator<(const big_decimal& lhs, const big_decimal& rhs)
633{
634 if (lhs.scale != rhs.scale) {
635 return lhs.scale < rhs.scale;
636 }
637 return lhs.unscaled < rhs.unscaled;
638}
639
640bool
641operator==(const local_time& lhs, const local_time& rhs)
642{
643 return lhs.hours == rhs.hours && lhs.minutes == rhs.minutes &&
644 lhs.seconds == rhs.seconds && lhs.nanos == rhs.nanos;
645}
646
/**
 * Hashes a value with the std::hash specialisation for its type.
 *
 * @tparam T type for which std::hash<T> is available.
 * @param v value to hash.
 * @return std::hash<T>{}(v).
 */
template<typename T>
std::size_t
hash_value(const T& v)
{
    std::hash<T> hasher;
    return hasher(v);
}
657
658bool
659operator<(const local_time& lhs, const local_time& rhs)
660{
661 if (lhs.hours < rhs.hours) {
662 return true;
663 }
664 if (rhs.hours < lhs.hours) {
665 return false;
666 }
667 if (lhs.minutes < rhs.minutes) {
668 return true;
669 }
670 if (rhs.minutes < lhs.minutes) {
671 return false;
672 }
673 if (lhs.seconds < rhs.seconds) {
674 return true;
675 }
676 if (rhs.seconds < lhs.seconds) {
677 return false;
678 }
679 return lhs.nanos < rhs.nanos;
680}
681
682bool
683operator==(const local_date& lhs, const local_date& rhs)
684{
685 return lhs.year == rhs.year && lhs.month == rhs.month &&
686 lhs.day_of_month == rhs.day_of_month;
687}
688
689bool
690operator<(const local_date& lhs, const local_date& rhs)
691{
692 if (lhs.year < rhs.year) {
693 return true;
694 }
695 if (rhs.year < lhs.year) {
696 return false;
697 }
698 if (lhs.month < rhs.month) {
699 return true;
700 }
701 if (rhs.month < lhs.month) {
702 return false;
703 }
704 return lhs.day_of_month < rhs.day_of_month;
705}
706
707bool
708operator==(const local_date_time& lhs, const local_date_time& rhs)
709{
710 return lhs.date == rhs.date && lhs.time == rhs.time;
711}
712
713bool
714operator<(const local_date_time& lhs, const local_date_time& rhs)
715{
716 if (lhs.date < rhs.date) {
717 return true;
718 }
719 if (rhs.date < lhs.date) {
720 return false;
721 }
722 return lhs.time < rhs.time;
723}
724
725bool
726operator==(const offset_date_time& lhs, const offset_date_time& rhs)
727{
728 return lhs.date_time == rhs.date_time &&
729 lhs.zone_offset_in_seconds == rhs.zone_offset_in_seconds;
730}
731
732bool
733operator<(const offset_date_time& lhs, const offset_date_time& rhs)
734{
735 if (lhs.date_time < rhs.date_time) {
736 return true;
737 }
738 if (rhs.date_time < lhs.date_time) {
739 return false;
740 }
741 return lhs.zone_offset_in_seconds < rhs.zone_offset_in_seconds;
742}
743
744namespace pimpl {
745
/**
 * Replaces the byte sequence with its two's complement, in place.
 *
 * The bytes are treated as one number whose last element is the least
 * significant byte: every bit is inverted and then one is added, with the
 * carry moving towards the front. The length never changes, so a carry out
 * of the first byte is dropped. An empty vector is left untouched.
 *
 * @param a bytes to negate.
 */
void
twos_complement(std::vector<int8_t>& a)
{
    // flip the bits
    for (auto& item : a) {
        item = static_cast<int8_t>(~item);
    }
    // add 1, starting at the least significant byte; a byte that wraps to
    // zero carries into the next more significant one
    for (auto it = a.rbegin(); it != a.rend(); ++it) {
        *it = static_cast<int8_t>(*it + 1);
        if (*it != 0) {
            break;
        }
    }
}
765
766boost::multiprecision::cpp_int
767from_bytes(std::vector<int8_t> v)
768{
769 boost::multiprecision::cpp_int i;
770 bool is_negative = v[0] < 0;
771 if (is_negative) {
772 twos_complement(v);
773 }
774 import_bits(i, v.begin(), v.end(), 8);
775 if (is_negative) {
776 return -i;
777 }
778 return i;
779}
780
781std::vector<int8_t>
782to_bytes(const boost::multiprecision::cpp_int& i)
783{
784 std::vector<int8_t> v;
785 export_bits(i, std::back_inserter(v), 8);
786 if (i < 0) {
787 twos_complement(v);
788 if (v[0] > 0) {
789 // add -1 as the most significant to have a negative sign bit
790 v.insert(v.begin(), -1);
791 }
792 } else {
793 // add 0 as the most significant byte to have a positive sign bit
794 if (v[0] < 0) {
795 v.insert(v.begin(), 0);
796 }
797 }
798 return v;
799}
800} // namespace pimpl
801
802namespace serialization {
803int32_t
805{
806 return F_ID;
807}
808
809int32_t
811{
812 return ADDRESS;
813}
814
815void
816hz_serializer<address>::write_data(const address& object,
818{
819 out.write<int32_t>(object.port_);
820 out.write<byte>(object.type_);
821 out.write(object.host_);
822}
823
824address
826{
827 address object;
828 object.port_ = in.read<int32_t>();
829 object.type_ = in.read<byte>();
830 object.host_ = in.read<std::string>();
831 return object;
832}
833} // namespace serialization
834
835iexecutor_service::iexecutor_service(const std::string& name,
836 spi::ClientContext* context)
837 : ProxyImpl(SERVICE_NAME, name, context)
838 , consecutive_submits_(0)
839 , last_submit_time_(0)
840{}
841
842std::vector<member>
843iexecutor_service::select_members(const member_selector& member_selector)
844{
845 std::vector<member> selected;
846 std::vector<member> members =
847 get_context().get_client_cluster_service().get_member_list();
848 for (const member& member : members) {
849 if (member_selector.select(member)) {
850 selected.push_back(member);
851 }
852 }
853 if (selected.empty()) {
854 BOOST_THROW_EXCEPTION(exception::rejected_execution(
855 "IExecutorService::selectMembers",
856 "No member could be selected with member selector"));
857 }
858 return selected;
859}
860
861std::pair<boost::future<protocol::ClientMessage>,
862 std::shared_ptr<spi::impl::ClientInvocation>>
863iexecutor_service::invoke_on_target(protocol::ClientMessage&& request,
864 boost::uuids::uuid target)
865{
866 try {
867 std::shared_ptr<spi::impl::ClientInvocation> clientInvocation =
868 spi::impl::ClientInvocation::create(
869 get_context(), request, get_name(), target);
870 return std::make_pair(clientInvocation->invoke(), clientInvocation);
871 } catch (exception::iexception&) {
872 util::exception_util::rethrow(std::current_exception());
873 }
874 return std::pair<boost::future<protocol::ClientMessage>,
875 std::shared_ptr<spi::impl::ClientInvocation>>();
876}
877
878std::pair<boost::future<protocol::ClientMessage>,
879 std::shared_ptr<spi::impl::ClientInvocation>>
880iexecutor_service::invoke_on_partition_owner(protocol::ClientMessage&& request,
881 int partition_id)
882{
883 try {
884 std::shared_ptr<spi::impl::ClientInvocation> clientInvocation =
885 spi::impl::ClientInvocation::create(
886 get_context(), request, get_name(), partition_id);
887 return std::make_pair(clientInvocation->invoke(), clientInvocation);
888 } catch (exception::iexception&) {
889 util::exception_util::rethrow(std::current_exception());
890 }
891 return std::pair<boost::future<protocol::ClientMessage>,
892 std::shared_ptr<spi::impl::ClientInvocation>>();
893}
894
895bool
896iexecutor_service::is_sync_computation(bool prevent_sync)
897{
898 int64_t now = util::current_time_millis();
899
900 int64_t last = last_submit_time_;
901 last_submit_time_ = now;
902
903 if (last + MIN_TIME_RESOLUTION_OF_CONSECUTIVE_SUBMITS < now) {
904 consecutive_submits_ = 0;
905 return false;
906 }
907
908 return !prevent_sync &&
909 (consecutive_submits_++ % MAX_CONSECUTIVE_SUBMITS == 0);
910}
911
913iexecutor_service::get_member_address(const member& member)
914{
915 auto m =
916 get_context().get_client_cluster_service().get_member(member.get_uuid());
917 if (!m) {
918 throw(exception::exception_builder<exception::hazelcast_>(
919 "IExecutorService::getMemberAddress(Member)")
920 << member << " is not available!")
921 .build();
922 }
923 return m->get_address();
924}
925
926int
927iexecutor_service::random_partition_id()
928{
929 auto& partitionService = get_context().get_partition_service();
930 return rand() % partitionService.get_partition_count();
931}
932
933void
935{
936 auto request = protocol::codec::executorservice_shutdown_encode(get_name());
937 invoke(request);
938}
939
940boost::future<bool>
942{
943 auto request =
944 protocol::codec::executorservice_isshutdown_encode(get_name());
945 return invoke_and_get_future<bool>(request);
946}
947
948boost::future<bool>
953
955 "hazelcast_client_heartbeat_timeout";
956const std::string client_properties::PROP_HEARTBEAT_TIMEOUT_DEFAULT = "60000";
958 "hazelcast_client_heartbeat_interval";
959const std::string client_properties::PROP_HEARTBEAT_INTERVAL_DEFAULT = "5000";
961 "hazelcast_client_request_retry_count";
962const std::string client_properties::PROP_REQUEST_RETRY_COUNT_DEFAULT = "20";
964 "hazelcast_client_request_retry_wait_time";
965const std::string client_properties::PROP_REQUEST_RETRY_WAIT_TIME_DEFAULT = "1";
966
967const std::string client_properties::PROP_AWS_MEMBER_PORT = "hz-port";
968const std::string client_properties::PROP_AWS_MEMBER_PORT_DEFAULT = "5701";
969
971 "hazelcast.client.invocation.retry.pause.millis";
972const std::string client_properties::INVOCATION_RETRY_PAUSE_MILLIS_DEFAULT =
973 "1000";
974
976 "hazelcast.client.invocation.timeout.seconds";
977const std::string client_properties::INVOCATION_TIMEOUT_SECONDS_DEFAULT = "120";
978
980 "hazelcast.client.event.thread.count";
981const std::string client_properties::EVENT_THREAD_COUNT_DEFAULT = "5";
982
983const std::string client_properties::INTERNAL_EXECUTOR_POOL_SIZE =
984 "hazelcast.client.internal.executor.pool.size";
985const std::string client_properties::INTERNAL_EXECUTOR_POOL_SIZE_DEFAULT = "3";
986
988 "hazelcast.client.shuffle.member.list";
989const std::string client_properties::SHUFFLE_MEMBER_LIST_DEFAULT = "true";
990
992 "hazelcast.client.max.concurrent.invocations";
993const std::string client_properties::MAX_CONCURRENT_INVOCATIONS_DEFAULT =
994 util::IOUtil::to_string<int32_t>(INT32_MAX);
995
997 "hazelcast.client.invocation.backoff.timeout.millis";
998const std::string
999 client_properties::BACKPRESSURE_BACKOFF_TIMEOUT_MILLIS_DEFAULT = "-1";
1000
1001const std::string client_properties::STATISTICS_ENABLED =
1002 "hazelcast.client.statistics.enabled";
1003const std::string client_properties::STATISTICS_ENABLED_DEFAULT = "false";
1004
1006 "hazelcast.client.statistics.period.seconds";
1007const std::string client_properties::STATISTICS_PERIOD_SECONDS_DEFAULT = "3";
1008
1009client_property::client_property(const std::string& name,
1010 const std::string& default_value)
1011 : name_(name)
1012 , default_value_(default_value)
1013{}
1014
1015const std::string&
1016client_property::get_name() const
1017{
1018 return name_;
1019}
1020
1021const std::string&
1022client_property::get_default_value() const
1023{
1024 return default_value_;
1025}
1026
1027const char*
1029{
1030#if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
1031#pragma warning(push)
1032#pragma warning( \
1033 disable : 4996) // for 'getenv': This function or variable may be unsafe.
1034#endif
1035 return std::getenv(name_.c_str());
1036#if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
1037#pragma warning(pop)
1038#endif
1039}
1040
// Builds one client_property descriptor (name + default value) for every
// known client property and stores a copy of the user-supplied property map.
//
// @param properties user-supplied property values keyed by property name;
//                   copied into properties_map_.
1041client_properties::client_properties(
1042 const std::unordered_map<std::string, std::string>& properties)
1043 : heartbeat_timeout_(PROP_HEARTBEAT_TIMEOUT, PROP_HEARTBEAT_TIMEOUT_DEFAULT)
1044 , heartbeat_interval_(PROP_HEARTBEAT_INTERVAL,
1045 PROP_HEARTBEAT_INTERVAL_DEFAULT)
1046 , retry_count_(PROP_REQUEST_RETRY_COUNT, PROP_REQUEST_RETRY_COUNT_DEFAULT)
1047 , retry_wait_time_(PROP_REQUEST_RETRY_WAIT_TIME,
1048 PROP_REQUEST_RETRY_WAIT_TIME_DEFAULT)
1049 , aws_member_port_(PROP_AWS_MEMBER_PORT, PROP_AWS_MEMBER_PORT_DEFAULT)
1050 , invocation_retry_pause_millis_(INVOCATION_RETRY_PAUSE_MILLIS,
1051 INVOCATION_RETRY_PAUSE_MILLIS_DEFAULT)
1052 , invocation_timeout_seconds_(INVOCATION_TIMEOUT_SECONDS,
1053 INVOCATION_TIMEOUT_SECONDS_DEFAULT)
1054 , event_thread_count_(EVENT_THREAD_COUNT, EVENT_THREAD_COUNT_DEFAULT)
1055 , internal_executor_pool_size_(INTERNAL_EXECUTOR_POOL_SIZE,
1056 INTERNAL_EXECUTOR_POOL_SIZE_DEFAULT)
1057 , shuffle_member_list_(SHUFFLE_MEMBER_LIST, SHUFFLE_MEMBER_LIST_DEFAULT)
1058 , max_concurrent_invocations_(MAX_CONCURRENT_INVOCATIONS,
1059 MAX_CONCURRENT_INVOCATIONS_DEFAULT)
1060 , backpressure_backoff_timeout_millis_(
1061 BACKPRESSURE_BACKOFF_TIMEOUT_MILLIS,
1062 BACKPRESSURE_BACKOFF_TIMEOUT_MILLIS_DEFAULT)
1063 , statistics_enabled_(STATISTICS_ENABLED, STATISTICS_ENABLED_DEFAULT)
1064 , statistics_period_seconds_(STATISTICS_PERIOD_SECONDS,
1065 STATISTICS_PERIOD_SECONDS_DEFAULT)
1066 , backup_timeout_millis_(OPERATION_BACKUP_TIMEOUT_MILLIS,
1067 OPERATION_BACKUP_TIMEOUT_MILLIS_DEFAULT)
1068 , fail_on_indeterminate_state_(FAIL_ON_INDETERMINATE_OPERATION_STATE,
1069 FAIL_ON_INDETERMINATE_OPERATION_STATE_DEFAULT)
1070 , cloud_base_url_(CLOUD_URL_BASE, CLOUD_URL_BASE_DEFAULT)
1071 , partition_arg_cache_size_(PARTITION_ARGUMENT_CACHE_SIZE,
1072 PARTITION_ARGUMENT_CACHE_SIZE_DEFAULT)
1073 , properties_map_(properties)
1074{}
1075
1076const client_property&
1077client_properties::get_heartbeat_timeout() const
1078{
1079 return heartbeat_timeout_;
1080}
1081
1082const client_property&
1083client_properties::get_heartbeat_interval() const
1084{
1085 return heartbeat_interval_;
1086}
1087
1088const client_property&
1089client_properties::get_aws_member_port() const
1090{
1091 return aws_member_port_;
1092}
1093
1094const client_property&
1095client_properties::get_invocation_retry_pause_millis() const
1096{
1097 return invocation_retry_pause_millis_;
1098}
1099
1100const client_property&
1101client_properties::get_invocation_timeout_seconds() const
1102{
1103 return invocation_timeout_seconds_;
1104}
1105
1106const client_property&
1107client_properties::get_event_thread_count() const
1108{
1109 return event_thread_count_;
1110}
1111
1112const client_property&
1113client_properties::get_internal_executor_pool_size() const
1114{
1115 return internal_executor_pool_size_;
1116}
1117
1118const client_property&
1119client_properties::get_shuffle_member_list() const
1120{
1121 return shuffle_member_list_;
1122}
1123
1124const client_property&
1125client_properties::get_max_concurrent_invocations() const
1126{
1127 return max_concurrent_invocations_;
1128}
1129
1130const client_property&
1131client_properties::get_backpressure_backoff_timeout_millis() const
1132{
1133 return backpressure_backoff_timeout_millis_;
1134}
1135
1136const client_property&
1137client_properties::get_statistics_enabled() const
1138{
1139 return statistics_enabled_;
1140}
1141
1142const client_property&
1143client_properties::get_statistics_period_seconds() const
1144{
1145 return statistics_period_seconds_;
1146}
1147
1148std::string
1150{
1151 std::unordered_map<std::string, std::string>::const_iterator valueIt =
1152 properties_map_.find(property.get_name());
1153 if (valueIt != properties_map_.end()) {
1154 return valueIt->second;
1155 }
1156
1157 const char* value = property.get_system_property();
1158 if (value != NULL) {
1159 return value;
1160 }
1161
1162 return property.get_default_value();
1163}
1164
1165bool
1167{
1168 return util::IOUtil::to_value<bool>(get_string(property));
1169}
1170
1171int32_t
1173{
1174 return util::IOUtil::to_value<int32_t>(get_string(property));
1175}
1176
1177int64_t
1179{
1180 return util::IOUtil::to_value<int64_t>(get_string(property));
1181}
1182
1183const client_property&
1184client_properties::backup_timeout_millis() const
1185{
1186 return backup_timeout_millis_;
1187}
1188
1189const client_property&
1190client_properties::fail_on_indeterminate_state() const
1191{
1192 return fail_on_indeterminate_state_;
1193}
1194
1195const client_property&
1196client_properties::cloud_base_url() const
1197{
1198 return cloud_base_url_;
1199}
1200
1201const client_property&
1202client_properties::partition_arg_cache_size() const
1203{
1204 return partition_arg_cache_size_;
1205}
1206
1207namespace exception {
1208iexception::iexception(std::string exception_name,
1209 std::string source,
1210 std::string message,
1211 std::string details,
1212 int32_t error_no,
1213 std::exception_ptr cause,
1214 bool is_runtime,
1215 bool retryable)
1216 : src_(std::move(source))
1217 , msg_(std::move(message))
1218 , details_(std::move(details))
1219 , error_code_(error_no)
1220 , cause_(std::move(cause))
1221 , runtime_exception_(is_runtime)
1222 , retryable_(retryable)
1223 , report_((boost::format("%1% {%2%. Error code:%3%, Details:%4%.} at %5%.") %
1224 exception_name % msg_ % error_code_ % details_ % src_)
1225 .str())
1226{}
1227
1228iexception::~iexception() noexcept = default;
1229
1230char const*
1231iexception::what() const noexcept
1232{
1233 return report_.c_str();
1234}
1235
1236const std::string&
1237iexception::get_source() const
1238{
1239 return src_;
1240}
1241
1242const std::string&
1243iexception::get_message() const
1244{
1245 return msg_;
1246}
1247
1248std::ostream&
1249operator<<(std::ostream& os, const iexception& exception)
1250{
1251 os << exception.what();
1252 return os;
1253}
1254
1255const std::string&
1256iexception::get_details() const
1257{
1258 return details_;
1259}
1260
1261int32_t
1262iexception::get_error_code() const
1263{
1264 return error_code_;
1265}
1266
1267bool
1268iexception::is_runtime() const
1269{
1270 return runtime_exception_;
1271}
1272
1273bool
1274iexception::is_retryable() const
1275{
1276 return retryable_;
1277}
1278
1279iexception::iexception() = default;
1280
1281retryable_hazelcast::retryable_hazelcast(std::string source,
1282 std::string message,
1283 std::string details,
1284 std::exception_ptr cause)
1285 : retryable_hazelcast("retryable_hazelcast",
1286 protocol::RETRYABLE_HAZELCAST,
1287 std::move(source),
1288 std::move(message),
1289 std::move(details),
1290 std::move(cause),
1291 true,
1292 true)
1293{}
1294
1295retryable_hazelcast::retryable_hazelcast(std::string error_name,
1296 int32_t error_code,
1297 std::string source,
1298 std::string message,
1299 std::string details,
1300 std::exception_ptr cause,
1301 bool runtime,
1302 bool retryable)
1303 : hazelcast_(std::move(error_name),
1304 error_code,
1305 std::move(source),
1306 std::move(message),
1307 std::move(details),
1308 std::move(cause),
1309 runtime,
1310 retryable)
1311{}
1312
1313member_left::member_left(std::string source,
1314 std::string message,
1315 std::string details,
1316 std::exception_ptr cause)
1317 : execution("member_left",
1318 protocol::MEMBER_LEFT,
1319 std::move(source),
1320 std::move(message),
1321 std::move(details),
1322 std::move(cause),
1323 false,
1324 true)
1325{}
1326
1327consistency_lost::consistency_lost(std::string source,
1328 std::string message,
1329 std::string details,
1330 std::exception_ptr cause)
1331 : hazelcast_("consistency_lost",
1332 protocol::CONSISTENCY_LOST_EXCEPTION,
1333 std::move(source),
1334 std::move(message),
1335 std::move(details),
1336 std::move(cause),
1337 true,
1338 false)
1339{}
1340
1341query::query(std::string source,
1342 std::string message,
1343 std::string details,
1344 std::exception_ptr cause)
1345 : hazelcast_(std::move(source),
1346 std::move(message),
1347 std::move(details),
1348 std::move(cause),
1349 false)
1350{
1351}
1352
1353query::query(int32_t code,
1354 std::string message,
1355 std::exception_ptr cause,
1356 boost::uuids::uuid originating_member_id,
1357 std::string suggestion)
1358 : hazelcast_("", std::move(message), "", std::move(cause))
1359 , code_(code)
1360 , suggestion_(std::move(suggestion))
1361 , originating_member_uuid_(originating_member_id)
1362{
1363}
1364
1365int32_t
1366query::code() const
1367{
1368 return code_;
1369}
1370
1371const std::string&
1372query::suggestion() const
1373{
1374 return suggestion_;
1375}
1376
1377const boost::uuids::uuid&
1378query::originating_member_uuid() const
1379{
1380 return originating_member_uuid_;
1381}
1382
1383invocation_might_contain_compact_data::invocation_might_contain_compact_data(
1384 std::string source,
1385 const spi::impl::ClientInvocation& invocation)
1386 : hazelcast_{
1387 std::move(source),
1388 boost::str(
1389 boost::format(
1390 "The invocation %1% might contain Compact serialized "
1391 "data and it is not safe to invoke it when the client is not "
1392 "yet initialized on the cluster") %
1393 invocation)
1394 }
1395{
1396}
1397
1398} // namespace exception
1399} // namespace client
1400
1401boost::future<client::hazelcast_client>
1402new_client()
1403{
1404 return boost::async([]() { return client::hazelcast_client(); });
1405}
1406
1407boost::future<client::hazelcast_client>
1408new_client(client::client_config config)
1409{
1410 return boost::async(
1411 [](client::client_config&& c) {
1412 return client::hazelcast_client(std::move(c));
1413 },
1414 std::move(config));
1415}
1416} // namespace hazelcast
1417
1418namespace std {
1419std::size_t
1420hash<hazelcast::client::address>::operator()(
1421 const hazelcast::client::address& address) const noexcept
1422{
1423 std::size_t seed = 0;
1424 boost::hash_combine(seed, address.get_host());
1425 boost::hash_combine(seed, address.get_port());
1426 boost::hash_combine(seed, address.type_);
1427 return seed;
1428}
1429
1430std::size_t
1431hash<hazelcast::client::big_decimal>::operator()(
1432 const hazelcast::client::big_decimal& dec) const
1433{
1434 std::size_t seed = 0;
1435 boost::hash_combine(seed, dec.unscaled);
1436 boost::hash_combine(seed, dec.scale);
1437 return seed;
1438}
1439
1440std::size_t
1441hash<hazelcast::client::local_time>::operator()(
1442 const hazelcast::client::local_time& v) const
1443{
1444 std::size_t seed = 0;
1445 boost::hash_combine(seed, v.hours);
1446 boost::hash_combine(seed, v.minutes);
1447 boost::hash_combine(seed, v.seconds);
1448 boost::hash_combine(seed, v.nanos);
1449 return seed;
1450}
1451
1452std::size_t
1453hash<hazelcast::client::local_date>::operator()(
1454 const hazelcast::client::local_date& v) const
1455{
1456 std::size_t seed = 0;
1457 boost::hash_combine(seed, v.year);
1458 boost::hash_combine(seed, v.month);
1459 boost::hash_combine(seed, v.day_of_month);
1460 return seed;
1461}
1462
1463std::size_t
1464hash<hazelcast::client::local_date_time>::operator()(
1465 const hazelcast::client::local_date_time& v) const
1466{
1467 std::size_t seed = 0;
1468 boost::hash_combine<hazelcast::client::local_date>(seed, v.date);
1469 boost::hash_combine<hazelcast::client::local_time>(seed, v.time);
1470 return seed;
1471}
1472
1473std::size_t
1474hash<hazelcast::client::offset_date_time>::operator()(
1475 const hazelcast::client::offset_date_time& v) const
1476{
1477 std::size_t seed = 0;
1478 boost::hash_combine<hazelcast::client::local_date_time>(seed, v.date_time);
1479 boost::hash_combine(seed, v.zone_offset_in_seconds);
1480 return seed;
1481}
1482} // namespace std
Represents an address of a client or member in the cluster.
Definition address.h:37
const std::string & get_host() const
bool operator==(const address &address) const
bool operator!=(const address &address) const
hazelcast_client configuration class.
static const std::string PROP_HEARTBEAT_INTERVAL
Time interval in milliseconds between the heartbeats sent by the client to the nodes.
static const std::string PROP_REQUEST_RETRY_COUNT
Client will retry requests which are either inherently retryable (idempotent client) or client_network_conf...
static const std::string PROP_REQUEST_RETRY_WAIT_TIME
Client will retry requests which are either inherently retryable (idempotent client) or client_network_conf...
static const std::string MAX_CONCURRENT_INVOCATIONS
The maximum number of concurrent invocations allowed.
static const std::string SHUFFLE_MEMBER_LIST
Client shuffles the given member list to prevent all clients from connecting to the same node when this pr...
static const std::string STATISTICS_PERIOD_SECONDS
The period, in seconds, at which the statistics are sent to the cluster.
static const std::string EVENT_THREAD_COUNT
Number of the threads to handle the incoming event packets.
static const std::string STATISTICS_ENABLED
Use to enable the client statistics collection.
static const std::string INVOCATION_RETRY_PAUSE_MILLIS
Pause time between each retry cycle of an invocation in milliseconds.
std::string get_string(const client_property &property) const
Returns the configured value of a ClientProperty as std::string.
int32_t get_integer(const client_property &property) const
Returns the configured int32_t value of a ClientProperty.
static const std::string PROP_AWS_MEMBER_PORT
The discovery mechanism will discover only IP addresses.
static const std::string PROP_HEARTBEAT_TIMEOUT
Client will be sending heartbeat messages to members and this is the timeout.
static const std::string INVOCATION_TIMEOUT_SECONDS
When an invocation gets an exception because :
bool get_boolean(const client_property &property) const
Returns the configured boolean value of a ClientProperty.
int64_t get_long(const client_property &property) const
Returns the configured int64_t value of a ClientProperty.
static const std::string BACKPRESSURE_BACKOFF_TIMEOUT_MILLIS
Control the maximum timeout in millis to wait for an invocation space to be available.
A client property is a configuration for hazelcast client.
const char * get_system_property() const
Gets the system environment property value of the property.
Hazelcast cluster interface.
Definition cluster.h:37
The client_aws_config contains the configuration for client to connect to nodes in aws environment.
Contains configuration parameters for client network related behaviour.
char const * what() const noexcept override
return pointer to the explanation string.
cp::cp_subsystem & get_cp_subsystem()
boost::future< void > shutdown()
Shuts down this hazelcast_client.
bool remove_lifecycle_listener(const boost::uuids::uuid &registration_id)
Remove lifecycle listener.
cluster & get_cluster()
Returns the Cluster that connected Hazelcast instance is a part of.
local_endpoint get_local_endpoint() const
Returns the local endpoint which this HazelcastInstance belongs to.
transaction_context new_transaction_context()
Creates a new transaction_context associated with the current thread using default options.
boost::uuids::uuid add_lifecycle_listener(lifecycle_listener &&lifecycle_listener)
Adds a listener to listen for lifecycle events.
const std::string & get_name() const
Returns the name of this client instance.
spi::lifecycle_service & get_lifecycle_service()
Returns the lifecycle service for this instance.
boost::future< bool > is_shutdown()
Returns true if this executor has been shut down.
boost::future< bool > is_terminated()
Returns true if all tasks have completed following shut down.
void shutdown()
Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will...
Concurrent, distributed, observable and queryable map client.
Definition imap.h:63
Listener object for listening lifecycle events of hazelcast instance.
The Client interface allows to get information about a connected client's socket address,...
hz_cluster member class.
Definition member.h:62
A service to execute SQL statements.
Definition sql_service.h:90
Provides a context to do transactional operations; so beginning/committing transactions,...
Contains the configuration for a Hazelcast transaction.
CP Subsystem is a component of Hazelcast that builds a strongly consistent layer for a set of distrib...
Definition cp.h:1408
STL namespace.
An arbitrary precision and scale floating point number.
Definition big_decimal.h:44
hazelcast.cloud configuration to let the client connect the cluster via hazelcast....
A date-time without a time-zone in the ISO-8601 calendar system, such as 2007-12-03T10:15:30.
A date without a time-zone in the ISO-8601 calendar system, such as 2007-12-03.
Definition local_date.h:53
uint8_t day_of_month
minimum value is 1 maximum value is 31
Definition local_date.h:68
int32_t year
minimum value is -999999999 maximum value is 999999999
Definition local_date.h:58
uint8_t month
minimum value is 1 maximum value is 12
Definition local_date.h:63
A time without a time-zone in the ISO-8601 calendar system, such as 10:15:30.
Definition local_time.h:53
uint8_t hours
the hour-of-day to represent, from 0 to 23
Definition local_time.h:57
uint8_t seconds
the second-of-minute to represent, from 0 to 59
Definition local_time.h:65
int32_t nanos
the nanosecond-of-second to represent, from 0 to 999,999,999
Definition local_time.h:69
uint8_t minutes
the minute-of-hour to represent, from 0 to 59
Definition local_time.h:61
A date-time with an offset from UTC/Greenwich in the ISO-8601 calendar system, such as 2007-12-03T10:...
int32_t zone_offset_in_seconds
The offset from UTC/Greenwich.
local_date_time date_time
The local date-time.