Hazelcast C++ Client
Hazelcast C++ Client Library
proxy.cpp
1 /*
2  * Copyright (c) 2008-2022, Hazelcast, Inc. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <unordered_set>
18 #include <atomic>
19 
20 #include "hazelcast/client/impl/ClientLockReferenceIdGenerator.h"
21 #include "hazelcast/client/proxy/PNCounterImpl.h"
22 #include "hazelcast/client/spi/ClientContext.h"
23 #include "hazelcast/client/impl/hazelcast_client_instance_impl.h"
24 #include "hazelcast/client/proxy/flake_id_generator_impl.h"
25 #include "hazelcast/client/spi/impl/listener/listener_service_impl.h"
26 #include "hazelcast/client/topic/impl/TopicEventHandlerImpl.h"
27 #include "hazelcast/client/client_config.h"
28 #include "hazelcast/client/map/data_entry_view.h"
29 #include "hazelcast/client/proxy/RingbufferImpl.h"
30 #include "hazelcast/client/impl/vector_clock.h"
31 #include "hazelcast/client/internal/partition/strategy/StringPartitioningStrategy.h"
32 #include "hazelcast/util/Util.h"
33 #include "hazelcast/client/topic/reliable_listener.h"
34 
35 namespace hazelcast {
36 namespace client {
37 const std::chrono::milliseconds imap::UNSET{ -1 };
38 
39 reliable_topic::reliable_topic(const std::string& instance_name,
40  spi::ClientContext* context)
41  : proxy::ProxyImpl(reliable_topic::SERVICE_NAME, instance_name, context)
42  , executor_(context->get_client_execution_service().get_user_executor())
43  , logger_(context->get_logger())
44 {
45  auto reliable_config =
46  context->get_client_config().lookup_reliable_topic_config(instance_name);
47  if (reliable_config) {
48  batch_size_ = reliable_config->get_read_batch_size();
49  } else {
50  batch_size_ = config::reliable_topic_config::DEFAULT_READ_BATCH_SIZE;
51  }
52 
53  ringbuffer_ =
54  context->get_hazelcast_client_implementation()
55  ->get_distributed_object<ringbuffer>(
56  std::string(reliable_topic::TOPIC_RB_PREFIX) + instance_name);
57 }
58 
59 bool
60 reliable_topic::remove_message_listener(const std::string& registration_id)
61 {
62  int id = util::IOUtil::to_value<int>(registration_id);
63  auto runner = runners_map_.get(id);
64  if (!runner) {
65  return false;
66  }
67  runner->cancel();
68  return true;
69 }
70 
71 void
72 reliable_topic::on_shutdown()
73 {
74  // cancel all runners
75  for (auto& entry : runners_map_.clear()) {
76  entry.second->cancel();
77  }
78 }
79 
80 void
81 reliable_topic::on_destroy()
82 {
83  // cancel all runners
84  for (auto& entry : runners_map_.clear()) {
85  entry.second->cancel();
86  }
87 }
88 
89 void
90 reliable_topic::post_destroy()
91 {
92  // destroy the underlying ringbuffer
93  ringbuffer_.get()->destroy().get();
94 }
95 
96 namespace topic {
97 reliable_listener::reliable_listener(bool loss_tolerant,
98  int64_t initial_sequence_id)
99  : loss_tolerant_(loss_tolerant)
100  , initial_sequence_id_(initial_sequence_id)
101 {}
102 } // namespace topic
103 
104 namespace impl {
105 ClientLockReferenceIdGenerator::ClientLockReferenceIdGenerator()
106  : reference_id_counter_(0)
107 {}
108 
109 int64_t
110 ClientLockReferenceIdGenerator::get_next_reference_id()
111 {
112  return ++reference_id_counter_;
113 }
114 } // namespace impl
115 
116 namespace proxy {
117 MultiMapImpl::MultiMapImpl(const std::string& instance_name,
118  spi::ClientContext* context)
119  : ProxyImpl(multi_map::SERVICE_NAME, instance_name, context)
120 {
121  // TODO: remove this line once the client instance get_distributed_object
122  // works as expected in Java for this proxy type
123  lock_reference_id_generator_ =
124  get_context().get_lock_reference_id_generator();
125 }
126 
127 boost::future<bool>
128 MultiMapImpl::put(const serialization::pimpl::data& key,
129  const serialization::pimpl::data& value)
130 {
131  auto request = protocol::codec::multimap_put_encode(
132  get_name(), key, value, util::get_current_thread_id());
133  return invoke_and_get_future<bool>(request, key);
134 }
135 
136 boost::future<std::vector<serialization::pimpl::data>>
137 MultiMapImpl::get_data(const serialization::pimpl::data& key)
138 {
139  auto request = protocol::codec::multimap_get_encode(
140  get_name(), key, util::get_current_thread_id());
141  return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
142  request, key);
143 }
144 
145 boost::future<bool>
146 MultiMapImpl::remove(const serialization::pimpl::data& key,
147  const serialization::pimpl::data& value)
148 {
149  auto request = protocol::codec::multimap_removeentry_encode(
150  get_name(), key, value, util::get_current_thread_id());
151  return invoke_and_get_future<bool>(request, key);
152 }
153 
154 boost::future<std::vector<serialization::pimpl::data>>
155 MultiMapImpl::remove_data(const serialization::pimpl::data& key)
156 {
157  auto request = protocol::codec::multimap_remove_encode(
158  get_name(), key, util::get_current_thread_id());
159  return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
160  request, key);
161 }
162 
163 boost::future<std::vector<serialization::pimpl::data>>
164 MultiMapImpl::key_set_data()
165 {
166  auto request = protocol::codec::multimap_keyset_encode(get_name());
167  return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
168  request);
169 }
170 
171 boost::future<std::vector<serialization::pimpl::data>>
172 MultiMapImpl::values_data()
173 {
174  auto request = protocol::codec::multimap_values_encode(get_name());
175  return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
176  request);
177 }
178 
179 boost::future<EntryVector>
180 MultiMapImpl::entry_set_data()
181 {
182  auto request = protocol::codec::multimap_entryset_encode(get_name());
183  return invoke_and_get_future<EntryVector>(request);
184 }
185 
186 boost::future<bool>
187 MultiMapImpl::contains_key(const serialization::pimpl::data& key)
188 {
189  auto request = protocol::codec::multimap_containskey_encode(
190  get_name(), key, util::get_current_thread_id());
191  return invoke_and_get_future<bool>(request, key);
192 }
193 
194 boost::future<bool>
195 MultiMapImpl::contains_value(const serialization::pimpl::data& value)
196 {
197  auto request =
198  protocol::codec::multimap_containsvalue_encode(get_name(), value);
199  return invoke_and_get_future<bool>(request);
200 }
201 
202 boost::future<bool>
203 MultiMapImpl::contains_entry(const serialization::pimpl::data& key,
204  const serialization::pimpl::data& value)
205 {
206  auto request = protocol::codec::multimap_containsentry_encode(
207  get_name(), key, value, util::get_current_thread_id());
208  return invoke_and_get_future<bool>(request, key);
209 }
210 
211 boost::future<int>
212 MultiMapImpl::size()
213 {
214  auto request = protocol::codec::multimap_size_encode(get_name());
215  return invoke_and_get_future<int>(request);
216 }
217 
218 boost::future<void>
219 MultiMapImpl::clear()
220 {
221  auto request = protocol::codec::multimap_clear_encode(get_name());
222  return to_void_future(invoke(request));
223 }
224 
225 boost::future<int>
226 MultiMapImpl::value_count(const serialization::pimpl::data& key)
227 {
228  auto request = protocol::codec::multimap_valuecount_encode(
229  get_name(), key, util::get_current_thread_id());
230  return invoke_and_get_future<int>(request, key);
231 }
232 
233 boost::future<boost::uuids::uuid>
234 MultiMapImpl::add_entry_listener(
235  std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
236  bool include_value)
237 {
238  return register_listener(
239  create_multi_map_entry_listener_codec(include_value),
240  std::move(entry_event_handler));
241 }
242 
243 boost::future<boost::uuids::uuid>
244 MultiMapImpl::add_entry_listener(
245  std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
246  bool include_value,
247  serialization::pimpl::data&& key)
248 {
249  return register_listener(
250  create_multi_map_entry_listener_codec(include_value, std::move(key)),
251  std::move(entry_event_handler));
252 }
253 
254 boost::future<bool>
255 MultiMapImpl::remove_entry_listener(boost::uuids::uuid registration_id)
256 {
257  return get_context().get_client_listener_service().deregister_listener(
258  registration_id);
259 }
260 
261 boost::future<void>
262 MultiMapImpl::lock(const serialization::pimpl::data& key)
263 {
264  return lock(key, std::chrono::milliseconds(-1));
265 }
266 
267 boost::future<void>
268 MultiMapImpl::lock(const serialization::pimpl::data& key,
269  std::chrono::milliseconds lease_time)
270 {
271  auto request = protocol::codec::multimap_lock_encode(
272  get_name(),
273  key,
274  util::get_current_thread_id(),
275  std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
276  lock_reference_id_generator_->get_next_reference_id());
277  return to_void_future(invoke_on_partition(request, get_partition_id(key)));
278 }
279 
280 boost::future<bool>
281 MultiMapImpl::is_locked(const serialization::pimpl::data& key)
282 {
283  auto request = protocol::codec::multimap_islocked_encode(get_name(), key);
284  return invoke_and_get_future<bool>(request, key);
285 }
286 
287 boost::future<bool>
288 MultiMapImpl::try_lock(const serialization::pimpl::data& key)
289 {
290  auto request = protocol::codec::multimap_trylock_encode(
291  get_name(),
292  key,
293  util::get_current_thread_id(),
294  INT64_MAX,
295  0,
296  lock_reference_id_generator_->get_next_reference_id());
297  return invoke_and_get_future<bool>(request, key);
298 }
299 
300 boost::future<bool>
301 MultiMapImpl::try_lock(const serialization::pimpl::data& key,
302  std::chrono::milliseconds timeout)
303 {
304  return try_lock(key, timeout, std::chrono::milliseconds(INT64_MAX));
305 }
306 
307 boost::future<bool>
308 MultiMapImpl::try_lock(const serialization::pimpl::data& key,
309  std::chrono::milliseconds timeout,
310  std::chrono::milliseconds lease_time)
311 {
312  auto request = protocol::codec::multimap_trylock_encode(
313  get_name(),
314  key,
315  util::get_current_thread_id(),
316  std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
317  std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count(),
318  lock_reference_id_generator_->get_next_reference_id());
319  return invoke_and_get_future<bool>(request, key);
320 }
321 
322 boost::future<void>
323 MultiMapImpl::unlock(const serialization::pimpl::data& key)
324 {
325  auto request = protocol::codec::multimap_unlock_encode(
326  get_name(),
327  key,
328  util::get_current_thread_id(),
329  lock_reference_id_generator_->get_next_reference_id());
330  return to_void_future(invoke_on_partition(request, get_partition_id(key)));
331 }
332 
333 boost::future<void>
334 MultiMapImpl::force_unlock(const serialization::pimpl::data& key)
335 {
336  auto request = protocol::codec::multimap_forceunlock_encode(
337  get_name(), key, lock_reference_id_generator_->get_next_reference_id());
338  return to_void_future(invoke_on_partition(request, get_partition_id(key)));
339 }
340 
341 std::shared_ptr<spi::impl::ListenerMessageCodec>
342 MultiMapImpl::create_multi_map_entry_listener_codec(bool include_value)
343 {
344  return std::shared_ptr<spi::impl::ListenerMessageCodec>(
345  new MultiMapEntryListenerMessageCodec(get_name(), include_value));
346 }
347 
348 std::shared_ptr<spi::impl::ListenerMessageCodec>
349 MultiMapImpl::create_multi_map_entry_listener_codec(
350  bool include_value,
351  serialization::pimpl::data&& key)
352 {
353  return std::shared_ptr<spi::impl::ListenerMessageCodec>(
354  new MultiMapEntryListenerToKeyCodec(
355  get_name(), include_value, std::move(key)));
356 }
357 
358 void
359 MultiMapImpl::on_initialize()
360 {
361  ProxyImpl::on_initialize();
362  lock_reference_id_generator_ =
363  get_context().get_lock_reference_id_generator();
364 }
365 
366 MultiMapImpl::MultiMapEntryListenerMessageCodec::
367  MultiMapEntryListenerMessageCodec(std::string name, bool include_value)
368  : name_(std::move(name))
369  , include_value_(include_value)
370 {}
371 
372 protocol::ClientMessage
373 MultiMapImpl::MultiMapEntryListenerMessageCodec::encode_add_request(
374  bool local_only) const
375 {
376  return protocol::codec::multimap_addentrylistener_encode(
377  name_, include_value_, local_only);
378 }
379 
380 protocol::ClientMessage
381 MultiMapImpl::MultiMapEntryListenerMessageCodec::encode_remove_request(
382  boost::uuids::uuid real_registration_id) const
383 {
384  return protocol::codec::multimap_removeentrylistener_encode(
385  name_, real_registration_id);
386 }
387 
388 protocol::ClientMessage
389 MultiMapImpl::MultiMapEntryListenerToKeyCodec::encode_add_request(
390  bool local_only) const
391 {
392  return protocol::codec::multimap_addentrylistenertokey_encode(
393  name_, key_, include_value_, local_only);
394 }
395 
396 protocol::ClientMessage
397 MultiMapImpl::MultiMapEntryListenerToKeyCodec::encode_remove_request(
398  boost::uuids::uuid real_registration_id) const
399 {
400  return protocol::codec::multimap_removeentrylistener_encode(
401  name_, real_registration_id);
402 }
403 
404 MultiMapImpl::MultiMapEntryListenerToKeyCodec::MultiMapEntryListenerToKeyCodec(
405  std::string name,
406  bool include_value,
407  serialization::pimpl::data&& key)
408  : name_(std::move(name))
409  , include_value_(include_value)
410  , key_(std::move(key))
411 {}
412 
413 const std::shared_ptr<std::unordered_set<member>>
414  PNCounterImpl::EMPTY_ADDRESS_LIST(new std::unordered_set<member>());
415 
416 PNCounterImpl::PNCounterImpl(const std::string& service_name,
417  const std::string& object_name,
418  spi::ClientContext* context)
419  : ProxyImpl(service_name, object_name, context)
420  , max_configured_replica_count_(0)
421  , observed_clock_(
422  std::shared_ptr<impl::vector_clock>(new impl::vector_clock()))
423  , logger_(context->get_logger())
424 {}
425 
426 std::ostream&
427 operator<<(std::ostream& os, const PNCounterImpl& proxy)
428 {
429  os << "PNCounter{name='" << proxy.get_name() << "\'}";
430  return os;
431 }
432 
433 boost::future<int64_t>
434 PNCounterImpl::get()
435 {
436  boost::shared_ptr<member> target =
437  get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
438  if (!target) {
439  BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
440  "ClientPNCounterProxy::get",
441  "Cannot invoke operations on a CRDT because the cluster does not "
442  "contain any data members"));
443  }
444  return invoke_get_internal(EMPTY_ADDRESS_LIST, nullptr, target);
445 }
446 
447 boost::future<int64_t>
448 PNCounterImpl::get_and_add(int64_t delta)
449 {
450  boost::shared_ptr<member> target =
451  get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
452  if (!target) {
453  BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
454  "ClientPNCounterProxy::getAndAdd",
455  "Cannot invoke operations on a CRDT because the cluster does not "
456  "contain any data members"));
457  }
458  return invoke_add_internal(
459  delta, true, EMPTY_ADDRESS_LIST, nullptr, target);
460 }
461 
462 boost::future<int64_t>
463 PNCounterImpl::add_and_get(int64_t delta)
464 {
465  boost::shared_ptr<member> target =
466  get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
467  if (!target) {
468  BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
469  "ClientPNCounterProxy::addAndGet",
470  "Cannot invoke operations on a CRDT because the cluster does not "
471  "contain any data members"));
472  }
473  return invoke_add_internal(
474  delta, false, EMPTY_ADDRESS_LIST, nullptr, target);
475 }
476 
477 boost::future<int64_t>
478 PNCounterImpl::get_and_subtract(int64_t delta)
479 {
480  boost::shared_ptr<member> target =
481  get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
482  if (!target) {
483  BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
484  "ClientPNCounterProxy::getAndSubtract",
485  "Cannot invoke operations on a CRDT because the cluster does not "
486  "contain any data members"));
487  }
488  return invoke_add_internal(
489  -delta, true, EMPTY_ADDRESS_LIST, nullptr, target);
490 }
491 
492 boost::future<int64_t>
493 PNCounterImpl::subtract_and_get(int64_t delta)
494 {
495  boost::shared_ptr<member> target =
496  get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
497  if (!target) {
498  BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
499  "ClientPNCounterProxy::subtractAndGet",
500  "Cannot invoke operations on a CRDT because the cluster does not "
501  "contain any data members"));
502  }
503  return invoke_add_internal(
504  -delta, false, EMPTY_ADDRESS_LIST, nullptr, target);
505 }
506 
507 boost::future<int64_t>
508 PNCounterImpl::decrement_and_get()
509 {
510  boost::shared_ptr<member> target =
511  get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
512  if (!target) {
513  BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
514  "ClientPNCounterProxy::decrementAndGet",
515  "Cannot invoke operations on a CRDT because the cluster does not "
516  "contain any data members"));
517  }
518  return invoke_add_internal(-1, false, EMPTY_ADDRESS_LIST, nullptr, target);
519 }
520 
521 boost::future<int64_t>
522 PNCounterImpl::increment_and_get()
523 {
524  boost::shared_ptr<member> target =
525  get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
526  if (!target) {
527  BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
528  "ClientPNCounterProxy::incrementAndGet",
529  "Cannot invoke operations on a CRDT because the cluster does not "
530  "contain any data members"));
531  }
532  return invoke_add_internal(1, false, EMPTY_ADDRESS_LIST, nullptr, target);
533 }
534 
535 boost::future<int64_t>
536 PNCounterImpl::get_and_decrement()
537 {
538  boost::shared_ptr<member> target =
539  get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
540  if (!target) {
541  BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
542  "ClientPNCounterProxy::getAndDecrement",
543  "Cannot invoke operations on a CRDT because the cluster does not "
544  "contain any data members"));
545  }
546  return invoke_add_internal(-1, true, EMPTY_ADDRESS_LIST, nullptr, target);
547 }
548 
549 boost::future<int64_t>
550 PNCounterImpl::get_and_increment()
551 {
552  boost::shared_ptr<member> target =
553  get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
554  if (!target) {
555  BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
556  "ClientPNCounterProxy::getAndIncrement",
557  "Cannot invoke operations on a CRDT because the cluster does not "
558  "contain any data members"));
559  }
560  return invoke_add_internal(1, true, EMPTY_ADDRESS_LIST, nullptr, target);
561 }
562 
563 boost::future<void>
564 PNCounterImpl::reset()
565 {
566  observed_clock_ =
567  std::shared_ptr<impl::vector_clock>(new impl::vector_clock());
568  return boost::make_ready_future();
569 }
570 
571 boost::shared_ptr<member>
572 PNCounterImpl::get_crdt_operation_target(
573  const std::unordered_set<member>& excluded_addresses)
574 {
575  auto replicaAddress = current_target_replica_address_.load();
576  if (replicaAddress &&
577  excluded_addresses.find(*replicaAddress) == excluded_addresses.end()) {
578  return replicaAddress;
579  }
580 
581  {
582  std::lock_guard<std::mutex> guard(target_selection_mutex_);
583  replicaAddress = current_target_replica_address_.load();
584  if (!replicaAddress || excluded_addresses.find(*replicaAddress) !=
585  excluded_addresses.end()) {
586  current_target_replica_address_ =
587  choose_target_replica(excluded_addresses);
588  }
589  }
590  return current_target_replica_address_;
591 }
592 
593 boost::shared_ptr<member>
594 PNCounterImpl::choose_target_replica(
595  const std::unordered_set<member>& excluded_addresses)
596 {
597  std::vector<member> replicaAddresses =
598  get_replica_addresses(excluded_addresses);
599  if (replicaAddresses.empty()) {
600  return nullptr;
601  }
602  // TODO: Use a random generator as used in Java (ThreadLocalRandomProvider)
603  // which is per thread
604  int randomReplicaIndex = std::abs(rand()) % (int)replicaAddresses.size();
605  return boost::make_shared<member>(replicaAddresses[randomReplicaIndex]);
606 }
607 
608 std::vector<member>
609 PNCounterImpl::get_replica_addresses(
610  const std::unordered_set<member>& excluded_members)
611 {
612  std::vector<member> dataMembers =
613  get_context().get_client_cluster_service().get_members(
614  *member_selectors::DATA_MEMBER_SELECTOR);
615  int32_t replicaCount = get_max_configured_replica_count();
616  int currentReplicaCount =
617  util::min<int>(replicaCount, (int)dataMembers.size());
618 
619  std::vector<member> replicaMembers;
620  for (int i = 0; i < currentReplicaCount; i++) {
621  if (excluded_members.find(dataMembers[i]) == excluded_members.end()) {
622  replicaMembers.push_back(dataMembers[i]);
623  }
624  }
625  return replicaMembers;
626 }
627 
628 int32_t
629 PNCounterImpl::get_max_configured_replica_count()
630 {
631  if (max_configured_replica_count_ > 0) {
632  return max_configured_replica_count_;
633  } else {
634  auto request =
635  protocol::codec::pncounter_getconfiguredreplicacount_encode(
636  get_name());
637  max_configured_replica_count_ =
638  invoke_and_get_future<int32_t>(request).get();
639  }
640  return max_configured_replica_count_;
641 }
642 
643 boost::shared_ptr<member>
644 PNCounterImpl::try_choose_a_new_target(
645  std::shared_ptr<std::unordered_set<member>> excluded_addresses,
646  boost::shared_ptr<member> last_target,
647  const exception::hazelcast_& last_exception)
648 {
649  HZ_LOG(
650  logger_,
651  finest,
652  boost::str(boost::format(
653  "Exception occurred while invoking operation on target %1%, "
654  "choosing different target. Cause: %2%") %
655  last_target % last_exception));
656  if (excluded_addresses == EMPTY_ADDRESS_LIST) {
657  // TODO: Make sure that this only affects the local variable of the
658  // method
659  excluded_addresses = std::make_shared<std::unordered_set<member>>();
660  }
661  excluded_addresses->insert(*last_target);
662  return get_crdt_operation_target(*excluded_addresses);
663 }
664 
665 boost::future<int64_t>
666 PNCounterImpl::invoke_get_internal(
667  std::shared_ptr<std::unordered_set<member>> excluded_addresses,
668  std::exception_ptr last_exception,
669  const boost::shared_ptr<member>& target)
670 {
671  if (!target) {
672  if (last_exception) {
673  std::rethrow_exception(last_exception);
674  } else {
675  BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
676  "ClientPNCounterProxy::invokeGetInternal",
677  "Cannot invoke operations on a CRDT because the cluster does not "
678  "contain any data members"));
679  }
680  }
681  try {
682  auto timestamps = observed_clock_.get()->entry_set();
683  auto request = protocol::codec::pncounter_get_encode(
684  get_name(), timestamps, target->get_uuid());
685  return invoke_on_member(request, target->get_uuid())
686  .then(
687  boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
688  try {
689  return get_and_update_timestamps(std::move(f));
690  } catch (exception::hazelcast_& e) {
691  return invoke_get_internal(excluded_addresses,
692  std::current_exception(),
693  try_choose_a_new_target(
694  excluded_addresses, target, e))
695  .get();
696  }
697  });
698  } catch (exception::hazelcast_& e) {
699  return invoke_get_internal(
700  excluded_addresses,
701  std::current_exception(),
702  try_choose_a_new_target(excluded_addresses, target, e));
703  }
704 }
705 
706 boost::future<int64_t>
707 PNCounterImpl::invoke_add_internal(
708  int64_t delta,
709  bool getBeforeUpdate,
710  std::shared_ptr<std::unordered_set<member>> excluded_addresses,
711  std::exception_ptr last_exception,
712  const boost::shared_ptr<member>& target)
713 {
714  if (!target) {
715  if (last_exception) {
716  std::rethrow_exception(last_exception);
717  } else {
718  BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
719  "ClientPNCounterProxy::invokeGetInternal",
720  "Cannot invoke operations on a CRDT because the cluster does not "
721  "contain any data members"));
722  }
723  }
724 
725  try {
726  auto request = protocol::codec::pncounter_add_encode(
727  get_name(),
728  delta,
729  getBeforeUpdate,
730  observed_clock_.get()->entry_set(),
731  target->get_uuid());
732  return invoke_on_member(request, target->get_uuid())
733  .then(
734  boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
735  try {
736  return get_and_update_timestamps(std::move(f));
737  } catch (exception::hazelcast_& e) {
738  return invoke_add_internal(delta,
739  getBeforeUpdate,
740  excluded_addresses,
741  std::current_exception(),
742  try_choose_a_new_target(
743  excluded_addresses, target, e))
744  .get();
745  }
746  });
747  } catch (exception::hazelcast_& e) {
748  return invoke_add_internal(
749  delta,
750  getBeforeUpdate,
751  excluded_addresses,
752  std::current_exception(),
753  try_choose_a_new_target(excluded_addresses, target, e));
754  }
755 }
756 
757 int64_t
758 PNCounterImpl::get_and_update_timestamps(
759  boost::future<protocol::ClientMessage> f)
760 {
761  auto msg = f.get();
762  auto value = msg.get_first_fixed_sized_field<int64_t>();
763  // skip replica count
764  msg.get<int32_t>();
765  update_observed_replica_timestamps(
766  msg.get<impl::vector_clock::timestamp_vector>());
767  return value;
768 }
769 
770 void
771 PNCounterImpl::update_observed_replica_timestamps(
772  const impl::vector_clock::timestamp_vector& received_logical_timestamps)
773 {
774  std::shared_ptr<impl::vector_clock> received =
775  to_vector_clock(received_logical_timestamps);
776  for (;;) {
777  std::shared_ptr<impl::vector_clock> currentClock =
778  this->observed_clock_;
779  if (currentClock->is_after(*received)) {
780  break;
781  }
782  if (observed_clock_.compare_and_set(currentClock, received)) {
783  break;
784  }
785  }
786 }
787 
788 std::shared_ptr<impl::vector_clock>
789 PNCounterImpl::to_vector_clock(
790  const impl::vector_clock::timestamp_vector& replica_logical_timestamps)
791 {
792  return std::shared_ptr<impl::vector_clock>(
793  new impl::vector_clock(replica_logical_timestamps));
794 }
795 
796 boost::shared_ptr<member>
797 PNCounterImpl::get_current_target_replica_address()
798 {
799  return current_target_replica_address_.load();
800 }
801 
802 IListImpl::IListImpl(const std::string& instance_name,
803  spi::ClientContext* context)
804  : ProxyImpl("hz:impl:listService", instance_name, context)
805 {
806  serialization::pimpl::data key_data =
807  get_context().get_serialization_service().to_data<std::string>(
808  &instance_name);
809  partition_id_ = get_partition_id(key_data);
810 }
811 
812 boost::future<bool>
813 IListImpl::remove_item_listener(boost::uuids::uuid registration_id)
814 {
815  return get_context().get_client_listener_service().deregister_listener(
816  registration_id);
817 }
818 
819 boost::future<int>
820 IListImpl::size()
821 {
822  auto request = protocol::codec::list_size_encode(get_name());
823  return invoke_and_get_future<int>(request, partition_id_);
824 }
825 
826 boost::future<bool>
827 IListImpl::is_empty()
828 {
829  auto request = protocol::codec::list_isempty_encode(get_name());
830  return invoke_and_get_future<bool>(request, partition_id_);
831 }
832 
833 boost::future<bool>
834 IListImpl::contains(const serialization::pimpl::data& element)
835 {
836  auto request = protocol::codec::list_contains_encode(get_name(), element);
837  return invoke_and_get_future<bool>(request, partition_id_);
838 }
839 
840 boost::future<std::vector<serialization::pimpl::data>>
841 IListImpl::to_array_data()
842 {
843  auto request = protocol::codec::list_getall_encode(get_name());
844  return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
845  request, partition_id_);
846 }
847 
848 boost::future<bool>
849 IListImpl::add(const serialization::pimpl::data& element)
850 {
851  auto request = protocol::codec::list_add_encode(get_name(), element);
852  return invoke_and_get_future<bool>(request, partition_id_);
853 }
854 
855 boost::future<bool>
856 IListImpl::remove(const serialization::pimpl::data& element)
857 {
858  auto request = protocol::codec::list_remove_encode(get_name(), element);
859  return invoke_and_get_future<bool>(request, partition_id_);
860 }
861 
862 boost::future<bool>
863 IListImpl::contains_all_data(
864  const std::vector<serialization::pimpl::data>& elements)
865 {
866  auto request =
867  protocol::codec::list_containsall_encode(get_name(), elements);
868  return invoke_and_get_future<bool>(request, partition_id_);
869 }
870 
871 boost::future<bool>
872 IListImpl::add_all_data(const std::vector<serialization::pimpl::data>& elements)
873 {
874  auto request = protocol::codec::list_addall_encode(get_name(), elements);
875  return invoke_and_get_future<bool>(request, partition_id_);
876 }
877 
878 boost::future<bool>
879 IListImpl::add_all_data(int index,
880  const std::vector<serialization::pimpl::data>& elements)
881 {
882  auto request =
883  protocol::codec::list_addallwithindex_encode(get_name(), index, elements);
884  return invoke_and_get_future<bool>(request, partition_id_);
885 }
886 
887 boost::future<bool>
888 IListImpl::remove_all_data(
889  const std::vector<serialization::pimpl::data>& elements)
890 {
891  auto request =
892  protocol::codec::list_compareandremoveall_encode(get_name(), elements);
893  return invoke_and_get_future<bool>(request, partition_id_);
894 }
895 
896 boost::future<bool>
897 IListImpl::retain_all_data(
898  const std::vector<serialization::pimpl::data>& elements)
899 {
900  auto request =
901  protocol::codec::list_compareandretainall_encode(get_name(), elements);
902  return invoke_and_get_future<bool>(request, partition_id_);
903 }
904 
905 boost::future<void>
906 IListImpl::clear()
907 {
908  auto request = protocol::codec::list_clear_encode(get_name());
909  return to_void_future(invoke_on_partition(request, partition_id_));
910 }
911 
912 boost::future<boost::optional<serialization::pimpl::data>>
913 IListImpl::get_data(int index)
914 {
915  auto request = protocol::codec::list_get_encode(get_name(), index);
916  return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
917  request, partition_id_);
918 }
919 
920 boost::future<boost::optional<serialization::pimpl::data>>
921 IListImpl::set_data(int index, const serialization::pimpl::data& element)
922 {
923  auto request = protocol::codec::list_set_encode(get_name(), index, element);
924  return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
925  request, partition_id_);
926 }
927 
928 boost::future<void>
929 IListImpl::add(int index, const serialization::pimpl::data& element)
930 {
931  auto request =
932  protocol::codec::list_addwithindex_encode(get_name(), index, element);
933  return to_void_future(invoke_on_partition(request, partition_id_));
934 }
935 
936 boost::future<boost::optional<serialization::pimpl::data>>
937 IListImpl::remove_data(int index)
938 {
939  auto request =
940  protocol::codec::list_removewithindex_encode(get_name(), index);
941  return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
942  request, partition_id_);
943 }
944 
945 boost::future<int>
946 IListImpl::index_of(const serialization::pimpl::data& element)
947 {
948  auto request = protocol::codec::list_indexof_encode(get_name(), element);
949  return invoke_and_get_future<int>(request, partition_id_);
950 }
951 
952 boost::future<int>
953 IListImpl::last_index_of(const serialization::pimpl::data& element)
954 {
955  auto request =
956  protocol::codec::list_lastindexof_encode(get_name(), element);
957  return invoke_and_get_future<int>(request, partition_id_);
958 }
959 
960 boost::future<std::vector<serialization::pimpl::data>>
961 IListImpl::sub_list_data(int from_index, int to_index)
962 {
963  auto request =
964  protocol::codec::list_sub_encode(get_name(), from_index, to_index);
965  return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
966  request, partition_id_);
967 }
968 
969 std::shared_ptr<spi::impl::ListenerMessageCodec>
970 IListImpl::create_item_listener_codec(bool include_value)
971 {
972  return std::shared_ptr<spi::impl::ListenerMessageCodec>(
973  new ListListenerMessageCodec(get_name(), include_value));
974 }
975 
976 IListImpl::ListListenerMessageCodec::ListListenerMessageCodec(
977  std::string name,
978  bool include_value)
979  : name_(std::move(name))
980  , include_value_(include_value)
981 {}
982 
983 protocol::ClientMessage
984 IListImpl::ListListenerMessageCodec::encode_add_request(bool local_only) const
985 {
986  return protocol::codec::list_addlistener_encode(
987  name_, include_value_, local_only);
988 }
989 
990 protocol::ClientMessage
991 IListImpl::ListListenerMessageCodec::encode_remove_request(
992  boost::uuids::uuid real_registration_id) const
993 {
994  return protocol::codec::list_removelistener_encode(name_,
995  real_registration_id);
996 }
997 
998 flake_id_generator_impl::Block::Block(IdBatch&& id_batch,
999  std::chrono::milliseconds validity)
1000  : id_batch_(id_batch)
1001  , invalid_since_(std::chrono::steady_clock::now() + validity)
1002  , num_returned_(0)
1003 {}
1004 
1005 int64_t
1006 flake_id_generator_impl::Block::next()
1007 {
1008  if (invalid_since_ <= std::chrono::steady_clock::now()) {
1009  return INT64_MIN;
1010  }
1011  int32_t index;
1012  do {
1013  index = num_returned_;
1014  if (index == id_batch_.get_batch_size()) {
1015  return INT64_MIN;
1016  }
1017  } while (!num_returned_.compare_exchange_strong(index, index + 1));
1018 
1019  return id_batch_.get_base() + index * id_batch_.get_increment();
1020 }
1021 
1022 flake_id_generator_impl::IdBatch::IdIterator
1023  flake_id_generator_impl::IdBatch::endOfBatch;
1024 
1025 int64_t
1026 flake_id_generator_impl::IdBatch::get_base() const
1027 {
1028  return base_;
1029 }
1030 
1031 int64_t
1032 flake_id_generator_impl::IdBatch::get_increment() const
1033 {
1034  return increment_;
1035 }
1036 
1037 int32_t
1038 flake_id_generator_impl::IdBatch::get_batch_size() const
1039 {
1040  return batch_size_;
1041 }
1042 
1043 flake_id_generator_impl::IdBatch::IdBatch(int64_t base,
1044  int64_t increment,
1045  int32_t batch_size)
1046  : base_(base)
1047  , increment_(increment)
1048  , batch_size_(batch_size)
1049 {}
1050 
1051 flake_id_generator_impl::IdBatch::IdIterator&
1052 flake_id_generator_impl::IdBatch::end()
1053 {
1054  return endOfBatch;
1055 }
1056 
1057 flake_id_generator_impl::IdBatch::IdIterator
1058 flake_id_generator_impl::IdBatch::iterator()
1059 {
1060  return flake_id_generator_impl::IdBatch::IdIterator(
1061  base_, increment_, batch_size_);
1062 }
1063 
1064 flake_id_generator_impl::IdBatch::IdIterator::IdIterator(
1065  int64_t base2,
1066  const int64_t increment,
1067  int32_t remaining)
1068  : base2_(base2)
1069  , increment_(increment)
1070  , remaining_(remaining)
1071 {}
1072 
1073 bool
1074 flake_id_generator_impl::IdBatch::IdIterator::operator==(
1075  const flake_id_generator_impl::IdBatch::IdIterator& rhs) const
1076 {
1077  return base2_ == rhs.base2_ && increment_ == rhs.increment_ &&
1078  remaining_ == rhs.remaining_;
1079 }
1080 
1081 bool
1082 flake_id_generator_impl::IdBatch::IdIterator::operator!=(
1083  const flake_id_generator_impl::IdBatch::IdIterator& rhs) const
1084 {
1085  return !(rhs == *this);
1086 }
1087 
1088 flake_id_generator_impl::IdBatch::IdIterator::IdIterator()
1089  : base2_(-1)
1090  , increment_(-1)
1091  , remaining_(-1)
1092 {}
1093 
1094 flake_id_generator_impl::IdBatch::IdIterator&
1095 flake_id_generator_impl::IdBatch::IdIterator::operator++()
1096 {
1097  if (remaining_ == 0) {
1098  return flake_id_generator_impl::IdBatch::end();
1099  }
1100 
1101  --remaining_;
1102 
1103  base2_ += increment_;
1104 
1105  return *this;
1106 }
1107 
1108 flake_id_generator_impl::flake_id_generator_impl(
1109  const std::string& service_name,
1110  const std::string& object_name,
1111  spi::ClientContext* context)
1112  : ProxyImpl(service_name, object_name, context)
1113  , block_(nullptr)
1114 {
1115  auto config =
1116  context->get_client_config().find_flake_id_generator_config(object_name);
1117  batch_size_ = config->get_prefetch_count();
1118  validity_ = config->get_prefetch_validity_duration();
1119 }
1120 
1121 int64_t
1122 flake_id_generator_impl::new_id_internal()
1123 {
1124  auto b = block_.load();
1125  if (b) {
1126  int64_t res = b->next();
1127  if (res != INT64_MIN) {
1128  return res;
1129  }
1130  }
1131 
1132  throw std::overflow_error("");
1133 }
1134 
1135 boost::future<int64_t>
1136 flake_id_generator_impl::new_id()
1137 {
1138  try {
1139  return boost::make_ready_future(new_id_internal());
1140  } catch (std::overflow_error&) {
1141  return new_id_batch(batch_size_)
1142  .then(boost::launch::sync,
1143  [=](boost::future<flake_id_generator_impl::IdBatch> f) {
1144  auto newBlock =
1145  boost::make_shared<Block>(f.get(), validity_);
1146  auto value = newBlock->next();
1147  auto b = block_.load();
1148  block_.compare_exchange_strong(b, newBlock);
1149  return value;
1150  });
1151  }
1152 }
1153 
1154 boost::future<flake_id_generator_impl::IdBatch>
1155 flake_id_generator_impl::new_id_batch(int32_t size)
1156 {
1157  auto request =
1158  protocol::codec::flakeidgenerator_newidbatch_encode(get_name(), size);
1159  return invoke(request).then(
1160  boost::launch::sync, [](boost::future<protocol::ClientMessage> f) {
1161  auto msg = f.get();
1162  msg.rd_ptr(protocol::ClientMessage::RESPONSE_HEADER_LEN);
1163 
1164  auto base = msg.get<int64_t>();
1165  auto increment = msg.get<int64_t>();
1166  auto batch_size = msg.get<int32_t>();
1167  return flake_id_generator_impl::IdBatch(base, increment, batch_size);
1168  });
1169 }
1170 
1171 IQueueImpl::IQueueImpl(const std::string& instance_name,
1172  spi::ClientContext* context)
1173  : ProxyImpl("hz:impl:queueService", instance_name, context)
1174 {
1175  serialization::pimpl::data data =
1176  get_context().get_serialization_service().to_data<std::string>(
1177  &instance_name);
1178  partition_id_ = get_partition_id(data);
1179 }
1180 
1181 boost::future<bool>
1182 IQueueImpl::remove_item_listener(boost::uuids::uuid registration_id)
1183 {
1184  return get_context().get_client_listener_service().deregister_listener(
1185  registration_id);
1186 }
1187 
1188 boost::future<bool>
1189 IQueueImpl::offer(const serialization::pimpl::data& element,
1190  std::chrono::milliseconds timeout)
1191 {
1192  auto request = protocol::codec::queue_offer_encode(
1193  get_name(),
1194  element,
1195  std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
1196  return invoke_and_get_future<bool>(request, partition_id_);
1197 }
1198 
1199 boost::future<void>
1200 IQueueImpl::put(const serialization::pimpl::data& element)
1201 {
1202  auto request = protocol::codec::queue_put_encode(get_name(), element);
1203  return to_void_future(invoke_on_partition(request, partition_id_));
1204 }
1205 
1206 boost::future<boost::optional<serialization::pimpl::data>>
1207 IQueueImpl::poll_data(std::chrono::milliseconds timeout)
1208 {
1209  auto request = protocol::codec::queue_poll_encode(
1210  get_name(),
1211  std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
1212  return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1213  request, partition_id_);
1214 }
1215 
1216 boost::future<int>
1217 IQueueImpl::remaining_capacity()
1218 {
1219  auto request = protocol::codec::queue_remainingcapacity_encode(get_name());
1220  return invoke_and_get_future<int>(request, partition_id_);
1221 }
1222 
1223 boost::future<bool>
1224 IQueueImpl::remove(const serialization::pimpl::data& element)
1225 {
1226  auto request = protocol::codec::queue_remove_encode(get_name(), element);
1227  return invoke_and_get_future<bool>(request, partition_id_);
1228 }
1229 
1230 boost::future<bool>
1231 IQueueImpl::contains(const serialization::pimpl::data& element)
1232 {
1233  auto request = protocol::codec::queue_contains_encode(get_name(), element);
1234  return invoke_and_get_future<bool>(request, partition_id_);
1235 }
1236 
1237 boost::future<std::vector<serialization::pimpl::data>>
1238 IQueueImpl::drain_to_data(size_t max_elements)
1239 {
1240  auto request = protocol::codec::queue_draintomaxsize_encode(
1241  get_name(), (int32_t)max_elements);
1242 
1243  return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
1244  request, partition_id_);
1245 }
1246 
1247 boost::future<std::vector<serialization::pimpl::data>>
1248 IQueueImpl::drain_to_data()
1249 {
1250  auto request = protocol::codec::queue_drainto_encode(get_name());
1251  return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
1252  request, partition_id_);
1253 }
1254 
1255 boost::future<boost::optional<serialization::pimpl::data>>
1256 IQueueImpl::take_data()
1257 {
1258  auto request = protocol::codec::queue_take_encode(get_name());
1259  return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1260  request, partition_id_);
1261 }
1262 
1263 boost::future<boost::optional<serialization::pimpl::data>>
1264 IQueueImpl::peek_data()
1265 {
1266  auto request = protocol::codec::queue_peek_encode(get_name());
1267  return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1268  request, partition_id_);
1269 }
1270 
1271 boost::future<int>
1272 IQueueImpl::size()
1273 {
1274  auto request = protocol::codec::queue_size_encode(get_name());
1275  return invoke_and_get_future<int>(request, partition_id_);
1276 }
1277 
1278 boost::future<bool>
1279 IQueueImpl::is_empty()
1280 {
1281  auto request = protocol::codec::queue_isempty_encode(get_name());
1282  return invoke_and_get_future<bool>(request, partition_id_);
1283 }
1284 
1285 boost::future<std::vector<serialization::pimpl::data>>
1286 IQueueImpl::to_array_data()
1287 {
1288  auto request = protocol::codec::queue_iterator_encode(get_name());
1289  return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
1290  request, partition_id_);
1291 }
1292 
1293 boost::future<bool>
1294 IQueueImpl::contains_all_data(
1295  const std::vector<serialization::pimpl::data>& elements)
1296 {
1297  auto request =
1298  protocol::codec::queue_containsall_encode(get_name(), elements);
1299  return invoke_and_get_future<bool>(request, partition_id_);
1300 }
1301 
1302 boost::future<bool>
1303 IQueueImpl::add_all_data(
1304  const std::vector<serialization::pimpl::data>& elements)
1305 {
1306  auto request = protocol::codec::queue_addall_encode(get_name(), elements);
1307  return invoke_and_get_future<bool>(request, partition_id_);
1308 }
1309 
1310 boost::future<bool>
1311 IQueueImpl::remove_all_data(
1312  const std::vector<serialization::pimpl::data>& elements)
1313 {
1314  auto request =
1315  protocol::codec::queue_compareandremoveall_encode(get_name(), elements);
1316  return invoke_and_get_future<bool>(request, partition_id_);
1317 }
1318 
1319 boost::future<bool>
1320 IQueueImpl::retain_all_data(
1321  const std::vector<serialization::pimpl::data>& elements)
1322 {
1323  auto request =
1324  protocol::codec::queue_compareandretainall_encode(get_name(), elements);
1325  return invoke_and_get_future<bool>(request, partition_id_);
1326 }
1327 
1328 boost::future<void>
1329 IQueueImpl::clear()
1330 {
1331  auto request = protocol::codec::queue_clear_encode(get_name());
1332  return to_void_future(invoke_on_partition(request, partition_id_));
1333 }
1334 
1335 std::shared_ptr<spi::impl::ListenerMessageCodec>
1336 IQueueImpl::create_item_listener_codec(bool include_value)
1337 {
1338  return std::shared_ptr<spi::impl::ListenerMessageCodec>(
1339  new QueueListenerMessageCodec(get_name(), include_value));
1340 }
1341 
1342 IQueueImpl::QueueListenerMessageCodec::QueueListenerMessageCodec(
1343  std::string name,
1344  bool include_value)
1345  : name_(std::move(name))
1346  , include_value_(include_value)
1347 {}
1348 
1349 protocol::ClientMessage
1350 IQueueImpl::QueueListenerMessageCodec::encode_add_request(bool local_only) const
1351 {
1352  return protocol::codec::queue_addlistener_encode(
1353  name_, include_value_, local_only);
1354 }
1355 
1356 protocol::ClientMessage
1357 IQueueImpl::QueueListenerMessageCodec::encode_remove_request(
1358  boost::uuids::uuid real_registration_id) const
1359 {
1360  return protocol::codec::queue_removelistener_encode(name_,
1361  real_registration_id);
1362 }
1363 
1364 ProxyImpl::ProxyImpl(const std::string& service_name,
1365  const std::string& object_name,
1366  spi::ClientContext* context)
1367  : ClientProxy(object_name, service_name, *context)
1368  , SerializingProxy(*context, object_name)
1369 {}
1370 
1371 ProxyImpl::~ProxyImpl() = default;
1372 
1373 SerializingProxy::SerializingProxy(spi::ClientContext& context,
1374  const std::string& object_name)
1375  : serialization_service_(context.get_serialization_service())
1376  , partition_service_(context.get_partition_service())
1377  , object_name_(object_name)
1378  , client_context_(context)
1379 {}
1380 
1381 int
1382 SerializingProxy::get_partition_id(const serialization::pimpl::data& key)
1383 {
1384  return partition_service_.get_partition_id(key);
1385 }
1386 
1387 boost::future<protocol::ClientMessage>
1388 SerializingProxy::invoke_on_partition(protocol::ClientMessage& request,
1389  int partition_id)
1390 {
1391  try {
1392  return spi::impl::ClientInvocation::create(
1393  client_context_,
1394  std::make_shared<protocol::ClientMessage>(std::move(request)),
1395  object_name_,
1396  partition_id)
1397  ->invoke();
1398  } catch (exception::iexception&) {
1399  util::exception_util::rethrow(std::current_exception());
1400  return boost::make_ready_future(protocol::ClientMessage(0));
1401  }
1402 }
1403 
1404 boost::future<protocol::ClientMessage>
1405 SerializingProxy::invoke(protocol::ClientMessage& request)
1406 {
1407  try {
1408  return spi::impl::ClientInvocation::create(
1409  client_context_,
1410  std::make_shared<protocol::ClientMessage>(std::move(request)),
1411  object_name_)
1412  ->invoke();
1413  } catch (exception::iexception&) {
1414  util::exception_util::rethrow(std::current_exception());
1415  return boost::make_ready_future(protocol::ClientMessage(0));
1416  }
1417 }
1418 
1419 boost::future<protocol::ClientMessage>
1420 SerializingProxy::invoke_on_connection(
1421  protocol::ClientMessage& request,
1422  std::shared_ptr<connection::Connection> connection)
1423 {
1424  try {
1425  return spi::impl::ClientInvocation::create(
1426  client_context_,
1427  std::make_shared<protocol::ClientMessage>(std::move(request)),
1428  object_name_,
1429  connection)
1430  ->invoke();
1431  } catch (exception::iexception&) {
1432  util::exception_util::rethrow(std::current_exception());
1433  return boost::make_ready_future(protocol::ClientMessage(0));
1434  }
1435 }
1436 
1437 boost::future<protocol::ClientMessage>
1438 SerializingProxy::invoke_on_key_owner(
1439  protocol::ClientMessage& request,
1440  const serialization::pimpl::data& key_data)
1441 {
1442  try {
1443  return invoke_on_partition(request, get_partition_id(key_data));
1444  } catch (exception::iexception&) {
1445  util::exception_util::rethrow(std::current_exception());
1446  return boost::make_ready_future(protocol::ClientMessage(0));
1447  }
1448 }
1449 
1450 boost::future<protocol::ClientMessage>
1451 SerializingProxy::invoke_on_member(protocol::ClientMessage& request,
1452  boost::uuids::uuid uuid)
1453 {
1454  try {
1455  auto invocation = spi::impl::ClientInvocation::create(
1456  client_context_,
1457  std::make_shared<protocol::ClientMessage>(std::move(request)),
1458  object_name_,
1459  uuid);
1460  return invocation->invoke();
1461  } catch (exception::iexception&) {
1462  util::exception_util::rethrow(std::current_exception());
1463  return boost::make_ready_future(protocol::ClientMessage(0));
1464  }
1465 }
1466 
1467 template<>
1468 boost::future<boost::optional<serialization::pimpl::data>>
1469 SerializingProxy::invoke_and_get_future(protocol::ClientMessage& request)
1470 {
1471  return decode_optional_var_sized<serialization::pimpl::data>(
1472  invoke(request));
1473 }
1474 
1475 template<>
1476 boost::future<boost::optional<map::data_entry_view>>
1477 SerializingProxy::invoke_and_get_future(protocol::ClientMessage& request,
1478  const serialization::pimpl::data& key)
1479 {
1480  return decode_optional_var_sized<map::data_entry_view>(
1481  invoke_on_key_owner(request, key));
1482 }
1483 
1484 template<>
1485 boost::future<boost::optional<serialization::pimpl::data>>
1486 SerializingProxy::invoke_and_get_future(protocol::ClientMessage& request,
1487  int partition_id)
1488 {
1489  return decode_optional_var_sized<serialization::pimpl::data>(
1490  invoke_on_partition(request, partition_id));
1491 }
1492 
1493 template<>
1494 boost::future<boost::optional<serialization::pimpl::data>>
1495 SerializingProxy::invoke_and_get_future(protocol::ClientMessage& request,
1496  const serialization::pimpl::data& key)
1497 {
1498  return decode_optional_var_sized<serialization::pimpl::data>(
1499  invoke_on_key_owner(request, key));
1500 }
1501 
1502 PartitionSpecificClientProxy::PartitionSpecificClientProxy(
1503  const std::string& service_name,
1504  const std::string& object_name,
1505  spi::ClientContext* context)
1506  : ProxyImpl(service_name, object_name, context)
1507  , partition_id_(-1)
1508 {}
1509 
1510 void
1511 PartitionSpecificClientProxy::on_initialize()
1512 {
1513  std::string partitionKey = internal::partition::strategy::
1514  StringPartitioningStrategy::get_partition_key(name_);
1515  partition_id_ = get_context().get_partition_service().get_partition_id(
1516  to_data<std::string>(partitionKey));
1517 }
1518 
1519 IMapImpl::IMapImpl(const std::string& instance_name,
1520  spi::ClientContext* context)
1521  : ProxyImpl("hz:impl:mapService", instance_name, context)
1522 {}
1523 
1524 boost::future<bool>
1525 IMapImpl::contains_key(const serialization::pimpl::data& key)
1526 {
1527  auto request = protocol::codec::map_containskey_encode(
1528  get_name(), key, util::get_current_thread_id());
1529  return invoke_and_get_future<bool>(request, key);
1530 }
1531 
1532 boost::future<bool>
1533 IMapImpl::contains_value(const serialization::pimpl::data& value)
1534 {
1535  auto request = protocol::codec::map_containsvalue_encode(get_name(), value);
1536  return invoke_and_get_future<bool>(request);
1537 }
1538 
1539 boost::future<boost::optional<serialization::pimpl::data>>
1540 IMapImpl::get_data(const serialization::pimpl::data& key)
1541 {
1542  auto request = protocol::codec::map_get_encode(
1543  get_name(), key, util::get_current_thread_id());
1544  return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1545  request, key);
1546 }
1547 
1548 boost::future<boost::optional<serialization::pimpl::data>>
1549 IMapImpl::remove_data(const serialization::pimpl::data& key)
1550 {
1551  auto request = protocol::codec::map_remove_encode(
1552  get_name(), key, util::get_current_thread_id());
1553  return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1554  request, key);
1555 }
1556 
1557 boost::future<bool>
1558 IMapImpl::remove(const serialization::pimpl::data& key,
1559  const serialization::pimpl::data& value)
1560 {
1561  auto request = protocol::codec::map_removeifsame_encode(
1562  get_name(), key, value, util::get_current_thread_id());
1563  return invoke_and_get_future<bool>(request, key);
1564 }
1565 
1566 boost::future<protocol::ClientMessage>
1567 IMapImpl::remove_all(const serialization::pimpl::data& predicate_data)
1568 {
1569  auto request =
1570  protocol::codec::map_removeall_encode(get_name(), predicate_data);
1571  return invoke(request);
1572 }
1573 
1574 boost::future<protocol::ClientMessage>
1575 IMapImpl::delete_entry(const serialization::pimpl::data& key)
1576 {
1577  auto request = protocol::codec::map_delete_encode(
1578  get_name(), key, util::get_current_thread_id());
1579  return invoke_on_partition(request, get_partition_id(key));
1580 }
1581 
1582 boost::future<protocol::ClientMessage>
1583 IMapImpl::flush()
1584 {
1585  auto request = protocol::codec::map_flush_encode(get_name());
1586  return invoke(request);
1587 }
1588 
1589 boost::future<bool>
1590 IMapImpl::try_remove(const serialization::pimpl::data& key,
1591  std::chrono::milliseconds timeout)
1592 {
1593  auto request = protocol::codec::map_tryremove_encode(
1594  get_name(),
1595  key,
1596  util::get_current_thread_id(),
1597  std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
1598 
1599  return invoke_and_get_future<bool>(request, key);
1600 }
1601 
1602 boost::future<bool>
1603 IMapImpl::try_put(const serialization::pimpl::data& key,
1604  const serialization::pimpl::data& value,
1605  std::chrono::milliseconds timeout)
1606 {
1607  auto request = protocol::codec::map_tryput_encode(
1608  get_name(),
1609  key,
1610  value,
1611  util::get_current_thread_id(),
1612  std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
1613 
1614  return invoke_and_get_future<bool>(request, key);
1615 }
1616 
1617 boost::future<boost::optional<serialization::pimpl::data>>
1618 IMapImpl::put_data(const serialization::pimpl::data& key,
1619  const serialization::pimpl::data& value,
1620  std::chrono::milliseconds ttl)
1621 {
1622  auto request = protocol::codec::map_put_encode(
1623  get_name(),
1624  key,
1625  value,
1626  util::get_current_thread_id(),
1627  std::chrono::duration_cast<std::chrono::milliseconds>(ttl).count());
1628  return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1629  request, key);
1630 }
1631 
1632 boost::future<protocol::ClientMessage>
1633 IMapImpl::put_transient(const serialization::pimpl::data& key,
1634  const serialization::pimpl::data& value,
1635  std::chrono::milliseconds ttl)
1636 {
1637  auto request = protocol::codec::map_puttransient_encode(
1638  get_name(),
1639  key,
1640  value,
1641  util::get_current_thread_id(),
1642  std::chrono::duration_cast<std::chrono::milliseconds>(ttl).count());
1643  return invoke_on_partition(request, get_partition_id(key));
1644 }
1645 
1646 boost::future<boost::optional<serialization::pimpl::data>>
1647 IMapImpl::put_if_absent_data(const serialization::pimpl::data& key,
1648  const serialization::pimpl::data& value,
1649  std::chrono::milliseconds ttl)
1650 {
1651  auto request = protocol::codec::map_putifabsent_encode(
1652  get_name(),
1653  key,
1654  value,
1655  util::get_current_thread_id(),
1656  std::chrono::duration_cast<std::chrono::milliseconds>(ttl).count());
1657  return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1658  request, key);
1659 }
1660 
1661 boost::future<bool>
1662 IMapImpl::replace(const serialization::pimpl::data& key,
1663  const serialization::pimpl::data& old_value,
1664  const serialization::pimpl::data& new_value)
1665 {
1666  auto request = protocol::codec::map_replaceifsame_encode(
1667  get_name(), key, old_value, new_value, util::get_current_thread_id());
1668 
1669  return invoke_and_get_future<bool>(request, key);
1670 }
1671 
1672 boost::future<boost::optional<serialization::pimpl::data>>
1673 IMapImpl::replace_data(const serialization::pimpl::data& key,
1674  const serialization::pimpl::data& value)
1675 {
1676  auto request = protocol::codec::map_replace_encode(
1677  get_name(), key, value, util::get_current_thread_id());
1678 
1679  return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1680  request, key);
1681 }
1682 
1683 boost::future<protocol::ClientMessage>
1684 IMapImpl::set(const serialization::pimpl::data& key,
1685  const serialization::pimpl::data& value,
1686  std::chrono::milliseconds ttl)
1687 {
1688  auto request = protocol::codec::map_set_encode(
1689  get_name(),
1690  key,
1691  value,
1692  util::get_current_thread_id(),
1693  std::chrono::duration_cast<std::chrono::milliseconds>(ttl).count());
1694  return invoke_on_partition(request, get_partition_id(key));
1695 }
1696 
1697 boost::future<protocol::ClientMessage>
1698 IMapImpl::lock(const serialization::pimpl::data& key)
1699 {
1700  return lock(key, std::chrono::milliseconds(-1));
1701 }
1702 
1703 boost::future<protocol::ClientMessage>
1704 IMapImpl::lock(const serialization::pimpl::data& key,
1705  std::chrono::milliseconds lease_time)
1706 {
1707  auto request = protocol::codec::map_lock_encode(
1708  get_name(),
1709  key,
1710  util::get_current_thread_id(),
1711  std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
1712  lock_reference_id_generator_->get_next_reference_id());
1713  return invoke_on_partition(request, get_partition_id(key));
1714 }
1715 
1716 boost::future<bool>
1717 IMapImpl::is_locked(const serialization::pimpl::data& key)
1718 {
1719  auto request = protocol::codec::map_islocked_encode(get_name(), key);
1720 
1721  return invoke_and_get_future<bool>(request, key);
1722 }
1723 
1724 boost::future<bool>
1725 IMapImpl::try_lock(const serialization::pimpl::data& key,
1726  std::chrono::milliseconds timeout)
1727 {
1728  return try_lock(key, timeout, std::chrono::milliseconds(-1));
1729 }
1730 
1731 boost::future<bool>
1732 IMapImpl::try_lock(const serialization::pimpl::data& key,
1733  std::chrono::milliseconds timeout,
1734  std::chrono::milliseconds lease_time)
1735 {
1736  auto request = protocol::codec::map_trylock_encode(
1737  get_name(),
1738  key,
1739  util::get_current_thread_id(),
1740  std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
1741  std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count(),
1742  lock_reference_id_generator_->get_next_reference_id());
1743  return invoke_and_get_future<bool>(request, key);
1744 }
1745 
1746 boost::future<protocol::ClientMessage>
1747 IMapImpl::unlock(const serialization::pimpl::data& key)
1748 {
1749  auto request = protocol::codec::map_unlock_encode(
1750  get_name(),
1751  key,
1752  util::get_current_thread_id(),
1753  lock_reference_id_generator_->get_next_reference_id());
1754  return invoke_on_partition(request, get_partition_id(key));
1755 }
1756 
1757 boost::future<protocol::ClientMessage>
1758 IMapImpl::force_unlock(const serialization::pimpl::data& key)
1759 {
1760  auto request = protocol::codec::map_forceunlock_encode(
1761  get_name(), key, lock_reference_id_generator_->get_next_reference_id());
1762  return invoke_on_partition(request, get_partition_id(key));
1763 }
1764 
1765 boost::future<std::string>
1766 IMapImpl::add_interceptor(const serialization::pimpl::data& interceptor)
1767 {
1768  auto request =
1769  protocol::codec::map_addinterceptor_encode(get_name(), interceptor);
1770  return invoke_and_get_future<std::string>(request);
1771 }
1772 
1773 // TODO: We can use generic template Listener instead of impl::BaseEventHandler
1774 // to prevent the virtual function calls
1775 boost::future<boost::uuids::uuid>
1776 IMapImpl::add_entry_listener(
1777  std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
1778  bool include_value,
1779  int32_t listener_flags)
1780 {
1781  return register_listener(
1782  create_map_entry_listener_codec(include_value, listener_flags),
1783  std::move(entry_event_handler));
1784 }
1785 
1786 boost::future<boost::uuids::uuid>
1787 IMapImpl::add_entry_listener(
1788  std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
1789  serialization::pimpl::data&& predicate,
1790  bool include_value,
1791  int32_t listener_flags)
1792 {
1793  return register_listener(
1794  create_map_entry_listener_codec(
1795  include_value, std::move(predicate), listener_flags),
1796  std::move(entry_event_handler));
1797 }
1798 
1799 boost::future<bool>
1800 IMapImpl::remove_entry_listener(boost::uuids::uuid registration_id)
1801 {
1802  return get_context().get_client_listener_service().deregister_listener(
1803  registration_id);
1804 }
1805 
1806 boost::future<boost::uuids::uuid>
1807 IMapImpl::add_entry_listener(
1808  std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
1809  bool include_value,
1810  serialization::pimpl::data&& key,
1811  int32_t listener_flags)
1812 {
1813  return register_listener(create_map_entry_listener_codec(
1814  include_value, listener_flags, std::move(key)),
1815  std::move(entry_event_handler));
1816 }
1817 
1818 boost::future<boost::optional<map::data_entry_view>>
1819 IMapImpl::get_entry_view_data(const serialization::pimpl::data& key)
1820 {
1821  auto request = protocol::codec::map_getentryview_encode(
1822  get_name(), key, util::get_current_thread_id());
1823  return invoke_and_get_future<boost::optional<map::data_entry_view>>(request,
1824  key);
1825 }
1826 
1827 boost::future<bool>
1828 IMapImpl::evict(const serialization::pimpl::data& key)
1829 {
1830  auto request = protocol::codec::map_evict_encode(
1831  get_name(), key, util::get_current_thread_id());
1832  return invoke_and_get_future<bool>(request, key);
1833 }
1834 
1835 boost::future<protocol::ClientMessage>
1836 IMapImpl::evict_all()
1837 {
1838  auto request = protocol::codec::map_evictall_encode(get_name());
1839  return invoke(request);
1840 }
1841 
1842 boost::future<EntryVector>
1843 IMapImpl::get_all_data(int partition_id,
1844  const std::vector<serialization::pimpl::data>& keys)
1845 {
1846  auto request = protocol::codec::map_getall_encode(get_name(), keys);
1847  return invoke_and_get_future<EntryVector>(request, partition_id);
1848 }
1849 
1850 boost::future<std::vector<serialization::pimpl::data>>
1851 IMapImpl::key_set_data()
1852 {
1853  auto request = protocol::codec::map_keyset_encode(get_name());
1854  return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
1855  request);
1856 }
1857 
1858 boost::future<std::vector<serialization::pimpl::data>>
1859 IMapImpl::key_set_data(const serialization::pimpl::data& predicate)
1860 {
1861  auto request =
1862  protocol::codec::map_keysetwithpredicate_encode(get_name(), predicate);
1863  return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
1864  request);
1865 }
1866 
1867 boost::future<
1868  std::pair<std::vector<serialization::pimpl::data>, query::anchor_data_list>>
1869 IMapImpl::key_set_for_paging_predicate_data(
1870  protocol::codec::holder::paging_predicate_holder const& predicate)
1871 {
1872  auto request = protocol::codec::map_keysetwithpagingpredicate_encode(
1873  get_name(), predicate);
1874  return invoke(request).then(
1875  boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
1876  return get_paging_predicate_response<
1877  std::vector<serialization::pimpl::data>>(std::move(f));
1878  });
1879 }
1880 
1881 boost::future<EntryVector>
1882 IMapImpl::entry_set_data()
1883 {
1884  auto request = protocol::codec::map_entryset_encode(get_name());
1885  return invoke_and_get_future<EntryVector>(request);
1886 }
1887 
1888 boost::future<EntryVector>
1889 IMapImpl::entry_set_data(const serialization::pimpl::data& predicate)
1890 {
1891  auto request =
1892  protocol::codec::map_entrieswithpredicate_encode(get_name(), predicate);
1893  return invoke_and_get_future<EntryVector>(request);
1894 }
1895 
1896 boost::future<std::pair<EntryVector, query::anchor_data_list>>
1897 IMapImpl::entry_set_for_paging_predicate_data(
1898  protocol::codec::holder::paging_predicate_holder const& predicate)
1899 {
1900  auto request = protocol::codec::map_entrieswithpagingpredicate_encode(
1901  get_name(), predicate);
1902  return invoke(request).then(
1903  boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
1904  return get_paging_predicate_response<EntryVector>(std::move(f));
1905  });
1906 }
1907 
1908 boost::future<std::vector<serialization::pimpl::data>>
1909 IMapImpl::values_data()
1910 {
1911  auto request = protocol::codec::map_values_encode(get_name());
1912  return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
1913  request);
1914 }
1915 
1916 boost::future<std::vector<serialization::pimpl::data>>
1917 IMapImpl::values_data(const serialization::pimpl::data& predicate)
1918 {
1919  auto request =
1920  protocol::codec::map_valueswithpredicate_encode(get_name(), predicate);
1921  return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
1922  request);
1923 }
1924 
1925 boost::future<
1926  std::pair<std::vector<serialization::pimpl::data>, query::anchor_data_list>>
1927 IMapImpl::values_for_paging_predicate_data(
1928  protocol::codec::holder::paging_predicate_holder const& predicate)
1929 {
1930  auto request = protocol::codec::map_valueswithpagingpredicate_encode(
1931  get_name(), predicate);
1932  return invoke(request).then(
1933  boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
1934  return get_paging_predicate_response<
1935  std::vector<serialization::pimpl::data>>(std::move(f));
1936  });
1937 }
1938 
1939 boost::future<protocol::ClientMessage>
1940 IMapImpl::add_index_data(const config::index_config& config)
1941 {
1942  auto request = protocol::codec::map_addindex_encode(get_name(), config);
1943  return invoke(request);
1944 }
1945 
1946 boost::future<int>
1947 IMapImpl::size()
1948 {
1949  auto request = protocol::codec::map_size_encode(get_name());
1950  return invoke_and_get_future<int>(request);
1951 }
1952 
1953 boost::future<bool>
1954 IMapImpl::is_empty()
1955 {
1956  auto request = protocol::codec::map_isempty_encode(get_name());
1957  return invoke_and_get_future<bool>(request);
1958 }
1959 
1960 boost::future<protocol::ClientMessage>
1961 IMapImpl::put_all_data(int partition_id, const EntryVector& entries)
1962 {
1963  auto request =
1964  protocol::codec::map_putall_encode(get_name(), entries, true);
1965  return invoke_on_partition(request, partition_id);
1966 }
1967 
1968 boost::future<protocol::ClientMessage>
1969 IMapImpl::clear_data()
1970 {
1971  auto request = protocol::codec::map_clear_encode(get_name());
1972  return invoke(request);
1973 }
1974 
1975 boost::future<boost::optional<serialization::pimpl::data>>
1976 IMapImpl::execute_on_key_data(const serialization::pimpl::data& key,
1977  const serialization::pimpl::data& processor)
1978 {
1979  auto request = protocol::codec::map_executeonkey_encode(
1980  get_name(), processor, key, util::get_current_thread_id());
1981  return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1982  request, get_partition_id(key));
1983 }
1984 
1985 boost::future<boost::optional<serialization::pimpl::data>>
1986 IMapImpl::submit_to_key_data(const serialization::pimpl::data& key,
1987  const serialization::pimpl::data& processor)
1988 {
1989  auto request = protocol::codec::map_submittokey_encode(
1990  get_name(), processor, key, util::get_current_thread_id());
1991  return invoke_on_partition(request, get_partition_id(key))
1992  .then(boost::launch::sync, [](boost::future<protocol::ClientMessage> f) {
1993  auto msg = f.get();
1994  msg.skip_frame();
1995  return msg.get_nullable<serialization::pimpl::data>();
1996  });
1997 }
1998 
1999 boost::future<EntryVector>
2000 IMapImpl::execute_on_keys_data(
2001  const std::vector<serialization::pimpl::data>& keys,
2002  const serialization::pimpl::data& processor)
2003 {
2004  auto request =
2005  protocol::codec::map_executeonkeys_encode(get_name(), processor, keys);
2006  return invoke_and_get_future<EntryVector>(request);
2007 }
2008 
2009 boost::future<protocol::ClientMessage>
2010 IMapImpl::remove_interceptor(const std::string& id)
2011 {
2012  auto request =
2013  protocol::codec::map_removeinterceptor_encode(get_name(), id);
2014  return invoke(request);
2015 }
2016 
2017 boost::future<EntryVector>
2018 IMapImpl::execute_on_entries_data(
2019  const serialization::pimpl::data& entry_processor)
2020 {
2021  auto request =
2022  protocol::codec::map_executeonallkeys_encode(get_name(), entry_processor);
2023  return invoke_and_get_future<EntryVector>(request);
2024 }
2025 
2026 boost::future<EntryVector>
2027 IMapImpl::execute_on_entries_data(
2028  const serialization::pimpl::data& entry_processor,
2029  const serialization::pimpl::data& predicate)
2030 {
2031  auto request = protocol::codec::map_executewithpredicate_encode(
2032  get_name(), entry_processor, predicate);
2033  return invoke_and_get_future<EntryVector>(request);
2034 }
2035 
2036 std::shared_ptr<spi::impl::ListenerMessageCodec>
2037 IMapImpl::create_map_entry_listener_codec(
2038  bool include_value,
2039  serialization::pimpl::data&& predicate,
2040  int32_t listener_flags)
2041 {
2042  return std::shared_ptr<spi::impl::ListenerMessageCodec>(
2043  new MapEntryListenerWithPredicateMessageCodec(
2044  get_name(), include_value, listener_flags, std::move(predicate)));
2045 }
2046 
2047 std::shared_ptr<spi::impl::ListenerMessageCodec>
2048 IMapImpl::create_map_entry_listener_codec(bool include_value,
2049  int32_t listener_flags)
2050 {
2051  return std::shared_ptr<spi::impl::ListenerMessageCodec>(
2052  new MapEntryListenerMessageCodec(
2053  get_name(), include_value, listener_flags));
2054 }
2055 
2056 std::shared_ptr<spi::impl::ListenerMessageCodec>
2057 IMapImpl::create_map_entry_listener_codec(bool include_value,
2058  int32_t listener_flags,
2059  serialization::pimpl::data&& key)
2060 {
2061  return std::shared_ptr<spi::impl::ListenerMessageCodec>(
2062  new MapEntryListenerToKeyCodec(
2063  get_name(), include_value, listener_flags, std::move(key)));
2064 }
2065 
2066 void
2067 IMapImpl::on_initialize()
2068 {
2069  ProxyImpl::on_initialize();
2070  lock_reference_id_generator_ =
2071  get_context().get_lock_reference_id_generator();
2072 }
2073 
2074 IMapImpl::MapEntryListenerMessageCodec::MapEntryListenerMessageCodec(
2075  std::string name,
2076  bool include_value,
2077  int32_t listener_flags)
2078  : name_(std::move(name))
2079  , include_value_(include_value)
2080  , listener_flags_(listener_flags)
2081 {}
2082 
2083 protocol::ClientMessage
2084 IMapImpl::MapEntryListenerMessageCodec::encode_add_request(
2085  bool local_only) const
2086 {
2087  return protocol::codec::map_addentrylistener_encode(
2088  name_, include_value_, static_cast<int32_t>(listener_flags_), local_only);
2089 }
2090 
2091 protocol::ClientMessage
2092 IMapImpl::MapEntryListenerMessageCodec::encode_remove_request(
2093  boost::uuids::uuid real_registration_id) const
2094 {
2095  return protocol::codec::map_removeentrylistener_encode(
2096  name_, real_registration_id);
2097 }
2098 
2099 protocol::ClientMessage
2100 IMapImpl::MapEntryListenerToKeyCodec::encode_add_request(bool local_only) const
2101 {
2102  return protocol::codec::map_addentrylistenertokey_encode(
2103  name_,
2104  key_,
2105  include_value_,
2106  static_cast<int32_t>(listener_flags_),
2107  local_only);
2108 }
2109 
2110 protocol::ClientMessage
2111 IMapImpl::MapEntryListenerToKeyCodec::encode_remove_request(
2112  boost::uuids::uuid real_registration_id) const
2113 {
2114  return protocol::codec::map_removeentrylistener_encode(
2115  name_, real_registration_id);
2116 }
2117 
2118 IMapImpl::MapEntryListenerToKeyCodec::MapEntryListenerToKeyCodec(
2119  std::string name,
2120  bool include_value,
2121  int32_t listener_flags,
2122  serialization::pimpl::data key)
2123  : name_(std::move(name))
2124  , include_value_(include_value)
2125  , listener_flags_(listener_flags)
2126  , key_(std::move(key))
2127 {}
2128 
2129 IMapImpl::MapEntryListenerWithPredicateMessageCodec::
2130  MapEntryListenerWithPredicateMessageCodec(
2131  std::string name,
2132  bool include_value,
2133  int32_t listener_flags,
2134  serialization::pimpl::data&& predicate)
2135  : name_(std::move(name))
2136  , include_value_(include_value)
2137  , listener_flags_(listener_flags)
2138  , predicate_(std::move(predicate))
2139 {}
2140 
2141 protocol::ClientMessage
2142 IMapImpl::MapEntryListenerWithPredicateMessageCodec::encode_add_request(
2143  bool local_only) const
2144 {
2145  return protocol::codec::map_addentrylistenerwithpredicate_encode(
2146  name_,
2147  predicate_,
2148  include_value_,
2149  static_cast<int32_t>(listener_flags_),
2150  local_only);
2151 }
2152 
2153 protocol::ClientMessage
2154 IMapImpl::MapEntryListenerWithPredicateMessageCodec::encode_remove_request(
2155  boost::uuids::uuid real_registration_id) const
2156 {
2157  return protocol::codec::map_removeentrylistener_encode(
2158  name_, real_registration_id);
2159 }
2160 
2161 TransactionalQueueImpl::TransactionalQueueImpl(
2162  const std::string& name,
2163  txn::TransactionProxy& transaction_proxy)
2164  : TransactionalObject(iqueue::SERVICE_NAME, name, transaction_proxy)
2165 {}
2166 
2167 boost::future<bool>
2168 TransactionalQueueImpl::offer(const serialization::pimpl::data& e,
2169  std::chrono::milliseconds timeout)
2170 {
2171  auto request = protocol::codec::transactionalqueue_offer_encode(
2172  get_name(),
2173  get_transaction_id(),
2174  util::get_current_thread_id(),
2175  e,
2176  std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
2177 
2178  return invoke_and_get_future<bool>(request);
2179 }
2180 
2181 boost::future<boost::optional<serialization::pimpl::data>>
2182 TransactionalQueueImpl::poll_data(std::chrono::milliseconds timeout)
2183 {
2184  auto request = protocol::codec::transactionalqueue_poll_encode(
2185  get_name(),
2186  get_transaction_id(),
2187  util::get_current_thread_id(),
2188  std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
2189 
2190  return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
2191  request);
2192 }
2193 
2194 boost::future<int>
2195 TransactionalQueueImpl::size()
2196 {
2197  auto request = protocol::codec::transactionalqueue_size_encode(
2198  get_name(), get_transaction_id(), util::get_current_thread_id());
2199 
2200  return invoke_and_get_future<int>(request);
2201 }
2202 
2203 ISetImpl::ISetImpl(const std::string& instance_name,
2204  spi::ClientContext* client_context)
2205  : ProxyImpl(iset::SERVICE_NAME, instance_name, client_context)
2206 {
2207  serialization::pimpl::data key_data =
2208  get_context().get_serialization_service().to_data<std::string>(
2209  &instance_name);
2210  partition_id_ = get_partition_id(key_data);
2211 }
2212 
2213 boost::future<bool>
2214 ISetImpl::remove_item_listener(boost::uuids::uuid registration_id)
2215 {
2216  return get_context().get_client_listener_service().deregister_listener(
2217  registration_id);
2218 }
2219 
2220 boost::future<int>
2221 ISetImpl::size()
2222 {
2223  auto request = protocol::codec::set_size_encode(get_name());
2224  return invoke_and_get_future<int>(request, partition_id_);
2225 }
2226 
2227 boost::future<bool>
2228 ISetImpl::is_empty()
2229 {
2230  auto request = protocol::codec::set_isempty_encode(get_name());
2231  return invoke_and_get_future<bool>(request, partition_id_);
2232 }
2233 
2234 boost::future<bool>
2235 ISetImpl::contains(const serialization::pimpl::data& element)
2236 {
2237  auto request = protocol::codec::set_contains_encode(get_name(), element);
2238  return invoke_and_get_future<bool>(request, partition_id_);
2239 }
2240 
2241 boost::future<std::vector<serialization::pimpl::data>>
2242 ISetImpl::to_array_data()
2243 {
2244  auto request = protocol::codec::set_getall_encode(get_name());
2245  return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
2246  request, partition_id_);
2247 }
2248 
2249 boost::future<bool>
2250 ISetImpl::add(const serialization::pimpl::data& element)
2251 {
2252  auto request = protocol::codec::set_add_encode(get_name(), element);
2253  return invoke_and_get_future<bool>(request, partition_id_);
2254 }
2255 
2256 boost::future<bool>
2257 ISetImpl::remove(const serialization::pimpl::data& element)
2258 {
2259  auto request = protocol::codec::set_remove_encode(get_name(), element);
2260  return invoke_and_get_future<bool>(request, partition_id_);
2261 }
2262 
2263 boost::future<bool>
2264 ISetImpl::contains_all(const std::vector<serialization::pimpl::data>& elements)
2265 {
2266  auto request =
2267  protocol::codec::set_containsall_encode(get_name(), elements);
2268  return invoke_and_get_future<bool>(request, partition_id_);
2269 }
2270 
2271 boost::future<bool>
2272 ISetImpl::add_all(const std::vector<serialization::pimpl::data>& elements)
2273 {
2274  auto request = protocol::codec::set_addall_encode(get_name(), elements);
2275  return invoke_and_get_future<bool>(request, partition_id_);
2276 }
2277 
2278 boost::future<bool>
2279 ISetImpl::remove_all(const std::vector<serialization::pimpl::data>& elements)
2280 {
2281  auto request =
2282  protocol::codec::set_compareandremoveall_encode(get_name(), elements);
2283  return invoke_and_get_future<bool>(request, partition_id_);
2284 }
2285 
2286 boost::future<bool>
2287 ISetImpl::retain_all(const std::vector<serialization::pimpl::data>& elements)
2288 {
2289  auto request =
2290  protocol::codec::set_compareandretainall_encode(get_name(), elements);
2291  return invoke_and_get_future<bool>(request, partition_id_);
2292 }
2293 
2294 boost::future<void>
2295 ISetImpl::clear()
2296 {
2297  auto request = protocol::codec::set_clear_encode(get_name());
2298  return to_void_future(invoke_on_partition(request, partition_id_));
2299 }
2300 
2301 std::shared_ptr<spi::impl::ListenerMessageCodec>
2302 ISetImpl::create_item_listener_codec(bool include_value)
2303 {
2304  return std::shared_ptr<spi::impl::ListenerMessageCodec>(
2305  new SetListenerMessageCodec(get_name(), include_value));
2306 }
2307 
2308 ISetImpl::SetListenerMessageCodec::SetListenerMessageCodec(std::string name,
2309  bool include_value)
2310  : name_(std::move(name))
2311  , include_value_(include_value)
2312 {}
2313 
2314 protocol::ClientMessage
2315 ISetImpl::SetListenerMessageCodec::encode_add_request(bool local_only) const
2316 {
2317  return protocol::codec::set_addlistener_encode(
2318  name_, include_value_, local_only);
2319 }
2320 
2321 protocol::ClientMessage
2322 ISetImpl::SetListenerMessageCodec::encode_remove_request(
2323  boost::uuids::uuid real_registration_id) const
2324 {
2325  return protocol::codec::set_removelistener_encode(name_,
2326  real_registration_id);
2327 }
2328 
2329 ITopicImpl::ITopicImpl(const std::string& instance_name,
2330  spi::ClientContext* context)
2331  : proxy::ProxyImpl("hz:impl:topicService", instance_name, context)
2332  , partition_id_(get_partition_id(to_data(instance_name)))
2333 {}
2334 
2335 boost::future<void>
2336 ITopicImpl::publish(const serialization::pimpl::data& data)
2337 {
2338  auto request = protocol::codec::topic_publish_encode(get_name(), data);
2339  return to_void_future(invoke_on_partition(request, partition_id_));
2340 }
2341 
2342 boost::future<boost::uuids::uuid>
2343 ITopicImpl::add_message_listener(
2344  std::shared_ptr<impl::BaseEventHandler> topic_event_handler)
2345 {
2346  return register_listener(create_item_listener_codec(),
2347  std::move(topic_event_handler));
2348 }
2349 
2350 boost::future<bool>
2351 ITopicImpl::remove_message_listener(boost::uuids::uuid registration_id)
2352 {
2353  return get_context().get_client_listener_service().deregister_listener(
2354  registration_id);
2355 }
2356 
2357 std::shared_ptr<spi::impl::ListenerMessageCodec>
2358 ITopicImpl::create_item_listener_codec()
2359 {
2360  return std::shared_ptr<spi::impl::ListenerMessageCodec>(
2361  new TopicListenerMessageCodec(get_name()));
2362 }
2363 
2364 ITopicImpl::TopicListenerMessageCodec::TopicListenerMessageCodec(
2365  std::string name)
2366  : name_(std::move(name))
2367 {}
2368 
2369 protocol::ClientMessage
2370 ITopicImpl::TopicListenerMessageCodec::encode_add_request(bool local_only) const
2371 {
2372  return protocol::codec::topic_addmessagelistener_encode(name_, local_only);
2373 }
2374 
2375 protocol::ClientMessage
2376 ITopicImpl::TopicListenerMessageCodec::encode_remove_request(
2377  boost::uuids::uuid real_registration_id) const
2378 {
2379  return protocol::codec::topic_removemessagelistener_encode(
2380  name_, real_registration_id);
2381 }
2382 
2383 ReplicatedMapImpl::ReplicatedMapImpl(const std::string& service_name,
2384  const std::string& object_name,
2385  spi::ClientContext* context)
2386  : ProxyImpl(service_name, object_name, context)
2387  , target_partition_id_(-1)
2388 {}
2389 
2390 const int32_t RingbufferImpl::MAX_BATCH_SIZE{ 1000 };
2391 } // namespace proxy
2392 
2393 namespace map {
2394 const serialization::pimpl::data&
2395 data_entry_view::get_key() const
2396 {
2397  return key_;
2398 }
2399 
2400 const serialization::pimpl::data&
2401 data_entry_view::get_value() const
2402 {
2403  return value_;
2404 }
2405 
2406 int64_t
2407 data_entry_view::get_cost() const
2408 {
2409  return cost_;
2410 }
2411 
2412 int64_t
2413 data_entry_view::get_creation_time() const
2414 {
2415  return creation_time_;
2416 }
2417 
2418 int64_t
2419 data_entry_view::get_expiration_time() const
2420 {
2421  return expiration_time_;
2422 }
2423 
2424 int64_t
2425 data_entry_view::get_hits() const
2426 {
2427  return hits_;
2428 }
2429 
2430 int64_t
2431 data_entry_view::get_last_access_time() const
2432 {
2433  return last_access_time_;
2434 }
2435 
2436 int64_t
2437 data_entry_view::get_last_stored_time() const
2438 {
2439  return last_stored_time_;
2440 }
2441 
2442 int64_t
2443 data_entry_view::get_last_update_time() const
2444 {
2445  return last_update_time_;
2446 }
2447 
2448 int64_t
2449 data_entry_view::get_version() const
2450 {
2451  return version_;
2452 }
2453 
2454 int64_t
2455 data_entry_view::get_ttl() const
2456 {
2457  return ttl_;
2458 }
2459 
2460 int64_t
2461 data_entry_view::get_max_idle() const
2462 {
2463  return max_idle_;
2464 }
2465 
2466 data_entry_view::data_entry_view(serialization::pimpl::data&& key,
2467  serialization::pimpl::data&& value,
2468  int64_t cost,
2469  int64_t creation_time,
2470  int64_t expiration_time,
2471  int64_t hits,
2472  int64_t last_access_time,
2473  int64_t last_stored_time,
2474  int64_t last_update_time,
2475  int64_t version,
2476  int64_t ttl,
2477  int64_t max_idle)
2478  : key_(std::move(key))
2479  , value_(std::move(value))
2480  , cost_(cost)
2481  , creation_time_(creation_time)
2482  , expiration_time_(expiration_time)
2483  , hits_(hits)
2484  , last_access_time_(last_access_time)
2485  , last_stored_time_(last_stored_time)
2486  , last_update_time_(last_update_time)
2487  , version_(version)
2488  , ttl_(ttl)
2489  , max_idle_(max_idle)
2490 {}
2491 } // namespace map
2492 
2493 namespace topic {
2494 namespace impl {
2495 namespace reliable {
2496 ReliableTopicMessage::ReliableTopicMessage()
2497  : publish_time_(std::chrono::system_clock::now())
2498 {}
2499 
2500 ReliableTopicMessage::ReliableTopicMessage(
2501  hazelcast::client::serialization::pimpl::data&& payload_data,
2502  std::unique_ptr<address> address)
2503  : publish_time_(std::chrono::system_clock::now())
2504  , payload_(std::move(payload_data))
2505 {
2506  if (address) {
2507  publisher_address_ = boost::make_optional(*address);
2508  }
2509 }
2510 
2511 std::chrono::system_clock::time_point
2512 ReliableTopicMessage::get_publish_time() const
2513 {
2514  return publish_time_;
2515 }
2516 
2517 const boost::optional<address>&
2518 ReliableTopicMessage::get_publisher_address() const
2519 {
2520  return publisher_address_;
2521 }
2522 
2523 serialization::pimpl::data&
2524 ReliableTopicMessage::get_payload()
2525 {
2526  return payload_;
2527 }
2528 } // namespace reliable
2529 } // namespace impl
2530 } // namespace topic
2531 
2532 namespace serialization {
2533 int32_t
2534 hz_serializer<topic::impl::reliable::ReliableTopicMessage>::get_factory_id()
2535 {
2536  return F_ID;
2537 }
2538 
2539 int
2540 hz_serializer<topic::impl::reliable::ReliableTopicMessage>::get_class_id()
2541 {
2542  return RELIABLE_TOPIC_MESSAGE;
2543 }
2544 
2545 void
2546 hz_serializer<topic::impl::reliable::ReliableTopicMessage>::write_data(
2547  const topic::impl::reliable::ReliableTopicMessage& object,
2548  object_data_output& out)
2549 {
2550  out.write<int64_t>(std::chrono::duration_cast<std::chrono::milliseconds>(
2551  object.publish_time_.time_since_epoch())
2552  .count());
2553  out.write_object(object.publisher_address_);
2554  out.write(object.payload_.to_byte_array());
2555 }
2556 
2557 topic::impl::reliable::ReliableTopicMessage
2558 hz_serializer<topic::impl::reliable::ReliableTopicMessage>::read_data(
2559  object_data_input& in)
2560 {
2561  topic::impl::reliable::ReliableTopicMessage message;
2562  auto now = std::chrono::system_clock::now();
2563  message.publish_time_ = now +
2564  std::chrono::milliseconds(in.read<int64_t>()) -
2565  now.time_since_epoch();
2566  message.publisher_address_ = in.read_object<address>();
2567  message.payload_ =
2568  serialization::pimpl::data(in.read<std::vector<byte>>().value());
2569  return message;
2570 }
2571 } // namespace serialization
2572 
2573 entry_event::entry_event(const std::string& name,
2574  member&& member,
2575  type event_type,
2576  typed_data&& key,
2577  typed_data&& value,
2578  typed_data&& old_value,
2579  typed_data&& merging_value)
2580  : name_(name)
2581  , member_(std::move(member))
2582  , event_type_(event_type)
2583  , key_(std::move(key))
2584  , value_(std::move(value))
2585  , old_value_(std::move(old_value))
2586  , merging_value_(std::move(merging_value))
2587 {}
2588 
2589 const typed_data&
2591 {
2592  return key_;
2593 }
2594 
2595 const typed_data&
2597 {
2598  return old_value_;
2599 }
2600 
2601 const typed_data&
2603 {
2604  return value_;
2605 }
2606 
2607 const typed_data&
2609 {
2610  return merging_value_;
2611 }
2612 
2613 const member&
2615 {
2616  return member_;
2617 }
2618 
2619 entry_event::type
2621 {
2622  return event_type_;
2623 }
2624 
2625 const std::string&
2627 {
2628  return name_;
2629 }
2630 
2631 std::ostream&
2632 operator<<(std::ostream& os, const entry_event& event)
2633 {
2634  os << "name: " << event.name_ << " member: " << event.member_
2635  << " eventType: " << static_cast<int>(event.event_type_)
2636  << " key: " << event.key_.get_type()
2637  << " value: " << event.value_.get_type()
2638  << " oldValue: " << event.old_value_.get_type()
2639  << " mergingValue: " << event.merging_value_.get_type();
2640  return os;
2641 }
2642 
2644  entry_event::type event_type,
2645  const std::string& name,
2646  int number_of_entries_affected)
2647  : member_(member)
2648  , event_type_(event_type)
2649  , name_(name)
2650  , number_of_entries_affected_(number_of_entries_affected)
2651 {}
2652 
2653 const member&
2655 {
2656  return member_;
2657 }
2658 
2659 entry_event::type
2661 {
2662  return event_type_;
2663 }
2664 
2665 const std::string&
2667 {
2668  return name_;
2669 }
2670 
2671 int
2673 {
2674  return number_of_entries_affected_;
2675 }
2676 
2677 std::ostream&
2678 operator<<(std::ostream& os, const map_event& event)
2679 {
2680  os << "MapEvent{member: " << event.member_
2681  << " eventType: " << static_cast<int>(event.event_type_)
2682  << " name: " << event.name_
2683  << " numberOfEntriesAffected: " << event.number_of_entries_affected_;
2684  return os;
2685 }
2686 
2687 item_event_base::item_event_base(const std::string& name,
2688  const member& member,
2689  const item_event_type& event_type)
2690  : name_(name)
2691  , member_(member)
2692  , event_type_(event_type)
2693 {}
2694 
2695 const member&
2697 {
2698  return member_;
2699 }
2700 
2701 item_event_type
2703 {
2704  return event_type_;
2705 }
2706 
2707 const std::string&
2709 {
2710  return name_;
2711 }
2712 
2713 item_event_base::~item_event_base() = default;
2714 
2715 flake_id_generator::flake_id_generator(const std::string& object_name,
2716  spi::ClientContext* context)
2717  : flake_id_generator_impl(SERVICE_NAME, object_name, context)
2718 {}
2719 } // namespace client
2720 } // namespace hazelcast
const typed_data & get_key() const
Returns the key of the entry event.
Definition: proxy.cpp:2590
const std::string & get_name() const
Returns the name of the map for this event.
Definition: proxy.cpp:2626
const typed_data & get_old_value() const
Returns the old value of the entry event.
Definition: proxy.cpp:2596
type get_event_type() const
Return the event type.
Definition: proxy.cpp:2620
const member & get_member() const
Returns the member fired this event.
Definition: proxy.cpp:2614
const typed_data & get_value() const
Returns the value of the entry event.
Definition: proxy.cpp:2602
const typed_data & get_merging_value() const
Returns the incoming merging value of the entry event.
Definition: proxy.cpp:2608
static const std::chrono::milliseconds UNSET
Default TTL value of a record.
Definition: imap.h:1136
item_event_type get_event_type() const
Return the event type.
Definition: proxy.cpp:2702
const member & get_member() const
Returns the member fired this event.
Definition: proxy.cpp:2696
const std::string & get_name() const
Returns the name of the collection for this event.
Definition: proxy.cpp:2708
Map events common contract.
Definition: map_event.h:36
const std::string & get_name() const
Returns the name of the map for this event.
Definition: proxy.cpp:2666
entry_event::type get_event_type() const
Return the event type.
Definition: proxy.cpp:2660
int get_number_of_entries_affected() const
Returns the number of entries affected by this event.
Definition: proxy.cpp:2672
map_event(member &&member, entry_event::type event_type, const std::string &name, int number_of_entries_affected)
Constructor.
Definition: proxy.cpp:2643
const member & get_member() const
Returns the member fired this event.
Definition: proxy.cpp:2654
hz_cluster member class.
Definition: member.h:62
typed_data class is a wrapper class for the serialized binary data.
serialization::pimpl::object_type get_type() const