18 #include <boost/algorithm/string.hpp>
20 #include "hazelcast/cp/cp.h"
21 #include "hazelcast/client/spi/ClientContext.h"
22 #include "hazelcast/util/Preconditions.h"
23 #include "hazelcast/client/protocol/codec/codecs.h"
24 #include "hazelcast/client/protocol/ClientMessage.h"
25 #include "hazelcast/client/spi/impl/ClientInvocation.h"
26 #include "hazelcast/client/impl/hazelcast_client_instance_impl.h"
30 using namespace hazelcast::client::protocol;
31 using namespace hazelcast::client::protocol::codec;
32 using namespace hazelcast::util;
34 raft_proxy_factory::raft_proxy_factory(client::spi::ClientContext& context)
38 std::shared_ptr<fenced_lock>
39 raft_proxy_factory::create_fenced_lock(raft_group_id&& group_id,
40 const std::string& proxy_name,
41 const std::string& object_name)
44 auto proxy = lock_proxies_.get(proxy_name);
46 if (proxy->get_group_id() != group_id) {
47 lock_proxies_.remove(proxy_name, proxy);
53 proxy = std::make_shared<fenced_lock>(
54 proxy_name, context_, group_id, object_name);
55 auto existing = lock_proxies_.put_if_absent(proxy_name, proxy);
58 }
else if (existing->get_group_id() == group_id) {
62 group_id = get_group_id(proxy_name, object_name).get();
66 std::shared_ptr<counting_semaphore>
67 raft_proxy_factory::create_semaphore(raft_group_id&& group_id,
68 const std::string& proxy_name,
69 const std::string& object_name)
72 client::protocol::codec::semaphore_getsemaphoretype_encode(proxy_name);
73 auto is_sessionless = client::spi::impl::ClientInvocation::create(
74 context_, request, object_name)
77 .get_first_fixed_sized_field<
bool>();
79 return std::shared_ptr<counting_semaphore>(
80 new sessionless_semaphore(proxy_name,
84 context_.get_proxy_session_manager()));
86 return std::shared_ptr<counting_semaphore>(
87 new session_semaphore(proxy_name,
91 context_.get_proxy_session_manager()));
96 raft_proxy_factory::without_default_group_name(
const std::string& n)
100 auto index = name.find(
'@');
101 if (index == std::string::npos) {
105 Preconditions::check_true(
106 name.find(
'@', index + 1) == std::string::npos,
107 "Custom group name must be specified at most once");
109 auto group_name = name.substr(index + 1);
110 boost::trim(group_name);
111 if (group_name == DEFAULT_GROUP_NAME) {
112 return name.substr(0, index);
118 raft_proxy_factory::object_name_for_proxy(
const std::string& name)
120 auto index = name.find(
'@');
121 if (index == std::string::npos) {
125 Preconditions::check_true(index < (name.size() - 1),
126 "Object name cannot be empty string");
127 Preconditions::check_true(
128 name.find(
'@', index + 1) == std::string::npos,
129 "Custom CP group name must be specified at most once");
131 auto object_name = name.substr(0, index);
132 boost::trim(object_name);
133 Preconditions::check_true(object_name.size() > 0,
134 "Object name cannot be empty string");
138 boost::future<raft_group_id>
139 raft_proxy_factory::get_group_id(
const std::string& proxy_name,
140 const std::string& object_name)
142 auto request = cpgroup_createcpgroup_encode(proxy_name);
143 return client::spi::impl::ClientInvocation::create(
144 context_, request, object_name)
146 .then(boost::launch::sync,
147 [](boost::future<client::protocol::ClientMessage> f) {
148 return *f.get().get_first_var_sized_field<raft_group_id>();
152 cp_subsystem::cp_subsystem(client::spi::ClientContext& context)
154 , proxy_factory_(context)
157 boost::future<std::shared_ptr<atomic_long>>
158 cp_subsystem::get_atomic_long(
const std::string& name)
160 return proxy_factory_.create_proxy<
atomic_long>(name);
163 boost::future<std::shared_ptr<atomic_reference>>
164 cp_subsystem::get_atomic_reference(
const std::string& name)
166 return proxy_factory_.create_proxy<atomic_reference>(name);
169 boost::future<std::shared_ptr<latch>>
170 cp_subsystem::get_latch(
const std::string& name)
172 return proxy_factory_.create_proxy<latch>(name);
175 boost::future<std::shared_ptr<fenced_lock>>
176 cp_subsystem::get_lock(
const std::string& name)
178 return proxy_factory_.create_proxy<fenced_lock>(name);
181 boost::future<std::shared_ptr<counting_semaphore>>
182 cp_subsystem::get_semaphore(
const std::string& name)
184 return proxy_factory_.create_proxy<counting_semaphore>(name);
187 cp_proxy::cp_proxy(
const std::string& service_name,
188 const std::string& proxy_name,
189 client::spi::ClientContext* context,
191 const std::string& object_name)
192 : ProxyImpl(service_name, proxy_name, context)
193 , group_id_(group_id)
194 , object_name_(object_name)
198 cp_proxy::on_destroy()
200 auto request = cpgroup_destroycpobject_encode(
201 group_id_, get_service_name(), object_name_);
202 invoke(request).get();
206 cp_proxy::get_group_id()
const
211 atomic_long::atomic_long(
const std::string& name,
212 client::spi::ClientContext& context,
213 const raft_group_id& group_id,
214 const std::string& object_name)
215 : cp_proxy(SERVICE_NAME, name, &context, group_id, object_name)
218 boost::future<int64_t>
221 auto request = atomiclong_addandget_encode(group_id_, object_name_, delta);
222 return invoke_and_get_future<int64_t>(request);
229 atomiclong_compareandset_encode(group_id_, object_name_, expect, update);
230 return invoke_and_get_future<bool>(request);
233 boost::future<int64_t>
239 boost::future<int64_t>
245 boost::future<int64_t>
248 auto request = atomiclong_get_encode(group_id_, object_name_);
249 return invoke_and_get_future<int64_t>(request);
252 boost::future<int64_t>
255 auto request = atomiclong_getandadd_encode(group_id_, object_name_, delta);
256 return invoke_and_get_future<int64_t>(request);
259 boost::future<int64_t>
263 atomiclong_getandset_encode(group_id_, object_name_, new_value);
264 return invoke_and_get_future<int64_t>(request);
267 boost::future<int64_t>
273 boost::future<int64_t>
285 boost::future<int64_t>
286 atomic_long::alter_data(client::serialization::pimpl::data& function_data,
287 alter_result_type result_type)
289 auto request = atomiclong_alter_encode(group_id_,
292 static_cast<int32_t
>(result_type));
293 return invoke_and_get_future<int64_t>(request);
296 boost::future<boost::optional<client::serialization::pimpl::data>>
297 atomic_long::apply_data(client::serialization::pimpl::data& function_data)
300 atomiclong_apply_encode(group_id_, object_name_, function_data);
301 return invoke_and_get_future<
302 boost::optional<client::serialization::pimpl::data>>(request);
305 atomic_reference::atomic_reference(
const std::string& name,
306 client::spi::ClientContext& context,
307 const raft_group_id& group_id,
308 const std::string& object_name)
309 : cp_proxy(SERVICE_NAME, name, &context, group_id, object_name)
312 boost::future<boost::optional<client::serialization::pimpl::data>>
313 atomic_reference::get_data()
315 auto request = atomicref_get_encode(group_id_, object_name_);
316 return invoke_and_get_future<
317 boost::optional<client::serialization::pimpl::data>>(request);
320 boost::future<boost::optional<client::serialization::pimpl::data>>
321 atomic_reference::set_data(
322 const client::serialization::pimpl::data& new_value_data)
325 atomicref_set_encode(group_id_, object_name_, &new_value_data,
false);
326 return invoke_and_get_future<
327 boost::optional<client::serialization::pimpl::data>>(request);
330 boost::future<boost::optional<client::serialization::pimpl::data>>
331 atomic_reference::get_and_set_data(
332 const client::serialization::pimpl::data& new_value_data)
335 atomicref_set_encode(group_id_, object_name_, &new_value_data,
true);
336 return invoke_and_get_future<
337 boost::optional<client::serialization::pimpl::data>>(request);
341 atomic_reference::compare_and_set_data(
342 const client::serialization::pimpl::data& expect_data,
343 const client::serialization::pimpl::data& update_data)
345 auto request = atomicref_compareandset_encode(
346 group_id_, object_name_, &expect_data, &update_data);
347 return invoke_and_get_future<bool>(request);
351 atomic_reference::contains_data(
352 const client::serialization::pimpl::data& value_data)
355 atomicref_contains_encode(group_id_, object_name_, &value_data);
356 return invoke_and_get_future<bool>(request);
360 atomic_reference::alter_data(
361 const client::serialization::pimpl::data& function_data)
363 return to_void_future(
364 invoke_apply(function_data, return_value_type::NO_VALUE,
true));
367 boost::future<boost::optional<client::serialization::pimpl::data>>
368 atomic_reference::alter_and_get_data(
369 const client::serialization::pimpl::data& function_data)
371 return invoke_apply(function_data, return_value_type::NEW,
true);
374 boost::future<boost::optional<client::serialization::pimpl::data>>
375 atomic_reference::get_and_alter_data(
376 const client::serialization::pimpl::data& function_data)
378 return invoke_apply(function_data, return_value_type::OLD,
true);
381 boost::future<boost::optional<client::serialization::pimpl::data>>
382 atomic_reference::apply_data(
383 const client::serialization::pimpl::data& function_data)
385 return invoke_apply(function_data, return_value_type::NEW,
false);
389 atomic_reference::is_null()
391 return contains(
static_cast<byte*
>(
nullptr));
395 atomic_reference::clear()
397 return to_void_future(set(
static_cast<byte*
>(
nullptr)));
400 boost::future<boost::optional<client::serialization::pimpl::data>>
401 atomic_reference::invoke_apply(
402 const client::serialization::pimpl::data function_data,
403 return_value_type return_type,
406 auto request = atomicref_apply_encode(group_id_,
409 static_cast<int32_t
>(return_type),
411 return invoke_and_get_future<
412 boost::optional<client::serialization::pimpl::data>>(request);
415 latch::latch(
const std::string& name,
416 client::spi::ClientContext& context,
417 const raft_group_id& group_id,
418 const std::string& object_name)
419 : cp_proxy(SERVICE_NAME, name, &context, group_id, object_name)
423 latch::try_set_count(int32_t count)
425 util::Preconditions::check_positive(count,
"count must be positive!");
428 countdownlatch_trysetcount_encode(group_id_, object_name_, count);
429 return invoke_and_get_future<bool>(request);
432 boost::future<int32_t>
435 auto request = countdownlatch_getcount_encode(group_id_, object_name_);
436 return invoke_and_get_future<int32_t>(request);
442 auto invocation_uid =
443 get_context().get_hazelcast_client_implementation()->random_uuid();
444 return get_round().then(boost::launch::sync, [=](boost::future<int32_t> f) {
445 auto round = f.get();
448 count_down(round, invocation_uid);
450 }
catch (client::exception::operation_timeout&) {
460 return get_count().then(boost::launch::sync, [](boost::future<int32_t> f) {
465 boost::future<int32_t>
468 auto request = countdownlatch_getround_encode(group_id_, object_name_);
469 return invoke_and_get_future<int32_t>(request);
473 latch::count_down(
int round, boost::uuids::uuid invocation_uid)
475 auto request = countdownlatch_countdown_encode(
476 group_id_, object_name_, invocation_uid, round);
477 invoke(request).get();
483 return to_void_future(wait_for(std::chrono::milliseconds(INT64_MAX)));
486 boost::future<std::cv_status>
487 latch::wait_for(std::chrono::milliseconds timeout)
489 auto timeout_millis = std::max<int64_t>(0, timeout.count());
491 get_context().get_hazelcast_client_implementation()->random_uuid();
492 auto request = countdownlatch_await_encode(
493 group_id_, object_name_, invoation_uid, timeout_millis);
494 return invoke_and_get_future<bool>(request).then(
495 boost::launch::sync, [](boost::future<bool> f) {
496 return f.get() ? std::cv_status::no_timeout : std::cv_status::timeout;
501 raft_group_id::operator==(
const raft_group_id& rhs)
const
503 return name == rhs.name && seed == rhs.seed && group_id == rhs.group_id;
507 raft_group_id::operator!=(
const raft_group_id& rhs)
const
509 return !(rhs == *
this);
512 constexpr int64_t fenced_lock::INVALID_FENCE;
514 fenced_lock::fenced_lock(
const std::string& name,
515 client::spi::ClientContext& context,
516 const raft_group_id& group_id,
517 const std::string& object_name)
518 : session_aware_proxy(SERVICE_NAME,
523 context.get_proxy_session_manager())
529 return to_void_future(lock_and_get_fence());
532 boost::future<int64_t>
533 fenced_lock::lock_and_get_fence()
535 auto thread_id = util::get_current_thread_id();
536 auto invocation_uid = get_context().random_uuid();
538 auto do_lock_once = [=]() {
539 auto session_id = session_manager_.acquire_session(group_id_);
540 verify_locked_session_id_if_present(thread_id, session_id,
true);
541 return do_lock(session_id, thread_id, invocation_uid)
542 .then(boost::launch::sync, [=](boost::future<int64_t> f) {
544 auto fence = f.get();
545 if (fence != INVALID_FENCE) {
546 locked_session_ids_.put(
547 thread_id, std::make_shared<int64_t>(session_id));
550 BOOST_THROW_EXCEPTION(
551 client::exception::lock_acquire_limit_reached(
552 "fenced_lock::lock_and_get_fence",
553 (boost::format(
"Lock [%1%] reentrant lock limit is "
554 "already reached!") %
557 }
catch (client::exception::session_expired&) {
558 invalidate_session(session_id);
559 verify_no_locked_session_id_present(thread_id);
560 return INVALID_FENCE;
561 }
catch (client::exception::wait_key_cancelled&) {
562 release_session(session_id);
563 BOOST_THROW_EXCEPTION(
564 client::exception::lock_acquire_limit_reached(
565 "fenced_lock::lock_and_get_fence",
567 "Lock [%1%] not acquired because the lock call on the "
568 "CP group is cancelled, possibly because of another "
569 "indeterminate call from the same thread.") %
573 release_session(session_id);
579 return do_lock_once().then(
580 boost::launch::sync, [=](boost::future<int64_t> f) {
581 auto result = f.get();
582 if (result != INVALID_FENCE) {
586 for (result = do_lock_once().get(); result == INVALID_FENCE;) {
593 fenced_lock::verify_locked_session_id_if_present(int64_t thread_id,
597 auto locked_session_id = locked_session_ids_.get(thread_id);
598 if (locked_session_id && *locked_session_id != session_id) {
599 locked_session_ids_.remove(thread_id);
600 if (should_release) {
601 release_session(session_id);
604 throw_lock_ownership_lost(*locked_session_id);
609 fenced_lock::verify_no_locked_session_id_present(int64_t thread_id)
611 auto locked_session_id = locked_session_ids_.remove(thread_id);
612 if (locked_session_id) {
613 locked_session_ids_.remove(thread_id);
614 throw_lock_ownership_lost(*locked_session_id);
619 fenced_lock::throw_lock_ownership_lost(int64_t session_id)
const
621 BOOST_THROW_EXCEPTION(client::exception::lock_ownership_lost(
623 (boost::format(
"Current thread is not owner of the Lock[%1%] because its "
624 "Session[%2%] is closed by server!") %
625 get_name() % session_id)
630 fenced_lock::throw_illegal_monitor_state()
const
632 BOOST_THROW_EXCEPTION(client::exception::illegal_monitor_state(
634 (boost::format(
"Current thread is not owner of the Lock[%1%]") %
639 boost::future<int64_t>
640 fenced_lock::do_lock(int64_t session_id,
642 boost::uuids::uuid invocation_uid)
644 auto request = client::protocol::codec::fencedlock_lock_encode(
645 group_id_, object_name_, session_id, thread_id, invocation_uid);
646 return invoke_and_get_future<int64_t>(request);
649 boost::future<int64_t>
650 fenced_lock::do_try_lock(int64_t session_id,
652 boost::uuids::uuid invocation_uid,
653 std::chrono::milliseconds timeout)
655 auto request = client::protocol::codec::fencedlock_trylock_encode(
661 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
662 return invoke_and_get_future<int64_t>(request);
666 fenced_lock::do_unlock(int64_t session_id,
668 boost::uuids::uuid invocation_uid)
670 auto request = client::protocol::codec::fencedlock_unlock_encode(
671 group_id_, object_name_, session_id, thread_id, invocation_uid);
672 return invoke_and_get_future<bool>(request);
675 boost::future<fenced_lock::lock_ownership_state>
676 fenced_lock::do_get_lock_ownership_state()
678 auto request = client::protocol::codec::fencedlock_getlockownership_encode(
679 group_id_, object_name_);
680 return invoke(request).then(
682 [](boost::future<client::protocol::ClientMessage> f) {
684 fenced_lock::lock_ownership_state state;
685 state.fence = msg.get_first_fixed_sized_field<int64_t>();
686 state.lock_count = msg.get<int32_t>();
687 state.session_id = msg.get<int64_t>();
688 state.thread_id = msg.get<int64_t>();
694 fenced_lock::invalidate_session(int64_t session_id)
696 session_manager_.invalidate_session(group_id_, session_id);
700 fenced_lock::try_lock()
702 return try_lock_and_get_fence().then(
704 [](boost::future<int64_t> f) {
return f.get() != INVALID_FENCE; });
708 fenced_lock::try_lock(std::chrono::milliseconds timeout)
710 return try_lock_and_get_fence(timeout).then(
712 [](boost::future<int64_t> f) {
return f.get() != INVALID_FENCE; });
715 boost::future<int64_t>
716 fenced_lock::try_lock_and_get_fence()
718 return try_lock_and_get_fence(std::chrono::milliseconds(0));
721 boost::future<int64_t>
722 fenced_lock::try_lock_and_get_fence(std::chrono::milliseconds timeout)
724 auto thread_id = util::get_current_thread_id();
725 auto invocation_uid = get_context().random_uuid();
727 auto do_try_lock_once = [=]() {
728 using namespace std::chrono;
729 auto start = steady_clock::now();
730 auto session_id = session_manager_.acquire_session(group_id_);
731 verify_locked_session_id_if_present(thread_id, session_id,
true);
732 return do_try_lock(session_id, thread_id, invocation_uid, timeout)
733 .then(boost::launch::sync, [=](boost::future<int64_t> f) {
735 auto fence = f.get();
736 if (fence != INVALID_FENCE) {
737 locked_session_ids_.put(
738 thread_id, std::make_shared<int64_t>(session_id));
739 return std::make_pair(fence,
false);
741 release_session(session_id);
743 return std::make_pair(fence,
false);
744 }
catch (client::exception::session_expired&) {
745 invalidate_session(session_id);
746 verify_no_locked_session_id_present(thread_id);
747 auto timeout_left = timeout - (steady_clock::now() - start);
748 if (timeout_left.count() <= 0) {
749 return std::make_pair(INVALID_FENCE,
false);
751 return std::make_pair(INVALID_FENCE,
false);
752 }
catch (client::exception::wait_key_cancelled&) {
753 release_session(session_id);
754 return std::make_pair(INVALID_FENCE,
false);
756 release_session(session_id);
762 return do_try_lock_once().then(
763 boost::launch::sync, [=](boost::future<std::pair<int64_t, bool>> f) {
764 auto result = f.get();
765 if (!result.second) {
769 for (result = do_try_lock_once().get(); result.second;) {
776 fenced_lock::unlock()
778 auto thread_id = util::get_current_thread_id();
779 int64_t session_id = session_manager_.get_session(group_id_);
782 verify_locked_session_id_if_present(thread_id, session_id,
false);
783 if (session_id == internal::session::proxy_session_manager::NO_SESSION_ID) {
784 locked_session_ids_.remove(thread_id);
785 throw_illegal_monitor_state();
788 return do_unlock(session_id, thread_id, get_context().random_uuid())
789 .then(boost::launch::sync, [=](boost::future<bool> f) {
791 auto still_locked_by_current_thread = f.get();
792 if (still_locked_by_current_thread) {
793 locked_session_ids_.put(
794 thread_id, std::make_shared<int64_t>(session_id));
796 locked_session_ids_.remove(thread_id);
799 release_session(session_id);
800 }
catch (client::exception::session_expired&) {
801 invalidate_session(session_id);
802 locked_session_ids_.remove(thread_id);
804 throw_lock_ownership_lost(session_id);
805 }
catch (client::exception::illegal_monitor_state&) {
806 locked_session_ids_.remove(thread_id);
812 boost::future<int64_t>
813 fenced_lock::get_fence()
815 auto thread_id = util::get_current_thread_id();
816 int64_t session_id = session_manager_.get_session(group_id_);
819 verify_locked_session_id_if_present(thread_id, session_id,
false);
820 if (session_id == internal::session::proxy_session_manager::NO_SESSION_ID) {
821 locked_session_ids_.remove(thread_id);
822 throw_illegal_monitor_state();
825 return do_get_lock_ownership_state().then(
826 boost::launch::sync, [=](boost::future<lock_ownership_state> f) {
827 auto ownership = f.get();
828 if (ownership.is_locked_by(session_id, thread_id)) {
829 locked_session_ids_.put(thread_id,
830 std::make_shared<int64_t>(session_id));
831 return ownership.fence;
834 verify_no_locked_session_id_present(thread_id);
835 throw_illegal_monitor_state();
836 return INVALID_FENCE;
841 fenced_lock::is_locked()
843 auto thread_id = util::get_current_thread_id();
844 int64_t session_id = session_manager_.get_session(group_id_);
846 verify_locked_session_id_if_present(thread_id, session_id,
false);
848 return do_get_lock_ownership_state().then(
849 boost::launch::sync, [=](boost::future<lock_ownership_state> f) {
850 auto ownership = f.get();
851 if (ownership.is_locked_by(session_id, thread_id)) {
852 locked_session_ids_.put(thread_id,
853 std::make_shared<int64_t>(session_id));
857 verify_no_locked_session_id_present(thread_id);
858 return ownership.is_locked();
863 fenced_lock::is_locked_by_current_thread()
865 auto thread_id = util::get_current_thread_id();
866 int64_t session_id = session_manager_.get_session(group_id_);
868 verify_locked_session_id_if_present(thread_id, session_id,
false);
870 return do_get_lock_ownership_state().then(
871 boost::launch::sync, [=](boost::future<lock_ownership_state> f) {
872 auto ownership = f.get();
873 auto locked_by_current_thread =
874 ownership.is_locked_by(session_id, thread_id);
875 if (locked_by_current_thread) {
876 locked_session_ids_.put(thread_id,
877 std::make_shared<int64_t>(session_id));
879 verify_no_locked_session_id_present(thread_id);
882 return locked_by_current_thread;
886 boost::future<int32_t>
887 fenced_lock::get_lock_count()
889 auto thread_id = util::get_current_thread_id();
890 int64_t session_id = session_manager_.get_session(group_id_);
892 verify_locked_session_id_if_present(thread_id, session_id,
false);
894 return do_get_lock_ownership_state().then(
895 boost::launch::sync, [=](boost::future<lock_ownership_state> f) {
896 auto ownership = f.get();
897 if (ownership.is_locked_by(session_id, thread_id)) {
898 locked_session_ids_.put(thread_id,
899 std::make_shared<int64_t>(session_id));
901 verify_no_locked_session_id_present(thread_id);
904 return ownership.lock_count;
909 fenced_lock::get_group_id()
915 operator==(
const fenced_lock& lhs,
const fenced_lock& rhs)
917 return lhs.get_service_name() == rhs.get_service_name() &&
918 lhs.get_name() == rhs.get_name();
922 fenced_lock::post_destroy()
924 ClientProxy::post_destroy();
925 locked_session_ids_.clear();
928 session_aware_proxy::session_aware_proxy(
929 const std::string& service_name,
930 const std::string& proxy_name,
931 client::spi::ClientContext* context,
932 const raft_group_id& group_id,
933 const std::string& object_name,
934 internal::session::proxy_session_manager& session_manager)
935 : cp_proxy(service_name, proxy_name, context, group_id, object_name)
936 , session_manager_(session_manager)
942 session_manager_.release_session(group_id_, session_id);
946 fenced_lock::lock_ownership_state::is_locked_by(int64_t session, int64_t thread)
948 return is_locked() && session_id == session && thread_id == thread;
952 fenced_lock::lock_ownership_state::is_locked()
954 return fence != INVALID_FENCE;
957 counting_semaphore::counting_semaphore(
958 const std::string& proxy_name,
959 client::spi::ClientContext* context,
960 const raft_group_id& group_id,
961 const std::string& object_name,
962 internal::session::proxy_session_manager& session_manager)
963 : session_aware_proxy(SERVICE_NAME,
972 counting_semaphore::init(int32_t permits)
974 util::Preconditions::check_not_negative(permits,
975 "Permits must be non-negative!");
977 auto request = client::protocol::codec::semaphore_init_encode(
978 group_id_, object_name_, permits);
979 return client::spi::impl::ClientInvocation::create(
980 context_, request, object_name_)
982 .then(boost::launch::sync,
983 [](boost::future<client::protocol::ClientMessage> f) {
984 return f.get().get_first_fixed_sized_field<bool>();
989 counting_semaphore::try_acquire(int32_t permits)
991 return try_acquire_for(std::chrono::milliseconds::zero(), permits);
995 counting_semaphore::try_acquire_for(std::chrono::milliseconds rel_time,
998 if (rel_time < std::chrono::milliseconds::zero()) {
999 rel_time = std::chrono::milliseconds ::zero();
1001 return try_acquire_for_millis(permits, rel_time);
1005 counting_semaphore::do_release(int32_t permits,
1009 auto invocation_uid =
1010 get_context().get_hazelcast_client_implementation()->random_uuid();
1011 auto request = codec::semaphore_release_encode(
1012 group_id_, object_name_, session_id, thread_id, invocation_uid, permits);
1013 return to_void_future(client::spi::impl::ClientInvocation::create(
1014 context_, request, object_name_)
1018 boost::future<int32_t>
1019 counting_semaphore::available_permits()
1022 codec::semaphore_availablepermits_encode(group_id_, object_name_);
1023 return decode<int32_t>(client::spi::impl::ClientInvocation::create(
1024 context_, request, object_name_)
1029 counting_semaphore::reduce_permits(int32_t reduction)
1031 util::Preconditions::check_not_negative(reduction,
1032 "Reduction must be non-negative!");
1033 if (reduction == 0) {
1034 return boost::make_ready_future();
1036 return do_change_permits(-reduction);
1040 counting_semaphore::increase_permits(int32_t increase)
1042 util::Preconditions::check_not_negative(increase,
1043 "Increase must be non-negative!");
1044 if (increase == 0) {
1045 return boost::make_ready_future();
1047 return do_change_permits(increase);
1050 sessionless_semaphore::sessionless_semaphore(
1051 const std::string& proxy_name,
1052 client::spi::ClientContext* context,
1053 const raft_group_id& group_id,
1054 const std::string& object_name,
1055 internal::session::proxy_session_manager& session_manager)
1056 : counting_semaphore(proxy_name,
1064 sessionless_semaphore::acquire(int32_t permits)
1066 util::Preconditions::check_positive(permits,
1067 "permits must be positive number.");
1069 return to_void_future(
1070 do_try_acquire(permits, std::chrono::milliseconds(-1)));
1074 sessionless_semaphore::do_try_acquire(int32_t permits,
1075 std::chrono::milliseconds timeout_ms)
1077 auto cluster_wide_threadId = get_thread_id();
1078 auto invocation_uid =
1079 get_context().get_hazelcast_client_implementation()->random_uuid();
1080 auto request = client::protocol::codec::semaphore_acquire_encode(
1083 internal::session::proxy_session_manager::NO_SESSION_ID,
1084 cluster_wide_threadId,
1087 timeout_ms.count());
1088 return client::spi::impl::ClientInvocation::create(
1089 context_, request, object_name_)
1092 boost::launch::sync,
1093 [=](boost::future<client::protocol::ClientMessage> f) {
1095 return f.get().get_first_fixed_sized_field<bool>();
1096 }
catch (client::exception::wait_key_cancelled&) {
1097 throw client::exception::illegal_state(
1098 "sessionless_semaphore::acquire",
1100 "Semaphore[%1%] ] not acquired because the acquire call "
1101 "on the CP group is cancelled, possibly because of "
1102 "another indeterminate call from the same thread.") %
1110 sessionless_semaphore::try_acquire_for_millis(int32_t permits,
1111 std::chrono::milliseconds timeout)
1113 util::Preconditions::check_positive(permits,
"Permits must be positive!");
1115 return do_try_acquire(permits,
1116 timeout > std::chrono::milliseconds::zero()
1118 : std::chrono::milliseconds::zero());
1122 sessionless_semaphore::release(int32_t permits)
1124 util::Preconditions::check_positive(permits,
"Permits must be positive!");
1125 auto thread_id = get_thread_id();
1126 return do_release(permits,
1128 internal::session::proxy_session_manager::NO_SESSION_ID);
1132 sessionless_semaphore::get_thread_id()
1134 return session_manager_.get_or_create_unique_thread_id(group_id_);
1137 boost::future<int32_t>
1138 sessionless_semaphore::drain_permits()
1140 auto cluster_wide_threadId = get_thread_id();
1141 auto invocation_uid =
1142 get_context().get_hazelcast_client_implementation()->random_uuid();
1143 auto request = client::protocol::codec::semaphore_drain_encode(
1146 internal::session::proxy_session_manager::NO_SESSION_ID,
1147 cluster_wide_threadId,
1149 return decode<int32_t>(client::spi::impl::ClientInvocation::create(
1150 context_, request, object_name_)
1155 sessionless_semaphore::do_change_permits(int32_t delta)
1157 auto cluster_wide_threadId = get_thread_id();
1158 auto invocation_uid =
1159 get_context().get_hazelcast_client_implementation()->random_uuid();
1160 auto request = client::protocol::codec::semaphore_change_encode(
1163 internal::session::proxy_session_manager::NO_SESSION_ID,
1164 cluster_wide_threadId,
1167 return to_void_future(client::spi::impl::ClientInvocation::create(
1168 context_, request, object_name_)
1172 session_semaphore::session_semaphore(
1173 const std::string& proxy_name,
1174 client::spi::ClientContext* context,
1175 const raft_group_id& group_id,
1176 const std::string& object_name,
1177 internal::session::proxy_session_manager& session_manager)
1178 : counting_semaphore(proxy_name,
1186 session_semaphore::acquire(int32_t permits)
1188 return to_void_future(
1189 try_acquire_for_millis(permits, std::chrono::milliseconds(-1)));
1193 session_semaphore::try_acquire_for_millis(int32_t permits,
1194 std::chrono::milliseconds timeout)
1196 util::Preconditions::check_not_negative(
1197 permits,
"permits must not be negative number.");
1199 auto thread_id = get_thread_id();
1200 auto invocation_uid =
1201 get_context().get_hazelcast_client_implementation()->random_uuid();
1203 auto do_try_acquire_once = ([=]() {
1204 auto start = std::chrono::steady_clock::now();
1205 auto use_timeout = timeout >= std::chrono::milliseconds::zero();
1206 auto session_id = session_manager_.acquire_session(group_id_, permits);
1208 client::protocol::codec::semaphore_acquire_encode(group_id_,
1215 return client::spi::impl::ClientInvocation::create(
1216 context_, request, object_name_)
1219 boost::launch::sync,
1220 [=](boost::future<client::protocol::ClientMessage> f) {
1222 auto acquired = f.get().get_first_fixed_sized_field<bool>();
1224 session_manager_.release_session(group_id_, session_id);
1228 return std::make_pair(acquired,
false);
1229 }
catch (client::exception::session_expired&) {
1230 session_manager_.invalidate_session(group_id_, session_id);
1232 (timeout - (std::chrono::steady_clock::now() - start) <=
1233 std::chrono::milliseconds::zero())) {
1234 return std::make_pair(
false,
false);
1236 return std::make_pair(
false,
true);
1237 }
catch (client::exception::wait_key_cancelled&) {
1238 session_manager_.release_session(
1239 group_id_, session_id, permits);
1241 BOOST_THROW_EXCEPTION(client::exception::illegal_state(
1242 "session_semaphore::try_acquire_for_millis",
1244 "Semaphore[%1%] not acquired because the acquire "
1245 "call on the CP group is cancelled, possibly "
1246 "because of another indeterminate call from the "
1251 return std::make_pair(
false,
false);
1256 return do_try_acquire_once().then(
1257 boost::launch::sync, [=](boost::future<std::pair<bool, bool>> f) {
1258 auto result = f.get();
1259 if (!result.second) {
1260 return result.first;
1262 for (; result.second; result = do_try_acquire_once().get())
1264 return result.first;
1269 session_semaphore::release(int32_t permits)
1271 util::Preconditions::check_positive(permits,
"Permits must be positive!");
1272 auto session_id = session_manager_.get_session(group_id_);
1273 if (session_id == internal::session::proxy_session_manager::NO_SESSION_ID) {
1274 throw_illegal_state_exception(
nullptr);
1277 auto thread_id = get_thread_id();
1278 return do_release(permits, thread_id, session_id)
1279 .then([=](boost::future<void> f) {
1282 session_manager_.release_session(group_id_, session_id, permits);
1283 }
catch (client::exception::session_expired&) {
1284 session_manager_.invalidate_session(group_id_, session_id);
1285 session_manager_.release_session(group_id_, session_id, permits);
1286 throw_illegal_state_exception(std::current_exception());
1292 session_semaphore::throw_illegal_state_exception(std::exception_ptr e)
1294 auto ise = boost::enable_current_exception(client::exception::illegal_state(
1295 "session_semaphore::illegal_state",
"No valid session!"));
1300 std::rethrow_exception(e);
1302 std::throw_with_nested(ise);
1307 session_semaphore::get_thread_id()
1309 return util::get_current_thread_id();
1312 boost::future<int32_t>
1313 session_semaphore::drain_permits()
1315 auto thread_id = get_thread_id();
1316 auto invocation_uid =
1317 get_context().get_hazelcast_client_implementation()->random_uuid();
1319 auto do_drain_once = ([=]() {
1322 auto request = client::protocol::codec::semaphore_drain_encode(
1323 group_id_, object_name_, session_id, thread_id, invocation_uid);
1324 return client::spi::impl::ClientInvocation::create(
1325 context_, request, object_name_)
1328 boost::launch::sync,
1329 [=](boost::future<client::protocol::ClientMessage> f) {
1331 auto count = f.get().get_first_fixed_sized_field<int32_t>();
1332 session_manager_.release_session(
1335 }
catch (client::exception::session_expired&) {
1336 session_manager_.invalidate_session(group_id_, session_id);
1342 return do_drain_once().then(
1343 boost::launch::sync, [=](boost::future<int32_t> f) {
1344 int32_t count = f.get();
1348 while ((count = do_drain_once().get()) == -1) {
1355 session_semaphore::do_change_permits(int32_t delta)
1359 auto thread_id = get_thread_id();
1360 auto invocation_uid =
1361 get_context().get_hazelcast_client_implementation()->random_uuid();
1363 auto request = client::protocol::codec::semaphore_change_encode(
1364 group_id_, object_name_, session_id, thread_id, invocation_uid, delta);
1365 return client::spi::impl::ClientInvocation::create(
1366 context_, request, object_name_)
1368 .then(boost::launch::sync,
1369 [=](boost::future<client::protocol::ClientMessage> f) {
1372 session_manager_.release_session(group_id_, session_id);
1373 }
catch (client::exception::session_expired&) {
1374 session_manager_.invalidate_session(group_id_, session_id);
1375 throw_illegal_state_exception(std::current_exception());
1384 hash<hazelcast::cp::raft_group_id>::operator()(
1387 std::size_t seed = 0;
1388 boost::hash_combine(seed, group_id.name);
1389 boost::hash_combine(seed, group_id.seed);
1390 boost::hash_combine(seed, group_id.group_id);
1397 hash<hazelcast::cp::raft_group_id>::operator()(
Client-side Raft-based proxy implementation of atomic long.
boost::future< int64_t > get_and_add(int64_t delta)
Atomically adds the given value to the current value.
boost::future< int64_t > get()
Gets the current value.
boost::future< int64_t > decrement_and_get()
Atomically decrements the current value by one.
boost::future< int64_t > get_and_increment()
Atomically increments the current value by one.
boost::future< bool > compare_and_set(int64_t expect, int64_t update)
Atomically sets the value to the given updated value only if the current value.
boost::future< int64_t > get_and_set(int64_t new_value)
Atomically sets the given value and returns the old value.
boost::future< void > set(int64_t new_value)
Atomically sets the given value.
boost::future< int64_t > increment_and_get()
Atomically increments the current value by one.
boost::future< int64_t > get_and_decrement()
Atomically decrements the current value by one.
boost::future< int64_t > add_and_get(int64_t delta)
Atomically adds the given value to the current value.
void release_session(int64_t session_id)
Decrements acquire count of the session.
static constexpr int32_t DRAIN_SESSION_ACQ_COUNT
Since a proxy does not know how many permits will be drained on the Raft group, it uses this constant...