17 #include <boost/algorithm/string.hpp>
19 #include "hazelcast/cp/cp.h"
20 #include "hazelcast/client/spi/ClientContext.h"
21 #include "hazelcast/util/Preconditions.h"
22 #include "hazelcast/client/protocol/codec/codecs.h"
23 #include "hazelcast/client/protocol/ClientMessage.h"
24 #include "hazelcast/client/spi/impl/ClientInvocation.h"
25 #include "hazelcast/client/impl/hazelcast_client_instance_impl.h"
29 using namespace hazelcast::client::protocol;
30 using namespace hazelcast::client::protocol::codec;
31 using namespace hazelcast::util;
33 raft_proxy_factory::raft_proxy_factory(client::spi::ClientContext &context) : context_(context) {}
// Returns a fenced_lock proxy for proxy_name, using lock_proxies_ as a cache
// and creating/publishing a new proxy when needed.
// NOTE(review): several lines of this function (loop/branch headers and the
// return statements) are not visible in this excerpt; the comments below only
// describe the statements that are shown.
35 std::shared_ptr<fenced_lock>
36 raft_proxy_factory::create_fenced_lock(raft_group_id &&group_id,
const std::string &proxy_name,
37 const std::string &object_name) {
39 auto proxy = lock_proxies_.get(proxy_name);
// A cached proxy whose group id differs from the requested one is evicted.
41 if (proxy->get_group_id() != group_id) {
42 lock_proxies_.remove(proxy_name, proxy);
// Build a fresh proxy and try to register it; `existing` is whatever
// put_if_absent reports back (presumably the proxy already registered
// under this name, if any — confirm against the map implementation).
48 proxy = std::make_shared<fenced_lock>(proxy_name, context_, group_id, object_name);
49 auto existing = lock_proxies_.put_if_absent(proxy_name, proxy);
52 }
else if (existing->get_group_id() == group_id) {
// Re-resolve the group id from the cluster; .get() blocks on the future.
56 group_id = get_group_id(proxy_name, object_name).get();
// Creates a counting_semaphore proxy. A semaphore_getsemaphoretype request is
// sent for proxy_name and the call blocks on its reply; the first fixed-size
// field of the reply is read as a bool into is_sessionless.
// NOTE(review): the branch choosing between the two return statements is not
// visible in this excerpt — presumably it tests is_sessionless.
60 std::shared_ptr<counting_semaphore>
61 raft_proxy_factory::create_semaphore(raft_group_id &&group_id,
const std::string &proxy_name,
62 const std::string &object_name) {
63 auto request = client::protocol::codec::semaphore_getsemaphoretype_encode(proxy_name);
64 auto is_sessionless = client::spi::impl::ClientInvocation::create(context_, request,
65 object_name)->invoke().get().get_first_fixed_sized_field<
bool>();
// Both concrete types receive the context's proxy session manager.
67 return std::shared_ptr<counting_semaphore>(
68 new sessionless_semaphore(proxy_name, &context_, group_id, object_name,
69 context_.get_proxy_session_manager()));
71 return std::shared_ptr<counting_semaphore>(
72 new session_semaphore(proxy_name, &context_, group_id, object_name,
73 context_.get_proxy_session_manager()));
// Drops the "@<group>" suffix from a CP object name when <group>, after
// trimming, equals DEFAULT_GROUP_NAME.
// NOTE(review): the declaration of `name` (presumably derived from `n`) and
// the remaining return statements are not visible in this excerpt.
77 std::string raft_proxy_factory::without_default_group_name(
const std::string &n) {
80 auto index = name.find(
'@');
81 if (index == std::string::npos) {
// Reject names that contain a second '@' after the first one.
85 Preconditions::check_true(name.find(
'@', index + 1) == std::string::npos,
86 "Custom group name must be specified at most once");
// Everything after '@' is the group name; surrounding whitespace is ignored.
88 auto group_name = name.substr(index + 1);
89 boost::trim(group_name);
90 if (group_name == DEFAULT_GROUP_NAME) {
91 return name.substr(0, index);
// Extracts the object-name part (the text before '@') from a CP proxy name
// and validates it.
// NOTE(review): the return statements are not visible in this excerpt.
96 std::string raft_proxy_factory::object_name_for_proxy(
const std::string &name) {
97 auto index = name.find(
'@');
98 if (index == std::string::npos) {
// '@' must not be the last character of the name.
102 Preconditions::check_true(index < (name.size() - 1),
103 "Object name cannot be empty string");
// Only a single '@' separator is accepted.
104 Preconditions::check_true(name.find(
'@', index + 1) == std::string::npos,
105 "Custom CP group name must be specified at most once");
// The trimmed text before '@' must be non-empty.
107 auto object_name = name.substr(0, index);
108 boost::trim(object_name);
109 Preconditions::check_true(object_name.size() > 0,
110 "Object name cannot be empty string");
// Sends a cpgroup_createcpgroup request for proxy_name and returns a future
// of the raft_group_id decoded from the reply's first variable-size field.
// The decoding continuation is attached with boost::launch::sync.
114 boost::future<raft_group_id> raft_proxy_factory::get_group_id(
const std::string &proxy_name,
const std::string &object_name) {
115 auto request = cpgroup_createcpgroup_encode(proxy_name);
116 return client::spi::impl::ClientInvocation::create(context_, request, object_name)->invoke().then(
117 boost::launch::sync, [](boost::future<client::protocol::ClientMessage> f) {
// f.get() rethrows any invocation failure to the caller of the future.
118 return *f.get().get_first_var_sized_field<raft_group_id>();
// Stores the client context and constructs the proxy factory from it.
122 cp_subsystem::cp_subsystem(client::spi::ClientContext &context) : context_(context), proxy_factory_(context) {
// Returns a future of the atomic_long proxy for `name`; the work is delegated
// to proxy_factory_.create_proxy<atomic_long>.
125 boost::future<std::shared_ptr<atomic_long>> cp_subsystem::get_atomic_long(
const std::string &name) {
126 return proxy_factory_.create_proxy<
atomic_long>(name);
// Returns a future of the atomic_reference proxy for `name`; the work is
// delegated to proxy_factory_.create_proxy<atomic_reference>.
129 boost::future<std::shared_ptr<atomic_reference>> cp_subsystem::get_atomic_reference(
const std::string &name) {
130 return proxy_factory_.create_proxy<atomic_reference>(name);
// Returns a future of the latch proxy for `name`; the work is delegated to
// proxy_factory_.create_proxy<latch>.
133 boost::future<std::shared_ptr<latch>> cp_subsystem::get_latch(
const std::string &name) {
134 return proxy_factory_.create_proxy<latch>(name);
// Returns a future of the fenced_lock proxy for `name`; the work is delegated
// to proxy_factory_.create_proxy<fenced_lock>.
137 boost::future<std::shared_ptr<fenced_lock>> cp_subsystem::get_lock(
const std::string &name) {
138 return proxy_factory_.create_proxy<fenced_lock>(name);
// Returns a future of the counting_semaphore proxy for `name`; the work is
// delegated to proxy_factory_.create_proxy<counting_semaphore>.
141 boost::future<std::shared_ptr<counting_semaphore>> cp_subsystem::get_semaphore(
const std::string &name) {
142 return proxy_factory_.create_proxy<counting_semaphore>(name);
// Base constructor shared by all CP proxies: forwards to ProxyImpl and stores
// the object name in object_name_.
// NOTE(review): part of the member-initializer list (the remaining ProxyImpl
// arguments and, presumably, group_id_) is not visible in this excerpt.
145 cp_proxy::cp_proxy(
const std::string &service_name,
const std::string &proxy_name,
146 client::spi::ClientContext *context,
147 const raft_group_id &group_id,
const std::string &object_name) : ProxyImpl(service_name,
151 object_name_(object_name) {}
// Destroy hook: sends a cpgroup_destroycpobject request for this object
// (group id, service name, object name) and blocks until the reply arrives.
153 void cp_proxy::on_destroy() {
154 auto request = cpgroup_destroycpobject_encode(group_id_, get_service_name(), object_name_);
155 invoke(request).get();
// Const accessor returning a reference to a raft_group_id.
// NOTE(review): the body is not visible in this excerpt — presumably it
// returns group_id_.
158 const raft_group_id &cp_proxy::get_group_id()
const {
162 atomic_long::atomic_long(
const std::string &name, client::spi::ClientContext &context,
163 const raft_group_id &group_id,
const std::string &object_name)
164 : cp_proxy(SERVICE_NAME, name, &context, group_id, object_name) {}
// NOTE(review): the lines below are the bodies of several atomic_long member
// functions whose signatures and closing braces are not visible in this
// excerpt. Each body encodes one request for (group_id_, object_name_) and
// returns the future produced by invoke_and_get_future.

// atomiclong_addandget with `delta`; result decoded as int64_t.
167 auto request = atomiclong_addandget_encode(group_id_, object_name_, delta);
168 return invoke_and_get_future<int64_t>(request);
// atomiclong_compareandset with `expect`/`update`; result decoded as bool.
172 auto request = atomiclong_compareandset_encode(group_id_, object_name_, expect, update);
173 return invoke_and_get_future<bool>(request);
// atomiclong_get; result decoded as int64_t.
185 auto request = atomiclong_get_encode(group_id_, object_name_);
186 return invoke_and_get_future<int64_t>(request);
// atomiclong_getandadd with `delta`; result decoded as int64_t.
190 auto request = atomiclong_getandadd_encode(group_id_, object_name_, delta);
191 return invoke_and_get_future<int64_t>(request);
// atomiclong_getandset with `new_value`; result decoded as int64_t.
195 auto request = atomiclong_getandset_encode(group_id_, object_name_, new_value);
196 return invoke_and_get_future<int64_t>(request);
// Sends an atomiclong_alter request carrying the serialized function and the
// requested result type (cast to int32_t); returns a future of the int64_t
// decoded from the reply.
211 boost::future<int64_t> atomic_long::alter_data(client::serialization::pimpl::data &function_data,
212 alter_result_type result_type) {
213 auto request = atomiclong_alter_encode(group_id_, object_name_, function_data,
214 static_cast<int32_t
>(result_type));
215 return invoke_and_get_future<int64_t>(request);
// Sends an atomiclong_apply request carrying the serialized function; returns
// a future of the optional serialized result decoded from the reply.
218 boost::future<boost::optional<client::serialization::pimpl::data>>
219 atomic_long::apply_data(client::serialization::pimpl::data &function_data) {
220 auto request = atomiclong_apply_encode(group_id_, object_name_, function_data);
221 return invoke_and_get_future<boost::optional<client::serialization::pimpl::data>>(request);
224 atomic_reference::atomic_reference(
const std::string &name, client::spi::ClientContext &context,
225 const raft_group_id &group_id,
const std::string &object_name)
226 : cp_proxy(SERVICE_NAME, name, &context, group_id, object_name) {}
// Sends an atomicref_get request; returns a future of the optional serialized
// value decoded from the reply.
228 boost::future<boost::optional<client::serialization::pimpl::data>> atomic_reference::get_data() {
229 auto request = atomicref_get_encode(group_id_, object_name_);
230 return invoke_and_get_future<boost::optional<client::serialization::pimpl::data>>(request);
// Sends an atomicref_set request with the new value and the final boolean
// argument set to false (get_and_set_data passes true for the same argument).
// Returns a future of the optional serialized data decoded from the reply.
233 boost::future<boost::optional<client::serialization::pimpl::data>>
234 atomic_reference::set_data(
const client::serialization::pimpl::data &new_value_data) {
235 auto request = atomicref_set_encode(group_id_, object_name_, &new_value_data,
false);
236 return invoke_and_get_future<boost::optional<client::serialization::pimpl::data>>(request);
// Sends an atomicref_set request with the new value and the final boolean
// argument set to true (set_data passes false for the same argument).
// Returns a future of the optional serialized data decoded from the reply.
239 boost::future<boost::optional<client::serialization::pimpl::data>>
240 atomic_reference::get_and_set_data(
const client::serialization::pimpl::data &new_value_data) {
241 auto request = atomicref_set_encode(group_id_, object_name_, &new_value_data,
true);
242 return invoke_and_get_future<boost::optional<client::serialization::pimpl::data>>(request);
// Sends an atomicref_compareandset request with the expected and replacement
// values (passed by pointer); the reply is decoded as bool.
// NOTE(review): the return-type line of this definition is not visible in
// this excerpt.
246 atomic_reference::compare_and_set_data(
const client::serialization::pimpl::data &expect_data,
247 const client::serialization::pimpl::data &update_data) {
248 auto request = atomicref_compareandset_encode(group_id_, object_name_, &expect_data, &update_data);
249 return invoke_and_get_future<bool>(request);
// Sends an atomicref_contains request for the given serialized value; returns
// a future of the bool decoded from the reply.
252 boost::future<bool> atomic_reference::contains_data(
const client::serialization::pimpl::data &value_data) {
253 auto request = atomicref_contains_encode(group_id_, object_name_, &value_data);
254 return invoke_and_get_future<bool>(request);
// Calls invoke_apply with return_value_type::NO_VALUE and alter=true, and
// converts the resulting future to a void future (the value is discarded).
257 boost::future<void> atomic_reference::alter_data(
const client::serialization::pimpl::data &function_data) {
258 return to_void_future(invoke_apply(function_data, return_value_type::NO_VALUE,
true));
// Calls invoke_apply with return_value_type::NEW and alter=true.
261 boost::future<boost::optional<client::serialization::pimpl::data>>
262 atomic_reference::alter_and_get_data(
const client::serialization::pimpl::data &function_data) {
263 return invoke_apply(function_data, return_value_type::NEW,
true);
// Calls invoke_apply with return_value_type::OLD and alter=true.
266 boost::future<boost::optional<client::serialization::pimpl::data>>
267 atomic_reference::get_and_alter_data(
const client::serialization::pimpl::data &function_data) {
268 return invoke_apply(function_data, return_value_type::OLD,
true);
// Calls invoke_apply with return_value_type::NEW and alter=false.
271 boost::future<boost::optional<client::serialization::pimpl::data>>
272 atomic_reference::apply_data(
const client::serialization::pimpl::data &function_data) {
273 return invoke_apply(function_data, return_value_type::NEW,
false);
// Implemented as contains() with a null byte pointer; the cast selects the
// pointer overload/instantiation of contains.
276 boost::future<bool> atomic_reference::is_null() {
277 return contains(
static_cast<byte *
>(
nullptr));
// Implemented as set() with a null byte pointer; the result is converted to a
// void future.
280 boost::future<void> atomic_reference::clear() {
281 return to_void_future(set(
static_cast<byte *
>(
nullptr)));
// Shared implementation behind the alter/apply variants: encodes an
// atomicref_apply request with the serialized function, the return type
// (cast to int32_t) and the `alter` flag, and returns a future of the
// optional serialized data decoded from the reply.
// Note that function_data is taken by value here.
284 boost::future<boost::optional<client::serialization::pimpl::data>>
285 atomic_reference::invoke_apply(
const client::serialization::pimpl::data function_data,
286 return_value_type return_type,
bool alter) {
287 auto request = atomicref_apply_encode(group_id_, object_name_, function_data,
288 static_cast<int32_t
>(return_type), alter);
289 return invoke_and_get_future<boost::optional<client::serialization::pimpl::data>>(request);
292 latch::latch(
const std::string &name, client::spi::ClientContext &context,
const raft_group_id &group_id,
293 const std::string &object_name) : cp_proxy(SERVICE_NAME, name, &context, group_id, object_name) {}
// Validates that `count` is positive, then sends a countdownlatch_trysetcount
// request; returns a future of the bool decoded from the reply.
295 boost::future<bool> latch::try_set_count(int32_t count) {
296 util::Preconditions::check_positive(count,
"count must be positive!");
298 auto request = countdownlatch_trysetcount_encode(group_id_, object_name_, count);
299 return invoke_and_get_future<bool>(request);
// Sends a countdownlatch_getcount request; returns a future of the int32_t
// decoded from the reply.
302 boost::future<int32_t> latch::get_count() {
303 auto request = countdownlatch_getcount_encode(group_id_, object_name_);
304 return invoke_and_get_future<int32_t>(request);
// Asynchronous count-down: generates one invocation uuid, fetches the current
// round and, in a boost::launch::sync continuation, calls the blocking
// count_down(round, invocation_uid) overload.
// NOTE(review): the try/loop structure around the call and the body of the
// operation_timeout handler are not visible in this excerpt.
307 boost::future<void> latch::count_down() {
308 auto invocation_uid = get_context().get_hazelcast_client_implementation()->random_uuid();
309 return get_round().then(boost::launch::sync, [=](boost::future<int32_t> f) {
310 auto round = f.get();
313 count_down(round, invocation_uid);
315 }
catch (client::exception::operation_timeout &) {
// Derives a bool future from get_count() via a boost::launch::sync
// continuation.
// NOTE(review): the continuation body is not visible in this excerpt.
323 boost::future<bool> latch::try_wait() {
324 return get_count().then(boost::launch::sync, [](boost::future<int32_t> f) {
// Sends a countdownlatch_getround request; returns a future of the int32_t
// decoded from the reply.
329 boost::future<int32_t> latch::get_round() {
330 auto request = countdownlatch_getround_encode(group_id_, object_name_);
331 return invoke_and_get_future<int32_t>(request);
// Blocking count-down for a specific round: sends a countdownlatch_countdown
// request tagged with invocation_uid and waits for the reply.
334 void latch::count_down(
int round, boost::uuids::uuid invocation_uid) {
335 auto request = countdownlatch_countdown_encode(group_id_, object_name_, invocation_uid, round);
336 invoke(request).get();
// Waits without a practical deadline: delegates to wait_for with a timeout of
// INT64_MAX milliseconds and discards the cv_status result.
339 boost::future<void> latch::wait() {
340 return to_void_future(wait_for(std::chrono::milliseconds(INT64_MAX)));
// Sends a countdownlatch_await request with the given timeout and maps the
// boolean reply to std::cv_status (true -> no_timeout, false -> timeout).
343 boost::future<std::cv_status> latch::wait_for(std::chrono::milliseconds timeout) {
// Negative timeouts are clamped to zero before being sent.
344 auto timeout_millis = std::max<int64_t>(0, timeout.count());
345 auto invoation_uid = get_context().get_hazelcast_client_implementation()->random_uuid();
346 auto request = countdownlatch_await_encode(group_id_, object_name_, invoation_uid, timeout_millis);
347 return invoke_and_get_future<bool>(request).then(boost::launch::sync, [](boost::future<bool> f) {
348 return f.get() ? std::cv_status::no_timeout : std::cv_status::timeout;
// Member-wise equality of two raft group ids.
// NOTE(review): one line of the comparison chain (between `name` and
// `group_id`) is not visible in this excerpt.
352 bool raft_group_id::operator==(
const raft_group_id &rhs)
const {
353 return name == rhs.name &&
355 group_id == rhs.group_id;
// Inequality defined as the negation of operator==.
358 bool raft_group_id::operator!=(
const raft_group_id &rhs)
const {
359 return !(rhs == *
this);
// Out-of-class definition of the static constexpr data member, so the
// constant has a definition when it is odr-used (required before C++17).
362 constexpr int64_t fenced_lock::INVALID_FENCE;
364 fenced_lock::fenced_lock(
const std::string &name, client::spi::ClientContext &context,
365 const raft_group_id &group_id,
366 const std::string &object_name) : session_aware_proxy(SERVICE_NAME, name, &context,
367 group_id, object_name,
368 context.get_proxy_session_manager()) {}
// Acquires the lock via lock_and_get_fence() and discards the fence value.
370 boost::future<void> fenced_lock::lock() {
371 return to_void_future(lock_and_get_fence());
// Acquires the lock and yields its fence. A single attempt (do_lock_once)
// acquires a session, sends the lock request and post-processes the reply in
// a boost::launch::sync continuation; the outer continuation inspects the
// first attempt's result.
// NOTE(review): several lines (try/return statements, closing braces, part of
// the boost::format expression) are not visible in this excerpt.
374 boost::future<int64_t> fenced_lock::lock_and_get_fence() {
375 auto thread_id = util::get_current_thread_id();
// The same invocation uuid is captured by value and reused by every attempt.
376 auto invocation_uid = get_context().random_uuid();
378 auto do_lock_once = [=] () {
379 auto session_id = session_manager_.acquire_session(group_id_);
380 verify_locked_session_id_if_present(thread_id, session_id,
true);
381 return do_lock(session_id, thread_id, invocation_uid).then(boost::launch::sync,
382 [=](boost::future<int64_t> f) {
384 auto fence = f.get();
// A valid fence means the lock is held: remember the owning session.
385 if (fence != INVALID_FENCE) {
386 locked_session_ids_.put(
388 std::make_shared<int64_t>(
392 BOOST_THROW_EXCEPTION(
393 client::exception::lock_acquire_limit_reached(
394 "fenced_lock::lock_and_get_fence", (
396 "Lock [%1%] reentrant lock limit is already reached!") %
397 object_name_).str()));
398 }
// Session expired: invalidate it and report INVALID_FENCE to the caller of
// this attempt.
catch (client::exception::session_expired &) {
399 invalidate_session(session_id);
400 verify_no_locked_session_id_present(thread_id);
401 return INVALID_FENCE;
402 }
// Cancelled wait key: give the session back and surface an error.
catch (client::exception::wait_key_cancelled &) {
403 release_session(session_id);
404 BOOST_THROW_EXCEPTION(client::exception::lock_acquire_limit_reached(
405 "fenced_lock::lock_and_get_fence", (boost::format(
406 "Lock [%1%] not acquired because the lock call on the CP group is cancelled, possibly because of another indeterminate call from the same thread.") %
407 object_name_).str()));
409 release_session(session_id);
415 return do_lock_once().then(boost::launch::sync, [=](boost::future<int64_t> f) {
416 auto result = f.get();
417 if (result != INVALID_FENCE) {
// NOTE(review): do_lock_once() is called only in the init clause and the loop
// body is empty, so nothing is re-invoked while result == INVALID_FENCE; as
// written the loop would spin if the second attempt also yields
// INVALID_FENCE — confirm the intended retry behavior.
421 for (result = do_lock_once().get(); result == INVALID_FENCE;) {}
// Checks the session recorded for thread_id against session_id. On mismatch
// the stale record is removed, the given session is optionally released
// (should_release), and throw_lock_ownership_lost is called with the recorded
// session id. Nothing happens when there is no record or the ids match.
426 void fenced_lock::verify_locked_session_id_if_present(int64_t thread_id, int64_t session_id,
427 bool should_release) {
428 auto locked_session_id = locked_session_ids_.get(thread_id);
429 if (locked_session_id && *locked_session_id != session_id) {
430 locked_session_ids_.remove(thread_id);
431 if (should_release) {
432 release_session(session_id);
435 throw_lock_ownership_lost(*locked_session_id);
// Removes any session recorded for thread_id and, if one was recorded, calls
// throw_lock_ownership_lost with it.
// NOTE(review): remove(thread_id) is called twice for the same key; the
// second call looks redundant if remove() returns the removed entry —
// confirm against the map's API.
439 void fenced_lock::verify_no_locked_session_id_present(int64_t thread_id) {
440 auto locked_session_id = locked_session_ids_.remove(thread_id);
441 if (locked_session_id) {
442 locked_session_ids_.remove(thread_id);
443 throw_lock_ownership_lost(*locked_session_id);
// Throws client::exception::lock_ownership_lost with a message naming this
// lock (get_name()) and the given session id.
447 void fenced_lock::throw_lock_ownership_lost(int64_t session_id)
const {
448 BOOST_THROW_EXCEPTION(client::exception::lock_ownership_lost(
"fenced_lock", (boost::format(
449 "Current thread is not owner of the Lock[%1%] because its Session[%2%] is closed by server!") %
450 get_name() % session_id).str()));
// Throws client::exception::illegal_monitor_state with a message naming this
// lock (get_name()).
453 void fenced_lock::throw_illegal_monitor_state()
const {
454 BOOST_THROW_EXCEPTION(client::exception::illegal_monitor_state(
"fenced_lock", (boost::format(
455 "Current thread is not owner of the Lock[%1%]") %get_name()).str()));
// Sends a fencedlock_lock request for the given session, thread and
// invocation uuid; returns a future of the int64_t decoded from the reply.
458 boost::future<int64_t>
459 fenced_lock::do_lock(int64_t session_id, int64_t thread_id, boost::uuids::uuid invocation_uid) {
460 auto request = client::protocol::codec::fencedlock_lock_encode(group_id_, object_name_, session_id, thread_id, invocation_uid);
461 return invoke_and_get_future<int64_t>(request);
// Sends a fencedlock_trylock request for the given session, thread,
// invocation uuid and timeout; returns a future of the int64_t decoded from
// the reply.
// NOTE(review): the tail of the duration_cast argument is not visible in this
// excerpt.
464 boost::future<int64_t>
465 fenced_lock::do_try_lock(int64_t session_id, int64_t thread_id, boost::uuids::uuid invocation_uid,
466 std::chrono::milliseconds timeout) {
467 auto request = client::protocol::codec::fencedlock_trylock_encode(group_id_, object_name_, session_id,
468 thread_id, invocation_uid,
469 std::chrono::duration_cast<std::chrono::milliseconds>(
471 return invoke_and_get_future<int64_t>(request);
// Sends a fencedlock_unlock request for the given session, thread and
// invocation uuid; the reply is decoded as bool.
// NOTE(review): the return-type line of this definition is not visible in
// this excerpt.
475 fenced_lock::do_unlock(int64_t session_id, int64_t thread_id, boost::uuids::uuid invocation_uid) {
476 auto request = client::protocol::codec::fencedlock_unlock_encode(group_id_, object_name_, session_id, thread_id, invocation_uid);
477 return invoke_and_get_future<bool>(request);
// Sends a fencedlock_getlockownership request and decodes the reply into a
// lock_ownership_state in a boost::launch::sync continuation.
// NOTE(review): the line that obtains `msg` from the future and the return
// statement are not visible in this excerpt.
480 boost::future<fenced_lock::lock_ownership_state> fenced_lock::do_get_lock_ownership_state(){
481 auto request = client::protocol::codec::fencedlock_getlockownership_encode(group_id_, object_name_);
482 return invoke(request).then(boost::launch::sync, [](boost::future<client::protocol::ClientMessage> f) {
484 fenced_lock::lock_ownership_state state;
// Fields are read from the message in this order: fence, lock count,
// session id, thread id.
485 state.fence = msg.get_first_fixed_sized_field<int64_t>();
486 state.lock_count = msg.get<int32_t>();
487 state.session_id = msg.get<int64_t>();
488 state.thread_id = msg.get<int64_t>();
// Forwards to the session manager to invalidate session_id for this lock's
// CP group.
493 void fenced_lock::invalidate_session(int64_t session_id) {
494 session_manager_.invalidate_session(group_id_, session_id);
// Non-parameterized try-lock: true when try_lock_and_get_fence() yields a
// fence other than INVALID_FENCE.
497 boost::future<bool> fenced_lock::try_lock() {
498 return try_lock_and_get_fence().then(boost::launch::sync, [](boost::future<int64_t> f) {
499 return f.get() != INVALID_FENCE;
// Timed try-lock: true when try_lock_and_get_fence(timeout) yields a fence
// other than INVALID_FENCE.
503 boost::future<bool> fenced_lock::try_lock(std::chrono::milliseconds timeout) {
504 return try_lock_and_get_fence(timeout).then(boost::launch::sync, [](boost::future<int64_t> f) {
505 return f.get() != INVALID_FENCE;
// Convenience overload: delegates to the timed overload with a zero timeout.
509 boost::future<int64_t> fenced_lock::try_lock_and_get_fence() {
510 return try_lock_and_get_fence(std::chrono::milliseconds(0));
// Timed try-lock returning the fence. A single attempt (do_try_lock_once)
// acquires a session, sends the try-lock request and maps the outcome to a
// pair<int64_t, bool>; the outer continuation inspects the first attempt's
// pair.
// NOTE(review): many lines (try/if headers, the arguments of put/make_pair,
// closing braces, final returns) are not visible in this excerpt, so the
// exact meaning of the pair's bool cannot be confirmed from here.
513 boost::future<int64_t>
514 fenced_lock::try_lock_and_get_fence(std::chrono::milliseconds timeout) {
515 auto thread_id = util::get_current_thread_id();
// The same invocation uuid is captured by value and reused by every attempt.
516 auto invocation_uid = get_context().random_uuid();
518 auto do_try_lock_once = [=] () {
519 using namespace std::chrono;
// Start time of this attempt, used below to compute the remaining timeout.
520 auto start = steady_clock::now();
521 auto session_id = session_manager_.acquire_session(group_id_);
522 verify_locked_session_id_if_present(thread_id, session_id,
true);
523 return do_try_lock(session_id, thread_id, invocation_uid, timeout).then(boost::launch::sync,
524 [=](boost::future<int64_t> f) {
526 auto fence = f.get();
529 locked_session_ids_.put(
531 std::make_shared<int64_t>(
533 return std::make_pair(
540 return std::make_pair(
// Session expired: invalidate it, drop any recorded ownership and compute
// how much of the timeout is left.
543 client::exception::session_expired &) {
544 invalidate_session(session_id);
545 verify_no_locked_session_id_present(thread_id);
546 auto timeout_left = timeout - (steady_clock::now() - start);
547 if (timeout_left.count() <= 0) {
548 return std::make_pair(INVALID_FENCE,
false);
// NOTE(review): this return carries the same pair as the timed-out branch
// above, so as shown timeout_left does not change the result — confirm.
550 return std::make_pair(INVALID_FENCE,
false);
551 }
catch (client::exception::wait_key_cancelled &) {
552 release_session(session_id);
553 return std::make_pair(INVALID_FENCE,
false);
555 release_session(session_id);
561 return do_try_lock_once().then(boost::launch::sync, [=](boost::future<std::pair<int64_t, bool>> f) {
562 auto result = f.get();
563 if (!result.second) {
// NOTE(review): do_try_lock_once() is called only in the init clause and the
// loop body is empty, so nothing is re-invoked while result.second is true;
// as written the loop would spin in that case — confirm the intended retry
// behavior.
567 for (result = do_try_lock_once().get(); result.second;) {}
// Releases the lock held by the calling thread. The current session is looked
// up, ownership bookkeeping is verified, and a fencedlock_unlock request is
// sent; the continuation updates locked_session_ids_ from the reply.
// NOTE(review): try headers, the arguments of put/remove, release calls and
// closing braces are not visible in this excerpt.
572 boost::future<void> fenced_lock::unlock() {
573 auto thread_id = util::get_current_thread_id();
574 int64_t session_id = session_manager_.get_session(group_id_);
577 verify_locked_session_id_if_present(thread_id, session_id,
false);
// Without a session this thread cannot own the lock.
578 if (session_id == internal::session::proxy_session_manager::NO_SESSION_ID) {
579 locked_session_ids_.remove(thread_id);
580 throw_illegal_monitor_state();
583 return do_unlock(session_id, thread_id, get_context().random_uuid()).then(boost::launch::sync,
584 [=](boost::future<bool> f) {
// The reply tells whether this thread still holds the lock after the unlock.
586 auto still_locked_by_current_thread = f.get();
587 if (still_locked_by_current_thread) {
588 locked_session_ids_.put(
590 std::make_shared<int64_t>(
593 locked_session_ids_.remove(
// Session expired: invalidate it, forget the ownership record and report
// lost ownership.
600 client::exception::session_expired &) {
601 invalidate_session(session_id);
602 locked_session_ids_.remove(thread_id);
604 throw_lock_ownership_lost(session_id);
605 }
catch (client::exception::illegal_monitor_state &) {
606 locked_session_ids_.remove(thread_id);
// Returns the fence of the lock when it is held by the calling thread's
// session; otherwise illegal_monitor_state is raised.
612 boost::future<int64_t> fenced_lock::get_fence() {
613 auto thread_id = util::get_current_thread_id();
614 int64_t session_id = session_manager_.get_session(group_id_);
617 verify_locked_session_id_if_present(thread_id, session_id,
false);
// Without a session this thread cannot own the lock.
618 if (session_id == internal::session::proxy_session_manager::NO_SESSION_ID) {
619 locked_session_ids_.remove(thread_id);
620 throw_illegal_monitor_state();
623 return do_get_lock_ownership_state().then(boost::launch::sync, [=](boost::future<lock_ownership_state> f) {
624 auto ownership = f.get();
// Owned by this session/thread: refresh the local record and return the fence.
625 if (ownership.is_locked_by(session_id, thread_id)) {
626 locked_session_ids_.put(thread_id, std::make_shared<int64_t>(session_id));
627 return ownership.fence;
630 verify_no_locked_session_id_present(thread_id);
631 throw_illegal_monitor_state();
// Written after throw_illegal_monitor_state(), which throws unconditionally
// per its definition in this file.
632 return INVALID_FENCE;
// Reports whether the lock is held by anyone, refreshing the local ownership
// record for the calling thread from the server's ownership state.
// NOTE(review): the return inside the is_locked_by branch and the closing
// braces are not visible in this excerpt.
636 boost::future<bool> fenced_lock::is_locked() {
637 auto thread_id = util::get_current_thread_id();
638 int64_t session_id = session_manager_.get_session(group_id_);
640 verify_locked_session_id_if_present(thread_id, session_id,
false);
642 return do_get_lock_ownership_state().then(boost::launch::sync, [=](boost::future<lock_ownership_state> f) {
643 auto ownership = f.get();
644 if (ownership.is_locked_by(session_id, thread_id)) {
645 locked_session_ids_.put(thread_id, std::make_shared<int64_t>(session_id));
649 verify_no_locked_session_id_present(thread_id);
650 return ownership.is_locked();
// Reports whether the lock is held by the calling thread's session, keeping
// the local ownership record in sync with the server's ownership state.
// NOTE(review): the else header and closing braces are not visible in this
// excerpt.
654 boost::future<bool> fenced_lock::is_locked_by_current_thread() {
655 auto thread_id = util::get_current_thread_id();
656 int64_t session_id = session_manager_.get_session(group_id_);
658 verify_locked_session_id_if_present(thread_id, session_id,
false);
660 return do_get_lock_ownership_state().then(boost::launch::sync, [=](boost::future<lock_ownership_state> f) {
661 auto ownership = f.get();
662 auto locked_by_current_thread = ownership.is_locked_by(session_id, thread_id);
663 if (locked_by_current_thread) {
664 locked_session_ids_.put(thread_id, std::make_shared<int64_t>(session_id));
666 verify_no_locked_session_id_present(thread_id);
669 return locked_by_current_thread;
// Returns the lock_count field of the server's ownership state, keeping the
// local ownership record for the calling thread in sync.
// NOTE(review): the else header and closing braces are not visible in this
// excerpt.
673 boost::future<int32_t> fenced_lock::get_lock_count() {
674 auto thread_id = util::get_current_thread_id();
675 int64_t session_id = session_manager_.get_session(group_id_);
677 verify_locked_session_id_if_present(thread_id, session_id,
false);
679 return do_get_lock_ownership_state().then(boost::launch::sync, [=](boost::future<lock_ownership_state> f) {
680 auto ownership = f.get();
681 if (ownership.is_locked_by(session_id, thread_id)) {
682 locked_session_ids_.put(thread_id, std::make_shared<int64_t>(session_id));
684 verify_no_locked_session_id_present(thread_id);
687 return ownership.lock_count;
// Accessor returning a reference to a raft_group_id.
// NOTE(review): the body is not visible in this excerpt — presumably it
// returns group_id_.
691 const raft_group_id &fenced_lock::get_group_id() {
// Two fenced_lock proxies compare equal when both their service name and
// their proxy name match.
695 bool operator==(
const fenced_lock &lhs,
const fenced_lock &rhs) {
696 return lhs.get_service_name() == rhs.get_service_name() && lhs.get_name() == rhs.get_name();
// Runs the base ClientProxy::post_destroy() and then clears the per-thread
// ownership records.
699 void fenced_lock::post_destroy() {
700 ClientProxy::post_destroy();
701 locked_session_ids_.clear();
704 session_aware_proxy::session_aware_proxy(
const std::string &service_name,
const std::string &proxy_name,
705 client::spi::ClientContext *context,
const raft_group_id &group_id,
706 const std::string &object_name,
707 internal::session::proxy_session_manager &session_manager) : cp_proxy(
708 service_name, proxy_name, context, group_id, object_name), session_manager_(session_manager) {}
// Forwards to the session manager to release session_id for this proxy's CP
// group.
// NOTE(review): the enclosing function signature is not visible in this
// excerpt.
711 session_manager_.release_session(group_id_, session_id);
// True when the lock is held and the recorded session id and thread id both
// match the arguments.
714 bool fenced_lock::lock_ownership_state::is_locked_by(int64_t session, int64_t thread) {
715 return is_locked() && session_id == session && thread_id == thread;
// The lock counts as held whenever the fence differs from INVALID_FENCE.
718 bool fenced_lock::lock_ownership_state::is_locked() {
719 return fence != INVALID_FENCE;
// Base constructor for semaphore proxies: forwards to session_aware_proxy
// with this class's SERVICE_NAME.
// NOTE(review): the middle arguments of the session_aware_proxy initializer
// are not visible in this excerpt.
722 counting_semaphore::counting_semaphore(
const std::string &proxy_name, client::spi::ClientContext *context,
723 const raft_group_id &group_id,
const std::string &object_name,
724 internal::session::proxy_session_manager &session_manager)
725 : session_aware_proxy(SERVICE_NAME,
728 object_name, session_manager) {}
// Validates that `permits` is non-negative, sends a semaphore_init request
// and returns a future of the bool read from the reply's first fixed-size
// field.
730 boost::future<bool> counting_semaphore::init(int32_t permits) {
731 util::Preconditions::check_not_negative(permits,
"Permits must be non-negative!");
733 auto request = client::protocol::codec::semaphore_init_encode(group_id_, object_name_, permits);
734 return client::spi::impl::ClientInvocation::create(context_, request, object_name_)->invoke().then(
735 boost::launch::sync, [](boost::future<client::protocol::ClientMessage> f) {
736 return f.get().get_first_fixed_sized_field<bool>();
// Non-waiting acquire attempt: delegates to try_acquire_for with a zero
// timeout.
740 boost::future<bool> counting_semaphore::try_acquire(int32_t permits) {
741 return try_acquire_for(std::chrono::milliseconds::zero(), permits);
// Timed acquire attempt: negative timeouts are clamped to zero before
// delegating to try_acquire_for_millis.
744 boost::future<bool> counting_semaphore::try_acquire_for(std::chrono::milliseconds rel_time, int32_t permits) {
745 if (rel_time < std::chrono::milliseconds::zero()) {
746 rel_time = std::chrono::milliseconds ::zero();
748 return try_acquire_for_millis(permits, rel_time);
// Sends a semaphore_release request for the given thread/session with a
// freshly generated invocation uuid; the reply is converted to a void future.
751 boost::future<void> counting_semaphore::do_release(int32_t permits, int64_t thread_id, int64_t session_id) {
752 auto invocation_uid = get_context().get_hazelcast_client_implementation()->random_uuid();
753 auto request = codec::semaphore_release_encode(group_id_, object_name_, session_id, thread_id, invocation_uid, permits);
754 return to_void_future(
755 client::spi::impl::ClientInvocation::create(context_, request, object_name_)->invoke());
// Sends a semaphore_availablepermits request; the reply is decoded as
// int32_t.
758 boost::future<int32_t> counting_semaphore::available_permits() {
759 auto request = codec::semaphore_availablepermits_encode(group_id_, object_name_);
760 return decode<int32_t>(
761 client::spi::impl::ClientInvocation::create(context_, request, object_name_)->invoke());
// Reduces the permit count by `reduction` (must be non-negative). A zero
// reduction completes immediately without a remote call; otherwise the
// negated amount is passed to do_change_permits.
764 boost::future<void> counting_semaphore::reduce_permits(int32_t reduction) {
765 util::Preconditions::check_not_negative(reduction,
"Reduction must be non-negative!");
766 if (reduction == 0) {
767 return boost::make_ready_future();
769 return do_change_permits(-reduction);
// Increases the permit count by `increase` (must be non-negative) via
// do_change_permits.
// NOTE(review): the condition guarding the ready-future return is not visible
// in this excerpt — presumably it mirrors reduce_permits (increase == 0).
772 boost::future<void> counting_semaphore::increase_permits(int32_t increase) {
773 util::Preconditions::check_not_negative(increase,
"Increase must be non-negative!");
775 return boost::make_ready_future();
777 return do_change_permits(increase);
780 sessionless_semaphore::sessionless_semaphore(
const std::string &proxy_name, client::spi::ClientContext *context,
781 const raft_group_id &group_id,
const std::string &object_name,
782 internal::session::proxy_session_manager &session_manager)
783 : counting_semaphore(proxy_name, context, group_id, object_name, session_manager) {}
// Acquires `permits` (must be positive) by calling do_try_acquire with a
// timeout of -1 ms and discarding the boolean result.
785 boost::future<void> sessionless_semaphore::acquire(int32_t permits) {
786 util::Preconditions::check_positive(permits,
"permits must be positive number.");
788 return to_void_future(do_try_acquire(permits, std::chrono::milliseconds(-1)));
// Sends a semaphore_acquire request using NO_SESSION_ID and the thread id
// from get_thread_id(); the continuation reads the reply's first fixed-size
// field as bool and translates wait_key_cancelled into illegal_state.
// NOTE(review): the return-type line, the try header and part of the
// boost::format expression are not visible in this excerpt.
792 sessionless_semaphore::do_try_acquire(int32_t permits, std::chrono::milliseconds timeout_ms) {
793 auto cluster_wide_threadId = get_thread_id();
794 auto invocation_uid = get_context().get_hazelcast_client_implementation()->random_uuid();
795 auto request = client::protocol::codec::semaphore_acquire_encode(group_id_, object_name_, internal::session::proxy_session_manager::NO_SESSION_ID, cluster_wide_threadId, invocation_uid, permits, timeout_ms.count());
796 return client::spi::impl::ClientInvocation::create(context_, request, object_name_)->invoke().then(
797 boost::launch::sync, [=](boost::future<client::protocol::ClientMessage> f) {
799 return f.get().get_first_fixed_sized_field<bool>();
800 }
catch (client::exception::wait_key_cancelled &) {
801 throw client::exception::illegal_state(
"sessionless_semaphore::acquire",
803 "Semaphore[%1%] ] not acquired because the acquire call on the CP group is cancelled, possibly because of another indeterminate call from the same thread.") %
804 object_name_).str());
// Timed acquire for `permits` (must be positive); timeouts that are not
// greater than zero are replaced by zero before calling do_try_acquire.
809 boost::future<bool> sessionless_semaphore::try_acquire_for_millis(int32_t permits, std::chrono::milliseconds timeout) {
810 util::Preconditions::check_positive(permits,
"Permits must be positive!");
812 return do_try_acquire(permits, timeout > std::chrono::milliseconds::zero() ? timeout : std::chrono::milliseconds::zero());
// Releases `permits` (must be positive) via do_release, using the thread id
// from get_thread_id() and NO_SESSION_ID.
815 boost::future<void> sessionless_semaphore::release(int32_t permits) {
816 util::Preconditions::check_positive(permits,
"Permits must be positive!");
817 auto thread_id = get_thread_id();
818 return do_release(permits, thread_id, internal::session::proxy_session_manager::NO_SESSION_ID);
// Returns the id supplied by the session manager's
// get_or_create_unique_thread_id for this proxy's CP group.
821 int64_t sessionless_semaphore::get_thread_id() {
822 return session_manager_.get_or_create_unique_thread_id(group_id_);
// Sends a semaphore_drain request using NO_SESSION_ID and the thread id from
// get_thread_id(); the reply is decoded as int32_t.
825 boost::future<int32_t> sessionless_semaphore::drain_permits() {
826 auto cluster_wide_threadId = get_thread_id();
827 auto invocation_uid = get_context().get_hazelcast_client_implementation()->random_uuid();
828 auto request = client::protocol::codec::semaphore_drain_encode(group_id_, object_name_, internal::session::proxy_session_manager::NO_SESSION_ID, cluster_wide_threadId, invocation_uid);
829 return decode<int32_t>(
830 client::spi::impl::ClientInvocation::create(context_, request, object_name_)->invoke());
// Sends a semaphore_change request with the signed `delta`, using
// NO_SESSION_ID and the thread id from get_thread_id(); the reply is
// converted to a void future.
833 boost::future<void> sessionless_semaphore::do_change_permits(int32_t delta) {
834 auto cluster_wide_threadId = get_thread_id();
835 auto invocation_uid = get_context().get_hazelcast_client_implementation()->random_uuid();
836 auto request = client::protocol::codec::semaphore_change_encode(group_id_, object_name_, internal::session::proxy_session_manager::NO_SESSION_ID, cluster_wide_threadId, invocation_uid, delta);
837 return to_void_future(
838 client::spi::impl::ClientInvocation::create(context_, request, object_name_)->invoke());
/**
 * Constructs a session_semaphore by forwarding every argument, in the same
 * order, to the counting_semaphore base constructor. The body is empty.
 */
841 session_semaphore::session_semaphore(
const std::string &proxy_name, client::spi::ClientContext *context,
842 const raft_group_id &group_id,
const std::string &object_name,
843 internal::session::proxy_session_manager &session_manager)
844 : counting_semaphore(proxy_name, context, group_id, object_name, session_manager) {}
/**
 * Acquire without a time limit.
 *
 * Delegates to try_acquire_for_millis with a timeout of -1 ms; that function
 * only applies its deadline check when the timeout is >= 0. The resulting
 * future is converted with to_void_future.
 *
 * @param permits number of permits, forwarded unchanged
 */
846 boost::future<void> session_semaphore::acquire(int32_t permits) {
847 return to_void_future(try_acquire_for_millis(permits, std::chrono::milliseconds(-1)));
/**
 * Session-based acquire with retry on session expiry.
 *
 * A single attempt (do_try_acquire_once) yields a pair whose first member is
 * the acquired flag and whose second member is tested below to decide whether
 * another attempt is made.
 *
 * NOTE(review): several lines of this definition are not visible in this
 * excerpt (return type, `try {`, closing braces, part of the encode call);
 * the comments below describe only the visible statements.
 *
 * @param permits number of permits; checked with check_not_negative
 * @param timeout a negative value disables the deadline check (use_timeout)
 */
851 session_semaphore::try_acquire_for_millis(int32_t permits, std::chrono::milliseconds timeout) {
// NOTE(review): zero permits passes this check, whereas release() and the
// sessionless variant use check_positive — confirm this is intended.
852 util::Preconditions::check_not_negative(permits,
"permits must not be negative number.");
// thread_id and invocation_uid are computed once and captured by the lambda,
// so every attempt sends the same values.
854 auto thread_id = get_thread_id();
855 auto invocation_uid = get_context().get_hazelcast_client_implementation()->random_uuid();
857 auto do_try_acquire_once = ([=] () {
// `start` is taken per attempt and is the reference point for the deadline
// check in the session_expired handler.
858 auto start = std::chrono::steady_clock::now();
859 auto use_timeout = timeout >= std::chrono::milliseconds::zero();
860 auto session_id = session_manager_.acquire_session(group_id_, permits);
861 auto request = client::protocol::codec::semaphore_acquire_encode(group_id_, object_name_,
863 thread_id, invocation_uid, permits,
// The continuation runs with boost::launch::sync and maps the response (or
// the exception stored in `f`) to the (acquired, retry) pair.
865 return client::spi::impl::ClientInvocation::create(context_, request, object_name_)->invoke().then(
866 boost::launch::sync, [=](boost::future<client::protocol::ClientMessage> f) {
868 auto acquired = f.get().get_first_fixed_sized_field<bool>();
// NOTE(review): the condition guarding this release is not visible here.
// Also confirm that the two-argument overload gives back the same count
// that acquire_session(group_id_, permits) took; the wait_key_cancelled
// path passes `permits` explicitly.
870 session_manager_.release_session(group_id_, session_id);
873 return std::make_pair(acquired,
false);
874 }
catch (client::exception::session_expired &) {
// The session is invalidated; retry is requested (second == true) unless a
// deadline applies and the time since `start` has used up `timeout`.
875 session_manager_.invalidate_session(group_id_, session_id);
876 if (use_timeout && (timeout - (std::chrono::steady_clock::now() - start) <=
877 std::chrono::milliseconds::zero())) {
878 return std::make_pair(
false,
false);
880 return std::make_pair(
false,
true);
881 }
catch (client::exception::wait_key_cancelled &) {
// Give the session back for `permits` and report the cancellation as an
// illegal_state exception.
882 session_manager_.release_session(group_id_, session_id, permits);
884 BOOST_THROW_EXCEPTION(
885 client::exception::illegal_state(
886 "session_semaphore::try_acquire_for_millis", (boost::format(
887 "Semaphore[%1%] not acquired because the acquire call on the CP group is cancelled, possibly because of another indeterminate call from the same thread.") %
888 object_name_).str()));
// This return follows the throw above.
890 return std::make_pair(
false,
false);
// First attempt; the continuation inspects the retry flag.
895 return do_try_acquire_once().then(boost::launch::sync, [=](boost::future<std::pair<bool, bool>> f) {
896 auto result = f.get();
897 if (!result.second) {
// While the retry flag is set, run further attempts one after another,
// blocking on each with get().
900 for (; result.second; result = do_try_acquire_once().get());
/**
 * Releases permits on behalf of the current session.
 *
 * NOTE(review): some lines of this definition (closing braces, the `try {`
 * and the statement preceding the first release_session) are not visible in
 * this excerpt.
 *
 * @param permits number of permits to release; checked with check_positive
 */
905 boost::future<void> session_semaphore::release(int32_t permits) {
906 util::Preconditions::check_positive(permits,
"Permits must be positive!");
907 auto session_id = session_manager_.get_session(group_id_);
// Without a session for this group there is nothing to release against:
// throw_illegal_state_exception is called with a null exception pointer.
908 if (session_id == internal::session::proxy_session_manager::NO_SESSION_ID) {
909 throw_illegal_state_exception(
nullptr);
912 auto thread_id = get_thread_id();
// NOTE(review): unlike the other continuations in this file, this `then`
// is called without boost::launch::sync — confirm that is intended.
913 return do_release(permits, thread_id, session_id).then([=] (boost::future<void> f) {
916 session_manager_.release_session(group_id_, session_id, permits);
917 }
catch (client::exception::session_expired &) {
// On session expiry: invalidate the session, still release the local
// count, then raise illegal_state built from the current exception.
918 session_manager_.invalidate_session(group_id_, session_id);
919 session_manager_.release_session(group_id_, session_id, permits);
920 throw_illegal_state_exception(std::current_exception());
/**
 * Raises an illegal_state exception with the message "No valid session!".
 *
 * @param e exception pointer supplied by the caller; release() passes nullptr
 *          when no session exists and std::current_exception() on session
 *          expiry
 *
 * NOTE(review): the control flow around the two throwing statements below is
 * not visible in this excerpt; presumably `e` is rethrown only when non-null
 * and throw_with_nested runs inside a handler — verify against the full file.
 */
925 void session_semaphore::throw_illegal_state_exception(std::exception_ptr e) {
926 auto ise = boost::enable_current_exception(
927 client::exception::illegal_state(
"session_semaphore::illegal_state",
928 "No valid session!"));
933 std::rethrow_exception(e);
935 std::throw_with_nested(ise);
/**
 * @return the value of util::get_current_thread_id()
 */
939 int64_t session_semaphore::get_thread_id() {
940 return util::get_current_thread_id();
/**
 * Drains permits for the current session, repeating the attempt while a
 * single attempt reports -1.
 *
 * NOTE(review): the session acquisition, the remaining release_session
 * arguments, the value returned on session expiry and the closing braces are
 * not visible in this excerpt.
 *
 * @return future holding the int32_t count read from the drain response
 */
943 boost::future<int32_t> session_semaphore::drain_permits() {
// Computed once and captured by the lambda, so every attempt sends the same
// thread id and invocation uid.
944 auto thread_id = get_thread_id();
945 auto invocation_uid = get_context().get_hazelcast_client_implementation()->random_uuid();
947 auto do_drain_once = ([=] () {
949 auto request = client::protocol::codec::semaphore_drain_encode(group_id_, object_name_,
951 thread_id, invocation_uid);
952 return client::spi::impl::ClientInvocation::create(context_, request, object_name_)->invoke().then(
953 boost::launch::sync, [=](boost::future<client::protocol::ClientMessage> f) {
// The response's first fixed-size field is the count; the local session
// count is then adjusted through release_session.
955 auto count = f.get().get_first_fixed_sized_field<int32_t>();
956 session_manager_.release_session(group_id_, session_id,
959 }
catch (client::exception::session_expired &) {
// presumably followed by returning the -1 value that the loop below tests
// for — verify against the full file.
960 session_manager_.invalidate_session(group_id_, session_id);
966 return do_drain_once().then(boost::launch::sync, [=](boost::future<int32_t> f) {
967 int32_t count = f.get();
// Repeat the attempt, blocking on each with get(), until the result is
// something other than -1.
971 while ((count = do_drain_once().get()) == -1) {}
/**
 * Sends a semaphore change request carrying `delta` for the current session.
 *
 * NOTE(review): the line that obtains `session_id`, the `try {` and the
 * closing braces are not visible in this excerpt.
 *
 * @param delta value forwarded unchanged as the last argument of
 *              semaphore_change_encode
 */
976 boost::future<void> session_semaphore::do_change_permits(int32_t delta) {
978 auto thread_id = get_thread_id();
979 auto invocation_uid = get_context().get_hazelcast_client_implementation()->random_uuid();
981 auto request = client::protocol::codec::semaphore_change_encode(group_id_, object_name_,
983 thread_id, invocation_uid, delta);
// The continuation runs with boost::launch::sync once the invocation
// completes.
984 return client::spi::impl::ClientInvocation::create(context_, request, object_name_)->invoke().then(
985 boost::launch::sync, [=](boost::future<client::protocol::ClientMessage> f) {
988 session_manager_.release_session(group_id_, session_id);
989 }
catch (client::exception::session_expired &) {
// On session expiry: invalidate the session and raise illegal_state built
// from the current exception.
990 session_manager_.invalidate_session(group_id_, session_id);
991 throw_illegal_state_exception(std::current_exception());
// Fragment of a hashing routine (its signature and return statement are not
// visible in this excerpt). Starting from a zero seed, it folds the name,
// seed and group_id members of `group_id` into `seed`, in that order, with
// boost::hash_combine.
1001 std::size_t seed = 0;
1002 boost::hash_combine(seed, group_id.name);
1003 boost::hash_combine(seed, group_id.seed);
1004 boost::hash_combine(seed, group_id.group_id);
Client-side Raft-based proxy implementation of atomic long.
boost::future< int64_t > get_and_add(int64_t delta)
Atomically adds the given value to the current value.
boost::future< int64_t > get()
Gets the current value.
boost::future< int64_t > decrement_and_get()
Atomically decrements the current value by one.
boost::future< int64_t > get_and_increment()
Atomically increments the current value by one.
boost::future< bool > compare_and_set(int64_t expect, int64_t update)
Atomically sets the value to the given updated value only if the current value equals the expected value.
boost::future< int64_t > get_and_set(int64_t new_value)
Atomically sets the given value and returns the old value.
boost::future< void > set(int64_t new_value)
Atomically sets the given value.
boost::future< int64_t > increment_and_get()
Atomically increments the current value by one.
boost::future< int64_t > get_and_decrement()
Atomically decrements the current value by one.
boost::future< int64_t > add_and_get(int64_t delta)
Atomically adds the given value to the current value.
void release_session(int64_t session_id)
Decrements acquire count of the session.
static constexpr int32_t DRAIN_SESSION_ACQ_COUNT
Since a proxy does not know how many permits will be drained on the Raft group, it uses this constant...