47class HAZELCAST_API replicated_map :
public proxy::ReplicatedMapImpl
49 friend class spi::ProxyManager;
52 static constexpr const char* SERVICE_NAME =
"hz:impl:replicatedMapService";
67 template<
typename K,
typename V,
typename R = V>
68 boost::future<boost::optional<R>>
put(
const K& key,
70 std::chrono::milliseconds ttl)
72 return to_object<R>(put_data(to_data(key), to_data(value), ttl));
85 template<
typename K,
typename V>
86 boost::future<void>
put_all(
const std::unordered_map<K, V>& entries)
88 return put_all_data(to_data_entries(entries));
100 return proxy::ReplicatedMapImpl::add_entry_listener(
101 std::shared_ptr<impl::BaseEventHandler>(
102 new EntryEventHandler<
103 protocol::codec::replicatedmap_addentrylistener_handler>(
105 get_context().get_client_cluster_service(),
106 get_context().get_serialization_service(),
108 get_context().get_logger())));
124 typename std::enable_if<!std::is_base_of<query::predicate, K>::value,
125 boost::future<boost::uuids::uuid>>::type
128 return proxy::ReplicatedMapImpl::add_entry_listener_to_key(
129 std::shared_ptr<impl::BaseEventHandler>(
130 new EntryEventHandler<
131 protocol::codec::replicatedmap_addentrylistenertokey_handler>(
133 get_context().get_client_cluster_service(),
134 get_context().get_serialization_service(),
136 get_context().get_logger())),
149 typename std::enable_if<std::is_base_of<query::predicate, P>::value,
150 boost::future<boost::uuids::uuid>>::type
153 return proxy::ReplicatedMapImpl::add_entry_listener(
154 std::shared_ptr<impl::BaseEventHandler>(
155 new EntryEventHandler<
157 replicatedmap_addentrylistenerwithpredicate_handler>(
159 get_context().get_client_cluster_service(),
160 get_context().get_serialization_service(),
162 get_context().get_logger())),
175 template<
typename K,
typename P>
176 typename std::enable_if<std::is_base_of<query::predicate, P>::value,
177 boost::future<boost::uuids::uuid>>::type
182 return proxy::ReplicatedMapImpl::add_entry_listener(
183 std::shared_ptr<impl::BaseEventHandler>(
184 new EntryEventHandler<
186 replicatedmap_addentrylistenertokeywithpredicate_handler>(
188 get_context().get_client_cluster_service(),
189 get_context().get_serialization_service(),
191 get_context().get_logger())),
213 return to_object_vector<V>(values_data());
227 template<
typename K,
typename V>
230 return to_entry_object_vector<K, V>(entry_set_data());
247 return to_object_vector<K>(key_set_data());
258 return contains_key_data(to_data(key));
269 return contains_value_data(to_data(value));
277 template<
typename K,
typename V>
278 boost::future<boost::optional<V>>
get(
const K& key)
280 return to_object<V>(get_data(to_data(key)));
290 template<
typename K,
typename V,
typename R = V>
291 boost::future<boost::optional<R>>
put(
const K& key,
const V& value)
293 return put<K, V, R>(key, value, std::chrono::milliseconds(0));
301 template<
typename K,
typename V>
302 boost::future<boost::optional<V>>
remove(
const K& key)
304 return to_object<V>(remove_data(to_data(key)));
308 replicated_map(
const std::string& object_name, spi::ClientContext* context)
309 : proxy::ReplicatedMapImpl(SERVICE_NAME, object_name, context)
312 template<
typename HANDLER>
313 class EntryEventHandler :
public HANDLER
317 const std::string& instance_name,
318 spi::impl::ClientClusterServiceImpl& cluster_service,
319 serialization::pimpl::SerializationService& serialization_service,
320 entry_listener&& listener,
323 , instance_name_(instance_name)
324 , cluster_service_(cluster_service)
325 , serialization_service_(serialization_service)
326 , listener_(
std::move(listener))
331 const boost::optional<serialization::pimpl::data>& key,
332 const boost::optional<serialization::pimpl::data>& value,
333 const boost::optional<serialization::pimpl::data>& old_value,
334 const boost::optional<serialization::pimpl::data>& merging_value,
336 boost::uuids::uuid uuid,
337 int32_t number_of_affected_entries)
override
340 static_cast<int32_t
>(entry_event::type::CLEAR_ALL)) {
341 fire_map_wide_event(key,
347 number_of_affected_entries);
351 fire_entry_event(key,
357 number_of_affected_entries);
361 void fire_map_wide_event(
362 const boost::optional<serialization::pimpl::data>& ,
363 const boost::optional<serialization::pimpl::data>& ,
364 const boost::optional<serialization::pimpl::data>& ,
365 const boost::optional<serialization::pimpl::data>& ,
367 boost::uuids::uuid uuid,
368 int32_t number_of_affected_entries)
370 auto member = cluster_service_.get_member(uuid);
371 auto mapEventType =
static_cast<entry_event::type
>(event_type);
372 map_event mapEvent(std::move(member).value(),
375 number_of_affected_entries);
376 listener_.map_cleared_(std::move(mapEvent));
379 void fire_entry_event(
380 const boost::optional<serialization::pimpl::data>& key,
381 const boost::optional<serialization::pimpl::data>& value,
382 const boost::optional<serialization::pimpl::data>& old_value,
383 const boost::optional<serialization::pimpl::data>& merging_value,
385 boost::uuids::uuid uuid,
388 typed_data eventKey, val, oldVal, mergingVal;
390 val = typed_data(std::move(*value), serialization_service_);
394 typed_data(std::move(*old_value), serialization_service_);
398 typed_data(std::move(*merging_value), serialization_service_);
401 eventKey = typed_data(std::move(*key), serialization_service_);
403 auto m = cluster_service_.get_member(uuid);
404 if (!m.has_value()) {
407 auto type =
static_cast<entry_event::type
>(event_type);
408 entry_event entryEvent(instance_name_,
409 std::move(m.value()),
414 std::move(mergingVal));
416 case entry_event::type::ADDED:
417 listener_.added_(std::move(entryEvent));
419 case entry_event::type::REMOVED:
420 listener_.removed_(std::move(entryEvent));
422 case entry_event::type::UPDATED:
423 listener_.updated_(std::move(entryEvent));
425 case entry_event::type::EVICTED:
426 listener_.evicted_(std::move(entryEvent));
432 boost::str(boost::format(
433 "Received unrecognized event with type: %1% "
434 "Dropping the event!!!") %
435 static_cast<int>(type)));
440 const std::string& instance_name_;
441 spi::impl::ClientClusterServiceImpl& cluster_service_;
442 serialization::pimpl::SerializationService& serialization_service_;
443 entry_listener listener_;
Map entry listener to get notified when a map entry is added, removed, updated, evicted,...