Hazelcast C++ Client
Hazelcast C++ Client Library
proxy.cpp
1 /*
2  * Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /*
18  * Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
19  *
20  * Licensed under the Apache License, Version 2.0 (the "License");
21  * you may not use this file except in compliance with the License.
22  * You may obtain a copy of the License at
23  *
24  * http://www.apache.org/licenses/LICENSE-2.0
25  *
26  * Unless required by applicable law or agreed to in writing, software
27  * distributed under the License is distributed on an "AS IS" BASIS,
28  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
29  * See the License for the specific language governing permissions and
30  * limitations under the License.
31  */
32 
33 #include <unordered_set>
34 #include <atomic>
35 
36 #include "hazelcast/client/impl/ClientLockReferenceIdGenerator.h"
37 #include "hazelcast/client/proxy/PNCounterImpl.h"
38 #include "hazelcast/client/spi/ClientContext.h"
39 #include "hazelcast/client/impl/hazelcast_client_instance_impl.h"
40 #include "hazelcast/client/proxy/flake_id_generator_impl.h"
41 #include "hazelcast/client/spi/impl/listener/listener_service_impl.h"
42 #include "hazelcast/client/topic/impl/TopicEventHandlerImpl.h"
43 #include "hazelcast/client/client_config.h"
44 #include "hazelcast/client/map/data_entry_view.h"
45 #include "hazelcast/client/proxy/RingbufferImpl.h"
46 #include "hazelcast/client/impl/vector_clock.h"
47 #include "hazelcast/client/internal/partition/strategy/StringPartitioningStrategy.h"
48 #include "hazelcast/util/Util.h"
49 #include "hazelcast/client/topic/reliable_listener.h"
50 
51 namespace hazelcast {
52  namespace client {
53  constexpr std::chrono::milliseconds imap::UNSET;
54 
55  reliable_topic::reliable_topic(const std::string &instance_name, spi::ClientContext *context)
56  : proxy::ProxyImpl(reliable_topic::SERVICE_NAME, instance_name, context),
57  executor_(context->get_client_execution_service().get_user_executor()), logger_(context->get_logger()) {
58  auto reliable_config = context->get_client_config().lookup_reliable_topic_config(instance_name);
59  if (reliable_config) {
60  batch_size_ = reliable_config->get_read_batch_size();
61  } else {
62  batch_size_ = config::reliable_topic_config::DEFAULT_READ_BATCH_SIZE;
63  }
64 
65  ringbuffer_ = context->get_hazelcast_client_implementation()->get_distributed_object<ringbuffer>(
66  std::string(reliable_topic::TOPIC_RB_PREFIX) + instance_name);
67  }
68 
69  bool reliable_topic::remove_message_listener(const std::string &registration_id) {
70  int id = util::IOUtil::to_value<int>(registration_id);
71  auto runner = runners_map_.get(id);
72  if (!runner) {
73  return false;
74  }
75  runner->cancel();
76  return true;
77  }
78 
79  void reliable_topic::on_shutdown() {
80  // cancel all runners
81  for (auto &entry : runners_map_.clear()) {
82  entry.second->cancel();
83  }
84  }
85 
86  void reliable_topic::on_destroy() {
87  // cancel all runners
88  for (auto &entry : runners_map_.clear()) {
89  entry.second->cancel();
90  }
91  }
92 
93  void reliable_topic::post_destroy() {
94  // destroy the underlying ringbuffer
95  ringbuffer_.get()->destroy().get();
96  }
97 
98  namespace topic {
99  reliable_listener::reliable_listener(bool loss_tolerant, int64_t initial_sequence_id)
100  : loss_tolerant_(loss_tolerant)
101  , initial_sequence_id_(initial_sequence_id) {}
102  }
103 
104  namespace impl {
105  ClientLockReferenceIdGenerator::ClientLockReferenceIdGenerator() : reference_id_counter_(0) {}
106 
107  int64_t ClientLockReferenceIdGenerator::get_next_reference_id() {
108  return ++reference_id_counter_;
109  }
110  }
111 
112  namespace proxy {
113  MultiMapImpl::MultiMapImpl(const std::string &instance_name, spi::ClientContext *context)
114  : ProxyImpl(multi_map::SERVICE_NAME, instance_name, context) {
115  // TODO: remove this line once the client instance get_distributed_object works as expected in Java for this proxy type
116  lock_reference_id_generator_ = get_context().get_lock_reference_id_generator();
117  }
118 
119  boost::future<bool> MultiMapImpl::put(const serialization::pimpl::data &key, const serialization::pimpl::data &value) {
120  auto request = protocol::codec::multimap_put_encode(get_name(), key, value,
121  util::get_current_thread_id());
122  return invoke_and_get_future<bool>(request, key);
123  }
124 
125  boost::future<std::vector<serialization::pimpl::data>> MultiMapImpl::get_data(const serialization::pimpl::data &key) {
126  auto request = protocol::codec::multimap_get_encode(get_name(), key,
127  util::get_current_thread_id());
128  return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request, key);
129  }
130 
131  boost::future<bool> MultiMapImpl::remove(const serialization::pimpl::data &key, const serialization::pimpl::data &value) {
132  auto request = protocol::codec::multimap_removeentry_encode(get_name(), key, value,
133  util::get_current_thread_id());
134  return invoke_and_get_future<bool>(request, key);
135  }
136 
137  boost::future<std::vector<serialization::pimpl::data>> MultiMapImpl::remove_data(const serialization::pimpl::data &key) {
138  auto request = protocol::codec::multimap_remove_encode(get_name(), key,
139  util::get_current_thread_id());
140  return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request, key);
141  }
142 
143  boost::future<std::vector<serialization::pimpl::data>> MultiMapImpl::key_set_data() {
144  auto request = protocol::codec::multimap_keyset_encode(get_name());
145  return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request);
146  }
147 
148  boost::future<std::vector<serialization::pimpl::data>> MultiMapImpl::values_data() {
149  auto request = protocol::codec::multimap_values_encode(get_name());
150  return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request);
151  }
152 
153  boost::future<EntryVector> MultiMapImpl::entry_set_data() {
154  auto request = protocol::codec::multimap_entryset_encode(get_name());
155  return invoke_and_get_future<EntryVector>(request);
156  }
157 
158  boost::future<bool> MultiMapImpl::contains_key(const serialization::pimpl::data &key) {
159  auto request = protocol::codec::multimap_containskey_encode(get_name(), key,
160  util::get_current_thread_id());
161  return invoke_and_get_future<bool>(request, key);
162  }
163 
164  boost::future<bool> MultiMapImpl::contains_value(const serialization::pimpl::data &value) {
165  auto request = protocol::codec::multimap_containsvalue_encode(get_name(), value);
166  return invoke_and_get_future<bool>(request);
167  }
168 
169  boost::future<bool> MultiMapImpl::contains_entry(const serialization::pimpl::data &key,
170  const serialization::pimpl::data &value) {
171  auto request = protocol::codec::multimap_containsentry_encode(get_name(), key, value,
172  util::get_current_thread_id());
173  return invoke_and_get_future<bool>(request, key);
174  }
175 
176  boost::future<int> MultiMapImpl::size() {
177  auto request = protocol::codec::multimap_size_encode(get_name());
178  return invoke_and_get_future<int>(request);
179  }
180 
181  boost::future<void> MultiMapImpl::clear() {
182  auto request = protocol::codec::multimap_clear_encode(get_name());
183  return to_void_future(invoke(request));
184  }
185 
186  boost::future<int> MultiMapImpl::value_count(const serialization::pimpl::data &key) {
187  auto request = protocol::codec::multimap_valuecount_encode(get_name(), key,
188  util::get_current_thread_id());
189  return invoke_and_get_future<int>(request, key);
190  }
191 
192  boost::future<boost::uuids::uuid>
193  MultiMapImpl::add_entry_listener(std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
194  bool include_value) {
195  return register_listener(create_multi_map_entry_listener_codec(include_value), std::move(entry_event_handler));
196  }
197 
198  boost::future<boost::uuids::uuid>
199  MultiMapImpl::add_entry_listener(std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
200  bool include_value, serialization::pimpl::data &&key) {
201  return register_listener(create_multi_map_entry_listener_codec(include_value, std::move(key)),
202  std::move(entry_event_handler));
203  }
204 
205  boost::future<bool> MultiMapImpl::remove_entry_listener(boost::uuids::uuid registration_id) {
206  return get_context().get_client_listener_service().deregister_listener(registration_id);
207  }
208 
209  boost::future<void> MultiMapImpl::lock(const serialization::pimpl::data &key) {
210  return lock(key, std::chrono::milliseconds(-1));
211  }
212 
213  boost::future<void> MultiMapImpl::lock(const serialization::pimpl::data &key, std::chrono::milliseconds lease_time) {
214  auto request = protocol::codec::multimap_lock_encode(get_name(), key, util::get_current_thread_id(),
215  std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
216  lock_reference_id_generator_->get_next_reference_id());
217  return to_void_future(invoke_on_partition(request, get_partition_id(key)));
218  }
219 
220  boost::future<bool> MultiMapImpl::is_locked(const serialization::pimpl::data &key) {
221  auto request = protocol::codec::multimap_islocked_encode(get_name(), key);
222  return invoke_and_get_future<bool>(request, key);
223  }
224 
225  boost::future<bool> MultiMapImpl::try_lock(const serialization::pimpl::data &key) {
226  auto request = protocol::codec::multimap_trylock_encode(get_name(), key,
227  util::get_current_thread_id(), INT64_MAX,
228  0,
229  lock_reference_id_generator_->get_next_reference_id());
230  return invoke_and_get_future<bool>(request, key);
231  }
232 
233  boost::future<bool> MultiMapImpl::try_lock(const serialization::pimpl::data &key, std::chrono::milliseconds timeout) {
234  return try_lock(key, timeout, std::chrono::milliseconds(INT64_MAX));
235  }
236 
237  boost::future<bool> MultiMapImpl::try_lock(const serialization::pimpl::data &key, std::chrono::milliseconds timeout, std::chrono::milliseconds lease_time) {
238  auto request = protocol::codec::multimap_trylock_encode(get_name(), key, util::get_current_thread_id(),
239  std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
240  std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count(),
241  lock_reference_id_generator_->get_next_reference_id());
242  return invoke_and_get_future<bool>(request, key);
243  }
244 
245  boost::future<void> MultiMapImpl::unlock(const serialization::pimpl::data &key) {
246  auto request = protocol::codec::multimap_unlock_encode(get_name(), key, util::get_current_thread_id(),
247  lock_reference_id_generator_->get_next_reference_id());
248  return to_void_future(invoke_on_partition(request, get_partition_id(key)));
249  }
250 
251  boost::future<void> MultiMapImpl::force_unlock(const serialization::pimpl::data &key) {
252  auto request = protocol::codec::multimap_forceunlock_encode(get_name(), key,
253  lock_reference_id_generator_->get_next_reference_id());
254  return to_void_future(invoke_on_partition(request, get_partition_id(key)));
255  }
256 
257  std::shared_ptr<spi::impl::ListenerMessageCodec>
258  MultiMapImpl::create_multi_map_entry_listener_codec(bool include_value) {
259  return std::shared_ptr<spi::impl::ListenerMessageCodec>(
260  new MultiMapEntryListenerMessageCodec(get_name(), include_value));
261  }
262 
263  std::shared_ptr<spi::impl::ListenerMessageCodec>
264  MultiMapImpl::create_multi_map_entry_listener_codec(bool include_value, serialization::pimpl::data &&key) {
265  return std::shared_ptr<spi::impl::ListenerMessageCodec>(
266  new MultiMapEntryListenerToKeyCodec(get_name(), include_value, std::move(key)));
267  }
268 
269  void MultiMapImpl::on_initialize() {
270  ProxyImpl::on_initialize();
271  lock_reference_id_generator_ = get_context().get_lock_reference_id_generator();
272  }
273 
274  MultiMapImpl::MultiMapEntryListenerMessageCodec::MultiMapEntryListenerMessageCodec(std::string name,
275  bool include_value)
276  : name_(std::move(name)), include_value_(include_value) {}
277 
278  protocol::ClientMessage
279  MultiMapImpl::MultiMapEntryListenerMessageCodec::encode_add_request(bool local_only) const {
280  return protocol::codec::multimap_addentrylistener_encode(name_, include_value_, local_only);
281  }
282 
283  protocol::ClientMessage
284  MultiMapImpl::MultiMapEntryListenerMessageCodec::encode_remove_request(
285  boost::uuids::uuid real_registration_id) const {
286  return protocol::codec::multimap_removeentrylistener_encode(name_, real_registration_id);
287  }
288 
289  protocol::ClientMessage
290  MultiMapImpl::MultiMapEntryListenerToKeyCodec::encode_add_request(bool local_only) const {
291  return protocol::codec::multimap_addentrylistenertokey_encode(name_, key_, include_value_,
292  local_only);
293  }
294 
295  protocol::ClientMessage
296  MultiMapImpl::MultiMapEntryListenerToKeyCodec::encode_remove_request(
297  boost::uuids::uuid real_registration_id) const {
298  return protocol::codec::multimap_removeentrylistener_encode(name_, real_registration_id);
299  }
300 
301  MultiMapImpl::MultiMapEntryListenerToKeyCodec::MultiMapEntryListenerToKeyCodec(std::string name,
302  bool include_value,
303  serialization::pimpl::data &&key)
304  : name_(std::move(name)), include_value_(include_value), key_(std::move(key)) {}
305 
306  const std::shared_ptr<std::unordered_set<member> > PNCounterImpl::EMPTY_ADDRESS_LIST(
307  new std::unordered_set<member>());
308 
309  PNCounterImpl::PNCounterImpl(const std::string &service_name, const std::string &object_name,
310  spi::ClientContext *context)
311  : ProxyImpl(service_name, object_name, context), max_configured_replica_count_(0),
312  observed_clock_(std::shared_ptr<impl::vector_clock>(new impl::vector_clock())),
313  logger_(context->get_logger()) {
314  }
315 
316  std::ostream &operator<<(std::ostream &os, const PNCounterImpl &proxy) {
317  os << "PNCounter{name='" << proxy.get_name() << "\'}";
318  return os;
319  }
320 
321  boost::future<int64_t> PNCounterImpl::get() {
322  boost::shared_ptr<member> target = get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
323  if (!target) {
324  BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster("ClientPNCounterProxy::get",
325  "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
326  }
327  return invoke_get_internal(EMPTY_ADDRESS_LIST, nullptr, target);
328  }
329 
330  boost::future<int64_t> PNCounterImpl::get_and_add(int64_t delta) {
331  boost::shared_ptr<member> target = get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
332  if (!target) {
333  BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster("ClientPNCounterProxy::getAndAdd",
334  "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
335  }
336  return invoke_add_internal(delta, true, EMPTY_ADDRESS_LIST,nullptr, target);
337  }
338 
339  boost::future<int64_t> PNCounterImpl::add_and_get(int64_t delta) {
340  boost::shared_ptr<member> target = get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
341  if (!target) {
342  BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster("ClientPNCounterProxy::addAndGet",
343  "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
344  }
345  return invoke_add_internal(delta, false, EMPTY_ADDRESS_LIST,nullptr, target);
346  }
347 
348  boost::future<int64_t> PNCounterImpl::get_and_subtract(int64_t delta) {
349  boost::shared_ptr<member> target = get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
350  if (!target) {
351  BOOST_THROW_EXCEPTION(
352  exception::no_data_member_in_cluster("ClientPNCounterProxy::getAndSubtract",
353  "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
354  }
355  return invoke_add_internal(-delta, true, EMPTY_ADDRESS_LIST,nullptr, target);
356 
357  }
358 
359  boost::future<int64_t> PNCounterImpl::subtract_and_get(int64_t delta) {
360  boost::shared_ptr<member> target = get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
361  if (!target) {
362  BOOST_THROW_EXCEPTION(
363  exception::no_data_member_in_cluster("ClientPNCounterProxy::subtractAndGet",
364  "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
365  }
366  return invoke_add_internal(-delta, false, EMPTY_ADDRESS_LIST,nullptr, target);
367  }
368 
369  boost::future<int64_t> PNCounterImpl::decrement_and_get() {
370  boost::shared_ptr<member> target = get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
371  if (!target) {
372  BOOST_THROW_EXCEPTION(
373  exception::no_data_member_in_cluster("ClientPNCounterProxy::decrementAndGet",
374  "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
375  }
376  return invoke_add_internal(-1, false, EMPTY_ADDRESS_LIST,nullptr, target);
377  }
378 
379  boost::future<int64_t> PNCounterImpl::increment_and_get() {
380  boost::shared_ptr<member> target = get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
381  if (!target) {
382  BOOST_THROW_EXCEPTION(
383  exception::no_data_member_in_cluster("ClientPNCounterProxy::incrementAndGet",
384  "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
385  }
386  return invoke_add_internal(1, false, EMPTY_ADDRESS_LIST,nullptr, target);
387  }
388 
389  boost::future<int64_t> PNCounterImpl::get_and_decrement() {
390  boost::shared_ptr<member> target = get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
391  if (!target) {
392  BOOST_THROW_EXCEPTION(
393  exception::no_data_member_in_cluster("ClientPNCounterProxy::getAndDecrement",
394  "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
395  }
396  return invoke_add_internal(-1, true, EMPTY_ADDRESS_LIST,nullptr, target);
397  }
398 
399  boost::future<int64_t> PNCounterImpl::get_and_increment() {
400  boost::shared_ptr<member> target = get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
401  if (!target) {
402  BOOST_THROW_EXCEPTION(
403  exception::no_data_member_in_cluster("ClientPNCounterProxy::getAndIncrement",
404  "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
405  }
406  return invoke_add_internal(1, true, EMPTY_ADDRESS_LIST,nullptr, target);
407  }
408 
409  boost::future<void> PNCounterImpl::reset() {
410  observed_clock_ = std::shared_ptr<impl::vector_clock>(new impl::vector_clock());
411  return boost::make_ready_future();
412  }
413 
414  boost::shared_ptr<member>
415  PNCounterImpl::get_crdt_operation_target(const std::unordered_set<member> &excluded_addresses) {
416  auto replicaAddress = current_target_replica_address_.load();
417  if (replicaAddress && excluded_addresses.find(*replicaAddress) == excluded_addresses.end()) {
418  return replicaAddress;
419  }
420 
421  {
422  std::lock_guard<std::mutex> guard(target_selection_mutex_);
423  replicaAddress = current_target_replica_address_.load();
424  if (!replicaAddress ||
425  excluded_addresses.find(*replicaAddress) != excluded_addresses.end()) {
426  current_target_replica_address_ = choose_target_replica(excluded_addresses);
427  }
428  }
429  return current_target_replica_address_;
430  }
431 
432  boost::shared_ptr<member>
433  PNCounterImpl::choose_target_replica(const std::unordered_set<member> &excluded_addresses) {
434  std::vector<member> replicaAddresses = get_replica_addresses(excluded_addresses);
435  if (replicaAddresses.empty()) {
436  return nullptr;
437  }
438  // TODO: Use a random generator as used in Java (ThreadLocalRandomProvider) which is per thread
439  int randomReplicaIndex = std::abs(rand()) % (int) replicaAddresses.size();
440  return boost::make_shared<member>(replicaAddresses[randomReplicaIndex]);
441  }
442 
443  std::vector<member> PNCounterImpl::get_replica_addresses(const std::unordered_set<member> &excluded_members) {
444  std::vector<member> dataMembers = get_context().get_client_cluster_service().get_members(
445  *member_selectors::DATA_MEMBER_SELECTOR);
446  int32_t replicaCount = get_max_configured_replica_count();
447  int currentReplicaCount = util::min<int>(replicaCount, (int) dataMembers.size());
448 
449  std::vector<member> replicaMembers;
450  for (int i = 0; i < currentReplicaCount; i++) {
451  if (excluded_members.find(dataMembers[i]) == excluded_members.end()) {
452  replicaMembers.push_back(dataMembers[i]);
453  }
454  }
455  return replicaMembers;
456  }
457 
458  int32_t PNCounterImpl::get_max_configured_replica_count() {
459  if (max_configured_replica_count_ > 0) {
460  return max_configured_replica_count_;
461  } else {
462  auto request = protocol::codec::pncounter_getconfiguredreplicacount_encode(
463  get_name());
464  max_configured_replica_count_ = invoke_and_get_future<int32_t>(request).get();
465  }
466  return max_configured_replica_count_;
467  }
468 
469  boost::shared_ptr<member>
470  PNCounterImpl::try_choose_a_new_target(std::shared_ptr<std::unordered_set<member>> excluded_addresses,
471  boost::shared_ptr<member> last_target,
472  const exception::hazelcast_ &last_exception) {
473  HZ_LOG(logger_, finest,
474  boost::str(boost::format("Exception occurred while invoking operation on target %1%, "
475  "choosing different target. Cause: %2%")
476  % last_target % last_exception)
477  );
478  if (excluded_addresses == EMPTY_ADDRESS_LIST) {
479  // TODO: Make sure that this only affects the local variable of the method
480  excluded_addresses = std::make_shared<std::unordered_set<member>>();
481  }
482  excluded_addresses->insert(*last_target);
483  return get_crdt_operation_target(*excluded_addresses);
484  }
485 
486  boost::future<int64_t>
487  PNCounterImpl::invoke_get_internal(std::shared_ptr<std::unordered_set<member>> excluded_addresses,
488  std::exception_ptr last_exception,
489  const boost::shared_ptr<member> &target) {
490  if (!target) {
491  if (last_exception) {
492  std::rethrow_exception(last_exception);
493  } else {
494  BOOST_THROW_EXCEPTION(
495  exception::no_data_member_in_cluster("ClientPNCounterProxy::invokeGetInternal",
496  "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
497  }
498  }
499  try {
500  auto timestamps = observed_clock_.get()->entry_set();
501  auto request = protocol::codec::pncounter_get_encode(get_name(), timestamps, target->get_uuid());
502  return invoke_on_member(request, target->get_uuid()).then(boost::launch::sync,
503  [=](boost::future<protocol::ClientMessage> f) {
504  try {
505  return get_and_update_timestamps(
506  std::move(f));
507  } catch (exception::hazelcast_ &e) {
508  return invoke_get_internal(
509  excluded_addresses,
510  std::current_exception(),
511  try_choose_a_new_target(
512  excluded_addresses,
513  target, e)).get();
514  }
515  });
516  } catch (exception::hazelcast_ &e) {
517  return invoke_get_internal(excluded_addresses, std::current_exception(),
518  try_choose_a_new_target(excluded_addresses, target, e));
519  }
520  }
521 
522  boost::future<int64_t>
523  PNCounterImpl::invoke_add_internal(int64_t delta, bool getBeforeUpdate,
524  std::shared_ptr<std::unordered_set<member> > excluded_addresses,
525  std::exception_ptr last_exception,
526  const boost::shared_ptr<member> &target) {
527  if (!target) {
528  if (last_exception) {
529  std::rethrow_exception(last_exception);
530  } else {
531  BOOST_THROW_EXCEPTION(
532  exception::no_data_member_in_cluster("ClientPNCounterProxy::invokeGetInternal",
533  "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
534  }
535  }
536 
537  try {
538  auto request = protocol::codec::pncounter_add_encode(
539  get_name(), delta, getBeforeUpdate, observed_clock_.get()->entry_set(), target->get_uuid());
540  return invoke_on_member(request, target->get_uuid()).then(boost::launch::sync,
541  [=](boost::future<protocol::ClientMessage> f) {
542  try {
543  return get_and_update_timestamps(
544  std::move(f));
545  } catch (exception::hazelcast_ &e) {
546  return invoke_add_internal(delta,
547  getBeforeUpdate,
548  excluded_addresses,
549  std::current_exception(),
550  try_choose_a_new_target(
551  excluded_addresses,
552  target,
553  e)).get();
554  }
555  });
556  } catch (exception::hazelcast_ &e) {
557  return invoke_add_internal(delta, getBeforeUpdate, excluded_addresses, std::current_exception(),
558  try_choose_a_new_target(excluded_addresses, target, e));
559  }
560  }
561 
562  int64_t PNCounterImpl::get_and_update_timestamps(boost::future<protocol::ClientMessage> f) {
563  auto msg = f.get();
564  auto value = msg.get_first_fixed_sized_field<int64_t>();
565  // skip replica count
566  msg.get<int32_t>();
567  update_observed_replica_timestamps(msg.get<impl::vector_clock::timestamp_vector>());
568  return value;
569  }
570 
571  void PNCounterImpl::update_observed_replica_timestamps(
572  const impl::vector_clock::timestamp_vector &received_logical_timestamps) {
573  std::shared_ptr<impl::vector_clock> received = to_vector_clock(received_logical_timestamps);
574  for (;;) {
575  std::shared_ptr<impl::vector_clock> currentClock = this->observed_clock_;
576  if (currentClock->is_after(*received)) {
577  break;
578  }
579  if (observed_clock_.compare_and_set(currentClock, received)) {
580  break;
581  }
582  }
583  }
584 
585  std::shared_ptr<impl::vector_clock> PNCounterImpl::to_vector_clock(
586  const impl::vector_clock::timestamp_vector &replica_logical_timestamps) {
587  return std::shared_ptr<impl::vector_clock>(
588  new impl::vector_clock(replica_logical_timestamps));
589  }
590 
591  boost::shared_ptr<member> PNCounterImpl::get_current_target_replica_address() {
592  return current_target_replica_address_.load();
593  }
594 
595  IListImpl::IListImpl(const std::string &instance_name, spi::ClientContext *context)
596  : ProxyImpl("hz:impl:listService", instance_name, context) {
597  serialization::pimpl::data key_data = get_context().get_serialization_service().to_data<std::string>(
598  &instance_name);
599  partition_id_ = get_partition_id(key_data);
600  }
601 
602  boost::future<bool> IListImpl::remove_item_listener(boost::uuids::uuid registration_id) {
603  return get_context().get_client_listener_service().deregister_listener(registration_id);
604  }
605 
606  boost::future<int> IListImpl::size() {
607  auto request = protocol::codec::list_size_encode(get_name());
608  return invoke_and_get_future<int>(request, partition_id_);
609  }
610 
611  boost::future<bool> IListImpl::is_empty() {
612  auto request = protocol::codec::list_isempty_encode(get_name());
613  return invoke_and_get_future<bool>(request, partition_id_);
614  }
615 
616  boost::future<bool> IListImpl::contains(const serialization::pimpl::data &element) {
617  auto request = protocol::codec::list_contains_encode(get_name(), element);
618  return invoke_and_get_future<bool>(request, partition_id_);
619  }
620 
621  boost::future<std::vector<serialization::pimpl::data>> IListImpl::to_array_data() {
622  auto request = protocol::codec::list_getall_encode(get_name());
623  return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request, partition_id_);
624  }
625 
626  boost::future<bool> IListImpl::add(const serialization::pimpl::data &element) {
627  auto request = protocol::codec::list_add_encode(get_name(), element);
628  return invoke_and_get_future<bool>(request, partition_id_);
629  }
630 
631  boost::future<bool> IListImpl::remove(const serialization::pimpl::data &element) {
632  auto request = protocol::codec::list_remove_encode(get_name(), element);
633  return invoke_and_get_future<bool>(request, partition_id_);
634  }
635 
636  boost::future<bool> IListImpl::contains_all_data(const std::vector<serialization::pimpl::data> &elements) {
637  auto request = protocol::codec::list_containsall_encode(get_name(), elements);
638  return invoke_and_get_future<bool>(request, partition_id_);
639  }
640 
641  boost::future<bool> IListImpl::add_all_data(const std::vector<serialization::pimpl::data> &elements) {
642  auto request = protocol::codec::list_addall_encode(get_name(), elements);
643  return invoke_and_get_future<bool>(request, partition_id_);
644  }
645 
646  boost::future<bool> IListImpl::add_all_data(int index, const std::vector<serialization::pimpl::data> &elements) {
647  auto request = protocol::codec::list_addallwithindex_encode(get_name(), index,
648  elements);
649  return invoke_and_get_future<bool>(request, partition_id_);
650  }
651 
652  boost::future<bool> IListImpl::remove_all_data(const std::vector<serialization::pimpl::data> &elements) {
653  auto request = protocol::codec::list_compareandremoveall_encode(get_name(), elements);
654  return invoke_and_get_future<bool>(request, partition_id_);
655  }
656 
657  boost::future<bool> IListImpl::retain_all_data(const std::vector<serialization::pimpl::data> &elements) {
658  auto request = protocol::codec::list_compareandretainall_encode(get_name(), elements);
659  return invoke_and_get_future<bool>(request, partition_id_);
660  }
661 
662  boost::future<void> IListImpl::clear() {
663  auto request = protocol::codec::list_clear_encode(get_name());
664  return to_void_future(invoke_on_partition(request, partition_id_));
665  }
666 
667  boost::future<boost::optional<serialization::pimpl::data>> IListImpl::get_data(int index) {
668  auto request = protocol::codec::list_get_encode(get_name(), index);
669  return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, partition_id_);
670  }
671 
672  boost::future<boost::optional<serialization::pimpl::data>> IListImpl::set_data(int index,
673  const serialization::pimpl::data &element) {
674  auto request = protocol::codec::list_set_encode(get_name(), index, element);
675  return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, partition_id_);
676  }
677 
678  boost::future<void> IListImpl::add(int index, const serialization::pimpl::data &element) {
679  auto request = protocol::codec::list_addwithindex_encode(get_name(), index, element);
680  return to_void_future(invoke_on_partition(request, partition_id_));
681  }
682 
683  boost::future<boost::optional<serialization::pimpl::data>> IListImpl::remove_data(int index) {
684  auto request = protocol::codec::list_removewithindex_encode(get_name(), index);
685  return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, partition_id_);
686  }
687 
688  boost::future<int> IListImpl::index_of(const serialization::pimpl::data &element) {
689  auto request = protocol::codec::list_indexof_encode(get_name(), element);
690  return invoke_and_get_future<int>(request, partition_id_);
691  }
692 
693  boost::future<int> IListImpl::last_index_of(const serialization::pimpl::data &element) {
694  auto request = protocol::codec::list_lastindexof_encode(get_name(), element);
695  return invoke_and_get_future<int>(request, partition_id_);
696  }
697 
698  boost::future<std::vector<serialization::pimpl::data>> IListImpl::sub_list_data(int from_index, int to_index) {
699  auto request = protocol::codec::list_sub_encode(get_name(), from_index, to_index);
700  return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request, partition_id_);
701  }
702 
703  std::shared_ptr<spi::impl::ListenerMessageCodec> IListImpl::create_item_listener_codec(bool include_value) {
704  return std::shared_ptr<spi::impl::ListenerMessageCodec>(
705  new ListListenerMessageCodec(get_name(), include_value));
706  }
707 
708  IListImpl::ListListenerMessageCodec::ListListenerMessageCodec(std::string name,
709  bool include_value) : name_(std::move(name)),
710  include_value_(
711  include_value) {}
712 
713  protocol::ClientMessage
714  IListImpl::ListListenerMessageCodec::encode_add_request(bool local_only) const {
715  return protocol::codec::list_addlistener_encode(name_, include_value_, local_only);
716  }
717 
718  protocol::ClientMessage
719  IListImpl::ListListenerMessageCodec::encode_remove_request(boost::uuids::uuid real_registration_id) const {
720  return protocol::codec::list_removelistener_encode(name_, real_registration_id);
721  }
722 
723  flake_id_generator_impl::Block::Block(IdBatch &&id_batch, std::chrono::milliseconds validity)
724  : id_batch_(id_batch), invalid_since_(std::chrono::steady_clock::now() + validity), num_returned_(0) {}
725 
726  int64_t flake_id_generator_impl::Block::next() {
727  if (invalid_since_ <= std::chrono::steady_clock::now()) {
728  return INT64_MIN;
729  }
730  int32_t index;
731  do {
732  index = num_returned_;
733  if (index == id_batch_.get_batch_size()) {
734  return INT64_MIN;
735  }
736  } while (!num_returned_.compare_exchange_strong(index, index + 1));
737 
738  return id_batch_.get_base() + index * id_batch_.get_increment();
739  }
740 
741  flake_id_generator_impl::IdBatch::IdIterator flake_id_generator_impl::IdBatch::endOfBatch;
742 
743  const int64_t flake_id_generator_impl::IdBatch::get_base() const {
744  return base_;
745  }
746 
747  const int64_t flake_id_generator_impl::IdBatch::get_increment() const {
748  return increment_;
749  }
750 
751  const int32_t flake_id_generator_impl::IdBatch::get_batch_size() const {
752  return batch_size_;
753  }
754 
755  flake_id_generator_impl::IdBatch::IdBatch(int64_t base, int64_t increment, int32_t batch_size)
756  : base_(base), increment_(increment), batch_size_(batch_size) {}
757 
758  flake_id_generator_impl::IdBatch::IdIterator &flake_id_generator_impl::IdBatch::end() {
759  return endOfBatch;
760  }
761 
762  flake_id_generator_impl::IdBatch::IdIterator flake_id_generator_impl::IdBatch::iterator() {
763  return flake_id_generator_impl::IdBatch::IdIterator(base_, increment_, batch_size_);
764  }
765 
766  flake_id_generator_impl::IdBatch::IdIterator::IdIterator(int64_t base2, const int64_t increment, int32_t remaining) : base2_(
767  base2), increment_(increment), remaining_(remaining) {}
768 
769  bool flake_id_generator_impl::IdBatch::IdIterator::operator==(const flake_id_generator_impl::IdBatch::IdIterator &rhs) const {
770  return base2_ == rhs.base2_ && increment_ == rhs.increment_ && remaining_ == rhs.remaining_;
771  }
772 
773  bool flake_id_generator_impl::IdBatch::IdIterator::operator!=(const flake_id_generator_impl::IdBatch::IdIterator &rhs) const {
774  return !(rhs == *this);
775  }
776 
777  flake_id_generator_impl::IdBatch::IdIterator::IdIterator() : base2_(-1), increment_(-1), remaining_(-1) {
778  }
779 
780  flake_id_generator_impl::IdBatch::IdIterator &flake_id_generator_impl::IdBatch::IdIterator::operator++() {
781  if (remaining_ == 0) {
782  return flake_id_generator_impl::IdBatch::end();
783  }
784 
785  --remaining_;
786 
787  base2_ += increment_;
788 
789  return *this;
790  }
791 
792 
793  flake_id_generator_impl::flake_id_generator_impl(const std::string &service_name, const std::string &object_name,
794  spi::ClientContext *context)
795  : ProxyImpl(service_name, object_name, context), block_(nullptr) {
796  auto config = context->get_client_config().find_flake_id_generator_config(object_name);
797  batch_size_ = config->get_prefetch_count();
798  validity_ = config->get_prefetch_validity_duration();
799  }
800 
801  int64_t flake_id_generator_impl::new_id_internal() {
802  auto b = block_.load();
803  if (b) {
804  int64_t res = b->next();
805  if (res != INT64_MIN) {
806  return res;
807  }
808  }
809 
810  throw std::overflow_error("");
811  }
812 
813  boost::future<int64_t> flake_id_generator_impl::new_id() {
814  try {
815  return boost::make_ready_future(new_id_internal());
816  } catch (std::overflow_error &) {
817  return new_id_batch(batch_size_).then(boost::launch::sync,
818  [=](boost::future<flake_id_generator_impl::IdBatch> f) {
819  auto newBlock = boost::make_shared<Block>(f.get(),
820  validity_);
821  auto value = newBlock->next();
822  auto b = block_.load();
823  block_.compare_exchange_strong(b, newBlock);
824  return value;
825  });
826  }
827  }
828 
829  boost::future<flake_id_generator_impl::IdBatch> flake_id_generator_impl::new_id_batch(int32_t size) {
830  auto request = protocol::codec::flakeidgenerator_newidbatch_encode(
831  get_name(), size);
832  return invoke(request).then(boost::launch::sync, [](boost::future<protocol::ClientMessage> f) {
833  auto msg = f.get();
834  msg.rd_ptr(protocol::ClientMessage::RESPONSE_HEADER_LEN);
835 
836  auto base = msg.get<int64_t>();
837  auto increment = msg.get<int64_t>();
838  auto batch_size = msg.get<int32_t>();
839  return flake_id_generator_impl::IdBatch(base, increment, batch_size);
840  });
841  }
842 
843  IQueueImpl::IQueueImpl(const std::string &instance_name, spi::ClientContext *context)
844  : ProxyImpl("hz:impl:queueService", instance_name, context) {
845  serialization::pimpl::data data = get_context().get_serialization_service().to_data<std::string>(
846  &instance_name);
847  partition_id_ = get_partition_id(data);
848  }
849 
850  boost::future<bool> IQueueImpl::remove_item_listener(
851  boost::uuids::uuid registration_id) {
852  return get_context().get_client_listener_service().deregister_listener(registration_id);
853  }
854 
855  boost::future<bool> IQueueImpl::offer(const serialization::pimpl::data &element, std::chrono::milliseconds timeout) {
856  auto request = protocol::codec::queue_offer_encode(get_name(), element,
857  std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
858  return invoke_and_get_future<bool>(request, partition_id_);
859  }
860 
861  boost::future<void> IQueueImpl::put(const serialization::pimpl::data &element) {
862  auto request = protocol::codec::queue_put_encode(get_name(), element);
863  return to_void_future(invoke_on_partition(request, partition_id_));
864  }
865 
866  boost::future<boost::optional<serialization::pimpl::data>> IQueueImpl::poll_data(std::chrono::milliseconds timeout) {
867  auto request = protocol::codec::queue_poll_encode(get_name(), std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
868  return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, partition_id_);
869  }
870 
871  boost::future<int> IQueueImpl::remaining_capacity() {
872  auto request = protocol::codec::queue_remainingcapacity_encode(get_name());
873  return invoke_and_get_future<int>(request, partition_id_);
874  }
875 
876  boost::future<bool> IQueueImpl::remove(const serialization::pimpl::data &element) {
877  auto request = protocol::codec::queue_remove_encode(get_name(), element);
878  return invoke_and_get_future<bool>(request, partition_id_);
879  }
880 
881  boost::future<bool> IQueueImpl::contains(const serialization::pimpl::data &element) {
882  auto request = protocol::codec::queue_contains_encode(get_name(), element);
883  return invoke_and_get_future<bool>(request, partition_id_);
884  }
885 
886  boost::future<std::vector<serialization::pimpl::data>> IQueueImpl::drain_to_data(size_t max_elements) {
887  auto request = protocol::codec::queue_draintomaxsize_encode(get_name(), (int32_t) max_elements);
888 
889  return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request, partition_id_);
890  }
891 
892  boost::future<std::vector<serialization::pimpl::data>> IQueueImpl::drain_to_data() {
893  auto request = protocol::codec::queue_drainto_encode(get_name());
894  return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request, partition_id_);
895  }
896 
897  boost::future<boost::optional<serialization::pimpl::data>> IQueueImpl::take_data() {
898  auto request = protocol::codec::queue_take_encode(get_name());
899  return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, partition_id_);
900  }
901 
902  boost::future<boost::optional<serialization::pimpl::data>> IQueueImpl::peek_data() {
903  auto request = protocol::codec::queue_peek_encode(get_name());
904  return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, partition_id_);
905  }
906 
907  boost::future<int> IQueueImpl::size() {
908  auto request = protocol::codec::queue_size_encode(get_name());
909  return invoke_and_get_future<int>(request, partition_id_);
910  }
911 
912  boost::future<bool> IQueueImpl::is_empty() {
913  auto request = protocol::codec::queue_isempty_encode(get_name());
914  return invoke_and_get_future<bool>(request, partition_id_);
915  }
916 
917  boost::future<std::vector<serialization::pimpl::data>> IQueueImpl::to_array_data() {
918  auto request = protocol::codec::queue_iterator_encode(get_name());
919  return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request, partition_id_);
920  }
921 
922  boost::future<bool> IQueueImpl::contains_all_data(const std::vector<serialization::pimpl::data> &elements) {
923  auto request = protocol::codec::queue_containsall_encode(get_name(), elements);
924  return invoke_and_get_future<bool>(request, partition_id_);
925  }
926 
927  boost::future<bool> IQueueImpl::add_all_data(const std::vector<serialization::pimpl::data> &elements) {
928  auto request = protocol::codec::queue_addall_encode(get_name(), elements);
929  return invoke_and_get_future<bool>(request, partition_id_);
930  }
931 
932  boost::future<bool> IQueueImpl::remove_all_data(const std::vector<serialization::pimpl::data> &elements) {
933  auto request = protocol::codec::queue_compareandremoveall_encode(get_name(), elements);
934  return invoke_and_get_future<bool>(request, partition_id_);
935  }
936 
937  boost::future<bool> IQueueImpl::retain_all_data(const std::vector<serialization::pimpl::data> &elements) {
938  auto request = protocol::codec::queue_compareandretainall_encode(get_name(), elements);
939  return invoke_and_get_future<bool>(request, partition_id_);
940  }
941 
942  boost::future<void> IQueueImpl::clear() {
943  auto request = protocol::codec::queue_clear_encode(get_name());
944  return to_void_future(invoke_on_partition(request, partition_id_));
945  }
946 
947  std::shared_ptr<spi::impl::ListenerMessageCodec>
948  IQueueImpl::create_item_listener_codec(bool include_value) {
949  return std::shared_ptr<spi::impl::ListenerMessageCodec>(
950  new QueueListenerMessageCodec(get_name(), include_value));
951  }
952 
953  IQueueImpl::QueueListenerMessageCodec::QueueListenerMessageCodec(std::string name,
954  bool include_value) : name_(std::move(name)),
955  include_value_(
956  include_value) {}
957 
958  protocol::ClientMessage
959  IQueueImpl::QueueListenerMessageCodec::encode_add_request(bool local_only) const {
960  return protocol::codec::queue_addlistener_encode(name_, include_value_, local_only);
961  }
962 
963  protocol::ClientMessage
964  IQueueImpl::QueueListenerMessageCodec::encode_remove_request(boost::uuids::uuid real_registration_id) const {
965  return protocol::codec::queue_removelistener_encode(name_, real_registration_id);
966  }
967 
968  ProxyImpl::ProxyImpl(const std::string &service_name, const std::string &object_name,
969  spi::ClientContext *context)
970  : ClientProxy(object_name, service_name, *context), SerializingProxy(*context, object_name) {}
971 
972  ProxyImpl::~ProxyImpl() = default;
973 
974  SerializingProxy::SerializingProxy(spi::ClientContext &context, const std::string &object_name)
975  : serialization_service_(context.get_serialization_service()),
976  partition_service_(context.get_partition_service()), object_name_(object_name), client_context_(context) {}
977 
978  int SerializingProxy::get_partition_id(const serialization::pimpl::data &key) {
979  return partition_service_.get_partition_id(key);
980  }
981 
982  boost::future<protocol::ClientMessage> SerializingProxy::invoke_on_partition(
983  protocol::ClientMessage &request, int partition_id) {
984  try {
985  return spi::impl::ClientInvocation::create(client_context_, std::make_shared<protocol::ClientMessage>(std::move(request)), object_name_, partition_id)->invoke();
986  } catch (exception::iexception &) {
987  util::exception_util::rethrow(std::current_exception());
988  return boost::make_ready_future(protocol::ClientMessage(0));
989  }
990  }
991 
992  boost::future<protocol::ClientMessage> SerializingProxy::invoke(protocol::ClientMessage &request) {
993  try {
994  return spi::impl::ClientInvocation::create(client_context_, std::make_shared<protocol::ClientMessage>(std::move(request)), object_name_)->invoke();
995  } catch (exception::iexception &) {
996  util::exception_util::rethrow(std::current_exception());
997  return boost::make_ready_future(protocol::ClientMessage(0));
998  }
999  }
1000 
1001  boost::future<protocol::ClientMessage> SerializingProxy::invoke_on_connection(protocol::ClientMessage &request,
1002  std::shared_ptr<connection::Connection> connection) {
1003  try {
1004  return spi::impl::ClientInvocation::create(client_context_, std::make_shared<protocol::ClientMessage>(std::move(request)), object_name_, connection)->invoke();
1005  } catch (exception::iexception &) {
1006  util::exception_util::rethrow(std::current_exception());
1007  return boost::make_ready_future(protocol::ClientMessage(0));
1008  }
1009  }
1010 
1011  boost::future<protocol::ClientMessage>
1012  SerializingProxy::invoke_on_key_owner(protocol::ClientMessage &request,
1013  const serialization::pimpl::data &key_data) {
1014  try {
1015  return invoke_on_partition(request, get_partition_id(key_data));
1016  } catch (exception::iexception &) {
1017  util::exception_util::rethrow(std::current_exception());
1018  return boost::make_ready_future(protocol::ClientMessage(0));
1019  }
1020  }
1021 
1022  boost::future<protocol::ClientMessage>
1023  SerializingProxy::invoke_on_member(protocol::ClientMessage &request, boost::uuids::uuid uuid) {
1024  try {
1025  auto invocation = spi::impl::ClientInvocation::create(client_context_, std::make_shared<protocol::ClientMessage>(std::move(request)), object_name_, uuid);
1026  return invocation->invoke();
1027  } catch (exception::iexception &) {
1028  util::exception_util::rethrow(std::current_exception());
1029  return boost::make_ready_future(protocol::ClientMessage(0));
1030  }
1031  }
1032 
1033  template<>
1034  boost::future<boost::optional<serialization::pimpl::data>>
1035  SerializingProxy::invoke_and_get_future(protocol::ClientMessage &request) {
1036  return decode_optional_var_sized<serialization::pimpl::data>(invoke(request));
1037  }
1038 
1039  template<>
1040  boost::future<boost::optional<map::data_entry_view>>
1041  SerializingProxy::invoke_and_get_future(protocol::ClientMessage &request, const serialization::pimpl::data &key) {
1042  return decode_optional_var_sized<map::data_entry_view>(invoke_on_key_owner(request, key));
1043  }
1044 
1045  template<>
1046  boost::future<boost::optional<serialization::pimpl::data>>
1047  SerializingProxy::invoke_and_get_future(protocol::ClientMessage &request, int partition_id) {
1048  return decode_optional_var_sized<serialization::pimpl::data>(invoke_on_partition(request, partition_id));
1049  }
1050 
1051  template<>
1052  boost::future<boost::optional<serialization::pimpl::data>>
1053  SerializingProxy::invoke_and_get_future(protocol::ClientMessage &request, const serialization::pimpl::data &key) {
1054  return decode_optional_var_sized<serialization::pimpl::data>(invoke_on_key_owner(request, key));
1055  }
1056 
1057  PartitionSpecificClientProxy::PartitionSpecificClientProxy(const std::string &service_name,
1058  const std::string &object_name,
1059  spi::ClientContext *context) : ProxyImpl(
1060  service_name, object_name, context), partition_id_(-1) {}
1061 
1062  void PartitionSpecificClientProxy::on_initialize() {
1063  std::string partitionKey = internal::partition::strategy::StringPartitioningStrategy::get_partition_key(
1064  name_);
1065  partition_id_ = get_context().get_partition_service().get_partition_id(to_data<std::string>(partitionKey));
1066  }
1067 
1068  IMapImpl::IMapImpl(const std::string &instance_name, spi::ClientContext *context)
1069  : ProxyImpl("hz:impl:mapService", instance_name, context) {}
1070 
1071  boost::future<bool> IMapImpl::contains_key(const serialization::pimpl::data &key) {
1072  auto request = protocol::codec::map_containskey_encode(get_name(), key, util::get_current_thread_id());
1073  return invoke_and_get_future<bool>(request, key);
1074  }
1075 
1076  boost::future<bool> IMapImpl::contains_value(const serialization::pimpl::data &value) {
1077  auto request = protocol::codec::map_containsvalue_encode(get_name(), value);
1078  return invoke_and_get_future<bool>(request);
1079  }
1080 
1081  boost::future<boost::optional<serialization::pimpl::data>> IMapImpl::get_data(const serialization::pimpl::data &key) {
1082  auto request = protocol::codec::map_get_encode(get_name(), key, util::get_current_thread_id());
1083  return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, key);
1084  }
1085 
1086  boost::future<boost::optional<serialization::pimpl::data>> IMapImpl::remove_data(const serialization::pimpl::data &key) {
1087  auto request = protocol::codec::map_remove_encode(get_name(), key, util::get_current_thread_id());
1088  return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, key);
1089  }
1090 
1091  boost::future<bool> IMapImpl::remove(const serialization::pimpl::data &key, const serialization::pimpl::data &value) {
1092  auto request = protocol::codec::map_removeifsame_encode(get_name(), key, value,
1093  util::get_current_thread_id());
1094  return invoke_and_get_future<bool>(request, key);
1095  }
1096 
1097  boost::future<protocol::ClientMessage> IMapImpl::remove_all(const serialization::pimpl::data &predicate_data) {
1098  auto request = protocol::codec::map_removeall_encode(get_name(), predicate_data);
1099  return invoke(request);
1100  }
1101 
1102  boost::future<protocol::ClientMessage> IMapImpl::delete_entry(const serialization::pimpl::data &key) {
1103  auto request = protocol::codec::map_delete_encode(get_name(), key, util::get_current_thread_id());
1104  return invoke_on_partition(request, get_partition_id(key));
1105  }
1106 
1107  boost::future<protocol::ClientMessage> IMapImpl::flush() {
1108  auto request = protocol::codec::map_flush_encode(get_name());
1109  return invoke(request);
1110  }
1111 
1112  boost::future<bool> IMapImpl::try_remove(const serialization::pimpl::data &key, std::chrono::milliseconds timeout) {
1113  auto request = protocol::codec::map_tryremove_encode(get_name(), key,
1114  util::get_current_thread_id(),
1115  std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
1116 
1117  return invoke_and_get_future<bool>(request, key);
1118  }
1119 
1120  boost::future<bool> IMapImpl::try_put(const serialization::pimpl::data &key, const serialization::pimpl::data &value,
1121  std::chrono::milliseconds timeout) {
1122  auto request = protocol::codec::map_tryput_encode(get_name(), key, value,
1123  util::get_current_thread_id(),
1124  std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
1125 
1126  return invoke_and_get_future<bool>(request, key);
1127  }
1128 
1129  boost::future<boost::optional<serialization::pimpl::data>> IMapImpl::put_data(const serialization::pimpl::data &key,
1130  const serialization::pimpl::data &value,
1131  std::chrono::milliseconds ttl) {
1132  auto request = protocol::codec::map_put_encode(get_name(), key, value,
1133  util::get_current_thread_id(),
1134  std::chrono::duration_cast<std::chrono::milliseconds>(ttl).count());
1135  return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, key);
1136  }
1137 
1138  boost::future<protocol::ClientMessage> IMapImpl::put_transient(const serialization::pimpl::data &key, const serialization::pimpl::data &value,
1139  std::chrono::milliseconds ttl) {
1140  auto request = protocol::codec::map_puttransient_encode(get_name(), key, value,
1141  util::get_current_thread_id(),
1142  std::chrono::duration_cast<std::chrono::milliseconds>(ttl).count());
1143  return invoke_on_partition(request, get_partition_id(key));
1144  }
1145 
1146  boost::future<boost::optional<serialization::pimpl::data>> IMapImpl::put_if_absent_data(const serialization::pimpl::data &key,
1147  const serialization::pimpl::data &value,
1148  std::chrono::milliseconds ttl) {
1149  auto request = protocol::codec::map_putifabsent_encode(get_name(), key, value,
1150  util::get_current_thread_id(),
1151  std::chrono::duration_cast<std::chrono::milliseconds>(ttl).count());
1152  return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, key);
1153  }
1154 
1155  boost::future<bool> IMapImpl::replace(const serialization::pimpl::data &key, const serialization::pimpl::data &old_value,
1156  const serialization::pimpl::data &new_value) {
1157  auto request = protocol::codec::map_replaceifsame_encode(get_name(), key, old_value,
1158  new_value,
1159  util::get_current_thread_id());
1160 
1161  return invoke_and_get_future<bool>(request, key);
1162  }
1163 
1164  boost::future<boost::optional<serialization::pimpl::data>> IMapImpl::replace_data(const serialization::pimpl::data &key,
1165  const serialization::pimpl::data &value) {
1166  auto request = protocol::codec::map_replace_encode(get_name(), key, value,
1167  util::get_current_thread_id());
1168 
1169  return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, key);
1170  }
1171 
1172  boost::future<protocol::ClientMessage>
1173  IMapImpl::set(const serialization::pimpl::data &key, const serialization::pimpl::data &value,
1174  std::chrono::milliseconds ttl) {
1175  auto request = protocol::codec::map_set_encode(get_name(), key, value,
1176  util::get_current_thread_id(),
1177  std::chrono::duration_cast<std::chrono::milliseconds>(
1178  ttl).count());
1179  return invoke_on_partition(request, get_partition_id(key));
1180  }
1181 
1182  boost::future<protocol::ClientMessage> IMapImpl::lock(const serialization::pimpl::data &key) {
1183  return lock(key, std::chrono::milliseconds(-1));
1184  }
1185 
1186  boost::future<protocol::ClientMessage> IMapImpl::lock(const serialization::pimpl::data &key, std::chrono::milliseconds lease_time) {
1187  auto request = protocol::codec::map_lock_encode(get_name(), key, util::get_current_thread_id(),
1188  std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
1189  lock_reference_id_generator_->get_next_reference_id());
1190  return invoke_on_partition(request, get_partition_id(key));
1191  }
1192 
1193  boost::future<bool> IMapImpl::is_locked(const serialization::pimpl::data &key) {
1194  auto request = protocol::codec::map_islocked_encode(get_name(), key);
1195 
1196  return invoke_and_get_future<bool>(request, key);
1197  }
1198 
1199  boost::future<bool> IMapImpl::try_lock(const serialization::pimpl::data &key, std::chrono::milliseconds timeout) {
1200  return try_lock(key, timeout, std::chrono::milliseconds(-1));
1201  }
1202 
1203  boost::future<bool>
1204  IMapImpl::try_lock(const serialization::pimpl::data &key, std::chrono::milliseconds timeout,
1205  std::chrono::milliseconds lease_time) {
1206  auto request = protocol::codec::map_trylock_encode(get_name(), key, util::get_current_thread_id(),
1207  std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
1208  std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count(),
1209  lock_reference_id_generator_->get_next_reference_id());
1210  return invoke_and_get_future<bool>(request, key);
1211  }
1212 
1213  boost::future<protocol::ClientMessage> IMapImpl::unlock(const serialization::pimpl::data &key) {
1214  auto request = protocol::codec::map_unlock_encode(get_name(), key, util::get_current_thread_id(),
1215  lock_reference_id_generator_->get_next_reference_id());
1216  return invoke_on_partition(request, get_partition_id(key));
1217  }
1218 
1219  boost::future<protocol::ClientMessage> IMapImpl::force_unlock(const serialization::pimpl::data &key) {
1220  auto request = protocol::codec::map_forceunlock_encode(get_name(), key,
1221  lock_reference_id_generator_->get_next_reference_id());
1222  return invoke_on_partition(request, get_partition_id(key));
1223  }
1224 
1225  boost::future<std::string> IMapImpl::add_interceptor(const serialization::pimpl::data &interceptor) {
1226  auto request = protocol::codec::map_addinterceptor_encode(get_name(), interceptor);
1227  return invoke_and_get_future<std::string>(request);
1228  }
1229 
1230  // TODO: We can use generic template Listener instead of impl::BaseEventHandler to prevent the virtual function calls
1231  boost::future<boost::uuids::uuid> IMapImpl::add_entry_listener(std::shared_ptr<impl::BaseEventHandler> entry_event_handler, bool include_value, int32_t listener_flags) {
1232  return register_listener(create_map_entry_listener_codec(include_value, listener_flags), std::move(entry_event_handler));
1233  }
1234 
1235  boost::future<boost::uuids::uuid>
1236  IMapImpl::add_entry_listener(std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
1237  serialization::pimpl::data &&predicate, bool include_value,
1238  int32_t listener_flags) {
1239  return register_listener(
1240  create_map_entry_listener_codec(include_value, std::move(predicate), listener_flags),
1241  std::move(entry_event_handler));
1242  }
1243 
1244  boost::future<bool> IMapImpl::remove_entry_listener(boost::uuids::uuid registration_id) {
1245  return get_context().get_client_listener_service().deregister_listener(registration_id);
1246  }
1247 
1248  boost::future<boost::uuids::uuid>
1249  IMapImpl::add_entry_listener(std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
1250  bool include_value, serialization::pimpl::data &&key, int32_t listener_flags) {
1251  return register_listener(create_map_entry_listener_codec(include_value, listener_flags, std::move(key)),
1252  std::move(entry_event_handler));
1253  }
1254 
1255  boost::future<boost::optional<map::data_entry_view>> IMapImpl::get_entry_view_data(const serialization::pimpl::data &key) {
1256  auto request = protocol::codec::map_getentryview_encode(get_name(), key,
1257  util::get_current_thread_id());
1258  return invoke_and_get_future<boost::optional<map::data_entry_view>>(request, key);
1259  }
1260 
1261  boost::future<bool> IMapImpl::evict(const serialization::pimpl::data &key) {
1262  auto request = protocol::codec::map_evict_encode(get_name(), key, util::get_current_thread_id());
1263  return invoke_and_get_future<bool>(request, key);
1264  }
1265 
1266  boost::future<protocol::ClientMessage> IMapImpl::evict_all() {
1267  auto request = protocol::codec::map_evictall_encode(get_name());
1268  return invoke(request);
1269  }
1270 
1271  boost::future<EntryVector>
1272  IMapImpl::get_all_data(int partition_id, const std::vector<serialization::pimpl::data> &keys) {
1273  auto request = protocol::codec::map_getall_encode(get_name(), keys);
1274  return invoke_and_get_future<EntryVector>(request, partition_id);
1275  }
1276 
1277  boost::future<std::vector<serialization::pimpl::data>> IMapImpl::key_set_data() {
1278  auto request = protocol::codec::map_keyset_encode(get_name());
1279  return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request);
1280  }
1281 
1282  boost::future<std::vector<serialization::pimpl::data>> IMapImpl::key_set_data(const serialization::pimpl::data &predicate) {
1283  auto request = protocol::codec::map_keysetwithpredicate_encode(get_name(), predicate);
1284  return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request);
1285  }
1286 
1287  boost::future<std::pair<std::vector<serialization::pimpl::data>, query::anchor_data_list>> IMapImpl::key_set_for_paging_predicate_data(
1288  protocol::codec::holder::paging_predicate_holder const & predicate) {
1289  auto request = protocol::codec::map_keysetwithpagingpredicate_encode(get_name(), predicate);
1290  return invoke(request).then(boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
1291  return get_paging_predicate_response<std::vector<serialization::pimpl::data>>(std::move(f));
1292  });
1293  }
1294 
1295  boost::future<EntryVector> IMapImpl::entry_set_data() {
1296  auto request = protocol::codec::map_entryset_encode(get_name());
1297  return invoke_and_get_future<EntryVector>(request);
1298  }
1299 
1300  boost::future<EntryVector> IMapImpl::entry_set_data(const serialization::pimpl::data &predicate) {
1301  auto request = protocol::codec::map_entrieswithpredicate_encode(get_name(), predicate);
1302  return invoke_and_get_future<EntryVector>(request);
1303  }
1304 
1305  boost::future<std::pair<EntryVector, query::anchor_data_list>> IMapImpl::entry_set_for_paging_predicate_data(
1306  protocol::codec::holder::paging_predicate_holder const & predicate) {
1307  auto request = protocol::codec::map_entrieswithpagingpredicate_encode(get_name(), predicate);
1308  return invoke(request).then(boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
1309  return get_paging_predicate_response<EntryVector>(std::move(f));
1310  });
1311  }
1312 
1313  boost::future<std::vector<serialization::pimpl::data>> IMapImpl::values_data() {
1314  auto request = protocol::codec::map_values_encode(get_name());
1315  return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request);
1316  }
1317 
1318  boost::future<std::vector<serialization::pimpl::data>> IMapImpl::values_data(const serialization::pimpl::data &predicate) {
1319  auto request = protocol::codec::map_valueswithpredicate_encode(get_name(), predicate);
1320  return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request);
1321  }
1322 
1323  boost::future<std::pair<std::vector<serialization::pimpl::data>, query::anchor_data_list>>
1324  IMapImpl::values_for_paging_predicate_data(protocol::codec::holder::paging_predicate_holder const & predicate) {
1325  auto request = protocol::codec::map_valueswithpagingpredicate_encode(get_name(), predicate);
1326  return invoke(request).then(boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
1327  return get_paging_predicate_response<std::vector<serialization::pimpl::data>>(std::move(f));
1328  });
1329  }
1330 
1331  boost::future<protocol::ClientMessage> IMapImpl::add_index_data(const config::index_config &config) {
1332  auto request = protocol::codec::map_addindex_encode(get_name(), config);
1333  return invoke(request);
1334  }
1335 
1336  boost::future<int> IMapImpl::size() {
1337  auto request = protocol::codec::map_size_encode(get_name());
1338  return invoke_and_get_future<int>(request);
1339  }
1340 
1341  boost::future<bool> IMapImpl::is_empty() {
1342  auto request = protocol::codec::map_isempty_encode(get_name());
1343  return invoke_and_get_future<bool>(request);
1344  }
1345 
1346  boost::future<protocol::ClientMessage> IMapImpl::put_all_data(int partition_id, const EntryVector &entries) {
1347  auto request = protocol::codec::map_putall_encode(get_name(), entries, true);
1348  return invoke_on_partition(request, partition_id);
1349  }
1350 
1351  boost::future<protocol::ClientMessage> IMapImpl::clear_data() {
1352  auto request = protocol::codec::map_clear_encode(get_name());
1353  return invoke(request);
1354  }
1355 
1356  boost::future<boost::optional<serialization::pimpl::data>>
1357  IMapImpl::execute_on_key_data(const serialization::pimpl::data &key,
1358  const serialization::pimpl::data &processor) {
1359  auto request = protocol::codec::map_executeonkey_encode(get_name(),
1360  processor,
1361  key,
1362  util::get_current_thread_id());
1363  return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, get_partition_id(key));
1364  }
1365 
1366  boost::future<boost::optional<serialization::pimpl::data>>
1367  IMapImpl::submit_to_key_data(const serialization::pimpl::data &key,
1368  const serialization::pimpl::data &processor) {
1369  auto request = protocol::codec::map_submittokey_encode(get_name(),
1370  processor,
1371  key,
1372  util::get_current_thread_id());
1373  return invoke_on_partition(request, get_partition_id(key)).then(boost::launch::sync,
1374  [](boost::future<protocol::ClientMessage> f) {
1375  auto msg = f.get();
1376  msg.skip_frame();
1377  return msg.get_nullable<serialization::pimpl::data>();
1378  });
1379  }
1380 
1381  boost::future<EntryVector> IMapImpl::execute_on_keys_data(const std::vector<serialization::pimpl::data> &keys,
1382  const serialization::pimpl::data &processor) {
1383  auto request = protocol::codec::map_executeonkeys_encode(get_name(), processor, keys);
1384  return invoke_and_get_future<EntryVector>(request);
1385  }
1386 
1387  boost::future<protocol::ClientMessage> IMapImpl::remove_interceptor(const std::string &id) {
1388  auto request = protocol::codec::map_removeinterceptor_encode(get_name(), id);
1389  return invoke(request);
1390  }
1391 
1392  boost::future<EntryVector> IMapImpl::execute_on_entries_data(const serialization::pimpl::data &entry_processor) {
1393  auto request = protocol::codec::map_executeonallkeys_encode(
1394  get_name(), entry_processor);
1395  return invoke_and_get_future<EntryVector>(request);
1396 
1397  }
1398 
1399  boost::future<EntryVector>
1400  IMapImpl::execute_on_entries_data(const serialization::pimpl::data &entry_processor, const serialization::pimpl::data &predicate) {
1401  auto request = protocol::codec::map_executewithpredicate_encode(get_name(),
1402  entry_processor,
1403  predicate);
1404  return invoke_and_get_future<EntryVector>(request);
1405  }
1406 
1407 
1408  std::shared_ptr<spi::impl::ListenerMessageCodec>
1409  IMapImpl::create_map_entry_listener_codec(bool include_value, serialization::pimpl::data &&predicate,
1410  int32_t listener_flags) {
1411  return std::shared_ptr<spi::impl::ListenerMessageCodec>(
1412  new MapEntryListenerWithPredicateMessageCodec(get_name(), include_value, listener_flags, std::move(predicate)));
1413  }
1414 
1415  std::shared_ptr<spi::impl::ListenerMessageCodec>
1416  IMapImpl::create_map_entry_listener_codec(bool include_value, int32_t listener_flags) {
1417  return std::shared_ptr<spi::impl::ListenerMessageCodec>(
1418  new MapEntryListenerMessageCodec(get_name(), include_value, listener_flags));
1419  }
1420 
1421  std::shared_ptr<spi::impl::ListenerMessageCodec>
1422  IMapImpl::create_map_entry_listener_codec(bool include_value, int32_t listener_flags,
1423  serialization::pimpl::data &&key) {
1424  return std::shared_ptr<spi::impl::ListenerMessageCodec>(
1425  new MapEntryListenerToKeyCodec(get_name(), include_value, listener_flags, std::move(key)));
1426  }
1427 
1428  void IMapImpl::on_initialize() {
1429  ProxyImpl::on_initialize();
1430  lock_reference_id_generator_ = get_context().get_lock_reference_id_generator();
1431  }
1432 
1433  IMapImpl::MapEntryListenerMessageCodec::MapEntryListenerMessageCodec(std::string name,
1434  bool include_value,
1435  int32_t listener_flags) : name_(std::move(name)),
1436  include_value_(
1437  include_value),
1438  listener_flags_(
1439  listener_flags) {}
1440 
1441  protocol::ClientMessage
1442  IMapImpl::MapEntryListenerMessageCodec::encode_add_request(bool local_only) const {
1443  return protocol::codec::map_addentrylistener_encode(name_, include_value_,
1444  static_cast<int32_t>(listener_flags_),
1445  local_only);
1446  }
1447 
1448  protocol::ClientMessage
1449  IMapImpl::MapEntryListenerMessageCodec::encode_remove_request(boost::uuids::uuid real_registration_id) const {
1450  return protocol::codec::map_removeentrylistener_encode(name_, real_registration_id);
1451  }
1452 
1453  protocol::ClientMessage
1454  IMapImpl::MapEntryListenerToKeyCodec::encode_add_request(bool local_only) const {
1455  return protocol::codec::map_addentrylistenertokey_encode(name_, key_, include_value_,
1456  static_cast<int32_t>(listener_flags_), local_only);
1457  }
1458 
1459  protocol::ClientMessage
1460  IMapImpl::MapEntryListenerToKeyCodec::encode_remove_request(boost::uuids::uuid real_registration_id) const {
1461  return protocol::codec::map_removeentrylistener_encode(name_, real_registration_id);
1462  }
1463 
1464  IMapImpl::MapEntryListenerToKeyCodec::MapEntryListenerToKeyCodec(std::string name, bool include_value,
1465  int32_t listener_flags,
1466  serialization::pimpl::data key)
1467  : name_(std::move(name)), include_value_(include_value), listener_flags_(listener_flags), key_(std::move(key)) {}
1468 
1469  IMapImpl::MapEntryListenerWithPredicateMessageCodec::MapEntryListenerWithPredicateMessageCodec(
1470  std::string name, bool include_value, int32_t listener_flags,
1471  serialization::pimpl::data &&predicate) : name_(std::move(name)), include_value_(include_value),
1472  listener_flags_(listener_flags), predicate_(std::move(predicate)) {}
1473 
1474  protocol::ClientMessage
1475  IMapImpl::MapEntryListenerWithPredicateMessageCodec::encode_add_request(bool local_only) const {
1476  return protocol::codec::map_addentrylistenerwithpredicate_encode(name_, predicate_,
1477  include_value_,
1478  static_cast<int32_t>(listener_flags_), local_only);
1479  }
1480 
1481  protocol::ClientMessage
1482  IMapImpl::MapEntryListenerWithPredicateMessageCodec::encode_remove_request(
1483  boost::uuids::uuid real_registration_id) const {
1484  return protocol::codec::map_removeentrylistener_encode(name_, real_registration_id);
1485  }
1486 
1487  TransactionalQueueImpl::TransactionalQueueImpl(const std::string &name,
1488  txn::TransactionProxy &transaction_proxy)
1489  : TransactionalObject(iqueue::SERVICE_NAME, name, transaction_proxy) {}
1490 
1491  boost::future<bool> TransactionalQueueImpl::offer(const serialization::pimpl::data &e, std::chrono::milliseconds timeout) {
1492  auto request = protocol::codec::transactionalqueue_offer_encode(
1493  get_name(), get_transaction_id(), util::get_current_thread_id(), e, std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
1494 
1495  return invoke_and_get_future<bool>(request);
1496  }
1497 
1498  boost::future<boost::optional<serialization::pimpl::data>> TransactionalQueueImpl::poll_data(std::chrono::milliseconds timeout) {
1499  auto request = protocol::codec::transactionalqueue_poll_encode(
1500  get_name(), get_transaction_id(), util::get_current_thread_id(), std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
1501 
1502  return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request);
1503  }
1504 
1505  boost::future<int> TransactionalQueueImpl::size() {
1506  auto request = protocol::codec::transactionalqueue_size_encode(
1507  get_name(), get_transaction_id(), util::get_current_thread_id());
1508 
1509  return invoke_and_get_future<int>(request);
1510  }
1511 
1512  ISetImpl::ISetImpl(const std::string &instance_name, spi::ClientContext *client_context)
1513  : ProxyImpl(iset::SERVICE_NAME, instance_name, client_context) {
1514  serialization::pimpl::data key_data = get_context().get_serialization_service().to_data<std::string>(
1515  &instance_name);
1516  partition_id_ = get_partition_id(key_data);
1517  }
1518 
1519  boost::future<bool> ISetImpl::remove_item_listener(boost::uuids::uuid registration_id) {
1520  return get_context().get_client_listener_service().deregister_listener(registration_id);
1521  }
1522 
1523  boost::future<int> ISetImpl::size() {
1524  auto request = protocol::codec::set_size_encode(get_name());
1525  return invoke_and_get_future<int>(request, partition_id_);
1526  }
1527 
1528  boost::future<bool> ISetImpl::is_empty() {
1529  auto request = protocol::codec::set_isempty_encode(get_name());
1530  return invoke_and_get_future<bool>(request, partition_id_);
1531  }
1532 
1533  boost::future<bool> ISetImpl::contains(const serialization::pimpl::data &element) {
1534  auto request = protocol::codec::set_contains_encode(get_name(), element);
1535  return invoke_and_get_future<bool>(request, partition_id_);
1536  }
1537 
1538  boost::future<std::vector<serialization::pimpl::data>> ISetImpl::to_array_data() {
1539  auto request = protocol::codec::set_getall_encode(get_name());
1540  return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request, partition_id_);
1541  }
1542 
1543  boost::future<bool> ISetImpl::add(const serialization::pimpl::data &element) {
1544  auto request = protocol::codec::set_add_encode(get_name(), element);
1545  return invoke_and_get_future<bool>(request, partition_id_);
1546  }
1547 
1548  boost::future<bool> ISetImpl::remove(const serialization::pimpl::data &element) {
1549  auto request = protocol::codec::set_remove_encode(get_name(), element);
1550  return invoke_and_get_future<bool>(request, partition_id_);
1551  }
1552 
1553  boost::future<bool> ISetImpl::contains_all(const std::vector<serialization::pimpl::data> &elements) {
1554  auto request = protocol::codec::set_containsall_encode(get_name(), elements);
1555  return invoke_and_get_future<bool>(request, partition_id_);
1556  }
1557 
1558  boost::future<bool> ISetImpl::add_all(const std::vector<serialization::pimpl::data> &elements) {
1559  auto request = protocol::codec::set_addall_encode(get_name(), elements);
1560  return invoke_and_get_future<bool>(request, partition_id_);
1561  }
1562 
1563  boost::future<bool> ISetImpl::remove_all(const std::vector<serialization::pimpl::data> &elements) {
1564  auto request = protocol::codec::set_compareandremoveall_encode(get_name(), elements);
1565  return invoke_and_get_future<bool>(request, partition_id_);
1566  }
1567 
1568  boost::future<bool> ISetImpl::retain_all(const std::vector<serialization::pimpl::data> &elements) {
1569  auto request = protocol::codec::set_compareandretainall_encode(get_name(), elements);
1570  return invoke_and_get_future<bool>(request, partition_id_);
1571  }
1572 
1573  boost::future<void> ISetImpl::clear() {
1574  auto request = protocol::codec::set_clear_encode(get_name());
1575  return to_void_future(invoke_on_partition(request, partition_id_));
1576  }
1577 
1578  std::shared_ptr<spi::impl::ListenerMessageCodec>
1579  ISetImpl::create_item_listener_codec(bool include_value) {
1580  return std::shared_ptr<spi::impl::ListenerMessageCodec>(
1581  new SetListenerMessageCodec(get_name(), include_value));
1582  }
1583 
1584  ISetImpl::SetListenerMessageCodec::SetListenerMessageCodec(std::string name, bool include_value)
1585  : name_(std::move(name)), include_value_(include_value) {}
1586 
1587  protocol::ClientMessage
1588  ISetImpl::SetListenerMessageCodec::encode_add_request(bool local_only) const {
1589  return protocol::codec::set_addlistener_encode(name_, include_value_, local_only);
1590  }
1591 
1592  protocol::ClientMessage
1593  ISetImpl::SetListenerMessageCodec::encode_remove_request(boost::uuids::uuid real_registration_id) const {
1594  return protocol::codec::set_removelistener_encode(name_, real_registration_id);
1595  }
1596 
1597  ITopicImpl::ITopicImpl(const std::string &instance_name, spi::ClientContext *context)
1598  : proxy::ProxyImpl("hz:impl:topicService", instance_name, context),
1599  partition_id_(get_partition_id(to_data(instance_name))) {}
1600 
1601  boost::future<void> ITopicImpl::publish(const serialization::pimpl::data &data) {
1602  auto request = protocol::codec::topic_publish_encode(get_name(), data);
1603  return to_void_future(invoke_on_partition(request, partition_id_));
1604  }
1605 
1606  boost::future<boost::uuids::uuid> ITopicImpl::add_message_listener(std::shared_ptr<impl::BaseEventHandler> topic_event_handler) {
1607  return register_listener(create_item_listener_codec(), std::move(topic_event_handler));
1608  }
1609 
1610  boost::future<bool> ITopicImpl::remove_message_listener(boost::uuids::uuid registration_id) {
1611  return get_context().get_client_listener_service().deregister_listener(registration_id);
1612  }
1613 
1614  std::shared_ptr<spi::impl::ListenerMessageCodec> ITopicImpl::create_item_listener_codec() {
1615  return std::shared_ptr<spi::impl::ListenerMessageCodec>(new TopicListenerMessageCodec(get_name()));
1616  }
1617 
1618  ITopicImpl::TopicListenerMessageCodec::TopicListenerMessageCodec(std::string name) : name_(std::move(name)) {}
1619 
1620  protocol::ClientMessage
1621  ITopicImpl::TopicListenerMessageCodec::encode_add_request(bool local_only) const {
1622  return protocol::codec::topic_addmessagelistener_encode(name_, local_only);
1623  }
1624 
1625  protocol::ClientMessage
1626  ITopicImpl::TopicListenerMessageCodec::encode_remove_request(boost::uuids::uuid real_registration_id) const {
1627  return protocol::codec::topic_removemessagelistener_encode(name_, real_registration_id);
1628  }
1629 
1630  ReplicatedMapImpl::ReplicatedMapImpl(const std::string &service_name, const std::string &object_name,
1631  spi::ClientContext *context) : ProxyImpl(service_name, object_name,
1632  context),
1633  target_partition_id_(-1) {}
1634 
1635  constexpr int32_t RingbufferImpl::MAX_BATCH_SIZE;
1636  }
1637 
1638  namespace map {
1639  const serialization::pimpl::data &data_entry_view::get_key() const {
1640  return key_;
1641  }
1642 
1643  const serialization::pimpl::data &data_entry_view::get_value() const {
1644  return value_;
1645  }
1646 
1647  int64_t data_entry_view::get_cost() const {
1648  return cost_;
1649  }
1650 
1651  int64_t data_entry_view::get_creation_time() const {
1652  return creation_time_;
1653  }
1654 
1655  int64_t data_entry_view::get_expiration_time() const {
1656  return expiration_time_;
1657  }
1658 
1659  int64_t data_entry_view::get_hits() const {
1660  return hits_;
1661  }
1662 
1663  int64_t data_entry_view::get_last_access_time() const {
1664  return last_access_time_;
1665  }
1666 
1667  int64_t data_entry_view::get_last_stored_time() const {
1668  return last_stored_time_;
1669  }
1670 
1671  int64_t data_entry_view::get_last_update_time() const {
1672  return last_update_time_;
1673  }
1674 
1675  int64_t data_entry_view::get_version() const {
1676  return version_;
1677  }
1678 
1679  int64_t data_entry_view::get_ttl() const {
1680  return ttl_;
1681  }
1682 
1683  int64_t data_entry_view::get_max_idle() const {
1684  return max_idle_;
1685  }
1686 
1687  data_entry_view::data_entry_view(serialization::pimpl::data &&key, serialization::pimpl::data &&value,
1688  int64_t cost, int64_t creation_time,
1689  int64_t expiration_time, int64_t hits, int64_t last_access_time,
1690  int64_t last_stored_time, int64_t last_update_time, int64_t version,
1691  int64_t ttl,
1692  int64_t max_idle) : key_(std::move(key)), value_(std::move(value)),
1693  cost_(cost),
1694  creation_time_(creation_time),
1695  expiration_time_(expiration_time),
1696  hits_(hits), last_access_time_(last_access_time),
1697  last_stored_time_(last_stored_time),
1698  last_update_time_(last_update_time), version_(version),
1699  ttl_(ttl),
1700  max_idle_(max_idle) {}
1701  }
1702 
1703  namespace topic {
1704  namespace impl {
1705  namespace reliable {
1706  ReliableTopicMessage::ReliableTopicMessage() : publish_time_(std::chrono::system_clock::now()) {}
1707 
1708  ReliableTopicMessage::ReliableTopicMessage(
1709  hazelcast::client::serialization::pimpl::data &&payload_data,
1710  std::unique_ptr<address> address)
1711  : publish_time_(std::chrono::system_clock::now())
1712  , payload_(std::move(payload_data)) {
1713  if (address) {
1714  publisher_address_ = boost::make_optional(*address);
1715  }
1716  }
1717 
1718  std::chrono::system_clock::time_point ReliableTopicMessage::get_publish_time() const {
1719  return publish_time_;
1720  }
1721 
1722  const boost::optional<address> &ReliableTopicMessage::get_publisher_address() const {
1723  return publisher_address_;
1724  }
1725 
1726  serialization::pimpl::data &ReliableTopicMessage::get_payload() {
1727  return payload_;
1728  }
1729  }
1730  }
1731  }
1732 
1733  namespace serialization {
1734  int32_t hz_serializer<topic::impl::reliable::ReliableTopicMessage>::get_factory_id() {
1735  return F_ID;
1736  }
1737 
1738  int hz_serializer<topic::impl::reliable::ReliableTopicMessage>::get_class_id() {
1739  return RELIABLE_TOPIC_MESSAGE;
1740  }
1741 
1742  void hz_serializer<topic::impl::reliable::ReliableTopicMessage>::write_data(
1743  const topic::impl::reliable::ReliableTopicMessage &object, object_data_output &out) {
1744  out.write<int64_t>(std::chrono::duration_cast<std::chrono::milliseconds>(object.publish_time_.time_since_epoch()).count());
1745  out.write_object(object.publisher_address_);
1746  out.write(object.payload_.to_byte_array());
1747  }
1748 
1749  topic::impl::reliable::ReliableTopicMessage
1750  hz_serializer<topic::impl::reliable::ReliableTopicMessage>::read_data(object_data_input &in) {
1751  topic::impl::reliable::ReliableTopicMessage message;
1752  auto now = std::chrono::system_clock::now();
1753  message.publish_time_ = now + std::chrono::milliseconds(in.read<int64_t>()) - now.time_since_epoch();
1754  message.publisher_address_ = in.read_object<address>();
1755  message.payload_ = serialization::pimpl::data(in.read<std::vector<byte>>().value());
1756  return message;
1757  }
1758  }
1759 
1760  entry_event::entry_event(const std::string &name, member &&member, type event_type,
1761  typed_data &&key, typed_data &&value, typed_data &&old_value, typed_data &&merging_value)
1762  : name_(name), member_(std::move(member)), event_type_(event_type), key_(std::move(key)),
1763  value_(std::move(value)), old_value_(std::move(old_value)), merging_value_(std::move(merging_value)) {}
1764 
1766  return key_;
1767  }
1768 
1770  return old_value_;
1771  }
1772 
1774  return value_;
1775  }
1776 
1778  return merging_value_;
1779  }
1780 
1782  return member_;
1783  }
1784 
1785  entry_event::type entry_event::get_event_type() const {
1786  return event_type_;
1787  }
1788 
1789  const std::string &entry_event::get_name() const {
1790  return name_;
1791  }
1792 
1793  std::ostream &operator<<(std::ostream &os, const entry_event &event) {
1794  os << "name: " << event.name_ << " member: " << event.member_ << " eventType: " <<
1795  static_cast<int>(event.event_type_) << " key: " << event.key_.get_type() << " value: " << event.value_.get_type() <<
1796  " oldValue: " << event.old_value_.get_type() << " mergingValue: " << event.merging_value_.get_type();
1797  return os;
1798  }
1799 
1800  map_event::map_event(member &&member, entry_event::type event_type, const std::string &name,
1801  int number_of_entries_affected)
1802  : member_(member), event_type_(event_type), name_(name), number_of_entries_affected_(number_of_entries_affected) {}
1803 
1805  return member_;
1806  }
1807 
1808  entry_event::type map_event::get_event_type() const {
1809  return event_type_;
1810  }
1811 
1812  const std::string &map_event::get_name() const {
1813  return name_;
1814  }
1815 
1817  return number_of_entries_affected_;
1818  }
1819 
1820  std::ostream &operator<<(std::ostream &os, const map_event &event) {
1821  os << "MapEvent{member: " << event.member_ << " eventType: " << static_cast<int>(event.event_type_) << " name: " << event.name_
1822  << " numberOfEntriesAffected: " << event.number_of_entries_affected_;
1823  return os;
1824  }
1825 
1826  item_event_base::item_event_base(const std::string &name, const member &member, const item_event_type &event_type)
1827  : name_(name), member_(member), event_type_(event_type) {}
1828 
1830  return member_;
1831  }
1832 
1833  item_event_type item_event_base::get_event_type() const {
1834  return event_type_;
1835  }
1836 
1837  const std::string &item_event_base::get_name() const {
1838  return name_;
1839  }
1840 
1841  item_event_base::~item_event_base() = default;
1842 
1843  flake_id_generator::flake_id_generator(const std::string &object_name, spi::ClientContext *context)
1844  : flake_id_generator_impl(SERVICE_NAME, object_name, context) {}
1845  }
1846 }
const typed_data & get_key() const
Returns the key of the entry event.
Definition: proxy.cpp:1765
const std::string & get_name() const
Returns the name of the map for this event.
Definition: proxy.cpp:1789
const typed_data & get_old_value() const
Returns the old value of the entry event.
Definition: proxy.cpp:1769
type get_event_type() const
Return the event type.
Definition: proxy.cpp:1785
const member & get_member() const
Returns the member fired this event.
Definition: proxy.cpp:1781
const typed_data & get_value() const
Returns the value of the entry event.
Definition: proxy.cpp:1773
const typed_data & get_merging_value() const
Returns the incoming merging value of the entry event.
Definition: proxy.cpp:1777
static constexpr std::chrono::milliseconds UNSET
Default TTL value of a record.
Definition: imap.h:1004
item_event_type get_event_type() const
Return the event type.
Definition: proxy.cpp:1833
const member & get_member() const
Returns the member fired this event.
Definition: proxy.cpp:1829
const std::string & get_name() const
Returns the name of the collection for this event.
Definition: proxy.cpp:1837
Map events common contract.
Definition: map_event.h:37
const std::string & get_name() const
Returns the name of the map for this event.
Definition: proxy.cpp:1812
entry_event::type get_event_type() const
Return the event type.
Definition: proxy.cpp:1808
int get_number_of_entries_affected() const
Returns the number of entries affected by this event.
Definition: proxy.cpp:1816
map_event(member &&member, entry_event::type event_type, const std::string &name, int number_of_entries_affected)
Constructor.
Definition: proxy.cpp:1800
const member & get_member() const
Returns the member fired this event.
Definition: proxy.cpp:1804
hz_cluster member class.
Definition: member.h:39
typed_data class is a wrapper class for the serialized binary data.
serialization::pimpl::object_type get_type() const