Hazelcast C++ Client
Hazelcast C++ Client Library
cp.h
1 /*
2  * Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <string>
20 #include <condition_variable>
21 #include <boost/thread/future.hpp>
22 
23 #include "hazelcast/util/SynchronizedMap.h"
24 #include "hazelcast/client/proxy/ProxyImpl.h"
25 #include "hazelcast/client/exception/protocol_exceptions.h"
26 
27 #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
28 #pragma warning(push)
29 #pragma warning(disable: 4251) //for dll export
30 #endif
31 
32 namespace hazelcast {
// Forward declaration of client::spi::ClientContext. Within this header the
// type is only used through pointers and references (constructor parameters
// and the context_ members), so the declaration alone is sufficient here.
33  namespace client {
34  namespace spi {
35  class ClientContext;
36  }
37  }
38  namespace cp {
// Forward declaration; the class itself is defined further down in this header.
39  class raft_proxy_factory;
40 
// Plain value type that identifies a Raft (CP) group by three fields:
// a name, a seed and a numeric group id. Hash specializations for this type
// (std::hash and boost::hash) are declared at the end of this header.
41  struct HAZELCAST_API raft_group_id {
42  std::string name;
43  int64_t seed;
44  int64_t group_id;
45 
// Equality / inequality are only declared here; the definitions live in the
// implementation file. Presumably they compare all three fields - confirm
// against the definition.
46  bool operator==(const raft_group_id &rhs) const;
47 
48  bool operator!=(const raft_group_id &rhs) const;
49  };
50 
// Common base class of the CP proxies declared in this header. It extends
// client::proxy::ProxyImpl and additionally stores the Raft group id and the
// object name of the proxied data structure.
51  class HAZELCAST_API cp_proxy : public client::proxy::ProxyImpl {
52  public:
// service_name / proxy_name / context: presumably forwarded to ProxyImpl -
//   confirm in the constructor definition.
// group_id / object_name: presumably stored in group_id_ / object_name_.
53  cp_proxy(const std::string &service_name, const std::string &proxy_name, client::spi::ClientContext *context,
54  const raft_group_id &group_id, const std::string &object_name);
55 
// Returns a const reference to a raft_group_id (presumably group_id_); the
// reference is only valid while this proxy is alive.
56  const raft_group_id &get_group_id() const;
57 
58  protected:
59  raft_group_id group_id_;
60  std::string object_name_;
61 
// NOTE(review): declared without `override` or `virtual`. Presumably this
// overrides a destroy hook of ProxyImpl - confirm, and if so mark it override.
62  void on_destroy() ;
63  };
64 
// Forward declaration of the session manager type. This header only refers to
// it by reference (constructor parameters and session_aware_proxy::session_manager_).
65  namespace internal {
66  namespace session {
67  class proxy_session_manager;
68  }
69  }
// Base class for CP proxies that work with sessions (fenced_lock and
// counting_semaphore derive from it). Adds a reference to a
// proxy_session_manager on top of cp_proxy.
70  class HAZELCAST_API session_aware_proxy : public cp_proxy {
71  public:
// Takes the same arguments as cp_proxy plus the session manager to use.
72  session_aware_proxy(const std::string &service_name, const std::string &proxy_name,
73  client::spi::ClientContext *context, const raft_group_id &group_id,
74  const std::string &object_name,
75  internal::session::proxy_session_manager &session_manager);
76 
77  protected:
// Held by reference, not owned: the session manager must outlive this proxy.
78  internal::session::proxy_session_manager &session_manager_;
79 
// Session helper taking a session id. Other members were elided from this
// listing (note the gap in the original line numbers); the definition is in
// the implementation file.
84  void release_session(int64_t session_id);
85  };
86 
// Client-side Raft-based proxy implementation of atomic long.
// Every operation is asynchronous and returns a boost::future; the
// non-template operations are defined in the implementation file.
90  class HAZELCAST_API atomic_long : public cp_proxy {
91  public:
92  atomic_long(const std::string &name, client::spi::ClientContext &context,
93  const raft_group_id &group_id, const std::string &object_name);
94 
// Arithmetic / read / write operations on the 64-bit value. Semantics follow
// the method names (presumably mirroring Hazelcast's IAtomicLong contract -
// confirm against the implementation).
101  boost::future<int64_t> add_and_get(int64_t delta);
102 
112  boost::future<bool> compare_and_set(int64_t expect, int64_t update);
113 
119  boost::future<int64_t> get_and_decrement();
120 
126  boost::future<int64_t> decrement_and_get();
127 
133  boost::future<int64_t> get();
134 
141  boost::future<int64_t> get_and_add(int64_t delta);
142 
149  boost::future<int64_t> get_and_set(int64_t new_value);
150 
156  boost::future<int64_t> increment_and_get();
157 
163  boost::future<int64_t> get_and_increment();
164 
170  boost::future<void> set(int64_t new_value);
171 
// Alters the currently stored value by applying a function on it.
// Implemented as alter_and_get() with the resulting value discarded
// (to_void_future).
177  template<typename F>
178  boost::future<void> alter(const F &function) {
179  return to_void_future(alter_and_get(function));
180  }
181 
// Alters the currently stored value by applying a function on it and gets
// the result. The function object is serialized with to_data() and sent via
// alter_data() requesting the NEW value.
189  template<typename F>
190  boost::future<int64_t> alter_and_get(const F &function) {
191  auto f = to_data(function);
192  return alter_data(f, alter_result_type::NEW_VALUE);
193  }
194 
// Alters the currently stored value by applying a function on it and gets
// the old value. Same path as alter_and_get() but requests the OLD value.
203  template<typename F>
204  boost::future<int64_t> get_and_alter(const F &function) {
205  auto f = to_data(function);
206  return alter_data(f, alter_result_type::OLD_VALUE);
207  }
208 
// Applies a function on the value, the actual stored value will not change.
// The serialized result is deserialized into R via to_object<R>().
// NOTE(review): R cannot be deduced and is the SECOND template parameter, so
// callers must spell out both arguments (apply<F, R>(fn)). atomic_reference
// declares the opposite order (apply<R, F>).
216  template<typename F, typename R>
217  boost::future<boost::optional<R>> apply(const F &function) {
218  auto f = to_data(function);
219  return to_object<R>(apply_data(f));
220  }
221 
222  private:
223 
224  static constexpr const char *SERVICE_NAME = "hz:raft:atomicLongService";
// Selects which value an alter operation reports back to the caller.
228  enum alter_result_type {
232  OLD_VALUE,
236  NEW_VALUE
237  };
238 
// Helpers that operate on already-serialized function data; defined in the
// implementation file.
239  boost::future<int64_t>
240  alter_data(client::serialization::pimpl::data &function_data, alter_result_type result_type);
241 
242  boost::future<boost::optional<client::serialization::pimpl::data>>
243  apply_data(client::serialization::pimpl::data &function_data);
244  };
245 
// CP proxy for an atomic reference. The public template methods are thin
// typed wrappers: arguments are serialized with to_data(), passed to the
// private *_data() helpers, and results are deserialized with to_object().
// Wherever a value type T (or V) is used, std::remove_pointer is applied
// first, so passing a pointer type yields results typed on the pointee.
249  class HAZELCAST_API atomic_reference : public cp_proxy {
250  public:
251  atomic_reference(const std::string &name, client::spi::ClientContext &context,
252  const raft_group_id &group_id, const std::string &object_name);
253 
// Deserializes the result of get_data() as T (pointer stripped).
254  template<typename T>
255  boost::future<boost::optional<typename std::remove_pointer<T>::type>> get() {
256  return to_object<typename std::remove_pointer<T>::type>(get_data());
257  }
258 
// Serializes new_value, calls set_data() and deserializes whatever
// set_data() yields (presumably an empty optional - confirm).
259  template<typename T>
260  boost::future<boost::optional<typename std::remove_pointer<T>::type>> set(T new_value) {
261  return to_object<typename std::remove_pointer<T>::type>(
262  set_data(to_data<typename std::remove_pointer<T>::type>(new_value)));
263  }
264 
// Same shape as set() but goes through get_and_set_data().
265  template<typename T>
266  boost::future<boost::optional<typename std::remove_pointer<T>::type>> get_and_set(T new_value) {
267  return to_object<typename std::remove_pointer<T>::type>(
268  get_and_set_data(to_data<typename std::remove_pointer<T>::type>(new_value)));
269  }
270 
// Serializes both the expected and the update value (their types may
// differ) and forwards them to compare_and_set_data().
271  template<typename T, typename V>
272  boost::future<bool> compare_and_set(T expect, V update) {
273  return compare_and_set_data(to_data<typename std::remove_pointer<T>::type>(expect),
274  to_data<typename std::remove_pointer<V>::type>(update));
275  }
276 
// Non-template operations, defined in the implementation file.
277  boost::future<bool> is_null();
278 
279  boost::future<void> clear();
280 
// Serializes value and forwards it to contains_data().
281  template<typename T>
282  boost::future<bool> contains(T value) {
283  return contains_data(to_data<typename std::remove_pointer<T>::type>(value));
284  }
285 
// The following four methods serialize a function object F and forward it
// to the matching *_data() helper. T / R name the type the result is
// deserialized into and must be given explicitly (F is deduced).
286  template<typename F>
287  boost::future<void> alter(const F &function) {
288  return alter_data(to_data(function));
289  }
290 
291  template<typename T, typename F>
292  boost::future<boost::optional<typename std::remove_pointer<T>::type>> alter_and_get(const F &function) {
293  return to_object<typename std::remove_pointer<T>::type>(alter_and_get_data(to_data(function)));
294  }
295 
296  template<typename T, typename F>
297  boost::future<boost::optional<typename std::remove_pointer<T>::type>> get_and_alter(const F &function) {
298  return to_object<typename std::remove_pointer<T>::type>(get_and_alter_data(to_data(function)));
299  }
300 
301  template<typename R, typename F>
302  boost::future<boost::optional<R>> apply(const F &function) {
303  return to_object<R>(apply_data(to_data(function)));
304  }
305 
306  private:
307  static constexpr const char *SERVICE_NAME = "hz:raft:atomicRefService";
308 
// Passed to invoke_apply(); presumably selects which value (none / old /
// new) the call reports back - confirm in the implementation.
309  enum struct return_value_type {
310  NO_VALUE,
311  OLD,
312  NEW
313  };
314 
// Helpers working on serialized data; all defined in the implementation file.
315  boost::future<boost::optional<client::serialization::pimpl::data>> get_data();
316 
317  boost::future<boost::optional<client::serialization::pimpl::data>>
318  set_data(const client::serialization::pimpl::data &new_value_data);
319 
320  boost::future<boost::optional<client::serialization::pimpl::data>>
321  get_and_set_data(const client::serialization::pimpl::data &new_value_data);
322 
323  boost::future<bool> compare_and_set_data(const client::serialization::pimpl::data &expect_data,
324  const client::serialization::pimpl::data &update_data);
325 
326  boost::future<bool> contains_data(const client::serialization::pimpl::data &value_data);
327 
328  boost::future<void> alter_data(const client::serialization::pimpl::data &function_data);
329 
330  boost::future<boost::optional<client::serialization::pimpl::data>>
331  alter_and_get_data(const client::serialization::pimpl::data &function_data);
332 
333  boost::future<boost::optional<client::serialization::pimpl::data>>
334  get_and_alter_data(const client::serialization::pimpl::data &function_data);
335 
336  boost::future<boost::optional<client::serialization::pimpl::data>>
337  apply_data(const client::serialization::pimpl::data &function_data);
338 
// NOTE(review): function_data is taken by const VALUE here (every sibling
// helper takes a const reference), which copies the serialized data on each
// call. Changing it requires touching the out-of-line definition as well.
339  boost::future<boost::optional<client::serialization::pimpl::data>>
340  invoke_apply(const client::serialization::pimpl::data function_data, return_value_type return_type,
341  bool alter);
342  };
343 
// CP proxy for a count-down latch (service "hz:raft:countDownLatchService").
// All operations are asynchronous and return a boost::future; apart from
// wait_until() they are defined in the implementation file.
344  class HAZELCAST_API latch : public cp_proxy {
345  public:
346  latch(const std::string &name, client::spi::ClientContext &context, const raft_group_id &group_id,
347  const std::string &object_name);
348 
// Count management. Semantics follow the method names - confirm the exact
// contract (e.g. when try_set_count yields false) in the implementation.
359  boost::future<bool> try_set_count(int32_t count);
360 
366  boost::future<int32_t> get_count();
367 
380  boost::future<void> count_down();
381 
388  boost::future<bool> try_wait();
389 
398  boost::future<void> wait();
399 
// Timed wait; the result is a std::cv_status (presumably timeout vs.
// no_timeout, as with std::condition_variable - confirm).
429  boost::future<std::cv_status> wait_for(std::chrono::milliseconds timeout);
430 
// Deadline flavour of wait_for(): the remaining time is computed as
// timeout_time - Clock::now(), cast to whole milliseconds (duration_cast
// truncates toward zero), and handed to wait_for(). If the deadline has
// already passed the duration is negative; how wait_for() treats that is not
// visible in this header.
440  template<typename Clock, typename Duration>
441  boost::future<std::cv_status> wait_until(const std::chrono::time_point<Clock, Duration> &timeout_time) {
442  return wait_for(std::chrono::duration_cast<std::chrono::milliseconds>(timeout_time - Clock::now()));
443  }
444 
445  private:
446  static constexpr const char *SERVICE_NAME = "hz:raft:countDownLatchService";
447 
// Internal helpers used by the public operations; defined elsewhere.
448  boost::future<int32_t> get_round();
449 
// Overload taking an explicit round and invocation uuid; note that it
// returns void, unlike the public count_down().
450  void count_down(int round, boost::uuids::uuid invocation_uid);
451 
452  };
453 
// CP proxy for a fenced lock (service "hz:raft:lockService"). Derives from
// session_aware_proxy. All public operations are asynchronous and return a
// boost::future; they are defined in the implementation file.
457  class HAZELCAST_API fenced_lock : public session_aware_proxy {
458  public:
// Fence value 0; by its name the value that denotes "no valid fence"
// (presumably returned by the try_lock_and_get_fence variants on failure -
// confirm).
463  static constexpr int64_t INVALID_FENCE = 0L;
464 
465  fenced_lock(const std::string &name, client::spi::ClientContext &context,
466  const raft_group_id &group_id, const std::string &object_name);
467 
// Acquire operations: variants with / without a timeout and with / without
// returning the fence token as int64_t.
505  boost::future<void> lock();
506 
577  boost::future<int64_t> lock_and_get_fence();
578 
611  boost::future<bool> try_lock();
612 
641  boost::future<bool> try_lock(std::chrono::milliseconds timeout);
642 
716  boost::future<int64_t> try_lock_and_get_fence();
717 
800  boost::future<int64_t> try_lock_and_get_fence(std::chrono::milliseconds timeout);
801 
// Release and state queries.
810  boost::future<void> unlock();
811 
829  boost::future<int64_t> get_fence();
830 
840  boost::future<bool> is_locked();
841 
851  boost::future<bool> is_locked_by_current_thread();
852 
863  boost::future<int32_t> get_lock_count();
864 
// NOTE(review): non-const, and it hides cp_proxy::get_group_id() const for
// calls made through fenced_lock, so it cannot be called on a const
// fenced_lock. Confirm whether the missing const is intentional.
870  const raft_group_id &get_group_id();
871 
// Declared as a friend so the comparison can read private state; the
// definition (and what exactly is compared) lives elsewhere.
872  friend bool operator==(const fenced_lock &lhs, const fenced_lock &rhs);
873 
874  protected:
// NOTE(review): no `override` / `virtual` here; presumably a ProxyImpl hook -
// confirm.
875  void post_destroy() ;
876 
877  private:
// Snapshot of lock ownership: fence, re-entrant lock count and the
// session / thread ids of the owner, as produced by
// do_get_lock_ownership_state().
878  struct lock_ownership_state {
879  int64_t fence;
880  int32_t lock_count;
881  int64_t session_id;
882  int64_t thread_id;
883 
884  bool is_locked_by(int64_t session, int64_t thread);
885 
886  bool is_locked();
887  };
888 
889  static constexpr const char *SERVICE_NAME = "hz:raft:lockService";
890 
891  // thread id -> id of the session that has acquired the lock
892  util::SynchronizedMap<int64_t, int64_t> locked_session_ids_;
893 
// Internal helpers; the do_* functions take explicit session id, thread id
// and invocation uuid. All are defined in the implementation file.
894  void
895  verify_locked_session_id_if_present(int64_t thread_id, int64_t session_id, bool should_release);
896 
897  void throw_lock_ownership_lost(int64_t session_id) const;
898 
899  void throw_illegal_monitor_state() const;
900 
901  boost::future<int64_t>
902  do_lock(int64_t session_id, int64_t thread_id, boost::uuids::uuid invocation_uid);
903 
904  boost::future<int64_t>
905  do_try_lock(int64_t session_id, int64_t thread_id, boost::uuids::uuid invocation_uid,
906  std::chrono::milliseconds timeout);
907 
908  boost::future<bool>
909  do_unlock(int64_t session_id, int64_t thread_id, boost::uuids::uuid invocation_uid);
910 
911  boost::future<lock_ownership_state> do_get_lock_ownership_state();
912 
913  void invalidate_session(int64_t session_id);
914 
915  void verify_no_locked_session_id_present(int64_t thread_id);
916  };
917 
// Abstract base of the CP semaphore proxies (service
// "hz:raft:semaphoreService"). The concrete behaviour is supplied by
// sessionless_semaphore and session_semaphore, which implement the pure
// virtual functions declared here. The constructor is protected, so
// instances are only created through those subclasses.
1003  class HAZELCAST_API counting_semaphore : public session_aware_proxy {
1004  public:
1005  //---- std::counting_semaphore method impl starts ---------
// NOTE(review): acquire() and release() are virtual AND carry a default
// argument (permits = 1). The overrides in the subclasses declare no default,
// so the default only applies when calling through a counting_semaphore
// reference/pointer; a call without arguments through the derived type does
// not compile.
1031  virtual boost::future<void> acquire(int32_t permits = 1) = 0;
1032 
1055  virtual boost::future<void> release(int32_t permits = 1 ) = 0;
1056 
1064  boost::future<bool> try_acquire(int32_t permits = 1);
1065 
1098  boost::future<bool> try_acquire_for(std::chrono::milliseconds rel_time, int32_t permits = 1);
1099 
// Deadline flavour of try_acquire_for(): converts abs_time into the time
// remaining from Clock::now() (truncated to whole milliseconds) and
// delegates. A deadline in the past produces a negative duration.
1103  template<class Clock, class Duration>
1104  boost::future<bool> try_acquire_until(const std::chrono::time_point<Clock, Duration>& abs_time, int32_t permits = 1) {
1105  auto now = Clock::now();
1106  return try_acquire_for(std::chrono::duration_cast<std::chrono::milliseconds>(abs_time - now), permits);
1107  }
1108 
1109  //---- std::counting_semaphore method impl ends ----------
1110 
1111  // --- extended methods
// Operations beyond the std::counting_semaphore-style interface above;
// semantics follow the method names (defined in the implementation file).
1119  boost::future<bool> init(int32_t permits);
1120 
1128  boost::future<int32_t> available_permits();
1129 
1135  virtual boost::future<int32_t> drain_permits() = 0;
1136 
1146  boost::future<void> reduce_permits(int32_t reduction);
1147 
1157  boost::future<void> increase_permits(int32_t increase);
1158 
1159  protected:
1160  static constexpr const char *SERVICE_NAME = "hz:raft:semaphoreService";
1161 
1162  counting_semaphore(const std::string &proxy_name, client::spi::ClientContext *context,
1163  const raft_group_id &group_id, const std::string &object_name,
1164  internal::session::proxy_session_manager &session_manager);
1165 
// Customization points implemented by the subclasses. Presumably the
// non-virtual public methods above are built on top of them - confirm in the
// implementation file.
1166  virtual boost::future<bool> try_acquire_for_millis(int32_t permits, std::chrono::milliseconds timeout) = 0;
1167 
1168  virtual int64_t get_thread_id() = 0;
1169 
1170  virtual boost::future<void> do_change_permits(int32_t delta) = 0;
1171 
// Shared release helper taking explicit thread and session ids.
1172  boost::future<void> do_release(int32_t permits, int64_t thread_id, int64_t session_id);
1173  };
1174 
// Concrete counting_semaphore; by its name the variant that does not tie
// permits to a session (confirm in the implementation). It overrides every
// pure virtual function of counting_semaphore.
1175  class HAZELCAST_API sessionless_semaphore : public counting_semaphore {
1176  public:
// Same argument list as the protected counting_semaphore constructor.
1177  sessionless_semaphore(const std::string &proxy_name, client::spi::ClientContext *context,
1178  const raft_group_id &group_id, const std::string &object_name,
1179  internal::session::proxy_session_manager &session_manager);
1180 
// Note: unlike the base declarations, these overrides declare no default
// value for permits.
1181  boost::future<void> acquire(int32_t permits) override;
1182 
1183  boost::future<void> release(int32_t permits) override;
1184 
1185  boost::future<int32_t> drain_permits() override;
1186 
1187  protected:
1188  boost::future<bool> try_acquire_for_millis(int32_t permits, std::chrono::milliseconds timeout) override;
1189 
1190  int64_t get_thread_id() override;
1191 
1192  boost::future<void> do_change_permits(int32_t delta) override;
1193 
1194  private:
// Internal helper; presumably shared by acquire() and
// try_acquire_for_millis() - confirm in the implementation file.
1195  boost::future<bool> do_try_acquire(int32_t permits, std::chrono::milliseconds timeout_ms);
1196  };
1197 
// Concrete counting_semaphore; by its name the session-based variant
// (confirm in the implementation). It overrides every pure virtual function
// of counting_semaphore.
1198  class HAZELCAST_API session_semaphore : public counting_semaphore {
1199  public:
// Same argument list as the protected counting_semaphore constructor.
1200  session_semaphore(const std::string &proxy_name, client::spi::ClientContext *context,
1201  const raft_group_id &group_id, const std::string &object_name,
1202  internal::session::proxy_session_manager &session_manager);
1203 
// Note: unlike the base declarations, these overrides declare no default
// value for permits.
1204  boost::future<void> acquire(int32_t permits) override;
1205 
1206  boost::future<void> release(int32_t permits) override;
1207 
1208  boost::future<int32_t> drain_permits() override;
1209 
1210  protected:
// Constant 1024. NOTE(review): presumably the number of session acquisitions
// reserved while draining permits - the original doc comment is missing from
// this listing; confirm against drain_permits().
1217  static constexpr int32_t DRAIN_SESSION_ACQ_COUNT = 1024;
1218 
1219  boost::future<bool> try_acquire_for_millis(int32_t permits, std::chrono::milliseconds timeout) override;
1220 
// Takes a captured exception; by its name it rethrows it as an illegal-state
// error (confirm in the implementation).
1221  void throw_illegal_state_exception(std::exception_ptr e);
1222 
1223  int64_t get_thread_id() override;
1224 
1225  boost::future<void> do_change_permits(int32_t delta) override;
1226  };
1227 
// Factory that creates the CP proxies declared in this header (atomic_long,
// atomic_reference, latch, fenced_lock, counting_semaphore). It is held as a
// member by cp_subsystem.
1232  class HAZELCAST_API raft_proxy_factory {
1233  public:
// NOTE(review): single-argument constructor that is not `explicit`, so a
// ClientContext converts implicitly to a raft_proxy_factory.
1234  raft_proxy_factory(client::spi::ClientContext &context);
1235 
// Name helpers used by create_proxy(). By their names: strip the default
// group suffix from a name, and derive the object name from a proxy name
// (confirm exact rules in the implementation file).
1236  static std::string without_default_group_name(const std::string &n);
1237 
1238  static std::string object_name_for_proxy(const std::string &name);
1239 
// Asynchronously creates a proxy of type T for `name`:
//   1. normalizes the name and derives the object name,
//   2. requests the Raft group id via get_group_id(),
//   3. in the continuation, builds the proxy with the create<T>() overload
//      selected for T.
// The lambda captures proxy_name / object_name by copy and, through [=],
// also `this` implicitly: the factory must stay alive until the continuation
// has run. f.get() rethrows any failure of the group-id request, so the
// returned future carries that exception.
1240  template<typename T>
1241  boost::future<std::shared_ptr<T>> create_proxy(const std::string &name) {
1242  auto proxy_name = without_default_group_name(name);
1243  auto object_name = object_name_for_proxy(proxy_name);
1244  return get_group_id(proxy_name, object_name).then([=] (boost::future<raft_group_id> f) {
1245  auto group_id = f.get();
1246  return create<T>(std::move(group_id), proxy_name, object_name);
1247  });
1248  }
1249 
1250  private:
1251  client::spi::ClientContext &context_;
// Map from string to fenced_lock; presumably caches lock proxies by name -
// confirm in create_fenced_lock().
1252  util::SynchronizedMap<std::string, fenced_lock> lock_proxies_;
1253 
1258  static constexpr const char *DEFAULT_GROUP_NAME = "default";
1259 
// create<T>() overload set; std::enable_if makes exactly one overload viable
// for each supported T.
// Overload 1: atomic_long, atomic_reference and latch are constructed
// directly with std::make_shared.
1260  template<typename T, typename = typename std::enable_if<std::is_same<atomic_long, T>::value ||
1261  std::is_same<atomic_reference, T>::value ||
1262  std::is_same<latch, T>::value>::type>
1263  std::shared_ptr<T>
1264  create(raft_group_id &&group_id, const std::string &proxy_name, const std::string &object_name) {
1265  return std::make_shared<T>(proxy_name, context_, std::move(group_id), object_name);
1266  }
1267 
1268  std::shared_ptr<fenced_lock>
1269  create_fenced_lock(raft_group_id &&group_id, const std::string &proxy_name, const std::string &object_name);
1270 
// Overload 2: fenced_lock is delegated to the out-of-line create_fenced_lock().
1271  template<typename T, typename = typename std::enable_if<std::is_same<fenced_lock, T>::value>::type>
1272  std::shared_ptr<fenced_lock>
1273  create(raft_group_id &&group_id, const std::string &proxy_name, const std::string &object_name) {
1274  return create_fenced_lock(std::move(group_id), proxy_name, object_name);
1275  }
1276 
1277  std::shared_ptr<counting_semaphore>
1278  create_semaphore(raft_group_id &&group_id, const std::string &proxy_name, const std::string &object_name);
1279 
// Overload 3: counting_semaphore is delegated to the out-of-line
// create_semaphore(), which returns a pointer to the abstract base.
1280  template<typename T, typename = typename std::enable_if<std::is_same<counting_semaphore, T>::value>::type>
1281  std::shared_ptr<counting_semaphore>
1282  create(raft_group_id &&group_id, const std::string &proxy_name, const std::string &object_name) {
1283  return create_semaphore(std::move(group_id), proxy_name, object_name);
1284  }
1285 
// Asynchronously obtains the raft_group_id for the given proxy / object
// name; defined in the implementation file.
1286  boost::future<raft_group_id> get_group_id(const std::string &proxy_name, const std::string &object_name);
1287  };
1288 
// CP Subsystem is a component of Hazelcast that builds a strongly consistent
// layer for a set of distributed data structures. This class is the entry
// point for obtaining the CP proxies. The constructor is private, so
// instances are only created by the befriended
// client::impl::hazelcast_client_instance_impl.
1300  class HAZELCAST_API cp_subsystem {
1301  public:
// Each getter asynchronously yields a shared pointer to the proxy for
// `name`. Presumably each forwards to proxy_factory_.create_proxy<T>(name) -
// confirm in the implementation file.
1322  boost::future<std::shared_ptr<atomic_long>> get_atomic_long(const std::string &name);
1323 
1346  boost::future<std::shared_ptr<atomic_reference>> get_atomic_reference(const std::string &name);
1347 
1369  boost::future<std::shared_ptr<latch>> get_latch(const std::string &name);
1370 
1391  boost::future<std::shared_ptr<fenced_lock>> get_lock(const std::string &name);
1392 
// Returns the abstract counting_semaphore type; the concrete subclass is
// chosen by the factory.
1413  boost::future<std::shared_ptr<counting_semaphore>> get_semaphore(const std::string &name);
1414 
1415  private:
1416  friend client::impl::hazelcast_client_instance_impl;
// Held by reference, not owned: the client context must outlive this object.
1417  client::spi::ClientContext &context_;
1418  raft_proxy_factory proxy_factory_;
1419 
1420  cp_subsystem(client::spi::ClientContext &context);
1421  };
1422  }
1423 }
1424 
// std::hash specialization so that raft_group_id can be used as a key in the
// standard unordered containers. Only declared here; operator() is defined
// in the implementation file.
1425 namespace std {
1426  template<>
1427  struct HAZELCAST_API hash<hazelcast::cp::raft_group_id> {
1428  std::size_t operator()(const hazelcast::cp::raft_group_id &group_id) const noexcept;
1429  };
1430 }
1431 
// boost::hash specialization for raft_group_id, for Boost containers and
// utilities that hash through boost::hash. Only declared here; operator() is
// defined in the implementation file (presumably consistent with the
// std::hash specialization - confirm).
1432 namespace boost {
1433  template<>
1434  struct HAZELCAST_API hash<hazelcast::cp::raft_group_id> {
1435  std::size_t operator()(const hazelcast::cp::raft_group_id &group_id) const noexcept;
1436  };
1437 }
1438 
1439 #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
1440 #pragma warning(pop)
1441 #endif
Client-side Raft-based proxy implementation of atomic long.
Definition: cp.h:90
boost::future< boost::optional< R > > apply(const F &function)
Applies a function on the value, the actual stored value will not change.
Definition: cp.h:217
boost::future< void > alter(const F &function)
Alters the currently stored value by applying a function on it.
Definition: cp.h:178
boost::future< int64_t > get_and_alter(const F &function)
Alters the currently stored value by applying a function on it and gets the old value.
Definition: cp.h:204
boost::future< int64_t > alter_and_get(const F &function)
Alters the currently stored value by applying a function on it and gets the result.
Definition: cp.h:190
CP Subsystem is a component of Hazelcast that builds a strongly consistent layer for a set of distributed data structures.
Definition: cp.h:1300
Factory that creates the client-side Raft-based CP proxies (atomic long, atomic reference, latch, fenced lock and semaphore).
Definition: cp.h:1232