Hazelcast C++ Client
Hazelcast C++ Client Library
cp.cpp
1 /*
2  * Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <algorithm>
17 #include <boost/algorithm/string.hpp>
18 
19 #include "hazelcast/cp/cp.h"
20 #include "hazelcast/client/spi/ClientContext.h"
21 #include "hazelcast/util/Preconditions.h"
22 #include "hazelcast/client/protocol/codec/codecs.h"
23 #include "hazelcast/client/protocol/ClientMessage.h"
24 #include "hazelcast/client/spi/impl/ClientInvocation.h"
25 #include "hazelcast/client/impl/hazelcast_client_instance_impl.h"
26 
27 namespace hazelcast {
28  namespace cp {
29  using namespace hazelcast::client::protocol;
30  using namespace hazelcast::client::protocol::codec;
31  using namespace hazelcast::util;
32 
33  raft_proxy_factory::raft_proxy_factory(client::spi::ClientContext &context) : context_(context) {}
34 
35  std::shared_ptr<fenced_lock>
36  raft_proxy_factory::create_fenced_lock(raft_group_id &&group_id, const std::string &proxy_name,
37  const std::string &object_name) {
38  while (true) {
39  auto proxy = lock_proxies_.get(proxy_name);
40  if (proxy) {
41  if (proxy->get_group_id() != group_id) {
42  lock_proxies_.remove(proxy_name, proxy);
43  } else {
44  return proxy;
45  }
46  }
47 
48  proxy = std::make_shared<fenced_lock>(proxy_name, context_, group_id, object_name);
49  auto existing = lock_proxies_.put_if_absent(proxy_name, proxy);
50  if (!existing) {
51  return proxy;
52  } else if (existing->get_group_id() == group_id) {
53  return existing;
54  }
55 
56  group_id = get_group_id(proxy_name, object_name).get();
57  }
58  }
59 
60  std::shared_ptr<counting_semaphore>
61  raft_proxy_factory::create_semaphore(raft_group_id &&group_id, const std::string &proxy_name,
62  const std::string &object_name) {
63  auto request = client::protocol::codec::semaphore_getsemaphoretype_encode(proxy_name);
64  auto is_sessionless = client::spi::impl::ClientInvocation::create(context_, request,
65  object_name)->invoke().get().get_first_fixed_sized_field<bool>();
66  if (is_sessionless) {
67  return std::shared_ptr<counting_semaphore>(
68  new sessionless_semaphore(proxy_name, &context_, group_id, object_name,
69  context_.get_proxy_session_manager()));
70  } else {
71  return std::shared_ptr<counting_semaphore>(
72  new session_semaphore(proxy_name, &context_, group_id, object_name,
73  context_.get_proxy_session_manager()));
74  }
75  }
76 
77  std::string raft_proxy_factory::without_default_group_name(const std::string &n) {
78  std::string name = n;
79  boost::trim(name);
80  auto index = name.find('@');
81  if (index == std::string::npos) {
82  return name;
83  }
84 
85  Preconditions::check_true(name.find('@', index + 1) == std::string::npos,
86  "Custom group name must be specified at most once");
87 
88  auto group_name = name.substr(index + 1);
89  boost::trim(group_name);
90  if (group_name == DEFAULT_GROUP_NAME) {
91  return name.substr(0, index);
92  }
93  return name;
94  }
95 
96  std::string raft_proxy_factory::object_name_for_proxy(const std::string &name) {
97  auto index = name.find('@');
98  if (index == std::string::npos) {
99  return name;
100  }
101 
102  Preconditions::check_true(index < (name.size() - 1),
103  "Object name cannot be empty string");
104  Preconditions::check_true(name.find('@', index + 1) == std::string::npos,
105  "Custom CP group name must be specified at most once");
106 
107  auto object_name = name.substr(0, index);
108  boost::trim(object_name);
109  Preconditions::check_true(object_name.size() > 0,
110  "Object name cannot be empty string");
111  return object_name;
112  }
113 
114  boost::future<raft_group_id> raft_proxy_factory::get_group_id(const std::string &proxy_name, const std::string &object_name) {
115  auto request = cpgroup_createcpgroup_encode(proxy_name);
116  return client::spi::impl::ClientInvocation::create(context_, request, object_name)->invoke().then(
117  boost::launch::sync, [](boost::future<client::protocol::ClientMessage> f) {
118  return *f.get().get_first_var_sized_field<raft_group_id>();
119  });
120  }
121 
122  cp_subsystem::cp_subsystem(client::spi::ClientContext &context) : context_(context), proxy_factory_(context) {
123  }
124 
125  boost::future<std::shared_ptr<atomic_long>> cp_subsystem::get_atomic_long(const std::string &name) {
126  return proxy_factory_.create_proxy<atomic_long>(name);
127  }
128 
129  boost::future<std::shared_ptr<atomic_reference>> cp_subsystem::get_atomic_reference(const std::string &name) {
130  return proxy_factory_.create_proxy<atomic_reference>(name);
131  }
132 
133  boost::future<std::shared_ptr<latch>> cp_subsystem::get_latch(const std::string &name) {
134  return proxy_factory_.create_proxy<latch>(name);
135  }
136 
137  boost::future<std::shared_ptr<fenced_lock>> cp_subsystem::get_lock(const std::string &name) {
138  return proxy_factory_.create_proxy<fenced_lock>(name);
139  }
140 
141  boost::future<std::shared_ptr<counting_semaphore>> cp_subsystem::get_semaphore(const std::string &name) {
142  return proxy_factory_.create_proxy<counting_semaphore>(name);
143  }
144 
145  cp_proxy::cp_proxy(const std::string &service_name, const std::string &proxy_name,
146  client::spi::ClientContext *context,
147  const raft_group_id &group_id, const std::string &object_name) : ProxyImpl(service_name,
148  proxy_name,
149  context),
150  group_id_(group_id),
151  object_name_(object_name) {}
152 
153  void cp_proxy::on_destroy() {
154  auto request = cpgroup_destroycpobject_encode(group_id_, get_service_name(), object_name_);
155  invoke(request).get();
156  }
157 
158  const raft_group_id &cp_proxy::get_group_id() const {
159  return group_id_;
160  }
161 
162  atomic_long::atomic_long(const std::string &name, client::spi::ClientContext &context,
163  const raft_group_id &group_id, const std::string &object_name)
164  : cp_proxy(SERVICE_NAME, name, &context, group_id, object_name) {}
165 
166  boost::future<int64_t> atomic_long::add_and_get(int64_t delta) {
167  auto request = atomiclong_addandget_encode(group_id_, object_name_, delta);
168  return invoke_and_get_future<int64_t>(request);
169  }
170 
171  boost::future<bool> atomic_long::compare_and_set(int64_t expect, int64_t update) {
172  auto request = atomiclong_compareandset_encode(group_id_, object_name_, expect, update);
173  return invoke_and_get_future<bool>(request);
174  }
175 
176  boost::future<int64_t> atomic_long::get_and_decrement() {
177  return get_and_add(-1);
178  }
179 
180  boost::future<int64_t> atomic_long::decrement_and_get() {
181  return add_and_get(-1);
182  }
183 
184  boost::future<int64_t> atomic_long::get() {
185  auto request = atomiclong_get_encode(group_id_, object_name_);
186  return invoke_and_get_future<int64_t>(request);
187  }
188 
189  boost::future<int64_t> atomic_long::get_and_add(int64_t delta) {
190  auto request = atomiclong_getandadd_encode(group_id_, object_name_, delta);
191  return invoke_and_get_future<int64_t>(request);
192  }
193 
194  boost::future<int64_t> atomic_long::get_and_set(int64_t new_value) {
195  auto request = atomiclong_getandset_encode(group_id_, object_name_, new_value);
196  return invoke_and_get_future<int64_t>(request);
197  }
198 
199  boost::future<int64_t> atomic_long::increment_and_get() {
200  return add_and_get(1);
201  }
202 
203  boost::future<int64_t> atomic_long::get_and_increment() {
204  return get_and_add(1);
205  }
206 
207  boost::future<void> atomic_long::set(int64_t new_value) {
208  return to_void_future(get_and_set(new_value));
209  }
210 
211  boost::future<int64_t> atomic_long::alter_data(client::serialization::pimpl::data &function_data,
212  alter_result_type result_type) {
213  auto request = atomiclong_alter_encode(group_id_, object_name_, function_data,
214  static_cast<int32_t>(result_type));
215  return invoke_and_get_future<int64_t>(request);
216  }
217 
218  boost::future<boost::optional<client::serialization::pimpl::data>>
219  atomic_long::apply_data(client::serialization::pimpl::data &function_data) {
220  auto request = atomiclong_apply_encode(group_id_, object_name_, function_data);
221  return invoke_and_get_future<boost::optional<client::serialization::pimpl::data>>(request);
222  }
223 
224  atomic_reference::atomic_reference(const std::string &name, client::spi::ClientContext &context,
225  const raft_group_id &group_id, const std::string &object_name)
226  : cp_proxy(SERVICE_NAME, name, &context, group_id, object_name) {}
227 
228  boost::future<boost::optional<client::serialization::pimpl::data>> atomic_reference::get_data() {
229  auto request = atomicref_get_encode(group_id_, object_name_);
230  return invoke_and_get_future<boost::optional<client::serialization::pimpl::data>>(request);
231  }
232 
233  boost::future<boost::optional<client::serialization::pimpl::data>>
234  atomic_reference::set_data(const client::serialization::pimpl::data &new_value_data) {
235  auto request = atomicref_set_encode(group_id_, object_name_, &new_value_data, false);
236  return invoke_and_get_future<boost::optional<client::serialization::pimpl::data>>(request);
237  }
238 
239  boost::future<boost::optional<client::serialization::pimpl::data>>
240  atomic_reference::get_and_set_data(const client::serialization::pimpl::data &new_value_data) {
241  auto request = atomicref_set_encode(group_id_, object_name_, &new_value_data, true);
242  return invoke_and_get_future<boost::optional<client::serialization::pimpl::data>>(request);
243  }
244 
245  boost::future<bool>
246  atomic_reference::compare_and_set_data(const client::serialization::pimpl::data &expect_data,
247  const client::serialization::pimpl::data &update_data) {
248  auto request = atomicref_compareandset_encode(group_id_, object_name_, &expect_data, &update_data);
249  return invoke_and_get_future<bool>(request);
250  }
251 
252  boost::future<bool> atomic_reference::contains_data(const client::serialization::pimpl::data &value_data) {
253  auto request = atomicref_contains_encode(group_id_, object_name_, &value_data);
254  return invoke_and_get_future<bool>(request);
255  }
256 
257  boost::future<void> atomic_reference::alter_data(const client::serialization::pimpl::data &function_data) {
258  return to_void_future(invoke_apply(function_data, return_value_type::NO_VALUE, true));
259  }
260 
261  boost::future<boost::optional<client::serialization::pimpl::data>>
262  atomic_reference::alter_and_get_data(const client::serialization::pimpl::data &function_data) {
263  return invoke_apply(function_data, return_value_type::NEW, true);
264  }
265 
266  boost::future<boost::optional<client::serialization::pimpl::data>>
267  atomic_reference::get_and_alter_data(const client::serialization::pimpl::data &function_data) {
268  return invoke_apply(function_data, return_value_type::OLD, true);
269  }
270 
271  boost::future<boost::optional<client::serialization::pimpl::data>>
272  atomic_reference::apply_data(const client::serialization::pimpl::data &function_data) {
273  return invoke_apply(function_data, return_value_type::NEW, false);
274  }
275 
/**
 * Checks for a null reference by calling contains() with a null byte pointer.
 *
 * @return the future produced by contains()
 */
