21 #include "hazelcast/client/proxy/ReplicatedMapImpl.h"
22 #include "hazelcast/client/entry_listener.h"
23 #include "hazelcast/client/entry_event.h"
24 #include "hazelcast/client/map_event.h"
25 #include "hazelcast/client/query/predicates.h"
26 #include "hazelcast/logger.h"
28 #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
30 #pragma warning(disable: 4251)
47 friend class spi::ProxyManager;
49 static constexpr
const char *SERVICE_NAME =
"hz:impl:replicatedMapService";
64 template<
typename K,
typename V,
typename R = V>
65 boost::future<boost::optional<R>>
put(
const K &key,
const V &value, std::chrono::milliseconds ttl) {
66 return to_object<R>(put_data(to_data(key), to_data(value), ttl));
79 template<
typename K,
typename V>
80 boost::future<void>
put_all(
const std::unordered_map<K, V> &entries) {
81 return put_all_data(to_data_entries(entries));
91 return proxy::ReplicatedMapImpl::add_entry_listener(
92 std::shared_ptr<impl::BaseEventHandler>(
93 new EntryEventHandler<protocol::codec::replicatedmap_addentrylistener_handler>(get_name(), get_context().get_client_cluster_service(),
94 get_context().get_serialization_service(), std::move(listener), get_context().get_logger())));
110 typename std::enable_if<!std::is_base_of<query::predicate, K>::value, boost::future<boost::uuids::uuid>>::type
112 return proxy::ReplicatedMapImpl::add_entry_listener_to_key(
113 std::shared_ptr<impl::BaseEventHandler>(
114 new EntryEventHandler<protocol::codec::replicatedmap_addentrylistenertokey_handler>(get_name(), get_context().get_client_cluster_service(),
115 get_context().get_serialization_service(), std::move(listener),
116 get_context().get_logger())), to_data(key));
127 typename std::enable_if<std::is_base_of<query::predicate, P>::value, boost::future<boost::uuids::uuid>>::type
129 return proxy::ReplicatedMapImpl::add_entry_listener(
130 std::shared_ptr<impl::BaseEventHandler>(
131 new EntryEventHandler<protocol::codec::replicatedmap_addentrylistenerwithpredicate_handler>(get_name(), get_context().get_client_cluster_service(),
132 get_context().get_serialization_service(), std::move(listener),
133 get_context().get_logger())), to_data(predicate));
144 template<
typename K,
typename P>
145 typename std::enable_if<std::is_base_of<query::predicate, P>::value, boost::future<boost::uuids::uuid>>::type
147 return proxy::ReplicatedMapImpl::add_entry_listener(
148 std::shared_ptr<impl::BaseEventHandler>(
149 new EntryEventHandler<protocol::codec::replicatedmap_addentrylistenertokeywithpredicate_handler>(get_name(), get_context().get_client_cluster_service(),
150 get_context().get_serialization_service(), std::move(listener),
151 get_context().get_logger())), to_data(key), to_data(predicate));
169 return to_object_vector<V>(values_data());
184 template<
typename K,
typename V>
185 boost::future<std::vector<std::pair<K, V>>>
entry_set() {
186 return to_entry_object_vector<K,V>(entry_set_data());
203 return to_object_vector<K>(key_set_data());
213 return contains_key_data(to_data(key));
223 return contains_value_data(to_data(value));
231 template<
typename K,
typename V>
232 boost::future<boost::optional<V>>
get(
const K &key) {
233 return to_object<V>(get_data(to_data(key)));
242 template<
typename K,
typename V,
typename R = V>
243 boost::future<boost::optional<R>>
put(
const K &key,
const V &value) {
244 return put<K, V, R>(key, value, std::chrono::milliseconds(0));
252 template<
typename K,
typename V>
253 boost::future<boost::optional<V>>
remove(
const K &key) {
254 return to_object<V>(remove_data(to_data(key)));
257 replicated_map(
const std::string &object_name, spi::ClientContext *context) : proxy::ReplicatedMapImpl(
258 SERVICE_NAME, object_name, context) {
261 template<
typename HANDLER>
262 class EntryEventHandler :
public HANDLER {
264 EntryEventHandler(
const std::string &instance_name,
265 spi::impl::ClientClusterServiceImpl &cluster_service,
266 serialization::pimpl::SerializationService &serialization_service,
267 entry_listener &&listener,
logger &lg)
268 : instance_name_(instance_name), cluster_service_(cluster_service),
269 serialization_service_(serialization_service), listener_(std::move(listener)), logger_(lg) {}
271 void handle_entry(
const boost::optional<serialization::pimpl::data> &key,
272 const boost::optional<serialization::pimpl::data> &value,
273 const boost::optional<serialization::pimpl::data> &old_value,
274 const boost::optional<serialization::pimpl::data> &merging_value,
275 int32_t event_type, boost::uuids::uuid uuid,
276 int32_t number_of_affected_entries)
override {
277 if (event_type ==
static_cast<int32_t
>(entry_event::type::CLEAR_ALL)) {
278 fire_map_wide_event(key, value, old_value, merging_value, event_type, uuid,
279 number_of_affected_entries);
283 fire_entry_event(key, value, old_value, merging_value, event_type, uuid,
284 number_of_affected_entries);
288 void fire_map_wide_event(
const boost::optional<serialization::pimpl::data> &key,
289 const boost::optional<serialization::pimpl::data> &value,
290 const boost::optional<serialization::pimpl::data> &old_value,
291 const boost::optional<serialization::pimpl::data> &merging_value,
292 int32_t event_type, boost::uuids::uuid uuid,
293 int32_t number_of_affected_entries) {
294 auto member = cluster_service_.get_member(uuid);
295 auto mapEventType =
static_cast<entry_event::type
>(event_type);
296 map_event mapEvent(std::move(member).value(), mapEventType, instance_name_,
297 number_of_affected_entries);
298 listener_.map_cleared_(std::move(mapEvent));
301 void fire_entry_event(
const boost::optional<serialization::pimpl::data> &key,
302 const boost::optional<serialization::pimpl::data> &value,
303 const boost::optional<serialization::pimpl::data> &old_value,
304 const boost::optional<serialization::pimpl::data> &merging_value,
305 int32_t event_type, boost::uuids::uuid uuid,
306 int32_t number_of_affected_entries) {
307 typed_data eventKey, val, oldVal, mergingVal;
309 val = typed_data(std::move(*value), serialization_service_);
312 oldVal = typed_data(std::move(*old_value), serialization_service_);
315 mergingVal = typed_data(std::move(*merging_value), serialization_service_);
318 eventKey = typed_data(std::move(*key), serialization_service_);
320 auto m = cluster_service_.get_member(uuid);
321 if (!m.has_value()) {
324 auto type =
static_cast<entry_event::type
>(event_type);
325 entry_event entryEvent(instance_name_, std::move(m.value()), type, std::move(eventKey), std::move(val),
326 std::move(oldVal), std::move(mergingVal));
328 case entry_event::type::ADDED:
329 listener_.added_(std::move(entryEvent));
331 case entry_event::type::REMOVED:
332 listener_.removed_(std::move(entryEvent));
334 case entry_event::type::UPDATED:
335 listener_.updated_(std::move(entryEvent));
337 case entry_event::type::EVICTED:
338 listener_.evicted_(std::move(entryEvent));
341 HZ_LOG(logger_, warning,
342 boost::str(boost::format(
"Received unrecognized event with type: %1% "
343 "Dropping the event!!!")
344 %
static_cast<int>(type))
349 const std::string& instance_name_;
350 spi::impl::ClientClusterServiceImpl &cluster_service_;
351 serialization::pimpl::SerializationService& serialization_service_;
352 entry_listener listener_;
359 #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
Map entry listener to get notified when a map entry is added, removed, updated, evicted,...
std::enable_if< std::is_base_of< query::predicate, P >::value, boost::future< boost::uuids::uuid > >::type add_entry_listener(entry_listener &&listener, const P &predicate)
Adds an continuous entry listener for this map.
boost::future< boost::uuids::uuid > add_entry_listener(entry_listener &&listener)
Adds an entry listener for this map.
boost::future< void > put_all(const std::unordered_map< K, V > &entries)
Copies all of the mappings from the specified map to this map (optional operation).
boost::future< bool > contains_value(const V &value)
boost::future< std::vector< K > > key_set()
Returns a view of the keys contained in this map.
boost::future< std::vector< V > > values()
Due to the lazy nature of the returned array, changes to the map (addition, removal,...
boost::future< boost::optional< R > > put(const K &key, const V &value)
std::enable_if<!std::is_base_of< query::predicate, K >::value, boost::future< boost::uuids::uuid > >::type add_entry_listener(entry_listener &&listener, const K &key)
Adds the specified entry listener for the specified key.
boost::future< boost::optional< V > > get(const K &key)
boost::future< bool > contains_key(const K &key)
boost::future< boost::optional< V > > remove(const K &key)
boost::future< boost::optional< R > > put(const K &key, const V &value, std::chrono::milliseconds ttl)
std::enable_if< std::is_base_of< query::predicate, P >::value, boost::future< boost::uuids::uuid > >::type add_entry_listener(entry_listener &&listener, const P &predicate, const K &key)
Adds an continuous entry listener for this map.
boost::future< std::vector< std::pair< K, V > > > entry_set()
Returns a view of the mappings contained in this map.