18 #include <boost/algorithm/string.hpp>
20 #include "hazelcast/cp/cp.h"
21 #include "hazelcast/client/spi/ClientContext.h"
22 #include "hazelcast/util/Preconditions.h"
23 #include "hazelcast/client/protocol/codec/codecs.h"
24 #include "hazelcast/client/protocol/ClientMessage.h"
25 #include "hazelcast/client/spi/impl/ClientInvocation.h"
26 #include "hazelcast/client/impl/hazelcast_client_instance_impl.h"
30 using namespace hazelcast::client::protocol;
31 using namespace hazelcast::client::protocol::codec;
32 using namespace hazelcast::util;
/** Creates the factory; context_ is initialised from the supplied client context. */
34 raft_proxy_factory::raft_proxy_factory(client::spi::ClientContext &context) : context_(context) {}
/**
 * Returns the fenced_lock proxy registered under proxy_name in lock_proxies_.
 *
 * A cached proxy whose group id differs from group_id is removed from the cache.
 * A new fenced_lock is then published with put_if_absent(); when another proxy is
 * already registered its group id is compared with group_id, and group_id is
 * re-resolved through get_group_id() (blocking on the returned future).
 */
36 std::shared_ptr<fenced_lock>
37 raft_proxy_factory::create_fenced_lock(raft_group_id &&group_id,
const std::string &proxy_name,
38 const std::string &object_name) {
40 auto proxy = lock_proxies_.get(proxy_name);
42 if (proxy->get_group_id() != group_id) {
43 lock_proxies_.remove(proxy_name, proxy);
49 proxy = std::make_shared<fenced_lock>(proxy_name, context_, group_id, object_name);
50 auto existing = lock_proxies_.put_if_absent(proxy_name, proxy);
53 }
else if (existing->get_group_id() == group_id) {
57 group_id = get_group_id(proxy_name, object_name).get();
/**
 * Creates a counting_semaphore proxy for the given CP group.
 *
 * A semaphore_getsemaphoretype request is sent for proxy_name and waited on
 * synchronously; the first fixed-size field of the reply is read as a bool
 * (is_sessionless). The returned proxy is either a sessionless_semaphore or a
 * session_semaphore, both built with the context's proxy session manager
 * (presumably selected by is_sessionless).
 */
61 std::shared_ptr<counting_semaphore>
62 raft_proxy_factory::create_semaphore(raft_group_id &&group_id,
const std::string &proxy_name,
63 const std::string &object_name) {
64 auto request = client::protocol::codec::semaphore_getsemaphoretype_encode(proxy_name);
65 auto is_sessionless = client::spi::impl::ClientInvocation::create(context_, request,
66 object_name)->invoke().get().get_first_fixed_sized_field<
bool>();
68 return std::shared_ptr<counting_semaphore>(
69 new sessionless_semaphore(proxy_name, &context_, group_id, object_name,
70 context_.get_proxy_session_manager()));
72 return std::shared_ptr<counting_semaphore>(
73 new session_semaphore(proxy_name, &context_, group_id, object_name,
74 context_.get_proxy_session_manager()));
/**
 * Drops an explicit default-group suffix from a proxy name.
 *
 * The name is split at '@'; a second '@' is rejected through
 * Preconditions::check_true. The text after '@' is trimmed and, when it equals
 * DEFAULT_GROUP_NAME, only the part before '@' is returned.
 */
78 std::string raft_proxy_factory::without_default_group_name(
const std::string &n) {
81 auto index = name.find(
'@');
82 if (index == std::string::npos) {
86 Preconditions::check_true(name.find(
'@', index + 1) == std::string::npos,
87 "Custom group name must be specified at most once");
89 auto group_name = name.substr(index + 1);
90 boost::trim(group_name);
91 if (group_name == DEFAULT_GROUP_NAME) {
92 return name.substr(0, index);
/**
 * Extracts the object part (the text before '@') of a proxy name.
 *
 * Preconditions checked on a name that contains '@':
 *  - at least one character follows '@';
 *  - '@' occurs only once;
 *  - the trimmed text before '@' is not empty.
 */
97 std::string raft_proxy_factory::object_name_for_proxy(
const std::string &name) {
98 auto index = name.find(
'@');
99 if (index == std::string::npos) {
// NOTE(review): this condition fails when nothing follows '@', yet the message
// talks about the object name — confirm the intended wording.
103 Preconditions::check_true(index < (name.size() - 1),
104 "Object name cannot be empty string");
105 Preconditions::check_true(name.find(
'@', index + 1) == std::string::npos,
106 "Custom CP group name must be specified at most once");
108 auto object_name = name.substr(0, index);
109 boost::trim(object_name);
110 Preconditions::check_true(object_name.size() > 0,
111 "Object name cannot be empty string");
/**
 * Sends a cpgroup_createcpgroup request for proxy_name and yields the
 * raft_group_id stored in the reply's first variable-sized field.
 * The decoding continuation is attached with boost::launch::sync.
 */
115 boost::future<raft_group_id> raft_proxy_factory::get_group_id(
const std::string &proxy_name,
const std::string &object_name) {
116 auto request = cpgroup_createcpgroup_encode(proxy_name);
117 return client::spi::impl::ClientInvocation::create(context_, request, object_name)->invoke().then(
118 boost::launch::sync, [](boost::future<client::protocol::ClientMessage> f) {
119 return *f.get().get_first_var_sized_field<raft_group_id>();
/** Initialises context_ and the embedded proxy_factory_ from the same client context. */
123 cp_subsystem::cp_subsystem(client::spi::ClientContext &context) : context_(context), proxy_factory_(context) {
/** Returns a future for the atomic_long proxy named `name`, created via proxy_factory_. */
126 boost::future<std::shared_ptr<atomic_long>> cp_subsystem::get_atomic_long(
const std::string &name) {
127 return proxy_factory_.create_proxy<
atomic_long>(name);
/** Returns a future for the atomic_reference proxy named `name`, created via proxy_factory_. */
130 boost::future<std::shared_ptr<atomic_reference>> cp_subsystem::get_atomic_reference(
const std::string &name) {
131 return proxy_factory_.create_proxy<atomic_reference>(name);
/** Returns a future for the latch proxy named `name`, created via proxy_factory_. */
134 boost::future<std::shared_ptr<latch>> cp_subsystem::get_latch(
const std::string &name) {
135 return proxy_factory_.create_proxy<latch>(name);
/** Returns a future for the fenced_lock proxy named `name`, created via proxy_factory_. */
138 boost::future<std::shared_ptr<fenced_lock>> cp_subsystem::get_lock(
const std::string &name) {
139 return proxy_factory_.create_proxy<fenced_lock>(name);
/** Returns a future for the counting_semaphore proxy named `name`, created via proxy_factory_. */
142 boost::future<std::shared_ptr<counting_semaphore>> cp_subsystem::get_semaphore(
const std::string &name) {
143 return proxy_factory_.create_proxy<counting_semaphore>(name);
/**
 * Base constructor shared by the CP proxies: forwards service_name to ProxyImpl
 * and keeps object_name in object_name_.
 */
146 cp_proxy::cp_proxy(
const std::string &service_name,
const std::string &proxy_name,
147 client::spi::ClientContext *context,
148 const raft_group_id &group_id,
const std::string &object_name) : ProxyImpl(service_name,
152 object_name_(object_name) {}
/**
 * Sends a cpgroup_destroycpobject request for this object (group id, service
 * name, object name) and blocks until the invocation completes.
 */
154 void cp_proxy::on_destroy() {
155 auto request = cpgroup_destroycpobject_encode(group_id_, get_service_name(), object_name_);
156 invoke(request).get();
/** Const accessor returning a reference to a raft_group_id (presumably the proxy's group_id_). */
159 const raft_group_id &cp_proxy::get_group_id()
const {
/** Constructs the proxy by delegating to cp_proxy with SERVICE_NAME and the address of `context`. */
163 atomic_long::atomic_long(
const std::string &name, client::spi::ClientContext &context,
164 const raft_group_id &group_id,
const std::string &object_name)
165 : cp_proxy(SERVICE_NAME, name, &context, group_id, object_name) {}
168 auto request = atomiclong_addandget_encode(group_id_, object_name_, delta);
169 return invoke_and_get_future<int64_t>(request);
173 auto request = atomiclong_compareandset_encode(group_id_, object_name_, expect, update);
174 return invoke_and_get_future<bool>(request);
186 auto request = atomiclong_get_encode(group_id_, object_name_);
187 return invoke_and_get_future<int64_t>(request);
191 auto request = atomiclong_getandadd_encode(group_id_, object_name_, delta);
192 return invoke_and_get_future<int64_t>(request);
196 auto request = atomiclong_getandset_encode(group_id_, object_name_, new_value);
197 return invoke_and_get_future<int64_t>(request);
/**
 * Sends an atomiclong_alter request carrying the serialized function.
 * result_type is transmitted as its int32_t value; the reply is decoded as int64_t.
 */
212 boost::future<int64_t> atomic_long::alter_data(client::serialization::pimpl::data &function_data,
213 alter_result_type result_type) {
214 auto request = atomiclong_alter_encode(group_id_, object_name_, function_data,
215 static_cast<int32_t
>(result_type));
216 return invoke_and_get_future<int64_t>(request);
/** Sends an atomiclong_apply request with the serialized function and returns the optional serialized result. */
219 boost::future<boost::optional<client::serialization::pimpl::data>>
220 atomic_long::apply_data(client::serialization::pimpl::data &function_data) {
221 auto request = atomiclong_apply_encode(group_id_, object_name_, function_data);
222 return invoke_and_get_future<boost::optional<client::serialization::pimpl::data>>(request);
/** Constructs the proxy by delegating to cp_proxy with SERVICE_NAME and the address of `context`. */
225 atomic_reference::atomic_reference(
const std::string &name, client::spi::ClientContext &context,
226 const raft_group_id &group_id,
const std::string &object_name)
227 : cp_proxy(SERVICE_NAME, name, &context, group_id, object_name) {}
/** Sends an atomicref_get request and returns the optional serialized value from the reply. */
229 boost::future<boost::optional<client::serialization::pimpl::data>> atomic_reference::get_data() {
230 auto request = atomicref_get_encode(group_id_, object_name_);
231 return invoke_and_get_future<boost::optional<client::serialization::pimpl::data>>(request);
/**
 * Sends an atomicref_set request with the new value; the trailing flag is false
 * here and true in get_and_set_data().
 */
234 boost::future<boost::optional<client::serialization::pimpl::data>>
235 atomic_reference::set_data(
const client::serialization::pimpl::data &new_value_data) {
236 auto request = atomicref_set_encode(group_id_, object_name_, &new_value_data,
false);
237 return invoke_and_get_future<boost::optional<client::serialization::pimpl::data>>(request);
/**
 * Sends an atomicref_set request with the new value and the trailing flag set
 * to true (presumably asking the server to return the previous value).
 */
240 boost::future<boost::optional<client::serialization::pimpl::data>>
241 atomic_reference::get_and_set_data(
const client::serialization::pimpl::data &new_value_data) {
242 auto request = atomicref_set_encode(group_id_, object_name_, &new_value_data,
true);
243 return invoke_and_get_future<boost::optional<client::serialization::pimpl::data>>(request);
/** Sends an atomicref_compareandset request with the expected and update values; the reply is decoded as bool. */
247 atomic_reference::compare_and_set_data(
const client::serialization::pimpl::data &expect_data,
248 const client::serialization::pimpl::data &update_data) {
249 auto request = atomicref_compareandset_encode(group_id_, object_name_, &expect_data, &update_data);
250 return invoke_and_get_future<bool>(request);
/** Sends an atomicref_contains request for the given serialized value; the reply is decoded as bool. */
253 boost::future<bool> atomic_reference::contains_data(
const client::serialization::pimpl::data &value_data) {
254 auto request = atomicref_contains_encode(group_id_, object_name_, &value_data);
255 return invoke_and_get_future<bool>(request);
/** Applies the function with alter=true and return_value_type::NO_VALUE, discarding the reply via to_void_future(). */
258 boost::future<void> atomic_reference::alter_data(
const client::serialization::pimpl::data &function_data) {
259 return to_void_future(invoke_apply(function_data, return_value_type::NO_VALUE,
true));
/** Applies the function with alter=true and requests return_value_type::NEW. */
262 boost::future<boost::optional<client::serialization::pimpl::data>>
263 atomic_reference::alter_and_get_data(
const client::serialization::pimpl::data &function_data) {
264 return invoke_apply(function_data, return_value_type::NEW,
true);
/** Applies the function with alter=true and requests return_value_type::OLD. */
267 boost::future<boost::optional<client::serialization::pimpl::data>>
268 atomic_reference::get_and_alter_data(
const client::serialization::pimpl::data &function_data) {
269 return invoke_apply(function_data, return_value_type::OLD,
true);
/** Applies the function with alter=false and requests return_value_type::NEW. */
272 boost::future<boost::optional<client::serialization::pimpl::data>>
273 atomic_reference::apply_data(
const client::serialization::pimpl::data &function_data) {
274 return invoke_apply(function_data, return_value_type::NEW,
false);
/** Implemented as contains() called with a null byte pointer. */
277 boost::future<bool> atomic_reference::is_null() {
278 return contains(
static_cast<byte *
>(
nullptr));
/** Implemented as set() called with a null byte pointer; the reply is discarded via to_void_future(). */
281 boost::future<void> atomic_reference::clear() {
282 return to_void_future(set(
static_cast<byte *
>(
nullptr)));
/**
 * Shared helper behind the alter/apply variants: sends an atomicref_apply
 * request with the serialized function, return_type as int32_t, and the alter flag.
 * Note that function_data is taken by const value, i.e. copied per call.
 */
285 boost::future<boost::optional<client::serialization::pimpl::data>>
286 atomic_reference::invoke_apply(
const client::serialization::pimpl::data function_data,
287 return_value_type return_type,
bool alter) {
288 auto request = atomicref_apply_encode(group_id_, object_name_, function_data,
289 static_cast<int32_t
>(return_type), alter);
290 return invoke_and_get_future<boost::optional<client::serialization::pimpl::data>>(request);
/** Constructs the proxy by delegating to cp_proxy with SERVICE_NAME and the address of `context`. */
293 latch::latch(
const std::string &name, client::spi::ClientContext &context,
const raft_group_id &group_id,
294 const std::string &object_name) : cp_proxy(SERVICE_NAME, name, &context, group_id, object_name) {}
/**
 * Validates count with Preconditions::check_positive, then sends a
 * countdownlatch_trysetcount request; the reply is decoded as bool.
 */
296 boost::future<bool> latch::try_set_count(int32_t count) {
297 util::Preconditions::check_positive(count,
"count must be positive!");
299 auto request = countdownlatch_trysetcount_encode(group_id_, object_name_, count);
300 return invoke_and_get_future<bool>(request);
/** Sends a countdownlatch_getcount request; the reply is decoded as int32_t. */
303 boost::future<int32_t> latch::get_count() {
304 auto request = countdownlatch_getcount_encode(group_id_, object_name_);
305 return invoke_and_get_future<int32_t>(request);
/**
 * Counts the latch down once.
 *
 * One invocation uid is generated up front; the current round is fetched with
 * get_round() and count_down(round, invocation_uid) is issued from a synchronous
 * continuation. client::exception::operation_timeout is caught around that call
 * (NOTE(review): presumably to retry with the same uid — confirm).
 */
308 boost::future<void> latch::count_down() {
309 auto invocation_uid = get_context().get_hazelcast_client_implementation()->random_uuid();
310 return get_round().then(boost::launch::sync, [=](boost::future<int32_t> f) {
311 auto round = f.get();
314 count_down(round, invocation_uid);
316 }
catch (client::exception::operation_timeout &) {
/** Derives its boolean result from get_count() in a synchronous continuation. */
324 boost::future<bool> latch::try_wait() {
325 return get_count().then(boost::launch::sync, [](boost::future<int32_t> f) {
/** Sends a countdownlatch_getround request; the reply is decoded as int32_t. */
330 boost::future<int32_t> latch::get_round() {
331 auto request = countdownlatch_getround_encode(group_id_, object_name_);
332 return invoke_and_get_future<int32_t>(request);
/**
 * Sends a countdownlatch_countdown request for the given round and invocation
 * uid and blocks until the invocation completes.
 */
335 void latch::count_down(
int round, boost::uuids::uuid invocation_uid) {
336 auto request = countdownlatch_countdown_encode(group_id_, object_name_, invocation_uid, round);
337 invoke(request).get();
/** Delegates to wait_for() with a timeout of INT64_MAX milliseconds and discards the resulting status. */
340 boost::future<void> latch::wait() {
341 return to_void_future(wait_for(std::chrono::milliseconds(INT64_MAX)));
/**
 * Sends a countdownlatch_await request with a fresh invocation uid.
 * Negative timeouts are clamped to 0 by std::max. A true reply maps to
 * std::cv_status::no_timeout, a false reply to std::cv_status::timeout.
 */
344 boost::future<std::cv_status> latch::wait_for(std::chrono::milliseconds timeout) {
345 auto timeout_millis = std::max<int64_t>(0, timeout.count());
346 auto invoation_uid = get_context().get_hazelcast_client_implementation()->random_uuid();
347 auto request = countdownlatch_await_encode(group_id_, object_name_, invoation_uid, timeout_millis);
348 return invoke_and_get_future<bool>(request).then(boost::launch::sync, [](boost::future<bool> f) {
349 return f.get() ? std::cv_status::no_timeout : std::cv_status::timeout;
/** Member-wise equality over the group id's identifying fields, including name and group_id. */
353 bool raft_group_id::operator==(
const raft_group_id &rhs)
const {
354 return name == rhs.name &&
356 group_id == rhs.group_id;
/** Negation of operator==. */
359 bool raft_group_id::operator!=(
const raft_group_id &rhs)
const {
360 return !(rhs == *
this);
// Namespace-scope definition of the constexpr static member, so that odr-uses of
// INVALID_FENCE link when building as a pre-C++17 standard.
363 constexpr int64_t fenced_lock::INVALID_FENCE;
/**
 * Constructs the proxy by delegating to session_aware_proxy with SERVICE_NAME
 * and the context's proxy session manager.
 */
365 fenced_lock::fenced_lock(
const std::string &name, client::spi::ClientContext &context,
366 const raft_group_id &group_id,
367 const std::string &object_name) : session_aware_proxy(SERVICE_NAME, name, &context,
368 group_id, object_name,
369 context.get_proxy_session_manager()) {}
/** Acquires the lock through lock_and_get_fence() and discards the fence. */
371 boost::future<void> fenced_lock::lock() {
372 return to_void_future(lock_and_get_fence());
375 boost::future<int64_t> fenced_lock::lock_and_get_fence() {
376 auto thread_id = util::get_current_thread_id();
377 auto invocation_uid = get_context().random_uuid();
379 auto do_lock_once = [=] () {
380 auto session_id = session_manager_.acquire_session(group_id_);
381 verify_locked_session_id_if_present(thread_id, session_id,
true);
382 return do_lock(session_id, thread_id, invocation_uid).then(boost::launch::sync,
383 [=](boost::future<int64_t> f) {
385 auto fence = f.get();
386 if (fence != INVALID_FENCE) {
387 locked_session_ids_.put(
389 std::make_shared<int64_t>(
393 BOOST_THROW_EXCEPTION(
394 client::exception::lock_acquire_limit_reached(
395 "fenced_lock::lock_and_get_fence", (
397 "Lock [%1%] reentrant lock limit is already reached!") %
398 object_name_).str()));
399 }
catch (client::exception::session_expired &) {
400 invalidate_session(session_id);
401 verify_no_locked_session_id_present(thread_id);
402 return INVALID_FENCE;
403 }
catch (client::exception::wait_key_cancelled &) {
404 release_session(session_id);
405 BOOST_THROW_EXCEPTION(client::exception::lock_acquire_limit_reached(
406 "fenced_lock::lock_and_get_fence", (boost::format(
407 "Lock [%1%] not acquired because the lock call on the CP group is cancelled, possibly because of another indeterminate call from the same thread.") %
408 object_name_).str()));
410 release_session(session_id);
416 return do_lock_once().then(boost::launch::sync, [=](boost::future<int64_t> f) {
417 auto result = f.get();
418 if (result != INVALID_FENCE) {
422 for (result = do_lock_once().get(); result == INVALID_FENCE;) {}
/**
 * Checks the session id cached for thread_id against session_id.
 *
 * On a mismatch the cached entry is removed, session_id is released when
 * should_release is true, and throw_lock_ownership_lost() is raised with the
 * cached (stale) session id. Nothing happens when no entry is cached.
 */
427 void fenced_lock::verify_locked_session_id_if_present(int64_t thread_id, int64_t session_id,
428 bool should_release) {
429 auto locked_session_id = locked_session_ids_.get(thread_id);
430 if (locked_session_id && *locked_session_id != session_id) {
431 locked_session_ids_.remove(thread_id);
432 if (should_release) {
433 release_session(session_id);
436 throw_lock_ownership_lost(*locked_session_id);
440 void fenced_lock::verify_no_locked_session_id_present(int64_t thread_id) {
441 auto locked_session_id = locked_session_ids_.remove(thread_id);
442 if (locked_session_id) {
443 locked_session_ids_.remove(thread_id);
444 throw_lock_ownership_lost(*locked_session_id);
/** Throws client::exception::lock_ownership_lost with a message naming this lock and the given session id. */
448 void fenced_lock::throw_lock_ownership_lost(int64_t session_id)
const {
449 BOOST_THROW_EXCEPTION(client::exception::lock_ownership_lost(
"fenced_lock", (boost::format(
450 "Current thread is not owner of the Lock[%1%] because its Session[%2%] is closed by server!") %
451 get_name() % session_id).str()));
/** Throws client::exception::illegal_monitor_state with a message naming this lock. */
454 void fenced_lock::throw_illegal_monitor_state()
const {
455 BOOST_THROW_EXCEPTION(client::exception::illegal_monitor_state(
"fenced_lock", (boost::format(
456 "Current thread is not owner of the Lock[%1%]") %get_name()).str()));
/** Sends a fencedlock_lock request for the given session, thread and invocation uid; the reply is decoded as int64_t. */
459 boost::future<int64_t>
460 fenced_lock::do_lock(int64_t session_id, int64_t thread_id, boost::uuids::uuid invocation_uid) {
461 auto request = client::protocol::codec::fencedlock_lock_encode(group_id_, object_name_, session_id, thread_id, invocation_uid);
462 return invoke_and_get_future<int64_t>(request);
/**
 * Sends a fencedlock_trylock request for the given session, thread and
 * invocation uid; the timeout argument is passed through a duration_cast to
 * milliseconds. The reply is decoded as int64_t.
 */
465 boost::future<int64_t>
466 fenced_lock::do_try_lock(int64_t session_id, int64_t thread_id, boost::uuids::uuid invocation_uid,
467 std::chrono::milliseconds timeout) {
468 auto request = client::protocol::codec::fencedlock_trylock_encode(group_id_, object_name_, session_id,
469 thread_id, invocation_uid,
470 std::chrono::duration_cast<std::chrono::milliseconds>(
472 return invoke_and_get_future<int64_t>(request);
/** Sends a fencedlock_unlock request for the given session, thread and invocation uid; the reply is decoded as bool. */
476 fenced_lock::do_unlock(int64_t session_id, int64_t thread_id, boost::uuids::uuid invocation_uid) {
477 auto request = client::protocol::codec::fencedlock_unlock_encode(group_id_, object_name_, session_id, thread_id, invocation_uid);
478 return invoke_and_get_future<bool>(request);
/**
 * Sends a fencedlock_getlockownership request and unpacks the reply into a
 * lock_ownership_state. Fields are read from the message in wire order:
 * fence (int64), lock_count (int32), session_id (int64), thread_id (int64).
 */
481 boost::future<fenced_lock::lock_ownership_state> fenced_lock::do_get_lock_ownership_state(){
482 auto request = client::protocol::codec::fencedlock_getlockownership_encode(group_id_, object_name_);
483 return invoke(request).then(boost::launch::sync, [](boost::future<client::protocol::ClientMessage> f) {
485 fenced_lock::lock_ownership_state state;
486 state.fence = msg.get_first_fixed_sized_field<int64_t>();
487 state.lock_count = msg.get<int32_t>();
488 state.session_id = msg.get<int64_t>();
489 state.thread_id = msg.get<int64_t>();
/** Forwards to session_manager_.invalidate_session() for this proxy's group. */
494 void fenced_lock::invalidate_session(int64_t session_id) {
495 session_manager_.invalidate_session(group_id_, session_id);
/** Tries to lock without a timeout argument; true when the obtained fence differs from INVALID_FENCE. */
498 boost::future<bool> fenced_lock::try_lock() {
499 return try_lock_and_get_fence().then(boost::launch::sync, [](boost::future<int64_t> f) {
500 return f.get() != INVALID_FENCE;
/** Tries to lock with the given timeout; true when the obtained fence differs from INVALID_FENCE. */
504 boost::future<bool> fenced_lock::try_lock(std::chrono::milliseconds timeout) {
505 return try_lock_and_get_fence(timeout).then(boost::launch::sync, [](boost::future<int64_t> f) {
506 return f.get() != INVALID_FENCE;
/** Convenience overload: delegates to try_lock_and_get_fence() with a zero-millisecond timeout. */
510 boost::future<int64_t> fenced_lock::try_lock_and_get_fence() {
511 return try_lock_and_get_fence(std::chrono::milliseconds(0));
514 boost::future<int64_t>
515 fenced_lock::try_lock_and_get_fence(std::chrono::milliseconds timeout) {
516 auto thread_id = util::get_current_thread_id();
517 auto invocation_uid = get_context().random_uuid();
519 auto do_try_lock_once = [=] () {
520 using namespace std::chrono;
521 auto start = steady_clock::now();
522 auto session_id = session_manager_.acquire_session(group_id_);
523 verify_locked_session_id_if_present(thread_id, session_id,
true);
524 return do_try_lock(session_id, thread_id, invocation_uid, timeout).then(boost::launch::sync,
525 [=](boost::future<int64_t> f) {
527 auto fence = f.get();
530 locked_session_ids_.put(
532 std::make_shared<int64_t>(
534 return std::make_pair(
541 return std::make_pair(
544 client::exception::session_expired &) {
545 invalidate_session(session_id);
546 verify_no_locked_session_id_present(thread_id);
547 auto timeout_left = timeout - (steady_clock::now() - start);
548 if (timeout_left.count() <= 0) {
549 return std::make_pair(INVALID_FENCE,
false);
551 return std::make_pair(INVALID_FENCE,
false);
552 }
catch (client::exception::wait_key_cancelled &) {
553 release_session(session_id);
554 return std::make_pair(INVALID_FENCE,
false);
556 release_session(session_id);
562 return do_try_lock_once().then(boost::launch::sync, [=](boost::future<std::pair<int64_t, bool>> f) {
563 auto result = f.get();
564 if (!result.second) {
568 for (result = do_try_lock_once().get(); result.second;) {}
/**
 * Releases the lock held by the calling thread.
 *
 * The cached session id is verified first; with NO_SESSION_ID the cache entry is
 * dropped and illegal_monitor_state is thrown. After the unlock reply, the cache
 * entry is kept when the thread still holds the lock (reentrant) and removed
 * otherwise. session_expired invalidates the session, drops the cache entry and
 * raises lock_ownership_lost; illegal_monitor_state drops the cache entry.
 */
573 boost::future<void> fenced_lock::unlock() {
574 auto thread_id = util::get_current_thread_id();
575 int64_t session_id = session_manager_.get_session(group_id_);
578 verify_locked_session_id_if_present(thread_id, session_id,
false);
579 if (session_id == internal::session::proxy_session_manager::NO_SESSION_ID) {
580 locked_session_ids_.remove(thread_id);
581 throw_illegal_monitor_state();
584 return do_unlock(session_id, thread_id, get_context().random_uuid()).then(boost::launch::sync,
585 [=](boost::future<bool> f) {
587 auto still_locked_by_current_thread = f.get();
588 if (still_locked_by_current_thread) {
589 locked_session_ids_.put(
591 std::make_shared<int64_t>(
594 locked_session_ids_.remove(
601 client::exception::session_expired &) {
602 invalidate_session(session_id);
603 locked_session_ids_.remove(thread_id);
605 throw_lock_ownership_lost(session_id);
606 }
catch (client::exception::illegal_monitor_state &) {
607 locked_session_ids_.remove(thread_id);
/**
 * Returns the fence of the lock when it is held by the calling thread under the
 * current session.
 *
 * With NO_SESSION_ID the cache entry is dropped and illegal_monitor_state is
 * thrown. When the server-side ownership matches, the session id is cached and
 * the fence returned; otherwise the cache is verified to be empty and
 * illegal_monitor_state is thrown.
 */
613 boost::future<int64_t> fenced_lock::get_fence() {
614 auto thread_id = util::get_current_thread_id();
615 int64_t session_id = session_manager_.get_session(group_id_);
618 verify_locked_session_id_if_present(thread_id, session_id,
false);
619 if (session_id == internal::session::proxy_session_manager::NO_SESSION_ID) {
620 locked_session_ids_.remove(thread_id);
621 throw_illegal_monitor_state();
624 return do_get_lock_ownership_state().then(boost::launch::sync, [=](boost::future<lock_ownership_state> f) {
625 auto ownership = f.get();
626 if (ownership.is_locked_by(session_id, thread_id)) {
627 locked_session_ids_.put(thread_id, std::make_shared<int64_t>(session_id));
628 return ownership.fence;
631 verify_no_locked_session_id_present(thread_id);
632 throw_illegal_monitor_state();
633 return INVALID_FENCE;
/**
 * Reports whether the lock is currently held by anyone.
 *
 * The server-side ownership state is fetched; when it matches the calling
 * thread and session the session id is cached, otherwise the cache is verified
 * to be empty and ownership.is_locked() is returned.
 */
637 boost::future<bool> fenced_lock::is_locked() {
638 auto thread_id = util::get_current_thread_id();
639 int64_t session_id = session_manager_.get_session(group_id_);
641 verify_locked_session_id_if_present(thread_id, session_id,
false);
643 return do_get_lock_ownership_state().then(boost::launch::sync, [=](boost::future<lock_ownership_state> f) {
644 auto ownership = f.get();
645 if (ownership.is_locked_by(session_id, thread_id)) {
646 locked_session_ids_.put(thread_id, std::make_shared<int64_t>(session_id));
650 verify_no_locked_session_id_present(thread_id);
651 return ownership.is_locked();
/**
 * Reports whether the lock is held by the calling thread under the current
 * session, refreshing the cached session id (or verifying that none is cached)
 * from the server-side ownership state.
 */
655 boost::future<bool> fenced_lock::is_locked_by_current_thread() {
656 auto thread_id = util::get_current_thread_id();
657 int64_t session_id = session_manager_.get_session(group_id_);
659 verify_locked_session_id_if_present(thread_id, session_id,
false);
661 return do_get_lock_ownership_state().then(boost::launch::sync, [=](boost::future<lock_ownership_state> f) {
662 auto ownership = f.get();
663 auto locked_by_current_thread = ownership.is_locked_by(session_id, thread_id);
664 if (locked_by_current_thread) {
665 locked_session_ids_.put(thread_id, std::make_shared<int64_t>(session_id));
667 verify_no_locked_session_id_present(thread_id);
670 return locked_by_current_thread;
/**
 * Returns the lock_count from the server-side ownership state, refreshing the
 * cached session id (or verifying that none is cached) along the way.
 */
674 boost::future<int32_t> fenced_lock::get_lock_count() {
675 auto thread_id = util::get_current_thread_id();
676 int64_t session_id = session_manager_.get_session(group_id_);
678 verify_locked_session_id_if_present(thread_id, session_id,
false);
680 return do_get_lock_ownership_state().then(boost::launch::sync, [=](boost::future<lock_ownership_state> f) {
681 auto ownership = f.get();
682 if (ownership.is_locked_by(session_id, thread_id)) {
683 locked_session_ids_.put(thread_id, std::make_shared<int64_t>(session_id));
685 verify_no_locked_session_id_present(thread_id);
688 return ownership.lock_count;
/** Accessor returning a const reference to a raft_group_id (presumably the lock's group_id_). */
692 const raft_group_id &fenced_lock::get_group_id() {
/** Two fenced_lock proxies compare equal when both their service name and their proxy name match. */
696 bool operator==(
const fenced_lock &lhs,
const fenced_lock &rhs) {
697 return lhs.get_service_name() == rhs.get_service_name() && lhs.get_name() == rhs.get_name();
/** Runs ClientProxy::post_destroy() and then clears the per-thread cache of locked session ids. */
700 void fenced_lock::post_destroy() {
701 ClientProxy::post_destroy();
702 locked_session_ids_.clear();
/** Forwards the proxy identity to cp_proxy and initialises session_manager_ from the supplied manager. */
705 session_aware_proxy::session_aware_proxy(
const std::string &service_name,
const std::string &proxy_name,
706 client::spi::ClientContext *context,
const raft_group_id &group_id,
707 const std::string &object_name,
708 internal::session::proxy_session_manager &session_manager) : cp_proxy(
709 service_name, proxy_name, context, group_id, object_name), session_manager_(session_manager) {}
712 session_manager_.release_session(group_id_, session_id);
/** True when the state is locked and both session_id and thread_id equal the given values. */
715 bool fenced_lock::lock_ownership_state::is_locked_by(int64_t session, int64_t thread) {
716 return is_locked() && session_id == session && thread_id == thread;
/** True when fence differs from INVALID_FENCE. */
719 bool fenced_lock::lock_ownership_state::is_locked() {
720 return fence != INVALID_FENCE;
/** Base constructor of the semaphore proxies; delegates to session_aware_proxy with SERVICE_NAME. */
723 counting_semaphore::counting_semaphore(
const std::string &proxy_name, client::spi::ClientContext *context,
724 const raft_group_id &group_id,
const std::string &object_name,
725 internal::session::proxy_session_manager &session_manager)
726 : session_aware_proxy(SERVICE_NAME,
729 object_name, session_manager) {}
/**
 * Validates permits with Preconditions::check_not_negative, sends a
 * semaphore_init request and yields the bool in the reply's first fixed-size field.
 */
731 boost::future<bool> counting_semaphore::init(int32_t permits) {
732 util::Preconditions::check_not_negative(permits,
"Permits must be non-negative!");
734 auto request = client::protocol::codec::semaphore_init_encode(group_id_, object_name_, permits);
735 return client::spi::impl::ClientInvocation::create(context_, request, object_name_)->invoke().then(
736 boost::launch::sync, [](boost::future<client::protocol::ClientMessage> f) {
737 return f.get().get_first_fixed_sized_field<bool>();
/** Non-waiting acquire: delegates to try_acquire_for() with a zero timeout. */
741 boost::future<bool> counting_semaphore::try_acquire(int32_t permits) {
742 return try_acquire_for(std::chrono::milliseconds::zero(), permits);
/** Clamps a negative rel_time to zero and delegates to try_acquire_for_millis(). */
745 boost::future<bool> counting_semaphore::try_acquire_for(std::chrono::milliseconds rel_time, int32_t permits) {
746 if (rel_time < std::chrono::milliseconds::zero()) {
747 rel_time = std::chrono::milliseconds ::zero();
749 return try_acquire_for_millis(permits, rel_time);
/**
 * Sends a semaphore_release request for the given session, thread and permit
 * count under a fresh invocation uid; the reply is discarded via to_void_future().
 */
752 boost::future<void> counting_semaphore::do_release(int32_t permits, int64_t thread_id, int64_t session_id) {
753 auto invocation_uid = get_context().get_hazelcast_client_implementation()->random_uuid();
754 auto request = codec::semaphore_release_encode(group_id_, object_name_, session_id, thread_id, invocation_uid, permits);
755 return to_void_future(
756 client::spi::impl::ClientInvocation::create(context_, request, object_name_)->invoke());
/** Sends a semaphore_availablepermits request; the reply is decoded as int32_t. */
759 boost::future<int32_t> counting_semaphore::available_permits() {
760 auto request = codec::semaphore_availablepermits_encode(group_id_, object_name_);
761 return decode<int32_t>(
762 client::spi::impl::ClientInvocation::create(context_, request, object_name_)->invoke());
/**
 * Validates reduction with Preconditions::check_not_negative. A reduction of 0
 * completes immediately without a remote call; otherwise do_change_permits()
 * is called with the negated value.
 */
765 boost::future<void> counting_semaphore::reduce_permits(int32_t reduction) {
766 util::Preconditions::check_not_negative(reduction,
"Reduction must be non-negative!");
767 if (reduction == 0) {
768 return boost::make_ready_future();
770 return do_change_permits(-reduction);
/**
 * Validates increase with Preconditions::check_not_negative and changes the
 * permit count by that amount through do_change_permits(); a ready future is
 * returned on the short-circuit path.
 */
773 boost::future<void> counting_semaphore::increase_permits(int32_t increase) {
774 util::Preconditions::check_not_negative(increase,
"Increase must be non-negative!");
776 return boost::make_ready_future();
778 return do_change_permits(increase);
/** Forwards all arguments unchanged to the counting_semaphore constructor. */
781 sessionless_semaphore::sessionless_semaphore(
const std::string &proxy_name, client::spi::ClientContext *context,
782 const raft_group_id &group_id,
const std::string &object_name,
783 internal::session::proxy_session_manager &session_manager)
784 : counting_semaphore(proxy_name, context, group_id, object_name, session_manager) {}
/**
 * Validates permits with Preconditions::check_positive and acquires them via
 * do_try_acquire() with a timeout of -1 ms, discarding the boolean result.
 */
786 boost::future<void> sessionless_semaphore::acquire(int32_t permits) {
787 util::Preconditions::check_positive(permits,
"permits must be positive number.");
789 return to_void_future(do_try_acquire(permits, std::chrono::milliseconds(-1)));
/**
 * Sends a semaphore_acquire request with NO_SESSION_ID, the cluster-wide thread
 * id from get_thread_id(), a fresh invocation uid, the permit count and
 * timeout_ms.count(). The continuation yields the bool in the reply's first
 * fixed-size field; wait_key_cancelled is translated into illegal_state.
 */
793 sessionless_semaphore::do_try_acquire(int32_t permits, std::chrono::milliseconds timeout_ms) {
794 auto cluster_wide_threadId = get_thread_id();
795 auto invocation_uid = get_context().get_hazelcast_client_implementation()->random_uuid();
796 auto request = client::protocol::codec::semaphore_acquire_encode(group_id_, object_name_, internal::session::proxy_session_manager::NO_SESSION_ID, cluster_wide_threadId, invocation_uid, permits, timeout_ms.count());
797 return client::spi::impl::ClientInvocation::create(context_, request, object_name_)->invoke().then(
798 boost::launch::sync, [=](boost::future<client::protocol::ClientMessage> f) {
800 return f.get().get_first_fixed_sized_field<bool>();
801 }
catch (client::exception::wait_key_cancelled &) {
802 throw client::exception::illegal_state(
"sessionless_semaphore::acquire",
// NOTE(review): the message below contains a stray " ]" after the placeholder.
804 "Semaphore[%1%] ] not acquired because the acquire call on the CP group is cancelled, possibly because of another indeterminate call from the same thread.") %
805 object_name_).str());
/** Validates permits with Preconditions::check_positive and calls do_try_acquire() with the timeout clamped to be non-negative. */
810 boost::future<bool> sessionless_semaphore::try_acquire_for_millis(int32_t permits, std::chrono::milliseconds timeout) {
811 util::Preconditions::check_positive(permits,
"Permits must be positive!");
813 return do_try_acquire(permits, timeout > std::chrono::milliseconds::zero() ? timeout : std::chrono::milliseconds::zero());
/** Validates permits with Preconditions::check_positive and releases them via do_release() with NO_SESSION_ID. */
816 boost::future<void> sessionless_semaphore::release(int32_t permits) {
817 util::Preconditions::check_positive(permits,
"Permits must be positive!");
818 auto thread_id = get_thread_id();
819 return do_release(permits, thread_id, internal::session::proxy_session_manager::NO_SESSION_ID);
/** Returns the id obtained from session_manager_.get_or_create_unique_thread_id() for this group. */
822 int64_t sessionless_semaphore::get_thread_id() {
823 return session_manager_.get_or_create_unique_thread_id(group_id_);
/**
 * Sends a semaphore_drain request with NO_SESSION_ID, the cluster-wide thread id
 * and a fresh invocation uid; the reply is decoded as int32_t.
 */
826 boost::future<int32_t> sessionless_semaphore::drain_permits() {
827 auto cluster_wide_threadId = get_thread_id();
828 auto invocation_uid = get_context().get_hazelcast_client_implementation()->random_uuid();
829 auto request = client::protocol::codec::semaphore_drain_encode(group_id_, object_name_, internal::session::proxy_session_manager::NO_SESSION_ID, cluster_wide_threadId, invocation_uid);
830 return decode<int32_t>(
831 client::spi::impl::ClientInvocation::create(context_, request, object_name_)->invoke());
/**
 * Sends a semaphore_change request with NO_SESSION_ID, the cluster-wide thread
 * id, a fresh invocation uid and the signed delta; the reply is discarded.
 */
834 boost::future<void> sessionless_semaphore::do_change_permits(int32_t delta) {
835 auto cluster_wide_threadId = get_thread_id();
836 auto invocation_uid = get_context().get_hazelcast_client_implementation()->random_uuid();
837 auto request = client::protocol::codec::semaphore_change_encode(group_id_, object_name_, internal::session::proxy_session_manager::NO_SESSION_ID, cluster_wide_threadId, invocation_uid, delta);
838 return to_void_future(
839 client::spi::impl::ClientInvocation::create(context_, request, object_name_)->invoke());
/** Forwards all arguments unchanged to the counting_semaphore constructor. */
842 session_semaphore::session_semaphore(
const std::string &proxy_name, client::spi::ClientContext *context,
843 const raft_group_id &group_id,
const std::string &object_name,
844 internal::session::proxy_session_manager &session_manager)
845 : counting_semaphore(proxy_name, context, group_id, object_name, session_manager) {}
// Acquires `permits` permits by calling try_acquire_for_millis() with a
// timeout of -1 ms and converting the result to a void future.
// A negative timeout makes `use_timeout` false in try_acquire_for_millis(),
// so the timeout check there is not applied.
847 boost::future<void> session_semaphore::acquire(int32_t permits) {
848 return to_void_future(try_acquire_for_millis(permits, std::chrono::milliseconds(-1)));
// Tries to acquire `permits` permits under a session obtained from
// session_manager_, retrying when the session expires.
// Each attempt yields a pair: `first` is the acquired flag read from the
// response, `second` tells the caller whether another attempt is needed.
// NOTE(review): several lines of this function (return type, `try {` opener,
// some arguments and closing braces) are not visible in this view.
852 session_semaphore::try_acquire_for_millis(int32_t permits, std::chrono::milliseconds timeout) {
853 util::Preconditions::check_not_negative(permits,
"permits must not be negative number.");
// Computed once and captured by copy, so every retry reuses the same
// thread id and invocation uid.
855 auto thread_id = get_thread_id();
856 auto invocation_uid = get_context().get_hazelcast_client_implementation()->random_uuid();
// One acquire attempt: takes a session, sends the request, interprets the reply.
858 auto do_try_acquire_once = ([=] () {
// NOTE(review): `start` is sampled on every attempt, so the elapsed time
// checked below covers only the current attempt — confirm this is intended.
859 auto start = std::chrono::steady_clock::now();
// A negative timeout disables the timeout check.
860 auto use_timeout = timeout >= std::chrono::milliseconds::zero();
861 auto session_id = session_manager_.acquire_session(group_id_, permits);
862 auto request = client::protocol::codec::semaphore_acquire_encode(group_id_, object_name_,
864 thread_id, invocation_uid, permits,
// The continuation runs with boost::launch::sync.
866 return client::spi::impl::ClientInvocation::create(context_, request, object_name_)->invoke().then(
867 boost::launch::sync, [=](boost::future<client::protocol::ClientMessage> f) {
869 auto acquired = f.get().get_first_fixed_sized_field<bool>();
// The condition guarding this release is not visible here; presumably it
// applies when the permits were not acquired — TODO confirm.
871 session_manager_.release_session(group_id_, session_id);
// Normal completion: report the outcome, no retry.
874 return std::make_pair(acquired,
false);
875 }
// Session expired: drop the stale session, then either give up (timeout in
// use and already consumed) or ask the caller to retry.
catch (client::exception::session_expired &) {
876 session_manager_.invalidate_session(group_id_, session_id);
877 if (use_timeout && (timeout - (std::chrono::steady_clock::now() - start) <=
878 std::chrono::milliseconds::zero())) {
879 return std::make_pair(
false,
false);
881 return std::make_pair(
false,
true);
882 }
// Wait key cancelled: give back the session acquire count for `permits`
// and report the failure as illegal_state.
catch (client::exception::wait_key_cancelled &) {
883 session_manager_.release_session(group_id_, session_id, permits);
885 BOOST_THROW_EXCEPTION(
886 client::exception::illegal_state(
887 "session_semaphore::try_acquire_for_millis", (boost::format(
888 "Semaphore[%1%] not acquired because the acquire call on the CP group is cancelled, possibly because of another indeterminate call from the same thread.") %
889 object_name_).str()));
// The handler this return belongs to is not visible in this view.
891 return std::make_pair(
false,
false);
// Run the first attempt asynchronously; the continuation handles retries.
896 return do_try_acquire_once().then(boost::launch::sync, [=](boost::future<std::pair<bool, bool>> f) {
897 auto result = f.get();
898 if (!result.second) {
// Retry loop: repeats while the attempt asks for a retry. Each retry
// blocks on .get() inside this continuation.
901 for (; result.second; result = do_try_acquire_once().get());
// Releases `permits` permits using the current session of this group.
// Validates that `permits` is positive; if no session exists, reports an
// illegal state via throw_illegal_state_exception(nullptr).
// NOTE(review): the `try {` opener and closing braces of this function are
// not visible in this view.
906 boost::future<void> session_semaphore::release(int32_t permits) {
907 util::Preconditions::check_positive(permits,
"Permits must be positive!");
908 auto session_id = session_manager_.get_session(group_id_);
909 if (session_id == internal::session::proxy_session_manager::NO_SESSION_ID) {
910 throw_illegal_state_exception(
nullptr);
913 auto thread_id = get_thread_id();
// NOTE(review): unlike the other continuations in this file, this .then()
// is called without an explicit launch policy — confirm this is intended.
914 return do_release(permits, thread_id, session_id).then([=] (boost::future<void> f) {
// Give back the session acquire count for the released permits.
917 session_manager_.release_session(group_id_, session_id, permits);
918 }
// Session expired: invalidate it, still give back the acquire count, and
// surface the failure as an illegal state carrying the current exception.
catch (client::exception::session_expired &) {
919 session_manager_.invalidate_session(group_id_, session_id);
920 session_manager_.release_session(group_id_, session_id, permits);
921 throw_illegal_state_exception(std::current_exception());
// Raises an illegal_state error with the message "No valid session!".
// `e` is an optional causing exception; callers in this file pass either
// nullptr or std::current_exception().
// NOTE(review): the control flow between the statements below is not visible
// in this view. Presumably `e` is rethrown inside a try block so that
// std::throw_with_nested() can nest it under `ise` — TODO confirm.
926 void session_semaphore::throw_illegal_state_exception(std::exception_ptr e) {
927 auto ise = boost::enable_current_exception(
928 client::exception::illegal_state(
"session_semaphore::illegal_state",
929 "No valid session!"));
934 std::rethrow_exception(e);
936 std::throw_with_nested(ise);
// Returns the id of the calling thread as reported by
// util::get_current_thread_id().
940 int64_t session_semaphore::get_thread_id() {
941 return util::get_current_thread_id();
// Sends a semaphore "drain" request and returns the int32_t count read from
// the reply, retrying while an attempt yields -1.
// NOTE(review): the line that obtains `session_id`, the `try {` opener, some
// arguments and the closing braces are not visible in this view.
944 boost::future<int32_t> session_semaphore::drain_permits() {
// Computed once and captured by copy, so every retry reuses the same
// thread id and invocation uid.
945 auto thread_id = get_thread_id();
946 auto invocation_uid = get_context().get_hazelcast_client_implementation()->random_uuid();
// One drain attempt: sends the request and interprets the reply.
948 auto do_drain_once = ([=] () {
950 auto request = client::protocol::codec::semaphore_drain_encode(group_id_, object_name_,
952 thread_id, invocation_uid);
// The continuation runs with boost::launch::sync.
953 return client::spi::impl::ClientInvocation::create(context_, request, object_name_)->invoke().then(
954 boost::launch::sync, [=](boost::future<client::protocol::ClientMessage> f) {
956 auto count = f.get().get_first_fixed_sized_field<int32_t>();
// Adjusts the session acquire count; the remaining arguments of this
// call are not visible here.
957 session_manager_.release_session(group_id_, session_id,
960 }
// Session expired: drop the stale session. Presumably this path yields the
// -1 sentinel that triggers a retry below — TODO confirm.
catch (client::exception::session_expired &) {
961 session_manager_.invalidate_session(group_id_, session_id);
// Run the first attempt asynchronously; the continuation handles retries.
967 return do_drain_once().then(boost::launch::sync, [=](boost::future<int32_t> f) {
968 int32_t count = f.get();
// Retry loop: repeats while an attempt returns -1. Each retry blocks on
// .get() inside this continuation.
972 while ((count = do_drain_once().get()) == -1) {}
// Sends a semaphore "change" request carrying `delta` under a session.
// NOTE(review): the line that obtains `session_id`, the `try {` opener, some
// arguments and the closing braces are not visible in this view.
977 boost::future<void> session_semaphore::do_change_permits(int32_t delta) {
979 auto thread_id = get_thread_id();
// A fresh random UUID identifies this invocation.
980 auto invocation_uid = get_context().get_hazelcast_client_implementation()->random_uuid();
982 auto request = client::protocol::codec::semaphore_change_encode(group_id_, object_name_,
984 thread_id, invocation_uid, delta);
// The continuation runs with boost::launch::sync.
985 return client::spi::impl::ClientInvocation::create(context_, request, object_name_)->invoke().then(
986 boost::launch::sync, [=](boost::future<client::protocol::ClientMessage> f) {
// Give the session back to the session manager.
989 session_manager_.release_session(group_id_, session_id);
990 }
// Session expired: invalidate it and surface the failure as an illegal
// state carrying the current exception.
catch (client::exception::session_expired &) {
991 session_manager_.invalidate_session(group_id_, session_id);
992 throw_illegal_state_exception(std::current_exception());
// Builds a hash value for a raft group id: starts from a zero seed and mixes
// in the `name`, `seed` and `group_id` members with boost::hash_combine.
// NOTE(review): the enclosing function header and its return statement are
// not visible in this view.
1002 std::size_t seed = 0;
1003 boost::hash_combine(seed, group_id.name);
1004 boost::hash_combine(seed, group_id.seed);
1005 boost::hash_combine(seed, group_id.group_id);
Client-side Raft-based proxy implementation of atomic long.
boost::future< int64_t > get_and_add(int64_t delta)
Atomically adds the given value to the current value.
boost::future< int64_t > get()
Gets the current value.
boost::future< int64_t > decrement_and_get()
Atomically decrements the current value by one.
boost::future< int64_t > get_and_increment()
Atomically increments the current value by one.
boost::future< bool > compare_and_set(int64_t expect, int64_t update)
Atomically sets the value to the given updated value only if the current value equals the expected value.
boost::future< int64_t > get_and_set(int64_t new_value)
Atomically sets the given value and returns the old value.
boost::future< void > set(int64_t new_value)
Atomically sets the given value.
boost::future< int64_t > increment_and_get()
Atomically increments the current value by one.
boost::future< int64_t > get_and_decrement()
Atomically decrements the current value by one.
boost::future< int64_t > add_and_get(int64_t delta)
Atomically adds the given value to the current value.
void release_session(int64_t session_id)
Decrements acquire count of the session.
static constexpr int32_t DRAIN_SESSION_ACQ_COUNT
Since a proxy does not know how many permits will be drained on the Raft group, it uses this constant to increment its local session acquire count before the drain request is sent.