21 #include "hazelcast/client/proxy/ReplicatedMapImpl.h"
22 #include "hazelcast/client/entry_listener.h"
23 #include "hazelcast/client/entry_event.h"
24 #include "hazelcast/client/map_event.h"
25 #include "hazelcast/client/query/predicates.h"
26 #include "hazelcast/logger.h"
28 #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
30 #pragma warning(disable : 4251)
48 friend class spi::ProxyManager;
// Service name string that the replicated_map constructor passes to the
// proxy::ReplicatedMapImpl base class.
51 static constexpr
const char* SERVICE_NAME =
"hz:impl:replicatedMapService";
// put (with time-to-live): serializes the key and the value with to_data(),
// forwards both together with `ttl` to put_data(), and converts the returned
// data into a future of optional<R> through to_object<R>().
// R defaults to V when the caller does not name it.
// NOTE(review): this listing omits some original lines (the `value` parameter
// and the function braces are not shown), so the signature below is partial.
66 template<
typename K,
typename V,
typename R = V>
67 boost::future<boost::optional<R>>
put(
const K& key,
69 std::chrono::milliseconds ttl)
71 return to_object<R>(put_data(to_data(key), to_data(value), ttl));
// put_all: converts every entry of `entries` with to_data_entries() and hands
// the result to put_all_data(); the returned future carries no value.
// NOTE(review): the function braces are not shown in this listing.
84 template<
typename K,
typename V>
85 boost::future<void>
put_all(
const std::unordered_map<K, V>& entries)
87 return put_all_data(to_data_entries(entries));
// Body of a listener-registration function (presumably
// add_entry_listener(entry_listener&&); its signature is not shown here).
// Allocates an EntryEventHandler specialized on the
// replicatedmap_addentrylistener_handler codec, wraps it in a
// shared_ptr<impl::BaseEventHandler>, and passes it to
// proxy::ReplicatedMapImpl::add_entry_listener.
// NOTE(review): some constructor arguments sit on lines this listing skips.
99 return proxy::ReplicatedMapImpl::add_entry_listener(
100 std::shared_ptr<impl::BaseEventHandler>(
101 new EntryEventHandler<
102 protocol::codec::replicatedmap_addentrylistener_handler>(
104 get_context().get_client_cluster_service(),
105 get_context().get_serialization_service(),
107 get_context().get_logger())));
// Key-scoped listener registration. The enable_if return type makes this
// overload available only when K is NOT derived from query::predicate; it
// yields a future of boost::uuids::uuid.
// The handler is an EntryEventHandler specialized on the
// replicatedmap_addentrylistenertokey_handler codec and is forwarded to
// proxy::ReplicatedMapImpl::add_entry_listener_to_key.
// NOTE(review): the template header, the parameter list and the arguments that
// follow the handler are on lines this listing skips.
123 typename std::enable_if<!std::is_base_of<query::predicate, K>::value,
124 boost::future<boost::uuids::uuid>>::type
127 return proxy::ReplicatedMapImpl::add_entry_listener_to_key(
128 std::shared_ptr<impl::BaseEventHandler>(
129 new EntryEventHandler<
130 protocol::codec::replicatedmap_addentrylistenertokey_handler>(
132 get_context().get_client_cluster_service(),
133 get_context().get_serialization_service(),
135 get_context().get_logger())),
// Predicate-filtered listener registration. The enable_if return type makes
// this overload available only when P IS derived from query::predicate; it
// yields a future of boost::uuids::uuid.
// The handler is an EntryEventHandler specialized on the
// replicatedmap_addentrylistenerwithpredicate_handler codec and is forwarded
// to proxy::ReplicatedMapImpl::add_entry_listener.
// NOTE(review): the template header, the parameter list and the arguments that
// follow the handler are on lines this listing skips.
148 typename std::enable_if<std::is_base_of<query::predicate, P>::value,
149 boost::future<boost::uuids::uuid>>::type
152 return proxy::ReplicatedMapImpl::add_entry_listener(
153 std::shared_ptr<impl::BaseEventHandler>(
154 new EntryEventHandler<
156 replicatedmap_addentrylistenerwithpredicate_handler>(
158 get_context().get_client_cluster_service(),
159 get_context().get_serialization_service(),
161 get_context().get_logger())),
// Listener registration restricted by both a predicate and a key. Available
// only when P is derived from query::predicate (enable_if on the return
// type); yields a future of boost::uuids::uuid.
// The handler is an EntryEventHandler specialized on the
// replicatedmap_addentrylistenertokeywithpredicate_handler codec and is
// forwarded to proxy::ReplicatedMapImpl::add_entry_listener.
// NOTE(review): the parameter list and the arguments that follow the handler
// are on lines this listing skips.
174 template<
typename K,
typename P>
175 typename std::enable_if<std::is_base_of<query::predicate, P>::value,
176 boost::future<boost::uuids::uuid>>::type
181 return proxy::ReplicatedMapImpl::add_entry_listener(
182 std::shared_ptr<impl::BaseEventHandler>(
183 new EntryEventHandler<
185 replicatedmap_addentrylistenertokeywithpredicate_handler>(
187 get_context().get_client_cluster_service(),
188 get_context().get_serialization_service(),
190 get_context().get_logger())),
// Presumably the body of values() (signature not shown in this listing):
// converts the result of values_data() into objects of type V.
212 return to_object_vector<V>(values_data());
// Presumably entry_set() (the declarator line is not shown in this listing):
// converts the result of entry_set_data() into (K, V) entries through
// to_entry_object_vector<K, V>().
226 template<
typename K,
typename V>
229 return to_entry_object_vector<K, V>(entry_set_data());
// Presumably the body of key_set() (signature not shown in this listing):
// converts the result of key_set_data() into objects of type K.
246 return to_object_vector<K>(key_set_data());
// Presumably the body of contains_key() (signature not shown in this
// listing): serializes `key` with to_data() and delegates to
// contains_key_data().
257 return contains_key_data(to_data(key));
// Presumably the body of contains_value() (signature not shown in this
// listing): serializes `value` with to_data() and delegates to
// contains_value_data().
268 return contains_value_data(to_data(value));
// get: serializes `key` with to_data(), calls get_data(), and converts the
// result into a future of optional<V> through to_object<V>().
// NOTE(review): the function braces are not shown in this listing.
276 template<
typename K,
typename V>
277 boost::future<boost::optional<V>>
get(
const K& key)
279 return to_object<V>(get_data(to_data(key)));
// put (no explicit time-to-live): delegates to the three-argument
// put<K, V, R>() overload with a ttl of std::chrono::milliseconds(0).
// R defaults to V when the caller does not name it.
// NOTE(review): what a zero ttl means is decided by the callee and is not
// visible here — confirm before relying on it.
289 template<
typename K,
typename V,
typename R = V>
290 boost::future<boost::optional<R>>
put(
const K& key,
const V& value)
292 return put<K, V, R>(key, value, std::chrono::milliseconds(0));
// remove: serializes `key` with to_data(), calls remove_data(), and converts
// the result into a future of optional<V> through to_object<V>().
// NOTE(review): the function braces are not shown in this listing.
300 template<
typename K,
typename V>
301 boost::future<boost::optional<V>>
remove(
const K& key)
303 return to_object<V>(remove_data(to_data(key)));
// Constructor: forwards SERVICE_NAME, the object name and the client context
// pointer to the proxy::ReplicatedMapImpl base class.
// NOTE(review): the constructor body is not shown in this listing.
307 replicated_map(
const std::string& object_name, spi::ClientContext* context)
308 : proxy::ReplicatedMapImpl(SERVICE_NAME, object_name, context)
// EntryEventHandler: event handler that derives from the codec handler type
// HANDLER and turns incoming entry events into calls on the callbacks of a
// stored entry_listener.
// NOTE(review): this listing skips a number of original lines (access
// specifiers, braces, several parameters and call arguments), so the text
// below is a partial view of the class.
311 template<
typename HANDLER>
312 class EntryEventHandler :
public HANDLER
// Constructor parameters. The listener is taken by rvalue reference and moved
// into listener_; the other arguments are bound to reference members.
316 const std::string& instance_name,
317 spi::impl::ClientClusterServiceImpl& cluster_service,
318 serialization::pimpl::SerializationService& serialization_service,
319 entry_listener&& listener,
321 : instance_name_(instance_name)
322 , cluster_service_(cluster_service)
323 , serialization_service_(serialization_service)
324 , listener_(std::move(listener))
// Overridden event entry point (its name is on a line this listing skips).
// The event type is compared with entry_event::type::CLEAR_ALL as an int32_t:
// that comparison guards the fire_map_wide_event call, and the
// fire_entry_event call follows it (the lines between the two calls are not
// shown).
329 const boost::optional<serialization::pimpl::data>& key,
330 const boost::optional<serialization::pimpl::data>& value,
331 const boost::optional<serialization::pimpl::data>& old_value,
332 const boost::optional<serialization::pimpl::data>& merging_value,
334 boost::uuids::uuid uuid,
335 int32_t number_of_affected_entries)
override
338 static_cast<int32_t
>(entry_event::type::CLEAR_ALL)) {
339 fire_map_wide_event(key,
345 number_of_affected_entries);
349 fire_entry_event(key,
355 number_of_affected_entries);
// Builds a map_event from the member looked up by `uuid` and the number of
// affected entries, then invokes the listener's map_cleared_ callback.
359 void fire_map_wide_event(
360 const boost::optional<serialization::pimpl::data>& key,
361 const boost::optional<serialization::pimpl::data>& value,
362 const boost::optional<serialization::pimpl::data>& old_value,
363 const boost::optional<serialization::pimpl::data>& merging_value,
365 boost::uuids::uuid uuid,
366 int32_t number_of_affected_entries)
368 auto member = cluster_service_.get_member(uuid);
369 auto mapEventType =
static_cast<entry_event::type
>(event_type);
// NOTE(review): .value() is called with no preceding has_value() check,
// whereas fire_entry_event tests has_value() on the same lookup. Confirm that
// get_member() cannot come back empty here.
370 map_event mapEvent(std::move(member).value(),
373 number_of_affected_entries);
374 listener_.map_cleared_(std::move(mapEvent));
// Wraps the optional key / value / old value / merging value in typed_data
// objects, builds an entry_event, and dispatches it to the listener callback
// that matches the event type.
377 void fire_entry_event(
378 const boost::optional<serialization::pimpl::data>& key,
379 const boost::optional<serialization::pimpl::data>& value,
380 const boost::optional<serialization::pimpl::data>& old_value,
381 const boost::optional<serialization::pimpl::data>& merging_value,
383 boost::uuids::uuid uuid,
384 int32_t number_of_affected_entries)
386 typed_data eventKey, val, oldVal, mergingVal;
// NOTE(review): the optionals are received as const references, so std::move
// on their contents yields a const rvalue; this presumably copies rather than
// moves — confirm against the typed_data constructors. The conditions that
// guard these dereferences are on lines this listing skips.
388 val = typed_data(std::move(*value), serialization_service_);
392 typed_data(std::move(*old_value), serialization_service_);
396 typed_data(std::move(*merging_value), serialization_service_);
399 eventKey = typed_data(std::move(*key), serialization_service_);
401 auto m = cluster_service_.get_member(uuid);
// Handles the case where the lookup returned no member; the body of this
// branch is not shown in this listing.
402 if (!m.has_value()) {
405 auto type =
static_cast<entry_event::type
>(event_type);
406 entry_event entryEvent(instance_name_,
407 std::move(m.value()),
412 std::move(mergingVal));
// Dispatch on the event type: ADDED, REMOVED, UPDATED and EVICTED each go to
// the matching listener callback; the remaining path formats a message about
// an unrecognized event type.
414 case entry_event::type::ADDED:
415 listener_.added_(std::move(entryEvent));
417 case entry_event::type::REMOVED:
418 listener_.removed_(std::move(entryEvent));
420 case entry_event::type::UPDATED:
421 listener_.updated_(std::move(entryEvent));
423 case entry_event::type::EVICTED:
424 listener_.evicted_(std::move(entryEvent));
430 boost::str(boost::format(
431 "Received unrecognized event with type: %1% "
432 "Dropping the event!!!") %
433 static_cast<int>(type)));
// Data members. instance_name_, cluster_service_ and serialization_service_
// are references bound in the constructor; listener_ is held by value.
// NOTE(review): because instance_name_ is a reference, the string passed to
// the constructor must outlive this handler — verify at the construction
// sites.
438 const std::string& instance_name_;
439 spi::impl::ClientClusterServiceImpl& cluster_service_;
440 serialization::pimpl::SerializationService& serialization_service_;
441 entry_listener listener_;
448 #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
Map entry listener to get notified when a map entry is added, removed, updated, evicted,...
std::enable_if< std::is_base_of< query::predicate, P >::value, boost::future< boost::uuids::uuid > >::type add_entry_listener(entry_listener &&listener, const P &predicate)
Adds a continuous entry listener for this map.
boost::future< boost::uuids::uuid > add_entry_listener(entry_listener &&listener)
Adds an entry listener for this map.
boost::future< void > put_all(const std::unordered_map< K, V > &entries)
Copies all of the mappings from the specified map to this map (optional operation).
boost::future< bool > contains_value(const V &value)
boost::future< std::vector< K > > key_set()
Returns a view of the keys contained in this map.
boost::future< std::vector< V > > values()
Due to the lazy nature of the returned array, changes to the map (addition, removal,...
boost::future< boost::optional< R > > put(const K &key, const V &value)
std::enable_if<!std::is_base_of< query::predicate, K >::value, boost::future< boost::uuids::uuid > >::type add_entry_listener(entry_listener &&listener, const K &key)
Adds the specified entry listener for the specified key.
boost::future< boost::optional< V > > get(const K &key)
boost::future< bool > contains_key(const K &key)
boost::future< boost::optional< V > > remove(const K &key)
boost::future< boost::optional< R > > put(const K &key, const V &value, std::chrono::milliseconds ttl)
std::enable_if< std::is_base_of< query::predicate, P >::value, boost::future< boost::uuids::uuid > >::type add_entry_listener(entry_listener &&listener, const P &predicate, const K &key)
Adds a continuous entry listener for this map.
boost::future< std::vector< std::pair< K, V > > > entry_set()
Returns a view of the mappings contained in this map.