Hazelcast C++ Client
Hazelcast C++ Client Library
Loading...
Searching...
No Matches
cp.cpp
1/*
2 * Copyright (c) 2008-2025, Hazelcast, Inc. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include <algorithm>
18#include <boost/algorithm/string.hpp>
19
20#include "hazelcast/cp/cp.h"
21#include "hazelcast/client/spi/ClientContext.h"
22#include "hazelcast/util/Preconditions.h"
23#include "hazelcast/client/protocol/codec/codecs.h"
24#include "hazelcast/client/protocol/ClientMessage.h"
25#include "hazelcast/client/spi/impl/ClientInvocation.h"
26#include "hazelcast/client/impl/hazelcast_client_instance_impl.h"
27
28namespace hazelcast {
29namespace cp {
30using namespace hazelcast::client::protocol;
31using namespace hazelcast::client::protocol::codec;
32using namespace hazelcast::util;
33
34raft_proxy_factory::raft_proxy_factory(client::spi::ClientContext& context)
35 : context_(context)
36{}
37
38std::shared_ptr<fenced_lock>
39raft_proxy_factory::create_fenced_lock(raft_group_id&& group_id,
40 const std::string& proxy_name,
41 const std::string& object_name)
42{
43 while (true) {
44 auto proxy = lock_proxies_.get(proxy_name);
45 if (proxy) {
46 if (proxy->get_group_id() != group_id) {
47 lock_proxies_.remove(proxy_name, proxy);
48 } else {
49 return proxy;
50 }
51 }
52
53 proxy = std::make_shared<fenced_lock>(
54 proxy_name, context_, group_id, object_name);
55 auto existing = lock_proxies_.put_if_absent(proxy_name, proxy);
56 if (!existing) {
57 return proxy;
58 } else if (existing->get_group_id() == group_id) {
59 return existing;
60 }
61
62 group_id = get_group_id(proxy_name, object_name).get();
63 }
64}
65
66std::shared_ptr<counting_semaphore>
67raft_proxy_factory::create_semaphore(raft_group_id&& group_id,
68 const std::string& proxy_name,
69 const std::string& object_name)
70{
71 auto request =
72 client::protocol::codec::semaphore_getsemaphoretype_encode(proxy_name);
73 auto is_sessionless = client::spi::impl::ClientInvocation::create(
74 context_, request, object_name)
75 ->invoke()
76 .get()
77 .get_first_fixed_sized_field<bool>();
78 if (is_sessionless) {
79 return std::shared_ptr<counting_semaphore>(
80 new sessionless_semaphore(proxy_name,
81 &context_,
82 group_id,
83 object_name,
84 context_.get_proxy_session_manager()));
85 } else {
86 return std::shared_ptr<counting_semaphore>(
87 new session_semaphore(proxy_name,
88 &context_,
89 group_id,
90 object_name,
91 context_.get_proxy_session_manager()));
92 }
93}
94
95std::string
96raft_proxy_factory::without_default_group_name(const std::string& n)
97{
98 std::string name = n;
99 boost::trim(name);
100 auto index = name.find('@');
101 if (index == std::string::npos) {
102 return name;
103 }
104
105 Preconditions::check_true(
106 name.find('@', index + 1) == std::string::npos,
107 "Custom group name must be specified at most once");
108
109 auto group_name = name.substr(index + 1);
110 boost::trim(group_name);
111 if (group_name == DEFAULT_GROUP_NAME) {
112 return name.substr(0, index);
113 }
114 return name;
115}
116
117std::string
118raft_proxy_factory::object_name_for_proxy(const std::string& name)
119{
120 auto index = name.find('@');
121 if (index == std::string::npos) {
122 return name;
123 }
124
125 Preconditions::check_true(index < (name.size() - 1),
126 "Object name cannot be empty string");
127 Preconditions::check_true(
128 name.find('@', index + 1) == std::string::npos,
129 "Custom CP group name must be specified at most once");
130
131 auto object_name = name.substr(0, index);
132 boost::trim(object_name);
133 Preconditions::check_true(object_name.size() > 0,
134 "Object name cannot be empty string");
135 return object_name;
136}
137
138boost::future<raft_group_id>
139raft_proxy_factory::get_group_id(const std::string& proxy_name,
140 const std::string& object_name)
141{
142 auto request = cpgroup_createcpgroup_encode(proxy_name);
143 return client::spi::impl::ClientInvocation::create(
144 context_, request, object_name)
145 ->invoke()
146 .then(boost::launch::sync,
147 [](boost::future<client::protocol::ClientMessage> f) {
148 return *f.get().get_first_var_sized_field<raft_group_id>();
149 });
150}
151
152cp_subsystem::cp_subsystem(client::spi::ClientContext& context)
153 : context_(context)
154 , proxy_factory_(context)
155{}
156
157boost::future<std::shared_ptr<atomic_long>>
158cp_subsystem::get_atomic_long(const std::string& name)
159{
160 return proxy_factory_.create_proxy<atomic_long>(name);
161}
162
163boost::future<std::shared_ptr<atomic_reference>>
165{
166 return proxy_factory_.create_proxy<atomic_reference>(name);
167}
168
169boost::future<std::shared_ptr<latch>>
170cp_subsystem::get_latch(const std::string& name)
171{
172 return proxy_factory_.create_proxy<latch>(name);
173}
174
175boost::future<std::shared_ptr<fenced_lock>>
176cp_subsystem::get_lock(const std::string& name)
177{
178 return proxy_factory_.create_proxy<fenced_lock>(name);
179}
180
181boost::future<std::shared_ptr<counting_semaphore>>
182cp_subsystem::get_semaphore(const std::string& name)
183{
184 return proxy_factory_.create_proxy<counting_semaphore>(name);
185}
186
187cp_proxy::cp_proxy(const std::string& service_name,
188 const std::string& proxy_name,
189 client::spi::ClientContext* context,
190 const raft_group_id& group_id,
191 const std::string& object_name)
192 : ProxyImpl(service_name, proxy_name, context)
193 , group_id_(group_id)
194 , object_name_(object_name)
195{}
196
197void
198cp_proxy::on_destroy()
199{
200 auto request = cpgroup_destroycpobject_encode(
201 group_id_, get_service_name(), object_name_);
202 invoke(request).get();
203}
204
205const raft_group_id&
206cp_proxy::get_group_id() const
207{
208 return group_id_;
209}
210
211atomic_long::atomic_long(const std::string& name,
212 client::spi::ClientContext& context,
213 const raft_group_id& group_id,
214 const std::string& object_name)
215 : cp_proxy(SERVICE_NAME, name, &context, group_id, object_name)
216{}
217
218boost::future<int64_t>
220{
221 auto request = atomiclong_addandget_encode(group_id_, object_name_, delta);
222 return invoke_and_get_future<int64_t>(request);
223}
224
225boost::future<bool>
226atomic_long::compare_and_set(int64_t expect, int64_t update)
227{
228 auto request =
229 atomiclong_compareandset_encode(group_id_, object_name_, expect, update);
230 return invoke_and_get_future<bool>(request);
231}
232
233boost::future<int64_t>
235{
236 return get_and_add(-1);
237}
238
239boost::future<int64_t>
241{
242 return add_and_get(-1);
243}
244
245boost::future<int64_t>
247{
248 auto request = atomiclong_get_encode(group_id_, object_name_);
249 return invoke_and_get_future<int64_t>(request);
250}
251
252boost::future<int64_t>
254{
255 auto request = atomiclong_getandadd_encode(group_id_, object_name_, delta);
256 return invoke_and_get_future<int64_t>(request);
257}
258
259boost::future<int64_t>
260atomic_long::get_and_set(int64_t new_value)
261{
262 auto request =
263 atomiclong_getandset_encode(group_id_, object_name_, new_value);
264 return invoke_and_get_future<int64_t>(request);
265}
266
267boost::future<int64_t>
272
273boost::future<int64_t>
278
279boost::future<void>
280atomic_long::set(int64_t new_value)
281{
282 return to_void_future(get_and_set(new_value));
283}
284
285boost::future<int64_t>
286atomic_long::alter_data(client::serialization::pimpl::data& function_data,
287 alter_result_type result_type)
288{
289 auto request = atomiclong_alter_encode(group_id_,
290 object_name_,
291 function_data,
292 static_cast<int32_t>(result_type));
293 return invoke_and_get_future<int64_t>(request);
294}
295
296boost::future<boost::optional<client::serialization::pimpl::data>>
297atomic_long::apply_data(client::serialization::pimpl::data& function_data)
298{
299 auto request =
300 atomiclong_apply_encode(group_id_, object_name_, function_data);
301 return invoke_and_get_future<
302 boost::optional<client::serialization::pimpl::data>>(request);
303}
304
305atomic_reference::atomic_reference(const std::string& name,
306 client::spi::ClientContext& context,
307 const raft_group_id& group_id,
308 const std::string& object_name)
309 : cp_proxy(SERVICE_NAME, name, &context, group_id, object_name)
310{}
311
312boost::future<boost::optional<client::serialization::pimpl::data>>
313atomic_reference::get_data()
314{
315 auto request = atomicref_get_encode(group_id_, object_name_);
316 return invoke_and_get_future<
317 boost::optional<client::serialization::pimpl::data>>(request);
318}
319
320boost::future<boost::optional<client::serialization::pimpl::data>>
321atomic_reference::set_data(
322 const client::serialization::pimpl::data& new_value_data)
323{
324 auto request =
325 atomicref_set_encode(group_id_, object_name_, &new_value_data, false);
326 return invoke_and_get_future<
327 boost::optional<client::serialization::pimpl::data>>(request);
328}
329
330boost::future<boost::optional<client::serialization::pimpl::data>>
331atomic_reference::get_and_set_data(
332 const client::serialization::pimpl::data& new_value_data)
333{
334 auto request =
335 atomicref_set_encode(group_id_, object_name_, &new_value_data, true);
336 return invoke_and_get_future<
337 boost::optional<client::serialization::pimpl::data>>(request);
338}
339
340boost::future<bool>
341atomic_reference::compare_and_set_data(
342 const client::serialization::pimpl::data& expect_data,
343 const client::serialization::pimpl::data& update_data)
344{
345 auto request = atomicref_compareandset_encode(
346 group_id_, object_name_, &expect_data, &update_data);
347 return invoke_and_get_future<bool>(request);
348}
349
350boost::future<bool>
351atomic_reference::contains_data(
352 const client::serialization::pimpl::data& value_data)
353{
354 auto request =
355 atomicref_contains_encode(group_id_, object_name_, &value_data);
356 return invoke_and_get_future<bool>(request);
357}
358
359boost::future<void>
360atomic_reference::alter_data(
361 const client::serialization::pimpl::data& function_data)
362{
363 return to_void_future(
364 invoke_apply(function_data, return_value_type::NO_VALUE, true));
365}
366
367boost::future<boost::optional<client::serialization::pimpl::data>>
368atomic_reference::alter_and_get_data(
369 const client::serialization::pimpl::data& function_data)
370{
371 return invoke_apply(function_data, return_value_type::NEW, true);
372}
373
374boost::future<boost::optional<client::serialization::pimpl::data>>
375atomic_reference::get_and_alter_data(
376 const client::serialization::pimpl::data& function_data)
377{
378 return invoke_apply(function_data, return_value_type::OLD, true);
379}
380
381boost::future<boost::optional<client::serialization::pimpl::data>>
382atomic_reference::apply_data(
383 const client::serialization::pimpl::data& function_data)
384{
385 return invoke_apply(function_data, return_value_type::NEW, false);
386}
387
388boost::future<bool>
389atomic_reference::is_null()
390{
391 return contains(static_cast<byte*>(nullptr));
392}
393
394boost::future<void>
395atomic_reference::clear()
396{
397 return to_void_future(set(static_cast<byte*>(nullptr)));
398}
399
400boost::future<boost::optional<client::serialization::pimpl::data>>
401atomic_reference::invoke_apply(
402 const client::serialization::pimpl::data function_data,
403 return_value_type return_type,
404 bool alter)
405{
406 auto request = atomicref_apply_encode(group_id_,
407 object_name_,
408 function_data,
409 static_cast<int32_t>(return_type),
410 alter);
411 return invoke_and_get_future<
412 boost::optional<client::serialization::pimpl::data>>(request);
413}
414
415latch::latch(const std::string& name,
416 client::spi::ClientContext& context,
417 const raft_group_id& group_id,
418 const std::string& object_name)
419 : cp_proxy(SERVICE_NAME, name, &context, group_id, object_name)
420{}
421
422boost::future<bool>
423latch::try_set_count(int32_t count)
424{
425 util::Preconditions::check_positive(count, "count must be positive!");
426
427 auto request =
428 countdownlatch_trysetcount_encode(group_id_, object_name_, count);
429 return invoke_and_get_future<bool>(request);
430}
431
432boost::future<int32_t>
433latch::get_count()
434{
435 auto request = countdownlatch_getcount_encode(group_id_, object_name_);
436 return invoke_and_get_future<int32_t>(request);
437}
438
439boost::future<void>
440latch::count_down()
441{
442 auto invocation_uid =
443 get_context().get_hazelcast_client_implementation()->random_uuid();
444 return get_round().then(boost::launch::sync, [=](boost::future<int32_t> f) {
445 auto round = f.get();
446 for (;;) {
447 try {
448 count_down(round, invocation_uid);
449 return;
450 } catch (client::exception::operation_timeout&) {
451 // I can retry safely because my retry would be idempotent...
452 }
453 }
454 });
455}
456
457boost::future<bool>
458latch::try_wait()
459{
460 return get_count().then(boost::launch::sync, [](boost::future<int32_t> f) {
461 return f.get() == 0;
462 });
463}
464
465boost::future<int32_t>
466latch::get_round()
467{
468 auto request = countdownlatch_getround_encode(group_id_, object_name_);
469 return invoke_and_get_future<int32_t>(request);
470}
471
472void
473latch::count_down(int round, boost::uuids::uuid invocation_uid)
474{
475 auto request = countdownlatch_countdown_encode(
476 group_id_, object_name_, invocation_uid, round);
477 invoke(request).get();
478}
479
480boost::future<void>
481latch::wait()
482{
483 return to_void_future(wait_for(std::chrono::milliseconds(INT64_MAX)));
484}
485
486boost::future<std::cv_status>
487latch::wait_for(std::chrono::milliseconds timeout)
488{
489 auto timeout_millis = std::max<int64_t>(0, timeout.count());
490 auto invoation_uid =
491 get_context().get_hazelcast_client_implementation()->random_uuid();
492 auto request = countdownlatch_await_encode(
493 group_id_, object_name_, invoation_uid, timeout_millis);
494 return invoke_and_get_future<bool>(request).then(
495 boost::launch::sync, [](boost::future<bool> f) {
496 return f.get() ? std::cv_status::no_timeout : std::cv_status::timeout;
497 });
498}
499
500bool
501raft_group_id::operator==(const raft_group_id& rhs) const
502{
503 return name == rhs.name && seed == rhs.seed && group_id == rhs.group_id;
504}
505
506bool
507raft_group_id::operator!=(const raft_group_id& rhs) const
508{
509 return !(rhs == *this);
510}
511
// Namespace-scope definition of the static constexpr data member
// fenced_lock::INVALID_FENCE; it carries no initializer here.
512constexpr int64_t fenced_lock::INVALID_FENCE;
513
514fenced_lock::fenced_lock(const std::string& name,
515 client::spi::ClientContext& context,
516 const raft_group_id& group_id,
517 const std::string& object_name)
518 : session_aware_proxy(SERVICE_NAME,
519 name,
520 &context,
521 group_id,
522 object_name,
523 context.get_proxy_session_manager())
524{}
525
526boost::future<void>
527fenced_lock::lock()
528{
529 return to_void_future(lock_and_get_fence());
530}
531
532boost::future<int64_t>
533fenced_lock::lock_and_get_fence()
534{
535 auto thread_id = util::get_current_thread_id();
536 auto invocation_uid = get_context().random_uuid();
537
538 auto do_lock_once = [=]() {
539 auto session_id = session_manager_.acquire_session(group_id_);
540 verify_locked_session_id_if_present(thread_id, session_id, true);
541 return do_lock(session_id, thread_id, invocation_uid)
542 .then(boost::launch::sync, [=](boost::future<int64_t> f) {
543 try {
544 auto fence = f.get();
545 if (fence != INVALID_FENCE) {
546 locked_session_ids_.put(
547 thread_id, std::make_shared<int64_t>(session_id));
548 return fence;
549 }
550 BOOST_THROW_EXCEPTION(
551 client::exception::lock_acquire_limit_reached(
552 "fenced_lock::lock_and_get_fence",
553 (boost::format("Lock [%1%] reentrant lock limit is "
554 "already reached!") %
555 object_name_)
556 .str()));
557 } catch (client::exception::session_expired&) {
558 invalidate_session(session_id);
559 verify_no_locked_session_id_present(thread_id);
560 return INVALID_FENCE;
561 } catch (client::exception::wait_key_cancelled&) {
562 release_session(session_id);
563 BOOST_THROW_EXCEPTION(
564 client::exception::lock_acquire_limit_reached(
565 "fenced_lock::lock_and_get_fence",
566 (boost::format(
567 "Lock [%1%] not acquired because the lock call on the "
568 "CP group is cancelled, possibly because of another "
569 "indeterminate call from the same thread.") %
570 object_name_)
571 .str()));
572 } catch (...) {
573 release_session(session_id);
574 throw;
575 }
576 });
577 };
578
579 return do_lock_once().then(
580 boost::launch::sync, [=](boost::future<int64_t> f) {
581 auto result = f.get();
582 if (result != INVALID_FENCE) {
583 return result;
584 }
585 // iterate in the user thread
586 for (result = do_lock_once().get(); result == INVALID_FENCE;) {
587 }
588 return result;
589 });
590}
591
592void
593fenced_lock::verify_locked_session_id_if_present(int64_t thread_id,
594 int64_t session_id,
595 bool should_release)
596{
597 auto locked_session_id = locked_session_ids_.get(thread_id);
598 if (locked_session_id && *locked_session_id != session_id) {
599 locked_session_ids_.remove(thread_id);
600 if (should_release) {
601 release_session(session_id);
602 }
603
604 throw_lock_ownership_lost(*locked_session_id);
605 }
606}
607
608void
609fenced_lock::verify_no_locked_session_id_present(int64_t thread_id)
610{
611 auto locked_session_id = locked_session_ids_.remove(thread_id);
612 if (locked_session_id) {
613 locked_session_ids_.remove(thread_id);
614 throw_lock_ownership_lost(*locked_session_id);
615 }
616}
617
618void
619fenced_lock::throw_lock_ownership_lost(int64_t session_id) const
620{
621 BOOST_THROW_EXCEPTION(client::exception::lock_ownership_lost(
622 "fenced_lock",
623 (boost::format("Current thread is not owner of the Lock[%1%] because its "
624 "Session[%2%] is closed by server!") %
625 get_name() % session_id)
626 .str()));
627}
628
629void
630fenced_lock::throw_illegal_monitor_state() const
631{
632 BOOST_THROW_EXCEPTION(client::exception::illegal_monitor_state(
633 "fenced_lock",
634 (boost::format("Current thread is not owner of the Lock[%1%]") %
635 get_name())
636 .str()));
637}
638
639boost::future<int64_t>
640fenced_lock::do_lock(int64_t session_id,
641 int64_t thread_id,
642 boost::uuids::uuid invocation_uid)
643{
644 auto request = client::protocol::codec::fencedlock_lock_encode(
645 group_id_, object_name_, session_id, thread_id, invocation_uid);
646 return invoke_and_get_future<int64_t>(request);
647}
648
649boost::future<int64_t>
650fenced_lock::do_try_lock(int64_t session_id,
651 int64_t thread_id,
652 boost::uuids::uuid invocation_uid,
653 std::chrono::milliseconds timeout)
654{
655 auto request = client::protocol::codec::fencedlock_trylock_encode(
656 group_id_,
657 object_name_,
658 session_id,
659 thread_id,
660 invocation_uid,
661 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
662 return invoke_and_get_future<int64_t>(request);
663}
664
665boost::future<bool>
666fenced_lock::do_unlock(int64_t session_id,
667 int64_t thread_id,
668 boost::uuids::uuid invocation_uid)
669{
670 auto request = client::protocol::codec::fencedlock_unlock_encode(
671 group_id_, object_name_, session_id, thread_id, invocation_uid);
672 return invoke_and_get_future<bool>(request);
673}
674
675boost::future<fenced_lock::lock_ownership_state>
676fenced_lock::do_get_lock_ownership_state()
677{
678 auto request = client::protocol::codec::fencedlock_getlockownership_encode(
679 group_id_, object_name_);
680 return invoke(request).then(
681 boost::launch::sync,
682 [](boost::future<client::protocol::ClientMessage> f) {
683 auto msg = f.get();
684 fenced_lock::lock_ownership_state state;
685 state.fence = msg.get_first_fixed_sized_field<int64_t>();
686 state.lock_count = msg.get<int32_t>();
687 state.session_id = msg.get<int64_t>();
688 state.thread_id = msg.get<int64_t>();
689 return state;
690 });
691}
692
693void
694fenced_lock::invalidate_session(int64_t session_id)
695{
696 session_manager_.invalidate_session(group_id_, session_id);
697}
698
699boost::future<bool>
700fenced_lock::try_lock()
701{
702 return try_lock_and_get_fence().then(
703 boost::launch::sync,
704 [](boost::future<int64_t> f) { return f.get() != INVALID_FENCE; });
705}
706
707boost::future<bool>
708fenced_lock::try_lock(std::chrono::milliseconds timeout)
709{
710 return try_lock_and_get_fence(timeout).then(
711 boost::launch::sync,
712 [](boost::future<int64_t> f) { return f.get() != INVALID_FENCE; });
713}
714
715boost::future<int64_t>
716fenced_lock::try_lock_and_get_fence()
717{
718 return try_lock_and_get_fence(std::chrono::milliseconds(0));
719}
720
721boost::future<int64_t>
722fenced_lock::try_lock_and_get_fence(std::chrono::milliseconds timeout)
723{
724 auto thread_id = util::get_current_thread_id();
725 auto invocation_uid = get_context().random_uuid();
726
727 auto do_try_lock_once = [=]() {
728 using namespace std::chrono;
729 auto start = steady_clock::now();
730 auto session_id = session_manager_.acquire_session(group_id_);
731 verify_locked_session_id_if_present(thread_id, session_id, true);
732 return do_try_lock(session_id, thread_id, invocation_uid, timeout)
733 .then(boost::launch::sync, [=](boost::future<int64_t> f) {
734 try {
735 auto fence = f.get();
736 if (fence != INVALID_FENCE) {
737 locked_session_ids_.put(
738 thread_id, std::make_shared<int64_t>(session_id));
739 return std::make_pair(fence, false);
740 } else {
741 release_session(session_id);
742 }
743 return std::make_pair(fence, false);
744 } catch (client::exception::session_expired&) {
745 invalidate_session(session_id);
746 verify_no_locked_session_id_present(thread_id);
747 auto timeout_left = timeout - (steady_clock::now() - start);
748 if (timeout_left.count() <= 0) {
749 return std::make_pair(INVALID_FENCE, false);
750 }
751 return std::make_pair(INVALID_FENCE, false);
752 } catch (client::exception::wait_key_cancelled&) {
753 release_session(session_id);
754 return std::make_pair(INVALID_FENCE, false);
755 } catch (...) {
756 release_session(session_id);
757 throw;
758 }
759 });
760 };
761
762 return do_try_lock_once().then(
763 boost::launch::sync, [=](boost::future<std::pair<int64_t, bool>> f) {
764 auto result = f.get();
765 if (!result.second) {
766 return result.first;
767 }
768 // iterate in the user thread
769 for (result = do_try_lock_once().get(); result.second;) {
770 }
771 return result.first;
772 });
773}
774
775boost::future<void>
776fenced_lock::unlock()
777{
778 auto thread_id = util::get_current_thread_id();
779 int64_t session_id = session_manager_.get_session(group_id_);
780
781 // the order of the following checks is important.
782 verify_locked_session_id_if_present(thread_id, session_id, false);
783 if (session_id == internal::session::proxy_session_manager::NO_SESSION_ID) {
784 locked_session_ids_.remove(thread_id);
785 throw_illegal_monitor_state();
786 }
787
788 return do_unlock(session_id, thread_id, get_context().random_uuid())
789 .then(boost::launch::sync, [=](boost::future<bool> f) {
790 try {
791 auto still_locked_by_current_thread = f.get();
792 if (still_locked_by_current_thread) {
793 locked_session_ids_.put(
794 thread_id, std::make_shared<int64_t>(session_id));
795 } else {
796 locked_session_ids_.remove(thread_id);
797 }
798
799 release_session(session_id);
800 } catch (client::exception::session_expired&) {
801 invalidate_session(session_id);
802 locked_session_ids_.remove(thread_id);
803
804 throw_lock_ownership_lost(session_id);
805 } catch (client::exception::illegal_monitor_state&) {
806 locked_session_ids_.remove(thread_id);
807 throw;
808 }
809 });
810}
811
812boost::future<int64_t>
813fenced_lock::get_fence()
814{
815 auto thread_id = util::get_current_thread_id();
816 int64_t session_id = session_manager_.get_session(group_id_);
817
818 // the order of the following checks is important.
819 verify_locked_session_id_if_present(thread_id, session_id, false);
820 if (session_id == internal::session::proxy_session_manager::NO_SESSION_ID) {
821 locked_session_ids_.remove(thread_id);
822 throw_illegal_monitor_state();
823 }
824
825 return do_get_lock_ownership_state().then(
826 boost::launch::sync, [=](boost::future<lock_ownership_state> f) {
827 auto ownership = f.get();
828 if (ownership.is_locked_by(session_id, thread_id)) {
829 locked_session_ids_.put(thread_id,
830 std::make_shared<int64_t>(session_id));
831 return ownership.fence;
832 }
833
834 verify_no_locked_session_id_present(thread_id);
835 throw_illegal_monitor_state();
836 return INVALID_FENCE;
837 });
838}
839
840boost::future<bool>
841fenced_lock::is_locked()
842{
843 auto thread_id = util::get_current_thread_id();
844 int64_t session_id = session_manager_.get_session(group_id_);
845
846 verify_locked_session_id_if_present(thread_id, session_id, false);
847
848 return do_get_lock_ownership_state().then(
849 boost::launch::sync, [=](boost::future<lock_ownership_state> f) {
850 auto ownership = f.get();
851 if (ownership.is_locked_by(session_id, thread_id)) {
852 locked_session_ids_.put(thread_id,
853 std::make_shared<int64_t>(session_id));
854 return true;
855 }
856
857 verify_no_locked_session_id_present(thread_id);
858 return ownership.is_locked();
859 });
860}
861
862boost::future<bool>
863fenced_lock::is_locked_by_current_thread()
864{
865 auto thread_id = util::get_current_thread_id();
866 int64_t session_id = session_manager_.get_session(group_id_);
867
868 verify_locked_session_id_if_present(thread_id, session_id, false);
869
870 return do_get_lock_ownership_state().then(
871 boost::launch::sync, [=](boost::future<lock_ownership_state> f) {
872 auto ownership = f.get();
873 auto locked_by_current_thread =
874 ownership.is_locked_by(session_id, thread_id);
875 if (locked_by_current_thread) {
876 locked_session_ids_.put(thread_id,
877 std::make_shared<int64_t>(session_id));
878 } else {
879 verify_no_locked_session_id_present(thread_id);
880 }
881
882 return locked_by_current_thread;
883 });
884}
885
886boost::future<int32_t>
887fenced_lock::get_lock_count()
888{
889 auto thread_id = util::get_current_thread_id();
890 int64_t session_id = session_manager_.get_session(group_id_);
891
892 verify_locked_session_id_if_present(thread_id, session_id, false);
893
894 return do_get_lock_ownership_state().then(
895 boost::launch::sync, [=](boost::future<lock_ownership_state> f) {
896 auto ownership = f.get();
897 if (ownership.is_locked_by(session_id, thread_id)) {
898 locked_session_ids_.put(thread_id,
899 std::make_shared<int64_t>(session_id));
900 } else {
901 verify_no_locked_session_id_present(thread_id);
902 }
903
904 return ownership.lock_count;
905 });
906}
907
908const raft_group_id&
909fenced_lock::get_group_id()
910{
911 return group_id_;
912}
913
914bool
915operator==(const fenced_lock& lhs, const fenced_lock& rhs)
916{
917 return lhs.get_service_name() == rhs.get_service_name() &&
918 lhs.get_name() == rhs.get_name();
919}
920
921void
922fenced_lock::post_destroy()
923{
924 ClientProxy::post_destroy();
925 locked_session_ids_.clear();
926}
927
928session_aware_proxy::session_aware_proxy(
929 const std::string& service_name,
930 const std::string& proxy_name,
931 client::spi::ClientContext* context,
932 const raft_group_id& group_id,
933 const std::string& object_name,
934 internal::session::proxy_session_manager& session_manager)
935 : cp_proxy(service_name, proxy_name, context, group_id, object_name)
936 , session_manager_(session_manager)
937{}
938
939void
941{
942 session_manager_.release_session(group_id_, session_id);
943}
944
945bool
946fenced_lock::lock_ownership_state::is_locked_by(int64_t session, int64_t thread)
947{
948 return is_locked() && session_id == session && thread_id == thread;
949}
950
951bool
952fenced_lock::lock_ownership_state::is_locked()
953{
954 return fence != INVALID_FENCE;
955}
956
957counting_semaphore::counting_semaphore(
958 const std::string& proxy_name,
959 client::spi::ClientContext* context,
960 const raft_group_id& group_id,
961 const std::string& object_name,
962 internal::session::proxy_session_manager& session_manager)
963 : session_aware_proxy(SERVICE_NAME,
964 proxy_name,
965 context,
966 group_id,
967 object_name,
968 session_manager)
969{}
970
971boost::future<bool>
972counting_semaphore::init(int32_t permits)
973{
974 util::Preconditions::check_not_negative(permits,
975 "Permits must be non-negative!");
976
977 auto request = client::protocol::codec::semaphore_init_encode(
978 group_id_, object_name_, permits);
979 return client::spi::impl::ClientInvocation::create(
980 context_, request, object_name_)
981 ->invoke()
982 .then(boost::launch::sync,
983 [](boost::future<client::protocol::ClientMessage> f) {
984 return f.get().get_first_fixed_sized_field<bool>();
985 });
986}
987
988boost::future<bool>
989counting_semaphore::try_acquire(int32_t permits)
990{
991 return try_acquire_for(std::chrono::milliseconds::zero(), permits);
992}
993
994boost::future<bool>
995counting_semaphore::try_acquire_for(std::chrono::milliseconds rel_time,
996 int32_t permits)
997{
998 if (rel_time < std::chrono::milliseconds::zero()) {
999 rel_time = std::chrono::milliseconds ::zero();
1000 }
1001 return try_acquire_for_millis(permits, rel_time);
1002}
1003
1004boost::future<void>
1005counting_semaphore::do_release(int32_t permits,
1006 int64_t thread_id,
1007 int64_t session_id)
1008{
1009 auto invocation_uid =
1010 get_context().get_hazelcast_client_implementation()->random_uuid();
1011 auto request = codec::semaphore_release_encode(
1012 group_id_, object_name_, session_id, thread_id, invocation_uid, permits);
1013 return to_void_future(client::spi::impl::ClientInvocation::create(
1014 context_, request, object_name_)
1015 ->invoke());
1016}
1017
1018boost::future<int32_t>
1019counting_semaphore::available_permits()
1020{
1021 auto request =
1022 codec::semaphore_availablepermits_encode(group_id_, object_name_);
1023 return decode<int32_t>(client::spi::impl::ClientInvocation::create(
1024 context_, request, object_name_)
1025 ->invoke());
1026}
1027
1028boost::future<void>
1029counting_semaphore::reduce_permits(int32_t reduction)
1030{
1031 util::Preconditions::check_not_negative(reduction,
1032 "Reduction must be non-negative!");
1033 if (reduction == 0) {
1034 return boost::make_ready_future();
1035 }
1036 return do_change_permits(-reduction);
1037}
1038
1039boost::future<void>
1040counting_semaphore::increase_permits(int32_t increase)
1041{
1042 util::Preconditions::check_not_negative(increase,
1043 "Increase must be non-negative!");
1044 if (increase == 0) {
1045 return boost::make_ready_future();
1046 }
1047 return do_change_permits(increase);
1048}
1049
1050sessionless_semaphore::sessionless_semaphore(
1051 const std::string& proxy_name,
1052 client::spi::ClientContext* context,
1053 const raft_group_id& group_id,
1054 const std::string& object_name,
1055 internal::session::proxy_session_manager& session_manager)
1056 : counting_semaphore(proxy_name,
1057 context,
1058 group_id,
1059 object_name,
1060 session_manager)
1061{}
1062
1063boost::future<void>
1064sessionless_semaphore::acquire(int32_t permits)
1065{
1066 util::Preconditions::check_positive(permits,
1067 "permits must be positive number.");
1068
1069 return to_void_future(
1070 do_try_acquire(permits, std::chrono::milliseconds(-1)));
1071}
1072
1073boost::future<bool>
1074sessionless_semaphore::do_try_acquire(int32_t permits,
1075 std::chrono::milliseconds timeout_ms)
1076{
1077 auto cluster_wide_threadId = get_thread_id();
1078 auto invocation_uid =
1079 get_context().get_hazelcast_client_implementation()->random_uuid();
1080 auto request = client::protocol::codec::semaphore_acquire_encode(
1081 group_id_,
1082 object_name_,
1083 internal::session::proxy_session_manager::NO_SESSION_ID,
1084 cluster_wide_threadId,
1085 invocation_uid,
1086 permits,
1087 timeout_ms.count());
1088 return client::spi::impl::ClientInvocation::create(
1089 context_, request, object_name_)
1090 ->invoke()
1091 .then(
1092 boost::launch::sync,
1093 [=](boost::future<client::protocol::ClientMessage> f) {
1094 try {
1095 return f.get().get_first_fixed_sized_field<bool>();
1096 } catch (client::exception::wait_key_cancelled&) {
1097 throw client::exception::illegal_state(
1098 "sessionless_semaphore::acquire",
1099 (boost::format(
1100 "Semaphore[%1%] ] not acquired because the acquire call "
1101 "on the CP group is cancelled, possibly because of "
1102 "another indeterminate call from the same thread.") %
1103 object_name_)
1104 .str());
1105 }
1106 });
1107}
1108
1109boost::future<bool>
1110sessionless_semaphore::try_acquire_for_millis(int32_t permits,
1111 std::chrono::milliseconds timeout)
1112{
1113 util::Preconditions::check_positive(permits, "Permits must be positive!");
1114
1115 return do_try_acquire(permits,
1116 timeout > std::chrono::milliseconds::zero()
1117 ? timeout
1118 : std::chrono::milliseconds::zero());
1119}
1120
1121boost::future<void>
1122sessionless_semaphore::release(int32_t permits)
1123{
1124 util::Preconditions::check_positive(permits, "Permits must be positive!");
1125 auto thread_id = get_thread_id();
1126 return do_release(permits,
1127 thread_id,
1128 internal::session::proxy_session_manager::NO_SESSION_ID);
1129}
1130
1131int64_t
1132sessionless_semaphore::get_thread_id()
1133{
1134 return session_manager_.get_or_create_unique_thread_id(group_id_);
1135}
1136
1137boost::future<int32_t>
1138sessionless_semaphore::drain_permits()
1139{
1140 auto cluster_wide_threadId = get_thread_id();
1141 auto invocation_uid =
1142 get_context().get_hazelcast_client_implementation()->random_uuid();
1143 auto request = client::protocol::codec::semaphore_drain_encode(
1144 group_id_,
1145 object_name_,
1146 internal::session::proxy_session_manager::NO_SESSION_ID,
1147 cluster_wide_threadId,
1148 invocation_uid);
1149 return decode<int32_t>(client::spi::impl::ClientInvocation::create(
1150 context_, request, object_name_)
1151 ->invoke());
1152}
1153
1154boost::future<void>
1155sessionless_semaphore::do_change_permits(int32_t delta)
1156{
1157 auto cluster_wide_threadId = get_thread_id();
1158 auto invocation_uid =
1159 get_context().get_hazelcast_client_implementation()->random_uuid();
1160 auto request = client::protocol::codec::semaphore_change_encode(
1161 group_id_,
1162 object_name_,
1163 internal::session::proxy_session_manager::NO_SESSION_ID,
1164 cluster_wide_threadId,
1165 invocation_uid,
1166 delta);
1167 return to_void_future(client::spi::impl::ClientInvocation::create(
1168 context_, request, object_name_)
1169 ->invoke());
1170}
1171
1172session_semaphore::session_semaphore(
1173 const std::string& proxy_name,
1174 client::spi::ClientContext* context,
1175 const raft_group_id& group_id,
1176 const std::string& object_name,
1177 internal::session::proxy_session_manager& session_manager)
1178 : counting_semaphore(proxy_name,
1179 context,
1180 group_id,
1181 object_name,
1182 session_manager)
1183{}
1184
1185boost::future<void>
1186session_semaphore::acquire(int32_t permits)
1187{
1188 return to_void_future(
1189 try_acquire_for_millis(permits, std::chrono::milliseconds(-1)));
1190}
1191
1192boost::future<bool>
1193session_semaphore::try_acquire_for_millis(int32_t permits,
1194 std::chrono::milliseconds timeout)
1195{
1196 util::Preconditions::check_not_negative(
1197 permits, "permits must not be negative number.");
1198
1199 auto thread_id = get_thread_id();
1200 auto invocation_uid =
1201 get_context().get_hazelcast_client_implementation()->random_uuid();
1202
1203 auto do_try_acquire_once = ([=]() {
1204 auto start = std::chrono::steady_clock::now();
1205 auto use_timeout = timeout >= std::chrono::milliseconds::zero();
1206 auto session_id = session_manager_.acquire_session(group_id_, permits);
1207 auto request =
1208 client::protocol::codec::semaphore_acquire_encode(group_id_,
1209 object_name_,
1210 session_id,
1211 thread_id,
1212 invocation_uid,
1213 permits,
1214 timeout.count());
1215 return client::spi::impl::ClientInvocation::create(
1216 context_, request, object_name_)
1217 ->invoke()
1218 .then(
1219 boost::launch::sync,
1220 [=](boost::future<client::protocol::ClientMessage> f) {
1221 try {
1222 auto acquired = f.get().get_first_fixed_sized_field<bool>();
1223 if (!acquired) {
1224 session_manager_.release_session(group_id_, session_id);
1225 }
1226 // first bool means acquired or not, second bool means if
1227 // should try again
1228 return std::make_pair(acquired, false);
1229 } catch (client::exception::session_expired&) {
1230 session_manager_.invalidate_session(group_id_, session_id);
1231 if (use_timeout &&
1232 (timeout - (std::chrono::steady_clock::now() - start) <=
1233 std::chrono::milliseconds::zero())) {
1234 return std::make_pair(false, false);
1235 }
1236 return std::make_pair(false, true);
1237 } catch (client::exception::wait_key_cancelled&) {
1238 session_manager_.release_session(
1239 group_id_, session_id, permits);
1240 if (!use_timeout) {
1241 BOOST_THROW_EXCEPTION(client::exception::illegal_state(
1242 "session_semaphore::try_acquire_for_millis",
1243 (boost::format(
1244 "Semaphore[%1%] not acquired because the acquire "
1245 "call on the CP group is cancelled, possibly "
1246 "because of another indeterminate call from the "
1247 "same thread.") %
1248 object_name_)
1249 .str()));
1250 }
1251 return std::make_pair(false, false);
1252 }
1253 });
1254 });
1255
1256 return do_try_acquire_once().then(
1257 boost::launch::sync, [=](boost::future<std::pair<bool, bool>> f) {
1258 auto result = f.get();
1259 if (!result.second) {
1260 return result.first;
1261 }
1262 for (; result.second; result = do_try_acquire_once().get())
1263 ;
1264 return result.first;
1265 });
1266}
1267
1268boost::future<void>
1269session_semaphore::release(int32_t permits)
1270{
1271 util::Preconditions::check_positive(permits, "Permits must be positive!");
1272 auto session_id = session_manager_.get_session(group_id_);
1273 if (session_id == internal::session::proxy_session_manager::NO_SESSION_ID) {
1274 throw_illegal_state_exception(nullptr);
1275 }
1276
1277 auto thread_id = get_thread_id();
1278 return do_release(permits, thread_id, session_id)
1279 .then([=](boost::future<void> f) {
1280 try {
1281 f.get();
1282 session_manager_.release_session(group_id_, session_id, permits);
1283 } catch (client::exception::session_expired&) {
1284 session_manager_.invalidate_session(group_id_, session_id);
1285 session_manager_.release_session(group_id_, session_id, permits);
1286 throw_illegal_state_exception(std::current_exception());
1287 }
1288 });
1289}
1290
1291void
1292session_semaphore::throw_illegal_state_exception(std::exception_ptr e)
1293{
1294 auto ise = boost::enable_current_exception(client::exception::illegal_state(
1295 "session_semaphore::illegal_state", "No valid session!"));
1296 if (!e) {
1297 throw ise;
1298 }
1299 try {
1300 std::rethrow_exception(e);
1301 } catch (...) {
1302 std::throw_with_nested(ise);
1303 }
1304}
1305
1306int64_t
1307session_semaphore::get_thread_id()
1308{
1309 return util::get_current_thread_id();
1310}
1311
1312boost::future<int32_t>
1313session_semaphore::drain_permits()
1314{
1315 auto thread_id = get_thread_id();
1316 auto invocation_uid =
1317 get_context().get_hazelcast_client_implementation()->random_uuid();
1318
1319 auto do_drain_once = ([=]() {
1320 auto session_id =
1321 session_manager_.acquire_session(group_id_, DRAIN_SESSION_ACQ_COUNT);
1322 auto request = client::protocol::codec::semaphore_drain_encode(
1323 group_id_, object_name_, session_id, thread_id, invocation_uid);
1324 return client::spi::impl::ClientInvocation::create(
1325 context_, request, object_name_)
1326 ->invoke()
1327 .then(
1328 boost::launch::sync,
1329 [=](boost::future<client::protocol::ClientMessage> f) {
1330 try {
1331 auto count = f.get().get_first_fixed_sized_field<int32_t>();
1332 session_manager_.release_session(
1333 group_id_, session_id, DRAIN_SESSION_ACQ_COUNT - count);
1334 return count;
1335 } catch (client::exception::session_expired&) {
1336 session_manager_.invalidate_session(group_id_, session_id);
1337 return -1;
1338 }
1339 });
1340 });
1341
1342 return do_drain_once().then(
1343 boost::launch::sync, [=](boost::future<int32_t> f) {
1344 int32_t count = f.get();
1345 if (count != -1) {
1346 return count;
1347 }
1348 while ((count = do_drain_once().get()) == -1) {
1349 }
1350 return count;
1351 });
1352}
1353
1354boost::future<void>
1355session_semaphore::do_change_permits(int32_t delta)
1356{
1357 auto session_id =
1358 session_manager_.acquire_session(group_id_, DRAIN_SESSION_ACQ_COUNT);
1359 auto thread_id = get_thread_id();
1360 auto invocation_uid =
1361 get_context().get_hazelcast_client_implementation()->random_uuid();
1362
1363 auto request = client::protocol::codec::semaphore_change_encode(
1364 group_id_, object_name_, session_id, thread_id, invocation_uid, delta);
1365 return client::spi::impl::ClientInvocation::create(
1366 context_, request, object_name_)
1367 ->invoke()
1368 .then(boost::launch::sync,
1369 [=](boost::future<client::protocol::ClientMessage> f) {
1370 try {
1371 f.get();
1372 session_manager_.release_session(group_id_, session_id);
1373 } catch (client::exception::session_expired&) {
1374 session_manager_.invalidate_session(group_id_, session_id);
1375 throw_illegal_state_exception(std::current_exception());
1376 }
1377 });
1378}
1379} // namespace cp
1380} // namespace hazelcast
1381
1382namespace std {
1383std::size_t
1384hash<hazelcast::cp::raft_group_id>::operator()(
1385 const hazelcast::cp::raft_group_id& group_id) const noexcept
1386{
1387 std::size_t seed = 0;
1388 boost::hash_combine(seed, group_id.name);
1389 boost::hash_combine(seed, group_id.seed);
1390 boost::hash_combine(seed, group_id.group_id);
1391 return seed;
1392}
1393} // namespace std
1394
1395namespace boost {
1396std::size_t
1397hash<hazelcast::cp::raft_group_id>::operator()(
1398 const hazelcast::cp::raft_group_id& group_id) const noexcept
1399{
1400 return std::hash<hazelcast::cp::raft_group_id>()(group_id);
1401}
1402} // namespace boost
Client-side Raft-based proxy implementation of atomic long.
Definition cp.h:100
boost::future< int64_t > get_and_add(int64_t delta)
Atomically adds the given value to the current value.
Definition cp.cpp:253
boost::future< int64_t > get()
Gets the current value.
Definition cp.cpp:246
boost::future< int64_t > decrement_and_get()
Atomically decrements the current value by one.
Definition cp.cpp:240
boost::future< int64_t > get_and_increment()
Atomically increments the current value by one.
Definition cp.cpp:274
boost::future< bool > compare_and_set(int64_t expect, int64_t update)
Atomically sets the value to the given updated value only if the current value equals the expected value.
Definition cp.cpp:226
boost::future< int64_t > get_and_set(int64_t new_value)
Atomically sets the given value and returns the old value.
Definition cp.cpp:260
boost::future< void > set(int64_t new_value)
Atomically sets the given value.
Definition cp.cpp:280
boost::future< int64_t > increment_and_get()
Atomically increments the current value by one.
Definition cp.cpp:268
boost::future< int64_t > get_and_decrement()
Atomically decrements the current value by one.
Definition cp.cpp:234
boost::future< int64_t > add_and_get(int64_t delta)
Atomically adds the given value to the current value.
Definition cp.cpp:219
boost::future< std::shared_ptr< atomic_long > > get_atomic_long(const std::string &name)
Returns a proxy for an atomic_long instance created on CP Subsystem.
Definition cp.cpp:158
boost::future< std::shared_ptr< fenced_lock > > get_lock(const std::string &name)
Returns a proxy for a fenced_lock instance created on CP Subsystem.
Definition cp.cpp:176
boost::future< std::shared_ptr< latch > > get_latch(const std::string &name)
Returns a proxy for a count_down_latch instance created on CP Subsystem.
Definition cp.cpp:170
boost::future< std::shared_ptr< atomic_reference > > get_atomic_reference(const std::string &name)
Returns a proxy for an atomic_reference instance created on CP Subsystem.
Definition cp.cpp:164
boost::future< std::shared_ptr< counting_semaphore > > get_semaphore(const std::string &name)
Returns a proxy for a semaphore instance created on CP Subsystem.
Definition cp.cpp:182
void release_session(int64_t session_id)
Decrements acquire count of the session.
Definition cp.cpp:940
static constexpr int32_t DRAIN_SESSION_ACQ_COUNT
Since a proxy does not know how many permits will be drained on the Raft group, it uses this constant...
Definition cp.h:1301
STL namespace.