Hazelcast C++ Client
Hazelcast C++ Client Library
cp.cpp
1 /*
2  * Copyright (c) 2008-2022, Hazelcast, Inc. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <algorithm>
18 #include <boost/algorithm/string.hpp>
19 
20 #include "hazelcast/cp/cp.h"
21 #include "hazelcast/client/spi/ClientContext.h"
22 #include "hazelcast/util/Preconditions.h"
23 #include "hazelcast/client/protocol/codec/codecs.h"
24 #include "hazelcast/client/protocol/ClientMessage.h"
25 #include "hazelcast/client/spi/impl/ClientInvocation.h"
26 #include "hazelcast/client/impl/hazelcast_client_instance_impl.h"
27 
28 namespace hazelcast {
29 namespace cp {
30 using namespace hazelcast::client::protocol;
31 using namespace hazelcast::client::protocol::codec;
32 using namespace hazelcast::util;
33 
34 raft_proxy_factory::raft_proxy_factory(client::spi::ClientContext& context)
35  : context_(context)
36 {}
37 
38 std::shared_ptr<fenced_lock>
39 raft_proxy_factory::create_fenced_lock(raft_group_id&& group_id,
40  const std::string& proxy_name,
41  const std::string& object_name)
42 {
43  while (true) {
44  auto proxy = lock_proxies_.get(proxy_name);
45  if (proxy) {
46  if (proxy->get_group_id() != group_id) {
47  lock_proxies_.remove(proxy_name, proxy);
48  } else {
49  return proxy;
50  }
51  }
52 
53  proxy = std::make_shared<fenced_lock>(
54  proxy_name, context_, group_id, object_name);
55  auto existing = lock_proxies_.put_if_absent(proxy_name, proxy);
56  if (!existing) {
57  return proxy;
58  } else if (existing->get_group_id() == group_id) {
59  return existing;
60  }
61 
62  group_id = get_group_id(proxy_name, object_name).get();
63  }
64 }
65 
66 std::shared_ptr<counting_semaphore>
67 raft_proxy_factory::create_semaphore(raft_group_id&& group_id,
68  const std::string& proxy_name,
69  const std::string& object_name)
70 {
71  auto request =
72  client::protocol::codec::semaphore_getsemaphoretype_encode(proxy_name);
73  auto is_sessionless = client::spi::impl::ClientInvocation::create(
74  context_, request, object_name)
75  ->invoke()
76  .get()
77  .get_first_fixed_sized_field<bool>();
78  if (is_sessionless) {
79  return std::shared_ptr<counting_semaphore>(
80  new sessionless_semaphore(proxy_name,
81  &context_,
82  group_id,
83  object_name,
84  context_.get_proxy_session_manager()));
85  } else {
86  return std::shared_ptr<counting_semaphore>(
87  new session_semaphore(proxy_name,
88  &context_,
89  group_id,
90  object_name,
91  context_.get_proxy_session_manager()));
92  }
93 }
94 
95 std::string
96 raft_proxy_factory::without_default_group_name(const std::string& n)
97 {
98  std::string name = n;
99  boost::trim(name);
100  auto index = name.find('@');
101  if (index == std::string::npos) {
102  return name;
103  }
104 
105  Preconditions::check_true(
106  name.find('@', index + 1) == std::string::npos,
107  "Custom group name must be specified at most once");
108 
109  auto group_name = name.substr(index + 1);
110  boost::trim(group_name);
111  if (group_name == DEFAULT_GROUP_NAME) {
112  return name.substr(0, index);
113  }
114  return name;
115 }
116 
117 std::string
118 raft_proxy_factory::object_name_for_proxy(const std::string& name)
119 {
120  auto index = name.find('@');
121  if (index == std::string::npos) {
122  return name;
123  }
124 
125  Preconditions::check_true(index < (name.size() - 1),
126  "Object name cannot be empty string");
127  Preconditions::check_true(
128  name.find('@', index + 1) == std::string::npos,
129  "Custom CP group name must be specified at most once");
130 
131  auto object_name = name.substr(0, index);
132  boost::trim(object_name);
133  Preconditions::check_true(object_name.size() > 0,
134  "Object name cannot be empty string");
135  return object_name;
136 }
137 
138 boost::future<raft_group_id>
139 raft_proxy_factory::get_group_id(const std::string& proxy_name,
140  const std::string& object_name)
141 {
142  auto request = cpgroup_createcpgroup_encode(proxy_name);
143  return client::spi::impl::ClientInvocation::create(
144  context_, request, object_name)
145  ->invoke()
146  .then(boost::launch::sync,
147  [](boost::future<client::protocol::ClientMessage> f) {
148  return *f.get().get_first_var_sized_field<raft_group_id>();
149  });
150 }
151 
152 cp_subsystem::cp_subsystem(client::spi::ClientContext& context)
153  : context_(context)
154  , proxy_factory_(context)
155 {}
156 
157 boost::future<std::shared_ptr<atomic_long>>
158 cp_subsystem::get_atomic_long(const std::string& name)
159 {
160  return proxy_factory_.create_proxy<atomic_long>(name);
161 }
162 
163 boost::future<std::shared_ptr<atomic_reference>>
164 cp_subsystem::get_atomic_reference(const std::string& name)
165 {
166  return proxy_factory_.create_proxy<atomic_reference>(name);
167 }
168 
169 boost::future<std::shared_ptr<latch>>
170 cp_subsystem::get_latch(const std::string& name)
171 {
172  return proxy_factory_.create_proxy<latch>(name);
173 }
174 
175 boost::future<std::shared_ptr<fenced_lock>>
176 cp_subsystem::get_lock(const std::string& name)
177 {
178  return proxy_factory_.create_proxy<fenced_lock>(name);
179 }
180 
181 boost::future<std::shared_ptr<counting_semaphore>>
182 cp_subsystem::get_semaphore(const std::string& name)
183 {
184  return proxy_factory_.create_proxy<counting_semaphore>(name);
185 }
186 
187 cp_proxy::cp_proxy(const std::string& service_name,
188  const std::string& proxy_name,
189  client::spi::ClientContext* context,
190  const raft_group_id& group_id,
191  const std::string& object_name)
192  : ProxyImpl(service_name, proxy_name, context)
193  , group_id_(group_id)
194  , object_name_(object_name)
195 {}
196 
197 void
198 cp_proxy::on_destroy()
199 {
200  auto request = cpgroup_destroycpobject_encode(
201  group_id_, get_service_name(), object_name_);
202  invoke(request).get();
203 }
204 
205 const raft_group_id&
206 cp_proxy::get_group_id() const
207 {
208  return group_id_;
209 }
210 
211 atomic_long::atomic_long(const std::string& name,
212  client::spi::ClientContext& context,
213  const raft_group_id& group_id,
214  const std::string& object_name)
215  : cp_proxy(SERVICE_NAME, name, &context, group_id, object_name)
216 {}
217 
218 boost::future<int64_t>
220 {
221  auto request = atomiclong_addandget_encode(group_id_, object_name_, delta);
222  return invoke_and_get_future<int64_t>(request);
223 }
224 
225 boost::future<bool>
226 atomic_long::compare_and_set(int64_t expect, int64_t update)
227 {
228  auto request =
229  atomiclong_compareandset_encode(group_id_, object_name_, expect, update);
230  return invoke_and_get_future<bool>(request);
231 }
232 
233 boost::future<int64_t>
235 {
236  return get_and_add(-1);
237 }
238 
239 boost::future<int64_t>
241 {
242  return add_and_get(-1);
243 }
244 
245 boost::future<int64_t>
247 {
248  auto request = atomiclong_get_encode(group_id_, object_name_);
249  return invoke_and_get_future<int64_t>(request);
250 }
251 
252 boost::future<int64_t>
254 {
255  auto request = atomiclong_getandadd_encode(group_id_, object_name_, delta);
256  return invoke_and_get_future<int64_t>(request);
257 }
258 
259 boost::future<int64_t>
260 atomic_long::get_and_set(int64_t new_value)
261 {
262  auto request =
263  atomiclong_getandset_encode(group_id_, object_name_, new_value);
264  return invoke_and_get_future<int64_t>(request);
265 }
266 
267 boost::future<int64_t>
269 {
270  return add_and_get(1);
271 }
272 
273 boost::future<int64_t>
275 {
276  return get_and_add(1);
277 }
278 
279 boost::future<void>
280 atomic_long::set(int64_t new_value)
281 {
282  return to_void_future(get_and_set(new_value));
283 }
284 
285 boost::future<int64_t>
286 atomic_long::alter_data(client::serialization::pimpl::data& function_data,
287  alter_result_type result_type)
288 {
289  auto request = atomiclong_alter_encode(group_id_,
290  object_name_,
291  function_data,
292  static_cast<int32_t>(result_type));
293  return invoke_and_get_future<int64_t>(request);
294 }
295 
296 boost::future<boost::optional<client::serialization::pimpl::data>>
297 atomic_long::apply_data(client::serialization::pimpl::data& function_data)
298 {
299  auto request =
300  atomiclong_apply_encode(group_id_, object_name_, function_data);
301  return invoke_and_get_future<
302  boost::optional<client::serialization::pimpl::data>>(request);
303 }
304 
305 atomic_reference::atomic_reference(const std::string& name,
306  client::spi::ClientContext& context,
307  const raft_group_id& group_id,
308  const std::string& object_name)
309  : cp_proxy(SERVICE_NAME, name, &context, group_id, object_name)
310 {}
311 
312 boost::future<boost::optional<client::serialization::pimpl::data>>
313 atomic_reference::get_data()
314 {
315  auto request = atomicref_get_encode(group_id_, object_name_);
316  return invoke_and_get_future<
317  boost::optional<client::serialization::pimpl::data>>(request);
318 }
319 
320 boost::future<boost::optional<client::serialization::pimpl::data>>
321 atomic_reference::set_data(
322  const client::serialization::pimpl::data& new_value_data)
323 {
324  auto request =
325  atomicref_set_encode(group_id_, object_name_, &new_value_data, false);
326  return invoke_and_get_future<
327  boost::optional<client::serialization::pimpl::data>>(request);
328 }
329 
330 boost::future<boost::optional<client::serialization::pimpl::data>>
331 atomic_reference::get_and_set_data(
332  const client::serialization::pimpl::data& new_value_data)
333 {
334  auto request =
335  atomicref_set_encode(group_id_, object_name_, &new_value_data, true);
336  return invoke_and_get_future<
337  boost::optional<client::serialization::pimpl::data>>(request);
338 }
339 
340 boost::future<bool>
341 atomic_reference::compare_and_set_data(
342  const client::serialization::pimpl::data& expect_data,
343  const client::serialization::pimpl::data& update_data)
344 {
345  auto request = atomicref_compareandset_encode(
346  group_id_, object_name_, &expect_data, &update_data);
347  return invoke_and_get_future<bool>(request);
348 }
349 
350 boost::future<bool>
351 atomic_reference::contains_data(
352  const client::serialization::pimpl::data& value_data)
353 {
354  auto request =
355  atomicref_contains_encode(group_id_, object_name_, &value_data);
356  return invoke_and_get_future<bool>(request);
357 }
358 
359 boost::future<void>
360 atomic_reference::alter_data(
361  const client::serialization::pimpl::data& function_data)
362 {
363  return to_void_future(
364  invoke_apply(function_data, return_value_type::NO_VALUE, true));
365 }
366 
367 boost::future<boost::optional<client::serialization::pimpl::data>>
368 atomic_reference::alter_and_get_data(
369  const client::serialization::pimpl::data& function_data)
370 {
371  return invoke_apply(function_data, return_value_type::NEW, true);
372 }
373 
374 boost::future<boost::optional<client::serialization::pimpl::data>>
375 atomic_reference::get_and_alter_data(
376  const client::serialization::pimpl::data& function_data)
377 {
378  return invoke_apply(function_data, return_value_type::OLD, true);
379 }
380 
381 boost::future<boost::optional<client::serialization::pimpl::data>>
382 atomic_reference::apply_data(
383  const client::serialization::pimpl::data& function_data)
384 {
385  return invoke_apply(function_data, return_value_type::NEW, false);
386 }
387 
388 boost::future<bool>
389 atomic_reference::is_null()
390 {
391  return contains(static_cast<byte*>(nullptr));
392 }
393 
394 boost::future<void>
395 atomic_reference::clear()
396 {
397  return to_void_future(set(static_cast<byte*>(nullptr)));
398 }
399 
400 boost::future<boost::optional<client::serialization::pimpl::data>>
401 atomic_reference::invoke_apply(
402  const client::serialization::pimpl::data function_data,
403  return_value_type return_type,
404  bool alter)
405 {
406  auto request = atomicref_apply_encode(group_id_,
407  object_name_,
408  function_data,
409  static_cast<int32_t>(return_type),
410  alter);
411  return invoke_and_get_future<
412  boost::optional<client::serialization::pimpl::data>>(request);
413 }
414 
415 latch::latch(const std::string& name,
416  client::spi::ClientContext& context,
417  const raft_group_id& group_id,
418  const std::string& object_name)
419  : cp_proxy(SERVICE_NAME, name, &context, group_id, object_name)
420 {}
421 
422 boost::future<bool>
423 latch::try_set_count(int32_t count)
424 {
425  util::Preconditions::check_positive(count, "count must be positive!");
426 
427  auto request =
428  countdownlatch_trysetcount_encode(group_id_, object_name_, count);
429  return invoke_and_get_future<bool>(request);
430 }
431 
432 boost::future<int32_t>
433 latch::get_count()
434 {
435  auto request = countdownlatch_getcount_encode(group_id_, object_name_);
436  return invoke_and_get_future<int32_t>(request);
437 }
438 
439 boost::future<void>
440 latch::count_down()
441 {
442  auto invocation_uid =
443  get_context().get_hazelcast_client_implementation()->random_uuid();
444  return get_round().then(boost::launch::sync, [=](boost::future<int32_t> f) {
445  auto round = f.get();
446  for (;;) {
447  try {
448  count_down(round, invocation_uid);
449  return;
450  } catch (client::exception::operation_timeout&) {
451  // I can retry safely because my retry would be idempotent...
452  }
453  }
454  });
455 }
456 
457 boost::future<bool>
458 latch::try_wait()
459 {
460  return get_count().then(boost::launch::sync, [](boost::future<int32_t> f) {
461  return f.get() == 0;
462  });
463 }
464 
465 boost::future<int32_t>
466 latch::get_round()
467 {
468  auto request = countdownlatch_getround_encode(group_id_, object_name_);
469  return invoke_and_get_future<int32_t>(request);
470 }
471 
472 void
473 latch::count_down(int round, boost::uuids::uuid invocation_uid)
474 {
475  auto request = countdownlatch_countdown_encode(
476  group_id_, object_name_, invocation_uid, round);
477  invoke(request).get();
478 }
479 
480 boost::future<void>
481 latch::wait()
482 {
483  return to_void_future(wait_for(std::chrono::milliseconds(INT64_MAX)));
484 }
485 
486 boost::future<std::cv_status>
487 latch::wait_for(std::chrono::milliseconds timeout)
488 {
489  auto timeout_millis = std::max<int64_t>(0, timeout.count());
490  auto invoation_uid =
491  get_context().get_hazelcast_client_implementation()->random_uuid();
492  auto request = countdownlatch_await_encode(
493  group_id_, object_name_, invoation_uid, timeout_millis);
494  return invoke_and_get_future<bool>(request).then(
495  boost::launch::sync, [](boost::future<bool> f) {
496  return f.get() ? std::cv_status::no_timeout : std::cv_status::timeout;
497  });
498 }
499 
500 bool
501 raft_group_id::operator==(const raft_group_id& rhs) const
502 {
503  return name == rhs.name && seed == rhs.seed && group_id == rhs.group_id;
504 }
505 
506 bool
507 raft_group_id::operator!=(const raft_group_id& rhs) const
508 {
509  return !(rhs == *this);
510 }
511 
512 constexpr int64_t fenced_lock::INVALID_FENCE;
513 
514 fenced_lock::fenced_lock(const std::string& name,
515  client::spi::ClientContext& context,
516  const raft_group_id& group_id,
517  const std::string& object_name)
518  : session_aware_proxy(SERVICE_NAME,
519  name,
520  &context,
521  group_id,
522  object_name,
523  context.get_proxy_session_manager())
524 {}
525 
526 boost::future<void>
527 fenced_lock::lock()
528 {
529  return to_void_future(lock_and_get_fence());
530 }
531 
532 boost::future<int64_t>
533 fenced_lock::lock_and_get_fence()
534 {
535  auto thread_id = util::get_current_thread_id();
536  auto invocation_uid = get_context().random_uuid();
537 
538  auto do_lock_once = [=]() {
539  auto session_id = session_manager_.acquire_session(group_id_);
540  verify_locked_session_id_if_present(thread_id, session_id, true);
541  return do_lock(session_id, thread_id, invocation_uid)
542  .then(boost::launch::sync, [=](boost::future<int64_t> f) {
543  try {
544  auto fence = f.get();
545  if (fence != INVALID_FENCE) {
546  locked_session_ids_.put(
547  thread_id, std::make_shared<int64_t>(session_id));
548  return fence;
549  }
550  BOOST_THROW_EXCEPTION(
551  client::exception::lock_acquire_limit_reached(
552  "fenced_lock::lock_and_get_fence",
553  (boost::format("Lock [%1%] reentrant lock limit is "
554  "already reached!") %
555  object_name_)
556  .str()));
557  } catch (client::exception::session_expired&) {
558  invalidate_session(session_id);
559  verify_no_locked_session_id_present(thread_id);
560  return INVALID_FENCE;
561  } catch (client::exception::wait_key_cancelled&) {
562  release_session(session_id);
563  BOOST_THROW_EXCEPTION(
564  client::exception::lock_acquire_limit_reached(
565  "fenced_lock::lock_and_get_fence",
566  (boost::format(
567  "Lock [%1%] not acquired because the lock call on the "
568  "CP group is cancelled, possibly because of another "
569  "indeterminate call from the same thread.") %
570  object_name_)
571  .str()));
572  } catch (...) {
573  release_session(session_id);
574  throw;
575  }
576  });
577  };
578 
579  return do_lock_once().then(
580  boost::launch::sync, [=](boost::future<int64_t> f) {
581  auto result = f.get();
582  if (result != INVALID_FENCE) {
583  return result;
584  }
585  // iterate in the user thread
586  for (result = do_lock_once().get(); result == INVALID_FENCE;) {
587  }
588  return result;
589  });
590 }
591 
592 void
593 fenced_lock::verify_locked_session_id_if_present(int64_t thread_id,
594  int64_t session_id,
595  bool should_release)
596 {
597  auto locked_session_id = locked_session_ids_.get(thread_id);
598  if (locked_session_id && *locked_session_id != session_id) {
599  locked_session_ids_.remove(thread_id);
600  if (should_release) {
601  release_session(session_id);
602  }
603 
604  throw_lock_ownership_lost(*locked_session_id);
605  }
606 }
607 
608 void
609 fenced_lock::verify_no_locked_session_id_present(int64_t thread_id)
610 {
611  auto locked_session_id = locked_session_ids_.remove(thread_id);
612  if (locked_session_id) {
613  locked_session_ids_.remove(thread_id);
614  throw_lock_ownership_lost(*locked_session_id);
615  }
616 }
617 
618 void
619 fenced_lock::throw_lock_ownership_lost(int64_t session_id) const
620 {
621  BOOST_THROW_EXCEPTION(client::exception::lock_ownership_lost(
622  "fenced_lock",
623  (boost::format("Current thread is not owner of the Lock[%1%] because its "
624  "Session[%2%] is closed by server!") %
625  get_name() % session_id)
626  .str()));
627 }
628 
629 void
630 fenced_lock::throw_illegal_monitor_state() const
631 {
632  BOOST_THROW_EXCEPTION(client::exception::illegal_monitor_state(
633  "fenced_lock",
634  (boost::format("Current thread is not owner of the Lock[%1%]") %
635  get_name())
636  .str()));
637 }
638 
639 boost::future<int64_t>
640 fenced_lock::do_lock(int64_t session_id,
641  int64_t thread_id,
642  boost::uuids::uuid invocation_uid)
643 {
644  auto request = client::protocol::codec::fencedlock_lock_encode(
645  group_id_, object_name_, session_id, thread_id, invocation_uid);
646  return invoke_and_get_future<int64_t>(request);
647 }
648 
649 boost::future<int64_t>
650 fenced_lock::do_try_lock(int64_t session_id,
651  int64_t thread_id,
652  boost::uuids::uuid invocation_uid,
653  std::chrono::milliseconds timeout)
654 {
655  auto request = client::protocol::codec::fencedlock_trylock_encode(
656  group_id_,
657  object_name_,
658  session_id,
659  thread_id,
660  invocation_uid,
661  std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
662  return invoke_and_get_future<int64_t>(request);
663 }
664 
665 boost::future<bool>
666 fenced_lock::do_unlock(int64_t session_id,
667  int64_t thread_id,
668  boost::uuids::uuid invocation_uid)
669 {
670  auto request = client::protocol::codec::fencedlock_unlock_encode(
671  group_id_, object_name_, session_id, thread_id, invocation_uid);
672  return invoke_and_get_future<bool>(request);
673 }
674 
675 boost::future<fenced_lock::lock_ownership_state>
676 fenced_lock::do_get_lock_ownership_state()
677 {
678  auto request = client::protocol::codec::fencedlock_getlockownership_encode(
679  group_id_, object_name_);
680  return invoke(request).then(
681  boost::launch::sync,
682  [](boost::future<client::protocol::ClientMessage> f) {
683  auto msg = f.get();
684  fenced_lock::lock_ownership_state state;
685  state.fence = msg.get_first_fixed_sized_field<int64_t>();
686  state.lock_count = msg.get<int32_t>();
687  state.session_id = msg.get<int64_t>();
688  state.thread_id = msg.get<int64_t>();
689  return state;
690  });
691 }
692 
693 void
694 fenced_lock::invalidate_session(int64_t session_id)
695 {
696  session_manager_.invalidate_session(group_id_, session_id);
697 }
698 
699 boost::future<bool>
700 fenced_lock::try_lock()
701 {
702  return try_lock_and_get_fence().then(
703  boost::launch::sync,
704  [](boost::future<int64_t> f) { return f.get() != INVALID_FENCE; });
705 }
706 
707 boost::future<bool>
708 fenced_lock::try_lock(std::chrono::milliseconds timeout)
709 {
710  return try_lock_and_get_fence(timeout).then(
711  boost::launch::sync,
712  [](boost::future<int64_t> f) { return f.get() != INVALID_FENCE; });
713 }
714 
715 boost::future<int64_t>
716 fenced_lock::try_lock_and_get_fence()
717 {
718  return try_lock_and_get_fence(std::chrono::milliseconds(0));
719 }
720 
721 boost::future<int64_t>
722 fenced_lock::try_lock_and_get_fence(std::chrono::milliseconds timeout)
723 {
724  auto thread_id = util::get_current_thread_id();
725  auto invocation_uid = get_context().random_uuid();
726 
727  auto do_try_lock_once = [=]() {
728  using namespace std::chrono;
729  auto start = steady_clock::now();
730  auto session_id = session_manager_.acquire_session(group_id_);
731  verify_locked_session_id_if_present(thread_id, session_id, true);
732  return do_try_lock(session_id, thread_id, invocation_uid, timeout)
733  .then(boost::launch::sync, [=](boost::future<int64_t> f) {
734  try {
735  auto fence = f.get();
736  if (fence != INVALID_FENCE) {
737  locked_session_ids_.put(
738  thread_id, std::make_shared<int64_t>(session_id));
739  return std::make_pair(fence, false);
740  } else {
741  release_session(session_id);
742  }
743  return std::make_pair(fence, false);
744  } catch (client::exception::session_expired&) {
745  invalidate_session(session_id);
746  verify_no_locked_session_id_present(thread_id);
747  auto timeout_left = timeout - (steady_clock::now() - start);
748  if (timeout_left.count() <= 0) {
749  return std::make_pair(INVALID_FENCE, false);
750  }
751  return std::make_pair(INVALID_FENCE, false);
752  } catch (client::exception::wait_key_cancelled&) {
753  release_session(session_id);
754  return std::make_pair(INVALID_FENCE, false);
755  } catch (...) {
756  release_session(session_id);
757  throw;
758  }
759  });
760  };
761 
762  return do_try_lock_once().then(
763  boost::launch::sync, [=](boost::future<std::pair<int64_t, bool>> f) {
764  auto result = f.get();
765  if (!result.second) {
766  return result.first;
767  }
768  // iterate in the user thread
769  for (result = do_try_lock_once().get(); result.second;) {
770  }
771  return result.first;
772  });
773 }
774 
775 boost::future<void>
776 fenced_lock::unlock()
777 {
778  auto thread_id = util::get_current_thread_id();
779  int64_t session_id = session_manager_.get_session(group_id_);
780 
781  // the order of the following checks is important.
782  verify_locked_session_id_if_present(thread_id, session_id, false);
783  if (session_id == internal::session::proxy_session_manager::NO_SESSION_ID) {
784  locked_session_ids_.remove(thread_id);
785  throw_illegal_monitor_state();
786  }
787 
788  return do_unlock(session_id, thread_id, get_context().random_uuid())
789  .then(boost::launch::sync, [=](boost::future<bool> f) {
790  try {
791  auto still_locked_by_current_thread = f.get();
792  if (still_locked_by_current_thread) {
793  locked_session_ids_.put(
794  thread_id, std::make_shared<int64_t>(session_id));
795  } else {
796  locked_session_ids_.remove(thread_id);
797  }
798 
799  release_session(session_id);
800  } catch (client::exception::session_expired&) {
801  invalidate_session(session_id);
802  locked_session_ids_.remove(thread_id);
803 
804  throw_lock_ownership_lost(session_id);
805  } catch (client::exception::illegal_monitor_state&) {
806  locked_session_ids_.remove(thread_id);
807  throw;
808  }
809  });
810 }
811 
812 boost::future<int64_t>
813 fenced_lock::get_fence()
814 {
815  auto thread_id = util::get_current_thread_id();
816  int64_t session_id = session_manager_.get_session(group_id_);
817 
818  // the order of the following checks is important.
819  verify_locked_session_id_if_present(thread_id, session_id, false);
820  if (session_id == internal::session::proxy_session_manager::NO_SESSION_ID) {
821  locked_session_ids_.remove(thread_id);
822  throw_illegal_monitor_state();
823  }
824 
825  return do_get_lock_ownership_state().then(
826  boost::launch::sync, [=](boost::future<lock_ownership_state> f) {
827  auto ownership = f.get();
828  if (ownership.is_locked_by(session_id, thread_id)) {
829  locked_session_ids_.put(thread_id,
830  std::make_shared<int64_t>(session_id));
831  return ownership.fence;
832  }
833 
834  verify_no_locked_session_id_present(thread_id);
835  throw_illegal_monitor_state();
836  return INVALID_FENCE;
837  });
838 }
839 
840 boost::future<bool>
841 fenced_lock::is_locked()
842 {
843  auto thread_id = util::get_current_thread_id();
844  int64_t session_id = session_manager_.get_session(group_id_);
845 
846  verify_locked_session_id_if_present(thread_id, session_id, false);
847 
848  return do_get_lock_ownership_state().then(
849  boost::launch::sync, [=](boost::future<lock_ownership_state> f) {
850  auto ownership = f.get();
851  if (ownership.is_locked_by(session_id, thread_id)) {
852  locked_session_ids_.put(thread_id,
853  std::make_shared<int64_t>(session_id));
854  return true;
855  }
856 
857  verify_no_locked_session_id_present(thread_id);
858  return ownership.is_locked();
859  });
860 }
861 
862 boost::future<bool>
863 fenced_lock::is_locked_by_current_thread()
864 {
865  auto thread_id = util::get_current_thread_id();
866  int64_t session_id = session_manager_.get_session(group_id_);
867 
868  verify_locked_session_id_if_present(thread_id, session_id, false);
869 
870  return do_get_lock_ownership_state().then(
871  boost::launch::sync, [=](boost::future<lock_ownership_state> f) {
872  auto ownership = f.get();
873  auto locked_by_current_thread =
874  ownership.is_locked_by(session_id, thread_id);
875  if (locked_by_current_thread) {
876  locked_session_ids_.put(thread_id,
877  std::make_shared<int64_t>(session_id));
878  } else {
879  verify_no_locked_session_id_present(thread_id);
880  }
881 
882  return locked_by_current_thread;
883  });
884 }
885 
886 boost::future<int32_t>
887 fenced_lock::get_lock_count()
888 {
889  auto thread_id = util::get_current_thread_id();
890  int64_t session_id = session_manager_.get_session(group_id_);
891 
892  verify_locked_session_id_if_present(thread_id, session_id, false);
893 
894  return do_get_lock_ownership_state().then(
895  boost::launch::sync, [=](boost::future<lock_ownership_state> f) {
896  auto ownership = f.get();
897  if (ownership.is_locked_by(session_id, thread_id)) {
898  locked_session_ids_.put(thread_id,
899  std::make_shared<int64_t>(session_id));
900  } else {
901  verify_no_locked_session_id_present(thread_id);
902  }
903 
904  return ownership.lock_count;
905  });
906 }
907 
908 const raft_group_id&
909 fenced_lock::get_group_id()
910 {
911  return group_id_;
912 }
913 
914 bool
915 operator==(const fenced_lock& lhs, const fenced_lock& rhs)
916 {
917  return lhs.get_service_name() == rhs.get_service_name() &&
918  lhs.get_name() == rhs.get_name();
919 }
920 
921 void
922 fenced_lock::post_destroy()
923 {
924  ClientProxy::post_destroy();
925  locked_session_ids_.clear();
926 }
927 
928 session_aware_proxy::session_aware_proxy(
929  const std::string& service_name,
930  const std::string& proxy_name,
931  client::spi::ClientContext* context,
932  const raft_group_id& group_id,
933  const std::string& object_name,
934  internal::session::proxy_session_manager& session_manager)
935  : cp_proxy(service_name, proxy_name, context, group_id, object_name)
936  , session_manager_(session_manager)
937 {}
938 
939 void
941 {
942  session_manager_.release_session(group_id_, session_id);
943 }
944 
945 bool
946 fenced_lock::lock_ownership_state::is_locked_by(int64_t session, int64_t thread)
947 {
948  return is_locked() && session_id == session && thread_id == thread;
949 }
950 
951 bool
952 fenced_lock::lock_ownership_state::is_locked()
953 {
954  return fence != INVALID_FENCE;
955 }
956 
957 counting_semaphore::counting_semaphore(
958  const std::string& proxy_name,
959  client::spi::ClientContext* context,
960  const raft_group_id& group_id,
961  const std::string& object_name,
962  internal::session::proxy_session_manager& session_manager)
963  : session_aware_proxy(SERVICE_NAME,
964  proxy_name,
965  context,
966  group_id,
967  object_name,
968  session_manager)
969 {}
970 
971 boost::future<bool>
972 counting_semaphore::init(int32_t permits)
973 {
974  util::Preconditions::check_not_negative(permits,
975  "Permits must be non-negative!");
976 
977  auto request = client::protocol::codec::semaphore_init_encode(
978  group_id_, object_name_, permits);
979  return client::spi::impl::ClientInvocation::create(
980  context_, request, object_name_)
981  ->invoke()
982  .then(boost::launch::sync,
983  [](boost::future<client::protocol::ClientMessage> f) {
984  return f.get().get_first_fixed_sized_field<bool>();
985  });
986 }
987 
988 boost::future<bool>
989 counting_semaphore::try_acquire(int32_t permits)
990 {
991  return try_acquire_for(std::chrono::milliseconds::zero(), permits);
992 }
993 
994 boost::future<bool>
995 counting_semaphore::try_acquire_for(std::chrono::milliseconds rel_time,
996  int32_t permits)
997 {
998  if (rel_time < std::chrono::milliseconds::zero()) {
999  rel_time = std::chrono::milliseconds ::zero();
1000  }
1001  return try_acquire_for_millis(permits, rel_time);
1002 }
1003 
1004 boost::future<void>
1005 counting_semaphore::do_release(int32_t permits,
1006  int64_t thread_id,
1007  int64_t session_id)
1008 {
1009  auto invocation_uid =
1010  get_context().get_hazelcast_client_implementation()->random_uuid();
1011  auto request = codec::semaphore_release_encode(
1012  group_id_, object_name_, session_id, thread_id, invocation_uid, permits);
1013  return to_void_future(client::spi::impl::ClientInvocation::create(
1014  context_, request, object_name_)
1015  ->invoke());
1016 }
1017 
1018 boost::future<int32_t>
1019 counting_semaphore::available_permits()
1020 {
1021  auto request =
1022  codec::semaphore_availablepermits_encode(group_id_, object_name_);
1023  return decode<int32_t>(client::spi::impl::ClientInvocation::create(
1024  context_, request, object_name_)
1025  ->invoke());
1026 }
1027 
1028 boost::future<void>
1029 counting_semaphore::reduce_permits(int32_t reduction)
1030 {
1031  util::Preconditions::check_not_negative(reduction,
1032  "Reduction must be non-negative!");
1033  if (reduction == 0) {
1034  return boost::make_ready_future();
1035  }
1036  return do_change_permits(-reduction);
1037 }
1038 
1039 boost::future<void>
1040 counting_semaphore::increase_permits(int32_t increase)
1041 {
1042  util::Preconditions::check_not_negative(increase,
1043  "Increase must be non-negative!");
1044  if (increase == 0) {
1045  return boost::make_ready_future();
1046  }
1047  return do_change_permits(increase);
1048 }
1049 
1050 sessionless_semaphore::sessionless_semaphore(
1051  const std::string& proxy_name,
1052  client::spi::ClientContext* context,
1053  const raft_group_id& group_id,
1054  const std::string& object_name,
1055  internal::session::proxy_session_manager& session_manager)
1056  : counting_semaphore(proxy_name,
1057  context,
1058  group_id,
1059  object_name,
1060  session_manager)
1061 {}
1062 
1063 boost::future<void>
1064 sessionless_semaphore::acquire(int32_t permits)
1065 {
1066  util::Preconditions::check_positive(permits,
1067  "permits must be positive number.");
1068 
1069  return to_void_future(
1070  do_try_acquire(permits, std::chrono::milliseconds(-1)));
1071 }
1072 
1073 boost::future<bool>
1074 sessionless_semaphore::do_try_acquire(int32_t permits,
1075  std::chrono::milliseconds timeout_ms)
1076 {
1077  auto cluster_wide_threadId = get_thread_id();
1078  auto invocation_uid =
1079  get_context().get_hazelcast_client_implementation()->random_uuid();
1080  auto request = client::protocol::codec::semaphore_acquire_encode(
1081  group_id_,
1082  object_name_,
1083  internal::session::proxy_session_manager::NO_SESSION_ID,
1084  cluster_wide_threadId,
1085  invocation_uid,
1086  permits,
1087  timeout_ms.count());
1088  return client::spi::impl::ClientInvocation::create(
1089  context_, request, object_name_)
1090  ->invoke()
1091  .then(
1092  boost::launch::sync,
1093  [=](boost::future<client::protocol::ClientMessage> f) {
1094  try {
1095  return f.get().get_first_fixed_sized_field<bool>();
1096  } catch (client::exception::wait_key_cancelled&) {
1097  throw client::exception::illegal_state(
1098  "sessionless_semaphore::acquire",
1099  (boost::format(
1100  "Semaphore[%1%] ] not acquired because the acquire call "
1101  "on the CP group is cancelled, possibly because of "
1102  "another indeterminate call from the same thread.") %
1103  object_name_)
1104  .str());
1105  }
1106  });
1107 }
1108 
1109 boost::future<bool>
1110 sessionless_semaphore::try_acquire_for_millis(int32_t permits,
1111  std::chrono::milliseconds timeout)
1112 {
1113  util::Preconditions::check_positive(permits, "Permits must be positive!");
1114 
1115  return do_try_acquire(permits,
1116  timeout > std::chrono::milliseconds::zero()
1117  ? timeout
1118  : std::chrono::milliseconds::zero());
1119 }
1120 
1121 boost::future<void>
1122 sessionless_semaphore::release(int32_t permits)
1123 {
1124  util::Preconditions::check_positive(permits, "Permits must be positive!");
1125  auto thread_id = get_thread_id();
1126  return do_release(permits,
1127  thread_id,
1128  internal::session::proxy_session_manager::NO_SESSION_ID);
1129 }
1130 
1131 int64_t
1132 sessionless_semaphore::get_thread_id()
1133 {
1134  return session_manager_.get_or_create_unique_thread_id(group_id_);
1135 }
1136 
1137 boost::future<int32_t>
1138 sessionless_semaphore::drain_permits()
1139 {
1140  auto cluster_wide_threadId = get_thread_id();
1141  auto invocation_uid =
1142  get_context().get_hazelcast_client_implementation()->random_uuid();
1143  auto request = client::protocol::codec::semaphore_drain_encode(
1144  group_id_,
1145  object_name_,
1146  internal::session::proxy_session_manager::NO_SESSION_ID,
1147  cluster_wide_threadId,
1148  invocation_uid);
1149  return decode<int32_t>(client::spi::impl::ClientInvocation::create(
1150  context_, request, object_name_)
1151  ->invoke());
1152 }
1153 
1154 boost::future<void>
1155 sessionless_semaphore::do_change_permits(int32_t delta)
1156 {
1157  auto cluster_wide_threadId = get_thread_id();
1158  auto invocation_uid =
1159  get_context().get_hazelcast_client_implementation()->random_uuid();
1160  auto request = client::protocol::codec::semaphore_change_encode(
1161  group_id_,
1162  object_name_,
1163  internal::session::proxy_session_manager::NO_SESSION_ID,
1164  cluster_wide_threadId,
1165  invocation_uid,
1166  delta);
1167  return to_void_future(client::spi::impl::ClientInvocation::create(
1168  context_, request, object_name_)
1169  ->invoke());
1170 }
1171 
1172 session_semaphore::session_semaphore(
1173  const std::string& proxy_name,
1174  client::spi::ClientContext* context,
1175  const raft_group_id& group_id,
1176  const std::string& object_name,
1177  internal::session::proxy_session_manager& session_manager)
1178  : counting_semaphore(proxy_name,
1179  context,
1180  group_id,
1181  object_name,
1182  session_manager)
1183 {}
1184 
1185 boost::future<void>
1186 session_semaphore::acquire(int32_t permits)
1187 {
1188  return to_void_future(
1189  try_acquire_for_millis(permits, std::chrono::milliseconds(-1)));
1190 }
1191 
1192 boost::future<bool>
1193 session_semaphore::try_acquire_for_millis(int32_t permits,
1194  std::chrono::milliseconds timeout)
1195 {
1196  util::Preconditions::check_not_negative(
1197  permits, "permits must not be negative number.");
1198 
1199  auto thread_id = get_thread_id();
1200  auto invocation_uid =
1201  get_context().get_hazelcast_client_implementation()->random_uuid();
1202 
1203  auto do_try_acquire_once = ([=]() {
1204  auto start = std::chrono::steady_clock::now();
1205  auto use_timeout = timeout >= std::chrono::milliseconds::zero();
1206  auto session_id = session_manager_.acquire_session(group_id_, permits);
1207  auto request =
1208  client::protocol::codec::semaphore_acquire_encode(group_id_,
1209  object_name_,
1210  session_id,
1211  thread_id,
1212  invocation_uid,
1213  permits,
1214  timeout.count());
1215  return client::spi::impl::ClientInvocation::create(
1216  context_, request, object_name_)
1217  ->invoke()
1218  .then(
1219  boost::launch::sync,
1220  [=](boost::future<client::protocol::ClientMessage> f) {
1221  try {
1222  auto acquired = f.get().get_first_fixed_sized_field<bool>();
1223  if (!acquired) {
1224  session_manager_.release_session(group_id_, session_id);
1225  }
1226  // first bool means acquired or not, second bool means if
1227  // should try again
1228  return std::make_pair(acquired, false);
1229  } catch (client::exception::session_expired&) {
1230  session_manager_.invalidate_session(group_id_, session_id);
1231  if (use_timeout &&
1232  (timeout - (std::chrono::steady_clock::now() - start) <=
1233  std::chrono::milliseconds::zero())) {
1234  return std::make_pair(false, false);
1235  }
1236  return std::make_pair(false, true);
1237  } catch (client::exception::wait_key_cancelled&) {
1238  session_manager_.release_session(
1239  group_id_, session_id, permits);
1240  if (!use_timeout) {
1241  BOOST_THROW_EXCEPTION(client::exception::illegal_state(
1242  "session_semaphore::try_acquire_for_millis",
1243  (boost::format(
1244  "Semaphore[%1%] not acquired because the acquire "
1245  "call on the CP group is cancelled, possibly "
1246  "because of another indeterminate call from the "
1247  "same thread.") %
1248  object_name_)
1249  .str()));
1250  }
1251  return std::make_pair(false, false);
1252  }
1253  });
1254  });
1255 
1256  return do_try_acquire_once().then(
1257  boost::launch::sync, [=](boost::future<std::pair<bool, bool>> f) {
1258  auto result = f.get();
1259  if (!result.second) {
1260  return result.first;
1261  }
1262  for (; result.second; result = do_try_acquire_once().get())
1263  ;
1264  return result.first;
1265  });
1266 }
1267 
1268 boost::future<void>
1269 session_semaphore::release(int32_t permits)
1270 {
1271  util::Preconditions::check_positive(permits, "Permits must be positive!");
1272  auto session_id = session_manager_.get_session(group_id_);
1273  if (session_id == internal::session::proxy_session_manager::NO_SESSION_ID) {
1274  throw_illegal_state_exception(nullptr);
1275  }
1276 
1277  auto thread_id = get_thread_id();
1278  return do_release(permits, thread_id, session_id)
1279  .then([=](boost::future<void> f) {
1280  try {
1281  f.get();
1282  session_manager_.release_session(group_id_, session_id, permits);
1283  } catch (client::exception::session_expired&) {
1284  session_manager_.invalidate_session(group_id_, session_id);
1285  session_manager_.release_session(group_id_, session_id, permits);
1286  throw_illegal_state_exception(std::current_exception());
1287  }
1288  });
1289 }
1290 
1291 void
1292 session_semaphore::throw_illegal_state_exception(std::exception_ptr e)
1293 {
1294  auto ise = boost::enable_current_exception(client::exception::illegal_state(
1295  "session_semaphore::illegal_state", "No valid session!"));
1296  if (!e) {
1297  throw ise;
1298  }
1299  try {
1300  std::rethrow_exception(e);
1301  } catch (...) {
1302  std::throw_with_nested(ise);
1303  }
1304 }
1305 
1306 int64_t
1307 session_semaphore::get_thread_id()
1308 {
1309  return util::get_current_thread_id();
1310 }
1311 
1312 boost::future<int32_t>
1313 session_semaphore::drain_permits()
1314 {
1315  auto thread_id = get_thread_id();
1316  auto invocation_uid =
1317  get_context().get_hazelcast_client_implementation()->random_uuid();
1318 
1319  auto do_drain_once = ([=]() {
1320  auto session_id =
1321  session_manager_.acquire_session(group_id_, DRAIN_SESSION_ACQ_COUNT);
1322  auto request = client::protocol::codec::semaphore_drain_encode(
1323  group_id_, object_name_, session_id, thread_id, invocation_uid);
1324  return client::spi::impl::ClientInvocation::create(
1325  context_, request, object_name_)
1326  ->invoke()
1327  .then(
1328  boost::launch::sync,
1329  [=](boost::future<client::protocol::ClientMessage> f) {
1330  try {
1331  auto count = f.get().get_first_fixed_sized_field<int32_t>();
1332  session_manager_.release_session(
1333  group_id_, session_id, DRAIN_SESSION_ACQ_COUNT - count);
1334  return count;
1335  } catch (client::exception::session_expired&) {
1336  session_manager_.invalidate_session(group_id_, session_id);
1337  return -1;
1338  }
1339  });
1340  });
1341 
1342  return do_drain_once().then(
1343  boost::launch::sync, [=](boost::future<int32_t> f) {
1344  int32_t count = f.get();
1345  if (count != -1) {
1346  return count;
1347  }
1348  while ((count = do_drain_once().get()) == -1) {
1349  }
1350  return count;
1351  });
1352 }
1353 
1354 boost::future<void>
1355 session_semaphore::do_change_permits(int32_t delta)
1356 {
1357  auto session_id =
1358  session_manager_.acquire_session(group_id_, DRAIN_SESSION_ACQ_COUNT);
1359  auto thread_id = get_thread_id();
1360  auto invocation_uid =
1361  get_context().get_hazelcast_client_implementation()->random_uuid();
1362 
1363  auto request = client::protocol::codec::semaphore_change_encode(
1364  group_id_, object_name_, session_id, thread_id, invocation_uid, delta);
1365  return client::spi::impl::ClientInvocation::create(
1366  context_, request, object_name_)
1367  ->invoke()
1368  .then(boost::launch::sync,
1369  [=](boost::future<client::protocol::ClientMessage> f) {
1370  try {
1371  f.get();
1372  session_manager_.release_session(group_id_, session_id);
1373  } catch (client::exception::session_expired&) {
1374  session_manager_.invalidate_session(group_id_, session_id);
1375  throw_illegal_state_exception(std::current_exception());
1376  }
1377  });
1378 }
1379 } // namespace cp
1380 } // namespace hazelcast
1381 
1382 namespace std {
1383 std::size_t
1384 hash<hazelcast::cp::raft_group_id>::operator()(
1385  const hazelcast::cp::raft_group_id& group_id) const noexcept
1386 {
1387  std::size_t seed = 0;
1388  boost::hash_combine(seed, group_id.name);
1389  boost::hash_combine(seed, group_id.seed);
1390  boost::hash_combine(seed, group_id.group_id);
1391  return seed;
1392 }
1393 } // namespace std
1394 
1395 namespace boost {
1396 std::size_t
1397 hash<hazelcast::cp::raft_group_id>::operator()(
1398  const hazelcast::cp::raft_group_id& group_id) const noexcept
1399 {
1400  return std::hash<hazelcast::cp::raft_group_id>()(group_id);
1401 }
1402 } // namespace boost
Client-side Raft-based proxy implementation of atomic long.
Definition: cp.h:100
boost::future< int64_t > get_and_add(int64_t delta)
Atomically adds the given value to the current value.
Definition: cp.cpp:253
boost::future< int64_t > get()
Gets the current value.
Definition: cp.cpp:246
boost::future< int64_t > decrement_and_get()
Atomically decrements the current value by one.
Definition: cp.cpp:240
boost::future< int64_t > get_and_increment()
Atomically increments the current value by one.
Definition: cp.cpp:274
boost::future< bool > compare_and_set(int64_t expect, int64_t update)
Atomically sets the value to the given updated value only if the current value equals the expected value.
Definition: cp.cpp:226
boost::future< int64_t > get_and_set(int64_t new_value)
Atomically sets the given value and returns the old value.
Definition: cp.cpp:260
boost::future< void > set(int64_t new_value)
Atomically sets the given value.
Definition: cp.cpp:280
boost::future< int64_t > increment_and_get()
Atomically increments the current value by one.
Definition: cp.cpp:268
boost::future< int64_t > get_and_decrement()
Atomically decrements the current value by one.
Definition: cp.cpp:234
boost::future< int64_t > add_and_get(int64_t delta)
Atomically adds the given value to the current value.
Definition: cp.cpp:219
void release_session(int64_t session_id)
Decrements acquire count of the session.
Definition: cp.cpp:940
static constexpr int32_t DRAIN_SESSION_ACQ_COUNT
Since a proxy does not know how many permits will be drained on the Raft group, it uses this constant...
Definition: cp.h:1300