20 #include <condition_variable>
21 #include <boost/thread/future.hpp>
23 #include "hazelcast/util/SynchronizedMap.h"
24 #include "hazelcast/client/proxy/ProxyImpl.h"
25 #include "hazelcast/client/exception/protocol_exceptions.h"
27 #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
29 #pragma warning(disable: 4251)
39 class raft_proxy_factory;
51 class HAZELCAST_API
cp_proxy :
public client::proxy::ProxyImpl {
53 cp_proxy(
const std::string &service_name,
const std::string &proxy_name, client::spi::ClientContext *context,
54 const raft_group_id &group_id,
const std::string &object_name);
60 std::string object_name_;
67 class proxy_session_manager;
73 client::spi::ClientContext *context,
const raft_group_id &group_id,
74 const std::string &object_name,
75 internal::session::proxy_session_manager &session_manager);
78 internal::session::proxy_session_manager &session_manager_;
84 void release_session(int64_t session_id);
92 atomic_long(
const std::string &name, client::spi::ClientContext &context,
93 const raft_group_id &group_id,
const std::string &object_name);
101 boost::future<int64_t> add_and_get(int64_t delta);
112 boost::future<bool> compare_and_set(int64_t expect, int64_t update);
119 boost::future<int64_t> get_and_decrement();
126 boost::future<int64_t> decrement_and_get();
133 boost::future<int64_t> get();
141 boost::future<int64_t> get_and_add(int64_t delta);
149 boost::future<int64_t> get_and_set(int64_t new_value);
156 boost::future<int64_t> increment_and_get();
163 boost::future<int64_t> get_and_increment();
170 boost::future<void> set(int64_t new_value);
// Alters the currently stored value by applying `function` to it.
// Delegates to alter_and_get(function) and passes that future through
// to_void_future, so callers only receive a boost::future<void>.
178 boost::future<void>
alter(
const F &
function) {
179 return to_void_future(alter_and_get(
function));
191 auto f = to_data(
function);
192 return alter_data(f, alter_result_type::NEW_VALUE);
205 auto f = to_data(
function);
206 return alter_data(f, alter_result_type::OLD_VALUE);
// Applies `function` on the value and returns the result as an optional R.
// Steps visible here: `function` is converted with to_data, handed to
// apply_data, and the data future it returns is converted with to_object<R>.
// F is the function object type; R is the type of the returned result.
216 template<
typename F,
typename R>
217 boost::future<boost::optional<R>>
apply(
const F &
function) {
218 auto f = to_data(
function);
219 return to_object<R>(apply_data(f));
224 static constexpr
const char *SERVICE_NAME =
"hz:raft:atomicLongService";
228 enum alter_result_type {
239 boost::future<int64_t>
240 alter_data(client::serialization::pimpl::data &function_data, alter_result_type result_type);
242 boost::future<boost::optional<client::serialization::pimpl::data>>
243 apply_data(client::serialization::pimpl::data &function_data);
249 class HAZELCAST_API atomic_reference :
public cp_proxy {
251 atomic_reference(
const std::string &name, client::spi::ClientContext &context,
252 const raft_group_id &group_id,
const std::string &object_name);
// Returns the result of get_data() converted with to_object.
// The target type is std::remove_pointer<T>::type, so a pointer type T
// yields an optional of the pointee type.
255 boost::future<boost::optional<typename std::remove_pointer<T>::type>> get() {
256 return to_object<typename std::remove_pointer<T>::type>(get_data());
// Converts `new_value` with to_data (as std::remove_pointer<T>::type),
// passes it to set_data, and converts the data future that set_data
// returns back to an optional of the pointee type with to_object.
260 boost::future<boost::optional<typename std::remove_pointer<T>::type>> set(T new_value) {
261 return to_object<typename std::remove_pointer<T>::type>(
262 set_data(to_data<
typename std::remove_pointer<T>::type>(new_value)));
// Converts `new_value` with to_data (as std::remove_pointer<T>::type),
// passes it to get_and_set_data, and converts the returned data future
// to an optional of the pointee type with to_object.
266 boost::future<boost::optional<typename std::remove_pointer<T>::type>> get_and_set(T new_value) {
267 return to_object<typename std::remove_pointer<T>::type>(
268 get_and_set_data(to_data<
typename std::remove_pointer<T>::type>(new_value)));
// Typed front end for compare_and_set_data.
// `expect` and `update` may have different types (T and V); each is
// converted with to_data using its std::remove_pointer<...>::type before
// both are forwarded. The boost::future<bool> comes straight from
// compare_and_set_data.
271 template<
typename T,
typename V>
272 boost::future<bool> compare_and_set(T expect, V update) {
273 return compare_and_set_data(to_data<
typename std::remove_pointer<T>::type>(expect),
274 to_data<
typename std::remove_pointer<V>::type>(update));
277 boost::future<bool> is_null();
279 boost::future<void> clear();
// Converts `value` with to_data (as std::remove_pointer<T>::type) and
// returns the boost::future<bool> produced by contains_data.
282 boost::future<bool> contains(T value) {
283 return contains_data(to_data<
typename std::remove_pointer<T>::type>(value));
// Converts `function` with to_data and forwards it to alter_data,
// returning that call's boost::future<void> unchanged.
287 boost::future<void> alter(
const F &
function) {
288 return alter_data(to_data(
function));
// Converts `function` with to_data, passes it to alter_and_get_data, and
// converts the returned data future with to_object to an optional of
// std::remove_pointer<T>::type.
// T selects the result type; F is the function object type.
291 template<
typename T,
typename F>
292 boost::future<boost::optional<typename std::remove_pointer<T>::type>> alter_and_get(
const F &
function) {
293 return to_object<typename std::remove_pointer<T>::type>(alter_and_get_data(to_data(
function)));
// Converts `function` with to_data, passes it to get_and_alter_data, and
// converts the returned data future with to_object to an optional of
// std::remove_pointer<T>::type.
// T selects the result type; F is the function object type.
296 template<
typename T,
typename F>
297 boost::future<boost::optional<typename std::remove_pointer<T>::type>> get_and_alter(
const F &
function) {
298 return to_object<typename std::remove_pointer<T>::type>(get_and_alter_data(to_data(
function)));
// Converts `function` with to_data, passes it to apply_data, and converts
// the returned data future with to_object<R>.
// R is the result type (used as-is, without remove_pointer); F is the
// function object type.
301 template<
typename R,
typename F>
302 boost::future<boost::optional<R>> apply(
const F &
function) {
303 return to_object<R>(apply_data(to_data(
function)));
307 static constexpr
const char *SERVICE_NAME =
"hz:raft:atomicRefService";
309 enum struct return_value_type {
315 boost::future<boost::optional<client::serialization::pimpl::data>> get_data();
317 boost::future<boost::optional<client::serialization::pimpl::data>>
318 set_data(
const client::serialization::pimpl::data &new_value_data);
320 boost::future<boost::optional<client::serialization::pimpl::data>>
321 get_and_set_data(
const client::serialization::pimpl::data &new_value_data);
323 boost::future<bool> compare_and_set_data(
const client::serialization::pimpl::data &expect_data,
324 const client::serialization::pimpl::data &update_data);
326 boost::future<bool> contains_data(
const client::serialization::pimpl::data &value_data);
328 boost::future<void> alter_data(
const client::serialization::pimpl::data &function_data);
330 boost::future<boost::optional<client::serialization::pimpl::data>>
331 alter_and_get_data(
const client::serialization::pimpl::data &function_data);
333 boost::future<boost::optional<client::serialization::pimpl::data>>
334 get_and_alter_data(
const client::serialization::pimpl::data &function_data);
336 boost::future<boost::optional<client::serialization::pimpl::data>>
337 apply_data(
const client::serialization::pimpl::data &function_data);
339 boost::future<boost::optional<client::serialization::pimpl::data>>
340 invoke_apply(
const client::serialization::pimpl::data function_data, return_value_type return_type,
344 class HAZELCAST_API latch :
public cp_proxy {
346 latch(
const std::string &name, client::spi::ClientContext &context,
const raft_group_id &group_id,
347 const std::string &object_name);
359 boost::future<bool> try_set_count(int32_t count);
366 boost::future<int32_t> get_count();
380 boost::future<void> count_down();
388 boost::future<bool> try_wait();
398 boost::future<void> wait();
429 boost::future<std::cv_status> wait_for(std::chrono::milliseconds timeout);
// Absolute-deadline variant of wait_for.
// The remaining time is computed as timeout_time - Clock::now(), converted
// to std::chrono::milliseconds with duration_cast (sub-millisecond
// remainders are truncated), and passed to wait_for.
// NOTE(review): a deadline already in the past yields a negative duration;
// how wait_for treats that is not visible here -- confirm.
440 template<
typename Clock,
typename Duration>
441 boost::future<std::cv_status> wait_until(
const std::chrono::time_point<Clock, Duration> &timeout_time) {
442 return wait_for(std::chrono::duration_cast<std::chrono::milliseconds>(timeout_time - Clock::now()));
446 static constexpr
const char *SERVICE_NAME =
"hz:raft:countDownLatchService";
448 boost::future<int32_t> get_round();
450 void count_down(
int round, boost::uuids::uuid invocation_uid);
457 class HAZELCAST_API fenced_lock :
public session_aware_proxy {
463 static constexpr int64_t INVALID_FENCE = 0L;
465 fenced_lock(
const std::string &name, client::spi::ClientContext &context,
466 const raft_group_id &group_id,
const std::string &object_name);
505 boost::future<void> lock();
577 boost::future<int64_t> lock_and_get_fence();
611 boost::future<bool> try_lock();
641 boost::future<bool> try_lock(std::chrono::milliseconds timeout);
716 boost::future<int64_t> try_lock_and_get_fence();
800 boost::future<int64_t> try_lock_and_get_fence(std::chrono::milliseconds timeout);
810 boost::future<void> unlock();
829 boost::future<int64_t> get_fence();
840 boost::future<bool> is_locked();
851 boost::future<bool> is_locked_by_current_thread();
863 boost::future<int32_t> get_lock_count();
870 const raft_group_id &get_group_id();
872 friend bool operator==(
const fenced_lock &lhs,
const fenced_lock &rhs);
875 void post_destroy() ;
878 struct lock_ownership_state {
884 bool is_locked_by(int64_t session, int64_t thread);
889 static constexpr
const char *SERVICE_NAME =
"hz:raft:lockService";
892 util::SynchronizedMap<int64_t, int64_t> locked_session_ids_;
895 verify_locked_session_id_if_present(int64_t thread_id, int64_t session_id,
bool should_release);
897 void throw_lock_ownership_lost(int64_t session_id)
const;
899 void throw_illegal_monitor_state()
const;
901 boost::future<int64_t>
902 do_lock(int64_t session_id, int64_t thread_id, boost::uuids::uuid invocation_uid);
904 boost::future<int64_t>
905 do_try_lock(int64_t session_id, int64_t thread_id, boost::uuids::uuid invocation_uid,
906 std::chrono::milliseconds timeout);
909 do_unlock(int64_t session_id, int64_t thread_id, boost::uuids::uuid invocation_uid);
911 boost::future<lock_ownership_state> do_get_lock_ownership_state();
913 void invalidate_session(int64_t session_id);
915 void verify_no_locked_session_id_present(int64_t thread_id);
1003 class HAZELCAST_API counting_semaphore :
public session_aware_proxy {
1031 virtual boost::future<void> acquire(int32_t permits = 1) = 0;
1055 virtual boost::future<void> release(int32_t permits = 1 ) = 0;
1064 boost::future<bool> try_acquire(int32_t permits = 1);
1098 boost::future<bool> try_acquire_for(std::chrono::milliseconds rel_time, int32_t permits = 1);
// Absolute-deadline variant of try_acquire_for.
// Samples Clock::now() once, converts (abs_time - now) to
// std::chrono::milliseconds with duration_cast (sub-millisecond remainders
// are truncated), and forwards that duration with `permits` (default 1)
// to try_acquire_for.
// NOTE(review): a deadline already in the past yields a negative duration;
// how try_acquire_for treats that is not visible here -- confirm.
1103 template<
class Clock,
class Duration>
1104 boost::future<bool> try_acquire_until(
const std::chrono::time_point<Clock, Duration>& abs_time, int32_t permits = 1) {
1105 auto now = Clock::now();
1106 return try_acquire_for(std::chrono::duration_cast<std::chrono::milliseconds>(abs_time - now), permits);
1119 boost::future<bool> init(int32_t permits);
1128 boost::future<int32_t> available_permits();
1135 virtual boost::future<int32_t> drain_permits() = 0;
1146 boost::future<void> reduce_permits(int32_t reduction);
1157 boost::future<void> increase_permits(int32_t increase);
1160 static constexpr
const char *SERVICE_NAME =
"hz:raft:semaphoreService";
1162 counting_semaphore(
const std::string &proxy_name, client::spi::ClientContext *context,
1163 const raft_group_id &group_id,
const std::string &object_name,
1164 internal::session::proxy_session_manager &session_manager);
1166 virtual boost::future<bool> try_acquire_for_millis(int32_t permits, std::chrono::milliseconds timeout) = 0;
1168 virtual int64_t get_thread_id() = 0;
1170 virtual boost::future<void> do_change_permits(int32_t delta) = 0;
1172 boost::future<void> do_release(int32_t permits, int64_t thread_id, int64_t session_id);
1178 const raft_group_id &group_id,
const std::string &object_name,
1179 internal::session::proxy_session_manager &session_manager);
1181 boost::future<void> acquire(int32_t permits)
override;
1183 boost::future<void> release(int32_t permits)
override;
1185 boost::future<int32_t> drain_permits()
override;
1188 boost::future<bool> try_acquire_for_millis(int32_t permits, std::chrono::milliseconds timeout)
override;
1190 int64_t get_thread_id()
override;
1192 boost::future<void> do_change_permits(int32_t delta)
override;
1195 boost::future<bool> do_try_acquire(int32_t permits, std::chrono::milliseconds timeout_ms);
1200 session_semaphore(
const std::string &proxy_name, client::spi::ClientContext *context,
1201 const raft_group_id &group_id,
const std::string &object_name,
1202 internal::session::proxy_session_manager &session_manager);
1204 boost::future<void> acquire(int32_t permits)
override;
1206 boost::future<void> release(int32_t permits)
override;
1208 boost::future<int32_t> drain_permits()
override;
1217 static constexpr int32_t DRAIN_SESSION_ACQ_COUNT = 1024;
1219 boost::future<bool> try_acquire_for_millis(int32_t permits, std::chrono::milliseconds timeout)
override;
1221 void throw_illegal_state_exception(std::exception_ptr e);
1223 int64_t get_thread_id()
override;
1225 boost::future<void> do_change_permits(int32_t delta)
override;
1236 static std::string without_default_group_name(
const std::string &n);
1238 static std::string object_name_for_proxy(
const std::string &name);
// Asynchronously builds a proxy of type T for `name`.
//  1. proxy_name  = without_default_group_name(name)
//  2. object_name = object_name_for_proxy(proxy_name)
//  3. get_group_id(proxy_name, object_name) yields a future<raft_group_id>;
//     the continuation attached with .then() takes the group id from it
//     and calls create<T>(std::move(group_id), proxy_name, object_name).
// The continuation captures by copy ([=]), so it holds its own copies of
// proxy_name and object_name.
// NOTE(review): f.get() rethrows if the group-id future holds an exception,
// so that failure presumably surfaces through the returned future -- confirm.
1240 template<
typename T>
1241 boost::future<std::shared_ptr<T>> create_proxy(
const std::string &name) {
1242 auto proxy_name = without_default_group_name(name);
1243 auto object_name = object_name_for_proxy(proxy_name);
1244 return get_group_id(proxy_name, object_name).then([=] (boost::future<raft_group_id> f) {
1245 auto group_id = f.get();
1246 return create<T>(std::move(group_id), proxy_name, object_name);
1251 client::spi::ClientContext &context_;
1252 util::SynchronizedMap<std::string, fenced_lock> lock_proxies_;
1258 static constexpr
const char *DEFAULT_GROUP_NAME =
"default";
// create<T> overload selected (via std::enable_if) only when T is exactly
// atomic_long, atomic_reference or latch.
// Constructs the proxy directly with
// std::make_shared<T>(proxy_name, context_, std::move(group_id), object_name).
1260 template<typename T, typename = typename std::enable_if<std::is_same<atomic_long, T>::value ||
1261 std::is_same<atomic_reference, T>::value ||
1262 std::is_same<latch, T>::value>::type>
1264 create(
raft_group_id &&group_id,
const std::string &proxy_name,
const std::string &object_name) {
1265 return std::make_shared<T>(proxy_name, context_, std::move(group_id), object_name);
1268 std::shared_ptr<fenced_lock>
1269 create_fenced_lock(
raft_group_id &&group_id,
const std::string &proxy_name,
const std::string &object_name);
// create<T> overload selected (via std::enable_if) only when T is
// fenced_lock. Unlike the make_shared-based overload, construction is
// delegated to the non-template create_fenced_lock, which receives the
// moved group id plus both names.
1271 template<typename T, typename = typename std::enable_if<std::is_same<fenced_lock, T>::value>::type>
1272 std::shared_ptr<fenced_lock>
1273 create(
raft_group_id &&group_id,
const std::string &proxy_name,
const std::string &object_name) {
1274 return create_fenced_lock(std::move(group_id), proxy_name, object_name);
1277 std::shared_ptr<counting_semaphore>
1278 create_semaphore(
raft_group_id &&group_id,
const std::string &proxy_name,
const std::string &object_name);
// create<T> overload selected (via std::enable_if) only when T is
// counting_semaphore. Construction is delegated to the non-template
// create_semaphore, which receives the moved group id plus both names and
// returns the proxy as std::shared_ptr<counting_semaphore>.
1280 template<typename T, typename = typename std::enable_if<std::is_same<counting_semaphore, T>::value>::type>
1281 std::shared_ptr<counting_semaphore>
1282 create(
raft_group_id &&group_id,
const std::string &proxy_name,
const std::string &object_name) {
1283 return create_semaphore(std::move(group_id), proxy_name, object_name);
1286 boost::future<raft_group_id> get_group_id(
const std::string &proxy_name,
const std::string &object_name);
1322 boost::future<std::shared_ptr<atomic_long>> get_atomic_long(
const std::string &name);
1346 boost::future<std::shared_ptr<atomic_reference>> get_atomic_reference(
const std::string &name);
1369 boost::future<std::shared_ptr<latch>> get_latch(
const std::string &name);
1391 boost::future<std::shared_ptr<fenced_lock>> get_lock(
const std::string &name);
1413 boost::future<std::shared_ptr<counting_semaphore>> get_semaphore(
const std::string &name);
1416 friend client::impl::hazelcast_client_instance_impl;
1417 client::spi::ClientContext &context_;
1427 struct HAZELCAST_API hash<hazelcast::cp::raft_group_id> {
1434 struct HAZELCAST_API hash<hazelcast::cp::raft_group_id> {
1439 #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
1440 #pragma warning(pop)
Client-side Raft-based proxy implementation of atomic long.
boost::future< boost::optional< R > > apply(const F &function)
Applies a function on the value, the actual stored value will not change.
boost::future< void > alter(const F &function)
Alters the currently stored value by applying a function on it.
boost::future< int64_t > get_and_alter(const F &function)
Alters the currently stored value by applying a function on it and gets the old value.
boost::future< int64_t > alter_and_get(const F &function)
Alters the currently stored value by applying a function on it and gets the result.
CP Subsystem is a component of Hazelcast that builds a strongly consistent layer for a set of distrib...
Client-side Raft-based proxy implementation of atomic reference.