17 #include <unordered_set> 
   20 #include "hazelcast/client/impl/ClientLockReferenceIdGenerator.h" 
   21 #include "hazelcast/client/proxy/PNCounterImpl.h" 
   22 #include "hazelcast/client/spi/ClientContext.h" 
   23 #include "hazelcast/client/impl/hazelcast_client_instance_impl.h" 
   24 #include "hazelcast/client/proxy/flake_id_generator_impl.h" 
   25 #include "hazelcast/client/spi/impl/listener/listener_service_impl.h" 
   26 #include "hazelcast/client/topic/impl/TopicEventHandlerImpl.h" 
   27 #include "hazelcast/client/client_config.h" 
   28 #include "hazelcast/client/map/data_entry_view.h" 
   29 #include "hazelcast/client/proxy/RingbufferImpl.h" 
   30 #include "hazelcast/client/impl/vector_clock.h" 
   31 #include "hazelcast/client/internal/partition/strategy/StringPartitioningStrategy.h" 
   32 #include "hazelcast/util/Util.h" 
   33 #include "hazelcast/client/topic/reliable_listener.h" 
   39 reliable_topic::reliable_topic(
const std::string& instance_name,
 
   40                                spi::ClientContext* context)
 
   41   : proxy::ProxyImpl(reliable_topic::SERVICE_NAME, instance_name, context)
 
   42   , executor_(context->get_client_execution_service().get_user_executor())
 
   43   , logger_(context->get_logger())
 
   45     auto reliable_config =
 
   46       context->get_client_config().lookup_reliable_topic_config(instance_name);
 
   47     if (reliable_config) {
 
   48         batch_size_ = reliable_config->get_read_batch_size();
 
   50         batch_size_ = config::reliable_topic_config::DEFAULT_READ_BATCH_SIZE;
 
   54       context->get_hazelcast_client_implementation()
 
   55         ->get_distributed_object<ringbuffer>(
 
   56           std::string(reliable_topic::TOPIC_RB_PREFIX) + instance_name);
 
   60 reliable_topic::remove_message_listener(
const std::string& registration_id)
 
   62     int id = util::IOUtil::to_value<int>(registration_id);
 
   63     auto runner = runners_map_.get(
id);
 
   72 reliable_topic::on_shutdown()
 
   75     for (
auto& entry : runners_map_.clear()) {
 
   76         entry.second->cancel();
 
   81 reliable_topic::on_destroy()
 
   84     for (
auto& entry : runners_map_.clear()) {
 
   85         entry.second->cancel();
 
   90 reliable_topic::post_destroy()
 
   93     ringbuffer_.get()->destroy().get();
 
   97 reliable_listener::reliable_listener(
bool loss_tolerant,
 
   98                                      int64_t initial_sequence_id)
 
   99   : loss_tolerant_(loss_tolerant)
 
  100   , initial_sequence_id_(initial_sequence_id)
 
  105 ClientLockReferenceIdGenerator::ClientLockReferenceIdGenerator()
 
  106   : reference_id_counter_(0)
 
  110 ClientLockReferenceIdGenerator::get_next_reference_id()
 
  112     return ++reference_id_counter_;
 
  117 MultiMapImpl::MultiMapImpl(
const std::string& instance_name,
 
  118                            spi::ClientContext* context)
 
  119   : ProxyImpl(multi_map::SERVICE_NAME, instance_name, context)
 
  123     lock_reference_id_generator_ =
 
  124       get_context().get_lock_reference_id_generator();
 
  128 MultiMapImpl::put(
const serialization::pimpl::data& key,
 
  129                   const serialization::pimpl::data& value)
 
  131     auto request = protocol::codec::multimap_put_encode(
 
  132       get_name(), key, value, util::get_current_thread_id());
 
  133     return invoke_and_get_future<bool>(request, key);
 
  136 boost::future<std::vector<serialization::pimpl::data>>
 
  137 MultiMapImpl::get_data(
const serialization::pimpl::data& key)
 
  139     auto request = protocol::codec::multimap_get_encode(
 
  140       get_name(), key, util::get_current_thread_id());
 
  141     return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
 
  146 MultiMapImpl::remove(
const serialization::pimpl::data& key,
 
  147                      const serialization::pimpl::data& value)
 
  149     auto request = protocol::codec::multimap_removeentry_encode(
 
  150       get_name(), key, value, util::get_current_thread_id());
 
  151     return invoke_and_get_future<bool>(request, key);
 
  154 boost::future<std::vector<serialization::pimpl::data>>
 
  155 MultiMapImpl::remove_data(
const serialization::pimpl::data& key)
 
  157     auto request = protocol::codec::multimap_remove_encode(
 
  158       get_name(), key, util::get_current_thread_id());
 
  159     return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
 
  163 boost::future<std::vector<serialization::pimpl::data>>
 
  164 MultiMapImpl::key_set_data()
 
  166     auto request = protocol::codec::multimap_keyset_encode(get_name());
 
  167     return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
 
  171 boost::future<std::vector<serialization::pimpl::data>>
 
  172 MultiMapImpl::values_data()
 
  174     auto request = protocol::codec::multimap_values_encode(get_name());
 
  175     return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
 
  179 boost::future<EntryVector>
 
  180 MultiMapImpl::entry_set_data()
 
  182     auto request = protocol::codec::multimap_entryset_encode(get_name());
 
  183     return invoke_and_get_future<EntryVector>(request);
 
  187 MultiMapImpl::contains_key(
const serialization::pimpl::data& key)
 
  189     auto request = protocol::codec::multimap_containskey_encode(
 
  190       get_name(), key, util::get_current_thread_id());
 
  191     return invoke_and_get_future<bool>(request, key);
 
  195 MultiMapImpl::contains_value(
const serialization::pimpl::data& value)
 
  198       protocol::codec::multimap_containsvalue_encode(get_name(), value);
 
  199     return invoke_and_get_future<bool>(request);
 
  203 MultiMapImpl::contains_entry(
const serialization::pimpl::data& key,
 
  204                              const serialization::pimpl::data& value)
 
  206     auto request = protocol::codec::multimap_containsentry_encode(
 
  207       get_name(), key, value, util::get_current_thread_id());
 
  208     return invoke_and_get_future<bool>(request, key);
 
  214     auto request = protocol::codec::multimap_size_encode(get_name());
 
  215     return invoke_and_get_future<int>(request);
 
  219 MultiMapImpl::clear()
 
  221     auto request = protocol::codec::multimap_clear_encode(get_name());
 
  222     return to_void_future(invoke(request));
 
  226 MultiMapImpl::value_count(
const serialization::pimpl::data& key)
 
  228     auto request = protocol::codec::multimap_valuecount_encode(
 
  229       get_name(), key, util::get_current_thread_id());
 
  230     return invoke_and_get_future<int>(request, key);
 
  233 boost::future<boost::uuids::uuid>
 
  234 MultiMapImpl::add_entry_listener(
 
  235   std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
 
  238     return register_listener(
 
  239       create_multi_map_entry_listener_codec(include_value),
 
  240       std::move(entry_event_handler));
 
  243 boost::future<boost::uuids::uuid>
 
  244 MultiMapImpl::add_entry_listener(
 
  245   std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
 
  247   serialization::pimpl::data&& key)
 
  249     return register_listener(
 
  250       create_multi_map_entry_listener_codec(include_value, std::move(key)),
 
  251       std::move(entry_event_handler));
 
  255 MultiMapImpl::remove_entry_listener(boost::uuids::uuid registration_id)
 
  257     return get_context().get_client_listener_service().deregister_listener(
 
  262 MultiMapImpl::lock(
const serialization::pimpl::data& key)
 
  264     return lock(key, std::chrono::milliseconds(-1));
 
  268 MultiMapImpl::lock(
const serialization::pimpl::data& key,
 
  269                    std::chrono::milliseconds lease_time)
 
  271     auto request = protocol::codec::multimap_lock_encode(
 
  274       util::get_current_thread_id(),
 
  275       std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
 
  276       lock_reference_id_generator_->get_next_reference_id());
 
  277     return to_void_future(invoke_on_partition(request, get_partition_id(key)));
 
  281 MultiMapImpl::is_locked(
const serialization::pimpl::data& key)
 
  283     auto request = protocol::codec::multimap_islocked_encode(get_name(), key);
 
  284     return invoke_and_get_future<bool>(request, key);
 
  288 MultiMapImpl::try_lock(
const serialization::pimpl::data& key)
 
  290     auto request = protocol::codec::multimap_trylock_encode(
 
  293       util::get_current_thread_id(),
 
  296       lock_reference_id_generator_->get_next_reference_id());
 
  297     return invoke_and_get_future<bool>(request, key);
 
  301 MultiMapImpl::try_lock(
const serialization::pimpl::data& key,
 
  302                        std::chrono::milliseconds timeout)
 
  304     return try_lock(key, timeout, std::chrono::milliseconds(INT64_MAX));
 
  308 MultiMapImpl::try_lock(
const serialization::pimpl::data& key,
 
  309                        std::chrono::milliseconds timeout,
 
  310                        std::chrono::milliseconds lease_time)
 
  312     auto request = protocol::codec::multimap_trylock_encode(
 
  315       util::get_current_thread_id(),
 
  316       std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
 
  317       std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count(),
 
  318       lock_reference_id_generator_->get_next_reference_id());
 
  319     return invoke_and_get_future<bool>(request, key);
 
  323 MultiMapImpl::unlock(
const serialization::pimpl::data& key)
 
  325     auto request = protocol::codec::multimap_unlock_encode(
 
  328       util::get_current_thread_id(),
 
  329       lock_reference_id_generator_->get_next_reference_id());
 
  330     return to_void_future(invoke_on_partition(request, get_partition_id(key)));
 
  334 MultiMapImpl::force_unlock(
const serialization::pimpl::data& key)
 
  336     auto request = protocol::codec::multimap_forceunlock_encode(
 
  337       get_name(), key, lock_reference_id_generator_->get_next_reference_id());
 
  338     return to_void_future(invoke_on_partition(request, get_partition_id(key)));
 
  341 std::shared_ptr<spi::impl::ListenerMessageCodec>
 
  342 MultiMapImpl::create_multi_map_entry_listener_codec(
bool include_value)
 
  344     return std::shared_ptr<spi::impl::ListenerMessageCodec>(
 
  345       new MultiMapEntryListenerMessageCodec(get_name(), include_value));
 
  348 std::shared_ptr<spi::impl::ListenerMessageCodec>
 
  349 MultiMapImpl::create_multi_map_entry_listener_codec(
 
  351   serialization::pimpl::data&& key)
 
  353     return std::shared_ptr<spi::impl::ListenerMessageCodec>(
 
  354       new MultiMapEntryListenerToKeyCodec(
 
  355         get_name(), include_value, std::move(key)));
 
  359 MultiMapImpl::on_initialize()
 
  361     ProxyImpl::on_initialize();
 
  362     lock_reference_id_generator_ =
 
  363       get_context().get_lock_reference_id_generator();
 
  366 MultiMapImpl::MultiMapEntryListenerMessageCodec::
 
  367   MultiMapEntryListenerMessageCodec(std::string name, 
bool include_value)
 
  368   : name_(std::move(name))
 
  369   , include_value_(include_value)
 
  372 protocol::ClientMessage
 
  373 MultiMapImpl::MultiMapEntryListenerMessageCodec::encode_add_request(
 
  374   bool local_only)
 const 
  376     return protocol::codec::multimap_addentrylistener_encode(
 
  377       name_, include_value_, local_only);
 
  380 protocol::ClientMessage
 
  381 MultiMapImpl::MultiMapEntryListenerMessageCodec::encode_remove_request(
 
  382   boost::uuids::uuid real_registration_id)
 const 
  384     return protocol::codec::multimap_removeentrylistener_encode(
 
  385       name_, real_registration_id);
 
  388 protocol::ClientMessage
 
  389 MultiMapImpl::MultiMapEntryListenerToKeyCodec::encode_add_request(
 
  390   bool local_only)
 const 
  392     return protocol::codec::multimap_addentrylistenertokey_encode(
 
  393       name_, key_, include_value_, local_only);
 
  396 protocol::ClientMessage
 
  397 MultiMapImpl::MultiMapEntryListenerToKeyCodec::encode_remove_request(
 
  398   boost::uuids::uuid real_registration_id)
 const 
  400     return protocol::codec::multimap_removeentrylistener_encode(
 
  401       name_, real_registration_id);
 
  404 MultiMapImpl::MultiMapEntryListenerToKeyCodec::MultiMapEntryListenerToKeyCodec(
 
  407   serialization::pimpl::data&& key)
 
  408   : name_(std::move(name))
 
  409   , include_value_(include_value)
 
  410   , key_(std::move(key))
 
  413 const std::shared_ptr<std::unordered_set<member>>
 
  414   PNCounterImpl::EMPTY_ADDRESS_LIST(
new std::unordered_set<member>());
 
  416 PNCounterImpl::PNCounterImpl(
const std::string& service_name,
 
  417                              const std::string& object_name,
 
  418                              spi::ClientContext* context)
 
  419   : ProxyImpl(service_name, object_name, context)
 
  420   , max_configured_replica_count_(0)
 
  422       std::shared_ptr<impl::vector_clock>(new impl::vector_clock()))
 
  423   , logger_(context->get_logger())
 
  427 operator<<(std::ostream& os, 
const PNCounterImpl& proxy)
 
  429     os << 
"PNCounter{name='" << proxy.get_name() << 
"\'}";
 
  433 boost::future<int64_t>
 
  436     boost::shared_ptr<member> target =
 
  437       get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
 
  439         BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
 
  440           "ClientPNCounterProxy::get",
 
  441           "Cannot invoke operations on a CRDT because the cluster does not " 
  442           "contain any data members"));
 
  444     return invoke_get_internal(EMPTY_ADDRESS_LIST, 
nullptr, target);
 
  447 boost::future<int64_t>
 
  448 PNCounterImpl::get_and_add(int64_t delta)
 
  450     boost::shared_ptr<member> target =
 
  451       get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
 
  453         BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
 
  454           "ClientPNCounterProxy::getAndAdd",
 
  455           "Cannot invoke operations on a CRDT because the cluster does not " 
  456           "contain any data members"));
 
  458     return invoke_add_internal(
 
  459       delta, 
true, EMPTY_ADDRESS_LIST, 
nullptr, target);
 
  462 boost::future<int64_t>
 
  463 PNCounterImpl::add_and_get(int64_t delta)
 
  465     boost::shared_ptr<member> target =
 
  466       get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
 
  468         BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
 
  469           "ClientPNCounterProxy::addAndGet",
 
  470           "Cannot invoke operations on a CRDT because the cluster does not " 
  471           "contain any data members"));
 
  473     return invoke_add_internal(
 
  474       delta, 
false, EMPTY_ADDRESS_LIST, 
nullptr, target);
 
  477 boost::future<int64_t>
 
  478 PNCounterImpl::get_and_subtract(int64_t delta)
 
  480     boost::shared_ptr<member> target =
 
  481       get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
 
  483         BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
 
  484           "ClientPNCounterProxy::getAndSubtract",
 
  485           "Cannot invoke operations on a CRDT because the cluster does not " 
  486           "contain any data members"));
 
  488     return invoke_add_internal(
 
  489       -delta, 
true, EMPTY_ADDRESS_LIST, 
nullptr, target);
 
  492 boost::future<int64_t>
 
  493 PNCounterImpl::subtract_and_get(int64_t delta)
 
  495     boost::shared_ptr<member> target =
 
  496       get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
 
  498         BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
 
  499           "ClientPNCounterProxy::subtractAndGet",
 
  500           "Cannot invoke operations on a CRDT because the cluster does not " 
  501           "contain any data members"));
 
  503     return invoke_add_internal(
 
  504       -delta, 
false, EMPTY_ADDRESS_LIST, 
nullptr, target);
 
  507 boost::future<int64_t>
 
  508 PNCounterImpl::decrement_and_get()
 
  510     boost::shared_ptr<member> target =
 
  511       get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
 
  513         BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
 
  514           "ClientPNCounterProxy::decrementAndGet",
 
  515           "Cannot invoke operations on a CRDT because the cluster does not " 
  516           "contain any data members"));
 
  518     return invoke_add_internal(-1, 
false, EMPTY_ADDRESS_LIST, 
nullptr, target);
 
  521 boost::future<int64_t>
 
  522 PNCounterImpl::increment_and_get()
 
  524     boost::shared_ptr<member> target =
 
  525       get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
 
  527         BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
 
  528           "ClientPNCounterProxy::incrementAndGet",
 
  529           "Cannot invoke operations on a CRDT because the cluster does not " 
  530           "contain any data members"));
 
  532     return invoke_add_internal(1, 
false, EMPTY_ADDRESS_LIST, 
nullptr, target);
 
  535 boost::future<int64_t>
 
  536 PNCounterImpl::get_and_decrement()
 
  538     boost::shared_ptr<member> target =
 
  539       get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
 
  541         BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
 
  542           "ClientPNCounterProxy::getAndDecrement",
 
  543           "Cannot invoke operations on a CRDT because the cluster does not " 
  544           "contain any data members"));
 
  546     return invoke_add_internal(-1, 
true, EMPTY_ADDRESS_LIST, 
nullptr, target);
 
  549 boost::future<int64_t>
 
  550 PNCounterImpl::get_and_increment()
 
  552     boost::shared_ptr<member> target =
 
  553       get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
 
  555         BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
 
  556           "ClientPNCounterProxy::getAndIncrement",
 
  557           "Cannot invoke operations on a CRDT because the cluster does not " 
  558           "contain any data members"));
 
  560     return invoke_add_internal(1, 
true, EMPTY_ADDRESS_LIST, 
nullptr, target);
 
  564 PNCounterImpl::reset()
 
  567       std::shared_ptr<impl::vector_clock>(
new impl::vector_clock());
 
  568     return boost::make_ready_future();
 
  571 boost::shared_ptr<member>
 
  572 PNCounterImpl::get_crdt_operation_target(
 
  573   const std::unordered_set<member>& excluded_addresses)
 
  575     auto replicaAddress = current_target_replica_address_.load();
 
  576     if (replicaAddress &&
 
  577         excluded_addresses.find(*replicaAddress) == excluded_addresses.end()) {
 
  578         return replicaAddress;
 
  582         std::lock_guard<std::mutex> guard(target_selection_mutex_);
 
  583         replicaAddress = current_target_replica_address_.load();
 
  584         if (!replicaAddress || excluded_addresses.find(*replicaAddress) !=
 
  585                                  excluded_addresses.end()) {
 
  586             current_target_replica_address_ =
 
  587               choose_target_replica(excluded_addresses);
 
  590     return current_target_replica_address_;
 
  593 boost::shared_ptr<member>
 
  594 PNCounterImpl::choose_target_replica(
 
  595   const std::unordered_set<member>& excluded_addresses)
 
  597     std::vector<member> replicaAddresses =
 
  598       get_replica_addresses(excluded_addresses);
 
  599     if (replicaAddresses.empty()) {
 
  604     int randomReplicaIndex = std::abs(rand()) % (int)replicaAddresses.size();
 
  605     return boost::make_shared<member>(replicaAddresses[randomReplicaIndex]);
 
  609 PNCounterImpl::get_replica_addresses(
 
  610   const std::unordered_set<member>& excluded_members)
 
  612     std::vector<member> dataMembers =
 
  613       get_context().get_client_cluster_service().get_members(
 
  614         *member_selectors::DATA_MEMBER_SELECTOR);
 
  615     int32_t replicaCount = get_max_configured_replica_count();
 
  616     int currentReplicaCount =
 
  617       util::min<int>(replicaCount, (
int)dataMembers.size());
 
  619     std::vector<member> replicaMembers;
 
  620     for (
int i = 0; i < currentReplicaCount; i++) {
 
  621         if (excluded_members.find(dataMembers[i]) == excluded_members.end()) {
 
  622             replicaMembers.push_back(dataMembers[i]);
 
  625     return replicaMembers;
 
  629 PNCounterImpl::get_max_configured_replica_count()
 
  631     if (max_configured_replica_count_ > 0) {
 
  632         return max_configured_replica_count_;
 
  635           protocol::codec::pncounter_getconfiguredreplicacount_encode(
 
  637         max_configured_replica_count_ =
 
  638           invoke_and_get_future<int32_t>(request).get();
 
  640     return max_configured_replica_count_;
 
  643 boost::shared_ptr<member>
 
  644 PNCounterImpl::try_choose_a_new_target(
 
  645   std::shared_ptr<std::unordered_set<member>> excluded_addresses,
 
  646   boost::shared_ptr<member> last_target,
 
  647   const exception::hazelcast_& last_exception)
 
  652       boost::str(boost::format(
 
  653                    "Exception occurred while invoking operation on target %1%, " 
  654                    "choosing different target. Cause: %2%") %
 
  655                  last_target % last_exception));
 
  656     if (excluded_addresses == EMPTY_ADDRESS_LIST) {
 
  659         excluded_addresses = std::make_shared<std::unordered_set<member>>();
 
  661     excluded_addresses->insert(*last_target);
 
  662     return get_crdt_operation_target(*excluded_addresses);
 
  665 boost::future<int64_t>
 
  666 PNCounterImpl::invoke_get_internal(
 
  667   std::shared_ptr<std::unordered_set<member>> excluded_addresses,
 
  668   std::exception_ptr last_exception,
 
  669   const boost::shared_ptr<member>& target)
 
  672         if (last_exception) {
 
  673             std::rethrow_exception(last_exception);
 
  675             BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
 
  676               "ClientPNCounterProxy::invokeGetInternal",
 
  677               "Cannot invoke operations on a CRDT because the cluster does not " 
  678               "contain any data members"));
 
  682         auto timestamps = observed_clock_.get()->entry_set();
 
  683         auto request = protocol::codec::pncounter_get_encode(
 
  684           get_name(), timestamps, target->get_uuid());
 
  685         return invoke_on_member(request, target->get_uuid())
 
  687             boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
 
  689                     return get_and_update_timestamps(std::move(f));
 
  690                 } 
catch (exception::hazelcast_& e) {
 
  691                     return invoke_get_internal(excluded_addresses,
 
  692                                                std::current_exception(),
 
  693                                                try_choose_a_new_target(
 
  694                                                  excluded_addresses, target, e))
 
  698     } 
catch (exception::hazelcast_& e) {
 
  699         return invoke_get_internal(
 
  701           std::current_exception(),
 
  702           try_choose_a_new_target(excluded_addresses, target, e));
 
  706 boost::future<int64_t>
 
  707 PNCounterImpl::invoke_add_internal(
 
  709   bool getBeforeUpdate,
 
  710   std::shared_ptr<std::unordered_set<member>> excluded_addresses,
 
  711   std::exception_ptr last_exception,
 
  712   const boost::shared_ptr<member>& target)
 
  715         if (last_exception) {
 
  716             std::rethrow_exception(last_exception);
 
  718             BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
 
  719               "ClientPNCounterProxy::invokeGetInternal",
 
  720               "Cannot invoke operations on a CRDT because the cluster does not " 
  721               "contain any data members"));
 
  726         auto request = protocol::codec::pncounter_add_encode(
 
  730           observed_clock_.get()->entry_set(),
 
  732         return invoke_on_member(request, target->get_uuid())
 
  734             boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
 
  736                     return get_and_update_timestamps(std::move(f));
 
  737                 } 
catch (exception::hazelcast_& e) {
 
  738                     return invoke_add_internal(delta,
 
  741                                                std::current_exception(),
 
  742                                                try_choose_a_new_target(
 
  743                                                  excluded_addresses, target, e))
 
  747     } 
catch (exception::hazelcast_& e) {
 
  748         return invoke_add_internal(
 
  752           std::current_exception(),
 
  753           try_choose_a_new_target(excluded_addresses, target, e));
 
  758 PNCounterImpl::get_and_update_timestamps(
 
  759   boost::future<protocol::ClientMessage> f)
 
  762     auto value = msg.get_first_fixed_sized_field<int64_t>();
 
  765     update_observed_replica_timestamps(
 
  766       msg.get<impl::vector_clock::timestamp_vector>());
 
  771 PNCounterImpl::update_observed_replica_timestamps(
 
  772   const impl::vector_clock::timestamp_vector& received_logical_timestamps)
 
  774     std::shared_ptr<impl::vector_clock> received =
 
  775       to_vector_clock(received_logical_timestamps);
 
  777         std::shared_ptr<impl::vector_clock> currentClock =
 
  778           this->observed_clock_;
 
  779         if (currentClock->is_after(*received)) {
 
  782         if (observed_clock_.compare_and_set(currentClock, received)) {
 
  788 std::shared_ptr<impl::vector_clock>
 
  789 PNCounterImpl::to_vector_clock(
 
  790   const impl::vector_clock::timestamp_vector& replica_logical_timestamps)
 
  792     return std::shared_ptr<impl::vector_clock>(
 
  793       new impl::vector_clock(replica_logical_timestamps));
 
  796 boost::shared_ptr<member>
 
  797 PNCounterImpl::get_current_target_replica_address()
 
  799     return current_target_replica_address_.load();
 
  802 IListImpl::IListImpl(
const std::string& instance_name,
 
  803                      spi::ClientContext* context)
 
  804   : ProxyImpl(
"hz:impl:listService", instance_name, context)
 
  806     serialization::pimpl::data key_data =
 
  807       get_context().get_serialization_service().to_data<std::string>(
 
  809     partition_id_ = get_partition_id(key_data);
 
  813 IListImpl::remove_item_listener(boost::uuids::uuid registration_id)
 
  815     return get_context().get_client_listener_service().deregister_listener(
 
  822     auto request = protocol::codec::list_size_encode(get_name());
 
  823     return invoke_and_get_future<int>(request, partition_id_);
 
  827 IListImpl::is_empty()
 
  829     auto request = protocol::codec::list_isempty_encode(get_name());
 
  830     return invoke_and_get_future<bool>(request, partition_id_);
 
  834 IListImpl::contains(
const serialization::pimpl::data& element)
 
  836     auto request = protocol::codec::list_contains_encode(get_name(), element);
 
  837     return invoke_and_get_future<bool>(request, partition_id_);
 
  840 boost::future<std::vector<serialization::pimpl::data>>
 
  841 IListImpl::to_array_data()
 
  843     auto request = protocol::codec::list_getall_encode(get_name());
 
  844     return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
 
  845       request, partition_id_);
 
  849 IListImpl::add(
const serialization::pimpl::data& element)
 
  851     auto request = protocol::codec::list_add_encode(get_name(), element);
 
  852     return invoke_and_get_future<bool>(request, partition_id_);
 
  856 IListImpl::remove(
const serialization::pimpl::data& element)
 
  858     auto request = protocol::codec::list_remove_encode(get_name(), element);
 
  859     return invoke_and_get_future<bool>(request, partition_id_);
 
  863 IListImpl::contains_all_data(
 
  864   const std::vector<serialization::pimpl::data>& elements)
 
  867       protocol::codec::list_containsall_encode(get_name(), elements);
 
  868     return invoke_and_get_future<bool>(request, partition_id_);
 
  872 IListImpl::add_all_data(
const std::vector<serialization::pimpl::data>& elements)
 
  874     auto request = protocol::codec::list_addall_encode(get_name(), elements);
 
  875     return invoke_and_get_future<bool>(request, partition_id_);
 
  879 IListImpl::add_all_data(
int index,
 
  880                         const std::vector<serialization::pimpl::data>& elements)
 
  883       protocol::codec::list_addallwithindex_encode(get_name(), index, elements);
 
  884     return invoke_and_get_future<bool>(request, partition_id_);
 
  888 IListImpl::remove_all_data(
 
  889   const std::vector<serialization::pimpl::data>& elements)
 
  892       protocol::codec::list_compareandremoveall_encode(get_name(), elements);
 
  893     return invoke_and_get_future<bool>(request, partition_id_);
 
  897 IListImpl::retain_all_data(
 
  898   const std::vector<serialization::pimpl::data>& elements)
 
  901       protocol::codec::list_compareandretainall_encode(get_name(), elements);
 
  902     return invoke_and_get_future<bool>(request, partition_id_);
 
  908     auto request = protocol::codec::list_clear_encode(get_name());
 
  909     return to_void_future(invoke_on_partition(request, partition_id_));
 
  912 boost::future<boost::optional<serialization::pimpl::data>>
 
  913 IListImpl::get_data(
int index)
 
  915     auto request = protocol::codec::list_get_encode(get_name(), index);
 
  916     return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
 
  917       request, partition_id_);
 
  920 boost::future<boost::optional<serialization::pimpl::data>>
 
  921 IListImpl::set_data(
int index, 
const serialization::pimpl::data& element)
 
  923     auto request = protocol::codec::list_set_encode(get_name(), index, element);
 
  924     return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
 
  925       request, partition_id_);
 
  929 IListImpl::add(
int index, 
const serialization::pimpl::data& element)
 
  932       protocol::codec::list_addwithindex_encode(get_name(), index, element);
 
  933     return to_void_future(invoke_on_partition(request, partition_id_));
 
  936 boost::future<boost::optional<serialization::pimpl::data>>
 
  937 IListImpl::remove_data(
int index)
 
  940       protocol::codec::list_removewithindex_encode(get_name(), index);
 
  941     return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
 
  942       request, partition_id_);
 
  946 IListImpl::index_of(
const serialization::pimpl::data& element)
 
  948     auto request = protocol::codec::list_indexof_encode(get_name(), element);
 
  949     return invoke_and_get_future<int>(request, partition_id_);
 
  953 IListImpl::last_index_of(
const serialization::pimpl::data& element)
 
  956       protocol::codec::list_lastindexof_encode(get_name(), element);
 
  957     return invoke_and_get_future<int>(request, partition_id_);
 
  960 boost::future<std::vector<serialization::pimpl::data>>
 
  961 IListImpl::sub_list_data(
int from_index, 
int to_index)
 
  964       protocol::codec::list_sub_encode(get_name(), from_index, to_index);
 
  965     return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
 
  966       request, partition_id_);
 
  969 std::shared_ptr<spi::impl::ListenerMessageCodec>
 
  970 IListImpl::create_item_listener_codec(
bool include_value)
 
  972     return std::shared_ptr<spi::impl::ListenerMessageCodec>(
 
  973       new ListListenerMessageCodec(get_name(), include_value));
 
  976 IListImpl::ListListenerMessageCodec::ListListenerMessageCodec(
 
  979   : name_(std::move(name))
 
  980   , include_value_(include_value)
 
  983 protocol::ClientMessage
 
  984 IListImpl::ListListenerMessageCodec::encode_add_request(
bool local_only)
 const 
  986     return protocol::codec::list_addlistener_encode(
 
  987       name_, include_value_, local_only);
 
  990 protocol::ClientMessage
 
  991 IListImpl::ListListenerMessageCodec::encode_remove_request(
 
  992   boost::uuids::uuid real_registration_id)
 const 
  994     return protocol::codec::list_removelistener_encode(name_,
 
  995                                                        real_registration_id);
 
  998 flake_id_generator_impl::Block::Block(IdBatch&& id_batch,
 
  999                                       std::chrono::milliseconds validity)
 
 1000   : id_batch_(id_batch)
 
 1001   , invalid_since_(std::chrono::steady_clock::now() + validity)
 
 1006 flake_id_generator_impl::Block::next()
 
 1008     if (invalid_since_ <= std::chrono::steady_clock::now()) {
 
 1013         index = num_returned_;
 
 1014         if (index == id_batch_.get_batch_size()) {
 
 1017     } 
while (!num_returned_.compare_exchange_strong(index, index + 1));
 
 1019     return id_batch_.get_base() + index * id_batch_.get_increment();
 
 1022 flake_id_generator_impl::IdBatch::IdIterator
 
 1023   flake_id_generator_impl::IdBatch::endOfBatch;
 
 1026 flake_id_generator_impl::IdBatch::get_base()
 const 
 1032 flake_id_generator_impl::IdBatch::get_increment()
 const 
 1038 flake_id_generator_impl::IdBatch::get_batch_size()
 const 
 1043 flake_id_generator_impl::IdBatch::IdBatch(int64_t base,
 
 1047   , increment_(increment)
 
 1048   , batch_size_(batch_size)
 
 1051 flake_id_generator_impl::IdBatch::IdIterator&
 
 1052 flake_id_generator_impl::IdBatch::end()
 
 1057 flake_id_generator_impl::IdBatch::IdIterator
 
 1058 flake_id_generator_impl::IdBatch::iterator()
 
 1060     return flake_id_generator_impl::IdBatch::IdIterator(
 
 1061       base_, increment_, batch_size_);
 
 1064 flake_id_generator_impl::IdBatch::IdIterator::IdIterator(
 
 1066   const int64_t increment,
 
 1069   , increment_(increment)
 
 1070   , remaining_(remaining)
 
 1074 flake_id_generator_impl::IdBatch::IdIterator::operator==(
 
 1075   const flake_id_generator_impl::IdBatch::IdIterator& rhs)
 const 
 1077     return base2_ == rhs.base2_ && increment_ == rhs.increment_ &&
 
 1078            remaining_ == rhs.remaining_;
 
 1082 flake_id_generator_impl::IdBatch::IdIterator::operator!=(
 
 1083   const flake_id_generator_impl::IdBatch::IdIterator& rhs)
 const 
 1085     return !(rhs == *
this);
 
 1088 flake_id_generator_impl::IdBatch::IdIterator::IdIterator()
 
 1094 flake_id_generator_impl::IdBatch::IdIterator&
 
 1095 flake_id_generator_impl::IdBatch::IdIterator::operator++()
 
 1097     if (remaining_ == 0) {
 
 1098         return flake_id_generator_impl::IdBatch::end();
 
 1103     base2_ += increment_;
 
 1108 flake_id_generator_impl::flake_id_generator_impl(
 
 1109   const std::string& service_name,
 
 1110   const std::string& object_name,
 
 1111   spi::ClientContext* context)
 
 1112   : ProxyImpl(service_name, object_name, context)
 
 1116       context->get_client_config().find_flake_id_generator_config(object_name);
 
 1117     batch_size_ = config->get_prefetch_count();
 
 1118     validity_ = config->get_prefetch_validity_duration();
 
 1122 flake_id_generator_impl::new_id_internal()
 
 1124     auto b = block_.load();
 
 1126         int64_t res = b->next();
 
 1127         if (res != INT64_MIN) {
 
 1132     throw std::overflow_error(
"");
 
 1135 boost::future<int64_t>
 
 1136 flake_id_generator_impl::new_id()
 
 1139         return boost::make_ready_future(new_id_internal());
 
 1140     } 
catch (std::overflow_error&) {
 
 1141         return new_id_batch(batch_size_)
 
 1142           .then(boost::launch::sync,
 
 1143                 [=](boost::future<flake_id_generator_impl::IdBatch> f) {
 
 1145                       boost::make_shared<Block>(f.get(), validity_);
 
 1146                     auto value = newBlock->next();
 
 1147                     auto b = block_.load();
 
 1148                     block_.compare_exchange_strong(b, newBlock);
 
 1154 boost::future<flake_id_generator_impl::IdBatch>
 
 1155 flake_id_generator_impl::new_id_batch(int32_t size)
 
 1158       protocol::codec::flakeidgenerator_newidbatch_encode(get_name(), size);
 
 1159     return invoke(request).then(
 
 1160       boost::launch::sync, [](boost::future<protocol::ClientMessage> f) {
 
 1162           msg.rd_ptr(protocol::ClientMessage::RESPONSE_HEADER_LEN);
 
 1164           auto base = msg.get<int64_t>();
 
 1165           auto increment = msg.get<int64_t>();
 
 1166           auto batch_size = msg.get<int32_t>();
 
 1167           return flake_id_generator_impl::IdBatch(base, increment, batch_size);
 
 1171 IQueueImpl::IQueueImpl(
const std::string& instance_name,
 
 1172                        spi::ClientContext* context)
 
 1173   : ProxyImpl(
"hz:impl:queueService", instance_name, context)
 
 1175     serialization::pimpl::data data =
 
 1176       get_context().get_serialization_service().to_data<std::string>(
 
 1178     partition_id_ = get_partition_id(data);
 
 1182 IQueueImpl::remove_item_listener(boost::uuids::uuid registration_id)
 
 1184     return get_context().get_client_listener_service().deregister_listener(
 
 1189 IQueueImpl::offer(
const serialization::pimpl::data& element,
 
 1190                   std::chrono::milliseconds timeout)
 
 1192     auto request = protocol::codec::queue_offer_encode(
 
 1195       std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
 
 1196     return invoke_and_get_future<bool>(request, partition_id_);
 
 1200 IQueueImpl::put(
const serialization::pimpl::data& element)
 
 1202     auto request = protocol::codec::queue_put_encode(get_name(), element);
 
 1203     return to_void_future(invoke_on_partition(request, partition_id_));
 
 1206 boost::future<boost::optional<serialization::pimpl::data>>
 
 1207 IQueueImpl::poll_data(std::chrono::milliseconds timeout)
 
 1209     auto request = protocol::codec::queue_poll_encode(
 
 1211       std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
 
 1212     return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
 
 1213       request, partition_id_);
 
 1217 IQueueImpl::remaining_capacity()
 
 1219     auto request = protocol::codec::queue_remainingcapacity_encode(get_name());
 
 1220     return invoke_and_get_future<int>(request, partition_id_);
 
 1224 IQueueImpl::remove(
const serialization::pimpl::data& element)
 
 1226     auto request = protocol::codec::queue_remove_encode(get_name(), element);
 
 1227     return invoke_and_get_future<bool>(request, partition_id_);
 
 1231 IQueueImpl::contains(
const serialization::pimpl::data& element)
 
 1233     auto request = protocol::codec::queue_contains_encode(get_name(), element);
 
 1234     return invoke_and_get_future<bool>(request, partition_id_);
 
 1237 boost::future<std::vector<serialization::pimpl::data>>
 
 1238 IQueueImpl::drain_to_data(
size_t max_elements)
 
 1240     auto request = protocol::codec::queue_draintomaxsize_encode(
 
 1241       get_name(), (int32_t)max_elements);
 
 1243     return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
 
 1244       request, partition_id_);
 
 1247 boost::future<std::vector<serialization::pimpl::data>>
 
 1248 IQueueImpl::drain_to_data()
 
 1250     auto request = protocol::codec::queue_drainto_encode(get_name());
 
 1251     return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
 
 1252       request, partition_id_);
 
 1255 boost::future<boost::optional<serialization::pimpl::data>>
 
 1256 IQueueImpl::take_data()
 
 1258     auto request = protocol::codec::queue_take_encode(get_name());
 
 1259     return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
 
 1260       request, partition_id_);
 
 1263 boost::future<boost::optional<serialization::pimpl::data>>
 
 1264 IQueueImpl::peek_data()
 
 1266     auto request = protocol::codec::queue_peek_encode(get_name());
 
 1267     return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
 
 1268       request, partition_id_);
 
 1274     auto request = protocol::codec::queue_size_encode(get_name());
 
 1275     return invoke_and_get_future<int>(request, partition_id_);
 
 1279 IQueueImpl::is_empty()
 
 1281     auto request = protocol::codec::queue_isempty_encode(get_name());
 
 1282     return invoke_and_get_future<bool>(request, partition_id_);
 
 1285 boost::future<std::vector<serialization::pimpl::data>>
 
 1286 IQueueImpl::to_array_data()
 
 1288     auto request = protocol::codec::queue_iterator_encode(get_name());
 
 1289     return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
 
 1290       request, partition_id_);
 
 1294 IQueueImpl::contains_all_data(
 
 1295   const std::vector<serialization::pimpl::data>& elements)
 
 1298       protocol::codec::queue_containsall_encode(get_name(), elements);
 
 1299     return invoke_and_get_future<bool>(request, partition_id_);
 
 1303 IQueueImpl::add_all_data(
 
 1304   const std::vector<serialization::pimpl::data>& elements)
 
 1306     auto request = protocol::codec::queue_addall_encode(get_name(), elements);
 
 1307     return invoke_and_get_future<bool>(request, partition_id_);
 
 1311 IQueueImpl::remove_all_data(
 
 1312   const std::vector<serialization::pimpl::data>& elements)
 
 1315       protocol::codec::queue_compareandremoveall_encode(get_name(), elements);
 
 1316     return invoke_and_get_future<bool>(request, partition_id_);
 
 1320 IQueueImpl::retain_all_data(
 
 1321   const std::vector<serialization::pimpl::data>& elements)
 
 1324       protocol::codec::queue_compareandretainall_encode(get_name(), elements);
 
 1325     return invoke_and_get_future<bool>(request, partition_id_);
 
 1331     auto request = protocol::codec::queue_clear_encode(get_name());
 
 1332     return to_void_future(invoke_on_partition(request, partition_id_));
 
 1335 std::shared_ptr<spi::impl::ListenerMessageCodec>
 
 1336 IQueueImpl::create_item_listener_codec(
bool include_value)
 
 1338     return std::shared_ptr<spi::impl::ListenerMessageCodec>(
 
 1339       new QueueListenerMessageCodec(get_name(), include_value));
 
 1342 IQueueImpl::QueueListenerMessageCodec::QueueListenerMessageCodec(
 
 1345   : name_(std::move(name))
 
 1346   , include_value_(include_value)
 
 1349 protocol::ClientMessage
 
 1350 IQueueImpl::QueueListenerMessageCodec::encode_add_request(
bool local_only)
 const 
 1352     return protocol::codec::queue_addlistener_encode(
 
 1353       name_, include_value_, local_only);
 
 1356 protocol::ClientMessage
 
 1357 IQueueImpl::QueueListenerMessageCodec::encode_remove_request(
 
 1358   boost::uuids::uuid real_registration_id)
 const 
 1360     return protocol::codec::queue_removelistener_encode(name_,
 
 1361                                                         real_registration_id);
 
 1364 ProxyImpl::ProxyImpl(
const std::string& service_name,
 
 1365                      const std::string& object_name,
 
 1366                      spi::ClientContext* context)
 
 1367   : ClientProxy(object_name, service_name, *context)
 
 1368   , SerializingProxy(*context, object_name)
 
 1371 ProxyImpl::~ProxyImpl() = 
default;
 
 1373 SerializingProxy::SerializingProxy(spi::ClientContext& context,
 
 1374                                    const std::string& object_name)
 
 1375   : serialization_service_(context.get_serialization_service())
 
 1376   , partition_service_(context.get_partition_service())
 
 1377   , object_name_(object_name)
 
 1378   , client_context_(context)
 
 1382 SerializingProxy::get_partition_id(
const serialization::pimpl::data& key)
 
 1384     return partition_service_.get_partition_id(key);
 
 1387 boost::future<protocol::ClientMessage>
 
 1388 SerializingProxy::invoke_on_partition(protocol::ClientMessage& request,
 
 1392         return spi::impl::ClientInvocation::create(
 
 1394                  std::make_shared<protocol::ClientMessage>(std::move(request)),
 
 1398     } 
catch (exception::iexception&) {
 
 1399         util::exception_util::rethrow(std::current_exception());
 
 1400         return boost::make_ready_future(protocol::ClientMessage(0));
 
 1404 boost::future<protocol::ClientMessage>
 
 1405 SerializingProxy::invoke(protocol::ClientMessage& request)
 
 1408         return spi::impl::ClientInvocation::create(
 
 1410                  std::make_shared<protocol::ClientMessage>(std::move(request)),
 
 1413     } 
catch (exception::iexception&) {
 
 1414         util::exception_util::rethrow(std::current_exception());
 
 1415         return boost::make_ready_future(protocol::ClientMessage(0));
 
 1419 boost::future<protocol::ClientMessage>
 
 1420 SerializingProxy::invoke_on_connection(
 
 1421   protocol::ClientMessage& request,
 
 1422   std::shared_ptr<connection::Connection> connection)
 
 1425         return spi::impl::ClientInvocation::create(
 
 1427                  std::make_shared<protocol::ClientMessage>(std::move(request)),
 
 1431     } 
catch (exception::iexception&) {
 
 1432         util::exception_util::rethrow(std::current_exception());
 
 1433         return boost::make_ready_future(protocol::ClientMessage(0));
 
 1437 boost::future<protocol::ClientMessage>
 
 1438 SerializingProxy::invoke_on_key_owner(
 
 1439   protocol::ClientMessage& request,
 
 1440   const serialization::pimpl::data& key_data)
 
 1443         return invoke_on_partition(request, get_partition_id(key_data));
 
 1444     } 
catch (exception::iexception&) {
 
 1445         util::exception_util::rethrow(std::current_exception());
 
 1446         return boost::make_ready_future(protocol::ClientMessage(0));
 
 1450 boost::future<protocol::ClientMessage>
 
 1451 SerializingProxy::invoke_on_member(protocol::ClientMessage& request,
 
 1452                                    boost::uuids::uuid uuid)
 
 1455         auto invocation = spi::impl::ClientInvocation::create(
 
 1457           std::make_shared<protocol::ClientMessage>(std::move(request)),
 
 1460         return invocation->invoke();
 
 1461     } 
catch (exception::iexception&) {
 
 1462         util::exception_util::rethrow(std::current_exception());
 
 1463         return boost::make_ready_future(protocol::ClientMessage(0));
 
 1468 boost::future<boost::optional<serialization::pimpl::data>>
 
 1469 SerializingProxy::invoke_and_get_future(protocol::ClientMessage& request)
 
 1471     return decode_optional_var_sized<serialization::pimpl::data>(
 
 1476 boost::future<boost::optional<map::data_entry_view>>
 
 1477 SerializingProxy::invoke_and_get_future(protocol::ClientMessage& request,
 
 1478                                         const serialization::pimpl::data& key)
 
 1480     return decode_optional_var_sized<map::data_entry_view>(
 
 1481       invoke_on_key_owner(request, key));
 
 1485 boost::future<boost::optional<serialization::pimpl::data>>
 
 1486 SerializingProxy::invoke_and_get_future(protocol::ClientMessage& request,
 
 1489     return decode_optional_var_sized<serialization::pimpl::data>(
 
 1490       invoke_on_partition(request, partition_id));
 
 1494 boost::future<boost::optional<serialization::pimpl::data>>
 
 1495 SerializingProxy::invoke_and_get_future(protocol::ClientMessage& request,
 
 1496                                         const serialization::pimpl::data& key)
 
 1498     return decode_optional_var_sized<serialization::pimpl::data>(
 
 1499       invoke_on_key_owner(request, key));
 
 1502 PartitionSpecificClientProxy::PartitionSpecificClientProxy(
 
 1503   const std::string& service_name,
 
 1504   const std::string& object_name,
 
 1505   spi::ClientContext* context)
 
 1506   : ProxyImpl(service_name, object_name, context)
 
 1511 PartitionSpecificClientProxy::on_initialize()
 
 1513     std::string partitionKey = internal::partition::strategy::
 
 1514       StringPartitioningStrategy::get_partition_key(name_);
 
 1515     partition_id_ = get_context().get_partition_service().get_partition_id(
 
 1516       to_data<std::string>(partitionKey));
 
 1519 IMapImpl::IMapImpl(
const std::string& instance_name,
 
 1520                    spi::ClientContext* context)
 
 1521   : ProxyImpl(
"hz:impl:mapService", instance_name, context)
 
 1525 IMapImpl::contains_key(
const serialization::pimpl::data& key)
 
 1527     auto request = protocol::codec::map_containskey_encode(
 
 1528       get_name(), key, util::get_current_thread_id());
 
 1529     return invoke_and_get_future<bool>(request, key);
 
 1533 IMapImpl::contains_value(
const serialization::pimpl::data& value)
 
 1535     auto request = protocol::codec::map_containsvalue_encode(get_name(), value);
 
 1536     return invoke_and_get_future<bool>(request);
 
 1539 boost::future<boost::optional<serialization::pimpl::data>>
 
 1540 IMapImpl::get_data(
const serialization::pimpl::data& key)
 
 1542     auto request = protocol::codec::map_get_encode(
 
 1543       get_name(), key, util::get_current_thread_id());
 
 1544     return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
 
 1548 boost::future<boost::optional<serialization::pimpl::data>>
 
 1549 IMapImpl::remove_data(
const serialization::pimpl::data& key)
 
 1551     auto request = protocol::codec::map_remove_encode(
 
 1552       get_name(), key, util::get_current_thread_id());
 
 1553     return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
 
 1558 IMapImpl::remove(
const serialization::pimpl::data& key,
 
 1559                  const serialization::pimpl::data& value)
 
 1561     auto request = protocol::codec::map_removeifsame_encode(
 
 1562       get_name(), key, value, util::get_current_thread_id());
 
 1563     return invoke_and_get_future<bool>(request, key);
 
 1566 boost::future<protocol::ClientMessage>
 
 1567 IMapImpl::remove_all(
const serialization::pimpl::data& predicate_data)
 
 1570       protocol::codec::map_removeall_encode(get_name(), predicate_data);
 
 1571     return invoke(request);
 
 1574 boost::future<protocol::ClientMessage>
 
 1575 IMapImpl::delete_entry(
const serialization::pimpl::data& key)
 
 1577     auto request = protocol::codec::map_delete_encode(
 
 1578       get_name(), key, util::get_current_thread_id());
 
 1579     return invoke_on_partition(request, get_partition_id(key));
 
 1582 boost::future<protocol::ClientMessage>
 
 1585     auto request = protocol::codec::map_flush_encode(get_name());
 
 1586     return invoke(request);
 
 1590 IMapImpl::try_remove(
const serialization::pimpl::data& key,
 
 1591                      std::chrono::milliseconds timeout)
 
 1593     auto request = protocol::codec::map_tryremove_encode(
 
 1596       util::get_current_thread_id(),
 
 1597       std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
 
 1599     return invoke_and_get_future<bool>(request, key);
 
 1603 IMapImpl::try_put(
const serialization::pimpl::data& key,
 
 1604                   const serialization::pimpl::data& value,
 
 1605                   std::chrono::milliseconds timeout)
 
 1607     auto request = protocol::codec::map_tryput_encode(
 
 1611       util::get_current_thread_id(),
 
 1612       std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
 
 1614     return invoke_and_get_future<bool>(request, key);
 
 1617 boost::future<boost::optional<serialization::pimpl::data>>
 
 1618 IMapImpl::put_data(
const serialization::pimpl::data& key,
 
 1619                    const serialization::pimpl::data& value,
 
 1620                    std::chrono::milliseconds ttl)
 
 1622     auto request = protocol::codec::map_put_encode(
 
 1626       util::get_current_thread_id(),
 
 1627       std::chrono::duration_cast<std::chrono::milliseconds>(ttl).count());
 
 1628     return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
 
 1632 boost::future<protocol::ClientMessage>
 
 1633 IMapImpl::put_transient(
const serialization::pimpl::data& key,
 
 1634                         const serialization::pimpl::data& value,
 
 1635                         std::chrono::milliseconds ttl)
 
 1637     auto request = protocol::codec::map_puttransient_encode(
 
 1641       util::get_current_thread_id(),
 
 1642       std::chrono::duration_cast<std::chrono::milliseconds>(ttl).count());
 
 1643     return invoke_on_partition(request, get_partition_id(key));
 
 1646 boost::future<boost::optional<serialization::pimpl::data>>
 
 1647 IMapImpl::put_if_absent_data(
const serialization::pimpl::data& key,
 
 1648                              const serialization::pimpl::data& value,
 
 1649                              std::chrono::milliseconds ttl)
 
 1651     auto request = protocol::codec::map_putifabsent_encode(
 
 1655       util::get_current_thread_id(),
 
 1656       std::chrono::duration_cast<std::chrono::milliseconds>(ttl).count());
 
 1657     return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
 
 1662 IMapImpl::replace(
const serialization::pimpl::data& key,
 
 1663                   const serialization::pimpl::data& old_value,
 
 1664                   const serialization::pimpl::data& new_value)
 
 1666     auto request = protocol::codec::map_replaceifsame_encode(
 
 1667       get_name(), key, old_value, new_value, util::get_current_thread_id());
 
 1669     return invoke_and_get_future<bool>(request, key);
 
 1672 boost::future<boost::optional<serialization::pimpl::data>>
 
 1673 IMapImpl::replace_data(
const serialization::pimpl::data& key,
 
 1674                        const serialization::pimpl::data& value)
 
 1676     auto request = protocol::codec::map_replace_encode(
 
 1677       get_name(), key, value, util::get_current_thread_id());
 
 1679     return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
 
 1683 boost::future<protocol::ClientMessage>
 
 1684 IMapImpl::set(
const serialization::pimpl::data& key,
 
 1685               const serialization::pimpl::data& value,
 
 1686               std::chrono::milliseconds ttl)
 
 1688     auto request = protocol::codec::map_set_encode(
 
 1692       util::get_current_thread_id(),
 
 1693       std::chrono::duration_cast<std::chrono::milliseconds>(ttl).count());
 
 1694     return invoke_on_partition(request, get_partition_id(key));
 
 1697 boost::future<protocol::ClientMessage>
 
 1698 IMapImpl::lock(
const serialization::pimpl::data& key)
 
 1700     return lock(key, std::chrono::milliseconds(-1));
 
 1703 boost::future<protocol::ClientMessage>
 
 1704 IMapImpl::lock(
const serialization::pimpl::data& key,
 
 1705                std::chrono::milliseconds lease_time)
 
 1707     auto request = protocol::codec::map_lock_encode(
 
 1710       util::get_current_thread_id(),
 
 1711       std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
 
 1712       lock_reference_id_generator_->get_next_reference_id());
 
 1713     return invoke_on_partition(request, get_partition_id(key));
 
 1717 IMapImpl::is_locked(
const serialization::pimpl::data& key)
 
 1719     auto request = protocol::codec::map_islocked_encode(get_name(), key);
 
 1721     return invoke_and_get_future<bool>(request, key);
 
 1725 IMapImpl::try_lock(
const serialization::pimpl::data& key,
 
 1726                    std::chrono::milliseconds timeout)
 
 1728     return try_lock(key, timeout, std::chrono::milliseconds(-1));
 
 1732 IMapImpl::try_lock(
const serialization::pimpl::data& key,
 
 1733                    std::chrono::milliseconds timeout,
 
 1734                    std::chrono::milliseconds lease_time)
 
 1736     auto request = protocol::codec::map_trylock_encode(
 
 1739       util::get_current_thread_id(),
 
 1740       std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
 
 1741       std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count(),
 
 1742       lock_reference_id_generator_->get_next_reference_id());
 
 1743     return invoke_and_get_future<bool>(request, key);
 
 1746 boost::future<protocol::ClientMessage>
 
 1747 IMapImpl::unlock(
const serialization::pimpl::data& key)
 
 1749     auto request = protocol::codec::map_unlock_encode(
 
 1752       util::get_current_thread_id(),
 
 1753       lock_reference_id_generator_->get_next_reference_id());
 
 1754     return invoke_on_partition(request, get_partition_id(key));
 
 1757 boost::future<protocol::ClientMessage>
 
 1758 IMapImpl::force_unlock(
const serialization::pimpl::data& key)
 
 1760     auto request = protocol::codec::map_forceunlock_encode(
 
 1761       get_name(), key, lock_reference_id_generator_->get_next_reference_id());
 
 1762     return invoke_on_partition(request, get_partition_id(key));
 
 1765 boost::future<std::string>
 
 1766 IMapImpl::add_interceptor(
const serialization::pimpl::data& interceptor)
 
 1769       protocol::codec::map_addinterceptor_encode(get_name(), interceptor);
 
 1770     return invoke_and_get_future<std::string>(request);
 
 1775 boost::future<boost::uuids::uuid>
 
 1776 IMapImpl::add_entry_listener(
 
 1777   std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
 
 1779   int32_t listener_flags)
 
 1781     return register_listener(
 
 1782       create_map_entry_listener_codec(include_value, listener_flags),
 
 1783       std::move(entry_event_handler));
 
 1786 boost::future<boost::uuids::uuid>
 
 1787 IMapImpl::add_entry_listener(
 
 1788   std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
 
 1789   serialization::pimpl::data&& predicate,
 
 1791   int32_t listener_flags)
 
 1793     return register_listener(
 
 1794       create_map_entry_listener_codec(
 
 1795         include_value, std::move(predicate), listener_flags),
 
 1796       std::move(entry_event_handler));
 
 1800 IMapImpl::remove_entry_listener(boost::uuids::uuid registration_id)
 
 1802     return get_context().get_client_listener_service().deregister_listener(
 
 1806 boost::future<boost::uuids::uuid>
 
 1807 IMapImpl::add_entry_listener(
 
 1808   std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
 
 1810   serialization::pimpl::data&& key,
 
 1811   int32_t listener_flags)
 
 1813     return register_listener(create_map_entry_listener_codec(
 
 1814                                include_value, listener_flags, std::move(key)),
 
 1815                              std::move(entry_event_handler));
 
 1818 boost::future<boost::optional<map::data_entry_view>>
 
 1819 IMapImpl::get_entry_view_data(
const serialization::pimpl::data& key)
 
 1821     auto request = protocol::codec::map_getentryview_encode(
 
 1822       get_name(), key, util::get_current_thread_id());
 
 1823     return invoke_and_get_future<boost::optional<map::data_entry_view>>(request,
 
 1828 IMapImpl::evict(
const serialization::pimpl::data& key)
 
 1830     auto request = protocol::codec::map_evict_encode(
 
 1831       get_name(), key, util::get_current_thread_id());
 
 1832     return invoke_and_get_future<bool>(request, key);
 
 1835 boost::future<protocol::ClientMessage>
 
 1836 IMapImpl::evict_all()
 
 1838     auto request = protocol::codec::map_evictall_encode(get_name());
 
 1839     return invoke(request);
 
 1842 boost::future<EntryVector>
 
 1843 IMapImpl::get_all_data(
int partition_id,
 
 1844                        const std::vector<serialization::pimpl::data>& keys)
 
 1846     auto request = protocol::codec::map_getall_encode(get_name(), keys);
 
 1847     return invoke_and_get_future<EntryVector>(request, partition_id);
 
 1850 boost::future<std::vector<serialization::pimpl::data>>
 
 1851 IMapImpl::key_set_data()
 
 1853     auto request = protocol::codec::map_keyset_encode(get_name());
 
 1854     return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
 
 1858 boost::future<std::vector<serialization::pimpl::data>>
 
 1859 IMapImpl::key_set_data(
const serialization::pimpl::data& predicate)
 
 1862       protocol::codec::map_keysetwithpredicate_encode(get_name(), predicate);
 
 1863     return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
 
 1868   std::pair<std::vector<serialization::pimpl::data>, query::anchor_data_list>>
 
 1869 IMapImpl::key_set_for_paging_predicate_data(
 
 1870   protocol::codec::holder::paging_predicate_holder 
const& predicate)
 
 1872     auto request = protocol::codec::map_keysetwithpagingpredicate_encode(
 
 1873       get_name(), predicate);
 
 1874     return invoke(request).then(
 
 1875       boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
 
 1876           return get_paging_predicate_response<
 
 1877             std::vector<serialization::pimpl::data>>(std::move(f));
 
 1881 boost::future<EntryVector>
 
 1882 IMapImpl::entry_set_data()
 
 1884     auto request = protocol::codec::map_entryset_encode(get_name());
 
 1885     return invoke_and_get_future<EntryVector>(request);
 
 1888 boost::future<EntryVector>
 
 1889 IMapImpl::entry_set_data(
const serialization::pimpl::data& predicate)
 
 1892       protocol::codec::map_entrieswithpredicate_encode(get_name(), predicate);
 
 1893     return invoke_and_get_future<EntryVector>(request);
 
 1896 boost::future<std::pair<EntryVector, query::anchor_data_list>>
 
 1897 IMapImpl::entry_set_for_paging_predicate_data(
 
 1898   protocol::codec::holder::paging_predicate_holder 
const& predicate)
 
 1900     auto request = protocol::codec::map_entrieswithpagingpredicate_encode(
 
 1901       get_name(), predicate);
 
 1902     return invoke(request).then(
 
 1903       boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
 
 1904           return get_paging_predicate_response<EntryVector>(std::move(f));
 
 1908 boost::future<std::vector<serialization::pimpl::data>>
 
 1909 IMapImpl::values_data()
 
 1911     auto request = protocol::codec::map_values_encode(get_name());
 
 1912     return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
 
 1916 boost::future<std::vector<serialization::pimpl::data>>
 
 1917 IMapImpl::values_data(
const serialization::pimpl::data& predicate)
 
 1920       protocol::codec::map_valueswithpredicate_encode(get_name(), predicate);
 
 1921     return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
 
 1926   std::pair<std::vector<serialization::pimpl::data>, query::anchor_data_list>>
 
 1927 IMapImpl::values_for_paging_predicate_data(
 
 1928   protocol::codec::holder::paging_predicate_holder 
const& predicate)
 
 1930     auto request = protocol::codec::map_valueswithpagingpredicate_encode(
 
 1931       get_name(), predicate);
 
 1932     return invoke(request).then(
 
 1933       boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
 
 1934           return get_paging_predicate_response<
 
 1935             std::vector<serialization::pimpl::data>>(std::move(f));
 
 1939 boost::future<protocol::ClientMessage>
 
 1940 IMapImpl::add_index_data(
const config::index_config& config)
 
 1942     auto request = protocol::codec::map_addindex_encode(get_name(), config);
 
 1943     return invoke(request);
 
 1949     auto request = protocol::codec::map_size_encode(get_name());
 
 1950     return invoke_and_get_future<int>(request);
 
 1954 IMapImpl::is_empty()
 
 1956     auto request = protocol::codec::map_isempty_encode(get_name());
 
 1957     return invoke_and_get_future<bool>(request);
 
 1960 boost::future<protocol::ClientMessage>
 
 1961 IMapImpl::put_all_data(
int partition_id, 
const EntryVector& entries)
 
 1964       protocol::codec::map_putall_encode(get_name(), entries, 
true);
 
 1965     return invoke_on_partition(request, partition_id);
 
 1968 boost::future<protocol::ClientMessage>
 
 1969 IMapImpl::clear_data()
 
 1971     auto request = protocol::codec::map_clear_encode(get_name());
 
 1972     return invoke(request);
 
 1975 boost::future<boost::optional<serialization::pimpl::data>>
 
 1976 IMapImpl::execute_on_key_data(
const serialization::pimpl::data& key,
 
 1977                               const serialization::pimpl::data& processor)
 
 1979     auto request = protocol::codec::map_executeonkey_encode(
 
 1980       get_name(), processor, key, util::get_current_thread_id());
 
 1981     return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
 
 1982       request, get_partition_id(key));
 
 1985 boost::future<boost::optional<serialization::pimpl::data>>
 
 1986 IMapImpl::submit_to_key_data(
const serialization::pimpl::data& key,
 
 1987                              const serialization::pimpl::data& processor)
 
 1989     auto request = protocol::codec::map_submittokey_encode(
 
 1990       get_name(), processor, key, util::get_current_thread_id());
 
 1991     return invoke_on_partition(request, get_partition_id(key))
 
 1992       .then(boost::launch::sync, [](boost::future<protocol::ClientMessage> f) {
 
 1995           return msg.get_nullable<serialization::pimpl::data>();
 
 1999 boost::future<EntryVector>
 
 2000 IMapImpl::execute_on_keys_data(
 
 2001   const std::vector<serialization::pimpl::data>& keys,
 
 2002   const serialization::pimpl::data& processor)
 
 2005       protocol::codec::map_executeonkeys_encode(get_name(), processor, keys);
 
 2006     return invoke_and_get_future<EntryVector>(request);
 
 2009 boost::future<protocol::ClientMessage>
 
 2010 IMapImpl::remove_interceptor(
const std::string& 
id)
 
 2013       protocol::codec::map_removeinterceptor_encode(get_name(), 
id);
 
 2014     return invoke(request);
 
 2017 boost::future<EntryVector>
 
 2018 IMapImpl::execute_on_entries_data(
 
 2019   const serialization::pimpl::data& entry_processor)
 
 2022       protocol::codec::map_executeonallkeys_encode(get_name(), entry_processor);
 
 2023     return invoke_and_get_future<EntryVector>(request);
 
 2026 boost::future<EntryVector>
 
 2027 IMapImpl::execute_on_entries_data(
 
 2028   const serialization::pimpl::data& entry_processor,
 
 2029   const serialization::pimpl::data& predicate)
 
 2031     auto request = protocol::codec::map_executewithpredicate_encode(
 
 2032       get_name(), entry_processor, predicate);
 
 2033     return invoke_and_get_future<EntryVector>(request);
 
 2036 std::shared_ptr<spi::impl::ListenerMessageCodec>
 
 2037 IMapImpl::create_map_entry_listener_codec(
 
 2039   serialization::pimpl::data&& predicate,
 
 2040   int32_t listener_flags)
 
 2042     return std::shared_ptr<spi::impl::ListenerMessageCodec>(
 
 2043       new MapEntryListenerWithPredicateMessageCodec(
 
 2044         get_name(), include_value, listener_flags, std::move(predicate)));
 
 2047 std::shared_ptr<spi::impl::ListenerMessageCodec>
 
 2048 IMapImpl::create_map_entry_listener_codec(
bool include_value,
 
 2049                                           int32_t listener_flags)
 
 2051     return std::shared_ptr<spi::impl::ListenerMessageCodec>(
 
 2052       new MapEntryListenerMessageCodec(
 
 2053         get_name(), include_value, listener_flags));
 
 2056 std::shared_ptr<spi::impl::ListenerMessageCodec>
 
 2057 IMapImpl::create_map_entry_listener_codec(
bool include_value,
 
 2058                                           int32_t listener_flags,
 
 2059                                           serialization::pimpl::data&& key)
 
 2061     return std::shared_ptr<spi::impl::ListenerMessageCodec>(
 
 2062       new MapEntryListenerToKeyCodec(
 
 2063         get_name(), include_value, listener_flags, std::move(key)));
 
 2067 IMapImpl::on_initialize()
 
 2069     ProxyImpl::on_initialize();
 
 2070     lock_reference_id_generator_ =
 
 2071       get_context().get_lock_reference_id_generator();
 
 2074 IMapImpl::MapEntryListenerMessageCodec::MapEntryListenerMessageCodec(
 
 2077   int32_t listener_flags)
 
 2078   : name_(std::move(name))
 
 2079   , include_value_(include_value)
 
 2080   , listener_flags_(listener_flags)
 
 2083 protocol::ClientMessage
 
 2084 IMapImpl::MapEntryListenerMessageCodec::encode_add_request(
 
 2085   bool local_only)
 const 
 2087     return protocol::codec::map_addentrylistener_encode(
 
 2088       name_, include_value_, 
static_cast<int32_t
>(listener_flags_), local_only);
 
 2091 protocol::ClientMessage
 
 2092 IMapImpl::MapEntryListenerMessageCodec::encode_remove_request(
 
 2093   boost::uuids::uuid real_registration_id)
 const 
 2095     return protocol::codec::map_removeentrylistener_encode(
 
 2096       name_, real_registration_id);
 
 2099 protocol::ClientMessage
 
 2100 IMapImpl::MapEntryListenerToKeyCodec::encode_add_request(
bool local_only)
 const 
 2102     return protocol::codec::map_addentrylistenertokey_encode(
 
 2106       static_cast<int32_t
>(listener_flags_),
 
 2110 protocol::ClientMessage
 
 2111 IMapImpl::MapEntryListenerToKeyCodec::encode_remove_request(
 
 2112   boost::uuids::uuid real_registration_id)
 const 
 2114     return protocol::codec::map_removeentrylistener_encode(
 
 2115       name_, real_registration_id);
 
 2118 IMapImpl::MapEntryListenerToKeyCodec::MapEntryListenerToKeyCodec(
 
 2121   int32_t listener_flags,
 
 2122   serialization::pimpl::data key)
 
 2123   : name_(std::move(name))
 
 2124   , include_value_(include_value)
 
 2125   , listener_flags_(listener_flags)
 
 2126   , key_(std::move(key))
 
 2129 IMapImpl::MapEntryListenerWithPredicateMessageCodec::
 
 2130   MapEntryListenerWithPredicateMessageCodec(
 
 2133     int32_t listener_flags,
 
 2134     serialization::pimpl::data&& predicate)
 
 2135   : name_(std::move(name))
 
 2136   , include_value_(include_value)
 
 2137   , listener_flags_(listener_flags)
 
 2138   , predicate_(std::move(predicate))
 
 2141 protocol::ClientMessage
 
 2142 IMapImpl::MapEntryListenerWithPredicateMessageCodec::encode_add_request(
 
 2143   bool local_only)
 const 
 2145     return protocol::codec::map_addentrylistenerwithpredicate_encode(
 
 2149       static_cast<int32_t
>(listener_flags_),
 
 2153 protocol::ClientMessage
 
 2154 IMapImpl::MapEntryListenerWithPredicateMessageCodec::encode_remove_request(
 
 2155   boost::uuids::uuid real_registration_id)
 const 
 2157     return protocol::codec::map_removeentrylistener_encode(
 
 2158       name_, real_registration_id);
 
 2161 TransactionalQueueImpl::TransactionalQueueImpl(
 
 2162   const std::string& name,
 
 2163   txn::TransactionProxy& transaction_proxy)
 
 2164   : TransactionalObject(iqueue::SERVICE_NAME, name, transaction_proxy)
 
 2168 TransactionalQueueImpl::offer(
const serialization::pimpl::data& e,
 
 2169                               std::chrono::milliseconds timeout)
 
 2171     auto request = protocol::codec::transactionalqueue_offer_encode(
 
 2173       get_transaction_id(),
 
 2174       util::get_current_thread_id(),
 
 2176       std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
 
 2178     return invoke_and_get_future<bool>(request);
 
 2181 boost::future<boost::optional<serialization::pimpl::data>>
 
 2182 TransactionalQueueImpl::poll_data(std::chrono::milliseconds timeout)
 
 2184     auto request = protocol::codec::transactionalqueue_poll_encode(
 
 2186       get_transaction_id(),
 
 2187       util::get_current_thread_id(),
 
 2188       std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
 
 2190     return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
 
 2195 TransactionalQueueImpl::size()
 
 2197     auto request = protocol::codec::transactionalqueue_size_encode(
 
 2198       get_name(), get_transaction_id(), util::get_current_thread_id());
 
 2200     return invoke_and_get_future<int>(request);
 
 2203 ISetImpl::ISetImpl(
const std::string& instance_name,
 
 2204                    spi::ClientContext* client_context)
 
 2205   : ProxyImpl(iset::SERVICE_NAME, instance_name, client_context)
 
 2207     serialization::pimpl::data key_data =
 
 2208       get_context().get_serialization_service().to_data<std::string>(
 
 2210     partition_id_ = get_partition_id(key_data);
 
 2214 ISetImpl::remove_item_listener(boost::uuids::uuid registration_id)
 
 2216     return get_context().get_client_listener_service().deregister_listener(
 
 2223     auto request = protocol::codec::set_size_encode(get_name());
 
 2224     return invoke_and_get_future<int>(request, partition_id_);
 
 2228 ISetImpl::is_empty()
 
 2230     auto request = protocol::codec::set_isempty_encode(get_name());
 
 2231     return invoke_and_get_future<bool>(request, partition_id_);
 
 2235 ISetImpl::contains(
const serialization::pimpl::data& element)
 
 2237     auto request = protocol::codec::set_contains_encode(get_name(), element);
 
 2238     return invoke_and_get_future<bool>(request, partition_id_);
 
 2241 boost::future<std::vector<serialization::pimpl::data>>
 
 2242 ISetImpl::to_array_data()
 
 2244     auto request = protocol::codec::set_getall_encode(get_name());
 
 2245     return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
 
 2246       request, partition_id_);
 
 2250 ISetImpl::add(
const serialization::pimpl::data& element)
 
 2252     auto request = protocol::codec::set_add_encode(get_name(), element);
 
 2253     return invoke_and_get_future<bool>(request, partition_id_);
 
 2257 ISetImpl::remove(
const serialization::pimpl::data& element)
 
 2259     auto request = protocol::codec::set_remove_encode(get_name(), element);
 
 2260     return invoke_and_get_future<bool>(request, partition_id_);
 
 2264 ISetImpl::contains_all(
const std::vector<serialization::pimpl::data>& elements)
 
 2267       protocol::codec::set_containsall_encode(get_name(), elements);
 
 2268     return invoke_and_get_future<bool>(request, partition_id_);
 
 2272 ISetImpl::add_all(
const std::vector<serialization::pimpl::data>& elements)
 
 2274     auto request = protocol::codec::set_addall_encode(get_name(), elements);
 
 2275     return invoke_and_get_future<bool>(request, partition_id_);
 
 2279 ISetImpl::remove_all(
const std::vector<serialization::pimpl::data>& elements)
 
 2282       protocol::codec::set_compareandremoveall_encode(get_name(), elements);
 
 2283     return invoke_and_get_future<bool>(request, partition_id_);
 
 2287 ISetImpl::retain_all(
const std::vector<serialization::pimpl::data>& elements)
 
 2290       protocol::codec::set_compareandretainall_encode(get_name(), elements);
 
 2291     return invoke_and_get_future<bool>(request, partition_id_);
 
 2297     auto request = protocol::codec::set_clear_encode(get_name());
 
 2298     return to_void_future(invoke_on_partition(request, partition_id_));
 
 2301 std::shared_ptr<spi::impl::ListenerMessageCodec>
 
 2302 ISetImpl::create_item_listener_codec(
bool include_value)
 
 2304     return std::shared_ptr<spi::impl::ListenerMessageCodec>(
 
 2305       new SetListenerMessageCodec(get_name(), include_value));
 
 2308 ISetImpl::SetListenerMessageCodec::SetListenerMessageCodec(std::string name,
 
 2310   : name_(std::move(name))
 
 2311   , include_value_(include_value)
 
 2314 protocol::ClientMessage
 
 2315 ISetImpl::SetListenerMessageCodec::encode_add_request(
bool local_only)
 const 
 2317     return protocol::codec::set_addlistener_encode(
 
 2318       name_, include_value_, local_only);
 
 2321 protocol::ClientMessage
 
 2322 ISetImpl::SetListenerMessageCodec::encode_remove_request(
 
 2323   boost::uuids::uuid real_registration_id)
 const 
 2325     return protocol::codec::set_removelistener_encode(name_,
 
 2326                                                       real_registration_id);
 
 2329 ITopicImpl::ITopicImpl(
const std::string& instance_name,
 
 2330                        spi::ClientContext* context)
 
 2331   : proxy::ProxyImpl(
"hz:impl:topicService", instance_name, context)
 
 2332   , partition_id_(get_partition_id(to_data(instance_name)))
 
 2336 ITopicImpl::publish(
const serialization::pimpl::data& data)
 
 2338     auto request = protocol::codec::topic_publish_encode(get_name(), data);
 
 2339     return to_void_future(invoke_on_partition(request, partition_id_));
 
 2342 boost::future<boost::uuids::uuid>
 
 2343 ITopicImpl::add_message_listener(
 
 2344   std::shared_ptr<impl::BaseEventHandler> topic_event_handler)
 
 2346     return register_listener(create_item_listener_codec(),
 
 2347                              std::move(topic_event_handler));
 
 2351 ITopicImpl::remove_message_listener(boost::uuids::uuid registration_id)
 
 2353     return get_context().get_client_listener_service().deregister_listener(
 
 2357 std::shared_ptr<spi::impl::ListenerMessageCodec>
 
 2358 ITopicImpl::create_item_listener_codec()
 
 2360     return std::shared_ptr<spi::impl::ListenerMessageCodec>(
 
 2361       new TopicListenerMessageCodec(get_name()));
 
 2364 ITopicImpl::TopicListenerMessageCodec::TopicListenerMessageCodec(
 
 2366   : name_(std::move(name))
 
 2369 protocol::ClientMessage
 
 2370 ITopicImpl::TopicListenerMessageCodec::encode_add_request(
bool local_only)
 const 
 2372     return protocol::codec::topic_addmessagelistener_encode(name_, local_only);
 
 2375 protocol::ClientMessage
 
 2376 ITopicImpl::TopicListenerMessageCodec::encode_remove_request(
 
 2377   boost::uuids::uuid real_registration_id)
 const 
 2379     return protocol::codec::topic_removemessagelistener_encode(
 
 2380       name_, real_registration_id);
 
 2383 ReplicatedMapImpl::ReplicatedMapImpl(
const std::string& service_name,
 
 2384                                      const std::string& object_name,
 
 2385                                      spi::ClientContext* context)
 
 2386   : ProxyImpl(service_name, object_name, context)
 
 2387   , target_partition_id_(-1)
 
 2390 const int32_t RingbufferImpl::MAX_BATCH_SIZE{ 1000 };
 
 2394 const serialization::pimpl::data&
 
 2395 data_entry_view::get_key()
 const 
 2400 const serialization::pimpl::data&
 
 2401 data_entry_view::get_value()
 const 
 2407 data_entry_view::get_cost()
 const 
 2413 data_entry_view::get_creation_time()
 const 
 2415     return creation_time_;
 
 2419 data_entry_view::get_expiration_time()
 const 
 2421     return expiration_time_;
 
 2425 data_entry_view::get_hits()
 const 
 2431 data_entry_view::get_last_access_time()
 const 
 2433     return last_access_time_;
 
 2437 data_entry_view::get_last_stored_time()
 const 
 2439     return last_stored_time_;
 
 2443 data_entry_view::get_last_update_time()
 const 
 2445     return last_update_time_;
 
 2449 data_entry_view::get_version()
 const 
 2455 data_entry_view::get_ttl()
 const 
 2461 data_entry_view::get_max_idle()
 const 
 2466 data_entry_view::data_entry_view(serialization::pimpl::data&& key,
 
 2467                                  serialization::pimpl::data&& value,
 
 2469                                  int64_t creation_time,
 
 2470                                  int64_t expiration_time,
 
 2472                                  int64_t last_access_time,
 
 2473                                  int64_t last_stored_time,
 
 2474                                  int64_t last_update_time,
 
 2478   : key_(std::move(key))
 
 2479   , value_(std::move(value))
 
 2481   , creation_time_(creation_time)
 
 2482   , expiration_time_(expiration_time)
 
 2484   , last_access_time_(last_access_time)
 
 2485   , last_stored_time_(last_stored_time)
 
 2486   , last_update_time_(last_update_time)
 
 2489   , max_idle_(max_idle)
 
 2495 namespace reliable {
 
 2496 ReliableTopicMessage::ReliableTopicMessage()
 
 2497   : publish_time_(std::chrono::system_clock::now())
 
 2500 ReliableTopicMessage::ReliableTopicMessage(
 
 2501   hazelcast::client::serialization::pimpl::data&& payload_data,
 
 2502   std::unique_ptr<address> address)
 
 2503   : publish_time_(std::chrono::system_clock::now())
 
 2504   , payload_(std::move(payload_data))
 
 2507         publisher_address_ = boost::make_optional(*address);
 
 2511 std::chrono::system_clock::time_point
 
 2512 ReliableTopicMessage::get_publish_time()
 const 
 2514     return publish_time_;
 
 2517 const boost::optional<address>&
 
 2518 ReliableTopicMessage::get_publisher_address()
 const 
 2520     return publisher_address_;
 
 2523 serialization::pimpl::data&
 
 2524 ReliableTopicMessage::get_payload()
 
 2532 namespace serialization {
 
 2534 hz_serializer<topic::impl::reliable::ReliableTopicMessage>::get_factory_id()
 
 2540 hz_serializer<topic::impl::reliable::ReliableTopicMessage>::get_class_id()
 
 2542     return RELIABLE_TOPIC_MESSAGE;
 
 2546 hz_serializer<topic::impl::reliable::ReliableTopicMessage>::write_data(
 
 2547   const topic::impl::reliable::ReliableTopicMessage& 
object,
 
 2548   object_data_output& out)
 
 2550     out.write<int64_t>(std::chrono::duration_cast<std::chrono::milliseconds>(
 
 2551                          object.publish_time_.time_since_epoch())
 
 2553     out.write_object(
object.publisher_address_);
 
 2554     out.write(
object.payload_.to_byte_array());
 
 2557 topic::impl::reliable::ReliableTopicMessage
 
 2558 hz_serializer<topic::impl::reliable::ReliableTopicMessage>::read_data(
 
 2559   object_data_input& in)
 
 2561     topic::impl::reliable::ReliableTopicMessage message;
 
 2562     auto now = std::chrono::system_clock::now();
 
 2563     message.publish_time_ = now +
 
 2564                             std::chrono::milliseconds(in.read<int64_t>()) -
 
 2565                             now.time_since_epoch();
 
 2566     message.publisher_address_ = in.read_object<address>();
 
 2568       serialization::pimpl::data(in.read<std::vector<byte>>().value());
 
 2573 entry_event::entry_event(
const std::string& name,
 
 2578                          typed_data&& old_value,
 
 2579                          typed_data&& merging_value)
 
 2581   , member_(std::move(member))
 
 2582   , event_type_(event_type)
 
 2583   , key_(std::move(key))
 
 2584   , value_(std::move(value))
 
 2585   , old_value_(std::move(old_value))
 
 2586   , merging_value_(std::move(merging_value))
 
 2610     return merging_value_;
 
 2632 operator<<(std::ostream& os, 
const entry_event& event)
 
 2634     os << 
"name: " << 
event.name_ << 
" member: " << 
event.member_
 
 2635        << 
" eventType: " << 
static_cast<int>(
event.event_type_)
 
 2636        << 
" key: " << event.key_.
get_type()
 
 2637        << 
" value: " << 
event.value_.get_type()
 
 2638        << 
" oldValue: " << 
event.old_value_.get_type()
 
 2639        << 
" mergingValue: " << 
event.merging_value_.get_type();
 
 2644                      entry_event::type event_type,
 
 2645                      const std::string& name,
 
 2646                      int number_of_entries_affected)
 
 2648   , event_type_(event_type)
 
 2650   , number_of_entries_affected_(number_of_entries_affected)
 
 2674     return number_of_entries_affected_;
 
 2678 operator<<(std::ostream& os, 
const map_event& event)
 
 2680     os << 
"MapEvent{member: " << 
event.member_
 
 2681        << 
" eventType: " << 
static_cast<int>(
event.event_type_)
 
 2682        << 
" name: " << event.name_
 
 2683        << 
" numberOfEntriesAffected: " << event.number_of_entries_affected_;
 
 2687 item_event_base::item_event_base(
const std::string& name,
 
 2688                                  const member& member,
 
 2689                                  const item_event_type& event_type)
 
 2692   , event_type_(event_type)
 
 2713 item_event_base::~item_event_base() = 
default;
 
 2715 flake_id_generator::flake_id_generator(
const std::string& object_name,
 
 2716                                        spi::ClientContext* context)
 
 2717   : flake_id_generator_impl(SERVICE_NAME, object_name, context)
 
const typed_data & get_key() const
Returns the key of the entry event.
 
const std::string & get_name() const
Returns the name of the map for this event.
 
const typed_data & get_old_value() const
Returns the old value of the entry event.
 
type get_event_type() const
Return the event type.
 
const member & get_member() const
Returns the member fired this event.
 
const typed_data & get_value() const
Returns the value of the entry event.
 
const typed_data & get_merging_value() const
Returns the incoming merging value of the entry event.
 
static const std::chrono::milliseconds UNSET
Default TTL value of a record.
 
item_event_type get_event_type() const
Return the event type.
 
const member & get_member() const
Returns the member fired this event.
 
const std::string & get_name() const
Returns the name of the collection for this event.
 
Map events common contract.
 
const std::string & get_name() const
Returns the name of the map for this event.
 
entry_event::type get_event_type() const
Return the event type.
 
int get_number_of_entries_affected() const
Returns the number of entries affected by this event.
 
map_event(member &&member, entry_event::type event_type, const std::string &name, int number_of_entries_affected)
Constructor.
 
const member & get_member() const
Returns the member fired this event.
 
typed_data class is a wrapper class for the serialized binary data.
 
serialization::pimpl::object_type get_type() const