Hazelcast C++ Client
Hazelcast C++ Client Library
cp.h
1 /*
2  * Copyright (c) 2008-2022, Hazelcast, Inc. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <string>
20 #include <condition_variable>
21 #include <boost/thread/future.hpp>
22 
23 #include "hazelcast/util/SynchronizedMap.h"
24 #include "hazelcast/client/proxy/ProxyImpl.h"
25 #include "hazelcast/client/exception/protocol_exceptions.h"
26 
// Windows builds only: save the compiler warning state and silence warning
// 4251 for the declarations that follow. The matching pop is at the end of
// this header.
27 #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
28 #pragma warning(push)
29 #pragma warning(disable : 4251) // for dll export
30 #endif
31 
32 namespace hazelcast {
// Forward declaration of the client context type; the proxy constructors
// declared below take it by pointer or by reference.
33 namespace client {
34 namespace spi {
35 class ClientContext;
36 }
37 } // namespace client
38 namespace cp {
// Forward declaration; the class itself is defined further down in this
// header.
39 class raft_proxy_factory;
40 
// Plain aggregate identifying a group: a name plus two 64-bit integers
// (seed and group_id). Every proxy in this header stores or receives one.
// Equality and inequality are declared here and defined out of line;
// std::hash and boost::hash specializations for this type are declared at
// the end of this header.
41 struct HAZELCAST_API raft_group_id
42 {
43  std::string name;
44  int64_t seed;
45  int64_t group_id;
46 
47  bool operator==(const raft_group_id& rhs) const;
48 
49  bool operator!=(const raft_group_id& rhs) const;
50 };
51 
// Common base of the proxy classes in this header. It derives from
// client::proxy::ProxyImpl and additionally stores the group id and the
// object name that the constructor receives.
52 class HAZELCAST_API cp_proxy : public client::proxy::ProxyImpl
53 {
54 public:
 // Takes the service name, proxy name, client context (by pointer), the
 // group id and the object name. Defined out of line.
55  cp_proxy(const std::string& service_name,
56  const std::string& proxy_name,
57  client::spi::ClientContext* context,
58  const raft_group_id& group_id,
59  const std::string& object_name);
60 
 // Returns a const reference to a raft_group_id; presumably the stored
 // group_id_ member (defined out of line, TODO confirm).
61  const raft_group_id& get_group_id() const;
62 
63 protected:
64  raft_group_id group_id_;
65  std::string object_name_;
66 
 // NOTE(review): presumably overrides a destroy hook of ProxyImpl; it is
 // declared without `override`/`virtual` here, so confirm against the base
 // class.
67  void on_destroy();
68 };
69 
// Forward declaration of the session manager type that the session-aware
// proxies below hold by reference.
70 namespace internal {
71 namespace session {
72 class proxy_session_manager;
73 }
74 } // namespace internal
// cp_proxy specialization that additionally keeps a reference to a
// proxy_session_manager. fenced_lock and counting_semaphore derive from it.
75 class HAZELCAST_API session_aware_proxy : public cp_proxy
76 {
77 public:
 // NOTE(review): listing line 78 is not present in this view; it presumably
 // holds the constructor name and opening parenthesis for the parameter list
 // below. Confirm against the original header.
79  const std::string& service_name,
80  const std::string& proxy_name,
81  client::spi::ClientContext* context,
82  const raft_group_id& group_id,
83  const std::string& object_name,
84  internal::session::proxy_session_manager& session_manager);
85 
86 protected:
 // Held by reference, so the referenced manager has to outlive this proxy.
87  internal::session::proxy_session_manager& session_manager_;
88 
 // Takes a session id; defined out of line. Listing lines 89-92 are not
 // present in this view.
93  void release_session(int64_t session_id);
94 };
95 
// Client-side Raft-based proxy implementation of atomic long.
// Every public operation is asynchronous and returns a boost::future.
99 class HAZELCAST_API atomic_long : public cp_proxy
100 {
101 public:
 // Unlike the cp_proxy constructor, this one takes the client context by
 // reference and no service name (see the private SERVICE_NAME constant).
102  atomic_long(const std::string& name,
103  client::spi::ClientContext& context,
104  const raft_group_id& group_id,
105  const std::string& object_name);
106 
 // The non-template operations below are defined out of line. Their names
 // suggest the usual atomic-long operations on a 64-bit value; the exact
 // semantics are not visible in this header (TODO confirm).
113  boost::future<int64_t> add_and_get(int64_t delta);
114 
124  boost::future<bool> compare_and_set(int64_t expect, int64_t update);
125 
131  boost::future<int64_t> get_and_decrement();
132 
138  boost::future<int64_t> decrement_and_get();
139 
145  boost::future<int64_t> get();
146 
153  boost::future<int64_t> get_and_add(int64_t delta);
154 
161  boost::future<int64_t> get_and_set(int64_t new_value);
162 
168  boost::future<int64_t> increment_and_get();
169 
175  boost::future<int64_t> get_and_increment();
176 
182  boost::future<void> set(int64_t new_value);
183 
 // Alters the currently stored value by applying a function on it.
 // Implemented as alter_and_get() with the result discarded through
 // to_void_future().
189  template<typename F>
190  boost::future<void> alter(const F& function)
191  {
192  return to_void_future(alter_and_get(function));
193  }
194 
 // Alters the currently stored value by applying a function on it and gets
 // the result. The function object is serialized with to_data() and sent
 // through alter_data() requesting NEW_VALUE.
202  template<typename F>
203  boost::future<int64_t> alter_and_get(const F& function)
204  {
 // Named local because alter_data() takes its argument by non-const
 // reference.
205  auto f = to_data(function);
206  return alter_data(f, alter_result_type::NEW_VALUE);
207  }
208 
 // Alters the currently stored value by applying a function on it and gets
 // the old value. Same as alter_and_get() but requests OLD_VALUE.
217  template<typename F>
218  boost::future<int64_t> get_and_alter(const F& function)
219  {
220  auto f = to_data(function);
221  return alter_data(f, alter_result_type::OLD_VALUE);
222  }
223 
 // Applies a function on the value, the actual stored value will not change.
 // The serialized function goes through apply_data() and the reply is
 // converted with to_object<R>().
 // NOTE(review): R cannot be deduced and comes after F, so callers must spell
 // out both template arguments; atomic_reference::apply declares them in the
 // opposite order (R first). Changing the order here would break existing
 // callers.
232  template<typename F, typename R>
233  boost::future<boost::optional<R>> apply(const F& function)
234  {
235  auto f = to_data(function);
236  return to_object<R>(apply_data(f));
237  }
238 
239 private:
240  static constexpr const char* SERVICE_NAME = "hz:raft:atomicLongService";
 // Selects which value alter_data() reports back; used by alter_and_get()
 // (NEW_VALUE) and get_and_alter() (OLD_VALUE). Unscoped enum, so OLD_VALUE
 // has underlying value 0 and NEW_VALUE 1.
244  enum alter_result_type
245  {
249  OLD_VALUE,
253  NEW_VALUE
254  };
255 
 // Out-of-line helpers working on already serialized function data. Both
 // take the data by non-const reference.
256  boost::future<int64_t> alter_data(
257  client::serialization::pimpl::data& function_data,
258  alter_result_type result_type);
259 
260  boost::future<boost::optional<client::serialization::pimpl::data>>
261  apply_data(client::serialization::pimpl::data& function_data);
262 };
263 
// Proxy for an atomic reference holding a serialized object.
// The public templates serialize their arguments with to_data(), delegate to
// the private *_data() helpers (defined out of line) and, where a value comes
// back, deserialize it with to_object(). Every template that names the stored
// type T passes it through std::remove_pointer, so T and T* select the same
// value type.
267 class HAZELCAST_API atomic_reference : public cp_proxy
268 {
269 public:
270  atomic_reference(const std::string& name,
271  client::spi::ClientContext& context,
272  const raft_group_id& group_id,
273  const std::string& object_name);
274 
 // Deserializes the result of get_data() as remove_pointer<T>::type.
275  template<typename T>
276  boost::future<boost::optional<typename std::remove_pointer<T>::type>> get()
277  {
278  return to_object<typename std::remove_pointer<T>::type>(get_data());
279  }
280 
 // Serializes new_value, passes it to set_data() and deserializes the reply.
 // What the reply contains is decided by set_data(), which is not visible
 // here (TODO confirm).
281  template<typename T>
282  boost::future<boost::optional<typename std::remove_pointer<T>::type>> set(
283  T new_value)
284  {
285  return to_object<typename std::remove_pointer<T>::type>(
286  set_data(to_data<typename std::remove_pointer<T>::type>(new_value)));
287  }
288 
 // Same shape as set(), but routed through get_and_set_data().
289  template<typename T>
290  boost::future<boost::optional<typename std::remove_pointer<T>::type>>
291  get_and_set(T new_value)
292  {
293  return to_object<typename std::remove_pointer<T>::type>(
294  get_and_set_data(
295  to_data<typename std::remove_pointer<T>::type>(new_value)));
296  }
297 
 // Serializes both arguments (their types may differ) and forwards them to
 // compare_and_set_data().
298  template<typename T, typename V>
299  boost::future<bool> compare_and_set(T expect, V update)
300  {
301  return compare_and_set_data(
302  to_data<typename std::remove_pointer<T>::type>(expect),
303  to_data<typename std::remove_pointer<V>::type>(update));
304  }
305 
 // Defined out of line.
306  boost::future<bool> is_null();
307 
 // Defined out of line.
308  boost::future<void> clear();
309 
 // Serializes value and forwards it to contains_data().
310  template<typename T>
311  boost::future<bool> contains(T value)
312  {
313  return contains_data(
314  to_data<typename std::remove_pointer<T>::type>(value));
315  }
316 
 // Serializes the function object and forwards it to alter_data(); no value
 // is returned to the caller.
317  template<typename F>
318  boost::future<void> alter(const F& function)
319  {
320  return alter_data(to_data(function));
321  }
322 
 // Serializes the function object, calls alter_and_get_data() and
 // deserializes the reply as remove_pointer<T>::type. T cannot be deduced and
 // must be given explicitly.
323  template<typename T, typename F>
324  boost::future<boost::optional<typename std::remove_pointer<T>::type>>
325  alter_and_get(const F& function)
326  {
327  return to_object<typename std::remove_pointer<T>::type>(
328  alter_and_get_data(to_data(function)));
329  }
330 
 // Same shape as alter_and_get(), but routed through get_and_alter_data().
331  template<typename T, typename F>
332  boost::future<boost::optional<typename std::remove_pointer<T>::type>>
333  get_and_alter(const F& function)
334  {
335  return to_object<typename std::remove_pointer<T>::type>(
336  get_and_alter_data(to_data(function)));
337  }
338 
 // Serializes the function object, calls apply_data() and deserializes the
 // reply as R. R comes first so that F can be deduced from the argument.
339  template<typename R, typename F>
340  boost::future<boost::optional<R>> apply(const F& function)
341  {
342  return to_object<R>(apply_data(to_data(function)));
343  }
344 
345 private:
346  static constexpr const char* SERVICE_NAME = "hz:raft:atomicRefService";
347 
 // Passed to invoke_apply(); presumably selects which value, if any, the
 // reply carries (TODO confirm against the implementation).
348  enum struct return_value_type
349  {
350  NO_VALUE,
351  OLD,
352  NEW
353  };
354 
 // Out-of-line helpers operating on serialized data. All of them take their
 // data arguments by const reference, except invoke_apply() below.
355  boost::future<boost::optional<client::serialization::pimpl::data>>
356  get_data();
357 
358  boost::future<boost::optional<client::serialization::pimpl::data>> set_data(
359  const client::serialization::pimpl::data& new_value_data);
360 
361  boost::future<boost::optional<client::serialization::pimpl::data>>
362  get_and_set_data(const client::serialization::pimpl::data& new_value_data);
363 
364  boost::future<bool> compare_and_set_data(
365  const client::serialization::pimpl::data& expect_data,
366  const client::serialization::pimpl::data& update_data);
367 
368  boost::future<bool> contains_data(
369  const client::serialization::pimpl::data& value_data);
370 
371  boost::future<void> alter_data(
372  const client::serialization::pimpl::data& function_data);
373 
374  boost::future<boost::optional<client::serialization::pimpl::data>>
375  alter_and_get_data(const client::serialization::pimpl::data& function_data);
376 
377  boost::future<boost::optional<client::serialization::pimpl::data>>
378  get_and_alter_data(const client::serialization::pimpl::data& function_data);
379 
380  boost::future<boost::optional<client::serialization::pimpl::data>>
381  apply_data(const client::serialization::pimpl::data& function_data);
382 
 // NOTE(review): function_data is taken by const value here, whereas every
 // sibling helper takes a const reference. If that is unintentional it costs
 // a copy per call; switching to a reference also requires changing the
 // out-of-line definition.
383  boost::future<boost::optional<client::serialization::pimpl::data>>
384  invoke_apply(const client::serialization::pimpl::data function_data,
385  return_value_type return_type,
386  bool alter);
387 };
388 
// Proxy for a count-down latch (service name "hz:raft:countDownLatchService").
// All public operations are asynchronous and return a boost::future; apart
// from wait_until() they are defined out of line, so their exact semantics
// are not visible in this header.
389 class HAZELCAST_API latch : public cp_proxy
390 {
391 public:
392  latch(const std::string& name,
393  client::spi::ClientContext& context,
394  const raft_group_id& group_id,
395  const std::string& object_name);
396 
408  boost::future<bool> try_set_count(int32_t count);
409 
415  boost::future<int32_t> get_count();
416 
429  boost::future<void> count_down();
430 
438  boost::future<bool> try_wait();
439 
449  boost::future<void> wait();
450 
 // Timed wait; the future yields a std::cv_status (presumably
 // timeout/no_timeout in the condition-variable sense, TODO confirm).
480  boost::future<std::cv_status> wait_for(std::chrono::milliseconds timeout);
481 
 // Absolute-deadline variant: converts timeout_time into a duration relative
 // to Clock::now(), casts it to milliseconds with duration_cast, and
 // delegates to wait_for(). A deadline already in the past yields a negative
 // duration, which is passed through unchanged.
491  template<typename Clock, typename Duration>
492  boost::future<std::cv_status> wait_until(
493  const std::chrono::time_point<Clock, Duration>& timeout_time)
494  {
495  return wait_for(std::chrono::duration_cast<std::chrono::milliseconds>(
496  timeout_time - Clock::now()));
497  }
498 
499 private:
500  static constexpr const char* SERVICE_NAME = "hz:raft:countDownLatchService";
501 
502  boost::future<int32_t> get_round();
503 
 // NOTE(review): `round` is a plain int here while get_round() yields
 // int32_t; harmless where int is 32 bits, but the types are inconsistent.
504  void count_down(int round, boost::uuids::uuid invocation_uid);
505 };
506 
// Session-aware proxy for a lock that hands out 64-bit fence values
// (service name "hz:raft:lockService"). All public operations are
// asynchronous and defined out of line, so their exact semantics are not
// visible in this header.
510 class HAZELCAST_API fenced_lock : public session_aware_proxy
511 {
512 public:
 // Fence value 0 is named "invalid"; presumably what the fence-returning
 // operations yield when no lock is held or acquired (TODO confirm).
517  static constexpr int64_t INVALID_FENCE = 0L;
518 
519  fenced_lock(const std::string& name,
520  client::spi::ClientContext& context,
521  const raft_group_id& group_id,
522  const std::string& object_name);
523 
561  boost::future<void> lock();
562 
633  boost::future<int64_t> lock_and_get_fence();
634 
667  boost::future<bool> try_lock();
668 
697  boost::future<bool> try_lock(std::chrono::milliseconds timeout);
698 
772  boost::future<int64_t> try_lock_and_get_fence();
773 
856  boost::future<int64_t> try_lock_and_get_fence(
857  std::chrono::milliseconds timeout);
858 
867  boost::future<void> unlock();
868 
886  boost::future<int64_t> get_fence();
887 
897  boost::future<bool> is_locked();
898 
908  boost::future<bool> is_locked_by_current_thread();
909 
920  boost::future<int32_t> get_lock_count();
921 
 // NOTE(review): this non-const declaration hides cp_proxy::get_group_id()
 // const, so the call is not available on a const fenced_lock. Confirm
 // whether the missing const is intentional.
927  const raft_group_id& get_group_id();
928 
 // Equality between two locks; defined out of line.
929  friend bool operator==(const fenced_lock& lhs, const fenced_lock& rhs);
930 
931 protected:
 // Presumably a ProxyImpl hook (declared without `override`, TODO confirm).
932  void post_destroy();
933 
934 private:
 // Snapshot of who owns the lock: fence, re-entrant lock count, and the
 // owning session and thread ids. Produced by do_get_lock_ownership_state().
935  struct lock_ownership_state
936  {
937  int64_t fence;
938  int32_t lock_count;
939  int64_t session_id;
940  int64_t thread_id;
941 
942  bool is_locked_by(int64_t session, int64_t thread);
943 
944  bool is_locked();
945  };
946 
947  static constexpr const char* SERVICE_NAME = "hz:raft:lockService";
948 
949  // thread id -> id of the session that has acquired the lock
950  util::SynchronizedMap<int64_t, int64_t> locked_session_ids_;
951 
 // Out-of-line helpers. The verify_* functions check locked_session_ids_
 // for the given thread; the throw_* functions are const and presumably
 // always throw (TODO confirm).
952  void verify_locked_session_id_if_present(int64_t thread_id,
953  int64_t session_id,
954  bool should_release);
955 
956  void throw_lock_ownership_lost(int64_t session_id) const;
957 
958  void throw_illegal_monitor_state() const;
959 
 // Low-level operations; each takes the session id, the thread id and an
 // invocation uuid, passed by value.
960  boost::future<int64_t> do_lock(int64_t session_id,
961  int64_t thread_id,
962  boost::uuids::uuid invocation_uid);
963 
964  boost::future<int64_t> do_try_lock(int64_t session_id,
965  int64_t thread_id,
966  boost::uuids::uuid invocation_uid,
967  std::chrono::milliseconds timeout);
968 
969  boost::future<bool> do_unlock(int64_t session_id,
970  int64_t thread_id,
971  boost::uuids::uuid invocation_uid);
972 
973  boost::future<lock_ownership_state> do_get_lock_ownership_state();
974 
975  void invalidate_session(int64_t session_id);
976 
977  void verify_no_locked_session_id_present(int64_t thread_id);
978 };
979 
// Abstract session-aware proxy for a semaphore (service name
// "hz:raft:semaphoreService"). acquire(), release(), drain_permits() and the
// protected hooks are pure virtual; sessionless_semaphore and
// session_semaphore implement them. The constructor is protected, so
// instances are only created through those subclasses.
1058 class HAZELCAST_API counting_semaphore : public session_aware_proxy
1059 {
1060 public:
1061  //---- std::counting_semaphore method impl starts ---------
 // NOTE(review): acquire() and release() are virtual and carry a default
 // argument (permits = 1). The overrides in the subclasses do not repeat the
 // default, so it only applies when calling through a counting_semaphore
 // reference or pointer.
1087  virtual boost::future<void> acquire(int32_t permits = 1) = 0;
1088 
1111  virtual boost::future<void> release(int32_t permits = 1) = 0;
1112 
1120  boost::future<bool> try_acquire(int32_t permits = 1);
1121 
1155  boost::future<bool> try_acquire_for(std::chrono::milliseconds rel_time,
1156  int32_t permits = 1);
1157 
 // Absolute-deadline variant: converts abs_time into a duration relative to
 // Clock::now(), casts it to milliseconds with duration_cast, and delegates
 // to try_acquire_for().
1162  template<class Clock, class Duration>
1163  boost::future<bool> try_acquire_until(
1164  const std::chrono::time_point<Clock, Duration>& abs_time,
1165  int32_t permits = 1)
1166  {
1167  auto now = Clock::now();
1168  return try_acquire_for(
1169  std::chrono::duration_cast<std::chrono::milliseconds>(abs_time - now),
1170  permits);
1171  }
1172 
1173  //---- std::counting_semaphore method impl ends ----------
1174 
1175  // --- extended methods
1184  boost::future<bool> init(int32_t permits);
1185 
1193  boost::future<int32_t> available_permits();
1194 
1200  virtual boost::future<int32_t> drain_permits() = 0;
1201 
1211  boost::future<void> reduce_permits(int32_t reduction);
1212 
1222  boost::future<void> increase_permits(int32_t increase);
1223 
1224 protected:
1225  static constexpr const char* SERVICE_NAME = "hz:raft:semaphoreService";
1226 
 // Same parameters as the session_aware_proxy constructor minus the service
 // name.
1227  counting_semaphore(
1228  const std::string& proxy_name,
1229  client::spi::ClientContext* context,
1230  const raft_group_id& group_id,
1231  const std::string& object_name,
1232  internal::session::proxy_session_manager& session_manager);
1233 
 // Hooks implemented by the subclasses; presumably called by the
 // non-virtual public operations above (TODO confirm against the
 // implementation).
1234  virtual boost::future<bool> try_acquire_for_millis(
1235  int32_t permits,
1236  std::chrono::milliseconds timeout) = 0;
1237 
1238  virtual int64_t get_thread_id() = 0;
1239 
1240  virtual boost::future<void> do_change_permits(int32_t delta) = 0;
1241 
 // Shared, non-virtual release helper; defined out of line.
1242  boost::future<void> do_release(int32_t permits,
1243  int64_t thread_id,
1244  int64_t session_id);
1245 };
1246 
// Concrete counting_semaphore; overrides every pure virtual of the base.
// All members are defined out of line. How it differs from session_semaphore
// is not visible in this header.
1247 class HAZELCAST_API sessionless_semaphore : public counting_semaphore
1248 {
1249 public:
 // NOTE(review): listing line 1250 is not present in this view; it
 // presumably holds the constructor name and opening parenthesis for the
 // parameter list below. Confirm against the original header.
1251  const std::string& proxy_name,
1252  client::spi::ClientContext* context,
1253  const raft_group_id& group_id,
1254  const std::string& object_name,
1255  internal::session::proxy_session_manager& session_manager);
1256 
 // Overrides declared without the base's default argument, so `permits` is
 // mandatory when calling through this type.
1257  boost::future<void> acquire(int32_t permits) override;
1258 
1259  boost::future<void> release(int32_t permits) override;
1260 
1261  boost::future<int32_t> drain_permits() override;
1262 
1263 protected:
1264  boost::future<bool> try_acquire_for_millis(
1265  int32_t permits,
1266  std::chrono::milliseconds timeout) override;
1267 
1268  int64_t get_thread_id() override;
1269 
1270  boost::future<void> do_change_permits(int32_t delta) override;
1271 
1272 private:
 // Private helper taking a permit count and a timeout; defined out of line.
1273  boost::future<bool> do_try_acquire(int32_t permits,
1274  std::chrono::milliseconds timeout_ms);
1275 };
1276 
// Concrete counting_semaphore; overrides every pure virtual of the base.
// All members are defined out of line. How it differs from
// sessionless_semaphore is not visible in this header.
1277 class HAZELCAST_API session_semaphore : public counting_semaphore
1278 {
1279 public:
 // NOTE(review): listing line 1280 is not present in this view; it
 // presumably holds the constructor name and opening parenthesis for the
 // parameter list below. Confirm against the original header.
1281  const std::string& proxy_name,
1282  client::spi::ClientContext* context,
1283  const raft_group_id& group_id,
1284  const std::string& object_name,
1285  internal::session::proxy_session_manager& session_manager);
1286 
 // Overrides declared without the base's default argument, so `permits` is
 // mandatory when calling through this type.
1287  boost::future<void> acquire(int32_t permits) override;
1288 
1289  boost::future<void> release(int32_t permits) override;
1290 
1291  boost::future<int32_t> drain_permits() override;
1292 
1293 protected:
 // Constant 1024; judging by the name, presumably the session acquire count
 // used while draining permits (TODO confirm in the implementation).
1300  static constexpr int32_t DRAIN_SESSION_ACQ_COUNT = 1024;
1301 
1302  boost::future<bool> try_acquire_for_millis(
1303  int32_t permits,
1304  std::chrono::milliseconds timeout) override;
1305 
 // Takes an exception_ptr by value; judging by the name, presumably rethrows
 // it as an illegal-state exception (TODO confirm).
1306  void throw_illegal_state_exception(std::exception_ptr e);
1307 
1308  int64_t get_thread_id() override;
1309 
1310  boost::future<void> do_change_permits(int32_t delta) override;
1311 };
1312 
// Factory for the proxy types declared in this header. create_proxy<T>()
// resolves the group id asynchronously and then dispatches to one of the
// private create<T>() overloads, selected by T through enable_if.
1317 class HAZELCAST_API raft_proxy_factory
1318 {
1319 public:
 // NOTE(review): single-argument constructor without `explicit`, so a
 // ClientContext converts implicitly to a factory. Confirm that this is
 // intended.
1320  raft_proxy_factory(client::spi::ClientContext& context);
1321 
 // Static name helpers, defined out of line. The names suggest stripping the
 // default group suffix and extracting the object part of a proxy name
 // (TODO confirm).
1322  static std::string without_default_group_name(const std::string& n);
1323 
1324  static std::string object_name_for_proxy(const std::string& name);
1325 
 // Creates a proxy of type T for `name`:
 //  1. normalizes the name with without_default_group_name();
 //  2. derives the object name with object_name_for_proxy();
 //  3. asks get_group_id() for the group id and, in the continuation, builds
 //     the proxy with create<T>().
 // NOTE(review): `[=]` copies proxy_name and object_name but captures `this`
 // implicitly by pointer (deprecated since C++20). The continuation is only
 // valid while this factory is alive.
1326  template<typename T>
1327  boost::future<std::shared_ptr<T>> create_proxy(const std::string& name)
1328  {
1329  auto proxy_name = without_default_group_name(name);
1330  auto object_name = object_name_for_proxy(proxy_name);
1331  return get_group_id(proxy_name, object_name)
1332  .then([=](boost::future<raft_group_id> f) {
 // f.get() rethrows here if resolving the group id failed.
1333  auto group_id = f.get();
1334  return create<T>(std::move(group_id), proxy_name, object_name);
1335  });
1336  }
1337 
1338 private:
1339  client::spi::ClientContext& context_;
 // Map from a string key to fenced_lock; presumably used by
 // create_fenced_lock() to reuse lock proxies (TODO confirm).
1340  util::SynchronizedMap<std::string, fenced_lock> lock_proxies_;
1341 
1346  static constexpr const char* DEFAULT_GROUP_NAME = "default";
1347 
 // Overload for atomic_long, atomic_reference and latch: constructs the
 // proxy directly with make_shared, passing the stored context.
1348  template<typename T,
1349  typename = typename std::enable_if<
1350  std::is_same<atomic_long, T>::value ||
1351  std::is_same<atomic_reference, T>::value ||
1352  std::is_same<latch, T>::value>::type>
1353  std::shared_ptr<T> create(raft_group_id&& group_id,
1354  const std::string& proxy_name,
1355  const std::string& object_name)
1356  {
1357  return std::make_shared<T>(
1358  proxy_name, context_, std::move(group_id), object_name);
1359  }
1360 
 // Out-of-line creation of a fenced_lock.
1361  std::shared_ptr<fenced_lock> create_fenced_lock(
1362  raft_group_id&& group_id,
1363  const std::string& proxy_name,
1364  const std::string& object_name);
1365 
 // Overload for fenced_lock: forwards to create_fenced_lock().
1366  template<typename T,
1367  typename = typename std::enable_if<
1368  std::is_same<fenced_lock, T>::value>::type>
1369  std::shared_ptr<fenced_lock> create(raft_group_id&& group_id,
1370  const std::string& proxy_name,
1371  const std::string& object_name)
1372  {
1373  return create_fenced_lock(std::move(group_id), proxy_name, object_name);
1374  }
1375 
 // Out-of-line creation of a semaphore; returns the abstract base type, so
 // the concrete subclass is chosen in the implementation.
1376  std::shared_ptr<counting_semaphore> create_semaphore(
1377  raft_group_id&& group_id,
1378  const std::string& proxy_name,
1379  const std::string& object_name);
1380 
 // Overload for counting_semaphore: forwards to create_semaphore().
1381  template<typename T,
1382  typename = typename std::enable_if<
1383  std::is_same<counting_semaphore, T>::value>::type>
1384  std::shared_ptr<counting_semaphore> create(raft_group_id&& group_id,
1385  const std::string& proxy_name,
1386  const std::string& object_name)
1387  {
1388  return create_semaphore(std::move(group_id), proxy_name, object_name);
1389  }
1390 
 // Asynchronously resolves the group id for the given proxy and object
 // names; defined out of line.
1391  boost::future<raft_group_id> get_group_id(const std::string& proxy_name,
1392  const std::string& object_name);
1393 };
1394 
// Public entry point for obtaining the proxies declared in this header. Each
// getter takes a name and returns a future of a shared_ptr to the proxy. The
// constructor is private; only the befriended
// client::impl::hazelcast_client_instance_impl can create an instance.
1406 class HAZELCAST_API cp_subsystem
1407 {
1408 public:
 // The getters are defined out of line; presumably they delegate to
 // proxy_factory_ (TODO confirm).
1429  boost::future<std::shared_ptr<atomic_long>> get_atomic_long(
1430  const std::string& name);
1431 
1454  boost::future<std::shared_ptr<atomic_reference>> get_atomic_reference(
1455  const std::string& name);
1456 
1478  boost::future<std::shared_ptr<latch>> get_latch(const std::string& name);
1479 
1500  boost::future<std::shared_ptr<fenced_lock>> get_lock(
1501  const std::string& name);
1502 
 // Returns the abstract counting_semaphore type; the concrete subclass is
 // not visible to the caller.
1521  boost::future<std::shared_ptr<counting_semaphore>> get_semaphore(
1522  const std::string& name);
1523 
1524 private:
1525  friend client::impl::hazelcast_client_instance_impl;
 // Held by reference, so the context has to outlive this object.
1526  client::spi::ClientContext& context_;
1527  raft_proxy_factory proxy_factory_;
1528 
1529  cp_subsystem(client::spi::ClientContext& context);
1530 };
1531 } // namespace cp
1532 } // namespace hazelcast
1533 
// std::hash specialization so that raft_group_id can be used as a key in
// the standard unordered containers. The call operator is noexcept and
// defined out of line.
1534 namespace std {
1535 template<>
1536 struct HAZELCAST_API hash<hazelcast::cp::raft_group_id>
1537 {
1538  std::size_t operator()(
1539  const hazelcast::cp::raft_group_id& group_id) const noexcept;
1540 };
1541 } // namespace std
1542 
// boost::hash specialization mirroring the std::hash one, for Boost
// containers keyed by raft_group_id. The call operator is noexcept and
// defined out of line.
1543 namespace boost {
1544 template<>
1545 struct HAZELCAST_API hash<hazelcast::cp::raft_group_id>
1546 {
1547  std::size_t operator()(
1548  const hazelcast::cp::raft_group_id& group_id) const noexcept;
1549 };
1550 } // namespace boost
1551 
// Windows builds only: restore the warning state saved by the
// `#pragma warning(push)` near the top of this header.
1552 #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
1553 #pragma warning(pop)
1554 #endif
Client-side Raft-based proxy implementation of atomic long.
Definition: cp.h:100
boost::future< boost::optional< R > > apply(const F &function)
Applies a function on the value, the actual stored value will not change.
Definition: cp.h:233
boost::future< void > alter(const F &function)
Alters the currently stored value by applying a function on it.
Definition: cp.h:190
boost::future< int64_t > get_and_alter(const F &function)
Alters the currently stored value by applying a function on it and gets the old value.
Definition: cp.h:218
boost::future< int64_t > alter_and_get(const F &function)
Alters the currently stored value by applying a function on it and gets the result.
Definition: cp.h:203
CP Subsystem is a component of Hazelcast that builds a strongly consistent layer for a set of distributed data structures.
Definition: cp.h:1407
Factory that creates the client-side Raft-based CP proxies (atomic long, atomic reference, latch, fenced lock and counting semaphore).
Definition: cp.h:1318