Hazelcast C++ Client
Hazelcast C++ Client Library
cp.cpp
1 /*
2  * Copyright (c) 2008-2022, Hazelcast, Inc. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <algorithm>
18 #include <boost/algorithm/string.hpp>
19 
20 #include "hazelcast/cp/cp.h"
21 #include "hazelcast/client/spi/ClientContext.h"
22 #include "hazelcast/util/Preconditions.h"
23 #include "hazelcast/client/protocol/codec/codecs.h"
24 #include "hazelcast/client/protocol/ClientMessage.h"
25 #include "hazelcast/client/spi/impl/ClientInvocation.h"
26 #include "hazelcast/client/impl/hazelcast_client_instance_impl.h"
27 
28 namespace hazelcast {
29  namespace cp {
30  using namespace hazelcast::client::protocol;
31  using namespace hazelcast::client::protocol::codec;
32  using namespace hazelcast::util;
33 
34  raft_proxy_factory::raft_proxy_factory(client::spi::ClientContext &context) : context_(context) {}
35 
36  std::shared_ptr<fenced_lock>
37  raft_proxy_factory::create_fenced_lock(raft_group_id &&group_id, const std::string &proxy_name,
38  const std::string &object_name) {
39  while (true) {
40  auto proxy = lock_proxies_.get(proxy_name);
41  if (proxy) {
42  if (proxy->get_group_id() != group_id) {
43  lock_proxies_.remove(proxy_name, proxy);
44  } else {
45  return proxy;
46  }
47  }
48 
49  proxy = std::make_shared<fenced_lock>(proxy_name, context_, group_id, object_name);
50  auto existing = lock_proxies_.put_if_absent(proxy_name, proxy);
51  if (!existing) {
52  return proxy;
53  } else if (existing->get_group_id() == group_id) {
54  return existing;
55  }
56 
57  group_id = get_group_id(proxy_name, object_name).get();
58  }
59  }
60 
61  std::shared_ptr<counting_semaphore>
62  raft_proxy_factory::create_semaphore(raft_group_id &&group_id, const std::string &proxy_name,
63  const std::string &object_name) {
64  auto request = client::protocol::codec::semaphore_getsemaphoretype_encode(proxy_name);
65  auto is_sessionless = client::spi::impl::ClientInvocation::create(context_, request,
66  object_name)->invoke().get().get_first_fixed_sized_field<bool>();
67  if (is_sessionless) {
68  return std::shared_ptr<counting_semaphore>(
69  new sessionless_semaphore(proxy_name, &context_, group_id, object_name,
70  context_.get_proxy_session_manager()));
71  } else {
72  return std::shared_ptr<counting_semaphore>(
73  new session_semaphore(proxy_name, &context_, group_id, object_name,
74  context_.get_proxy_session_manager()));
75  }
76  }
77 
78  std::string raft_proxy_factory::without_default_group_name(const std::string &n) {
79  std::string name = n;
80  boost::trim(name);
81  auto index = name.find('@');
82  if (index == std::string::npos) {
83  return name;
84  }
85 
86  Preconditions::check_true(name.find('@', index + 1) == std::string::npos,
87  "Custom group name must be specified at most once");
88 
89  auto group_name = name.substr(index + 1);
90  boost::trim(group_name);
91  if (group_name == DEFAULT_GROUP_NAME) {
92  return name.substr(0, index);
93  }
94  return name;
95  }
96 
97  std::string raft_proxy_factory::object_name_for_proxy(const std::string &name) {
98  auto index = name.find('@');
99  if (index == std::string::npos) {
100  return name;
101  }
102 
103  Preconditions::check_true(index < (name.size() - 1),
104  "Object name cannot be empty string");
105  Preconditions::check_true(name.find('@', index + 1) == std::string::npos,
106  "Custom CP group name must be specified at most once");
107 
108  auto object_name = name.substr(0, index);
109  boost::trim(object_name);
110  Preconditions::check_true(object_name.size() > 0,
111  "Object name cannot be empty string");
112  return object_name;
113  }
114 
115  boost::future<raft_group_id> raft_proxy_factory::get_group_id(const std::string &proxy_name, const std::string &object_name) {
116  auto request = cpgroup_createcpgroup_encode(proxy_name);
117  return client::spi::impl::ClientInvocation::create(context_, request, object_name)->invoke().then(
118  boost::launch::sync, [](boost::future<client::protocol::ClientMessage> f) {
119  return *f.get().get_first_var_sized_field<raft_group_id>();
120  });
121  }
122 
123  cp_subsystem::cp_subsystem(client::spi::ClientContext &context) : context_(context), proxy_factory_(context) {
124  }
125 
126  boost::future<std::shared_ptr<atomic_long>> cp_subsystem::get_atomic_long(const std::string &name) {
127  return proxy_factory_.create_proxy<atomic_long>(name);
128  }
129 
130  boost::future<std::shared_ptr<atomic_reference>> cp_subsystem::get_atomic_reference(const std::string &name) {
131  return proxy_factory_.create_proxy<atomic_reference>(name);
132  }
133 
134  boost::future<std::shared_ptr<latch>> cp_subsystem::get_latch(const std::string &name) {
135  return proxy_factory_.create_proxy<latch>(name);
136  }
137 
138  boost::future<std::shared_ptr<fenced_lock>> cp_subsystem::get_lock(const std::string &name) {
139  return proxy_factory_.create_proxy<fenced_lock>(name);
140  }
141 
142  boost::future<std::shared_ptr<counting_semaphore>> cp_subsystem::get_semaphore(const std::string &name) {
143  return proxy_factory_.create_proxy<counting_semaphore>(name);
144  }
145 
146  cp_proxy::cp_proxy(const std::string &service_name, const std::string &proxy_name,
147  client::spi::ClientContext *context,
148  const raft_group_id &group_id, const std::string &object_name) : ProxyImpl(service_name,
149  proxy_name,
150  context),
151  group_id_(group_id),
152  object_name_(object_name) {}
153 
154  void cp_proxy::on_destroy() {
155  auto request = cpgroup_destroycpobject_encode(group_id_, get_service_name(), object_name_);
156  invoke(request).get();
157  }
158 
159  const raft_group_id &cp_proxy::get_group_id() const {
160  return group_id_;
161  }
162 
163  atomic_long::atomic_long(const std::string &name, client::spi::ClientContext &context,
164  const raft_group_id &group_id, const std::string &object_name)
165  : cp_proxy(SERVICE_NAME, name, &context, group_id, object_name) {}
166 
167  boost::future<int64_t> atomic_long::add_and_get(int64_t delta) {
168  auto request = atomiclong_addandget_encode(group_id_, object_name_, delta);
169  return invoke_and_get_future<int64_t>(request);
170  }
171 
172  boost::future<bool> atomic_long::compare_and_set(int64_t expect, int64_t update) {
173  auto request = atomiclong_compareandset_encode(group_id_, object_name_, expect, update);
174  return invoke_and_get_future<bool>(request);
175  }
176 
177  boost::future<int64_t> atomic_long::get_and_decrement() {
178  return get_and_add(-1);
179  }
180 
181  boost::future<int64_t> atomic_long::decrement_and_get() {
182  return add_and_get(-1);
183  }
184 
185  boost::future<int64_t> atomic_long::get() {
186  auto request = atomiclong_get_encode(group_id_, object_name_);
187  return invoke_and_get_future<int64_t>(request);
188  }
189 
190  boost::future<int64_t> atomic_long::get_and_add(int64_t delta) {
191  auto request = atomiclong_getandadd_encode(group_id_, object_name_, delta);
192  return invoke_and_get_future<int64_t>(request);
193  }
194 
195  boost::future<int64_t> atomic_long::get_and_set(int64_t new_value) {
196  auto request = atomiclong_getandset_encode(group_id_, object_name_, new_value);
197  return invoke_and_get_future<int64_t>(request);
198  }
199 
200  boost::future<int64_t> atomic_long::increment_and_get() {
201  return add_and_get(1);
202  }
203 
204  boost::future<int64_t> atomic_long::get_and_increment() {
205  return get_and_add(1);
206  }
207 
208  boost::future<void> atomic_long::set(int64_t new_value) {
209  return to_void_future(get_and_set(new_value));
210  }
211 
212  boost::future<int64_t> atomic_long::alter_data(client::serialization::pimpl::data &function_data,
213  alter_result_type result_type) {
214  auto request = atomiclong_alter_encode(group_id_, object_name_, function_data,
215  static_cast<int32_t>(result_type));
216  return invoke_and_get_future<int64_t>(request);
217  }
218 
219  boost::future<boost::optional<client::serialization::pimpl::data>>
220  atomic_long::apply_data(client::serialization::pimpl::data &function_data) {
221  auto request = atomiclong_apply_encode(group_id_, object_name_, function_data);
222  return invoke_and_get_future<boost::optional<client::serialization::pimpl::data>>(request);
223  }
224 
225  atomic_reference::atomic_reference(const std::string &name, client::spi::ClientContext &context,
226  const raft_group_id &group_id, const std::string &object_name)
227  : cp_proxy(SERVICE_NAME, name, &context, group_id, object_name) {}
228 
229  boost::future<boost::optional<client::serialization::pimpl::data>> atomic_reference::get_data() {
230  auto request = atomicref_get_encode(group_id_, object_name_);
231  return invoke_and_get_future<boost::optional<client::serialization::pimpl::data>>(request);
232  }
233 
234  boost::future<boost::optional<client::serialization::pimpl::data>>
235  atomic_reference::set_data(const client::serialization::pimpl::data &new_value_data) {
236  auto request = atomicref_set_encode(group_id_, object_name_, &new_value_data, false);
237  return invoke_and_get_future<boost::optional<client::serialization::pimpl::data>>(request);
238  }
239 
240  boost::future<boost::optional<client::serialization::pimpl::data>>
241  atomic_reference::get_and_set_data(const client::serialization::pimpl::data &new_value_data) {
242  auto request = atomicref_set_encode(group_id_, object_name_, &new_value_data, true);
243  return invoke_and_get_future<boost::optional<client::serialization::pimpl::data>>(request);
244  }
245 
246  boost::future<bool>
247  atomic_reference::compare_and_set_data(const client::serialization::pimpl::data &expect_data,
248  const client::serialization::pimpl::data &update_data) {
249  auto request = atomicref_compareandset_encode(group_id_, object_name_, &expect_data, &update_data);
250  return invoke_and_get_future<bool>(request);
251  }
252 
253  boost::future<bool> atomic_reference::contains_data(const client::serialization::pimpl::data &value_data) {
254  auto request = atomicref_contains_encode(group_id_, object_name_, &value_data);
255  return invoke_and_get_future<bool>(request);
256  }
257 
258  boost::future<void> atomic_reference::alter_data(const client::serialization::pimpl::data &function_data) {
259  return to_void_future(invoke_apply(function_data, return_value_type::NO_VALUE, true));
260  }
261 
262  boost::future<boost::optional<client::serialization::pimpl::data>>
263  atomic_reference::alter_and_get_data(const client::serialization::pimpl::data &function_data) {
264  return invoke_apply(function_data, return_value_type::NEW, true);
265  }
266 
267  boost::future<boost::optional<client::serialization::pimpl::data>>
268  atomic_reference::get_and_alter_data(const client::serialization::pimpl::data &function_data) {
269  return invoke_apply(function_data, return_value_type::OLD, true);
270  }
271 
272  boost::future<boost::optional<client::serialization::pimpl::data>>
273  atomic_reference::apply_data(const client::serialization::pimpl::data &function_data) {
274  return invoke_apply(function_data, return_value_type::NEW, false);
275  }
276 
277  boost::future<bool> atomic_reference::is_null() {
278  return contains(static_cast<byte *>(nullptr));
279  }
280 
281  boost::future<void> atomic_reference::clear() {
282  return to_void_future(set(static_cast<byte *>(nullptr)));
283  }
284 
285  boost::future<boost::optional<client::serialization::pimpl::data>>
286  atomic_reference::invoke_apply(const client::serialization::pimpl::data function_data,
287  return_value_type return_type, bool alter) {
288  auto request = atomicref_apply_encode(group_id_, object_name_, function_data,
289  static_cast<int32_t>(return_type), alter);
290  return invoke_and_get_future<boost::optional<client::serialization::pimpl::data>>(request);
291  }
292 
293  latch::latch(const std::string &name, client::spi::ClientContext &context, const raft_group_id &group_id,
294  const std::string &object_name) : cp_proxy(SERVICE_NAME, name, &context, group_id, object_name) {}
295 
296  boost::future<bool> latch::try_set_count(int32_t count) {
297  util::Preconditions::check_positive(count, "count must be positive!");
298 
299  auto request = countdownlatch_trysetcount_encode(group_id_, object_name_, count);
300  return invoke_and_get_future<bool>(request);
301  }
302 
303  boost::future<int32_t> latch::get_count() {
304  auto request = countdownlatch_getcount_encode(group_id_, object_name_);
305  return invoke_and_get_future<int32_t>(request);
306  }
307 
308  boost::future<void> latch::count_down() {
309  auto invocation_uid = get_context().get_hazelcast_client_implementation()->random_uuid();
310  return get_round().then(boost::launch::sync, [=](boost::future<int32_t> f) {
311  auto round = f.get();
312  for (;;) {
313  try {
314  count_down(round, invocation_uid);
315  return;
316  } catch (client::exception::operation_timeout &) {
317  // I can retry safely because my retry would be idempotent...
318  }
319  }
320 
321  });
322  }
323 
324  boost::future<bool> latch::try_wait() {
325  return get_count().then(boost::launch::sync, [](boost::future<int32_t> f) {
326  return f.get() == 0;
327  });
328  }
329 
330  boost::future<int32_t> latch::get_round() {
331  auto request = countdownlatch_getround_encode(group_id_, object_name_);
332  return invoke_and_get_future<int32_t>(request);
333  }
334 
335  void latch::count_down(int round, boost::uuids::uuid invocation_uid) {
336  auto request = countdownlatch_countdown_encode(group_id_, object_name_, invocation_uid, round);
337  invoke(request).get();
338  }
339 
340  boost::future<void> latch::wait() {
341  return to_void_future(wait_for(std::chrono::milliseconds(INT64_MAX)));
342  }
343 
344  boost::future<std::cv_status> latch::wait_for(std::chrono::milliseconds timeout) {
345  auto timeout_millis = std::max<int64_t>(0, timeout.count());
346  auto invoation_uid = get_context().get_hazelcast_client_implementation()->random_uuid();
347  auto request = countdownlatch_await_encode(group_id_, object_name_, invoation_uid, timeout_millis);
348  return invoke_and_get_future<bool>(request).then(boost::launch::sync, [](boost::future<bool> f) {
349  return f.get() ? std::cv_status::no_timeout : std::cv_status::timeout;
350  });
351  }
352 
353  bool raft_group_id::operator==(const raft_group_id &rhs) const {
354  return name == rhs.name &&
355  seed == rhs.seed &&
356  group_id == rhs.group_id;
357  }
358 
359  bool raft_group_id::operator!=(const raft_group_id &rhs) const {
360  return !(rhs == *this);
361  }
362 
363  constexpr int64_t fenced_lock::INVALID_FENCE;
364 
365  fenced_lock::fenced_lock(const std::string &name, client::spi::ClientContext &context,
366  const raft_group_id &group_id,
367  const std::string &object_name) : session_aware_proxy(SERVICE_NAME, name, &context,
368  group_id, object_name,
369  context.get_proxy_session_manager()) {}
370 
371  boost::future<void> fenced_lock::lock() {
372  return to_void_future(lock_and_get_fence());
373  }
374 
375  boost::future<int64_t> fenced_lock::lock_and_get_fence() {
376  auto thread_id = util::get_current_thread_id();
377  auto invocation_uid = get_context().random_uuid();
378 
379  auto do_lock_once = [=] () {
380  auto session_id = session_manager_.acquire_session(group_id_);
381  verify_locked_session_id_if_present(thread_id, session_id, true);
382  return do_lock(session_id, thread_id, invocation_uid).then(boost::launch::sync,
383  [=](boost::future<int64_t> f) {
384  try {
385  auto fence = f.get();
386  if (fence != INVALID_FENCE) {
387  locked_session_ids_.put(
388  thread_id,
389  std::make_shared<int64_t>(
390  session_id));
391  return fence;
392  }
393  BOOST_THROW_EXCEPTION(
394  client::exception::lock_acquire_limit_reached(
395  "fenced_lock::lock_and_get_fence", (
396  boost::format(
397  "Lock [%1%] reentrant lock limit is already reached!") %
398  object_name_).str()));
399  } catch (client::exception::session_expired &) {
400  invalidate_session(session_id);
401  verify_no_locked_session_id_present(thread_id);
402  return INVALID_FENCE;
403  } catch (client::exception::wait_key_cancelled &) {
404  release_session(session_id);
405  BOOST_THROW_EXCEPTION(client::exception::lock_acquire_limit_reached(
406  "fenced_lock::lock_and_get_fence", (boost::format(
407  "Lock [%1%] not acquired because the lock call on the CP group is cancelled, possibly because of another indeterminate call from the same thread.") %
408  object_name_).str()));
409  } catch (...) {
410  release_session(session_id);
411  throw;
412  }
413  });
414  };
415 
416  return do_lock_once().then(boost::launch::sync, [=](boost::future<int64_t> f) {
417  auto result = f.get();
418  if (result != INVALID_FENCE) {
419  return result;
420  }
421  // iterate in the user thread
422  for (result = do_lock_once().get(); result == INVALID_FENCE;) {}
423  return result;
424  });
425  }
426 
427  void fenced_lock::verify_locked_session_id_if_present(int64_t thread_id, int64_t session_id,
428  bool should_release) {
429  auto locked_session_id = locked_session_ids_.get(thread_id);
430  if (locked_session_id && *locked_session_id != session_id) {
431  locked_session_ids_.remove(thread_id);
432  if (should_release) {
433  release_session(session_id);
434  }
435 
436  throw_lock_ownership_lost(*locked_session_id);
437  }
438  }
439 
440  void fenced_lock::verify_no_locked_session_id_present(int64_t thread_id) {
441  auto locked_session_id = locked_session_ids_.remove(thread_id);
442  if (locked_session_id) {
443  locked_session_ids_.remove(thread_id);
444  throw_lock_ownership_lost(*locked_session_id);
445  }
446  }
447 
448  void fenced_lock::throw_lock_ownership_lost(int64_t session_id) const {
449  BOOST_THROW_EXCEPTION(client::exception::lock_ownership_lost("fenced_lock", (boost::format(
450  "Current thread is not owner of the Lock[%1%] because its Session[%2%] is closed by server!") %
451  get_name() % session_id).str()));
452  }
453 
454  void fenced_lock::throw_illegal_monitor_state() const {
455  BOOST_THROW_EXCEPTION(client::exception::illegal_monitor_state("fenced_lock", (boost::format(
456  "Current thread is not owner of the Lock[%1%]") %get_name()).str()));
457  }
458 
459  boost::future<int64_t>
460  fenced_lock::do_lock(int64_t session_id, int64_t thread_id, boost::uuids::uuid invocation_uid) {
461  auto request = client::protocol::codec::fencedlock_lock_encode(group_id_, object_name_, session_id, thread_id, invocation_uid);
462  return invoke_and_get_future<int64_t>(request);
463  }
464 
465  boost::future<int64_t>
466  fenced_lock::do_try_lock(int64_t session_id, int64_t thread_id, boost::uuids::uuid invocation_uid,
467  std::chrono::milliseconds timeout) {
468  auto request = client::protocol::codec::fencedlock_trylock_encode(group_id_, object_name_, session_id,
469  thread_id, invocation_uid,
470  std::chrono::duration_cast<std::chrono::milliseconds>(
471  timeout).count());
472  return invoke_and_get_future<int64_t>(request);
473  }
474 
475  boost::future<bool>
476  fenced_lock::do_unlock(int64_t session_id, int64_t thread_id, boost::uuids::uuid invocation_uid) {
477  auto request = client::protocol::codec::fencedlock_unlock_encode(group_id_, object_name_, session_id, thread_id, invocation_uid);
478  return invoke_and_get_future<bool>(request);
479  }
480 
481  boost::future<fenced_lock::lock_ownership_state> fenced_lock::do_get_lock_ownership_state(){
482  auto request = client::protocol::codec::fencedlock_getlockownership_encode(group_id_, object_name_);
483  return invoke(request).then(boost::launch::sync, [](boost::future<client::protocol::ClientMessage> f) {
484  auto msg = f.get();
485  fenced_lock::lock_ownership_state state;
486  state.fence = msg.get_first_fixed_sized_field<int64_t>();
487  state.lock_count = msg.get<int32_t>();
488  state.session_id = msg.get<int64_t>();
489  state.thread_id = msg.get<int64_t>();
490  return state;
491  });
492  }
493 
494  void fenced_lock::invalidate_session(int64_t session_id) {
495  session_manager_.invalidate_session(group_id_, session_id);
496  }
497 
498  boost::future<bool> fenced_lock::try_lock() {
499  return try_lock_and_get_fence().then(boost::launch::sync, [](boost::future<int64_t> f) {
500  return f.get() != INVALID_FENCE;
501  });
502  }
503 
504  boost::future<bool> fenced_lock::try_lock(std::chrono::milliseconds timeout) {
505  return try_lock_and_get_fence(timeout).then(boost::launch::sync, [](boost::future<int64_t> f) {
506  return f.get() != INVALID_FENCE;
507  });
508  }
509 
510  boost::future<int64_t> fenced_lock::try_lock_and_get_fence() {
511  return try_lock_and_get_fence(std::chrono::milliseconds(0));
512  }
513 
514  boost::future<int64_t>
515  fenced_lock::try_lock_and_get_fence(std::chrono::milliseconds timeout) {
516  auto thread_id = util::get_current_thread_id();
517  auto invocation_uid = get_context().random_uuid();
518 
519  auto do_try_lock_once = [=] () {
520  using namespace std::chrono;
521  auto start = steady_clock::now();
522  auto session_id = session_manager_.acquire_session(group_id_);
523  verify_locked_session_id_if_present(thread_id, session_id, true);
524  return do_try_lock(session_id, thread_id, invocation_uid, timeout).then(boost::launch::sync,
525  [=](boost::future<int64_t> f) {
526  try {
527  auto fence = f.get();
528  if (fence !=
529  INVALID_FENCE) {
530  locked_session_ids_.put(
531  thread_id,
532  std::make_shared<int64_t>(
533  session_id));
534  return std::make_pair(
535  fence,
536  false);
537  } else {
538  release_session(
539  session_id);
540  }
541  return std::make_pair(
542  fence, false);
543  } catch (
544  client::exception::session_expired &) {
545  invalidate_session(session_id);
546  verify_no_locked_session_id_present(thread_id);
547  auto timeout_left = timeout - (steady_clock::now() - start);
548  if (timeout_left.count() <= 0) {
549  return std::make_pair(INVALID_FENCE, false);
550  }
551  return std::make_pair(INVALID_FENCE, false);
552  } catch (client::exception::wait_key_cancelled &) {
553  release_session(session_id);
554  return std::make_pair(INVALID_FENCE, false);
555  } catch (...) {
556  release_session(session_id);
557  throw;
558  }
559  });
560  };
561 
562  return do_try_lock_once().then(boost::launch::sync, [=](boost::future<std::pair<int64_t, bool>> f) {
563  auto result = f.get();
564  if (!result.second) {
565  return result.first;
566  }
567  // iterate in the user thread
568  for (result = do_try_lock_once().get(); result.second;) {}
569  return result.first;
570  });
571  }
572 
573  boost::future<void> fenced_lock::unlock() {
574  auto thread_id = util::get_current_thread_id();
575  int64_t session_id = session_manager_.get_session(group_id_);
576 
577  // the order of the following checks is important.
578  verify_locked_session_id_if_present(thread_id, session_id, false);
579  if (session_id == internal::session::proxy_session_manager::NO_SESSION_ID) {
580  locked_session_ids_.remove(thread_id);
581  throw_illegal_monitor_state();
582  }
583 
584  return do_unlock(session_id, thread_id, get_context().random_uuid()).then(boost::launch::sync,
585  [=](boost::future<bool> f) {
586  try {
587  auto still_locked_by_current_thread = f.get();
588  if (still_locked_by_current_thread) {
589  locked_session_ids_.put(
590  thread_id,
591  std::make_shared<int64_t>(
592  session_id));
593  } else {
594  locked_session_ids_.remove(
595  thread_id);
596  }
597 
598  release_session(
599  session_id);
600  } catch (
601  client::exception::session_expired &) {
602  invalidate_session(session_id);
603  locked_session_ids_.remove(thread_id);
604 
605  throw_lock_ownership_lost(session_id);
606  } catch (client::exception::illegal_monitor_state &) {
607  locked_session_ids_.remove(thread_id);
608  throw;
609  }
610  });
611  }
612 
613  boost::future<int64_t> fenced_lock::get_fence() {
614  auto thread_id = util::get_current_thread_id();
615  int64_t session_id = session_manager_.get_session(group_id_);
616 
617  // the order of the following checks is important.
618  verify_locked_session_id_if_present(thread_id, session_id, false);
619  if (session_id == internal::session::proxy_session_manager::NO_SESSION_ID) {
620  locked_session_ids_.remove(thread_id);
621  throw_illegal_monitor_state();
622  }
623 
624  return do_get_lock_ownership_state().then(boost::launch::sync, [=](boost::future<lock_ownership_state> f) {
625  auto ownership = f.get();
626  if (ownership.is_locked_by(session_id, thread_id)) {
627  locked_session_ids_.put(thread_id, std::make_shared<int64_t>(session_id));
628  return ownership.fence;
629  }
630 
631  verify_no_locked_session_id_present(thread_id);
632  throw_illegal_monitor_state();
633  return INVALID_FENCE;
634  });
635  }
636 
637  boost::future<bool> fenced_lock::is_locked() {
638  auto thread_id = util::get_current_thread_id();
639  int64_t session_id = session_manager_.get_session(group_id_);
640 
641  verify_locked_session_id_if_present(thread_id, session_id, false);
642 
643  return do_get_lock_ownership_state().then(boost::launch::sync, [=](boost::future<lock_ownership_state> f) {
644  auto ownership = f.get();
645  if (ownership.is_locked_by(session_id, thread_id)) {
646  locked_session_ids_.put(thread_id, std::make_shared<int64_t>(session_id));
647  return true;
648  }
649 
650  verify_no_locked_session_id_present(thread_id);
651  return ownership.is_locked();
652  });
653  }
654 
655  boost::future<bool> fenced_lock::is_locked_by_current_thread() {
656  auto thread_id = util::get_current_thread_id();
657  int64_t session_id = session_manager_.get_session(group_id_);
658 
659  verify_locked_session_id_if_present(thread_id, session_id, false);
660 
661  return do_get_lock_ownership_state().then(boost::launch::sync, [=](boost::future<lock_ownership_state> f) {
662  auto ownership = f.get();
663  auto locked_by_current_thread = ownership.is_locked_by(session_id, thread_id);
664  if (locked_by_current_thread) {
665  locked_session_ids_.put(thread_id, std::make_shared<int64_t>(session_id));
666  } else {
667  verify_no_locked_session_id_present(thread_id);
668  }
669 
670  return locked_by_current_thread;
671  });
672  }
673 
674  boost::future<int32_t> fenced_lock::get_lock_count() {
675  auto thread_id = util::get_current_thread_id();
676  int64_t session_id = session_manager_.get_session(group_id_);
677 
678  verify_locked_session_id_if_present(thread_id, session_id, false);
679 
680  return do_get_lock_ownership_state().then(boost::launch::sync, [=](boost::future<lock_ownership_state> f) {
681  auto ownership = f.get();
682  if (ownership.is_locked_by(session_id, thread_id)) {
683  locked_session_ids_.put(thread_id, std::make_shared<int64_t>(session_id));
684  } else {
685  verify_no_locked_session_id_present(thread_id);
686  }
687 
688  return ownership.lock_count;
689  });
690  }
691 
692  const raft_group_id &fenced_lock::get_group_id() {
693  return group_id_;
694  }
695 
696  bool operator==(const fenced_lock &lhs, const fenced_lock &rhs) {
697  return lhs.get_service_name() == rhs.get_service_name() && lhs.get_name() == rhs.get_name();
698  }
699 
700  void fenced_lock::post_destroy() {
701  ClientProxy::post_destroy();
702  locked_session_ids_.clear();
703  }
704 
705  session_aware_proxy::session_aware_proxy(const std::string &service_name, const std::string &proxy_name,
706  client::spi::ClientContext *context, const raft_group_id &group_id,
707  const std::string &object_name,
708  internal::session::proxy_session_manager &session_manager) : cp_proxy(
709  service_name, proxy_name, context, group_id, object_name), session_manager_(session_manager) {}
710 
711  void session_aware_proxy::release_session(int64_t session_id) {
712  session_manager_.release_session(group_id_, session_id);
713  }
714 
715  bool fenced_lock::lock_ownership_state::is_locked_by(int64_t session, int64_t thread) {
716  return is_locked() && session_id == session && thread_id == thread;
717  }
718 
719  bool fenced_lock::lock_ownership_state::is_locked() {
720  return fence != INVALID_FENCE;
721  }
722 
723  counting_semaphore::counting_semaphore(const std::string &proxy_name, client::spi::ClientContext *context,
724  const raft_group_id &group_id, const std::string &object_name,
725  internal::session::proxy_session_manager &session_manager)
726  : session_aware_proxy(SERVICE_NAME,
727  proxy_name, context,
728  group_id,
729  object_name, session_manager) {}
730 
731  boost::future<bool> counting_semaphore::init(int32_t permits) {
732  util::Preconditions::check_not_negative(permits, "Permits must be non-negative!");
733 
734  auto request = client::protocol::codec::semaphore_init_encode(group_id_, object_name_, permits);
735  return client::spi::impl::ClientInvocation::create(context_, request, object_name_)->invoke().then(
736  boost::launch::sync, [](boost::future<client::protocol::ClientMessage> f) {
737  return f.get().get_first_fixed_sized_field<bool>();
738  });
739  }
740 
741  boost::future<bool> counting_semaphore::try_acquire(int32_t permits) {
742  return try_acquire_for(std::chrono::milliseconds::zero(), permits);
743  }
744 
745  boost::future<bool> counting_semaphore::try_acquire_for(std::chrono::milliseconds rel_time, int32_t permits) {
746  if (rel_time < std::chrono::milliseconds::zero()) {
747  rel_time = std::chrono::milliseconds ::zero();
748  }
749  return try_acquire_for_millis(permits, rel_time);
750  }
751 
752  boost::future<void> counting_semaphore::do_release(int32_t permits, int64_t thread_id, int64_t session_id) {
753  auto invocation_uid = get_context().get_hazelcast_client_implementation()->random_uuid();
754  auto request = codec::semaphore_release_encode(group_id_, object_name_, session_id, thread_id, invocation_uid, permits);
755  return to_void_future(
756  client::spi::impl::ClientInvocation::create(context_, request, object_name_)->invoke());
757  }
758 
759  boost::future<int32_t> counting_semaphore::available_permits() {
760  auto request = codec::semaphore_availablepermits_encode(group_id_, object_name_);
761  return decode<int32_t>(
762  client::spi::impl::ClientInvocation::create(context_, request, object_name_)->invoke());
763  }
764 
765  boost::future<void> counting_semaphore::reduce_permits(int32_t reduction) {
766  util::Preconditions::check_not_negative(reduction, "Reduction must be non-negative!");
767  if (reduction == 0) {
768  return boost::make_ready_future();
769  }
770  return do_change_permits(-reduction);
771  }
772 
773  boost::future<void> counting_semaphore::increase_permits(int32_t increase) {
774  util::Preconditions::check_not_negative(increase, "Increase must be non-negative!");
775  if (increase == 0) {
776  return boost::make_ready_future();
777  }
778  return do_change_permits(increase);
779  }
780 
781  sessionless_semaphore::sessionless_semaphore(const std::string &proxy_name, client::spi::ClientContext *context,
782  const raft_group_id &group_id, const std::string &object_name,
783  internal::session::proxy_session_manager &session_manager)
784  : counting_semaphore(proxy_name, context, group_id, object_name, session_manager) {}
785 
786  boost::future<void> sessionless_semaphore::acquire(int32_t permits) {
787  util::Preconditions::check_positive(permits, "permits must be positive number.");
788 
789  return to_void_future(do_try_acquire(permits, std::chrono::milliseconds(-1)));
790  }
791 
792  boost::future<bool>
793  sessionless_semaphore::do_try_acquire(int32_t permits, std::chrono::milliseconds timeout_ms) {
794  auto cluster_wide_threadId = get_thread_id();
795  auto invocation_uid = get_context().get_hazelcast_client_implementation()->random_uuid();
796  auto request = client::protocol::codec::semaphore_acquire_encode(group_id_, object_name_, internal::session::proxy_session_manager::NO_SESSION_ID, cluster_wide_threadId, invocation_uid, permits, timeout_ms.count());
797  return client::spi::impl::ClientInvocation::create(context_, request, object_name_)->invoke().then(
798  boost::launch::sync, [=](boost::future<client::protocol::ClientMessage> f) {
799  try {
800  return f.get().get_first_fixed_sized_field<bool>();
801  } catch (client::exception::wait_key_cancelled &) {
802  throw client::exception::illegal_state("sessionless_semaphore::acquire",
803  (boost::format(
804  "Semaphore[%1%] ] not acquired because the acquire call on the CP group is cancelled, possibly because of another indeterminate call from the same thread.") %
805  object_name_).str());
806  }
807  });
808  }
809 
810  boost::future<bool> sessionless_semaphore::try_acquire_for_millis(int32_t permits, std::chrono::milliseconds timeout) {
811  util::Preconditions::check_positive(permits, "Permits must be positive!");
812 
813  return do_try_acquire(permits, timeout > std::chrono::milliseconds::zero() ? timeout : std::chrono::milliseconds::zero());
814  }
815 
816  boost::future<void> sessionless_semaphore::release(int32_t permits) {
817  util::Preconditions::check_positive(permits, "Permits must be positive!");
818  auto thread_id = get_thread_id();
819  return do_release(permits, thread_id, internal::session::proxy_session_manager::NO_SESSION_ID);
820  }
821 
822  int64_t sessionless_semaphore::get_thread_id() {
823  return session_manager_.get_or_create_unique_thread_id(group_id_);
824  }
825 
826  boost::future<int32_t> sessionless_semaphore::drain_permits() {
827  auto cluster_wide_threadId = get_thread_id();
828  auto invocation_uid = get_context().get_hazelcast_client_implementation()->random_uuid();
829  auto request = client::protocol::codec::semaphore_drain_encode(group_id_, object_name_, internal::session::proxy_session_manager::NO_SESSION_ID, cluster_wide_threadId, invocation_uid);
830  return decode<int32_t>(
831  client::spi::impl::ClientInvocation::create(context_, request, object_name_)->invoke());
832  }
833 
834  boost::future<void> sessionless_semaphore::do_change_permits(int32_t delta) {
835  auto cluster_wide_threadId = get_thread_id();
836  auto invocation_uid = get_context().get_hazelcast_client_implementation()->random_uuid();
837  auto request = client::protocol::codec::semaphore_change_encode(group_id_, object_name_, internal::session::proxy_session_manager::NO_SESSION_ID, cluster_wide_threadId, invocation_uid, delta);
838  return to_void_future(
839  client::spi::impl::ClientInvocation::create(context_, request, object_name_)->invoke());
840  }
841 
842  session_semaphore::session_semaphore(const std::string &proxy_name, client::spi::ClientContext *context,
843  const raft_group_id &group_id, const std::string &object_name,
844  internal::session::proxy_session_manager &session_manager)
845  : counting_semaphore(proxy_name, context, group_id, object_name, session_manager) {}
846 
847  boost::future<void> session_semaphore::acquire(int32_t permits) {
848  return to_void_future(try_acquire_for_millis(permits, std::chrono::milliseconds(-1)));
849  }
850 
851  boost::future<bool>
852  session_semaphore::try_acquire_for_millis(int32_t permits, std::chrono::milliseconds timeout) {
853  util::Preconditions::check_not_negative(permits, "permits must not be negative number.");
854 
855  auto thread_id = get_thread_id();
856  auto invocation_uid = get_context().get_hazelcast_client_implementation()->random_uuid();
857 
858  auto do_try_acquire_once = ([=] () {
859  auto start = std::chrono::steady_clock::now();
860  auto use_timeout = timeout >= std::chrono::milliseconds::zero();
861  auto session_id = session_manager_.acquire_session(group_id_, permits);
862  auto request = client::protocol::codec::semaphore_acquire_encode(group_id_, object_name_,
863  session_id,
864  thread_id, invocation_uid, permits,
865  timeout.count());
866  return client::spi::impl::ClientInvocation::create(context_, request, object_name_)->invoke().then(
867  boost::launch::sync, [=](boost::future<client::protocol::ClientMessage> f) {
868  try {
869  auto acquired = f.get().get_first_fixed_sized_field<bool>();
870  if (!acquired) {
871  session_manager_.release_session(group_id_, session_id);
872  }
873  // first bool means acquired or not, second bool means if should try again
874  return std::make_pair(acquired, false);
875  } catch (client::exception::session_expired &) {
876  session_manager_.invalidate_session(group_id_, session_id);
877  if (use_timeout && (timeout - (std::chrono::steady_clock::now() - start) <=
878  std::chrono::milliseconds::zero())) {
879  return std::make_pair(false, false);
880  }
881  return std::make_pair(false, true);
882  } catch (client::exception::wait_key_cancelled &) {
883  session_manager_.release_session(group_id_, session_id, permits);
884  if (!use_timeout) {
885  BOOST_THROW_EXCEPTION(
886  client::exception::illegal_state(
887  "session_semaphore::try_acquire_for_millis", (boost::format(
888  "Semaphore[%1%] not acquired because the acquire call on the CP group is cancelled, possibly because of another indeterminate call from the same thread.") %
889  object_name_).str()));
890  }
891  return std::make_pair(false, false);
892  }
893  });
894  });
895 
896  return do_try_acquire_once().then(boost::launch::sync, [=](boost::future<std::pair<bool, bool>> f) {
897  auto result = f.get();
898  if (!result.second) {
899  return result.first;
900  }
901  for (; result.second; result = do_try_acquire_once().get());
902  return result.first;
903  });
904  }
905 
906  boost::future<void> session_semaphore::release(int32_t permits) {
907  util::Preconditions::check_positive(permits, "Permits must be positive!");
908  auto session_id = session_manager_.get_session(group_id_);
909  if (session_id == internal::session::proxy_session_manager::NO_SESSION_ID) {
910  throw_illegal_state_exception(nullptr);
911  }
912 
913  auto thread_id = get_thread_id();
914  return do_release(permits, thread_id, session_id).then([=] (boost::future<void> f) {
915  try {
916  f.get();
917  session_manager_.release_session(group_id_, session_id, permits);
918  } catch (client::exception::session_expired &) {
919  session_manager_.invalidate_session(group_id_, session_id);
920  session_manager_.release_session(group_id_, session_id, permits);
921  throw_illegal_state_exception(std::current_exception());
922  }
923  });
924  }
925 
926  void session_semaphore::throw_illegal_state_exception(std::exception_ptr e) {
927  auto ise = boost::enable_current_exception(
928  client::exception::illegal_state("session_semaphore::illegal_state",
929  "No valid session!"));
930  if (!e) {
931  throw ise;
932  }
933  try {
934  std::rethrow_exception(e);
935  } catch (...) {
936  std::throw_with_nested(ise);
937  }
938  }
939 
940  int64_t session_semaphore::get_thread_id() {
941  return util::get_current_thread_id();
942  }
943 
944  boost::future<int32_t> session_semaphore::drain_permits() {
945  auto thread_id = get_thread_id();
946  auto invocation_uid = get_context().get_hazelcast_client_implementation()->random_uuid();
947 
948  auto do_drain_once = ([=] () {
949  auto session_id = session_manager_.acquire_session(group_id_, DRAIN_SESSION_ACQ_COUNT);
950  auto request = client::protocol::codec::semaphore_drain_encode(group_id_, object_name_,
951  session_id,
952  thread_id, invocation_uid);
953  return client::spi::impl::ClientInvocation::create(context_, request, object_name_)->invoke().then(
954  boost::launch::sync, [=](boost::future<client::protocol::ClientMessage> f) {
955  try {
956  auto count = f.get().get_first_fixed_sized_field<int32_t>();
957  session_manager_.release_session(group_id_, session_id,
958  DRAIN_SESSION_ACQ_COUNT - count);
959  return count;
960  } catch (client::exception::session_expired &) {
961  session_manager_.invalidate_session(group_id_, session_id);
962  return -1;
963  }
964  });
965  });
966 
967  return do_drain_once().then(boost::launch::sync, [=](boost::future<int32_t> f) {
968  int32_t count = f.get();
969  if (count != -1) {
970  return count;
971  }
972  while ((count = do_drain_once().get()) == -1) {}
973  return count;
974  });
975  }
976 
977  boost::future<void> session_semaphore::do_change_permits(int32_t delta) {
978  auto session_id = session_manager_.acquire_session(group_id_, DRAIN_SESSION_ACQ_COUNT);
979  auto thread_id = get_thread_id();
980  auto invocation_uid = get_context().get_hazelcast_client_implementation()->random_uuid();
981 
982  auto request = client::protocol::codec::semaphore_change_encode(group_id_, object_name_,
983  session_id,
984  thread_id, invocation_uid, delta);
985  return client::spi::impl::ClientInvocation::create(context_, request, object_name_)->invoke().then(
986  boost::launch::sync, [=](boost::future<client::protocol::ClientMessage> f) {
987  try {
988  f.get();
989  session_manager_.release_session(group_id_, session_id);
990  } catch (client::exception::session_expired &) {
991  session_manager_.invalidate_session(group_id_, session_id);
992  throw_illegal_state_exception(std::current_exception());
993  }
994  });
995  }
996  }
997 }
998 
999 namespace std {
1000  std::size_t
1001  hash<hazelcast::cp::raft_group_id>::operator()(const hazelcast::cp::raft_group_id &group_id) const noexcept {
1002  std::size_t seed = 0;
1003  boost::hash_combine(seed, group_id.name);
1004  boost::hash_combine(seed, group_id.seed);
1005  boost::hash_combine(seed, group_id.group_id);
1006  return seed;
1007  }
1008 }
1009 
1010 namespace boost {
1011  std::size_t
1012  hash<hazelcast::cp::raft_group_id>::operator()(const hazelcast::cp::raft_group_id &group_id) const noexcept {
1013  return std::hash<hazelcast::cp::raft_group_id>()(group_id);
1014  }
1015 }
1016 
1017 
1018 
Client-side Raft-based proxy implementation of atomic long.
Definition: cp.h:90
boost::future< int64_t > get_and_add(int64_t delta)
Atomically adds the given value to the current value.
Definition: cp.cpp:190
boost::future< int64_t > get()
Gets the current value.
Definition: cp.cpp:185
boost::future< int64_t > decrement_and_get()
Atomically decrements the current value by one.
Definition: cp.cpp:181
boost::future< int64_t > get_and_increment()
Atomically increments the current value by one.
Definition: cp.cpp:204
boost::future< bool > compare_and_set(int64_t expect, int64_t update)
Atomically sets the value to the given updated value only if the current value.
Definition: cp.cpp:172
boost::future< int64_t > get_and_set(int64_t new_value)
Atomically sets the given value and returns the old value.
Definition: cp.cpp:195
boost::future< void > set(int64_t new_value)
Atomically sets the given value.
Definition: cp.cpp:208
boost::future< int64_t > increment_and_get()
Atomically increments the current value by one.
Definition: cp.cpp:200
boost::future< int64_t > get_and_decrement()
Atomically decrements the current value by one.
Definition: cp.cpp:177
boost::future< int64_t > add_and_get(int64_t delta)
Atomically adds the given value to the current value.
Definition: cp.cpp:167
void release_session(int64_t session_id)
Decrements acquire count of the session.
Definition: cp.cpp:711
static constexpr int32_t DRAIN_SESSION_ACQ_COUNT
Since a proxy does not know how many permits will be drained on the Raft group, it uses this constant...
Definition: cp.h:1217