Hazelcast C++ Client
Hazelcast C++ Client Library
proxy.cpp
1 /*
2  * Copyright (c) 2008-2022, Hazelcast, Inc. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <unordered_set>
18 #include <atomic>
19 
20 #include "hazelcast/client/impl/ClientLockReferenceIdGenerator.h"
21 #include "hazelcast/client/proxy/PNCounterImpl.h"
22 #include "hazelcast/client/spi/ClientContext.h"
23 #include "hazelcast/client/impl/hazelcast_client_instance_impl.h"
24 #include "hazelcast/client/proxy/flake_id_generator_impl.h"
25 #include "hazelcast/client/spi/impl/listener/listener_service_impl.h"
26 #include "hazelcast/client/topic/impl/TopicEventHandlerImpl.h"
27 #include "hazelcast/client/client_config.h"
28 #include "hazelcast/client/map/data_entry_view.h"
29 #include "hazelcast/client/proxy/RingbufferImpl.h"
30 #include "hazelcast/client/impl/vector_clock.h"
31 #include "hazelcast/client/internal/partition/strategy/StringPartitioningStrategy.h"
32 #include "hazelcast/util/Util.h"
33 #include "hazelcast/client/topic/reliable_listener.h"
34 
35 namespace hazelcast {
36  namespace client {
37  const std::chrono::milliseconds imap::UNSET{ -1 };
38 
39  reliable_topic::reliable_topic(const std::string &instance_name, spi::ClientContext *context)
40  : proxy::ProxyImpl(reliable_topic::SERVICE_NAME, instance_name, context),
41  executor_(context->get_client_execution_service().get_user_executor()), logger_(context->get_logger()) {
42  auto reliable_config = context->get_client_config().lookup_reliable_topic_config(instance_name);
43  if (reliable_config) {
44  batch_size_ = reliable_config->get_read_batch_size();
45  } else {
46  batch_size_ = config::reliable_topic_config::DEFAULT_READ_BATCH_SIZE;
47  }
48 
49  ringbuffer_ = context->get_hazelcast_client_implementation()->get_distributed_object<ringbuffer>(
50  std::string(reliable_topic::TOPIC_RB_PREFIX) + instance_name);
51  }
52 
53  bool reliable_topic::remove_message_listener(const std::string &registration_id) {
54  int id = util::IOUtil::to_value<int>(registration_id);
55  auto runner = runners_map_.get(id);
56  if (!runner) {
57  return false;
58  }
59  runner->cancel();
60  return true;
61  }
62 
63  void reliable_topic::on_shutdown() {
64  // cancel all runners
65  for (auto &entry : runners_map_.clear()) {
66  entry.second->cancel();
67  }
68  }
69 
70  void reliable_topic::on_destroy() {
71  // cancel all runners
72  for (auto &entry : runners_map_.clear()) {
73  entry.second->cancel();
74  }
75  }
76 
77  void reliable_topic::post_destroy() {
78  // destroy the underlying ringbuffer
79  ringbuffer_.get()->destroy().get();
80  }
81 
82  namespace topic {
83  reliable_listener::reliable_listener(bool loss_tolerant, int64_t initial_sequence_id)
84  : loss_tolerant_(loss_tolerant)
85  , initial_sequence_id_(initial_sequence_id) {}
86  }
87 
88  namespace impl {
89  ClientLockReferenceIdGenerator::ClientLockReferenceIdGenerator() : reference_id_counter_(0) {}
90 
91  int64_t ClientLockReferenceIdGenerator::get_next_reference_id() {
92  return ++reference_id_counter_;
93  }
94  }
95 
96  namespace proxy {
97  MultiMapImpl::MultiMapImpl(const std::string &instance_name, spi::ClientContext *context)
98  : ProxyImpl(multi_map::SERVICE_NAME, instance_name, context) {
99  // TODO: remove this line once the client instance get_distributed_object works as expected in Java for this proxy type
100  lock_reference_id_generator_ = get_context().get_lock_reference_id_generator();
101  }
102 
103  boost::future<bool> MultiMapImpl::put(const serialization::pimpl::data &key, const serialization::pimpl::data &value) {
104  auto request = protocol::codec::multimap_put_encode(get_name(), key, value,
105  util::get_current_thread_id());
106  return invoke_and_get_future<bool>(request, key);
107  }
108 
109  boost::future<std::vector<serialization::pimpl::data>> MultiMapImpl::get_data(const serialization::pimpl::data &key) {
110  auto request = protocol::codec::multimap_get_encode(get_name(), key,
111  util::get_current_thread_id());
112  return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request, key);
113  }
114 
115  boost::future<bool> MultiMapImpl::remove(const serialization::pimpl::data &key, const serialization::pimpl::data &value) {
116  auto request = protocol::codec::multimap_removeentry_encode(get_name(), key, value,
117  util::get_current_thread_id());
118  return invoke_and_get_future<bool>(request, key);
119  }
120 
121  boost::future<std::vector<serialization::pimpl::data>> MultiMapImpl::remove_data(const serialization::pimpl::data &key) {
122  auto request = protocol::codec::multimap_remove_encode(get_name(), key,
123  util::get_current_thread_id());
124  return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request, key);
125  }
126 
127  boost::future<std::vector<serialization::pimpl::data>> MultiMapImpl::key_set_data() {
128  auto request = protocol::codec::multimap_keyset_encode(get_name());
129  return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request);
130  }
131 
132  boost::future<std::vector<serialization::pimpl::data>> MultiMapImpl::values_data() {
133  auto request = protocol::codec::multimap_values_encode(get_name());
134  return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request);
135  }
136 
137  boost::future<EntryVector> MultiMapImpl::entry_set_data() {
138  auto request = protocol::codec::multimap_entryset_encode(get_name());
139  return invoke_and_get_future<EntryVector>(request);
140  }
141 
142  boost::future<bool> MultiMapImpl::contains_key(const serialization::pimpl::data &key) {
143  auto request = protocol::codec::multimap_containskey_encode(get_name(), key,
144  util::get_current_thread_id());
145  return invoke_and_get_future<bool>(request, key);
146  }
147 
148  boost::future<bool> MultiMapImpl::contains_value(const serialization::pimpl::data &value) {
149  auto request = protocol::codec::multimap_containsvalue_encode(get_name(), value);
150  return invoke_and_get_future<bool>(request);
151  }
152 
153  boost::future<bool> MultiMapImpl::contains_entry(const serialization::pimpl::data &key,
154  const serialization::pimpl::data &value) {
155  auto request = protocol::codec::multimap_containsentry_encode(get_name(), key, value,
156  util::get_current_thread_id());
157  return invoke_and_get_future<bool>(request, key);
158  }
159 
160  boost::future<int> MultiMapImpl::size() {
161  auto request = protocol::codec::multimap_size_encode(get_name());
162  return invoke_and_get_future<int>(request);
163  }
164 
165  boost::future<void> MultiMapImpl::clear() {
166  auto request = protocol::codec::multimap_clear_encode(get_name());
167  return to_void_future(invoke(request));
168  }
169 
170  boost::future<int> MultiMapImpl::value_count(const serialization::pimpl::data &key) {
171  auto request = protocol::codec::multimap_valuecount_encode(get_name(), key,
172  util::get_current_thread_id());
173  return invoke_and_get_future<int>(request, key);
174  }
175 
176  boost::future<boost::uuids::uuid>
177  MultiMapImpl::add_entry_listener(std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
178  bool include_value) {
179  return register_listener(create_multi_map_entry_listener_codec(include_value), std::move(entry_event_handler));
180  }
181 
182  boost::future<boost::uuids::uuid>
183  MultiMapImpl::add_entry_listener(std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
184  bool include_value, serialization::pimpl::data &&key) {
185  return register_listener(create_multi_map_entry_listener_codec(include_value, std::move(key)),
186  std::move(entry_event_handler));
187  }
188 
189  boost::future<bool> MultiMapImpl::remove_entry_listener(boost::uuids::uuid registration_id) {
190  return get_context().get_client_listener_service().deregister_listener(registration_id);
191  }
192 
193  boost::future<void> MultiMapImpl::lock(const serialization::pimpl::data &key) {
194  return lock(key, std::chrono::milliseconds(-1));
195  }
196 
197  boost::future<void> MultiMapImpl::lock(const serialization::pimpl::data &key, std::chrono::milliseconds lease_time) {
198  auto request = protocol::codec::multimap_lock_encode(get_name(), key, util::get_current_thread_id(),
199  std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
200  lock_reference_id_generator_->get_next_reference_id());
201  return to_void_future(invoke_on_partition(request, get_partition_id(key)));
202  }
203 
204  boost::future<bool> MultiMapImpl::is_locked(const serialization::pimpl::data &key) {
205  auto request = protocol::codec::multimap_islocked_encode(get_name(), key);
206  return invoke_and_get_future<bool>(request, key);
207  }
208 
209  boost::future<bool> MultiMapImpl::try_lock(const serialization::pimpl::data &key) {
210  auto request = protocol::codec::multimap_trylock_encode(get_name(), key,
211  util::get_current_thread_id(), INT64_MAX,
212  0,
213  lock_reference_id_generator_->get_next_reference_id());
214  return invoke_and_get_future<bool>(request, key);
215  }
216 
217  boost::future<bool> MultiMapImpl::try_lock(const serialization::pimpl::data &key, std::chrono::milliseconds timeout) {
218  return try_lock(key, timeout, std::chrono::milliseconds(INT64_MAX));
219  }
220 
221  boost::future<bool> MultiMapImpl::try_lock(const serialization::pimpl::data &key, std::chrono::milliseconds timeout, std::chrono::milliseconds lease_time) {
222  auto request = protocol::codec::multimap_trylock_encode(get_name(), key, util::get_current_thread_id(),
223  std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
224  std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count(),
225  lock_reference_id_generator_->get_next_reference_id());
226  return invoke_and_get_future<bool>(request, key);
227  }
228 
229  boost::future<void> MultiMapImpl::unlock(const serialization::pimpl::data &key) {
230  auto request = protocol::codec::multimap_unlock_encode(get_name(), key, util::get_current_thread_id(),
231  lock_reference_id_generator_->get_next_reference_id());
232  return to_void_future(invoke_on_partition(request, get_partition_id(key)));
233  }
234 
235  boost::future<void> MultiMapImpl::force_unlock(const serialization::pimpl::data &key) {
236  auto request = protocol::codec::multimap_forceunlock_encode(get_name(), key,
237  lock_reference_id_generator_->get_next_reference_id());
238  return to_void_future(invoke_on_partition(request, get_partition_id(key)));
239  }
240 
241  std::shared_ptr<spi::impl::ListenerMessageCodec>
242  MultiMapImpl::create_multi_map_entry_listener_codec(bool include_value) {
243  return std::shared_ptr<spi::impl::ListenerMessageCodec>(
244  new MultiMapEntryListenerMessageCodec(get_name(), include_value));
245  }
246 
247  std::shared_ptr<spi::impl::ListenerMessageCodec>
248  MultiMapImpl::create_multi_map_entry_listener_codec(bool include_value, serialization::pimpl::data &&key) {
249  return std::shared_ptr<spi::impl::ListenerMessageCodec>(
250  new MultiMapEntryListenerToKeyCodec(get_name(), include_value, std::move(key)));
251  }
252 
253  void MultiMapImpl::on_initialize() {
254  ProxyImpl::on_initialize();
255  lock_reference_id_generator_ = get_context().get_lock_reference_id_generator();
256  }
257 
258  MultiMapImpl::MultiMapEntryListenerMessageCodec::MultiMapEntryListenerMessageCodec(std::string name,
259  bool include_value)
260  : name_(std::move(name)), include_value_(include_value) {}
261 
262  protocol::ClientMessage
263  MultiMapImpl::MultiMapEntryListenerMessageCodec::encode_add_request(bool local_only) const {
264  return protocol::codec::multimap_addentrylistener_encode(name_, include_value_, local_only);
265  }
266 
267  protocol::ClientMessage
268  MultiMapImpl::MultiMapEntryListenerMessageCodec::encode_remove_request(
269  boost::uuids::uuid real_registration_id) const {
270  return protocol::codec::multimap_removeentrylistener_encode(name_, real_registration_id);
271  }
272 
273  protocol::ClientMessage
274  MultiMapImpl::MultiMapEntryListenerToKeyCodec::encode_add_request(bool local_only) const {
275  return protocol::codec::multimap_addentrylistenertokey_encode(name_, key_, include_value_,
276  local_only);
277  }
278 
279  protocol::ClientMessage
280  MultiMapImpl::MultiMapEntryListenerToKeyCodec::encode_remove_request(
281  boost::uuids::uuid real_registration_id) const {
282  return protocol::codec::multimap_removeentrylistener_encode(name_, real_registration_id);
283  }
284 
285  MultiMapImpl::MultiMapEntryListenerToKeyCodec::MultiMapEntryListenerToKeyCodec(std::string name,
286  bool include_value,
287  serialization::pimpl::data &&key)
288  : name_(std::move(name)), include_value_(include_value), key_(std::move(key)) {}
289 
290  const std::shared_ptr<std::unordered_set<member> > PNCounterImpl::EMPTY_ADDRESS_LIST(
291  new std::unordered_set<member>());
292 
293  PNCounterImpl::PNCounterImpl(const std::string &service_name, const std::string &object_name,
294  spi::ClientContext *context)
295  : ProxyImpl(service_name, object_name, context), max_configured_replica_count_(0),
296  observed_clock_(std::shared_ptr<impl::vector_clock>(new impl::vector_clock())),
297  logger_(context->get_logger()) {
298  }
299 
300  std::ostream &operator<<(std::ostream &os, const PNCounterImpl &proxy) {
301  os << "PNCounter{name='" << proxy.get_name() << "\'}";
302  return os;
303  }
304 
305  boost::future<int64_t> PNCounterImpl::get() {
306  boost::shared_ptr<member> target = get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
307  if (!target) {
308  BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster("ClientPNCounterProxy::get",
309  "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
310  }
311  return invoke_get_internal(EMPTY_ADDRESS_LIST, nullptr, target);
312  }
313 
314  boost::future<int64_t> PNCounterImpl::get_and_add(int64_t delta) {
315  boost::shared_ptr<member> target = get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
316  if (!target) {
317  BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster("ClientPNCounterProxy::getAndAdd",
318  "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
319  }
320  return invoke_add_internal(delta, true, EMPTY_ADDRESS_LIST,nullptr, target);
321  }
322 
323  boost::future<int64_t> PNCounterImpl::add_and_get(int64_t delta) {
324  boost::shared_ptr<member> target = get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
325  if (!target) {
326  BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster("ClientPNCounterProxy::addAndGet",
327  "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
328  }
329  return invoke_add_internal(delta, false, EMPTY_ADDRESS_LIST,nullptr, target);
330  }
331 
332  boost::future<int64_t> PNCounterImpl::get_and_subtract(int64_t delta) {
333  boost::shared_ptr<member> target = get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
334  if (!target) {
335  BOOST_THROW_EXCEPTION(
336  exception::no_data_member_in_cluster("ClientPNCounterProxy::getAndSubtract",
337  "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
338  }
339  return invoke_add_internal(-delta, true, EMPTY_ADDRESS_LIST,nullptr, target);
340 
341  }
342 
343  boost::future<int64_t> PNCounterImpl::subtract_and_get(int64_t delta) {
344  boost::shared_ptr<member> target = get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
345  if (!target) {
346  BOOST_THROW_EXCEPTION(
347  exception::no_data_member_in_cluster("ClientPNCounterProxy::subtractAndGet",
348  "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
349  }
350  return invoke_add_internal(-delta, false, EMPTY_ADDRESS_LIST,nullptr, target);
351  }
352 
353  boost::future<int64_t> PNCounterImpl::decrement_and_get() {
354  boost::shared_ptr<member> target = get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
355  if (!target) {
356  BOOST_THROW_EXCEPTION(
357  exception::no_data_member_in_cluster("ClientPNCounterProxy::decrementAndGet",
358  "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
359  }
360  return invoke_add_internal(-1, false, EMPTY_ADDRESS_LIST,nullptr, target);
361  }
362 
363  boost::future<int64_t> PNCounterImpl::increment_and_get() {
364  boost::shared_ptr<member> target = get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
365  if (!target) {
366  BOOST_THROW_EXCEPTION(
367  exception::no_data_member_in_cluster("ClientPNCounterProxy::incrementAndGet",
368  "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
369  }
370  return invoke_add_internal(1, false, EMPTY_ADDRESS_LIST,nullptr, target);
371  }
372 
373  boost::future<int64_t> PNCounterImpl::get_and_decrement() {
374  boost::shared_ptr<member> target = get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
375  if (!target) {
376  BOOST_THROW_EXCEPTION(
377  exception::no_data_member_in_cluster("ClientPNCounterProxy::getAndDecrement",
378  "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
379  }
380  return invoke_add_internal(-1, true, EMPTY_ADDRESS_LIST,nullptr, target);
381  }
382 
383  boost::future<int64_t> PNCounterImpl::get_and_increment() {
384  boost::shared_ptr<member> target = get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
385  if (!target) {
386  BOOST_THROW_EXCEPTION(
387  exception::no_data_member_in_cluster("ClientPNCounterProxy::getAndIncrement",
388  "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
389  }
390  return invoke_add_internal(1, true, EMPTY_ADDRESS_LIST,nullptr, target);
391  }
392 
393  boost::future<void> PNCounterImpl::reset() {
394  observed_clock_ = std::shared_ptr<impl::vector_clock>(new impl::vector_clock());
395  return boost::make_ready_future();
396  }
397 
398  boost::shared_ptr<member>
399  PNCounterImpl::get_crdt_operation_target(const std::unordered_set<member> &excluded_addresses) {
400  auto replicaAddress = current_target_replica_address_.load();
401  if (replicaAddress && excluded_addresses.find(*replicaAddress) == excluded_addresses.end()) {
402  return replicaAddress;
403  }
404 
405  {
406  std::lock_guard<std::mutex> guard(target_selection_mutex_);
407  replicaAddress = current_target_replica_address_.load();
408  if (!replicaAddress ||
409  excluded_addresses.find(*replicaAddress) != excluded_addresses.end()) {
410  current_target_replica_address_ = choose_target_replica(excluded_addresses);
411  }
412  }
413  return current_target_replica_address_;
414  }
415 
416  boost::shared_ptr<member>
417  PNCounterImpl::choose_target_replica(const std::unordered_set<member> &excluded_addresses) {
418  std::vector<member> replicaAddresses = get_replica_addresses(excluded_addresses);
419  if (replicaAddresses.empty()) {
420  return nullptr;
421  }
422  // TODO: Use a random generator as used in Java (ThreadLocalRandomProvider) which is per thread
423  int randomReplicaIndex = std::abs(rand()) % (int) replicaAddresses.size();
424  return boost::make_shared<member>(replicaAddresses[randomReplicaIndex]);
425  }
426 
427  std::vector<member> PNCounterImpl::get_replica_addresses(const std::unordered_set<member> &excluded_members) {
428  std::vector<member> dataMembers = get_context().get_client_cluster_service().get_members(
429  *member_selectors::DATA_MEMBER_SELECTOR);
430  int32_t replicaCount = get_max_configured_replica_count();
431  int currentReplicaCount = util::min<int>(replicaCount, (int) dataMembers.size());
432 
433  std::vector<member> replicaMembers;
434  for (int i = 0; i < currentReplicaCount; i++) {
435  if (excluded_members.find(dataMembers[i]) == excluded_members.end()) {
436  replicaMembers.push_back(dataMembers[i]);
437  }
438  }
439  return replicaMembers;
440  }
441 
442  int32_t PNCounterImpl::get_max_configured_replica_count() {
443  if (max_configured_replica_count_ > 0) {
444  return max_configured_replica_count_;
445  } else {
446  auto request = protocol::codec::pncounter_getconfiguredreplicacount_encode(
447  get_name());
448  max_configured_replica_count_ = invoke_and_get_future<int32_t>(request).get();
449  }
450  return max_configured_replica_count_;
451  }
452 
453  boost::shared_ptr<member>
454  PNCounterImpl::try_choose_a_new_target(std::shared_ptr<std::unordered_set<member>> excluded_addresses,
455  boost::shared_ptr<member> last_target,
456  const exception::hazelcast_ &last_exception) {
457  HZ_LOG(logger_, finest,
458  boost::str(boost::format("Exception occurred while invoking operation on target %1%, "
459  "choosing different target. Cause: %2%")
460  % last_target % last_exception)
461  );
462  if (excluded_addresses == EMPTY_ADDRESS_LIST) {
463  // TODO: Make sure that this only affects the local variable of the method
464  excluded_addresses = std::make_shared<std::unordered_set<member>>();
465  }
466  excluded_addresses->insert(*last_target);
467  return get_crdt_operation_target(*excluded_addresses);
468  }
469 
470  boost::future<int64_t>
471  PNCounterImpl::invoke_get_internal(std::shared_ptr<std::unordered_set<member>> excluded_addresses,
472  std::exception_ptr last_exception,
473  const boost::shared_ptr<member> &target) {
474  if (!target) {
475  if (last_exception) {
476  std::rethrow_exception(last_exception);
477  } else {
478  BOOST_THROW_EXCEPTION(
479  exception::no_data_member_in_cluster("ClientPNCounterProxy::invokeGetInternal",
480  "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
481  }
482  }
483  try {
484  auto timestamps = observed_clock_.get()->entry_set();
485  auto request = protocol::codec::pncounter_get_encode(get_name(), timestamps, target->get_uuid());
486  return invoke_on_member(request, target->get_uuid()).then(boost::launch::sync,
487  [=](boost::future<protocol::ClientMessage> f) {
488  try {
489  return get_and_update_timestamps(
490  std::move(f));
491  } catch (exception::hazelcast_ &e) {
492  return invoke_get_internal(
493  excluded_addresses,
494  std::current_exception(),
495  try_choose_a_new_target(
496  excluded_addresses,
497  target, e)).get();
498  }
499  });
500  } catch (exception::hazelcast_ &e) {
501  return invoke_get_internal(excluded_addresses, std::current_exception(),
502  try_choose_a_new_target(excluded_addresses, target, e));
503  }
504  }
505 
506  boost::future<int64_t>
507  PNCounterImpl::invoke_add_internal(int64_t delta, bool getBeforeUpdate,
508  std::shared_ptr<std::unordered_set<member> > excluded_addresses,
509  std::exception_ptr last_exception,
510  const boost::shared_ptr<member> &target) {
511  if (!target) {
512  if (last_exception) {
513  std::rethrow_exception(last_exception);
514  } else {
515  BOOST_THROW_EXCEPTION(
516  exception::no_data_member_in_cluster("ClientPNCounterProxy::invokeGetInternal",
517  "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
518  }
519  }
520 
521  try {
522  auto request = protocol::codec::pncounter_add_encode(
523  get_name(), delta, getBeforeUpdate, observed_clock_.get()->entry_set(), target->get_uuid());
524  return invoke_on_member(request, target->get_uuid()).then(boost::launch::sync,
525  [=](boost::future<protocol::ClientMessage> f) {
526  try {
527  return get_and_update_timestamps(
528  std::move(f));
529  } catch (exception::hazelcast_ &e) {
530  return invoke_add_internal(delta,
531  getBeforeUpdate,
532  excluded_addresses,
533  std::current_exception(),
534  try_choose_a_new_target(
535  excluded_addresses,
536  target,
537  e)).get();
538  }
539  });
540  } catch (exception::hazelcast_ &e) {
541  return invoke_add_internal(delta, getBeforeUpdate, excluded_addresses, std::current_exception(),
542  try_choose_a_new_target(excluded_addresses, target, e));
543  }
544  }
545 
546  int64_t PNCounterImpl::get_and_update_timestamps(boost::future<protocol::ClientMessage> f) {
547  auto msg = f.get();
548  auto value = msg.get_first_fixed_sized_field<int64_t>();
549  // skip replica count
550  msg.get<int32_t>();
551  update_observed_replica_timestamps(msg.get<impl::vector_clock::timestamp_vector>());
552  return value;
553  }
554 
555  void PNCounterImpl::update_observed_replica_timestamps(
556  const impl::vector_clock::timestamp_vector &received_logical_timestamps) {
557  std::shared_ptr<impl::vector_clock> received = to_vector_clock(received_logical_timestamps);
558  for (;;) {
559  std::shared_ptr<impl::vector_clock> currentClock = this->observed_clock_;
560  if (currentClock->is_after(*received)) {
561  break;
562  }
563  if (observed_clock_.compare_and_set(currentClock, received)) {
564  break;
565  }
566  }
567  }
568 
569  std::shared_ptr<impl::vector_clock> PNCounterImpl::to_vector_clock(
570  const impl::vector_clock::timestamp_vector &replica_logical_timestamps) {
571  return std::shared_ptr<impl::vector_clock>(
572  new impl::vector_clock(replica_logical_timestamps));
573  }
574 
575  boost::shared_ptr<member> PNCounterImpl::get_current_target_replica_address() {
576  return current_target_replica_address_.load();
577  }
578 
579  IListImpl::IListImpl(const std::string &instance_name, spi::ClientContext *context)
580  : ProxyImpl("hz:impl:listService", instance_name, context) {
581  serialization::pimpl::data key_data = get_context().get_serialization_service().to_data<std::string>(
582  &instance_name);
583  partition_id_ = get_partition_id(key_data);
584  }
585 
586  boost::future<bool> IListImpl::remove_item_listener(boost::uuids::uuid registration_id) {
587  return get_context().get_client_listener_service().deregister_listener(registration_id);
588  }
589 
590  boost::future<int> IListImpl::size() {
591  auto request = protocol::codec::list_size_encode(get_name());
592  return invoke_and_get_future<int>(request, partition_id_);
593  }
594 
595  boost::future<bool> IListImpl::is_empty() {
596  auto request = protocol::codec::list_isempty_encode(get_name());
597  return invoke_and_get_future<bool>(request, partition_id_);
598  }
599 
600  boost::future<bool> IListImpl::contains(const serialization::pimpl::data &element) {
601  auto request = protocol::codec::list_contains_encode(get_name(), element);
602  return invoke_and_get_future<bool>(request, partition_id_);
603  }
604 
605  boost::future<std::vector<serialization::pimpl::data>> IListImpl::to_array_data() {
606  auto request = protocol::codec::list_getall_encode(get_name());
607  return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request, partition_id_);
608  }
609 
610  boost::future<bool> IListImpl::add(const serialization::pimpl::data &element) {
611  auto request = protocol::codec::list_add_encode(get_name(), element);
612  return invoke_and_get_future<bool>(request, partition_id_);
613  }
614 
615  boost::future<bool> IListImpl::remove(const serialization::pimpl::data &element) {
616  auto request = protocol::codec::list_remove_encode(get_name(), element);
617  return invoke_and_get_future<bool>(request, partition_id_);
618  }
619 
620  boost::future<bool> IListImpl::contains_all_data(const std::vector<serialization::pimpl::data> &elements) {
621  auto request = protocol::codec::list_containsall_encode(get_name(), elements);
622  return invoke_and_get_future<bool>(request, partition_id_);
623  }
624 
625  boost::future<bool> IListImpl::add_all_data(const std::vector<serialization::pimpl::data> &elements) {
626  auto request = protocol::codec::list_addall_encode(get_name(), elements);
627  return invoke_and_get_future<bool>(request, partition_id_);
628  }
629 
630  boost::future<bool> IListImpl::add_all_data(int index, const std::vector<serialization::pimpl::data> &elements) {
631  auto request = protocol::codec::list_addallwithindex_encode(get_name(), index,
632  elements);
633  return invoke_and_get_future<bool>(request, partition_id_);
634  }
635 
636  boost::future<bool> IListImpl::remove_all_data(const std::vector<serialization::pimpl::data> &elements) {
637  auto request = protocol::codec::list_compareandremoveall_encode(get_name(), elements);
638  return invoke_and_get_future<bool>(request, partition_id_);
639  }
640 
641  boost::future<bool> IListImpl::retain_all_data(const std::vector<serialization::pimpl::data> &elements) {
642  auto request = protocol::codec::list_compareandretainall_encode(get_name(), elements);
643  return invoke_and_get_future<bool>(request, partition_id_);
644  }
645 
646  boost::future<void> IListImpl::clear() {
647  auto request = protocol::codec::list_clear_encode(get_name());
648  return to_void_future(invoke_on_partition(request, partition_id_));
649  }
650 
651  boost::future<boost::optional<serialization::pimpl::data>> IListImpl::get_data(int index) {
652  auto request = protocol::codec::list_get_encode(get_name(), index);
653  return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, partition_id_);
654  }
655 
656  boost::future<boost::optional<serialization::pimpl::data>> IListImpl::set_data(int index,
657  const serialization::pimpl::data &element) {
658  auto request = protocol::codec::list_set_encode(get_name(), index, element);
659  return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, partition_id_);
660  }
661 
662  boost::future<void> IListImpl::add(int index, const serialization::pimpl::data &element) {
663  auto request = protocol::codec::list_addwithindex_encode(get_name(), index, element);
664  return to_void_future(invoke_on_partition(request, partition_id_));
665  }
666 
667  boost::future<boost::optional<serialization::pimpl::data>> IListImpl::remove_data(int index) {
668  auto request = protocol::codec::list_removewithindex_encode(get_name(), index);
669  return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, partition_id_);
670  }
671 
672  boost::future<int> IListImpl::index_of(const serialization::pimpl::data &element) {
673  auto request = protocol::codec::list_indexof_encode(get_name(), element);
674  return invoke_and_get_future<int>(request, partition_id_);
675  }
676 
677  boost::future<int> IListImpl::last_index_of(const serialization::pimpl::data &element) {
678  auto request = protocol::codec::list_lastindexof_encode(get_name(), element);
679  return invoke_and_get_future<int>(request, partition_id_);
680  }
681 
682  boost::future<std::vector<serialization::pimpl::data>> IListImpl::sub_list_data(int from_index, int to_index) {
683  auto request = protocol::codec::list_sub_encode(get_name(), from_index, to_index);
684  return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request, partition_id_);
685  }
686 
687  std::shared_ptr<spi::impl::ListenerMessageCodec> IListImpl::create_item_listener_codec(bool include_value) {
688  return std::shared_ptr<spi::impl::ListenerMessageCodec>(
689  new ListListenerMessageCodec(get_name(), include_value));
690  }
691 
692  IListImpl::ListListenerMessageCodec::ListListenerMessageCodec(std::string name,
693  bool include_value) : name_(std::move(name)),
694  include_value_(
695  include_value) {}
696 
697  protocol::ClientMessage
698  IListImpl::ListListenerMessageCodec::encode_add_request(bool local_only) const {
699  return protocol::codec::list_addlistener_encode(name_, include_value_, local_only);
700  }
701 
702  protocol::ClientMessage
703  IListImpl::ListListenerMessageCodec::encode_remove_request(boost::uuids::uuid real_registration_id) const {
704  return protocol::codec::list_removelistener_encode(name_, real_registration_id);
705  }
706 
707  flake_id_generator_impl::Block::Block(IdBatch &&id_batch, std::chrono::milliseconds validity)
708  : id_batch_(id_batch), invalid_since_(std::chrono::steady_clock::now() + validity), num_returned_(0) {}
709 
710  int64_t flake_id_generator_impl::Block::next() {
711  if (invalid_since_ <= std::chrono::steady_clock::now()) {
712  return INT64_MIN;
713  }
714  int32_t index;
715  do {
716  index = num_returned_;
717  if (index == id_batch_.get_batch_size()) {
718  return INT64_MIN;
719  }
720  } while (!num_returned_.compare_exchange_strong(index, index + 1));
721 
722  return id_batch_.get_base() + index * id_batch_.get_increment();
723  }
724 
725  flake_id_generator_impl::IdBatch::IdIterator flake_id_generator_impl::IdBatch::endOfBatch;
726 
727  const int64_t flake_id_generator_impl::IdBatch::get_base() const {
728  return base_;
729  }
730 
731  const int64_t flake_id_generator_impl::IdBatch::get_increment() const {
732  return increment_;
733  }
734 
735  const int32_t flake_id_generator_impl::IdBatch::get_batch_size() const {
736  return batch_size_;
737  }
738 
739  flake_id_generator_impl::IdBatch::IdBatch(int64_t base, int64_t increment, int32_t batch_size)
740  : base_(base), increment_(increment), batch_size_(batch_size) {}
741 
742  flake_id_generator_impl::IdBatch::IdIterator &flake_id_generator_impl::IdBatch::end() {
743  return endOfBatch;
744  }
745 
746  flake_id_generator_impl::IdBatch::IdIterator flake_id_generator_impl::IdBatch::iterator() {
747  return flake_id_generator_impl::IdBatch::IdIterator(base_, increment_, batch_size_);
748  }
749 
750  flake_id_generator_impl::IdBatch::IdIterator::IdIterator(int64_t base2, const int64_t increment, int32_t remaining) : base2_(
751  base2), increment_(increment), remaining_(remaining) {}
752 
753  bool flake_id_generator_impl::IdBatch::IdIterator::operator==(const flake_id_generator_impl::IdBatch::IdIterator &rhs) const {
754  return base2_ == rhs.base2_ && increment_ == rhs.increment_ && remaining_ == rhs.remaining_;
755  }
756 
757  bool flake_id_generator_impl::IdBatch::IdIterator::operator!=(const flake_id_generator_impl::IdBatch::IdIterator &rhs) const {
758  return !(rhs == *this);
759  }
760 
761  flake_id_generator_impl::IdBatch::IdIterator::IdIterator() : base2_(-1), increment_(-1), remaining_(-1) {
762  }
763 
764  flake_id_generator_impl::IdBatch::IdIterator &flake_id_generator_impl::IdBatch::IdIterator::operator++() {
765  if (remaining_ == 0) {
766  return flake_id_generator_impl::IdBatch::end();
767  }
768 
769  --remaining_;
770 
771  base2_ += increment_;
772 
773  return *this;
774  }
775 
776 
777  flake_id_generator_impl::flake_id_generator_impl(const std::string &service_name, const std::string &object_name,
778  spi::ClientContext *context)
779  : ProxyImpl(service_name, object_name, context), block_(nullptr) {
780  auto config = context->get_client_config().find_flake_id_generator_config(object_name);
781  batch_size_ = config->get_prefetch_count();
782  validity_ = config->get_prefetch_validity_duration();
783  }
784 
785  int64_t flake_id_generator_impl::new_id_internal() {
786  auto b = block_.load();
787  if (b) {
788  int64_t res = b->next();
789  if (res != INT64_MIN) {
790  return res;
791  }
792  }
793 
794  throw std::overflow_error("");
795  }
796 
797  boost::future<int64_t> flake_id_generator_impl::new_id() {
798  try {
799  return boost::make_ready_future(new_id_internal());
800  } catch (std::overflow_error &) {
801  return new_id_batch(batch_size_).then(boost::launch::sync,
802  [=](boost::future<flake_id_generator_impl::IdBatch> f) {
803  auto newBlock = boost::make_shared<Block>(f.get(),
804  validity_);
805  auto value = newBlock->next();
806  auto b = block_.load();
807  block_.compare_exchange_strong(b, newBlock);
808  return value;
809  });
810  }
811  }
812 
813  boost::future<flake_id_generator_impl::IdBatch> flake_id_generator_impl::new_id_batch(int32_t size) {
814  auto request = protocol::codec::flakeidgenerator_newidbatch_encode(
815  get_name(), size);
816  return invoke(request).then(boost::launch::sync, [](boost::future<protocol::ClientMessage> f) {
817  auto msg = f.get();
818  msg.rd_ptr(protocol::ClientMessage::RESPONSE_HEADER_LEN);
819 
820  auto base = msg.get<int64_t>();
821  auto increment = msg.get<int64_t>();
822  auto batch_size = msg.get<int32_t>();
823  return flake_id_generator_impl::IdBatch(base, increment, batch_size);
824  });
825  }
826 
827  IQueueImpl::IQueueImpl(const std::string &instance_name, spi::ClientContext *context)
828  : ProxyImpl("hz:impl:queueService", instance_name, context) {
829  serialization::pimpl::data data = get_context().get_serialization_service().to_data<std::string>(
830  &instance_name);
831  partition_id_ = get_partition_id(data);
832  }
833 
834  boost::future<bool> IQueueImpl::remove_item_listener(
835  boost::uuids::uuid registration_id) {
836  return get_context().get_client_listener_service().deregister_listener(registration_id);
837  }
838 
839  boost::future<bool> IQueueImpl::offer(const serialization::pimpl::data &element, std::chrono::milliseconds timeout) {
840  auto request = protocol::codec::queue_offer_encode(get_name(), element,
841  std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
842  return invoke_and_get_future<bool>(request, partition_id_);
843  }
844 
845  boost::future<void> IQueueImpl::put(const serialization::pimpl::data &element) {
846  auto request = protocol::codec::queue_put_encode(get_name(), element);
847  return to_void_future(invoke_on_partition(request, partition_id_));
848  }
849 
850  boost::future<boost::optional<serialization::pimpl::data>> IQueueImpl::poll_data(std::chrono::milliseconds timeout) {
851  auto request = protocol::codec::queue_poll_encode(get_name(), std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
852  return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, partition_id_);
853  }
854 
855  boost::future<int> IQueueImpl::remaining_capacity() {
856  auto request = protocol::codec::queue_remainingcapacity_encode(get_name());
857  return invoke_and_get_future<int>(request, partition_id_);
858  }
859 
860  boost::future<bool> IQueueImpl::remove(const serialization::pimpl::data &element) {
861  auto request = protocol::codec::queue_remove_encode(get_name(), element);
862  return invoke_and_get_future<bool>(request, partition_id_);
863  }
864 
865  boost::future<bool> IQueueImpl::contains(const serialization::pimpl::data &element) {
866  auto request = protocol::codec::queue_contains_encode(get_name(), element);
867  return invoke_and_get_future<bool>(request, partition_id_);
868  }
869 
870  boost::future<std::vector<serialization::pimpl::data>> IQueueImpl::drain_to_data(size_t max_elements) {
871  auto request = protocol::codec::queue_draintomaxsize_encode(get_name(), (int32_t) max_elements);
872 
873  return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request, partition_id_);
874  }
875 
876  boost::future<std::vector<serialization::pimpl::data>> IQueueImpl::drain_to_data() {
877  auto request = protocol::codec::queue_drainto_encode(get_name());
878  return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request, partition_id_);
879  }
880 
881  boost::future<boost::optional<serialization::pimpl::data>> IQueueImpl::take_data() {
882  auto request = protocol::codec::queue_take_encode(get_name());
883  return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, partition_id_);
884  }
885 
886  boost::future<boost::optional<serialization::pimpl::data>> IQueueImpl::peek_data() {
887  auto request = protocol::codec::queue_peek_encode(get_name());
888  return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, partition_id_);
889  }
890 
891  boost::future<int> IQueueImpl::size() {
892  auto request = protocol::codec::queue_size_encode(get_name());
893  return invoke_and_get_future<int>(request, partition_id_);
894  }
895 
896  boost::future<bool> IQueueImpl::is_empty() {
897  auto request = protocol::codec::queue_isempty_encode(get_name());
898  return invoke_and_get_future<bool>(request, partition_id_);
899  }
900 
901  boost::future<std::vector<serialization::pimpl::data>> IQueueImpl::to_array_data() {
902  auto request = protocol::codec::queue_iterator_encode(get_name());
903  return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request, partition_id_);
904  }
905 
906  boost::future<bool> IQueueImpl::contains_all_data(const std::vector<serialization::pimpl::data> &elements) {
907  auto request = protocol::codec::queue_containsall_encode(get_name(), elements);
908  return invoke_and_get_future<bool>(request, partition_id_);
909  }
910 
911  boost::future<bool> IQueueImpl::add_all_data(const std::vector<serialization::pimpl::data> &elements) {
912  auto request = protocol::codec::queue_addall_encode(get_name(), elements);
913  return invoke_and_get_future<bool>(request, partition_id_);
914  }
915 
916  boost::future<bool> IQueueImpl::remove_all_data(const std::vector<serialization::pimpl::data> &elements) {
917  auto request = protocol::codec::queue_compareandremoveall_encode(get_name(), elements);
918  return invoke_and_get_future<bool>(request, partition_id_);
919  }
920 
921  boost::future<bool> IQueueImpl::retain_all_data(const std::vector<serialization::pimpl::data> &elements) {
922  auto request = protocol::codec::queue_compareandretainall_encode(get_name(), elements);
923  return invoke_and_get_future<bool>(request, partition_id_);
924  }
925 
926  boost::future<void> IQueueImpl::clear() {
927  auto request = protocol::codec::queue_clear_encode(get_name());
928  return to_void_future(invoke_on_partition(request, partition_id_));
929  }
930 
931  std::shared_ptr<spi::impl::ListenerMessageCodec>
932  IQueueImpl::create_item_listener_codec(bool include_value) {
933  return std::shared_ptr<spi::impl::ListenerMessageCodec>(
934  new QueueListenerMessageCodec(get_name(), include_value));
935  }
936 
937  IQueueImpl::QueueListenerMessageCodec::QueueListenerMessageCodec(std::string name,
938  bool include_value) : name_(std::move(name)),
939  include_value_(
940  include_value) {}
941 
942  protocol::ClientMessage
943  IQueueImpl::QueueListenerMessageCodec::encode_add_request(bool local_only) const {
944  return protocol::codec::queue_addlistener_encode(name_, include_value_, local_only);
945  }
946 
947  protocol::ClientMessage
948  IQueueImpl::QueueListenerMessageCodec::encode_remove_request(boost::uuids::uuid real_registration_id) const {
949  return protocol::codec::queue_removelistener_encode(name_, real_registration_id);
950  }
951 
952  ProxyImpl::ProxyImpl(const std::string &service_name, const std::string &object_name,
953  spi::ClientContext *context)
954  : ClientProxy(object_name, service_name, *context), SerializingProxy(*context, object_name) {}
955 
956  ProxyImpl::~ProxyImpl() = default;
957 
958  SerializingProxy::SerializingProxy(spi::ClientContext &context, const std::string &object_name)
959  : serialization_service_(context.get_serialization_service()),
960  partition_service_(context.get_partition_service()), object_name_(object_name), client_context_(context) {}
961 
962  int SerializingProxy::get_partition_id(const serialization::pimpl::data &key) {
963  return partition_service_.get_partition_id(key);
964  }
965 
966  boost::future<protocol::ClientMessage> SerializingProxy::invoke_on_partition(
967  protocol::ClientMessage &request, int partition_id) {
968  try {
969  return spi::impl::ClientInvocation::create(client_context_, std::make_shared<protocol::ClientMessage>(std::move(request)), object_name_, partition_id)->invoke();
970  } catch (exception::iexception &) {
971  util::exception_util::rethrow(std::current_exception());
972  return boost::make_ready_future(protocol::ClientMessage(0));
973  }
974  }
975 
976  boost::future<protocol::ClientMessage> SerializingProxy::invoke(protocol::ClientMessage &request) {
977  try {
978  return spi::impl::ClientInvocation::create(client_context_, std::make_shared<protocol::ClientMessage>(std::move(request)), object_name_)->invoke();
979  } catch (exception::iexception &) {
980  util::exception_util::rethrow(std::current_exception());
981  return boost::make_ready_future(protocol::ClientMessage(0));
982  }
983  }
984 
985  boost::future<protocol::ClientMessage> SerializingProxy::invoke_on_connection(protocol::ClientMessage &request,
986  std::shared_ptr<connection::Connection> connection) {
987  try {
988  return spi::impl::ClientInvocation::create(client_context_, std::make_shared<protocol::ClientMessage>(std::move(request)), object_name_, connection)->invoke();
989  } catch (exception::iexception &) {
990  util::exception_util::rethrow(std::current_exception());
991  return boost::make_ready_future(protocol::ClientMessage(0));
992  }
993  }
994 
995  boost::future<protocol::ClientMessage>
996  SerializingProxy::invoke_on_key_owner(protocol::ClientMessage &request,
997  const serialization::pimpl::data &key_data) {
998  try {
999  return invoke_on_partition(request, get_partition_id(key_data));
1000  } catch (exception::iexception &) {
1001  util::exception_util::rethrow(std::current_exception());
1002  return boost::make_ready_future(protocol::ClientMessage(0));
1003  }
1004  }
1005 
1006  boost::future<protocol::ClientMessage>
1007  SerializingProxy::invoke_on_member(protocol::ClientMessage &request, boost::uuids::uuid uuid) {
1008  try {
1009  auto invocation = spi::impl::ClientInvocation::create(client_context_, std::make_shared<protocol::ClientMessage>(std::move(request)), object_name_, uuid);
1010  return invocation->invoke();
1011  } catch (exception::iexception &) {
1012  util::exception_util::rethrow(std::current_exception());
1013  return boost::make_ready_future(protocol::ClientMessage(0));
1014  }
1015  }
1016 
1017  template<>
1018  boost::future<boost::optional<serialization::pimpl::data>>
1019  SerializingProxy::invoke_and_get_future(protocol::ClientMessage &request) {
1020  return decode_optional_var_sized<serialization::pimpl::data>(invoke(request));
1021  }
1022 
1023  template<>
1024  boost::future<boost::optional<map::data_entry_view>>
1025  SerializingProxy::invoke_and_get_future(protocol::ClientMessage &request, const serialization::pimpl::data &key) {
1026  return decode_optional_var_sized<map::data_entry_view>(invoke_on_key_owner(request, key));
1027  }
1028 
1029  template<>
1030  boost::future<boost::optional<serialization::pimpl::data>>
1031  SerializingProxy::invoke_and_get_future(protocol::ClientMessage &request, int partition_id) {
1032  return decode_optional_var_sized<serialization::pimpl::data>(invoke_on_partition(request, partition_id));
1033  }
1034 
1035  template<>
1036  boost::future<boost::optional<serialization::pimpl::data>>
1037  SerializingProxy::invoke_and_get_future(protocol::ClientMessage &request, const serialization::pimpl::data &key) {
1038  return decode_optional_var_sized<serialization::pimpl::data>(invoke_on_key_owner(request, key));
1039  }
1040 
1041  PartitionSpecificClientProxy::PartitionSpecificClientProxy(const std::string &service_name,
1042  const std::string &object_name,
1043  spi::ClientContext *context) : ProxyImpl(
1044  service_name, object_name, context), partition_id_(-1) {}
1045 
1046  void PartitionSpecificClientProxy::on_initialize() {
1047  std::string partitionKey = internal::partition::strategy::StringPartitioningStrategy::get_partition_key(
1048  name_);
1049  partition_id_ = get_context().get_partition_service().get_partition_id(to_data<std::string>(partitionKey));
1050  }
1051 
1052  IMapImpl::IMapImpl(const std::string &instance_name, spi::ClientContext *context)
1053  : ProxyImpl("hz:impl:mapService", instance_name, context) {}
1054 
1055  boost::future<bool> IMapImpl::contains_key(const serialization::pimpl::data &key) {
1056  auto request = protocol::codec::map_containskey_encode(get_name(), key, util::get_current_thread_id());
1057  return invoke_and_get_future<bool>(request, key);
1058  }
1059 
1060  boost::future<bool> IMapImpl::contains_value(const serialization::pimpl::data &value) {
1061  auto request = protocol::codec::map_containsvalue_encode(get_name(), value);
1062  return invoke_and_get_future<bool>(request);
1063  }
1064 
1065  boost::future<boost::optional<serialization::pimpl::data>> IMapImpl::get_data(const serialization::pimpl::data &key) {
1066  auto request = protocol::codec::map_get_encode(get_name(), key, util::get_current_thread_id());
1067  return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, key);
1068  }
1069 
1070  boost::future<boost::optional<serialization::pimpl::data>> IMapImpl::remove_data(const serialization::pimpl::data &key) {
1071  auto request = protocol::codec::map_remove_encode(get_name(), key, util::get_current_thread_id());
1072  return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, key);
1073  }
1074 
1075  boost::future<bool> IMapImpl::remove(const serialization::pimpl::data &key, const serialization::pimpl::data &value) {
1076  auto request = protocol::codec::map_removeifsame_encode(get_name(), key, value,
1077  util::get_current_thread_id());
1078  return invoke_and_get_future<bool>(request, key);
1079  }
1080 
1081  boost::future<protocol::ClientMessage> IMapImpl::remove_all(const serialization::pimpl::data &predicate_data) {
1082  auto request = protocol::codec::map_removeall_encode(get_name(), predicate_data);
1083  return invoke(request);
1084  }
1085 
1086  boost::future<protocol::ClientMessage> IMapImpl::delete_entry(const serialization::pimpl::data &key) {
1087  auto request = protocol::codec::map_delete_encode(get_name(), key, util::get_current_thread_id());
1088  return invoke_on_partition(request, get_partition_id(key));
1089  }
1090 
1091  boost::future<protocol::ClientMessage> IMapImpl::flush() {
1092  auto request = protocol::codec::map_flush_encode(get_name());
1093  return invoke(request);
1094  }
1095 
1096  boost::future<bool> IMapImpl::try_remove(const serialization::pimpl::data &key, std::chrono::milliseconds timeout) {
1097  auto request = protocol::codec::map_tryremove_encode(get_name(), key,
1098  util::get_current_thread_id(),
1099  std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
1100 
1101  return invoke_and_get_future<bool>(request, key);
1102  }
1103 
1104  boost::future<bool> IMapImpl::try_put(const serialization::pimpl::data &key, const serialization::pimpl::data &value,
1105  std::chrono::milliseconds timeout) {
1106  auto request = protocol::codec::map_tryput_encode(get_name(), key, value,
1107  util::get_current_thread_id(),
1108  std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
1109 
1110  return invoke_and_get_future<bool>(request, key);
1111  }
1112 
1113  boost::future<boost::optional<serialization::pimpl::data>> IMapImpl::put_data(const serialization::pimpl::data &key,
1114  const serialization::pimpl::data &value,
1115  std::chrono::milliseconds ttl) {
1116  auto request = protocol::codec::map_put_encode(get_name(), key, value,
1117  util::get_current_thread_id(),
1118  std::chrono::duration_cast<std::chrono::milliseconds>(ttl).count());
1119  return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, key);
1120  }
1121 
1122  boost::future<protocol::ClientMessage> IMapImpl::put_transient(const serialization::pimpl::data &key, const serialization::pimpl::data &value,
1123  std::chrono::milliseconds ttl) {
1124  auto request = protocol::codec::map_puttransient_encode(get_name(), key, value,
1125  util::get_current_thread_id(),
1126  std::chrono::duration_cast<std::chrono::milliseconds>(ttl).count());
1127  return invoke_on_partition(request, get_partition_id(key));
1128  }
1129 
1130  boost::future<boost::optional<serialization::pimpl::data>> IMapImpl::put_if_absent_data(const serialization::pimpl::data &key,
1131  const serialization::pimpl::data &value,
1132  std::chrono::milliseconds ttl) {
1133  auto request = protocol::codec::map_putifabsent_encode(get_name(), key, value,
1134  util::get_current_thread_id(),
1135  std::chrono::duration_cast<std::chrono::milliseconds>(ttl).count());
1136  return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, key);
1137  }
1138 
1139  boost::future<bool> IMapImpl::replace(const serialization::pimpl::data &key, const serialization::pimpl::data &old_value,
1140  const serialization::pimpl::data &new_value) {
1141  auto request = protocol::codec::map_replaceifsame_encode(get_name(), key, old_value,
1142  new_value,
1143  util::get_current_thread_id());
1144 
1145  return invoke_and_get_future<bool>(request, key);
1146  }
1147 
1148  boost::future<boost::optional<serialization::pimpl::data>> IMapImpl::replace_data(const serialization::pimpl::data &key,
1149  const serialization::pimpl::data &value) {
1150  auto request = protocol::codec::map_replace_encode(get_name(), key, value,
1151  util::get_current_thread_id());
1152 
1153  return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, key);
1154  }
1155 
1156  boost::future<protocol::ClientMessage>
1157  IMapImpl::set(const serialization::pimpl::data &key, const serialization::pimpl::data &value,
1158  std::chrono::milliseconds ttl) {
1159  auto request = protocol::codec::map_set_encode(get_name(), key, value,
1160  util::get_current_thread_id(),
1161  std::chrono::duration_cast<std::chrono::milliseconds>(
1162  ttl).count());
1163  return invoke_on_partition(request, get_partition_id(key));
1164  }
1165 
1166  boost::future<protocol::ClientMessage> IMapImpl::lock(const serialization::pimpl::data &key) {
1167  return lock(key, std::chrono::milliseconds(-1));
1168  }
1169 
1170  boost::future<protocol::ClientMessage> IMapImpl::lock(const serialization::pimpl::data &key, std::chrono::milliseconds lease_time) {
1171  auto request = protocol::codec::map_lock_encode(get_name(), key, util::get_current_thread_id(),
1172  std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
1173  lock_reference_id_generator_->get_next_reference_id());
1174  return invoke_on_partition(request, get_partition_id(key));
1175  }
1176 
1177  boost::future<bool> IMapImpl::is_locked(const serialization::pimpl::data &key) {
1178  auto request = protocol::codec::map_islocked_encode(get_name(), key);
1179 
1180  return invoke_and_get_future<bool>(request, key);
1181  }
1182 
1183  boost::future<bool> IMapImpl::try_lock(const serialization::pimpl::data &key, std::chrono::milliseconds timeout) {
1184  return try_lock(key, timeout, std::chrono::milliseconds(-1));
1185  }
1186 
1187  boost::future<bool>
1188  IMapImpl::try_lock(const serialization::pimpl::data &key, std::chrono::milliseconds timeout,
1189  std::chrono::milliseconds lease_time) {
1190  auto request = protocol::codec::map_trylock_encode(get_name(), key, util::get_current_thread_id(),
1191  std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
1192  std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count(),
1193  lock_reference_id_generator_->get_next_reference_id());
1194  return invoke_and_get_future<bool>(request, key);
1195  }
1196 
1197  boost::future<protocol::ClientMessage> IMapImpl::unlock(const serialization::pimpl::data &key) {
1198  auto request = protocol::codec::map_unlock_encode(get_name(), key, util::get_current_thread_id(),
1199  lock_reference_id_generator_->get_next_reference_id());
1200  return invoke_on_partition(request, get_partition_id(key));
1201  }
1202 
1203  boost::future<protocol::ClientMessage> IMapImpl::force_unlock(const serialization::pimpl::data &key) {
1204  auto request = protocol::codec::map_forceunlock_encode(get_name(), key,
1205  lock_reference_id_generator_->get_next_reference_id());
1206  return invoke_on_partition(request, get_partition_id(key));
1207  }
1208 
1209  boost::future<std::string> IMapImpl::add_interceptor(const serialization::pimpl::data &interceptor) {
1210  auto request = protocol::codec::map_addinterceptor_encode(get_name(), interceptor);
1211  return invoke_and_get_future<std::string>(request);
1212  }
1213 
1214  // TODO: We can use generic template Listener instead of impl::BaseEventHandler to prevent the virtual function calls
1215  boost::future<boost::uuids::uuid> IMapImpl::add_entry_listener(std::shared_ptr<impl::BaseEventHandler> entry_event_handler, bool include_value, int32_t listener_flags) {
1216  return register_listener(create_map_entry_listener_codec(include_value, listener_flags), std::move(entry_event_handler));
1217  }
1218 
1219  boost::future<boost::uuids::uuid>
1220  IMapImpl::add_entry_listener(std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
1221  serialization::pimpl::data &&predicate, bool include_value,
1222  int32_t listener_flags) {
1223  return register_listener(
1224  create_map_entry_listener_codec(include_value, std::move(predicate), listener_flags),
1225  std::move(entry_event_handler));
1226  }
1227 
1228  boost::future<bool> IMapImpl::remove_entry_listener(boost::uuids::uuid registration_id) {
1229  return get_context().get_client_listener_service().deregister_listener(registration_id);
1230  }
1231 
1232  boost::future<boost::uuids::uuid>
1233  IMapImpl::add_entry_listener(std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
1234  bool include_value, serialization::pimpl::data &&key, int32_t listener_flags) {
1235  return register_listener(create_map_entry_listener_codec(include_value, listener_flags, std::move(key)),
1236  std::move(entry_event_handler));
1237  }
1238 
1239  boost::future<boost::optional<map::data_entry_view>> IMapImpl::get_entry_view_data(const serialization::pimpl::data &key) {
1240  auto request = protocol::codec::map_getentryview_encode(get_name(), key,
1241  util::get_current_thread_id());
1242  return invoke_and_get_future<boost::optional<map::data_entry_view>>(request, key);
1243  }
1244 
1245  boost::future<bool> IMapImpl::evict(const serialization::pimpl::data &key) {
1246  auto request = protocol::codec::map_evict_encode(get_name(), key, util::get_current_thread_id());
1247  return invoke_and_get_future<bool>(request, key);
1248  }
1249 
1250  boost::future<protocol::ClientMessage> IMapImpl::evict_all() {
1251  auto request = protocol::codec::map_evictall_encode(get_name());
1252  return invoke(request);
1253  }
1254 
1255  boost::future<EntryVector>
1256  IMapImpl::get_all_data(int partition_id, const std::vector<serialization::pimpl::data> &keys) {
1257  auto request = protocol::codec::map_getall_encode(get_name(), keys);
1258  return invoke_and_get_future<EntryVector>(request, partition_id);
1259  }
1260 
1261  boost::future<std::vector<serialization::pimpl::data>> IMapImpl::key_set_data() {
1262  auto request = protocol::codec::map_keyset_encode(get_name());
1263  return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request);
1264  }
1265 
1266  boost::future<std::vector<serialization::pimpl::data>> IMapImpl::key_set_data(const serialization::pimpl::data &predicate) {
1267  auto request = protocol::codec::map_keysetwithpredicate_encode(get_name(), predicate);
1268  return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request);
1269  }
1270 
1271  boost::future<std::pair<std::vector<serialization::pimpl::data>, query::anchor_data_list>> IMapImpl::key_set_for_paging_predicate_data(
1272  protocol::codec::holder::paging_predicate_holder const & predicate) {
1273  auto request = protocol::codec::map_keysetwithpagingpredicate_encode(get_name(), predicate);
1274  return invoke(request).then(boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
1275  return get_paging_predicate_response<std::vector<serialization::pimpl::data>>(std::move(f));
1276  });
1277  }
1278 
1279  boost::future<EntryVector> IMapImpl::entry_set_data() {
1280  auto request = protocol::codec::map_entryset_encode(get_name());
1281  return invoke_and_get_future<EntryVector>(request);
1282  }
1283 
1284  boost::future<EntryVector> IMapImpl::entry_set_data(const serialization::pimpl::data &predicate) {
1285  auto request = protocol::codec::map_entrieswithpredicate_encode(get_name(), predicate);
1286  return invoke_and_get_future<EntryVector>(request);
1287  }
1288 
1289  boost::future<std::pair<EntryVector, query::anchor_data_list>> IMapImpl::entry_set_for_paging_predicate_data(
1290  protocol::codec::holder::paging_predicate_holder const & predicate) {
1291  auto request = protocol::codec::map_entrieswithpagingpredicate_encode(get_name(), predicate);
1292  return invoke(request).then(boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
1293  return get_paging_predicate_response<EntryVector>(std::move(f));
1294  });
1295  }
1296 
1297  boost::future<std::vector<serialization::pimpl::data>> IMapImpl::values_data() {
1298  auto request = protocol::codec::map_values_encode(get_name());
1299  return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request);
1300  }
1301 
1302  boost::future<std::vector<serialization::pimpl::data>> IMapImpl::values_data(const serialization::pimpl::data &predicate) {
1303  auto request = protocol::codec::map_valueswithpredicate_encode(get_name(), predicate);
1304  return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request);
1305  }
1306 
1307  boost::future<std::pair<std::vector<serialization::pimpl::data>, query::anchor_data_list>>
1308  IMapImpl::values_for_paging_predicate_data(protocol::codec::holder::paging_predicate_holder const & predicate) {
1309  auto request = protocol::codec::map_valueswithpagingpredicate_encode(get_name(), predicate);
1310  return invoke(request).then(boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
1311  return get_paging_predicate_response<std::vector<serialization::pimpl::data>>(std::move(f));
1312  });
1313  }
1314 
1315  boost::future<protocol::ClientMessage> IMapImpl::add_index_data(const config::index_config &config) {
1316  auto request = protocol::codec::map_addindex_encode(get_name(), config);
1317  return invoke(request);
1318  }
1319 
1320  boost::future<int> IMapImpl::size() {
1321  auto request = protocol::codec::map_size_encode(get_name());
1322  return invoke_and_get_future<int>(request);
1323  }
1324 
1325  boost::future<bool> IMapImpl::is_empty() {
1326  auto request = protocol::codec::map_isempty_encode(get_name());
1327  return invoke_and_get_future<bool>(request);
1328  }
1329 
1330  boost::future<protocol::ClientMessage> IMapImpl::put_all_data(int partition_id, const EntryVector &entries) {
1331  auto request = protocol::codec::map_putall_encode(get_name(), entries, true);
1332  return invoke_on_partition(request, partition_id);
1333  }
1334 
1335  boost::future<protocol::ClientMessage> IMapImpl::clear_data() {
1336  auto request = protocol::codec::map_clear_encode(get_name());
1337  return invoke(request);
1338  }
1339 
1340  boost::future<boost::optional<serialization::pimpl::data>>
1341  IMapImpl::execute_on_key_data(const serialization::pimpl::data &key,
1342  const serialization::pimpl::data &processor) {
1343  auto request = protocol::codec::map_executeonkey_encode(get_name(),
1344  processor,
1345  key,
1346  util::get_current_thread_id());
1347  return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, get_partition_id(key));
1348  }
1349 
1350  boost::future<boost::optional<serialization::pimpl::data>>
1351  IMapImpl::submit_to_key_data(const serialization::pimpl::data &key,
1352  const serialization::pimpl::data &processor) {
1353  auto request = protocol::codec::map_submittokey_encode(get_name(),
1354  processor,
1355  key,
1356  util::get_current_thread_id());
1357  return invoke_on_partition(request, get_partition_id(key)).then(boost::launch::sync,
1358  [](boost::future<protocol::ClientMessage> f) {
1359  auto msg = f.get();
1360  msg.skip_frame();
1361  return msg.get_nullable<serialization::pimpl::data>();
1362  });
1363  }
1364 
1365  boost::future<EntryVector> IMapImpl::execute_on_keys_data(const std::vector<serialization::pimpl::data> &keys,
1366  const serialization::pimpl::data &processor) {
1367  auto request = protocol::codec::map_executeonkeys_encode(get_name(), processor, keys);
1368  return invoke_and_get_future<EntryVector>(request);
1369  }
1370 
1371  boost::future<protocol::ClientMessage> IMapImpl::remove_interceptor(const std::string &id) {
1372  auto request = protocol::codec::map_removeinterceptor_encode(get_name(), id);
1373  return invoke(request);
1374  }
1375 
1376  boost::future<EntryVector> IMapImpl::execute_on_entries_data(const serialization::pimpl::data &entry_processor) {
1377  auto request = protocol::codec::map_executeonallkeys_encode(
1378  get_name(), entry_processor);
1379  return invoke_and_get_future<EntryVector>(request);
1380 
1381  }
1382 
1383  boost::future<EntryVector>
1384  IMapImpl::execute_on_entries_data(const serialization::pimpl::data &entry_processor, const serialization::pimpl::data &predicate) {
1385  auto request = protocol::codec::map_executewithpredicate_encode(get_name(),
1386  entry_processor,
1387  predicate);
1388  return invoke_and_get_future<EntryVector>(request);
1389  }
1390 
1391 
1392  std::shared_ptr<spi::impl::ListenerMessageCodec>
1393  IMapImpl::create_map_entry_listener_codec(bool include_value, serialization::pimpl::data &&predicate,
1394  int32_t listener_flags) {
1395  return std::shared_ptr<spi::impl::ListenerMessageCodec>(
1396  new MapEntryListenerWithPredicateMessageCodec(get_name(), include_value, listener_flags, std::move(predicate)));
1397  }
1398 
1399  std::shared_ptr<spi::impl::ListenerMessageCodec>
1400  IMapImpl::create_map_entry_listener_codec(bool include_value, int32_t listener_flags) {
1401  return std::shared_ptr<spi::impl::ListenerMessageCodec>(
1402  new MapEntryListenerMessageCodec(get_name(), include_value, listener_flags));
1403  }
1404 
1405  std::shared_ptr<spi::impl::ListenerMessageCodec>
1406  IMapImpl::create_map_entry_listener_codec(bool include_value, int32_t listener_flags,
1407  serialization::pimpl::data &&key) {
1408  return std::shared_ptr<spi::impl::ListenerMessageCodec>(
1409  new MapEntryListenerToKeyCodec(get_name(), include_value, listener_flags, std::move(key)));
1410  }
1411 
1412  void IMapImpl::on_initialize() {
1413  ProxyImpl::on_initialize();
1414  lock_reference_id_generator_ = get_context().get_lock_reference_id_generator();
1415  }
1416 
1417  IMapImpl::MapEntryListenerMessageCodec::MapEntryListenerMessageCodec(std::string name,
1418  bool include_value,
1419  int32_t listener_flags) : name_(std::move(name)),
1420  include_value_(
1421  include_value),
1422  listener_flags_(
1423  listener_flags) {}
1424 
1425  protocol::ClientMessage
1426  IMapImpl::MapEntryListenerMessageCodec::encode_add_request(bool local_only) const {
1427  return protocol::codec::map_addentrylistener_encode(name_, include_value_,
1428  static_cast<int32_t>(listener_flags_),
1429  local_only);
1430  }
1431 
1432  protocol::ClientMessage
1433  IMapImpl::MapEntryListenerMessageCodec::encode_remove_request(boost::uuids::uuid real_registration_id) const {
1434  return protocol::codec::map_removeentrylistener_encode(name_, real_registration_id);
1435  }
1436 
1437  protocol::ClientMessage
1438  IMapImpl::MapEntryListenerToKeyCodec::encode_add_request(bool local_only) const {
1439  return protocol::codec::map_addentrylistenertokey_encode(name_, key_, include_value_,
1440  static_cast<int32_t>(listener_flags_), local_only);
1441  }
1442 
1443  protocol::ClientMessage
1444  IMapImpl::MapEntryListenerToKeyCodec::encode_remove_request(boost::uuids::uuid real_registration_id) const {
1445  return protocol::codec::map_removeentrylistener_encode(name_, real_registration_id);
1446  }
1447 
1448  IMapImpl::MapEntryListenerToKeyCodec::MapEntryListenerToKeyCodec(std::string name, bool include_value,
1449  int32_t listener_flags,
1450  serialization::pimpl::data key)
1451  : name_(std::move(name)), include_value_(include_value), listener_flags_(listener_flags), key_(std::move(key)) {}
1452 
1453  IMapImpl::MapEntryListenerWithPredicateMessageCodec::MapEntryListenerWithPredicateMessageCodec(
1454  std::string name, bool include_value, int32_t listener_flags,
1455  serialization::pimpl::data &&predicate) : name_(std::move(name)), include_value_(include_value),
1456  listener_flags_(listener_flags), predicate_(std::move(predicate)) {}
1457 
1458  protocol::ClientMessage
1459  IMapImpl::MapEntryListenerWithPredicateMessageCodec::encode_add_request(bool local_only) const {
1460  return protocol::codec::map_addentrylistenerwithpredicate_encode(name_, predicate_,
1461  include_value_,
1462  static_cast<int32_t>(listener_flags_), local_only);
1463  }
1464 
1465  protocol::ClientMessage
1466  IMapImpl::MapEntryListenerWithPredicateMessageCodec::encode_remove_request(
1467  boost::uuids::uuid real_registration_id) const {
1468  return protocol::codec::map_removeentrylistener_encode(name_, real_registration_id);
1469  }
1470 
1471  TransactionalQueueImpl::TransactionalQueueImpl(const std::string &name,
1472  txn::TransactionProxy &transaction_proxy)
1473  : TransactionalObject(iqueue::SERVICE_NAME, name, transaction_proxy) {}
1474 
1475  boost::future<bool> TransactionalQueueImpl::offer(const serialization::pimpl::data &e, std::chrono::milliseconds timeout) {
1476  auto request = protocol::codec::transactionalqueue_offer_encode(
1477  get_name(), get_transaction_id(), util::get_current_thread_id(), e, std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
1478 
1479  return invoke_and_get_future<bool>(request);
1480  }
1481 
1482  boost::future<boost::optional<serialization::pimpl::data>> TransactionalQueueImpl::poll_data(std::chrono::milliseconds timeout) {
1483  auto request = protocol::codec::transactionalqueue_poll_encode(
1484  get_name(), get_transaction_id(), util::get_current_thread_id(), std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
1485 
1486  return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request);
1487  }
1488 
1489  boost::future<int> TransactionalQueueImpl::size() {
1490  auto request = protocol::codec::transactionalqueue_size_encode(
1491  get_name(), get_transaction_id(), util::get_current_thread_id());
1492 
1493  return invoke_and_get_future<int>(request);
1494  }
1495 
1496  ISetImpl::ISetImpl(const std::string &instance_name, spi::ClientContext *client_context)
1497  : ProxyImpl(iset::SERVICE_NAME, instance_name, client_context) {
1498  serialization::pimpl::data key_data = get_context().get_serialization_service().to_data<std::string>(
1499  &instance_name);
1500  partition_id_ = get_partition_id(key_data);
1501  }
1502 
1503  boost::future<bool> ISetImpl::remove_item_listener(boost::uuids::uuid registration_id) {
1504  return get_context().get_client_listener_service().deregister_listener(registration_id);
1505  }
1506 
1507  boost::future<int> ISetImpl::size() {
1508  auto request = protocol::codec::set_size_encode(get_name());
1509  return invoke_and_get_future<int>(request, partition_id_);
1510  }
1511 
1512  boost::future<bool> ISetImpl::is_empty() {
1513  auto request = protocol::codec::set_isempty_encode(get_name());
1514  return invoke_and_get_future<bool>(request, partition_id_);
1515  }
1516 
1517  boost::future<bool> ISetImpl::contains(const serialization::pimpl::data &element) {
1518  auto request = protocol::codec::set_contains_encode(get_name(), element);
1519  return invoke_and_get_future<bool>(request, partition_id_);
1520  }
1521 
1522  boost::future<std::vector<serialization::pimpl::data>> ISetImpl::to_array_data() {
1523  auto request = protocol::codec::set_getall_encode(get_name());
1524  return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request, partition_id_);
1525  }
1526 
1527  boost::future<bool> ISetImpl::add(const serialization::pimpl::data &element) {
1528  auto request = protocol::codec::set_add_encode(get_name(), element);
1529  return invoke_and_get_future<bool>(request, partition_id_);
1530  }
1531 
1532  boost::future<bool> ISetImpl::remove(const serialization::pimpl::data &element) {
1533  auto request = protocol::codec::set_remove_encode(get_name(), element);
1534  return invoke_and_get_future<bool>(request, partition_id_);
1535  }
1536 
1537  boost::future<bool> ISetImpl::contains_all(const std::vector<serialization::pimpl::data> &elements) {
1538  auto request = protocol::codec::set_containsall_encode(get_name(), elements);
1539  return invoke_and_get_future<bool>(request, partition_id_);
1540  }
1541 
1542  boost::future<bool> ISetImpl::add_all(const std::vector<serialization::pimpl::data> &elements) {
1543  auto request = protocol::codec::set_addall_encode(get_name(), elements);
1544  return invoke_and_get_future<bool>(request, partition_id_);
1545  }
1546 
1547  boost::future<bool> ISetImpl::remove_all(const std::vector<serialization::pimpl::data> &elements) {
1548  auto request = protocol::codec::set_compareandremoveall_encode(get_name(), elements);
1549  return invoke_and_get_future<bool>(request, partition_id_);
1550  }
1551 
1552  boost::future<bool> ISetImpl::retain_all(const std::vector<serialization::pimpl::data> &elements) {
1553  auto request = protocol::codec::set_compareandretainall_encode(get_name(), elements);
1554  return invoke_and_get_future<bool>(request, partition_id_);
1555  }
1556 
1557  boost::future<void> ISetImpl::clear() {
1558  auto request = protocol::codec::set_clear_encode(get_name());
1559  return to_void_future(invoke_on_partition(request, partition_id_));
1560  }
1561 
1562  std::shared_ptr<spi::impl::ListenerMessageCodec>
1563  ISetImpl::create_item_listener_codec(bool include_value) {
1564  return std::shared_ptr<spi::impl::ListenerMessageCodec>(
1565  new SetListenerMessageCodec(get_name(), include_value));
1566  }
1567 
1568  ISetImpl::SetListenerMessageCodec::SetListenerMessageCodec(std::string name, bool include_value)
1569  : name_(std::move(name)), include_value_(include_value) {}
1570 
1571  protocol::ClientMessage
1572  ISetImpl::SetListenerMessageCodec::encode_add_request(bool local_only) const {
1573  return protocol::codec::set_addlistener_encode(name_, include_value_, local_only);
1574  }
1575 
1576  protocol::ClientMessage
1577  ISetImpl::SetListenerMessageCodec::encode_remove_request(boost::uuids::uuid real_registration_id) const {
1578  return protocol::codec::set_removelistener_encode(name_, real_registration_id);
1579  }
1580 
1581  ITopicImpl::ITopicImpl(const std::string &instance_name, spi::ClientContext *context)
1582  : proxy::ProxyImpl("hz:impl:topicService", instance_name, context),
1583  partition_id_(get_partition_id(to_data(instance_name))) {}
1584 
1585  boost::future<void> ITopicImpl::publish(const serialization::pimpl::data &data) {
1586  auto request = protocol::codec::topic_publish_encode(get_name(), data);
1587  return to_void_future(invoke_on_partition(request, partition_id_));
1588  }
1589 
1590  boost::future<boost::uuids::uuid> ITopicImpl::add_message_listener(std::shared_ptr<impl::BaseEventHandler> topic_event_handler) {
1591  return register_listener(create_item_listener_codec(), std::move(topic_event_handler));
1592  }
1593 
1594  boost::future<bool> ITopicImpl::remove_message_listener(boost::uuids::uuid registration_id) {
1595  return get_context().get_client_listener_service().deregister_listener(registration_id);
1596  }
1597 
1598  std::shared_ptr<spi::impl::ListenerMessageCodec> ITopicImpl::create_item_listener_codec() {
1599  return std::shared_ptr<spi::impl::ListenerMessageCodec>(new TopicListenerMessageCodec(get_name()));
1600  }
1601 
1602  ITopicImpl::TopicListenerMessageCodec::TopicListenerMessageCodec(std::string name) : name_(std::move(name)) {}
1603 
1604  protocol::ClientMessage
1605  ITopicImpl::TopicListenerMessageCodec::encode_add_request(bool local_only) const {
1606  return protocol::codec::topic_addmessagelistener_encode(name_, local_only);
1607  }
1608 
1609  protocol::ClientMessage
1610  ITopicImpl::TopicListenerMessageCodec::encode_remove_request(boost::uuids::uuid real_registration_id) const {
1611  return protocol::codec::topic_removemessagelistener_encode(name_, real_registration_id);
1612  }
1613 
1614  ReplicatedMapImpl::ReplicatedMapImpl(const std::string &service_name, const std::string &object_name,
1615  spi::ClientContext *context) : ProxyImpl(service_name, object_name,
1616  context),
1617  target_partition_id_(-1) {}
1618 
1619  const int32_t RingbufferImpl::MAX_BATCH_SIZE{ 1000 };
1620  }
1621 
1622  namespace map {
1623  const serialization::pimpl::data &data_entry_view::get_key() const {
1624  return key_;
1625  }
1626 
1627  const serialization::pimpl::data &data_entry_view::get_value() const {
1628  return value_;
1629  }
1630 
1631  int64_t data_entry_view::get_cost() const {
1632  return cost_;
1633  }
1634 
1635  int64_t data_entry_view::get_creation_time() const {
1636  return creation_time_;
1637  }
1638 
1639  int64_t data_entry_view::get_expiration_time() const {
1640  return expiration_time_;
1641  }
1642 
1643  int64_t data_entry_view::get_hits() const {
1644  return hits_;
1645  }
1646 
1647  int64_t data_entry_view::get_last_access_time() const {
1648  return last_access_time_;
1649  }
1650 
1651  int64_t data_entry_view::get_last_stored_time() const {
1652  return last_stored_time_;
1653  }
1654 
1655  int64_t data_entry_view::get_last_update_time() const {
1656  return last_update_time_;
1657  }
1658 
1659  int64_t data_entry_view::get_version() const {
1660  return version_;
1661  }
1662 
1663  int64_t data_entry_view::get_ttl() const {
1664  return ttl_;
1665  }
1666 
1667  int64_t data_entry_view::get_max_idle() const {
1668  return max_idle_;
1669  }
1670 
1671  data_entry_view::data_entry_view(serialization::pimpl::data &&key, serialization::pimpl::data &&value,
1672  int64_t cost, int64_t creation_time,
1673  int64_t expiration_time, int64_t hits, int64_t last_access_time,
1674  int64_t last_stored_time, int64_t last_update_time, int64_t version,
1675  int64_t ttl,
1676  int64_t max_idle) : key_(std::move(key)), value_(std::move(value)),
1677  cost_(cost),
1678  creation_time_(creation_time),
1679  expiration_time_(expiration_time),
1680  hits_(hits), last_access_time_(last_access_time),
1681  last_stored_time_(last_stored_time),
1682  last_update_time_(last_update_time), version_(version),
1683  ttl_(ttl),
1684  max_idle_(max_idle) {}
1685  }
1686 
1687  namespace topic {
1688  namespace impl {
1689  namespace reliable {
1690  ReliableTopicMessage::ReliableTopicMessage() : publish_time_(std::chrono::system_clock::now()) {}
1691 
1692  ReliableTopicMessage::ReliableTopicMessage(
1693  hazelcast::client::serialization::pimpl::data &&payload_data,
1694  std::unique_ptr<address> address)
1695  : publish_time_(std::chrono::system_clock::now())
1696  , payload_(std::move(payload_data)) {
1697  if (address) {
1698  publisher_address_ = boost::make_optional(*address);
1699  }
1700  }
1701 
1702  std::chrono::system_clock::time_point ReliableTopicMessage::get_publish_time() const {
1703  return publish_time_;
1704  }
1705 
1706  const boost::optional<address> &ReliableTopicMessage::get_publisher_address() const {
1707  return publisher_address_;
1708  }
1709 
1710  serialization::pimpl::data &ReliableTopicMessage::get_payload() {
1711  return payload_;
1712  }
1713  }
1714  }
1715  }
1716 
1717  namespace serialization {
1718  int32_t hz_serializer<topic::impl::reliable::ReliableTopicMessage>::get_factory_id() {
1719  return F_ID;
1720  }
1721 
1722  int hz_serializer<topic::impl::reliable::ReliableTopicMessage>::get_class_id() {
1723  return RELIABLE_TOPIC_MESSAGE;
1724  }
1725 
1726  void hz_serializer<topic::impl::reliable::ReliableTopicMessage>::write_data(
1727  const topic::impl::reliable::ReliableTopicMessage &object, object_data_output &out) {
1728  out.write<int64_t>(std::chrono::duration_cast<std::chrono::milliseconds>(object.publish_time_.time_since_epoch()).count());
1729  out.write_object(object.publisher_address_);
1730  out.write(object.payload_.to_byte_array());
1731  }
1732 
1733  topic::impl::reliable::ReliableTopicMessage
1734  hz_serializer<topic::impl::reliable::ReliableTopicMessage>::read_data(object_data_input &in) {
1735  topic::impl::reliable::ReliableTopicMessage message;
1736  auto now = std::chrono::system_clock::now();
1737  message.publish_time_ = now + std::chrono::milliseconds(in.read<int64_t>()) - now.time_since_epoch();
1738  message.publisher_address_ = in.read_object<address>();
1739  message.payload_ = serialization::pimpl::data(in.read<std::vector<byte>>().value());
1740  return message;
1741  }
1742  }
1743 
1744  entry_event::entry_event(const std::string &name, member &&member, type event_type,
1745  typed_data &&key, typed_data &&value, typed_data &&old_value, typed_data &&merging_value)
1746  : name_(name), member_(std::move(member)), event_type_(event_type), key_(std::move(key)),
1747  value_(std::move(value)), old_value_(std::move(old_value)), merging_value_(std::move(merging_value)) {}
1748 
1750  return key_;
1751  }
1752 
1754  return old_value_;
1755  }
1756 
1758  return value_;
1759  }
1760 
1762  return merging_value_;
1763  }
1764 
1766  return member_;
1767  }
1768 
1769  entry_event::type entry_event::get_event_type() const {
1770  return event_type_;
1771  }
1772 
1773  const std::string &entry_event::get_name() const {
1774  return name_;
1775  }
1776 
1777  std::ostream &operator<<(std::ostream &os, const entry_event &event) {
1778  os << "name: " << event.name_ << " member: " << event.member_ << " eventType: " <<
1779  static_cast<int>(event.event_type_) << " key: " << event.key_.get_type() << " value: " << event.value_.get_type() <<
1780  " oldValue: " << event.old_value_.get_type() << " mergingValue: " << event.merging_value_.get_type();
1781  return os;
1782  }
1783 
1784  map_event::map_event(member &&member, entry_event::type event_type, const std::string &name,
1785  int number_of_entries_affected)
1786  : member_(member), event_type_(event_type), name_(name), number_of_entries_affected_(number_of_entries_affected) {}
1787 
1789  return member_;
1790  }
1791 
1792  entry_event::type map_event::get_event_type() const {
1793  return event_type_;
1794  }
1795 
1796  const std::string &map_event::get_name() const {
1797  return name_;
1798  }
1799 
1801  return number_of_entries_affected_;
1802  }
1803 
1804  std::ostream &operator<<(std::ostream &os, const map_event &event) {
1805  os << "MapEvent{member: " << event.member_ << " eventType: " << static_cast<int>(event.event_type_) << " name: " << event.name_
1806  << " numberOfEntriesAffected: " << event.number_of_entries_affected_;
1807  return os;
1808  }
1809 
1810  item_event_base::item_event_base(const std::string &name, const member &member, const item_event_type &event_type)
1811  : name_(name), member_(member), event_type_(event_type) {}
1812 
1814  return member_;
1815  }
1816 
1817  item_event_type item_event_base::get_event_type() const {
1818  return event_type_;
1819  }
1820 
1821  const std::string &item_event_base::get_name() const {
1822  return name_;
1823  }
1824 
1825  item_event_base::~item_event_base() = default;
1826 
1827  flake_id_generator::flake_id_generator(const std::string &object_name, spi::ClientContext *context)
1828  : flake_id_generator_impl(SERVICE_NAME, object_name, context) {}
1829  }
1830 }
const typed_data & get_key() const
Returns the key of the entry event.
Definition: proxy.cpp:1749
const std::string & get_name() const
Returns the name of the map for this event.
Definition: proxy.cpp:1773
const typed_data & get_old_value() const
Returns the old value of the entry event.
Definition: proxy.cpp:1753
type get_event_type() const
Return the event type.
Definition: proxy.cpp:1769
const member & get_member() const
Returns the member fired this event.
Definition: proxy.cpp:1765
const typed_data & get_value() const
Returns the value of the entry event.
Definition: proxy.cpp:1757
const typed_data & get_merging_value() const
Returns the incoming merging value of the entry event.
Definition: proxy.cpp:1761
static const std::chrono::milliseconds UNSET
Default TTL value of a record.
Definition: imap.h:1004
item_event_type get_event_type() const
Return the event type.
Definition: proxy.cpp:1817
const member & get_member() const
Returns the member fired this event.
Definition: proxy.cpp:1813
const std::string & get_name() const
Returns the name of the collection for this event.
Definition: proxy.cpp:1821
Map events common contract.
Definition: map_event.h:37
const std::string & get_name() const
Returns the name of the map for this event.
Definition: proxy.cpp:1796
entry_event::type get_event_type() const
Return the event type.
Definition: proxy.cpp:1792
int get_number_of_entries_affected() const
Returns the number of entries affected by this event.
Definition: proxy.cpp:1800
map_event(member &&member, entry_event::type event_type, const std::string &name, int number_of_entries_affected)
Constructor.
Definition: proxy.cpp:1784
const member & get_member() const
Returns the member fired this event.
Definition: proxy.cpp:1788
hz_cluster member class.
Definition: member.h:55
typed_data class is a wrapper class for the serialized binary data.
serialization::pimpl::object_type get_type() const