17#include <unordered_set>
20#include "hazelcast/client/impl/ClientLockReferenceIdGenerator.h"
21#include "hazelcast/client/proxy/PNCounterImpl.h"
22#include "hazelcast/client/spi/ClientContext.h"
23#include "hazelcast/client/impl/hazelcast_client_instance_impl.h"
24#include "hazelcast/client/proxy/flake_id_generator_impl.h"
25#include "hazelcast/client/spi/impl/listener/listener_service_impl.h"
26#include "hazelcast/client/topic/impl/TopicEventHandlerImpl.h"
27#include "hazelcast/client/client_config.h"
28#include "hazelcast/client/map/data_entry_view.h"
29#include "hazelcast/client/proxy/RingbufferImpl.h"
30#include "hazelcast/client/impl/vector_clock.h"
31#include "hazelcast/client/internal/partition/strategy/StringPartitioningStrategy.h"
32#include "hazelcast/util/Util.h"
33#include "hazelcast/client/topic/reliable_listener.h"
34#include "hazelcast/client/proxy/ITopicImpl.h"
35#include "hazelcast/client/proxy/ReplicatedMapImpl.h"
36#include "hazelcast/client/flake_id_generator.h"
37#include "hazelcast/client/reliable_topic.h"
// Builds the reliable_topic proxy for `instance_name`.
// Visible steps: ProxyImpl is initialised with reliable_topic::SERVICE_NAME,
// executor_ comes from execution_service_->get_user_executor(), logger_ from
// the context, and batch_size_ is taken from the reliable-topic config when the
// lookup yields one; DEFAULT_READ_BATCH_SIZE is assigned otherwise (presumably
// in an else branch). A ringbuffer named TOPIC_RB_PREFIX + instance_name is
// then requested from the client implementation.
// NOTE(review): braces, the `else`, and the left-hand sides of two
// initialisations are not present in this extract - confirm against the file.
43reliable_topic::reliable_topic(
const std::string& instance_name,
44 spi::ClientContext* context)
45 : proxy::ProxyImpl(
reliable_topic::SERVICE_NAME, instance_name, context)
47 context->get_client_execution_service().shared_from_this())
48 , executor_(execution_service_->get_user_executor())
49 , logger_(context->get_logger())
51 auto reliable_config =
52 context->get_client_config().lookup_reliable_topic_config(instance_name);
53 if (reliable_config) {
54 batch_size_ = reliable_config->get_read_batch_size();
56 batch_size_ = config::reliable_topic_config::DEFAULT_READ_BATCH_SIZE;
60 context->get_hazelcast_client_implementation()
61 ->get_distributed_object<ringbuffer>(
62 std::string(reliable_topic::TOPIC_RB_PREFIX) + instance_name);
// Fragment - the enclosing function signature is not in this extract.
// Converts registration_id to an int via IOUtil::to_value and looks up the
// runner stored under that id in runners_map_.
68 int id = util::IOUtil::to_value<int>(registration_id);
69 auto runner = runners_map_.get(
id);
// Shutdown hook: iterates over the entries returned by runners_map_.clear()
// and calls cancel() on each entry's runner (entry.second).
78reliable_topic::on_shutdown()
81 for (
auto& entry : runners_map_.clear()) {
82 entry.second->cancel();
// Destroy hook: same visible logic as on_shutdown() - every runner returned
// by runners_map_.clear() is cancelled.
87reliable_topic::on_destroy()
90 for (
auto& entry : runners_map_.clear()) {
91 entry.second->cancel();
// Post-destroy hook: destroys the backing ringbuffer and calls .get() on the
// result (presumably a future, i.e. this waits for the destroy to finish).
96reliable_topic::post_destroy()
99 ringbuffer_.get()->destroy().get();
// Constructor fragment - the class name / first parameter line is not in this
// extract. Stores loss_tolerant and initial_sequence_id in the matching members.
104 int64_t initial_sequence_id)
105 : loss_tolerant_(loss_tolerant)
106 , initial_sequence_id_(initial_sequence_id)
// Starts the reference id counter at 0.
111ClientLockReferenceIdGenerator::ClientLockReferenceIdGenerator()
112 : reference_id_counter_(0)
// Returns the counter after pre-incrementing it, so with the 0 start value
// the first id handed out is 1.
// NOTE(review): whether the increment is atomic depends on the member's type,
// which is declared outside this extract.
116ClientLockReferenceIdGenerator::get_next_reference_id()
118 return ++reference_id_counter_;
// Constructs the multimap proxy on top of ProxyImpl (multi_map::SERVICE_NAME)
// and caches the context's lock reference id generator.
// NOTE(review): on_initialize() in this file performs the same assignment.
123MultiMapImpl::MultiMapImpl(
const std::string& instance_name,
124 spi::ClientContext* context)
125 : ProxyImpl(multi_map::SERVICE_NAME, instance_name, context)
129 lock_reference_id_generator_ =
130 get_context().get_lock_reference_id_generator();
// Encodes a multimap put of (key, value) tagged with the calling thread's id
// and invokes it, passing `key` to invoke_and_get_future; the future carries
// a bool.
134MultiMapImpl::put(
const serialization::pimpl::data& key,
135 const serialization::pimpl::data& value)
137 auto request = protocol::codec::multimap_put_encode(
138 get_name(), key, value, util::get_current_thread_id());
139 return invoke_and_get_future<bool>(request, key);
// Encodes a multimap get for `key` (with the calling thread's id); the
// returned future carries a vector of serialized values.
// NOTE(review): the argument line of the invoke call is missing here.
142boost::future<std::vector<serialization::pimpl::data>>
143MultiMapImpl::get_data(
const serialization::pimpl::data& key)
145 auto request = protocol::codec::multimap_get_encode(
146 get_name(), key, util::get_current_thread_id());
147 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
// Removes a single (key, value) entry: uses the removeentry codec with the
// calling thread's id and passes `key` to the invocation; future carries bool.
152MultiMapImpl::remove(
const serialization::pimpl::data& key,
153 const serialization::pimpl::data& value)
155 auto request = protocol::codec::multimap_removeentry_encode(
156 get_name(), key, value, util::get_current_thread_id());
157 return invoke_and_get_future<bool>(request, key);
// Uses the multimap remove codec for `key` (all-values form, unlike the
// removeentry codec used by remove()); the future carries a vector of
// serialized data.
// NOTE(review): the argument line of the invoke call is missing here.
160boost::future<std::vector<serialization::pimpl::data>>
161MultiMapImpl::remove_data(
const serialization::pimpl::data& key)
163 auto request = protocol::codec::multimap_remove_encode(
164 get_name(), key, util::get_current_thread_id());
165 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
// Requests the multimap key set; the request carries only the proxy name and
// the future yields a vector of serialized data.
169boost::future<std::vector<serialization::pimpl::data>>
170MultiMapImpl::key_set_data()
172 auto request = protocol::codec::multimap_keyset_encode(get_name());
173 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
// Requests the multimap values; the request carries only the proxy name and
// the future yields a vector of serialized data.
177boost::future<std::vector<serialization::pimpl::data>>
178MultiMapImpl::values_data()
180 auto request = protocol::codec::multimap_values_encode(get_name());
181 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
// Requests the multimap entry set; the future yields an EntryVector. No key
// is passed to the invocation.
185boost::future<EntryVector>
186MultiMapImpl::entry_set_data()
188 auto request = protocol::codec::multimap_entryset_encode(get_name());
189 return invoke_and_get_future<EntryVector>(request);
// Asks whether `key` is present; the request includes the calling thread's id
// and `key` is passed to the invocation. Future carries bool.
193MultiMapImpl::contains_key(
const serialization::pimpl::data& key)
195 auto request = protocol::codec::multimap_containskey_encode(
196 get_name(), key, util::get_current_thread_id());
197 return invoke_and_get_future<bool>(request, key);
// Asks whether `value` is present under any key; no key is passed to the
// invocation. Future carries bool.
// NOTE(review): the `auto request =` line is missing from this extract.
201MultiMapImpl::contains_value(
const serialization::pimpl::data& value)
204 protocol::codec::multimap_containsvalue_encode(get_name(), value);
205 return invoke_and_get_future<bool>(request);
// Asks whether the exact (key, value) pair is present; the request includes
// the calling thread's id and `key` is passed to the invocation.
209MultiMapImpl::contains_entry(
const serialization::pimpl::data& key,
210 const serialization::pimpl::data& value)
212 auto request = protocol::codec::multimap_containsentry_encode(
213 get_name(), key, value, util::get_current_thread_id());
214 return invoke_and_get_future<bool>(request, key);
// Fragment - signature not in this extract. Sends the multimap size request
// for this proxy's name; the future carries an int.
220 auto request = protocol::codec::multimap_size_encode(get_name());
221 return invoke_and_get_future<int>(request);
// Fragment - signature not in this extract. Sends the multimap clear request
// and adapts the invocation result with to_void_future.
227 auto request = protocol::codec::multimap_clear_encode(get_name());
228 return to_void_future(invoke(request));
// Requests the number of values stored under `key`; the request includes the
// calling thread's id and `key` is passed to the invocation. Future carries int.
232MultiMapImpl::value_count(
const serialization::pimpl::data& key)
234 auto request = protocol::codec::multimap_valuecount_encode(
235 get_name(), key, util::get_current_thread_id());
236 return invoke_and_get_future<int>(request, key);
// Registers `entry_event_handler` using the whole-map listener codec; the
// future yields the registration uuid. The handler is moved into
// register_listener.
// NOTE(review): the include_value parameter line is missing from this extract.
239boost::future<boost::uuids::uuid>
240MultiMapImpl::add_entry_listener(
241 std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
244 return register_listener(
245 create_multi_map_entry_listener_codec(include_value),
246 std::move(entry_event_handler));
// Key-scoped overload: registers `entry_event_handler` with the to-key codec.
// `key` is an rvalue reference and is moved into the codec, so the caller's
// object is left moved-from.
249boost::future<boost::uuids::uuid>
250MultiMapImpl::add_entry_listener(
251 std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
253 serialization::pimpl::data&& key)
255 return register_listener(
256 create_multi_map_entry_listener_codec(include_value, std::move(key)),
257 std::move(entry_event_handler));
// Delegates deregistration to the client listener service.
// NOTE(review): the argument line of deregister_listener is missing here.
261MultiMapImpl::remove_entry_listener(boost::uuids::uuid registration_id)
263 return get_context().get_client_listener_service().deregister_listener(
// Convenience overload: forwards to lock(key, lease_time) with a lease of
// -1 ms (presumably meaning "no lease expiry" - confirm with the protocol).
268MultiMapImpl::lock(
const serialization::pimpl::data& key)
270 return lock(key, std::chrono::milliseconds(-1));
// Locks `key` for `lease_time`: the request carries the calling thread's id,
// the lease in milliseconds and a fresh lock reference id, and is invoked on
// the partition computed from `key`.
// NOTE(review): the first argument lines of the encode call are missing here.
274MultiMapImpl::lock(
const serialization::pimpl::data& key,
275 std::chrono::milliseconds lease_time)
277 auto request = protocol::codec::multimap_lock_encode(
280 util::get_current_thread_id(),
281 std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
282 lock_reference_id_generator_->get_next_reference_id());
283 return to_void_future(invoke_on_partition(request, get_partition_id(key)));
// Asks whether `key` is currently locked; no thread id is included in this
// request. Future carries bool.
287MultiMapImpl::is_locked(
const serialization::pimpl::data& key)
289 auto request = protocol::codec::multimap_islocked_encode(get_name(), key);
290 return invoke_and_get_future<bool>(request, key);
// Single-argument try_lock: sends a trylock request with the calling thread's
// id and a fresh lock reference id. Future carries bool.
// NOTE(review): the lease/timeout argument lines are missing from this
// extract, so the values used here cannot be confirmed.
294MultiMapImpl::try_lock(
const serialization::pimpl::data& key)
296 auto request = protocol::codec::multimap_trylock_encode(
299 util::get_current_thread_id(),
302 lock_reference_id_generator_->get_next_reference_id());
303 return invoke_and_get_future<bool>(request, key);
// Forwards to the three-argument overload with a lease time of INT64_MAX
// milliseconds.
307MultiMapImpl::try_lock(
const serialization::pimpl::data& key,
308 std::chrono::milliseconds timeout)
310 return try_lock(key, timeout, std::chrono::milliseconds(INT64_MAX));
// Tries to lock `key`, passing the lease time and then the timeout (both as
// millisecond counts) plus the calling thread's id and a fresh lock reference
// id. Future carries bool.
// NOTE(review): the first argument lines of the encode call are missing here.
314MultiMapImpl::try_lock(
const serialization::pimpl::data& key,
315 std::chrono::milliseconds timeout,
316 std::chrono::milliseconds lease_time)
318 auto request = protocol::codec::multimap_trylock_encode(
321 util::get_current_thread_id(),
322 std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
323 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count(),
324 lock_reference_id_generator_->get_next_reference_id());
325 return invoke_and_get_future<bool>(request, key);
// Unlocks `key`: the request carries the calling thread's id and a fresh lock
// reference id and is invoked on the partition computed from `key`.
// NOTE(review): the first argument lines of the encode call are missing here.
329MultiMapImpl::unlock(
const serialization::pimpl::data& key)
331 auto request = protocol::codec::multimap_unlock_encode(
334 util::get_current_thread_id(),
335 lock_reference_id_generator_->get_next_reference_id());
336 return to_void_future(invoke_on_partition(request, get_partition_id(key)));
// Force-unlocks `key`. Unlike unlock(), the request carries no thread id -
// only the name, the key and a fresh lock reference id.
340MultiMapImpl::force_unlock(
const serialization::pimpl::data& key)
342 auto request = protocol::codec::multimap_forceunlock_encode(
343 get_name(), key, lock_reference_id_generator_->get_next_reference_id());
344 return to_void_future(invoke_on_partition(request, get_partition_id(key)));
// Factory for the whole-map entry listener codec, returned through the
// ListenerMessageCodec base pointer.
347std::shared_ptr<spi::impl::ListenerMessageCodec>
348MultiMapImpl::create_multi_map_entry_listener_codec(
bool include_value)
350 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
351 new MultiMapEntryListenerMessageCodec(get_name(), include_value));
// Factory for the key-scoped entry listener codec; `key` is moved into the
// new codec object.
// NOTE(review): the include_value parameter line is missing from this extract.
354std::shared_ptr<spi::impl::ListenerMessageCodec>
355MultiMapImpl::create_multi_map_entry_listener_codec(
357 serialization::pimpl::data&& key)
359 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
360 new MultiMapEntryListenerToKeyCodec(
361 get_name(), include_value, std::move(key)));
// Runs the base-class initialisation first, then (re)reads the lock reference
// id generator from the client context.
365MultiMapImpl::on_initialize()
367 ProxyImpl::on_initialize();
368 lock_reference_id_generator_ =
369 get_context().get_lock_reference_id_generator();
// Stores the map name (taken by value and moved into name_) and the
// include_value flag for later request encoding.
372MultiMapImpl::MultiMapEntryListenerMessageCodec::
373 MultiMapEntryListenerMessageCodec(std::string name,
bool include_value)
374 : name_(std::move(name))
375 , include_value_(include_value)
// Builds the add-entry-listener message from the stored name and
// include_value flag plus the caller-supplied local_only flag.
378protocol::ClientMessage
379MultiMapImpl::MultiMapEntryListenerMessageCodec::encode_add_request(
380 bool local_only)
const
382 return protocol::codec::multimap_addentrylistener_encode(
383 name_, include_value_, local_only);
// Builds the remove-entry-listener message for the given registration id.
386protocol::ClientMessage
387MultiMapImpl::MultiMapEntryListenerMessageCodec::encode_remove_request(
388 boost::uuids::uuid real_registration_id)
const
390 return protocol::codec::multimap_removeentrylistener_encode(
391 name_, real_registration_id);
// Builds the key-scoped add-listener message: name, stored key,
// include_value flag and the caller-supplied local_only flag.
394protocol::ClientMessage
395MultiMapImpl::MultiMapEntryListenerToKeyCodec::encode_add_request(
396 bool local_only)
const
398 return protocol::codec::multimap_addentrylistenertokey_encode(
399 name_, key_, include_value_, local_only);
// Removal uses the same removeentrylistener codec as the whole-map listener;
// the stored key is not part of the remove request.
402protocol::ClientMessage
403MultiMapImpl::MultiMapEntryListenerToKeyCodec::encode_remove_request(
404 boost::uuids::uuid real_registration_id)
const
406 return protocol::codec::multimap_removeentrylistener_encode(
407 name_, real_registration_id);
// Stores name, include_value and the key; name and key are moved into the
// members.
// NOTE(review): the name / include_value parameter lines are missing here.
410MultiMapImpl::MultiMapEntryListenerToKeyCodec::MultiMapEntryListenerToKeyCodec(
413 serialization::pimpl::data&& key)
414 : name_(std::move(name))
415 , include_value_(include_value)
416 , key_(std::move(key))
// Shared, initially empty exclusion set. The public operations below pass it
// to mean "no member excluded yet", and try_choose_a_new_target() compares
// against this pointer before inserting, so the shared instance is not the
// one that gets mutated there.
419const std::shared_ptr<std::unordered_set<member>>
420 PNCounterImpl::EMPTY_ADDRESS_LIST(
new std::unordered_set<member>());
// Constructs the PN counter proxy: max_configured_replica_count_ starts at 0
// (get_max_configured_replica_count() treats values <= 0 as "not fetched
// yet"), a fresh empty vector_clock is created, and the logger is taken from
// the context.
// NOTE(review): the initialiser head that receives the vector_clock is
// missing from this extract (presumably observed_clock_).
422PNCounterImpl::PNCounterImpl(
const std::string& service_name,
423 const std::string& object_name,
424 spi::ClientContext* context)
425 : ProxyImpl(service_name, object_name, context)
426 , max_configured_replica_count_(0)
428 std::shared_ptr<impl::vector_clock>(new impl::vector_clock()))
429 , logger_(context->get_logger())
// Streams a short description of the proxy in the form
// PNCounter{name='<name>'}.
// NOTE(review): the return type and return statement are not in this extract.
433operator<<(std::ostream& os,
const PNCounterImpl& proxy)
435 os <<
"PNCounter{name='" << proxy.get_name() <<
"\'}";
// Read operation (function name line not in this extract; the exception
// source string labels it "ClientPNCounterProxy::get").
// Selects a target replica with nothing excluded and delegates to
// invoke_get_internal with no previous exception.
// NOTE(review): the throw is presumably guarded by a null-target check whose
// line is missing here.
439boost::future<int64_t>
442 boost::shared_ptr<member> target =
443 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
445 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
446 "ClientPNCounterProxy::get",
447 "Cannot invoke operations on a CRDT because the cluster does not "
448 "contain any data members"));
450 return invoke_get_internal(EMPTY_ADDRESS_LIST,
nullptr, target);
// Adds `delta` with getBeforeUpdate = true (second argument of
// invoke_add_internal), starting with an empty exclusion set.
// NOTE(review): the throw is presumably guarded by a null-target check whose
// line is missing from this extract.
453boost::future<int64_t>
454PNCounterImpl::get_and_add(int64_t delta)
456 boost::shared_ptr<member> target =
457 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
459 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
460 "ClientPNCounterProxy::getAndAdd",
461 "Cannot invoke operations on a CRDT because the cluster does not "
462 "contain any data members"));
464 return invoke_add_internal(
465 delta,
true, EMPTY_ADDRESS_LIST,
nullptr, target);
// Adds `delta` with getBeforeUpdate = false, starting with an empty
// exclusion set.
// NOTE(review): the throw is presumably guarded by a null-target check whose
// line is missing from this extract.
468boost::future<int64_t>
469PNCounterImpl::add_and_get(int64_t delta)
471 boost::shared_ptr<member> target =
472 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
474 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
475 "ClientPNCounterProxy::addAndGet",
476 "Cannot invoke operations on a CRDT because the cluster does not "
477 "contain any data members"));
479 return invoke_add_internal(
480 delta,
false, EMPTY_ADDRESS_LIST,
nullptr, target);
// Subtraction is expressed as an add of -delta with getBeforeUpdate = true.
// NOTE(review): negating delta == INT64_MIN overflows a signed 64-bit value.
// NOTE(review): the throw is presumably guarded by a null-target check whose
// line is missing from this extract.
483boost::future<int64_t>
484PNCounterImpl::get_and_subtract(int64_t delta)
486 boost::shared_ptr<member> target =
487 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
489 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
490 "ClientPNCounterProxy::getAndSubtract",
491 "Cannot invoke operations on a CRDT because the cluster does not "
492 "contain any data members"));
494 return invoke_add_internal(
495 -delta,
true, EMPTY_ADDRESS_LIST,
nullptr, target);
// Subtraction is expressed as an add of -delta with getBeforeUpdate = false.
// NOTE(review): negating delta == INT64_MIN overflows a signed 64-bit value.
// NOTE(review): the throw is presumably guarded by a null-target check whose
// line is missing from this extract.
498boost::future<int64_t>
499PNCounterImpl::subtract_and_get(int64_t delta)
501 boost::shared_ptr<member> target =
502 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
504 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
505 "ClientPNCounterProxy::subtractAndGet",
506 "Cannot invoke operations on a CRDT because the cluster does not "
507 "contain any data members"));
509 return invoke_add_internal(
510 -delta,
false, EMPTY_ADDRESS_LIST,
nullptr, target);
// Adds -1 with getBeforeUpdate = false.
// NOTE(review): the throw is presumably guarded by a null-target check whose
// line is missing from this extract.
513boost::future<int64_t>
514PNCounterImpl::decrement_and_get()
516 boost::shared_ptr<member> target =
517 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
519 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
520 "ClientPNCounterProxy::decrementAndGet",
521 "Cannot invoke operations on a CRDT because the cluster does not "
522 "contain any data members"));
524 return invoke_add_internal(-1,
false, EMPTY_ADDRESS_LIST,
nullptr, target);
// Adds +1 with getBeforeUpdate = false.
// NOTE(review): the throw is presumably guarded by a null-target check whose
// line is missing from this extract.
527boost::future<int64_t>
528PNCounterImpl::increment_and_get()
530 boost::shared_ptr<member> target =
531 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
533 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
534 "ClientPNCounterProxy::incrementAndGet",
535 "Cannot invoke operations on a CRDT because the cluster does not "
536 "contain any data members"));
538 return invoke_add_internal(1,
false, EMPTY_ADDRESS_LIST,
nullptr, target);
// Adds -1 with getBeforeUpdate = true.
// NOTE(review): the throw is presumably guarded by a null-target check whose
// line is missing from this extract.
541boost::future<int64_t>
542PNCounterImpl::get_and_decrement()
544 boost::shared_ptr<member> target =
545 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
547 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
548 "ClientPNCounterProxy::getAndDecrement",
549 "Cannot invoke operations on a CRDT because the cluster does not "
550 "contain any data members"));
552 return invoke_add_internal(-1,
true, EMPTY_ADDRESS_LIST,
nullptr, target);
// Adds +1 with getBeforeUpdate = true.
// NOTE(review): the throw is presumably guarded by a null-target check whose
// line is missing from this extract.
555boost::future<int64_t>
556PNCounterImpl::get_and_increment()
558 boost::shared_ptr<member> target =
559 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
561 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
562 "ClientPNCounterProxy::getAndIncrement",
563 "Cannot invoke operations on a CRDT because the cluster does not "
564 "contain any data members"));
566 return invoke_add_internal(1,
true, EMPTY_ADDRESS_LIST,
nullptr, target);
// Creates a fresh empty vector_clock and returns an already-completed future.
// No remote call is visible in this function.
// NOTE(review): the assignment target line is missing from this extract
// (presumably observed_clock_).
570PNCounterImpl::reset()
573 std::shared_ptr<impl::vector_clock>(
new impl::vector_clock());
574 return boost::make_ready_future();
// Returns the member operations should be sent to, skipping anything listed
// in `excluded_addresses`.
// The cached target is returned without locking when it is set and not
// excluded. Otherwise target_selection_mutex_ is taken, the cached value is
// re-read, and a new replica is chosen only if the cached one is still unset
// or excluded at that point.
// The result can be null: choose_target_replica() has an empty-list branch
// (its body is not visible here) and the callers in this file carry a
// "no data members" error path.
577boost::shared_ptr<member>
578PNCounterImpl::get_crdt_operation_target(
579 const std::unordered_set<member>& excluded_addresses)
581 auto replicaAddress = current_target_replica_address_.load();
582 if (replicaAddress &&
583 excluded_addresses.find(*replicaAddress) == excluded_addresses.end()) {
584 return replicaAddress;
588 std::lock_guard<std::mutex> guard(target_selection_mutex_);
589 replicaAddress = current_target_replica_address_.load();
590 if (!replicaAddress || excluded_addresses.find(*replicaAddress) !=
591 excluded_addresses.end()) {
592 current_target_replica_address_ =
593 choose_target_replica(excluded_addresses);
596 return current_target_replica_address_;
// Picks one of the non-excluded replica members at random and returns a
// newly allocated copy of it.
// NOTE(review): the body of the empty-list branch is missing from this
// extract (presumably it returns a null pointer).
// rand() uses the process-wide C generator; the modulo keeps the index
// within the vector's bounds.
599boost::shared_ptr<member>
600PNCounterImpl::choose_target_replica(
601 const std::unordered_set<member>& excluded_addresses)
603 std::vector<member> replicaAddresses =
604 get_replica_addresses(excluded_addresses);
605 if (replicaAddresses.empty()) {
610 int randomReplicaIndex = std::abs(rand()) % (int)replicaAddresses.size();
611 return boost::make_shared<member>(replicaAddresses[randomReplicaIndex]);
// Returns candidate replica members: the first
// min(configured replica count, number of data members) entries of the
// cluster's data-member list, minus any member found in `excluded_members`.
// Only that leading slice is considered, so the result may hold fewer
// members than the replica count.
615PNCounterImpl::get_replica_addresses(
616 const std::unordered_set<member>& excluded_members)
618 std::vector<member> dataMembers =
619 get_context().get_client_cluster_service().get_members(
620 *member_selectors::DATA_MEMBER_SELECTOR);
621 int32_t replicaCount = get_max_configured_replica_count();
622 int currentReplicaCount =
623 util::min<int>(replicaCount, (
int)dataMembers.size());
625 std::vector<member> replicaMembers;
626 for (
int i = 0; i < currentReplicaCount; i++) {
627 if (excluded_members.find(dataMembers[i]) == excluded_members.end()) {
628 replicaMembers.push_back(dataMembers[i]);
631 return replicaMembers;
// Returns the configured replica count, fetching it on first use: a cached
// value > 0 is returned directly, otherwise a getconfiguredreplicacount
// request is sent and .get() waits on its future before the result is cached.
// NOTE(review): the `auto request =` line and the codec arguments are
// missing from this extract.
635PNCounterImpl::get_max_configured_replica_count()
637 if (max_configured_replica_count_ > 0) {
638 return max_configured_replica_count_;
641 protocol::codec::pncounter_getconfiguredreplicacount_encode(
643 max_configured_replica_count_ =
644 invoke_and_get_future<int32_t>(request).get();
646 return max_configured_replica_count_;
// Called after an invocation on `last_target` failed: formats a message
// naming the target and cause (presumably for a log call whose line is not in
// this extract), adds last_target to the exclusion set and asks for a new
// target.
// When the caller passed the shared EMPTY_ADDRESS_LIST, a fresh set is
// allocated first (the parameter is a by-value shared_ptr, so the rebinding
// is local) and the shared constant itself is not inserted into.
649boost::shared_ptr<member>
650PNCounterImpl::try_choose_a_new_target(
651 std::shared_ptr<std::unordered_set<member>> excluded_addresses,
652 boost::shared_ptr<member> last_target,
653 const exception::hazelcast_& last_exception)
658 boost::str(boost::format(
659 "Exception occurred while invoking operation on target %1%, "
660 "choosing different target. Cause: %2%") %
661 last_target % last_exception));
662 if (excluded_addresses == EMPTY_ADDRESS_LIST) {
665 excluded_addresses = std::make_shared<std::unordered_set<member>>();
667 excluded_addresses->insert(*last_target);
668 return get_crdt_operation_target(*excluded_addresses);
// Sends a PN counter get to `target`, retrying on other replicas on failure.
// Visible behaviour:
//  - an error path rethrows `last_exception` when one is set and otherwise
//    throws no_data_member_in_cluster (presumably guarded by a null-target
//    check whose line is missing from this extract);
//  - the request carries the observed vector clock entries and the target's
//    uuid and is invoked on that member;
//  - both catch handlers (one inside the continuation, one after it) recurse
//    with the current exception and a target from try_choose_a_new_target().
// NOTE(review): the `.then(`, `try {` and several closing lines are missing.
671boost::future<int64_t>
672PNCounterImpl::invoke_get_internal(
673 std::shared_ptr<std::unordered_set<member>> excluded_addresses,
674 std::exception_ptr last_exception,
675 const boost::shared_ptr<member>& target)
678 if (last_exception) {
679 std::rethrow_exception(last_exception);
681 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
682 "ClientPNCounterProxy::invokeGetInternal",
683 "Cannot invoke operations on a CRDT because the cluster does not "
684 "contain any data members"));
688 auto timestamps = observed_clock_.get()->entry_set();
689 auto request = protocol::codec::pncounter_get_encode(
690 get_name(), timestamps, target->get_uuid());
691 return invoke_on_member(request, target->get_uuid())
693 boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
695 return get_and_update_timestamps(std::move(f));
696 }
catch (exception::hazelcast_& e) {
697 return invoke_get_internal(excluded_addresses,
698 std::current_exception(),
699 try_choose_a_new_target(
700 excluded_addresses, target, e))
704 }
catch (exception::hazelcast_& e) {
705 return invoke_get_internal(
707 std::current_exception(),
708 try_choose_a_new_target(excluded_addresses, target, e));
// Sends a PN counter add to `target`, retrying on other replicas on failure.
// Parameters visible here: getBeforeUpdate, excluded_addresses,
// last_exception (rethrown when set) and target; the delta parameter line is
// not in this extract but `delta` is forwarded on retry.
// Visible behaviour:
//  - an error path rethrows `last_exception` when one is set and otherwise
//    throws no_data_member_in_cluster (presumably guarded by a null-target
//    check whose line is missing from this extract);
//  - the request includes the observed vector clock entries and is invoked on
//    the target member;
//  - both catch handlers recurse with the current exception and a target
//    from try_choose_a_new_target().
// Fix: the exception source label used to read "invokeGetInternal"
// (copy-pasted from the get path); it now names this function so diagnostics
// point at the right operation.
712boost::future<int64_t>
713PNCounterImpl::invoke_add_internal(
715 bool getBeforeUpdate,
716 std::shared_ptr<std::unordered_set<member>> excluded_addresses,
717 std::exception_ptr last_exception,
718 const boost::shared_ptr<member>& target)
721 if (last_exception) {
722 std::rethrow_exception(last_exception);
724 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
725 "ClientPNCounterProxy::invokeAddInternal",
726 "Cannot invoke operations on a CRDT because the cluster does not "
727 "contain any data members"));
732 auto request = protocol::codec::pncounter_add_encode(
736 observed_clock_.get()->entry_set(),
738 return invoke_on_member(request, target->get_uuid())
740 boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
742 return get_and_update_timestamps(std::move(f));
743 }
catch (exception::hazelcast_& e) {
744 return invoke_add_internal(delta,
747 std::current_exception(),
748 try_choose_a_new_target(
749 excluded_addresses, target, e))
753 }
catch (exception::hazelcast_& e) {
754 return invoke_add_internal(
758 std::current_exception(),
759 try_choose_a_new_target(excluded_addresses, target, e));
// Unpacks a counter response: reads the first fixed-size int64 field as the
// value, then reads the replica timestamp vector from the message and feeds
// it to update_observed_replica_timestamps().
// NOTE(review): the lines that obtain `msg` from the future and return the
// value are missing from this extract.
764PNCounterImpl::get_and_update_timestamps(
765 boost::future<protocol::ClientMessage> f)
768 auto value = msg.get_first_fixed_sized_field<int64_t>();
771 update_observed_replica_timestamps(
772 msg.get<impl::vector_clock::timestamp_vector>());
// Tries to replace the observed clock with the one built from
// `received_logical_timestamps`: the current clock is checked with
// is_after(received) and the swap is attempted with compare_and_set on the
// snapshot that was read.
// NOTE(review): the loop construct and the bodies of both `if`s are missing
// from this extract (presumably a retry loop that exits when the current
// clock is newer or when the swap succeeds).
777PNCounterImpl::update_observed_replica_timestamps(
778 const impl::vector_clock::timestamp_vector& received_logical_timestamps)
780 std::shared_ptr<impl::vector_clock> received =
781 to_vector_clock(received_logical_timestamps);
783 std::shared_ptr<impl::vector_clock> currentClock =
784 this->observed_clock_;
785 if (currentClock->is_after(*received)) {
788 if (observed_clock_.compare_and_set(currentClock, received)) {
// Wraps the received timestamp vector in a newly allocated vector_clock.
794std::shared_ptr<impl::vector_clock>
795PNCounterImpl::to_vector_clock(
796 const impl::vector_clock::timestamp_vector& replica_logical_timestamps)
798 return std::shared_ptr<impl::vector_clock>(
799 new impl::vector_clock(replica_logical_timestamps));
// Returns the currently cached target replica (may be null if none has been
// chosen yet).
802boost::shared_ptr<member>
803PNCounterImpl::get_current_target_replica_address()
805 return current_target_replica_address_.load();
// Constructs the list proxy for service "hz:impl:listService" and computes
// partition_id_ once from a serialized std::string key; the list operations
// in this file pass that partition id to their invocations.
// NOTE(review): the argument of to_data is missing from this extract
// (presumably the instance name).
808IListImpl::IListImpl(
const std::string& instance_name,
809 spi::ClientContext* context)
810 : ProxyImpl(
"hz:impl:listService", instance_name, context)
812 serialization::pimpl::data key_data =
813 get_context().get_serialization_service().to_data<std::string>(
815 partition_id_ = get_partition_id(key_data);
// Delegates deregistration to the client listener service.
// NOTE(review): the argument line of deregister_listener is missing here.
819IListImpl::remove_item_listener(boost::uuids::uuid registration_id)
821 return get_context().get_client_listener_service().deregister_listener(
// Fragment - signature not in this extract. Sends the list size request with
// partition_id_; the future carries an int.
828 auto request = protocol::codec::list_size_encode(get_name());
829 return invoke_and_get_future<int>(request, partition_id_);
// Fragment - signature not in this extract. Sends the list isempty request
// with partition_id_; the future carries a bool.
835 auto request = protocol::codec::list_isempty_encode(get_name());
836 return invoke_and_get_future<bool>(request, partition_id_);
// Asks whether `element` is in the list; invoked with partition_id_.
840IListImpl::contains(
const serialization::pimpl::data& element)
842 auto request = protocol::codec::list_contains_encode(get_name(), element);
843 return invoke_and_get_future<bool>(request, partition_id_);
// Fetches all elements (getall codec) as a vector of serialized data.
846boost::future<std::vector<serialization::pimpl::data>>
847IListImpl::to_array_data()
849 auto request = protocol::codec::list_getall_encode(get_name());
850 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
851 request, partition_id_);
// Sends a list add for `element`; the future carries the bool result.
855IListImpl::add(
const serialization::pimpl::data& element)
857 auto request = protocol::codec::list_add_encode(get_name(), element);
858 return invoke_and_get_future<bool>(request, partition_id_);
// Sends a list remove for `element` (by value, not by index); the future
// carries the bool result.
862IListImpl::remove(
const serialization::pimpl::data& element)
864 auto request = protocol::codec::list_remove_encode(get_name(), element);
865 return invoke_and_get_future<bool>(request, partition_id_);
// Sends a containsall request for `elements`; the future carries a bool.
// NOTE(review): the `auto request =` line is missing from this extract.
869IListImpl::contains_all_data(
870 const std::vector<serialization::pimpl::data>& elements)
873 protocol::codec::list_containsall_encode(get_name(), elements);
874 return invoke_and_get_future<bool>(request, partition_id_);
// Sends an addall request for `elements` (no index); the future carries bool.
878IListImpl::add_all_data(
const std::vector<serialization::pimpl::data>& elements)
880 auto request = protocol::codec::list_addall_encode(get_name(), elements);
881 return invoke_and_get_future<bool>(request, partition_id_);
// Sends an addallwithindex request for `elements` at `index`; the future
// carries a bool.
// NOTE(review): the `auto request =` line is missing from this extract.
885IListImpl::add_all_data(
int index,
886 const std::vector<serialization::pimpl::data>& elements)
889 protocol::codec::list_addallwithindex_encode(get_name(), index, elements);
890 return invoke_and_get_future<bool>(request, partition_id_);
// Implemented with the compareandremoveall codec; the future carries a bool.
// NOTE(review): the `auto request =` line is missing from this extract.
894IListImpl::remove_all_data(
895 const std::vector<serialization::pimpl::data>& elements)
898 protocol::codec::list_compareandremoveall_encode(get_name(), elements);
899 return invoke_and_get_future<bool>(request, partition_id_);
// Implemented with the compareandretainall codec; the future carries a bool.
// NOTE(review): the `auto request =` line is missing from this extract.
903IListImpl::retain_all_data(
904 const std::vector<serialization::pimpl::data>& elements)
907 protocol::codec::list_compareandretainall_encode(get_name(), elements);
908 return invoke_and_get_future<bool>(request, partition_id_);
// Fragment - signature not in this extract. Sends the list clear request on
// partition_id_ and adapts the result with to_void_future.
914 auto request = protocol::codec::list_clear_encode(get_name());
915 return to_void_future(invoke_on_partition(request, partition_id_));
// Fetches the element at `index`; the future carries an optional serialized
// value.
918boost::future<boost::optional<serialization::pimpl::data>>
919IListImpl::get_data(
int index)
921 auto request = protocol::codec::list_get_encode(get_name(), index);
922 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
923 request, partition_id_);
// Sends a list set of `element` at `index`; the future carries an optional
// serialized value (presumably the element previously at that position -
// confirm against the protocol definition).
926boost::future<boost::optional<serialization::pimpl::data>>
927IListImpl::set_data(
int index,
const serialization::pimpl::data& element)
929 auto request = protocol::codec::list_set_encode(get_name(), index, element);
930 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
931 request, partition_id_);
// Sends an addwithindex request for `element` at `index`; the result is
// adapted to a void future.
// NOTE(review): the `auto request =` line is missing from this extract.
935IListImpl::add(
int index,
const serialization::pimpl::data& element)
938 protocol::codec::list_addwithindex_encode(get_name(), index, element);
939 return to_void_future(invoke_on_partition(request, partition_id_));
// Sends a removewithindex request for `index`; the future carries an
// optional serialized value.
// NOTE(review): the `auto request =` line is missing from this extract.
942boost::future<boost::optional<serialization::pimpl::data>>
943IListImpl::remove_data(
int index)
946 protocol::codec::list_removewithindex_encode(get_name(), index);
947 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
948 request, partition_id_);
// Sends an indexof request for `element`; the future carries an int.
952IListImpl::index_of(
const serialization::pimpl::data& element)
954 auto request = protocol::codec::list_indexof_encode(get_name(), element);
955 return invoke_and_get_future<int>(request, partition_id_);
// Sends a lastindexof request for `element`; the future carries an int.
// NOTE(review): the `auto request =` line is missing from this extract.
959IListImpl::last_index_of(
const serialization::pimpl::data& element)
962 protocol::codec::list_lastindexof_encode(get_name(), element);
963 return invoke_and_get_future<int>(request, partition_id_);
// Sends a list sub request for the range given by from_index and to_index;
// the future carries a vector of serialized data.
// NOTE(review): whether to_index is inclusive is defined by the server
// protocol, not visible here. The `auto request =` line is also missing.
966boost::future<std::vector<serialization::pimpl::data>>
967IListImpl::sub_list_data(
int from_index,
int to_index)
970 protocol::codec::list_sub_encode(get_name(), from_index, to_index);
971 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
972 request, partition_id_);
// Factory for the list item listener codec, returned through the
// ListenerMessageCodec base pointer.
975std::shared_ptr<spi::impl::ListenerMessageCodec>
976IListImpl::create_item_listener_codec(
bool include_value)
978 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
979 new ListListenerMessageCodec(get_name(), include_value));
// Stores the list name (moved into name_) and the include_value flag.
// NOTE(review): the parameter lines are missing from this extract.
982IListImpl::ListListenerMessageCodec::ListListenerMessageCodec(
985 : name_(std::move(name))
986 , include_value_(include_value)
// Builds the list add-listener message from the stored name and
// include_value flag plus the caller-supplied local_only flag.
989protocol::ClientMessage
990IListImpl::ListListenerMessageCodec::encode_add_request(
bool local_only)
const
992 return protocol::codec::list_addlistener_encode(
993 name_, include_value_, local_only);
// Builds the list remove-listener message for the given registration id.
996protocol::ClientMessage
997IListImpl::ListListenerMessageCodec::encode_remove_request(
998 boost::uuids::uuid real_registration_id)
const
1000 return protocol::codec::list_removelistener_encode(name_,
1001 real_registration_id);
// Wraps an id batch together with its expiry instant (now + validity on the
// steady clock).
// NOTE(review): id_batch is a named rvalue reference, so id_batch_(id_batch)
// copy-constructs rather than moves.
// NOTE(review): any initialisers after invalid_since_ are not in this extract.
1004flake_id_generator_impl::Block::Block(IdBatch&& id_batch,
1005 std::chrono::milliseconds validity)
1006 : id_batch_(id_batch)
1007 , invalid_since_(std::chrono::steady_clock::now() + validity)
// Hands out the next id of this block.
// Visible logic: an expiry check against the steady clock, then a
// compare_exchange_strong retry loop that claims the next index in
// num_returned_ (with a check for index == batch size), then
// base + index * increment as the id.
// NOTE(review): the bodies of both `if`s are missing from this extract. They
// presumably return INT64_MIN, since new_id_internal() in this file tests the
// result against INT64_MIN.
1012flake_id_generator_impl::Block::next()
1014 if (invalid_since_ <= std::chrono::steady_clock::now()) {
1019 index = num_returned_;
1020 if (index == id_batch_.get_batch_size()) {
1023 }
while (!num_returned_.compare_exchange_strong(index, index + 1));
1025 return id_batch_.get_base() + index * id_batch_.get_increment();
// Definition of the static end iterator object (default-constructed); the
// file's operator++ returns IdBatch::end() when remaining_ == 0.
1028flake_id_generator_impl::IdBatch::IdIterator
1029 flake_id_generator_impl::IdBatch::endOfBatch;
// Const accessor; body not in this extract (presumably returns base_).
1032flake_id_generator_impl::IdBatch::get_base()
const
// Const accessor; body not in this extract (presumably returns increment_).
1038flake_id_generator_impl::IdBatch::get_increment()
const
// Const accessor; body not in this extract (presumably returns batch_size_).
1044flake_id_generator_impl::IdBatch::get_batch_size()
const
// Stores the increment and batch size of a batch.
// NOTE(review): the remaining parameter lines and the first initialiser
// (presumably base_) are missing from this extract.
1049flake_id_generator_impl::IdBatch::IdBatch(int64_t base,
1053 , increment_(increment)
1054 , batch_size_(batch_size)
// Returns a reference to the end iterator; body not in this extract
// (presumably returns endOfBatch).
1057flake_id_generator_impl::IdBatch::IdIterator&
1058flake_id_generator_impl::IdBatch::end()
// Creates an iterator positioned at the start of this batch, with
// batch_size_ passed as the iterator's third constructor argument.
1063flake_id_generator_impl::IdBatch::IdIterator
1064flake_id_generator_impl::IdBatch::iterator()
1066 return flake_id_generator_impl::IdBatch::IdIterator(
1067 base_, increment_, batch_size_);
// Stores the increment and the remaining-id count for the iteration.
// NOTE(review): the other parameter lines and the first initialiser
// (presumably base2_) are missing from this extract.
1070flake_id_generator_impl::IdBatch::IdIterator::IdIterator(
1072 const int64_t increment,
1075 , increment_(increment)
1076 , remaining_(remaining)
// Two iterators are equal when base2_, increment_ and remaining_ all match.
1080flake_id_generator_impl::IdBatch::IdIterator::operator==(
1081 const flake_id_generator_impl::IdBatch::IdIterator& rhs)
const
1083 return base2_ == rhs.base2_ && increment_ == rhs.increment_ &&
1084 remaining_ == rhs.remaining_;
// Defined as the negation of operator== (with the operands swapped).
1088flake_id_generator_impl::IdBatch::IdIterator::operator!=(
1089 const flake_id_generator_impl::IdBatch::IdIterator& rhs)
const
1091 return !(rhs == *
this);
// Default constructor; its initialiser list / body is not in this extract.
1094flake_id_generator_impl::IdBatch::IdIterator::IdIterator()
// Pre-increment: returns the batch's end iterator when nothing remains,
// otherwise advances base2_ by increment_.
// NOTE(review): the decrement of remaining_ and the final return are
// presumably on lines that are missing from this extract.
1100flake_id_generator_impl::IdBatch::IdIterator&
1101flake_id_generator_impl::IdBatch::IdIterator::operator++()
1103 if (remaining_ == 0) {
1104 return flake_id_generator_impl::IdBatch::end();
1109 base2_ += increment_;
// Constructs the flake id generator proxy and reads batch_size_ (prefetch
// count) and validity_ (prefetch validity duration) from the flake id
// generator config found for `object_name`.
// NOTE(review): `config` is dereferenced without a visible null check, and
// the line declaring it is missing from this extract - confirm that
// find_flake_id_generator_config never returns null.
1114flake_id_generator_impl::flake_id_generator_impl(
1115 const std::string& service_name,
1116 const std::string& object_name,
1117 spi::ClientContext* context)
1118 : ProxyImpl(service_name, object_name, context)
1122 context->get_client_config().find_flake_id_generator_config(object_name);
1123 batch_size_ = config->get_prefetch_count();
1124 validity_ = config->get_prefetch_validity_duration();
// Tries to take an id from the currently loaded block. A result other than
// INT64_MIN is treated as a valid id (the body of that `if` is not in this
// extract but presumably returns it); otherwise std::overflow_error with an
// empty message is thrown, which new_id() in this file catches.
// NOTE(review): a guard on `b` before b->next() is not visible here.
1128flake_id_generator_impl::new_id_internal()
1130 auto b = block_.load();
1132 int64_t res = b->next();
1133 if (res != INT64_MIN) {
1138 throw std::overflow_error(
"");
// Returns a new id. The fast path is a ready future from new_id_internal().
// When that throws std::overflow_error, a new batch of batch_size_ ids is
// requested; the continuation builds a Block from it, takes the first value
// from the new block, and attempts to install the block with
// compare_exchange_strong against the block that was loaded.
// NOTE(review): the `try {`, the newBlock declaration head and the
// continuation's return are missing from this extract.
1141boost::future<int64_t>
1142flake_id_generator_impl::new_id()
1145 return boost::make_ready_future(new_id_internal());
1146 }
catch (std::overflow_error&) {
1147 return new_id_batch(batch_size_)
1148 .then(boost::launch::sync,
1149 [=](boost::future<flake_id_generator_impl::IdBatch> f) {
1151 boost::make_shared<Block>(f.get(), validity_);
1152 auto value = newBlock->next();
1153 auto b = block_.load();
1154 block_.compare_exchange_strong(b, newBlock);
// Requests a batch of `size` ids. The continuation positions the read
// pointer at RESPONSE_HEADER_LEN and reads base (int64), increment (int64)
// and batch size (int32), in that order, to build the IdBatch.
// NOTE(review): the `auto request =` line and the line obtaining `msg` from
// the future are missing from this extract.
1160boost::future<flake_id_generator_impl::IdBatch>
1161flake_id_generator_impl::new_id_batch(int32_t size)
1164 protocol::codec::flakeidgenerator_newidbatch_encode(get_name(), size);
1165 return invoke(request).then(
1166 boost::launch::sync, [](boost::future<protocol::ClientMessage> f) {
1168 msg.rd_ptr(protocol::ClientMessage::RESPONSE_HEADER_LEN);
1170 auto base = msg.get<int64_t>();
1171 auto increment = msg.get<int64_t>();
1172 auto batch_size = msg.get<int32_t>();
1173 return flake_id_generator_impl::IdBatch(base, increment, batch_size);
// Constructs the queue proxy for service "hz:impl:queueService" and computes
// partition_id_ once from a serialized std::string; the queue operations in
// this file pass that partition id to their invocations.
// NOTE(review): the argument of to_data is missing from this extract
// (presumably the instance name).
1177IQueueImpl::IQueueImpl(
const std::string& instance_name,
1178 spi::ClientContext* context)
1179 : ProxyImpl(
"hz:impl:queueService", instance_name, context)
1181 serialization::pimpl::data data =
1182 get_context().get_serialization_service().to_data<std::string>(
1184 partition_id_ = get_partition_id(data);
// Delegates deregistration to the client listener service.
// NOTE(review): the argument line of deregister_listener is missing here.
1188IQueueImpl::remove_item_listener(boost::uuids::uuid registration_id)
1190 return get_context().get_client_listener_service().deregister_listener(
// Sends a queue offer with the timeout as a millisecond count; the future
// carries a bool.
// NOTE(review): the first argument lines of the encode call are missing here.
1195IQueueImpl::offer(
const serialization::pimpl::data& element,
1196 std::chrono::milliseconds timeout)
1198 auto request = protocol::codec::queue_offer_encode(
1201 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
1202 return invoke_and_get_future<bool>(request, partition_id_);
// Sends a queue put for `element` (no timeout argument); the result is
// adapted to a void future.
1206IQueueImpl::put(
const serialization::pimpl::data& element)
1208 auto request = protocol::codec::queue_put_encode(get_name(), element);
1209 return to_void_future(invoke_on_partition(request, partition_id_));
// Encodes a queue poll request with the timeout as a millisecond count and
// returns the optional serialized element from invoke_and_get_future on
// partition_id_. The first encode argument is not visible in this excerpt.
1212boost::future<boost::optional<serialization::pimpl::data>>
1213IQueueImpl::poll_data(std::chrono::milliseconds timeout)
1215 auto request = protocol::codec::queue_poll_encode(
1217 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
1218 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1219 request, partition_id_);
// Encodes a queue remaining-capacity request for get_name() and returns the
// int result of invoke_and_get_future on partition_id_.
1223IQueueImpl::remaining_capacity()
1225 auto request = protocol::codec::queue_remainingcapacity_encode(get_name());
1226 return invoke_and_get_future<int>(request, partition_id_);
// Encodes a queue remove request for get_name() and `element` and returns the
// bool result of invoke_and_get_future on partition_id_.
1230IQueueImpl::remove(
const serialization::pimpl::data& element)
1232 auto request = protocol::codec::queue_remove_encode(get_name(), element);
1233 return invoke_and_get_future<bool>(request, partition_id_);
// Encodes a queue contains request for get_name() and `element` and returns
// the bool result of invoke_and_get_future on partition_id_.
1237IQueueImpl::contains(
const serialization::pimpl::data& element)
1239 auto request = protocol::codec::queue_contains_encode(get_name(), element);
1240 return invoke_and_get_future<bool>(request, partition_id_);
// Encodes a drain-to-max-size request for get_name() and returns the drained
// serialized elements from invoke_and_get_future on partition_id_.
// NOTE(review): max_elements is narrowed from size_t to int32_t with a
// C-style cast; values above INT32_MAX would not survive the conversion —
// consider clamping or validating.
1243boost::future<std::vector<serialization::pimpl::data>>
1244IQueueImpl::drain_to_data(
size_t max_elements)
1246 auto request = protocol::codec::queue_draintomaxsize_encode(
1247 get_name(), (int32_t)max_elements);
1249 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
1250 request, partition_id_);
// Overload without a size limit: encodes a queue drain-to request for
// get_name() and returns the serialized elements from invoke_and_get_future
// on partition_id_.
1253boost::future<std::vector<serialization::pimpl::data>>
1254IQueueImpl::drain_to_data()
1256 auto request = protocol::codec::queue_drainto_encode(get_name());
1257 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
1258 request, partition_id_);
// Encodes a queue take request for get_name() and returns the optional
// serialized element from invoke_and_get_future on partition_id_.
1261boost::future<boost::optional<serialization::pimpl::data>>
1262IQueueImpl::take_data()
1264 auto request = protocol::codec::queue_take_encode(get_name());
1265 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1266 request, partition_id_);
// Encodes a queue peek request for get_name() and returns the optional
// serialized element from invoke_and_get_future on partition_id_.
1269boost::future<boost::optional<serialization::pimpl::data>>
1270IQueueImpl::peek_data()
1272 auto request = protocol::codec::queue_peek_encode(get_name());
1273 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1274 request, partition_id_);
// Body fragment; the signature is not visible in this excerpt (presumably
// IQueueImpl::size — TODO confirm). Encodes a queue size request for
// get_name() and returns the int result from partition_id_.
1280 auto request = protocol::codec::queue_size_encode(get_name());
1281 return invoke_and_get_future<int>(request, partition_id_);
// Encodes a queue is-empty request for get_name() and returns the bool result
// of invoke_and_get_future on partition_id_.
1285IQueueImpl::is_empty()
1287 auto request = protocol::codec::queue_isempty_encode(get_name());
1288 return invoke_and_get_future<bool>(request, partition_id_);
// Encodes a queue iterator request for get_name() and returns the serialized
// elements from invoke_and_get_future on partition_id_.
1291boost::future<std::vector<serialization::pimpl::data>>
1292IQueueImpl::to_array_data()
1294 auto request = protocol::codec::queue_iterator_encode(get_name());
1295 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
1296 request, partition_id_);
// Encodes a queue contains-all request for get_name() and `elements` and
// returns the bool result of invoke_and_get_future on partition_id_.
1300IQueueImpl::contains_all_data(
1301 const std::vector<serialization::pimpl::data>& elements)
1304 protocol::codec::queue_containsall_encode(get_name(), elements);
1305 return invoke_and_get_future<bool>(request, partition_id_);
// Encodes a queue add-all request for get_name() and `elements` and returns
// the bool result of invoke_and_get_future on partition_id_.
1309IQueueImpl::add_all_data(
1310 const std::vector<serialization::pimpl::data>& elements)
1312 auto request = protocol::codec::queue_addall_encode(get_name(), elements);
1313 return invoke_and_get_future<bool>(request, partition_id_);
// Encodes a queue compare-and-remove-all request for get_name() and
// `elements` and returns the bool result from partition_id_.
1317IQueueImpl::remove_all_data(
1318 const std::vector<serialization::pimpl::data>& elements)
1321 protocol::codec::queue_compareandremoveall_encode(get_name(), elements);
1322 return invoke_and_get_future<bool>(request, partition_id_);
// Encodes a queue compare-and-retain-all request for get_name() and
// `elements` and returns the bool result from partition_id_.
1326IQueueImpl::retain_all_data(
1327 const std::vector<serialization::pimpl::data>& elements)
1330 protocol::codec::queue_compareandretainall_encode(get_name(), elements);
1331 return invoke_and_get_future<bool>(request, partition_id_);
// Body fragment; the signature is not visible in this excerpt (presumably
// IQueueImpl::clear — TODO confirm). Encodes a queue clear request and adapts
// the partition invocation with to_void_future.
1337 auto request = protocol::codec::queue_clear_encode(get_name());
1338 return to_void_future(invoke_on_partition(request, partition_id_));
// Builds the listener codec for this queue: a QueueListenerMessageCodec
// holding get_name() and include_value, returned through the
// ListenerMessageCodec base pointer.
1341std::shared_ptr<spi::impl::ListenerMessageCodec>
1342IQueueImpl::create_item_listener_codec(
bool include_value)
1344 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
1345 new QueueListenerMessageCodec(get_name(), include_value));
// Stores the queue name (moved into name_) and the include_value flag. The
// parameter list is not visible in this excerpt.
1348IQueueImpl::QueueListenerMessageCodec::QueueListenerMessageCodec(
1351 : name_(std::move(name))
1352 , include_value_(include_value)
// Encodes the add-listener request from the stored name_, include_value_ and
// the caller-supplied local_only flag.
1355protocol::ClientMessage
1356IQueueImpl::QueueListenerMessageCodec::encode_add_request(
bool local_only)
const
1358 return protocol::codec::queue_addlistener_encode(
1359 name_, include_value_, local_only);
// Encodes the remove-listener request from the stored name_ and the given
// registration id.
1362protocol::ClientMessage
1363IQueueImpl::QueueListenerMessageCodec::encode_remove_request(
1364 boost::uuids::uuid real_registration_id)
const
1366 return protocol::codec::queue_removelistener_encode(name_,
1367 real_registration_id);
// Initializes both bases from the same context. Note the argument order
// differs between the two: ClientProxy receives (object_name, service_name),
// while this constructor takes (service_name, object_name).
// `context` is dereferenced unconditionally, so it must not be null.
1370ProxyImpl::ProxyImpl(
const std::string& service_name,
1371 const std::string& object_name,
1372 spi::ClientContext* context)
1373 : ClientProxy(object_name, service_name, *context)
1374 , SerializingProxy(*context, object_name)
// Defaulted destructor, defined out of line.
1377ProxyImpl::~ProxyImpl() =
default;
// Captures the serialization and partition services obtained from `context`,
// plus the object name and the context itself, for later invocations.
1379SerializingProxy::SerializingProxy(spi::ClientContext& context,
1380 const std::string& object_name)
1381 : serialization_service_(context.get_serialization_service())
1382 , partition_service_(context.get_partition_service())
1383 , object_name_(object_name)
1384 , client_context_(context)
// Returns the partition id that the partition service assigns to `key`.
1388SerializingProxy::get_partition_id(
const serialization::pimpl::data& key)
1390 return partition_service_.get_partition_id(key);
// Creates a ClientInvocation from `request`, which is moved into a shared
// ClientMessage (so `request` is moved-from afterwards). The remaining
// create() arguments and the invoke call are not visible in this excerpt.
// An iexception is caught and handed to exception_util::rethrow.
1393boost::future<protocol::ClientMessage>
1394SerializingProxy::invoke_on_partition(protocol::ClientMessage& request,
1398 return spi::impl::ClientInvocation::create(
1400 std::make_shared<protocol::ClientMessage>(std::move(request)),
1404 }
catch (exception::iexception&) {
1405 util::exception_util::rethrow(std::current_exception());
// Presumably unreachable and present only so every path returns a value —
// TODO confirm that rethrow never returns.
1406 return boost::make_ready_future(protocol::ClientMessage(0));
// Creates a ClientInvocation from `request`, which is moved into a shared
// ClientMessage (so `request` is moved-from afterwards). The remaining
// create() arguments and the invoke call are not visible in this excerpt.
// An iexception is caught and handed to exception_util::rethrow.
1410boost::future<protocol::ClientMessage>
1411SerializingProxy::invoke(protocol::ClientMessage& request)
1414 return spi::impl::ClientInvocation::create(
1416 std::make_shared<protocol::ClientMessage>(std::move(request)),
1419 }
catch (exception::iexception&) {
1420 util::exception_util::rethrow(std::current_exception());
// Presumably unreachable and present only so every path returns a value —
// TODO confirm that rethrow never returns.
1421 return boost::make_ready_future(protocol::ClientMessage(0));
// Creates a ClientInvocation from `request`, which is moved into a shared
// ClientMessage (so `request` is moved-from afterwards). How `connection` is
// passed to create() is not visible in this excerpt.
// An iexception is caught and handed to exception_util::rethrow.
1425boost::future<protocol::ClientMessage>
1426SerializingProxy::invoke_on_connection(
1427 protocol::ClientMessage& request,
1428 std::shared_ptr<connection::Connection> connection)
1431 return spi::impl::ClientInvocation::create(
1433 std::make_shared<protocol::ClientMessage>(std::move(request)),
1437 }
catch (exception::iexception&) {
1438 util::exception_util::rethrow(std::current_exception());
// Presumably unreachable and present only so every path returns a value —
// TODO confirm that rethrow never returns.
1439 return boost::make_ready_future(protocol::ClientMessage(0));
// Resolves the partition of key_data with get_partition_id and forwards the
// request to invoke_on_partition. An iexception is caught and handed to
// exception_util::rethrow.
1443boost::future<protocol::ClientMessage>
1444SerializingProxy::invoke_on_key_owner(
1445 protocol::ClientMessage& request,
1446 const serialization::pimpl::data& key_data)
1449 return invoke_on_partition(request, get_partition_id(key_data));
1450 }
catch (exception::iexception&) {
1451 util::exception_util::rethrow(std::current_exception());
// Presumably unreachable and present only so every path returns a value —
// TODO confirm that rethrow never returns.
1452 return boost::make_ready_future(protocol::ClientMessage(0));
// Creates a ClientInvocation from `request` (moved into a shared
// ClientMessage) and returns invocation->invoke(). How `uuid` is passed to
// create() is not visible in this excerpt. An iexception is caught and handed
// to exception_util::rethrow.
1456boost::future<protocol::ClientMessage>
1457SerializingProxy::invoke_on_member(protocol::ClientMessage& request,
1458 boost::uuids::uuid uuid)
1461 auto invocation = spi::impl::ClientInvocation::create(
1463 std::make_shared<protocol::ClientMessage>(std::move(request)),
1466 return invocation->invoke();
1467 }
catch (exception::iexception&) {
1468 util::exception_util::rethrow(std::current_exception());
// Presumably unreachable and present only so every path returns a value —
// TODO confirm that rethrow never returns.
1469 return boost::make_ready_future(protocol::ClientMessage(0));
// Overload taking only a request: returns the response decoded by
// decode_optional_var_sized as optional serialized data. The invocation
// passed to the decoder is not visible in this excerpt.
1474boost::future<boost::optional<serialization::pimpl::data>>
1475SerializingProxy::invoke_and_get_future(protocol::ClientMessage& request)
1477 return decode_optional_var_sized<serialization::pimpl::data>(
// Overload for entry views: invokes the request on the owner of `key` and
// decodes the response with decode_optional_var_sized as an optional
// data_entry_view.
1482boost::future<boost::optional<map::data_entry_view>>
1483SerializingProxy::invoke_and_get_future(protocol::ClientMessage& request,
1484 const serialization::pimpl::data& key)
1486 return decode_optional_var_sized<map::data_entry_view>(
1487 invoke_on_key_owner(request, key));
// Overload targeting a partition: invokes the request on partition_id and
// decodes the response as optional serialized data. The second parameter's
// declaration is not visible in this excerpt.
1491boost::future<boost::optional<serialization::pimpl::data>>
1492SerializingProxy::invoke_and_get_future(protocol::ClientMessage& request,
1495 return decode_optional_var_sized<serialization::pimpl::data>(
1496 invoke_on_partition(request, partition_id));
// Overload targeting a key: invokes the request on the owner of `key` and
// decodes the response as optional serialized data.
1500boost::future<boost::optional<serialization::pimpl::data>>
1501SerializingProxy::invoke_and_get_future(protocol::ClientMessage& request,
1502 const serialization::pimpl::data& key)
1504 return decode_optional_var_sized<serialization::pimpl::data>(
1505 invoke_on_key_owner(request, key));
// Forwards service name, object name and context to ProxyImpl. Any further
// member initializers are not visible in this excerpt.
1508PartitionSpecificClientProxy::PartitionSpecificClientProxy(
1509 const std::string& service_name,
1510 const std::string& object_name,
1511 spi::ClientContext* context)
1512 : ProxyImpl(service_name, object_name, context)
// Derives the partition key from name_ with StringPartitioningStrategy,
// serializes it, and caches the resulting partition id in partition_id_.
1517PartitionSpecificClientProxy::on_initialize()
1519 std::string partitionKey = internal::partition::strategy::
1520 StringPartitioningStrategy::get_partition_key(name_);
1521 partition_id_ = get_context().get_partition_service().get_partition_id(
1522 to_data<std::string>(partitionKey));
// Constructs the map proxy under service name "hz:impl:mapService".
1525IMapImpl::IMapImpl(
const std::string& instance_name,
1526 spi::ClientContext* context)
1527 : ProxyImpl(
"hz:impl:mapService", instance_name, context)
// Encodes a map contains-key request with get_name(), `key` and the current
// thread id, and returns the bool result invoked against `key`.
1531IMapImpl::contains_key(
const serialization::pimpl::data& key)
1533 auto request = protocol::codec::map_containskey_encode(
1534 get_name(), key, util::get_current_thread_id());
1535 return invoke_and_get_future<bool>(request, key);
// Encodes a map contains-value request for get_name() and `value`; the
// request is invoked without a key or partition target.
1539IMapImpl::contains_value(
const serialization::pimpl::data& value)
1541 auto request = protocol::codec::map_containsvalue_encode(get_name(), value);
1542 return invoke_and_get_future<bool>(request);
// Encodes a map get request with get_name(), `key` and the current thread id
// and returns the optional serialized value. The arguments of
// invoke_and_get_future are not visible in this excerpt.
1545boost::future<boost::optional<serialization::pimpl::data>>
1546IMapImpl::get_data(
const serialization::pimpl::data& key)
1548 auto request = protocol::codec::map_get_encode(
1549 get_name(), key, util::get_current_thread_id());
1550 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
// Encodes a map remove request with get_name(), `key` and the current thread
// id and returns the optional serialized result. The arguments of
// invoke_and_get_future are not visible in this excerpt.
1554boost::future<boost::optional<serialization::pimpl::data>>
1555IMapImpl::remove_data(
const serialization::pimpl::data& key)
1557 auto request = protocol::codec::map_remove_encode(
1558 get_name(), key, util::get_current_thread_id());
1559 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
// Encodes a map remove-if-same request with get_name(), `key`, `value` and
// the current thread id, and returns the bool result invoked against `key`.
1564IMapImpl::remove(
const serialization::pimpl::data& key,
1565 const serialization::pimpl::data& value)
1567 auto request = protocol::codec::map_removeifsame_encode(
1568 get_name(), key, value, util::get_current_thread_id());
1569 return invoke_and_get_future<bool>(request, key);
// Encodes a map remove-all request for get_name() and the serialized
// predicate, and returns the raw response future from invoke().
1572boost::future<protocol::ClientMessage>
1573IMapImpl::remove_all(
const serialization::pimpl::data& predicate_data)
1576 protocol::codec::map_removeall_encode(get_name(), predicate_data);
1577 return invoke(request);
// Encodes a map delete request with get_name(), `key` and the current thread
// id, and invokes it on the partition of `key`, returning the raw response.
1580boost::future<protocol::ClientMessage>
1581IMapImpl::delete_entry(
const serialization::pimpl::data& key)
1583 auto request = protocol::codec::map_delete_encode(
1584 get_name(), key, util::get_current_thread_id());
1585 return invoke_on_partition(request, get_partition_id(key));
// The function-name line is not visible in this excerpt (presumably
// IMapImpl::flush — TODO confirm). Encodes a map flush request for get_name()
// and returns the raw response future from invoke().
1588boost::future<protocol::ClientMessage>
1591 auto request = protocol::codec::map_flush_encode(get_name());
1592 return invoke(request);
// Encodes a map try-remove request carrying the current thread id and the
// timeout as a millisecond count, and returns the bool result invoked against
// `key`. The leading encode arguments are not visible in this excerpt.
1596IMapImpl::try_remove(
const serialization::pimpl::data& key,
1597 std::chrono::milliseconds timeout)
1599 auto request = protocol::codec::map_tryremove_encode(
1602 util::get_current_thread_id(),
1603 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
1605 return invoke_and_get_future<bool>(request, key);
// Encodes a map try-put request carrying the current thread id and the
// timeout as a millisecond count, and returns the bool result invoked against
// `key`. The leading encode arguments are not visible in this excerpt.
1609IMapImpl::try_put(
const serialization::pimpl::data& key,
1610 const serialization::pimpl::data& value,
1611 std::chrono::milliseconds timeout)
1613 auto request = protocol::codec::map_tryput_encode(
1617 util::get_current_thread_id(),
1618 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
1620 return invoke_and_get_future<bool>(request, key);
// Encodes a map put request carrying the current thread id and the ttl as a
// millisecond count, and returns the optional serialized result. The leading
// encode arguments and the invoke arguments are not visible in this excerpt.
1623boost::future<boost::optional<serialization::pimpl::data>>
1624IMapImpl::put_data(
const serialization::pimpl::data& key,
1625 const serialization::pimpl::data& value,
1626 std::chrono::milliseconds ttl)
1628 auto request = protocol::codec::map_put_encode(
1632 util::get_current_thread_id(),
1633 std::chrono::duration_cast<std::chrono::milliseconds>(ttl).count());
1634 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
// Encodes a map put-transient request carrying the current thread id and the
// ttl as a millisecond count, and invokes it on the partition of `key`. The
// leading encode arguments are not visible in this excerpt.
1638boost::future<protocol::ClientMessage>
1639IMapImpl::put_transient(
const serialization::pimpl::data& key,
1640 const serialization::pimpl::data& value,
1641 std::chrono::milliseconds ttl)
1643 auto request = protocol::codec::map_puttransient_encode(
1647 util::get_current_thread_id(),
1648 std::chrono::duration_cast<std::chrono::milliseconds>(ttl).count());
1649 return invoke_on_partition(request, get_partition_id(key));
// Encodes a map put-if-absent request carrying the current thread id and the
// ttl as a millisecond count, and returns the optional serialized result. The
// leading encode arguments and the invoke arguments are not visible here.
1652boost::future<boost::optional<serialization::pimpl::data>>
1653IMapImpl::put_if_absent_data(
const serialization::pimpl::data& key,
1654 const serialization::pimpl::data& value,
1655 std::chrono::milliseconds ttl)
1657 auto request = protocol::codec::map_putifabsent_encode(
1661 util::get_current_thread_id(),
1662 std::chrono::duration_cast<std::chrono::milliseconds>(ttl).count());
1663 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
// Encodes a map replace-if-same request with get_name(), `key`, old_value,
// new_value and the current thread id, and returns the bool result invoked
// against `key`.
1668IMapImpl::replace(
const serialization::pimpl::data& key,
1669 const serialization::pimpl::data& old_value,
1670 const serialization::pimpl::data& new_value)
1672 auto request = protocol::codec::map_replaceifsame_encode(
1673 get_name(), key, old_value, new_value, util::get_current_thread_id());
1675 return invoke_and_get_future<bool>(request, key);
// Encodes a map replace request with get_name(), `key`, `value` and the
// current thread id and returns the optional serialized result. The invoke
// arguments are not visible in this excerpt.
1678boost::future<boost::optional<serialization::pimpl::data>>
1679IMapImpl::replace_data(
const serialization::pimpl::data& key,
1680 const serialization::pimpl::data& value)
1682 auto request = protocol::codec::map_replace_encode(
1683 get_name(), key, value, util::get_current_thread_id());
1685 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
// Encodes a map set request carrying the current thread id and the ttl as a
// millisecond count, and invokes it on the partition of `key`. The leading
// encode arguments are not visible in this excerpt.
1689boost::future<protocol::ClientMessage>
1690IMapImpl::set(
const serialization::pimpl::data& key,
1691 const serialization::pimpl::data& value,
1692 std::chrono::milliseconds ttl)
1694 auto request = protocol::codec::map_set_encode(
1698 util::get_current_thread_id(),
1699 std::chrono::duration_cast<std::chrono::milliseconds>(ttl).count());
1700 return invoke_on_partition(request, get_partition_id(key));
// Convenience overload: delegates to lock(key, lease_time) with a lease time
// of -1 ms (presumably meaning "no lease expiry" — TODO confirm).
1703boost::future<protocol::ClientMessage>
1704IMapImpl::lock(
const serialization::pimpl::data& key)
1706 return lock(key, std::chrono::milliseconds(-1));
// Encodes a map lock request carrying the current thread id, the lease time
// as a millisecond count and a fresh lock reference id, and invokes it on the
// partition of `key`. The leading encode arguments are not visible here.
1709boost::future<protocol::ClientMessage>
1710IMapImpl::lock(
const serialization::pimpl::data& key,
1711 std::chrono::milliseconds lease_time)
1713 auto request = protocol::codec::map_lock_encode(
1716 util::get_current_thread_id(),
1717 std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
1718 lock_reference_id_generator_->get_next_reference_id());
1719 return invoke_on_partition(request, get_partition_id(key));
// Encodes a map is-locked request for get_name() and `key` and returns the
// bool result invoked against `key`.
1723IMapImpl::is_locked(
const serialization::pimpl::data& key)
1725 auto request = protocol::codec::map_islocked_encode(get_name(), key);
1727 return invoke_and_get_future<bool>(request, key);
// Convenience overload: delegates to the three-argument try_lock with a lease
// time of -1 ms (presumably meaning "no lease expiry" — TODO confirm).
1731IMapImpl::try_lock(
const serialization::pimpl::data& key,
1732 std::chrono::milliseconds timeout)
1734 return try_lock(key, timeout, std::chrono::milliseconds(-1));
// Encodes a map try-lock request. Note the encode order: lease_time is passed
// before timeout, the reverse of this function's parameter order. A fresh lock
// reference id is attached and the bool result is invoked against `key`. The
// leading encode arguments are not visible in this excerpt.
1738IMapImpl::try_lock(
const serialization::pimpl::data& key,
1739 std::chrono::milliseconds timeout,
1740 std::chrono::milliseconds lease_time)
1742 auto request = protocol::codec::map_trylock_encode(
1745 util::get_current_thread_id(),
1746 std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
1747 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count(),
1748 lock_reference_id_generator_->get_next_reference_id());
1749 return invoke_and_get_future<bool>(request, key);
// Encodes a map unlock request carrying the current thread id and a fresh
// lock reference id, and invokes it on the partition of `key`. The leading
// encode arguments are not visible in this excerpt.
1752boost::future<protocol::ClientMessage>
1753IMapImpl::unlock(
const serialization::pimpl::data& key)
1755 auto request = protocol::codec::map_unlock_encode(
1758 util::get_current_thread_id(),
1759 lock_reference_id_generator_->get_next_reference_id());
1760 return invoke_on_partition(request, get_partition_id(key));
// Encodes a map force-unlock request with get_name(), `key` and a fresh lock
// reference id (no thread id is passed, unlike unlock), and invokes it on the
// partition of `key`.
1763boost::future<protocol::ClientMessage>
1764IMapImpl::force_unlock(
const serialization::pimpl::data& key)
1766 auto request = protocol::codec::map_forceunlock_encode(
1767 get_name(), key, lock_reference_id_generator_->get_next_reference_id());
1768 return invoke_on_partition(request, get_partition_id(key));
// Encodes a map add-interceptor request for get_name() and the serialized
// interceptor, and returns the std::string result of the invocation.
1771boost::future<std::string>
1772IMapImpl::add_interceptor(
const serialization::pimpl::data& interceptor)
1775 protocol::codec::map_addinterceptor_encode(get_name(), interceptor);
1776 return invoke_and_get_future<std::string>(request);
// Registers an entry listener for the whole map: builds the codec from
// include_value and listener_flags and hands it to register_listener together
// with the (moved) event handler. The include_value parameter declaration is
// not visible in this excerpt.
1781boost::future<boost::uuids::uuid>
1782IMapImpl::add_entry_listener(
1783 std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
1785 int32_t listener_flags)
1787 return register_listener(
1788 create_map_entry_listener_codec(include_value, listener_flags),
1789 std::move(entry_event_handler));
// Registers an entry listener filtered by a predicate: the serialized
// predicate is moved into the codec, which is handed to register_listener
// together with the (moved) event handler. The include_value parameter
// declaration is not visible in this excerpt.
1792boost::future<boost::uuids::uuid>
1793IMapImpl::add_entry_listener(
1794 std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
1795 serialization::pimpl::data&& predicate,
1797 int32_t listener_flags)
1799 return register_listener(
1800 create_map_entry_listener_codec(
1801 include_value, std::move(predicate), listener_flags),
1802 std::move(entry_event_handler));
// Deregisters an entry listener through the client listener service. The
// argument list of deregister_listener is not visible in this excerpt
// (presumably registration_id — TODO confirm).
1806IMapImpl::remove_entry_listener(boost::uuids::uuid registration_id)
1808 return get_context().get_client_listener_service().deregister_listener(
// Registers an entry listener bound to a single key: the serialized key is
// moved into the codec, which is handed to register_listener together with
// the (moved) event handler. The include_value parameter declaration is not
// visible in this excerpt.
1812boost::future<boost::uuids::uuid>
1813IMapImpl::add_entry_listener(
1814 std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
1816 serialization::pimpl::data&& key,
1817 int32_t listener_flags)
1819 return register_listener(create_map_entry_listener_codec(
1820 include_value, listener_flags, std::move(key)),
1821 std::move(entry_event_handler));
// Encodes a map get-entry-view request with get_name(), `key` and the current
// thread id and returns the optional data_entry_view. The trailing invoke
// argument is not visible in this excerpt.
1824boost::future<boost::optional<map::data_entry_view>>
1825IMapImpl::get_entry_view_data(
const serialization::pimpl::data& key)
1827 auto request = protocol::codec::map_getentryview_encode(
1828 get_name(), key, util::get_current_thread_id());
1829 return invoke_and_get_future<boost::optional<map::data_entry_view>>(request,
// Encodes a map evict request with get_name(), `key` and the current thread
// id, and returns the bool result invoked against `key`.
1834IMapImpl::evict(
const serialization::pimpl::data& key)
1836 auto request = protocol::codec::map_evict_encode(
1837 get_name(), key, util::get_current_thread_id());
1838 return invoke_and_get_future<bool>(request, key);
// Encodes a map evict-all request for get_name() and returns the raw response
// future from invoke().
1841boost::future<protocol::ClientMessage>
1842IMapImpl::evict_all()
1844 auto request = protocol::codec::map_evictall_encode(get_name());
1845 return invoke(request);
// Encodes a map get-all request for get_name() and `keys` and invokes it on
// the given partition, returning the entries.
// NOTE(review): presumably every key in `keys` must belong to partition_id —
// verify against callers.
1848boost::future<EntryVector>
1849IMapImpl::get_all_data(
int partition_id,
1850 const std::vector<serialization::pimpl::data>& keys)
1852 auto request = protocol::codec::map_getall_encode(get_name(), keys);
1853 return invoke_and_get_future<EntryVector>(request, partition_id);
// Encodes a map key-set request for get_name() and returns the serialized
// keys. The arguments of invoke_and_get_future are not visible in this
// excerpt.
1856boost::future<std::vector<serialization::pimpl::data>>
1857IMapImpl::key_set_data()
1859 auto request = protocol::codec::map_keyset_encode(get_name());
1860 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
// Predicate overload: encodes a key-set-with-predicate request for get_name()
// and the serialized predicate, returning the serialized keys. The arguments
// of invoke_and_get_future are not visible in this excerpt.
1864boost::future<std::vector<serialization::pimpl::data>>
1865IMapImpl::key_set_data(
const serialization::pimpl::data& predicate)
1868 protocol::codec::map_keysetwithpredicate_encode(get_name(), predicate);
1869 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
// Encodes a key-set-with-paging-predicate request for get_name() and the
// predicate holder, invokes it, and converts the response in a synchronous
// continuation via get_paging_predicate_response into the key vector paired
// with an anchor_data_list. The opening of the return type is not visible in
// this excerpt.
1874 std::pair<std::vector<serialization::pimpl::data>, query::anchor_data_list>>
1875IMapImpl::key_set_for_paging_predicate_data(
1876 protocol::codec::holder::paging_predicate_holder
const& predicate)
1878 auto request = protocol::codec::map_keysetwithpagingpredicate_encode(
1879 get_name(), predicate);
1880 return invoke(request).then(
1881 boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
1882 return get_paging_predicate_response<
1883 std::vector<serialization::pimpl::data>>(std::move(f));
// Encodes a map entry-set request for get_name() and returns the entries; the
// request is invoked without a key or partition target.
1887boost::future<EntryVector>
1888IMapImpl::entry_set_data()
1890 auto request = protocol::codec::map_entryset_encode(get_name());
1891 return invoke_and_get_future<EntryVector>(request);
// Predicate overload: encodes an entries-with-predicate request for
// get_name() and the serialized predicate, and returns the matching entries.
1894boost::future<EntryVector>
1895IMapImpl::entry_set_data(
const serialization::pimpl::data& predicate)
1898 protocol::codec::map_entrieswithpredicate_encode(get_name(), predicate);
1899 return invoke_and_get_future<EntryVector>(request);
// Encodes an entries-with-paging-predicate request for get_name() and the
// predicate holder, invokes it, and converts the response in a synchronous
// continuation via get_paging_predicate_response<EntryVector>.
1902boost::future<std::pair<EntryVector, query::anchor_data_list>>
1903IMapImpl::entry_set_for_paging_predicate_data(
1904 protocol::codec::holder::paging_predicate_holder
const& predicate)
1906 auto request = protocol::codec::map_entrieswithpagingpredicate_encode(
1907 get_name(), predicate);
1908 return invoke(request).then(
1909 boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
1910 return get_paging_predicate_response<EntryVector>(std::move(f));
// Encodes a map values request for get_name() and returns the serialized
// values. The arguments of invoke_and_get_future are not visible in this
// excerpt.
1914boost::future<std::vector<serialization::pimpl::data>>
1915IMapImpl::values_data()
1917 auto request = protocol::codec::map_values_encode(get_name());
1918 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
// Predicate overload: encodes a values-with-predicate request for get_name()
// and the serialized predicate, returning the serialized values. The
// arguments of invoke_and_get_future are not visible in this excerpt.
1922boost::future<std::vector<serialization::pimpl::data>>
1923IMapImpl::values_data(
const serialization::pimpl::data& predicate)
1926 protocol::codec::map_valueswithpredicate_encode(get_name(), predicate);
1927 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
// Encodes a values-with-paging-predicate request for get_name() and the
// predicate holder, invokes it, and converts the response in a synchronous
// continuation via get_paging_predicate_response into the value vector paired
// with an anchor_data_list. The opening of the return type is not visible in
// this excerpt.
1932 std::pair<std::vector<serialization::pimpl::data>, query::anchor_data_list>>
1933IMapImpl::values_for_paging_predicate_data(
1934 protocol::codec::holder::paging_predicate_holder
const& predicate)
1936 auto request = protocol::codec::map_valueswithpagingpredicate_encode(
1937 get_name(), predicate);
1938 return invoke(request).then(
1939 boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
1940 return get_paging_predicate_response<
1941 std::vector<serialization::pimpl::data>>(std::move(f));
// Encodes a map add-index request for get_name() and the index config, and
// returns the raw response future from invoke().
1945boost::future<protocol::ClientMessage>
1946IMapImpl::add_index_data(
const config::index_config& config)
1948 auto request = protocol::codec::map_addindex_encode(get_name(), config);
1949 return invoke(request);
// Body fragment; the signature is not visible in this excerpt (presumably
// IMapImpl::size — TODO confirm). Encodes a map size request for get_name()
// and returns the int result.
1955 auto request = protocol::codec::map_size_encode(get_name());
1956 return invoke_and_get_future<int>(request);
// Body fragment; the signature is not visible in this excerpt (presumably
// IMapImpl::is_empty — TODO confirm). Encodes a map is-empty request for
// get_name() and returns the bool result.
1962 auto request = protocol::codec::map_isempty_encode(get_name());
1963 return invoke_and_get_future<bool>(request);
// Encodes a map put-all request for get_name() and `entries` and invokes it
// on the given partition.
// NOTE(review): the meaning of the hard-coded `true` third encode argument is
// not shown here — confirm against the codec definition.
1966boost::future<protocol::ClientMessage>
1967IMapImpl::put_all_data(
int partition_id,
const EntryVector& entries)
1970 protocol::codec::map_putall_encode(get_name(), entries,
true);
1971 return invoke_on_partition(request, partition_id);
// Encodes a map clear request for get_name() and returns the raw response
// future from invoke().
1974boost::future<protocol::ClientMessage>
1975IMapImpl::clear_data()
1977 auto request = protocol::codec::map_clear_encode(get_name());
1978 return invoke(request);
// Encodes a map execute-on-key request. Note the encode order is (name,
// processor, key, thread id), i.e. processor before key. The request is
// invoked on the partition of `key` and yields optional serialized data.
1981boost::future<boost::optional<serialization::pimpl::data>>
1982IMapImpl::execute_on_key_data(
const serialization::pimpl::data& key,
1983 const serialization::pimpl::data& processor)
1985 auto request = protocol::codec::map_executeonkey_encode(
1986 get_name(), processor, key, util::get_current_thread_id());
1987 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1988 request, get_partition_id(key));
// Encodes a map submit-to-key request (processor before key, plus the current
// thread id), invokes it on the partition of `key`, and in a synchronous
// continuation reads a nullable serialized value from the response. The lines
// that obtain `msg` from the future are not visible in this excerpt.
1991boost::future<boost::optional<serialization::pimpl::data>>
1992IMapImpl::submit_to_key_data(
const serialization::pimpl::data& key,
1993 const serialization::pimpl::data& processor)
1995 auto request = protocol::codec::map_submittokey_encode(
1996 get_name(), processor, key, util::get_current_thread_id());
1997 return invoke_on_partition(request, get_partition_id(key))
1998 .then(boost::launch::sync, [](boost::future<protocol::ClientMessage> f) {
2001 return msg.get_nullable<serialization::pimpl::data>();
// Encodes a map execute-on-keys request with get_name(), the serialized
// processor and `keys` (processor before keys), and returns the resulting
// entries.
2005boost::future<EntryVector>
2006IMapImpl::execute_on_keys_data(
2007 const std::vector<serialization::pimpl::data>& keys,
2008 const serialization::pimpl::data& processor)
2011 protocol::codec::map_executeonkeys_encode(get_name(), processor, keys);
2012 return invoke_and_get_future<EntryVector>(request);
// Encodes a map remove-interceptor request for get_name() and the interceptor
// id, and returns the raw response future from invoke().
2015boost::future<protocol::ClientMessage>
2016IMapImpl::remove_interceptor(
const std::string&
id)
2019 protocol::codec::map_removeinterceptor_encode(get_name(),
id);
2020 return invoke(request);
// Encodes a map execute-on-all-keys request for get_name() and the serialized
// entry processor, and returns the resulting entries.
2023boost::future<EntryVector>
2024IMapImpl::execute_on_entries_data(
2025 const serialization::pimpl::data& entry_processor)
2028 protocol::codec::map_executeonallkeys_encode(get_name(), entry_processor);
2029 return invoke_and_get_future<EntryVector>(request);
// Predicate overload: encodes a map execute-with-predicate request for
// get_name(), the serialized entry processor and the serialized predicate,
// and returns the resulting entries.
2032boost::future<EntryVector>
2033IMapImpl::execute_on_entries_data(
2034 const serialization::pimpl::data& entry_processor,
2035 const serialization::pimpl::data& predicate)
2037 auto request = protocol::codec::map_executewithpredicate_encode(
2038 get_name(), entry_processor, predicate);
2039 return invoke_and_get_future<EntryVector>(request);
// Builds a predicate-filtered listener codec holding get_name(),
// include_value, listener_flags and the moved predicate, returned through the
// ListenerMessageCodec base pointer. The include_value parameter declaration
// is not visible in this excerpt.
2042std::shared_ptr<spi::impl::ListenerMessageCodec>
2043IMapImpl::create_map_entry_listener_codec(
2045 serialization::pimpl::data&& predicate,
2046 int32_t listener_flags)
2048 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
2049 new MapEntryListenerWithPredicateMessageCodec(
2050 get_name(), include_value, listener_flags, std::move(predicate)));
// Builds the whole-map listener codec holding get_name(), include_value and
// listener_flags, returned through the ListenerMessageCodec base pointer.
2053std::shared_ptr<spi::impl::ListenerMessageCodec>
2054IMapImpl::create_map_entry_listener_codec(
bool include_value,
2055 int32_t listener_flags)
2057 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
2058 new MapEntryListenerMessageCodec(
2059 get_name(), include_value, listener_flags));
// Builds a key-bound listener codec holding get_name(), include_value,
// listener_flags and the moved key, returned through the ListenerMessageCodec
// base pointer.
2062std::shared_ptr<spi::impl::ListenerMessageCodec>
2063IMapImpl::create_map_entry_listener_codec(
bool include_value,
2064 int32_t listener_flags,
2065 serialization::pimpl::data&& key)
2067 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
2068 new MapEntryListenerToKeyCodec(
2069 get_name(), include_value, listener_flags, std::move(key)));
// Runs the base ProxyImpl initialization, then caches the context's lock
// reference id generator, which the lock/unlock methods dereference.
2073IMapImpl::on_initialize()
2075 ProxyImpl::on_initialize();
2076 lock_reference_id_generator_ =
2077 get_context().get_lock_reference_id_generator();
// Stores the map name (moved), the include_value flag and the listener flags.
// The leading parameters are not visible in this excerpt.
2080IMapImpl::MapEntryListenerMessageCodec::MapEntryListenerMessageCodec(
2083 int32_t listener_flags)
2084 : name_(std::move(name))
2085 , include_value_(include_value)
2086 , listener_flags_(listener_flags)
// Encodes the add-entry-listener request from name_, include_value_, the
// listener flags (cast to int32_t) and the caller-supplied local_only flag.
2089protocol::ClientMessage
2090IMapImpl::MapEntryListenerMessageCodec::encode_add_request(
2091 bool local_only)
const
2093 return protocol::codec::map_addentrylistener_encode(
2094 name_, include_value_,
static_cast<int32_t
>(listener_flags_), local_only);
// Encodes the remove-entry-listener request from name_ and the given
// registration id.
2097protocol::ClientMessage
2098IMapImpl::MapEntryListenerMessageCodec::encode_remove_request(
2099 boost::uuids::uuid real_registration_id)
const
2101 return protocol::codec::map_removeentrylistener_encode(
2102 name_, real_registration_id);
// Encodes the add-entry-listener-to-key request; the listener flags are cast
// to int32_t. The other encode arguments are not visible in this excerpt.
2105protocol::ClientMessage
2106IMapImpl::MapEntryListenerToKeyCodec::encode_add_request(
bool local_only)
const
2108 return protocol::codec::map_addentrylistenertokey_encode(
2112 static_cast<int32_t
>(listener_flags_),
// Encodes the remove-entry-listener request from name_ and the given
// registration id (the same remove codec the other map listener codecs use).
2116protocol::ClientMessage
2117IMapImpl::MapEntryListenerToKeyCodec::encode_remove_request(
2118 boost::uuids::uuid real_registration_id)
const
2120 return protocol::codec::map_removeentrylistener_encode(
2121 name_, real_registration_id);
// Stores the map name (moved), include_value flag, listener flags and the key
// (taken by value and moved into key_). The leading parameters are not
// visible in this excerpt.
2124IMapImpl::MapEntryListenerToKeyCodec::MapEntryListenerToKeyCodec(
2127 int32_t listener_flags,
2128 serialization::pimpl::data key)
2129 : name_(std::move(name))
2130 , include_value_(include_value)
2131 , listener_flags_(listener_flags)
2132 , key_(std::move(key))
// Stores the map name (moved), include_value flag, listener flags and the
// predicate (moved from the rvalue parameter). The leading parameters are not
// visible in this excerpt.
2135IMapImpl::MapEntryListenerWithPredicateMessageCodec::
2136 MapEntryListenerWithPredicateMessageCodec(
2139 int32_t listener_flags,
2140 serialization::pimpl::data&& predicate)
2141 : name_(std::move(name))
2142 , include_value_(include_value)
2143 , listener_flags_(listener_flags)
2144 , predicate_(std::move(predicate))
// Encodes the add-entry-listener-with-predicate request; the listener flags
// are cast to int32_t. The other encode arguments are not visible in this
// excerpt.
2147protocol::ClientMessage
2148IMapImpl::MapEntryListenerWithPredicateMessageCodec::encode_add_request(
2149 bool local_only)
const
2151 return protocol::codec::map_addentrylistenerwithpredicate_encode(
2155 static_cast<int32_t
>(listener_flags_),
// Encodes the remove-entry-listener request from name_ and the given
// registration id.
2159protocol::ClientMessage
2160IMapImpl::MapEntryListenerWithPredicateMessageCodec::encode_remove_request(
2161 boost::uuids::uuid real_registration_id)
const
2163 return protocol::codec::map_removeentrylistener_encode(
2164 name_, real_registration_id);
// Constructs the transactional queue as a TransactionalObject registered
// under iqueue::SERVICE_NAME with the given name and transaction proxy.
2167TransactionalQueueImpl::TransactionalQueueImpl(
2168 const std::string& name,
2169 txn::TransactionProxy& transaction_proxy)
2170 : TransactionalObject(iqueue::SERVICE_NAME, name, transaction_proxy)
// Encodes a transactional queue offer request carrying the transaction id,
// the current thread id and the timeout as a millisecond count, and returns
// the bool result. Some encode arguments are not visible in this excerpt.
2174TransactionalQueueImpl::offer(
const serialization::pimpl::data& e,
2175 std::chrono::milliseconds timeout)
2177 auto request = protocol::codec::transactionalqueue_offer_encode(
2179 get_transaction_id(),
2180 util::get_current_thread_id(),
2182 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
2184 return invoke_and_get_future<bool>(request);
// Encodes a transactional queue poll request carrying the transaction id, the
// current thread id and the timeout as a millisecond count, and returns the
// optional serialized element. The first encode argument and the invoke
// arguments are not visible in this excerpt.
2187boost::future<boost::optional<serialization::pimpl::data>>
2188TransactionalQueueImpl::poll_data(std::chrono::milliseconds timeout)
2190 auto request = protocol::codec::transactionalqueue_poll_encode(
2192 get_transaction_id(),
2193 util::get_current_thread_id(),
2194 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
2196 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
// Encodes a transactional queue size request with get_name(), the transaction
// id and the current thread id, and returns the int result.
2201TransactionalQueueImpl::size()
2203 auto request = protocol::codec::transactionalqueue_size_encode(
2204 get_name(), get_transaction_id(), util::get_current_thread_id());
2206 return invoke_and_get_future<int>(request);
// Constructs the set proxy under iset::SERVICE_NAME and caches partition_id_
// from a serialized std::string. The argument passed to to_data is not
// visible in this excerpt.
2209ISetImpl::ISetImpl(
const std::string& instance_name,
2210 spi::ClientContext* client_context)
2211 : ProxyImpl(iset::SERVICE_NAME, instance_name, client_context)
2213 serialization::pimpl::data key_data =
2214 get_context().get_serialization_service().to_data<std::string>(
2216 partition_id_ = get_partition_id(key_data);
// Deregisters an item listener through the client listener service. The
// argument list of deregister_listener is not visible in this excerpt
// (presumably registration_id — TODO confirm).
2220ISetImpl::remove_item_listener(boost::uuids::uuid registration_id)
2222 return get_context().get_client_listener_service().deregister_listener(
// Body fragment; the signature is not visible in this excerpt (presumably
// ISetImpl::size — TODO confirm). Encodes a set size request for get_name()
// and returns the int result from partition_id_.
2229 auto request = protocol::codec::set_size_encode(get_name());
2230 return invoke_and_get_future<int>(request, partition_id_);
// Body fragment; the signature is not visible in this excerpt (presumably
// ISetImpl::is_empty — TODO confirm). Encodes a set is-empty request for
// get_name() and returns the bool result from partition_id_.
2236 auto request = protocol::codec::set_isempty_encode(get_name());
2237 return invoke_and_get_future<bool>(request, partition_id_);
// Encodes a set contains request for get_name() and `element` and returns the
// bool result of invoke_and_get_future on partition_id_.
2241ISetImpl::contains(
const serialization::pimpl::data& element)
2243 auto request = protocol::codec::set_contains_encode(get_name(), element);
2244 return invoke_and_get_future<bool>(request, partition_id_);
// Encodes a set get-all request for get_name() and returns the serialized
// elements from invoke_and_get_future on partition_id_.
2247boost::future<std::vector<serialization::pimpl::data>>
2248ISetImpl::to_array_data()
2250 auto request = protocol::codec::set_getall_encode(get_name());
2251 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
2252 request, partition_id_);
// Encodes a set add request for get_name() and `element` and returns the bool
// result of invoke_and_get_future on partition_id_.
2256ISetImpl::add(
const serialization::pimpl::data& element)
2258 auto request = protocol::codec::set_add_encode(get_name(), element);
2259 return invoke_and_get_future<bool>(request, partition_id_);
// Encodes a set remove request for get_name() and `element` and returns the
// bool result of invoke_and_get_future on partition_id_.
2263ISetImpl::remove(
const serialization::pimpl::data& element)
2265 auto request = protocol::codec::set_remove_encode(get_name(), element);
2266 return invoke_and_get_future<bool>(request, partition_id_);
// Encodes a set contains-all request for get_name() and `elements` and
// returns the bool result of invoke_and_get_future on partition_id_.
2270ISetImpl::contains_all(
const std::vector<serialization::pimpl::data>& elements)
2273 protocol::codec::set_containsall_encode(get_name(), elements);
2274 return invoke_and_get_future<bool>(request, partition_id_);
// Encodes a set add-all request for get_name() and `elements` and returns the
// bool result of invoke_and_get_future on partition_id_.
2278ISetImpl::add_all(
const std::vector<serialization::pimpl::data>& elements)
2280 auto request = protocol::codec::set_addall_encode(get_name(), elements);
2281 return invoke_and_get_future<bool>(request, partition_id_);
// Encodes a set compare-and-remove-all request for get_name() and `elements`
// and returns the bool result from partition_id_.
2285ISetImpl::remove_all(
const std::vector<serialization::pimpl::data>& elements)
2288 protocol::codec::set_compareandremoveall_encode(get_name(), elements);
2289 return invoke_and_get_future<bool>(request, partition_id_);
// Encodes a set compare-and-retain-all request for get_name() and `elements`
// and returns the bool result from partition_id_.
2293ISetImpl::retain_all(
const std::vector<serialization::pimpl::data>& elements)
2296 protocol::codec::set_compareandretainall_encode(get_name(), elements);
2297 return invoke_and_get_future<bool>(request, partition_id_);
// Body fragment (the function header is not part of this listing): encodes a
// set-clear request, invokes it on partition_id_, and wraps the invocation
// result with to_void_future().
2303 auto request = protocol::codec::set_clear_encode(get_name());
2304 return to_void_future(invoke_on_partition(request, partition_id_));
// Factory for the codec used when registering item listeners on this set.
// Returns a shared_ptr to a newly allocated SetListenerMessageCodec built from
// this proxy's name and the include_value flag.
2307std::shared_ptr<spi::impl::ListenerMessageCodec>
2308ISetImpl::create_item_listener_codec(
bool include_value)
2310 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
2311 new SetListenerMessageCodec(get_name(), include_value));
// Constructor: takes the set name by value and moves it into name_, and stores
// include_value in include_value_. The declaration of the include_value
// parameter is on a line not part of this listing.
2314ISetImpl::SetListenerMessageCodec::SetListenerMessageCodec(std::string name,
2316 : name_(std::move(name))
2317 , include_value_(include_value)
// Builds the add-listener client message for this set from the stored name_
// and include_value_ plus the caller-supplied local_only flag.
2320protocol::ClientMessage
2321ISetImpl::SetListenerMessageCodec::encode_add_request(
bool local_only)
const
2323 return protocol::codec::set_addlistener_encode(
2324 name_, include_value_, local_only);
// Builds the remove-listener client message for this set from the stored
// name_ and the given real_registration_id.
2327protocol::ClientMessage
2328ISetImpl::SetListenerMessageCodec::encode_remove_request(
2329 boost::uuids::uuid real_registration_id)
const
2331 return protocol::codec::set_removelistener_encode(name_,
2332 real_registration_id);
// Constructs the topic proxy under the service name "hz:impl:topicService".
// partition_id_ is computed once here from the serialized instance name
// (get_partition_id(to_data(instance_name))).
2335ITopicImpl::ITopicImpl(
const std::string& instance_name,
2336 spi::ClientContext* context)
2337 : proxy::ProxyImpl(
"hz:impl:topicService", instance_name, context)
2338 , partition_id_(get_partition_id(to_data(instance_name)))
// Publishes an already serialized message: encodes a topic-publish request,
// invokes it on partition_id_, and wraps the invocation result with
// to_void_future().
2342ITopicImpl::publish(
const serialization::pimpl::data& data)
2344 auto request = protocol::codec::topic_publish_encode(get_name(), data);
2345 return to_void_future(invoke_on_partition(request, partition_id_));
// Registers topic_event_handler as a message listener using a codec freshly
// created by create_item_listener_codec(); the handler is moved into
// register_listener(). Returns a future of the registration uuid.
2348boost::future<boost::uuids::uuid>
2349ITopicImpl::add_message_listener(
2350 std::shared_ptr<impl::BaseEventHandler> topic_event_handler)
2352 return register_listener(create_item_listener_codec(),
2353 std::move(topic_event_handler));
// Deregisters a message listener by delegating to the client listener
// service's deregister_listener(). The call's argument list continues on a
// line not part of this listing (presumably registration_id is passed through).
2357ITopicImpl::remove_message_listener(boost::uuids::uuid registration_id)
2359 return get_context().get_client_listener_service().deregister_listener(
// Factory for the codec used when registering message listeners on this
// topic. Returns a shared_ptr to a newly allocated TopicListenerMessageCodec
// built from this proxy's name.
2363std::shared_ptr<spi::impl::ListenerMessageCodec>
2364ITopicImpl::create_item_listener_codec()
2366 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
2367 new TopicListenerMessageCodec(get_name()));
// Constructor: moves `name` into name_. The parameter declaration is on a
// line not part of this listing.
2370ITopicImpl::TopicListenerMessageCodec::TopicListenerMessageCodec(
2372 : name_(std::move(name))
// Builds the add-message-listener client message for this topic from the
// stored name_ and the caller-supplied local_only flag.
2375protocol::ClientMessage
2376ITopicImpl::TopicListenerMessageCodec::encode_add_request(
bool local_only)
const
2378 return protocol::codec::topic_addmessagelistener_encode(name_, local_only);
// Builds the remove-message-listener client message for this topic from the
// stored name_ and the given real_registration_id.
2381protocol::ClientMessage
2382ITopicImpl::TopicListenerMessageCodec::encode_remove_request(
2383 boost::uuids::uuid real_registration_id)
const
2385 return protocol::codec::topic_removemessagelistener_encode(
2386 name_, real_registration_id);
// Forwards service name, object name and context to ProxyImpl and starts
// target_partition_id_ at -1.
// NOTE(review): -1 presumably marks "no target partition selected yet" --
// confirm against the code that later assigns target_partition_id_.
2389ReplicatedMapImpl::ReplicatedMapImpl(
const std::string& service_name,
2390 const std::string& object_name,
2391 spi::ClientContext* context)
2392 : ProxyImpl(service_name, object_name, context)
2393 , target_partition_id_(-1)
// Out-of-class definition of the static constant, fixed at 1000.
// NOTE(review): presumably the upper bound on items per ringbuffer batch
// operation -- confirm against the header that declares it.
2396const int32_t RingbufferImpl::MAX_BATCH_SIZE{ 1000 };
// Const accessor returning a const reference to serialization::pimpl::data.
// The body is not part of this listing (presumably it returns key_).
2400const serialization::pimpl::data&
2401data_entry_view::get_key()
const
// Const accessor returning a const reference to serialization::pimpl::data.
// The body is not part of this listing (presumably it returns value_).
2406const serialization::pimpl::data&
2407data_entry_view::get_value()
const
// Const accessor for the entry's cost; its return type and body are on lines
// not part of this listing.
2413data_entry_view::get_cost()
const
// Const accessor: returns the stored creation_time_ (initialized from an
// int64_t constructor argument). The return type line is not in this listing.
2419data_entry_view::get_creation_time()
const
2421 return creation_time_;
// Const accessor: returns the stored expiration_time_ (initialized from an
// int64_t constructor argument). The return type line is not in this listing.
2425data_entry_view::get_expiration_time()
const
2427 return expiration_time_;
// Const accessor for the entry's hit count; its return type and body are on
// lines not part of this listing.
2431data_entry_view::get_hits()
const
// Const accessor: returns the stored last_access_time_ (initialized from an
// int64_t constructor argument). The return type line is not in this listing.
2437data_entry_view::get_last_access_time()
const
2439 return last_access_time_;
// Const accessor: returns the stored last_stored_time_ (initialized from an
// int64_t constructor argument). The return type line is not in this listing.
2443data_entry_view::get_last_stored_time()
const
2445 return last_stored_time_;
// Const accessor: returns the stored last_update_time_ (initialized from an
// int64_t constructor argument). The return type line is not in this listing.
2449data_entry_view::get_last_update_time()
const
2451 return last_update_time_;
// Const accessor for the entry's version; its return type and body are on
// lines not part of this listing.
2455data_entry_view::get_version()
const
// Const accessor for the entry's time-to-live; its return type and body are
// on lines not part of this listing.
2461data_entry_view::get_ttl()
const
// Const accessor for the entry's max-idle setting (the constructor stores it
// in max_idle_); its return type and body are on lines not part of this
// listing.
2467data_entry_view::get_max_idle()
const
// Constructor: takes key and value as rvalue references and moves them into
// key_ and value_; the five int64_t timestamps (creation, expiration, last
// access, last stored, last update) and max_idle are copied into the matching
// members. Several parameters and member initializers (the gaps in the
// embedded line numbering) are on lines not part of this listing.
2472data_entry_view::data_entry_view(serialization::pimpl::data&& key,
2473 serialization::pimpl::data&& value,
2475 int64_t creation_time,
2476 int64_t expiration_time,
2478 int64_t last_access_time,
2479 int64_t last_stored_time,
2480 int64_t last_update_time,
2484 : key_(std::move(key))
2485 , value_(std::move(value))
2487 , creation_time_(creation_time)
2488 , expiration_time_(expiration_time)
2490 , last_access_time_(last_access_time)
2491 , last_stored_time_(last_stored_time)
2492 , last_update_time_(last_update_time)
2495 , max_idle_(max_idle)
// Default constructor: stamps publish_time_ with the current system_clock
// time. No other member initializer is visible in this listing.
2502ReliableTopicMessage::ReliableTopicMessage()
2503 : publish_time_(std::chrono::system_clock::now())
// Constructs a message to publish: publish_time_ is set to the current
// system_clock time, the payload is moved into payload_, and the pointed-to
// publisher address is copied into the optional publisher_address_.
// NOTE(review): `*address` dereferences the unique_ptr; a null check
// presumably sits on the line missing just before the assignment -- confirm.
2506ReliableTopicMessage::ReliableTopicMessage(
2507 hazelcast::client::serialization::pimpl::data&& payload_data,
2508 std::unique_ptr<address> address)
2509 : publish_time_(std::chrono::system_clock::now())
2510 , payload_(std::move(payload_data))
2513 publisher_address_ = boost::make_optional(*address);
// Const accessor: returns the stored publish_time_ by value as a
// system_clock time_point.
2517std::chrono::system_clock::time_point
2518ReliableTopicMessage::get_publish_time()
const
2520 return publish_time_;
// Const accessor: returns a const reference to the optional publisher
// address; the optional is empty when no address was recorded.
2523const boost::optional<address>&
2524ReliableTopicMessage::get_publisher_address()
const
2526 return publisher_address_;
// Non-const accessor returning a mutable reference to
// serialization::pimpl::data. The body is not part of this listing
// (presumably it returns payload_).
2529serialization::pimpl::data&
2530ReliableTopicMessage::get_payload()
2538namespace serialization {
// Serializer factory id for ReliableTopicMessage; the return type and the
// returned constant are on lines not part of this listing.
2540hz_serializer<topic::impl::reliable::ReliableTopicMessage>::get_factory_id()
// Serializer class id for ReliableTopicMessage: returns the
// RELIABLE_TOPIC_MESSAGE constant. The return type line is not in this listing.
2546hz_serializer<topic::impl::reliable::ReliableTopicMessage>::get_class_id()
2548 return RELIABLE_TOPIC_MESSAGE;
// Serializes a ReliableTopicMessage to `out` in this field order:
//   1. publish time as an int64_t, derived from the time since the clock
//      epoch converted to milliseconds (the end of that statement is on a
//      line not part of this listing);
//   2. the optional publisher address, via write_object;
//   3. the payload as its byte array.
// read_data must consume the fields in the same order.
2552hz_serializer<topic::impl::reliable::ReliableTopicMessage>::write_data(
2553 const topic::impl::reliable::ReliableTopicMessage&
object,
2554 object_data_output& out)
2556 out.write<int64_t>(std::chrono::duration_cast<std::chrono::milliseconds>(
2557 object.publish_time_.time_since_epoch())
2559 out.write_object(
object.publisher_address_);
2560 out.write(
object.payload_.to_byte_array());
// Deserializes a ReliableTopicMessage from `in`, reading the fields in the
// order write_data emits them: int64_t milliseconds, the address object, then
// a byte vector that is wrapped in serialization::pimpl::data.
// The publish time is rebuilt as `now + ms - now.time_since_epoch()`, which
// cancels `now` and leaves the time point `ms` after the clock epoch.
// NOTE(review): this looks equivalent to constructing
// system_clock::time_point directly from the milliseconds value -- confirm
// before simplifying.
// The assignment target of the payload expression and the return statement
// are on lines not part of this listing.
2563topic::impl::reliable::ReliableTopicMessage
2564hz_serializer<topic::impl::reliable::ReliableTopicMessage>::read_data(
2565 object_data_input& in)
2567 topic::impl::reliable::ReliableTopicMessage message;
2568 auto now = std::chrono::system_clock::now();
2569 message.publish_time_ = now +
2570 std::chrono::milliseconds(in.read<int64_t>()) -
2571 now.time_since_epoch();
2572 message.publisher_address_ = in.read_object<address>();
2574 serialization::pimpl::data(in.read<std::vector<byte>>().value());
// Constructor: moves member, key, value, old_value and merging_value into the
// corresponding members and copies event_type into event_type_. The
// parameters between `name` and `old_value`, and the first member
// initializer, are on lines not part of this listing.
2579entry_event::entry_event(
const std::string& name,
2584 typed_data&& old_value,
2585 typed_data&& merging_value)
2587 , member_(
std::move(member))
2588 , event_type_(event_type)
2589 , key_(
std::move(key))
2590 , value_(
std::move(value))
2591 , old_value_(
std::move(old_value))
2592 , merging_value_(
std::move(merging_value))
// Orphaned accessor body: returns merging_value_. The enclosing function
// header is not part of this listing (presumably
// entry_event::get_merging_value).
2616 return merging_value_;
// Stream insertion for entry_event. Writes the event name, the member, the
// event type cast to int, and -- for key, value, old value and merging value
// -- the result of get_type() on each typed_data, not the values themselves.
// The return type and return statement are on lines not part of this listing.
2638operator<<(std::ostream& os,
const entry_event& event)
2640 os <<
"name: " <<
event.name_ <<
" member: " <<
event.member_
2641 <<
" eventType: " <<
static_cast<int>(
event.event_type_)
2642 <<
" key: " << event.key_.
get_type()
2643 <<
" value: " <<
event.value_.get_type()
2644 <<
" oldValue: " <<
event.old_value_.get_type()
2645 <<
" mergingValue: " <<
event.merging_value_.get_type();
// Fragment of a constructor: the visible parameters are event_type, name and
// number_of_entries_affected, and the visible initializers copy event_type
// and number_of_entries_affected into their members. The first line of the
// signature and the remaining initializers are on lines not part of this
// listing (presumably this is map_event::map_event).
2650 entry_event::type event_type,
2651 const std::string& name,
2652 int number_of_entries_affected)
2654 , event_type_(event_type)
2656 , number_of_entries_affected_(number_of_entries_affected)
// Orphaned accessor body: returns number_of_entries_affected_. The enclosing
// function header is not part of this listing (presumably
// map_event::get_number_of_entries_affected).
2680 return number_of_entries_affected_;
// Stream insertion for map_event. Writes the member, the event type cast to
// int, the map name and the number of affected entries.
// NOTE(review): the output opens with "MapEvent{" but the statement ends
// without emitting a closing '}', so the printed text is unbalanced --
// confirm whether that is intended before changing the format.
// The return type and return statement are on lines not part of this listing.
2684operator<<(std::ostream& os,
const map_event& event)
2686 os <<
"MapEvent{member: " <<
event.member_
2687 <<
" eventType: " <<
static_cast<int>(
event.event_type_)
2688 <<
" name: " << event.name_
2689 <<
" numberOfEntriesAffected: " << event.number_of_entries_affected_;
// Constructor taking the collection name, the member and the event type, all
// by const reference. Only the event_type_ initializer is visible here; the
// initializers for the other members are on lines not part of this listing.
2693item_event_base::item_event_base(
const std::string& name,
2694 const member& member,
2695 const item_event_type& event_type)
2698 , event_type_(event_type)
// Destructor defined out of line with the compiler-generated default body.
2719item_event_base::~item_event_base() =
default;
// Constructor: forwards SERVICE_NAME, the object name and the client context
// to the flake_id_generator_impl base; no other initialization is visible.
2721flake_id_generator::flake_id_generator(
const std::string& object_name,
2722 spi::ClientContext* context)
2723 : flake_id_generator_impl(SERVICE_NAME, object_name, context)
const typed_data & get_key() const
Returns the key of the entry event.
const std::string & get_name() const
Returns the name of the map for this event.
const typed_data & get_old_value() const
Returns the old value of the entry event.
type get_event_type() const
Return the event type.
const member & get_member() const
Returns the member fired this event.
const typed_data & get_value() const
Returns the value of the entry event.
const typed_data & get_merging_value() const
Returns the incoming merging value of the entry event.
static const std::chrono::milliseconds UNSET
Default TTL value of a record.
item_event_type get_event_type() const
Return the event type.
const member & get_member() const
Returns the member fired this event.
const std::string & get_name() const
Returns the name of the collection for this event.
Map events common contract.
const std::string & get_name() const
Returns the name of the map for this event.
entry_event::type get_event_type() const
Return the event type.
int get_number_of_entries_affected() const
Returns the number of entries affected by this event.
map_event(member &&member, entry_event::type event_type, const std::string &name, int number_of_entries_affected)
Constructor.
const member & get_member() const
Returns the member fired this event.
Hazelcast provides distribution mechanism for publishing messages that are delivered to multiple subs...
bool remove_message_listener(const std::string &registration_id)
Stops receiving messages for the given message listener.
reliable_listener(bool loss_tolerant, int64_t initial_sequence_id=-1)
typed_data class is a wrapper class for the serialized binary data.
serialization::pimpl::object_type get_type() const