20 #include <condition_variable>
21 #include <boost/thread/future.hpp>
23 #include "hazelcast/util/SynchronizedMap.h"
24 #include "hazelcast/client/proxy/ProxyImpl.h"
25 #include "hazelcast/client/exception/protocol_exceptions.h"
27 #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
29 #pragma warning(disable : 4251)
39 class raft_proxy_factory;
// Common base of the CP proxies declared in this header (atomic_reference and
// latch name it as their base below). Derives publicly from
// client::proxy::ProxyImpl.
52 class HAZELCAST_API
cp_proxy :
public client::proxy::ProxyImpl
// Constructor: receives the service name, the proxy name, a pointer to the
// client context and the object name.
// NOTE(review): listing line 58 is absent from this extract, so a parameter
// between `context` and `object_name` may be missing here -- confirm against
// the full header.
55 cp_proxy(
const std::string& service_name,
56 const std::string& proxy_name,
57 client::spi::ClientContext* context,
59 const std::string& object_name);
// Name of the object this proxy refers to (presumably set from the
// `object_name` constructor argument; the constructor body is not visible).
65 std::string object_name_;
72 class proxy_session_manager;
// NOTE(review): the line that opens this declaration is elided from the
// extract. The parameter list plus the session_manager_ member suggest this is
// the constructor of session_aware_proxy (the base named by fenced_lock and
// counting_semaphore further down) -- confirm against the full header.
79 const std::string& service_name,
80 const std::string& proxy_name,
81 client::spi::ClientContext* context,
83 const std::string& object_name,
84 internal::session::proxy_session_manager& session_manager);
// Non-owning reference to the session manager; being a reference member, the
// referenced manager must outlive this proxy.
87 internal::session::proxy_session_manager& session_manager_;
// Declaration only (no body in this header). Takes the id of the session to
// release; presumably forwards to session_manager_ -- TODO confirm in the
// implementation file.
93 void release_session(int64_t session_id);
// NOTE(review): tail of a constructor parameter list whose opening line is not
// part of this extract; presumably the atomic_long constructor (the service
// name constant further down is "hz:raft:atomicLongService") -- confirm.
103 client::spi::ClientContext& context,
105 const std::string& object_name);
// Asynchronous operations on the 64-bit value. All of the following are
// declarations only (defined out of line); each returns a boost::future that
// carries the operation's result.
113 boost::future<int64_t> add_and_get(int64_t delta);
// Resolves to bool; takes the expected value and the replacement value.
124 boost::future<bool> compare_and_set(int64_t expect, int64_t update);
131 boost::future<int64_t> get_and_decrement();
138 boost::future<int64_t> decrement_and_get();
145 boost::future<int64_t> get();
153 boost::future<int64_t> get_and_add(int64_t delta);
161 boost::future<int64_t> get_and_set(int64_t new_value);
168 boost::future<int64_t> increment_and_get();
175 boost::future<int64_t> get_and_increment();
// Unlike the operations above, set() resolves to void.
182 boost::future<void> set(int64_t new_value);
// Alters the currently stored value by applying a function on it.
// Implemented on top of alter_and_get(): the resulting future is passed
// through to_void_future(), so the caller only observes completion, not the
// new value.
// NOTE(review): the `template<typename F>` header and the braces are elided in
// this extract.
190 boost::future<void>
alter(
const F&
function)
192 return to_void_future(alter_and_get(
function));
// NOTE(review): two function bodies whose signatures are elided from this
// extract. Both serialise the user function with to_data() and delegate to
// alter_data(); they differ only in the alter_result_type selector.
// Judging by the selectors, the first is presumably alter_and_get() (yields
// the value after the alteration) and the second get_and_alter() (yields the
// value before it) -- confirm against the full header.
//
// Body requesting the NEW value. `f` is a named local because alter_data()
// takes its argument by non-const reference.
205 auto f = to_data(
function);
206 return alter_data(f, alter_result_type::NEW_VALUE);
// Body requesting the OLD value.
220 auto f = to_data(
function);
221 return alter_data(f, alter_result_type::OLD_VALUE);
// Applies a function on the value; the actual stored value will not change.
//
// F is the type of the function object sent to the cluster, R the type the
// result is deserialised into. The future resolves to boost::optional<R>.
//
// NOTE(review): the template parameters are ordered <F, R> here, whereas
// atomic_reference::apply in this same header uses <R, F>. R cannot be deduced
// from the argument, so with this order callers have to spell out F as well.
// Changing the order would break existing callers, so it is only flagged.
232 template<
typename F,
typename R>
233 boost::future<boost::optional<R>>
apply(
const F&
function)
// Serialise the function, run it remotely through apply_data(), then convert
// the returned serialised payload to R.
235 auto f = to_data(
function);
236 return to_object<R>(apply_data(f));
// Service name constant for this proxy type.
240 static constexpr
const char* SERVICE_NAME =
"hz:raft:atomicLongService";
// Selector passed to alter_data(); the enumerators are elided in this extract,
// but NEW_VALUE and OLD_VALUE are the ones used by the inline callers above.
244 enum alter_result_type
// Declaration only. Takes the serialised function and a selector and resolves
// to an int64_t.
// NOTE(review): function_data is taken by non-const reference although the
// atomic_reference counterparts take `const data&`; changing it here would
// alter the out-of-line definition's signature, so it is only flagged.
256 boost::future<int64_t> alter_data(
257 client::serialization::pimpl::data& function_data,
258 alter_result_type result_type);
// Declaration only. Takes the serialised function and resolves to an optional
// serialised result (consumed by apply() via to_object<R>()).
260 boost::future<boost::optional<client::serialization::pimpl::data>>
261 apply_data(client::serialization::pimpl::data& function_data);
// Client-side Raft-based proxy implementation of atomic reference.
// Typed accessors below are thin templates over the *_data() members, which
// work on serialised payloads.
// NOTE(review): braces and the `template<typename T>` headers of the
// single-parameter templates are elided in this extract.
267 class HAZELCAST_API atomic_reference :
public cp_proxy
// Constructor: proxy name, client context, raft group id and object name.
270 atomic_reference(
const std::string& name,
271 client::spi::ClientContext& context,
272 const raft_group_id& group_id,
273 const std::string& object_name);
// get(): deserialises the result of get_data(). std::remove_pointer lets T be
// given either as a value type or as a pointer type; the future always
// resolves to boost::optional of the pointee/value type.
276 boost::future<boost::optional<typename std::remove_pointer<T>::type>> get()
278 return to_object<typename std::remove_pointer<T>::type>(get_data());
// set(): serialises new_value, forwards it to set_data() and deserialises
// whatever set_data() yields.
282 boost::future<boost::optional<typename std::remove_pointer<T>::type>> set(
285 return to_object<typename std::remove_pointer<T>::type>(
286 set_data(to_data<
typename std::remove_pointer<T>::type>(new_value)));
// get_and_set(): same shape as set().
// NOTE(review): listing line 294 is elided; presumably it is the call to
// get_and_set_data( -- confirm.
290 boost::future<boost::optional<typename std::remove_pointer<T>::type>>
291 get_and_set(T new_value)
293 return to_object<typename std::remove_pointer<T>::type>(
295 to_data<
typename std::remove_pointer<T>::type>(new_value)));
// compare_and_set(): serialises both arguments and forwards them to
// compare_and_set_data(); the future resolves to bool.
// T and V are independent so the expected and the new value may be passed as
// different (pointer or value) types; std::remove_pointer strips a pointer
// type before serialisation.
298 template<
typename T,
typename V>
299 boost::future<bool> compare_and_set(T expect, V update)
301 return compare_and_set_data(
302 to_data<
typename std::remove_pointer<T>::type>(expect),
303 to_data<
typename std::remove_pointer<V>::type>(update));
// Declaration only; resolves to bool.
306 boost::future<bool> is_null();
// Declaration only; resolves to void.
308 boost::future<void> clear();
// contains(): serialises `value` and forwards it to contains_data(); resolves
// to bool.
311 boost::future<bool> contains(T value)
313 return contains_data(
314 to_data<
typename std::remove_pointer<T>::type>(value));
// alter(): serialises the function object and forwards it to alter_data().
// The future resolves to void, i.e. it only signals completion.
318 boost::future<void> alter(
const F&
function)
320 return alter_data(to_data(
function));
// alter_and_get(): serialises the function, forwards it to
// alter_and_get_data() and deserialises the result into
// boost::optional<remove_pointer<T>::type>.
// T must be given explicitly by the caller (it is not deducible from the
// argument); F is deduced.
323 template<
typename T,
typename F>
324 boost::future<boost::optional<typename std::remove_pointer<T>::type>>
325 alter_and_get(
const F&
function)
327 return to_object<typename std::remove_pointer<T>::type>(
328 alter_and_get_data(to_data(
function)));
// get_and_alter(): identical shape, but delegates to get_and_alter_data().
331 template<
typename T,
typename F>
332 boost::future<boost::optional<typename std::remove_pointer<T>::type>>
333 get_and_alter(
const F&
function)
335 return to_object<typename std::remove_pointer<T>::type>(
336 get_and_alter_data(to_data(
function)));
// apply(): serialises the function, forwards it to apply_data() and
// deserialises the result into boost::optional<R>.
// R comes first in the template parameter list so callers can write
// apply<R>(fn) and let F be deduced.
339 template<
typename R,
typename F>
340 boost::future<boost::optional<R>> apply(
const F&
function)
342 return to_object<R>(apply_data(to_data(
function)));
// Service name constant for this proxy type.
346 static constexpr
const char* SERVICE_NAME =
"hz:raft:atomicRefService";
// Scoped enum used as the `return_type` argument of invoke_apply(); its
// enumerators are elided in this extract.
348 enum struct return_value_type
// Serialised-payload counterparts of the typed templates above. All are
// declarations only (defined out of line).
// NOTE(review): the declarator following this return type is elided;
// presumably get_data(), which get() calls -- confirm.
355 boost::future<boost::optional<client::serialization::pimpl::data>>
358 boost::future<boost::optional<client::serialization::pimpl::data>> set_data(
359 const client::serialization::pimpl::data& new_value_data);
361 boost::future<boost::optional<client::serialization::pimpl::data>>
362 get_and_set_data(
const client::serialization::pimpl::data& new_value_data);
364 boost::future<bool> compare_and_set_data(
365 const client::serialization::pimpl::data& expect_data,
366 const client::serialization::pimpl::data& update_data);
368 boost::future<bool> contains_data(
369 const client::serialization::pimpl::data& value_data);
371 boost::future<void> alter_data(
372 const client::serialization::pimpl::data& function_data);
374 boost::future<boost::optional<client::serialization::pimpl::data>>
375 alter_and_get_data(
const client::serialization::pimpl::data& function_data);
377 boost::future<boost::optional<client::serialization::pimpl::data>>
378 get_and_alter_data(
const client::serialization::pimpl::data& function_data);
380 boost::future<boost::optional<client::serialization::pimpl::data>>
381 apply_data(
const client::serialization::pimpl::data& function_data);
// NOTE(review): function_data is taken by (const) value here, i.e. copied on
// every call, while every sibling above takes `const data&`. Switching to a
// reference would change the signature of the out-of-line definition, so it is
// only flagged. The remaining parameters are elided in this extract.
383 boost::future<boost::optional<client::serialization::pimpl::data>>
384 invoke_apply(
const client::serialization::pimpl::data function_data,
385 return_value_type return_type,
// Count-down latch proxy; derives from cp_proxy. Braces and access specifiers
// are elided in this extract.
389 class HAZELCAST_API latch :
public cp_proxy
// Constructor: proxy name, client context, raft group id and object name.
392 latch(
const std::string& name,
393 client::spi::ClientContext& context,
394 const raft_group_id& group_id,
395 const std::string& object_name);
// Asynchronous operations; declarations only (defined out of line).
408 boost::future<bool> try_set_count(int32_t count);
415 boost::future<int32_t> get_count();
429 boost::future<void> count_down();
438 boost::future<bool> try_wait();
449 boost::future<void> wait();
// Timed wait with a relative timeout; resolves to std::cv_status.
480 boost::future<std::cv_status> wait_for(std::chrono::milliseconds timeout);
// Timed wait with an absolute deadline. Converts the deadline to a relative
// timeout against Clock::now() and delegates to wait_for().
// Note: duration_cast truncates toward zero, so any sub-millisecond remainder
// is dropped, and a deadline that has already passed yields a negative
// duration that is handed to wait_for() unchanged.
491 template<
typename Clock,
typename Duration>
492 boost::future<std::cv_status> wait_until(
493 const std::chrono::time_point<Clock, Duration>& timeout_time)
495 return wait_for(std::chrono::duration_cast<std::chrono::milliseconds>(
496 timeout_time - Clock::now()));
// Service name constant for this proxy type.
500 static constexpr
const char* SERVICE_NAME =
"hz:raft:countDownLatchService";
// Internal helpers; declarations only.
502 boost::future<int32_t> get_round();
// NOTE(review): `round` is a plain int here while get_round() resolves to
// int32_t; harmless where int is 32 bits wide, flagged for consistency.
504 void count_down(
int round, boost::uuids::uuid invocation_uid);
// Fenced lock proxy; derives from session_aware_proxy. Braces and access
// specifiers are elided in this extract.
510 class HAZELCAST_API fenced_lock :
public session_aware_proxy
// Fence value 0 is reserved as "invalid" (presumably what the *_get_fence
// operations yield when no lock is held -- TODO confirm in the implementation).
517 static constexpr int64_t INVALID_FENCE = 0L;
// Constructor: proxy name, client context, raft group id and object name.
519 fenced_lock(
const std::string& name,
520 client::spi::ClientContext& context,
521 const raft_group_id& group_id,
522 const std::string& object_name);
// Public asynchronous API; declarations only (defined out of line).
// The try_lock / try_lock_and_get_fence overloads come in pairs: without
// arguments, and with a std::chrono::milliseconds timeout.
561 boost::future<void> lock();
633 boost::future<int64_t> lock_and_get_fence();
667 boost::future<bool> try_lock();
697 boost::future<bool> try_lock(std::chrono::milliseconds timeout);
772 boost::future<int64_t> try_lock_and_get_fence();
856 boost::future<int64_t> try_lock_and_get_fence(
857 std::chrono::milliseconds timeout);
867 boost::future<void> unlock();
886 boost::future<int64_t> get_fence();
897 boost::future<bool> is_locked();
908 boost::future<bool> is_locked_by_current_thread();
920 boost::future<int32_t> get_lock_count();
// Synchronous accessor; returns a reference to the raft group id.
927 const raft_group_id& get_group_id();
// Equality is implemented by a friend free function (body not in this extract).
929 friend bool operator==(
const fenced_lock& lhs,
const fenced_lock& rhs);
// Result type of do_get_lock_ownership_state(); its data members are elided in
// this extract.
935 struct lock_ownership_state
942 bool is_locked_by(int64_t session, int64_t thread);
// Service name constant for this proxy type.
947 static constexpr
const char* SERVICE_NAME =
"hz:raft:lockService";
// Map from int64_t to int64_t; going by the member name and the thread_id
// parameters of the verify_* helpers below, presumably thread id -> session id
// -- TODO confirm.
950 util::SynchronizedMap<int64_t, int64_t> locked_session_ids_;
// Internal helpers; declarations only. Some parameter lines are elided in this
// extract (the listing line numbers skip).
952 void verify_locked_session_id_if_present(int64_t thread_id,
954 bool should_release);
956 void throw_lock_ownership_lost(int64_t session_id)
const;
958 void throw_illegal_monitor_state()
const;
// do_lock / do_try_lock resolve to an int64_t (presumably the fence -- TODO
// confirm); do_unlock resolves to bool.
960 boost::future<int64_t> do_lock(int64_t session_id,
962 boost::uuids::uuid invocation_uid);
964 boost::future<int64_t> do_try_lock(int64_t session_id,
966 boost::uuids::uuid invocation_uid,
967 std::chrono::milliseconds timeout);
969 boost::future<bool> do_unlock(int64_t session_id,
971 boost::uuids::uuid invocation_uid);
973 boost::future<lock_ownership_state> do_get_lock_ownership_state();
975 void invalidate_session(int64_t session_id);
977 void verify_no_locked_session_id_present(int64_t thread_id);
// Abstract counting semaphore proxy; derives from session_aware_proxy.
// acquire(), release(), drain_permits() and the protected hooks further down
// are pure virtual and are overridden by the two concrete classes that follow
// in this header. Braces and access specifiers are elided in this extract.
1058 class HAZELCAST_API counting_semaphore :
public session_aware_proxy
// NOTE(review): these pure virtuals carry a default argument (permits = 1),
// while the overrides below redeclare them without one. Default arguments are
// bound by the static type of the call, so the default is only available when
// calling through counting_semaphore.
1087 virtual boost::future<void> acquire(int32_t permits = 1) = 0;
1111 virtual boost::future<void> release(int32_t permits = 1) = 0;
// Non-virtual; declarations only.
1120 boost::future<bool> try_acquire(int32_t permits = 1);
1155 boost::future<bool> try_acquire_for(std::chrono::milliseconds rel_time,
1156 int32_t permits = 1);
// Absolute-deadline variant: converts abs_time to a relative timeout against
// Clock::now() and delegates to try_acquire_for().
// duration_cast truncates toward zero, and a deadline in the past produces a
// negative duration that is forwarded unchanged.
// NOTE(review): the end of the call (listing line 1170) is elided here.
1162 template<
class Clock,
class Duration>
1163 boost::future<bool> try_acquire_until(
1164 const std::chrono::time_point<Clock, Duration>& abs_time,
1165 int32_t permits = 1)
1167 auto now = Clock::now();
1168 return try_acquire_for(
1169 std::chrono::duration_cast<std::chrono::milliseconds>(abs_time - now),
// Declarations only.
1184 boost::future<bool> init(int32_t permits);
1193 boost::future<int32_t> available_permits();
1200 virtual boost::future<int32_t> drain_permits() = 0;
1211 boost::future<void> reduce_permits(int32_t reduction);
1222 boost::future<void> increase_permits(int32_t increase);
// Service name constant for this proxy type.
1225 static constexpr
const char* SERVICE_NAME =
"hz:raft:semaphoreService";
// NOTE(review): tail of a constructor parameter list; its opening line is
// elided (presumably the counting_semaphore constructor -- confirm).
1228 const std::string& proxy_name,
1229 client::spi::ClientContext* context,
1230 const raft_group_id& group_id,
1231 const std::string& object_name,
1232 internal::session::proxy_session_manager& session_manager);
// Hooks supplied by the concrete implementations.
1234 virtual boost::future<bool> try_acquire_for_millis(
1236 std::chrono::milliseconds timeout) = 0;
1238 virtual int64_t get_thread_id() = 0;
1240 virtual boost::future<void> do_change_permits(int32_t delta) = 0;
// Shared helper; declaration only (a parameter line is elided).
1242 boost::future<void> do_release(int32_t permits,
1244 int64_t session_id);
// NOTE(review): the class header and the constructor's opening line are elided
// from this extract. The `override` declarations show this is a concrete
// implementation of the counting_semaphore interface declared above; its class
// name is not visible here.
//
// Constructor parameter tail: proxy name, client context pointer, object name
// and session manager (listing line 1253 is elided).
1251 const std::string& proxy_name,
1252 client::spi::ClientContext* context,
1254 const std::string& object_name,
1255 internal::session::proxy_session_manager& session_manager);
// Overrides of the pure virtuals; declarations only. They are redeclared
// without the `permits = 1` default found on the base declarations.
1257 boost::future<void> acquire(int32_t permits)
override;
1259 boost::future<void> release(int32_t permits)
override;
1261 boost::future<int32_t> drain_permits()
override;
1264 boost::future<bool> try_acquire_for_millis(
1266 std::chrono::milliseconds timeout)
override;
1268 int64_t get_thread_id()
override;
1270 boost::future<void> do_change_permits(int32_t delta)
override;
// Helper specific to this implementation; declaration only.
1273 boost::future<bool> do_try_acquire(int32_t permits,
1274 std::chrono::milliseconds timeout_ms);
// NOTE(review): the class header and the constructor's opening line are elided
// from this extract. This is a second concrete implementation of the
// counting_semaphore interface; its class name is not visible here.
//
// Constructor parameter tail: proxy name, client context pointer, object name
// and session manager (listing line 1283 is elided).
1281 const std::string& proxy_name,
1282 client::spi::ClientContext* context,
1284 const std::string& object_name,
1285 internal::session::proxy_session_manager& session_manager);
// Overrides of the pure virtuals; declarations only.
1287 boost::future<void> acquire(int32_t permits)
override;
1289 boost::future<void> release(int32_t permits)
override;
1291 boost::future<int32_t> drain_permits()
override;
// Constant 1024; judging by the name it is the acquire count used when
// draining permits -- TODO confirm in the implementation file.
1300 static constexpr int32_t DRAIN_SESSION_ACQ_COUNT = 1024;
1302 boost::future<bool> try_acquire_for_millis(
1304 std::chrono::milliseconds timeout)
override;
// Takes an exception_ptr; declaration only (presumably rethrows it as an
// illegal-state error -- TODO confirm).
1306 void throw_illegal_state_exception(std::exception_ptr e);
1308 int64_t get_thread_id()
override;
1310 boost::future<void> do_change_permits(int32_t delta)
override;
// Static name helpers; declarations only. create_proxy() below applies them in
// this order: without_default_group_name() first, then object_name_for_proxy()
// on its result.
1322 static std::string without_default_group_name(
const std::string& n);
1324 static std::string object_name_for_proxy(
const std::string& name);
// Creates a proxy of type T for `name` asynchronously:
//   1. derive the proxy name and the object name from `name`;
//   2. look up the raft group id via get_group_id();
//   3. in the continuation, build the proxy with create<T>().
// The future resolves to std::shared_ptr<T>.
//
// NOTE(review): the continuation captures with [=]. That copies proxy_name and
// object_name, but it also captures `this` implicitly (create<T> is a member
// call), so this object must stay alive until the future completes. Implicit
// `this` capture through [=] is deprecated since C++20.
// Braces and the closing `});` are elided in this extract.
1326 template<
typename T>
1327 boost::future<std::shared_ptr<T>> create_proxy(
const std::string& name)
1329 auto proxy_name = without_default_group_name(name);
1330 auto object_name = object_name_for_proxy(proxy_name);
1331 return get_group_id(proxy_name, object_name)
1332 .then([=](boost::future<raft_group_id> f) {
// f.get() rethrows here if the group-id lookup failed.
1333 auto group_id = f.get();
1334 return create<T>(std::move(group_id), proxy_name, object_name);
// Non-owning reference to the client context; passed to the proxies created
// below.
1339 client::spi::ClientContext& context_;
// Map of fenced_lock proxies keyed by a string (presumably the proxy name;
// how it is populated is not visible in this extract).
1340 util::SynchronizedMap<std::string, fenced_lock> lock_proxies_;
1346 static constexpr
const char* DEFAULT_GROUP_NAME =
"default";
// create<T> overload selected (via enable_if) when T is atomic_long,
// atomic_reference or latch: constructs the proxy directly with
// std::make_shared.
// NOTE(review): the line carrying the return type, the function name and the
// group_id parameter (listing line 1353) is elided in this extract.
1348 template<
typename T,
1349 typename =
typename std::enable_if<
1350 std::is_same<atomic_long, T>::value ||
1351 std::is_same<atomic_reference, T>::value ||
1352 std::is_same<latch, T>::value>::type>
1354 const std::string& proxy_name,
1355 const std::string& object_name)
// NOTE(review): the atomic_reference and latch constructors declared in this
// header take `const raft_group_id&`, so for those the std::move below only
// binds a reference and nothing is actually moved.
1357 return std::make_shared<T>(
1358 proxy_name, context_, std::move(group_id), object_name);
// Out-of-line factory for fenced_lock proxies; declaration only (the first
// parameter, listing line 1362, is elided in this extract).
1361 std::shared_ptr<fenced_lock> create_fenced_lock(
1363 const std::string& proxy_name,
1364 const std::string& object_name);
// create<T> overload selected (via enable_if) when T is fenced_lock; forwards
// its arguments to create_fenced_lock().
1366 template<
typename T,
1367 typename =
typename std::enable_if<
1368 std::is_same<fenced_lock, T>::value>::type>
1369 std::shared_ptr<fenced_lock> create(
raft_group_id&& group_id,
1370 const std::string& proxy_name,
1371 const std::string& object_name)
1373 return create_fenced_lock(std::move(group_id), proxy_name, object_name);
// Out-of-line factory for semaphore proxies; declaration only (the first
// parameter, listing line 1377, is elided in this extract). Returns the
// abstract counting_semaphore type, so the concrete class is chosen in the
// implementation.
1376 std::shared_ptr<counting_semaphore> create_semaphore(
1378 const std::string& proxy_name,
1379 const std::string& object_name);
// create<T> overload selected (via enable_if) when T is counting_semaphore;
// forwards its arguments to create_semaphore().
1381 template<
typename T,
1382 typename =
typename std::enable_if<
1383 std::is_same<counting_semaphore, T>::value>::type>
1384 std::shared_ptr<counting_semaphore> create(
raft_group_id&& group_id,
1385 const std::string& proxy_name,
1386 const std::string& object_name)
1388 return create_semaphore(std::move(group_id), proxy_name, object_name);
// Asynchronously resolves the raft group id for the given proxy/object name
// pair; declaration only. create_proxy() chains its continuation onto the
// returned future.
1391 boost::future<raft_group_id> get_group_id(
const std::string& proxy_name,
1392 const std::string& object_name);
// NOTE(review): the header of the class these members belong to is elided from
// this extract.
//
// Public entry points, one per CP data structure. Each takes a name and
// resolves asynchronously to a shared_ptr to the corresponding proxy.
// Declarations only (defined out of line).
1429 boost::future<std::shared_ptr<atomic_long>> get_atomic_long(
1430 const std::string& name);
1454 boost::future<std::shared_ptr<atomic_reference>> get_atomic_reference(
1455 const std::string& name);
1478 boost::future<std::shared_ptr<latch>> get_latch(
const std::string& name);
1500 boost::future<std::shared_ptr<fenced_lock>> get_lock(
1501 const std::string& name);
1521 boost::future<std::shared_ptr<counting_semaphore>> get_semaphore(
1522 const std::string& name);
// Grants hazelcast_client_instance_impl access to non-public members.
1525 friend client::impl::hazelcast_client_instance_impl;
// Non-owning reference to the client context.
1526 client::spi::ClientContext& context_;
// Two `hash` specialisations for hazelcast::cp::raft_group_id, which allow the
// group id to be used as a key in hashed containers.
// NOTE(review): the enclosing namespace lines are elided in this extract; the
// two specialisations are presumably in different namespaces (e.g. std and
// boost), otherwise this would be a redefinition -- confirm. The operator()
// parameter lists are elided as well.
1536 struct HAZELCAST_API hash<hazelcast::cp::raft_group_id>
1538 std::size_t operator()(
1545 struct HAZELCAST_API hash<hazelcast::cp::raft_group_id>
1547 std::size_t operator()(
1552 #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
1553 #pragma warning(pop)
Client-side Raft-based proxy implementation of atomic long.
boost::future< boost::optional< R > > apply(const F &function)
Applies a function on the value, the actual stored value will not change.
boost::future< void > alter(const F &function)
Alters the currently stored value by applying a function on it.
boost::future< int64_t > get_and_alter(const F &function)
Alters the currently stored value by applying a function on it and gets the old value.
boost::future< int64_t > alter_and_get(const F &function)
Alters the currently stored value by applying a function on it and gets the result.
CP Subsystem is a component of Hazelcast that builds a strongly consistent layer for a set of distrib...
Client-side Raft-based proxy implementation of atomic reference.