Hazelcast C++ Client
Hazelcast C++ Client Library
Loading...
Searching...
No Matches
cp.h
1/*
2 * Copyright (c) 2008-2025, Hazelcast, Inc. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#pragma once
18
19#include <string>
20#include <condition_variable>
21#include <boost/thread/future.hpp>
22
23#include "hazelcast/util/SynchronizedMap.h"
24#include "hazelcast/client/proxy/ProxyImpl.h"
25#include "hazelcast/client/exception/protocol_exceptions.h"
26
27#if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
28#pragma warning(push)
29#pragma warning(disable : 4251) // for dll export
30#endif
31
32namespace hazelcast {
33namespace client {
34namespace spi {
35class ClientContext;
36}
37} // namespace client
38namespace cp {
40
41struct HAZELCAST_API raft_group_id
42{
43 std::string name;
44 int64_t seed;
45 int64_t group_id;
46
47 bool operator==(const raft_group_id& rhs) const;
48
49 bool operator!=(const raft_group_id& rhs) const;
50};
51
52class HAZELCAST_API cp_proxy : public client::proxy::ProxyImpl
53{
54public:
55 cp_proxy(const std::string& service_name,
56 const std::string& proxy_name,
57 client::spi::ClientContext* context,
58 const raft_group_id& group_id,
59 const std::string& object_name);
60
61 const raft_group_id& get_group_id() const;
62
63protected:
64 raft_group_id group_id_;
65 std::string object_name_;
66
67 void on_destroy();
68};
69
70namespace internal {
71namespace session {
72class proxy_session_manager;
73}
74} // namespace internal
75class HAZELCAST_API session_aware_proxy : public cp_proxy
76{
77public:
78 session_aware_proxy(
79 const std::string& service_name,
80 const std::string& proxy_name,
81 client::spi::ClientContext* context,
82 const raft_group_id& group_id,
83 const std::string& object_name,
84 internal::session::proxy_session_manager& session_manager);
85
86protected:
87 internal::session::proxy_session_manager& session_manager_;
88
93 void release_session(int64_t session_id);
94};
95
99class HAZELCAST_API atomic_long : public cp_proxy
100{
101public:
102 atomic_long(const std::string& name,
103 client::spi::ClientContext& context,
104 const raft_group_id& group_id,
105 const std::string& object_name);
106
113 boost::future<int64_t> add_and_get(int64_t delta);
114
124 boost::future<bool> compare_and_set(int64_t expect, int64_t update);
125
131 boost::future<int64_t> get_and_decrement();
132
138 boost::future<int64_t> decrement_and_get();
139
145 boost::future<int64_t> get();
146
153 boost::future<int64_t> get_and_add(int64_t delta);
154
161 boost::future<int64_t> get_and_set(int64_t new_value);
162
168 boost::future<int64_t> increment_and_get();
169
175 boost::future<int64_t> get_and_increment();
176
182 boost::future<void> set(int64_t new_value);
183
189 template<typename F>
190 boost::future<void> alter(const F& function)
191 {
192 return to_void_future(alter_and_get(function));
193 }
194
202 template<typename F>
203 boost::future<int64_t> alter_and_get(const F& function)
204 {
205 auto f = to_data(function);
206 return alter_data(f, alter_result_type::NEW_VALUE);
207 }
208
217 template<typename F>
218 boost::future<int64_t> get_and_alter(const F& function)
219 {
220 auto f = to_data(function);
221 return alter_data(f, alter_result_type::OLD_VALUE);
222 }
223
232 template<typename F, typename R>
233 boost::future<boost::optional<R>> apply(const F& function)
234 {
235 auto f = to_data(function);
236 return to_object<R>(apply_data(f));
237 }
238
239private:
240 static constexpr const char* SERVICE_NAME = "hz:raft:atomicLongService";
244 enum alter_result_type
245 {
249 OLD_VALUE,
253 NEW_VALUE
254 };
255
256 boost::future<int64_t> alter_data(
257 client::serialization::pimpl::data& function_data,
258 alter_result_type result_type);
259
260 boost::future<boost::optional<client::serialization::pimpl::data>>
261 apply_data(client::serialization::pimpl::data& function_data);
262};
263
267class HAZELCAST_API atomic_reference : public cp_proxy
268{
269public:
270 atomic_reference(const std::string& name,
271 client::spi::ClientContext& context,
272 const raft_group_id& group_id,
273 const std::string& object_name);
274
275 template<typename T>
276 boost::future<boost::optional<typename std::remove_pointer<T>::type>> get()
277 {
278 return to_object<typename std::remove_pointer<T>::type>(get_data());
279 }
280
281 template<typename T>
282 boost::future<boost::optional<typename std::remove_pointer<T>::type>> set(
283 T new_value)
284 {
285 return to_object<typename std::remove_pointer<T>::type>(
286 set_data(to_data<typename std::remove_pointer<T>::type>(new_value)));
287 }
288
289 template<typename T>
290 boost::future<boost::optional<typename std::remove_pointer<T>::type>>
291 get_and_set(T new_value)
292 {
293 return to_object<typename std::remove_pointer<T>::type>(
294 get_and_set_data(
295 to_data<typename std::remove_pointer<T>::type>(new_value)));
296 }
297
298 template<typename T, typename V>
299 boost::future<bool> compare_and_set(T expect, V update)
300 {
301 return compare_and_set_data(
302 to_data<typename std::remove_pointer<T>::type>(expect),
303 to_data<typename std::remove_pointer<V>::type>(update));
304 }
305
306 boost::future<bool> is_null();
307
308 boost::future<void> clear();
309
310 template<typename T>
311 boost::future<bool> contains(T value)
312 {
313 return contains_data(
314 to_data<typename std::remove_pointer<T>::type>(value));
315 }
316
317 template<typename F>
318 boost::future<void> alter(const F& function)
319 {
320 return alter_data(to_data(function));
321 }
322
323 template<typename T, typename F>
324 boost::future<boost::optional<typename std::remove_pointer<T>::type>>
325 alter_and_get(const F& function)
326 {
327 return to_object<typename std::remove_pointer<T>::type>(
328 alter_and_get_data(to_data(function)));
329 }
330
331 template<typename T, typename F>
332 boost::future<boost::optional<typename std::remove_pointer<T>::type>>
333 get_and_alter(const F& function)
334 {
335 return to_object<typename std::remove_pointer<T>::type>(
336 get_and_alter_data(to_data(function)));
337 }
338
339 template<typename R, typename F>
340 boost::future<boost::optional<R>> apply(const F& function)
341 {
342 return to_object<R>(apply_data(to_data(function)));
343 }
344
345private:
346 static constexpr const char* SERVICE_NAME = "hz:raft:atomicRefService";
347
348 enum struct return_value_type
349 {
350 NO_VALUE,
351 OLD,
352 NEW
353 };
354
355 boost::future<boost::optional<client::serialization::pimpl::data>>
356 get_data();
357
358 boost::future<boost::optional<client::serialization::pimpl::data>> set_data(
359 const client::serialization::pimpl::data& new_value_data);
360
361 boost::future<boost::optional<client::serialization::pimpl::data>>
362 get_and_set_data(const client::serialization::pimpl::data& new_value_data);
363
364 boost::future<bool> compare_and_set_data(
365 const client::serialization::pimpl::data& expect_data,
366 const client::serialization::pimpl::data& update_data);
367
368 boost::future<bool> contains_data(
369 const client::serialization::pimpl::data& value_data);
370
371 boost::future<void> alter_data(
372 const client::serialization::pimpl::data& function_data);
373
374 boost::future<boost::optional<client::serialization::pimpl::data>>
375 alter_and_get_data(const client::serialization::pimpl::data& function_data);
376
377 boost::future<boost::optional<client::serialization::pimpl::data>>
378 get_and_alter_data(const client::serialization::pimpl::data& function_data);
379
380 boost::future<boost::optional<client::serialization::pimpl::data>>
381 apply_data(const client::serialization::pimpl::data& function_data);
382
383 boost::future<boost::optional<client::serialization::pimpl::data>>
384 invoke_apply(const client::serialization::pimpl::data function_data,
385 return_value_type return_type,
386 bool alter);
387};
388
389class HAZELCAST_API latch : public cp_proxy
390{
391public:
392 latch(const std::string& name,
393 client::spi::ClientContext& context,
394 const raft_group_id& group_id,
395 const std::string& object_name);
396
408 boost::future<bool> try_set_count(int32_t count);
409
415 boost::future<int32_t> get_count();
416
429 boost::future<void> count_down();
430
438 boost::future<bool> try_wait();
439
449 boost::future<void> wait();
450
480 boost::future<std::cv_status> wait_for(std::chrono::milliseconds timeout);
481
491 template<typename Clock, typename Duration>
492 boost::future<std::cv_status> wait_until(
493 const std::chrono::time_point<Clock, Duration>& timeout_time)
494 {
495 return wait_for(std::chrono::duration_cast<std::chrono::milliseconds>(
496 timeout_time - Clock::now()));
497 }
498
499private:
500 static constexpr const char* SERVICE_NAME = "hz:raft:countDownLatchService";
501
502 boost::future<int32_t> get_round();
503
504 void count_down(int round, boost::uuids::uuid invocation_uid);
505};
506
510class HAZELCAST_API fenced_lock : public session_aware_proxy
511{
512public:
517 static constexpr int64_t INVALID_FENCE = 0L;
518
519 fenced_lock(const std::string& name,
520 client::spi::ClientContext& context,
521 const raft_group_id& group_id,
522 const std::string& object_name);
523
561 boost::future<void> lock();
562
633 boost::future<int64_t> lock_and_get_fence();
634
667 boost::future<bool> try_lock();
668
697 boost::future<bool> try_lock(std::chrono::milliseconds timeout);
698
772 boost::future<int64_t> try_lock_and_get_fence();
773
856 boost::future<int64_t> try_lock_and_get_fence(
857 std::chrono::milliseconds timeout);
858
867 boost::future<void> unlock();
868
886 boost::future<int64_t> get_fence();
887
897 boost::future<bool> is_locked();
898
908 boost::future<bool> is_locked_by_current_thread();
909
920 boost::future<int32_t> get_lock_count();
921
927 const raft_group_id& get_group_id();
928
929 friend HAZELCAST_API bool operator==(const fenced_lock& lhs,
930 const fenced_lock& rhs);
931
932protected:
933 void post_destroy();
934
935private:
936 struct lock_ownership_state
937 {
938 int64_t fence;
939 int32_t lock_count;
940 int64_t session_id;
941 int64_t thread_id;
942
943 bool is_locked_by(int64_t session, int64_t thread);
944
945 bool is_locked();
946 };
947
948 static constexpr const char* SERVICE_NAME = "hz:raft:lockService";
949
950 // thread id -> id of the session that has acquired the lock
951 util::SynchronizedMap<int64_t, int64_t> locked_session_ids_;
952
953 void verify_locked_session_id_if_present(int64_t thread_id,
954 int64_t session_id,
955 bool should_release);
956
957 void throw_lock_ownership_lost(int64_t session_id) const;
958
959 void throw_illegal_monitor_state() const;
960
961 boost::future<int64_t> do_lock(int64_t session_id,
962 int64_t thread_id,
963 boost::uuids::uuid invocation_uid);
964
965 boost::future<int64_t> do_try_lock(int64_t session_id,
966 int64_t thread_id,
967 boost::uuids::uuid invocation_uid,
968 std::chrono::milliseconds timeout);
969
970 boost::future<bool> do_unlock(int64_t session_id,
971 int64_t thread_id,
972 boost::uuids::uuid invocation_uid);
973
974 boost::future<lock_ownership_state> do_get_lock_ownership_state();
975
976 void invalidate_session(int64_t session_id);
977
978 void verify_no_locked_session_id_present(int64_t thread_id);
979};
980
1059class HAZELCAST_API counting_semaphore : public session_aware_proxy
1060{
1061public:
1062 //---- std::counting_semaphore method impl starts ---------
1088 virtual boost::future<void> acquire(int32_t permits = 1) = 0;
1089
1112 virtual boost::future<void> release(int32_t permits = 1) = 0;
1113
1121 boost::future<bool> try_acquire(int32_t permits = 1);
1122
1156 boost::future<bool> try_acquire_for(std::chrono::milliseconds rel_time,
1157 int32_t permits = 1);
1158
1163 template<class Clock, class Duration>
1164 boost::future<bool> try_acquire_until(
1165 const std::chrono::time_point<Clock, Duration>& abs_time,
1166 int32_t permits = 1)
1167 {
1168 auto now = Clock::now();
1169 return try_acquire_for(
1170 std::chrono::duration_cast<std::chrono::milliseconds>(abs_time - now),
1171 permits);
1172 }
1173
1174 //---- std::counting_semaphore method impl ends ----------
1175
1176 // --- extended methods
1185 boost::future<bool> init(int32_t permits);
1186
1194 boost::future<int32_t> available_permits();
1195
1201 virtual boost::future<int32_t> drain_permits() = 0;
1202
1212 boost::future<void> reduce_permits(int32_t reduction);
1213
1223 boost::future<void> increase_permits(int32_t increase);
1224
1225protected:
1226 static constexpr const char* SERVICE_NAME = "hz:raft:semaphoreService";
1227
1228 counting_semaphore(
1229 const std::string& proxy_name,
1230 client::spi::ClientContext* context,
1231 const raft_group_id& group_id,
1232 const std::string& object_name,
1233 internal::session::proxy_session_manager& session_manager);
1234
1235 virtual boost::future<bool> try_acquire_for_millis(
1236 int32_t permits,
1237 std::chrono::milliseconds timeout) = 0;
1238
1239 virtual int64_t get_thread_id() = 0;
1240
1241 virtual boost::future<void> do_change_permits(int32_t delta) = 0;
1242
1243 boost::future<void> do_release(int32_t permits,
1244 int64_t thread_id,
1245 int64_t session_id);
1246};
1247
1248class HAZELCAST_API sessionless_semaphore : public counting_semaphore
1249{
1250public:
1251 sessionless_semaphore(
1252 const std::string& proxy_name,
1253 client::spi::ClientContext* context,
1254 const raft_group_id& group_id,
1255 const std::string& object_name,
1256 internal::session::proxy_session_manager& session_manager);
1257
1258 boost::future<void> acquire(int32_t permits) override;
1259
1260 boost::future<void> release(int32_t permits) override;
1261
1262 boost::future<int32_t> drain_permits() override;
1263
1264protected:
1265 boost::future<bool> try_acquire_for_millis(
1266 int32_t permits,
1267 std::chrono::milliseconds timeout) override;
1268
1269 int64_t get_thread_id() override;
1270
1271 boost::future<void> do_change_permits(int32_t delta) override;
1272
1273private:
1274 boost::future<bool> do_try_acquire(int32_t permits,
1275 std::chrono::milliseconds timeout_ms);
1276};
1277
1278class HAZELCAST_API session_semaphore : public counting_semaphore
1279{
1280public:
1281 session_semaphore(
1282 const std::string& proxy_name,
1283 client::spi::ClientContext* context,
1284 const raft_group_id& group_id,
1285 const std::string& object_name,
1286 internal::session::proxy_session_manager& session_manager);
1287
1288 boost::future<void> acquire(int32_t permits) override;
1289
1290 boost::future<void> release(int32_t permits) override;
1291
1292 boost::future<int32_t> drain_permits() override;
1293
1294protected:
1301 static constexpr int32_t DRAIN_SESSION_ACQ_COUNT = 1024;
1302
1303 boost::future<bool> try_acquire_for_millis(
1304 int32_t permits,
1305 std::chrono::milliseconds timeout) override;
1306
1307 void throw_illegal_state_exception(std::exception_ptr e);
1308
1309 int64_t get_thread_id() override;
1310
1311 boost::future<void> do_change_permits(int32_t delta) override;
1312};
1313
1318class HAZELCAST_API raft_proxy_factory
1319{
1320public:
1321 raft_proxy_factory(client::spi::ClientContext& context);
1322
1323 static std::string without_default_group_name(const std::string& n);
1324
1325 static std::string object_name_for_proxy(const std::string& name);
1326
1327 template<typename T>
1328 boost::future<std::shared_ptr<T>> create_proxy(const std::string& name)
1329 {
1330 auto proxy_name = without_default_group_name(name);
1331 auto object_name = object_name_for_proxy(proxy_name);
1332 return get_group_id(proxy_name, object_name)
1333 .then([=](boost::future<raft_group_id> f) {
1334 auto group_id = f.get();
1335 return create<T>(std::move(group_id), proxy_name, object_name);
1336 });
1337 }
1338
1339private:
1340 client::spi::ClientContext& context_;
1341 util::SynchronizedMap<std::string, fenced_lock> lock_proxies_;
1342
1347 static constexpr const char* DEFAULT_GROUP_NAME = "default";
1348
1349 template<typename T,
1350 typename = typename std::enable_if<
1351 std::is_same<atomic_long, T>::value ||
1352 std::is_same<atomic_reference, T>::value ||
1353 std::is_same<latch, T>::value>::type>
1354 std::shared_ptr<T> create(raft_group_id&& group_id,
1355 const std::string& proxy_name,
1356 const std::string& object_name)
1357 {
1358 return std::make_shared<T>(
1359 proxy_name, context_, std::move(group_id), object_name);
1360 }
1361
1362 std::shared_ptr<fenced_lock> create_fenced_lock(
1363 raft_group_id&& group_id,
1364 const std::string& proxy_name,
1365 const std::string& object_name);
1366
1367 template<typename T,
1368 typename = typename std::enable_if<
1369 std::is_same<fenced_lock, T>::value>::type>
1370 std::shared_ptr<fenced_lock> create(raft_group_id&& group_id,
1371 const std::string& proxy_name,
1372 const std::string& object_name)
1373 {
1374 return create_fenced_lock(std::move(group_id), proxy_name, object_name);
1375 }
1376
1377 std::shared_ptr<counting_semaphore> create_semaphore(
1378 raft_group_id&& group_id,
1379 const std::string& proxy_name,
1380 const std::string& object_name);
1381
1382 template<typename T,
1383 typename = typename std::enable_if<
1384 std::is_same<counting_semaphore, T>::value>::type>
1385 std::shared_ptr<counting_semaphore> create(raft_group_id&& group_id,
1386 const std::string& proxy_name,
1387 const std::string& object_name)
1388 {
1389 return create_semaphore(std::move(group_id), proxy_name, object_name);
1390 }
1391
1392 boost::future<raft_group_id> get_group_id(const std::string& proxy_name,
1393 const std::string& object_name);
1394};
1395
1407class HAZELCAST_API cp_subsystem
1408{
1409public:
1430 boost::future<std::shared_ptr<atomic_long>> get_atomic_long(
1431 const std::string& name);
1432
1455 boost::future<std::shared_ptr<atomic_reference>> get_atomic_reference(
1456 const std::string& name);
1457
1479 boost::future<std::shared_ptr<latch>> get_latch(const std::string& name);
1480
1501 boost::future<std::shared_ptr<fenced_lock>> get_lock(
1502 const std::string& name);
1503
1522 boost::future<std::shared_ptr<counting_semaphore>> get_semaphore(
1523 const std::string& name);
1524
1525private:
1526 friend client::impl::hazelcast_client_instance_impl;
1527 client::spi::ClientContext& context_;
1528 raft_proxy_factory proxy_factory_;
1529
1530 cp_subsystem(client::spi::ClientContext& context);
1531};
1532} // namespace cp
1533} // namespace hazelcast
1534
1535namespace std {
1536template<>
1537struct HAZELCAST_API hash<hazelcast::cp::raft_group_id>
1538{
1539 std::size_t operator()(
1540 const hazelcast::cp::raft_group_id& group_id) const noexcept;
1541};
1542} // namespace std
1543
1544namespace boost {
1545template<>
1546struct HAZELCAST_API hash<hazelcast::cp::raft_group_id>
1547{
1548 std::size_t operator()(
1549 const hazelcast::cp::raft_group_id& group_id) const noexcept;
1550};
1551} // namespace boost
1552
1553#if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
1554#pragma warning(pop)
1555#endif
boost::future< int64_t > get_and_add(int64_t delta)
Atomically adds the given value to the current value.
Definition cp.cpp:253
boost::future< int64_t > get()
Gets the current value.
Definition cp.cpp:246
boost::future< int64_t > decrement_and_get()
Atomically decrements the current value by one.
Definition cp.cpp:240
boost::future< int64_t > get_and_increment()
Atomically increments the current value by one.
Definition cp.cpp:274
boost::future< int64_t > get_and_alter(const F &function)
Alters the currently stored value by applying a function on it and gets the old value.
Definition cp.h:218
boost::future< boost::optional< R > > apply(const F &function)
Applies a function on the value, the actual stored value will not change.
Definition cp.h:233
boost::future< bool > compare_and_set(int64_t expect, int64_t update)
Atomically sets the value to the given updated value only if the current value equals the expected value.
Definition cp.cpp:226
boost::future< int64_t > alter_and_get(const F &function)
Alters the currently stored value by applying a function on it and gets the result.
Definition cp.h:203
boost::future< int64_t > get_and_set(int64_t new_value)
Atomically sets the given value and returns the old value.
Definition cp.cpp:260
boost::future< void > set(int64_t new_value)
Atomically sets the given value.
Definition cp.cpp:280
boost::future< void > alter(const F &function)
Alters the currently stored value by applying a function on it.
Definition cp.h:190
boost::future< int64_t > increment_and_get()
Atomically increments the current value by one.
Definition cp.cpp:268
boost::future< int64_t > get_and_decrement()
Atomically decrements the current value by one.
Definition cp.cpp:234
boost::future< int64_t > add_and_get(int64_t delta)
Atomically adds the given value to the current value.
Definition cp.cpp:219
boost::future< std::shared_ptr< atomic_long > > get_atomic_long(const std::string &name)
Returns a proxy for an atomic_long instance created on CP Subsystem.
Definition cp.cpp:158
boost::future< std::shared_ptr< fenced_lock > > get_lock(const std::string &name)
Returns a proxy for a fenced_lock instance created on CP Subsystem.
Definition cp.cpp:176
boost::future< std::shared_ptr< latch > > get_latch(const std::string &name)
Returns a proxy for a count_down_latch instance created on CP Subsystem.
Definition cp.cpp:170
boost::future< std::shared_ptr< atomic_reference > > get_atomic_reference(const std::string &name)
Returns a proxy for an atomic_reference instance created on CP Subsystem.
Definition cp.cpp:164
boost::future< std::shared_ptr< counting_semaphore > > get_semaphore(const std::string &name)
Returns a proxy for a semaphore instance created on CP Subsystem.
Definition cp.cpp:182
Factory that creates the client-side Raft-based CP proxies (atomic_long, atomic_reference, latch, fenced_lock and counting_semaphore).
Definition cp.h:1319
void release_session(int64_t session_id)
Decrements acquire count of the session.
Definition cp.cpp:940
static constexpr int32_t DRAIN_SESSION_ACQ_COUNT
Since a proxy does not know how many permits will be drained on the Raft group, it uses this constant...
Definition cp.h:1301
STL namespace.