Hazelcast C++ Client
Hazelcast C++ Client Library
replicated_map.h
1 /*
2  * Copyright (c) 2008-2022, Hazelcast, Inc. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include <string>
19 #include <memory>
20 
21 #include "hazelcast/client/proxy/ReplicatedMapImpl.h"
22 #include "hazelcast/client/entry_listener.h"
23 #include "hazelcast/client/entry_event.h"
24 #include "hazelcast/client/map_event.h"
25 #include "hazelcast/client/query/predicates.h"
26 #include "hazelcast/logger.h"
27 
28 #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
29 #pragma warning(push)
30 #pragma warning(disable : 4251) // for dll export
31 #endif
32 
33 namespace hazelcast {
34 namespace client {
46 class HAZELCAST_API replicated_map : public proxy::ReplicatedMapImpl
47 {
48  friend class spi::ProxyManager;
49 
50 public:
51  static constexpr const char* SERVICE_NAME = "hz:impl:replicatedMapService";
66  template<typename K, typename V, typename R = V>
67  boost::future<boost::optional<R>> put(const K& key,
68  const V& value,
69  std::chrono::milliseconds ttl)
70  {
71  return to_object<R>(put_data(to_data(key), to_data(value), ttl));
72  }
73 
84  template<typename K, typename V>
85  boost::future<void> put_all(const std::unordered_map<K, V>& entries)
86  {
87  return put_all_data(to_data_entries(entries));
88  }
89 
96  boost::future<boost::uuids::uuid> add_entry_listener(
97  entry_listener&& listener)
98  {
99  return proxy::ReplicatedMapImpl::add_entry_listener(
100  std::shared_ptr<impl::BaseEventHandler>(
101  new EntryEventHandler<
102  protocol::codec::replicatedmap_addentrylistener_handler>(
103  get_name(),
104  get_context().get_client_cluster_service(),
105  get_context().get_serialization_service(),
106  std::move(listener),
107  get_context().get_logger())));
108  }
109 
122  template<typename K>
123  typename std::enable_if<!std::is_base_of<query::predicate, K>::value,
124  boost::future<boost::uuids::uuid>>::type
125  add_entry_listener(entry_listener&& listener, const K& key)
126  {
127  return proxy::ReplicatedMapImpl::add_entry_listener_to_key(
128  std::shared_ptr<impl::BaseEventHandler>(
129  new EntryEventHandler<
130  protocol::codec::replicatedmap_addentrylistenertokey_handler>(
131  get_name(),
132  get_context().get_client_cluster_service(),
133  get_context().get_serialization_service(),
134  std::move(listener),
135  get_context().get_logger())),
136  to_data(key));
137  }
138 
147  template<typename P>
148  typename std::enable_if<std::is_base_of<query::predicate, P>::value,
149  boost::future<boost::uuids::uuid>>::type
150  add_entry_listener(entry_listener&& listener, const P& predicate)
151  {
152  return proxy::ReplicatedMapImpl::add_entry_listener(
153  std::shared_ptr<impl::BaseEventHandler>(
154  new EntryEventHandler<
155  protocol::codec::
156  replicatedmap_addentrylistenerwithpredicate_handler>(
157  get_name(),
158  get_context().get_client_cluster_service(),
159  get_context().get_serialization_service(),
160  std::move(listener),
161  get_context().get_logger())),
162  to_data(predicate));
163  }
164 
174  template<typename K, typename P>
175  typename std::enable_if<std::is_base_of<query::predicate, P>::value,
176  boost::future<boost::uuids::uuid>>::type
178  const P& predicate,
179  const K& key)
180  {
181  return proxy::ReplicatedMapImpl::add_entry_listener(
182  std::shared_ptr<impl::BaseEventHandler>(
183  new EntryEventHandler<
184  protocol::codec::
185  replicatedmap_addentrylistenertokeywithpredicate_handler>(
186  get_name(),
187  get_context().get_client_cluster_service(),
188  get_context().get_serialization_service(),
189  std::move(listener),
190  get_context().get_logger())),
191  to_data(key),
192  to_data(predicate));
193  }
194 
209  template<typename V>
210  boost::future<std::vector<V>> values()
211  {
212  return to_object_vector<V>(values_data());
213  }
214 
226  template<typename K, typename V>
227  boost::future<std::vector<std::pair<K, V>>> entry_set()
228  {
229  return to_entry_object_vector<K, V>(entry_set_data());
230  }
231 
243  template<typename K>
244  boost::future<std::vector<K>> key_set()
245  {
246  return to_object_vector<K>(key_set_data());
247  }
248 
254  template<typename K>
255  boost::future<bool> contains_key(const K& key)
256  {
257  return contains_key_data(to_data(key));
258  }
259 
265  template<typename V>
266  boost::future<bool> contains_value(const V& value)
267  {
268  return contains_value_data(to_data(value));
269  }
270 
276  template<typename K, typename V>
277  boost::future<boost::optional<V>> get(const K& key)
278  {
279  return to_object<V>(get_data(to_data(key)));
280  }
281 
289  template<typename K, typename V, typename R = V>
290  boost::future<boost::optional<R>> put(const K& key, const V& value)
291  {
292  return put<K, V, R>(key, value, std::chrono::milliseconds(0));
293  }
294 
300  template<typename K, typename V>
301  boost::future<boost::optional<V>> remove(const K& key)
302  {
303  return to_object<V>(remove_data(to_data(key)));
304  }
305 
306 private:
307  replicated_map(const std::string& object_name, spi::ClientContext* context)
308  : proxy::ReplicatedMapImpl(SERVICE_NAME, object_name, context)
309  {}
310 
311  template<typename HANDLER>
312  class EntryEventHandler : public HANDLER
313  {
314  public:
315  EntryEventHandler(
316  const std::string& instance_name,
317  spi::impl::ClientClusterServiceImpl& cluster_service,
318  serialization::pimpl::SerializationService& serialization_service,
319  entry_listener&& listener,
320  logger& lg)
321  : instance_name_(instance_name)
322  , cluster_service_(cluster_service)
323  , serialization_service_(serialization_service)
324  , listener_(std::move(listener))
325  , logger_(lg)
326  {}
327 
328  void handle_entry(
329  const boost::optional<serialization::pimpl::data>& key,
330  const boost::optional<serialization::pimpl::data>& value,
331  const boost::optional<serialization::pimpl::data>& old_value,
332  const boost::optional<serialization::pimpl::data>& merging_value,
333  int32_t event_type,
334  boost::uuids::uuid uuid,
335  int32_t number_of_affected_entries) override
336  {
337  if (event_type ==
338  static_cast<int32_t>(entry_event::type::CLEAR_ALL)) {
339  fire_map_wide_event(key,
340  value,
341  old_value,
342  merging_value,
343  event_type,
344  uuid,
345  number_of_affected_entries);
346  return;
347  }
348 
349  fire_entry_event(key,
350  value,
351  old_value,
352  merging_value,
353  event_type,
354  uuid,
355  number_of_affected_entries);
356  }
357 
358  private:
359  void fire_map_wide_event(
360  const boost::optional<serialization::pimpl::data>& key,
361  const boost::optional<serialization::pimpl::data>& value,
362  const boost::optional<serialization::pimpl::data>& old_value,
363  const boost::optional<serialization::pimpl::data>& merging_value,
364  int32_t event_type,
365  boost::uuids::uuid uuid,
366  int32_t number_of_affected_entries)
367  {
368  auto member = cluster_service_.get_member(uuid);
369  auto mapEventType = static_cast<entry_event::type>(event_type);
370  map_event mapEvent(std::move(member).value(),
371  mapEventType,
372  instance_name_,
373  number_of_affected_entries);
374  listener_.map_cleared_(std::move(mapEvent));
375  }
376 
377  void fire_entry_event(
378  const boost::optional<serialization::pimpl::data>& key,
379  const boost::optional<serialization::pimpl::data>& value,
380  const boost::optional<serialization::pimpl::data>& old_value,
381  const boost::optional<serialization::pimpl::data>& merging_value,
382  int32_t event_type,
383  boost::uuids::uuid uuid,
384  int32_t number_of_affected_entries)
385  {
386  typed_data eventKey, val, oldVal, mergingVal;
387  if (value) {
388  val = typed_data(std::move(*value), serialization_service_);
389  }
390  if (old_value) {
391  oldVal =
392  typed_data(std::move(*old_value), serialization_service_);
393  }
394  if (merging_value) {
395  mergingVal =
396  typed_data(std::move(*merging_value), serialization_service_);
397  }
398  if (key) {
399  eventKey = typed_data(std::move(*key), serialization_service_);
400  }
401  auto m = cluster_service_.get_member(uuid);
402  if (!m.has_value()) {
403  m = member(uuid);
404  }
405  auto type = static_cast<entry_event::type>(event_type);
406  entry_event entryEvent(instance_name_,
407  std::move(m.value()),
408  type,
409  std::move(eventKey),
410  std::move(val),
411  std::move(oldVal),
412  std::move(mergingVal));
413  switch (type) {
414  case entry_event::type::ADDED:
415  listener_.added_(std::move(entryEvent));
416  break;
417  case entry_event::type::REMOVED:
418  listener_.removed_(std::move(entryEvent));
419  break;
420  case entry_event::type::UPDATED:
421  listener_.updated_(std::move(entryEvent));
422  break;
423  case entry_event::type::EVICTED:
424  listener_.evicted_(std::move(entryEvent));
425  break;
426  default:
427  HZ_LOG(
428  logger_,
429  warning,
430  boost::str(boost::format(
431  "Received unrecognized event with type: %1% "
432  "Dropping the event!!!") %
433  static_cast<int>(type)));
434  }
435  }
436 
437  private:
438  const std::string& instance_name_;
439  spi::impl::ClientClusterServiceImpl& cluster_service_;
440  serialization::pimpl::SerializationService& serialization_service_;
441  entry_listener listener_;
442  logger& logger_;
443  };
444 };
445 } // namespace client
446 } // namespace hazelcast
447 
448 #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
449 #pragma warning(pop)
450 #endif
Map entry listener to get notified when a map entry is added, removed, updated, evicted,...
std::enable_if< std::is_base_of< query::predicate, P >::value, boost::future< boost::uuids::uuid > >::type add_entry_listener(entry_listener &&listener, const P &predicate)
Adds an continuous entry listener for this map.
boost::future< boost::uuids::uuid > add_entry_listener(entry_listener &&listener)
Adds an entry listener for this map.
boost::future< void > put_all(const std::unordered_map< K, V > &entries)
Copies all of the mappings from the specified map to this map (optional operation).
boost::future< bool > contains_value(const V &value)
boost::future< std::vector< K > > key_set()
Returns a view of the keys contained in this map.
boost::future< std::vector< V > > values()
Due to the lazy nature of the returned array, changes to the map (addition, removal,...
boost::future< boost::optional< R > > put(const K &key, const V &value)
std::enable_if<!std::is_base_of< query::predicate, K >::value, boost::future< boost::uuids::uuid > >::type add_entry_listener(entry_listener &&listener, const K &key)
Adds the specified entry listener for the specified key.
boost::future< boost::optional< V > > get(const K &key)
boost::future< bool > contains_key(const K &key)
boost::future< boost::optional< V > > remove(const K &key)
boost::future< boost::optional< R > > put(const K &key, const V &value, std::chrono::milliseconds ttl)
std::enable_if< std::is_base_of< query::predicate, P >::value, boost::future< boost::uuids::uuid > >::type add_entry_listener(entry_listener &&listener, const P &predicate, const K &key)
Adds an continuous entry listener for this map.
boost::future< std::vector< std::pair< K, V > > > entry_set()
Returns a view of the mappings contained in this map.