boost::future<bool> atomic_reference::is_null() {
    // The cast gives the null pointer a concrete type for the contains() call.
    return contains(static_cast<byte *>(nullptr));
}
279 
/**
 * Clears the reference by calling set() with a null byte pointer and
 * discarding the result through to_void_future().
 */
boost::future<void> atomic_reference::clear() {
    // The cast gives the null pointer a concrete type for the set() call.
    return to_void_future(set(static_cast<byte *>(nullptr)));
}
283 
284  boost::future<boost::optional<client::serialization::pimpl::data>>
285  atomic_reference::invoke_apply(const client::serialization::pimpl::data function_data,
286  return_value_type return_type, bool alter) {
287  auto request = atomicref_apply_encode(group_id_, object_name_, function_data,
288  static_cast<int32_t>(return_type), alter);
289  return invoke_and_get_future<boost::optional<client::serialization::pimpl::data>>(request);
290  }
291 
292  latch::latch(const std::string &name, client::spi::ClientContext &context, const raft_group_id &group_id,
293  const std::string &object_name) : cp_proxy(SERVICE_NAME, name, &context, group_id, object_name) {}
294 
295  boost::future<bool> latch::try_set_count(int32_t count) {
296  util::Preconditions::check_positive(count, "count must be positive!");
297 
298  auto request = countdownlatch_trysetcount_encode(group_id_, object_name_, count);
299  return invoke_and_get_future<bool>(request);
300  }
301 
302  boost::future<int32_t> latch::get_count() {
303  auto request = countdownlatch_getcount_encode(group_id_, object_name_);
304  return invoke_and_get_future<int32_t>(request);
305  }
306 
307  boost::future<void> latch::count_down() {
308  auto invocation_uid = get_context().get_hazelcast_client_implementation()->random_uuid();
309  return get_round().then(boost::launch::sync, [=](boost::future<int32_t> f) {
310  auto round = f.get();
311  for (;;) {
312  try {
313  count_down(round, invocation_uid);
314  return;
315  } catch (client::exception::operation_timeout &) {
316  // I can retry safely because my retry would be idempotent...
317  }
318  }
319 
320  });
321  }
322 
323  boost::future<bool> latch::try_wait() {
324  return get_count().then(boost::launch::sync, [](boost::future<int32_t> f) {
325  return f.get() == 0;
326  });
327  }
328 
329  boost::future<int32_t> latch::get_round() {
330  auto request = countdownlatch_getround_encode(group_id_, object_name_);
331  return invoke_and_get_future<int32_t>(request);
332  }
333 
334  void latch::count_down(int round, boost::uuids::uuid invocation_uid) {
335  auto request = countdownlatch_countdown_encode(group_id_, object_name_, invocation_uid, round);
336  invoke(request).get();
337  }
338 
339  boost::future<void> latch::wait() {
340  return to_void_future(wait_for(std::chrono::milliseconds(INT64_MAX)));
341  }
342 
343  boost::future<std::cv_status> latch::wait_for(std::chrono::milliseconds timeout) {
344  auto timeout_millis = std::max<int64_t>(0, timeout.count());
345  auto invoation_uid = get_context().get_hazelcast_client_implementation()->random_uuid();
346  auto request = countdownlatch_await_encode(group_id_, object_name_, invoation_uid, timeout_millis);
347  return invoke_and_get_future<bool>(request).then(boost::launch::sync, [](boost::future<bool> f) {
348  return f.get() ? std::cv_status::no_timeout : std::cv_status::timeout;
349  });
350  }
351 
352  bool raft_group_id::operator==(const raft_group_id &rhs) const {
353  return name == rhs.name &&
354  seed == rhs.seed &&
355  group_id == rhs.group_id;
356  }
357 
358  bool raft_group_id::operator!=(const raft_group_id &rhs) const {
359  return !(rhs == *this);
360  }
361 
// Out-of-class definition of the static constexpr member. Before C++17 this is
// required whenever the constant is odr-used (for example bound to a reference).
constexpr int64_t fenced_lock::INVALID_FENCE;
363 
364  fenced_lock::fenced_lock(const std::string &name, client::spi::ClientContext &context,
365  const raft_group_id &group_id,
366  const std::string &object_name) : session_aware_proxy(SERVICE_NAME, name, &context,
367  group_id, object_name,
368  context.get_proxy_session_manager()) {}
369 
370  boost::future<void> fenced_lock::lock() {
371  return to_void_future(lock_and_get_fence());
372  }
373 
374  boost::future<int64_t> fenced_lock::lock_and_get_fence() {
375  auto thread_id = util::get_current_thread_id();
376  auto invocation_uid = get_context().random_uuid();
377 
378  auto do_lock_once = [=] () {
379  auto session_id = session_manager_.acquire_session(group_id_);
380  verify_locked_session_id_if_present(thread_id, session_id, true);
381  return do_lock(session_id, thread_id, invocation_uid).then(boost::launch::sync,
382  [=](boost::future<int64_t> f) {
383  try {
384  auto fence = f.get();
385  if (fence != INVALID_FENCE) {
386  locked_session_ids_.put(
387  thread_id,
388  std::make_shared<int64_t>(
389  session_id));
390  return fence;
391  }
392  BOOST_THROW_EXCEPTION(
393  client::exception::lock_acquire_limit_reached(
394  "fenced_lock::lock_and_get_fence", (
395  boost::format(
396  "Lock [%1%] reentrant lock limit is already reached!") %
397  object_name_).str()));
398  } catch (client::exception::session_expired &) {
399  invalidate_session(session_id);
400  verify_no_locked_session_id_present(thread_id);
401  return INVALID_FENCE;
402  } catch (client::exception::wait_key_cancelled &) {
403  release_session(session_id);
404  BOOST_THROW_EXCEPTION(client::exception::lock_acquire_limit_reached(
405  "fenced_lock::lock_and_get_fence", (boost::format(
406  "Lock [%1%] not acquired because the lock call on the CP group is cancelled, possibly because of another indeterminate call from the same thread.") %
407  object_name_).str()));
408  } catch (...) {
409  release_session(session_id);
410  throw;
411  }
412  });
413  };
414 
415  return do_lock_once().then(boost::launch::sync, [=](boost::future<int64_t> f) {
416  auto result = f.get();
417  if (result != INVALID_FENCE) {
418  return result;
419  }
420  // iterate in the user thread
421  for (result = do_lock_once().get(); result == INVALID_FENCE;) {}
422  return result;
423  });
424  }
425 
426  void fenced_lock::verify_locked_session_id_if_present(int64_t thread_id, int64_t session_id,
427  bool should_release) {
428  auto locked_session_id = locked_session_ids_.get(thread_id);
429  if (locked_session_id && *locked_session_id != session_id) {
430  locked_session_ids_.remove(thread_id);
431  if (should_release) {
432  release_session(session_id);
433  }
434 
435  throw_lock_ownership_lost(*locked_session_id);
436  }
437  }
438 
439  void fenced_lock::verify_no_locked_session_id_present(int64_t thread_id) {
440  auto locked_session_id = locked_session_ids_.remove(thread_id);
441  if (locked_session_id) {
442  locked_session_ids_.remove(thread_id);
443  throw_lock_ownership_lost(*locked_session_id);
444  }
445  }
446 
447  void fenced_lock::throw_lock_ownership_lost(int64_t session_id) const {
448  BOOST_THROW_EXCEPTION(client::exception::lock_ownership_lost("fenced_lock", (boost::format(
449  "Current thread is not owner of the Lock[%1%] because its Session[%2%] is closed by server!") %
450  get_name() % session_id).str()));
451  }
452 
453  void fenced_lock::throw_illegal_monitor_state() const {
454  BOOST_THROW_EXCEPTION(client::exception::illegal_monitor_state("fenced_lock", (boost::format(
455  "Current thread is not owner of the Lock[%1%]") %get_name()).str()));
456  }
457 
458  boost::future<int64_t>
459  fenced_lock::do_lock(int64_t session_id, int64_t thread_id, boost::uuids::uuid invocation_uid) {
460  auto request = client::protocol::codec::fencedlock_lock_encode(group_id_, object_name_, session_id, thread_id, invocation_uid);
461  return invoke_and_get_future<int64_t>(request);
462  }
463 
464  boost::future<int64_t>
465  fenced_lock::do_try_lock(int64_t session_id, int64_t thread_id, boost::uuids::uuid invocation_uid,
466  std::chrono::milliseconds timeout) {
467  auto request = client::protocol::codec::fencedlock_trylock_encode(group_id_, object_name_, session_id,
468  thread_id, invocation_uid,
469  std::chrono::duration_cast<std::chrono::milliseconds>(
470  timeout).count());
471  return invoke_and_get_future<int64_t>(request);
472  }
473 
474  boost::future<bool>
475  fenced_lock::do_unlock(int64_t session_id, int64_t thread_id, boost::uuids::uuid invocation_uid) {
476  auto request = client::protocol::codec::fencedlock_unlock_encode(group_id_, object_name_, session_id, thread_id, invocation_uid);
477  return invoke_and_get_future<bool>(request);
478  }
479 
480  boost::future<fenced_lock::lock_ownership_state> fenced_lock::do_get_lock_ownership_state(){
481  auto request = client::protocol::codec::fencedlock_getlockownership_encode(group_id_, object_name_);
482  return invoke(request).then(boost::launch::sync, [](boost::future<client::protocol::ClientMessage> f) {
483  auto msg = f.get();
484  fenced_lock::lock_ownership_state state;
485  state.fence = msg.get_first_fixed_sized_field<int64_t>();
486  state.lock_count = msg.get<int32_t>();
487  state.session_id = msg.get<int64_t>();
488  state.thread_id = msg.get<int64_t>();
489  return state;
490  });
491  }
492 
493  void fenced_lock::invalidate_session(int64_t session_id) {
494  session_manager_.invalidate_session(group_id_, session_id);
495  }
496 
497  boost::future<bool> fenced_lock::try_lock() {
498  return try_lock_and_get_fence().then(boost::launch::sync, [](boost::future<int64_t> f) {
499  return f.get() != INVALID_FENCE;
500  });
501  }
502 
503  boost::future<bool> fenced_lock::try_lock(std::chrono::milliseconds timeout) {
504  return try_lock_and_get_fence(timeout).then(boost::launch::sync, [](boost::future<int64_t> f) {
505  return f.get() != INVALID_FENCE;
506  });
507  }
508 
509  boost::future<int64_t> fenced_lock::try_lock_and_get_fence() {
510  return try_lock_and_get_fence(std::chrono::milliseconds(0));
511  }
512 
513  boost::future<int64_t>
514  fenced_lock::try_lock_and_get_fence(std::chrono::milliseconds timeout) {
515  auto thread_id = util::get_current_thread_id();
516  auto invocation_uid = get_context().random_uuid();
517 
518  auto do_try_lock_once = [=] () {
519  using namespace std::chrono;
520  auto start = steady_clock::now();
521  auto session_id = session_manager_.acquire_session(group_id_);
522  verify_locked_session_id_if_present(thread_id, session_id, true);
523  return do_try_lock(session_id, thread_id, invocation_uid, timeout).then(boost::launch::sync,
524  [=](boost::future<int64_t> f) {
525  try {
526  auto fence = f.get();
527  if (fence !=
528  INVALID_FENCE) {
529  locked_session_ids_.put(
530  thread_id,
531  std::make_shared<int64_t>(
532  session_id));
533  return std::make_pair(
534  fence,
535  false);
536  } else {
537  release_session(
538  session_id);
539  }
540  return std::make_pair(
541  fence, false);
542  } catch (
543  client::exception::session_expired &) {
544  invalidate_session(session_id);
545  verify_no_locked_session_id_present(thread_id);
546  auto timeout_left = timeout - (steady_clock::now() - start);
547  if (timeout_left.count() <= 0) {
548  return std::make_pair(INVALID_FENCE, false);
549  }
550  return std::make_pair(INVALID_FENCE, false);
551  } catch (client::exception::wait_key_cancelled &) {
552  release_session(session_id);
553  return std::make_pair(INVALID_FENCE, false);
554  } catch (...) {
555  release_session(session_id);
556  throw;
557  }
558  });
559  };
560 
561  return do_try_lock_once().then(boost::launch::sync, [=](boost::future<std::pair<int64_t, bool>> f) {
562  auto result = f.get();
563  if (!result.second) {
564  return result.first;
565  }
566  // iterate in the user thread
567  for (result = do_try_lock_once().get(); result.second;) {}
568  return result.first;
569  });
570  }
571 
572  boost::future<void> fenced_lock::unlock() {
573  auto thread_id = util::get_current_thread_id();
574  int64_t session_id = session_manager_.get_session(group_id_);
575 
576  // the order of the following checks is important.
577  verify_locked_session_id_if_present(thread_id, session_id, false);
578  if (session_id == internal::session::proxy_session_manager::NO_SESSION_ID) {
579  locked_session_ids_.remove(thread_id);
580  throw_illegal_monitor_state();
581  }
582 
583  return do_unlock(session_id, thread_id, get_context().random_uuid()).then(boost::launch::sync,
584  [=](boost::future<bool> f) {
585  try {
586  auto still_locked_by_current_thread = f.get();
587  if (still_locked_by_current_thread) {
588  locked_session_ids_.put(
589  thread_id,
590  std::make_shared<int64_t>(
591  session_id));
592  } else {
593  locked_session_ids_.remove(
594  thread_id);
595  }
596 
597  release_session(
598  session_id);
599  } catch (
600  client::exception::session_expired &) {
601  invalidate_session(session_id);
602  locked_session_ids_.remove(thread_id);
603 
604  throw_lock_ownership_lost(session_id);
605  } catch (client::exception::illegal_monitor_state &) {
606  locked_session_ids_.remove(thread_id);
607  throw;
608  }
609  });
610  }
611 
612  boost::future<int64_t> fenced_lock::get_fence() {
613  auto thread_id = util::get_current_thread_id();
614  int64_t session_id = session_manager_.get_session(group_id_);
615 
616  // the order of the following checks is important.
617  verify_locked_session_id_if_present(thread_id, session_id, false);
618  if (session_id == internal::session::proxy_session_manager::NO_SESSION_ID) {
619  locked_session_ids_.remove(thread_id);
620  throw_illegal_monitor_state();
621  }
622 
623  return do_get_lock_ownership_state().then(boost::launch::sync, [=](boost::future<lock_ownership_state> f) {
624  auto ownership = f.get();
625  if (ownership.is_locked_by(session_id, thread_id)) {
626  locked_session_ids_.put(thread_id, std::make_shared<int64_t>(session_id));
627  return ownership.fence;
628  }
629 
630  verify_no_locked_session_id_present(thread_id);
631  throw_illegal_monitor_state();
632  return INVALID_FENCE;
633  });
634  }
635 
636  boost::future<bool> fenced_lock::is_locked() {
637  auto thread_id = util::get_current_thread_id();
638  int64_t session_id = session_manager_.get_session(group_id_);
639 
640  verify_locked_session_id_if_present(thread_id, session_id, false);
641 
642  return do_get_lock_ownership_state().then(boost::launch::sync, [=](boost::future<lock_ownership_state> f) {
643  auto ownership = f.get();
644  if (ownership.is_locked_by(session_id, thread_id)) {
645  locked_session_ids_.put(thread_id, std::make_shared<int64_t>(session_id));
646  return true;
647  }
648 
649  verify_no_locked_session_id_present(thread_id);
650  return ownership.is_locked();
651  });
652  }
653 
654  boost::future<bool> fenced_lock::is_locked_by_current_thread() {
655  auto thread_id = util::get_current_thread_id();
656  int64_t session_id = session_manager_.get_session(group_id_);
657 
658  verify_locked_session_id_if_present(thread_id, session_id, false);
659 
660  return do_get_lock_ownership_state().then(boost::launch::sync, [=](boost::future<lock_ownership_state> f) {
661  auto ownership = f.get();
662  auto locked_by_current_thread = ownership.is_locked_by(session_id, thread_id);
663  if (locked_by_current_thread) {
664  locked_session_ids_.put(thread_id, std::make_shared<int64_t>(session_id));
665  } else {
666  verify_no_locked_session_id_present(thread_id);
667  }
668 
669  return locked_by_current_thread;
670  });
671  }
672 
673  boost::future<int32_t> fenced_lock::get_lock_count() {
674  auto thread_id = util::get_current_thread_id();
675  int64_t session_id = session_manager_.get_session(group_id_);
676 
677  verify_locked_session_id_if_present(thread_id, session_id, false);
678 
679  return do_get_lock_ownership_state().then(boost::launch::sync, [=](boost::future<lock_ownership_state> f) {
680  auto ownership = f.get();
681  if (ownership.is_locked_by(session_id, thread_id)) {
682  locked_session_ids_.put(thread_id, std::make_shared<int64_t>(session_id));
683  } else {
684  verify_no_locked_session_id_present(thread_id);
685  }
686 
687  return ownership.lock_count;
688  });
689  }
690 
691  const raft_group_id &fenced_lock::get_group_id() {
692  return group_id_;
693  }
694 
695  bool operator==(const fenced_lock &lhs, const fenced_lock &rhs) {
696  return lhs.get_service_name() == rhs.get_service_name() && lhs.get_name() == rhs.get_name();
697  }
698 
699  void fenced_lock::post_destroy() {
700  ClientProxy::post_destroy();
701  locked_session_ids_.clear();
702  }
703 
704  session_aware_proxy::session_aware_proxy(const std::string &service_name, const std::string &proxy_name,
705  client::spi::ClientContext *context, const raft_group_id &group_id,
706  const std::string &object_name,
707  internal::session::proxy_session_manager &session_manager) : cp_proxy(
708  service_name, proxy_name, context, group_id, object_name), session_manager_(session_manager) {}
709 
710  void session_aware_proxy::release_session(int64_t session_id) {
711  session_manager_.release_session(group_id_, session_id);
712  }
713 
714  bool fenced_lock::lock_ownership_state::is_locked_by(int64_t session, int64_t thread) {
715  return is_locked() && session_id == session && thread_id == thread;
716  }
717 
718  bool fenced_lock::lock_ownership_state::is_locked() {
719  return fence != INVALID_FENCE;
720  }
721 
722  counting_semaphore::counting_semaphore(const std::string &proxy_name, client::spi::ClientContext *context,
723  const raft_group_id &group_id, const std::string &object_name,
724  internal::session::proxy_session_manager &session_manager)
725  : session_aware_proxy(SERVICE_NAME,
726  proxy_name, context,
727  group_id,
728  object_name, session_manager) {}
729 
730  boost::future<bool> counting_semaphore::init(int32_t permits) {
731  util::Preconditions::check_not_negative(permits, "Permits must be non-negative!");
732 
733  auto request = client::protocol::codec::semaphore_init_encode(group_id_, object_name_, permits);
734  return client::spi::impl::ClientInvocation::create(context_, request, object_name_)->invoke().then(
735  boost::launch::sync, [](boost::future<client::protocol::ClientMessage> f) {
736  return f.get().get_first_fixed_sized_field<bool>();
737  });
738  }
739 
740  boost::future<bool> counting_semaphore::try_acquire(int32_t permits) {
741  return try_acquire_for(std::chrono::milliseconds::zero(), permits);
742  }
743 
744  boost::future<bool> counting_semaphore::try_acquire_for(std::chrono::milliseconds rel_time, int32_t permits) {
745  if (rel_time < std::chrono::milliseconds::zero()) {
746  rel_time = std::chrono::milliseconds ::zero();
747  }
748  return try_acquire_for_millis(permits, rel_time);
749  }
750 
751  boost::future<void> counting_semaphore::do_release(int32_t permits, int64_t thread_id, int64_t session_id) {
752  auto invocation_uid = get_context().get_hazelcast_client_implementation()->random_uuid();
753  auto request = codec::semaphore_release_encode(group_id_, object_name_, session_id, thread_id, invocation_uid, permits);
754  return to_void_future(
755  client::spi::impl::ClientInvocation::create(context_, request, object_name_)->invoke());
756  }
757 
758  boost::future<int32_t> counting_semaphore::available_permits() {
759  auto request = codec::semaphore_availablepermits_encode(group_id_, object_name_);
760  return decode<int32_t>(
761  client::spi::impl::ClientInvocation::create(context_, request, object_name_)->invoke());
762  }
763 
764  boost::future<void> counting_semaphore::reduce_permits(int32_t reduction) {
765  util::Preconditions::check_not_negative(reduction, "Reduction must be non-negative!");
766  if (reduction == 0) {
767  return boost::make_ready_future();
768  }
769  return do_change_permits(-reduction);
770  }
771 
772  boost::future<void> counting_semaphore::increase_permits(int32_t increase) {
773  util::Preconditions::check_not_negative(increase, "Increase must be non-negative!");
774  if (increase == 0) {
775  return boost::make_ready_future();
776  }
777  return do_change_permits(increase);
778  }
779 
780  sessionless_semaphore::sessionless_semaphore(const std::string &proxy_name, client::spi::ClientContext *context,
781  const raft_group_id &group_id, const std::string &object_name,
782  internal::session::proxy_session_manager &session_manager)
783  : counting_semaphore(proxy_name, context, group_id, object_name, session_manager) {}
784 
785  boost::future<void> sessionless_semaphore::acquire(int32_t permits) {
786  util::Preconditions::check_positive(permits, "permits must be positive number.");
787 
788  return to_void_future(do_try_acquire(permits, std::chrono::milliseconds(-1)));
789  }
790 
791  boost::future<bool>
792  sessionless_semaphore::do_try_acquire(int32_t permits, std::chrono::milliseconds timeout_ms) {
793  auto cluster_wide_threadId = get_thread_id();
794  auto invocation_uid = get_context().get_hazelcast_client_implementation()->random_uuid();
795  auto request = client::protocol::codec::semaphore_acquire_encode(group_id_, object_name_, internal::session::proxy_session_manager::NO_SESSION_ID, cluster_wide_threadId, invocation_uid, permits, timeout_ms.count());
796  return client::spi::impl::ClientInvocation::create(context_, request, object_name_)->invoke().then(
797  boost::launch::sync, [=](boost::future<client::protocol::ClientMessage> f) {
798  try {
799  return f.get().get_first_fixed_sized_field<bool>();
800  } catch (client::exception::wait_key_cancelled &) {
801  throw client::exception::illegal_state("sessionless_semaphore::acquire",
802  (boost::format(
803  "Semaphore[%1%] ] not acquired because the acquire call on the CP group is cancelled, possibly because of another indeterminate call from the same thread.") %
804  object_name_).str());
805  }
806  });
807  }
808 
809  boost::future<bool> sessionless_semaphore::try_acquire_for_millis(int32_t permits, std::chrono::milliseconds timeout) {
810  util::Preconditions::check_positive(permits, "Permits must be positive!");
811 
812  return do_try_acquire(permits, timeout > std::chrono::milliseconds::zero() ? timeout : std::chrono::milliseconds::zero());
813  }
814 
815  boost::future<void> sessionless_semaphore::release(int32_t permits) {
816  util::Preconditions::check_positive(permits, "Permits must be positive!");
817  auto thread_id = get_thread_id();
818  return do_release(permits, thread_id, internal::session::proxy_session_manager::NO_SESSION_ID);
819  }
820 
821  int64_t sessionless_semaphore::get_thread_id() {
822  return session_manager_.get_or_create_unique_thread_id(group_id_);
823  }
824 
825  boost::future<int32_t> sessionless_semaphore::drain_permits() {
826  auto cluster_wide_threadId = get_thread_id();
827  auto invocation_uid = get_context().get_hazelcast_client_implementation()->random_uuid();
828  auto request = client::protocol::codec::semaphore_drain_encode(group_id_, object_name_, internal::session::proxy_session_manager::NO_SESSION_ID, cluster_wide_threadId, invocation_uid);
829  return decode<int32_t>(
830  client::spi::impl::ClientInvocation::create(context_, request, object_name_)->invoke());
831  }
832 
833  boost::future<void> sessionless_semaphore::do_change_permits(int32_t delta) {
834  auto cluster_wide_threadId = get_thread_id();
835  auto invocation_uid = get_context().get_hazelcast_client_implementation()->random_uuid();
836  auto request = client::protocol::codec::semaphore_change_encode(group_id_, object_name_, internal::session::proxy_session_manager::NO_SESSION_ID, cluster_wide_threadId, invocation_uid, delta);
837  return to_void_future(
838  client::spi::impl::ClientInvocation::create(context_, request, object_name_)->invoke());
839  }
840 
841  session_semaphore::session_semaphore(const std::string &proxy_name, client::spi::ClientContext *context,
842  const raft_group_id &group_id, const std::string &object_name,
843  internal::session::proxy_session_manager &session_manager)
844  : counting_semaphore(proxy_name, context, group_id, object_name, session_manager) {}
845 
846  boost::future<void> session_semaphore::acquire(int32_t permits) {
847  return to_void_future(try_acquire_for_millis(permits, std::chrono::milliseconds(-1)));
848  }
849 
850  boost::future<bool>
851  session_semaphore::try_acquire_for_millis(int32_t permits, std::chrono::milliseconds timeout) {
852  util::Preconditions::check_not_negative(permits, "permits must not be negative number.");
853 
854  auto thread_id = get_thread_id();
855  auto invocation_uid = get_context().get_hazelcast_client_implementation()->random_uuid();
856 
857  auto do_try_acquire_once = ([=] () {
858  auto start = std::chrono::steady_clock::now();
859  auto use_timeout = timeout >= std::chrono::milliseconds::zero();
860  auto session_id = session_manager_.acquire_session(group_id_, permits);
861  auto request = client::protocol::codec::semaphore_acquire_encode(group_id_, object_name_,
862  session_id,
863  thread_id, invocation_uid, permits,
864  timeout.count());
865  return client::spi::impl::ClientInvocation::create(context_, request, object_name_)->invoke().then(
866  boost::launch::sync, [=](boost::future<client::protocol::ClientMessage> f) {
867  try {
868  auto acquired = f.get().get_first_fixed_sized_field<bool>();
869  if (!acquired) {
870  session_manager_.release_session(group_id_, session_id);
871  }
872  // first bool means acquired or not, second bool means if should try again
873  return std::make_pair(acquired, false);
874  } catch (client::exception::session_expired &) {
875  session_manager_.invalidate_session(group_id_, session_id);
876  if (use_timeout && (timeout - (std::chrono::steady_clock::now() - start) <=
877  std::chrono::milliseconds::zero())) {
878  return std::make_pair(false, false);
879  }
880  return std::make_pair(false, true);
881  } catch (client::exception::wait_key_cancelled &) {
882  session_manager_.release_session(group_id_, session_id, permits);
883  if (!use_timeout) {
884  BOOST_THROW_EXCEPTION(
885  client::exception::illegal_state(
886  "session_semaphore::try_acquire_for_millis", (boost::format(
887  "Semaphore[%1%] not acquired because the acquire call on the CP group is cancelled, possibly because of another indeterminate call from the same thread.") %
888  object_name_).str()));
889  }
890  return std::make_pair(false, false);
891  }
892  });
893  });
894 
895  return do_try_acquire_once().then(boost::launch::sync, [=](boost::future<std::pair<bool, bool>> f) {
896  auto result = f.get();
897  if (!result.second) {
898  return result.first;
899  }
900  for (; result.second; result = do_try_acquire_once().get());
901  return result.first;
902  });
903  }
904 
905  boost::future<void> session_semaphore::release(int32_t permits) {
906  util::Preconditions::check_positive(permits, "Permits must be positive!");
907  auto session_id = session_manager_.get_session(group_id_);
908  if (session_id == internal::session::proxy_session_manager::NO_SESSION_ID) {
909  throw_illegal_state_exception(nullptr);
910  }
911 
912  auto thread_id = get_thread_id();
913  return do_release(permits, thread_id, session_id).then([=] (boost::future<void> f) {
914  try {
915  f.get();
916  session_manager_.release_session(group_id_, session_id, permits);
917  } catch (client::exception::session_expired &) {
918  session_manager_.invalidate_session(group_id_, session_id);
919  session_manager_.release_session(group_id_, session_id, permits);
920  throw_illegal_state_exception(std::current_exception());
921  }
922  });
923  }
924 
925  void session_semaphore::throw_illegal_state_exception(std::exception_ptr e) {
926  auto ise = boost::enable_current_exception(
927  client::exception::illegal_state("session_semaphore::illegal_state",
928  "No valid session!"));
929  if (!e) {
930  throw ise;
931  }
932  try {
933  std::rethrow_exception(e);
934  } catch (...) {
935  std::throw_with_nested(ise);
936  }
937  }
938 
939  int64_t session_semaphore::get_thread_id() {
940  return util::get_current_thread_id();
941  }
942 
943  boost::future<int32_t> session_semaphore::drain_permits() {
944  auto thread_id = get_thread_id();
945  auto invocation_uid = get_context().get_hazelcast_client_implementation()->random_uuid();
946 
947  auto do_drain_once = ([=] () {
948  auto session_id = session_manager_.acquire_session(group_id_, DRAIN_SESSION_ACQ_COUNT);
949  auto request = client::protocol::codec::semaphore_drain_encode(group_id_, object_name_,
950  session_id,
951  thread_id, invocation_uid);
952  return client::spi::impl::ClientInvocation::create(context_, request, object_name_)->invoke().then(
953  boost::launch::sync, [=](boost::future<client::protocol::ClientMessage> f) {
954  try {
955  auto count = f.get().get_first_fixed_sized_field<int32_t>();
956  session_manager_.release_session(group_id_, session_id,
957  DRAIN_SESSION_ACQ_COUNT - count);
958  return count;
959  } catch (client::exception::session_expired &) {
960  session_manager_.invalidate_session(group_id_, session_id);
961  return -1;
962  }
963  });
964  });
965 
966  return do_drain_once().then(boost::launch::sync, [=](boost::future<int32_t> f) {
967  int32_t count = f.get();
968  if (count != -1) {
969  return count;
970  }
971  while ((count = do_drain_once().get()) == -1) {}
972  return count;
973  });
974  }
975 
976  boost::future<void> session_semaphore::do_change_permits(int32_t delta) {
977  auto session_id = session_manager_.acquire_session(group_id_, DRAIN_SESSION_ACQ_COUNT);
978  auto thread_id = get_thread_id();
979  auto invocation_uid = get_context().get_hazelcast_client_implementation()->random_uuid();
980 
981  auto request = client::protocol::codec::semaphore_change_encode(group_id_, object_name_,
982  session_id,
983  thread_id, invocation_uid, delta);
984  return client::spi::impl::ClientInvocation::create(context_, request, object_name_)->invoke().then(
985  boost::launch::sync, [=](boost::future<client::protocol::ClientMessage> f) {
986  try {
987  f.get();
988  session_manager_.release_session(group_id_, session_id);
989  } catch (client::exception::session_expired &) {
990  session_manager_.invalidate_session(group_id_, session_id);
991  throw_illegal_state_exception(std::current_exception());
992  }
993  });
994  }
995  }
996 }
997 
998 namespace std {
999  std::size_t
1000  hash<hazelcast::cp::raft_group_id>::operator()(const hazelcast::cp::raft_group_id &group_id) const noexcept {
1001  std::size_t seed = 0;
1002  boost::hash_combine(seed, group_id.name);
1003  boost::hash_combine(seed, group_id.seed);
1004  boost::hash_combine(seed, group_id.group_id);
1005  return seed;
1006  }
1007 }
1008 
1009 namespace boost {
1010  std::size_t
1011  hash<hazelcast::cp::raft_group_id>::operator()(const hazelcast::cp::raft_group_id &group_id) const noexcept {
1012  return std::hash<hazelcast::cp::raft_group_id>()(group_id);
1013  }
1014 }
1015 
1016 
1017 
Client-side Raft-based proxy implementation of atomic long.
Definition: cp.h:90
boost::future< int64_t > get_and_add(int64_t delta)
Atomically adds the given value to the current value.
Definition: cp.cpp:189
boost::future< int64_t > get()
Gets the current value.
Definition: cp.cpp:184
boost::future< int64_t > decrement_and_get()
Atomically decrements the current value by one.
Definition: cp.cpp:180
boost::future< int64_t > get_and_increment()
Atomically increments the current value by one.
Definition: cp.cpp:203
boost::future< bool > compare_and_set(int64_t expect, int64_t update)
Atomically sets the value to the given updated value only if the current value equals the expected value.
Definition: cp.cpp:171
boost::future< int64_t > get_and_set(int64_t new_value)
Atomically sets the given value and returns the old value.
Definition: cp.cpp:194
boost::future< void > set(int64_t new_value)
Atomically sets the given value.
Definition: cp.cpp:207
boost::future< int64_t > increment_and_get()
Atomically increments the current value by one.
Definition: cp.cpp:199
boost::future< int64_t > get_and_decrement()
Atomically decrements the current value by one.
Definition: cp.cpp:176
boost::future< int64_t > add_and_get(int64_t delta)
Atomically adds the given value to the current value.
Definition: cp.cpp:166
void release_session(int64_t session_id)
Decrements acquire count of the session.
Definition: cp.cpp:710
static constexpr int32_t DRAIN_SESSION_ACQ_COUNT
Since a proxy does not know how many permits will be drained on the Raft group, it uses this constant...
Definition: cp.h:1217